Design of a Cost Efficient Time Series Store for Big Data
After writing about the challenges of running Druid at large scale (part 2), I want to present my vision of a next-generation open source time series store that shouldn’t have the issues inherent to Druid.
“Open source” is an important part of the problem statement, because the presented design is essentially a simplified version of proprietary Google BigQuery. I took information about the architecture of BigQuery mainly from the Dremel paper and the post “BigQuery under the hood”, and also some small bits from many other sources.
Other goals and self-constraints:
- The time series store is scalable to petabytes of compressed data and 100k processing cores in a single cluster.
- Cloud first: leverage advantages of clouds.
- Cost efficient starting from tens of terabytes of data and a thousand processing cores.
- Queries which process less than 5 TB of data should run in less than 3 seconds (p99 latency) in a reasonably-sized cluster — covering interactive ad analytics use case.
- Highly consistent query latency: similar queries should always take the same time to complete, regardless of what queries are running in parallel in the cluster.
- Newly ingested data should be queryable immediately.
- Think forward: the presented design should hopefully become more, not less relevant in 3–5 years.
Non-goals:
- On-premise deployments.
- Cost efficiency on small scale.
- Efficiency of random updates and deletes of old data, although those things should be possible.
- p99 latency of less than half a second, even for small queries and even in a system not under load.
- Ease of first-time deployments and software updates.
Final introductory note: this post is based on experience of running Druid at scale at Metamarkets and on theoretical research, but the described design has not yet been implemented and tested in production. Some statements made in this post may be wrong. Please leave comments under this post if you have any opinions or corrections!
Design overview
The system consists of three parts with strict separation of duties: Stream processing system, Storage, and Computation tree.
Stream processing system ingests the data (accepts “writes”), partitions it, converts data within each interval into compressed columnar format and writes it to Storage. The workers of Stream processing system are also responsible for computing partial query results for the most recent data.
Computation tree has multiple levels of nodes: nodes in the lowest level download data of specific partitions and intervals from Storage and compute partial results for them. Nodes in the second level merge results of all partitions for specific intervals, accepted from nodes in the lowest level, and from the workers of Stream processing system, if the query interval includes the most recent data. Nodes in the third level merge or combine per-interval results from nodes in the second level, and contain cache of per-interval query results. Those nodes could also be responsible for cluster balancing and autoscaling of lower levels of Computation tree.
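The three levels can be illustrated with a minimal sketch. It assumes that partial results are per-key (count, sum) pairs, which merge associatively; the function names, the shape of the input and the dict used as a cache are all hypothetical, and the recent data served by Stream processing system is left out.

```python
from collections import defaultdict

def leaf_aggregate(rows):
    """Level 1: aggregate the rows of one (interval, partition) segment."""
    out = defaultdict(lambda: [0, 0.0])
    for key, value in rows:
        out[key][0] += 1
        out[key][1] += value
    return {k: tuple(v) for k, v in out.items()}

def merge(partials):
    """Levels 2 and 3: merge any number of partial results key by key."""
    out = defaultdict(lambda: [0, 0.0])
    for partial in partials:
        for key, (count, total) in partial.items():
            out[key][0] += count
            out[key][1] += total
    return {k: tuple(v) for k, v in out.items()}

def run_query(segments_by_interval, cache):
    """Level 3: combine per-interval results, reusing cached intervals."""
    per_interval = []
    for interval, partitions in segments_by_interval.items():
        if interval not in cache:
            # Level 2: merge the results of all partitions of one interval.
            cache[interval] = merge(leaf_aggregate(rows) for rows in partitions)
        per_interval.append(cache[interval])
    return merge(per_interval)
```

Because the merge step is the same at every level, a repeated query over an overlapping interval only needs to recompute the intervals that are not yet in the cache.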
Key principles of this design:
Separation of Computation tree and Storage. This idea is taken from BigQuery. In my posts about issues with Druid (part 1, part 2) I explained how the absence of such separation in Druid makes query latency unpredictable, because queries interfere with each other.
Making nodes in the computation tree (almost) stateless means that they are more “disposable”, i. e. they could be Amazon’s spot or Google’s preemptible instances, which are several times cheaper than regular instances. Also, Computation tree could be scaled up and down in minutes, making it possible e. g. to scale it down every night and on weekends, when the query load is lower.
Separation of data ingestion (in Stream processing system) and Storage. This idea is actually already implemented in Druid, which has Real-time nodes. Such separation of concerns keeps Storage really simple: it doesn’t need to allocate resources for ingestion, columnar compression, query processing, etc. It focuses just on reading blocks of bytes from disk and sending them over the network to the nodes in Computation tree.
Stream processing system could also be more dynamic than write-supporting Storage. Stream processing system could be scaled up or down in response to changes in the data ingestion intensity, which is usually lower at night and on weekends. Stream processing system could have features that are hard to implement in Storage, such as dynamic repartitioning.
Network is the bottleneck
If the amount of downloads for queries doesn’t saturate Storage’s outbound network bandwidth, the network’s contribution to the total query latency is constant and doesn’t depend on the query size. This can be taken for granted if cloud object storage is used as Storage (see the “Cloud object storage” section below), or if the query load in the system is disproportionately small relative to the amount of historical data in Storage.
If neither of those two conditions applies, Storage could be used to host some non-timeseries, less frequently downloaded data, in order to artificially increase the size of the Storage cluster, and thus its outbound network bandwidth.
Otherwise, network throughput between Storage and Computation tree is likely going to be the factor that bounds query latency in the proposed design. There are several methods to mitigate this:
- Unlike typical SQL queries, which produce just one table, queries to this system should compose all sub-queries needed on a single screen in the analytics interface. Analytics interfaces often include at least several, and sometimes dozens of tables, charts, etc. that are results of sub-queries to the same time series data.
- Generously cache query results in the third level of Computation tree, to reduce load from redoing the same computations.
- Projection pushdown: download from Storage only the subset of columns needed for query processing.
- Partition by the dimension keys that appear in query filters most often, and download and process only the required partitions (a sort of predicate pushdown). Since the frequency of keys in many real-world data dimensions is Poisson-, Zipf-, or otherwise unevenly distributed, ideally Stream processing system should support “partial” partitioning, see the picture below. Because of the low cardinality of such partitioning, data could be partitioned by several dimensions before individual partitions become too small for efficient compression in columnar format and processing.
- More generally, metadata of data segments (partitions) should include information about all dimensions that happen to be populated with just one (or very few) keys in this partition, which makes it possible to benefit from “accidental” partitioning.
- Column compression should strongly favour compression ratio over decompression or processing speed.
- Column data should be streamed from Storage to the nodes in Computation tree, and sub-query processing should start as soon as the first blocks of all required columns arrive at the computational nodes. This makes it possible to overlap the network’s and CPU’s contributions to the total query latency as much as possible. To benefit from this, the order in which columns are sent from Storage to Computation tree should be smarter than simply the order in which columns happen to be laid out on disk in Storage, or alphabetical by column names. Columns could also be sent in interleaved order by small blocks, instead of whole-column-after-column.
- Compute the final query results incrementally as soon as some partial results are ready and stream the incremental results to the client, so that the client perceives the queries as running faster.
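Projection pushdown and partition pruning from the list above could look roughly like the sketch below. It is only an illustration under assumptions: `SegmentMeta`, its `known_keys` field and `plan_downloads` are hypothetical names, and only equality filters are handled.

```python
from dataclasses import dataclass, field

@dataclass
class SegmentMeta:
    """Hypothetical metadata kept for each data segment (partition)."""
    segment_id: str
    # Dimension -> set of keys, recorded only for dimensions that hold
    # one or very few keys in this segment. A dimension missing from this
    # map is assumed to hold many keys, so it cannot be used for pruning.
    known_keys: dict = field(default_factory=dict)
    columns: tuple = ()

def plan_downloads(segments, filters, needed_columns):
    """Return {segment_id: [columns to download]} for equality filters."""
    plan = {}
    for seg in segments:
        # Pruning: skip the segment if a filtered dimension is known to
        # contain only keys other than the requested one.
        if any(dim in seg.known_keys and key not in seg.known_keys[dim]
               for dim, key in filters.items()):
            continue
        # Projection pushdown: request only the columns the query touches.
        plan[seg.segment_id] = [c for c in seg.columns if c in needed_columns]
    return plan
```

A segment with an empty `known_keys` map plays the role of the “all other keys” partition in partial partitioning: it can never be pruned, but its columns can still be projected.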
Later in this post, I go into more details about each part of the system.
Storage
In this section I want to discuss some possible implementations of Storage. They could coexist as interchangeable options, similarly to how it is done in Druid.
Cloud Object Storage
It’s Amazon S3, Google Cloud Storage (GCS), Azure Blob Storage, and similar offerings from other cloud providers.
Conceptually, this is exactly what should be used for Storage in the designed time series store, because GCS is backed by a system called Colossus, and it’s also what serves as the storage layer for BigQuery, according to Google.
Cloud object storage is much cheaper than the options that I’m going to discuss below, requires much less admin work, and has virtually unlimited throughput, so the whole section “Network is the bottleneck” above becomes largely irrelevant (in theory).
Cloud object storage APIs are not sophisticated enough to support downloading multiple byte ranges in a single request (for projection pushdown of multiple columns), so each download of each column should be a separate request. I suspect that this is not how BigQuery works and that it has tighter integration with Colossus, to enable proper multi-column projection pushdown.
The main disadvantage of the Cloud Object Storage option, as it seems to me, could be its p99 latency and throughput. Some benchmarks show that GCS and S3 have p99 latency in the ballpark of 100 ms (which is acceptable) and throughput limited only by VM capabilities on the download side, but I would be positively surprised if this remains true at the load of a hundred concurrent requests by a single node and the scale of a million concurrent requests in the whole cluster. Note that none of the cloud providers has SLAs for object storage latency and throughput, and for GCS it was admitted that throughput is “quite variable”.
(Note: previously, in the section above, I mentioned that Cloud Object Storage APIs don’t support range requests, which is not true, though they still don’t support (as of October 2019) multiple range download within a single request, so the concurrent query amplification factor doesn’t go away. Thanks to Remi Dettai for the correction.)
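To make the request amplification concrete, here is a minimal sketch that builds one single-range GET per needed column. It assumes that all columns of a data partition live in one object and that a column index with byte offsets is available to the client; both the index layout and the function name are hypothetical, and no request is actually sent.

```python
def column_range_requests(object_url, column_index, needed_columns):
    """Build one single-range GET per needed column.

    column_index maps column name -> (offset, length) in bytes inside
    the object; this layout is an assumption of the sketch.
    """
    requests = []
    for name in needed_columns:
        offset, length = column_index[name]
        # HTTP byte ranges are inclusive on both ends.
        headers = {"Range": f"bytes={offset}-{offset + length - 1}"}
        requests.append(("GET", object_url, headers))
    return requests
```

With this scheme the number of concurrent requests to Storage is the number of partitions being read multiplied by the number of columns the query needs from each of them.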
Data partitions in Parquet format in HDFS
The main advantage of this option is good integration with the rest of the Hadoop ecosystem — Computation tree could even be “attached” to some already existing data warehouse. Complex queries that don’t fit well in the time series paradigm, such as large joins or multi-step queries, could be processed by systems such as Spark, Impala, Hive or Presto on top of the same HDFS cluster.
It’s also important that an organisation that aims to deploy the designed time series store may already have a very large HDFS cluster with large outbound network bandwidth, and if the time series store uses this HDFS cluster to store its data partitions, it may work around the network scalability problem.
However, stock HDFS routes all read requests through a single NameNode. 100k concurrent read requests (assuming that only one read request is required to download a data partition on a node in Computation tree) is something close to the absolute scalability limit of NameNode, and therefore beyond the limit if the HDFS cluster is substantially busy with operations not related to the time series store.
Also, when HDFS is used as a “remote” distributed file system, it doesn’t support projection pushdown even for files in Parquet format, so whole data partitions should be downloaded by the nodes in Computation tree. It won’t work well if there are hundreds of columns in time series data and only a small fraction is typically used for queries. Making every column of every data partition a separate file, as was suggested for cloud object storage, imposes a greater scalability limit because of amplification of the number of files and read requests. NameNode won’t be able to process a million concurrent requests, and HDFS is not optimized for files smaller than 10 MB in size, the size that individual columns of data partitions are going to have assuming the optimal data partition size is about 1 million rows.
However, under certain conditions (such as presence of a large underutilized HDFS cluster) and for certain use cases, HDFS may appear to be the most cost efficient option and work pretty well.
Apache Kudu
Apache Kudu is a columnar data store that aims to replace the HDFS + Parquet pair in many cases. It combines space-efficient columnar storage with the ability to make fast single-row reads and writes. The second part is actually not needed for the designed time series system, because writes are handled by Stream processing system, while we want to make Storage cheaper and not waste CPU (e. g. for background compaction tasks), memory and disk resources on each Storage node to support single-row reads and writes. In addition, single-row writes to old data are implemented in Kudu in a way that requires partition decompression on Kudu nodes, while in the proposed time series store design only compressed data should be transmitted between Storage and Computation tree.
On the other hand, Kudu has multiple features that are appealing for the time series system and which HDFS doesn’t have:
- RDBMS-like semantics. Data in Kudu is organized in tables, not just a pile of files.
- Tablet servers (nodes) in Kudu are more independent than in HDFS, which allows reads to bypass the master node (Kudu’s equivalent of NameNode) and so greatly improves read scalability.
- Projection pushdown.
- It’s written in C++, so tail latency should be much better than in HDFS, which is written in Java and experiences GC pauses.
The Kudu paper mentions that in theory it may support pluggable storage layouts. If a storage layout is implemented that drops Kudu’s support for single-row writes during ingestion and for writes to old data, but is more suitable for the time series store design, Kudu may become a better Storage option than HDFS.
Cassandra or ScyllaDB
Each data partition could be stored in a single entry in a Cassandra-like system. From the perspective of Cassandra, columns have binary type and store compressed columns of data partitions.
This option shares many advantages with Kudu, and even does better: excellent read scalability, very low latency (especially if ScyllaDB is used), table semantics, ability to download only the required columns (projection pushdown).
On the other hand, Cassandra-like systems are not designed for column values of multiple MBs and a total row size of about 100 MB, and may start experiencing operational problems when populated with such data. Also, they don’t support streaming reads on the level of a single row or even a single column in a single row, but this could probably be implemented in those systems relatively easily.
Cassandra is designed to sustain high write load, so it uses an LSM-like storage structure and quite a lot of memory, which is going to be a waste of resources when used as Storage in the time series system.
This option is the fastest, but also the least cost efficient compared to the other options that I discussed above.
Reusing nodes of the Computation tree as Storage (added in 2019)
See the description of the idea here.
Stream processing system
As I noted above, Druid already separates data ingestion from storage in the so-called indexing subsystem, or Real-time nodes. However, although this indexing subsystem implements a subset of the features of full-fledged distributed stream processing systems, it doesn’t leverage any of them, nor even resource managers like Mesos or YARN, and has everything done in the Druid source code. Druid’s indexing subsystem is much less efficient than modern stream processing systems, because dozens of times less development effort has been invested in it.
Also, time series data is often assembled or enriched in some other stream processing system in front of Druid. For example, Walmart does it with Storm, and Metamarkets uses Samza for similar purposes. This essentially means that two independent stream processing systems are run one after another in the data pipeline, which prevents fusion of mapping operators with Druid’s ingestion terminal operator, a common optimization in stream processing systems.
This is why I think that in the next generation time series store ingestion should fully leverage some existing stream processing system.
Close integration between Stream processing system and the rest of the time series store is required, e. g. to allow nodes in Computation tree to query workers in Stream processing system. It means that unlike the situation with Storage, it would probably be too hard to support multiple stream processing systems. Just one should be chosen and integrated with the time series system.
Flink, Storm and Heron are possible candidates. It’s hard, or doesn’t make much sense to tell which one technically fits better at the moment, because those projects rapidly copy features from each other. If the designed time series system is actually created in some organization, the choice would probably depend on which stream processing system is already used in that organization.
Read this thread in Druid Development mailing list for some more information on this topic.
Computation tree
I didn’t think too hard about how this part of the system should look exactly. One possible approach is described in the “Design overview” section above.
There is at least one problem with that approach: queries to a specific time series (table) couldn’t be processed effectively by multiple nodes in the third (highest) level of Computation tree if too many query results need to be cached. In order to always route similar subqueries (those that differ only by the overall query interval) to the same nodes and hit cached results, one “composite” query with many subqueries would have to be broken into multiple independent queries. That, in turn, makes the use of the network between Storage and Computation tree less efficient: see the first item in the list in the “Network is the bottleneck” section above.
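The routing problem can be made concrete with a small sketch. It assumes that a subquery is a dict with an `interval` field and that third-level nodes are chosen by hashing everything except that field; the field name, the function names and the simple modulo placement are all hypothetical.

```python
import hashlib
import json

def subquery_signature(table, subquery):
    """Stable identity of a subquery that ignores the overall query interval."""
    body = {k: v for k, v in subquery.items() if k != "interval"}
    return table + ":" + json.dumps(body, sort_keys=True)

def route(table, subquery, nodes):
    """Pick the third-level node that owns the cache for this subquery."""
    signature = subquery_signature(table, subquery).encode()
    digest = hashlib.sha256(signature).digest()
    return nodes[int.from_bytes(digest[:8], "big") % len(nodes)]

def split_composite(table, subqueries, nodes):
    """Group the subqueries of one composite query by owning node."""
    groups = {}
    for subquery in subqueries:
        groups.setdefault(route(table, subquery, nodes), []).append(subquery)
    return groups
```

Subqueries that differ only by interval always land on the same node, so its cache gets reused, but a composite query is split into as many independent queries as there are groups, and each group triggers its own downloads from Storage.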
However, it might be possible to scale nodes in the third level of Computation tree vertically, to make them large enough to be able to process all queries and accommodate the whole cache for any single time series, even the busiest one.
Vertical scaling means that one node in the third level of Computation tree should handle a lot of concurrent queries. This is one of the reasons why I think that if Computation tree is built from scratch, it should use an asynchronous server architecture rather than a blocking one (Go-style green threading is also OK). The other two reasons are:
- Nodes in the first level of Computation tree do a lot of network I/O with Storage. Computations on those nodes depend on data arriving from Storage with unpredictable latency: responses to data requests could often arrive out of order.
- Nodes at all levels of Computation tree should support incremental query results computation, returning multiple results for the same query, potentially with very long intervals. It makes the system more fault-tolerant (discussed in my first post about the challenges of running Druid) and perceived as faster, as noted in “Network is the bottleneck” section above.
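A minimal sketch of such asynchronous, incremental processing, using Python’s `asyncio` purely for illustration. `fetch_partial` is a hypothetical stand-in for downloading and aggregating one segment; its `delay` argument simulates the unpredictable latency of Storage.

```python
import asyncio

async def fetch_partial(segment_id, delay, value):
    """Stand-in for downloading and aggregating one segment."""
    await asyncio.sleep(delay)  # simulated Storage latency
    return segment_id, value

async def incremental_sum(specs):
    """Yield a running total each time another partial result arrives."""
    total = 0
    pending = [fetch_partial(*spec) for spec in specs]
    # as_completed hands results back in arrival order, not request order.
    for next_done in asyncio.as_completed(pending):
        _, value = await next_done
        total += value
        yield total

async def collect(specs):
    """Gather every incremental result, as a client would receive them."""
    return [total async for total in incremental_sum(specs)]
```

For example, `asyncio.run(collect([("a", 0.15, 1), ("b", 0.01, 10), ("c", 0.08, 100)]))` returns `[10, 110, 111]`: the slowest segment is merged last, and the client sees a usable partial result after the first response.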
The programming platform on which Computation tree is built should ideally have the following traits:
- Support for runtime code generation, to make queries complete faster and to increase CPU utilization efficiency. This blog post about runtime code generation in Impala explains it nicely.
- The generated machine code should be “optimal”, and vectorized when possible, for the same reasons.
- Low heap/object memory overhead, to make the nodes in Computation tree cheaper, because memory is expensive.
- Consistently short garbage collection pauses (for platforms with managed memory), to support the “consistent query latency” goal of the designed time series store.
C++ is the winner from a purely technical point of view: it meets all those requirements. The non-performance-related disadvantages of choosing C++ are also well-known: slow development, poor debuggability, difficulty of extending the system with a plugin architecture, etc.
JVM is still a solid choice, and I believe the system could be no more than 20% less efficient than if built in C++:
- JVM makes it possible to piggy-back on the JIT compiler to achieve the same effect that is the goal of runtime code generation.
- For time series processing, code vectorization is needed primarily during column decompression and when running specific aggregations over the data; both could be done in JNI functions. The overhead of JNI is relatively small when paid once for dozens of kilobytes of decompressed data (we may want to do processing in chunks of this size to fit all decompressed data in L2 cache). Project Panama is going to make this overhead even smaller. If the data is stored and processed in off-heap memory, the JNI implications on garbage collection are also small or non-existent.
- Heap memory overhead could be made small by doing all network IO, data storage, buffering and processing in off-heap memory, and thus having a very small heap only for some per-query allocations.
- Garbage collection pauses could be kept short by using Shenandoah GC. Heap memory read and write barriers shouldn’t impact CPU utilization much if all data structures used in core processing loops are allocated off-heap.
As far as I know, neither Go nor Rust supports runtime code generation at the moment, although adding such support probably doesn’t require too much hacking: see the gojit project and this question on StackOverflow about Rust. For other criteria, Go’s runtime and generated code are probably a little less efficient, but for some non-technical reasons it’s a stronger option than Rust.
Drawbacks of the proposed time series system
- The system doesn’t feel like a single “database”: it has three independent subsystems with a high total number of moving parts, which makes it inefficient at small scale and hard to deploy and update.
- It could be challenging to integrate the system with existing SQL-speaking interfaces efficiently, because the system needs to run “composite” queries with many independent subqueries to the same table.
- The system is not suitable for use cases where responses to queries are needed in less than one second.
- The performance of the system is highly dependent on the network performance in the data center where it is deployed.
- Inability to scale nodes in the third level of Computation tree horizontally could be a major scalability bottleneck in some use cases.
If you are interested in the development of a time series store with a design similar to the one described in this post, please drop me a line at email@example.com.