Configure Spark 2.2.1 and later to Consume MapR Event Store For Apache Kafka Messages

Using the Kafka 0.9 API, you can configure a Spark application to query MapR Event Store For Apache Kafka for new messages at a given interval. This information applies to Spark 2.2.1 and later.

  1. Install the MapR core Kafka package, if you have not already done so.
  2. Copy the Kafka client jar into the Spark jars directory as shown below:
    cp /opt/mapr/lib/kafka-clients-<version>.jar $SPARK_HOME/jars
  3. Add the following dependency:
    groupId = org.apache.spark
    artifactId = spark-streaming-kafka-0-9_2.11
    version = <spark_version>-mapr-<mapr_eco_version>
    Note: To use the Streaming Producer Examples, you must add the appropriate Spark streaming Kafka producer jar from the MapR Maven repository to the Spark classpath (/opt/mapr/spark/spark-<spark_version>/jars/).
  4. Consider the following when you write the Spark application:
    1. Verify that it meets the following requirements:
      • Imports and uses classes from org.apache.spark.streaming.kafka09. The following code snippet imports three classes.
        import org.apache.spark.streaming.kafka09.{ConsumerStrategies, KafkaUtils, LocationStrategies}
      • Defines key and value deserializers in the kafkaParams map.
        import org.apache.kafka.clients.consumer.ConsumerConfig

        // groupId and offsetReset are String values supplied by the application.
        val kafkaParams = Map[String, String](
           ConsumerConfig.GROUP_ID_CONFIG -> groupId,
           // Keys and values are both read as strings.
           ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG ->
              "org.apache.kafka.common.serialization.StringDeserializer",
           ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG ->
              "org.apache.kafka.common.serialization.StringDeserializer",
           ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> offsetReset)
      • Does not configure a broker address or ZooKeeper, because neither is required for MapR Event Store For Apache Kafka.
    2. Optionally, define a value for spark.streaming.kafka.consumer.poll.ms in the Spark configuration.
      Note: This option sets the consumer poll timeout. If you do not set spark.streaming.kafka.consumer.poll.ms, the value of the spark.network.timeout property is used. If spark.network.timeout is also not set, the default is 120 seconds.
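      import org.apache.spark.SparkConf

      // pollTimeout is a String holding the poll timeout in milliseconds.
      val sparkConf = new SparkConf()
            .setAppName("v09DirectKafkaWordCount")
            .set("spark.streaming.kafka.consumer.poll.ms", pollTimeout)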
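The fallback order described in the note above can be modeled in plain Scala. The following is a simplified sketch and not Spark's own code: PollTimeoutSketch, resolve, and networkTimeoutMs are hypothetical names, the settings are passed in as an ordinary Map, and the parser assumes that spark.network.timeout carries an explicit "s" or "ms" suffix.

```scala
// Simplified model of the poll-timeout fallback; hypothetical names throughout.
object PollTimeoutSketch {
  val PollKey = "spark.streaming.kafka.consumer.poll.ms"
  val NetworkKey = "spark.network.timeout"
  val DefaultMs = 120000L // 120 seconds, the documented default

  // Assumption: only values such as "60s" or "1500ms" are handled here.
  def networkTimeoutMs(raw: String): Long = {
    val v = raw.trim
    if (v.endsWith("ms")) v.dropRight(2).toLong        // check "ms" before "s"
    else if (v.endsWith("s")) v.dropRight(1).toLong * 1000L
    else throw new IllegalArgumentException(s"Unsupported time value in this sketch: $raw")
  }

  // 1. explicit poll timeout, 2. network timeout, 3. built-in default.
  def resolve(settings: Map[String, String]): Long =
    settings.get(PollKey).map(_.trim.toLong)
      .orElse(settings.get(NetworkKey).map(networkTimeoutMs))
      .getOrElse(DefaultMs)
}
```

With only spark.network.timeout set to "60s", resolve yields 60000 milliseconds. Setting spark.streaming.kafka.consumer.poll.ms as well makes that value take precedence.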
    Example:

    For a sample consumer program, see https://github.com/mapr/spark/blob/2.2.1-mapr-1803/examples/src/main/scala/org/apache/spark/examples/streaming/V09DirectKafkaWordCount.scala.

    The KafkaUtils.createDirectStream method creates an input stream that reads MapR Event Store For Apache Kafka messages. The ConsumerStrategies.Subscribe method creates the consumerStrategy, which limits the set of topics that the stream subscribes to. This set is derived from the topics parameter passed into the program. Using LocationStrategies.PreferConsistent distributes partitions evenly across the available executors.

    val consumerStrategy = ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams)
    val messages = KafkaUtils.createDirectStream[String, String](
       ssc, LocationStrategies.PreferConsistent, consumerStrategy)
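The snippets on this page can be combined into one small application. The following is a minimal sketch rather than the sample program itself: the object name DirectStreamSketch, the argument order, and the word-count step are assumptions made for illustration. It also assumes that the spark-streaming-kafka-0-9 dependency and the Kafka client jar described above are on the classpath.

```scala
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka09.{ConsumerStrategies, KafkaUtils, LocationStrategies}

object DirectStreamSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical argument order, chosen for this sketch only.
    val Array(groupId, offsetReset, batchSeconds, pollTimeout, topics) = args

    // Optional poll timeout, passed as a String of milliseconds.
    val sparkConf = new SparkConf()
      .setAppName("DirectStreamSketch")
      .set("spark.streaming.kafka.consumer.poll.ms", pollTimeout)
    val ssc = new StreamingContext(sparkConf, Seconds(batchSeconds.toLong))

    // Comma-separated topic names become the subscription set.
    val topicsSet = topics.split(",").toSet

    // No broker address or ZooKeeper setting is configured.
    val kafkaParams = Map[String, String](
      ConsumerConfig.GROUP_ID_CONFIG -> groupId,
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG ->
        "org.apache.kafka.common.serialization.StringDeserializer",
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG ->
        "org.apache.kafka.common.serialization.StringDeserializer",
      ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> offsetReset)

    val consumerStrategy = ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams)
    val messages = KafkaUtils.createDirectStream[String, String](
      ssc, LocationStrategies.PreferConsistent, consumerStrategy)

    // Each element is a ConsumerRecord; count the words in the message values.
    val wordCounts = messages
      .map(_.value())
      .flatMap(_.split(" "))
      .map(word => (word, 1L))
      .reduceByKey(_ + _)
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```

The sketch expects exactly five arguments and fails with a MatchError otherwise. A real application would likely validate its arguments and print a usage message first.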