sql >> Database >  >> NoSQL >> HBase

Robuuste berichtserialisatie in Apache Kafka met behulp van Apache Avro, deel 1

In Apache Kafka schrijven Java-applicaties, producenten genaamd, gestructureerde berichten naar een Kafka-cluster (bestaande uit brokers). Op dezelfde manier lezen Java-applicaties die consumenten worden genoemd deze berichten van hetzelfde cluster. In sommige organisaties zijn er verschillende groepen die verantwoordelijk zijn voor het schrijven en beheren van de producenten en consumenten. In dergelijke gevallen kan een belangrijk pijnpunt liggen in de coördinatie van het overeengekomen berichtformaat tussen producenten en consumenten.

Dit voorbeeld laat zien hoe u Apache Avro kunt gebruiken om records te serialiseren die zijn geproduceerd naar Apache Kafka, terwijl de evolutie van schema's en niet-synchrone updates van producenten- en consumententoepassingen mogelijk zijn.

Serialisatie en deserialisatie

Een Kafka-record (voorheen bericht genoemd) bestaat uit een sleutel, een waarde en headers. Kafka is niet op de hoogte van de structuur van gegevens in de sleutel en waarde van records. Het behandelt ze als byte-arrays. Maar systemen die records van Kafka lezen, geven wel om gegevens in die records. U moet dus gegevens in een leesbaar formaat produceren. Het gegevensformaat dat u gebruikt, moet

  • Wees compact
  • Wees snel met coderen en decoderen
  • Evolutie toestaan
  • Sta upstream-systemen (die naar een Kafka-cluster schrijven) en downstream-systemen (die van hetzelfde Kafka-cluster lezen) toe om op verschillende tijdstippen te upgraden naar nieuwere schema's

JSON, bijvoorbeeld, spreekt voor zich, maar is geen compact gegevensformaat en is traag te ontleden. Avro is een snel serialisatieframework dat relatief compacte output creëert. Maar om Avro-records te lezen, hebt u het schema nodig waarmee de gegevens zijn geserialiseerd.

Een optie is om het schema met het record zelf op te slaan en over te dragen. Dit is prima in een bestand waarin u het schema één keer opslaat en voor een groot aantal records gebruikt. Het opslaan van het schema in elk Kafka-record voegt echter aanzienlijke overhead toe in termen van opslagruimte en netwerkgebruik. Een andere optie is om een ​​overeengekomen set identifier-schema mappings te hebben en naar schema's te verwijzen met hun identifiers in het record.

Van object naar Kafka-record en terug

Producer-toepassingen hoeven gegevens niet rechtstreeks naar byte-arrays te converteren. KafkaProducer is een generieke klasse waarvoor de gebruiker sleutel- en waardetypen moet specificeren. Producenten accepteren dan exemplaren van ProducerRecord die dezelfde typeparameters hebben. Conversie van het object naar byte-array wordt gedaan door een serializer. Kafka biedt enkele primitieve serializers:bijvoorbeeld IntegerSerializer , ByteArraySerializer , StringSerializer . Aan de kant van de consument zetten vergelijkbare deserializers byte-arrays om in een object waar de toepassing mee om kan gaan.

Het is dus logisch om op Serializer- en Deserializer-niveau aan te haken en ontwikkelaars van producenten- en consumententoepassingen de handige interface van Kafka te laten gebruiken. Hoewel de nieuwste versies van Kafka ExtendedSerializers . toestaan en ExtendedDeserializers om toegang te krijgen tot headers, hebben we besloten om de schema-ID op te nemen in de sleutel en waarde van Kafka-records in plaats van recordheaders toe te voegen.

Avro Essentials

Avro is een raamwerk voor gegevensserialisatie (en externe procedureaanroep). Het gebruikt een JSON-document met de naam schema om gegevensstructuren te beschrijven. Het meeste Avro-gebruik vindt plaats via GenericRecord of subklassen van SpecificRecord. Java-klassen die zijn gegenereerd op basis van Avro-schema's zijn subklassen van de laatste, terwijl de eerste kan worden gebruikt zonder voorafgaande kennis van de gegevensstructuur waarmee wordt gewerkt.

Wanneer twee schema's voldoen aan een set compatibiliteitsregels, kunnen gegevens die met het ene schema zijn geschreven (het schrijversschema genoemd) worden gelezen alsof het met het andere is geschreven (het lezerschema genoemd). Schema's hebben een canonieke vorm waarin alle details die niet relevant zijn voor de rangschikking, zoals opmerkingen, zijn verwijderd om de gelijkwaardigheidscontrole te vergemakkelijken.

VersionedSchema en SchemaProvider

