
MapR Database Spark Connector with Secondary Indexes Support


This blog post was originally published on Medium.

MapR Data Platform offers significant advantages over other tools in the big data space. MapR Database is one of the core components of the platform, and it offers state-of-the-art capabilities that blow away most of the NoSQL databases out there.

An important add-on to MapR Database is the ability to read and write data with Apache Spark through the Connector for Apache Spark. The connector comes in very handy, since it lets Spark read from and write to MapR Database using the different Spark APIs, such as RDDs, DataFrames, and Streams.

Using the connector, we can issue queries like the following one:

val df: DataFrame = sparkSession.loadFromMapRDB("/tmp/user_profiles", someSchema)

The resulting type is a DataFrame that we can use as any other DataFrame from any other source, as we normally do in Spark.

Problems start to emerge once we filter the dataset. For instance, let's look at the following query:

val df = sparkSession.loadFromMapRDB("/tmp/user_profiles")

val filteredDF = df.filter("first_name = 'Bill'")

The filter is being pushed down, so MapR Database does the filtering and only sends back the data that complies with the filter, reducing the amount of data transferred between MapR Database and Spark. However, if there is an index created on the field first_name, the index is ignored and the table is fully scanned, trying to find the rows that comply with the filter.

By having an index on a field, we expect to use it so queries on that field are optimized, ultimately speeding up the computation. The provided connector is simply not using this capability.

Necessity

Our team, MapR Professional Services, knows that filtering with MapR Database secondary indexes is huge for performance. Since many of our customers try to take advantage of this feature, we have taken different approaches to force the use of the indexes when using Spark.

The following blog post, "How to Use Secondary Indexes in Spark with OJAI," was written by a coworker, who explains some ways to overcome the issue at hand.

Even though we can take some shortcuts, we have to give up some of the nice constructs the default connector has, such as .loadFromMapRDB(...). That solution does not scale, but we can build on some of its ideas and generalize them into an approach that works for general-purpose computation with Spark.

An Independent Connector

In the past, I have extended Apache Spark in many ways. I have written my own Custom Data Sources and, most recently, a Custom Streaming Source for Spark Structured Streaming.

Once again, I have sailed into the adventure of writing my own Spark data source, this time for MapR Database, so that we can take full advantage of secondary indexes while keeping the same API that the current MapR Database Connector for Apache Spark has.

At the end of this post, we will be able to write a query in the following way while fully using secondary indexes:

val schema = StructType(Seq(StructField("_id", StringType), StructField("uid", StringType)))

val data = sparkSession
  .loadFromMapRDB("/user/mapr/tables/data", schema)
  .filter("uid = '101'")
  .select("_id")

data.take(3).foreach(println)

Spark Data Sources, Version 2

The following data source implementation uses Spark 2.3.1 and uses the data source API V2.

Let's start by looking at the things we need.

  1. ReadSupportWithSchema, which allows us to create a DataSourceReader.
  2. DataSourceReader, which allows us to get the schema for our data and requires us to specify how to create the DataReaderFactory instances.
  3. SupportsPushDownFilters, which allows us to intercept the query filters, so we can push them down to MapR Database.
  4. SupportsPushDownRequiredColumns, which allows us to intercept the query projections, so we can push them down to MapR Database.

Let's start by implementing ReadSupportWithSchema.

class Reader extends ReadSupportWithSchema {

  override def createReader(schema: StructType, options: DataSourceOptions): DataSourceReader = {

    val tablePath = options.get("path").get()

    new MapRDBDataSourceReader(schema, tablePath)
  }
}

As we can see, we simply get the table path and the schema we want to use when reading from MapR Database. Then we pass them to MapRDBDataSourceReader.

MapRDBDataSourceReader

MapRDBDataSourceReader implements DataSourceReader, and we are also mixing in SupportsPushDownFilters and SupportsPushDownRequiredColumns to indicate that we want to push filters and projections down to MapR Database.

Let's look at each piece separately, so we can understand them better.

class MapRDBDataSourceReader(schema: StructType, tablePath: String)
  extends DataSourceReader
    with SupportsPushDownFilters
    with SupportsPushDownRequiredColumns {

  private var projections: Option[StructType] = None

  override def readSchema(): StructType = ???

  override def pushFilters(filters: Array[Filter]): Array[Filter] = ???

  override def pushedFilters(): Array[Filter] = ???

  override def pruneColumns(requiredSchema: StructType): Unit = ???

  override def createDataReaderFactories(): util.List[DataReaderFactory[Row]] = ???

}

The projections variable will hold the schema we want to project, if any. If we don't explicitly project fields with .select, we will project all the fields in the schema variable.

readSchema works in conjunction with projections and pruneColumns. If in our Spark query we specify a select, then the selected fields are passed to pruneColumns, and those are the only fields we will bring from MapR Database.

private var projections: Option[StructType] = None

override def readSchema(): StructType = projections match {
  case None                  => schema
  case Some(fieldsToProject) => fieldsToProject
}

override def pruneColumns(requiredSchema: StructType): Unit = projections =
  Some(requiredSchema)

pushFilters indicates what filters we have specified in the where or filter clause in our Spark query. Basically, we have to decide which of those we want to push down to MapR Database; the other ones will be applied by Spark after the data is in memory.

private var supportedFilters: List[Filter] = List.empty

override def pushFilters(filters: Array[Filter]): Array[Filter] =
 filters.partition(isSupportedFilter) match {
   case (supported, unsupported) =>
     supportedFilters = supported.toList

     unsupported
 }

override def pushedFilters(): Array[Filter] = supportedFilters.toArray

private def isSupportedFilter(filter: Filter) = filter match {
 case _: And => true
 case _: Or => true
 case _: IsNull => true
 case _: IsNotNull => true
 case _: In => true
 case _: StringStartsWith => true
 case _: EqualTo => true
 case _: LessThan => true
 case _: LessThanOrEqual => true
 case _: GreaterThan => true
 case _: GreaterThanOrEqual => true

 case _ => false
}

In the snippet above, the series of filters we are pushing down match the filters handled by the official connector, so we provide the same functionality at this level.
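One detail worth keeping in mind: isSupportedFilter accepts And and Or without looking at their children, so a composite filter is pushed down even when one of its branches is a filter type we would otherwise leave to Spark. Whether that matters depends on how the filters are translated later. The following is only a minimal sketch of a stricter variant. It uses hypothetical stand-in case classes (not Spark's org.apache.spark.sql.sources types, so it runs without Spark on the classpath), and the names PushDownSketch, isSupported, and split are made up for illustration.

```scala
object PushDownSketch {
  // Hypothetical stand-ins that mimic the shape of Spark's source filters.
  sealed trait Filter
  final case class EqualTo(attribute: String, value: Any) extends Filter
  final case class GreaterThan(attribute: String, value: Any) extends Filter
  final case class StringEndsWith(attribute: String, value: String) extends Filter
  final case class And(left: Filter, right: Filter) extends Filter
  final case class Or(left: Filter, right: Filter) extends Filter

  // A composite filter is supported only if both of its children are.
  def isSupported(filter: Filter): Boolean = filter match {
    case And(left, right)            => isSupported(left) && isSupported(right)
    case Or(left, right)             => isSupported(left) && isSupported(right)
    case _: EqualTo | _: GreaterThan => true
    case _                           => false
  }

  // Returns (filters to push down, filters left for Spark to apply).
  def split(filters: Seq[Filter]): (Seq[Filter], Seq[Filter]) =
    filters.partition(isSupported)
}
```

With this variant, And(EqualTo("a", 1), StringEndsWith("b", "x")) stays on the Spark side, because one of its branches cannot be pushed down in this sketch.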

createDataReaderFactories creates a list of data readers that actually do the heavy work of reading from our source, MapR Database. In our case, we are getting the table information and creating a reader for each region/partition in the table, so we can take advantage of the parallelism offered by MapR Database.

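// Requires: import scala.collection.JavaConverters._
override def createDataReaderFactories(): util.List[DataReaderFactory[Row]] =
  com.mapr.db.MapRDB
    .getTable(tablePath)
    .getTabletInfos
    .zipWithIndex
    .map { case (descriptor, idx) =>
      logTabletInfo(descriptor, idx)

      // One entry per tablet (region/partition): index, locations, and key-range condition
      MapRDBTabletInfo(idx,
                       descriptor.getLocations,
                       descriptor.getCondition.asJsonString)
    }
    .map(createReaderFactory)
    .toList
    .asJava // explicit conversion, since the declared return type is a java.util.List

// hintedIndexes is not declared in the class skeleton shown earlier; it is
// assumed here to be a List[String] member of the reader.
private def createReaderFactory(tabletInfo: MapRDBTabletInfo): DataReaderFactory[Row] =
  new MapRDBDataPartitionReader(
    tablePath,
    supportedFilters,
    readSchema(),
    tabletInfo,
    hintedIndexes)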

MapRDBDataPartitionReader

We are almost done, yet the most important part is about to come.

The MapRDBDataPartitionReader is where we actually build the MapR Database query and execute it against our MapR Database table. Notice we are passing the table we are going to read from, the filters and projections we want to push down, and the partition each particular reader will be reading from. Remember that we are creating multiple instances of this class; each will read from a different MapR Database region/partition.

class MapRDBDataPartitionReader(table: String,
                               filters: List[Filter],
                               schema: StructType,
                               tabletInfo: MapRDBTabletInfo,
                               hintedIndexes: List[String]
) extends DataReaderFactory[Row] {

  override def createDataReader(): DataReader[Row] = ???
}

Now we need to connect to MapR Database by opening a connection and creating a document store object.

class MapRDBDataPartitionReader(table: String,
                               filters: List[Filter],
                               schema: StructType,
                               tabletInfo: MapRDBTabletInfo,
                               hintedIndexes: List[String]
) extends DataReaderFactory[Row] {


  import org.ojai.store._

  @transient private lazy val connection = DriverManager.getConnection("ojai:mapr:")

  @transient private lazy val store: DocumentStore = connection.getStore(table)

  override def createDataReader(): DataReader[Row] = ???
}

query creates the final command to be sent to MapR Database. This task is a matter of applying the query condition and the projections to our connection object.

private def query = {
  val condition = buildQueryConditionFrom(filters)(connection)

  connection
    .newQuery()
    .select(schema.fields.map(_.name): _*)  // push projections down
    .where(condition)                       // push filters down
    .build()
}

The buildQueryConditionFrom method reads the Spark filters and transforms them into OJAI conditions with the corresponding data types; this is where we push the filters down. The full implementation is in the library's source code.
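To give a feel for what such a translation involves, here is a dependency-free sketch. It is not the library's buildQueryConditionFrom: instead of calling the OJAI condition builder, it renders the JSON form of a condition, assuming the $eq, $gt, $lt, $and, and $or operators of the OJAI JSON condition syntax. The filter case classes are hypothetical stand-ins for Spark's, and ConditionSketch, toJson, and conditionFor are names invented for this example.

```scala
object ConditionSketch {
  // Hypothetical stand-ins that mimic the shape of Spark's source filters.
  sealed trait Filter
  final case class EqualTo(attribute: String, value: Any) extends Filter
  final case class GreaterThan(attribute: String, value: Any) extends Filter
  final case class LessThan(attribute: String, value: Any) extends Filter
  final case class And(left: Filter, right: Filter) extends Filter
  final case class Or(left: Filter, right: Filter) extends Filter

  // Strings are quoted (escaping backslashes and double quotes);
  // numbers and booleans are rendered bare so their type is preserved.
  private def literal(value: Any): String = value match {
    case s: String  => "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\""
    case n: Int     => n.toString
    case n: Long    => n.toString
    case n: Double  => n.toString
    case b: Boolean => b.toString
    case other      => throw new IllegalArgumentException(s"Unsupported literal: $other")
  }

  private def comparison(op: String, attribute: String, value: Any): String =
    "{\"" + op + "\":{" + literal(attribute) + ":" + literal(value) + "}}"

  // Translate one filter, recursing into And/Or.
  def toJson(filter: Filter): String = filter match {
    case EqualTo(attribute, value)     => comparison("$eq", attribute, value)
    case GreaterThan(attribute, value) => comparison("$gt", attribute, value)
    case LessThan(attribute, value)    => comparison("$lt", attribute, value)
    case And(left, right) => "{\"$and\":[" + toJson(left) + "," + toJson(right) + "]}"
    case Or(left, right)  => "{\"$or\":[" + toJson(left) + "," + toJson(right) + "]}"
  }

  // Spark hands a data source a list of filters that must all hold,
  // so several pushed filters are combined with $and.
  def conditionFor(filters: Seq[Filter]): Option[String] = filters.toList match {
    case Nil           => None
    case single :: Nil => Some(toJson(single))
    case many          => Some("{\"$and\":[" + many.map(toJson).mkString(",") + "]}")
  }
}
```

Keeping the value's type in the rendered literal (for example, 101 versus "101") mirrors the "corresponding data types" point above: a condition on a string field and one on a numeric field are not interchangeable.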

It is very important to notice that since we are using OJAI, it will automatically use any secondary indexes for fields that are part of the filters we are applying. Make sure you check the output at the end of this post.

documents is a stream of data coming from MapR Database, based on query.

@transient private lazy val documents = {
  val queryResult = store.find(query)

  println(s"QUERY PLAN: ${queryResult.getQueryPlan}")

  queryResult.asScala.iterator
}

createDataReader uses the stream we have created (documents) to do the actual reading and return the data to Spark.

override def createDataReader(): DataReader[Row] = new DataReader[Row] {
  override def next(): Boolean = documents.hasNext

  override def get(): Row = {
    val document = ParsableDocument(documents.next())

    // Read the fields in schema order, converting each OJAI value to its Spark type
    val values = schema.fields.map(field => document.get(field)).toSeq

    Row.fromSeq(values)
  }

  override def close(): Unit = {
    store.close()
    connection.close()
  }
}
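The reader above advances the stream inside get(), which works as long as get() is called exactly once after each successful next(). As I read the DataReader contract in Spark 2.3, get() is described as returning the current record, so a more defensive variant advances in next() and caches the record. The sketch below illustrates that idea with a hypothetical stand-in trait that mirrors DataReader (so it runs without Spark); IteratorReader and drain are made-up names.

```scala
object ReaderLoopSketch {
  // Hypothetical stand-in with the same shape as Spark 2.3's DataReader[T].
  trait DataReader[T] extends java.io.Closeable {
    def next(): Boolean
    def get(): T
  }

  // Advances the underlying iterator in next() and caches the record, so get()
  // can be called any number of times between two calls to next().
  final class IteratorReader[T](underlying: Iterator[T], onClose: () => Unit)
      extends DataReader[T] {

    private var current: Option[T] = None

    override def next(): Boolean = {
      current = if (underlying.hasNext) Some(underlying.next()) else None
      current.isDefined
    }

    override def get(): T =
      current.getOrElse(throw new NoSuchElementException("next() has not returned true"))

    override def close(): Unit = onClose()
  }

  // Roughly how a caller consumes a reader: next(), then get(), until next() is false.
  def drain[T](reader: DataReader[T]): List[T] = {
    val records = List.newBuilder[T]
    try {
      while (reader.next()) records += reader.get()
    } finally reader.close()
    records.result()
  }
}
```

The onClose callback plays the role of closing the store and the connection in the real reader.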

Notice that ParsableDocument(document).get(field) handles the transformation from OJAI types back to Spark types. We support all OJAI types, except for Interval. Types are transformed recursively, so if we have a Map that has another Map inside with Arrays of Ints, we've got you covered.
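The recursive part is easier to picture with a small example. The sketch below is not the ParsableDocument implementation; it only shows the general shape of a recursive conversion, assuming the nested values arrive as java.util.Map and java.util.List instances and should end up as Scala collections. NestedValuesSketch and toScala are names invented for this example.

```scala
import scala.collection.JavaConverters._

object NestedValuesSketch {
  // Recursively turn Java maps and lists into immutable Scala collections.
  // Scalar values (and nulls) pass through unchanged.
  def toScala(value: Any): Any = value match {
    case map: java.util.Map[_, _] =>
      map.asScala.map { case (key, nested) => key.toString -> toScala(nested) }.toMap
    case list: java.util.List[_] =>
      list.asScala.map(toScala).toList
    case other => other
  }
}
```

Because toScala calls itself on every map value and list element, a map containing a map containing an array of ints is handled by the same three cases, whatever the nesting depth.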

Using Our Connector

At this point, we are ready to plug our custom data source into Spark in the following way:

sparkSession
  .read
  .format("com.github.anicolaspp.spark.sql.Reader")
  .schema(schema)
  .load(path)

This allows us to read from MapR Database in our own way, so that any filter on a field covered by a secondary index on the physical table will use that index to optimize the read.

Syntax

In order to maintain a similar API to the one offered by the default MapR Database Connector, we added some syntax to our library in the following way:

object MapRDB {

  implicit class ExtendedSession(sparkSession: SparkSession) {

    def loadFromMapRDB(path: String, schema: StructType): DataFrame = {

      sparkSession
        .read
        .format("com.github.anicolaspp.spark.sql.Reader")
        .schema(schema)
        .load(path)
    }
  }

}

Notice that our loadFromMapRDB method requires a schema to be passed in. This is a small difference from the official connector, which supports schema inference. It is a deliberate design decision: most of the time we have the schema available, and we know that schema inference does not always work correctly in the official connector.

We can now use our connector in the same way we used the default/official connector.

val schema = StructType(Seq(StructField("_id", StringType), StructField("uid", StringType)))

val data = sparkSession
  .loadFromMapRDB("/user/mapr/tables/data", schema)
  .filter("uid = '101'")
  .select("_id")

data.take(3).foreach(println)

Using MapR Database Secondary Indexes

When we run the code above, the TRACE output from OJAI looks similar to the following:

QUERY PLAN: {"QueryPlan":[
  [{
    "streamName":"DBDocumentStream",
    "parameters":{
      "queryConditionPath":false,
      "indexName":"uid_idx",
      "projectionPath":[
        "uid",
        "_id"
      ],
      "primaryTable":"/user/mapr/tables/data"
    }
  }
  ]
]}

Notice that it automatically uses the index called uid_idx, which is an index on the field uid, the same field used in the Spark filter.

Conclusions

MapR Database is a powerful tool that runs as part of the MapR Data Platform. The Spark Connector offers an interesting way to interact with MapR Database, since it allows us to use all Spark constructs at scale when working with this NoSQL system. However, sometimes the default connector falls short because it does not use the secondary index capabilities of MapR Database when we need them the most.

On the other hand, our implementation mimics the Connector API and ensures that the implemented Spark data source uses MapR Database secondary indexes, since it relies on pure OJAI queries that are able to support secondary indexes out of the box.


Our library code can be found here: MapRDBConnector.

You can get the binaries directly from Maven Central:

<dependency>
  <groupId>com.github.anicolaspp</groupId>
  <artifactId>maprdbconnector_2.11</artifactId>
  <version>1.0.2</version>
</dependency>

Or using sbt:

libraryDependencies += "com.github.anicolaspp" % "maprdbconnector_2.11" % "1.0.2"

Disclaimer: This is an independent effort to improve querying MapR Database. This library is not a substitute for the official Connector for Apache Spark offered by MapR as part of its distribution.


This blog post was published March 08, 2019.