Refactor Data Storage #125

Open · wants to merge 19 commits into base: main
14 changes: 14 additions & 0 deletions clickhouse/Dockerfile
@@ -0,0 +1,14 @@
FROM clickhouse/clickhouse-server:24.1.2.1

# Set environment variables
ENV CLICKHOUSE_CONFIG=/etc/clickhouse-server/config.xml
ENV CLICKHOUSE_USER_CONFIG=/etc/clickhouse-server/users.xml

# Copy custom configuration file
COPY config.yaml /etc/clickhouse-server/config.yaml

# Expose ClickHouse ports
EXPOSE 8123 9000 9009

# Set the default command
CMD ["clickhouse-server"]
57 changes: 57 additions & 0 deletions clickhouse/clickhouse_export.py
@@ -0,0 +1,57 @@
import clickhouse_connect
from clickhouse_connect.driver.client import Client
import argparse
import time
import pathlib

OUTPUT_PATH = "/parquet-data/processed"


def get_tables(client: Client, network: str, schema: str):
query = f"select name from system.tables where database = '{schema}'"
return client.query(query).result_rows


def export_table(client: Client, network: str, schema: str, table: str):
query = f"select * from {schema}.{table}"
file_name = f"{OUTPUT_PATH}/{schema}/{table}.parquet"
stream = client.raw_stream(query, fmt="Parquet")
with open(file_name, "wb") as f:
for chunk in stream:
f.write(chunk)


def export_data(client: Client, network: str, schema: str):
start_time = time.time()

# Create the schema directory if it doesn't exist
pathlib.Path(f"{OUTPUT_PATH}/{schema}").mkdir(parents=True, exist_ok=True)

# Get the tables in the schema
tables = get_tables(client, network, schema)

# Export each table
for table in tables:
table_name = table[0]
export_table(client, network, schema, table_name)

end_time = time.time()
print(f"Exported {len(tables)} tables in {end_time - start_time} seconds")


def main(network: str, target: str):
client = clickhouse_connect.get_client(host="clickhouse", port=8123, user="default")

raw_schema = f"{target}_raw_{network}"
processed_schema = f"{target}_{network}"

export_data(client, network, raw_schema)
export_data(client, network, processed_schema)


if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--network", type=str, required=True)
parser.add_argument("--target", type=str, required=True)
args = parser.parse_args()
main(args.network, args.target)
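
As a quick sanity check on the output of clickhouse_export.py above, here is a minimal sketch that reads one exported file back. It assumes pyarrow is installed (it is not listed in clickhouse/requirements.txt), and the schema and table names are hypothetical; the path layout mirrors OUTPUT_PATH/{schema}/{table}.parquet from the script.

```python
# Sketch only: spot-check an exported Parquet file.
# Assumes pyarrow is available; schema/table names below are hypothetical.
import pyarrow.parquet as pq

schema = "analytics_arbitrum_sepolia"  # mirrors f"{target}_{network}" in main()
table = "order_settled"                # hypothetical table name
path = f"/parquet-data/processed/{schema}/{table}.parquet"

pf = pq.ParquetFile(path)
print(f"{table}: {pf.metadata.num_rows} rows, {pf.metadata.num_columns} columns")
```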
69 changes: 69 additions & 0 deletions clickhouse/clickhouse_import.py
@@ -0,0 +1,69 @@
import clickhouse_connect
from clickhouse_connect.driver.client import Client
import argparse
import os
import re

CLICKHOUSE_INTERNAL_PATH = "/var/lib/clickhouse/user_files/parquet-data"
BASE_PATH = "/parquet-data"


def convert_case(name):
snake_case = re.sub(r"(?<!^)(?=[A-Z])", "_", name).lower()
return snake_case


def get_event_list(root: str, network: str) -> set[str]:
data_path = f"{BASE_PATH}/{root}/{network}"
event_list = set()
for root, dirs, files in os.walk(f"{data_path}"):
for file in files:
if file.endswith(".parquet"):
event_name = file.split(".")[0]
event_list.add(event_name)
print(f"Found {len(event_list)} events for {network}")
return event_list


def create_table(
client: Client,
event_name: str,
network: str,
root: str,
hive_partition: bool = False,
):
table_name = f"{network}.{convert_case(event_name)}"
if hive_partition:
file_path = (
f"{CLICKHOUSE_INTERNAL_PATH}/{root}/{network}/*/{event_name}.parquet"
)
else:
file_path = f"{CLICKHOUSE_INTERNAL_PATH}/{root}/{network}/{event_name}.parquet"
query = (
f"create table if not exists {table_name} "
f"engine = MergeTree order by tuple() as "
f"select * from file('{file_path}', 'Parquet')"
)
print(query)
client.command(query)


def import_data(client: Client, network: str, path: str, hive_partition: bool = False):
event_list = get_event_list(path, network)
for event_name in event_list:
create_table(client, event_name, network, path, hive_partition)


def main(network: str):
client = clickhouse_connect.get_client(host="clickhouse", port=8123, user="default")
client.command(f"create database if not exists {network}")

import_data(client, network, "indexed", hive_partition=True)
import_data(client, network, "clean")


if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--network", type=str, required=True)
args = parser.parse_args()
main(args.network)
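
For reference, a small sketch of the DDL that create_table in clickhouse_import.py would emit for a non-hive-partitioned file. convert_case is copied from the script, while the event and network names are hypothetical.

```python
# Sketch only: reproduce the CREATE TABLE statement built by create_table()
# for a hypothetical event/network, using the non-hive-partitioned path.
import re

def convert_case(name):
    # Same regex as clickhouse_import.py: insert "_" before interior capitals.
    return re.sub(r"(?<!^)(?=[A-Z])", "_", name).lower()

event_name = "OrderSettled"  # hypothetical event
network = "base_mainnet"     # hypothetical network / database name
table_name = f"{network}.{convert_case(event_name)}"  # -> base_mainnet.order_settled

file_path = (
    "/var/lib/clickhouse/user_files/parquet-data/clean/"
    f"{network}/{event_name}.parquet"
)
query = (
    f"create table if not exists {table_name} "
    f"engine = MergeTree order by tuple() as "
    f"select * from file('{file_path}', 'Parquet')"
)
print(query)
```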
Empty file added clickhouse/config.yaml
1 change: 1 addition & 0 deletions clickhouse/requirements.txt
@@ -0,0 +1 @@
clickhouse-connect
30 changes: 24 additions & 6 deletions docker-compose.yml
@@ -1,6 +1,26 @@
version: "2.4"

services:
clickhouse:
image: clickhouse/clickhouse-server:latest
container_name: clickhouse
networks:
- data
ulimits:
nofile:
soft: 262144
hard: 262144
volumes:
- ./parquet-data:/var/lib/clickhouse/user_files/parquet-data
ports:
- 8123:8123
- 9000:9000
- 9009:9009
deploy:
resources:
limits:
cpus: "4.0"
memory: 8192M
db:
build:
context: ./postgres
@@ -80,16 +100,12 @@ services:
context: ./indexers/arbitrum-sepolia
networks:
- data
depends_on:
- db
restart: always
environment:
DB_HOST: db
DB_PORT: 5432
DB_NAME: arbitrum_sepolia
DB_PASS: $PG_PASSWORD
GQL_PORT: 4350
RPC_ENDPOINT: https://sepolia-rollup.arbitrum.io/rpc
volumes:
- ./parquet-data:/parquet-data

arbitrum-mainnet-processor:
build:
@@ -162,6 +178,7 @@ services:
context: ./transformers
depends_on:
- db
- clickhouse
environment:
PG_PASSWORD: $PG_PASSWORD
volumes:
@@ -206,6 +223,7 @@ services:
- data
depends_on:
- db
- clickhouse
env_file:
- .env
environment:
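
A minimal connectivity check against the clickhouse service added in docker-compose.yml, assuming it is run from a container on the data network (host clickhouse, port 8123); from the host machine the published port should make localhost:8123 equivalent.

```python
# Sketch only: confirm the clickhouse service is reachable, mirroring the
# get_client() call used by the import/export scripts in this PR.
import clickhouse_connect

client = clickhouse_connect.get_client(host="clickhouse", port=8123, user="default")
print(client.command("select version()"))
```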
4 changes: 2 additions & 2 deletions indexers/arbitrum-sepolia/Dockerfile
@@ -4,11 +4,11 @@ WORKDIR /app

COPY package*.json ./

RUN npm install
RUN npm ci

COPY . .

RUN npm run generate:processor
RUN npm run build

CMD npm run generate:migration ; npm run start
CMD npm run start
58 changes: 1 addition & 57 deletions indexers/arbitrum-sepolia/commands.json
@@ -5,77 +5,25 @@
"description": "delete all build artifacts",
"cmd": ["npx", "--yes", "rimraf", "lib"]
},
"generate": {
"description": "Generate a squid from an ABI file",
"cmd": ["squid-gen-abi"]
},
"squid-gen-abi": {
"description": "Generate a squid from an ABI file",
"cmd": ["squid-gen-abi"],
"hidden": true
},
"build": {
"description": "Build the squid project",
"deps": ["clean"],
"cmd": ["tsc"]
},
"up": {
"description": "Start a PG database",
"cmd": ["docker-compose", "up", "-d"]
},
"down": {
"description": "Drop a PG database",
"cmd": ["docker-compose", "down"]
},
"migration:apply": {
"description": "Apply the DB migrations",
"cmd": ["squid-typeorm-migration", "apply"]
},
"migration:generate": {
"description": "Generate a DB migration matching the TypeORM entities",
"deps": ["build", "migration:clean"],
"cmd": ["squid-typeorm-migration", "generate"],
},
"migration:clean": {
"description": "Clean the migrations folder",
"cmd": ["npx", "--yes", "rimraf", "./db/migrations"],
},
"migration": {
"deps": ["build"],
"cmd": ["squid-typeorm-migration", "generate"],
"hidden": true
},
"codegen": {
"description": "Generate TypeORM entities from the schema file",
"cmd": ["squid-typeorm-codegen"]
},
"typegen": {
"description": "Generate data access classes for an ABI file(s) in the ./abi folder",
"cmd": ["squid-evm-typegen", "./src/abi", {"glob": "./abi/*.json"}, "--multicall"]
},
"process": {
"description": "Load .env and start the squid processor",
"deps": ["build", "migration:apply"],
"deps": ["build"],
"cmd": ["node", "--require=dotenv/config", "lib/main.js"]
},
"process:prod": {
"description": "Start the squid processor",
"deps": ["migration:apply"],
"cmd": ["node", "lib/main.js"],
"hidden": true
},
"serve": {
"description": "Start the GraphQL API server",
"cmd": ["squid-graphql-server"]
},
"serve:prod": {
"description": "Start the GraphQL API server with caching and limits",
"cmd": ["squid-graphql-server",
"--dumb-cache", "in-memory",
"--dumb-cache-ttl", "1000",
"--dumb-cache-size", "100",
"--dumb-cache-max-age", "1000" ]
},
"check-updates": {
"cmd": ["npx", "--yes", "npm-check-updates", "--filter=/subsquid/", "--upgrade"],
"hidden": true
@@ -84,10 +32,6 @@
"description": "Bump @subsquid packages to the latest versions",
"deps": ["check-updates"],
"cmd": ["npm", "i", "-f"]
},
"open": {
"description": "Open a local browser window",
"cmd": ["npx", "--yes", "opener"]
}
}
}