How to Use Secondary Indexes in Spark With Open JSON Application Interface (OJAI)

Introduction

Starting with MapR 6.0, MapR Database supports secondary indexes on fields in JSON tables.

Secondary indexes give applications flexible, efficient access to data stored in MapR Database JSON tables across a wide range of queries.

By default, the only index is on the _id column; if an application queries any other column, the query can result in a full table scan of the underlying JSON table. Secondary indexes remove this limitation by reducing the number of documents that applications have to read from large tables. These indexes can be used through the OJAI API, the MapR Database JSON REST API, or MapR Drill, but not directly from Spark.
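
To make the difference concrete, here is a minimal in-memory sketch of the two access patterns. It is only a model under stated assumptions: the `Doc` case class, the `IndexSketch` object, and its methods are hypothetical names for illustration, and MapR Database's actual index storage works differently. It shows how many documents each approach has to read.

```scala
// In-memory model only: it illustrates the access pattern, not MapR Database internals.
object IndexSketch {
  // Hypothetical document shape, mirroring the data-table rows used later in this post.
  final case class Doc(id: String, uid: String, firstName: String)

  val docs: Vector[Doc] = Vector(
    Doc("1", "101", "tom"),
    Doc("2", "102", "john"),
    Doc("3", "103", "sam")
  )

  // Without an index on uid, every document has to be examined.
  // Returns the matches and the number of documents read.
  def fullScan(uid: String): (Vector[Doc], Int) =
    (docs.filter(_.uid == uid), docs.size)

  // A secondary index modeled as a map from the indexed value to the matching _ids.
  val uidIndex: Map[String, Vector[String]] =
    docs.groupBy(_.uid).map { case (uid, ds) => uid -> ds.map(_.id) }

  // The primary index: _id -> document.
  val byId: Map[String, Doc] = docs.map(d => d.id -> d).toMap

  // With the index, only the documents whose _id the index returns are read.
  def indexedLookup(uid: String): (Vector[Doc], Int) = {
    val ids = uidIndex.getOrElse(uid, Vector.empty)
    val found = ids.flatMap(id => byId.get(id))
    (found, found.size)
  }
}
```

Both calls return the same match for a given uid; the second element of the result is the number of documents read, which for `fullScan` grows with the table and for `indexedLookup` grows only with the number of matches.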

In this blog post, we will look at how Spark can use the OJAI API to leverage secondary indexes.

How to Use Secondary Indexes in Spark?

OJAI is the API for interfacing with MapR Database JSON. Most applications built with OJAI use it for filtering and sorting, and OJAI leverages secondary indexes to improve the response times of those queries. Here we will see how to use this API in Spark to leverage secondary indexes.
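
An OJAI filter can be written as a JSON condition string, such as the {"$eq": {"uid":"101"}} form that the sample program in this post passes to where(). The sketch below builds that string with basic escaping, so that a value containing a double quote does not break the JSON. The `OjaiConditions` object and its `eqCondition` and `escapeJson` helpers are hypothetical names and are not part of the OJAI API; control characters are not handled.

```scala
object OjaiConditions {
  // Escape backslashes and double quotes so the value stays inside its JSON string.
  // Control characters are not handled in this sketch.
  def escapeJson(value: String): String =
    value.replace("\\", "\\\\").replace("\"", "\\\"")

  // Build an equality condition in the same JSON form the sample program uses.
  // eqCondition is a hypothetical helper, not part of the OJAI API.
  def eqCondition(field: String, value: String): String =
    s"""{"$$eq": {"${escapeJson(field)}":"${escapeJson(value)}"}}"""
}
```

The resulting string could be handed to the query builder the same way the sample program does, for example connection.newQuery().where(OjaiConditions.eqCondition("uid", id)).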

Here are the steps:

  1. Create a JSON table (user-info) with some JSON documents. One of the fields from this table will be used to look up fields from another table. The sample program, below, ingests the data into the table.
// Create JSON table
$ maprcli table create -path /tmp/user-info -tabletype json

// After the data is ingested into the table
$ echo "find /tmp/user-info" | mapr dbshell
====================================================
*                  MapR Database Shell                   *
* NOTE: This is a shell for JSON table operations. *
====================================================
Version: 6.0.1-mapr

MapR Database Shell
maprdb mapr:> find /tmp/user-info
{"_id":"101","address":{"Pin":{"$numberLong":95985},"city":"sunnyvale","street":"35 town way"},"dob":{"$dateDay":"1987-05-04"},"interests":["squash","comics","movies"]}
{"_id":"102","address":{"Pin":{"$numberLong":95652},"city":"san jose","street":"305 city way"},"dob":{"$dateDay":"1976-01-09"},"interests":["cricket","sketching"]}
2 document(s) found.
  2. Create another JSON table (data-table) with JSON documents, with one of the fields (uid) in this table matching the _id field of the table created in step #1, and ingest sample data.
// Create JSON table
$ maprcli table create -path /tmp/data-table -tabletype json

//Sample data for querying
$ cat data-table.json
{"_id":"1","uid":"101","first_name":"tom"}
{"_id":"2","uid":"102","first_name":"john"}
{"_id":"3","uid":"103","first_name":"sam"}
{"_id":"4","uid":"104","first_name":"thomas"}
{"_id":"5","uid":"105","first_name":"david"}
{"_id":"6","uid":"106","first_name":"robert"}
{"_id":"7","uid":"107","first_name":"william"}
{"_id":"8","uid":"108","first_name":"michael"}
{"_id":"9","uid":"109","first_name":"bill"}
{"_id":"10","uid":"110","first_name":"jarred"}

// Put the sample data in hdfs
$ hadoop fs -put data-table.json /tmp

// Ingest the sample data into the JSON table
$ mapr importJSON -idfield '_id' -mapreduce true -src /tmp/data-table.json -dst /tmp/data-table
  3. Create a secondary index on the uid field of data-table.
$ maprcli table index add -path /tmp/data-table -index uid_idx -indexedfields uid
  4. Enable log tracing by setting the property "log4j.logger.com.mapr.ojai.store.impl=TRACE, stdout" in log4j.properties, located in the /opt/mapr/conf directory. This step is optional; it lets you see whether the OJAI query plan used the secondary index. It is not recommended for production clusters, as it generates a lot of log data.

  5. The complete sample program is listed below. In this program, the getDocuments() method invokes the OJAI API to leverage the secondary index and returns an iterator of matching documents, which mapPartitions turns into an RDD.

/* Copyright (c) 2009 & onwards. MapR Tech, Inc., All rights reserved */

package com.mapr.demo.spark.ojai.secondaryindex

import com.fasterxml.jackson.annotation.{JsonIgnoreProperties, JsonProperty}
import com.mapr.db.spark.impl.OJAIDocument
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import org.ojai.types.ODate
import com.mapr.db.spark.{field, _}
import org.ojai.store.DriverManager
import scala.collection.mutable.ListBuffer

object SparkOjaiApplication {

  val userInfo = "/tmp/user-info"
  val dataTable = "/tmp/data-table"

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("Spark-OJAI Secondary Index Application").master("local[*]").getOrCreate()
    val allUsers = spark.sparkContext.parallelize(getUsers())
    val sc = spark.sparkContext

    //Save users to JSON table
    allUsers.saveToMapRDB(userInfo, createTable = false)

    //Load all the people from the JSON table
    val allUsersInfo = sc.loadFromMapRDB(userInfo)

    //Extract JSON documents using secondary index
    val documentsRDD = allUsersInfo.mapPartitions(getDocuments)

    // print a few documents
    documentsRDD.take(3).foreach(println(_))

    System.out.println("Number of documents extracted:" + documentsRDD.count())
  }

  //Invokes the OJAI API to query JSON documents using the secondary index.
  def getDocuments(iterator: Iterator[OJAIDocument]): Iterator[String] = {
    val connection = DriverManager.getConnection("ojai:mapr:")
    val store = connection.getStore(dataTable)
    val dm  = ListBuffer[String]()

    iterator.foreach(r => {
      val qs = "{\"$eq\": {\"uid\":\"%s\"}}".format(r.getDoc.getId.getString)
      System.out.println("Finding  documents for qs:" + qs);
      val  query = connection.newQuery().select("_id")
        //This option is not required. OJAI client makes the determination to use secondary index.
        // Since the sample data set is small, I'm enabling this option to use secondary index.
        .setOption(com.mapr.ojai.store.impl.OjaiOptions.OPTION_USE_INDEX, "uid_idx")
        .where(qs).build()
      // Named matches so it does not shadow the partition iterator; only the first match is kept.
      val matches = store.find(query).iterator()
      if (matches.hasNext) {
        dm += matches.next().asJsonString()
      }
    })

    //Close the Document Store
    store.close()

    //Close the OJAI connection
    connection.close()

    dm.toIterator
  }

  // User documents. The _id field of each user is used to query the matching uid in the data-table.
  def getUsers(): Array[Person] = {
    val users: Array[Person] = Array(
      Person("101", ODate.parse("1976-1-9"), Seq("cricket", "sketching"), Map("city" -> "san jose", "street" -> "305 city way", "Pin" -> 95652)),
      Person("102", ODate.parse("1987-5-4"), Seq("squash", "comics", "movies"), Map("city" -> "sunnyvale", "street" -> "35 town way", "Pin" -> 95985))
    )
    users
  }
}

@JsonIgnoreProperties(ignoreUnknown = true)
case class Person (@JsonProperty("_id") id: String, @JsonProperty("dob") dob: ODate,
                   @JsonProperty("interests") interests: Seq[String], @JsonProperty("address") address: Map[String, Any])
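
The getDocuments() method above opens one connection per partition and collects its results into a buffer before closing the store. The sketch below isolates that pattern without Spark or MapR dependencies, to show why the results are materialized before close(). `FakeStore` is a hypothetical stand-in for the OJAI connection and store, and `lookupPartition` is an illustrative name; neither comes from the sample repository.

```scala
object PartitionLookupSketch {
  // Hypothetical stand-in for an OJAI connection/store pair; it only tracks whether it is open.
  final class FakeStore(data: Map[String, String]) {
    private var open = true

    def find(uid: String): Option[String] = {
      require(open, "store already closed")
      data.get(uid)
    }

    def close(): Unit = {
      open = false
    }
  }

  // Same shape as a function passed to mapPartitions: Iterator in, Iterator out.
  def lookupPartition(newStore: () => FakeStore)(uids: Iterator[String]): Iterator[String] = {
    val store = newStore() // opened once per partition, not once per record
    try {
      // toList forces every lookup to run while the store is still open;
      // a lazy flatMap on its own would run the lookups only after close().
      uids.flatMap(uid => store.find(uid).iterator).toList.iterator
    } finally {
      store.close()
    }
  }
}
```

Because Scala iterators are lazy, returning the flatMap result directly would defer every find() call until Spark consumes the iterator, by which point the store is already closed. The ListBuffer in the sample program serves the same purpose as toList here, at the cost of holding one partition's results in memory.
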
  6. To build the sample program, clone the Git repo and build it with Maven.
$ git clone https://github.com/ranjitreddy2013/spark-using-ojai-secondary-index-example
$ cd spark-using-ojai-secondary-index-example
$ mvn clean install
  7. To run the program, copy spark-ojai-secondaryindex-1.0-SNAPSHOT.jar from the target folder to an edge node or cluster node and submit it to the cluster using spark-submit.
/opt/mapr/spark/spark-2.2.1/bin/spark-submit --class com.mapr.demo.spark.ojai.secondaryindex.SparkOjaiApplication --master yarn --deploy-mode client --driver-java-options "-Dlog4j.configuration=file:///opt/mapr/conf/log4j.properties" --conf "spark.yarn.executor.memoryOverhead=1G"  --executor-memory 2G --num-executors 1 --executor-cores 1 /home/mapr/spark-ojai-secondaryindex-1.0-SNAPSHOT.jar
  8. Sample output is shown below. In addition to this output, there will be DEBUG and TRACE logs.
Finding  documents for qs:{"$eq": {"uid":"101"}}
Finding  documents for qs:{"$eq": {"uid":"102"}}
{"_id":"1","first_name":"tom","uid":"101"}
{"_id":"2","first_name":"john","uid":"102"}

Number of documents extracted:2
  9. Check the logs to verify that the OJAI query plan used the secondary index. Note the indexName reported in the query plan.
2019-01-11 10:49:35,876 TRACE com.mapr.ojai.store.impl.OjaiDocumentStore logQueryPlan Executor task launch worker for task 204: Ojai Query Plan: '[{"streamName":"DBDocumentStream","parameters":{"queryConditionPath":true,"indexName":"uid_idx","projectionPath":["_id"],"primaryTable":"/tmp/data-table"}},{"streamName":"RowkeyLookup","parameters":{"condition":"(uid = "88d800cf-39c9-482f-856c-486090c3de2c")","primaryTable":"/tmp/data-table"}}]'

2019-01-28 12:04:33,087 TRACE com.mapr.ojai.store.impl.OjaiDocumentStore logQueryPlan Executor task launch worker for task 201: Ojai Query Plan: '[{"streamName":"DBDocumentStream","parameters":{"queryConditionPath":true,"indexName":"uid_idx","projectionPath":["_id"],"primaryTable":"/tmp/data-table"}}]'
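
When there are many TRACE lines, the check in the last step can be scripted. The following is a minimal sketch that pulls the indexName value out of an "Ojai Query Plan" log line with a regular expression. It assumes the log format shown above; `QueryPlanCheck` and `indexUsed` are hypothetical names, and treating a plan without an indexName entry as "no index used" is an assumption rather than documented behavior.

```scala
object QueryPlanCheck {
  // Matches the "indexName":"..." entry inside an "Ojai Query Plan" TRACE line.
  private val IndexName = "\"indexName\":\"([^\"]+)\"".r

  // Returns the index named in the plan, or None when the line is not a
  // query-plan entry or the plan names no index.
  def indexUsed(logLine: String): Option[String] =
    if (logLine.contains("Ojai Query Plan"))
      IndexName.findFirstMatchIn(logLine).map(_.group(1))
    else None
}
```

Applied to each line of the executor log, for example with scala.io.Source.fromFile(path).getLines().flatMap(line => QueryPlanCheck.indexUsed(line)), this yields the index name for every query plan that reports one.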

Conclusion:

In this blog post, you learned how to use OJAI secondary indexes from Spark. If there are questions, please post them in the comments section.

This blog post was published February 12, 2019.