diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index e4e4a8fb..a1d8a974 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -1,5 +1,5 @@ --- -name: Tests +name: "Tests: Common" on: pull_request: ~ diff --git a/.github/workflows/mongodb.yml b/.github/workflows/mongodb.yml new file mode 100644 index 00000000..4f06175b --- /dev/null +++ b/.github/workflows/mongodb.yml @@ -0,0 +1,84 @@ +--- +name: "Tests: MongoDB" + +on: + pull_request: + branches: ~ + paths: + - '.github/workflows/mongodb.yml' + - 'cratedb_toolkit/io/mongodb/**' + - 'pyproject.toml' + push: + branches: [ main ] + paths: + - '.github/workflows/mongodb.yml' + - 'cratedb_toolkit/io/mongodb/**' + - 'pyproject.toml' + + # Allow job to be triggered manually. + workflow_dispatch: + + # Run job each night after CrateDB nightly has been published. + schedule: + - cron: '0 3 * * *' + +# Cancel in-progress jobs when pushing to the same branch. +concurrency: + cancel-in-progress: true + group: ${{ github.workflow }}-${{ github.ref }} + +jobs: + + tests: + + runs-on: ${{ matrix.os }} + strategy: + fail-fast: false + matrix: + os: ["ubuntu-latest"] + python-version: ["3.8", "3.12"] + mongodb-version: ["2", "3", "4", "5", "6", "7"] + + env: + OS: ${{ matrix.os }} + PYTHON: ${{ matrix.python-version }} + MONGODB_VERSION: ${{ matrix.mongodb-version }} + # Do not tear down Testcontainers + TC_KEEPALIVE: true + + name: Python ${{ matrix.python-version }}, MongoDB ${{ matrix.mongodb-version }} on OS ${{ matrix.os }} + steps: + + - name: Acquire sources + uses: actions/checkout@v4 + + - name: Set up Python + uses: actions/setup-python@v4 + with: + python-version: ${{ matrix.python-version }} + architecture: x64 + cache: 'pip' + cache-dependency-path: 'pyproject.toml' + + - name: Setup project + run: | + + # `setuptools 0.64.0` adds support for editable install hooks (PEP 660). + # https://github.com/pypa/setuptools/blob/main/CHANGES.rst#v6400 + pip install "setuptools>=64" --upgrade + + # Install package in editable mode. 
+ pip install --use-pep517 --prefer-binary --editable=.[io,test,develop] + + - name: Run linter and software tests + run: | + poe check + + - name: Upload coverage to Codecov + uses: codecov/codecov-action@v3 + with: + files: ./coverage.xml + flags: mongodb + env_vars: OS,PYTHON + name: codecov-umbrella + fail_ci_if_error: false diff --git a/CHANGES.md b/CHANGES.md index fc0f9e80..3bd119d5 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -5,6 +5,7 @@ - Add SQL runner utility primitives to `io.sql` namespace - Add `import_csv_pandas` and `import_csv_dask` utility primitives +- Add `migr8` program from previous repository ## 2023/11/06 v0.0.2 diff --git a/cratedb_toolkit/__init__.py b/cratedb_toolkit/__init__.py index e69de29b..c37fc8d9 100644 --- a/cratedb_toolkit/__init__.py +++ b/cratedb_toolkit/__init__.py @@ -0,0 +1,11 @@ +try: + from importlib.metadata import PackageNotFoundError, version +except (ImportError, ModuleNotFoundError): # pragma:nocover + from importlib_metadata import PackageNotFoundError, version # type: ignore[assignment,no-redef,unused-ignore] + +__appname__ = "cratedb-toolkit" + +try: + __version__ = version(__appname__) +except PackageNotFoundError: # pragma: no cover + __version__ = "unknown" diff --git a/cratedb_toolkit/io/mongodb/README.md b/cratedb_toolkit/io/mongodb/README.md new file mode 100644 index 00000000..5fe5ecbb --- /dev/null +++ b/cratedb_toolkit/io/mongodb/README.md @@ -0,0 +1,188 @@ +# MongoDB → CrateDB Migration Tool + +A utility program, called `migr8`, supporting data migrations +between MongoDB and CrateDB. + + +## About + +### Details + +This tool iterates over one or multiple MongoDB collections, +and iteratively builds up a description of the schema of those +collections. + +In a second step, this description can be used to create a CrateDB table +schema, which will attempt to determine a best-fit table definition for +that schema. + +As such, this means the tool works best on collections of similarly +structured and typed data. + +### Supported MongoDB versions + +The application supports the following versions of MongoDB. + +[![Supported MongoDB versions](https://img.shields.io/badge/MongoDB-2.x%20--%207.x-blue.svg)](https://github.com/mongodb/mongo) + +If you need support for MongoDB 2.x, you will need to downgrade the `pymongo` +client driver library to version 3, like `pip install 'pymongo<4'`. + +### Installation + +Use `pip` to install the package from PyPI. +```shell +pip install --upgrade 'cratedb-toolkit[io]' +``` + +To verify if the installation worked, invoke: +```shell +migr8 --version +migr8 --help +``` + + +## Usage + +The program `migr8` offers three subcommands (`extract`, `translate`, `export`), +to conclude data transfers from MongoDB to CrateDB. Please read this section +carefully to learn how they can be used successfully. + +### Schema Extraction + +To extract a description of the schema of a collection, use the +`extract` subcommand. For example: + + migr8 extract --host localhost --port 27017 --database test_db + +After connecting to the designated MongoDB server, it will +look at the collections within that database, and will prompt you which +collections to *exclude* from analysis. + +You can then do a *full* or *partial* scan of the collection. + +A partial scan will only look at the first entry in a collection, and +thus may produce an ambiguous schema definition. It is still useful if you +already know the collection is systematically and regularly structured. 
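+
+The question about doing a full or partial scan can also be answered up front:
+the `extract` subcommand accepts a `--scan` option with the values `full` or
+`partial`, and a `--collection` option to select a single collection without
+being prompted, for example:
+
+    migr8 extract --host localhost --port 27017 --database test_db --collection test --scan partial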
+ +A full scan will iterate over the entire collection and build up the +schema description. Cancelling the scan will cause the tool to output +the schema description it has built up thus far. + +For example, scanning a collection of payloads including a `ts` field, +a `sensor` field, and a `payload` object, may yield this outcome: + +```json +{ + "test": { + "count": 100000, + "document": { + "_id": { + "count": 100000, + "types": { + "OID": { + "count": 100000 + } + } + }, + "ts": { + "count": 100000, + "types": { + "DATETIME": { + "count": 100000 + } + } + }, + "sensor": { + "count": 100000, + "types": { + "STRING": { + "count": 100000 + } + } + }, + "payload": { + "count": 100000, + "types": { + "OBJECT": { + "count": 100000, + "document": { + "temp": { + "count": 100000, + "types": { + "FLOAT": { + "count": 1 + }, + "INTEGER": { + "count": 99999 + } + } + }, + "humidity": { + "count": 100000, + "types": { + "FLOAT": { + "count": 1 + }, + "INTEGER": { + "count": 99999 + } + } + } + } + } + } + } + } + } +} +``` + +This description indicates that the data is well-structured, and has +mostly consistent data-types. + + +### Schema Translation + +Once a schema description has been extracted, it can be translated +into a CrateDB schema definition using the `translate` subcommand: + + migr8 translate -i mongodb_schema.json + +This will attempt to translate the description into a best-fit CrateDB +table definition. Where datatypes are ambiguous, it will *choose the +most common datatype*. For example, the previous schema definition would +be translated into this SQL DDL statement: +```sql +CREATE TABLE IF NOT EXISTS "doc"."test" ( + "ts" TIMESTAMP WITH TIME ZONE, + "sensor" TEXT, + "payload" OBJECT (STRICT) AS ( + -- ⬇️ Types: FLOAT: 0.0%, INTEGER: 100.0% + "temp" INTEGER, + -- ⬇️ Types: FLOAT: 0.0%, INTEGER: 100.0% + "humidity" INTEGER + ) +); +``` + + +### MongoDB Collection Export + +To export a MongoDB collection to a JSON stream, use the `export` +subcommand: + + migr8 export --host localhost --port 27017 --database test_db --collection test + +This will convert the collection's records into JSON, and output the JSON to stdout. 
+For example, to redirect the output to a file, run: + + migr8 export --host localhost --port 27017 --database test_db --collection test > test.json + +Alternatively, use [cr8] to directly write the MongoDB collection into a CrateDB table: + + migr8 export --host localhost --port 27017 --database test_db --collection test | \ + cr8 insert-json --hosts localhost:4200 --table test + + +[cr8]: https://github.com/mfussenegger/cr8 diff --git a/cratedb_toolkit/io/mongodb/__init__.py b/cratedb_toolkit/io/mongodb/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/cratedb_toolkit/io/mongodb/cli.py b/cratedb_toolkit/io/mongodb/cli.py new file mode 100644 index 00000000..325dbba0 --- /dev/null +++ b/cratedb_toolkit/io/mongodb/cli.py @@ -0,0 +1,95 @@ +import argparse +import json + +import pymongo +import rich +from bson.raw_bson import RawBSONDocument + +from cratedb_toolkit import __version__ +from cratedb_toolkit.io.mongodb.core import extract, translate + +from .export import export + + +def extract_parser(subargs): + parser = subargs.add_parser("extract", help="Extract a schema from a MongoDB database") + parser.add_argument("--host", default="localhost", help="MongoDB host") + parser.add_argument("--port", default=27017, help="MongoDB port") + parser.add_argument("--database", required=True, help="MongoDB database") + parser.add_argument("--collection", help="MongoDB collection to create a schema for") + parser.add_argument( + "--scan", + choices=["full", "partial"], + help="Whether to fully scan the MongoDB collections or only partially.", + ) + parser.add_argument("-o", "--out", default="mongodb_schema.json") + + +def translate_parser(subargs): + parser = subargs.add_parser( + "translate", + help="Translate a MongoDB schema definition to a CrateDB table schema", + ) + parser.add_argument("-i", "--infile", help="The JSON file to read the MongoDB schema from") + + +def export_parser(subargs): + parser = subargs.add_parser("export", help="Export a MongoDB collection as plain JSON") + parser.add_argument("--collection", required=True) + parser.add_argument("--host", default="localhost", help="MongoDB host") + parser.add_argument("--port", default=27017, help="MongoDB port") + parser.add_argument("--database", required=True, help="MongoDB database") + + +def get_args(): + parser = argparse.ArgumentParser() + parser.add_argument( + "-V", + "--version", + action="version", + help="print package version of pyproject_fmt", + version=f"%(prog)s ({__version__})", + ) + subparsers = parser.add_subparsers(dest="command") + extract_parser(subparsers) + translate_parser(subparsers) + export_parser(subparsers) + return parser.parse_args() + + +def extract_to_file(args): + """ + Extract a schema or set of schemas from MongoDB collections into a JSON file. + """ + + schema = extract(args) + rich.print(f"\nWriting resulting schema to {args.out}...") + with open(args.out, "w") as out: + json.dump(schema, out, indent=4) + rich.print("[green bold]Done![/green bold]") + + +def translate_from_file(args): + """ + Read in a JSON file and extract the schema from it. 
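+
+    The input file is expected to be a schema description as written by the
+    `extract` subcommand, by default `mongodb_schema.json`.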
+ """ + + with open(args.infile) as f: + schema = json.load(f) + translate(schema) + + +def export_to_stdout(args): + client = pymongo.MongoClient(args.host, int(args.port), document_class=RawBSONDocument) + db = client[args.database] + export(db[args.collection]) + + +def main(): + args = get_args() + if args.command == "extract": + extract_to_file(args) + elif args.command == "translate": + translate_from_file(args) + elif args.command == "export": + export_to_stdout(args) diff --git a/cratedb_toolkit/io/mongodb/core.py b/cratedb_toolkit/io/mongodb/core.py new file mode 100644 index 00000000..b1f40fc7 --- /dev/null +++ b/cratedb_toolkit/io/mongodb/core.py @@ -0,0 +1,114 @@ +import re + +import pymongo +import pymongo.database +import rich +from rich.syntax import Syntax + +from .extract import extract_schema_from_collection +from .translate import translate as translate_schema + + +def parse_input_numbers(s: str): + """ + Parse an input string for numbers and ranges. + + Supports strings like '0 1 2', '0, 1, 2' as well as ranges such as + '0-2'. + """ + + options: list = [] + for option in re.split(", | ", s): + match = re.search(r"(\d+)-(\d+)", option) + if match: + lower, upper = sorted([match.group(1), match.group(2)]) + options = options + list(range(int(lower), int(upper) + 1)) + else: + try: + options.append(int(option)) + except ValueError: + pass + return options + + +def gather_collections(database): + """ + Gather a list of collections to use from a MongoDB database, based on user input. + """ + + collections = database.list_collection_names() + + tbl = rich.table.Table(show_header=True, header_style="bold blue") + tbl.add_column("Id", width=3) + tbl.add_column("Collection Name") + tbl.add_column("Estimated Size") + + for i, c in enumerate(collections): + tbl.add_row(str(i), c, str(database[c].estimated_document_count())) + + rich.print(tbl) + + rich.print("\nCollections to exclude: (eg: '0 1 2', '0, 1, 2', '0-2')") + + collections_to_ignore = parse_input_numbers(input("> ")) + filtered_collections = [] + for i, c in enumerate(collections): + if i not in collections_to_ignore: + filtered_collections.append(c) + + # MongoDB 2 does not understand `include_system_collections=False`. + filtered_collections = [item for item in filtered_collections if not item.startswith("system.")] + + return filtered_collections + + +def extract(args): + """ + Extract schemas from MongoDB collections. + + This asks the user for which collections they would like to extract, + iterates over these collections and returns a dictionary of schemas for + each of the selected collections. + """ + + rich.print("\n[green bold]MongoDB[/green bold] -> [blue bold]CrateDB[/blue bold] Exporter :: Schema Extractor\n\n") + + client: pymongo.MongoClient = pymongo.MongoClient(args.host, int(args.port)) + db: pymongo.database.Database = client.get_database(args.database) + if args.collection: + filtered_collections = [args.collection] + else: + filtered_collections = gather_collections(db) + + if not filtered_collections: + rich.print("\nExcluding all collections. Nothing to do.") + exit(0) + + if args.scan: + partial = args.scan == "partial" + else: + rich.print("\nDo a [red bold]full[/red bold] collection scan?") + rich.print("A full scan will iterate over all documents in the collection, a partial only one document. 
(Y/n)") + full = input("> ").strip().lower() + + partial = full != "y" + + rich.print(f"\nExecuting a [red bold]{'partial' if partial else 'full'}[/red bold] scan...") + + schemas = {} + for collection in filtered_collections: + schemas[collection] = extract_schema_from_collection(db[collection], partial) + return schemas + + +def translate(schema): + """ + Translate a given schema into a CrateDB compatible SQL DDL statement. + """ + rich.print("\n[green bold]MongoDB[/green bold] -> [blue bold]CrateDB[/blue bold] Exporter :: Schema Extractor\n\n") + sql_queries = translate_schema(schema) + for collection, query in sql_queries.items(): + syntax = Syntax(query, "sql") + rich.print(f"Collection [blue bold]'{collection}'[/blue bold]:") + rich.print(syntax) + rich.print() diff --git a/cratedb_toolkit/io/mongodb/export.py b/cratedb_toolkit/io/mongodb/export.py new file mode 100644 index 00000000..e7df9fcf --- /dev/null +++ b/cratedb_toolkit/io/mongodb/export.py @@ -0,0 +1,100 @@ +# -*- coding: utf-8; -*- +# +# Licensed to CRATE Technology GmbH ("Crate") under one or more contributor +# license agreements. See the NOTICE file distributed with this work for +# additional information regarding copyright ownership. Crate licenses +# this file to you under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. You may +# obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# However, if you have executed another commercial license agreement +# with Crate these terms will supersede the license and you may use the +# software solely pursuant to the terms of the relevant commercial agreement. + + +""" +Export the documents from a MongoDB collection as JSON, to be ingested into CrateDB. 
+""" + +import calendar +import re +import sys +from datetime import datetime, timedelta + +import bsonjs +import orjson as json + +_TZINFO_RE = re.compile(r"([+\-])?(\d\d):?(\d\d)") + + +def date_converter(value): + if isinstance(value, int): + return value + dt = datetime.strptime(value[:-5], "%Y-%m-%dT%H:%M:%S.%f") + iso_match = _TZINFO_RE.match(value[-5:]) + if iso_match: + sign, hours, minutes = iso_match.groups() + tzoffset = int(hours) * 3600 + int(minutes) * 60 + if sign == "-": + dt = dt + timedelta(seconds=tzoffset) + else: + dt = dt - timedelta(seconds=tzoffset) + else: + raise Exception("Can't parse datetime string {0}".format(value)) + return calendar.timegm(dt.utctimetuple()) * 1000 + + +def timestamp_converter(value): + if len(str(value)) <= 10: + return value * 1000 + return value + + +type_converter = { + "date": date_converter, + "timestamp": timestamp_converter, + "undefined": lambda x: None, +} + + +def extract_value(value, parent_type=None): + if isinstance(value, dict): + if len(value) == 1: + for k, v in value.items(): + if k.startswith("$"): + return extract_value(v, k.lstrip("$")) + return {k.lstrip("$"): extract_value(v, parent_type) for (k, v) in value.items()} + if isinstance(value, list): + return [extract_value(v, parent_type) for v in value] + if parent_type: + converter = type_converter.get(parent_type) + if converter: + return converter(value) + return value + + +def convert(d): + newdict = {} + del d["_id"] + for k, v in d.items(): + newdict[k] = extract_value(v) + return newdict + + +def export(collection): + """ + Export a MongoDB collection's documents to standard JSON, output to stdout. + """ + for document in collection.find(): + bson_json = bsonjs.dumps(document.raw) + json_object = json.loads(bson_json) + sys.stdout.buffer.write(json.dumps(convert(json_object))) + sys.stdout.buffer.write(b"\n") diff --git a/cratedb_toolkit/io/mongodb/extract.py b/cratedb_toolkit/io/mongodb/extract.py new file mode 100644 index 00000000..98019863 --- /dev/null +++ b/cratedb_toolkit/io/mongodb/extract.py @@ -0,0 +1,179 @@ +# -*- coding: utf-8; -*- +# +# Licensed to CRATE Technology GmbH ("Crate") under one or more contributor +# license agreements. See the NOTICE file distributed with this work for +# additional information regarding copyright ownership. Crate licenses +# this file to you under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. You may +# obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# However, if you have executed another commercial license agreement +# with Crate these terms will supersede the license and you may use the +# software solely pursuant to the terms of the relevant commercial agreement. + +""" +Export a schema definition from a MongoDB collection. + +This will iterate over a collection (either totally, or partially) and build +up a description of the schema of the MongoDB collection. + +Within the schema, each field in the collection will be described with two +fields: + +- "count", being the number of entries in the collection that have this field. +- "types", being the types present for those entries. 
+ +For each type in a field's types, it will have a count that signifies the number +of entries of that field with that data type. If it is an object, it will also +contain a schema of the object's types. If it is an array, it will contain +a list of types that are present in the arrays, as well as their counts. + +An example schema may look like: + +{ + "count": 10, + "document": { + "ts": { + "count": 10, + "types": {"DATETIME": {"count": 10} + } + }, + "payload": { + "count": 10, + "types": { + "OBJECT": { + "count": 10, + "document": { + "temp": { + "count": 10, + "types": {"FLOAT": {"count": 4}, "INTEGER": {"count": 6}} + } + } + } + } + } + } +} +""" + +import bson +from pymongo.collection import Collection +from rich import progress + +progressbar = progress.Progress( + progress.TextColumn("{task.description} ", justify="left"), + progress.BarColumn(bar_width=None), + "[progress.percentage]{task.percentage:>3.1f}% ({task.completed}/{task.total})", + "•", + progress.TimeRemainingColumn(), +) + + +def extract_schema_from_collection(collection: Collection, partial: bool): + """ + Extract a schema definition from a collection. + + If the extraction is partial, only the first document in the collection is + used to create the schema. + """ + + schema: dict = {"count": 0, "document": {}} + if partial: + count = 1 + else: + count = collection.estimated_document_count() + with progressbar: + t = progressbar.add_task(collection.name, total=count) + try: + for document in collection.find(): + schema["count"] += 1 + schema["document"] = extract_schema_from_document(document, schema["document"]) + progressbar.update(t, advance=1) + if partial: + break + except KeyboardInterrupt: + return schema + return schema + + +def extract_schema_from_document(document: dict, schema: dict): + """ + Extract and update schema definition from a given document. + """ + + for k, v in document.items(): + if k not in schema: + schema[k] = {"count": 0, "types": {}} + + item_type = get_type(v) + if item_type not in schema[k]["types"]: + if item_type == "OBJECT": + schema[k]["types"][item_type] = {"count": 0, "document": {}} + elif item_type == "ARRAY": + schema[k]["types"][item_type] = {"count": 0, "types": {}} + else: + schema[k]["types"][item_type] = {"count": 0} + + schema[k]["count"] += 1 + schema[k]["types"][item_type]["count"] += 1 + if item_type == "OBJECT": + schema[k]["types"][item_type]["document"] = extract_schema_from_document( + v, schema[k]["types"][item_type]["document"] + ) + elif item_type == "ARRAY": + schema[k]["types"][item_type]["types"] = extract_schema_from_array( + v, schema[k]["types"][item_type]["types"] + ) + return schema + + +def extract_schema_from_array(array: list, schema: dict): + """ + Extract and update a schema definition for a list. 
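+
+    Nested objects and arrays are descended into recursively, so arbitrarily
+    deep structures are reflected in the resulting schema.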
+ """ + + for item in array: + t = get_type(item) + if t not in schema: + if t == "OBJECT": + schema[t] = {"count": 0, "document": {}} + elif t == "ARRAY": + schema[t] = {"count": 0, "types": {}} + else: + schema[t] = {"count": 0} + + schema[t]["count"] += 1 + if t == "OBJECT": + schema[t]["document"] = extract_schema_from_document(item, schema[t]["document"]) + elif t == "ARRAY": + schema[t]["types"] = extract_schema_from_array(item, schema[t]["types"]) + return schema + + +TYPES_MAP = { + # bson types + bson.ObjectId: "OID", + bson.datetime.datetime: "DATETIME", + bson.Timestamp: "TIMESTAMP", + bson.int64.Int64: "INT64", + # primitive types + str: "STRING", + bool: "BOOLEAN", + int: "INTEGER", + float: "FLOAT", + # collection types + list: "ARRAY", + dict: "OBJECT", +} + + +def get_type(o): + return TYPES_MAP.get(type(o), "UNKNOWN") diff --git a/cratedb_toolkit/io/mongodb/translate.py b/cratedb_toolkit/io/mongodb/translate.py new file mode 100644 index 00000000..66d2c332 --- /dev/null +++ b/cratedb_toolkit/io/mongodb/translate.py @@ -0,0 +1,166 @@ +# -*- coding: utf-8; -*- +# +# Licensed to CRATE Technology GmbH ("Crate") under one or more contributor +# license agreements. See the NOTICE file distributed with this work for +# additional information regarding copyright ownership. Crate licenses +# this file to you under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. You may +# obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# However, if you have executed another commercial license agreement +# with Crate these terms will supersede the license and you may use the +# software solely pursuant to the terms of the relevant commercial agreement. + +""" +Translate a MongoDB collection schema into a CrateDB CREATE TABLE expression. + +Given a generated MongoDB collection schema, this will translate that schema +into a CREATE TABLE statement, mapping fields to columns and the collection +name to the table name. + +In the case where there are type conflicts (for example, 40% of the values +for a field are integers, and 60% are strings), the translator will choose +the type with the greatest proportion. +""" + +from functools import reduce + +TYPES = { + "DATETIME": "TIMESTAMP WITH TIME ZONE", + "INT64": "INTEGER", + "STRING": "TEXT", + "BOOLEAN": "BOOLEAN", + "INTEGER": "INTEGER", + "FLOAT": "FLOAT", + "ARRAY": "ARRAY", + "OBJECT": "OBJECT", +} + +BASE = """ +CREATE TABLE IF NOT EXISTS "doc"."{table}" (\n{columns}\n); +""" + +COLUMN = '"{column_name}" {type}' + +OBJECT = "OBJECT ({object_type}) AS (\n{definition}\n)" + + +def get_columns_definition(columns): + columns_definition = [] + for column in columns: + if column[1]: + item = f"{column[1]}\n{column[0]}" + else: + item = column[0] + columns_definition.append(item) + return columns_definition + + +def translate_object(schema): + """ + Translate an object field schema definition into a CrateDB dynamic object column. 
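+
+    For example, an object schema with a single field "a" holding only STRING
+    values translates to:
+
+        OBJECT (DYNAMIC) AS (
+            "a" TEXT
+        )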
+ """ + + columns = [] + object_type = "DYNAMIC" + for fieldname, field in schema.items(): + sql_type, comment = determine_type(field) + columns.append((COLUMN.format(column_name=fieldname, type=sql_type), comment)) + columns_definition = get_columns_definition(columns) + return OBJECT.format( + object_type=object_type, + definition=",\n".join(columns_definition), + ) + + +def translate_array(schema): + """ + Translate an array field schema definition into a CrateDB array column. + """ + + subtype, comment = determine_type(schema) + if comment: + return f"{comment}\nARRAY({subtype})" + else: + return f"ARRAY({subtype})" + + +def determine_type(schema): + """ + Determine the type of a specific field schema. + """ + + types = schema.get("types", {}) + type_ = max(types, key=lambda item: types[item]["count"]) + if type_ in TYPES: + sql_type = TYPES.get(type_) + if sql_type == "OBJECT": + sql_type = translate_object(types["OBJECT"]["document"]) + elif sql_type == "ARRAY": + sql_type = translate_array(types["ARRAY"]) + + if len(types) > 1: + return (sql_type, proportion_string(types)) + return (sql_type, None) + return ("UNKNOWN", None) + + +def proportion_string(types: dict) -> str: + """ + Convert a list of types into a string explaining the proportions of each type. + """ + + total = reduce(lambda x, y: x + types[y]["count"], list(types.keys()), 0) + summary = "-- ⬇️ Types: " + proportions = [] + for type_ in types: + proportions.append(f"{type_}: {round((types[type_]['count']/total)*100, 2)}%") + return " " + (summary + ", ".join(proportions)) + + +def indent_sql(query: str) -> str: + """ + Indent an SQL query based on opening and closing brackets. + """ + + indent = 0 + lines = query.split("\n") + for idx, line in enumerate(lines): + lines[idx] = (" " * indent) + line + if len(line) >= 1: + if line[-1] == "(": + indent += 4 + elif line[-1] == ")": + indent -= 4 + return "\n".join(lines) + + +def translate(schemas): + """ + Translate a schema definition for a set of MongoDB collection schemas. + + This results in a set of CrateDB compatible CREATE TABLE expressions + corresponding to the set of MongoDB collection schemas. + """ + + tables = list(schemas.keys()) + sql_queries = {} + for tablename in tables: + collection = schemas[tablename] + columns = [] + for fieldname, field in collection["document"].items(): + sql_type, comment = determine_type(field) + if sql_type != "UNKNOWN": + columns.append((COLUMN.format(column_name=fieldname, type=sql_type), comment)) + + columns_definition = get_columns_definition(columns) + sql_queries[tablename] = indent_sql(BASE.format(table=tablename, columns=",\n".join(columns_definition))) + return sql_queries diff --git a/cratedb_toolkit/testing/testcontainers/cratedb.py b/cratedb_toolkit/testing/testcontainers/cratedb.py index 7ee6f46a..88f90a63 100644 --- a/cratedb_toolkit/testing/testcontainers/cratedb.py +++ b/cratedb_toolkit/testing/testcontainers/cratedb.py @@ -18,10 +18,12 @@ from testcontainers.core.generic import DbContainer from testcontainers.core.waiting_utils import wait_for_logs +from cratedb_toolkit.testing.testcontainers.util import KeepaliveContainer, asbool + logger = logging.getLogger(__name__) -class CrateDBContainer(DbContainer): +class CrateDBContainer(KeepaliveContainer, DbContainer): """ CrateDB database container. 
@@ -48,21 +50,21 @@ class CrateDBContainer(DbContainer): CRATEDB_USER = os.environ.get("CRATEDB_USER", "crate") CRATEDB_PASSWORD = os.environ.get("CRATEDB_PASSWORD", "") CRATEDB_DB = os.environ.get("CRATEDB_DB", "doc") - CRATEDB_KEEPALIVE = os.environ.get("CRATEDB_KEEPALIVE", os.environ.get("TC_KEEPALIVE", False)) + KEEPALIVE = asbool(os.environ.get("CRATEDB_KEEPALIVE", os.environ.get("TC_KEEPALIVE", False))) # TODO: Dual-port use with 4200+5432. def __init__( self, + # TODO: Use `crate/crate:nightly` by default? image: str = "crate:latest", port: int = 4200, user: Optional[str] = None, password: Optional[str] = None, dbname: Optional[str] = None, dialect: str = "crate", - keepalive: bool = False, **kwargs, ) -> None: - super(CrateDBContainer, self).__init__(image=image, **kwargs) + super().__init__(image=image, **kwargs) self._name = "testcontainers-cratedb" # -{os.getpid()} self._command = "-Cdiscovery.type=single-node -Ccluster.routing.allocation.disk.threshold_enabled=false" @@ -74,7 +76,6 @@ def __init__( self.CRATEDB_PASSWORD = password or self.CRATEDB_PASSWORD self.CRATEDB_DB = dbname or self.CRATEDB_DB - self.keepalive = keepalive or self.CRATEDB_KEEPALIVE self.port_to_expose = port self.dialect = dialect @@ -100,46 +101,3 @@ def _connect(self): # TODO: Better use a network connectivity health check? # In `testcontainers-java`, there is the `HttpWaitStrategy`. wait_for_logs(self, predicate="o.e.n.Node.*started", timeout=MAX_TRIES) - - def start(self): - """ - Improved `start()` method, supporting service-keepalive. - - In order to keep the service running where it normally would be torn down, - define the `CRATEDB_KEEPALIVE` or `TC_KEEPALIVE` environment variables. - """ - - self._configure() - - logger.info("Pulling image %s", self.image) - docker_client = self.get_docker_client() - - # Check if container is already running, and whether it should be reused. - containers_running = docker_client.client.api.containers(all=True, filters={"name": self._name}) - start_container = not containers_running - - if start_container: - logger.info("Starting CrateDB") - self._container = docker_client.run( - self.image, - command=self._command, - detach=True, - environment=self.env, - ports=self.ports, - name=self._name, - volumes=self.volumes, - **self._kwargs, - ) - else: - container_id = containers_running[0]["Id"] - self._container = docker_client.client.containers.get(container_id) - - logger.info("Container started: %s", self._container.short_id) - self._connect() - return self - - def stop(self, **kwargs): - if not self.keepalive: - logger.info("Stopping CrateDB") - return super().stop() - return None diff --git a/cratedb_toolkit/testing/testcontainers/mongodb.py b/cratedb_toolkit/testing/testcontainers/mongodb.py new file mode 100644 index 00000000..2929bba2 --- /dev/null +++ b/cratedb_toolkit/testing/testcontainers/mongodb.py @@ -0,0 +1,23 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
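+"""
+A MongoDB Testcontainer which can keep the service running after the test
+suite has finished, controlled by the `TC_KEEPALIVE` environment variable
+(see `KeepaliveContainer`).
+
+Usage sketch, mirroring the integration tests:
+
+    container = MongoDbContainerWithKeepalive("mongo:7").with_name("testcontainers-mongodb")
+    container.start()
+    client = container.get_connection_client()
+"""
+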
+from testcontainers.mongodb import MongoDbContainer + +from cratedb_toolkit.testing.testcontainers.util import KeepaliveContainer + + +class MongoDbContainerWithKeepalive(KeepaliveContainer, MongoDbContainer): + """ + A Testcontainer for MongoDB, honoring the `TC_KEEPALIVE` environment variable. + """ + + pass diff --git a/cratedb_toolkit/testing/testcontainers/util.py b/cratedb_toolkit/testing/testcontainers/util.py index 9114510f..d67120e4 100644 --- a/cratedb_toolkit/testing/testcontainers/util.py +++ b/cratedb_toolkit/testing/testcontainers/util.py @@ -10,8 +10,28 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. +import logging +import os +from typing import Any + from testcontainers.core.container import DockerContainer +logger = logging.getLogger(__name__) + + +# from sqlalchemy.util.langhelpers +# from paste.deploy.converters +def asbool(obj: Any) -> bool: + if isinstance(obj, str): + obj = obj.strip().lower() + if obj in ["true", "yes", "on", "y", "t", "1"]: + return True + elif obj in ["false", "no", "off", "n", "f", "0"]: + return False + else: + raise ValueError("String is not true/false: %r" % obj) + return bool(obj) + class ExtendedDockerContainer(DockerContainer): """ @@ -34,3 +54,81 @@ def get_real_host_address(self) -> str: For example, `172.17.0.4:9000`. """ return f"{self.get_real_host_ip()}:{self.port_to_expose}" + + +class KeepaliveContainer(DockerContainer): + """ + Improved `start()`/`stop()` methods, supporting service-keepalive. + + In order to keep the service running where it normally would be torn down, + define the `TC_KEEPALIVE` environment variable. + """ + + KEEPALIVE = asbool(os.environ.get("TC_KEEPALIVE", False)) + + def __init__( + self, + *args, + **kwargs, + ) -> None: + self.keepalive = self.KEEPALIVE + if "keepalive" in kwargs: + self.keepalive = kwargs["keepalive"] + del kwargs["keepalive"] + super().__init__(*args, **kwargs) + + def start(self): + """ + Improved `start()` method, supporting service-keepalive. + + In order to keep the service running where it normally would be torn down, + define the `CRATEDB_KEEPALIVE` or `TC_KEEPALIVE` environment variables. + """ + + self._configure() + + if self._name is None: + raise ValueError( + "KeepaliveContainer does not support unnamed containers. Use `.with_name()` to assign a name." + ) + + docker_client = self.get_docker_client() + + # Check if container is already running, and whether it should be reused. 
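+        # Both running and stopped containers are considered (`all=True`): a stopped
+        # container with a matching name is reused and started again further below,
+        # instead of creating a fresh one.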
+ logger.info(f"Searching for container: {self._name}") + containers = docker_client.client.api.containers(all=True, filters={"name": self._name}) + + if not containers: + logger.info(f"Creating container from image: {self.image}") + self._container = docker_client.run( + self.image, + command=self._command, + detach=True, + environment=self.env, + ports=self.ports, + name=self._name, + volumes=self.volumes, + **self._kwargs, + ) + logger.info(f"Container created: {self._container.name}") + else: + container_id = containers[0]["Id"] + container_names = containers[0]["Names"] + logger.info(f"Found container for reuse: {container_id} ({container_names})") + self._container = docker_client.client.containers.get(container_id) + container_name = self._container.name + if self._container.status != "running": + logger.info(f"Starting container: {container_id} ({container_name})") + self._container.start() + + self._connect() + return self + + def stop(self, **kwargs): + """ + Shut down container again, unless "keepalive" is enabled. + """ + if not self.keepalive: + logger.info("Stopping container") + return super().stop() + return None diff --git a/doc/sandbox.md b/doc/sandbox.md index 84af6dd7..03ebec24 100644 --- a/doc/sandbox.md +++ b/doc/sandbox.md @@ -30,10 +30,11 @@ export TC_KEEPALIVE=true poe check ``` -In order to shut down and destroy the CrateDB container, which was started by -the test suite, and was kept running by using `TC_KEEPALIVE`, use this command. +In order to shut down and destroy the auxiliary service containers, which have +been started by running the test suite, and were kept running by using +`TC_KEEPALIVE`, use this command. ```shell -docker rm --force testcontainers-cratedb +docker rm --force testcontainers-cratedb testcontainers-mongodb ``` Format code. diff --git a/pyproject.toml b/pyproject.toml index 7592cf46..bfe80a44 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -90,6 +90,9 @@ dependencies = [ "colorlog", "crash", "crate[sqlalchemy]>=0.34", + "croud==1.7", + 'importlib-metadata; python_version <= "3.7"', + "rich<14,>=3.3.2", "sqlalchemy>=2", "sqlparse<0.5", ] @@ -103,8 +106,12 @@ develop = [ "validate-pyproject<0.16", ] io = [ + "cr8", "dask<=2023.10.1,>=2020", + "orjson<4,>=3.3.1", "pandas<3,>=1", + "pymongo<5,>=3.10.1", + "python-bsonjs<0.3,>=0.2", ] release = [ "build<2", @@ -114,9 +121,9 @@ test = [ "pytest<8", "pytest-cov<5", "pytest-mock<4", - "testcontainers<4", "testcontainers-azurite==0.0.1rc1", "testcontainers-minio==0.0.1rc1", + "testcontainers-mongodb==0.0.1rc1", ] [project.urls] changelog = "https://github.com/crate-workbench/cratedb-toolkit/blob/main/CHANGES.rst" @@ -126,6 +133,8 @@ repository = "https://github.com/crate-workbench/cratedb-toolkit" [project.scripts] cratedb-retention = "cratedb_toolkit.retention.cli:cli" cratedb-toolkit = "cratedb_toolkit.cli:cli" +ctk = "cratedb_toolkit.cli:cli" +migr8 = "cratedb_toolkit.io.mongodb.cli:main" [tool.black] line-length = 120 diff --git a/tests/io/mongodb/__init__.py b/tests/io/mongodb/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/io/mongodb/test_cli.py b/tests/io/mongodb/test_cli.py new file mode 100644 index 00000000..e15aed83 --- /dev/null +++ b/tests/io/mongodb/test_cli.py @@ -0,0 +1,9 @@ +import os + + +def test_version(): + """ + CLI test: Invoke `migr8 --version`. 
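+
+    This only asserts a zero exit code, i.e. that the `migr8` console script is
+    installed and runnable.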
+ """ + exitcode = os.system("migr8 --version") # noqa: S605,S607 + assert exitcode == 0 diff --git a/tests/io/mongodb/test_core.py b/tests/io/mongodb/test_core.py new file mode 100644 index 00000000..6faa5503 --- /dev/null +++ b/tests/io/mongodb/test_core.py @@ -0,0 +1,35 @@ +import unittest + +from cratedb_toolkit.io.mongodb.core import parse_input_numbers + + +class TestInputNumberParser(unittest.TestCase): + def test_numbers(self): + s = "0 1 7 4" + parsed = parse_input_numbers(s) + self.assertEqual(parsed, [0, 1, 7, 4]) + + def test_comma_seperated_numbers(self): + s = "0, 1, 7, 4" + parsed = parse_input_numbers(s) + self.assertEqual(parsed, [0, 1, 7, 4]) + + def test_mixed_numbers(self): + s = "0 1, 7 4" + parsed = parse_input_numbers(s) + self.assertEqual(parsed, [0, 1, 7, 4]) + + def test_range(self): + s = "1-5" + parsed = parse_input_numbers(s) + self.assertEqual(parsed, [1, 2, 3, 4, 5]) + + def test_inverse_range(self): + s = "5-1" + parsed = parse_input_numbers(s) + self.assertEqual(parsed, [1, 2, 3, 4, 5]) + + def test_mixed(self): + s = "0 1, 3 5-8, 9 12-10" + parsed = parse_input_numbers(s) + self.assertEqual(parsed, [0, 1, 3, 5, 6, 7, 8, 9, 10, 11, 12]) diff --git a/tests/io/mongodb/test_extract.py b/tests/io/mongodb/test_extract.py new file mode 100644 index 00000000..c4a67d67 --- /dev/null +++ b/tests/io/mongodb/test_extract.py @@ -0,0 +1,77 @@ +import unittest + +import bson + +from cratedb_toolkit.io.mongodb import extract + + +class TestExtractTypes(unittest.TestCase): + def test_primitive_types(self): + i = {"a": "a", "b": True, "c": 3, "d": 4.4} + expected = {"a": "STRING", "b": "BOOLEAN", "c": "INTEGER", "d": "FLOAT"} + s = extract.extract_schema_from_document(i, {}) + for key, value in expected.items(): + types = list(s[key]["types"].keys()) + self.assertListEqual([value], types) + + def test_bson_types(self): + i = { + "a": bson.ObjectId("55153a8014829a865bbf700d"), + "b": bson.datetime.datetime.now(), + "c": bson.Timestamp(0, 0), + } + expected = {"a": "OID", "b": "DATETIME", "c": "TIMESTAMP"} + s = extract.extract_schema_from_document(i, {}) + for key, value in expected.items(): + types = list(s[key]["types"].keys()) + self.assertListEqual([value], types) + + def test_collection_types(self): + i = {"a": [1, 2, 3], "b": {"a": "hello world"}} + expected = {"a": "ARRAY", "b": "OBJECT"} + s = extract.extract_schema_from_document(i, {}) + for key, value in expected.items(): + types = list(s[key]["types"].keys()) + self.assertListEqual([value], types) + + def test_list_subtypes(self): + i = { + "a": ["a", "b", 3], + "b": [[1, 2, 3]], + "c": [{"a": "a"}, {"a": "b"}], + } + + subtypes = extract.extract_schema_from_array(i["a"], {}) + self.assertListEqual(["STRING", "INTEGER"], list(subtypes.keys())) + + subtypes = extract.extract_schema_from_array(i["b"], {}) + self.assertListEqual(["ARRAY"], list(subtypes.keys())) + self.assertListEqual(["INTEGER"], list(subtypes["ARRAY"]["types"].keys())) + + subtypes = extract.extract_schema_from_array(i["c"], {}) + self.assertListEqual(["OBJECT"], list(subtypes.keys())) + + def test_object_type(self): + i = {"a": {"b": "c"}} + s = extract.extract_schema_from_document(i, {}) + self.assertListEqual(["OBJECT"], list(s["a"]["types"].keys())) + + +class TestTypeCount(unittest.TestCase): + def test_multiple_of_same_type(self): + i = [{"a": 2}, {"a": 3}, {"a": 6}] + s = {} + for element in i: + s = extract.extract_schema_from_document(element, s) + self.assertEqual(len(s["a"]["types"]), 1) + 
self.assertEqual(s["a"]["types"]["INTEGER"]["count"], 3) + + def test_multiple_of_different_type(self): + i = [{"a": 2}, {"a": "Hello"}, {"a": True}] + s = {} + for element in i: + s = extract.extract_schema_from_document(element, s) + self.assertEqual(len(s["a"]["types"]), 3) + self.assertEqual(s["a"]["types"]["INTEGER"]["count"], 1) + self.assertEqual(s["a"]["types"]["STRING"]["count"], 1) + self.assertEqual(s["a"]["types"]["BOOLEAN"]["count"], 1) diff --git a/tests/io/mongodb/test_integration.py b/tests/io/mongodb/test_integration.py new file mode 100644 index 00000000..a170af9d --- /dev/null +++ b/tests/io/mongodb/test_integration.py @@ -0,0 +1,62 @@ +import logging +import os +import unittest +from unittest import mock + +import pymongo + +from cratedb_toolkit.io.mongodb.core import gather_collections +from cratedb_toolkit.testing.testcontainers.mongodb import MongoDbContainerWithKeepalive + +logger = logging.getLogger(__name__) + + +class TestMongoDBIntegration(unittest.TestCase): + """ + A few conditional integration test cases with MongoDB. + For providing a MongoDB instance, it uses Testcontainers for Python. + """ + + DBNAME = "testdrive" + + SKIP_IF_NOT_RUNNING = False + + @classmethod + def setUpClass(cls): + cls.startMongoDB() + cls.client = cls.mongodb.get_connection_client() + cls.db = cls.client.get_database(cls.DBNAME) + try: + server_info = cls.client.server_info() + logger.debug(f"MongoDB server info: {server_info}") + except pymongo.errors.ServerSelectionTimeoutError as ex: + if cls.SKIP_IF_NOT_RUNNING: + raise cls.skipTest(cls, reason="MongoDB server not running") from ex + else: # noqa: RET506 + raise + + @classmethod + def tearDownClass(cls): + cls.client.drop_database(cls.DBNAME) + cls.client.close() + cls.stopMongoDB() + + @classmethod + def startMongoDB(cls): + mongodb_version = os.environ.get("MONGODB_VERSION", "7") + mongodb_image = f"mongo:{mongodb_version}" + cls.mongodb = MongoDbContainerWithKeepalive(mongodb_image).with_name("testcontainers-mongodb") + cls.mongodb.start() + + @classmethod + def stopMongoDB(cls): + cls.mongodb.stop() + + def test_gather_collections(self): + """ + Verify if core method `gather_collections` works as expected. 
+ """ + self.db.create_collection("foobar") + with mock.patch("builtins.input", return_value="unknown"): + collections = gather_collections(database=self.db) + self.assertEqual(collections, ["foobar"]) diff --git a/tests/io/mongodb/test_translate.py b/tests/io/mongodb/test_translate.py new file mode 100644 index 00000000..66977b68 --- /dev/null +++ b/tests/io/mongodb/test_translate.py @@ -0,0 +1,50 @@ +import unittest + +from cratedb_toolkit.io.mongodb import translate + + +class TestTranslate(unittest.TestCase): + def test_types_translation(self): + i = [ + ("DATETIME", "TIMESTAMP WITH TIME ZONE"), + ("INT64", "INTEGER"), + ("STRING", "TEXT"), + ("BOOLEAN", "BOOLEAN"), + ("INTEGER", "INTEGER"), + ("FLOAT", "FLOAT"), + ] + for test in i: + i = {"count": 1, "types": {test[0]: {"count": 1}}} + o, _ = translate.determine_type(i) + self.assertEqual(o, test[1]) + + def test_indeterminate_type(self): + i = { + "count": 3, + "types": { + "STRING": {"count": 1}, + "INTEGER": {"count": 1}, + "BOOLEAN": {"count": 1}, + }, + } + expected_type = "TEXT" + expected_comment = " -- ⬇️ Types: STRING: 33.33%, INTEGER: 33.33%, BOOLEAN: 33.33%" + (o_type, o_comment) = translate.determine_type(i) + self.assertEqual(o_type, expected_type) + self.assertEqual(o_comment, expected_comment) + + def test_object_translation(self): + i = { + "a": { + "count": 1, + "types": {"STRING": {"count": 1}}, + "b": {"count": 1, "types": {"DATETIME": {"count": 1}}}, + } + } + o = translate.translate_object(i) + self.assertEqual(" ".join(o.split()), 'OBJECT (DYNAMIC) AS ( "a" TEXT )') + + def test_array_translate(self): + i = {"count": 1, "types": {"STRING": {"count": 1}}} + o = translate.translate_array(i) + self.assertEqual("ARRAY(TEXT)", o)