Skip to content

Commit

Permalink
cfr: Add ctk cfr diagnostics program
Browse files Browse the repository at this point in the history
Add basic implementation for `sys-export` and `sys-import` subcommands.

It is about exporting system tables of CrateDB into SQL DDL and JSONL
files, and re-importing them for later analysis.
  • Loading branch information
amotl committed Apr 17, 2024
1 parent 1e41dc3 commit c3222b3
Show file tree
Hide file tree
Showing 14 changed files with 483 additions and 1 deletion.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ __pycache__
dist
.coverage*
coverage.xml
/cfr
/foo
/tmp
/DOWNLOAD
2 changes: 1 addition & 1 deletion CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@


## Unreleased
- Add `cratedb-wtf` diagnostics program
- Add `ctk cfr` and `ctk wtf` diagnostics programs

## 2024/04/10 v0.0.10
- Dependencies: Unpin upper version bound of `dask`. Otherwise, compatibility
Expand Down
61 changes: 61 additions & 0 deletions cratedb_toolkit/cfr/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
# CrateDB Cluster Flight Recorder (CFR)

Collect required cluster information for support requests
and self-service debugging.


## Synopsis

Define CrateDB database cluster address.
```shell
export CRATEDB_SQLALCHEMY_URL=crate://localhost/
```

Export system table information into timestamped file,
by default into the `cfr/sys` directory.
```shell
ctk cfr sys-export
```


## Usage

Export system table information into given directory.
```shell
ctk cfr sys-export file:///var/ctk/cfr/sys
```

Import system table information from given directory.
```shell
ctk cfr sys-import file://./cfr/sys/2024-04-16T05-43-37
```

In order to define the CrateDB database address on the
command line, use a command like this.
```shell
ctk cfr --cratedb-sqlalchemy-url=crate://localhost/ sys-export
```


## OCI

If you don't want or can't install the program, you can also use its OCI
container image, for example on Docker, Postman, or Kubernetes.

Optionally, start a CrateDB single-node instance for testing purposes.
```shell
docker run --rm -it \
--name=cratedb --publish=4200:4200 --env=CRATE_HEAP_SIZE=4g \
crate/crate:nightly -Cdiscovery.type=single-node
```

Define the database URI address, and an alias to the `cfr` program.
```shell
echo "CRATEDB_SQLALCHEMY_URL=crate://localhost/" > .env
alias cfr="docker run --rm -it --network=host --volume=$(PWD)/cfr:/cfr --env-file=.env ghcr.io/crate-workbench/cratedb-toolkit:latest ctk cfr"
```

Verify everything works.
```shell
cfr --help
```
Empty file added cratedb_toolkit/cfr/__init__.py
Empty file.
15 changes: 15 additions & 0 deletions cratedb_toolkit/cfr/backlog.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# CFR Backlog

## Iteration +1
- sys-export: Does the program need capabilities to **LIMIT** cardinality
on `sys-export` operations, for example, when they are super large?
- sys-import: Accept target database schema.
- Combine with `ctk wtf info`
- Converge output into tar archive

## Iteration +2
- Cluster name muss in `cfr/<name>/sys/<timestamp>`, für multi-tenancy operations.

## Iteration +3
- Wie komme ich ans `crate.yaml`?
- Wie komme ich an die Logfiles? `docker log`?
67 changes: 67 additions & 0 deletions cratedb_toolkit/cfr/cli.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
# Copyright (c) 2021-2024, Crate.io Inc.
# Distributed under the terms of the AGPLv3 license, see LICENSE.
import logging
import sys

import click
from click_aliases import ClickAliasedGroup

from cratedb_toolkit.cfr.systable import SystemTableExporter, SystemTableImporter
from cratedb_toolkit.util.cli import (
boot_click,
error_logger,
make_command,
)
from cratedb_toolkit.util.data import jd, path_from_url

logger = logging.getLogger(__name__)


cratedb_sqlalchemy_option = click.option(
"--cratedb-sqlalchemy-url", envvar="CRATEDB_SQLALCHEMY_URL", type=str, required=False, help="CrateDB SQLAlchemy URL"
)


