Design of a Cost Efficient Time Series Store for Big Data

  • The time series store is scalable to petabytes of compressed data and 100k of processing cores in a single cluster.
  • Cloud first: leverage advantages of clouds.
  • Cost efficient starting from tens of terabytes of data and a thousand of processing cores.
  • Queries which process less than 5 TB of data should run in less than 3 seconds (p99 latency) in a reasonably-sized cluster — covering interactive ad analytics use case.
  • Highly consistent query latency: similar queries should always take the same time to complete, regardless of what queries are running in parallel in the cluster.
  • Newly ingested data should be queryable immediately.
  • Think forward: the presented design should hopefully become more, not less relevant in 3–5 years.
  • On-premise deployments.
  • Cost efficiency on small scale.
  • Efficiency of random updates and deletes of old data, although those things should be possible.
  • p99 latency of less than half a second for whatever small queries even in a system not under load.
  • Ease of first-time deployments and software updates.

Design overview

Design of a time series store with three decoupled subsystems. Light blue lines mean the flow of uncompressed row-oriented data; dark blue lines — of compressed columnar data; red lines — of query results.

Network is the bottleneck

If the amount of downloads for queries doesn’t saturate Storage’s outbound network bandwidth, network’s contribution in the total query latency is constant, and doesn’t depend on the query size. This could be just granted, if cloud object storage is used as Storage (see “Cloud object storage” section below), or if the query load in the system is disproportionally small relative to the amount of historical data in Storage.

  • Unlike typical SQL queries, that produce just one table, queries to this system should compose all sub-queries, that are needed on a single screen in the analytics interface. Analytics interfaces often include at least several, and sometimes dozens of tables, charts, etc. that are results of sub-queries to the same time series data.
  • Generously cache query results in the third level of Computation tree, to reduce load from redoing the same computations.
  • Projection pushdown: download from Storage only subsets of columns, needed for query processing.
  • Partition by dimension keys, which appear in query filters most often, download and process only the required partitions — sort of predicate pushdown. Since frequency of keys in many real-world data dimensions is Poisson-, Zipf-, or otherwise unevenly distributed, ideally Stream processing system should support “partial” partitioning, see the picture below. Because of low cardinality of such partitioning, data could be partitioned by several dimensions, before individual partitions become too small for efficient compression in columnar format and processing.
Partial partitioning for uneven key distributions. Each box is a partition. The partition with “other values” could have thousands of “long tail” values.
  • More generally, metadata of data segments (partitions) should include information about all dimensions, which appear to be populated with just one (or very few) keys in this partition, that allows to benefit from “accidental” partitioning.
  • Column compression should strongly favour compression ratio over decompression or processing speed.
  • Column data should be streamed from Storage to the nodes in Computation tree, and sub-query processing is started as soon as the first blocks of all required columns arrive to the computational nodes. This allows to overlap network’s and CPU’s contributions in the total query latency as much as possible. To benefit from this, the order in which columns are sent from Storage to Computation tree should be smarter than simply the order in which columns appear to be laid out on disk in Storage, or alphabetical by column names. Columns could also be send in interleaved order by small blocks, instead of whole-column-after-column.
  • Compute the final query results incrementally as soon as some partial results are ready and stream the incremental results to the client, to make the client to perceive the queries as running faster.


In this section I want to discuss some possible implementations of Storage. They could coexist as interchangeable options, similarly to how it is done in Druid.

Cloud Object Storage

It’s Amazon S3, Google Cloud Storage (GCS), Azure Blob Storage, and similar offerings from other cloud providers.

Data partitions in Parquet format in HDFS

The main advantage of this option is good integration with the rest of the Hadoop ecosystem — Computation tree could even be “attached” to some already existing data warehouse. Complex queries that don’t fit well in the time series paradigm, such as large joins or multi-step queries, could be processed by systems such as Spark, Impala, Hive or Presto on top of the same HDFS cluster.


