Sample Java Consumer

You need to first add the following dependency to the POM file:
<dependency>
  <groupId>commons-logging</groupId>
  <artifactId>commons-logging</artifactId>
  <version>1.1.1</version>
</dependency>        
/* This code has been tested successfully with commons-logging versions 1.1.1 and 1.2. */

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

import java.io.IOException;
import java.util.*;

/**
 * Minimal sample consumer: subscribes to a single topic and prints the offset,
 * key and value of every record it receives to standard output.
 *
 * <p>The program polls forever; it only leaves the loop when an exception is
 * thrown, at which point the consumer is closed before the exception propagates.
 */
public class SampleConsumer {
    // Stream and topic to read from.
    public static String topic = "/demos/hl7demo/hl7stream:allMessages";

    // Shared consumer instance; assigned by configureConsumer(). Keys and values
    // are both deserialized as strings (see configureConsumer).
    public static KafkaConsumer<String, String> consumer;

    /**
     * Configures the consumer, subscribes to {@link #topic} and prints every
     * record received until the process is terminated or polling fails.
     *
     * @param args command-line arguments; passed through to
     *             {@link #configureConsumer(String[])}, which currently ignores them
     * @throws IOException declared for compatibility with existing callers
     */
    public static void main(String[] args) throws IOException {
        configureConsumer(args);

        // Timeout, in milliseconds, for each request for unread messages.
        final long pollTimeout = 1000;

        try {
            // Subscribe to the topic.
            consumer.subscribe(Collections.singletonList(topic));

            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(pollTimeout);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s%n",
                            record.offset(), record.key(), record.value());
                }
            }
        } finally {
            // Release the consumer's resources if subscribing or polling fails.
            consumer.close();
        }
    }

    /**
     * Creates the shared {@link #consumer}.
     *
     * <p>Sets the configuration parameters that specify which classes are used
     * to deserialize the key and the value of each message (strings for both).
     * No other properties are set here.
     *
     * @param args command-line arguments; currently unused
     */
    public static void configureConsumer(String[] args) {
        Properties props = new Properties();
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumer = new KafkaConsumer<String, String>(props);
    }
}