Introduction to Apache Flink

by Ellen Friedman and Kostas Tzoumas

Stateful Computation

Streaming computation can be either stateless or stateful. A stateless program looks at each individual event and creates some output based on that last event. For example, a streaming program might receive temperature readings from a sensor and raise an alert if the temperature goes beyond 90 degrees. A stateful program creates output based on multiple events taken together. Examples of stateful programs include:

  • All types of windows that we discussed in Handling Time. For example, getting the average temperature reported by a sensor over the last hour is a stateful computation.

  • All kinds of state machines used for complex event processing (CEP). For example, creating an alert after receiving 2 temperature readings that differ by more than 20 degrees within 1 minute is a stateful computation.

  • All kinds of joins between streams, as well as joins between streams and static or slowly changing tables.

Figure 5-1 exemplifies the main difference between stateless and stateful stream processing. A stateless program (a transformation of black records to white records in the figure) receives each record separately (black input) and produces each output record based on the last input record alone (white records). A stateful program maintains state that is updated based on every input and produces output (gray records) based on the last input and the current value of the state.

Figure 5-1. Stateless and stateful processing are compared here. Input records are shown as black bars. The left diagram shows how a stateless operation transforms one input record at a time and outputs each result based solely on that last record or event (white bar). The diagram on the right shows that a stateful program maintains the value of state for all of the records processed so far and updates it with each new input, such that the output (gray bar) reflects results that take into account more than one event.
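
To make the distinction concrete, here is a minimal sketch, using the Flink Scala DataStream API, of the two temperature examples above: a stateless alert that looks at one reading at a time, and a stateful hourly average that depends on every reading that fell into the window. The socket source, the input format, and the 90-degree threshold are illustrative assumptions, not part of any particular application.

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

case class Reading(sensorId: String, temperature: Double)

object StatelessVsStateful {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Illustrative source: comma-separated "sensorId,temperature" lines.
    val readings: DataStream[Reading] = env
      .socketTextStream("localhost", 9999)
      .map { line =>
        val Array(id, temp) = line.split(",")
        Reading(id, temp.toDouble)
      }

    // Stateless: each alert depends only on the single record being examined.
    val alerts = readings.filter(_.temperature > 90)

    // Stateful: the hourly average per sensor depends on all readings that
    // arrived within the one-hour window, i.e., on accumulated state.
    val hourlyAverages = readings
      .map(r => (r.sensorId, r.temperature, 1))
      .keyBy(_._1)
      .timeWindow(Time.hours(1))
      .reduce((a, b) => (a._1, a._2 + b._2, a._3 + b._3))
      .map(t => (t._1, t._2 / t._3)) // (sensorId, average temperature)

    alerts.print()
    hourlyAverages.print()
    env.execute("stateless vs. stateful")
  }
}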

While stateless computation is important in its own right, the most interesting applications of stream processing, such as those just described, are stateful. It also turns out that stateful computation is a lot more challenging to implement correctly than stateless computation. Whereas older stream processing systems did not provide support for stateful computations, the newer generation of stream processors is all about state and about guaranteeing the existence and correctness of that state under various failure scenarios.

Notions of Consistency

When we include state in a distributed system, naturally the question of consistency is raised. Consistency is, really, a different word for “level of correctness”; that is, how correct are my results after a failure and a successful recovery compared to what I would have gotten without any failures? For example, assume that we are simply counting user logins within the last hour. What is the count (the state) if the system experiences a failure? In the terminology of the stream processing world, people distinguish between three different levels of consistency:

  • At most once: At most once is really a euphemism for no correctness guarantees whatsoever—the count may be lost after a failure.

  • At least once: At least once, in our setting, means that the counter value may be bigger than but never smaller than the correct count. So, our program may over-count (in a failure scenario) but guarantees that it will never under-count.

  • Exactly once: Exactly once means that the system guarantees that the count will be exactly the same as it would be in the failure-free scenario. (A sketch of enabling this guarantee in Flink follows this list.)
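
In Flink, the exactly once guarantee for state is tied to checkpointing. The following is a rough sketch of the login-count example with checkpointing enabled; the socket source and the ten-second checkpoint interval are illustrative assumptions. Because the keyed counts are part of the checkpointed state, after a failure they are restored to a consistent value rather than being lost or double-counted.

import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object LoginCounts {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Snapshot all operator state every 10 seconds; EXACTLY_ONCE is the
    // default checkpointing mode and is spelled out here only for emphasis.
    env.enableCheckpointing(10000, CheckpointingMode.EXACTLY_ONCE)

    // Illustrative source: one user ID per login event.
    val logins: DataStream[String] = env.socketTextStream("localhost", 9999)

    // Count logins per user within the last hour. The running counts live in
    // Flink's managed state and are covered by the checkpoints above.
    val counts = logins
      .map(user => (user, 1L))
      .keyBy(_._1)
      .timeWindow(Time.hours(1))
      .sum(1)

    counts.print()
    env.execute("hourly login counts")
  }
}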

At least once used to be very popular in the industry; the first stream processors (Apache Storm, Apache Samza) guaranteed only at least once when they first came out. This was the case for two reasons:

1. It is trickier to implement systems that guarantee exactly once. Exactly once is challenging both at the fundamental level (deciding what “correct” means and what the scope of exactly once is) and at the implementation level.

2. Early adopters of stream processing were willing to work around the framework limitations at the application level (e.g., by making their applications idempotent or simply redoing all calculations using a batch compute layer).

The first solutions that provided exactly once (Trident, Spark Streaming) came at a substantial cost in terms of performance and expressiveness. In order to guarantee exactly once behavior, these systems do not apply the application logic to each record separately, but instead process several records (a batch) at a time, guaranteeing that the processing of each batch either succeeds as a whole or fails as a whole. This implies that you have to wait for a batch to complete before getting any results. For this reason, users were often left having to use two stream processing frameworks together (one for exactly once and one for per-element, low-latency processing), resulting in even more complexity in the infrastructure. Guaranteeing exactly once while keeping low latency and efficiency used to be a tradeoff that users had to navigate. In contrast, Apache Flink does away with that tradeoff.

Note

One significant value that Flink has brought to the industry is that it is able to provide exactly once guarantees, low-latency processing, and high throughput all at once.

Essentially, Flink eliminates these tradeoffs by allowing a single framework to handle all requirements. This is a meaningful technological leap in the industry, one that, like all such leaps, seems magical from the outside but makes a lot of sense when explained.

Savepoints: Versioning State

Previously, we saw that checkpoints are automatically generated by Flink to provide a way to reprocess records while correcting state in case of a failure. But Flink users also have a way to consciously manage versions of state through a feature called savepoints.

A savepoint is taken in exactly the same way as a checkpoint but is triggered manually by the user (using the Flink command-line tools or the web console) instead of by Flink itself. Like checkpoints, savepoints are stored in stable storage and give the user the ability to start a new version of the job or to restart the job from a savepoint rather than from the beginning. You can think of savepoints as snapshots of a job at a certain time (the time that the savepoint was taken).
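
With the command-line client, the workflow might look like the following sketch; the job ID, savepoint path, and application JAR name are placeholders, and the exact options can vary by Flink version.

# Trigger a savepoint for a running job; Flink reports the path it was written to.
bin/flink savepoint <jobID> [targetDirectory]

# Later, start the same job, or a modified version of it, from that savepoint.
bin/flink run -s <savepointPath> <applicationJar> [arguments]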

Another way to think about savepoints is as saved versions of the application state at well-defined points in time, similar to saving versions of the application itself with a version control system. The simplest example is taking snapshots at regular intervals without changing the code of the application; that is, keeping the application version as is. This situation is depicted in Figure 5-9.

Figure 5-9. Savepoints (represented by circles) are triggered manually to capture the state of a running Flink application at different times.

Here, we have a running version of an application (version 0), and we took a savepoint of it at time t1 and another savepoint at time t2. At any point, we could go back and restart the program from either of these savepoints. Even more significantly, we can start a modified version of the program from a savepoint. For example, we can change the code of the application (let’s call it version 0.1) and start it from the savepoint taken at t1. In this way, we have both version 0 and version 0.1 of the program running at the same time, while taking subsequent savepoints of both versions at later times, as shown in Figure 5-10.

Figure 5-10. Using savepoints to advance the version of a Flink application. The new version can be started from a savepoint created by an older version.

You can use savepoints to solve a variety of production issues for streaming jobs:

1. Application code upgrades: Assume that you have found a bug in an already running application and you want future events to be processed by the updated code with the bug fixed. By taking a savepoint of the job and restarting from that savepoint using the new code, downstream applications will not see the difference (except for the bug fix, of course).

2. Flink version upgrades: Upgrading Flink itself also becomes easy because you can take savepoints of running pipelines and replay them from the savepoints using an upgraded Flink version.

3. Maintenance and migration: Using savepoints, you can easily “pause and resume” an application. This is especially useful for cluster maintenance as well as migrating jobs consistently to a new cluster. In addition, this is useful for developing, testing, and debugging applications, as you do not need to replay the complete event stream.

4. What-if simulations (reinstatements): It is often very useful to run alternative application logic to model “what-if” scenarios from controllable points in the past.

5. A/B testing: By running two different versions of application code in parallel from the exact same savepoint, you can model A/B testing scenarios.

All of these issues occur in the real world. Flink’s internal checkpointing mechanism surfaces as savepoints, solving issues like the ones described here. This reflects the fact that Flink’s checkpoint feature is essentially a programmable mechanism for consistently upgrading state versions, much like a database system with multiversion concurrency control. This fundamental characteristic of the checkpoint mechanism will surface again when we look at how to provide end-to-end consistency in the next section.

End-to-End Consistency and the Stream Processor as a Database

We have seen how Flink can guarantee that state is kept consistent (exactly once) in a simple application that counts or aggregates data. Let us now look at this application end-to-end, as it might be deployed in production (depicted in Figure 5-11).

Figure 5-11. Application architecture consisting of a stateful Flink application consuming data from a message queue and writing data to an output system used for querying. The callout shows what goes on inside the Flink application.

A partitioned storage system (e.g., a message queue such as Kafka or MapR Streams) serves as the data input. The Flink topology, shown as a callout in Figure 5-11, consists of three operators: the data source reads data from the input, partitions it by key, and routes records to instances of the stateful operators, which can be a mapWithState as we saw in the previous section, a window aggregation, etc. This operator writes the contents of the state (the counts in our previous example) or some derivative results to a sink, which transfers these to a partitioned storage system (e.g., a file system or a database) that serves as output storage. A query service (e.g., the database’s query API) then allows users to query the state (in the simplest case, the counts) as they were written in the output storage. Note that the figure depicts the contents of the state written to the output.
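
As a rough sketch of that topology in the Flink Scala API, the pipeline below mirrors Figure 5-11: a source, a keyed stateful operator (a mapWithState keeping a running count, as in the earlier example), and a sink. The socket source and the print sink are placeholders standing in for the message queue and the output storage system of the figure.

import org.apache.flink.streaming.api.scala._

object CountPipeline {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.enableCheckpointing(5000) // periodic, consistent snapshots of the state

    // Source: stands in for a partitioned message queue such as Kafka or
    // MapR Streams; each line is treated as one keyed event.
    val events: DataStream[String] = env.socketTextStream("localhost", 9999)

    // Stateful operator: a running count per key kept in managed state.
    val counts: DataStream[(String, Long)] = events
      .keyBy(x => x)
      .mapWithState { (key: String, state: Option[Long]) =>
        val newCount = state.getOrElse(0L) + 1
        ((key, newCount), Some(newCount))
      }

    // Sink: stands in for a connector that writes the current counts to the
    // output storage system (a file system or database) for querying.
    counts.print()
    env.execute("stateful count pipeline")
  }
}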

Note

Keep in mind that, in this case, the output reflects the contents of the state in the Flink application as of the time it was last written out.

The first question is, how can we transfer the contents of the state to the output with exactly once guarantees? (This is called end-to-end exactly once.) There are essentially two ways to do that, and the right way depends on the nature of the system used for output and the application requirements:

1. The first way is to buffer all output at the sink and commit this atomically when the sink receives a checkpoint record. This method ensures that the output storage system only contains results that are guaranteed to be consistent and that duplicates will never be visible. Essentially, the output storage takes part in Flink’s checkpointing. For this to work, the output storage system needs to provide the ability to atomically commit. (A sketch of this approach follows the list.)

2. The second way is to eagerly write data to the output, keeping in mind that some of this data might be “dirty” and replayed after a failure. If there is a failure, we then need to roll back the output, in addition to the input and the Flink job, thereby overwriting the dirty data and effectively deleting dirty data that has already been written to the output. Note that even with this approach, in many cases there will be no deletions. For example, if new records only overwrite old records (rather than adding to the output), then the dirty values are transient only between checkpoints and are eventually overwritten by new, refined values.
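
The following is a deliberately simplified sketch of the first approach: a custom sink that buffers records and makes them visible only after a checkpoint has completed. The commitAtomically callback is a stand-in for whatever atomic-commit primitive the output system offers. A complete implementation, along the lines of Flink’s built-in write-ahead sinks, would also persist the pending buffer in checkpointed operator state and track which records belong to which checkpoint; both are omitted here for brevity.

import scala.collection.mutable.ArrayBuffer

import org.apache.flink.runtime.state.CheckpointListener
import org.apache.flink.streaming.api.functions.sink.SinkFunction

// Buffers output and only makes it visible once a checkpoint has completed,
// so readers of the output system never see uncommitted ("dirty") data.
class BufferingCommitSink[T](commitAtomically: Seq[T] => Unit)
    extends SinkFunction[T] with CheckpointListener {

  // Records received since the last successful commit (simplified: a full
  // implementation would keep one buffer per pending checkpoint and store
  // it in checkpointed operator state so it survives failures).
  private val pending = ArrayBuffer.empty[T]

  override def invoke(value: T): Unit = {
    pending += value // buffer only; nothing is written to the output yet
  }

  override def notifyCheckpointComplete(checkpointId: Long): Unit = {
    // The checkpoint covering the buffered records succeeded, so they can
    // now be made visible in the output system in a single atomic step.
    commitAtomically(pending.toList)
    pending.clear()
  }
}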

Note that these two alternatives correspond exactly to two well-known levels of isolation in relational database systems: read committed and read uncommitted. Read committed guarantees that all reads (queries to the output) will read committed data and no intermediate, in-flight, or dirty data. Subsequent reads may return different results because the data may have changed. Read uncommitted does allow dirty reads; in other words, the queries always see the latest version of data as it is being processed.

For some applications, weaker semantics may be acceptable, so Flink provides several built-in sinks with different semantics; for example, a distributed file sink with read uncommitted semantics (for a full list, visit the current Flink documentation). Depending on the capabilities of the output system and the application requirements, the user can choose the right semantics.

We saw that, depending on the type of output, Flink together with the corresponding connector can provide exactly once guarantees end-to-end with a variety of isolation levels.

Now, recall the application architecture shown in Figure 5-11. One reason for having the output storage system here is that the state inside Flink is not accessible to the outside world in this example, so the output storage is the target of the query. If, however, we were able to query the state in the stateful operator (the counts in this case), we might not need an output system at all in situations where the state contains all the information needed for the query. This is true for a variety of applications, and in these cases, querying the state directly can lead to the vastly simplified architecture shown in Figure 5-12 as well as to vastly improved performance.

Figure 5-12. Simplified application architecture using Flink’s queryable state. For those cases when the state is all the information that is needed, querying the state directly can improve performance.

Queryable state is currently a work in progress by the Flink community. With queryable state, Flink offers a query API to issue read requests to Flink and get the current value of the state. In some sense, in a limited number of scenarios, Flink becomes a replacement for a database system, offering both a write path (the input stream that changes the state) as well as a read path (queryable state). Although this makes sense for a lot of applications, queryable state is definitely more limited than a general-purpose database.
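
As a minimal sketch, assuming a Flink version in which the queryable state API is available, the running login counts from earlier could be exposed for reads directly, without a separate output store; the state name "login-counts" and the socket source are illustrative choices.

import org.apache.flink.streaming.api.scala._

object QueryableLoginCounts {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.enableCheckpointing(10000)

    // Illustrative source: one user ID per login event.
    val logins: DataStream[String] = env.socketTextStream("localhost", 9999)

    // Maintain a running count per user and expose the latest value under a
    // queryable name; an external client can then read it through Flink's
    // queryable state API (the read path) while the stream keeps updating it
    // (the write path).
    logins
      .map(user => (user, 1L))
      .keyBy(_._1)
      .sum(1)
      .keyBy(_._1)
      .asQueryableState("login-counts")

    env.execute("queryable login counts")
  }
}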

Conclusion

In this chapter, we saw how stateful stream processing changes the rules of the game. By having checkpointed state as a first-class citizen inside the stream processor, we can get correct results after failures, very high throughput, and low latency all at the same time, completely eliminating past tradeoffs that people thought of as fundamental (but are not). This is one of the most important advantages of Flink.

Another advantage of Flink is its ability to handle streaming and batch using a single technology, completely eliminating the need for a dedicated batch layer. Batch Is a Special Case of Streaming provides a brief overview of how batch processing with Flink is possible.