Real-Time Analysis of Popular Uber Locations using Apache APIs


Speakers:

Carol McDonald

Industry Solutions Architect, MapR


In this on-demand webinar, we go over an example from the eBook Getting Started with Apache Spark 2.x: Analysis of Popular Uber Locations using Apache APIs: Spark Structured Streaming, Machine Learning, Kafka and MapR Database.

The application of machine learning to geolocation data is being used to identify patterns and trends, for smarter advertising, vehicle location or price optimization, recommendations, anomaly detection, and more. Leveraging geolocation data requires processing events in real time, applying machine learning models to add value, and providing scalable, fast storage.

In this tutorial, we'll do the following:

  • Brief overview of Machine Learning Clustering
  • Use Spark ML to perform cluster analysis on public Uber data to train and save a model of popular trip locations.
  • Use the saved ML model with Apache Spark Structured Streaming and the Kafka API in a data processing pipeline to enrich Uber events with cluster locations and store the results in MapR Database, a JSON document store.
  • Explore and query the continuously arriving results using Spark SQL with MapR Database.

Transcript

David: 00:00 Hello, and thank you for joining us today for our webinar, Real-Time Analysis of Popular Uber Locations Using Apache APIs, featuring Carol McDonald, Industry Solutions Architect at MapR. Our event today will run in total approximately one hour. You can submit a question at any time throughout the presentation via the chat box in the lower left hand corner of your browser, and we will follow up with you shortly after the event if we're not able to get to your question during the event. With that, I'd like to pass the ball over to Carol to get us started. Carol, it's all yours.

Carol McDonald: 00:38 Thanks. What I'm showing here is the application that we're gonna be talking about today. So what this is showing is we're receiving Uber trip events, and what we're doing is we're processing these and mixing them with the cluster IDs. And here you see the cluster IDs are these red markers. So, then we're merging these events with the cluster IDs. Then we're gonna publish these events to MapR Database in this case. And then also in this example we're displaying it on a map in real time, a Google heat map. And so you see, in the heat map, you see the real time events being displayed next to their cluster centers.

Carol McDonald: 01:22 So, that's the application that we're gonna go over in this example. I just wanted to show you that there's ... I did a similar application with a data scientist, and this is also using k-means clustering, this time on EKG waves, and what it's doing is it's modeling these EKG waves, parts of these waves, and then it's using this to detect anomalies. So, I won't go into detail about how it does that, but I'll show you later that there's a blog where you can read about this and get the code for this example also. It's very similar.

Carol McDonald: 01:59 Okay, now I'll go to the presentation about our example.

Carol McDonald: 02:03 What we're gonna go over specifically is first we're gonna go over an overview of unsupervised machine learning clustering. Then, we're going to use the k-means algorithm to cluster Uber locations, and we're gonna save this model. Next, we'll go over an overview of the Kafka API, a very brief overview. Then, we're gonna use this Kafka API with Spark Structured Streaming. We're gonna read from a Kafka topic, enrich the events from our machine learning model, then we're gonna write these events to MapR Database JSON document database, and finally we're gonna use Spark SQL to query this MapR Database.

Carol McDonald: 02:48 This shows our application, so we have events coming in, the Uber trip events coming in onto the Kafka topics, we have Spark Streaming reading these events, and then enriching these events with a saved machine learning model, and then writing these events to MapR Database JSON, then these events can be processed with Spark SQL, Apache Drill SQL, or displayed on a Google map, like we saw earlier.

Carol McDonald: 03:14 First, we're gonna go over the machine learning model part of this application. First we'll go into an introduction to machine learning, just unsupervised learning with k-means clustering. Machine learning uses algorithms to find patterns in data, and uses a model then that recognizes those patterns to make predictions on new data.

Carol McDonald: 03:42 There are typically two phases in machine learning with real time data. The first phase involves analysis on the historical data to build the machine learning model. The second phase uses the model in production on real live events. So what we're gonna go over is, again, this model-building phase first. In general, machine learning may be broken down into two classes of algorithms: supervised and unsupervised. With supervised learning, you have a known outcome that you want your data to predict. Unsupervised algorithms do not have the outputs or labels in advance.

Carol McDonald: 04:19 Supervised algorithms use labeled data, in which both the input and the target outcome or label are provided to the algorithm to build the model. Then, this model can be used with the new data, the features for the new data, to predict this label, or this outcome.

Carol McDonald: 04:36 With unsupervised algorithms, they find similarities in unlabeled data. For example, grouping similar customers based on purchase data. So here you have unlabeled purchase data. The algorithm's gonna find similarities in this and build a model which recognizes these similarities. Then you can use new data to group, with this model, to find similar groups. Google news uses a technique called clustering to group news articles into different categories based on the content.

Carol McDonald: 05:10 Clustering algorithms discover groupings that occur in collections of data by analyzing similarities between input examples. Clustering examples include grouping similar customers, grouping search results, text categorization, and also anomaly detection, which finds what is unusual and not similar. So let's go to a clustering example using the k-means algorithm, and this is the algorithm we're gonna use later. We want to create k number of clusters that group the data points with those that are most similar or closest. So we begin by initializing the k cluster centers. So here we see the k cluster centers, and they're just put in a random location, and then we're gonna assign each point to one of these cluster centers. With every pass of the algorithm, each point is assigned to its nearest cluster center, so it's measured by distance, which cluster center is the nearest. And then the centroids are updated to be the centers of all the points assigned to them in that pass. This repeats until there's a minimum change of the cluster centers from the last iteration. So here we see that now we have the cluster centers in the center of their points.
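
Just to make those two steps (assignment and update) concrete, here is a minimal k-means sketch in plain Scala. It is my own simplification for illustration, not the Spark implementation we use later; points are plain latitude/longitude pairs and the stopping tolerance is arbitrary.

    import scala.util.Random

    object SimpleKMeans {
      type Point = (Double, Double) // (latitude, longitude)

      def distSq(a: Point, b: Point): Double =
        math.pow(a._1 - b._1, 2) + math.pow(a._2 - b._2, 2)

      def mean(ps: Seq[Point]): Point =
        (ps.map(_._1).sum / ps.size, ps.map(_._2).sum / ps.size)

      def kmeans(points: Seq[Point], k: Int, tol: Double = 1e-6): Seq[Point] = {
        var centers = Random.shuffle(points).take(k)   // start with k random centers
        var moved = Double.MaxValue
        while (moved > tol) {
          // assignment step: each point goes to its nearest center
          val clusters = points.groupBy(p => centers.minBy(c => distSq(p, c)))
          // update step: each center moves to the mean of the points assigned to it
          val newCenters = centers.map(c => clusters.get(c).map(mean).getOrElse(c))
          moved = centers.zip(newCenters).map { case (a, b) => distSq(a, b) }.max
          centers = newCenters
        }
        centers
      }
    }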

Carol McDonald: 06:35 Next we're gonna look at clustering these Uber trip locations. First just a brief introduction about Spark. So a Spark application runs as executor processes, coordinated by the Spark session object. And the resource or cluster manager assigns tasks to workers, with one task per partition. A task applies its unit of work to the dataset in its partition, and outputs a new partition dataset. The results are sent back to the driver application, or they can be saved to disk.

Carol McDonald: 07:09 A dataset is a distributed collection of objects spread out across multiple nodes in a cluster. A DataFrame is like a table partitioned across a cluster.

Carol McDonald: 07:22 When you're reading a dataset from a file, the tasks are sent to the worker nodes, and then the block from a distributed file is read by the task into cache.

Carol McDonald: 07:35 In our example, this is what our Uber data looks like. We're gonna discover the clusters of Uber data based on the latitude and longitude, and then we're gonna analyze the cluster centers by date and time. So we have the date and time, the latitude of the Uber pickup, the longitude, and then the base company affiliated with the Uber pickup. And the data's in this format, it's in comma-separated values format.

Carol McDonald: 08:04 Here we're specifying a Scala case class to define the schema, and then a Spark schema, corresponding to the CSV file format. Here we're reading the data from the file into a DataFrame with the Spark session read method. We're specifying the format type and the option that we don't want to infer the schema. It would be possible to infer the schema from the CSV header, but you'll get better performance if you specify the schema. So here we're specifying the schema that we've defined right here, and we're not using the header, and then we're loading the file. So this is gonna load the file into a DataFrame, and this is the result of this.

Carol McDonald: 08:48 So here we see that we have a DataFrame, which is, and if we take a few fields from this, it's going to return an array of SQL rows. We could also load this, this shows the difference between the DataFrame and a dataset, we can load this into a dataset, and this is how we do that. We just specify as Uber, and this is going to then return a dataset of Uber objects. And here if we take a few of those, we see that that returns an array of Uber objects.

Carol McDonald: 09:18 With Spark 2.0, the DataFrame API was merged with the Dataset API. So the difference is that a Dataset is a collection of typed objects. We saw that with a Dataset of Uber objects, if we loaded it that way. A DataFrame is a Dataset of Row objects. So, the advantage of having these combined is you can use SQL on both of these. With a Dataset, you can also use functions on the typed objects.

Carol McDonald: 09:49 There are two types of operations you can perform on the dataset. The transformations create a new dataset from the current dataset, and actions trigger computations that return a result to the driver program.
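
For example (a toy query of my own, continuing from the Dataset loaded above, not from the slides), filter is a lazy transformation, while count and show are actions that trigger the computation:

    // Transformations are lazy: they define a new Dataset but do no work yet
    val manhattanish = ds.filter($"lat" > 40.70 && $"lat" < 40.88)

    // Actions trigger execution and return a result to the driver
    val n = manhattanish.count()
    manhattanish.show(3)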

Carol McDonald: 10:04 Spark Machine Learning provides a set of APIs built on top of DataFrames for machine learning workflows. We're gonna use a transformer to extract the features into a feature vector, which is a column in a DataFrame. And then we're gonna use an estimator to train on a DataFrame and to produce a model.

Carol McDonald: 10:26 In order for the features to be used by a machine learning algorithm, they have to be transformed into feature vectors, which are vectors of numbers representing the value for each feature.

Carol McDonald: 10:37 In our example the features that we're gonna use are the latitude and longitude to cluster these Uber locations, and then we're gonna use Spark SQL to analyze the day of the week, time, and rush hour for these groupings. Uber really does do things like real-time price surging. They do a lot of analysis using Spark, but this example is just using real Uber data; the code is mine, not code from Uber.

Carol McDonald: 11:03 So again what we're gonna do is we're gonna put this latitude and longitude as a feature, so we need to put this into a feature vector. And here what we use is a transformer, a vector assembler, to combine a given list of columns into a single vector column. We specify the input columns, which is gonna be the latitude and longitude, and in the output is gonna be the features. So, what it's going to do is take the latitude and longitude and put this in a feature column, which is a feature vector that we're gonna pass on to the machine learning estimator.
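
A sketch of that VectorAssembler step, continuing from the DataFrame loaded earlier:

    import org.apache.spark.ml.feature.VectorAssembler

    // Combine the latitude and longitude columns into a single "features" vector column
    val featureCols = Array("lat", "lon")
    val assembler = new VectorAssembler()
      .setInputCols(featureCols)
      .setOutputCol("features")

    val df2 = assembler.transform(df)   // df2 now has a "features" column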

Carol McDonald: 11:40 Next what we do is we're creating the estimator, which is our k-means estimator. Here we specify the number of clusters that we want. Here we're specifying 10. Actually in the notebook that I'm gonna show later, we're gonna use 20 clusters. We set the input column, which is the features vector that we had before from the vector assembler. We set the prediction column. Here we're gonna have a prediction column that's the cluster IDs, and we're gonna name it CID for cluster IDs. And then we specify how many iterations we want the k-means algorithm to do. And this is then going to return our model, the k-means model.

Carol McDonald: 12:22 With the k-means estimator, we can call fit with an input DataFrame, and that's gonna train on this DataFrame, so it's gonna use that features vector, it's gonna train on that, and return the model. So that's gonna return the model that we can then use for the clustering. So the k-means estimator takes the features and returns the k-means model. Then, with this model, what we can do is print things out. Here, we're printing out the cluster centers that it found.
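
A sketch of the estimator and the fit call, continuing from the assembled DataFrame above; the parameter values follow the talk.

    import org.apache.spark.ml.clustering.{KMeans, KMeansModel}

    // Estimator: k clusters on the "features" column, predictions written to "cid"
    val kmeans = new KMeans()
      .setK(20)
      .setFeaturesCol("features")
      .setPredictionCol("cid")
      .setMaxIter(200)

    // fit() trains on the DataFrame and returns the model
    val model: KMeansModel = kmeans.fit(df2)

    // Print the cluster centers the algorithm found
    model.clusterCenters.foreach(println)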

Carol McDonald: 12:57 So, these are the 20 cluster centers that it found for New York. We're doing this for the New York area. You can see that most of the cluster centers were in Manhattan, so this is a more concentrated area of Uber trips.

Carol McDonald: 13:15 This just shows this in more detail, the clusters.

Carol McDonald: 13:19 The next thing that we want to do with the model is get the cluster IDs for the features. There are two ways that we can do this. If we just want to get the predictions, or the cluster IDs, for the training data that we used, we can call predictions, and that's gonna return a DataFrame that looks like this, which has the cluster IDs for the features. Or, if we have new data that we want to get the cluster IDs for, with this model we call transform on a new Dataset, and that's gonna return clusters that also look like this. So it has the cluster ID corresponding to the features.
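
Roughly, those two options look like this, continuing from the fitted model above; the single new row is a made-up example.

    // Cluster IDs for the training data the model was fitted on
    val clusters = model.summary.predictions
    clusters.show(5)

    // Or, for new data, transform adds the "cid" column of cluster IDs.
    // newDF here is a toy one-row DataFrame, assembled the same way as the training data.
    val newDF = Seq(Uber("2014-08-01 00:00:00", 40.73, -73.99, "B02598")).toDF()
    val newClusters = model.transform(assembler.transform(newDF))
    newClusters.show()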

Carol McDonald: 13:57 Now, what we're gonna do is we're going to analyze these cluster IDs corresponding to the features based on the date and time. So in order to analyze this what we're gonna also do is we're going to create a view so that we can also use the Spark SQL to analyze this.

Carol McDonald: 14:17 So here's an example of an analysis that we can do. With the DataFrame, we can use DataFrame transformations to create queries. So here we're querying which clusters had the highest number of pickups. So we're grouping by the cluster ID, we're getting the count, and then we're ordering it by the count descending and showing the top five. And so we see that the top five are 6, 5, 0, 16, and 13, and these are in the Manhattan area. So, does that make sense? This is where there are a lot of pickups.
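
A sketch of that query with DataFrame transformations, plus the temporary view mentioned above; the view name is illustrative.

    import org.apache.spark.sql.functions.desc

    // clusters is the DataFrame of features plus cluster IDs from the earlier snippet.
    // Register it as a view so the same data can also be queried with Spark SQL.
    clusters.createOrReplaceTempView("uber")

    // Which clusters had the highest number of pickups?
    clusters.groupBy("cid")
      .count()
      .orderBy(desc("count"))
      .show(5)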

Carol McDonald: 14:54 With a Zeppelin notebook, or another Spark-type notebook, we can also show this in graphical format using Spark SQL. So here we're doing the same thing. We're selecting the count of the cluster IDs, and we're grouping by the cluster ID and then ordering it by the count. And this is showing again these cluster counts, where we see 5 and 6 had the highest numbers and also 0, which corresponds to what we saw here.

Carol McDonald: 15:23 And in this one we're querying how many pickups occurred in the busiest five clusters by hour. So here we're selecting the hour from the date and time as the hour and the cluster ID, and then we're counting the cluster ID, and then we're specifying we just want the cluster IDs for the busiest clusters. I actually need to update this; those are not the busiest clusters in this case. Then we're grouping by the hour and the cluster ID. And so this graph is showing the cluster IDs in color, so we can see that 5 and 6 are our busiest clusters here by the hour, and these are the hours from 0 to 23.

Carol McDonald: 16:18 Here we're just showing which hours have the highest number of pickups, so here we're selecting by the hour, counting the cluster ID, and grouping by the hour. And then here we see the number. So here we see that the rush hour, so from 3:00 to 9:00, has the highest number of pickups.
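
The same kind of question in Spark SQL might look like this, using the "uber" view registered above; it assumes the dt column parses as a timestamp string.

    // Which hours have the highest number of pickups?
    spark.sql("""
      SELECT hour(dt) AS hr, count(cid) AS ct
      FROM uber
      GROUP BY hour(dt)
      ORDER BY ct DESC
    """).show(5)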

Carol McDonald: 16:40 Next what we want to do is save the model to the distributed file system, because we're gonna use this with the streaming data. So we call the model write and save to the distributed file system, and then later we can use KMeansModel load to load this model. And that's what we're gonna do with the streaming data. And just to show you what this looks like, this is going to save the metadata and the data to the file system. The metadata is in JSON format, and the data is in Parquet format.
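
A sketch of the save and load calls; the path is hypothetical.

    import org.apache.spark.ml.clustering.KMeansModel

    // Persist the fitted model (metadata as JSON, data as Parquet) to the distributed file system
    model.write.overwrite().save("/user/mapr/ubermodel")

    // Later, for example in the streaming application, load it back
    val sameModel = KMeansModel.load("/user/mapr/ubermodel")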

Carol McDonald: 17:18 So now what I want to do is show the notebook for that before I go on to talk about the next part. Here I have a Zeppelin notebook for the machine learning part that I just went over, and I just wanted to show that in the notebook. I'm not going to run it because that would take more time. So here I define the schema for this CSV file format. Here I am reading from the CSV file with this schema. Then I'm showing the first 20 rows. So here I have the date, time, the latitude, longitude, and the base for that. I can also print out the schema. Here I'm defining, for the VectorAssembler, that I want the feature columns latitude and longitude, and that puts these into a column with the features vector.

Carol McDonald: 18:13 Next I'm creating the KMeans estimator, specifying that this time I want 20 clusters, and I'm specifying 200 iterations for that one. Then with this KMeans estimator, I call fit, passing in the DataFrame, and that's going to return the model.

Carol McDonald: 18:34 Then with the model, I can either call transform on a DataFrame or I can get the predictions for the DataFrame that I just trained on. And here I'm showing the results of this. So here, now I have the cluster IDs corresponding to the features. And this is just showing then what this DataFrame looks like. And here I'm showing the cluster centers on a Google map. So again, here we see clusters zero, six, and five are in Manhattan. And then I'm calling groupBy to determine how many pickups occurred in each cluster. So I'm grouping on the cluster ID and then I'm counting, and getting the results here that six and five had the highest. And this just shows the same thing with SQL, showing again that five, six, and zero had the highest count.

Carol McDonald: 19:39 So this is showing the pickups just for the clusters that are zero, five, six, 13 and 16. Here I'm showing just the pickups in the busiest clusters, with the cluster ID in zero, five, six, four, by hour. So here we see the colors are the clusters and these are the different hours from zero to 23.

Carol McDonald: 20:09 Then I'm looking at which hours of the day and which clusters had the highest number of pickups. So here I am showing the hour and the cluster ID. This is using the DataFrame query. And then, using SQL, which hours of the day had the most pickups. Again the rush hour had the most pickups, which is normal. Then which ones had the highest number of pickups during the morning rush hour. So here I'm just using hours in the morning rush hour, from six to nine o'clock, and these are the different cluster IDs.

Carol McDonald: 20:49 Here I'm showing which clusters had the highest number of pickups during the evening rush hour, so that's from four to seven o'clock, for these cluster IDs. And here I'm saving the model.

Carol McDonald: 21:07 So now I'm going to go on to the next part, where we're going to talk about the Kafka API. It's just a brief overview of the Kafka API first. So this is part of our application.

Carol McDonald: 21:24 So first of all, what is a stream? A stream is an unbounded sequence of events that go from producers to consumers. And with Kafka, these are key-value pairs. Just as an example of streaming data, here's an example of combining streaming data with machine learning. So this is an example from www.technologyreview.com, where a Stanford team has shown that a machine learning model can identify heart arrhythmias from the EKG better than an expert. And I'm putting this one out because it's similar to that video I showed you in the beginning, where I did this application with a data scientist at MapR where we used machine learning to detect EKG anomalies. And you can read about this one at this blog. You will get a link to the slides later so that you can find this article and this example.

Carol McDonald: 22:18 So now, going over the Kafka API. First of all, for collecting data, what we're going to use in this example is MapR Event Store, which is a distributed messaging system that enables producers and consumers to exchange events in real time via the Kafka API. And with Kafka or MapR ES (now called MapR Event Store), topics are logical collections of messages, which organize events into categories and decouple the producers from the consumers.

Carol McDonald: 22:50 Topics are partitioned for throughput and scalability. The partitions make topics scalable by spreading the load for a topic across multiple servers. You can think of a partition like an event log or queue. New messages are added to the end and are assigned a sequential ID called the offset. And like a queue, events are delivered in the order they are received. However, unlike a queue, messages are not deleted when read. They remain on the partition, available to other consumers.

Carol McDonald: 23:25 Messages can be persisted forever or expired automatically based on a time to live. Not deleting the messages when they are read allows for high performance at scale by minimizing disk reads and writes. Not deleting the messages when read also allows processing the same messages by different consumers for different purposes, or for multiple views.
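
Because MapR Event Store is accessed through the standard Kafka API, a producer publishing the CSV trip events could look roughly like this. The topic path and the sample record are illustrative, and with MapR ES the bootstrap.servers setting is not used the same way as with plain Kafka.

    import java.util.Properties
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

    val props = new Properties()
    // With plain Kafka you point at the brokers; MapR Event Store clients resolve the
    // cluster from the stream path instead.
    props.put("bootstrap.servers", "localhost:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)

    // Hypothetical MapR stream:topic path and a sample CSV trip event
    // (last field: a reverse timestamp, which per the talk comes in with the streaming data)
    val topic = "/apps/uberstream:ubers"
    val line = "2014-08-01 00:00:00,40.729,-73.9422,B02598,9223370636854"
    producer.send(new ProducerRecord[String, String](topic, line))
    producer.close()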

Carol McDonald: 23:54 So now we'll go over Spark Structured Streaming. In this part of the application we are going to use our machine learning model with Spark Structured Streaming to enrich the events and then write them to MapR Database.

Carol McDonald: 24:09 This is how we are going to be reading the events from the partitions that we just talked about: the tasks read these partitions and then process the data in cache.

Carol McDonald: 24:24 Spark Structured Streaming is a scalable stream processing engine built on the Spark SQL engine. So, this enables you to view data published to Kafka as an unbounded DataFrame, and you can process this data with the same DataFrame, Dataset, and SQL APIs used for batch processing. So we can use the same APIs that we were looking at with the data read from a file.

Carol McDonald: 24:51 As the streaming data continues to arrive, the Spark SQL engine continuously processes it and updates the final result. So here we have, for example, account data coming in, and we have a query that's going to group by the account IDs and write this, for example, to a database. This is going to be continuously processed, and here we see the results being updated for the counts here.

Carol McDonald: 25:23 Spark Structured Streaming converts the SQL to incremental execution plans, operating on the streaming data. Stream processing of events is useful for real-time ETL: filtering, transforming, and enriching with machine learning, like we're going to do, and then writing these results to files, databases, or to a different topic.

Carol McDonald: 25:50 Here we are using the Spark KMeansModel class to load the saved KMeansModel, which was fitted on the Uber trip data that we went over earlier. Here we're reading from a Kafka topic; here we're specifying readStream, where before we were reading from a file with Spark read. So that's one thing that's different, but otherwise it's similar. So we specify readStream in the format of Kafka, and then we specify the topic that we are subscribing to and the offset that we want to start at. Here we are starting with the earliest offset; we could also specify the latest. We don't want to fail on data loss, and we specify the maximum number of offsets that we want to get at a time; here we're getting 1,000 at a time.
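
A sketch of that readStream call; the model path and topic are hypothetical. With MapR Event Store no kafka.bootstrap.servers option is needed, while with plain Kafka you would add one.

    import org.apache.spark.ml.clustering.KMeansModel

    // Load the k-means model saved earlier (hypothetical path)
    val model = KMeansModel.load("/user/mapr/ubermodel")

    // Subscribe to the topic as an unbounded DataFrame
    val kafkaDF = spark.readStream
      .format("kafka")
      .option("subscribe", "/apps/uberstream:ubers")   // hypothetical topic
      .option("startingOffsets", "earliest")
      .option("failOnDataLoss", "false")
      .option("maxOffsetsPerTrigger", "1000")
      .load()

    kafkaDF.printSchema()   // key, value, topic, partition, offset, timestamp, ...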

Carol McDonald: 26:41 And what this is going to return is a Kafka DataFrame, which looks like this. So we have the key and value that we talked about, since the stream is key-value pairs, the topic name, the partition that it came from, the offset, and then a timestamp. And if we do a DataFrame printSchema, that's going to show this schema that we just talked about. What we're interested in is this value, which comes in as a binary format, so we are going to have to convert this to a string and then parse this CSV string. We'll look at how to do that next.

Carol McDonald: 27:16 So, what we're going to do is we're going to specify the case class, which we did earlier and corresponds to the incoming data. And here we're specifying a function, which is going to receive a string, split the string by the comma values, and then create an Uber case class from that string.

Carol McDonald: 27:38 Here what we're doing is we're registering a user-defined function, so that's a UDF, to deserialize the message using this parseUber function. So this receives a string and then uses the parseUber function to create Uber objects. Then, with that DataFrame we received from Kafka, we're using a select expression with this UDF. So we're calling the UDF deserialize, we're casting this binary value as a string, and that's going to pass the string to the deserialize function, which is going to call parseUber and return Uber objects, and then we're putting these Uber objects into those Uber case classes. So what this is going to return is a Dataset of Uber objects. If we were using JSON data, Spark actually provides a from_json function, but with CSV data this is how you do it: you specify your own deserialize function.
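
Put together, the deserialization step looks roughly like this, continuing from the Kafka DataFrame above. The rdt field (a reverse timestamp that per the talk comes in with the streaming data) is included here as an assumption.

    import spark.implicits._

    // Case class matching the incoming CSV events
    case class Uber(dt: String, lat: Double, lon: Double, base: String, rdt: String)

    // Parse one CSV line into an Uber object
    def parseUber(str: String): Uber = {
      val p = str.split(",")
      Uber(p(0), p(1).toDouble, p(2).toDouble, p(3), p(4))
    }

    // Register a UDF that deserializes the message value into an Uber object
    spark.udf.register("deserialize", (message: String) => parseUber(message))

    // Cast the binary Kafka value to a string, deserialize it, and get a Dataset[Uber]
    val df1 = kafkaDF
      .selectExpr("deserialize(CAST(value AS STRING)) AS message")
      .select($"message".as[Uber])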

Carol McDonald: 28:44 So next, what we're going to do is use the VectorAssembler, just like we did before, but this time what we're going to do is get these features so that we can get the cluster centers corresponding to those features. So we create the VectorAssembler, we specify the feature columns, the input columns, and then the output column. And then we're going to use this to transform our input data and get the output DataFrame.

Carol McDonald: 29:16 So next what we're doing is using that saved KMeans model. We're calling transform on our input DataFrame, and that's going to return the clusters. So what it does is, we're passing in these features to our model, and the features are inside of this DataFrame. And we're calling transform, and that's returning the clusters into the cluster DataFrame, which has the clusters corresponding to the features.
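
A sketch of the enrichment step, continuing from the Dataset of Uber objects:

    import org.apache.spark.ml.feature.VectorAssembler

    // Same assembler as in training: latitude and longitude into a features vector column
    val assembler = new VectorAssembler()
      .setInputCols(Array("lat", "lon"))
      .setOutputCol("features")

    val df2 = assembler.transform(df1)

    // Enrich each streaming event with its cluster ID (the model's "cid" prediction column)
    val clusters = model.transform(df2)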

Carol McDonald: 29:52 So this is what our DataFrame looks like so far. Here what we're going to do is select the columns that we want to keep, so we don't want to keep that features column. And then what we're going to do is create a unique ID for writing to MapR Database. So our unique ID is going to be the cluster ID and a reverse timestamp, which is also coming in with the data. So we have a reverse timestamp, and what this is going to look like is this: the ID is going to be the cluster ID and a reverse timestamp, and that's what we have in this class. We have the ID, and then we have the date, time, the base, the cluster ID, and then the latitude and longitude for the clusters.
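
Roughly, with hypothetical column names cid and rdt for the cluster ID and reverse timestamp:

    import org.apache.spark.sql.functions._
    import spark.implicits._

    // Drop the features vector and build a unique row key from the cluster ID and the
    // reverse timestamp. Column names cid and rdt are assumptions based on the talk;
    // the full example also joins in the cluster center latitude/longitude.
    val uberWithId = clusters
      .select($"dt", $"lat", $"lon", $"base", $"cid", $"rdt")
      .withColumn("_id", concat($"cid", lit("_"), $"rdt"))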

Carol McDonald: 30:40 Next, what we can do for debugging purposes, we wouldn't want to do this with a whole lot of data because we could run out of memory, but we can write this to a memory sink and then query this sink in memory, for a limited amount of data. So this is good for debugging purposes, and also for visualization purposes.

Carol McDonald: 30:57 So here what we're doing is we're specifying... oh, this doesn't correspond to what we're doing actually, sorry. Anyway, we could write this to a memory sink. This is what we're going to write later to MapR Database. So, in the notebook I'll show you writing to a memory sink. And what you can then do with the memory sink is show what this data looks like. So here we have, this is what our data looks like after we've transformed it. We have the ID, the date and time, the latitude and longitude, the base, the cluster ID, and the cluster latitude and longitude.
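
A sketch of writing to a memory sink and querying it; the query name is illustrative.

    import org.apache.spark.sql.streaming.OutputMode

    // Write the enriched stream to an in-memory table, for debugging and visualization only
    val memQuery = uberWithId.writeStream
      .queryName("uber_memory")            // the in-memory table name
      .format("memory")
      .outputMode(OutputMode.Append())
      .start()

    // Query the in-memory table as events arrive
    spark.sql("SELECT * FROM uber_memory LIMIT 10").show()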

Carol McDonald: 31:35 Then we can do queries like this on the streaming data. We can select, again we saw this earlier on the file data, but now we can do this on the streaming data. Here we're selecting the hour and the cluster ID, and we're getting the count of the cluster ID and grouping this by the hour and the cluster ID. So now I'll show you what that notebook looks like. So this is the notebook for the Structured Streaming that we just went over. First, we're loading the saved machine learning model, the KMeans model. And here what we're doing, a little more than I showed before, is we're also getting the cluster centers from that machine learning model, the latitude and longitude from the machine learning model.

Carol McDonald: 32:29 Here we're defining the schema, and the function to parse the string into Uber objects. Then we're reading from the stream, and we're registering this UDF to deserialize using that parseUber function, and we're deserializing this into Uber objects. And this shows the schema for the Kafka DataFrame. Then we're using the VectorAssembler to get the features, and then we're using the model to get the cluster centers for these features. So the model's transform is returning the clusters corresponding to those features. And then we're getting just the columns that we want to save, and then we're joining these on the cluster ID to also get the cluster latitude and longitude.

Carol McDonald: 33:29 Then, we're creating this unique ID with the cluster ID and the reverse timestamp, and then finally what we're doing here is writing the stream to memory so that we can query it in memory. So then, we can query this as it's coming in. This shows what the DataFrame looks like as it's coming in streaming, after it's been transformed. So we have the cluster ID, the date, time, the latitude and longitude, the base, the reverse timestamp, the cluster latitude and longitude, and then the ID that we created. This is the ID that we're gonna use for writing to MapR Database.

Carol McDonald: 34:14 So then we can perform queries on this. For example, what's the trip count by hour and cluster? And these colors are the cluster IDs... and these are the hours. Here we see what the trip count by cluster is, so these are our 20 different clusters. Here we're getting the trip count by hour. And here we're getting the trip count by day and cluster. So here we see, when I was running this, that so far it had only read 4 days from the clusters. This would be updated as we're running it. And this is the trip count by base and cluster. Here then we can stop writing this to memory and write this to MapR Database instead. Another option would be to write this to a topic.

Carol McDonald: 35:27 Next what we're going to go over is Spark and MapR Database. We're going to go over first this part of the application where we're writing to MapR Database JSON. These enriched [inaudible 00:35:38]. The MapR Database connector for Apache Spark enables you to use MapR Database as a sink for Spark Streaming. The connector architecture has a connection object in every Spark executor, allowing for distributed parallel writes and reads. So, that makes it really fast.

Carol McDonald: 35:59 With MapR Database, a table is automatically partitioned across a cluster by key range and each server is the source for a subset of a table. Grouping the data by key range provides for really fast reads and writes by row key. So here we see the data is automatically partitioned by key range and this is, again, what our row key is going to be like. So our table is going to be automatically partitioned by this row key, which consists of the cluster ID and the reverse timestamp. And the reverse timestamp is also going to be sorted. So the reverse timestamp is gonna mean that the most recent is going to be first. Then this is the format of the rest of our JSON data.

Carol McDonald: 36:47 This shows writing to a MapR Database sink, so we specify write stream. Here we're specifying the format and then the table name, and the row key ID, what that column name is; the default is actually just _id. Then we're specifying a checkpoint location, and that we want to do bulk mode and the sample size option, so we're doing it a thousand at a time. Then we start running this, and this is then going to read from the partitions, transform these datasets - and every time you transform it creates a new dataset - and then the new datasets are gonna be written into the partitions of MapR Database. So it's running this in parallel.
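
A sketch only of that writeStream call: the format string and option keys below are assumptions based on the talk, so check the MapR Database OJAI connector documentation for the exact names.

    // Writing the enriched stream to MapR Database JSON (continuing from uberWithId above)
    val dbQuery = uberWithId.writeStream
      .format("com.mapr.db.spark.streaming")                // assumed MapR Database sink format
      .option("tablePath", "/apps/ubertable")               // hypothetical table path
      .option("idFieldPath", "_id")                         // row key column (default _id)
      .option("bulkMode", "true")                           // bulk writes
      .option("sampleSize", "1000")                         // a thousand at a time
      .option("checkpointLocation", "/tmp/uber-checkpoint") // hypothetical checkpoint dir
      .start()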

Carol McDonald: 37:33 Next, after we've written this data, we can explore it continually with Spark SQL. The Spark MapR Database connector enables users to perform complex SQL queries on top of MapR Database, using the Spark Dataset API, while also applying critical techniques such as projection and filter pushdown, custom partitioning, and data locality. And we'll go over some of that too.

Carol McDonald: 37:59 So this shows loading the data from the MapR Database partitions. We specify the Spark session, and we specify load from MapR Database with the table name and the schema, and then optionally you can load it as the case class. So that's the case class we're using now, and this is showing that this is gonna launch tasks, which are gonna be reading from these partitions - the MapR Database partitions - into cache.
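
A sketch of loading the table: loadFromMapRDB comes from the MapR Database Spark connector, and its exact signature, the table path, and the case class fields here are assumptions based on the talk.

    import com.mapr.db.spark.sql._            // assumed import that adds loadFromMapRDB (MapR connector)
    import org.apache.spark.sql.types._
    import spark.implicits._

    // Case class with the MapR Database row key added; field names are assumptions
    case class UberWithId(_id: String, dt: String, lat: Double, lon: Double,
                          base: String, cid: Int, clat: Double, clon: Double)

    val uberSchema = StructType(Array(
      StructField("_id", StringType, true),
      StructField("dt", StringType, true),
      StructField("lat", DoubleType, true),
      StructField("lon", DoubleType, true),
      StructField("base", StringType, true),
      StructField("cid", IntegerType, true),
      StructField("clat", DoubleType, true),
      StructField("clon", DoubleType, true)
    ))

    // Load the table as a typed Dataset and register a view for Spark SQL
    val uberDS = spark.loadFromMapRDB[UberWithId]("/apps/ubertable", uberSchema).as[UberWithId]
    uberDS.createOrReplaceTempView("uber")
    uberDS.show(20)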

Carol McDonald: 38:28 Here we're creating a temporary view so that we can also use Spark SQL. And this is what our DataFrame looks like now. With the DataFrame show, we see the first 20 rows. And we see that this is automatically partitioned by the row key, and that the most recent ones are sorted first, so that's the last one. This is only one month of data, so August 31st was the last day, and then almost at midnight was the last trip. And then we see that it gets earlier.

Carol McDonald: 39:05 Then we can do some queries similar to what we saw before. So here we're grouping by the cluster ID and we're getting the count. The difference here is this is going to be showing this continuously, so as it's being written to MapR Database you can get continuous updates of these counts being read from MapR Database.

Carol McDonald: 39:29 And you can do things like I showed before; using Google Maps you can display the cluster locations. We're displaying the cluster locations with the red markers, and then the circles are the trip locations. Here we're displaying the most recent trip locations. So from the DataFrame, we're selecting the latitude and longitude and the cluster ID, and then we're displaying those on the map.

Carol McDonald: 39:58 Here we have the query: which hours have the highest pickups for cluster ID 0? So what we're doing here is we're filtering on the ID that's less than 1, so that's going to give us only the ID 0, the cluster ID 0. Then we're selecting by the hour and the cluster ID and we're grouping by the hour and the cluster ID. And then we're aggregating on the count to get the count.
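
A sketch of that query with DataFrame transformations, continuing from the loaded Dataset; the _id prefix filter keeps only row keys beginning with cluster 0.

    import org.apache.spark.sql.functions._
    import spark.implicits._

    // Which hours have the highest pickups for cluster 0?
    // The row key starts with the cluster ID, so this filter keeps only cluster 0 keys.
    uberDS.filter($"_id" <= "1")
      .select(hour($"dt".cast("timestamp")).alias("hour"), $"cid")
      .groupBy("hour", "cid")
      .agg(count("cid").alias("count"))
      .orderBy(desc("count"))
      .show()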

Carol McDonald: 40:24 This shows the query plan for this query. And what we see in this query plan is the projection, so that's the select on the hour and the cluster ID. This is going to be pushed down into MapR Database and also this filter where the ID is less than or equal to 1 - that's going to be pushed down into MapR Database.

Carol McDonald: 40:52 So what this means is that the selection will take place in MapR Database before returning the scanned data to Spark. So this is going to be selected and filtered here in MapR Database. The projection pushdown minimizes the data transfer between MapR Database and the Spark engine by omitting unnecessary fields from the table scan. So we're specifying just the fields that we want, and this is going to give you better performance. If, in this example, you're only interested in the cluster ID and the hour, then that's gonna give you better performance by minimizing the data transferred back to the Spark executor. And this is especially beneficial when a table has a lot of columns.

Carol McDonald: 41:38 Also, the filter pushdown is going to improve performance by reducing the amount of data passed between MapR Database and the Spark engine when filtering the data. So if we're only interested in cluster 0, then that's going to make it a lot faster to just get the data for cluster 0. So those are some optimizations that you can use when you're using something like MapR Database.

Carol McDonald: 42:02 Here are some examples of more queries that we can use. So here we're selecting the hour and the cluster ID and counting the cluster ID, and grouping by the hour and the cluster ID. And here these are the cluster IDs, and these are the hours from 0 to 23.

Carol McDonald: 42:25 So now I'll go over the last notebook, for this part. And here we're specifying the imports for this one. We're specifying the schema, so this is the schema. The difference here is that we've added this unique ID from MapR Database. Then we're loading the data from MapR Database into a Dataset. So here we specify load from MapR Database with the table name, and now we're loading that as this Uber-with-ID case class.

Carol McDonald: 43:14 Here I got an error; this error message just means that I need to restart the Spark interpreter for the notebook. So, I won't be running the rest of it because I don't have time for that.

Carol McDonald: 43:26 So here what we have is the DataFrame showing the first 20 rows. Here we have the same thing using SQL to show the first 20 rows and here we're printing out the schema. Here we're showing the cluster locations that have the highest number of pickups using the DataFrame transformations. Here we're showing the same thing using SQL. So, again, here we see that 5 and 6 and 0 have the highest number of trip counts. Here we see the highest pickups for cluster ID 0, so we're filtering on the ID less than 1 and then we're getting the counts by hour and cluster ID.

Carol McDonald: 44:15 And this shows that explain plan that we talked about where we see... here we see the pushdown filter.

Carol McDonald: 44:26 Here we see which day and hour have the highest number of pickups. So here we see that day 21 at 10 o'clock, with the cluster ID 6, had the highest count. So this is kind of unusual, because you see the other ones are in rush hour, like 6 o'clock or 5 o'clock. For 10 o'clock, maybe there was some special event in Manhattan that made this day different, but usually the highest pickups were in rush hour.

Carol McDonald: 44:59 Here we see the hours and clusters that have the highest pickups... and these are, again, the cluster IDs, and those are the hours. Then we've seen this one before...

Carol McDonald: 45:21 The top 5 cluster counts by hour. So here we're putting the cluster ID in the set and the hours, grouping by hour. And then this is for the top cluster IDs during rush hour, so from 4 o'clock to 7 o'clock.

Carol McDonald: 45:41 Here's something else that you can do: you can calculate hourly window counts by cluster. So here we're setting a window of 1 hour, and then we're using a new view of Uber counts to see the windows by hour. So here we see, for the cluster ID in this window, we have the counts. Here we're showing the date and time of the cluster counts for Uber trips, and we're just limiting this to 50 results. So here we have the date and time and the counts for the cluster, and here are the colors for the different clusters.
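
A sketch of those hourly window counts using Spark SQL's window function, continuing from the loaded Dataset; column names are assumptions based on the talk.

    import org.apache.spark.sql.functions._
    import spark.implicits._

    // Hourly window counts of trips by cluster (1-hour window on the trip timestamp)
    val countsDF = uberDS
      .groupBy($"cid", window($"dt".cast("timestamp"), "1 hour"))
      .count()

    countsDF
      .select($"cid", $"window.start", $"window.end", $"count")
      .orderBy(desc("count"))
      .show(50)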

Carol McDonald: 46:24 And here I wanted to show plotting the latest on the map, but since the Spark interpreter needs restarting, I can't show that right now.

Carol McDonald: 46:35 But, anyway, I'll give you the link that you can download the code and the notebooks from the link in the Spark ebook, which you could also download. So I'll show you that, those links.

Carol McDonald: 46:58 All the components that we just talked about in this architecture can run on the same cluster with the MapR Data Platform. So everything that we talked about - Spark, the distributed file system, MapR Database, and MapR Event Store - you can run all of this on the same cluster with the MapR Data Platform.

Carol McDonald: 47:19 And you can read about this example and more about Spark 2.x in the Spark ebook, which is available for free download at this link. And in this Spark ebook you can also get a link for the code. There are also other ebooks you can download at mapr.com/ebooks. Some interesting ones could be Streaming Architecture or Machine Learning Logistics, and some other books. We also have free on-demand training at learn.mapr.com, and one of the trainings is on Spark 2.x. And then if you want to, you can also get certified; MapR has a Spark certification. We also have a blog that has this example and other examples at mapr.com/blog.

Carol McDonald: 48:17 Oh, I didn't mean to do that. So... just to go over a few questions. One question was: is it possible to share this notebook on GitHub? Yes, the notebook is on GitHub, so the GitHub link is in the appendix of the Spark ebook. So if you download the Spark ebook, then you'll have the link for all the code and the notebooks. And, again, there was another question about the notebooks. So this is a [inaudible 00:48:51] notebook. Yes, that will be available on GitHub if you download the ebook. And I will send out the link for the presentation and the link for the ebook also. That will be sent through an email afterwards. So, thanks a lot for this and thanks for attending. Bye.