Accessing MapR Event Store For Apache Kafka in Zeppelin Using the Livy Interpreter

This section contains a MapR Event Store For Apache Kafka streaming example that you can run in your Apache Zeppelin notebook using the Livy interpreter.

Note: See MapR Data Science Refinery Support by MapR Core Version for limitations in version support when accessing MapR Event Store.
The example references a stream named test_stream created in the path /streaming_test/test_stream. The stream contains a topic called test_topic. You can use the following commands to create this stream and topic, but you cannot run them in Zeppelin; they must be run in a MapR cluster.
hadoop fs -mkdir /streaming_test
hadoop fs -chown <user>:<group> /streaming_test
sudo su - <user>
maprcli stream create -path /streaming_test/test_stream
maprcli stream topic create -path /streaming_test/test_stream -topic test_topic

When the stream and topic are available, perform the following actions in your notebook:

  1. Configure the Livy interpreter. Make sure to follow the steps described in the Spark Jobs section to allow Spark jobs to run in parallel.
  2. Create a streaming consumer in your notebook using the %livy.spark interpreter:
    import org.apache.kafka.clients.consumer.ConsumerConfig
    
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka09.{ConsumerStrategies, KafkaUtils, LocationStrategies}    
    
    val ssc = new StreamingContext(sc, Seconds(1))
    
    val topicsSet = Set("/streaming_test/test_stream:test_topic")
    val kafkaParams = Map[String, String](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "localhost:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "none",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG ->
        "org.apache.kafka.common.serialization.StringDeserializer",
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG ->
        "org.apache.kafka.common.serialization.StringDeserializer",
      ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest",
      ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> "false" 
    )
    
    val consumerStrategy =
          ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams)
    val messages = KafkaUtils.createDirectStream[String, String](
          ssc,
          LocationStrategies.PreferConsistent,
          consumerStrategy)
    
    val lines = messages.map(_.value())
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
    wordCounts.print()
    
    ssc.start()
    ssc.awaitTerminationOrTimeout(3 * 60 * 1000)
  3. Create a streaming producer in another notebook, also with the %livy.spark interpreter:
    import java.util.Properties
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
    
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("acks", "all")
    props.put("retries", "0")
    props.put("batch.size", "16384")
    props.put("linger.ms", "1")
    props.put("buffer.memory", "33554432")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    
    val producer = new KafkaProducer[String, String](props)
    
    for (i <- 1 to 1000) {
        val message = new ProducerRecord[String, String]("/streaming_test/test_stream:test_topic", i.toString(), i.toString())
        producer.send(message)
    }
  4. Start running the consumer notebook from Step 2.

    Wait until the Livy session in YARN for this consumer is initialized and running. You can determine this by locating the session in the YARN resource manager UI:

    The URL for the YARN resource manager is one of the following:

    • Secure cluster:
      https://<resource-manager-host>:8090
    • Non-secure cluster:
      http://<resource-manager-host>:8088
  5. Run the producer notebook from Step 3 several times.

    The consumer notebook has a three-minute timeout that was set by the following line:

    ssc.awaitTerminationOrTimeout(3 * 60 * 1000)

    You will see output in the consumer notebook after the timeout has expired.