MapR Database Atomic Document Updates


Originally published 12/15/18 on Medium

In a previous post, we discussed some of the features of MapR Database that make this distributed database especially interesting. In this blog post, we intend to continue that effort by presenting a specific use case.

The Problem

The problem to be solved can be described as follows.

A series of messages arrives through a stream. Each message has an id and a count. For each id, we must update the count already stored in the database by incrementing it with the count coming in on the stream.

The following image shows an example of the problem.
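For instance (the ids and counts below are made up, not taken from the figure), the updates we want to apply look like this:

   Incoming stream messages          Current state in MapR Database
   (id = "page-1", count = 3)        (id = "page-1", count = 10)
   (id = "page-2", count = 1)        (id = "page-2", count = 4)

   Desired state after processing the batch
   (id = "page-1", count = 13)
   (id = "page-2", count = 5)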

There are various ways to solve this problem. One is to read the current state of a given id from the database, update its count using the values on the stream, and finally save the updated values back to the database.

Let's see this process in detail.

As we can see in the image above, in order to update the state in the database, we first need to load every piece of data from it, join the loaded values with the incoming stream to calculate the new state, and finally save the new state back to persistent storage. In general, we have to follow this same process regardless of the database technology we choose, whether it is MapR Database or any other persistent store.
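Stripped of the DataFrame machinery, the read-modify-write cycle for a single id looks roughly like the following sketch using the OJAI API (the table path, id, and field names are illustrative, not from the original post, and the document is assumed to already exist).

import org.ojai.store.DriverManager

object ReadModifyWriteSketch {

  def main(args: Array[String]): Unit = {
    val connection = DriverManager.getConnection("ojai:mapr:")

    // Illustrative table path, id, and field names.
    val store = connection.getStore("/user/mapr/tables/view_counts")

    val id = "page-1"
    val incomingCount = 3 // value arriving on the stream

    // 1. Load the current state of the document.
    val currentCount = store.findById(id).getInt("count")

    // 2. Compute the new state using the value from the stream.
    val newCount = currentCount + incomingCount

    // 3. Save the whole updated document back to the database.
    val updated = connection
      .newDocument()
      .setId(id)
      .set("count", newCount)

    store.insertOrReplace(updated)

    store.close()
    connection.close()
  }
}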

The following code shows how we could implement this idea using Apache Spark integrated with MapR Event Store for Apache Kafka and MapR Database.

def main(args: Array[String]): Unit = {
  val appConfig = Configuration.parse(args)

  val config = new SparkConf().setAppName(appConfig.appName)

  val sc = new SparkContext(config)
  val sparkSession = SparkSession.builder().config(config).getOrCreate()
  val ssc: StreamingContext = new StreamingContext(sc, Milliseconds(500))

  val messages = getStream(appConfig.stream)(ssc)

  messages
    .map(_.value())
    .map(MapRDBSpark.newDocument)
    .map(doc => (doc.getString(pathCol), 1))
    .reduceByKey(_ + _)
    .foreachRDD { rdd =>
      val rowRDD = rdd.map { case (path, sum) => Row(path, sum) }

      val rowDF = sparkSession.createDataFrame(rowRDD, streamSchema)

      val fromDBDF = sparkSession.loadFromMapRDB(appConfig.tableName, dbSchema)

      val joinDF = rowDF
        .join(fromDBDF, Seq(idCol), left_outer_join)
        .na
        .fill(0, Seq(totalCol))

      val finalDF = joinDF
        .withColumn(valueCol, joinDF(sumCol) + joinDF(totalCol))
        .select(idCol, valueCol)
        .withColumnRenamed(valueCol, totalCol)

      finalDF.saveToMapRDB(appConfig.tableName)
    }

  ssc.start()
  ssc.awaitTermination()
}

It is important to notice that on every streaming batch, we load from MapR Database (or any other database, for that matter) and merge the loaded data frame with the data from the stream. Then we save the freshly calculated state back to the database.

This process makes total sense, and in most databases out there, there is no way around it. However, these operations are costly to execute every time we receive data on the stream.

MapR Database Mutations

When others fall short, MapR Database shines.

MapR Database is able to incrementally update documents without needing to load them first. More specifically, it is possible to update only some of the fields of a document without touching anything else at the document level.
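For instance, a single mutation can increment one field and overwrite another, leaving the rest of the document untouched. The snippet below is a minimal sketch of that idea; the table path, id, and field names are illustrative only.

import com.mapr.db.MapRDB
import org.ojai.store.DriverManager

object PartialUpdateSketch {

  def main(args: Array[String]): Unit = {
    val connection = DriverManager.getConnection("ojai:mapr:")

    // Illustrative table path, id, and field names.
    val store = connection.getStore("/user/mapr/tables/view_counts")

    // A single mutation that increments one field and overwrites another.
    // Every other field of the document stays untouched, and the document
    // is never read by the client.
    val mutation = MapRDB
      .newMutation()
      .increment("count", 5)
      .set("lastSeen", "2018-12-15")

    store.update("page-1", mutation)

    store.close()
    connection.close()
  }
}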

A reasonable question quickly arises: Are these updates atomically applied, given the distributed nature of MapR Database? The answer is YES.

Let's first write some code to prove the last statement, and then we will move on to solving the problem in question using this concept.

We can start with the following code snippet.

package com.github.anicolaspp.maprdb.transactions

import com.mapr.db.MapRDB
import org.ojai.store.DocumentStore

import scala.concurrent.{ExecutionContext, Future}

object UpdateSameId {

  def run(id: String, times: Int)(implicit ec: ExecutionContext, store: DocumentStore) = Future {

    (1 to times).foreach { _ =>
      val mutation = MapRDB
        .newMutation()
        .increment("count", 1)

      store.update(id, mutation)
    }

    store.findById(id).getInt("count")
  }
}

As we can see, the run function increments the count for a given id once per iteration, times times in total.

Based on this code, we can create another function that does the same in parallel by creating a number of threads and executing UpdateSameId.run on each of them.

package com.github.anicolaspp.maprdb.transactions

import org.ojai.store.DocumentStore

import scala.concurrent.{ExecutionContext, Future}

object UpdateSameIdInParallel {

  def run(id: String, times: Int, threads: Int)(implicit ec: ExecutionContext, store: DocumentStore): Future[Int] = {

    val updates = (1 to threads).map { _ => UpdateSameId.run(id, times) }

    Future.reduceLeft(updates)(_ + _)
  }
}

We can run the above code in the following way.

package com.github.anicolaspp.maprdb

import com.github.anicolaspp.maprdb.store.DocumentStore._
import com.github.anicolaspp.maprdb.transactions.UpdateSameIdInParallel
import org.ojai.store.{DocumentStore, DriverManager}

import scala.concurrent.Await
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object App {

  private lazy val connection = DriverManager.getConnection("ojai:mapr:")

  def main(args: Array[String]): Unit = {

    implicit val documentStore: DocumentStore = connection.getStore("/user/mapr/tables/view_counts")

    documentStore.getJsonDocuments().foreach(println)

    val sessionID = "001"

    val updates = UpdateSameIdInParallel
      .run(sessionID, times = 10, threads = 20)
      .map { _ =>
        documentStore.getJsonDocuments().foreach(println)
        documentStore.close()
      }

    Await.ready(updates, 10 minutes)

    println("done....")
  }
}

This will print out the final state of each document in MapR Database. If any of the concurrent increments were lost, the count for the id would end up smaller than threads * times; the fact that it does not lets us say that the updates are atomically applied.
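The same idea can be turned into an explicit check. The following is a minimal sketch, reusing UpdateSameIdInParallel from above; the table path and id are the same illustrative ones, and it assumes the document already exists.

package com.github.anicolaspp.maprdb

import com.github.anicolaspp.maprdb.transactions.UpdateSameIdInParallel
import org.ojai.store.{DocumentStore, DriverManager}

import scala.concurrent.Await
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object AtomicityCheck {

  def main(args: Array[String]): Unit = {
    val connection = DriverManager.getConnection("ojai:mapr:")

    implicit val documentStore: DocumentStore = connection.getStore("/user/mapr/tables/view_counts")

    val sessionID = "001"
    val times = 10
    val threads = 20

    // Record the count before running the concurrent increments.
    val before = documentStore.findById(sessionID).getInt("count")

    val updatesDone = UpdateSameIdInParallel.run(sessionID, times, threads)

    Await.ready(updatesDone, 10.minutes)

    val after = documentStore.findById(sessionID).getInt("count")

    // If every increment is applied atomically, none of them is lost and the
    // count grows by exactly threads * times.
    assert(after - before == threads * times, s"expected ${threads * times}, got ${after - before}")

    documentStore.close()
    connection.close()
  }
}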

The entire code is part of the reactor project that you can find here.

As we can see, even when running in a multi-threaded, multi-processor environment, MapR Database guarantees that values are consistently (atomically) updated. We could write similar code using Apache Spark in order to increase parallelism, but the results would be the same; we tried it out.

Based on these findings, we could improve our original app to solve the problem we first stated.

Let's start by removing the parts that first load from MapR Database and replacing them with updates instead.

def main(args: Array[String]): Unit = {
  val appConfig = Configuration.parse(args)

  val config = new SparkConf().setAppName(appConfig.appName)

  val sc = new SparkContext(config)
  val sparkSession = SparkSession.builder().config(config).getOrCreate()
  val ssc: StreamingContext = new StreamingContext(sc, Milliseconds(500))

  val messages = getStream(appConfig.stream)(ssc)

  messages
    .map(_.value())
    .map(MapRDBSpark.newDocument)
    .map(doc => (doc.getString(pathCol), 1))
    .reduceByKey(_ + _)
    .foreachRDD { rdd =>
      rdd.foreachPartition { partition =>
        // One connection per partition: created on the executor, so nothing
        // has to be serialized from the driver.
        val connection = DriverManager.getConnection("ojai:mapr:")
        val documentStore: DocumentStore = connection.getStore("/user/mapr/tables/view_counts")

        partition
          .map { case (id, count) => (id, MapRDBSpark.newMutation().increment("count", count)) }
          .foreach { case (id, mutation) => documentStore.update(id, mutation) }

        documentStore.close()
        connection.close()
      }
    }

  ssc.start()
  ssc.awaitTermination()
}

Let's review a few important changes from our original app.

First, there is no need to load the current state from the database at all. Instead, we create the necessary mutations and apply them in parallel to MapR Database.

Secondly, we have significantly reduced the number of tasks that Spark has to execute, increasing the overall performance while reducing the load on our database.

Thirdly, the code is simpler, more elegant, and easier to understand, which improves maintainability while making other people's lives easier.

Finally, we create a connection per partition, so we avoid Spark serialization issues while increasing parallelism, since the partitions are processed concurrently across executors (Spark magic at work here).

After the process runs, we can rest assured that our data will be correctly and efficiently updated by our latest approach.

Conclusion

As discussed before, MapR Database is a NoSQL database with very interesting features that are worth exploring. These features not only put it among the top picks for highly performant, distributed workloads but also make our lives easier as problem solvers. Despite the intrinsically distributed nature of MapR Database, we can abstract ourselves from many of the issues that complex systems like this one have and use MapR Database with ease to solve the most complicated business problems in the market.


Please read Interacting with MapR Database for a more comprehensive view of how to use MapR Database and the tooling around it.

