



This project should ideally not exist and was created out of necessity.

Its acts as a proxy for an InfluxDB backend, sanitizing the output of strangely behaving InfluxQL queries.

Additionally, it features an automatic retention policy selection mechanism.

It works ideally between a Grafana and an InfluxDB instance.

This was originally a fork of trivago/Protector, an InfluxDB circuit breaker. All the HTTP proxying code comes from this project, so kudos to them.

The project is build in Python and expected to run on a Miniconda or Anaconda install.

Finally, I'm not a python expert, so don't be surprised to find all kinds of idiosyncrasies and anti-pythonic patterns. Do not hesitate to submit pull request to propose correction to those.


Automatic Retention Policy selection (issue #7198)

Automatic Retention Policy selection is implemented (source).

This corrects this long standing limitation of InfluxDB that is tracked primarily in issue #7198 (with interesting discussion in issue #6910 and issue #2625).

Active retention policies for each schema are retrieved automatically from the InfluxDB backend at Cleanflux startup. It only detect retention policies referenced in continuous queires with same source and destination measurement name.

This can be disabled by config:

auto_retrieve_retention_policies: False

Additionally, retention policies can be manually defined in the configuration file. This feature can also be used to override retention policies retrieved from database (e.g. to ignore one or when using Kapacitor instead of CQs).

Automatic Retention Policy only activates when no retention policy is explicitly defined in the query.

It will only work if fields names are kept the same across retention policies.

For queries that do a GROUP BY time(), the interval gets replaced if it corresponds to a higher precision that what the retention policy can offer.

Furthermore, we apply a ratio on values of SUM() ... GROUP BY time() queries that got a change of time interval, in order to preserve the rate unit.

Please note that queries with column like sum(aaa) + ... + sum(zzz) would have an incorrect ratio applied (only on last element would be scaled).


For example, with the following RPs:

> USE "my_app"
Using database my_app
name    duration   shardGroupDuration replicaN default
----    --------   ------------------ -------- -------
1_week  168h0m0s   1h0m0s             1        true
3_month 2160h0m0s  24h0m0s            1        false
10_year 87600h0m0s 168h0m0s           1        false

And the following CQs:

> CREATE CONTINUOUS QUERY cq_3_month_gauge ON "my_app" BEGIN SELECT mean(value) AS value INTO "my_app"."3_month".:MEASUREMENT FROM "my_app"."1_week"./^gauge_.*/ GROUP BY *, time(10m) END
> CREATE CONTINUOUS QUERY cq_3_month_counter ON "my_app" BEGIN SELECT sum(value) AS value INTO "my_app"."3_month".:MEASUREMENT FROM "my_app"."1_week"./^counter_.*/ GROUP BY *, time(10m) END
> CREATE CONTINUOUS QUERY cq_3_month_timer ON "my_app" BEGIN SELECT sum(count) AS count, mean(mean) AS mean, min(lower) AS lower, max(upper) AS upper, mean(stddev) AS stddev, mean("50_percentile") AS "50_percentile", mean("90_percentile") AS "90_percentile", mean("95_percentile") AS "95_percentile", mean("99_percentile") AS "99_percentile" INTO "my_app"."3_month".:MEASUREMENT FROM "my_app"."1_week"./^timer_.*/ GROUP BY *, time(10m) END
> CREATE CONTINUOUS QUERY cq_10_year_gauge ON "my_app" BEGIN SELECT mean(value) AS value INTO "my_app"."10_year".:MEASUREMENT FROM "my_app"."3_month"./^gauge_.*/ GROUP BY *, time(1h) END
> CREATE CONTINUOUS QUERY cq_10_year_counter ON "my_app" BEGIN SELECT sum(value) AS value INTO "my_app"."10_year".:MEASUREMENT FROM "my_app"."3_month"./^counter_.*/ GROUP BY *, time(1h) END
> CREATE CONTINUOUS QUERY cq_10_year_timer ON "my_app" BEGIN SELECT sum(count) AS count, mean(mean) AS mean, min(lower) AS lower, max(upper) AS upper, mean(stddev) AS stddev, mean("50_percentile") AS "50_percentile", mean("90_percentile") AS "90_percentile", mean("95_percentile") AS "95_percentile", mean("99_percentile") AS "99_percentile" INTO "my_app"."10_year".:MEASUREMENT FROM "my_app"."3_month"./^timer_.*/ GROUP BY *, time(1h) END

Configuration should follow this syntax:

    - regexp: 'timer_.*'
      function: sum
    - regexp: 'counter_.*'
      function: sum
    - regexp: 'gauge_.*'
      function: mean

If auto_retrieve_retention_policies is disabled (set to False), the retention policies should be declared manually:

    - name: 1_week
      duration: 168h0m0s
      default: true
      interval: 10s
    - name: 3_month
      duration: 2160h0m0s
      interval: 10m
    - name: 10_year
      duration: 87600h0m0s
      interval: 3h

The interval fields are optional and correspond to the interval between 2 points, i.e. the precision of the measurements.

For SUM() ... GROUP BY time(), in order to apply the correct ratio, Cleanflux needs to know the function used for downsampling (either sum or mean). This function is defined in the downsampling configuration (via CQs, Kapacitor...).

This is why we need to declare it in the aggregation_properties section.

Default rules can be specified with the special default schema name:

    - regexp: 'timer_.*'
      function: sum
    - regexp: 'counter_.*'
      function: sum
    - regexp: 'gauge_.*'
      function: mean

About InfluxDB Retention Policy Intervals

When defining an InfluxDB retention policy, you must precise a retention duration interval.

To maximize the rendered precision, we recommend intervals that match Grafana predefined ranges.

For example:


Automatic Reduction of Precision for Large Time Intervals

To optimize InfluxDB response times and Grafana rendering, Cleanflux has a mechanism to automatically downgrade the requested precision of queries for large time intervals.

There are 2 supported implementations.


The first and most lightweight downsamples according to an estimate of the number of points the query would return for 1 series.

It is better suited for when we know the max expected number of series any queries can produce.


max_nb_points_per_series: 1000

This should be set in accordance with the max-row-limit parameter in InfluxDB configuration. For example for a max-row-limit of 300000 and a max_nb_points_per_series of 1000, results would start to get truncated at the 301st series (1 + 300k / 1k).


The other mode is more accurate, by doing a first request towards InfluxDB to get the number of series.

This first request adds a time overhead.


max_nb_points_per_query: 300000

We can configure it with the same value as the max-row-limit parameter in InfluxDB configuration.

Removal of Partial Intervals case SUM() ... GROUP BY time() (issues #8010 & #6451)

Solves issue listed in issue #8010 (see also Grafana issue #5961).

issue #5961

As a positive side-effect, it also corrects issue #6451.

issue #6451

The code can be found in file remove_partial_intervals_case_sum_group_by_time.py

This corrects points corresponding to partial intervals when using SUM() along with GROUP BY time().

Moreover, it shifts forwards the time indexes of the result sets by one group by interval. Indeed, InfluxDB's behaviour is to make the SUM over the following interval and not the previous one!

Furthermore, (as of InfluxDB 1.1.1) the secondary argument for the GROUP BY time() clause doesn't work when corresponding to a delta superior to the group by time interval.

Please note that it might not work properly for queries with the time boundaries WHERE clause is surrounded by parenthesis.


An example configuration file at conf.example.yml.

In the rest of this documentation, it is assumed that it gets deployed at /etc/cleanflux.yml.


Python Virtual Env Instantiation

A virtual env (venv for short) allows to install packages only for the scope of a project.

It is recommended to instantiate one venv per project, to prevent conflicts between dependencies versions.

Conda Env

When using Miniconda or Anaconda, we rely on a conda env instead fo a standard virtual env.

Instead, we rely on the more modern environment.yml file. The env name is defined in this file.

To instantiate the conda env for the project, in a shell placed in the project root directory:

$ conda env create

Classic Virtual Env

Once you have created the virtual env, install dependencies by:

pip install -r requirements_pip.txt

Manual Launch

With venv active and inside root of the project source:

$ python cleanflux -c /etc/cleanflux.yml --foreground

LSB Service Script

The following has been tested on Debian with miniconda, cleanflux sources being deployed in /opt/cleanflux.

Create file /etc/init.d/cleanflux:

# Provides:          cleanflux
# Required-Start:    $all
# Required-Stop:
# Default-Start:     2 3 4 5
# Default-Stop:
# Short-Description: Proxy to ease communicating w/ an InfluxDB instance


DAEMON="$CLEANFLUX_VENV/bin/python cleanflux -c $CLEANFLUX_CONF"


case "$1" in
       cd $DAEMON_PATH
       $DAEMON start
       exit $?
        cd $DAEMON_PATH
        $DAEMON status
        exit $?
        cd $DAEMON_PATH
        $DAEMON stop
        exit $?
        cd $DAEMON_PATH
        $DAEMON restart
        exit $?
        echo "Usage: $0 {status|start|stop|restart}"
        exit 1

exit 0

Implementation Details

Query Parsing

We do a lot of the query parsing and modification via library sqlparse and use of regular expressions.

Library sqlparse seems to be kinda slow (at least for InfluxQL queries).


This app can log either to a custom file:

use_syslog: False
logfile: /var/log/cleanflux.log

Or to syslog:

use_syslog: true

When using syslog, one can log to the local syslog instance or to a remote one:

syslog_address: <address>

Please note that stack traces are logged.


Cleanflux can produce metrics in Datadog StatsD format using the datadog python module. This derived protocol allows meta-data ("tags") to be transmitted alongside the values.

To enable the collection of those metrics put this in config.yml:

statsd_host: localhost
statsd_port: 8125

We recommend collecting those metrics using the StatsD input plugin for Telegraf.

Don't forget to enable the following flag:

parse_data_dog_tags = true

Development Environment

This project was made using IntelliJ IDEA.

At the time of writing, IntelliJ IDEA / Pycharm have limited support for conda envs.

So you could either:

If using option #2, you can then bind the conda env with IntelliJ:

Known Limitations

This won't run under Microsoft Windows. Indeed, we rely on daemonocle that is linux-only.

This has not been tested and should not work (yet) with nested queries.

URL request parameters epoch, chunked and pretty are not (yet) supported.

No support for parsing user and password from basic authentication request.

Similar Projects

Obviously, trivago/Protector is a major source of inspiration.

Automatic Retention Policy selection has also been implemented by Lupul/influxdb-grafana-rp-proxy.


Jordan Besly @p3r7 (blog).