Writing a Structured Spark Stream to MapR Database JSON Table

The example in this section writes a structured stream in Spark to MapR Database JSON table.

To write a structured Spark stream to MapR Database JSON table, use MapRDBSourceConfig.Format for Java and Scala and com.mapr.db.spark.streaming for Python to format the tablePath, idFieldPath, createTable, bulkMode, and sampleSize parameters.

import com.mapr.db.spark.streaming.MapRDBSourceConfig
import org.apache.spark.sql.streaming.{DataStreamReader, DataStreamWriter}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
              
def dataStreamWriter(spark: SparkSession, df: DataFrame): DataStreamWriter[Row] = {
import spark.implicits._
              
df.select($"value" as "_id")
  .writeStream
  .format(MapRDBSourceConfig.Format)
  .option(MapRDBSourceConfig.TablePathOption, "/table/path")
  .option(MapRDBSourceConfig.IdFieldPathOption, "value")
  .option(MapRDBSourceConfig.CreateTableOption, true)
  .option(MapRDBSourceConfig.BulkModeOption, true)
  .option(MapRDBSourceConfig.SampleSizeOption, 1000)
  .outputMode("append")
}
import com.mapr.db.spark.streaming.MapRDBSourceConfig;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.DataStreamReader;
import org.apache.spark.sql.streaming.DataStreamWriter;
import org.apache.spark.sql.streaming.StreamingQueryException;
              
DataStreamWriter<Row> dataStreamWriter(Dataset<Row> df) {
   return df.selectExpr("CAST(value AS STRING) as _id")
            .writeStream()
            .format(MapRDBSourceConfig.Format())
            .option(MapRDBSourceConfig.TablePathOption(), "/table/path")
            .option(MapRDBSourceConfig.IdFieldPathOption(), "value")
            .option(MapRDBSourceConfig.CreateTableOption(), true)
            .option(MapRDBSourceConfig.BulkModeOption(), true)
            .option(MapRDBSourceConfig.SampleSizeOption(), 1000)
            .outputMode("append");
 }
from pyspark.sql import *
              
def data_stream_writer_func(df, checkpoint_dir, table_path):
  return df.selectExpr("CAST(value AS STRING) as _id") \
           .writeStream \
           .format("com.mapr.db.spark.streaming") \
           .option("checkpointLocation", checkpoint_dir) \
           .option("tablePath", table_path) \
           .option("idFieldPath", "value") \
           .option("createTable", True) \
           .option("bulkMode", True) \
           .option("sampleSize", 1000)