Apache Kudu is a columnar data store that aims to replace HDFS + Parquet pair in many cases. It combines space-efficient columnar storage with ability to make fast single-row reads and writes. The second part is actually not needed for the designed time series system, because writes are handled by Stream processing system, while we want to make Storage cheaper and not waste CPU (e. g. for background compaction tasks), memory and disk resources on each Storage node to support single-row reads and writes. In addition, single-row writes to old data are implemented in Kudu in a way that requires partition decompression on Kudu nodes, while in the proposed time series store design only compressed data should be transmitted between Storage and Computation tree.

  • RDBMS-like semantics. Data in Kudu is organized in tables, not just a pile of files.
  • Tablet servers (nodes) in Kudu are more independent than in HDFS, that allows to bypass querying the master node (Kudu’s equivalent of NameNode) when doing reads, and so greatly improve read scalability.
  • Projection pushdown.
  • It’s written in C++, so tail latency should be much better than in HDFS, which is written in Java and experiences GC pauses.

Cassandra or Scylla

Each data partition could be stored in a single entry in a Cassandra-like system. From the perspective of Cassandra, columns have binary type and store compressed columns of data partitions.

Reusing nodes of the Computation tree as Storage (added in 2019)

See the description of the idea here.

Stream processing system

As I noted above, Druid already separates data ingestion from storage in so-called indexing subsystem, or Real-time nodes. However, although this indexing subsystem implements a subset of features of full-fledged distributed stream processing systems, it doesn’t leverage any of them, nor even resource managers like Mesos or YARN, and has everything done in the Druid source code. Druid’s indexing subsystem is much less efficient than modern stream processing systems, because dozens of times less development efforts were invested in it.

Computation tree

I didn’t think too hard about how this part of the system should look exactly. Some possible approach is described in the “Design overview” section above.

  • Nodes in the first level of Computation tree do a lot of network I/O with Storage. Computations on those nodes depend on data arrival from Storage with unpredictable latency: data requests from Storage could often get reordered responses.
  • Nodes at all levels of Computation tree should support incremental query results computation, returning multiple results for the same query, potentially with very long intervals. It makes the system more fault-tolerant (discussed in my first post about the challenges of running Druid) and perceived as faster, as noted in “Network is the bottleneck” section above.


The programming platform on which Computation tree is built should ideally have the following traits:

  • Support for runtime code generation, to make the queries to complete faster and increase CPU utilization efficiency. This blog post about runtime code generation in Impala explains it nicely.
  • The generated machine code should be “optimal”, and vectorized when possible, for the same reasons.
  • Low heap/object memory overhead, to make the nodes in Computation tree cheaper, because memory is expensive.
  • Consistently short garbage collection pauses (for platforms with managed memory), to support the “consistent query latency” goal of the designed time series store.
  • JVM allows to piggy-back JIT compiler to achieve the same effect that is the goal of runtime code generation.
  • For time series processing, code vectorization is needed primarily during column decompression and when running specific aggregations over the data; both could be done in JNI functions. The overhead of JNI is relatively small when paid once for dozens of kilobytes of decompressed data (we may want to do processing in chunks of this size to fit all decompressed data in L2 cache). Project Panama is going to make this overhead even smaller. If the data is stored and processed in off-heap memory, the JNI implications on garbage collection are also small or non-existent.
  • Heap memory overhead could be made small by making all network IO, data storage, buffering and processing in off-heap memory and thus having very small heap only for some per-query allocations.
  • Garbage collection pauses could be kept short by using Shenandoah GC. Heap memory read and write barriers shouldn’t impact CPU utilization much if all data structures used in core processing loops are allocated off-heap.

Drawbacks of the proposed time series system

  • The system doesn’t feel like a single “database”, it has three independent subsystems with high total number of moving parts, that makes it not efficient on small scale, hard to deploy and update.
  • It could be challenging to integrate the system with existing SQL-speaking interfaces efficiently, because the system needs to run “composite” queries with many independent subqueries to the same table.
  • The system is not suitable for use cases when response to queries is needed faster than in one second.
  • The performance of the system is highly dependent on the network performance in the data center where it is deployed.
  • Inability to scale nodes in the third level of Computation tree horizontally could be a major scalability bottleneck in some use cases.



Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store
Roman Leventov

Roman Leventov

Writing about systems, technology, philosophy.