Skip to content

Commit

Permalink
Switch storage over to timescaledb
Browse files Browse the repository at this point in the history
This assumes the datastore will have been configured with the timescale
extension already.  Locally this is handled by the timescaledb
container, and in production we have already configured it.

The --database-url option for the CLI ensures we have that env var set.
We could always pass it through (since it's in the context) but I wasn't
sure if we actually needed it set via the CLI.

The writer class means we can ensure the appropriate table exists and
has had a hypertable created for it.

All tables have a unique constraint with the common _must_be_different
suffix so the INSERT's can act as an UPSERT (with ON CONFLICT) when it's
just the value that needs updating.
  • Loading branch information
ghickman committed Nov 3, 2023
1 parent 514e048 commit 5a1cac9
Show file tree
Hide file tree
Showing 14 changed files with 130 additions and 194 deletions.
11 changes: 5 additions & 6 deletions backfill.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@

import structlog

from metrics import influxdb
from metrics.github.prs import process_prs
from metrics.logs import setup_logging
from metrics.timescaledb import TimescaleDBWriter
from metrics.tools.dates import date_from_iso, datetime_from_iso, iter_days


Expand All @@ -17,9 +17,6 @@
log = structlog.get_logger()


writer = influxdb.write


def get_data(db, orgs):
subprocess.check_call(["github-to-sqlite", "repos", db, *orgs])

Expand Down Expand Up @@ -83,7 +80,8 @@ def pr_queue(prs, org, start, days_threshold=None):
key = f"queue{suffix}"

log.info("%s | %s | %s | Processing %s PRs", key, day, org, len(prs_on_day))
process_prs(writer, key, prs_on_day, day)
with TimescaleDBWriter("github_pull_requests", f"queue{suffix}") as writer:
process_prs(writer, prs_on_day, day)


def pr_throughput(prs, org, start):
Expand Down Expand Up @@ -119,7 +117,8 @@ def next_weekday(d, weekday):

key = "throughput"
log.info("%s | %s | %s | Processing %s PRs", key, day, org, len(prs_in_range))
process_prs(writer, key, prs_in_range, day)
with TimescaleDBWriter("github_pull_requests", "throughput") as writer:
process_prs(writer, prs_in_range, day)


if __name__ == "__main__":
Expand Down
16 changes: 1 addition & 15 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,25 +21,12 @@ services:
GF_DATABASE_SSL_MODE: disable
depends_on:
- db
- influxdb
- timescaledb
ports:
- 3000:3000
volumes:
- grafana:/var/lib/grafana

influxdb:
image: influxdb:latest
environment:
DOCKER_INFLUXDB_INIT_MODE: setup
DOCKER_INFLUXDB_INIT_USERNAME: admin
DOCKER_INFLUXDB_INIT_PASSWORD: admin
DOCKER_INFLUXDB_INIT_ORG: bennett
DOCKER_INFLUXDB_INIT_BUCKET: data
ports:
- 8086:8086
volumes:
- influxdb:/var/lib/influxdb2

timescaledb:
image: timescale/timescaledb-ha:pg14-latest
environment:
Expand All @@ -52,5 +39,4 @@ services:
volumes:
postgres:
grafana:
influxdb:
timescaledb:
4 changes: 3 additions & 1 deletion metrics/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@

@click.group()
@click.option("--debug", default=False, is_flag=True)
@click.option("--database-url", required=True, envvar="DATABASE_URL")
@click.pass_context
def cli(ctx, debug):
def cli(ctx, debug, database_url):
ctx.ensure_object(dict)

setup_logging(debug)

ctx.obj["DEBUG"] = debug
ctx.obj["DATABASE_URL"] = database_url


cli.add_command(github)
Expand Down
9 changes: 5 additions & 4 deletions metrics/github/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,12 @@
import click
import structlog

from .. import influxdb
from ..timescaledb import TimescaleDBWriter
from . import api
from .prs import process_prs


log = structlog.get_logger()
writer = influxdb.write


@click.group()
Expand Down Expand Up @@ -42,7 +41,8 @@ def pr_queue(ctx, org, date, days_threshold):
suffix = f"_older_than_{days_threshold}_days" if days_threshold else ""

log.info("%s | %s | Processing %s PRs", date, org, len(prs))
process_prs(writer, f"queue{suffix}", prs, date)
with TimescaleDBWriter("github_pull_requests", f"queue{suffix}") as writer:
process_prs(writer, prs, date)


@github.command()
Expand All @@ -58,4 +58,5 @@ def pr_throughput(ctx, org, date, days):
prs = api.prs_opened_in_the_last_N_days(org, start, end)

log.info("%s | %s | Processing %s PRs", date, org, len(prs))
process_prs(writer, "throughput", prs, date)
with TimescaleDBWriter("github_pull_requests", "throughput") as writer:
process_prs(writer, prs, date)
13 changes: 5 additions & 8 deletions metrics/github/prs.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
def process_prs(writer, key, prs, date):
def process_prs(writer, prs, date):
"""
Given a list of PRs, break them down in series for writing
Expand All @@ -20,13 +20,10 @@ def process_prs(writer, key, prs, date):

org = list(orgs)[0]

writer(
f"github_pull_requests_{key}",
writer.write(
date,
len(prs_by_author_and_repo),
tags={
"author": author,
"organisation": org,
"repo": repo,
},
author=author,
organisation=org,
repo=repo,
)
65 changes: 0 additions & 65 deletions metrics/influxdb.py

This file was deleted.

18 changes: 6 additions & 12 deletions metrics/slack/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,10 @@

import click

from .. import influxdb
from ..timescaledb import TimescaleDBWriter
from .api import get_app, iter_messages


writer = influxdb.write


@click.group()
@click.option("--signing-secret", required=True, envvar="SLACK_SIGNING_SECRET")
@click.option("--token", required=True, envvar="SLACK_TOKEN")
Expand Down Expand Up @@ -38,11 +35,8 @@ def tech_support(ctx, date, tech_support_channel_id, backfill):

messages = iter_messages(app, tech_support_channel_id, date=day)

for date, messages in itertools.groupby(
messages, lambda m: datetime.fromtimestamp(float(m["ts"])).date()
):
writer(
"slack_tech_support_requests",
date,
len(list(messages)),
)
with TimescaleDBWriter("slack_tech_support", "requests") as writer:
for date, messages in itertools.groupby(
messages, lambda m: datetime.fromtimestamp(float(m["ts"])).date()
):
writer.write(date, len(list(messages)))
39 changes: 0 additions & 39 deletions metrics/timescaledb.py

This file was deleted.

6 changes: 6 additions & 0 deletions metrics/timescaledb/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from .writer import TimescaleDBWriter


__all__ = [
"TimescaleDBWriter",
]
19 changes: 19 additions & 0 deletions metrics/timescaledb/tables.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
github_pull_requests = """
CREATE TABLE IF NOT EXISTS github_pull_requests (
time TIMESTAMP WITH TIME ZONE NOT NULL,
name TEXT NOT NULL,
value INTEGER NOT NULL,
author TEXT NOT NULL,
organisation TEXT NOT NULL,
repo TEXT NOT NULL,
CONSTRAINT github_pull_requests_must_be_different UNIQUE (time, name, author, repo)
);
"""
slack_tech_support = """
CREATE TABLE IF NOT EXISTS slack_tech_support (
time TIMESTAMP WITH TIME ZONE NOT NULL,
name TEXT NOT NULL,
value INTEGER NOT NULL,
CONSTRAINT slack_tech_support_must_be_different UNIQUE (time, name)
);
"""
76 changes: 76 additions & 0 deletions metrics/timescaledb/writer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import os
from datetime import datetime, time

import psycopg
import structlog

from . import tables


log = structlog.get_logger()

DATABASE_URL = os.environ["DATABASE_URL"]


def ensure_table(name):
"""
Ensure both the table and hypertable config exist in the database
"""
run(getattr(tables, name))

run(
"SELECT create_hypertable(%s, 'time', if_not_exists => TRUE);",
[name],
)


def run(sql, *args):
with psycopg.connect(DATABASE_URL) as conn:
cursor = conn.cursor()

return cursor.execute(sql, *args)


class TimescaleDBWriter:
def __init__(self, table, key):
self.key = key
self.table = table

def __enter__(self):
ensure_table(self.table)

return self

def __exit__(self, *args):
pass

def write(self, date, value, **kwargs):
# convert date to a timestamp
# TODO: do we need to do any checking to make sure this is tz-aware and in
# UTC?
dt = datetime.combine(date, time())

# insert into the table set at instantiation
# unique by the tables `{name}_must_be_different` and we always want to
# bump the value if that triggers a conflict
# the columns could differ per table… do we want an object to represent tables?
if kwargs:
extra_fields = ", " + ", ".join(kwargs.keys())
placeholders = ", " + ", ".join(["%s" for k in kwargs.keys()])
else:
extra_fields = ""
placeholders = ""
sql = f"""
INSERT INTO {self.table} (time, name, value {extra_fields})
VALUES (%s, %s, %s {placeholders})
ON CONFLICT ON CONSTRAINT {self.table}_must_be_different DO UPDATE SET value = EXCLUDED.value;
"""

run(sql, (dt, self.key, value, *kwargs.values()))

log.debug(
self.key,
date=dt.isoformat(),
value=value,
**kwargs,
)
1 change: 0 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ requires-python = ">=3.11"
dependencies = [
"click",
"github-to-sqlite",
"influxdb-client",
"requests",
"psycopg[binary]",
"slack-bolt",
Expand Down
Loading

0 comments on commit 5a1cac9

Please sign in to comment.