// Sample Java Consumer

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

import java.io.IOException;
import java.util.*; 

public class SampleConsumer {
    // Set the stream and topic to read from.
    public static String topic = "/<streamname>:<topicname>";

    // Declare a new consumer.
    public static KafkaConsumer consumer;

    public static void main(String[] args) throws IOException {
        configureConsumer(args);

        // Subscribe to the topic.
        List<String> topics = new ArrayList<>();
        topics.add(topic);
        consumer.subscribe(topics);

        // Set the timeout interval for requests for unread messages.
        long pollTimeout = 1000;

        boolean stop = false;
        while (true){ 
            ConsumerRecords<String, String> records = consumer.poll(pollTimeout); 
            for (ConsumerRecord<String, String> record : records) 
            System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); 
                }
        }
        consumer.close();
        System.out.println("All done.");
    }

    /* Set the value for a configuration parameter.
       This configuration parameter specifies which class
       to use to deserialize the value of each message.*/
    public static void configureConsumer(String[] args) {
        Properties props = new Properties();
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");


        consumer = new KafkaConsumer<String, String>(props);
    }
}