Code for the Sample Java Consumer

Use this sample producer when you follow the steps in "Getting Started with MapR Streams".

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

import java.util.*;

public class SampleConsumer {
    // Set the stream and topic to read from.
    public static String topic = "/<streamname>:<topicname>";

    // Declare a new consumer.
    public static KafkaConsumer consumer;

    public static void main(String[] args) throws IOException {

        // Subscribe to the topic.
        List<String> topics = new ArrayList<>();

        // Set the timeout interval for requests for unread messages.
        long pollTimeOut = 1000;

        boolean stop = false;
        int pollTimeout = 1000;
        while (!stop) {
            // Request unread messages from the topic.
            ConsumerRecords<String, String> consumerRecords = consumer.poll(pollTimeout);
            Iterator<ConsumerRecord<String, String>> iterator = consumerRecords.iterator();
            if (iterator.hasNext()) {
                while (iterator.hasNext()) {
                    ConsumerRecord<String, String> record =;
                    // Iterate through returned records, extract the value
                    // of each message, and print the value to standard output.
                    System.out.println((" Consumed Record: " + record.toString()));
            } else {
                stop = true;
        System.out.println("All done.");

    /* Set the value for a configuration parameter.
       This configuration parameter specifies which class
       to use to deserialize the value of each message.*/
    public static void configureConsumer(String[] args) {
        Properties props = new Properties();

        consumer = new KafkaConsumer<String, String>(props);