Saving an Apache Spark Dataset to a MapR Database JSON Table

Starting in the MEP 4.1.0 release, the MapR Database OJAI Connector for Apache Spark provides the following API to save a Dataset to a MapR Database table:

For saving a Dataset, apply the following method on a Spark object:
def saveToMapRDB(tableName: String, idFieldPath : String = "_id", 
            createTable: Boolean = false, bulkInsert:Boolean = false): Unit
import org.apache.spark.sql.SparkSession
import com.mapr.db.spark.sql._
val ds = spark.loadFromMapRDB("/tmp/user_profiles")
ds.saveToMapRDB(tableName, createTable = true)
For saving a Dataset, apply the following method on a MapRDBJavaSession object:
def saveToMapRDB[T](ds: Dataset[T], tableName: String, idFieldPath: String, 
            createTable:oolean, bulkInsert: Boolean): Unit
import org.apache.spark.sql.SparkSession;
MapRDBJavaSession maprSession = new MapRDBJavaSession(spark);
Dataset<Row> ds = maprSession.loadFromMapRDB("/tmp/user_profiles");
maprSession.saveToMapRDB(ds, true);

The MapR Database OJAI Connector for Apache Spark also provides the following API to insert a Dataset into a MapR Database table:

import com.mapr.db.spark._
ds.insertToMapRDB(tableName, idFieldPath, bulkInsert)
maprSession.insertToMapRDB(ds, tableName, idFieldPath, bulkInsert)
Note: The insertToMapRDB API throws an exception if a row with the same ID already exists.

Word Count Example Using MapR Database OJAI Connector

 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * See the License for the specific language governing permissions and
 * limitations under the License.

// scalastyle:off println
package org.apache.spark.examples.maprdbconnector

import org.apache.spark.sql.SparkSession

import com.mapr.db.spark.sql._

object MaprDBJsonConnectorWordCount {

  def main(args: Array[String]): Unit = {


    val pathToFileWithData = args(0)
    val tableName = args(1)
    val tableNameWithResult = args(2)

    val spark = SparkSession
      .appName("OJAI MaprDB connector wordcount example")

    import spark.implicits._
    val wordSequenceDS = importDataIntoSeq(pathToFileWithData).toDS()

    wordSequenceDS.saveToMapRDB(tableName, createTable = true)

    val dfWithDataFromMaprDB = spark.loadFromMapRDB(tableName)
      .flatMap(line => line.getAs[String](1).split(" "))

    println("Dataset with counted words:")

    dfWithDataFromMaprDB.withColumn("_id", $"value")
      .saveToMapRDB(tableNameWithResult, createTable = true)
    println("Dataset with counted words was saved into the MaprDB table.")


  private def parseArgs(args: Array[String]): Unit = {
    if (args.length != 3) {

  private def printUsage(): Unit = {
    val usage =
      """OJAI MaprDB connector wordcount example
        |1) path to the file with data (words.txt can be used for the test);
        |2) name of the MaprDB table where data from file will be saved;
        |3) name of the MaprDB table where result will be saved;


  private def importDataIntoSeq(filePath: String): Seq[Word] = {
      .map(line => {
        val wordWithId = line.split(" ")
        Word(wordWithId(0), wordWithId.drop(1).mkString(" "))

  private case class Word(_id: String, words: String)