Consumer Application for CDC JSON Data

This example consumes changed data records from MapR Database JSON tables.

Example of Consuming JSON Changed Data Records

In this example, the following occurs:
  • Initialize the consumer properties using Apache Kafka and MapR configuration parameters.
  • Display the change data record properties.
  • Iterate through the change nodes, determine the type of operation, and retrieve the operation value.
  • Retrieve the properties of an individual change node (for example, its data type, field name, and field value) by using the methods of the ChangeDataReader interface.
  • Display the change data record values by using the ChangeNode interface.
  • Subscribe to the stream topic, consume the events, and determine record type.
For changed data records from MapR Database JSON table data, the following are unique:
  • Multiple property values can be retrieved through the ChangeDataReader interface, for example by calling getDouble or getFloat.
  • There are multiple values for single fields in documents that can be retrieved through the ChangeNode interface. See the code line: Value value = changeNode.getValue();
package example.cdps;

import com.mapr.db.MapRDB;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.ojai.*;
import org.ojai.store.cdc.*;
import java.util.*;

public class CDPConsumer {

     * Initialize Basic Consumer Properties
     * @return
    public Properties getBasicListnerProperties() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Use MapR CDP Specific Deserializer to parse the change contents
        props.put("value.deserializer", "com.mapr.db.cdc.ChangeDataRecordDeserializer");
        props.put("fetch.min.bytes", "10");
        props.put("", "5000");
        props.put("auto.offset.reset", "earliest");
        props.put("", "false");
        return props;

