This post is the third in a series discussing stateful streaming services. In the first, we defined distributed services and stateful services. In the second, we described patterns of managing state for distributed streaming services.
To recap the points of the previous posts:
- A streaming service has three components: stream, compute and data.
- Some computations by their nature require no state because there is no need to consider what happened before.
- Other computations are stateful by their nature.
- State must be maintained for stateful computations.
The question is whether the state is maintained within the service itself, that is, either in the service container (e.g., a Docker instance) or in the service process (e.g., a JVM).
Stateful services pose challenges when building distributed, scalable systems. There are three particular challenges that we want to address here:
- Parallel processing: ensure that two service instances do not change the same state value simultaneously
- Only-once processing: ensure that every event is processed and that no event is processed twice
- Recreating state: ensure that state is recreated or redistributed when a service instance is added, removed, or fails
In this post we will discuss patterns for managing the first challenge: parallel processing.
We are addressing the issues of distributed processing. Some compute challenges become too big for one server to process within a reasonable time. The only way to effectively scale is to run processes in parallel across multiple servers. The inherent challenge to parallel processing is to avoid changing the same state value simultaneously in separate instances.
Let us walk through an example. Suppose we have three instances of the same service updating accounts. A debit message for account A arrives on the stream and is processed by service instance 1. The very next message is another debit for account A. The state will become corrupted if this second message is processed by instance 2 or 3 before instance 1 completes its processing.
There are three known solutions to implementing parallel processing across multiple servers. We will discuss the first two; blockchain, the third, is beyond the scope of this post. Besides, in its current implementations, it is too slow for most streaming services.
There are three approaches to sharing state among distributed parallel processes:
- Every service instance shares the same external state store
- Every service instance maintains the entire state in its local memory
- Each service instance maintains a subset of the state in its local memory
The first approach, an external state store, is the traditional solution. The compute instances do not maintain state; the solution relies on the database to solve the problems of parallel access. If a database is fast enough and large enough, use it to store state and let the database handle the complexity. It is the simplest solution. The challenge, however, is that in many situations traditional databases are not fast enough or large enough.
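As a sketch of this first approach, with a hypothetical accounts table in an in-memory SQLite database, the read-modify-write is pushed into a single atomic UPDATE so that the database, not the service, resolves concurrent debits:

```python
import sqlite3

# Hypothetical schema for illustration: accounts(id, balance)
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE accounts (id TEXT PRIMARY KEY, balance INTEGER)")
conn.execute("INSERT INTO accounts VALUES ('A', 100)")

def debit(conn, account_id, amount):
    # The database applies each UPDATE atomically, so two service
    # instances debiting the same account cannot interleave a stale read.
    conn.execute(
        "UPDATE accounts SET balance = balance - ? WHERE id = ?",
        (amount, account_id),
    )
    conn.commit()

debit(conn, "A", 30)
debit(conn, "A", 20)
balance = conn.execute("SELECT balance FROM accounts WHERE id = 'A'").fetchone()[0]
print(balance)  # 50
```

The same pattern applies to any store with atomic single-row updates; the service instances stay stateless and the contention is handled where the state lives.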
The second approach is for every service instance to maintain the entire state in its local memory. The advantage of this approach is that any instance can process any message. One disadvantage is that each instance requires a very large data store. Another is that the instances must share locks. The following sequence diagram shows how the lock process works.
This example is just like the previous one, except that now an instance must request a lock for the account. Instance 1 acquires the lock for account A when the debit A1 message arrives. It publishes balance A1 and then releases the lock. Instance 2 must wait for the lock, and so receives balance A1 before it calculates debit A2. Now the balance stream is correct.
Zookeeper is built for just this sort of lock sharing. However, as you can see, it comes with some complexity. It also comes with a high expense in network latency. For every message processed, an instance must send a lock request and wait for a lock response.
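To make the protocol concrete, here is a single-process sketch in which a per-account `threading.Lock` stands in for the distributed lock; in a real deployment the lock would be held in Zookeeper, with the network round trips described above:

```python
import threading

balances = {"A": 100}
# One lock per account. In production this would be a distributed lock
# (e.g., held in Zookeeper), not an in-process threading.Lock.
account_locks = {"A": threading.Lock()}

def process_debit(account, amount):
    lock = account_locks[account]
    with lock:                       # request the lock; wait if another instance holds it
        balance = balances[account]  # read state only after the lock is granted
        balances[account] = balance - amount
    # lock released here; the next instance sees the published balance

# Five concurrent debits of 10 against account A.
threads = [threading.Thread(target=process_debit, args=("A", 10)) for _ in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(balances["A"])  # 50
```

The structure is the same over the network, but each `with lock:` becomes a lock request and response per message, which is where the latency cost comes from.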
The third approach is for each service instance to maintain only a subset of the state. This approach relies on partitioning: we partition the messages so that every message for a particular piece of state is routed to the same instance. For example, we guarantee that all debits for account A always go to instance 1, all debits for account B always go to instance 2, and so on.
There is no race condition because all messages for a particular key are processed in sequence on a single thread.
This third approach is exactly how MapReduce works. In MapReduce, the mappers assign a key to each value, the sorter groups all the values for each key, and the reducers receive all the values for a given key one at a time.
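The same map/group/reduce flow can be sketched in a few lines (a toy in-memory version, not real MapReduce):

```python
from collections import defaultdict

# "Map" output: (key, value) pairs, here (account, debit amount).
debits = [("A", 10), ("B", 5), ("A", 20), ("B", 1)]

# "Sort/shuffle" phase: group all values for each key together.
grouped = defaultdict(list)
for key, value in debits:
    grouped[key].append(value)

# "Reduce" phase: each key's values are processed one at a time,
# on a single path, so no two reducers ever touch the same key.
totals = {key: sum(values) for key, values in grouped.items()}
print(totals)  # {'A': 30, 'B': 6}
```

The guarantee that matters for our purposes is the grouping step: all values for a key arrive at exactly one reducer.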
Kafka enables the third approach. Kafka can separate messages in partitions based on key, guaranteeing that every message for a given key is on the same partition.
The simplest and most typical approach for partitioning is to create a hash of the message key and take the remainder of dividing the hash by the number of partitions (i.e., modulo).
partition = hash(message key) % number of partitions
We call this approach “hashmod” for short. In our example, the message key would be the account identifier. In most cases, it does not matter to which instance a particular key goes so long as all messages with that key always go to the same instance.
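A minimal hashmod partitioner might look like this in Python (md5 is used here only as a deterministic stand-in for a stable hash; Kafka's default partitioner uses murmur2):

```python
import hashlib

def partition_for(key: str, num_partitions: int) -> int:
    # Python's built-in hash() is salted per process, so we use a
    # digest to get the same partition for the same key every time.
    digest = hashlib.md5(key.encode("utf-8")).digest()
    h = int.from_bytes(digest[:8], "big")
    return h % num_partitions

# Every message for account A lands on the same partition...
assert partition_for("account-A", 4) == partition_for("account-A", 4)
# ...and the result is always a valid partition number.
assert 0 <= partition_for("account-B", 4) < 4
```

Any stable hash works; the only requirement is that the same key always maps to the same partition for a fixed partition count.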
Partition of partitions
Elastic scaling means that we can dynamically add nodes to scale with more throughput or remove nodes to conserve resources. With stateless streams, the process is trivial since any message can go to any node. With stateful streams, however, messages for the same key must go to the same node. If we add or remove nodes, then the distribution of the messages changes, which means that the state must be moved to the new nodes receiving the messages.
The streaming frameworks use the concept of a window. At the end of the window, the state object is persisted. To add or remove nodes, we would:
- stop all the nodes
- read the persisted state
- distribute the state to the appropriate nodes
- start the stream again
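The cost of this full stop-and-redistribute follows from hashmod itself: changing the divisor remaps most keys. A quick self-contained check, hashing keys directly to nodes for illustration:

```python
import hashlib

def node_for(key: str, num_nodes: int) -> int:
    digest = hashlib.md5(key.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % num_nodes

keys = [f"account-{i}" for i in range(10_000)]
# Fraction of keys whose owning node changes when scaling from 4 to 5 nodes.
moved = sum(node_for(k, 4) != node_for(k, 5) for k in keys) / len(keys)
print(moved)  # roughly 0.8: about n/(n+1) of keys move under hashmod
```

In other words, with one partition per node, adding a single node forces most of the state to move.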
If the state is small, this process of shutting down and restarting should happen relatively quickly, perhaps in a couple of seconds. However, the cache could be quite large, or even a couple of seconds may be an unacceptable delay.
The challenges of the previous approach arise from having a single level of partitioning. If there is only one partition for each node, then any change in the number of nodes requires a complete re-partitioning of the data. However, if there were multiple partitions for each node, then we would only need to move a subset of the partitions when nodes are added or removed.
A better approach is to have many more partitions of the data than nodes that process the data. We call this approach “partition of partitions.”
Let us assume four nodes again. This time, instead of 4 partitions we will create 40. Then the distribution is:
- node 1: partitions 0..9
- node 2: partitions 10..19
- node 3: partitions 20..29
- node 4: partitions 30..39
Now suppose we add two more nodes. We can easily balance the system by moving three partitions from each existing node to the new nodes, as follows:
- node 1: partitions 0..6
- node 2: partitions 10..16
- node 3: partitions 20..26
- node 4: partitions 30..36
- node 5: partitions 7..9, 17..19
- node 6: partitions 27..29, 37..39
We only need to stop and move 30% of the state instead of 100%.
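The arithmetic behind the 30% figure can be sketched as follows, assuming partitions are rebalanced as evenly as possible:

```python
partitions, old_nodes, new_nodes = 40, 4, 6

# With 40 partitions on 4 nodes, each node owns 10.
per_node_before = partitions // old_nodes  # 10

# Balanced across 6 nodes, the most any node holds is ceil(40 / 6) = 7,
# so each original node keeps 7 and gives up 3.
keep = partitions // new_nodes + (1 if partitions % new_nodes else 0)  # 7
moved = old_nodes * (per_node_before - keep)  # 4 nodes x 3 partitions = 12

print(moved, moved / partitions)  # 12 0.3
```

Twelve of forty partitions move, versus all of them in the one-partition-per-node scheme.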
The same is true when scaling down or when nodes fail. For instance, if node 3 were to suddenly stop, then we could distribute its partitions among the remaining nodes; node 1, for example, would then hold partitions 0..6 plus 20 and 25.
Mechanics of assigning partitions
In order to assign partitions, we need a lock. Each processor instance needs to request a lock for each partition that it processes. Zookeeper is designed to solve this very problem.
One approach is first come, first served: each instance grabs locks on partitions until there are no more locks available. This approach, however, will inevitably lead to imbalance, with some instances having more partitions to process than others.
The alternative is for each instance to request an equal share of locks. This requires that the instances know how many partitions there are and how many instances are currently running. If there are 40 partitions and 4 processor instances, then each instance should request 10 locks. If two more processors are added, then the 4 current processors should release 3 locks each. The new processors can then claim the released locks. Again, this information can be easily shared in Zookeeper.
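A minimal sketch of this equal-share computation, assuming each instance can read the partition count and the current instance count (for example, from Zookeeper):

```python
def lock_target(num_partitions: int, num_instances: int, instance_index: int) -> int:
    # Distribute partitions as evenly as possible: the first
    # (num_partitions % num_instances) instances take one extra.
    base, extra = divmod(num_partitions, num_instances)
    return base + (1 if instance_index < extra else 0)

# 40 partitions over 4 instances: each instance should hold 10 locks.
print([lock_target(40, 4, i) for i in range(4)])  # [10, 10, 10, 10]

# After scaling to 6 instances the targets become 7 or 6, so each of the
# original four instances releases locks down to its new target.
print([lock_target(40, 6, i) for i in range(6)])  # [7, 7, 7, 7, 6, 6]
```

Each instance recomputes its target whenever the instance count changes and releases or claims locks to converge on it.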
Some software frameworks handle all of this distribution for you. In Kafka Streams, for instance, a processor provides an application identifier. All partitions for a topic are distributed among every instance with the same application identifier.
We stated earlier that locks can be expensive. However, in this approach we do not require a lock request for every message. Instead, an instance only needs to acquire a lock once to begin processing messages for a particular partition.