Oct 31, 2020

# Asynchronous Messaging Over gRPC

## Introduction

About four years ago I was actively involved in the redesign of a corporate-wide network monitoring system. The goal was to create a system which could handle two orders of magnitude more data than the existing one.

In a nutshell, the system was composed of "edge" data collectors which collected and pre-processed metrics and then pumped them into the central "event store". The store then relayed data to multiple consumers, such as time-series databases, alerting systems, etc. The collectors had to be autonomous to tolerate short periods of network downtime, while the event store had to be reliable, highly available, and have a long retention period.

In the past few years it became apparent that we had made quite a few mistakes. This story is about one particular mistake we made in the design of the messaging protocol between the data collectors and the store, and how it was fixed later.

## Fallacies of the original design

Reliable message delivery was a hard business requirement, so the risk of in-transfer data loss had to be reduced as much as possible. Therefore, before being sent over the network, the collected data had to be written to local persistent storage.

For simplicity we chose the synchronous mode of operation whereby each sent message had to be explicitly acknowledged by the reliable storage (Kafka). To compensate for the unavoidable roundtrip latency, we assumed many simultaneous TCP connections and extensive batching (i.e. accumulating many data points in one message).

We reasoned that to handle large data volumes we had to optimise for two things:

• message encoding and decoding should be as fast as possible;
• on-the-wire data representation overhead should be minimised.

After some consideration we opted for a custom binary protocol with four message types: read, write, ack, and wait [1].

We assumed several things (all of which turned out to be false):

• we would be relaying messages over a fast 10G (or even 100G) network, so bandwidth exhaustion was deemed improbable;
• we would be able to accumulate large batches;
• we would be able to create thousands of TCP connections if needed.

Firstly, it turned out that while the network was 10G at most sites, we had to share it with many other services, so the available bandwidth fluctuated widely and was hard to predict.

Moreover, to cater for business requirements and the chosen partitioning scheme, we couldn't accumulate data points for longer than 5 minutes. So message sizes were randomly distributed between a few kilobytes and a few hundred kilobytes, averaging 10-25KB depending on the site, far less than the 1MB messages we had planned for.

And finally, the firewall team yelled at us every time we had more than a few dozen TCP connections because a rusty, overloaded firewall box somewhere along the path just kept crashing otherwise.

So the two things we optimised for in the initial design were the least relevant ones. 😄
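To put rough numbers on this, here is a back-of-the-envelope sketch of the throughput ceiling in synchronous mode, where each connection carries at most one message per round trip. The 50 ms round-trip time, the 1000 planned connections, and the 36 actual connections are assumed figures for illustration only; just the message sizes echo the ones mentioned above.

```python
def sync_throughput_bits_per_s(message_bytes: int, rtt_s: float, connections: int) -> float:
    """Upper bound for stop-and-wait messaging: one message per round trip per connection."""
    return connections * message_bytes * 8 / rtt_s


RTT_S = 0.05  # assumed round trip, including the storage acknowledgement (not measured)

# What we planned for: 1MB batches over (say) a thousand connections.
planned = sync_throughput_bits_per_s(1_000_000, RTT_S, connections=1000)

# What we could actually have: ~25KB batches over a few dozen connections.
actual = sync_throughput_bits_per_s(25_000, RTT_S, connections=36)

print(f"planned ceiling: {planned / 1e9:.0f} Gbit/s")
print(f"actual ceiling:  {actual / 1e6:.0f} Mbit/s")
```

Under these assumptions the ceiling drops from far more than the link could carry to roughly 144 Mbit/s, and neither faster encoding nor a more compact wire format changes it, because the round trip dominates.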

## Asynchronous messaging to the rescue

From the very beginning I had an inkling that we should use asynchronous messaging over UDP. However, we discarded the idea in favour of synchronous messaging over TCP. The reason was that reliable message delivery over UDP required unique message identifiers, some state to track all the in-flight messages, some form of congestion control, and non-trivial CPU resources to handle it all. All of that comes for free with TCP, and we thought we could open as many TCP connections as we needed.

At some point the ever-increasing data volume exceeded the effective bandwidth (about 40Mbit/s) at one of the sites, which led to partial outages: edge collectors simply couldn't deliver data fast enough.

The workaround we rushed to implement was message compression. It helped, but not much, because the partitioning scheme we used led to high entropy within a single message, which resulted in low compression ratios. A redesign was required.

The insight was that TCP connections are full-duplex, so we could send and receive messages simultaneously, allowing some limited number of in-flight messages. While I was sketching a draft implementation based on the binary message types we already had in use, a colleague made a strong case for using the gRPC protocol instead.

gRPC has some nice properties [2]. For instance, both message and service specifications can be defined in a single .proto file; the language allows variant data types (unions); and it runs over HTTP/2. The last point was essential for the decision because we could potentially use existing third-party tools (L7 load balancers) to intelligently route traffic, based on the service/method pair, to different nodes and/or applications through a single exposed port [3].

The crucial point for me, however, was the ability to send and receive message streams concurrently in different processes, which is precisely what was needed to implement asynchronous messaging.

Leaving out details, the service was defined as:

    service DataSink {
      // The whole bidirectional stream is carried by a single HTTP/2 POST
      // request to the /DataSink/Send path; each Request is a
      // length-prefixed message within that stream
      rpc Send(stream Request) returns (stream Response);
    }

    // Request can have different payload types
    message Request {
      uint32 version = 1;
      bytes message_id = 2;
      oneof payload {  // the name of this group is an assumption
        bytes plain_message = 9;
        bytes zstd_compressed = 10;
      }
    }

    message Response {
      uint32 version = 1;
      oneof response {
        AckMessage ack = 9;
        ServiceMessage info = 10;
      }
    }

    message AckMessage {
      repeated bytes message_ids = 1;
    }

    message ServiceMessage {
      // ...
    }


The Request message could have different payload types, while the Response message could either carry an acknowledgement for one or more messages or some kind of service message.

Both Request and Response messages were versioned to allow radical changes in the future.
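As a rough illustration of the "one payload variant per request" idea, here is a minimal Python sketch that models Request with a plain dataclass instead of generated Protobuf code. The helper names make_request and payload_of are hypothetical, and zlib stands in for Zstd only because it ships with the standard library.

```python
import zlib
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Request:
    version: int
    message_id: bytes
    plain_message: Optional[bytes] = None
    zstd_compressed: Optional[bytes] = None

    def __post_init__(self):
        # Emulate a Protobuf `oneof`: exactly one payload variant may be set.
        if (self.plain_message is None) == (self.zstd_compressed is None):
            raise ValueError("exactly one payload variant must be set")


def make_request(message_id: bytes, body: bytes, compress: bool) -> Request:
    """Hypothetical helper: wrap a message body into the chosen payload variant."""
    if compress:
        # zlib is a stand-in for Zstd here (assumption for the sketch).
        return Request(1, message_id, zstd_compressed=zlib.compress(body))
    return Request(1, message_id, plain_message=body)


def payload_of(request: Request) -> bytes:
    """Hypothetical helper: recover the original body regardless of the variant."""
    if request.zstd_compressed is not None:
        return zlib.decompress(request.zstd_compressed)
    return request.plain_message


body = b"cpu.user 0.42 1604102400\n" * 100
plain = make_request(b"id-1", body, compress=False)
packed = make_request(b"id-2", body, compress=True)
```

The receiver never needs a separate "is this compressed?" flag: the variant that is set tells it how to decode the payload.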

## Implementation details

To expedite implementation, I modelled both the client and the server sides as finite state machines. I like to think of a state machine as a set $$M$$ of four-element tuples

$M = \{(s_i, e_j, a_k, s_m) : s_i,s_m \in S, e_j \in E, a_k \in A \}$

where $$S$$ is the set of all possible states, $$E$$ is the set of expected events, and $$A$$ is the set of defined actions.

The same relation can also be written as

$\begin{array}{l} e_1 \times s_1 \Rightarrow a_1 \times s_2 \\ \dots \\ e_n \times s_m \Rightarrow a_k \times s_l \end{array}$

which is a bit easier to read, e.g. "event $$e_1$$ in state $$s_1$$ triggers action $$a_1$$ and the machine transitions into state $$s_2$$".

It can also be represented as a table:

| Event/State | $$s_1$$ | $$s_2$$ | $$...$$ | $$s_k$$ |
|---|---|---|---|---|
| $$e_1$$ | $$a_{1_1} \times s_{i_{1}}$$ | $$a_{1_2} \times s_{i_{2}}$$ | $$...$$ | $$a_{1_k} \times s_{i_{k}}$$ |
| $$e_2$$ | $$a_{2_1} \times s_{j_{1}}$$ | $$a_{2_2} \times s_{j_{2}}$$ | $$...$$ | $$a_{2_k} \times s_{j_{k}}$$ |
| $$...$$ | $$...$$ | $$...$$ | $$...$$ | $$...$$ |
| $$e_n$$ | $$a_{n_1} \times s_{m_{1}}$$ | $$a_{n_2} \times s_{m_{2}}$$ | $$...$$ | $$a_{n_k} \times s_{m_{k}}$$ |

This notation really helps to think through all the possible combinations of events and states. When a particular combination doesn't make sense, we just put a no-op action into the cell and keep the state.
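The table translates almost literally into a lookup keyed by (event, state). The sketch below is in Python rather than Erlang purely for brevity; the names step and no_op and the toy events, states, and actions are placeholders, not part of the real system.

```python
from typing import Callable, Dict, Tuple

Action = Callable[[], None]
Table = Dict[Tuple[str, str], Tuple[Action, str]]


def no_op() -> None:
    """Action for cells where the event makes no sense in the given state."""


def step(table: Table, event: str, state: str) -> str:
    """Look up (event, state), run the action, and return the next state.

    A missing cell means "no-op action, keep the state".
    """
    action, next_state = table.get((event, state), (no_op, state))
    action()
    return next_state


log = []
table: Table = {
    ("e1", "s1"): (lambda: log.append("a1"), "s2"),
    ("e2", "s2"): (lambda: log.append("a2"), "s1"),
}

state = "s1"
for event in ["e1", "e1", "e2"]:
    # The second "e1" arrives in state "s2", which has no cell: it is ignored.
    state = step(table, event, state)
```

Filling the dictionary cell by cell forces the same exercise as filling the table on paper: every event has to be considered in every state.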

Erlang/OTP, being an innately awesome piece of technology, comes with the handy gen_statem behaviour. Once events, states, and actions are clearly defined, the process of implementing the gen_statem behaviour becomes mechanical.

On the client side I defined the following distinct states:

• idle: there is nothing to send and all in-flight messages have been acknowledged;
• can_send: the number of in-flight messages is below the threshold;
• blocked: the number of in-flight messages is equal to or greater than the threshold.

The event set $$E$$ contained three elements:

• send(id): a new message, uniquely identified by id, is ready to be sent;
• ack_received(id): an acknowledgement for the message with the given id has been received;
• timeout(id): the message identified by id has timed out.

And the set of actions $$A$$:

• do_send(body_of(id)): retrieve the message body for the given id, send it over the network (asynchronously), add id to the set of in-flight messages, and start the timer for that id;
• do_ack(id): remove id from the set of in-flight messages, cancel the timer, and mark id as sent in the persistent storage;
• on_timeout(id): remove id from the set of in-flight messages and re-schedule the message by generating a send(id) event.
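To make the interplay of these states, events, and actions concrete, here is a compact Python sketch of the client-side window. It is not the gen_statem implementation: timers are left out (the caller delivers timeout events), the pending queue for messages that arrive while blocked is my assumption about how sends are deferred, and ClientWindow, transport, and done are hypothetical names.

```python
from collections import deque


class ClientWindow:
    """Client-side sketch: at most `threshold` messages in flight at once."""

    def __init__(self, threshold, transport):
        self.threshold = threshold
        self.transport = transport   # hypothetical callable that ships body_of(id)
        self.pending = deque()       # ids waiting for a free slot (assumption)
        self.in_flight = set()       # ids sent but not yet acknowledged
        self.done = []               # stand-in for "marked as sent" in storage
        self.state = "idle"

    def send(self, msg_id):
        self.pending.append(msg_id)
        self._pump()

    def ack_received(self, msg_id):
        if msg_id in self.in_flight:   # do_ack; late or duplicate acks are ignored
            self.in_flight.remove(msg_id)
            self.done.append(msg_id)
        self._pump()

    def timeout(self, msg_id):
        if msg_id in self.in_flight:   # on_timeout: re-schedule the message
            self.in_flight.remove(msg_id)
            self.pending.append(msg_id)
        self._pump()

    def _pump(self):
        # do_send for as many pending ids as the window allows
        while self.pending and len(self.in_flight) < self.threshold:
            msg_id = self.pending.popleft()
            self.transport(msg_id)
            self.in_flight.add(msg_id)
        if len(self.in_flight) >= self.threshold:
            self.state = "blocked"
        elif self.in_flight:
            self.state = "can_send"
        else:
            self.state = "idle"


sent = []
client = ClientWindow(threshold=2, transport=sent.append)
for msg_id in ("a", "b", "c"):
    client.send(msg_id)       # "c" has to wait: the window is full
client.ack_received("a")      # frees a slot, so "c" goes out
client.timeout("b")           # "b" is taken out of flight and re-sent
```

In gen_statem the same deferral could plausibly be expressed by postponing send events while in the blocked state, but that is a guess on my part rather than a description of the original code.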

The server side is simpler. For each connection there is a fixed-size FIFO queue. Each time the queue becomes full, the oldest message gets dropped and a ServiceMessage (the info variant of Response) is sent to the client to negotiate the queue capacity. An AckMessage is sent by a separate process once the message has been written (asynchronously) to the storage.

The set of states $$S$$ on the server is:

• receiving: the message queue has some capacity left;
• blocked: the message queue is full.

Event set $$E$$:

• received(msg(id)): the message identified by id has been received;
• want_more: the storage is ready to process another message.

Action set $$A$$:

• enqueue(msg(id)): add the message identified by id to the queue;
• dequeue(msg(id)): take the message identified by id off the queue and forward it to the storage writer process;
• negotiate(capacity): send a ServiceMessage indicating the queue capacity when the queue is full.
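The per-connection queue can be sketched in a few lines of Python. This reflects my reading of the description above: a message arriving at a full queue pushes out the oldest one and triggers the capacity notification. ServerQueue and notify are hypothetical names, and notify merely stands in for sending the service message to the client.

```python
from collections import deque


class ServerQueue:
    """Server-side sketch: a fixed-size FIFO queue per connection."""

    def __init__(self, capacity, notify):
        self.capacity = capacity
        self.notify = notify    # hypothetical callable: tells the client the capacity
        self.queue = deque()

    @property
    def state(self):
        return "blocked" if len(self.queue) >= self.capacity else "receiving"

    def received(self, msg_id):
        if self.state == "blocked":
            self.queue.popleft()          # drop the oldest message; it is never
            self.notify(self.capacity)    # acked, so the client will time out
        self.queue.append(msg_id)         # enqueue

    def want_more(self):
        # dequeue: hand the oldest message to the storage writer, if there is one
        return self.queue.popleft() if self.queue else None


notices = []
server = ServerQueue(capacity=2, notify=notices.append)
for msg_id in ("a", "b", "c"):
    server.received(msg_id)     # "c" arrives at a full queue, so "a" is dropped
next_for_storage = server.want_more()
```

Dropping is safe only because of the client-side timer: an unacknowledged message eventually times out there and is sent again.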

## Did it work?

I daresay yes. I was paired up with a colleague for the task. He was responsible for the server part, while I implemented the client side. Thank you, L., for your hard work and for putting up with my nagging! 😊

We tested the implementation over a 10G link which we knew wasn't busy and didn't have any junk firewall on the path. For the test we generated roughly 7GB worth of metrics [4]. We simply measured the time it took to send all the data across and write it to the storage.

For the synchronous messaging case the data was being sent over 54 TCP connections, while the asynchronous messaging used just one.

| Implementation | Measured time | Speedup |
|---|---|---|
| Original (synchronous) | 1,140s (19min) | 1x |
| Asynchronous (no compression) | 314s (5min) | 3.6x |
| Asynchronous (zstd compression) | 144s (2min) | 7.9x |

So we got a massive improvement over the original design. Still, there was room for more, considering that the effective throughput was around 400Mbit/s while we had a 10G link.
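The speedup and throughput figures follow directly from the measured times. The short calculation below assumes that "roughly 7GB" means 7 decimal gigabytes and that throughput is counted in terms of the uncompressed metrics, not the bytes that actually crossed the wire.

```python
DATA_BYTES = 7e9  # "roughly 7GB", taken as decimal gigabytes (assumption)

timings_s = {
    "sync": 1140,
    "async": 314,
    "async_zstd": 144,
}


def speedup(name: str) -> float:
    """How many times faster than the original synchronous implementation."""
    return timings_s["sync"] / timings_s[name]


def throughput_mbit_per_s(name: str) -> float:
    """Effective throughput of uncompressed payload, in Mbit/s."""
    return DATA_BYTES * 8 / timings_s[name] / 1e6


for name in timings_s:
    print(f"{name:11s} {speedup(name):4.1f}x {throughput_mbit_per_s(name):6.0f} Mbit/s")
```

Even the best run stays just under 400 Mbit/s, that is, around 4% of what the link could nominally carry.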

I believe there were several limiting factors in play:

• the gRPC library we used was bad at handling large messages, so we had to limit the message size to 64KB, which reduced the achievable compression ratio [5];
• the available Zstd Erlang wrapper exposed only the simplest API and we didn't have time to write another NIF to use a more efficient technique [6];
• we chose an arbitrary number for the receive window (100), which probably wasn't optimal;
• even though we were sending data over a fast network, ultimately we were reading from and writing to spinning disks, so disk IO was a limiting factor as well.
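Small messages compress poorly because each one has to be compressed from scratch, without any shared context. A pre-shared dictionary, the technique hinted at in footnote 6, sidesteps that. The sketch below illustrates the principle with zlib's preset dictionary, which is only a stand-in for Zstd's dictionary feature; the sample metric and dictionary contents are made up.

```python
import zlib


def compress(body: bytes, dictionary: bytes = b"") -> bytes:
    """Compress body, optionally priming the compressor with a shared dictionary."""
    c = zlib.compressobj(zdict=dictionary) if dictionary else zlib.compressobj()
    return c.compress(body) + c.flush()


def decompress(blob: bytes, dictionary: bytes = b"") -> bytes:
    """Inverse of compress(); the same dictionary must be supplied."""
    d = zlib.decompressobj(zdict=dictionary) if dictionary else zlib.decompressobj()
    return d.decompress(blob) + d.flush()


# Made-up dictionary holding the fragments every message is expected to contain.
DICTIONARY = b'{"host": "edge-", "metric": "cpu.user", "value": 0., "ts": 16041}'

message = b'{"host": "edge-17", "metric": "cpu.user", "value": 0.42, "ts": 1604102400}'

without_dict = compress(message)
with_dict = compress(message, DICTIONARY)

print(len(message), len(without_dict), len(with_dict))
```

Both sides have to agree on the dictionary in advance, which is feasible when the shape of the data is known upfront.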

## Conclusion

The design assumes a fairly static receive window and constant network and storage latency. This, of course, is not always the case. Unfortunately, we only had time to implement a crude receive window negotiation: I had accepted an offer from another company, so we had just a month to design, implement, and make the client and server production-ready.

Nevertheless, the results were quite impressive. With the new messaging protocol in place, the system can now pipe through 7x more data. It should be good for a while.

On a more philosophical note, time and time again I see asynchronous processing outperform synchronous processing. However, reasoning about asynchronous systems is hard. Using a semi-formal approach (that is, explicitly defining the state machines) was instrumental in implementing a working solution. I hope this post helps someone improve their systems when the time comes.

## Footnotes:

[1] While some think that JSON over HTTP is the pinnacle of software engineering, using it for the data volume we were planning for would have been silly, to put it mildly. To give you an analogy, imagine you need to transfer 1000 tonnes of coal per day. While it is technically possible to pack it into 10 kg bags and load them into thousands of sports cars, doing so would be prohibitively expensive.

[2] I am genuinely puzzled why the JSON-over-HTTP/1.1 "API" idea still prevails. HTTP/2 has lower overhead and allows request multiplexing, making it more efficient, while Protobufs are more space-efficient and richer in data types. A win-win.

[3] Anyone who has worked at a large enterprise organisation knows how painful firewall burns can be (unless you have a bribed "friend" in the firewall team).

[4] The data generating tool was written by another colleague, who used production data to derive probabilities so that the generated metrics had distribution properties and values close to real life.

[5] A stub implementation in Go was roughly 10 times faster and could handle 1MB messages just fine.

[6] Since we knew upfront what sort of data we would be sending, we could have utilised the dictionary-based compression feature of Zstd.