An example of how to perform downsampling with Bytewax and InfluxDB Cloud v3. Bytewax is an open source real-time stream processing tool for buidlign data pipelines through a Python framewor.
- Python 3.8 or higher
- Latest version of Bytewax
- Latest version of Pandas
- InfluxDB v3 Client
Make sure to set the following environment variables for InfluxDB (or hardcode them in the scripts):
INLFUXDB_TOKEN: Your InfluxDB authentication token.
INFLUXDB_DATABASE: The name of your InfluxDB database.
INFLUXDB_ORG: Your InfluxDB organization name.
You can set them in your shell session like this:
export INLFUXDB_TOKEN="your-token-here"
export INFLUXDB_DATABASE="cpu"
export INFLUXDB_ORG="your-org-here"
First, clone the repository and navigate to the project directory:
git clone https://github.com/InfluxCommunity/Getting_Started_Bytewax_InfluxDB
cd Getting_Started_Bytewax_InfluxDB
Next set up a virtual environment:
python3 -m venv venv
source venv/bin/activate # On Windows use `venv\Scripts\activate`
pip install -r requirements.txt
This script sets up a dataflow using Bytewax to periodically query data from an InfluxDB database every 15 seconds, specifically retrieving the last 15 seconds of data from the cpu measurement. The retrieved data is processed and then output to the standard output (console) using the StdOutSink. Logging is configured to display information messages, and the flow is set up to log the output as it processes the data.
Run basic_request.py with:
python3 -m bytewax.run basic_request.py
This script provides an example that uses custom sink and sources defined in influx_connetor.py to downsample 1 minute of data every 10 seconds by:
- Querying InfluxBD and returing a dataframe.
- Using SQL to aggregate the values.
- Writing the downsampled dataframe back to InfluxDB.
Run dataflow.py with:
python3 -m bytewax.run dataflow.py