Writing a Structured Spark Stream to MapR-DB

The examples in this section write a structured Spark stream to MapR-DB in Scala, Java, and Python.

To write a structured Spark stream to MapR-DB, specify the stream format using MapRDBSourceConfig.Format for Java and Scala, or the string com.mapr.db.spark.streaming for Python, and then configure the tablePath, idFieldPath, createTable, bulkMode, and sampleSize options on the writer.

import com.mapr.db.spark.streaming.MapRDBSourceConfig
import org.apache.spark.sql.streaming.{DataStreamReader, DataStreamWriter}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
              
/** Builds a DataStreamWriter that streams `df` into a MapR-DB table.
  *
  * The stream's `value` column is exposed as the document `_id`.
  * The writer is configured but not started; the caller starts the query.
  */
def dataStreamWriter(spark: SparkSession, df: DataFrame): DataStreamWriter[Row] = {
  import spark.implicits._

  // Use the "value" column as the MapR-DB document key.
  val keyed = df.select($"value" as "_id")

  keyed.writeStream
    .format(MapRDBSourceConfig.Format)
    .option(MapRDBSourceConfig.TablePathOption, "/table/path")
    .option(MapRDBSourceConfig.IdFieldPathOption, "value")
    .option(MapRDBSourceConfig.CreateTableOption, true)
    .option(MapRDBSourceConfig.BulkModeOption, true)
    .option(MapRDBSourceConfig.SampleSizeOption, 1000)
    .outputMode("append")
}
import com.mapr.db.spark.streaming.MapRDBSourceConfig;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.DataStreamReader;
import org.apache.spark.sql.streaming.DataStreamWriter;
import org.apache.spark.sql.streaming.StreamingQueryException;
              
/**
 * Builds a DataStreamWriter that streams {@code df} into a MapR-DB table.
 * The stream's {@code value} column is cast to a string and exposed as the
 * document {@code _id}. The writer is configured but not started.
 */
DataStreamWriter<Row> dataStreamWriter(Dataset<Row> df) {
   // Use the "value" column, cast to a string, as the MapR-DB document key.
   Dataset<Row> keyed = df.selectExpr("CAST(value AS STRING) as _id");
   DataStreamWriter<Row> writer = keyed.writeStream()
         .format(MapRDBSourceConfig.Format())
         .option(MapRDBSourceConfig.TablePathOption(), "/table/path")
         .option(MapRDBSourceConfig.IdFieldPathOption(), "value")
         .option(MapRDBSourceConfig.CreateTableOption(), true)
         .option(MapRDBSourceConfig.BulkModeOption(), true)
         .option(MapRDBSourceConfig.SampleSizeOption(), 1000);
   return writer.outputMode("append");
}
from pyspark.sql import *
              
def data_stream_writer_func(df, checkpoint_dir, table_path):
    """Return a DataStreamWriter configured to write *df* to a MapR-DB table.

    The stream's ``value`` column is cast to a string and exposed as the
    document ``_id``. The writer is configured but not started; the caller
    is responsible for starting the query.
    """
    # Writer options, in the same order the original chained them.
    options = {
        "checkpointLocation": checkpoint_dir,
        "tablePath": table_path,
        "idFieldPath": "value",
        "createTable": True,
        "bulkMode": True,
        "sampleSize": 1000,
    }
    writer = df.selectExpr("CAST(value AS STRING) as _id").writeStream
    writer = writer.format("com.mapr.db.spark.streaming")
    for key, value in options.items():
        writer = writer.option(key, value)
    return writer