Introduction to Apache Flink

by Ellen Friedman and Kostas Tzoumas

Batch Is a Special Case of Streaming

So far in this book, we have been talking about unbounded stream processing—that is, processing that starts at some point in time and continues indefinitely. This condition is depicted in Figure 6-1.

Figure 6-1. Unbounded stream processing: the input does not have an end, and data processing starts from the present or some point in the past and continues indefinitely.

A different style of processing is bounded stream processing, or processing data from some starting time until some end time, as depicted in Figure 6-2. The input data might be naturally bounded (meaning that it is a data set that does not grow over time), or it can be artificially bounded for analysis purposes (meaning that we are only interested in events within some time bounds).

Figure 6-2. Bounded stream processing: the input has a beginning and an end, and data processing stops after some time.

Bounded stream processing is clearly a special case of unbounded stream processing; data processing just happens to stop at some point. In addition, when the results of the computation are not produced continuously during execution, but only once at the end, we have the case called batch processing (data is processed “as a batch”).

Batch processing is a very special case of stream processing; instead of defining a sliding or tumbling window over the data and producing results every time the window slides, we define a global window, with all records belonging to the same window. For example, a simple Flink program that continuously counts visitors to a website every hour, grouped by region, is the following:

val counts = visits
  .keyBy("region")
  .timeWindow(Time.hours(1))
  .sum("visits")

If we know that our input data set was already bounded, we can get the equivalent “batch” program by writing:

val counts = visits
  .keyBy("region")
  .window(GlobalWindows.create)
  .trigger(EndOfTimeTrigger.create)
  .sum("visits")

Flink is unusual in that it can process data either as a continuous stream or as bounded streams (batch). With Flink, you can also process bounded data streams using Flink’s DataSet API, which is designed for exactly that purpose. The above program expressed in Flink’s DataSet API would look like this:

val counts = visits
  .groupBy("region")
  .sum("visits")

This program produces the same results when we know that the input is bounded, but it looks friendlier to a programmer accustomed to using batch processors.
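To make the equivalence concrete, here is a minimal, self-contained sketch in plain Scala (no Flink dependency; the records and region names are invented for illustration) showing that a batch-style grouped sum is just one global aggregation over the entire bounded input:

```scala
// Hypothetical bounded input: (region, visits) records.
val visits = Seq(("eu", 2L), ("us", 3L), ("eu", 1L), ("asia", 4L))

// Batch-style computation: one global "window" covering the whole data set,
// mirroring groupBy("region").sum("visits") in the DataSet API.
val counts: Map[String, Long] =
  visits
    .groupBy { case (region, _) => region }          // group all records by key
    .map { case (region, recs) =>                    // sum visits per region
      region -> recs.map(_._2).sum
    }
```

Because the input is bounded, a single pass over the whole collection yields the same answer a streaming program with a global window and an end-of-input trigger would emit.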

Batch Processing Technology

In principle, batch processing is a special case of stream processing: when the input is bounded and we want only the final result at the end, it suffices to define a global window over the complete data set and perform the computation on that window. But how efficient is it?

Traditionally, dedicated batch processors are used to process bounded data streams, and there are cases where this approach is more efficient than using the stream processor naively as described above. However, it is possible to integrate most optimizations necessary for efficient large-scale batch processing in a stream processor. This approach is what Flink does, and it works very efficiently (as shown in Figure 6-3).

Figure 6-3. Flink’s architecture supports both stream and batch processing styles, with one underlying engine.

The same backend (the stream processing engine) is used for both bounded and unbounded data processing. On top of the stream processing engine, Flink overlays the following mechanisms:

  • Checkpointing and state mechanisms to ensure fault-tolerant, stateful processing

  • The watermark mechanism to implement an event-time clock

  • Available windows and triggers to bound the computation and define when to make results available
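To illustrate the last point, tumbling event-time windows can be thought of as assigning each record to exactly one fixed-size time bucket based on its timestamp. The following plain-Scala sketch mimics that semantics (the timestamps and regions are made up, and this is a simplification, not Flink’s actual windowing implementation):

```scala
// Hypothetical timestamped visits: (region, event-time in epoch millis).
val events = Seq(("eu", 0L), ("eu", 1800000L), ("us", 3600000L))

val hourMillis = 3600000L

// Tumbling window assignment: each record lands in exactly one hour-long
// window, identified by the window's start timestamp.
val windowed: Map[(String, Long), Int] =
  events
    .groupBy { case (region, ts) => (region, (ts / hourMillis) * hourMillis) }
    .map { case (key, recs) => key -> recs.size }    // count per (region, window)
```

A global window is the degenerate case where every record maps to the same single bucket, which is exactly what the batch program above exploits.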

A different code path in Flink overlays different mechanisms on top of the same stream processing engine to ensure efficient batch processing. Although reviewing these in detail is beyond the scope of this book, the most important mechanisms are:

  • Backtracking for scheduling and recovery: the mechanism introduced by Microsoft Dryad and now used by almost every batch processor

  • Special memory data structures for hashing and sorting that can partially spill data from memory to disk when needed

  • An optimizer that tries to transform the user program to an equivalent one that minimizes the time to result

At the time of writing, these two code paths result in two different APIs (the DataStream API and the DataSet API), and one cannot create a Flink job that mixes the two and takes advantage of all of Flink’s capabilities. However, this need not be the case; in fact, the Flink community is discussing a unified API that includes the capabilities of both APIs. And the Apache Beam (incubating) community has created exactly that: an API for both batch and stream processing that generates Flink programs for execution.