Zoals eerder vermeld, hebben we een één-op-één-toewijzing nodig tussen schema's en hun identifiers. Soms is het gemakkelijker om met namen naar schema's te verwijzen. Wanneer een compatibel schema is gemaakt, kan dit worden beschouwd als een volgende versie van het schema. We kunnen dus verwijzen naar schema's met een naam, versiepaar. Laten we het schema, zijn identifier, naam en versie samen een VersionedSchema noemen . Dit object kan extra metadata bevatten die de applicatie nodig heeft.

public class VersionedSchema {
  private final int id;
  private final String name;
  private final int version;
  private final Schema schema;

  public VersionedSchema(int id, String name, int version, Schema schema) {
    this.id = id;
    this.name = name;
    this.version = version;
    this.schema = schema;
  }

  public String getName() {
    return name;
  }

  public int getVersion() {
    return version;
  }

  public Schema getSchema() {
    return schema;
  }
    
  public int getId() {
    return id;
  }
}

SchemaProvider objecten kunnen de instanties van VersionedSchema . opzoeken .

public interface SchemaProvider extends AutoCloseable {
  public VersionedSchema get(int id);
  public VersionedSchema get(String schemaName, int schemaVersion);
  public VersionedSchema getMetadata(Schema schema);
}

Hoe deze interface wordt geïmplementeerd, wordt besproken in "Een schemawinkel implementeren" in een toekomstige blogpost.

Serialiseren van generieke gegevens

Bij het serialiseren van een record moeten we eerst uitzoeken welk schema we moeten gebruiken. Elk record heeft een getSchema methode. Maar het kan tijdrovend zijn om de id uit het schema te achterhalen. Het is over het algemeen efficiënter om het schema in te stellen tijdens de initialisatie. Dit kan direct op identifier of op naam en versie. Bovendien willen we bij het produceren van meerdere onderwerpen mogelijk verschillende schema's instellen voor verschillende onderwerpen en het schema achterhalen via de onderwerpnaam die als parameter is opgegeven voor de methode serialize(T, String) . Deze logica is in onze voorbeelden weggelaten omwille van de beknoptheid en eenvoud.

private VersionedSchema getSchema(T data, String topic) {
  return schemaProvider.getMetadata( data.getSchema());
}

Met het schema in de hand moeten we het in ons bericht opslaan. Het serialiseren van de ID als onderdeel van het bericht geeft ons een compacte oplossing, aangezien alle magie gebeurt in de Serializer/Deserializer. Het maakt ook een zeer eenvoudige integratie mogelijk met andere frameworks en bibliotheken die Kafka al ondersteunen en laat de gebruiker zijn eigen serializer gebruiken (zoals Spark).

Met deze benadering schrijven we eerst de schema-ID op de eerste vier bytes.

private void writeSchemaId(ByteArrayOutputStream stream, int id) throws IOException {
    try (DataOutputStream os = new DataOutputStream(stream)) {
    os.writeInt(id);
  }
}

Dan kunnen we een DatumWriter . maken en serialiseer het object.

private void writeSerializedAvro(ByteArrayOutputStream stream, T data, Schema schema) throws IOException {
  BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(stream, null);
  DatumWriter<T> datumWriter = new GenericDatumWriter<>(schema);
  datumWriter.write(data, encoder);
  encoder.flush();
}

Als we dit allemaal samenvoegen, hebben we een generieke data-serializer geïmplementeerd.

public class KafkaAvroSerializer<T extends GenericContainer> implements Serializer<T> {

  private SchemaProvider schemaProvider;

  @Override
  public void configure(Map<String, ?> configs, boolean isKey) {
    schemaProvider = SchemaUtils.getSchemaProvider(configs);
  }

  @Override
  public byte[] serialize(String topic, T data) {
    try (ByteArrayOutputStream stream = new ByteArrayOutputStream()) {
      VersionedSchema schema = getSchema(data, topic);
   
      writeSchemaId(stream, schema.getId());
      writeSerializedAvro(stream, data, schema.getSchema());
      return stream.toByteArray();
    } catch (IOException e) {
      throw new RuntimeException("Could not serialize data", e);
    }
  }

  private void writeSchemaId(ByteArrayOutputStream stream, int id) throws IOException {...}

  private void writeSerializedAvro(ByteArrayOutputStream stream, T data, Schema schema) throws IOException {...}

  private VersionedSchema getSchema(T data, String topic) {...}

