Saving Objects to a MapR-DB JSON Table

The MapR-DB OJAI Connector for Apache Spark provides the following API to save an RDD[OJAIDocument] to a MapR-DB table:

def saveToMapRDB(tablename: String,
                 createTable: Boolean = false,
                 bulkInsert: Boolean = false,
                 idFieldPath: String = "_id"): Unit

In this example, address and first_name data is loaded from the “/tmp/user_profiles” table, stored as an RDD (userprofilesRDD) and then saved to the “/tmp/user_firstname_and_address” table:

import com.mapr.db.spark._

val userprofilesRDD = sc.loadFromMapRDB("/tmp/user_profiles")
  .where("condition")
  .select("address", "first_name")

userprofilesRDD.saveToMapRDB("/tmp/user_firstname_and_address")

This API supports the following parameters:

Parameter      Default Value   Description
createTable    false           Creates the table before saving the documents. If the table already exists and createTable is set to true, the API throws an exception.
idFieldPath    _id             Specifies the field to use as the document key.
bulkInsert     false           Loads a group of rows at once. bulkInsert is similar to a bulk load in MapReduce.

The following example uses the idFieldPath parameter to specify the document key and sets the bulkInsert parameter explicitly:

userprofilesRDD.saveToMapRDB("/tmp/user_firstname_and_address", idFieldPath = "user_id", bulkInsert = false)
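
To make the interaction between the listed defaults and named arguments concrete, the following sketch uses a hypothetical stand-in for saveToMapRDB that only records the arguments it receives. SaveParamsSketch and SaveCall are illustrative names, not part of the connector, and the stand-in assumes the defaults shown in the parameter table.

```scala
object SaveParamsSketch {
  // Hypothetical record of the arguments a call resolves to.
  // This is NOT the connector's implementation; it only mirrors the
  // parameter names and the defaults listed in the table.
  final case class SaveCall(tablename: String, createTable: Boolean,
                            bulkInsert: Boolean, idFieldPath: String)

  def saveToMapRDB(tablename: String,
                   createTable: Boolean = false,
                   bulkInsert: Boolean = false,
                   idFieldPath: String = "_id"): SaveCall =
    SaveCall(tablename, createTable, bulkInsert, idFieldPath)

  // Only the table name is given, so every other parameter takes its default.
  val defaults: SaveCall = saveToMapRDB("/tmp/user_firstname_and_address")

  // Named arguments may appear in any order and may skip createTable.
  val keyed: SaveCall = saveToMapRDB("/tmp/user_firstname_and_address",
                                     idFieldPath = "user_id", bulkInsert = false)
}
```

In this sketch, the first call resolves idFieldPath to _id, while the second call overrides it with user_id and leaves createTable at its default.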

The following example saves an RDD of Person objects to the newly created /tmp/UserInfo table:

import org.apache.spark.{SparkConf, SparkContext}
import com.mapr.db.spark._

val conf = new SparkConf().setAppName("json app")
                          .setMaster("local[*]")
val sc = new SparkContext(conf)
val people = sc.parallelize(getUsers())
people.saveToMapRDB("/tmp/UserInfo", createTable = true)

The getUsers function creates the Person objects:

def getUsers(): Array[Person] = {
  val users: Array[Person] = Array(
    Person("DavUSCalif", "David", "Jones",
           ODate.parse("1947-11-29"),
           Seq("football", "books", "movies"),
           Map("city" -> "milpitas", "street" -> "350 holger way", "Pin" -> 95035)),

    Person("PetUSUtah", "Peter", "pan",
           ODate.parse("1974-1-29"),
           Seq("boxing", "music", "movies"),
           Map("city" -> "salt lake", "street" -> "351 lake way", "Pin" -> 89898)),

    Person("JamUSAriz", "James", "junior",
           ODate.parse("1968-10-2"),
           Seq("tennis", "painting", "music"),
           Map("city" -> "phoenix", "street" -> "358 pond way", "Pin" -> 67765)),

    Person("JimUSCalif", "Jimmy", "gill",
           ODate.parse("1976-1-9"),
           Seq("cricket", "sketching"),
           Map("city" -> "san jose", "street" -> "305 city way", "Pin" -> 95652)),

    Person("IndUSCalif", "Indiana", "Jones",
           ODate.parse("1987-5-4"),
           Seq("squash", "comics", "movies"),
           Map("city" -> "sunnyvale", "street" -> "35 town way", "Pin" -> 95985)))

  users
}

The following example saves JSON data to a MapR-DB table by first converting the JSON data into OJAI documents.

val documents = sc.parallelize((1 to 10)
  .map(i => s"""{"_id" : "$i", "test": $i}"""))
val maprd = documents.map(a => MapRDBSpark.newDocument(a))
maprd.saveToMapRDB("/tmp/testData")

Note:
  • Each JSON document must contain an _id field to be saved to a table.
  • If you only need to convert the JSON data to an OJAIDocument (without saving to MapR-DB), the _id field is not required.
  • If the MapR-DB table already contains a record with the same _id value, MapR-DB replaces the record. Otherwise, it inserts a new record.
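
The replace-or-insert behavior described in the note can be modeled with a small self-contained sketch. The in-memory map below is a hypothetical stand-in for a table keyed by _id; it does not call MapR-DB, and UpsertSketch and save are illustrative names only.

```scala
import scala.collection.mutable

object UpsertSketch {
  // JSON strings equivalent to those in the example above;
  // each one carries the _id field that saving requires.
  val jsonDocs: Seq[String] =
    (1 to 10).map(i => s"""{"_id": "$i", "test": $i}""")

  // Hypothetical in-memory "table": maps _id to the value of the test field.
  val table: mutable.Map[String, Int] = mutable.Map.empty[String, Int]

  // Replace the record if the _id already exists, insert it otherwise.
  def save(id: String, test: Int): Unit = table.update(id, test)

  (1 to 10).foreach(i => save(i.toString, i)) // ten new records
  save("3", 300)                              // existing _id: record is replaced
  save("11", 11)                              // new _id: record is inserted
}
```

After these calls the model holds eleven records, and the record with _id 3 carries the replacement value rather than the original one.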

Just as you can load a JSON document into a Scala bean class (see Creating an RDD of a Class), you can also save an RDD of Scala class objects to a MapR-DB JSON table. saveToMapRDB saves any bean object as a JSON document by first converting it to an OJAI document.

Table Splits and saveToMapRDB

If the createTable parameter is set to true, saveToMapRDB can use the partition information from lineage to create the splits for a new table. For example:

sc.loadFromMapRDB("/tmp/user_profiles").saveToMapRDB("/userProfiles",
                                                     createTable = true)

Suppose the /tmp/user_profiles table has five splits. saveToMapRDB uses this information to create the /userProfiles table with the same number and range of splits. You can also supply this information by using MapRDBSpark.newPartitioner. For example:

sc.loadFromMapRDB("/tmp/user_profiles")
  .keyBy(doc => doc.get("_id"))
  .repartitionAndSortWithinPartitions(MapRDBSpark.newPartitioner[String]("/profiles"))
  .saveToMapRDB("/userProfiles", createTable = true)
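
As a rough illustration of what range-based splits mean for document keys, the following self-contained sketch assigns _id values to five key ranges. The boundary keys and the splitFor helper are hypothetical, and the sketch neither uses nor reproduces the logic of MapRDBSpark.newPartitioner; it only shows the general idea of mapping sorted keys to splits.

```scala
object SplitSketch {
  // Hypothetical boundary keys: four boundaries define five key ranges (splits).
  val boundaries: Vector[String] = Vector("e", "j", "o", "t")

  // Index of the split whose range contains `key`:
  // the number of boundaries that sort at or before the key.
  def splitFor(key: String): Int = boundaries.count(b => b.compareTo(key) <= 0)

  // Sample _id values (assumed for illustration) and their split assignment.
  val ids: Seq[String] = Seq("alice", "frank", "jim", "peter", "zoe")
  val assignment: Map[String, Int] = ids.map(id => id -> splitFor(id)).toMap
}
```

With these assumed boundaries, each sample key falls into a different split, so a table created from such a partitioning would start with its documents spread across all five ranges.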

For more information about partitioning, see Using the Custom Partitioner.