Kafka Schema Registry Demo

Implements a Kafka Schema Registry demo example that stores and retrieves Avro schemas.

Maven Dependencies

Add the following repositories to the POM file to resolve Confluent and MapR dependencies:

<repositories>
   <repository>
       <id>confluent</id>
       <url>http://packages.confluent.io/maven/</url>
   </repository>
   <repository>
       <id>mapr-maven</id>
       <url>http://repository.mapr.com/maven/</url>
       <releases><enabled>true</enabled></releases>
       <snapshots><enabled>true</enabled></snapshots>
   </repository>
</repositories>

The following dependencies are needed for MapR Kafka:

<dependency>
     <groupId>com.mapr.streams</groupId>
     <artifactId>mapr-streams</artifactId>
     <version>6.1.0-mapr</version>
</dependency>
<dependency>
     <groupId>org.apache.kafka</groupId>
     <artifactId>kafka-clients</artifactId>
     <version>1.1.1-mapr-1808</version>
</dependency>
<dependency>
     <groupId>org.apache.kafka</groupId>
     <artifactId>kafka-streams</artifactId>
     <version>1.1.1-mapr-1808</version>
</dependency>

The following dependencies are needed for Avro:

<dependency>
   <groupId>org.apache.avro</groupId>
   <artifactId>avro</artifactId>
   <version>1.8.2</version>
</dependency>
<dependency>
   <groupId>io.confluent</groupId>
   <artifactId>kafka-avro-serializer</artifactId>
   <version>4.1.1</version>
</dependency>

Creating a Java Class corresponding to the Avro Schema

Add the following plugins to the pom.xml file:

  • Plugin to build code:
    <plugin>
      <groupId>org.apache.avro</groupId>
      <artifactId>avro-maven-plugin</artifactId>
      <version>1.8.2</version>
      <executions>
        <execution>
           <phase>generate-sources</phase>
           <goals>
               <goal>schema</goal>
               <goal>protocol</goal>
               <goal>idl-protocol</goal>
           </goals>
           <configuration>
              <sourceDirectory>${project.basedir}/src/main/resources/avro/</sourceDirectory>
              <stringType>String</stringType>
              <createSetters>false</createSetters>
              <enableDecimalLogicalType>true</enableDecimalLogicalType>
              <fieldVisibility>private</fieldVisibility>
           </configuration>
         </execution>
       </executions>
    </plugin>
  • Plugin to force the discovery of the generated classes:
    <plugin>
        <groupId>org.codehaus.mojo</groupId>
        <artifactId>build-helper-maven-plugin</artifactId>
        <version>3.0.0</version>
        <executions>
           <execution>
             <id>add-source</id>
             <phase>generate-sources</phase>
             <goals>
                <goal>add-source</goal>
             </goals>
             <configuration>
               <sources>
                <source>target/generated-sources/avro</source>
               </sources>
             </configuration>
           </execution>
         </executions>
    </plugin>
  • Create a file with filename extension .avsc in the scr/main/resources directory.
    An example of an Avro schema is as follows:
    {
      "namespace": "com.example",
      "type": "record",
      "name": "Employee",
      "doc" : "Represents an Employee at a company",
      "fields": [
          {"name": "firstName", "type": "string", "doc": "The persons given name"},
          {"name": "lastName", "type": "string"},
          {"name": "age",  "type": "int", "default": -1},
          {"name": "emails", "default":[], "type":{"type": "array", "items": "string"}},
          {"name": "phoneNumber",  "type": "string"}
       ]
    }
    The Employee.class Java class is auto-generated in the target/classes/com/example directory after executing the following commands:
    $ mvn clean
    $ mvn package            

You can use this class in your program after performing these steps.

Creating an Avro Producer

  1. Import the following properties for the Kafka Producer:
    import com.example.Employee;
    import io.confluent.kafka.serializers.KafkaAvroSerializer;
    import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.IntegerSerializer;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Properties;
  2. Configure the following properties for MapR Event Store for Apache Kafka:
    Properties properties = new Properties();
    properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            IntegerSerializer.class.getName());
    
    // Configure the KafkaAvroSerializer.
    properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            KafkaAvroSerializer.class.getName());
    
    //Schema registry location.
    properties.setProperty(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG,
            "http://localhost:8087");
    
    KafkaProducer<Integer, Employee> producer = 
            new KafkaProducer<>(properties);
  3. The following code sends n different objects of class Employee.java to the topic avro_example in the /sample-stream stream:
    String topic = "/sample-stream:avro_example";
    Employee employee;
    
    for (int i = 0; i < n; i++) {
    
        List<String> emails = new ArrayList<>();
        for (int j = 0; j < i; j++) {
            emails.add("john" + j + ".doe" + i + "@mail.com");
        }
    
        employee = Employee.newBuilder()
                .setFirstName("John" + i)
                .setLastName("Doe")
                .setAge(i + 5)
                .setEmails(emails)
                .setPhoneNumber("+1-202-555-" + i + i + i + i)
                .build();
    
        ProducerRecord<Integer, Employee> record = 
            new ProducerRecord(topic, i, employee);
    
        producer.send(record, (recordMetadata, e) -> {
            if (e == null) {
                System.out.println("Success!" );
                System.out.println(recordMetadata.toString());
            } else {
                e.printStackTrace();
            }
        });
    }
    
    producer.flush();
    producer.close();

Creating an Avro Consumer

  1. Import the following properties for the Kafka Consumer:
    import com.example.Employee;
    import io.confluent.kafka.serializers.KafkaAvroDeserializer;
    import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.IntegerDeserializer;
    
    import java.util.Collections;
    import java.util.Properties;
  2. The properties to configure are similar to the Kafka producer, only Deserializers must be used instead of Serializers. Add one more property called KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG:
    Properties properties = new Properties();
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            IntegerDeserializer.class.getName());
    
    //Use Kafka Avro Deserializer.
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            KafkaAvroDeserializer.class.getName());
    
    //Use Specific Record or else you get Avro GenericRecord.
    properties.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");
    
    //Schema registry location.
    properties.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG,
            "http://localhost:8087");
    
    KafkaConsumer<Integer, Employee> consumer = 
            new KafkaConsumer<>(properties);
  3. The following code reads objects of the Employee.java class from the avro_example topic in the /sample-stream stream:
    String topic = "/sample-stream:avro_example";
    consumer.subscribe(Collections.singletonList(topic));
    
    try {
        while (true) {
            ConsumerRecords<Integer, Employee> records = 
                    consumer.poll(100);
            
            records.forEach(record -> {
    
                Employee employeeRecord = record.value();
    
                System.out.printf("%s %d %d %s \n", record.topic(),
                        record.partition(), record.offset(), employeeRecord);
            });
        }
    } finally {
        consumer.close();
    }