Using Structured Streaming to Create a Word Count Application in Spark

The example in this section creates a Dataset representing a stream of input lines from Kafka and prints a running word count of the input to the console. The same application is shown in Scala, Java, and Python.
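
All three snippets reference a few free variables (bootstrapServers, subscribeType, topics, checkpointLocation) that are assumed to be defined elsewhere, typically parsed from the command line. A minimal Scala sketch with illustrative values:

val bootstrapServers = "localhost:9092"   // Kafka broker list
val subscribeType = "subscribe"           // one of "assign", "subscribe", "subscribePattern"
val topics = "wordcount-input"            // comma-separated list of topics to read
val checkpointLocation = "/tmp/structured-kafka-wordcount-cp"   // for fault-tolerant progress tracking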

Scala:

import org.apache.spark.sql.SparkSession

val spark = SparkSession
       .builder
       .appName("StructuredKafkaWordCount")
       .getOrCreate()

import spark.implicits._

// Create the Dataset representing the stream of input lines from Kafka
val lines = spark
       .readStream
       .format("kafka")
       .option("kafka.bootstrap.servers", bootstrapServers)
       .option(subscribeType, topics)
       .load()
       .selectExpr("CAST(value AS STRING)")
       .as[String]
// Generate a running word count; flatMap yields a Dataset[String] whose
// single column is named "value", which is what groupBy("value") groups on
val wordCounts = lines.flatMap(_.split(" ")).groupBy("value").count()
// Run the query that prints the running counts to the console;
// "complete" output mode rewrites the full counts table on every trigger
val query = wordCounts.writeStream
       .outputMode("complete")
       .format("console")
       .option("checkpointLocation", checkpointLocation)
       .start()
              
query.awaitTermination()
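
The Kafka source is not part of core Spark, so the application must be launched with the Kafka connector on its classpath. A sketch of the submit command; the artifact version, class name, and jar name here are assumptions to adapt to your Spark and Scala build:

       spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:<spark-version> \
              --class StructuredKafkaWordCount structured-kafka-wordcount.jar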

Java:

import java.util.Arrays;

import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;

SparkSession spark = SparkSession
              .builder()
              .appName("JavaStructuredKafkaWordCount")
              .getOrCreate();

// Create the Dataset representing the stream of input lines from Kafka
Dataset<String> lines = spark
              .readStream()
              .format("kafka")
              .option("kafka.bootstrap.servers", bootstrapServers)
              .option(subscribeType, topics)
              .load()
              .selectExpr("CAST(value AS STRING)")
              .as(Encoders.STRING());
// Generate a running word count; as in the Scala version, the flatMapped
// Dataset's single column is named "value"
Dataset<Row> wordCounts = lines.flatMap(
              (FlatMapFunction<String, String>) x -> Arrays.asList(x.split(" ")).iterator(),
              Encoders.STRING())
              .groupBy("value").count();
              
// Run the query that prints the running counts to the console
StreamingQuery query = wordCounts.writeStream()
              .outputMode("complete")
              .format("console")
              .option("checkpointLocation", checkpointLocation) // for parity with the Scala version
              .start();
              
query.awaitTermination();         

Python:

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split

spark = SparkSession\
          .builder\
          .appName("StructuredKafkaWordCount")\
          .getOrCreate()
              
# Create the streaming DataFrame representing the stream of input lines from Kafka
lines = spark\
          .readStream\
          .format("kafka")\
          .option("kafka.bootstrap.servers", bootstrapServers)\
          .option(subscribeType, topics)\
          .load()\
          .selectExpr("CAST(value AS STRING)")
              
# Split the lines into words
words = lines.select(
          # explode turns each item in an array into a separate row
          explode(
              split(lines.value, ' ')
          ).alias('word')
      )
               
# Generate a running word count
wordCounts = words.groupBy('word').count()
              
# Run the query that prints the running counts to the console
query = wordCounts\
           .writeStream\
           .outputMode('complete')\
           .format('console')\
           .start()
              
query.awaitTermination()
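
To try the word-count logic without a Kafka broker, the Kafka source can be swapped for Spark's built-in socket source (fed by a tool such as netcat). A minimal Scala sketch; the host and port are illustrative, and the socket source is suitable for testing only because it provides no fault-tolerance guarantees:

// Drop-in replacement for the Kafka source in the Scala snippet above
val lines = spark
       .readStream
       .format("socket")
       .option("host", "localhost")
       .option("port", 9999)
       .load()
       .as[String]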