Real-Time Analysis of Popular Uber Locations using Apache APIs: Spark Structured Streaming, Machine Learning, Kafka and MapR Database


According to Gartner, smart cities will be using about 1.39 billion connected cars, IoT sensors, and devices by 2020. The analysis of location and behavior patterns within cities will allow optimization of traffic, better planning decisions, and smarter advertising. For example, the analysis of GPS car data can allow cities to optimize traffic flows based on real-time traffic information. Telecom companies are using mobile phone location data to provide insights by identifying and predicting the location activity trends and patterns of a population in a large metropolitan area. The application of machine learning to geolocation data is being used in telecom, travel, marketing, and manufacturing to identify patterns and trends for services such as recommendations, anomaly detection, and fraud.

In this blog post, we discuss using Spark Structured Streaming in a data processing pipeline for cluster analysis on Uber event data to detect and visualize popular Uber locations.

We start with a review of several Structured Streaming concepts, then explore the end-to-end use case. (Note that the code in this example is not from Uber, only the data.)

Streaming Concepts

Publish-Subscribe Event Streams with MapR Event Store

MapR Event Store is a distributed publish-subscribe event streaming system that enables producers and consumers to exchange events in real time in a parallel and fault-tolerant manner via the Apache Kafka API.

A stream represents a continuous sequence of events that goes from producers to consumers, where an event is defined as a key-value pair.

A topic is a logical stream of events. Topics organize events into categories and decouple producers from consumers. Topics are partitioned for throughput and scalability. MapR Event Store can scale to very high throughput levels, delivering millions of messages per second on modest hardware.

You can think of a partition like an event log: new events are appended to the end and are assigned a sequential ID number called the offset.


Like a queue, events are delivered in the order they are received.

Unlike a queue, however, messages are not deleted when read. They remain on the partition, available to other consumers. Messages, once published, are immutable and can be retained forever.


Not deleting messages when they are read allows for high performance at scale and also for processing of the same messages by different consumers for different purposes such as multiple views with polyglot persistence.
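The log-like behavior described above can be illustrated with a small in-memory sketch. It is plain Scala, not the MapR Event Store or Kafka client API; the names PartitionLog and Consumer are hypothetical and exist only for this illustration. Each append returns the next sequential offset, reads never remove events, and every consumer keeps its own position in the log.

```scala
import scala.collection.mutable.ArrayBuffer

object PartitionLogSketch {
  // One partition modeled as an append-only log of key-value events.
  // Hypothetical class for illustration; not part of any Kafka or MapR API.
  final class PartitionLog {
    private val events = ArrayBuffer.empty[(String, String)]

    // Append an event to the end of the log and return its offset.
    def append(key: String, value: String): Long = {
      events += ((key, value))
      (events.size - 1).toLong
    }

    // Read up to `max` events starting at `offset`. Reading never deletes anything.
    def read(offset: Long, max: Int): List[(String, String)] =
      events.slice(offset.toInt, offset.toInt + max).toList
  }

  // Each consumer tracks its own position, so consumers do not affect one another.
  final class Consumer(log: PartitionLog) {
    private var position = 0L

    def poll(max: Int): List[(String, String)] = {
      val batch = log.read(position, max)
      position += batch.size
      batch
    }
  }
}
```

Two Consumer instances created over the same PartitionLog each see every event in order, which is the property that lets different applications process the same messages for different purposes.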


Spark Dataset, DataFrame, SQL

A Spark Dataset is a distributed collection of typed objects partitioned across multiple nodes in a cluster. A Dataset can be manipulated using functional transformations (map, flatMap, filter, etc.) and/or Spark SQL. A DataFrame is a Dataset of Row objects and represents a table of data with rows and columns.

Spark Structured Streaming

Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. Structured Streaming enables you to view data published to Kafka as an unbounded DataFrame and process this data with the same DataFrame, Dataset, and SQL APIs used for batch processing.

As streaming data continues to arrive, the Spark SQL engine incrementally and continuously processes it and updates the final result.

Stream processing of events is useful for real-time ETL, filtering, transforming, creating counters and aggregations, correlating values, enriching with other data sources or machine learning, persisting to files or a database, and publishing to a different topic for pipelines.

Spark Structured Streaming Use Case Example Code

Below is the data processing pipeline for this use case of cluster analysis on Uber event data to detect popular pickup locations.

  1. Uber trip data is published to a MapR Event Store topic using the Kafka API.
  2. A Spark streaming application subscribed to the topic:
    1. Ingests a stream of Uber trip data
    2. Uses a deployed machine learning model to enrich the trip data with a cluster ID and cluster location
    3. Stores the transformed and enriched data in MapR Database JSON

Example Use Case Data

The example data set is Uber trip data, which you can read more about in this post on cluster analysis of Uber event data to detect popular pickup locations using Spark machine learning. The incoming data is in CSV format. The header and an example record are shown below:

date/time,latitude,longitude,base, reverse time stamp

2014-08-06T05:29:00.000-07:00, 40.7276, -74.0033, B02682, 9223370505593280605

We enrich this data with the cluster ID and location then transform it into the following JSON object:

{
  "_id": "0_922337050559328",
  "dt": "2014-08-01 08:51:00",
  "lat": 40.6858,
  "lon": -73.9923,
  "base": "B02682",
  "cid": 0,
  "clat": 40.67462874550765,
  "clon": -73.98667466026531
}

Loading the K-Means Model

The Spark KMeansModel class is used to load a k-means model, which was fitted on the historical Uber trip data and then saved to the MapR XD cluster. Next, a Dataset of cluster center IDs and locations is created to join later with the Uber trip locations.

The cluster centers can be displayed on a Google map in a Zeppelin notebook.

Reading Data from Kafka Topics

In order to read from Kafka, we must first specify the stream format, topic, and offset options. For more information on the configuration parameters, see the MapR Event Store documentation.
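As a rough sketch of what those options can look like, the block below collects them in a plain Scala Map so that it runs without Spark on the classpath. The option keys are standard Spark Kafka source options; the topic name and broker address are placeholders assumed for illustration, not values taken from this post.

```scala
object KafkaSourceSketch {
  // Hypothetical topic name, written in the "/stream-path:topic" form used by MapR Event Store.
  val topic: String = "/apps/uberstream:ubers"

  // Standard Spark Kafka source options; the values are illustrative assumptions.
  val options: Map[String, String] = Map(
    "kafka.bootstrap.servers" -> "localhost:9092", // placeholder broker address
    "subscribe"               -> topic,            // topic to read from
    "startingOffsets"         -> "earliest",       // begin at the oldest retained event
    "maxOffsetsPerTrigger"    -> "1000"            // cap on events per micro-batch
  )

  // With a SparkSession named `spark` in scope, the streaming read would be:
  //   val df1 = spark.readStream.format("kafka").options(options).load()
}
```

Passing the options as a Map through DataStreamReader.options keeps the configuration in one place; chaining individual .option(key, value) calls is equivalent.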

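This returns a DataFrame with the standard Kafka source schema: key (binary), value (binary), topic (string), partition (int), offset (long), timestamp (timestamp), and timestampType (int).

The next step is to parse and transform the binary value column into a Dataset of Uber objects.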

Parsing the Message Values into a Dataset of Uber Objects

A Scala Uber case class defines the schema corresponding to the CSV records. The parseUber function parses a comma-separated value string into an Uber object.
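A minimal sketch of such a case class and parser follows, in plain Scala. The field names dt, lat, lon, and base follow the JSON shown earlier; the rdt field name for the reverse timestamp, and the choice to return an Option so that malformed records are dropped rather than throwing, are assumptions made for this illustration.

```scala
import scala.util.Try

object UberParsing {
  // Schema for one CSV record. `rdt` (reverse timestamp) is an assumed field name.
  case class Uber(dt: String, lat: Double, lon: Double, base: String, rdt: String)

  // Parse "date/time,latitude,longitude,base,reverse timestamp" into an Uber object.
  // Returns None when the field count is wrong or a coordinate is not numeric.
  def parseUber(line: String): Option[Uber] = {
    val p = line.split(",").map(_.trim)
    if (p.length != 5) None
    else Try(Uber(p(0), p(1).toDouble, p(2).toDouble, p(3), p(4))).toOption
  }
}
```

Calling UberParsing.parseUber on the example record above yields an Uber with lat 40.7276 and lon -74.0033; a line with missing fields yields None.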

We register a user-defined function (UDF) that deserializes the message value strings using the parseUber function. We then use the UDF in a select expression with a string cast of the value column of df1, which returns a DataFrame of Uber objects.

Enriching the Dataset of Uber Objects with Cluster Center IDs and Location

A VectorAssembler is used to transform and return a new DataFrame with the latitude and longitude feature columns in a vector column.

The k-means model is used to get the clusters from the features with the model transform method, which returns a DataFrame with the cluster ID (labeled predictions). This resulting Dataset is joined with the cluster center Dataset created earlier (ccdf) to create a Dataset of UberC objects, which contain the trip information combined with the cluster center ID and location.
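Conceptually, this enrichment assigns each trip to its nearest cluster center and then attaches that center's coordinates. The sketch below shows the idea with plain Scala collections instead of Spark ML and a DataFrame join, assuming Euclidean distance on the raw latitude and longitude values. The Center and Trip classes and the function names are hypothetical; UberC mirrors the fields of the JSON shown earlier.

```scala
object ClusterEnrichment {
  // Hypothetical stand-ins for the cluster center table and the parsed trips.
  case class Center(cid: Int, clat: Double, clon: Double)
  case class Trip(dt: String, lat: Double, lon: Double, base: String)
  case class UberC(dt: String, lat: Double, lon: Double, base: String,
                   cid: Int, clat: Double, clon: Double)

  // Squared Euclidean distance; enough for comparing which center is closest.
  private def squaredDistance(lat1: Double, lon1: Double, lat2: Double, lon2: Double): Double = {
    val dLat = lat1 - lat2
    val dLon = lon1 - lon2
    dLat * dLat + dLon * dLon
  }

  // Pick the closest center. `centers` must be non-empty.
  def nearestCenter(centers: Seq[Center], lat: Double, lon: Double): Center =
    centers.minBy(c => squaredDistance(lat, lon, c.clat, c.clon))

  // Combine the trip with the ID and location of its assigned cluster.
  def enrich(centers: Seq[Center], trip: Trip): UberC = {
    val c = nearestCenter(centers, trip.lat, trip.lon)
    UberC(trip.dt, trip.lat, trip.lon, trip.base, c.cid, c.clat, c.clon)
  }
}
```

In the streaming application the same two steps are carried out by the model transform method and the join with ccdf, which lets Spark distribute the work across executors.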

The final Dataset transformation is to add a unique ID to our objects for storing in MapR Database JSON. The createUberwId function creates a unique ID consisting of the cluster ID and the reverse timestamp. Since MapR Database partitions and sorts rows by the _id, the rows will be sorted by cluster ID with the most recent first. This function is used with a map to create a Dataset of UberwId objects.
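The sorting effect of this row key can be checked with a few lines of plain Scala. The sketch assumes the reverse timestamp is computed as Long.MaxValue minus epoch milliseconds, which is a common convention but is not stated in this post; the function names are hypothetical.

```scala
object RowKeySketch {
  // Assumed convention: later events get smaller reverse timestamps.
  def reverseTimestamp(epochMillis: Long): Long = Long.MaxValue - epochMillis

  // Row key in the "<cluster ID>_<reverse timestamp>" form shown in the JSON example.
  def createId(cid: Int, reverseTs: Long): String = s"${cid}_$reverseTs"
}
```

Because the keys are strings, they sort lexicographically: within one cluster the newest event has the smallest key and therefore comes first. The same rule means that with ten or more clusters a key beginning with 10_ sorts before one beginning with 2_, so cluster order is textual rather than numeric.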

Writing to a Memory Sink

We have now set up the enrichments and transformations on the streaming data. Next, for debugging purposes, we can start receiving data and storing the data in memory as an in-memory table, which can then be queried.

Here is example output from %sql select * from uber limit 10:

Now we can query the streaming data to ask questions like which hours and clusters have the highest number of pickups? (Output is shown in a Zeppelin notebook.)

%sql
SELECT hour(uber.dt) AS hr, cid, count(cid) AS ct FROM uber GROUP BY hour(uber.dt), cid
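For readers less familiar with SQL, the aggregation this query performs can be sketched with plain Scala collections. This is an illustration of the grouping logic only, not how Spark executes it; the Row class and function names are hypothetical, and the timestamp format is assumed to match the dt field of the JSON shown earlier.

```scala
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

object PickupCounts {
  // Hypothetical row holding only the two columns the query uses.
  case class Row(dt: String, cid: Int)

  private val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")

  // Equivalent of SQL hour(dt) for timestamps such as "2014-08-01 08:51:00".
  def hourOf(dt: String): Int = LocalDateTime.parse(dt, fmt).getHour

  // Equivalent of GROUP BY hour(dt), cid with count(cid).
  def countByHourAndCluster(rows: Seq[Row]): Map[(Int, Int), Int] =
    rows.groupBy(r => (hourOf(r.dt), r.cid)).map { case (key, group) => key -> group.size }
}
```

Each key of the resulting Map is an (hour, cluster ID) pair and each value is the number of pickups in that group, matching the hr, cid, and ct columns of the query.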

Spark Streaming Writing to MapR Database

The MapR Database Connector for Apache Spark enables you to use MapR Database as a sink for Spark Structured Streaming or Spark Streaming.

One of the challenges of processing large volumes of streaming data is deciding where to store it. For this application, MapR Database JSON, a high-performance NoSQL database, was chosen for its scalability and its flexibility and ease of use with JSON.

JSON Schema Flexibility

MapR Database supports JSON documents as a native data store. MapR Database makes it easy to store, query, and build applications with JSON documents. The Spark connector makes it easy to build real-time or batch pipelines between your JSON data and MapR Database and leverage Spark within the pipeline.

With MapR Database, a table is automatically partitioned into tablets across a cluster by key range, providing for scalable and fast reads and writes by row key. In this use case, the row key, the _id, consists of the cluster ID and reverse timestamp, so the table is automatically partitioned and sorted by cluster ID with the most recent first.

The Spark MapR Database Connector leverages the Spark DataSource API. The connector architecture has a connection object in every Spark Executor, allowing for distributed parallel writes, reads, or scans with MapR Database tablets (partitions).

Writing to a MapR Database Sink

To write a Spark Stream to MapR Database, specify the format with the tablePath, idFieldPath, createTable, bulkMode, and sampleSize parameters. The following example writes out the cdf DataFrame to MapR Database and starts the stream.

Querying MapR Database JSON with Spark SQL

The Spark MapR Database Connector enables users to perform complex SQL queries and updates on top of MapR Database using a Spark Dataset, while applying critical techniques such as projection and filter pushdown, custom partitioning, and data locality.

Loading Data from MapR Database into a Spark Dataset

To load data from a MapR Database JSON table into an Apache Spark Dataset, we invoke the loadFromMapRDB method on a SparkSession object, providing the tableName, schema, and case class. This returns a Dataset of UberwId objects:

Explore and Query the Uber Data with Spark SQL

Now we can query the data that is continuously streaming into MapR Database to ask questions with the Spark DataFrames domain-specific language or with Spark SQL.

Show the first rows. Note how the rows are partitioned and sorted by the _id, which is composed of the cluster ID and the reverse timestamp; the reverse timestamp sorts the most recent rows first.

df.show

How many pickups occurred in each cluster?

df.groupBy("cid").count().orderBy(desc("count")).show

or with Spark SQL:

%sql SELECT COUNT(cid), cid FROM uber GROUP BY cid ORDER BY COUNT(cid) DESC

With Angular and Google Maps script in a Zeppelin notebook, we can display cluster center markers and the latest 5000 trip locations on a map, which shows that the most popular locations (clusters 0, 3, and 9) are in Manhattan.

Which hours have the highest number of pickups for cluster 0?

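// Row keys start with the cluster ID, so keys up to "1" select cluster 0 only.
import org.apache.spark.sql.functions.{count, hour}
df.filter($"_id" <= "1")
  .select(hour($"dt").alias("hour"), $"cid")
  .groupBy("hour", "cid")
  .agg(count("cid").alias("count"))
  .show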

Which hours of the day and which cluster had the highest number of pickups?

%sql SELECT hour(uber.dt), cid, count(cid) FROM uber GROUP BY hour(uber.dt), cid

Display cluster counts for Uber trips by datetime.

%sql select cid, dt, count(cid) as count from uber group by dt, cid order by dt, cid limit 100

Summary

In this post, you learned how to use the following:

  • A Spark machine learning model in a Spark Structured Streaming application
  • Spark Structured Streaming with MapR Event Store to ingest messages using the Kafka API
  • Spark Structured Streaming to persist to MapR Database for continuously and rapidly available SQL analysis

All of the components of the use case architecture we just discussed can run on the same cluster with the MapR Data Platform.


This blog post was published July 25, 2018.