The challenges of running Druid at large scale, and future directions, part 1

Roman Leventov
Nov 14, 2017

Druid is a time-series data store. I’m a Druid committer and one of the people at Metamarkets who operate the largest known Druid cluster. At the time of writing it has hundreds of nodes and serves hundreds of terabytes of data with a replication factor of two.

Metamarkets uses Druid in a rather specific way. Our cluster has only about 100 data sources (a Druid term; other databases usually call this a “table”) and serves only a few queries per second, but those data sources are very write-intensive: they grow by up to tens of gigabytes of compressed data per hour. A typical query covers tens of thousands of segments, each about 100 MB and a million rows, which adds up to terabytes of data and billions of rows per query, and it should complete in a few seconds. Only a fraction of this data is actually used to compute the result, though: a few columns out of dozens, and on average only tens of thousands of rows per segment (down from a million) after all query filters are applied. And this is just a typical query; some queries cover up to a hundred times more data.

This is the first post (see the second) where I want to highlight the most pressing issues with Druid: the ones that increase query failure rates, degrade availability, and prevent a Druid cluster of our size from completing queries fast (Druid promises “sub-second” latency) when the load increases.

Thanks to Charles Allen, a Druid committer and a PMC member, and to Michael Driscoll for reviews of this post.

First, I want to recap what a Druid cluster looks like:

Druid cluster view, simplified and without “indexing” part

Historical nodes download segments (compressed shards of data) from deep storage, which could be Amazon S3, HDFS, Google Cloud Storage, Cassandra, etc., onto their local or network-attached disks (like Amazon EBS). All downloaded segments are mapped into the memory of the historical node. The remaining memory is used by the query processing engine and, optionally, a local query results cache. When a historical node downloads a segment, it announces this in ZooKeeper. Broker nodes subscribe to these updates and keep in memory the information about which historicals serve each segment.

A client sends a query to a randomly chosen broker. The broker determines the set of segments that cover the query’s interval of the data source and sends sub-queries to the historicals that serve those segments. The broker also aggregates the per-segment results returned by the historicals.
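To make the scatter-gather flow above concrete, here is a minimal sketch in Python. It is not Druid code: the `Segment` and `Broker` classes, the `on_announce` callback, and the use of integer hours for intervals are all assumptions made for illustration, and the "aggregation" is just a sum of row counts.

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass(frozen=True)
class Segment:
    data_source: str
    start: int  # interval start in hours (inclusive); hypothetical unit
    end: int    # interval end in hours (exclusive)


class Broker:
    """Toy broker: remembers which historicals serve which segments."""

    def __init__(self):
        # segment -> historicals that announced it, in announcement order
        self.servers = defaultdict(list)

    def on_announce(self, historical, segment):
        # In Druid this information arrives through ZooKeeper; here it is a plain call.
        self.servers[segment].append(historical)

    def plan(self, data_source, start, end):
        """Group segments overlapping [start, end) by the historical chosen to serve them."""
        plan = defaultdict(list)
        for segment, historicals in self.servers.items():
            overlaps = segment.start < end and start < segment.end
            if segment.data_source == data_source and overlaps:
                plan[historicals[0]].append(segment)  # simplest choice: first replica
        return dict(plan)

    def query(self, data_source, start, end, run_sub_query):
        """Send one sub-query per historical, then merge the partial results."""
        partials = [
            run_sub_query(historical, segments)
            for historical, segments in self.plan(data_source, start, end).items()
        ]
        return sum(partials)


broker = Broker()
s1 = Segment("events", 0, 1)
s2 = Segment("events", 1, 2)
s3 = Segment("events", 5, 6)  # outside the queried interval below
broker.on_announce("historical-a", s1)
broker.on_announce("historical-b", s1)  # second replica of s1
broker.on_announce("historical-b", s2)
broker.on_announce("historical-a", s3)

# Pretend every segment holds a million rows.
total_rows = broker.query("events", 0, 2, lambda historical, segments: 1_000_000 * len(segments))
```

Note that `plan` picks exactly one historical per segment up front. That static choice is what the following sections take issue with.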

For more details, see “The Druid Cluster” section on this page, and the Druid paper.

No fault tolerance on the query execution path

Although Druid is a massively parallel query system (a single query can end up being processed on hundreds of historical nodes), it completely lacks fault tolerance on the query execution path. When a broker sends sub-queries to historicals, it needs all of them to complete successfully before it can return anything to the client. If any sub-query fails, the whole query fails. Although each segment is served by multiple historicals, and brokers know about all of them, if a sub-query to the primary historical fails, brokers don’t ask other historicals to compute partial query results for the same segments.

So currently in Druid, replication comes into play only when a historical goes completely down from ZooKeeper’s point of view. Once a broker is notified of this, it starts routing sub-queries for the segments that the vanished historical used to serve to other historicals. But if, for example, a historical node has a healthy connection to ZooKeeper but fails to complete any real queries because of a bug in the query engine, the whole Druid cluster stays sick until a human intervenes. The same sorry situation arises when there is a network problem between a historical and the brokers, but not between this historical and ZooKeeper.
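For contrast, the missing behaviour (trying another replica when a sub-query fails) could look roughly like the sketch below. This is not something Druid does, per the paragraphs above; `SubQueryError`, `run_with_failover`, and the segment and node names are hypothetical.

```python
class SubQueryError(Exception):
    """Raised when a historical fails to answer a sub-query (hypothetical)."""


def run_with_failover(segment, replicas, run_sub_query):
    """Try each historical that serves `segment` until one of them answers."""
    failures = []
    for historical in replicas:
        try:
            return run_sub_query(historical, segment)
        except SubQueryError as error:
            failures.append(f"{historical}: {error}")
    # Only now does the failure have to propagate to the whole query.
    raise SubQueryError(f"all replicas failed for {segment}: {failures}")


attempts = []


def flaky_sub_query(historical, segment):
    """Simulates a historical that is alive in ZooKeeper but cannot run queries."""
    attempts.append(historical)
    if historical == "historical-a":
        raise SubQueryError("bug in query engine")
    return 1_000_000  # rows scanned


rows = run_with_failover(
    "events_2017-11-14T00", ["historical-a", "historical-b"], flaky_sub_query
)
```

With a replication factor of two, a single sick historical would then cost one extra round trip per affected segment instead of failing the query.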

Common reasons for broker → historical sub-query failures that we see:

  • Network failures
  • When a historical node is destroyed by AWS, all in-flight sub-queries fail, and so do all queries that brokers keep routing to this historical until ZooKeeper notifies them that the node has disappeared.
  • Similar to above, but because of the Linux OOM killer
  • Similar to above, but because of OutOfMemoryError in the JVM process of the historical
  • Similar to above, but because Marathon goes “crazy” and restarts the historical process without a good reason. I’m going to discuss this in the following blog post.
  • Rolling updates of historicals. Druid doesn’t support graceful shutdown and update of historical nodes, in which a node completes its in-flight queries but doesn’t accept new ones. And it wouldn’t work anyway without sub-query retry on the broker side, as described above.
  • Faulty canary rollouts with bugs in the query processing engine.

Straggling sub-queries to historical nodes

This problem is closely related to the previous one. Brokers don’t do anything about straggling sub-queries to the historicals, so the whole query is never faster than the slowest historical it uses. Although historicals are supposed to perform similarly, and Druid has a complex segment balancing system to ensure this, that system has problems of its own, and we see a huge disparity in performance among the historicals. This is discussed in more detail in the next section.

There is a solution that would largely solve both this and the previous problem: brokers should send the aggregation of all sub-queries completed so far back to the client at some interval, e.g. every 5 seconds. See the discussion of this proposal on the Druid development mailing list.

This problem was also pinpointed in the Dremel paper, which is the foundation of present-day Google BigQuery. The paper states that yielding results based on 99% of tablets (the equivalent of segments in Druid) makes the average query latency several times lower.
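A simplified sketch of the idea in Python: the broker merges whatever sub-queries have finished by a deadline and reports how much of the query the answer covers. It uses a single deadline rather than periodic updates, and `query_with_deadline`, the row counts, and the half-second deadline are assumptions for illustration only.

```python
import threading
from concurrent.futures import ThreadPoolExecutor, wait


def query_with_deadline(sub_queries, deadline_seconds):
    """Run sub-queries in parallel and merge those that succeeded by the deadline.

    Assumes a non-empty list of zero-argument callables returning row counts.
    Returns (merged_row_count, fraction_of_sub_queries_included).
    """
    pool = ThreadPoolExecutor(max_workers=len(sub_queries))
    futures = [pool.submit(sub_query) for sub_query in sub_queries]
    done, _stragglers = wait(futures, timeout=deadline_seconds)
    pool.shutdown(wait=False)  # do not block on sub-queries that are still running
    succeeded = [f for f in done if f.exception() is None]
    merged = sum(f.result() for f in succeeded)
    return merged, len(succeeded) / len(futures)


release = threading.Event()


def fast_sub_query():
    return 100


def straggling_sub_query():
    release.wait()  # simulates a slow historical; blocks until released below
    return 100


merged, coverage = query_with_deadline(
    [fast_sub_query, fast_sub_query, fast_sub_query, straggling_sub_query],
    deadline_seconds=0.5,
)
release.set()  # let the straggler thread finish so the program can exit
```

Because failed sub-queries are simply left out of the merge, the same mechanism covers both stragglers and the failures from the previous section, at the price of an approximate answer that the client has to be willing to accept.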

Huge variance in performance of historical nodes

Druid needs to choose intelligently which historical nodes load which segments in order to ensure an even load distribution among the historicals. The current segment balancing algorithm uses a rather complex formula to compute the “cost” of loading a given segment on a given historical, taking into account the intervals of the segments of the same data source that are already loaded on that historical. It then chooses the historical on which loading the segment would be “cheapest”. (This cost-based decision-making algorithm runs on a special Coordinator type of Druid node, which has not been mentioned yet.)
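The shape of such a cost-based decision can be sketched as follows. The cost function here is a toy stand-in, not Druid's actual formula: it only assumes that segments of the same data source with nearby intervals are likely to be queried together, so co-locating them is penalized. The names `placement_cost`, `cheapest_historical`, and `decay_hours` are made up.

```python
import math


def placement_cost(candidate, loaded_segments, decay_hours=24.0):
    """Toy cost of loading `candidate` next to `loaded_segments` (not Druid's formula).

    Each already-loaded segment of the same data source adds a penalty that
    decays exponentially with the distance between interval start times.
    """
    cost = 0.0
    for segment in loaded_segments:
        if segment["data_source"] != candidate["data_source"]:
            continue
        gap_hours = abs(segment["start_hour"] - candidate["start_hour"])
        cost += math.exp(-gap_hours / decay_hours)
    return cost


def cheapest_historical(candidate, cluster):
    """Pick the historical on which loading `candidate` costs the least."""
    return min(cluster, key=lambda name: placement_cost(candidate, cluster[name]))


cluster = {
    "historical-a": [
        {"data_source": "events", "start_hour": 100},
        {"data_source": "events", "start_hour": 101},
    ],
    "historical-b": [
        {"data_source": "events", "start_hour": 10},
    ],
}
candidate = {"data_source": "events", "start_hour": 102}
chosen = cheapest_historical(candidate, cluster)  # historical-b: its only segment is far away in time
```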

Although improvements have been made to the segment balancing algorithm, in our Druid cluster the slowest historicals still perform several times worse than the fastest:

Average latency of some query type on a segment, in milliseconds. Each point is one historical node.

We think this happens because the segments that we store in the cluster, across different data sources, are very uneven in how many random parts of the segment memory are accessed during queries to those segments. If many “randomly accessed” segments are served by the same historical, it leads to much more thrashing of file-mapped memory on this historical and makes all queries to the node run slower.

This particular issue is specific to how Metamarkets uses Druid, and it could likely be resolved by improvements to Druid’s segment format and query processing engine. But the more conceptual scalability problem is that in a large cluster it’s hard or impossible to balance segments using any “theoretical” formula and avoid performance disparity between nodes, which can arise for who knows what underlying reasons. The proposed solution to this problem is to make balancing decisions based on the actual performance statistics of the historical nodes. See the discussion on the Druid development mailing list.
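As a rough illustration of balancing on measured performance rather than on a formula, the sketch below proposes moving load from the slowest historical to the fastest one whenever their observed per-segment latencies diverge too much. This is not the proposal from the mailing list thread, only one assumed way such a rule might look; `propose_move`, `max_ratio`, and the latency samples are hypothetical.

```python
from statistics import mean


def propose_move(latencies_ms, max_ratio=1.5):
    """Suggest (source, destination) for a segment move, or None if balanced.

    `latencies_ms` maps each historical to recent per-segment query latencies.
    """
    averages = {name: mean(samples) for name, samples in latencies_ms.items() if samples}
    if len(averages) < 2:
        return None  # nothing to compare
    slowest = max(averages, key=averages.get)
    fastest = min(averages, key=averages.get)
    if averages[slowest] <= max_ratio * averages[fastest]:
        return None  # disparity is within tolerance
    return slowest, fastest


observed = {
    "historical-a": [40, 50, 45],
    "historical-b": [120, 150, 135],
    "historical-c": [60, 55, 65],
}
move = propose_move(observed)  # historical-b averages 135 ms against 45 ms on historical-a
```

A rule like this needs no model of why a node is slow, which is the point: it reacts to the disparity itself.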

An even more radical solution would be to make historical nodes not store segments on local disk and in memory at all, but instead always download segments from deep storage when performing sub-queries, i.e. to decouple storage and compute. It would make “static routing” from brokers to historicals unnecessary and the balancing system obsolete. Apart from this, it would also make Druid immensely simpler in many other ways. As an aside, this is the architecture of BigQuery, or at least what it looked like two years ago.

The major drawback of this solution is that with commodity deep storage (Amazon S3) and network, it would make the majority of queries in our use case run for tens of seconds instead of the current 0 to 3 seconds. But as network and storage technologies get faster and cheaper, I think decoupling storage and compute is the future of “big data” systems, including time series databases. Read how BookKeeper + Pulsar decouples messaging from storage, vs. Kafka, which couples messaging and storage on the same nodes. This is also how Snowflake works. Google has also advanced this principle recently by starting to separate compute and shuffle.

It’s fascinating how a complete reversal of Hadoop’s motto “move computation to data” has happened in just about ten years.
