Skip to content

Commit

Permalink
feat(clickhouse): distributed
Browse files Browse the repository at this point in the history
  • Loading branch information
rdettai committed Jul 28, 2023
1 parent 43f0bb7 commit ca54062
Show file tree
Hide file tree
Showing 14 changed files with 1,794 additions and 56 deletions.
8 changes: 8 additions & 0 deletions cli/common.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import json
import os
import random
import shutil
import string
import sys
import time
from dataclasses import dataclass
Expand Down Expand Up @@ -419,3 +421,9 @@ def get_otel_env() -> dict:
env[url_var] = conf(OTEL_VALIDATORS)[f"L12N_{url_var}"]
env[auth_var] = conf(OTEL_VALIDATORS)[f"L12N_{auth_var}"]
return env


def rand_cluster_id() -> str:
    """Generate a random 6-character alphanumeric cluster identifier."""
    alphabet = string.digits + string.ascii_letters
    chars = [random.choice(alphabet) for _ in range(6)]
    return "".join(chars)
16 changes: 1 addition & 15 deletions cli/plugins/ballista.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,31 +2,23 @@

import base64
import json
import random
import string
import time
from concurrent.futures import ThreadPoolExecutor

import core
from common import (
OTEL_VALIDATORS,
aws,
conf,
format_lambda_output,
get_otel_env,
rand_cluster_id,
terraform_output,
)
from invoke import Exit, task

VALIDATORS = OTEL_VALIDATORS


def rand_cluster_id() -> str:
    """Return a 6-character random id drawn from digits and ASCII letters."""
    pool = string.digits + string.ascii_letters
    return "".join(random.choice(pool) for _ in range(6))


@task(autoprint=True)
def lambda_example(c, json_output=False, month="01"):
"""CREATE EXTERNAL TABLE and find out SUM(trip_distance) GROUP_BY payment_type"""
Expand Down Expand Up @@ -55,7 +47,6 @@ def format_lambda_result(name, external_duration, lambda_res):