  @Override
  public void close() {
    try {
      schemaProvider.close();
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }
}

Deserialiseren van generieke gegevens

Deserialisatie kan werken met een enkel schema (de schemagegevens zijn geschreven met), maar u kunt een ander lezerschema specificeren. Het lezerschema moet compatibel zijn met het schema waarmee de gegevens zijn geserialiseerd, maar hoeft niet equivalent te zijn. Om deze reden hebben we schemanamen geïntroduceerd. We kunnen nu specificeren dat we gegevens willen lezen met een specifieke versie van een schema. Bij initialisatie lezen we gewenste schemaversies per schemanaam en slaan metadata op in readerSchemasByName voor snelle toegang. Nu kunnen we elk record lezen dat is geschreven met een compatibele versie van het schema alsof het is geschreven met de opgegeven versie.

@Override
public void configure(Map<String, ?> configs, boolean isKey) {
  this.schemaProvider = SchemaUtils.getSchemaProvider(configs);
  this.readerSchemasByName = SchemaUtils.getVersionedSchemas(configs, schemaProvider);
}

Wanneer een record moet worden gedeserialiseerd, lezen we eerst de identifier van het schrijversschema. Dit maakt het mogelijk om het lezerschema op naam op te zoeken. Met beide schema's beschikbaar kunnen we een GeneralDatumReader . maken en lees het verslag.

@Override
public GenericData.Record deserialize(String topic, byte[] data) {
  try (ByteArrayInputStream stream = new ByteArrayInputStream(data)) {

    int schemaId = readSchemaId(stream);
    VersionedSchema writerSchema = schemaProvider.get(schemaId);

    VersionedSchema readerSchema =
        readerSchemasByName.get(writerSchema.getName());
    GenericData.Record avroRecord = readAvroRecord(stream,
        writerSchema.getSchema(), readerSchema.getSchema());
    return avroRecord;
  } catch (IOException e) {
    throw new RuntimeException(e);
  }
}

private int readSchemaId(InputStream stream ) throws IOException {
  try(DataInputStream is = new DataInputStream(stream)) {
    return is.readInt();
  }
}

private GenericData.Record readAvroRecord(InputStream stream, Schema writerSchema, Schema readerSchema) throws IOException {
  DatumReader<Object> datumReader = new GenericDatumReader<>(writerSchema,
      readerSchema);
  BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(stream, null);
  GenericData.Record record = new GenericData.Record(readerSchema);
  datumReader.read(record, decoder);
  return record;
}

Omgaan met specifieke gegevens

Vaker wel dan niet is er één klasse die we willen gebruiken voor onze administratie. Deze klasse wordt dan meestal gegenereerd op basis van een Avro-schema. Apache Avro biedt tools om Java-code te genereren op basis van schema's. Een van die tools is de Avro Maven-plug-in. Gegenereerde klassen hebben het schema waaruit ze zijn gegenereerd tijdens runtime beschikbaar. Dit maakt serialisatie en deserialisatie eenvoudiger en effectiever. Voor serialisatie kunnen we de klasse gebruiken om meer te weten te komen over de te gebruiken schema-ID.

@Override
public void configure(Map<String, ?> configs, boolean isKey) {
  String className = configs.get(isKey ? KEY_RECORD_CLASSNAME : VALUE_RECORD_CLASSNAME).toString();
  try (SchemaProvider schemaProvider = SchemaUtils.getSchemaProvider(configs)) {
    Class<?> recordClass = Class.forName(className);
    Schema writerSchema = new
        SpecificData(recordClass.getClassLoader()).getSchema(recordClass);
    this.writerSchemaId = schemaProvider.getMetadata(writerSchema).getId();
  } catch (Exception e) {
    throw new RuntimeException(e);
  }
}

We hebben dus geen logica nodig om het schema uit onderwerp en gegevens te bepalen. We gebruiken het schema dat beschikbaar is in de recordklasse om records te schrijven.

Evenzo kan voor deserialisatie het lezerschema worden gevonden in de klasse zelf. Deserialisatielogica wordt eenvoudiger, omdat het lezerschema vastligt tijdens de configuratie en niet hoeft te worden opgezocht op schemanaam.

@Override
public T deserialize(String topic, byte[] data) {
  try (ByteArrayInputStream stream = new ByteArrayInputStream(data)) {
    int schemaId = readSchemaId(stream);
    VersionedSchema writerSchema = schemaProvider.get(schemaId);
    return readAvroRecord(stream, writerSchema.getSchema(), readerSchema);
  } catch (IOException e) {
    throw new RuntimeException(e);
  }
}

private T readAvroRecord(InputStream stream, Schema writerSchema, Schema readerSchema) throws IOException {
  DatumReader<T> datumReader = new SpecificDatumReader<>(writerSchema, readerSchema);
  BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(stream, null);
  return datumReader.read(null, decoder);
}

Aanvullende lezing

Raadpleeg de Avro-specificatie voor Schemaresolutie voor meer informatie over schemacompatibiliteit.

Raadpleeg de Avro-specificatie voor het ontleden van canonieke formulieren voor schema's voor meer informatie over canonieke formulieren.

Volgende keer...

Deel 2 toont een implementatie van een systeem om de Avro-schemadefinities op te slaan.


  1. Wanneer CouchDB over MongoDB gebruiken en vice versa?

  2. Kunnen strikte JSON $dates worden gebruikt in een MongoDB-query?

  3. Yii2 + Redis als database

  4. Webscraping en crawlen met Scrapy en MongoDB