Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Carbondb 2.0 - New #957

Draft
wants to merge 21 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 42 additions & 39 deletions api/api_helpers.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,21 @@
import typing
import io
import ipaddress
import json
import uuid
import faulthandler
from functools import cache
from html import escape as html_escape
import re

faulthandler.enable() # will catch segfaults and write to STDERR

from starlette.background import BackgroundTask
from fastapi.exceptions import RequestValidationError
from fastapi.responses import ORJSONResponse
import numpy as np
import requests
import scipy.stats
from pydantic import BaseModel

faulthandler.enable() # will catch segfaults and write to STDERR

from lib.global_config import GlobalConfig
from lib.db import DB
Expand Down Expand Up @@ -47,6 +48,12 @@ def store_artifact(artifact_type: Enum, key:str, data, ex=2592000):
except redis.RedisError as e:
error_helpers.log_error('Redis store_artifact failed', exception=e)


# Note
# ---------------
# Use this function never in the phase_stats. The metrics must always be on
# The same unit for proper comparison!
#
def rescale_energy_value(value, unit):
# We only expect values to be mJ for energy!
if unit != 'mJ' and not unit.startswith('ugCO2e/'):
Expand Down Expand Up @@ -476,6 +483,9 @@ def get_phase_stats_object(phase_stats, case):
#'is_significant': None, # currently no use for that
'data': {},
}
elif phase_stats_object['data'][phase][metric_name]['unit'] != unit:
raise RuntimeError(f"Metric cannot be compared as units have changed: {unit} vs. {phase_stats_object['data'][phase][metric_name]['unit']}")


if detail_name not in phase_stats_object['data'][phase][metric_name]['data']:
phase_stats_object['data'][phase][metric_name]['data'][detail_name] = {
Expand Down Expand Up @@ -763,47 +773,40 @@ def get_carbon_intensity(latitude, longitude):

return None

def carbondb_add(client_ip, energydatas, user_id):

data_rows = []

for e in energydatas:
def carbondb_add(connecting_ip, data, source, user_id):

if not isinstance(e, dict):
e = e.dict()

e = html_escape_multi(e)

fields_to_check = {
'type': e['type'],
'energy_value': e['energy_value'], # is expected to be in J
'time_stamp': e['time_stamp'], # is expected to be in microseconds
}

for field_name, field_value in fields_to_check.items():
if field_value is None or str(field_value).strip() == '':
raise RequestValidationError(f"{field_name.capitalize()} is empty. Ignoring everything!")
query = '''
INSERT INTO carbondb_data_raw
("type", "project", "machine", "source", "tags","time","energy_kwh","carbon_kg","carbon_intensity_g","latitude","longitude","ip_address","user_id","created_at")
VALUES
(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW())
'''

if 'ip' in e: # An ip has been given with the data. We prioritize that
latitude, longitude = get_geo(e['ip']) # cached
carbon_intensity = get_carbon_intensity(latitude, longitude) # cached
else:
latitude, longitude = get_geo(client_ip) # cached
carbon_intensity = get_carbon_intensity(latitude, longitude) # cached
used_client_ip = data.get('ip', None) # An ip has been given with the data. We prioritize that
if used_client_ip is None:
used_client_ip = connecting_ip

energy_kwh = float(e['energy_value']) * 2.77778e-7 # kWh
co2_value = energy_kwh * carbon_intensity # results in g
carbon_intensity_g_per_kWh = data.get('carbon_intensity_g', None)

company_uuid = e['company'] if e['company'] is not None else ''
project_uuid = e['project'] if e['project'] is not None else ''
tags_clean = "{" + ",".join([f'"{tag.strip()}"' for tag in e['tags'].split(',') if e['tags']]) + "}" if e['tags'] is not None else ''
if carbon_intensity_g_per_kWh is not None: # we need this check explicitely as we want to allow 0 as possible value
latitude = None # no use to derive if we get supplied data. We rather indicate with NULL that user supplied
longitude = None # no use to derive if we get supplied data. We rather indicate with NULL that user supplied
else:
latitude, longitude = get_geo(used_client_ip) # cached
carbon_intensity_g_per_kWh = get_carbon_intensity(latitude, longitude) # cached

row = f"{e['type']}|{company_uuid}|{e['machine']}|{project_uuid}|{tags_clean}|{int(e['time_stamp'])}|{e['energy_value']}|{co2_value}|{carbon_intensity}|{latitude}|{longitude}|{client_ip}|{user_id}"
data_rows.append(row)
energy_J = float(data['energy_uj']) / 1e6
energy_kWh = energy_J / (3_600*1_000)
carbon_kg = energy_kWh * carbon_intensity_g_per_kWh

data_str = "\n".join(data_rows)
data_file = io.StringIO(data_str)
DB().query(
query=query,
params=(
data['type'],
data['project'], data['machine'], source, data['tags'], data['time'], energy_kWh, carbon_kg, carbon_intensity_g_per_kWh, latitude, longitude, used_client_ip, user_id))

columns = ['type', 'company', 'machine', 'project', 'tags', 'time_stamp', 'energy_value', 'co2_value', 'carbon_intensity', 'latitude', 'longitude', 'ip_address', 'user_id']

DB().copy_from(file=data_file, table='carbondb_energy_data', columns=columns, sep='|')
def validate_carbondb_params(param, elements: list):
for el in elements:
if not re.fullmatch(r'[A-Za-z0-9\._-]+', el):
raise ValueError(f"Parameter for '{param}' may only contain A-Za-z0-9._- characters and no spaces. Was: {el}")
Loading