@click.group(cls=ClickAliasedGroup) # type: ignore[arg-type]
@cratedb_sqlalchemy_option
@click.option("--verbose", is_flag=True, required=False, help="Turn on logging")
@click.option("--debug", is_flag=True, required=False, help="Turn on logging with debug level")
@click.option("--scrub", envvar="SCRUB", is_flag=True, required=False, help="Blank out identifiable information")
@click.version_option()
@click.pass_context
def cli(ctx: click.Context, cratedb_sqlalchemy_url: str, verbose: bool, debug: bool, scrub: bool):
"""
Diagnostics and informational utilities.
"""
if not cratedb_sqlalchemy_url:
logger.error("Unable to operate without database address")
sys.exit(1)

Check warning on line 38 in cratedb_toolkit/cfr/cli.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/cfr/cli.py#L37-L38

Added lines #L37 - L38 were not covered by tests
ctx.meta.update({"cratedb_sqlalchemy_url": cratedb_sqlalchemy_url, "scrub": scrub})
return boot_click(ctx, verbose, debug)


@make_command(cli, "sys-export")
@click.argument("target", envvar="CFR_TARGET", type=str, required=False, default="file://./cfr/sys")
@click.pass_context
def sys_export(ctx: click.Context, target: str):
cratedb_sqlalchemy_url = ctx.meta["cratedb_sqlalchemy_url"]
try:
stc = SystemTableExporter(dburi=cratedb_sqlalchemy_url, target=path_from_url(target))
path = stc.save()
jd({"path": str(path)})
except Exception as ex:
error_logger(ctx)(ex)
sys.exit(1)

Check warning on line 54 in cratedb_toolkit/cfr/cli.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/cfr/cli.py#L52-L54

Added lines #L52 - L54 were not covered by tests


@make_command(cli, "sys-import")
@click.argument("source", envvar="CFR_SOURCE", type=str, required=True)
@click.pass_context
def sys_import(ctx: click.Context, source: str):
cratedb_sqlalchemy_url = ctx.meta["cratedb_sqlalchemy_url"]
try:
stc = SystemTableImporter(dburi=cratedb_sqlalchemy_url, source=path_from_url(source))
stc.load()
except Exception as ex:
error_logger(ctx)(ex)
sys.exit(1)

Check warning on line 67 in cratedb_toolkit/cfr/cli.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/cfr/cli.py#L61-L67

Added lines #L61 - L67 were not covered by tests
222 changes: 222 additions & 0 deletions cratedb_toolkit/cfr/systable.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
"""
CrateDB Diagnostics: System Tables Exporter and Importer.
Schemas and results of following queries should be included:
```sql
SELECT * FROM sys.cluster
SELECT * FROM sys.nodes
SELECT * FROM sys.shards
SELECT * FROM sys.allocations
SELECT * FROM sys.jobs_log
SELECT * FROM sys.operations_log
```
https://cratedb.com/docs/python/en/latest/by-example/sqlalchemy/inspection-reflection.html
https://docs.sqlalchemy.org/en/20/faq/metadata_schema.html#how-can-i-get-the-create-table-drop-table-output-as-a-string
"""

import datetime as dt
import logging
import typing as t
from pathlib import Path

import polars as pl
import sqlalchemy as sa
from tqdm import tqdm

from cratedb_toolkit.sqlalchemy.patch import patch_encoder
from cratedb_toolkit.util import DatabaseAdapter
from cratedb_toolkit.util.cli import error_logger

logger = logging.getLogger(__name__)


DataFormat = t.Literal["csv", "jsonl", "ndjson", "parquet"]


class SystemTableKnowledge:
"""
Manage a few bits of knowledge about CrateDB internals.
"""

# Name of CrateDB's schema for system tables.
SYS_SCHEMA = "sys"

# TODO: Reflecting the `summits` table raises an error.
# AttributeError: 'UserDefinedType' object has no attribute 'get_col_spec'
REFLECTION_BLOCKLIST = ["summits"]


class ExportSettings:
"""
Manage a few bits of knowledge about how to export system tables from CrateDB.
"""

# Subdirectories where to store schema vs. data information.
SCHEMA_PATH = "schema"
DATA_PATH = "data"

# The filename prefix when storing tables to disk.
TABLE_FILENAME_PREFIX = "sys-"


class SystemTableInspector:
"""
Reflect schema information from CrateDB system tables.
"""

def __init__(self, dburi: str):
self.dburi = dburi
self.adapter = DatabaseAdapter(dburi=self.dburi)
self.engine = self.adapter.engine
self.inspector = sa.inspect(self.engine)

