Events, Event Sourcing, and the Path Forward

[Image: tin can telephone]

Distributed systems pose all kinds of challenges. And we’ve built them in the web age, when the tech of the wider Internet is what we use in microcosm to build the underpinnings of our own systems. Our industry has done somersaults to try to make these systems work well with synchronous calls built on top of HTTP. This is working at scale for a number of companies and that’s just fine. But if you were to start from scratch is that what you would build? A few years ago, we had that opportunity and we decided, no, that’s not what we would build. Instead, we built a distributed, asynchronous system centered around an event bus, and it has been one of the best decisions we’ve made. Most of the things that make a service architecture painful are either completely alleviated or mostly so, with only a few tradeoffs. Here’s what we’ve built at Community, and some of what we have learned.

Beyond The Monolith

The advice du jour is that you should start with a monolith because it allows you to iterate quickly, change big things more easily, and avoid any serious consistency issues—assuming you back your service with an ACID-compliant DB. That’s good advice that I also give to people, and we did that. A monolith scales pretty well, but it becomes an issue as you grow the team and need to iterate on independent work streams.

Queuing

But what’s the next step? For us, the next step was to stop building things in the monolith and to build some async services that could work semi-autonomously alongside it. Our platform is a messaging platform so this was already a reasonably good fit: messages aren’t delivered synchronously and some parts of the workflow can operate like a pipeline.

We needed at least a queuing system to do that, something that would buffer calls between services and which would guarantee reliable delivery. We are primarily an Elixir shop so we picked RabbitMQ because of the good driver support on our stack: RabbitMQ is written in Erlang and Elixir runs on the same VM and can leverage any Erlang libraries. This has turned out to be a really good choice for the long term. RabbitMQ is super reliable, can have very good throughput, is available in various hosted forms, and has a lot of different topologies and functionality that make it a Swiss army knife of async systems. We paid a 3rd party vendor to host it and began building on top of it.

Initially we used a very common pattern and just queued work for other services, using JSON payloads. This was great for passing things between services where fire-and-forget was adequate. Being able to rely on delivery once RabbitMQ accepted the message from the publisher means you don’t deal with retries on the sender side, you almost never lose messages, and the consumer of the messages can determine how it wants retries to be handled. Deploys never interrupt messaging. A service can treat each message as a transaction and only ack the message once work has been completed successfully. All good stuff.
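
As a rough illustration of that transactional consumption (the queue name and work function here are made up), a consumer acks only after the work succeeds and requeues the message otherwise; a minimal sketch using the Elixir amqp library:

```elixir
# Minimal sketch of "ack only after the work succeeds" using the Elixir
# `amqp` library. The queue name and do_work/1 are hypothetical.
defmodule WorkConsumer do
  use GenServer

  @queue "widget_work"

  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, opts)

  def init(_opts) do
    {:ok, conn} = AMQP.Connection.open()
    {:ok, chan} = AMQP.Channel.open(conn)
    {:ok, _} = AMQP.Queue.declare(chan, @queue, durable: true)
    {:ok, _consumer_tag} = AMQP.Basic.consume(chan, @queue)
    {:ok, chan}
  end

  # Deliveries arrive as plain messages from the channel process.
  def handle_info({:basic_deliver, payload, %{delivery_tag: tag}}, chan) do
    case do_work(Jason.decode!(payload)) do
      :ok -> AMQP.Basic.ack(chan, tag)
      {:error, _reason} -> AMQP.Basic.reject(chan, tag, requeue: true)
    end

    {:noreply, chan}
  end

  # Ignore consume confirmations and other channel notifications.
  def handle_info(_other, chan), do: {:noreply, chan}

  defp do_work(_decoded), do: :ok
end
```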

But the core data and associated models were still locked up inside the monolith. And we needed to access that data from other services fairly often. The first pass was to just look at the messages passed from the monolith to other services and do a local DB call to enrich them with required fields before passing them on. That works just fine for a few cases.

Other Paradigms

We built other kinds of async messaging between services on top of those same ad hoc JSON messages, knowing full well that wasn’t what we wanted long term, but learning about the interaction patterns, and getting our product into the market.

But eventually you litter the code with DB enrichment calls and other complexity. And with no fixed schemas, the JSON messaging rapidly outgrows your ability to reason about it. Instead, wouldn’t it be nice if a new service could also get a copy of those same messages? And wouldn’t it be really great if the people writing that new service didn’t have to talk to the people emitting those messages to make sure the schema wouldn’t change on them? And what if you could get a history of all the major things that ever happened in the system in the form of messages? And maybe a new service could have access to that history to bootstrap itself?

Events!

Yes, to all of the above. That’s what a truly event-based system offers. And that’s what we transformed this async system into.

Building the async system in the first place made this much easier and I want to shout out to Tomas Koci, Jeffrey Matthias, and Joe Merriweather-Webb who designed and built most of that and who have made many contributions to the events system as well. Once we were in the market with our product, we all agreed it was time for the next phase.

In mid-2019, Andrea Leopardi, Roland Tritsch, and I met up in Dublin and plotted the course for the next year or so. The plans from that meeting turned into the structure of the events system we have now. A lot of people have contributed since! This has been a big team effort from the folks in Community Engineering. I have attempted to name names here wherever possible, but there are a million contributions that have been important.

Since building the bus, we’ve grown to about 146 services running in production, of which 106 are core business services (61 Elixir, 20 Go, 7 Python, remainder 3rd party or other tech). Most core business logic lives in Elixir with Go in a supporting role, and Python focused on data science. This is nearly 2 services per engineer in the company. On most stacks that would be an enormous burden. On our system, as described below, it’s fairly painless. We still have the monolith, but it’s a lot smaller now. It gets smaller all the time, thanks to the drive of Jeffrey Matthias, Geoff Smith, Lee Marlow, and Joe Lepper among others.

So, back to the story…

[Image: envelope]

The Public Event Bus

The next step was the move to a system with a public event bus. We made some pretty good decisions but still live with a few small mistakes. So I’ll describe a simpler version of what we have now, gloss over the iterations it took to get to this point, and call out the mistakes later.

If you aren’t familiar with the idea of an event bus, it boils down to this: any service can listen on the bus for events that happen anywhere in the system, and then do whatever it needs to do in response. Services that do important things publish those events in a defined schema so that anyone can use them as they need. You archive all of those events and make them arbitrarily queryable and replayable. To be clear about what we mean when we say “events”: Events are something that happened, Commands are a request for something to happen. For example, an event records that a message was sent; a command asks for a message to be sent. We started with only events, and that was a good choice. That approach allowed us to iterate more carefully on the events side so that we didn’t make the same mistakes in two places at once.

For other interactions between services we built an async RPC functionality over RabbitMQ that essentially provides a very lightweight Protobuf wrapper around arbitrary data. This enabled us to largely punt on Commands implementation while we got good at events. It allowed us to identify sensible practices in a private way between services before making those commands public system-wide.

So let’s talk about the event bus and event store since that’s the core of the system.

System Overview

Our public bus is RabbitMQ, with a common top-level exchange. We separated it into two clusters: one built around text messages (e.g. SMS) that we process and the main one around events that are the real core of the system. This allowed us to have very low latency on core data events while allowing more latency on the high throughput messaging cluster. You could run it all on a single cluster, but we have enough messaging throughput that separation was a good choice. It also divides the fault domain along lines that make sense for our system.

We publish events to the main bus (a topic exchange), using the event type as the routing key. Services that want those events subscribe to them. Those services may then event-source a projection into a DB of their own based on those events, or simply take action as required. The DB tech is whatever the service requires: we have Postgres, Vitess (MySQL), Redis, Cassandra, ElasticSearch, etc. For services that do event-source, we have standardized a set of rules about how they must interact with the bus, how they handle out-of-order events, duplicates, etc. We have a LADR that defines how this must work.

The technical side of this is built into a shared Elixir library that most of the services use. This wraps the excellent Broadway library, the Broadway AMQP integration, and our generated Protobuf library containing the schemas. It provides things like validation and sane RabbitMQ bindings, and publishes an events manifest we can use to build maps of who produces and consumes which events. Dan Selans worked for us back then, and built a frontend that makes those manifests human-consumable and draws an events map. This is very useful!
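
Stripped of our library’s validation, manifest publishing, and binding conveniences, a consumer in that style looks roughly like this (the module names, queue, and projection call are hypothetical):

```elixir
# Hypothetical, stripped-down event consumer in the spirit of our shared
# library: Broadway + BroadwayRabbitMQ pulling from a service-owned queue
# that is bound to the bus by event-type routing keys.
defmodule MyService.EventConsumer do
  use Broadway

  def start_link(_opts) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        module: {BroadwayRabbitMQ.Producer,
          queue: "my_service.events",
          connection: [host: "localhost"],
          on_failure: :reject_and_requeue}
      ],
      processors: [default: [concurrency: 10]]
    )
  end

  @impl true
  def handle_message(_processor, %Broadway.Message{data: raw} = message, _context) do
    # Decode the wire payload into the (hypothetical) Protobuf envelope and
    # update the local projection. Broadway acks the message on success.
    event = Events.Envelope.decode(raw)
    :ok = MyService.Projection.apply_event(event)
    message
  end
end
```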

Because some of the services in our system are written in Go, we have built some of the same logic in that language and leverage the Benthos project (which we sponsor) for work pipelining, similar to how we use Broadway in Elixir. Benthos is an unbelievable jack-of-all-trades that you should take a look at if you don’t know it. We additionally build all the Protobuf in Python for use in data science activities, but don’t have a full events library implementation there… yet.

We archive everything that is published on the bus and put it into the event store. This then enables replays.

Sourcing

When we start a new service that needs to source events, we bootstrap it from a replay of historical events, and it writes them to its DB using the same code it will use in production. Because our services must handle out-of-order events, we can generally replay any part of the history necessary at any future point. Simple rules about idempotency and a little bit of state keeping solve out-of-order handling for the 95% case. The remaining 5% tend to be one-off solutions for each use case.
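
The real rules live in our LADR and shared library, but the common case boils down to something like this sketch (the schema, module, and field names are invented): remember the timestamp of the last event you applied per row, and ignore anything that isn’t newer.

```elixir
# Hypothetical sketch of the "95% case" rule: track the timestamp of the last
# event applied per row and treat anything older (or already seen) as a no-op,
# so duplicates and replays are safe. Module and field names are made up.
defmodule MyService.Projection do
  alias MyService.{Repo, Row}

  def apply_event(%{entity_id: id, occurred_at: occurred_at} = event) do
    case Repo.get(Row, id) do
      %Row{last_event_at: last} when not is_nil(last) ->
        # Only apply events newer than what the projection already reflects.
        if DateTime.compare(occurred_at, last) == :gt do
          upsert(id, event, occurred_at)
        else
          :ok
        end

      _missing_or_never_updated ->
        upsert(id, event, occurred_at)
    end
  end

  defp upsert(id, event, occurred_at) do
    # In a real service this would be an Ecto upsert that writes the projected
    # fields and stores occurred_at as last_event_at for the next comparison.
    {:ok, _row} = Row.upsert(id, event, occurred_at)
    :ok
  end
end
```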

Entropy

Services now have their own projection(s) of what things look like, built from the events they have consumed. Because this is so distributed, even with RabbitMQ ensuring delivery, there are still many ways the system could introduce drift.

Underpinning all of the other mechanisms is another simple rule: we have worked hard to guarantee that each event type is published by exactly one service. This vastly reduces the complexity of working with them.

We handle anti-entropy by several means:

  1. We have a fallback for RabbitMQ. If it’s not available for publishing, services fall back to Amazon SQS (there’s a sketch of this after the list). This is an extremely rare occurrence, but it ensures we don’t lose events. We then play events from SQS back into RabbitMQ when it comes back up, so services never subscribe to SQS directly.

  2. Services must handle all events of the types they subscribe to. This means any failure to do so is an error that must be handled. In practice this is rare in production because it gets caught early on, and Protobuf and our events library help guarantee correctness.

  3. We run daily replays of all of the previous day’s core events, on the production bus. We don’t replay message events, but all of the core system events are replayed. This means a service has a maximum window of 24 hours to have missed an event before it sees it again. We’ll be adding 1-hour replays or similar in the next few quarters.

  4. We run Spark jobs that compare some event-sourced DBs against the event store to measure drift. This lets us track how widespread any issues may be, and we have dashboards to watch it. The drift is extremely small and generally insignificant: recent runs show an average of 0.005%, which is already very good, and is better than it looks because the number also reflects changes that happen while the run is in flight. For all practical purposes it simply reflects eventual consistency and in absolute numbers is basically zero.
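
The fallback from item 1 looks roughly like this; a minimal sketch assuming the amqp and ex_aws_sqs libraries, with a made-up exchange name and queue URL (the real publishing code is more careful about detecting and handling failures than this):

```elixir
# Hypothetical sketch of publish-with-fallback: try RabbitMQ first, and if the
# broker is unreachable, park the event in SQS so it can be replayed onto the
# bus later. Exchange name, queue URL, and failure handling are assumptions.
defmodule MyService.Publisher do
  @exchange "events"
  @fallback_queue_url "https://sqs.us-east-1.amazonaws.com/123456789/events-fallback"

  def publish(channel, event_type, payload) when is_binary(payload) do
    :ok = AMQP.Basic.publish(channel, @exchange, event_type, payload, persistent: true)
  rescue
    _error -> publish_to_sqs(event_type, payload)
  catch
    # A dead channel process exits rather than raises; catch that too.
    :exit, _reason -> publish_to_sqs(event_type, payload)
  end

  defp publish_to_sqs(event_type, payload) do
    # Wrap the raw Protobuf bytes so they survive SQS's text-only bodies.
    body = Jason.encode!(%{event_type: event_type, payload: Base.encode64(payload)})

    @fallback_queue_url
    |> ExAws.SQS.send_message(body)
    |> ExAws.request()
  end
end
```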

Consistency

We handle consistency by assuming eventual consistency wherever possible. Where that’s not possible, we do allow querying a remote service for data. Because events related to a given piece of data are only published by one service, there is always an authoritative source to query when strictly necessary. This is done over a synchronous RPC implementation on top of RabbitMQ with Protobuf, thanks to Tom Patterer and Andrea Leopardi. Andrea wrote about this implementation.
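
Andrea’s post has the details; underneath it all is the classic request/response shape over RabbitMQ, which looks roughly like this (queue names, timeout, and payload handling are hypothetical here):

```elixir
# Rough sketch of synchronous RPC over RabbitMQ: publish a request with a
# reply_to queue and a correlation_id, then block until the matching response
# arrives. The real implementation adds Protobuf, pooling, and error handling.
defmodule MyService.SyncRPC do
  @timeout 5_000

  def call(channel, request_queue, request_payload) do
    # Exclusive, server-named queue that only this caller consumes from.
    {:ok, %{queue: reply_queue}} = AMQP.Queue.declare(channel, "", exclusive: true)
    {:ok, _consumer_tag} = AMQP.Basic.consume(channel, reply_queue, nil, no_ack: true)

    correlation_id = Base.encode16(:crypto.strong_rand_bytes(8))

    :ok =
      AMQP.Basic.publish(channel, "", request_queue, request_payload,
        reply_to: reply_queue,
        correlation_id: correlation_id
      )

    # Wait for the response that carries our correlation_id.
    receive do
      {:basic_deliver, response, %{correlation_id: ^correlation_id}} ->
        {:ok, response}
    after
      @timeout -> {:error, :timeout}
    end
  end
end
```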

Many of the frontend calls go through the monolith, which provides some consistency for the models it contains; where necessary we use the monolith as a BFF. For all other cases we query the services directly via an API gateway.

Commands

Following two years of success with the event bus, we introduced commands, which work quite similarly but express a different idea. Commands are a request for something in the system to happen (intent). Services may or may not take action as a result, and if they do, they may or may not generate events. Commands are also public, archived, and replayable. This is so far working quite well, but runs alongside the other interaction methods described below. We’ll phase out much of the async RPC in favor of Commands now that we have them.

Current State

We continue to iterate and improve on this system. Some more improvements and efficiencies are slated for this year already. We’ll write more about those when we get there. If you are a person who wants MOAR detail, keep reading below. I’ll attempt to describe some of this in more technical detail.

[Image: lion MOARing]

More Details

That gives a pretty good overview. But if you want to know more, read on.

Routing, Exchanges, Queues

We don’t leverage headers for much routing, but there are a few places in the system that add and use them (e.g. for a consistent-hash-exchange). Most routing is by event type, and that’s our routing key. We learned early on that each service should attach its own exchange to the top-level exchange and hang its queue off of that. Exchanges are really cheap in RabbitMQ, and this allows services to use a different exchange type if necessary—and it also prevents rapid connects/disconnects from impacting the main bus. This happened when a service once went nuts and was bound to the top level. That hasn’t happened since.

Most services will bind a single durable queue to their exchange to make sure that they receive events even when down, and the work is spread across the instances of the service when up. Some services use ephemeral queues that go away when they are not running. Others use different topologies. RabbitMQ is very flexible here and has been the bedrock of our implementation.
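
In code, the per-service topology described above boils down to something like this sketch using the Elixir amqp library (the exchange, queue, and event-type names are made up):

```elixir
# Hypothetical sketch of the topology: the service declares its own exchange,
# binds it to the top-level bus exchange for the event types it cares about,
# and hangs one durable queue off of its own exchange.
defmodule MyService.Topology do
  @bus_exchange "events"
  @service_exchange "my_service.events"
  @service_queue "my_service.events"

  def setup(channel) do
    # Service-owned exchange, bound to the top-level bus per event type.
    :ok = AMQP.Exchange.declare(channel, @service_exchange, :topic, durable: true)

    for event_type <- ["message_sent", "contact_updated"] do
      :ok =
        AMQP.Exchange.bind(channel, @service_exchange, @bus_exchange,
          routing_key: event_type
        )
    end

    # One durable queue shared by all running instances of the service.
    {:ok, _info} = AMQP.Queue.declare(channel, @service_queue, durable: true)
    :ok = AMQP.Queue.bind(channel, @service_queue, @service_exchange, routing_key: "#")
  end
end
```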

Archiving

Once things are on the bus, we archive every single one. We have a service that subscribes to every event type on the two buses, aggregates them in memory into file blobs, then uploads them to Amazon S3 (the “event store”). This archiver only acks them from Rabbit once they are uploaded, so we don’t lose events. Those files on S3 are in a run-length-encoded raw Protobuf format, straight from the wire, grouped into directories by timestamp and event class. Filenames are generated in a consistent way, and include some hashing of the contents so that we prevent overwrites.
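
Here’s a heavily simplified sketch of the archiver’s write path. The framing shown is a plain length prefix standing in for our actual encoding, and the bucket, key scheme, and batching are likewise made up; the important part is that nothing is acked until S3 accepts the upload.

```elixir
# Hypothetical sketch of the archiver's write path: frame each raw Protobuf
# payload, concatenate a batch into one blob, and only consider the batch
# done (and ack-able in RabbitMQ) once S3 accepts the upload.
defmodule Archiver.Blob do
  @bucket "event-store"

  # Length-prefix framing (an assumption) so the raw wire payloads can be
  # split apart again on replay.
  def encode_batch(payloads) do
    for payload <- payloads, into: <<>> do
      <<byte_size(payload)::32, payload::binary>>
    end
  end

  def upload(payloads, event_class, %DateTime{} = now) do
    body = encode_batch(payloads)
    # Key by event class and date, plus a content hash to prevent overwrites.
    hash = Base.encode16(:crypto.hash(:sha256, body), case: :lower)
    key = "#{event_class}/#{Date.to_iso8601(DateTime.to_date(now))}/#{hash}.bin"

    case ExAws.S3.put_object(@bucket, key, body) |> ExAws.request() do
      {:ok, _response} -> {:ok, key}   # safe to ack the batch now
      {:error, reason} -> {:error, reason}
    end
  end
end
```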

Like the majority of the services operating on the events store and bus, this service is written in Elixir and leverages all of the badassery of Erlang’s OTP and supervision trees to be always working. It doesn’t break.

Event Store Iteration

The main event store is the archive of raw Protobuf that was sent on the wire, encoded with a run length encoding into blob files that are posted to S3, as mentioned. After the first year, Tom Patterer and Aki Colovic here in Europe built Apache Spark jobs and a Go service to transform those into Parquet files on S3 more or less as they get written—so there is very little latency between the two. We can then leverage AWS Athena (Presto) for ad hoc queries, monitoring of events, and understanding what is there.

And, the ability to query all of history for debugging is an amazing thing that I don’t think I’d ever want to live without again. It takes a lot of pressure off of distributed tracing, although we do have that for complex flows on OTel thanks to Jaden Grossman, Tatsuro Alpert, and Bradley Smith.

Replays

We built a replayer service that initially could only do replays by event type and time range, played straight from S3 to a private copy of the bus (“the replay bus”). Services can hook up to that bus to get the replayed data; we usually do this with a one-off deploy of the same code. That got us a long way: it was enough for the first year.
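
That first replayer’s shape was roughly this (the bucket layout, exchange name, and frame format are assumptions here, matching the archiver sketch above):

```elixir
# Hypothetical sketch of the first replayer: stream archived blobs for an
# event type and date prefix out of S3, split them back into raw Protobuf
# payloads, and publish them onto the replay bus.
defmodule Replayer do
  @bucket "event-store"
  @replay_exchange "events.replay"

  def replay(channel, event_class, %Date{} = day) do
    prefix = "#{event_class}/#{Date.to_iso8601(day)}/"

    ExAws.S3.list_objects(@bucket, prefix: prefix)
    |> ExAws.stream!()
    |> Enum.each(fn %{key: key} ->
      {:ok, %{body: blob}} = ExAws.S3.get_object(@bucket, key) |> ExAws.request()

      for payload <- decode_frames(blob) do
        :ok = AMQP.Basic.publish(channel, @replay_exchange, event_class, payload)
      end
    end)
  end

  # Inverse of the archiver sketch's length-prefix framing.
  defp decode_frames(<<len::32, payload::binary-size(len), rest::binary>>),
    do: [payload | decode_frames(rest)]

  defp decode_frames(<<>>), do: []
end
```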

Later on, we built further integration with AWS Athena that allows us to arbitrarily query events from the event store in Athena from Parquet files on S3, and also allows for the results of a query to be used in an event replay. This allows for very targeted bootstrapping of new services, repairing outages, fixing state when a bug caused a service to behave badly, etc. The ability to arbitrarily query all of history also helps when looking for any issues in the event store or your service. Athena is pretty quick, even with several years of data. Partitioning by event date and type really helps. We actually use our Go libs from Spark and I wrote about how to do that.

Snapshots

An additional step we took later (thanks to Moritz Mack) was to use Spark to build daily snapshots in the form of Parquet tables of events… that can also then be replayed. This also speeds up querying and consistency checking by vastly reducing the amount of queried data. We currently rebuild those snapshots nightly from all of history so that there is no drift. We will move to incremental snapshotting at some point, but Spark and S3 are hugely fast, and we have enough individual files to run in parallel nicely.

Events and Event Schemas

The best decision we made was to use Protobuf for messaging. Protobuf is one of the only modern encoding systems that had good support across all three of our core languages. There were two Protobuf libraries in Elixir at the time. We picked one, then later switched to elixir-protobuf, to which we are now major contributors thanks to Andrea Leopardi, João Britto, and Eric Meadows-Jönsson. Using Protobuf means that we can guarantee compatibility on schemas going forward, have deprecation ability, and because it is widely supported, we have access to it in all of the toolsets where we need it. It also converts back and forth to JSON nicely when needed.

Protobuf schemas, unlike Avro’s, for example, don’t accompany the data payload. This means you need to provide them to the software that needs them out of band. Andrea Leopardi wrote about how we do that, so I won’t detail it here. We took most of the pain out of this by designing the schemas in a way that means most services don’t always have to have the latest schemas. And because Protobuf lets you decode the fields of a payload that you do know about and ignore the rest, as long as we’re only adding fields we don’t have any issues.

To do this, we designed the schemas around a common Event base: an Envelope carrying a set of guaranteed fields present on every event (e.g. id, timestamp, and type). This allows systems to process events without always having the very latest schemas, unless they need to access a new event type or new fields.
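
Our real schemas are generated from .proto files and are richer than this, but here’s a hypothetical, hand-written illustration of the envelope idea using the elixir-protobuf DSL (the field numbers, the payload-as-bytes choice, and the timestamp representation are all assumptions made for the sketch):

```elixir
# Hypothetical illustration of the envelope idea. In practice these modules
# are generated from .proto files; every event carries the guaranteed fields,
# and only interested services decode the event-specific payload.
defmodule Events.Envelope do
  use Protobuf, syntax: :proto3

  field :id, 1, type: :string
  field :type, 2, type: :string
  field :occurred_at, 3, type: :int64   # unix epoch milliseconds (assumption)
  field :payload, 4, type: :bytes       # event-specific message, encoded
end

# A consumer can route on the guaranteed fields without knowing every schema:
#   envelope = Events.Envelope.decode(raw)
#   if envelope.type in subscribed_types, do: handle(envelope)
```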

Other Communication Methods

Events make all of this so much easier. But it’s hard to always use them (or commands); there are just places where you need something else. We have some get-out-of-jail-free cards. When we model new solutions in our space, we push the following hierarchy:

  1. Events and Commands first. If it can be done with events/commands, we do that.
  2. Async, stateless RPC over a queue. For things like queueing work to yourself or another service: private calls that don’t need archiving or replays.
  3. Synchronous RPC, using queues for call-response.
  4. HTTP as a last resort.

I believe that in the core system there are only two remaining places that widely leverage HTTP for things other than serving APIs to frontends.

Things We Learned

You don’t build an entire system without running into issues. Here are some things we learned. They are not strictly ordered, but are roughly in order of importance.

  • Putting events on S3 was a really good decision. It has all the S3 goodness, and also unlocks all of the “Big Data” tools like Spark, Athena, etc.

  • People will find every way possible to publish events that aren’t right in one way or another. This is no surprise. But really, if you are deciding where to allocate time, it’s worth all the effort you can muster to put validation into the publishing side.

  • Being able to throw away your datastore and start over at any time is powerful. For example: we run ElasticSearch but we never re-index. We just throw away the cluster and make a new one from a replay thanks to Alec Rubin, Joe Lepper, and Brian Jones. If you have to swap from one kind of datastore to another (e.g. Redis -> Postgres) you can just rehydrate the new store from a replay using the same event sourcing code you would have to write anyway. Very little migration code.

  • Sometimes you want to represent events as things that occurred. Sometimes you want to represent them as the current state of something after the occurrence. Some people will tell you not to do the latter. Making this a first class behavior and clearly identifying which pattern an event is using has been super helpful. We do both.

  • Use the wrapper types in Protobuf. One major shortcoming of Protobuf is that you can’t tell if something is null or the zero value for its type. The wrappers fix that.

  • If you have to interact with a DB and publish an event, sometimes the right pattern is to publish the event and consume it yourself before writing to your DB. This helps with consistency issues and allows you to replay your own events to fix bugs. Lee Marlow, Geoff Smith, and Jeffrey Matthias called this pattern “dogfooding”; there’s a sketch of it after this list. Sometimes the right pattern is to send yourself a command instead.

  • Protobuf supports custom annotations. Those can be really helpful for encoding things like which events are allowed on which bus, which actions are allowed on the event, etc. Especially helpful when building supporting libraries in more than one language.

  • Daily replays allow you both an anti-entropy method as well as a daily stress test of the whole system. This has been great for hammering out issues. It also guarantees that services can deal with out-of-order events. They get them at least every day. The main gate to rolling it out was fixing all of the places this wasn’t right. Now it stays right.

  • Event-based systems make reporting so much easier! And data exports. And external integrations. And webhooks, and a million other things.

  • The event store is a fantastic audit trail.

  • We sometimes rewrite our event store if we’ve messed something up. We save up a list of the screw-ups and then do it in batch. We leave the old copy on S3 and make a new root in the bucket (e.g. v4/). We use Spark to do it. We don’t remove the old store so it’s there for auditing.

  • Write some tools to make working with your events nicer. We have things to listen on a bus, to download and decode files from the raw store, etc.

  • Local development seeds are easy when you have events. Just seed the DB with a local replay of staging/dev data.
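
Here’s a rough sketch of the dogfooding bullet from above (all module and event names are made up): the write path only publishes, and the DB write happens in the same consumer code path that any replay would exercise.

```elixir
# Hypothetical sketch of "dogfooding": the API side only publishes the event;
# the service's own consumer is the single code path that writes to the DB,
# so replaying our own events repairs state through exactly the same code.
defmodule MyService.Contacts do
  # Write path: validate, then publish. No direct DB write here.
  def update_contact(channel, contact_id, changes) do
    payload = encode_contact_updated(contact_id, changes)
    AMQP.Basic.publish(channel, "events", "contact_updated", payload, persistent: true)
  end

  # Consumer path (normally invoked from the event pipeline): the only place
  # that touches the projection, for live traffic and replays alike.
  def handle_contact_updated(event) do
    MyService.Projection.apply_event(event)
  end

  defp encode_contact_updated(contact_id, changes) do
    # In reality this builds and encodes the Protobuf event; this placeholder
    # just produces some binary payload for the sketch.
    :erlang.term_to_binary({contact_id, changes})
  end
end
```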

Future

On the roadmap for the next year is to take our existing service cookie-cutter repo and enable it to maintain live event-sourced projections in the most common format for a few very commonly used event types. We’ll snapshot those nightly, and when standing up a new service, we can start from the latest DB snapshot and only replay events since then. This will make things even more efficient.