"Lambda architecture is a data-processing architecture designed to handle massive quantities of data by taking advantage of both batch- and stream-processing methods. This approach to architecture attempts to balance latency, throughput, and fault-tolerance by using batch processing to provide comprehensive and accurate views of batch data, while simultaneously using real-time stream processing to provide views of online data. The two view outputs may be joined before presentation. The rise of lambda architecture is correlated with the growth of big data, real-time analytics, and the drive to mitigate the latencies of map-reduce." - Wikipedia
Previously, I've written several blog posts covering use cases for Oracle Data Integrator (ODI) for batch processing on top of the MapR distribution, and for Oracle GoldenGate (OGG) to stream transactional data into MapR Event Store and other Hadoop components. While combining both products is a perfect fit for the lambda architecture, the latest release of ODI (12.2.1.2.6) has many great new features, including the ability to deal with Kafka streams as source and target from ODI itself. This feature has tremendous advantages for anyone who already has, or plans to have, a lambda architecture, because it simplifies the way we process and handle both batch and fast data within the same logical design, under one product. Now if we combine OGG streaming capabilities with ODI batch/streaming capabilities, the possibilities are endless.
In this blog I'm going to show you how to configure MapR Event Store (aka Kafka) on Oracle Data Integrator with Spark Streaming to create a true lambda architecture: a fast layer complementing the batch and serving layer.
I will skip the "hailing and praising" part for ODI in this post, but I want to highlight one point: the mappings designed for this blog, just like every other mapping you would design since the very first release of ODI, are going to run as native code on your Hadoop/Spark cluster, 100%, out of the box, without you writing a single line of code or worrying about how and where.
I've done this on MapR so I can hit "two birds with one stone", showing you the steps for both MapR Event Store and Kafka. Since the two aren't very different in concept or API implementation, you can easily apply the same steps if you are using Kafka.
If you are unfamiliar with MapR Event Store and/or Kafka concepts, I suggest that you spend some time reading about them. The following content assumes that you know what MapR Event Store and Kafka are (and of course, ODI). Otherwise, you'll still get a great idea of the possible capabilities.
Obviously, we need to have MapR Event Store paths and topics created. Unlike Kafka, MapR uses its own APIs via the "maprcli" command line utility to create and define topics. Hence, this step would be slightly different if you are using commodity Kafka. The web has plenty of examples on how to create and configure Kafka topics and server, so you aren't alone.
For the sake of this demo, I've created one path and two topics under that path. We'll let ODI consume from one of those topics (registrations) and produce to another (registrations2). That way, you'll see how that works in action via ODI.
Creating a MapR Event Store path called "users-stream" and a topic called "registrations":
Creating the second topic, "registrations2", on the same path I defined previously:
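As a rough sketch of the two creation steps above, the Python snippet below only builds the `maprcli stream create` and `maprcli stream topic create` command lines so they can be reviewed before being run on a cluster node. The leading-slash path `/users-stream` is an assumption (the exact location of the stream in the file system is not given here), and the helper name `maprcli_stream_commands` is purely illustrative.

```python
# Sketch only: builds the maprcli command lines, it does not execute them.
# ASSUMPTION: the stream lives at "/users-stream"; adjust to your own path.
STREAM_PATH = "/users-stream"
TOPICS = ["registrations", "registrations2"]


def maprcli_stream_commands(stream_path, topics):
    """Return argv lists that create a stream and then each topic in it."""
    commands = [["maprcli", "stream", "create", "-path", stream_path]]
    for topic in topics:
        commands.append(
            ["maprcli", "stream", "topic", "create",
             "-path", stream_path, "-topic", topic]
        )
    return commands


if __name__ == "__main__":
    for argv in maprcli_stream_commands(STREAM_PATH, TOPICS):
        # On a MapR node you could pass argv to subprocess.run(argv, check=True).
        print(" ".join(argv))
```

Keeping the commands as argv lists avoids shell quoting problems if you later decide to run them through `subprocess`.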
There aren't many preparations here since I'm using a personal pre-configured VM with MapR installed and running. However, some steps were needed to get ODI mappings to complete successfully. If you are interested in how I got ODI to work on the MapR distribution, you may want to refer to this blog post.
The previous image is a snippet of my "core-site.xml" showing the credential store I added. The next step would be to verify that the store is there, and then create an alias for the password value:
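For readers without the screenshots, the verify-then-create sequence might look roughly like the sketch below, which builds the standard `hadoop credential list` and `hadoop credential create` command lines. The provider URI and the alias name are hypothetical placeholders, not the values used in this demo; use the provider path you configured in core-site.xml.

```python
# Sketch only: builds the hadoop credential command lines without running them.
# ASSUMPTION: both values below are hypothetical placeholders.
PROVIDER = "jceks://file/tmp/example.jceks"
ALIAS = "example.password.alias"


def credential_commands(provider, alias):
    """Return argv lists that list the store, then add an alias to it."""
    return [
        ["hadoop", "credential", "list", "-provider", provider],
        # Without -value, the hadoop CLI prompts for the password interactively.
        ["hadoop", "credential", "create", alias, "-provider", provider],
    ]


if __name__ == "__main__":
    for argv in credential_commands(PROVIDER, ALIAS):
        print(" ".join(argv))
```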
You won't need to restart any of the Hadoop components after those changes, even after editing core-site.xml.
Note: If you hit "os process exception", such as 137, make sure you've got enough free memory available.
These are the usual preparations that you would do in ODI. I'll show only the ones relevant to this blog.
Hadoop Data Server
The following configuration is specific to MapR. If you are using some other distribution, you need to enter the relevant port numbers and paths:
Spark-Python Data Server
With this release of ODI, 12.2.1.2.6, you need to create multiple Spark data servers if you want to use both Spark Streaming and a general Spark server/cluster. In this demo, I've created only a Spark Streaming server and called it Spark-Async.
You would need to change the "Master Cluster" value to what you actually use (yarn-client or yarn-cluster), and select the Hadoop data server which we created previously.
Now the interesting part of the configuration here is the properties for the Spark-Async data server:
I've highlighted the most important ones that you need to pay attention to. ASYNC is used because we are going to use Spark Streaming. The rest of the properties are performance related.
Kafka Data Server
Here we'll define the MapR Event Store data server:
The metadata broker has a "dummy" address only to comply with the Kafka API. The MapR Event Store client will do what is needed to connect to MapR Event Store. You cannot test the data server here, because there is no such Kafka server running on MapR. So you can safely ignore the test connection, because it'll fail (and that's OK).
For properties, you need to define the following:
You need to manually define the "key.deserializer" and "value.deserializer". Both are needed by MapR Event Store and jobs would fail if they are not defined.
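To make the two required properties concrete, here is a small sketch that models the data server properties as a Python dictionary and checks that both deserializers are set. The choice of Kafka's `StringDeserializer` class is an assumption that suits text payloads such as delimited or JSON records; the helper `missing_properties` is illustrative and not part of ODI.

```python
# ASSUMPTION: string payloads; pick another deserializer class if yours differ.
STRING_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer"

REQUIRED_PROPERTIES = ("key.deserializer", "value.deserializer")

kafka_data_server_properties = {
    "key.deserializer": STRING_DESERIALIZER,
    "value.deserializer": STRING_DESERIALIZER,
}


def missing_properties(properties):
    """Return the required property names that are absent or empty."""
    return [name for name in REQUIRED_PROPERTIES if not properties.get(name)]


if __name__ == "__main__":
    missing = missing_properties(kafka_data_server_properties)
    print("missing:", missing if missing else "none")
```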
I've done my tests here to cover five use-cases. However, I'll cover only one fully, and highlight the others to save you from reading redundant and common-sense steps.
1) MapR Event Store (Kafka) => Spark Streaming => MapR Event Store (Kafka):
In this mapping, we'll read streaming data from one of the topics we created earlier, apply some function (simple one) and then produce results to another topic. Here is the logical design of the mapping:
The MapR_Streams_Registrations1 model is something I defined by duplicating one of the models I have reverse-engineered for MySQL (the structure is the same), but of course the technology selected would be Kafka in this case. You'll be able to select the format of the streaming data: Avro, JSON, Parquet or Delimited:
And here is what the physical design looks like:
The properties for the physical implementation are:
You NEED to select the staging location as Spark Async AND enable "Streaming".
To load streaming data from our topic, registrations, to Spark Streaming, we need to select the proper LKM, which is LKM Kafka to Spark:
And then to load from Spark Streaming to the MapR Event Store target topic, registrations2, we need to select LKM Spark to Kafka:
2) MapR XD (HDFS) => Spark Streaming => MapR Event Store (Kafka):
I won't show you much here except for the knowledge modules used. To load from MapR XD (HDFS) to Spark Streaming, I've used LKM File to Spark:
And to load from Spark Streaming to MapR Event Store, I've used LKM Spark to Kafka like I did in the previous mapping.
Note: The LKM File to Spark will act as a stream, a file stream (obviously). ODI will only pick up any updated/new files, NOT static ones.
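The "new or updated files only" behaviour can be illustrated with a plain-Python sketch that compares two directory snapshots by modification time. This is only an illustration of the semantics described in the note, not how Spark or the LKM implements file streaming; the function names are hypothetical.

```python
import os


def snapshot(directory):
    """Map each regular file name in the directory to its modification time."""
    return {
        entry.name: entry.stat().st_mtime
        for entry in os.scandir(directory)
        if entry.is_file()
    }


def changed_files(previous, current):
    """Return names that are new, or whose modification time has changed."""
    return sorted(
        name for name, mtime in current.items() if previous.get(name) != mtime
    )


if __name__ == "__main__":
    before = {"old.csv": 100.0, "static.csv": 50.0}
    after = {"old.csv": 120.0, "static.csv": 50.0, "new.csv": 130.0}
    # static.csv is untouched, so it is not picked up.
    print(changed_files(before, after))
```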
3) MapR Event Store (Kafka) => Spark Streaming => MySQL:
To load from MapR Event Store (Kafka) to Spark Streaming, I've used LKM Kafka to Spark like I did in the first mapping. And then to load from Spark Streaming to MySQL, I've used LKM Spark to SQL:
4) MapR Event Store (Kafka) => Spark Streaming => MapR XD (HDFS)
To load from MapR Event Store to Spark Streaming, I've used LKM Kafka to Spark like we did before, and then to load from Spark Streaming to MapR XD (HDFS), I've used LKM Spark to File:
5) MapR Event Store (Kafka) & Oracle DB => Spark Streaming => MySQL
This is another interesting use case, where you can actually join a Kafka stream with a SQL source on the spot. This currently works ONLY for the lookup component:
Notice that the Driver Source has to be Kafka (or MapR Event Store in our case), and the Lookup Source has to be a SQL database. I've used pretty much the same LKMs as previous mappings: LKM SQL to Spark, LKM Kafka to Spark and LKM Spark to SQL.
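Conceptually, a lookup with a streaming driver and a SQL lookup source behaves like the sketch below: the SQL rows are indexed by the join key, and each incoming stream record is enriched with the matching row. The field names (`user_id`, `country`, `url`) are hypothetical, and this plain-Python version only illustrates the join semantics; it is not the code ODI generates.

```python
def lookup_join(stream_records, lookup_rows, key):
    """Enrich each stream record with the lookup row sharing the same key.

    Records with no match are kept unchanged (left outer lookup).
    """
    index = {row[key]: row for row in lookup_rows}
    enriched = []
    for record in stream_records:
        merged = dict(index.get(record[key], {}))
        merged.update(record)  # stream values win on conflicting field names
        enriched.append(merged)
    return enriched


if __name__ == "__main__":
    # Hypothetical sample data standing in for the Kafka driver and SQL lookup.
    stream = [{"user_id": 1, "url": "http://example.com/a"},
              {"user_id": 9, "url": "http://example.com/b"}]
    sql_rows = [{"user_id": 1, "country": "JO"}]
    for row in lookup_join(stream, sql_rows, key="user_id"):
        print(row)
```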
I'll show you the execution steps for the first use-case only, which is MapR Event Store (Kafka) => Spark Streaming => MapR Event Store (Kafka). To simulate the case, I've created a Kafka producer console and another Kafka consumer console so I can monitor the results. Looking at the producer below, I've pasted some records:
I've highlighted one of the URLs just to make sure you notice that it's in lower case. Waiting a few seconds, Spark will process those messages and send them to the target MapR Event Store topic:
Notice that all the URLs have been uppercased. Success!
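The transformation applied by this first mapping can be sketched in plain Python as a function that uppercases one field of a delimited record, applied to every record in a micro-batch. The record layout, the comma delimiter, and the position of the URL column are assumptions for illustration; the actual PySpark code is generated by ODI and will look different.

```python
def uppercase_field(record, field_index, delimiter=","):
    """Return the delimited record with one field converted to upper case."""
    fields = record.split(delimiter)
    fields[field_index] = fields[field_index].upper()
    return delimiter.join(fields)


def process_batch(records, field_index, delimiter=","):
    """Apply the uppercase transformation to every record in a micro-batch."""
    return [uppercase_field(r, field_index, delimiter) for r in records]


if __name__ == "__main__":
    # Hypothetical records: id, name, url.
    batch = ["1,alice,http://example.com/a", "2,bob,http://example.com/b"]
    for line in process_batch(batch, field_index=2):
        print(line)
```

In a streaming job, a function like `uppercase_field` is the kind of per-record logic that would be mapped over each batch read from the source topic before the results are produced to the target topic.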
Going through the mappings, the results were as expected. I'm not going to show the testing steps for them since they are just as simple. The idea here is to show you how to configure ODI with MapR Event Store (Kafka).
It is worth mentioning that while any of the mappings is being executed, you'll be able to drill into the logs and see what's happening (the generated code, etc.). Moreover, you'll get a link to the job history URL to access it in the Spark UI:
Opening the link will take us to the Spark UI:
If you want to control how long your streaming job will survive, you need to increase the "spark.streaming.timeout" property of the Spark-Async data server OR override it from the mapping configuration itself. You may also want to create an ODI package that has a loop and other useful components to serve your business needs.
ODI can handle both layers in the lambda architecture: the batch layer and the fast layer. This is not only a great feature added to ODI's very long list of capabilities, but also one that increases productivity and efficiency in designing data pipelines from one unified, easy-to-use interface. It was also clear that ODI can work with MapR Event Store just as easily as it would with commodity Kafka, thanks to MapR for keeping their binaries compatible with the Kafka APIs, and to ODI for not depending on a single framework. This shows that ODI is a truly open and modular E-LT tool.
_Editor's Note: This blog was originally posted by Issam Hijazi on LinkedIn._
The thoughts, practices and opinions expressed here are those of the author alone and do not necessarily reflect the views of Oracle.