Kafka Streams Demo

Provides a Kafka Streams demo example that creates a stream and topics and runs the WordCountDemo class code. The sample code produces and consumes messages.

  1. Create the a stream named /sample-stream:
    maprcli stream create -path /sample-stream -produceperm p -consumeperm p -topicperm p
  2. Create word-count-input and word-count-output topics:
    maprcli stream topic create -path /sample-stream -topic word-count-input
    maprcli stream topic create -path /sample-stream -topic word-count-output
  3. Build the word count application and copy its JAR file to your cluster.
  4. Run the WordCountDemo class.
    java -cp "$(mapr clientclasspath):<Word Count Application Name>.jar" WordCountDemo
  5. Run the console producer:
    /opt/mapr/kafka/kafka-<version>/bin/kafka-console-producer.sh 
    --broker-list fake.server.id:9092 
    --topic /sample-stream:word-count-input
  6. Run the console consumer:
    /opt/mapr/kafka/kafka-<version>/bin/kafka-console-consumer.sh 
    --bootstrap-server fake.server.id:9092 
    --topic /sample-stream:word-count-output 
    --property print.key=true
  7. Produce some input with the console producer:
    >word27 word28 word27 word29
  8. Get the following output:
    word28 1
    word27 2
    Word29 1

WordCountDemo Class Code

import org.apache.kafka.common.serialization.Serdes.StringSerde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.KeyValueStore;

import java.util.Arrays;
import java.util.Locale;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class WordCountDemo {

    public static final String INPUT_TOPIC = "/sample-stream:word-count-input";
    public static final String OUTPUT_TOPIC = "word-count-output"; // Default stream will be used

    public static final String DEFAULT_STREAM = "/sample-stream";

    public static final String APP_ID = "app-id";

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, StringSerde.class);
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, StringSerde.class);


        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 500); // Put attention to this property
        props.put(StreamsConfig.STREAMS_DEFAULT_STREAM_CONFIG, DEFAULT_STREAM);

        final StreamsBuilder builder = new StreamsBuilder();


        KStream<String, String> wordCountStream = builder.<String, String>stream(INPUT_TOPIC)
                .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
                .groupBy((key, value) -> value)
                .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"))
                .mapValues(x -> x.toString())

                .toStream();

        wordCountStream.to(OUTPUT_TOPIC, Produced.with(Serdes.String(), Serdes.String()));

        final Topology topology = builder.build();
        final KafkaStreams streams = new KafkaStreams(topology, props);
        final CountDownLatch latch = new CountDownLatch(1);

        // attach shutdown handler to catch control-c
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        try {
            streams.start();
            latch.await();
        } catch (Throwable e) {
            e.printStackTrace();
            System.exit(1);
        }
        System.exit(0);
    }
}
Note: The kafka-console-producer.sh and kafka-console-consumer.sh scripts are part of the mapr-kafka package.