Loading Data from MapR-DB as a Spark RDD

You can use the following API to load JSON-format data from a MapR-DB table into a Spark RDD of OJAI documents:

def loadFromMapRDB[T](table: String): RDD[T]
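
The loadFromMapRDB method is added to the SparkContext through an implicit conversion, so the connector package must be imported before the examples in this section will compile. This assumes the standard MapR-DB OJAI connector import:

import com.mapr.db.spark._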

The following example creates a userprofilesRDD by calling loadFromMapRDB on the SparkContext and supplying the table path ("/tmp/user_profiles"):

val userprofilesRDD = sc.loadFromMapRDB("/tmp/user_profiles")

The following example creates a usersInfo RDD by calling loadFromMapRDB on the SparkContext and supplying the table path ("/tmp/UserInfo"):

val usersInfo = sc.loadFromMapRDB("/tmp/UserInfo")

In this example, the usersInfo data contains the following information:
  • Address (map type)
  • Date of birth (date type)
  • First name (String type)
  • Interests (array type)
  • Last name (String type)

For example:

usersInfo.foreach(println(_))

{
  "address": {"Pin":95035,"city":"milpitas","street":"350 holger way"},
  "dob":"1947-11-29",
  "first_name":"David",
  "interests":["football","books","movies"],
  "last_name":"Jones"
}
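
Individual fields can also be read with the connector's typed field-access syntax (the same backtick form used in the join example below). For instance, a minimal sketch that extracts first_name as a String from each document:

val firstNames = usersInfo.map(doc => doc.`first_name`[String]).collect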

The following example shows a join operation performed on two different OJAI document RDDs, using address.city as the join key. Because the pipeline ends in collect, the result, collection, is an array that contains, for each city, the count of users in the user_profiles and user_income MapR-DB tables:

val maprd1 = sc.loadFromMapRDB("/tmp/user_profiles")

val maprd2 = sc.loadFromMapRDB("/tmp/user_income")

val collection = maprd1.map(a => (a.`address.city`[String], a))
                       .cogroup(maprd2.map(a => (a.`address.city`[String], a)))
                       .map(a => (a._1, a._2._1.size, a._2._2.size))
                       .collect
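
Each element of collection is a tuple of (city, user_profiles count, user_income count), so a minimal way to inspect the result is:

collection.foreach { case (city, profiles, incomes) =>
  println(s"$city: $profiles profiles, $incomes income records")
}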

The following example adds a new field to each of the JSON documents and saves the modified documents to a new MapR-DB table:

val maprd = sc.loadFromMapRDB("/tmp/user_profiles")
val documents = maprd.map(a => { a.`address.country` = "USA"; a })
documents.saveToMapRDB("/tmp/cleaned_user_profiles")
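
If the target table does not already exist, the connector's saveToMapRDB may be able to create it for you; the createTable flag below is an assumption and may vary by connector version:

// Assumption: createTable = true asks the connector to create the
// target table if it does not already exist.
documents.saveToMapRDB("/tmp/cleaned_user_profiles", createTable = true)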

Improving Performance by Using Projection Pushdown and Filter Pushdown

To improve performance, you can supply a WHERE clause and projection fields to the loadFromMapRDB API. In the following example, a condition is supplied to the loadFromMapRDB function, and the data is loaded based on that condition. The condition is pushed down to the server, so the server returns only the data that passes the filter. Similarly, you might want to load only certain fields of a table; in that case, only the fields specified in the select clause are projected:

val userprofilesRDD = sc.loadFromMapRDB("/tmp/user_profiles")
                        .where([condition])
                        .select("address",
                                "first_name",
                                "_id",
                                "last_name")
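
For example, a hypothetical query that combines both pushdowns, filtering on last_name (using the field DSL shown below) while projecting the same fields:

val joneses = sc.loadFromMapRDB("/tmp/user_profiles")
                .where(field("last_name") === "Jones")
                .select("address", "first_name", "_id", "last_name")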

In the following example, the WHERE clause is used as a filter condition. The userprofilesRDD includes only those documents with a salary field greater than or equal to 100:

val userprofilesRDD = sc.loadFromMapRDB("/tmp/user_profiles")
                        .where(field("salary") >= 100)

By filtering on the _id field, you can look up and fetch a single document for a given key:

val userprofilesRDD = sc.loadFromMapRDB("/tmp/user_profiles")
                        .where(field("_id") === "k2")

For more information about querying with conditions, see Querying with Conditions.

WHERE Clause Semantics

The loadFromMapRDB API supports a WHERE clause to push down the filter to the OJAI document API, ensuring that only relevant documents are propagated to the RDD.

You can use two options to provide the filter condition:

  • QueryCondition (from OJAI API)
  • Scala domain-specific language (DSL) form of predicate

Here is an example of using loadFromMapRDB and supplying a condition by using the Scala DSL:

val isDoe = field("last_name") === "Doe"
val userprofilesRDD = sc.loadFromMapRDB("/tmp/user_profiles").where(isDoe)

Here is an example of passing the condition using the QueryCondition API:

import com.mapr.db.MapRDB
import org.ojai.store.QueryCondition

val maprd = sc.loadFromMapRDB(tableName)
              .where(MapRDB.newCondition()
                           .is("_id", QueryCondition.Op.EQUAL, "k2")
                           .build())
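
QueryCondition also supports compound conditions. Building on the imports above, here is a sketch (the field names are illustrative) that matches documents where last_name equals "Doe" and salary is at least 100:

// Sketch: AND of two predicates via the OJAI QueryCondition builder.
val cond = MapRDB.newCondition()
                 .and()
                   .is("last_name", QueryCondition.Op.EQUAL, "Doe")
                   .is("salary", QueryCondition.Op.GREATER_OR_EQUAL, 100)
                 .close()
                 .build()

val filtered = sc.loadFromMapRDB(tableName).where(cond)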