def run_executor(
lambda_name: str,
bucket_name: str,
seed_ip: str,
virtual_ip: str,
scheduler_ip: str,
Expand All @@ -78,7 +69,6 @@ def run_executor(
Payload=json.dumps(
{
"role": "executor",
"bucket_name": bucket_name,
"timeout_sec": 40,
"scheduler_ip": scheduler_ip,
"env": env,
Expand All @@ -93,7 +83,6 @@ def run_executor(

def run_scheduler(
lambda_name: str,
bucket_name: str,
seed_ip: str,
virtual_ip: str,
query: str,
Expand All @@ -116,7 +105,6 @@ def run_scheduler(
Payload=json.dumps(
{
"role": "scheduler",
"bucket_name": bucket_name,
"timeout_sec": 38,
"query": base64.b64encode(query.encode()).decode(),
"env": env,
Expand Down Expand Up @@ -149,7 +137,6 @@ def distributed(c, seed, dataset=10):
scheduler_fut = ex.submit(
run_scheduler,
lambda_name,
bucket_name,
seed,
"172.28.0.1",
sql,
Expand All @@ -162,7 +149,6 @@ def distributed(c, seed, dataset=10):
ex.submit(
run_executor,
lambda_name,
bucket_name,
seed,
f"172.28.0.{i+2}",
"172.28.0.1",
Expand Down
10 changes: 1 addition & 9 deletions cli/plugins/chappy.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@

import base64
import json
import random
import string
import time
import uuid
from collections import namedtuple
Expand All @@ -17,23 +15,17 @@
REPOROOT,
FargateService,
aws,
conf,
format_lambda_output,
get_otel_env,
terraform_output,
wait_deployment,
rand_cluster_id,
)
from invoke import Context, Exit, task

VALIDATORS = OTEL_VALIDATORS


def rand_cluster_id() -> str:
    """Build a random cluster id of 6 alphanumeric characters."""
    result = ""
    for _ in range(6):
        # same alphabet and same per-character random.choice as before
        result += random.choice(string.digits + string.ascii_letters)
    return result


def service_outputs(c: Context) -> tuple[str, str, str]:
with ThreadPoolExecutor() as ex:
cluster_fut = ex.submit(terraform_output, c, "chappy", "fargate_cluster_name")
Expand Down
102 changes: 100 additions & 2 deletions cli/plugins/clickhouse.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,23 @@
"""Clickhouse on AWS Lambda"""

import base64
import json
import time
from concurrent.futures import ThreadPoolExecutor

import core
from common import AWS_REGION
from invoke import task
from common import (
AWS_REGION,
OTEL_VALIDATORS,
aws,
format_lambda_output,
get_otel_env,
rand_cluster_id,
terraform_output,
)
from invoke import Exit, task

VALIDATORS = OTEL_VALIDATORS


@task(autoprint=True)
Expand All @@ -15,3 +30,86 @@ def lambda_example(c, json_output=False, month="01"):
if not json_output:
print(sql)
return core.run_lambda(c, "clickhouse", sql, json_output=json_output)


def format_lambda_result(name, external_duration, lambda_res):
    """Format one Lambda invocation result as a printable report.

    Args:
        name: label for this node, printed in the header.
        external_duration: wall-clock seconds measured by the caller.
        lambda_res: boto3 ``invoke`` response dict; ``Payload`` must already
            be decoded to ``str`` (see ``run_node``).

    Returns:
        A multi-line string with a header, the external duration and the
        formatted payload.

    Raises:
        Exit: if the response carries a ``FunctionError`` key, aborting the
            task with the raw payload as the message.
    """
    result = []
    # plain strings: these separators have no placeholders (ruff F541)
    result.append("==============================")
    result.append(f"RESULTS FOR {name}")
    result.append(f"EXTERNAL_DURATION: {external_duration}")
    if "FunctionError" in lambda_res:
        raise Exit(message=lambda_res["Payload"], code=1)
    result.append("== PAYLOAD ==")
    result.append(format_lambda_output(lambda_res["Payload"], False))
    result.append("==============================")
    return "\n".join(result)


def run_node(
    lambda_name: str,
    seed_ip: str,
    virtual_ip: str,
    query: str,
    nodes: int,
    cluster_id: str,
):
    """Synchronously invoke one Clickhouse Lambda node and time the call.

    Returns a tuple ``(invoke_response, elapsed_seconds)`` where the
    response's ``Payload`` stream has been read and decoded to ``str``.
    """
    t0 = time.time()
    # Chappy overlay-network configuration plus Rust logging / OTEL settings
    node_env = {
        "CHAPPY_CLUSTER_SIZE": nodes,
        "CHAPPY_SEED_HOSTNAME": seed_ip,
        "CHAPPY_SEED_PORT": 8000,
        "CHAPPY_CLUSTER_ID": cluster_id,
        "CHAPPY_VIRTUAL_IP": virtual_ip,
        "RUST_LOG": "info,chappy_perforator=debug,chappy=trace,rustls=error",
        "RUST_BACKTRACE": "1",
        **get_otel_env(),
    }
    invoke_payload = {
        "timeout_sec": 38,
        "query": base64.b64encode(query.encode()).decode(),
        "env": node_env,
    }
    response = aws("lambda").invoke(
        FunctionName=lambda_name,
        Payload=json.dumps(invoke_payload).encode(),
        InvocationType="RequestResponse",
        LogType="None",
    )
    response["Payload"] = response["Payload"].read().decode()
    return (response, time.time() - t0)


@task
def distributed(c, seed, dataset=10, nodes=5):
    """CREATE EXTERNAL TABLE and find out stored page data by url_host_registered_domain"""
    bucket_name = core.bucket_name(c)
    core.load_commoncrawl_index(c, dataset)
    # fresh random id so concurrent runs don't join each other's cluster
    cluster_id = rand_cluster_id()
    sql = f"""
SELECT url_host_registered_domain, sum(warc_record_length) AS stored_bytes
FROM s3Cluster('cloudfuse_cluster', 'https://{bucket_name}.s3.{AWS_REGION()}.amazonaws.com/commoncrawl/index/n{dataset}/crawl=CC-MAIN-2023-14/subset=warc/*', 'Parquet')
GROUP BY url_host_registered_domain
ORDER BY sum(warc_record_length) DESC
LIMIT 10;
"""

    lambda_name = terraform_output(c, "clickhouse", "distributed_lambda_name")
    with ThreadPoolExecutor(max_workers=nodes + 4) as ex:
        node_futs = []
        for i in range(1, nodes + 1):
            node_futs.append(
                ex.submit(
                    run_node,
                    lambda_name,
                    seed,
                    f"172.28.0.{i}",
                    # only the last node receives the query; the others
                    # just join the cluster and serve reads
                    sql if i == nodes else "",
                    nodes,
                    cluster_id,
                )
            )

        # iterate the futures directly instead of indexing with range(0, nodes)
        for node_idx, fut in enumerate(node_futs, start=1):
            node_res, node_duration = fut.result()
            print(format_lambda_result(f"NODE-{node_idx}", node_duration, node_res))
15 changes: 7 additions & 8 deletions docker/ballista/distributed-handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,19 +167,18 @@ def handle_event(event) -> dict[str, Any]:
global IS_COLD_START
is_cold_start = IS_COLD_START
IS_COLD_START = False
if not is_cold_start:
raise Exception(f"Only cold starts supported")

logging.info(f"role :{event['role']}")
timeout_sec = float(event["timeout_sec"])

if is_cold_start:
if event["role"] == "scheduler":
srv_proc = init_scheduler()
elif event["role"] == "executor":
srv_proc = init_executor(event["scheduler_ip"])
else:
raise Exception(f'Unknown role {event["role"]}')
if event["role"] == "scheduler":
srv_proc = init_scheduler()
elif event["role"] == "executor":
srv_proc = init_executor(event["scheduler_ip"])
else:
raise Exception(f"Only cold starts supported")
raise Exception(f'Unknown role {event["role"]}')

init_duration = time.time() - start

Expand Down
15 changes: 1 addition & 14 deletions docker/ballista/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,7 @@ services:
context: ../..
dockerfile: docker/ballista/distributed.Dockerfile
image: cloudfuse-io/l12n:ballista-distributed
cap_drop:
- ALL
read_only: true
volumes:
- ballista-tmp:/tmp
entrypoint:
- python3
- distributed-handler.py
environment:
- AWS_ACCESS_KEY_ID=$LAMBDA_ACCESS_KEY_ID
- AWS_SECRET_ACCESS_KEY=$LAMBDA_SECRET_ACCESS_KEY
- AWS_SESSION_TOKEN=$LAMBDA_SESSION_TOKEN
- AWS_DEFAULT_REGION=$L12N_AWS_REGION
- DATA_BUCKET_NAME
# unfortunately, local tests of distributed images are not supported yet

volumes:
ballista-tmp:
Loading

0 comments on commit ca54062

Please sign in to comment.