diff --git a/cli/common.py b/cli/common.py
index 142c804..54c3731 100644
--- a/cli/common.py
+++ b/cli/common.py
@@ -1,6 +1,8 @@
import json
import os
+import random
import shutil
+import string
import sys
import time
from dataclasses import dataclass
@@ -419,3 +421,9 @@ def get_otel_env() -> dict:
env[url_var] = conf(OTEL_VALIDATORS)[f"L12N_{url_var}"]
env[auth_var] = conf(OTEL_VALIDATORS)[f"L12N_{auth_var}"]
return env
+
+
+def rand_cluster_id() -> str:
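+    """Return a random 6-character alphanumeric identifier for a cluster run."""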
+ return "".join(
+ random.choice(string.digits + string.ascii_letters) for _ in range(6)
+ )
diff --git a/cli/plugins/ballista.py b/cli/plugins/ballista.py
index 3a53fdf..841e001 100644
--- a/cli/plugins/ballista.py
+++ b/cli/plugins/ballista.py
@@ -2,8 +2,6 @@
import base64
import json
-import random
-import string
import time
from concurrent.futures import ThreadPoolExecutor
@@ -11,9 +9,9 @@
from common import (
OTEL_VALIDATORS,
aws,
- conf,
format_lambda_output,
get_otel_env,
+ rand_cluster_id,
terraform_output,
)
from invoke import Exit, task
@@ -21,12 +19,6 @@
VALIDATORS = OTEL_VALIDATORS
-def rand_cluster_id() -> str:
- return "".join(
- random.choice(string.digits + string.ascii_letters) for _ in range(6)
- )
-
-
@task(autoprint=True)
def lambda_example(c, json_output=False, month="01"):
"""CREATE EXTERNAL TABLE and find out SUM(trip_distance) GROUP_BY payment_type"""
@@ -55,7 +47,6 @@ def format_lambda_result(name, external_duration, lambda_res):
def run_executor(
lambda_name: str,
- bucket_name: str,
seed_ip: str,
virtual_ip: str,
scheduler_ip: str,
@@ -78,7 +69,6 @@ def run_executor(
Payload=json.dumps(
{
"role": "executor",
- "bucket_name": bucket_name,
"timeout_sec": 40,
"scheduler_ip": scheduler_ip,
"env": env,
@@ -93,7 +83,6 @@ def run_executor(
def run_scheduler(
lambda_name: str,
- bucket_name: str,
seed_ip: str,
virtual_ip: str,
query: str,
@@ -116,7 +105,6 @@ def run_scheduler(
Payload=json.dumps(
{
"role": "scheduler",
- "bucket_name": bucket_name,
"timeout_sec": 38,
"query": base64.b64encode(query.encode()).decode(),
"env": env,
@@ -149,7 +137,6 @@ def distributed(c, seed, dataset=10):
scheduler_fut = ex.submit(
run_scheduler,
lambda_name,
- bucket_name,
seed,
"172.28.0.1",
sql,
@@ -162,7 +149,6 @@ def distributed(c, seed, dataset=10):
ex.submit(
run_executor,
lambda_name,
- bucket_name,
seed,
f"172.28.0.{i+2}",
"172.28.0.1",
diff --git a/cli/plugins/chappy.py b/cli/plugins/chappy.py
index 1f2c929..2c5dec4 100644
--- a/cli/plugins/chappy.py
+++ b/cli/plugins/chappy.py
@@ -2,8 +2,6 @@
import base64
import json
-import random
-import string
import time
import uuid
from collections import namedtuple
@@ -17,23 +15,17 @@
REPOROOT,
FargateService,
aws,
- conf,
format_lambda_output,
get_otel_env,
terraform_output,
wait_deployment,
+ rand_cluster_id,
)
from invoke import Context, Exit, task
VALIDATORS = OTEL_VALIDATORS
-def rand_cluster_id() -> str:
- return "".join(
- random.choice(string.digits + string.ascii_letters) for _ in range(6)
- )
-
-
def service_outputs(c: Context) -> tuple[str, str, str]:
with ThreadPoolExecutor() as ex:
cluster_fut = ex.submit(terraform_output, c, "chappy", "fargate_cluster_name")
diff --git a/cli/plugins/clickhouse.py b/cli/plugins/clickhouse.py
index 4c9be0b..c46c0d2 100644
--- a/cli/plugins/clickhouse.py
+++ b/cli/plugins/clickhouse.py
@@ -1,8 +1,23 @@
"""Clickhouse on AWS Lambda"""
+import base64
+import json
+import time
+from concurrent.futures import ThreadPoolExecutor
+
import core
-from common import AWS_REGION
-from invoke import task
+from common import (
+ AWS_REGION,
+ OTEL_VALIDATORS,
+ aws,
+ format_lambda_output,
+ get_otel_env,
+ rand_cluster_id,
+ terraform_output,
+)
+from invoke import Exit, task
+
+VALIDATORS = OTEL_VALIDATORS
@task(autoprint=True)
@@ -15,3 +30,86 @@ def lambda_example(c, json_output=False, month="01"):
if not json_output:
print(sql)
return core.run_lambda(c, "clickhouse", sql, json_output=json_output)
+
+
+def format_lambda_result(name, external_duration, lambda_res):
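+    """Pretty-print the result of a lambda invocation, raising if the function errored."""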
+ result = []
+    result.append("==============================")
+ result.append(f"RESULTS FOR {name}")
+ result.append(f"EXTERNAL_DURATION: {external_duration}")
+ if "FunctionError" in lambda_res:
+ raise Exit(message=lambda_res["Payload"], code=1)
+ result.append("== PAYLOAD ==")
+ result.append(format_lambda_output(lambda_res["Payload"], False))
+    result.append("==============================")
+ return "\n".join(result)
+
+
+def run_node(
+ lambda_name: str,
+ seed_ip: str,
+ virtual_ip: str,
+ query: str,
+ nodes: int,
+ cluster_id: str,
+):
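+    """Invoke a single distributed ClickHouse lambda node and measure the call duration."""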
+ start_time = time.time()
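+    # Chappy virtual network settings, exported as environment variables by the lambda handler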
+ env = {
+ "CHAPPY_CLUSTER_SIZE": nodes,
+ "CHAPPY_SEED_HOSTNAME": seed_ip,
+ "CHAPPY_SEED_PORT": 8000,
+ "CHAPPY_CLUSTER_ID": cluster_id,
+ "CHAPPY_VIRTUAL_IP": virtual_ip,
+ "RUST_LOG": "info,chappy_perforator=debug,chappy=trace,rustls=error",
+ "RUST_BACKTRACE": "1",
+ **get_otel_env(),
+ }
+ lambda_res = aws("lambda").invoke(
+ FunctionName=lambda_name,
+ Payload=json.dumps(
+ {
+ "timeout_sec": 38,
+ "query": base64.b64encode(query.encode()).decode(),
+ "env": env,
+ }
+ ).encode(),
+ InvocationType="RequestResponse",
+ LogType="None",
+ )
+ lambda_res["Payload"] = lambda_res["Payload"].read().decode()
+ return (lambda_res, time.time() - start_time)
+
+
+@task
+def distributed(c, seed, dataset=10, nodes=5):
+ """CREATE EXTERNAL TABLE and find out stored page data by url_host_registered_domain"""
+ bucket_name = core.bucket_name(c)
+ core.load_commoncrawl_index(c, dataset)
+ cluster_id = rand_cluster_id()
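+    # all lambda nodes of this run share the same random cluster id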
+ sql = f"""
+SELECT url_host_registered_domain, sum(warc_record_length) AS stored_bytes
+FROM s3Cluster('cloudfuse_cluster', 'https://{bucket_name}.s3.{AWS_REGION()}.amazonaws.com/commoncrawl/index/n{dataset}/crawl=CC-MAIN-2023-14/subset=warc/*', 'Parquet')
+GROUP BY url_host_registered_domain
+ORDER BY sum(warc_record_length) DESC
+LIMIT 10;
+"""
+
+ lambda_name = terraform_output(c, "clickhouse", "distributed_lambda_name")
+ with ThreadPoolExecutor(max_workers=nodes + 4) as ex:
+ node_futs = []
+ for i in range(1, nodes + 1):
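+            # only the last node receives the query, the others just join the cluster and wait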
+ node_futs.append(
+ ex.submit(
+ run_node,
+ lambda_name,
+ seed,
+ f"172.28.0.{i}",
+ sql if i == nodes else "",
+ nodes,
+ cluster_id,
+ )
+ )
+
+        for i in range(nodes):
+            node_res, node_duration = node_futs[i].result()
+            print(format_lambda_result(f"NODE-{i+1}", node_duration, node_res))
diff --git a/docker/ballista/distributed-handler.py b/docker/ballista/distributed-handler.py
index 547931a..aaba534 100644
--- a/docker/ballista/distributed-handler.py
+++ b/docker/ballista/distributed-handler.py
@@ -167,19 +167,18 @@ def handle_event(event) -> dict[str, Any]:
global IS_COLD_START
is_cold_start = IS_COLD_START
IS_COLD_START = False
+ if not is_cold_start:
+ raise Exception(f"Only cold starts supported")
logging.info(f"role :{event['role']}")
timeout_sec = float(event["timeout_sec"])
- if is_cold_start:
- if event["role"] == "scheduler":
- srv_proc = init_scheduler()
- elif event["role"] == "executor":
- srv_proc = init_executor(event["scheduler_ip"])
- else:
- raise Exception(f'Unknown role {event["role"]}')
+ if event["role"] == "scheduler":
+ srv_proc = init_scheduler()
+ elif event["role"] == "executor":
+ srv_proc = init_executor(event["scheduler_ip"])
else:
- raise Exception(f"Only cold starts supported")
+ raise Exception(f'Unknown role {event["role"]}')
init_duration = time.time() - start
diff --git a/docker/ballista/docker-compose.yaml b/docker/ballista/docker-compose.yaml
index d8dbcd1..9fda5e2 100644
--- a/docker/ballista/docker-compose.yaml
+++ b/docker/ballista/docker-compose.yaml
@@ -24,20 +24,7 @@ services:
context: ../..
dockerfile: docker/ballista/distributed.Dockerfile
image: cloudfuse-io/l12n:ballista-distributed
- cap_drop:
- - ALL
- read_only: true
- volumes:
- - ballista-tmp:/tmp
- entrypoint:
- - python3
- - distributed-handler.py
- environment:
- - AWS_ACCESS_KEY_ID=$LAMBDA_ACCESS_KEY_ID
- - AWS_SECRET_ACCESS_KEY=$LAMBDA_SECRET_ACCESS_KEY
- - AWS_SESSION_TOKEN=$LAMBDA_SESSION_TOKEN
- - AWS_DEFAULT_REGION=$L12N_AWS_REGION
- - DATA_BUCKET_NAME
+    # unfortunately, local testing of distributed images is not supported yet
volumes:
ballista-tmp:
diff --git a/docker/clickhouse/distributed-config.xml b/docker/clickhouse/distributed-config.xml
new file mode 100644
index 0000000..ad1163f
--- /dev/null
+++ b/docker/clickhouse/distributed-config.xml
@@ -0,0 +1,1329 @@
+<!--
+  distributed-config.xml (1329 lines, summarized): ClickHouse server configuration
+  derived from the stock config.xml and adapted for AWS Lambda. The logger, data,
+  tmp, user_files, access and format_schemas paths are relocated under /tmp/var/...,
+  the standard ports are kept (8123, 9000, 9004, 9005, 9009), users.xml remains the
+  user directory, the system query/trace/metric logs are enabled, and the
+  remote_servers cluster definition (cloudfuse_cluster) contains a %%REPLICA_LIST%%
+  placeholder that distributed-handler.py interpolates at runtime.
+-->
diff --git a/docker/clickhouse/distributed-handler.py b/docker/clickhouse/distributed-handler.py
new file mode 100644
index 0000000..026f40b
--- /dev/null
+++ b/docker/clickhouse/distributed-handler.py
@@ -0,0 +1,248 @@
+import base64
+import logging
+import os
+import selectors
+import socket
+import subprocess
+import sys
+import tempfile
+import threading
+import time
+import traceback
+from contextlib import closing
+
+logging.getLogger().setLevel(logging.INFO)
+
+IS_COLD_START = True
+INTERPOLATED_CONFIG_FILE = "/tmp/interpolated-config.xml"
+
+
+class Perforator:
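+    """Runs the chappy-perforator sidecar and captures its stderr for later retrieval."""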
+ def __init__(self, bin_path):
+ self.tmp_file = tempfile.NamedTemporaryFile(mode="w+", delete=True)
+ self.proc = subprocess.Popen(
+ [bin_path],
+ stderr=self.tmp_file,
+ )
+ self.logs = ""
+
+ def _load_logs(self):
+ if self.logs == "":
+ self.proc.terminate()
+ try:
+ self.proc.communicate(timeout=5)
+ logging.info("Perforator successfully terminated")
+ except subprocess.TimeoutExpired:
+ logging.error("Perforator could not terminate properly")
+ self.proc.kill()
+ self.proc.communicate()
+ self.tmp_file.seek(0)
+ self.logs = self.tmp_file.read().strip()
+ self.tmp_file.close()
+
+ def get_logs(self) -> str:
+ self._load_logs()
+ return self.logs
+
+ def log(self, log=logging.info):
+ perf_logs_prefixed = "\n".join(
+ [f"[PERFORATOR] {line}" for line in self.get_logs().split("\n")]
+ )
+ log(f"=> PERFORATOR LOGS:\n{perf_logs_prefixed}")
+
+
+class StdLogger:
+ """A class that efficiently multiplexes std streams into logs"""
+
+ def _start_logging(self):
+ while True:
+ for key, _ in self.selector.select():
+                # read1 instead of read to avoid blocking
+ data = key.fileobj.read1()
+ if key.fileobj not in self.files:
+ raise Exception("Unexpected file desc in selector")
+ with self.lock:
+ name = self.files[key.fileobj]
+ if not data:
+ print(f"{name} - EOS", flush=True, file=sys.stderr)
+ self.selector.unregister(key.fileobj)
+ with self.lock:
+ del self.files[key.fileobj]
+ else:
+ lines = data.decode().splitlines()
+ for line in lines:
+ print(f"{name} - {line}", flush=True, file=sys.stderr)
+ with self.lock:
+ self.logs[name].extend(lines)
+
+ def __init__(self):
+ self.lock = threading.Lock()
+ self.files = {}
+ self.logs = {}
+ self.selector = selectors.DefaultSelector()
+ self.thread = threading.Thread(target=self._start_logging, daemon=True)
+
+ def start(self):
+ """Start consuming registered streams (if any) and logging them"""
+ self.thread.start()
+
+ def add(self, name: str, file):
+ """Add a new stream with the given name"""
+ with self.lock:
+ self.files[file] = name
+ self.logs[name] = []
+ self.selector.register(file, selectors.EVENT_READ)
+
+ def get(self, name: str) -> str:
+ """Get the history of the stream for the given name"""
+ with self.lock:
+ return "\n".join(self.logs[name])
+
+
+def wait_for_socket(process_name: str, port: int):
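+    """Poll localhost:port until it accepts a TCP connection, failing after 5 seconds."""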
+ c = 0
+ start_time = time.time()
+ while True:
+ with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock:
+ s = sock.connect_ex(("localhost", port))
+ duration = time.time() - start_time
+ c += 1
+ if s == 0:
+ msg = f"{process_name} up after {duration} secs and {c} connection attempts"
+ logging.info(msg)
+ break
+ if duration >= 5:
+ raise Exception(f"{process_name} timed out after {c} connection attempts")
+ time.sleep(0.05)
+
+
+def interpolate_config(config_str: str, virtual_ip: str, cluster_size: int) -> str:
+ """Map virtual IPs to shards using the LSB"""
+ current_replica = int(virtual_ip.split(".")[-1])
+ if current_replica > cluster_size:
+ raise Exception(f"Unexpected ip {virtual_ip} for cluster size {cluster_size}")
+ cluster_hosts = [
+ "localhost" if h == current_replica else f"172.28.0.{h}"
+ for h in range(1, cluster_size + 1)
+ ]
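+    # one <replica> entry per node, substituted for the %%REPLICA_LIST%% placeholder in config.xml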
+ replicas = [
+ f"{h}9000" for h in cluster_hosts
+ ]
+ logging.info(f"replicas interpolation: {replicas}")
+ config_str = config_str.replace("%%REPLICA_LIST%%", "\n".join(replicas))
+    # macros interpolation is currently disabled:
+    # config_str = config_str.replace(
+    #     "%%MACROS_CONTENT%%", f"{current_replica:02d}"
+    # )
+ return config_str
+
+
+def init() -> StdLogger:
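+    """Start clickhouse-server with the interpolated config and wait for the native port (9000)."""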
+ srv_proc = subprocess.Popen(
+ ["clickhouse-server", f"--config-file={INTERPOLATED_CONFIG_FILE}"],
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ )
+ logging.info("server starting...")
+ logger = StdLogger()
+ logger.add("server|stdout", srv_proc.stdout)
+ logger.add("server|stderr", srv_proc.stderr)
+ logger.start()
+ try:
+ wait_for_socket("server", 9000)
+ except Exception as e:
+ logging.error(str(e))
+ if srv_proc.returncode is None:
+ logging.info(f"clickhouse-server still running, terminating...")
+ srv_proc.terminate()
+ logging.info(f"clickhouse-server returncode: {srv_proc.wait(3)}")
+
+ with open("/tmp/var/log/clickhouse-server/clickhouse-server.log", "r") as f:
+ logging.info("/tmp/var/log/clickhouse-server/clickhouse-server.log")
+ logging.info(f.read())
+ raise
+ return logger
+
+
+def handle_event(event):
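+    """Interpolate the cluster config, start the server, then run the query from the event if any."""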
+ start = time.time()
+ global IS_COLD_START
+ is_cold_start = IS_COLD_START
+ IS_COLD_START = False
+ if not is_cold_start:
+ raise Exception(f"Only cold starts supported")
+
+ # TODO: config interpolation should work regardless of the subnet structure
+ cluster_size = int(os.environ["CHAPPY_CLUSTER_SIZE"])
+ virtual_ip = os.environ["CHAPPY_VIRTUAL_IP"]
+ with open("/etc/clickhouse-server/config.xml", "r") as file:
+ config_str = file.read()
+ config_str = interpolate_config(config_str, virtual_ip, cluster_size)
+ with open(INTERPOLATED_CONFIG_FILE, "w") as file:
+        file.write(config_str)
+
+ init()
+ init_duration = time.time() - start
+
+ resp = ""
+ logs = ""
+ src_command = ""
+ if "query" in event and event["query"] != "":
+ src_command = base64.b64decode(event["query"]).decode("utf-8")
+ try:
+ cli_proc = subprocess.run(
+ [
+ "clickhouse-client",
+ f"--config-file={INTERPOLATED_CONFIG_FILE}",
+ "-q",
+ src_command,
+ ],
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ timeout=event["timeout_sec"],
+ )
+ resp = cli_proc.stdout.decode()
+ logs = cli_proc.stderr.decode()
+ except subprocess.TimeoutExpired as e:
+ assert e.stdout is not None, "not None if PIPE specified"
+ assert e.stderr is not None, "not None if PIPE specified"
+ resp = e.stdout.decode()
+ logs = e.stderr.decode()
+
+ else:
+ timeout = event["timeout_sec"]
+ logging.info(f"no query, running node for {timeout} sec")
+ time.sleep(timeout)
+
+ result = {
+ "resp": resp,
+ "logs": logs,
+ "parsed_queries": [src_command],
+ "context": {
+ "cold_start": is_cold_start,
+ "handler_duration_sec": time.time() - start,
+ "init_duration_sec": init_duration,
+ },
+ }
+ return result
+
+
+def handler(event, context):
+ """AWS Lambda handler
+
+ event:
+ - timeout_sec: float
+ - env: dict
+    - query: Optional[str] (base64)
+ """
+ for key, value in event["env"].items():
+ logging.info(f"{key}={value}")
+ os.environ[key] = str(value)
+
+ perforator = Perforator("/opt/ballista/chappy-perforator")
+ try:
+ result = handle_event(event)
+ except Exception:
+ result = {"exception": traceback.format_exc()}
+ result["perforator_logs"] = perforator.get_logs()
+ return result
diff --git a/docker/clickhouse/distributed.Dockerfile b/docker/clickhouse/distributed.Dockerfile
new file mode 100644
index 0000000..80310e3
--- /dev/null
+++ b/docker/clickhouse/distributed.Dockerfile
@@ -0,0 +1,58 @@
+
+ARG CLICKHOUSE_VERSION=22.10.2.11
+ARG FUNCTION_DIR="/function"
+
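+# build stage: compiles libchappy.so and the chappy-perforator binary from the chappy/ sources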
+FROM rust:bullseye as chappy-build
+RUN apt update && apt install -y protobuf-compiler
+RUN mkdir /code
+WORKDIR /code
+COPY chappy/ .
+RUN cargo build --release
+
+
+FROM ubuntu:20.04 as ric-dependency
+
+ENV DEBIAN_FRONTEND=noninteractive
+
+RUN apt-get update && \
+ apt-get install -y \
+ g++ \
+ make \
+ cmake \
+ unzip \
+ python3 \
+ python3-pip \
+ libcurl4-openssl-dev
+ARG FUNCTION_DIR
+RUN mkdir -p ${FUNCTION_DIR}
+RUN pip3 install \
+ --target ${FUNCTION_DIR} \
+ awslambdaric
+COPY docker/clickhouse/distributed-handler.py ${FUNCTION_DIR}/lambda-handler.py
+
+
+FROM ghcr.io/cloudfuse-io/lambdatization:clickhouse-v$CLICKHOUSE_VERSION-patch
+ARG FUNCTION_DIR
+
+ENV RUST_LOG=warn
+ENV RUST_BACKTRACE=full
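+# libchappy.so is preloaded into clickhouse-server so that its network traffic is routed over the Chappy virtual network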
+ENV LD_PRELOAD=/opt/ballista/libchappy.so
+
+COPY --from=chappy-build /code/target/release/libchappy.so /opt/ballista/libchappy.so
+COPY --from=chappy-build /code/target/release/chappy-perforator /opt/ballista/chappy-perforator
+
+RUN apt-get update -y && \
+ apt-get install -y python3 && \
+ apt-get clean && \
+ rm -rf /var/lib/apt/lists/* && \
+ rm -rf /var/cache/apt/*
+
+COPY --from=ric-dependency ${FUNCTION_DIR} ${FUNCTION_DIR}
+COPY docker/clickhouse/distributed-config.xml /etc/clickhouse-server/config.xml
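+# disable the ClickHouse watchdog and drop the docker-specific config override shipped with the base image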
+ENV CLICKHOUSE_WATCHDOG_ENABLE=0
+RUN rm /etc/clickhouse-server/config.d/docker_related_config.xml
+
+WORKDIR ${FUNCTION_DIR}
+
+ENTRYPOINT [ "python3", "-m", "awslambdaric" ]
+CMD [ "lambda-handler.handler" ]
diff --git a/docker/clickhouse/docker-compose.yaml b/docker/clickhouse/docker-compose.yaml
index e35c6d5..a69db2a 100644
--- a/docker/clickhouse/docker-compose.yaml
+++ b/docker/clickhouse/docker-compose.yaml
@@ -1,7 +1,9 @@
version: "3.9"
services:
- clickhouse:
- build: .
+ clickhouse_standalone:
+ build:
+ context: .
+ dockerfile: standalone.Dockerfile
image: cloudfuse-io/l12n:clickhouse
cap_drop:
- ALL
@@ -24,5 +26,13 @@ services:
soft: 1024
hard: 1024
+ clickhouse_distributed:
+ build:
+ context: ../..
+ dockerfile: docker/clickhouse/distributed.Dockerfile
+ image: cloudfuse-io/l12n:clickhouse-distributed
+    # unfortunately, local testing of distributed images is not supported yet
+
volumes:
clickhouse-tmp:
diff --git a/docker/clickhouse/lambda-handler.py b/docker/clickhouse/standalone-handler.py
similarity index 100%
rename from docker/clickhouse/lambda-handler.py
rename to docker/clickhouse/standalone-handler.py
diff --git a/docker/clickhouse/Dockerfile b/docker/clickhouse/standalone.Dockerfile
similarity index 94%
rename from docker/clickhouse/Dockerfile
rename to docker/clickhouse/standalone.Dockerfile
index 5c4803b..5796ca0 100644
--- a/docker/clickhouse/Dockerfile
+++ b/docker/clickhouse/standalone.Dockerfile
@@ -21,7 +21,7 @@ RUN mkdir -p ${FUNCTION_DIR}
RUN pip3 install \
--target ${FUNCTION_DIR} \
awslambdaric
-COPY lambda-handler.py ${FUNCTION_DIR}
+COPY standalone-handler.py ${FUNCTION_DIR}/lambda-handler.py
FROM ghcr.io/cloudfuse-io/lambdatization:clickhouse-v$CLICKHOUSE_VERSION-patch
diff --git a/infra/runtime/clickhouse/main.tf b/infra/runtime/clickhouse/main.tf
index 5b0e776..242f881 100644
--- a/infra/runtime/clickhouse/main.tf
+++ b/infra/runtime/clickhouse/main.tf
@@ -1,6 +1,8 @@
variable "region_name" {}
-variable "clickhouse_image" {}
+variable "clickhouse_standalone_image" {}
+
+variable "clickhouse_distributed_image" {}
variable "bucket_arn" {}
@@ -44,7 +46,7 @@ module "engine" {
function_base_name = "clickhouse"
region_name = var.region_name
- docker_image = var.clickhouse_image
+ docker_image = var.clickhouse_standalone_image
memory_size = 2048
timeout = 300
@@ -53,6 +55,26 @@ module "engine" {
}
+module "distributed_engine" {
+ source = "../../common/lambda"
+
+ function_base_name = "clickhouse-distributed"
+ region_name = var.region_name
+ docker_image = var.clickhouse_distributed_image
+ memory_size = 2048
+ timeout = 300
+
+ additional_policies = [aws_iam_policy.s3_access.arn]
+ environment = {
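+    # subnet of the virtual IPs (172.28.0.x) that the CLI assigns to the lambda nodes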
+ CHAPPY_VIRTUAL_SUBNET : "172.28.0.0/16",
+ }
+
+}
+
output "lambda_name" {
value = module.engine.lambda_name
}
+
+output "distributed_lambda_name" {
+ value = module.distributed_engine.lambda_name
+}
diff --git a/infra/runtime/clickhouse/terragrunt.hcl b/infra/runtime/clickhouse/terragrunt.hcl
index 7a70880..c8caa0f 100644
--- a/infra/runtime/clickhouse/terragrunt.hcl
+++ b/infra/runtime/clickhouse/terragrunt.hcl
@@ -29,7 +29,8 @@ terraform {
}
inputs = {
- region_name = local.region_name
- clickhouse_image = "dummy_overriden_by_before_hook"
- bucket_arn = dependency.core.outputs.bucket_arn
+ region_name = local.region_name
+ clickhouse_standalone_image = "dummy_overriden_by_before_hook"
+ clickhouse_distributed_image = "dummy_overriden_by_before_hook"
+ bucket_arn = dependency.core.outputs.bucket_arn
}