Karl Matthias bio photo

Karl Matthias

Director of Architecture and Platform at Community.com. Co-Author of "Docker: Up and Running" from O'Reilly Media. Dublin, Ireland.

Twitter

Github

Coordination-free Database Query Sharding with PostgreSQL

slice of pie

Follow other posts by

At Community.com, we had a problem where a bunch of workers needed to pick up and process a large amount of data from the same DB table in a highly scalable manner, with high throughput. We wanted to be able to:

  1. Not require any coordination between the workers
  2. Be able to arbitrarily change the amount each worker would pick up
  3. Not have to shard the table

I came up with a solution that has been in production, at scale, for quite awhile now. While it was my design, other great engineers at Community deserve the credit for the excellent implementation and for rounding off some of the rough edges. It was a team effort! There are undoubtedly other ways to solve this problem, but I thought this was pretty interesting and I myself have never seen anyone else do it before. I am not claiming it’s entirely novel. But this is what we did to nicely solve this problem.

The Problem

We have workers that process outbound SMS campaigns. They need to be able to take a single message and dispatch it to a million plus phone numbers. And, they need to do that at most once for each number. These workers have access to a data store that maps some data related to the campaign to a set of recipients. The workers don’t actually do the SMS, but they do the heavy lifting: the expansion of one message to millions.

We wanted to be able to divide that audience up into chunks of a size large enough to be efficient for querying and processing, and small enough that a worker could shut down mid-campaign and not lose anything. Ideally each worker would pick up an amount of work, crank through it, and then process another piece of work, without checking in with anyone else.

You could, as we initially did, have a single process read in all the recipient information from the DB and write batches of IDs into a queue for processing. That is simple. It worked fine way back in the early days. But it’s very slow. Single-threaded walking through a DB table is slow. It’s linear time. As that line gets longer it gets really bad. Further, our whole system also uses UUID4 primary keys, so it’s not trivial to walk a DB table or to provide ranges of IDs. If you were to use auto-increment integers this is easier, but then you will have other problems to solve. I’d rather not have those problems.

The Solution

We wanted a way for the single threaded job to be able to rapidly enqueue work without querying the DB, and for all the querying to happen in the workers, in parallel, against multiple read replicas. Our whole backbone is built on RabbitMQ, so this was the natural place to queue work. Our campaign workers are written in Elixir, like much of the rest of the system, and they can leverage the awesome Broadway library for processing. So all we needed was a way to divide up the table in a consistent way, so that given a set of parameters, the results to any query would be fairly evenly divisible into chunks without knowing ahead of time exactly how many there would be or which specific rows were assigned to the worker.

You might think, “partitioning!” but for various reasons that doesn’t make sense. You could argue positives and negatives of the tech, but the main blocker was that we need to query the table and get a result in chunks that will be reasonably evenly distributed no matter what other parameters we want to query on. So we’re not just ranging over IDs, we’re ranging over the results of an SQL query that filters on various fields in the row in arbitrary combinations. And while newer Postgres Hash partitions could maybe be made to work here, we need way more chunks than you’d ever want partitions. And we want to arbitrarily assign which workes take which ranges.

To be clear: we will run the same query on multiple workers, with overlapping queries, delays, etc, without any further coordination in the workers, and we’ll make sure that none of the results have any overlapping data, without hard partitionin in Postgres.

So, dividing a keyspace into buckets that can be addressed arbitrarily… that sounds a lot like a hashing function. Like most problems with distributed systems, hashing is part of the solution, if not the whole solution! What we wanted was:

  1. Assign a bucket to every row ahead of time
  2. Originating job publishes work covering bucket ranges
  3. Workers consume the work and run the query, supplying a bucket range
  4. Workers get results covering only those buckets, process them, return to 3

The design was to have a fixed number of buckets. This necessarily needed to be sized by the largest amount that made sense for a worker to process, given the largest campaign size we expected to see. In the end we chose 1,000 buckets. Armed with the knowledge of how many buckets there are and how many messages we need to publish, the originating job could publish the right number of work requests with the right number of bucket ranges in each. If your campaign only has 1000 recipients, we’d queue 1 item into RabbitMQ, with a bucket range of 0-1,000. If you had 1 million recipients, we’d queue up 100 items with 10 shards each.

Distribution isn’t perfect but it’s darn good.

The Details

The implementation is surprisingly(?) simple.

How do you turn your DB table into buckets addressable by a hash? Do we store it in the row? Nope. How about using PostgreSQL’s indexing on the result of a function? On the insertion of every row, an index gets updated with the result of the function as the key. Then, when you query with that function, you hit the index instead, so all the expensive math is done on insert and what is indexed is just an integer. There are undoubtedly other ways to make this work in PostgreSQL. You could calculate the bucket in code and write it in a column. But it’s pretty nice like this because Postgres manages all the annoying bits and we don’t have to mess with the field anywhere since it exists only in the index.

We need a hashing function with very good distribution. There are a number of these, but most are not available inside Postgres. But MD5 is, so that’s what we’re using.

This is our index:

CREATE INDEX CONCURRENTLY <table_name>_sharding_md5_modulo_1000_index
ON <table_name> (mod(abs(('x'||substr(md5(id::text),1,16))::bit(64)::bigint), 1000))

That is creating an index where the key to the index for this row is the result of that function. When we want to query it, we do the following:

SELECT id FROM <table_name> WHERE <arbitrary query here>
AND mod(abs(('x'||substr(md5(?::text),1,16))::bit(64)::bigint), 1000) BETWEEN ? AND ?

Now those workers can dequeue their work item, run their query, and then stream messages into RabbitMQ while they process the results of their query. Despite how that SELECT may look, no actual math is being done to hash each of those rows. This is hitting the pre-calculated index. The only hash function math is done at insert time.

Conclusion

So there you go, a super flexible way to break a table into chunks that can be combined with any arbitrary query the table can support. I called it “Coordination-free” in the title. Someone might object to that saying the coordination is happening in the originating job. It’s true some coordination is happening there. But it’s not happening in any high throughput part of the system. Those parts just crank away at scale.

We’ve been running this in production for a year, processing billions of messages through it. Performance is excellent and while we initially viewed this as a temporary “hack” to work around a performance problem, it has become a trusted part of the toolbox. Maybe it fits in your toolbox, too.


comments powered by Disqus