11 min read
A very common use case for the MapR Data Platform is collecting and analyzing data from a variety of sources, including traditional relational databases. Until recently, data engineers would build an ETL pipeline that periodically walks the relational database and loads the data into files on the MapR cluster, then perform batch analytics on that data.
This model breaks down when use cases demand more instant access to that same data, in order to make a decision, raise an alert, or make an offer, since these batch pipelines are often scheduled to run hourly or even daily. To get to real-time data processing one must build a real-time data pipeline that is continuously collecting the latest data. Building a real-time data pipeline doesn’t mean you have to give up batch analytics, since you can always write data from the streaming pipeline to files or tables, but you can’t do real-time analytics using a batch pipeline.
To build a real-time data pipeline, you should start with MapR Event Store, the publish/subscribe event streaming service of the MapR Data Platform, as it is the most critical component to handle distribution of real-time data between applications.
Next, a tool is needed to extract the data out of the database and produce it into MapR Event Store, as well as take data out of MapR Event Store and write it to files or database tables. StreamSets is an easy to use, GUI-based tool that runs directly on the MapR cluster and allows anyone to build robust data pipelines.
In this blog, we’ll walk through an example of building a real-time data pipeline from a MySQL database into MapR Event Store, and even show how this data can be written to MapR XD for batch or interactive analytics.
For this example, I assume you have a MapR 5.1 cluster running and StreamSets properly installed and configured.
Here is a document on MapR Community that explains how to install StreamSets on the MapR Sandbox:
Download the two Streamsets jobs here. These jobs will be used in this post.
The source will be a MySQL database that is running on my MapR cluster.
Note: MySQL could also run outside of the cluster; I just wanted to remove complexity in this architecture.
We will stream data from the clients table in that database and produce data to MapR Event Store:
Then we will stream data from a streams/topic to MapR-FS.
First of all, we need to add MySQL JDBC .jar to StreamSets. Download the .jar here:
Then copy it into
Now, we need to make sure that the user which is running StreamSets exists in the MySQL users’ table and has enough rights.
Log into MySQL using root from your node and enter the following queries:
>CREATE USER '<User_Running_Streamsets>'@'<Host_Running_Streamsets>' IDENTIFIED BY 'password'; >GRANT ALL PRIVILEGES ON *.* TO '<User_Running_Streamsets>'@'<Host_Running_Streamsets>' WITH GRANT OPTION;
Then create a database crm and a table named clients:
>CREATE DATABASE crm; >CREATE TABLE clients (ID INT, Name VARCHAR(10), Surname VARCHAR(10), City VARCHAR(10), Timestamp VARCHAR(10)); Now let’s add some clients into this table: >INSERT INTO clients VALUES (1,'Velfre','Raphael','Paris','20160701'); >INSERT INTO clients VALUES (2,'Dupont','Jean','Paris','20160701');
We will create a streams named clients and two topics:
>maprcli stream create -path /clients >maprcli stream edit -path /clients -produceperm p -consumeperm p -topicperm p >maprcli stream topic create -path /clients -topic clients_from_paris >maprcli stream topic create -path /clients -topic clients_from_everywhere_else
Runtime properties can be set up in a file locally and used from a pipeline. This is really useful in a production running environment. Here we will set up three properties related to MySQL. Open
$SDC_HOME/etc/sdc.properties and add:
runtime.conf_MYSQL_HOST=jdbc:mysql://<Mysql_Host_IP>:3306/crm runtime.conf_MYSQL_USER=root runtime.conf_MYSQL_PWD=password
Please add it below the existing runtime.conf.location properties. By doing this, you will be able to use the data pipeline that I developed.
Log into StreamSets (port 18630), import
Mysql_to_Streams.json and click on Import:
Now you are able to the see the pipeline. Click on any component to see its configuration.
This origin component is used to query the MySQL database and to retrieve data.
JDBC Connection String
Connection String is required to be able to connect to MySQL. Here we will be using the runtime property called “MYSQL_HOST” that we set up in
Here we want our data pipeline to increment data from our MySQL table. By checking this property, our pipeline will increment an offset from the Initial Offset set.
We want to retrieve all data from the clients table. WHERE clause is mandatory because we are running a pipeline and not a batch (like classic ETL). So the pipeline will run many small batch files based on the WHERE clause and offset column. Here we chose ID as the offset, since it will be unique and increment each time a client will be created.
This is the initial offset value.
Offset Column As said before, ID will be our offset column for this pipeline.
This component will run many batch files. Query interval is the time between two batches.
The Stream Selector processor is used to dispatch data in many pipelines depending on one or more condition. Here I would like to split my clients that are from “Paris” and the other using the “City” field from table clients.
Default condition is mandatory.
Data that passes condition 1 will be streamed into the first pipeline. Data that does not pass condition 1 will be streamed into the second pipeline, which is the default one.
Based on Stream Selector output, data will be dispatched in two different topics in /clients stream. Both of the MapR Event Store Producers are configured the same way. Only the topic name changes.
Here we will produce data to topic clients_from_paris int /clients stream.
We will used delimited format and configure the format to be “Default CSV”:
Import Streams_to_MapRFS.json and click on Import.
Consumer configuration is almost the same as Producer, except for the following properties:
Consumer group name that will retrieve data from topic clients_from_paris and streams /clients.
MapR Event Store Configuration
Here we would like to retrieve all data from the streams including data that had already been produced.
This is the default, but it can be changed. It corresponds to the File Prefix that will be created by the pipeline.
Again, this is the default.
Folder /tmp will be located at MapR-FS root.
Note: here we are using the MapR-FS specific component, but we could also use the Local FS component since we can access MapR-FS from a node that is running NFS Gateway. In this case, the directory template would be: /mapr/<cluster_name>/tmp/out/…
Go to the StreamSets home page, select the two pipelines and start them:
Now that both pipelines are running, the two records that we’ve created in the clients table should have been produced in our stream. Let’s confirm it using the streamanalyzer tool:
> mapr streamanalyzer -path /clients -topics clients_from_paris
Output should be:
Total number of messages: 2
We can also open Mysql_to_Streams pipeline and look at the metrics information:
The same information is displayed in Streams_to_MapRFS:
For now, the output file at: maprfs://tmp/out/
One hour is the default value.
The output file will remain hidden for one hour, or if the pipeline is stopped.
Note: this is not useful to MapR-FS, since it’s a full random R/W file system.
Now let’s add some data into the clients table:
>INSERT INTO clients VALUES (3,'Lee','Camille','Paris','20160702'); >INSERT INTO clients VALUES (4,'Petit','Emma','Paris','20160702');
Again, we can use streamanalyzer:
> mapr streamanalyzer -path /clients -topics clients_from_paris
Output should be:
Total number of messages: 4
And the metrics from the 2 pipeline should show 4 records:
Here’s a quick reminder on how to query data with Drill. You have three tools to query data using Drill:
Here I will query the data just injected in my cluster using Drill Explorer.
Since the output file generated on MapR-FS by StreamSets has no extension, we need to configure a default input format in the storage plugin page. Let’s update dfs Storage Plugins that is enabled by default:
“defaultInputFormat” : “csv” is the change
Open Drill Explorer and navigate into MapR-FS to find the output file. The output file should be located at:
Dfs.root > tmp > out > YYY-MM-DD-hh
Then let’s click on this file:
If go into the SQL tab, you will see the query that has been executed.
If you want to execute more complex queries, you can do that via this tab:
Thanks to Drill, you are able to query data immediately after the data has been written, without any ETL process to build.
Our job is done. In this blog post, you have learned how to easily integrate any data from your relational database with MapR Event Store and subsequently MapR-FS, and even use Drill to query this data with ANSI SQL.
For more information on StreamSets, go to:
If you have any questions about this tutorial, please ask them in the comments section below.
Stay ahead of the bleeding edge...get the best of Big Data in your inbox.