def table_names(self):
return self.inspector.get_table_names(schema=SystemTableKnowledge.SYS_SCHEMA)

def ddl(self, tablename_in: str, tablename_out: str, out_schema: str = None, with_drop_table: bool = False) -> str:
meta = sa.MetaData(schema=SystemTableKnowledge.SYS_SCHEMA)
table = sa.Table(tablename_in, meta, autoload_with=self.engine)
table.schema = out_schema
table.name = tablename_out
sql = ""
if with_drop_table:
sql += sa.schema.DropTable(table, if_exists=True).compile(self.engine).string.strip() + ";\n"

Check warning on line 84 in cratedb_toolkit/cfr/systable.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/cfr/systable.py#L84

Added line #L84 was not covered by tests
sql += sa.schema.CreateTable(table, if_not_exists=True).compile(self.engine).string.strip() + ";\n"
return sql


class SystemTableExporter:
"""
Export schema and data from CrateDB system tables.
"""

def __init__(self, dburi: str, target: t.Union[Path], data_format: DataFormat = "jsonl"):
self.dburi = dburi
self.target = target
self.data_format = data_format
self.adapter = DatabaseAdapter(dburi=self.dburi)
self.engine = self.adapter.engine
self.inspector = SystemTableInspector(dburi=self.dburi)
self.target.mkdir(exist_ok=True, parents=True)

def read_table(self, tablename: str) -> pl.DataFrame:
sql = f'SELECT * FROM "{SystemTableKnowledge.SYS_SCHEMA}"."{tablename}"' # noqa: S608
# logger.info(f"Running SQL: {sql}") # noqa: ERA001
return pl.read_database(
query=sql, # noqa: S608
connection=self.engine,
)

def dump_table(self, frame: pl.DataFrame, file: t.Union[t.TextIO, None] = None):
if self.data_format == "csv":
# polars.exceptions.ComputeError: CSV format does not support nested data
# return df.write_csv() # noqa: ERA001
return frame.to_pandas().to_csv(file)

Check warning on line 115 in cratedb_toolkit/cfr/systable.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/cfr/systable.py#L115

Added line #L115 was not covered by tests
elif self.data_format in ["jsonl", "ndjson"]:
return frame.write_ndjson(file and file.buffer) # type: ignore[arg-type]
elif self.data_format in ["parquet", "pq"]:
return frame.write_parquet(file and file.buffer) # type: ignore[arg-type]

Check warning on line 119 in cratedb_toolkit/cfr/systable.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/cfr/systable.py#L118-L119

Added lines #L118 - L119 were not covered by tests
else:
raise NotImplementedError(f"Output format not implemented: {self.data_format}")

Check warning on line 121 in cratedb_toolkit/cfr/systable.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/cfr/systable.py#L121

Added line #L121 was not covered by tests

def save(self) -> Path:
timestamp = dt.datetime.now().strftime("%Y-%m-%dT%H-%M-%S")
path = self.target / timestamp
logger.info(f"Exporting system tables to: {path}")
system_tables = self.inspector.table_names()
path_schema = path / ExportSettings.SCHEMA_PATH
path_data = path / ExportSettings.DATA_PATH
path_schema.mkdir(parents=True, exist_ok=True)
path_data.mkdir(parents=True, exist_ok=True)
table_count = 0
for tablename in tqdm(system_tables, disable=None):
if tablename in SystemTableKnowledge.REFLECTION_BLOCKLIST:
continue

table_count += 1

path_table_schema = path_schema / f"{ExportSettings.TABLE_FILENAME_PREFIX}{tablename}.sql"
path_table_data = path_data / f"{ExportSettings.TABLE_FILENAME_PREFIX}{tablename}.{self.data_format}"
tablename_out = f"{ExportSettings.TABLE_FILENAME_PREFIX}{tablename}"

# Write schema file.
with open(path_table_schema, "w") as fh_schema:
print(self.inspector.ddl(tablename_in=tablename, tablename_out=tablename_out), file=fh_schema)

# Write data file.
df = self.read_table(tablename=tablename)
if df.is_empty():
continue
mode = "w"
if self.data_format in ["parquet", "pq"]:
mode = "wb"

Check warning on line 153 in cratedb_toolkit/cfr/systable.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/cfr/systable.py#L153

Added line #L153 was not covered by tests
with open(path_table_data, mode) as fh_data:
self.dump_table(frame=df, file=t.cast(t.TextIO, fh_data))

