Working with Complex JSON Document Types

Suppose you want to count the users located in each city. The following Spark code computes the count:

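import com.mapr.db.spark._

val customerprofilesRDD = sc.loadFromMapRDB("/tmp/user_profiles")

// Key each profile by its city, group the profiles per city, then count each group.
val numberOfCustaccCities = customerprofilesRDD
  .map(a => (a.`address.city`[String], a))
  .groupByKey()
  .map(a => (a._1, a._2.size))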
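The following local sketch reproduces the same counting logic without a cluster, so you can check what the pipeline returns. It is an illustration under stated assumptions: a plain Scala Seq stands in for the RDD, each profile is modeled as a hypothetical Map[String, AnyRef] keyed by the field path, and groupBy plays the role of groupByKey.

```scala
// Hypothetical sample profiles standing in for the OJAI documents that
// sc.loadFromMapRDB would return; the field path is used as a flat key.
val profiles: Seq[Map[String, AnyRef]] = Seq(
  Map("_id" -> "user1", "address.city" -> "San Jose"),
  Map("_id" -> "user2", "address.city" -> "Austin"),
  Map("_id" -> "user3", "address.city" -> "San Jose")
)

// Same shape as the RDD pipeline: key by city, group, count each group.
val usersPerCity: Map[String, Int] = profiles
  .map(p => (p("address.city").asInstanceOf[String], p))
  .groupBy { case (city, _) => city }
  .map { case (city, members) => (city, members.size) }

usersPerCity.foreach { case (city, n) => println(s"$city: $n") }
```

On the real RDD, mapping each profile to a (city, 1) pair and calling reduceByKey(_ + _) is a common Spark alternative that yields the same counts without holding every profile of a city in one group; it is a standard Spark pattern rather than part of the connector example.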

If you do not supply a type parameter (as with [String] above), the field value is returned as AnyRef. To call methods specific to a class, such as String or Integer, cast the value to that type later in the pipeline.
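The sketch below shows two ways to narrow an AnyRef once it reaches your code. The value rawCity is a hypothetical stand-in for an uncast field read; asInstanceOf is an unchecked cast, while the pattern match in the hypothetical helper asString handles unexpected types and null without throwing.

```scala
// Hypothetical uncast field value, statically typed as AnyRef.
val rawCity: AnyRef = "San Jose"

// Unchecked cast: throws ClassCastException if the value is not a String.
val city: String = rawCity.asInstanceOf[String]

// Pattern match: returns None for other types (and for null) instead of throwing.
def asString(v: AnyRef): Option[String] = v match {
  case s: String => Some(s)
  case _         => None
}

println(city.toUpperCase)      // SAN JOSE
println(asString(rawCity))     // Some(San Jose)
println(asString(Int.box(42))) // None
```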

Now suppose you want to collect the addresses of all customers (address is a field of type Map):

val customerprofilesRDD = sc.loadFromMapRDB("/tmp/user_profiles")
// No type parameter is given, so each collected address is typed as AnyRef.
val customersAddress = customerprofilesRDD.map(a => a.address).collect()

customersAddress contains all of the addresses, but each address is returned as an AnyRef object.
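Before you can use map operations on such a result, each element has to be narrowed. This local sketch assumes the addresses have already been collected into a hypothetical Array[AnyRef] named sampleAddresses, with plain Scala maps standing in for the connector's map wrapper; collect with a partial function filters and casts in one step.

```scala
// Hypothetical collected result: each element is statically just AnyRef.
val sampleAddresses: Array[AnyRef] = Array(
  Map[String, AnyRef]("city" -> "San Jose", "zip" -> "95110"),
  null, // a profile with no address field
  Map[String, AnyRef]("city" -> "Austin")
)

// Keep only elements that are maps. Type arguments are erased at runtime,
// so only the Map part of the pattern is checked (hence @unchecked).
val typedAddresses: Array[Map[String, AnyRef]] = sampleAddresses.collect {
  case m: Map[String, AnyRef] @unchecked => m
}

typedAddresses.foreach(a => println(a.getOrElse("city", "<none>")))
```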

The MapR-DB OJAI Connector for Apache Spark introduces three new classes to wrap complex JSON types:

Class            Wrapped type
DBArrayValue     Array[AnyRef]
DBMapValue       Map[String, AnyRef]
DBBinaryValue    ByteBuffer

These classes are not public, but you can access the elements of DBArrayValue and DBMapValue with the same functions you use on Seq and Map: DBArrayValue behaves like a sequence, and DBMapValue behaves like a map.
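Because the wrappers behave like Seq and Map, nested JSON can be walked with ordinary collection calls. The sketch below models a hypothetical profile with plain Scala collections in place of DBMapValue and DBArrayValue (which cannot be constructed directly), so the same calls can be tried locally.

```scala
// Hypothetical profile: a map whose values include a nested map and an array.
val profile: Map[String, AnyRef] = Map(
  "name"      -> "Ann",
  "address"   -> Map[String, AnyRef]("city" -> "San Jose", "zip" -> "95110"),
  "interests" -> Seq[AnyRef]("sports", "music")
)

// Map-style access: look up a key, narrow the AnyRef, then read a nested key.
val nestedCity: Option[AnyRef] = profile.get("address").flatMap {
  case m: Map[String, AnyRef] @unchecked => m.get("city")
  case _                                 => None
}

// Seq-style access: narrow to a sequence, then use length and contains.
val interests: Seq[AnyRef] = profile("interests") match {
  case s: Seq[AnyRef] @unchecked => s
  case _                         => Seq.empty
}

println(nestedCity)                  // Some(San Jose)
println(interests.length)            // 2
println(interests.contains("music")) // true
```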

DBBinaryValue is a wrapper class around ByteBuffer. Because ByteBuffer is not serializable, using a raw ByteBuffer in Spark code (for example, capturing it in a closure or returning it from a transformation) causes serialization errors. Convert byte buffers to DBBinaryValue or to serializable byte buffers first; the MapR-DB OJAI Connector for Apache Spark provides an API for this conversion.
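To see why the conversion matters, the following sketch uses only the JDK. It shows that Java serialization, which Spark uses for task closures, rejects a raw ByteBuffer, while the same bytes copied into an Array[Byte] serialize cleanly. The helpers serialize, deserialize, and toBytes are hypothetical; this is not how DBBinaryValue or MapRDBSpark.serializableBinaryValue is implemented.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, NotSerializableException,
  ObjectInputStream, ObjectOutputStream}
import java.nio.ByteBuffer

// Plain Java serialization of a single object.
def serialize(obj: AnyRef): Array[Byte] = {
  val bos = new ByteArrayOutputStream()
  val oos = new ObjectOutputStream(bos)
  try oos.writeObject(obj) finally oos.close()
  bos.toByteArray
}

def deserialize(data: Array[Byte]): AnyRef = {
  val ois = new ObjectInputStream(new ByteArrayInputStream(data))
  try ois.readObject() finally ois.close()
}

// Copy the remaining bytes; duplicate() leaves the caller's position unchanged.
def toBytes(buf: ByteBuffer): Array[Byte] = {
  val view = buf.duplicate()
  val out = new Array[Byte](view.remaining())
  view.get(out)
  out
}

val buffer = ByteBuffer.wrap("binary payload".getBytes("UTF-8"))

// A ByteBuffer does not implement java.io.Serializable, so this throws.
val rawFails =
  try { serialize(buffer); false }
  catch { case _: NotSerializableException => true }

// A plain byte array serializes and round-trips without loss.
val restored = deserialize(serialize(toBytes(buffer))).asInstanceOf[Array[Byte]]

println(rawFails)                      // true
println(new String(restored, "UTF-8")) // binary payload
```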

Accessing Values in a Map

DBMapValue is a Map[String, AnyRef], so any function that reads values from a Map also works on a DBMapValue. In this example, customeraddress contains the addresses of the customers who live in “San Jose”; customeraddress is an Array[DBMapValue]:

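// maprd: an RDD loaded with sc.loadFromMapRDB (for example, customerprofilesRDD).
// Read address as a map, skip profiles without one, keep San Jose addresses.
val customeraddress = maprd.map(a => a.address[Map[String, AnyRef]])
  .filter(a => a != null && a.get("city").contains("San Jose"))
  .collect()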
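The filter combines a null check with Map.get, which returns an Option, and Option.contains, which is true only when a value is present and equal to the argument. A local sketch with hypothetical addresses (plain Scala maps in place of DBMapValue) and a hypothetical helper livesIn shows how each case is handled.

```scala
// Hypothetical addresses; null models a profile without an address field.
val addresses: Seq[Map[String, AnyRef]] = Seq(
  Map("city" -> "San Jose", "zip" -> "95110"),
  Map("zip" -> "73301"), // no "city" key, so get returns None
  null,
  Map("city" -> "Austin")
)

// Same predicate as the Spark example: the null guard runs first,
// so get is never called on a missing address.
def livesIn(a: Map[String, AnyRef], city: String): Boolean =
  a != null && a.get("city").contains(city)

val sanJose = addresses.filter(a => livesIn(a, "San Jose"))
println(sanJose.size) // 1
```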

Accessing the Array JSON Object

This example reads the interests array as a sequence and keeps the interests arrays whose first element is "sports":

// Read interests as a Seq; headOption avoids an exception on an empty array.
val custInterests = maprd.map(a => a.interests[Seq[AnyRef]])
  .filter(a => a != null && a.headOption.contains("sports"))
  .collect()
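The sketch below, using hypothetical interest lists as plain Seq[AnyRef] values, contrasts a first-element test that tolerates empty arrays (headOption) with a test for "sports" at any position (contains). Which one you want depends on whether order in the JSON array carries meaning.

```scala
// Hypothetical interests arrays; one customer has an empty array.
val interestLists: Seq[Seq[AnyRef]] = Seq(
  Seq("sports", "music"),
  Seq("music", "sports"),
  Seq.empty
)

// First element is "sports"; headOption yields None for an empty array.
val sportsFirst = interestLists.filter(a => a.headOption.contains("sports"))

// "sports" appears at any position.
val likesSports = interestLists.filter(a => a.contains("sports"))

println(sportsFirst.size) // 1
println(likesSports.size) // 2
```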

ByteBuffer Serialization

The MapR-DB OJAI Connector for Apache Spark provides the following API to enable serialization of the ByteBuffer:

MapRDBSpark.serializableBinaryValue(byteBuffer)

The following example converts an array of byte buffers (binary values) to serializable byte buffers by using MapRDBSpark.serializableBinaryValue:

val dstSplits = arrayOfByteBuffer.map(x => MapRDBSpark.serializableBinaryValue(x))