Apache Spark and MapR Database JSON Integration


Apache Spark is an open source big data processing framework that is widely used for analytics on streaming and batch workloads. Spark is fully supported on MapR and typically uses data in the form of large files. With the Spark/MapR Database connectors, you can use MapR Database as a data source and as a data destination for Spark jobs.

MapR Database is a high-performance NoSQL database that supports two primary data models: JSON documents and wide column tables. A Spark connector is available for each data model. The Native Spark Connector for MapR Database JSON provides APIs to access MapR Database JSON documents from Apache Spark, using the Open JSON Application Interface (OJAI) API. To access the wide column data model, often referred to as "MapR Database Binary," use the Spark HBase and MapR Database Binary Connector. This blog post describes examples of the connector for MapR Database JSON.

Background

Big data applications are moving toward data sets with flexible (or no predefined) schemas, and the OJAI API was introduced in the MapR Database 5.1 release to support them. The OJAI API is a set of interfaces that allows an application to manipulate structured, semi-structured, or unstructured data. For more details on the OJAI API, see the GitHub repository: https://github.com/ojai/ojai.

With the new release (MapR 5.2, MEP 3.0), a new connector was developed to integrate MapR Database JSON tables with Spark. This connector uses the OJAI API internally to access and mutate the tables. It is this connector API that is explored further in this blog post.

The Spark/MapR Database JSON Connector API

In the MapR Ecosystem Pack (MEP) 3.0 release, the Native Spark Connector for MapR Database JSON supports loading data from a MapR Database table as a Spark Resilient Distributed Dataset (RDD) of OJAI documents and saving a Spark RDD into a MapR Database JSON table. (An RDD is the base format for storing data for use by Spark.)

Here are the interfaces for loading the JSON table into an RDD:

loadFromMapRDB(<path-of-mapr-db-json-table>)

The above function also supports another variant wherein one can directly load the documents as an RDD of Scala objects:

loadFromMapRDB[<BeanClass>](<path-of-mapr-db-json-table>)
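
For example, assuming a Person bean class like the one sketched later in this post, the table can be loaded directly as an RDD of typed objects:

val usersAsBeans = sc.loadFromMapRDB[Person]("/tmp/UserInfo")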

Below is the API for saving the objects into a MapR Database table:

rdd.saveToMapRDB(<path-of-mapr-db-json-table>)

The above function (i.e., saveToMapRDB) also accepts several self-explanatory parameters:

- createTable – Create the table before saving the documents, and throw an exception if the table already exists. The default value is false.
- bulkInsert – Save a group of rows of data at once into a MapR Database table. The default value is false.
- idFieldPath – Key to be used to identify the document. The default value is "_id".
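
As an illustration, a call that sets all three parameters might look like the following (the user_id field name is hypothetical):

rdd.saveToMapRDB("/tmp/UserInfo",
  createTable = true,       // fail if /tmp/UserInfo already exists
  bulkInsert = true,        // write the documents in batches
  idFieldPath = "user_id")  // use the user_id field as the document key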

Similar to loading documents into a Scala bean class, one can save an RDD of user-specified Scala class objects into a MapR Database JSON table.

Saving Objects in a MapR Database JSON Table

To access the connector API, import the Scala package com.mapr.db.spark._; all the required implicit definitions are included in the com.mapr.db.spark package.
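
The examples below also assume a Person bean class and the usual Spark imports. A minimal sketch of that class (the actual example application may define it differently) could look like this:

import org.apache.spark.{SparkConf, SparkContext}
import org.ojai.types.ODate
import com.mapr.db.spark._

// Hypothetical bean class matching the fields used in this post; the _id
// field becomes the document key in the MapR Database JSON table.
case class Person(_id: String,
                  first_name: String,
                  last_name: String,
                  dob: ODate,
                  interests: Seq[String],
                  address: Map[String, Any])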

Below is the code that saves the RDD of Person objects into the MapR Database JSON table:

    val spark = new SparkConf().setAppName("json app")
      .setMaster("local[*]")
    val sc = new SparkContext(spark)
    val people = sc.parallelize(getUsers())
    people.saveToMapRDB("/tmp/UserInfo", createTable = true)

Here is the getUsers function, which allocates Person objects:

  // Build the sample Person records to store in MapR Database.
  def getUsers(): Array[Person] = {
    val users: Array[Person] = Array(
      Person("DavUSCalif", "David", "Jones",
        ODate.parse("1947-11-29"),
        Seq("football", "books", "movies"),
        Map("city" -> "milpitas", "street" -> "350 holger way", "Pin" -> 95035)),

      Person("PetUSUtah", "Peter", "pan",
        ODate.parse("1974-1-29"),
        Seq("boxing", "music", "movies"),
        Map("city" -> "salt lake", "street" -> "351 lake way", "Pin" -> 89898)),

      Person("JamUSAriz", "James", "junior",
        ODate.parse("1968-10-2"),
        Seq("tennis", "painting", "music"),
        Map("city" -> "phoenix", "street" -> "358 pond way", "Pin" -> 67765)),

      Person("JimUSCalif", "Jimmy", "gill",
        ODate.parse("1976-1-9"),
        Seq("cricket", "sketching"),
        Map("city" -> "san jose", "street" -> "305 city way", "Pin" -> 95652)),

      Person("IndUSCalif", "Indiana", "Jones",
        ODate.parse("1987-5-4"),
        Seq("squash", "comics", "movies"),
        Map("city" -> "sunnyvale", "street" -> "35 town way", "Pin" -> 95985)))

    users
  }

Loading Data from a MapR Database JSON Table

The code provided below will load the documents from the "/tmp/UserInfo" table into an RDD:

val usersInfo = sc.loadFromMapRDB("/tmp/UserInfo").collect

Printing the usersInfo documents produces the following output:

usersInfo.foreach(println(_))

{
  "address":{"Pin":95035,"city":"milpitas","street":"350 holger way"},
  "dob":"1947-11-29",
  "first_name":"David",
  "interests":["football","books","movies"],
  "last_name":"Jones"
}

{
  "address":{"Pin":95985,"city":"sunnyvale","street":"35 town way"},
  "dob":"1987-05-04",
  "first_name":"Indiana",
  "interests":["squash","comics","movies"],
  "last_name":"Jones"
}

{
  "address":{"Pin":67765,"city":"phoenix","street":"358 pond way"},
  "dob":"1968-10-02",
  "first_name":"James",
  "interests":["tennis","painting","music"],
  "last_name":"junior"
}

{
  "address":{"Pin":95652,"city":"san jose","street":"305 city way"},
  "dob":"1976-01-09",
  "first_name":"Jimmy",
  "interests":["cricket","sketching"],
  "last_name":"gill"
}

{
  "address":{"Pin":89898,"city":"salt lake","street":"351 lake way"},
  "dob":"1974-01-29",
  "first_name":"Peter",
  "interests":["boxing","music","movies"],
  "last_name":"pan"
}

Projection Pushdown and Predicate Pushdown for the Load API

The "load" API of the connector also supports "select" and "where" clauses. These can be used to push down the projection of a subset of fields and/or to filter out documents with a condition, so that only the required data is returned.

Here is an example of how to use the "where" clause to restrict the rows:

val usersLivingInMilpitas = sc.loadFromMapRDB("/tmp/UserInfo")
  .where(field("address.city") === "milpitas")

Similarly, to project only the first_name and last_name fields, the following code generates the required output:

val namesOfUsers = sc.loadFromMapRDB("/tmp/UserInfo")
  .select("first_name", "last_name")

Setting up the Project to Use the Spark/MapR Database Connector

To access the loadFromMapRDB and saveToMapRDB APIs, add the following dependency (groupId, artifactId, and version) to the project's pom.xml file:

        <dependency>
            <groupId>com.mapr.db</groupId>
            <artifactId>maprdb-spark</artifactId>
            <version>5.2.1-mapr</version>
        </dependency>

Also add the Spark core dependency to the pom.xml file:

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.1.0-mapr-1703</version>
        </dependency>

MapR-specific JARs are located in the mapr-releases repository. Include the following repository information in the pom.xml file so that Maven can download the dependencies:

        <repository>
            <id>mapr-releases</id>
            <url>http://repository.mapr.com/maven/</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
            <releases>
                <enabled>true</enabled>
            </releases>
       </repository>

The code for the example application can be accessed here.

Summary

Once the data is loaded as an RDD of either OJAI documents or Scala bean objects, it can be processed further using Spark transformations. Data loaded from MapR Database tables can also be enriched with data from other data sources.
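
For instance, a small hypothetical transformation, assuming the Person bean class sketched earlier, could count users per city:

// Load the documents as Person objects and aggregate by city.
val usersPerCity = sc.loadFromMapRDB[Person]("/tmp/UserInfo")
  .map(p => (p.address("city"), 1))
  .reduceByKey(_ + _)
usersPerCity.collect.foreach(println)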

The Spark connector will be further enhanced to support the DataFrame and Dataset APIs, which will enable you to use Spark SQL and Spark Streaming to transform data from MapR Database JSON tables seamlessly.


This blog post was published April 27, 2017.