logger.info(f"Successfully exported {table_count} system tables")
return path


class SystemTableImporter:
"""
Import schema and data about CrateDB system tables.
"""

def __init__(self, dburi: str, source: Path, data_format: DataFormat = "jsonl", debug: bool = False):
self.dburi = dburi
self.source = source
self.data_format = data_format
self.debug = debug
self.adapter = DatabaseAdapter(dburi=self.dburi)

Check warning on line 171 in cratedb_toolkit/cfr/systable.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/cfr/systable.py#L167-L171

Added lines #L167 - L171 were not covered by tests

def table_names(self):
path_schema = self.source / ExportSettings.SCHEMA_PATH
names = []
for item in path_schema.glob("*.sql"):
name = item.name.replace(ExportSettings.TABLE_FILENAME_PREFIX, "").replace(".sql", "")
names.append(name)
return names

Check warning on line 179 in cratedb_toolkit/cfr/systable.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/cfr/systable.py#L174-L179

Added lines #L174 - L179 were not covered by tests

def load(self):
path_schema = self.source / ExportSettings.SCHEMA_PATH
path_data = self.source / ExportSettings.DATA_PATH

Check warning on line 183 in cratedb_toolkit/cfr/systable.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/cfr/systable.py#L182-L183

Added lines #L182 - L183 were not covered by tests

if not path_schema.exists():
raise FileNotFoundError(f"Path does not exist: {path_schema}")

Check warning on line 186 in cratedb_toolkit/cfr/systable.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/cfr/systable.py#L185-L186

Added lines #L185 - L186 were not covered by tests

logger.info(f"Importing system tables from: {self.source}")

Check warning on line 188 in cratedb_toolkit/cfr/systable.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/cfr/systable.py#L188

Added line #L188 was not covered by tests

for tablename in tqdm(self.table_names()):
tablename_restored = ExportSettings.TABLE_FILENAME_PREFIX + tablename

Check warning on line 191 in cratedb_toolkit/cfr/systable.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/cfr/systable.py#L190-L191

Added lines #L190 - L191 were not covered by tests

path_table_schema = path_schema / f"{ExportSettings.TABLE_FILENAME_PREFIX}{tablename}.sql"
path_table_data = path_data / f"{ExportSettings.TABLE_FILENAME_PREFIX}{tablename}.{self.data_format}"

Check warning on line 194 in cratedb_toolkit/cfr/systable.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/cfr/systable.py#L193-L194

Added lines #L193 - L194 were not covered by tests

# Skip import of non-existing or empty files.
if not path_table_data.exists() or path_table_data.stat().st_size == 0:
continue

Check warning on line 198 in cratedb_toolkit/cfr/systable.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/cfr/systable.py#L197-L198

Added lines #L197 - L198 were not covered by tests

# Invoke SQL DDL.
schema_sql = path_table_schema.read_text()
self.adapter.run_sql(schema_sql)

Check warning on line 202 in cratedb_toolkit/cfr/systable.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/cfr/systable.py#L201-L202

Added lines #L201 - L202 were not covered by tests

# Load data.
try:
df: pl.DataFrame = self.load_table(path_table_data)
df.write_database(table_name=tablename_restored, connection=self.dburi, if_table_exists="append")
except Exception as ex:
error_logger(self.debug)(f"Importing table failed: {tablename}. Reason: {ex}")

Check warning on line 209 in cratedb_toolkit/cfr/systable.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/cfr/systable.py#L205-L209

Added lines #L205 - L209 were not covered by tests

# df.to_pandas().to_sql(name=tablename, con=self.adapter.engine, if_exists="append", index=False) # noqa: ERA001, E501

def load_table(self, path: Path) -> pl.DataFrame:
if path.suffix in [".jsonl"]:
return pl.read_ndjson(path)
elif path.suffix in [".parquet", ".pq"]:
return pl.read_parquet(path)

Check warning on line 217 in cratedb_toolkit/cfr/systable.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/cfr/systable.py#L214-L217

Added lines #L214 - L217 were not covered by tests
else:
raise NotImplementedError(f"Input format not implemented: {path.suffix}")

Check warning on line 219 in cratedb_toolkit/cfr/systable.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/cfr/systable.py#L219

Added line #L219 was not covered by tests


patch_encoder()
Loading

0 comments on commit c3222b3

Please sign in to comment.