Configure Spark to Produce MapR Streams Messages

Using the Kafka 0.9 API, you can configure a Spark application to produce MapR Streams messages.

  1. Add the following dependency:
    groupId = org.apache.spark
    artifactId = spark-streaming-kafka-producer_2.11
    version = <spark_version>-mapr-<mapr_eco_version>
    Note: If you are using Spark 2.1.0-1703, specify the version as 2.1.0-mapr-1703. If you are using Spark 2.0.1-1611, specify the version as 2.0.1-mapr-1611.
    Note: To use the Streaming Producer Examples, you must add the appropriate Spark Streaming Kafka producer JAR from the MapR Maven repository to the Spark classpath (/opt/mapr/spark/spark-<spark_version>/jars/). For Spark 2.1.0-1703, the JAR is spark-streaming-kafka-producer_2.11-2.1.0-mapr-1703.jar; for Spark 2.0.1-1611, it is spark-streaming-kafka-producer_2.11-2.0.1-mapr-1611.jar.
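    For example, with an sbt build the dependency for Spark 2.1.0-1703 might be declared as shown below. The resolver name and repository URL are assumptions; substitute the MapR Maven repository configured for your environment.
    // Hypothetical build.sbt fragment; adjust the artifact version to match your
    // Spark and MapR ecosystem releases.
    resolvers += "mapr-releases" at "http://repository.mapr.com/maven/"  // assumed repository URL
    libraryDependencies += "org.apache.spark" % "spark-streaming-kafka-producer_2.11" % "2.1.0-mapr-1703"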
  2. When you write the Spark program, import and use classes from org.apache.spark.streaming.kafka.producer._ and org.apache.spark.streaming.dstream.
    These imports make the following sendToKafka method available on DStream:
    sendToKafka[S <: Serializer[T]](
      topic: String,
      conf: ProducerConf
    ) 
    In the code below, calling sendToKafka sends numMessages messages to the topics specified by the topics parameter.
    // Comma-separated list of brokers in host:port form
    val kafkaBrokers = "host:port,host:port"
    val producerConf = new ProducerConf(bootstrapServers = kafkaBrokers.split(",").toList)

    // Build numMessages Item objects and wrap them in a DStream that emits the
    // same RDD on every batch interval
    val items = (0 until numMessages.toInt).map(i => Item(i, i))
    val defaultRDD: RDD[Item] = ssc.sparkContext.parallelize(items)
    val dStream: DStream[Item] = new ConstantInputDStream[Item](ssc, defaultRDD)

    // Serialize each Item with ItemJsonSerializer and send it to the target topics
    dStream.sendToKafka[ItemJsonSerializer](topics, producerConf)
    dStream.count().print()
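    The snippet above assumes that Item, ItemJsonSerializer, topics, numMessages, and ssc are defined elsewhere in the application. A minimal self-contained sketch is shown below; the Item case class, ItemJsonSerializer, application name, batch interval, and command-line handling are illustrative assumptions, not part of Spark or MapR Streams.
    import org.apache.kafka.common.serialization.Serializer
    import org.apache.spark.SparkConf
    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.dstream.{ConstantInputDStream, DStream}
    import org.apache.spark.streaming.kafka.producer._

    // Illustrative message type and JSON serializer; define your own for real data.
    case class Item(id: Int, value: Int)

    class ItemJsonSerializer extends Serializer[Item] {
      override def configure(configs: java.util.Map[String, _], isKey: Boolean): Unit = {}
      override def serialize(topic: String, item: Item): Array[Byte] =
        s"""{"id": ${item.id}, "value": ${item.value}}""".getBytes("UTF-8")
      override def close(): Unit = {}
    }

    object ProducerExample {
      def main(args: Array[String]): Unit = {
        // Assumed arguments: the target topics and the number of messages to send
        val Array(topics, numMessages) = args

        val sparkConf = new SparkConf().setAppName("StreamsProducerExample")
        val ssc = new StreamingContext(sparkConf, Seconds(2))

        val kafkaBrokers = "host:port,host:port"
        val producerConf = new ProducerConf(bootstrapServers = kafkaBrokers.split(",").toList)

        val items = (0 until numMessages.toInt).map(i => Item(i, i))
        val defaultRDD: RDD[Item] = ssc.sparkContext.parallelize(items)
        val dStream: DStream[Item] = new ConstantInputDStream[Item](ssc, defaultRDD)

        dStream.sendToKafka[ItemJsonSerializer](topics, producerConf)
        dStream.count().print()

        ssc.start()
        ssc.awaitTermination()
      }
    }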