In this post, we focus on a seemingly simple, extremely widespread, but surprisingly difficult (in fact, an unsolved) problem in practice: counting in streams. Essentially, we have a continuous stream of events (e.g., visits, clicks, metrics, sensor readings, tweets, etc.), and we want to partition the stream by some key, and produce a rolling count over some period of time (e.g., count the number of visitors in the last hour per country).
First, let's take a look at how to solve this problem using a classic batch architecture, i.e., using tools that operate on finite data sets. In the diagram below, imagine time flowing from left to right. In a batch architecture, continuous counting consists of a continuous ingestion stage (e.g., by using Apache Flume) which produces periodic files (e.g., in HDFS). For example, if we are counting hourly, we are producing a file every hour. Then, periodic batch jobs (using any kind of batch processor - such as MapReduce or Spark) are scheduled on these files, perhaps with the help of a scheduler such as Oozie.
While this architecture can be made to work, it can also be very fragile, and suffers from a multitude of problems:
To address the first problem (high latency), the big data community (and in particular the Storm community) came up with the idea of the Lambda Architecture, using a stream processor alongside a batch architecture in order to provide early results. This architecture looks as follows:
The lower part of the figure is the same batch architecture we saw before. In addition to the previous tools, we have added Kafka (for stream ingestion) and Storm to provide early approximate results. While this resolves the latency issue, the rest of the issues remain, and it introduces two more systems, as well as code duplication: the same business logic needs to be expressed in Storm and the batch processor with two very different programming APIs, introducing two different codebases to maintain with two different sets of bugs. To recap, some problems with Lambda are:
An elegant and easy solution to all of these problems is a streaming architecture. There, a stream processor (in our case Flink, but can be any stream processor with a certain feature set) is used in conjunction with a message queue (e.g., Kafka) to provide the counts:
Using Flink, such an application becomes trivial. Below is a code template of a Flink program for continuous counting that does not suffer from any of the aforementioned problems:
DataStream<LogEvent> stream = env
.addSource(new FlinkKafkaConsumer(...)) // create stream from Kafka
.keyBy("country") // group by country
.timeWindow(Time.minutes(60)) // window of size 1 hour
.apply(new CountPerWindowFunction()); // do operations per window
It is worth noting that a fundamental difference between the streaming and the batch approach is that the granularity of counting (here 1 hour) is part of the application code itself and not part of the overall wiring between systems. We will come back to this later in this article, but for now, the point is that changing the granularity from 1 hour to 5 minutes needs only a trivial change in the Flink program (changing the argument of the window function).
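To make this concrete, here is a minimal, self-contained sketch of the same idea (plain Java, not the Flink API; the `Event` record and field names are made up for illustration): events are grouped by key into tumbling windows, and the window size is a single parameter, mirroring how only the `timeWindow` argument changes in the Flink program above.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class WindowedCount {
    // Hypothetical event type: a country key plus an embedded timestamp.
    record Event(String country, long timestampMillis) {}

    // Returns a map from "windowStart/country" to the count for that window.
    // The granularity is just the windowSizeMillis parameter.
    static Map<String, Long> countPerWindow(List<Event> events, long windowSizeMillis) {
        Map<String, Long> counts = new HashMap<>();
        for (Event e : events) {
            // Align the timestamp to the start of its tumbling window.
            long windowStart = (e.timestampMillis / windowSizeMillis) * windowSizeMillis;
            counts.merge(windowStart + "/" + e.country, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
            new Event("DE", 1_000), new Event("DE", 2_000),
            new Event("US", 3_000), new Event("DE", 61_000));
        // Switching from 1-hour to 1-minute granularity is only a parameter change.
        Map<String, Long> counts = countPerWindow(events, 60_000);
        System.out.println(counts.get("0/DE"));     // 2
        System.out.println(counts.get("60000/DE")); // 1
    }
}
```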
Counting hierarchy of needs
Maslow’s hierarchy of needs describes a “pyramid” of human needs as they evolve from simple physiological needs up to self-actualization. Inspired by that (and Martin Kleppmann’s use of the same parable in the context of data management), we will describe a hierarchy of needs for our counting use case:
Let's go through these needs, starting from the bottom:
Now, let's climb through this hierarchy of needs and see how we can satisfy them using the current state of the art in open source. We already established that a streaming architecture (and thus a stream processing system) is necessary to do continuous counting, so we will disregard batch processing systems and focus on stream processors. We will consider the most popular open source systems: Spark Streaming, Storm, Samza, and Flink, all of which can count continuously. The following sequence shows how certain stream processors are eliminated at every step in the pyramid:
Going up in the pyramid, we see how more sophisticated needs for counting eliminate certain streaming frameworks. The need for low latency eliminates Spark Streaming (due to its micro-batching architecture). The need for efficiently handling streams of very high volume eliminates Storm. The need to provide strict fault tolerance guarantees (exactly once) eliminates Samza (and Storm), and the need to provide accurate and repeatable results eliminates all frameworks besides Flink. The final need, querying the results inside the stream processor without exporting them to an external database, is not satisfied by any of the stream processors right now (including Flink), but it is an upcoming feature in Flink.
Next, we will go into all of these stages in detail, and see how certain frameworks are eliminated at every stage.
To measure the latency of stream processors, the Storm team at Yahoo! published a blog post and a benchmark last December comparing Apache Storm, Apache Spark, and Apache Flink. This was a very important step for the space, as it was the first streaming benchmark modeled after a real-world use case at Yahoo!. In fact, the benchmark task was essentially counting, which is the use case that we have focused on in this blog post as well. In particular, the job does the following (from the original blog post):
“The benchmark is a simple advertisement application. There are a number of advertising campaigns, and a number of advertisements for each campaign. The job of the benchmark is to read various JSON events from Kafka, identify the relevant events, and store a windowed count of relevant events per campaign into Redis. These steps attempt to probe some common operations performed on data streams.”
The results obtained by the Yahoo! team showed that “Storm 0.10.0, 0.11.0-SNAPSHOT and Flink 0.10.1 show sub-second latencies at relatively high throughputs, with Storm having the lowest 99th percentile latency. Spark streaming 1.5.1 supports high throughputs, but at a relatively higher latency.” This is shown in the diagram below:
Essentially, when using Spark Streaming, there is a latency-throughput tradeoff, whereas both Storm and Flink do not show such a tradeoff.
Efficiency and scalability
While the Yahoo! benchmark was an excellent starting point in comparing the performance of stream processors, it was limited in two dimensions:
Using a similar cluster (same number of machines but faster interconnect) as the original benchmark, we got the following results:
We managed to scale Storm to a higher throughput (0.5 million events per second) than the original benchmark due to the faster interconnect. Flink, at the same setting, scaled to 3 million events per second. The Storm job was bottlenecked on CPU, while the Flink job was bottlenecked on the network bandwidth between the machines running Kafka and the machines running Flink. To eliminate that bottleneck (which had nothing to do with Kafka, but was just the available cluster setup), we moved the data generator inside the Flink cluster. In that setting, Flink could scale to 15 million events per second.
While it may be possible to scale Storm to higher volumes using more machines, it is worth noting that efficiency (using the hardware efficiently) is as important as scalability, especially when the hardware budget is limited.
Fault tolerance and repeatability
When talking about fault tolerance, the following terms are often used: at most once means that records may be lost on failure (so counts can be under-counted); at least once means that records may be replayed after a failure (so counts can be over-counted); and exactly once means that counts are the same as they would be in a failure-free execution.
Flink guarantees exactly-once with selected sources (e.g., Kafka), and end-to-end exactly-once for selected source and sink combinations (e.g., Kafka → Flink → HDFS, with more coming up). The only frameworks from the ones examined in this post that guarantee exactly-once are Flink and Spark.
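Why the guarantee matters for counting can be shown with a small, self-contained sketch (illustrative only, not any framework's actual recovery code): after a simulated failure, an at-least-once reader replays records since its last committed offset and double-counts them, while restoring the count together with its offset from one consistent checkpoint, as Flink's checkpoints do, yields the exact count.

```java
import java.util.List;

public class ReplaySemantics {
    // At-least-once: the first attempt crashes after failAt records, then
    // processing restarts from the last committed offset. Records between
    // committedOffset and failAt are counted twice.
    static long atLeastOnceCount(List<Integer> records, int failAt, int committedOffset) {
        long count = 0;
        for (int i = 0; i < failAt; i++) count++;                        // first attempt
        for (int i = committedOffset; i < records.size(); i++) count++;  // replay
        return count;
    }

    // Exactly-once: count and offset are restored together from one consistent
    // checkpoint; progress past the checkpoint before the crash is discarded,
    // so no record is counted twice.
    static long exactlyOnceCount(List<Integer> records, int checkpointOffset) {
        long count = checkpointOffset; // count as of the checkpoint
        for (int i = checkpointOffset; i < records.size(); i++) count++;
        return count;
    }

    public static void main(String[] args) {
        List<Integer> records = List.of(1, 2, 3, 4, 5);
        // Crash after reading 4 records; last commit/checkpoint was at offset 2.
        System.out.println(atLeastOnceCount(records, 4, 2)); // 7: two records counted twice
        System.out.println(exactlyOnceCount(records, 2));    // 5: exact
    }
}
```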
Equally important to tolerating failures is supporting operational needs for deploying applications in production and making those deployments repeatable. Flink internally provides fault tolerance via a mechanism called checkpoints, essentially taking a consistent snapshot of the computation periodically without ever stopping the computation. Recently, we introduced a feature called savepoints, which essentially makes this checkpointing mechanism available directly to the user. Savepoints are Flink checkpoints that are triggered externally by the user, are durable, and never expire. Savepoints make it possible to “version” applications by taking consistent snapshots of the state at well-defined time points, and then rerunning the application (or a different version of the application code) from that time point. In practice, savepoints are essential for production use, enabling easy debugging, code upgrades (be it the application or Flink itself), what-if simulations, and A/B testing. You can read more about savepoints here.
Explicit handling of time
In addition to being able to replay a streaming application from a well-defined point in time, repeatability in stream processing requires support for what is called event time. An easy way to explain event time is the Star Wars series: the time in the movies themselves (when the events happened) is called event time, whereas the time that the movies came out in the theaters is called processing time:
In the context of stream processing, event time is measured by the timestamps embedded in the records themselves, whereas the time measured by the machines that execute the computation (when we see the records) is called processing time. The stream is out of order when the event time order is not the same as the processing time order (like the Star Wars series).
Event time support is essential in many settings, including providing correct results in out of order streams, and providing consistent results when replaying the computation (e.g., in a time travel scenario, as we saw before). In short, event time support makes the computation repeatable and deterministic, as the results do not depend on the time of the day that the computation is run.
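This repeatability can be illustrated with a small, self-contained sketch (plain Java, not the Flink API): counting by the timestamp carried inside each record gives the same per-window counts no matter the order in which records arrive, which is exactly what makes event-time results deterministic.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class EventTimeDemo {
    // Count records per tumbling event-time window, keyed by window start.
    // Only the embedded timestamp matters, not the arrival order.
    static Map<Long, Long> countByEventTime(List<Long> eventTimestamps, long windowMillis) {
        Map<Long, Long> counts = new TreeMap<>();
        for (long ts : eventTimestamps)
            counts.merge((ts / windowMillis) * windowMillis, 1L, Long::sum);
        return counts;
    }

    public static void main(String[] args) {
        List<Long> inOrder = List.of(1_000L, 2_000L, 61_000L, 62_000L, 63_000L);
        List<Long> shuffled = new ArrayList<>(inOrder);
        Collections.shuffle(shuffled); // simulate an out-of-order stream
        // Identical counts per 1-minute event-time window either way:
        System.out.println(countByEventTime(inOrder, 60_000));  // {0=2, 60000=3}
        System.out.println(countByEventTime(shuffled, 60_000)); // {0=2, 60000=3}
    }
}
```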
Of all the frameworks examined in this post, only Flink supports event time (with others building up partial support right now).
Queryable state and what’s coming up in Flink
The tip of the hierarchy pyramid was the ability to query the counts in the stream processor. The motivation for this functionality is the following: we saw that Flink can support a very high throughput in counting and related applications. However, these counts need to be presented somehow to the external world, i.e., we need the ability to ask, in real time, for the value of a specific counter. The typical way to do that is to export the counts into a database or key-value store and query them there. However, this may become the bottleneck of the processing pipeline. Since Flink already holds these counts in its internal state, why not query them directly in Flink? This is possible right now using a custom Flink operator (see Jamie Grier’s stateful stream processing at in-memory speed), but not supported natively. We are working on bringing this functionality natively into Flink.
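The idea can be sketched in a few lines of self-contained Java (a conceptual illustration only, not Flink's actual queryable-state API, which was still upcoming at the time of writing): the operator keeps its counts in local state and serves point queries from that state directly, instead of exporting every update to an external key-value store.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class QueryableCounts {
    // Operator-local state: key -> running count. Concurrent so that a query
    // thread can read while the processing thread updates.
    private final Map<String, Long> state = new ConcurrentHashMap<>();

    // Called by the streaming job for every incoming record.
    void process(String key) {
        state.merge(key, 1L, Long::sum);
    }

    // Point query answered directly from operator state, no external database.
    long query(String key) {
        return state.getOrDefault(key, 0L);
    }

    public static void main(String[] args) {
        QueryableCounts op = new QueryableCounts();
        for (String country : new String[] {"DE", "DE", "US"}) op.process(country);
        System.out.println(op.query("DE")); // 2, read straight from the operator
    }
}
```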
A few other features that we are actively working on are:
We saw that even seemingly simple streaming use cases (counting) have more “depth” when performance and production use are involved. Based on a “hierarchy of needs,” we saw how Flink has a unique combination of features and performance in the open source space to support these use cases well in practice.