     * Display Utility
     * @param consumerRecordkey
     * @param id
     * @param changeDataRecordType
     * @param recordOpTime
     * @param recordServerOpTime
     * @param field
     * @param op
     * @param changeNodeOpTime
     * @param changeNodeServerOpTime
     * @param valueType
     * @param value
    public void display(String consumerRecordkey,
                        Value id,
                        ChangeDataRecordType changeDataRecordType,
                        Long recordOpTime,
                        Long recordServerOpTime,
                        String field,
                        ChangeOp op,
                        Long changeNodeOpTime,
                        Long changeNodeServerOpTime,
                        Value.Type valueType,
                        Value value) {

        Document document = MapRDB.newDocument();
        document.set("consumerRecordkey",  consumerRecordkey);

        if(id != null)
            document.set("id", id);

        if(changeDataRecordType != null)

        document.set("recordOpTime", recordOpTime);
        document.set("recordServerOpTime", recordServerOpTime);

        if(field != null)
            document.set("field", field);


        document.set("changeNodeOpTime", changeNodeOpTime);
        document.set("changeNodeServerOpTime", changeNodeServerOpTime);

        if(valueType != null)

        if(value != null)
            document.set("value", value);

        System.out.println("\t\n********* Propagated Change **************************\t\n");
        System.out.println("\t\n" + document.asJsonString() + "\t\n");

     * Parse change node contents via iterator
     * @param consumerRecordkey
     * @param changeDataRecord
    public void iteratorDisplay(Value id,
                                ChangeDataRecordType changeDataRecordType,
                                Long recordOpTime,
                                Long recordServerOpTime,
                                String consumerRecordkey,
                                ChangeDataRecord changeDataRecord) {

        for (KeyValue<FieldPath, ChangeNode> fieldChangePair : changeDataRecord) {

            // field if operation was done one a field
            String field = fieldChangePair.getKey().asJsonString();

            // Actual change node object, which holds change values
            ChangeNode changeNode = fieldChangePair.getValue();

            // Change Op, based on op done can be NULL, SET, MERGE, DELETE, DELETE_EXACT
            ChangeOp op = changeNode.getOp();

            // change node op time
            Long changeNodeOpTime = changeNode.getOpTimestamp();
            Long changeNodeServerOpTime  = changeNode.getServerTimestamp();

            // the value type if it was non delete operation, such as insert replace etc
            Value.Type valueType = changeNode.getType();

            // value of the operation such as insert value or replace
            Value value = changeNode.getValue();

            // display the change contents
            display(consumerRecordkey, id, changeDataRecordType, recordOpTime, recordServerOpTime,
                    field, op, changeNodeOpTime, changeNodeServerOpTime, valueType, value);

     * Get Parsed Value
     * @param changeDataReader
     * @param field
     * @param valueType
     * @return
    public Value getValue(ChangeDataReader changeDataReader, String field, Value.Type valueType) {
        Document valDoc = MapRDB.newDocument();

        if(field == null) {
            field = "null";

        switch (valueType) {
            case NULL:
            case BOOLEAN:
                valDoc.set(field, changeDataReader.getBoolean());
            case STRING:
                valDoc.set(field, changeDataReader.getString());
            case SHORT:
                valDoc.set(field, changeDataReader.getShort());
            case BYTE:
                valDoc.set(field, changeDataReader.getByte());
            case INT:
                valDoc.set(field, changeDataReader.getInt());
            case LONG:
                valDoc.set(field, changeDataReader.getLong());
            case FLOAT:
                valDoc.set(field, changeDataReader.getFloat());
            case DOUBLE:
                valDoc.set(field, changeDataReader.getDouble());
            case DECIMAL:
                valDoc.set(field, changeDataReader.getDecimal());
            case DATE:
                valDoc.set(field, changeDataReader.getDate());
            case TIME:
                valDoc.set(field, changeDataReader.getTime());
            case TIMESTAMP:
                valDoc.set(field, changeDataReader.getTimestamp());
            case INTERVAL:
                valDoc.set(field, changeDataReader.getInterval());
            case BINARY:
                valDoc.set(field, changeDataReader.getBinary());
        return valDoc.getValue(field);
     * Parse change node contents via reader
     * @param consumerRecordkey
     * @param changeDataRecord
    public void readerDisplay(Value id,
                              ChangeDataRecordType changeDataRecordType,
                              Long recordOpTime,
                              Long recordServerOpTime,
                              String consumerRecordkey,
                              ChangeDataRecord changeDataRecord) {
        ChangeEvent changeEvent;
        // get reader from the event
        ChangeDataReader changeDataReader = changeDataRecord.getReader();

        while ((changeEvent = != null) {
            // parse through change events
            switch (changeEvent) {
                case NODE:
                    System.out.println("node event get the value type");
                    Value.Type valueType = changeDataReader.getType();
                    String field = changeDataReader.getFieldName();
                    Long serverTimestamp = changeDataReader.getServerTimestamp();
                    Long opTimestamp = changeDataReader.getOpTimestamp();
                    ChangeOp op = changeDataReader.getOp();
                    Value value = getValue(changeDataReader, field, valueType);

                    display(consumerRecordkey, id, changeDataRecordType,
                        recordOpTime, recordServerOpTime, field, op, opTimestamp,
                            serverTimestamp, valueType, value);

     * Consume from changelog topics
     * @param pollTimeout
     * @param topics
    public void consume(long pollTimeout, String topics, boolean method) {
        // initialize consumer
        KafkaConsumer<String, ChangeDataRecord> consumer = new KafkaConsumer<String, ChangeDataRecord>

        // subscribe to /stream:topic
        List<String> topicList = new ArrayList<String>();

        // Get consumer records
        ConsumerRecords<String, ChangeDataRecord> consumerRecords = consumer.poll(pollTimeout);

        // iterate over consumer records
        for(ConsumerRecord<String, ChangeDataRecord> consumerRecord: consumerRecords) {

            String consumerRecordkey = consumerRecord.key().trim();
            ChangeDataRecord changeDataRecord = consumerRecord.value();

            // record key for the change
            Value id = changeDataRecord.getId();

            // record level op can be either RECORD_INSERT, RECORD_UPDATE, RECORD_DELETE
            ChangeDataRecordType changeDataRecordType = changeDataRecord.getType();

            // record level op-time & server op-time
            Long recordOpTime = changeDataRecord.getOpTimestamp();
            Long recordServerOpTime = changeDataRecord.getServerTimestamp();

            if(method) {
                // Method 1 - via iterator interface
                iteratorDisplay(id, changeDataRecordType,
                        recordOpTime, recordServerOpTime,
                        consumerRecordkey, changeDataRecord);
            } else {
                // Method 2 - via reader interface
                readerDisplay(id, changeDataRecordType,
                        recordOpTime, recordServerOpTime,
                        consumerRecordkey, changeDataRecord);

     * Driver
     * @param args
    public static void main(String[] args) {
        Long pollTimeout = Long.parseLong(args[0]);
        String topic = args[1];
        boolean method = Boolean.parseBoolean(args[2]);
        CDPConsumer cdpConsumer = new CDPConsumer();
        cdpConsumer.consume(pollTimeout, topic, method);