Release H: AVRO

Introduction

Apache Avro is a data serialization system that lets JSON data be stored in a compact binary format, governed by a schema definition.

This reduces message size, improving throughput and storage efficiency in Kafka.

To use it with Kafka we need a schema registry such as Apicurio.
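
As a minimal illustration (a standalone sketch using the plain Avro API, with a made-up two-field schema rather than the PM schema used later), binary encoding writes only the field values; the schema itself travels separately, for example via the registry:

Avro Binary Encoding (sketch)
package org.oran.avro.demo;

import java.io.ByteArrayOutputStream;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;

public class BinaryEncodingSketch {
    public static void main(String[] args) throws Exception {
        // A made-up schema for illustration only
        Schema schema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"Demo\",\"fields\":["
            + "{\"name\":\"sourceName\",\"type\":\"string\"},"
            + "{\"name\":\"sequence\",\"type\":\"int\"}]}");
        GenericRecord record = new GenericData.Record(schema);
        record.put("sourceName", "oteNB5309");
        record.put("sequence", 0);

        // Binary encoding writes only the field values; the schema is kept separately
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<GenericRecord>(schema).write(record, encoder);
        encoder.flush();
        System.out.println("Encoded size: " + out.size() + " bytes");
    }
}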

Apicurio

Apicurio Registry is a schema registry and an API registry, which stores and retrieves event schemas and API designs.

To run it with Strimzi over SSL, we use the following resource definitions:

Apicurio
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaUser
metadata:
  name: kafka-registry
  namespace: kafka
  labels:
    strimzi.io/cluster: my-cluster
spec:
  authentication:
    type: tls
  authorization:
    type: simple
    acls:
      # Group id used to consume information from the different topics used by the Service Registry.
      # Name equals the metadata.name property in the ApicurioRegistry object
      - resource:
          type: group
          name: service-registry
        operation: Read
      # Rules for the global-id-topic
      - resource:
          type: topic
          name: global-id-topic
        operation: Read
      - resource:
          type: topic
          name: global-id-topic
        operation: Describe
      - resource:
          type: topic
          name: global-id-topic
        operation: Write
      - resource:
          type: topic
          name: global-id-topic
        operation: Create
      # Rules for the storage-topic
      - resource:
          type: topic
          name: storage-topic
        operation: Read
      - resource:
          type: topic
          name: storage-topic
        operation: Describe
      - resource:
          type: topic
          name: storage-topic
        operation: Write
      - resource:
          type: topic
          name: storage-topic
        operation: Create
      # Rules for the local topics created by our Service Registry instance
      # Prefix value equals the metadata.name property in the ApicurioRegistry object
      - resource:
          type: topic
          name: service-registry-
          patternType: prefix
        operation: Read
      - resource:
          type: topic
          name: service-registry-
          patternType: prefix
        operation: Describe
      - resource:
          type: topic
          name: service-registry-
          patternType: prefix
        operation: Write
      - resource:
          type: topic
          name: service-registry-
          patternType: prefix
        operation: Create
      # Rules for the local transactionalIds created by our Service Registry instance
      # Prefix equals the metadata.name property in the ApicurioRegistry object
      - resource:
          type: transactionalId
          name: service-registry-
          patternType: prefix
        operation: Describe
      - resource:
          type: transactionalId
          name: service-registry-
          patternType: prefix
        operation: Write
      # Rules for internal Apache Kafka topics
      - resource:
          type: topic
          name: __consumer_offsets
        operation: Read
      - resource:
          type: topic
          name: __transaction_state
        operation: Read
      # Rules for Cluster objects
      - resource:
          type: cluster
        operation: IdempotentWrite
---
apiVersion: v1
kind: Service
metadata:
  name: kafka-registry
  namespace: kafka
  labels:
    run: kafka-registry
spec:
  type: LoadBalancer
  selector:
    run: kafka-registry
  ports:
  - port: 8080
    targetPort: 8080
    protocol: TCP
    name: http
    nodePort: 31808
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka-registry
  namespace: kafka
  labels:
    run: kafka-registry
spec:
  selector:
    matchLabels:
      run: kafka-registry
  template:
    metadata:
      labels:
        run: kafka-registry
    spec:
      containers:
      - name: kafka-registry
        image: apicurio/apicurio-registry-mem:2.4.2.Final
        imagePullPolicy: IfNotPresent
        env:
        - name: QUARKUS_PROFILE
          value: prod
        - name: KAFKA_BOOTSTRAP_SERVERS
          value: my-cluster-kafka-bootstrap:9093
        - name: APPLICATION_SERVER_HOST
          value: kafka-registry
        - name: APPLICATION_SERVER_PORT
          value: "8080"
        - name: APPLICATION_ID
          value: strimzi-apicurioregistry
        - name: REGISTRY_PROPERTIES_PREFIX
          value: REGISTRY
        - name: REGISTRY_STREAMS_TOPOLOGY_SECURITY_PROTOCOL
          value: SSL
        - name: REGISTRY_STREAMS_TOPOLOGY_SSL_KEYSTORE_TYPE
          value: PKCS12
        - name: REGISTRY_STREAMS_TOPOLOGY_SSL_KEYSTORE_LOCATION
          value: /etc/ssl/keystore/keystore.p12
        - name: REGISTRY_STREAMS_TOPOLOGY_SSL_KEYSTORE_PASSWORD
          valueFrom:
            secretKeyRef:
              name: kafka-registry
              key: user.password
        - name: REGISTRY_STREAMS_TOPOLOGY_SSL_TRUSTSTORE_TYPE
          value: PKCS12
        - name: REGISTRY_STREAMS_TOPOLOGY_SSL_TRUSTSTORE_LOCATION
          value: /etc/ssl/truststore/truststore.p12
        - name: REGISTRY_STREAMS_TOPOLOGY_SSL_TRUSTSTORE_PASSWORD
          valueFrom:
            secretKeyRef:
              name: my-cluster-cluster-ca-cert
              key: ca.password
        ports:
        - name: http
          containerPort: 8080
        volumeMounts:
        - name: keystore
          mountPath: "/etc/ssl/keystore"
          readOnly: true
        - name: truststore
          mountPath: "/etc/ssl/truststore"
          readOnly: true
      volumes:
      - name: keystore
        secret:
          secretName: kafka-registry
          items:
          - key: user.p12
            path: keystore.p12
      - name: truststore
        secret:
          secretName: my-cluster-cluster-ca-cert
          items:
          - key: ca.p12
            path: truststore.p12
      restartPolicy: Always
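
Once the pods are running, a quick smoke test is to list the registry's artifacts over the exposed NodePort (a sketch; the host below is a typical minikube IP and is an assumption, substitute your own):

Registry Smoke Test (sketch)
package org.oran.avro.demo;

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class RegistryCheck {
    public static void main(String[] args) throws Exception {
        // 192.168.49.2 is an assumed minikube IP; 31808 is the nodePort from the Service above
        HttpRequest request = HttpRequest.newBuilder()
            .uri(URI.create("http://192.168.49.2:31808/apis/registry/v2/search/artifacts"))
            .GET()
            .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
            .send(request, HttpResponse.BodyHandlers.ofString());
        // Expect HTTP 200 and a JSON listing of registered artifacts (empty at first)
        System.out.println(response.statusCode() + " " + response.body());
    }
}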


AVRO Schema

There are various tools available to help us define our AVRO schema (see the links below).

Here is the schema definition for the PM data file:

PM Schema
{
   "type":"record",
   "name":"PMData",
   "namespace":"org.oran.avro.demo.pm",
    "fields":[
      {
         "name":"event",
         "type":{
            "type":"record",
            "name":"Event",
            "fields":[
               {
                  "name":"commonEventHeader",
                  "type":{
                     "type":"record",
                     "name":"CommonEventHeader",
                     "fields":[
                        {
                           "name":"domain",
                           "type":"string"
                        },
                        {
                           "name": "eventId",
                           "type":"string"
                        },                       
                        {
                           "name":"eventName",
                           "type":"string"
                        },
                        {
                           "name":"sequence",
                           "type": "int"                         
                        },                        
                        {
                           "name":"reportingEntityName",
                           "type":"string"
                        },
                        {
                           "name":"vesEventListenerVersion",
                           "type":"string"
                        },
                        {
                           "name":"version",
                           "type":"string"
                        },                                                
                        {
                           "name":"sourceName",
                           "type":"string"
                        },
                        {
                           "name":"priority",
                           "type":"string"
                        },                        
                        {
                           "name":"lastEpochMicrosec",
                           "type":"long"
                        },
                        {
                           "name":"startEpochMicrosec",
                           "type":"long"
                        },
                        {
                           "name":"timeZoneOffset",
                           "type":"string"
                        }
                     ]
                  }
               },
               {
                  "name":"perf3gppFields",
                  "type":{
                     "type":"record",
                     "name":"Perf3gppFields",
                     "fields":[
                        {
                           "name":"perf3gppFieldsVersion",
                           "type":"string"
                        },
                        {
                           "name":"measDataCollection",
                           "type":{
                              "type":"record",
                              "name":"MeasDataCollection",
                              "fields":[
                                 {
                                    "name":"granularityPeriod",
                                    "type":"int"
                                 },
                                 {
                                    "name":"measuredEntityDn",
                                    "type":"string"
                                 },
                                 {
                                    "name":"measuredEntitySoftwareVersion",
                                    "type":"string"
                                 },
                                 {
                                    "name":"measuredEntityUserName",
                                    "type":"string"
                                 },
                                 {
                                    "name":"measInfoList",
                                    "type":{
                                       "type":"array",
                                       "items":{
                                          "type":"record",
                                          "name":"MeasInfoList",
                                          "fields":[
                                             {
                                                "name":"measInfoId",
                                                "type":{
                                                   "type":"record",
                                                   "name":"MeasInfoId",
                                                   "fields":[
                                                      {
                                                         "name":"sMeasInfoId",
                                                         "type":"string"
                                                      }
                                                   ]
                                                }
                                             },
                                             {
                                                "name":"measTypes",
                                                "type":{
                                                   "type":"record",
                                                   "name":"MeasTypes",
                                                   "fields":[
                                                      {
                                                         "name":"sMeasTypesList",
                                                         "type":{
                                                            "type":"array",
                                                            "items":"string"
                                                         }
                                                      }
                                                   ]
                                                }
                                             },
                                             {
                                                "name":"measValuesList",
                                                "type":{
                                                   "type":"array",
                                                   "items":{
                                                      "type":"record",
                                                      "name":"MeasValuesList",
                                                      "fields":[
                                                         {
                                                            "name":"measObjInstId",
                                                            "type":"string"
                                                         },
                                                         {
                                                            "name":"suspectFlag",
                                                            "type":"string"
                                                         },                                                         
                                                         {
                                                            "name":"measResults",
                                                            "type":{
                                                               "type":"array",
                                                               "items":{
                                                                  "type":"record",
                                                                  "name":"MeasResult",
                                                                  "fields":[
                                                                     {
                                                                        "name":"p",
                                                                        "type":"int"
                                                                     },
                                                                     {
                                                                        "name":"sValue",
                                                                        "type":"string"
                                                                     }
                                                                  ]
                                                               }
                                                            }
                                                         }
                                                      ]
                                                   }
                                                }
                                             }
                                          ]
                                       }
                                    }
                                 }
                              ]
                           }
                        }
                     ]
                  }
               }
            ]
         }
      }
   ]
}

Note: Every field in the AVRO schema is mandatory. Remove from the schema any fields that are not present in the JSON file.
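
If a field may legitimately be absent, the Avro way is a union with "null" plus a default, rather than deleting it; a minimal sketch with one hypothetical optional field:

Optional Field (sketch)
package org.oran.avro.demo;

import org.apache.avro.Schema;

public class OptionalFieldSketch {
    public static void main(String[] args) {
        // A hypothetical record with one optional field: a ["null","string"] union with default null
        Schema schema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"Demo\",\"fields\":["
            + "{\"name\":\"measuredEntityUserName\",\"type\":[\"null\",\"string\"],\"default\":null}]}");
        System.out.println(schema.getField("measuredEntityUserName").schema()); // ["null","string"]
    }
}

Note, however, that Avro's JSON decoder (used by parseJson in the example below) expects union values to be wrapped in a type tag such as {"string": "..."}, which is why this example keeps every field mandatory and trims the JSON instead.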

AVRO Code Generation

You can generate Java classes from your schema using the avro-tools jar.

e.g.

Code Generation
java -jar /mnt/c/Users/ktimoney/Downloads/avro-tools-1.11.1.jar  compile schema ./src/main/resources/schemas/pm-all.avsc ./src/main/java/

This will produce the following classes in the org/oran/avro/demo/pm/ directory (matching the namespace in the schema); a usage sketch follows the list:

CommonEventHeader.java
Event.java
MeasDataCollection.java
MeasInfoId.java
MeasInfoList.java
MeasResult.java
MeasTypes.java
MeasValuesList.java
Perf3gppFields.java
PMData.java

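The generated classes are Avro SpecificRecords with builder support; as a quick sanity check you can construct one directly (a sketch with illustrative values — the builder throws if any mandatory field is left unset):

Generated Class Usage (sketch)
package org.oran.avro.demo;

import org.oran.avro.demo.pm.CommonEventHeader;

public class GeneratedClassSketch {
    public static void main(String[] args) {
        // Every field in the schema is mandatory, so every setter must be called
        CommonEventHeader header = CommonEventHeader.newBuilder()
            .setDomain("perf3gpp")
            .setEventId("pm-1")
            .setEventName("perf3gpp_AcmeNode-Acme_pmMeasResult")
            .setSequence(0)
            .setReportingEntityName("")
            .setVesEventListenerVersion("7.1")
            .setVersion("4.0")
            .setSourceName("oteNB5309")
            .setPriority("Normal")
            .setLastEpochMicrosec(1591099200000000L)
            .setStartEpochMicrosec(1591099200000000L)
            .setTimeZoneOffset("UTC+02:00")
            .build();
        System.out.println(header);
    }
}
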
Note: We need to add some extra annotations to some of the fields in the generated classes:

  @JsonProperty("sMeasInfoId")
  private java.lang.CharSequence sMeasInfoId;

  @JsonProperty("sMeasTypesList")
  private java.util.List<java.lang.CharSequence> sMeasTypesList;

  @JsonProperty("sValue")
  private java.lang.CharSequence sValue;

Jackson has problems with fields that start with a lowercase letter followed by an uppercase letter (such as sMeasInfoId): its default bean naming turns the getter getSMeasInfoId() into the property "smeasInfoId", which no longer matches the JSON, so we map the names explicitly with @JsonProperty.
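
A quick way to see the mismatch (a minimal sketch with a hypothetical holder class):

Jackson Naming (sketch)
package org.oran.avro.demo;

import com.fasterxml.jackson.databind.ObjectMapper;

public class JacksonNamingSketch {
    public static class Holder {
        private String sMeasInfoId = "PM=1";
        public String getSMeasInfoId() { return sMeasInfoId; }
    }

    public static void main(String[] args) throws Exception {
        // Default bean naming lowercases the leading capitals after "get",
        // so this prints {"smeasInfoId":"PM=1"} rather than "sMeasInfoId"
        System.out.println(new ObjectMapper().writeValueAsString(new Holder()));
    }
}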

JAVA Producer/Consumer


AVRO Producer/Consumer
package org.oran.avro.demo;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.apicurio.registry.rest.v2.beans.IfExists;
import io.apicurio.registry.serde.SerdeConfig;
import io.apicurio.registry.serde.avro.AvroKafkaDeserializer;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.RestService;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.commons.io.IOUtils;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.oran.avro.demo.pm.MeasInfoList;
import org.oran.avro.demo.pm.MeasResult;
import org.oran.avro.demo.pm.MeasTypes;
import org.oran.avro.demo.pm.MeasValuesList;
import org.oran.avro.demo.pm.PMData;
import org.oran.avro.demo.pm.PMFlat;


public class PMDataAvroExample {

    private static final String REGISTRY_URL = "http://localhost:8080/apis/registry/v2";
    private static final String CCOMPAT_API_URL = "http://localhost:8080/apis/ccompat/v6";
    private static final String BOOTSTRAP_HOST = "192.168.49.2";
    private static final String BOOTSTRAP_PORT = "30053";
    private static final String SERVERS = BOOTSTRAP_HOST+":"+BOOTSTRAP_PORT;
    private static final String TOPIC_NAME = PMDataAvroExample.class.getSimpleName();
    private static final String SUBJECT_NAME = "key";
    private static final String PASSWORD = "GkELEyyxccrp";
    private static final String KEYSTORE = "/mnt/c/Users/ktimoney/go/kafka/users/user-keystore.jks";
    private static final String TRUSTSTORE = "/mnt/c/Users/ktimoney/go/kafka/users/user-truststore.jks";
    private static final int NUMBER_OF_MESSAGES = 10;


    public static void main(String[] args) throws Exception {
        PMDataAvroExample app = new PMDataAvroExample();
        //String jsonFile = "pm_report.json";
        String jsonFile = "A20000626.2315+0200-2330+0200_HTTPS-6-73.json";
        String jsonData = app.getFileAsString(jsonFile);

        System.out.println("Starting example " + PMDataAvroExample.class.getSimpleName());
        String topicName = TOPIC_NAME;
        String subjectName = SUBJECT_NAME;
        String artifactId = topicName + "-PMData";

        String schemaData = app.getFileAsString("schemas/pm-all.avsc");

        // Create the producer.
        Producer<String, Object> producer = createKafkaProducer();
        int producedMessages = 0;
        // Produce messages
        try {
            System.out.println("Producing ("+NUMBER_OF_MESSAGES+") messages.");
            for (int idx = 0; idx < NUMBER_OF_MESSAGES; idx++) {
                // Use the schema to create a record
                GenericRecord record = parseJson(jsonData, schemaData);

                // Send/produce the message on the Kafka Producer
                ProducerRecord<String, Object> producedRecord = new ProducerRecord<>(topicName, subjectName, record);
                producer.send(producedRecord);

                Thread.sleep(100);
            }
            System.out.println(NUMBER_OF_MESSAGES + " messages sent successfully.");
        } finally {
            System.out.println("Closing the producer.");
            producer.flush();
            producer.close();
        }

        // Create the consumer
        System.out.println("Creating the consumer.");
        KafkaConsumer<Long, GenericRecord> consumer = createKafkaConsumer();

        // Subscribe to the topic
        System.out.println("Subscribing to topic " + topicName);
        consumer.subscribe(Collections.singletonList(topicName));

        // Consume messages.
        try {
            int messageCount = 0;
            System.out.println("Consuming ("+NUMBER_OF_MESSAGES+") messages.");
            while (messageCount < NUMBER_OF_MESSAGES) {
                final ConsumerRecords<Long, GenericRecord> records = consumer.poll(Duration.ofSeconds(1));
                messageCount += records.count();
                if (records.count() == 0) {
                    // Wait for messages to become available.
                    System.out.println("Waiting for messages...");
                } else records.forEach(record -> {
                    GenericRecord recordValue = record.value();
                    System.out.println("Consumed a message: ");
                    ObjectMapper mapper = new ObjectMapper();
                    try {
                      PMData pms = mapper.readValue(String.valueOf(recordValue), PMData.class);
                      List<PMFlat> flatList = transformPMS(pms);
                      flatList.forEach(System.out::println);
                    } catch (Exception e) {
                      e.printStackTrace();
                    }

                });
            }
        } finally {
            System.out.println("Closing the consumer.");
            consumer.close();
        }

        System.out.println("Done (success).");
    }

    /**
     * Creates the Kafka producer.
     */
    private static Producer<String, Object> createKafkaProducer() {
        Properties props = new Properties();

        // Configure kafka settings
        props.putIfAbsent(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVERS);
        props.putIfAbsent(ProducerConfig.CLIENT_ID_CONFIG, "Producer-" + TOPIC_NAME);
        props.putIfAbsent(ProducerConfig.ACKS_CONFIG, "all");

        // Configure Service Registry location
        props.putIfAbsent(SerdeConfig.REGISTRY_URL, REGISTRY_URL);
        // Auto-register the schema if it is not already present in the registry.
        props.putIfAbsent(SerdeConfig.AUTO_REGISTER_ARTIFACT, Boolean.TRUE);
        props.putIfAbsent(SerdeConfig.AUTO_REGISTER_ARTIFACT_IF_EXISTS, IfExists.RETURN.name());

        // Configure the SSL security settings.
        configureSecurityIfPresent(props);

        RestService restService = new RestService(CCOMPAT_API_URL);
        final Map<String, String> restServiceProperties = new HashMap<>();
        CachedSchemaRegistryClient schemaRegistryClient = new CachedSchemaRegistryClient(restService, 100, restServiceProperties);

        Map<String, String> properties = new HashMap<>();

        // Configure Service Registry location (Confluent API)
        properties.put("schema.registry.url", CCOMPAT_API_URL);
        properties.put("auto.register.schemas", "true");
        // Map the topic name to the artifactId in the registry
        properties.put("value.subject.name.strategy", "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy");

        // Use the Confluent provided Kafka Serializer for Avro
        KafkaAvroSerializer valueSerializer = new KafkaAvroSerializer(schemaRegistryClient, properties);
        StringSerializer keySerializer = new StringSerializer();

        // Create the Kafka producer
        Producer<String, Object> producer = new KafkaProducer<String, Object>(props, keySerializer, valueSerializer);

        return producer;
    }

    /**
     * Creates the Kafka consumer.
     */
    private static KafkaConsumer<Long, GenericRecord> createKafkaConsumer() {
        Properties props = new Properties();

        // Configure Kafka
        props.putIfAbsent(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVERS);
        props.putIfAbsent(ConsumerConfig.GROUP_ID_CONFIG, "Consumer-" + TOPIC_NAME);
        props.putIfAbsent(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.putIfAbsent(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Use the Apicurio Registry provided Kafka Deserializer for Avro
        props.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, AvroKafkaDeserializer.class.getName());

        // Configure Service Registry location
        props.putIfAbsent(SerdeConfig.REGISTRY_URL, REGISTRY_URL);
        // Enable "Confluent Compatible API" mode in the Apicurio Registry deserializer
        props.putIfAbsent(SerdeConfig.ENABLE_CONFLUENT_ID_HANDLER, Boolean.TRUE);

        // Configure the SSL security settings.
        configureSecurityIfPresent(props);

        // Create the Kafka Consumer
        KafkaConsumer<Long, GenericRecord> consumer = new KafkaConsumer<>(props);
        return consumer;
    }


    private static void configureSecurityIfPresent(Properties props) {
      props.putIfAbsent(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
      props.putIfAbsent(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, TRUSTSTORE);
      props.putIfAbsent(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,  PASSWORD);

      // configure the following three settings for SSL Authentication
      props.putIfAbsent(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, KEYSTORE);
      props.putIfAbsent(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, PASSWORD);
      props.putIfAbsent(SslConfigs.SSL_KEY_PASSWORD_CONFIG, PASSWORD);
    }


    private static GenericData.Record parseJson(String json, String schema) throws IOException {
      Schema parsedSchema = new Schema.Parser().parse(schema);
      Decoder decoder = DecoderFactory.get().jsonDecoder(parsedSchema, json);

      DatumReader<GenericData.Record> reader =
          new GenericDatumReader<>(parsedSchema);
      return reader.read(null, decoder);
    }

    public String getFileAsString(String fileName) throws Exception {
      ClassLoader classLoader = getClass().getClassLoader();
      InputStream inputStream = classLoader.getResourceAsStream(fileName);

      // the stream holding the file content
      if (inputStream == null) {
          throw new IllegalArgumentException("file not found! " + fileName);
      } else {
          String result = IOUtils.toString(inputStream, StandardCharsets.UTF_8);
          return result;
      }
    }


    public static List<PMFlat> transformPMS(PMData pms){
      List<PMFlat> flatList = new ArrayList<>();
      // Get the constant values
      String domain = pms.getEvent().getCommonEventHeader().getDomain().toString();
      String eventName = pms.getEvent().getCommonEventHeader().getEventName().toString();
      String sourceName = pms.getEvent().getCommonEventHeader().getSourceName().toString();
      long startEpochMicrosec = pms.getEvent().getCommonEventHeader().getStartEpochMicrosec();
      long lastEpochMicrosec = pms.getEvent().getCommonEventHeader().getLastEpochMicrosec();
      String timeZoneOffset = pms.getEvent().getCommonEventHeader().getTimeZoneOffset().toString();
      int granularityPeriod = pms.getEvent().getPerf3gppFields().getMeasDataCollection().getGranularityPeriod();
      String measuredEntityDn = pms.getEvent().getPerf3gppFields().getMeasDataCollection().getMeasuredEntityDn().toString();
      String measuredEntityUserName = pms.getEvent().getPerf3gppFields().getMeasDataCollection().getMeasuredEntityUserName().toString();

      List<MeasInfoList> measInfoLists = pms.getEvent().getPerf3gppFields().getMeasDataCollection().getMeasInfoList();
      // Loop through the measurements
      for(MeasInfoList measInfoList: measInfoLists) {
        String sMeasInfoId = measInfoList.getMeasInfoId().getSMeasInfoId().toString();
        MeasTypes measTypes = measInfoList.getMeasTypes();
        List<CharSequence> sMeasTypesList = measTypes.getSMeasTypesList();
        List<MeasValuesList> measValuesLists = measInfoList.getMeasValuesList();
        for(MeasValuesList measValuesList: measValuesLists) {
          String measObjInstId = measValuesList.getMeasObjInstId().toString();
          String suspectFlag = measValuesList.getSuspectFlag().toString();
          List<MeasResult> measResultList = measValuesList.getMeasResults();
          for(MeasResult measResult: measResultList) {
            // Create new PMSFlat object
            PMFlat flat = new PMFlat();
            flat.setDomain(domain);
            flat.setEventName(eventName);
            flat.setSourceName(sourceName);
            flat.setStartEpochMicrosec(startEpochMicrosec);
            flat.setLastEpochMicrosec(lastEpochMicrosec);
            flat.setTimeZoneOffset(timeZoneOffset);
            flat.setGranularityPeriod(granularityPeriod);
            flat.setMeasuredEntityDn(measuredEntityDn);
            flat.setMeasuredEntityUserName(measuredEntityUserName);
            flat.setSMeasInfoId(sMeasInfoId);
            flat.setMeasObjInstId(measObjInstId);
            flat.setSuspectFlag(suspectFlag);

            String sMeasType = sMeasTypesList.get(measResult.getP()-1).toString();
            flat.setSMeasType(sMeasType);
            String sValue = measResult.getSValue().toString();
            flat.setSValue(sValue);
            // add the object to the list
            flatList.add(flat);
          }
        }
      }
      return flatList;
    }

}

Note: Extra configuration is required to make the message(s) compatible with the Confluent wire format (CCOMPAT_API_URL): the producer uses the Confluent KafkaAvroSerializer against Apicurio's ccompat API, and the consumer enables SerdeConfig.ENABLE_CONFLUENT_ID_HANDLER so the Apicurio deserializer can read the Confluent-style schema ID.

We unmarshal the message into the generated classes and then map it to the PMFlat class:

PMFlat
package org.oran.avro.demo.pm;

import lombok.Data;
import lombok.ToString;

@Data
@ToString
public class PMFlat {
  private String domain;
  private String eventName;
  private String sourceName;
  private long startEpochMicrosec;
  private long lastEpochMicrosec;
  private String timeZoneOffset;
  private int granularityPeriod;
  private String measuredEntityDn;
  private String measuredEntityUserName;
  private String sMeasInfoId;
  private String measObjInstId;
  private String suspectFlag;
  private String sMeasType;
  private String sValue;
}

Links

Apache Avro - a data serialization system

Apicurio Registry

AVRO Schema Tools

Example applications using the Apicurio Registry

AVRO Getting Started (Java)