Release H: Protobuf
Introduction
Protobuf is a data format used to serialize structured data.
Similar to AVRO, it can be used to serialize JSON data and store it in a binary format.
The compact binary encoding reduces both transfer time and storage space.
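As a quick illustration of the binary round trip, the sketch below serializes a message to the protobuf wire format and parses it back, comparing the size against the equivalent JSON produced by com.google.protobuf.util.JsonFormat. This is a minimal sketch only, assuming the PMDataType class generated from the schema in the next section.

// Minimal sketch, assuming the generated PMDataType/Event/CommonEventHeader
// classes from the schema in the next section are on the classpath.
import com.google.protobuf.util.JsonFormat;
import org.oran.protobuf.demo.pm.PMProtos.CommonEventHeader;
import org.oran.protobuf.demo.pm.PMProtos.Event;
import org.oran.protobuf.demo.pm.PMProtos.PMDataType;

public class WireFormatDemo {
    public static void main(String[] args) throws Exception {
        PMDataType pm = PMDataType.newBuilder()
            .setEvent(Event.newBuilder()
                .setCommonEventHeader(CommonEventHeader.newBuilder()
                    .setDomain("perf3gpp")
                    .setEventName("perf3gpp_example")))
            .build();
        byte[] wire = pm.toByteArray();                 // compact binary encoding
        String json = JsonFormat.printer().print(pm);   // equivalent JSON text
        System.out.println("protobuf: " + wire.length + " bytes, JSON: " + json.length() + " chars");
        PMDataType parsed = PMDataType.parseFrom(wire); // lossless round trip
    }
}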
Protobuf Schema
We can use a protobuf schema with the Apicurio registry in the same way as we do with AVRO.
syntax = "proto3"; option java_package = "org.oran.protobuf.demo.pm"; option java_outer_classname = "PMProtos"; message PMDataType { Event event = 1; } message Event { CommonEventHeader commonEventHeader = 1; Perf3gppFields perf3gppFields = 2; } message CommonEventHeader { string domain = 1; string eventName = 2; string sourceName = 3; string reportingEntityName = 4; int64 startEpochMicrosec = 5; int64 lastEpochMicrosec = 6; string timeZoneOffset = 7; string eventId = 8; int32 sequence = 9; string priority = 10; string version = 11; string vesEventListenerVersion = 12; } message Perf3gppFields { string perf3gppFieldsVersion = 1; MeasDataCollection measDataCollection = 2; } message MeasDataCollection { int32 granularityPeriod = 1; string measuredEntityUserName = 2; string measuredEntityDn = 3; string measuredEntitySoftwareVersion = 4; repeated MeasInfoList measInfoList = 5; } message MeasInfoList { MeasInfoId measInfoId = 1; MeasTypes measTypes = 2; repeated MeasValuesList measValuesList = 3; } message MeasInfoId { string sMeasInfoId = 1; } message MeasTypes { repeated string sMeasTypesList = 1; } message MeasValuesList { string measObjInstId = 1; string suspectFlag = 2; repeated MeasResults measResults = 3; } message MeasResults { int32 p = 1; string sValue = 2; }
Protobuf Maven Plugin
We can use the protoc-jar-maven-plugin to automatically generate the corresponding Java classes for our .proto file.
<plugin>
  <groupId>com.github.os72</groupId>
  <artifactId>protoc-jar-maven-plugin</artifactId>
  <version>3.11.4</version>
  <executions>
    <execution>
      <phase>generate-sources</phase>
      <goals>
        <goal>run</goal>
      </goals>
      <configuration>
        <inputDirectories>
          <include>${project.basedir}/src/main/proto</include>
        </inputDirectories>
        <includeMavenTypes>direct</includeMavenTypes>
        <outputTargets>
          <outputTarget>
            <type>java</type>
            <addSources>main</addSources>
            <outputDirectory>${project.basedir}/target/generated-sources/protobuf</outputDirectory>
          </outputTarget>
        </outputTargets>
      </configuration>
    </execution>
  </executions>
</plugin>
The auto-generated Java classes will be located under ${project.basedir}/target/generated-sources/protobuf in a directory matching the package org.oran.protobuf.demo.pm (from the option java_package = "org.oran.protobuf.demo.pm" in the proto file).
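Because the schema sets java_outer_classname = "PMProtos" (and does not set java_multiple_files), all the message classes are generated as static nested classes of PMProtos and are imported like this:

import org.oran.protobuf.demo.pm.PMProtos.PMDataType;
// or all of the message types at once:
import org.oran.protobuf.demo.pm.PMProtos.*;

The plugin is bound to the generate-sources phase, so running mvn generate-sources (or any later phase such as mvn compile) regenerates the classes.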
Java
The following class shows how to convert a JSON file to the protobuf format, send it to Kafka, read it back, and unmarshal it with the auto-generated proto class.
The nested records are then transformed into a flat data structure.
package org.oran.protobuf.demo;

import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;
import com.google.protobuf.MessageOrBuilder;
import com.google.protobuf.Struct;
import com.google.protobuf.Struct.Builder;
import com.google.protobuf.util.JsonFormat;
import org.oran.protobuf.demo.pm.PMProtos.*;
import io.apicurio.registry.serde.SerdeConfig;
import io.apicurio.registry.serde.protobuf.ProtobufKafkaDeserializer;
import io.apicurio.registry.serde.protobuf.ProtobufKafkaSerializer;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import org.apache.commons.io.IOUtils;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.oran.avro.demo.pm.PMFlat;

public class PMDataProtobufExample {

    private static final String REGISTRY_URL = "http://localhost:8080/apis/registry/v2";
    private static final String BOOTSTRAP_HOST = "192.168.49.2";
    private static final String BOOTSTRAP_PORT = "31809";
    private static final String SERVERS = BOOTSTRAP_HOST + ":" + BOOTSTRAP_PORT;
    private static final String PASSWORD = "Yx79VGQAWIMu";
    private static final String KEYSTORE = "/mnt/c/Users/ktimoney/go/kafka/users/user-keystore.jks";
    private static final String TRUSTSTORE = "/mnt/c/Users/ktimoney/go/kafka/users/user-truststore.jks";
    private static final int NUMBER_OF_MESSAGES = 10;
    private static final String TOPIC_NAME = PMDataProtobufExample.class.getSimpleName();
    private static final String SCHEMA_NAME = "PMData";

    public static final void main(String[] args) throws Exception {
        PMDataProtobufExample app = new PMDataProtobufExample();
        String jsonFile = "pm_report.json";
        //String jsonFile = "A20000626.2315+0200-2330+0200_HTTPS-6-73.json";
        String jsonData = app.getFileAsString(jsonFile);
        PMDataType pm = fromJsonToClass(jsonData);
        System.out.println("Starting example " + PMDataProtobufExample.class.getSimpleName());
        String topicName = TOPIC_NAME;
        String key = SCHEMA_NAME;

        // Create the producer.
        Producer<Object, PMDataType> producer = createKafkaProducer();
        // Produce one message.
        try {
            System.out.println("Producing (1) messages.");
            // Send/produce the message on the Kafka Producer
            ProducerRecord<Object, PMDataType> producedRecord = new ProducerRecord<>(topicName, key, pm);
            producer.send(producedRecord);
            System.out.println("Messages successfully produced.");
        } finally {
            System.out.println("Closing the producer.");
            producer.flush();
            producer.close();
        }

        // Create the consumer
        System.out.println("Creating the consumer.");
        KafkaConsumer<Long, PMDataType> consumer = createKafkaConsumer();

        // Subscribe to the topic
        System.out.println("Subscribing to topic " + topicName);
        consumer.subscribe(Collections.singletonList(topicName));

        // Consume the message.
        try {
            int messageCount = 0;
            System.out.println("Consuming (1) messages.");
            while (messageCount < 1) {
                final ConsumerRecords<Long, PMDataType> records = consumer.poll(Duration.ofSeconds(1));
                messageCount += records.count();
                if (records.count() == 0) {
                    // Do nothing - no messages waiting.
                    System.out.println("No messages waiting...");
                } else records.forEach(record -> {
                    PMDataType value = record.value();
                    System.out.println("Consumed a message: ");
                    // unmarshal the message into a flat structure
                    List<PMFlat> flatList = transformPMS(value);
                    flatList.forEach(System.out::println);
                });
            }
        } finally {
            consumer.close();
        }

        System.out.println("Done (success).");
    }

    /**
     * Creates the Kafka producer.
     */
    private static Producer<Object, PMDataType> createKafkaProducer() {
        Properties props = new Properties();
        // Configure kafka settings
        props.putIfAbsent(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVERS);
        props.putIfAbsent(ProducerConfig.CLIENT_ID_CONFIG, "Producer-" + TOPIC_NAME);
        props.putIfAbsent(ProducerConfig.ACKS_CONFIG, "all");
        props.putIfAbsent(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Use the Apicurio Registry provided Kafka Serializer for Protobuf
        props.putIfAbsent(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ProtobufKafkaSerializer.class.getName());
        // Configure Service Registry location
        props.putIfAbsent(SerdeConfig.REGISTRY_URL, REGISTRY_URL);
        props.putIfAbsent(SerdeConfig.EXPLICIT_ARTIFACT_GROUP_ID, "default");
        // Register the artifact if not found in the registry.
        props.putIfAbsent(SerdeConfig.AUTO_REGISTER_ARTIFACT, Boolean.TRUE);
        // Only configure security values if they are present.
        configureSecurityIfPresent(props);
        // Create the Kafka producer
        Producer<Object, PMDataType> producer = new KafkaProducer<>(props);
        return producer;
    }

    /**
     * Creates the Kafka consumer.
     */
    private static KafkaConsumer<Long, PMDataType> createKafkaConsumer() {
        Properties props = new Properties();
        // Configure Kafka
        props.putIfAbsent(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVERS);
        props.putIfAbsent(ConsumerConfig.GROUP_ID_CONFIG, "Consumer-" + TOPIC_NAME);
        props.putIfAbsent(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.putIfAbsent(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Use the Apicurio Registry provided Kafka Deserializer for Protobuf
        props.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ProtobufKafkaDeserializer.class.getName());
        // Configure Service Registry location
        props.putIfAbsent(SerdeConfig.REGISTRY_URL, REGISTRY_URL);
        // Only configure security values if they are present.
        configureSecurityIfPresent(props);
        // Create the Kafka Consumer
        KafkaConsumer<Long, PMDataType> consumer = new KafkaConsumer<>(props);
        return consumer;
    }

    private static void configureSecurityIfPresent(Properties props) {
        props.putIfAbsent(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
        props.putIfAbsent(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, TRUSTSTORE);
        props.putIfAbsent(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, PASSWORD);
        // configure the following three settings for SSL Authentication
        props.putIfAbsent(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, KEYSTORE);
        props.putIfAbsent(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, PASSWORD);
        props.putIfAbsent(SslConfigs.SSL_KEY_PASSWORD_CONFIG, PASSWORD);
    }

    public static Message fromJson(String json) throws IOException {
        Builder structBuilder = Struct.newBuilder();
        JsonFormat.parser().ignoringUnknownFields().merge(json, structBuilder);
        return structBuilder.build();
    }

    @SuppressWarnings({"unchecked", "rawtypes"})
    public static PMDataType fromJsonToClass(String json) {
        PMDataType.Builder pmBuilder = PMDataType.newBuilder();
        try {
            JsonFormat.parser().merge(json, pmBuilder);
        } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
        }
        PMDataType value = pmBuilder.build();
        return value;
    }

    public String getFileAsString(String fileName) throws Exception {
        ClassLoader classLoader = getClass().getClassLoader();
        // the stream holding the file content
        InputStream inputStream = classLoader.getResourceAsStream(fileName);
        if (inputStream == null) {
            throw new IllegalArgumentException("file not found! " + fileName);
        } else {
            String result = IOUtils.toString(inputStream, StandardCharsets.UTF_8);
            return result;
        }
    }

    public static List<PMFlat> transformPMS(PMDataType pms) {
        List<PMFlat> flatList = new ArrayList<>();
        Event event = pms.getEvent();
        CommonEventHeader commonEventHeader = event.getCommonEventHeader();
        String domain = commonEventHeader.getDomain();
        String eventName = commonEventHeader.getEventName();
        String sourceName = commonEventHeader.getSourceName();
        String reportingEntityName = commonEventHeader.getReportingEntityName();
        long startEpochMicrosec = commonEventHeader.getStartEpochMicrosec();
        long lastEpochMicrosec = commonEventHeader.getLastEpochMicrosec();
        String timeZoneOffset = commonEventHeader.getTimeZoneOffset();
        Perf3gppFields perf3gppFields = event.getPerf3GppFields();
        String perf3gppFieldsVersion = perf3gppFields.getPerf3GppFieldsVersion();
        MeasDataCollection measDataCollection = perf3gppFields.getMeasDataCollection();
        int granularityPeriod = measDataCollection.getGranularityPeriod();
        String measuredEntityUserName = measDataCollection.getMeasuredEntityUserName();
        String measuredEntityDn = measDataCollection.getMeasuredEntityDn();
        String measuredEntitySoftwareVersion = measDataCollection.getMeasuredEntitySoftwareVersion();
        List<MeasInfoList> measInfoLists = measDataCollection.getMeasInfoListList();
        for (MeasInfoList measInfoList : measInfoLists) {
            MeasInfoId measInfoId = measInfoList.getMeasInfoId();
            String sMeasInfoId = measInfoId.getSMeasInfoId();
            MeasTypes measTypes = measInfoList.getMeasTypes();
            List<String> sMeasTypesList = measTypes.getSMeasTypesListList();
            List<MeasValuesList> measValuesLists = measInfoList.getMeasValuesListList();
            for (MeasValuesList measValuesList : measValuesLists) {
                String measObjInstId = measValuesList.getMeasObjInstId();
                String suspectFlag = measValuesList.getSuspectFlag();
                List<MeasResults> measResults = measValuesList.getMeasResultsList();
                for (MeasResults measResult : measResults) {
                    // Create a new PMFlat object
                    PMFlat flat = new PMFlat();
                    flat.setDomain(domain);
                    flat.setEventName(eventName);
                    flat.setSourceName(sourceName);
                    flat.setStartEpochMicrosec(startEpochMicrosec);
                    flat.setLastEpochMicrosec(lastEpochMicrosec);
                    flat.setTimeZoneOffset(timeZoneOffset);
                    flat.setGranularityPeriod(granularityPeriod);
                    flat.setMeasuredEntityDn(measuredEntityDn);
                    flat.setMeasuredEntityUserName(measuredEntityUserName);
                    flat.setSMeasInfoId(sMeasInfoId);
                    flat.setMeasObjInstId(measObjInstId);
                    flat.setSuspectFlag(suspectFlag);
                    // p is a 1-based index into sMeasTypesList
                    String sMeasType = sMeasTypesList.get(measResult.getP() - 1);
                    flat.setSMeasType(sMeasType);
                    String sValue = measResult.getSValue();
                    flat.setSValue(sValue);
                    // add the object to the list
                    flatList.add(flat);
                }
            }
        }
        return flatList;
    }
}
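The pm_report.json resource itself is not shown here. The snippet below is a hedged sketch of the minimal JSON shape that JsonFormat.parser() will accept for this schema; the field names follow the proto fields, the values are purely illustrative, and a Java 15+ text block is assumed.

// Illustrative only: a minimal JSON document matching the PMDataType schema,
// embedded as a text block and parsed with the class above.
String json = """
    {
      "event": {
        "commonEventHeader": { "domain": "perf3gpp", "eventName": "perf3gpp_example" },
        "perf3gppFields": {
          "perf3gppFieldsVersion": "1.0",
          "measDataCollection": {
            "granularityPeriod": 900,
            "measInfoList": [ {
              "measInfoId": { "sMeasInfoId": "measInfoIsSpec" },
              "measTypes": { "sMeasTypesList": [ "succImmediateAssignProcs" ] },
              "measValuesList": [ {
                "measObjInstId": "RncFunction=RF-1",
                "suspectFlag": "false",
                "measResults": [ { "p": 1, "sValue": "1113" } ]
              } ]
            } ]
          }
        }
      }
    }
    """;
PMDataType pm = PMDataProtobufExample.fromJsonToClass(json);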