While TSDB is designed to store original, full resolution data as long as there is space, queries for wide time ranges or over many tag combinations can be quite painful. Such queries can take a long time to complete or, in the worst case, kill TSDs with out-of-memory exceptions. As of OpenTSDB 2.4, a set of new APIs allows for storing and querying lower resolution data to answer such queries much more quickly. This page will give you an overview of what rollups and pre-aggregates are, how they work in TSDB, and how best to use them. Look in the API section for specific implementation details.
OpenTSDB does not itself calculate and store rollup or pre-aggregated data. There are multiple ways to compute the results, but each has benefits and drawbacks depending on the scale and accuracy requirements. See the Generating Rollups and Pre-Aggregates section for a discussion of how to create this data.
To help describe the lower resolution data, let's look at some full resolution (also known as raw data) example data. The first table defines the time series with a short-cut identifier.
| Series ID | Metric | Tag 1 | Tag 2 | Tag 3 |
|---|---|---|---|---|
| ts1 | system.if.bytes.out | host=web01 | colo=lga | interface=eth0 |
| ts2 | system.if.bytes.out | host=web02 | colo=lga | interface=eth0 |
| ts3 | system.if.bytes.out | host=web03 | colo=sjc | interface=eth0 |
| ts4 | system.if.bytes.out | host=web04 | colo=sjc | interface=eth0 |

Notice that they all have the same metric and `interface` tag, but different `host` and `colo` tags.
Next, some data is written at 15-minute intervals.
Notice that some data points are missing. With those data sets, let's look at rollups first.
A "rollup" is defined, in OpenTSDB, as a single time series aggregated over time. It may also be called a "time-based aggregation". Rollups help to solve the problem of looking at wide time spans. For example, if you write a data point every 60 seconds and query for one year of data, a time series would return more than 525k individual data points. Graphing that many points could be pretty messy. Instead you may want to look at lower resolution data, say 1 hour data where you only have around 8k values to plot. Then you can identify anomalies and drill down for finer resolution data.
If you have already used OpenTSDB to query data, you are likely familiar with downsamplers that aggregate each time series into a smaller, or lower resolution, value. A rollup is essentially the result of a downsampler stored in the system and called up at will. Each rollup (or downsampler) requires two pieces of information:
- **Interval** - How much time is "rolled up" into the new value, e.g. `1h` for one hour of data or `1d` for a day of data.
- **Aggregation function** - What arithmetic is performed on the underlying values to arrive at the new value, e.g. `sum` to add all of the values or `max` to store the largest.
When storing rollups, it's best to avoid functions such as average, median or deviation. When performing further downsampling or grouping aggregations, such values become meaningless. Instead it's much better to always store the sum and count from which, at least, the average can be computed at query time. For more information, see the section below.
The timestamp of a rolled-up data point should snap to the top of the rollup interval. E.g., if the rollup interval is `1h`, then it contains one hour of data and should snap to the top of the hour. (As all timestamps are written in Unix epoch format, defined as the UTC timezone, this would be the start of an hour in UTC time.)
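Snapping is simple integer arithmetic on epoch timestamps; a minimal sketch (the function name is ours, not OpenTSDB's):

```python
# Snap a Unix epoch timestamp (seconds, UTC by definition) to the
# top of a rollup interval, e.g. 3600 seconds for a 1h rollup.
def snap_to_interval(ts: int, interval_seconds: int) -> int:
    return ts - (ts % interval_seconds)

# A point 20 minutes into hour 100 snaps back to the top of hour 100:
print(snap_to_interval(100 * 3600 + 1200, 3600))  # 360000
```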
Given the series above, let's store the `sum` and `count` with an interval of `1h`:
Notice that all timestamps align to the top of the hour regardless of when the first data point in the interval "bucket" appears. Also notice that if a data point is not present for an interval, the count is lower.
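As an illustration of the mechanics (not OpenTSDB code, and the sample points are hypothetical rather than the table above), a 1h `SUM`/`COUNT` rollup of one raw series might be computed like this:

```python
from collections import defaultdict

def rollup_1h(points):
    """points: iterable of (epoch_seconds, value) for ONE time series.
    Returns {snapped_hour_ts: (sum, count)}. The timestamp of each
    rollup value is snapped to the top of its hour, and the count is
    lower for intervals where raw data points are missing."""
    buckets = defaultdict(lambda: [0.0, 0])
    for ts, value in points:
        hour = ts - (ts % 3600)    # snap to the top of the hour (UTC epoch)
        buckets[hour][0] += value  # SUM
        buckets[hour][1] += 1      # COUNT
    return {h: (s, c) for h, (s, c) in buckets.items()}

# 15-minute raw data with two points missing in the second hour:
raw = [(0, 1), (900, 4), (1800, -3), (2700, 8), (3600, 2), (5400, 5)]
print(rollup_1h(raw))  # {0: (10.0, 4), 3600: (7.0, 2)}
```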
In general, you should aim to compute and store the `SUM` and `COUNT` for each time series when storing rollups.
When rollups are enabled and you request a downsampler with the `avg` function from OpenTSDB, the TSD will scan storage for the `SUM` and `COUNT` values. Then, while iterating over the data, it will accurately compute the average.
The timestamps for count and sum values must match. However, if the expected count value for a sum is missing, the sum will be kicked out of the results. Take the following example set from above, where we're now missing a count data point in one of the series. The `avg` for a `2h` downsampling query would look like this:
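The pairing logic can be sketched as follows (hypothetical sums and counts, not the documented result set; the function is ours, not OpenTSDB's):

```python
def average_rollups(sums, counts):
    """sums/counts: {timestamp: value} for matching SUM and COUNT rollups.
    The average is sum/count per timestamp; a sum whose matching count
    is missing is dropped from the results, mirroring the behaviour
    described above."""
    return {ts: s / counts[ts] for ts, s in sums.items() if ts in counts}

sums   = {0: 10.0, 3600: 7.0, 7200: 6.0}
counts = {0: 4, 3600: 2}              # count for 7200 is missing
print(average_rollups(sums, counts))  # {0: 2.5, 3600: 3.5} -- 7200 kicked out
```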
While rollups help with wide time span queries, you can still run into query performance issues with small ranges if the metric has high cardinality (i.e. a large number of unique time series for the given metric). In the example above, we have 4 web servers. But let's say that we have 10,000 servers. Fetching the sum or average of interface traffic may be fairly slow. If users are often fetching the group by (or, as some think of it, the spatial aggregate) of large sets like this, then it makes sense to store the aggregate and query that instead, fetching much less data.
Unlike rollups, pre-aggregates require only one extra piece of information:

- **Aggregation function** - What arithmetic is performed to combine the time series, e.g. `sum` to add all of the time series or `max` to store the largest.
In OpenTSDB, pre-aggregates are differentiated from other time series with a special tag. The default tag key is `_aggregate` (configurable via `tsd.rollups.agg_tag_key`). The aggregation function used to generate the data is then stored in the tag value in upper-case. Let's look at an example:
Given the example set at the top, we may want to look at the total interface traffic by colo (data center). In that case, we can aggregate by `colo`, computing the `SUM` and `COUNT` similarly to the rollups. The result would be four new time series with metadata like:
| Series ID | Metric | Tag 1 | Tag 2 |
|---|---|---|---|
| ts1' | system.if.bytes.out | colo=lga | _aggregate=SUM |
| ts2' | system.if.bytes.out | colo=lga | _aggregate=COUNT |
| ts3' | system.if.bytes.out | colo=sjc | _aggregate=SUM |
| ts4' | system.if.bytes.out | colo=sjc | _aggregate=COUNT |
Notice that these time series have dropped the tags for `host` and `interface`. That's because, during aggregation, multiple different values of the `host` and `interface` tags have been wrapped up into these new series, so it no longer makes sense to keep them as tags. Also note that we injected the new `_aggregate` tag into the stored data. Queries can now access this data by specifying an `_aggregate` value.
With rollups enabled, if you plan to use pre-aggregates, you may want to help differentiate raw data from pre-aggregates by having TSDB automatically inject `_aggregate=RAW` on un-aggregated data. Just set the `tsd.rollups.tag_raw` configuration to true.
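In `opentsdb.conf` that might look like the following sketch, which uses only the two settings named above (the values shown are illustrative, not defaults):

```properties
# Tag key used to mark pre-aggregated series (the default is _aggregate)
tsd.rollups.agg_tag_key = _aggregate
# Inject _aggregate=RAW on raw data so it can be told apart from pre-aggregates
tsd.rollups.tag_raw = true
```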
Now for the resulting data:
Since we're performing a group-by aggregation (grouping by `colo`), we have a value for each timestamp from the original data set. We are not downsampling or performing a rollup in this situation.
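A sketch of this spatial aggregation (hypothetical helper and data, not OpenTSDB internals): tags that differ within a group, such as `host` and `interface`, are dropped, and the function name is recorded under the `_aggregate` tag key.

```python
from collections import defaultdict

def pre_aggregate(series, group_by="colo", agg_tag="_aggregate"):
    """series: list of (tags_dict, {timestamp: value}) raw time series.
    Returns {(group_tag, aggregate_tag): {timestamp: value}} with one
    SUM and one COUNT series per group value. No downsampling happens:
    every raw timestamp keeps a value."""
    sums = defaultdict(lambda: defaultdict(float))
    cnts = defaultdict(lambda: defaultdict(int))
    for tags, points in series:
        group = tags[group_by]             # e.g. "lga"
        for ts, v in points.items():
            sums[group][ts] += v
            cnts[group][ts] += 1
    result = {}
    for group in sums:
        result[(group_by + "=" + group, agg_tag + "=SUM")] = dict(sums[group])
        result[(group_by + "=" + group, agg_tag + "=COUNT")] = dict(cnts[group])
    return result

series = [
    ({"host": "web01", "colo": "lga", "interface": "eth0"}, {0: 1, 900: 4}),
    ({"host": "web02", "colo": "lga", "interface": "eth0"}, {0: 7, 900: 2}),
]
print(pre_aggregate(series)[("colo=lga", "_aggregate=SUM")])  # {0: 8.0, 900: 6.0}
```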
As with rollups, when writing pre-aggregates, it's best to avoid functions such as average, median, or deviation. Just store the `sum` and `count`.
While pre-aggregates certainly help with high-cardinality metrics, users may still want to ask for wide time spans but run into slow queries. Thankfully you can roll up a pre-aggregate in the same way as raw data. Just generate the pre-aggregate, then roll it up using the information above.
Currently the TSDs do not generate the rollup or pre-aggregated data for you. The primary reason for this is that OpenTSDB is meant to handle huge amounts of time series data so individual TSDs are focused on getting their data into storage as quickly as possible.
Because of the (essentially) stateless nature of the TSDs, they likely won't have the full set of data available to perform pre-aggregates. E.g., our sample `ts1` data may be written to `TSD_A` while `ts2` is written to `TSD_B`. Neither can perform a proper group-by without reading the data back out of storage. We also don't know at what time we should perform the pre-aggregation. We could wait one minute and pre-aggregate the data, but we'd miss anything that came in after that minute. Or we could wait an hour, and queries over the pre-aggregates wouldn't have data for the last hour. And what happens if data comes in much later?
Additionally, for rollups, depending on how users write data to TSDs, the `12:15` data point for `ts1` may arrive on `TSD_A` while the `12:30` value arrives on `TSD_B`, so neither has the data required for the full hour. Time windowing constraints also apply to rollups.
Using rollups and pre-aggregates requires some analysis and a choice between various trade-offs. Since some OpenTSDB users already have means in place for calculating this kind of data, we simply provide the APIs to store and query it. However, here are some tips on how to compute these on your own.
One method commonly used by other time series databases is to read the data back out of the database after some delay, calculate the pre-aggregates and rollups, then write them. This is the easiest way of solving the problem and works well at small scales. However, there are still a number of issues:
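Mechanically, the write-back half of such a batch job boils down to turning rollup buckets into data points for the 2.4 rollup storage API. A sketch (the payload field names follow the 2.4 HTTP rollup endpoint as we understand it; verify them, and the endpoint path, against the API section before relying on this):

```python
import json

def rollup_payload(metric, tags, buckets, interval="1h"):
    """buckets: {snapped_epoch_ts: (sum, count)} computed by a batch job.
    Builds a JSON body with one SUM and one COUNT data point per bucket."""
    dps = []
    for ts, (s, c) in sorted(buckets.items()):
        dps.append({"metric": metric, "timestamp": ts, "value": s,
                    "tags": tags, "interval": interval, "aggregator": "SUM"})
        dps.append({"metric": metric, "timestamp": ts, "value": c,
                    "tags": tags, "interval": interval, "aggregator": "COUNT"})
    return json.dumps(dps)

body = rollup_payload("system.if.bytes.out", {"host": "web01"}, {0: (10.0, 4)})
# POST this body to the TSD's rollup endpoint (see the API section).
```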
Some methods of improving batch processing include:
Queueing on TSDs
Another option that some databases use is to queue all of the data in memory in the process and write the results after a configured time window has passed. But because TSDs are stateless and users generally put a load balancer in front of them, a single TSD may not see the full set of data needed to calculate a rollup or pre-aggregate (as we mentioned above). For this method to work, upstream collectors would have to route all of the data required for a calculation to a specific TSD. It's not a difficult task, but the problems faced include:
In general, queueing on a writer is a bad idea. Avoid the pain.
A better way of dealing with rollups and pre-aggregates is to route the data into a stream processing system, where it can be processed in near-real-time and written to the TSDs. It's similar to the Queueing on TSDs option, but uses one of the myriad stream processing frameworks (Storm, Flink, Spark, etc.) to handle message routing and in-memory storage. Then you simply write some code to compute the aggregates and emit the data after a window has passed.
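The core of such a job is a tumbling-window accumulator; a toy sketch (our own class, with an assumed fixed grace period for late data rather than any framework's semantics):

```python
from collections import defaultdict

class WindowedAggregator:
    """Accumulate SUM/COUNT per (series, window) and emit a window once
    it has been closed for longer than the grace period. Data arriving
    after the window is flushed is dropped -- the late-data problem
    described in the text."""
    def __init__(self, window=3600, grace=300):
        self.window, self.grace = window, grace
        self.buckets = defaultdict(lambda: [0.0, 0])

    def add(self, series_key, ts, value):
        start = ts - (ts % self.window)   # snap to the window start
        bucket = self.buckets[(series_key, start)]
        bucket[0] += value                # SUM
        bucket[1] += 1                    # COUNT

    def flush(self, now):
        """Emit and forget every window that closed before now - grace."""
        ready = [k for k in self.buckets
                 if k[1] + self.window + self.grace <= now]
        return {k: tuple(self.buckets.pop(k)) for k in ready}

agg = WindowedAggregator(window=3600, grace=300)
agg.add("ts1", 100, 4)
agg.add("ts1", 200, 6)
print(agg.flush(now=4000))  # window [0, 3600) closed at 3900: {('ts1', 0): (10.0, 2)}
```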
This is the solution used by many next-generation monitoring systems, such as Yahoo's. Yahoo is working to open-source its stream processing system for others who need monitoring at massive scale, and it plugs neatly into TSDB.
While stream processing is better you still have problems to deal with such as:
If you have working code for calculating aggregations, please share it with the OpenTSDB group. If your solution is open source, we may be able to incorporate it into the OpenTSDB ecosystem.
© 2010–2016 The OpenTSDB Authors
Licensed under the GNU LGPLv2.1+ and GPLv3+ licenses.