Hugh McKee, February 1, 2018
This article is the third and last in a series of three articles that dive into some of the interesting aspects of messaging within distributed systems environments. In this article, we take a look at exactly-once messaging. To understand the mechanics of exactly-once messaging you need to have a reasonably good grasp of the fundamentals of messaging in a distributed systems environment. Please see parts 1 and 2 of this series for a review of at-most-once and at-least-once messaging.
Let’s start with an example scenario as a way to understand the mechanics of exactly-once messaging in a distributed environment. In this example scenario, we will walk through a conversation between you and me. Let’s say we both are responsible for handling orders. You are responsible for taking orders, and I am responsible for shipping the orders. We communicate with each other via text messages. Think of this as a design exercise where you and I are walking through the design of two services, the order and shipping services.
When you receive a request for a new order, you send me a text message about that order. When I receive a new order text message, I need to send you a text message that acknowledges that I have received your text message. Since we are focusing on just the mechanics of the messages here, we will ignore most of the internal details of handling and shipping orders.
In the normal sequence of processing an order, you send me a text message for each new order. When I receive these new order text messages, I send you an acknowledgment reply text message. When each order is shipped, I send you an order shipped text message.
It is essential that each message from you to me and from me to you is delivered because each message results in a state change in the receiver. When I receive a new order message from you, this triggers the order shipping process. If any of these new order messages are not delivered to me, those orders are not shipped, which of course is unacceptable. Therefore you and I need to work out a protocol that ensures that all messages are delivered.
Now let’s look at what happens when the normal processing flow does not occur for some reason, starting with the initial new order message from you to me. Multiple conditions can arise where a given new order text message is not delivered to me. My phone could stop working. I could stop working. There could be a problem with the network.
You become aware of failures to deliver a given message to me either via some form of a message delivery error or after some timeout period expires. In either case, an error or timeout, you at some point know that a given message was not handled as expected.
It is worth taking a closer look at what your options are when new order text messages are not acknowledged. First, it does not matter if you received an error or a timeout. The simple reason is that there is no way for you to know if I received the text message or not. I may have received the message, and then the network failed immediately after the message arrived. I may have received the message, performed some state change operations, and before I could acknowledge it my phone failed or I was unable to send a reply for some other reason.
We both know that it is essential that every new order is processed and shipped. Therefore it is essential that each new order text message is delivered to me. We also both know that the only way for you to know that I have received a new order text message is for me to send you an acknowledgment text message.
So what do you do when you do not receive an acknowledgment message from me? The only real option is for you to try to send me the message again and again until the message is finally acknowledged.
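The retry loop described above can be sketched in a few lines of Python. This is only an illustration under stated assumptions: the `Shipping` class, the `send_until_acked` function, and the scripted `outcomes` list (which stands in for an unreliable network) are hypothetical names, not part of any real messaging library. Note that a lost request and a lost acknowledgment look identical to the sender, and that retrying after a lost acknowledgment delivers the same message twice.

```python
from dataclasses import dataclass, field


@dataclass
class Shipping:
    """Receiver: records every new order message that actually arrives."""
    received: list = field(default_factory=list)

    def handle(self, order_id: str) -> str:
        self.received.append(order_id)
        return f"ack:{order_id}"


def send_until_acked(order_id, shipping, outcomes, max_attempts=10):
    """Resend until an acknowledgment arrives; return the number of attempts.

    `outcomes` is a hypothetical scripted network with one entry per attempt:
    "lost_request", "lost_ack", or "ok". Attempts beyond the script succeed.
    """
    for attempt in range(1, max_attempts + 1):
        outcome = outcomes[attempt - 1] if attempt <= len(outcomes) else "ok"
        if outcome == "lost_request":
            continue  # the message never arrived; the sender only sees a timeout
        ack = shipping.handle(order_id)  # the message arrived and was processed
        if outcome == "lost_ack":
            continue  # the reply was lost; the sender sees the same timeout
        if ack == f"ack:{order_id}":
            return attempt
    raise TimeoutError(f"no acknowledgment for {order_id}")


shipping = Shipping()
attempts = send_until_acked("order-1", shipping, ["lost_request", "lost_ack", "ok"])
print(attempts, shipping.received)  # 3 ['order-1', 'order-1']
```

In this run the sender needed three attempts, and the receiver saw the order twice, because the second attempt was processed even though its acknowledgment never made it back.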
Before we continue into the details of handling failed messages, let’s take a moment to consider exactly-once delivery. Is there an exactly-once solution that will fix this problem for us? The simple answer is no. Implementing an exactly-once messaging system between two separated parties is impossible. The simple fact of the matter is that this is at least a five-step process.
- State change that triggers the decision to send a message
- Send the request message
- Receive and process the message, which results in a state change reaction
- Send a response message reply to the sender
- Receive and process the reply, which results in a state change reaction
It does not matter if you use a synchronous or asynchronous messaging flow. As shown in Figure 1, the reality is that there are five distinct steps and failures may occur at any one of the steps. But fear not, there are ways to implement essentially-once message delivery mechanisms, and we will look at some of the ways this is done in the following sections.
An excellent demonstration of the challenges of distributed messaging is a thought experiment known as the Two Generals Problem. What is shown in this analysis is that there is no way to guarantee state consistency between two endpoints when any form of two way communication is used where message delivery failures may occur.
In this article, we have been using an order processing example where you and I are the endpoints. You handle orders, and I am responsible for order shipping. We each maintain state for each order that we are processing.
In the two generals scenario let’s pretend that you and I are the two generals. We are planning on conducting a coordinated attack on a single enemy. As it happens, your army is located in one valley, the enemy is in the next valley, and my army is located in a third valley over a ridge from the enemy.
It is essential that we both attack the enemy at the same time. Jointly we have sufficient numbers of soldiers and resources required to defeat the enemy. However, if only one of our armies attacks the enemy alone, we will be defeated.
The challenge is that we have not yet agreed on a specific time to attack. We must communicate with each other via messengers to decide on when to attack. The dilemma is that the messengers must pass through enemy territory to deliver a message. Obviously, there is no guarantee that a given message will be delivered.
Say you decide that the time to attack is tomorrow at 8 am. This is essentially a state change. You are in the “let’s attack at eight tomorrow morning” state. You then dispatch a messenger with this information - “we attack tomorrow at 8 am”. At this point, you are waiting for my response. Without an acknowledgment from me that I agree with your plan you cannot proceed.
In our five-stage journey, you have completed the first stage, your state change decision on when to attack. Stage two involves the messenger delivering the message from you to me. Following the happy path first, I do receive the message. In stage three I make the state change decision to agree or not agree to your request to attack. In stage four I dispatch a messenger to deliver my reply message to you. Finally, in stage five, you receive my reply. In my reply, I’ve either agreed to or rejected your proposed attack time.
Here is the five-stage messaging journey:
- You have completed the first stage, your state change decision on when to attack
- You dispatch a messenger to deliver your message to me
- I receive the message from you and make the decision to agree with your proposal
- I dispatch a messenger to respond back to you that I have agreed to your request
- You receive my reply and now know that we both agree on the time to attack
In this happy path example, we already have a serious problem. I have no way of knowing if you received my reply. How can I attack when I am unsure if you know that I have agreed to your proposed time?
There are at least two possibilities here. One possibility is that you did get my reply and of course the other possibility is that the reply messenger was captured or worse and the message was never delivered. In either case, I have no way of knowing what happened.
There is another more sinister possibility. The enemy captures the messenger and alters the message. Say my reply was “I agree, we attack at 8 am”, but the message is altered to “8 am tomorrow is too soon, what about the next day?” Then the messenger is forced or bribed or replaced, and the altered message is delivered to you.
The point is that many things can go wrong just with my reply to you.
What do you do when you do not get a reply from me? In this case, we have another serious dilemma. You do not know if I have received your message or not. There are at least three possibilities here. One is that your messenger was captured and your message was not delivered to me. The second possibility is that I did receive the message, but for some reason, I was unable to send a reply. Finally, the third possibility is that I did send a reply, but my messenger was captured.
Is there a way to fix this communication problem?
One possible approach is that we require that each messenger delivers a message and then returns to the sender to verify that the message was delivered. When a message is dispatched, we wait for a finite period for the messenger to return. If the messenger does not return before the return wait time has expired, we send another messenger. We repeat this process over and over until we finally get a successful reply.
Will this modified message delivery approach work?
The short answer is no. With this approach, the message sender can know that a message was delivered, because the returning messenger acknowledges the delivery. However, the message receiver does not know whether that acknowledgment ever reached the sender.
Consider this scenario. You send a message to me, I get the message, and the messenger returns to you. The first problem is that I do not know if the messenger returned to you or not. What this means is that I can expect to see the same message from you more than once. In this scenario, receiving the same message multiple times is not a problem.
The problem is with my response message back to you. You do not know if my messenger returned to me. You can expect that I may send my reply to you more than once because I’m using the technique of sending a messenger and waiting for a timeout period before sending another messenger. However, you cannot know if I have ever received an acknowledgement.
What this means is that we continue to have a significant communications problem. If my reply messenger does not return to me, I cannot attack as planned. At the same time, you are never sure if I will attack because you do not know if my messenger has returned to me with an acknowledgement.
After walking through the Two Generals Problem, you can see that reliable message delivery is challenging. We tried to solve the problems between the two generals, the two communication endpoints, using at-least-once message delivery techniques, and we still were unable to come up with a reliable, workable solution.
The reality is that when message producers push messages to message consumers, there are failure scenarios that cannot be resolved. When you send/push a message to me, you have no way of knowing if I received the message or not. When I do receive your message, I have no way of knowing if you received my reply or not.
You the message sender and I the message receiver can know that there is a problem, but we cannot know in all failure conditions what happened on the other side of the wire. This is a fundamental law of the physics of distributed message communication, and there is no way around it.
It would be wonderful if there were a workable exactly-once messaging solution. Ideally, we would like to exchange messages in the same way we invoke a method or function. Just give us a reliable remote procedure call, and we will be happy. What can be so hard about that?
As is often the case, there are many ways to solve software problems. Sometimes what we need to do is step back and evaluate what we are trying to accomplish. Our order processing scenario is not like the two generals’ problem. With the two generals, both parties need to coordinate their actions. In our order processing example, we merely have to perform a series of steps one after the other. Our only coordination requirement is that all of the required steps must be eventually completed.
In the case of our order and shipping scenario what we need is to exchange messages between orders and shipping. An essential requirement is that no messages can be lost. It would be nice to have an exactly-once solution available, but it is not an absolute requirement.
Our intuition drove us towards a push approach. You send me a text message when new orders are created. I send you a text message when I’ve started the packing process and when each order is shipped.
As we learned in part 2 of this series, there are a lot of reliability problems with this push approach. The most basic problem is that sometimes messages are delivered, and sometimes messages are not delivered. Also, there is the uncertainty of not knowing what happened on the other side of the wire.
But we can make the push approach work - with some terms and conditions. First, you must implement a message retry approach. You keep trying to send me each message until you receive a reply from me. The condition here is that you need to harden the retry process to the point that failures and restarts on your end do not result in your losing any messages. To do this, you will need some form of failure resilient pending messages list, as shown above in Figure 4. These are all solvable problems, but they do add a level of complexity to your message sending process.
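One way to picture a failure resilient pending messages list is a small table that is written before any send is attempted and that only shrinks when a message is acknowledged. The following is a minimal sketch using Python's built-in `sqlite3` module. The table layout, the function names, and the `send` callback (a stand-in for the real transport that returns `True` on acknowledgment) are assumptions made for illustration.

```python
import sqlite3


def open_outbox(path=":memory:"):
    """Open (or create) the pending messages store.

    A file path would survive restarts; ":memory:" is used here only for the demo.
    """
    db = sqlite3.connect(path)
    db.execute(
        "CREATE TABLE IF NOT EXISTS pending (msg_id TEXT PRIMARY KEY, body TEXT NOT NULL)"
    )
    db.commit()
    return db


def enqueue(db, msg_id, body):
    """Record the message durably before any attempt to send it."""
    with db:  # commits on success, rolls back on error
        db.execute("INSERT OR IGNORE INTO pending VALUES (?, ?)", (msg_id, body))


def retry_pending(db, send):
    """Try every pending message once and delete only the acknowledged ones.

    `send(msg_id, body)` is a hypothetical transport returning True on ack.
    Returns the number of messages still pending.
    """
    rows = db.execute("SELECT msg_id, body FROM pending ORDER BY rowid").fetchall()
    for msg_id, body in rows:
        if send(msg_id, body):
            with db:
                db.execute("DELETE FROM pending WHERE msg_id = ?", (msg_id,))
    return db.execute("SELECT COUNT(*) FROM pending").fetchone()[0]


db = open_outbox()
enqueue(db, "m1", "new order 1")
enqueue(db, "m2", "new order 2")
print(retry_pending(db, lambda msg_id, body: msg_id == "m1"))  # 1 (m2 is still pending)
```

Calling `retry_pending` again, for example on a timer or after a restart, picks up whatever is still in the table, which is what makes the sender's side of the protocol at-least-once.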
On my end, I have to handle potentially receiving the same message more than once. As we have discussed, when using the push/retry approach this results in the receiver receiving some messages one or more times. Handling the same message multiple times is also a solvable problem. Again, this takes some additional work on my end to handle this.
So the combination of message push/retry on the sending side and handling messages one or more times on the receiving side is doable, but it is more complex than your typical HTTP REST implementation.
Ok, so the push messaging approach is solvable but somewhat complex when it comes to reliable messaging. What about the pull approach? The pull approach is slightly counter-intuitive, but it is typically less complicated to implement. Both the push and pull approaches were covered in detail in part 2 of this series so please refer to that document for more details.
The push and pull approaches provide ways for implementing at-least-once delivery while the commonly used synchronous HTTP REST approach without retry offers at-most-once delivery.
What about exactly-once delivery? As already stated, a general-purpose exactly-once message delivery process is physically impossible to implement. However, it is possible to achieve what appears to be exactly-once messaging with techniques that are referred to as essentially-once.
The essentially-once message approach is a matter of perspective. On the receiving end, the message receiver can be shielded from duplicate messages, which effectively simulates exactly-once message delivery, but only from the perspective of the message receiver. However, in between the message sender and the message receiver, we are going to have to implement some “magic” to make this happen.
First, let’s set the playing field in our order and shipping example scenario. On your order processing end, you store the state of orders in a local persistence store. On my end, I have my own local persistence store for maintaining the state of the order shipping processes. In between, we have a message bus, such as Kafka. To be clear, we each have our own independent persistence store, and we cannot perform a single transaction that spans our two persistence stores.
The message bus also provides transactional guarantees. Once a given message is successfully delivered to the message bus, the bus guarantees that the message is eventually delivered to the message receivers or consumers. One of the challenges in this message delivery flow is the non-transactional gaps between the message bus and the message senders and receivers, as shown in Figure 5. The details for handling this were also covered in part 2 of this series.
Figure 6: Transactionally store offset and state together
An essentially-once solution is to use the pull approach where the message producer logs all messages, and the message consumers each maintain an offset that points to the next message to be consumed, as shown in Figure 6. The essentially-once “trick” is for the message consumer to persist that offset in the same transaction used to persist the state change. This transactional pull approach nicely handles failures. A message is pulled from the log at the current offset. Then the state change operations are performed. When a failure occurs after a message has been pulled, but before the transaction is committed, the message consumer will restart after the failure at the same non-updated offset. Because the uncommitted state change is rolled back together with the offset update, pulling and processing the message again does not produce a duplicate.
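As a rough sketch of this single-transaction idea, the following Python example uses the built-in `sqlite3` module as the consumer's local persistence store. The `LOG` list stands in for the producer's message log, and the table names, the `consume_one` function, and the `crash_before_commit` flag are hypothetical, chosen only to make the failure case easy to see.

```python
import sqlite3

LOG = ["order-1", "order-2", "order-3"]  # hypothetical producer log


def open_store():
    """Create the consumer's local store: shipping state plus a single offset row."""
    db = sqlite3.connect(":memory:")
    db.execute("CREATE TABLE shipments (order_id TEXT)")
    db.execute(
        "CREATE TABLE consumer_offset (id INTEGER PRIMARY KEY CHECK (id = 1), next_offset INTEGER)"
    )
    db.execute("INSERT INTO consumer_offset VALUES (1, 0)")
    db.commit()
    return db


def consume_one(db, log, crash_before_commit=False):
    """Pull the message at the stored offset; commit state and offset together.

    Returns True if a message was consumed, False if the log is exhausted
    or the simulated failure occurred.
    """
    (offset,) = db.execute("SELECT next_offset FROM consumer_offset").fetchone()
    if offset >= len(log):
        return False
    try:
        with db:  # one transaction: commits on success, rolls back on error
            db.execute("INSERT INTO shipments VALUES (?)", (log[offset],))
            db.execute("UPDATE consumer_offset SET next_offset = ?", (offset + 1,))
            if crash_before_commit:
                raise RuntimeError("simulated failure before commit")
    except RuntimeError:
        return False  # both the state change and the offset update were rolled back
    return True


db = open_store()
consume_one(db, LOG)                            # order-1 committed, offset -> 1
consume_one(db, LOG, crash_before_commit=True)  # rolled back, offset stays at 1
while consume_one(db, LOG):                     # restart: resumes at offset 1
    pass
print([r[0] for r in db.execute("SELECT order_id FROM shipments ORDER BY rowid")])
# ['order-1', 'order-2', 'order-3']
```

Even though the second message was pulled twice, it shows up in the shipping state only once, which is the essentially-once effect as seen from the consumer's side.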
Consider the alternative where the state change and the offset are persisted in separate transactions. The idea is first to read the next message at the offset, next persist and commit the state change, and finally, persist and commit the offset update, as shown in Figure 7. This works, but it is an at-least-once solution, which means that some messages may be received more than once. If a failure occurs after the state change is committed but before the offset update is committed, the consumer restarts at the old offset and processes the same message again.
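The duplicate can be reproduced with a deliberately simplified, in-memory sketch. Here two plain attributes stand in for the two separately committed pieces of data, and the `Consumer` class and the `crash_between_commits` flag are hypothetical names used only for this illustration.

```python
class Consumer:
    """Consumer that commits the state change and the offset separately."""

    def __init__(self, log):
        self.log = log
        self.shipped = []  # committed state changes
        self.offset = 0    # committed offset, stored in its own transaction

    def consume_one(self, crash_between_commits=False):
        """Process the message at the current offset; return False when the log is exhausted."""
        if self.offset >= len(self.log):
            return False
        message = self.log[self.offset]
        self.shipped.append(message)  # commit 1: the state change
        if crash_between_commits:
            return True  # failure before commit 2: the offset is not advanced
        self.offset += 1              # commit 2: the offset update
        return True


consumer = Consumer(["order-1", "order-2"])
consumer.consume_one(crash_between_commits=True)  # state saved, offset still 0
while consumer.consume_one():                     # restart from the old offset
    pass
print(consumer.shipped)  # ['order-1', 'order-1', 'order-2']
```

The first order is shipped twice because the restart begins at an offset that no longer matches the committed state.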
Another essentially-once approach involves a form of filtering incoming duplicate messages on the message consumer side. This is a variation of the single transaction approach used above. To filter messages, it is necessary to remember which messages have previously been processed. Just as it is necessary to persist the offset and the state change in a single transaction, it is necessary to store the record of already-read messages used for filtering in the same transaction as the state change. The Kafka exactly-once implementation uses this approach, see Exactly-once Support in Apache Kafka for the details.
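The filtering idea can be sketched as follows. This is not Kafka's implementation; it is a minimal illustration using Python's built-in `sqlite3` module, and the table names and the `handle` function are assumptions. The record of a processed message ID and the state change are written in one transaction, so a redelivered message is rejected by the primary key and leaves the state untouched.

```python
import sqlite3


def open_store():
    """Create the consumer's local store: shipping state plus processed message IDs."""
    db = sqlite3.connect(":memory:")
    db.execute("CREATE TABLE shipments (order_id TEXT)")
    db.execute("CREATE TABLE processed (msg_id TEXT PRIMARY KEY)")
    db.commit()
    return db


def handle(db, msg_id, order_id):
    """Apply the message once; return False if it was a duplicate."""
    try:
        with db:  # the marker row and the state change commit or roll back together
            db.execute("INSERT INTO processed VALUES (?)", (msg_id,))
            db.execute("INSERT INTO shipments VALUES (?)", (order_id,))
    except sqlite3.IntegrityError:
        return False  # primary key violation: msg_id was already processed
    return True


db = open_store()
print(handle(db, "m1", "order-1"))  # True
print(handle(db, "m1", "order-1"))  # False (redelivery is filtered out)
print(db.execute("SELECT COUNT(*) FROM shipments").fetchone()[0])  # 1
```

A real system would also need some policy for pruning old message IDs, which this sketch leaves out.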
As stated on the Akka home page, Akka is a toolkit for building highly concurrent, distributed, and resilient message-driven applications for Java and Scala. Being message-driven, Akka provides solutions for at-most-once and at-least-once messaging. Actors, the fundamental Akka building blocks, communicate with each other via asynchronous at-most-once message delivery semantics. The Akka toolkit also provides implementations of at-least-once delivery semantics. It should be no surprise to you if you have been following this 3-part series that any application of at-least-once message delivery requires some form of persistence, and this is also true with Akka.
Akka Persistence provides the foundation for at-least-once message delivery in the Akka toolkit. Also, Akka Persistence Query provides a pull-based implementation that is used to pull entries stored in Akka Persistence logs. The typical use case is that entries are pulled from the “write-side” logs, and the data is then stored on the “read-side” or “query-side” in a way that is optimal for querying. Together, Akka Persistence and Akka Persistence Query are the Akka implementation of Event Sourcing and CQRS (Command Query Responsibility Segregation). The Lagom Framework also provides an implementation of Event Sourcing and CQRS that is built on top of Akka.
So ends this journey through messaging in distributed systems environments. The primary intent of this 3-part series was to explore some of the fundamentals, concepts, and considerations of messaging.
Many of us who are involved in building software systems are at some stage in a journey that inevitably leads to implementing ever more sophisticated distributed applications.
Currently (circa 2018) the distributed systems space is rapidly evolving - almost at the pace of controlled chaos. The hope is that if we ground ourselves with the fundamentals, we can better navigate our way through the excitement and confusion as things continue to evolve in the distributed systems space.
There was an ulterior motive as well for this focus on messaging. In one form or another, all of the Lightbend technologies and solutions are based on distributed messaging. Message-driven is one of the four properties of Reactive Systems. As the company that initially defined the term “reactive,” it should not be a surprise that we take this stuff seriously.