End to End Application for Monitoring Real-Time Uber Data Using Apache APIs: Kafka, Spark, HBase – Part 4: Spark Streaming, DataFrames, and HBase


Editor's Note: This is a 4-part series. See the previously published posts below:

  • Part 1 – Spark Machine Learning
  • Part 2 – Kafka and Spark Streaming
  • Part 3 – Real-Time Dashboard Using Vert.x

According to Gartner, 20.8 billion connected things will be in use worldwide by 2020. Danny Lange, the head of machine learning at Uber, aims to bring machine learning to every corner of Uber's business. Examples of connected things include connected cars and devices as well as applications used for healthcare, telecom, manufacturing, retail, and finance. Leveraging the huge amounts of data coming from these devices requires processing events in real time, applying machine learning to add value, and scalable, fast storage. Applications of this type are usually built on an event-driven microservices architecture.

This is the fourth in a series of blog posts that discusses the architecture of an end-to-end application combining streaming data with machine learning to do real-time analysis and visualization of where and when Uber cars are clustered, in order to predict and visualize the most popular Uber locations.

  1. The first part of this series discusses creating a machine learning model, using the Apache Spark K-means algorithm to cluster Uber data by location.
  2. The second post discusses using the saved K-means model with streaming data to do real-time analysis of where and when Uber cars are clustered.
  3. The third post discusses building a real-time dashboard to visualize the cluster data on a Google map.

The following figure depicts the data pipeline:


In this post, we will go over Spark Streaming writing to MapR Database using the Spark HBase and MapR Database Binary Connector, and reading from MapR Database using Spark SQL and DataFrames. The following figure depicts the data pipeline:

  • Uber trip data is published to a MapR Event Store topic using the Kafka API.
  • A Spark Streaming application subscribed to the first topic enriches the event with the cluster location and publishes the results in JSON format to another topic.
  • A Vert.x web application subscribed to the second topic displays the Uber trip clusters in a heatmap.
  • A Spark Streaming application subscribed to the second topic stores the data in MapR Database using the Spark HBase and MapR Database Binary Connector.
  • A Spark batch application queries MapR Database with Spark SQL using the Spark HBase and MapR Database Binary Connector.


Spark and MapR Database

One of the challenges when you are processing lots of streaming data is deciding where to store it. With MapR Database (HBase API or JSON API), a table is automatically partitioned into tablets across a cluster by key range, providing scalable, fast reads and writes by row key.


The Spark HBase and MapR Database Binary Connector leverages the Spark DataSource API. The connector architecture has an HConnection object in every Spark Executor, allowing for distributed parallel writes, reads, or scans with MapR Database tablets.


Spark Streaming writing to MapR Database


You can read about the MapR Event Store Spark Streaming code in part 2 of this series; here, we will focus on writing to MapR Database. The messages from the second MapR Event Store topic are in JSON format and contain the following for each Uber trip: the cluster center id, datetime, latitude and longitude for the trip, base for the trip, and latitude and longitude for the cluster center. An example is shown below:

{"cid":18, "dt":"2014-08-01 08:51:00", "lat":40.6858, "lon":-73.9923, "base":"B02682", "clat":40.67462874550765, "clon":-73.98667466026531}

In the code below, we create an HBaseContext object with an HBaseConfiguration object. The HBaseContext broadcasts the HBase configuration to the HConnections in the executors.
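A minimal sketch of that setup, assuming the hbase-spark module's HBaseContext (the application name is illustrative, and HBaseConfiguration.create() picks up hbase-site.xml from the classpath):

```scala
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.spark.HBaseContext
import org.apache.spark.{SparkConf, SparkContext}

// Create the Spark context for the streaming application.
val sparkConf = new SparkConf().setAppName("UberStream")
val sc = new SparkContext(sparkConf)

// The HBase configuration is broadcast by the HBaseContext
// to the HConnections in the executors.
val conf = HBaseConfiguration.create()
val hbaseContext = new HBaseContext(sc, conf)
```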


In the code below, we:

  • Get the message value from the message key-value pair.
  • Call the HBaseContext streamBulkPut method, passing the message value DStream, the TableName to write to, and a function to convert the DStream values to HBase Put records.
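The two steps above can be sketched as follows, assuming messagesDStream is the DStream of (key, value) message pairs from the earlier streaming code and the hbase-spark streamBulkPut signature:

```scala
import org.apache.hadoop.hbase.TableName

// Get the JSON message value from each (key, value) pair.
val valuesDStream = messagesDStream.map(_._2)

// Bulk-put each micro-batch into the MapR Database table; convertToPut
// turns a JSON message into an HBase Put record.
hbaseContext.streamBulkPut[String](
  valuesDStream,
  TableName.valueOf("/user/user01/db/uber"),
  convertToPut
)
```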


The convertToPut function parses the JSON string and creates an HBase Put object.
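A hedged sketch of such a function, assuming json4s for JSON parsing; the case class name is illustrative, and the row key and column layout follow the table schema described later in this post:

```scala
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes
import org.json4s.DefaultFormats
import org.json4s.jackson.JsonMethods

// Illustrative case class matching the fields of the JSON message.
case class UberTrip(cid: Int, dt: String, lat: Double, lon: Double,
                    base: String, clat: Double, clon: Double)

def convertToPut(message: String): Put = {
  implicit val formats = DefaultFormats
  val trip = JsonMethods.parse(message).extract[UberTrip]

  // Composite row key: cluster id, base, date, and time, underscore-separated.
  val key = s"${trip.cid}_${trip.base}_${trip.dt.replace(" ", "_")}"

  // Write the trip latitude and longitude into the "data" column family.
  val put = new Put(Bytes.toBytes(key))
  put.addColumn(Bytes.toBytes("data"), Bytes.toBytes("lat"),
    Bytes.toBytes(trip.lat.toString))
  put.addColumn(Bytes.toBytes("data"), Bytes.toBytes("lon"),
    Bytes.toBytes(trip.lon.toString))
  put
}
```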


The Spark Streaming bulk put enables massively parallel sending of puts to HBase.


SparkSQL and DataFrames

The Spark HBase and MapR Database Binary Connector enables users to perform complex relational SQL queries on top of MapR Database using a Spark DataFrame, while applying critical techniques such as partition pruning, column pruning, predicate pushdown, and data locality.

To use the Spark HBase and MapR Database Binary Connector, you need to define the Catalog for the schema mapping between the HBase and Spark tables. Below is the schema for storing the Uber trip data:

  • A composite row key contains the cluster id, the base, the date, and the time, separated by underscores.
  • There is a column family data for storing all the data and a column family stat for statistical rollups.
  • There are two columns, one for the latitude and one for the longitude of each trip.
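As a concrete illustration of the composite row key (a hypothetical helper, not code from the original application):

```scala
// Build the composite row key described above: cid_base_date_time.
def makeRowKey(cid: Int, base: String, dt: String): String = {
  val Array(date, time) = dt.split(" ")  // e.g. "2014-08-01 08:51:00"
  s"${cid}_${base}_${date}_${time}"
}

// The sample trip shown earlier yields:
// makeRowKey(18, "B02682", "2014-08-01 08:51:00") == "18_B02682_2014-08-01_08:51:00"
```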


The Catalog defines a mapping between HBase and Spark tables. There are two critical parts of this catalog. One is the row key definition. The other is the mapping between the table columns in Spark and the column family and column names in HBase. The following example defines the Catalog schema for the MapR Database table named /user/user01/db/uber, with row key key and columns lat and lon. Note that the row key also has to be defined as a column (key), which has a specific cf (rowkey).
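A sketch of that Catalog as a JSON string, following the convention used by the Spark HBase connectors (exact type names may vary by connector version):

```scala
// Catalog mapping the Spark columns key, lat, and lon to the
// MapR Database table /user/user01/db/uber; the row key is also
// declared as a column with the special column family "rowkey".
val catalog = """{
  "table":{"namespace":"default", "name":"/user/user01/db/uber"},
  "rowkey":"key",
  "columns":{
    "key":{"cf":"rowkey", "col":"key", "type":"string"},
    "lat":{"cf":"data", "col":"lat", "type":"string"},
    "lon":{"cf":"data", "col":"lon", "type":"string"}
  }
}"""
```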


Loading data from MapR Database into a Spark DataFrame

In the withCatalog function below:

  • The SQLContext read method returns a DataFrameReader that can be used to read data into a DataFrame.
  • The options function adds input options for the underlying data source to the DataFrameReader.
  • The format function specifies the input data source format for the DataFrameReader.
  • The load() function loads the input as a DataFrame. The first 20 rows of the DataFrame df returned by the withCatalog function are output with df.show().
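Those steps can be sketched as follows, given an existing SQLContext and the catalog string; the data source name org.apache.hadoop.hbase.spark and the catalog option key are assumptions based on the hbase-spark module:

```scala
import org.apache.spark.sql.{DataFrame, SQLContext}

def withCatalog(sqlContext: SQLContext, catalog: String): DataFrame =
  sqlContext.read
    .options(Map("catalog" -> catalog))       // schema mapping from the Catalog
    .format("org.apache.hadoop.hbase.spark")  // the connector's data source
    .load()

val df = withCatalog(sqlContext, catalog)
df.show()  // prints the first 20 rows
```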


The output of df.show is displayed below:


In the following example, df.filter filters rows using a SQL expression on the row key to select cluster ids (the beginning of the row key) >= 9. The select selects the columns key, lat, and lon.
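Because the cluster id is the leading portion of the row key, the filter can be expressed as a string comparison on the key column; a sketch:

```scala
// Keep rows whose row key starts at cluster id 9 or above
// (a lexicographic string comparison on the key), then select columns.
val filteredDF = df.filter(df("key") >= "9")
  .select("key", "lat", "lon")
filteredDF.show()
```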


The results of df.show are shown below:


All of the components of the use case architecture we just discussed can run on the same cluster with the MapR Data Platform.

Downloading and running the example

This blog post was published June 09, 2017.
