From 037b8bb0d31f8c3f144d1097b619857256c48b3a Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Mon, 13 Nov 2023 21:42:06 +0100 Subject: [PATCH] Add `examples/cloud_import.py` --- CHANGES.md | 1 + examples/cloud_import.py | 129 +++++++++++++++++++++++++++++++ pyproject.toml | 1 + tests/retention/test_examples.py | 16 ++++ 4 files changed, 147 insertions(+) create mode 100644 examples/cloud_import.py diff --git a/CHANGES.md b/CHANGES.md index 84ad3118..1a80dc53 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -13,6 +13,7 @@ - MongoDB: Add `migr8` program from previous repository - MongoDB: Improve UX by using `ctk load table mongodb://...` - load table: Refactor to use more OO +- Add `examples/cloud_import.py` ## 2023/11/06 v0.0.2 diff --git a/examples/cloud_import.py b/examples/cloud_import.py new file mode 100644 index 00000000..62df562b --- /dev/null +++ b/examples/cloud_import.py @@ -0,0 +1,129 @@ +# Copyright (c) 2023, Crate.io Inc. +# Distributed under the terms of the AGPLv3 license, see LICENSE. +""" +About +===== + +Example program demonstrating how to load data from files using +the CrateDB Cloud Import API. + +The supported file types are CSV, JSON, Parquet, optionally with +gzip compression. They can be acquired from the local filesystem, +or from remote HTTP and AWS S3 resources. + +The program obtains a single positional argument from the command line, +the CrateDB Cloud Cluster identifier. When omitted, it will fall back +to the `CRATEDB_CLOUD_CLUSTER_ID` environment variable. + +Synopsis +======== +:: + + # Install package. + pip install 'cratedb-toolkit' + + # Log in to CrateDB Cloud. + croud login --idp azuread + + # Inquire list of available clusters. + croud clusters list + + # Invoke import of CSV and Parquet files. + python examples/cloud_import.py e1e38d92-a650-48f1-8a70-8133f2d5c400 + +""" +import logging +import os +import sys + +from dotenv import find_dotenv, load_dotenv + +from cratedb_toolkit.api.main import ManagedCluster +from cratedb_toolkit.model import InputOutputResource, TableAddress +from cratedb_toolkit.util.common import setup_logging + +logger = logging.getLogger(__name__) + + +def import_csv(cluster_id: str): + """ + Import CSV file from HTTP, derive table name from file name. + + ctk shell --command 'SELECT * FROM "nab-machine-failure" LIMIT 10;' + """ + + # Acquire database cluster resource handle. + cluster = ManagedCluster(cloud_id=cluster_id) + + # Encapsulate source parameter. + url = "https://github.com/crate/cratedb-datasets/raw/main/machine-learning/timeseries/nab-machine-failure.csv" + resource = InputOutputResource(url=url) + + # Invoke import job. Without `target` argument, the destination + # table name will be derived from the input file name. + response, success = cluster.load_table(resource=resource) + if not success: + sys.exit(1) + + +def import_parquet(cluster_id: str): + """ + Import Parquet file from HTTP, and use specific schema and table names. + + ctk shell --command 'SELECT * FROM "testdrive"."yc-201907" LIMIT 10;' + """ + + # Acquire database cluster resource handle. + cluster = ManagedCluster(cloud_id=cluster_id) + + # Encapsulate source and target parameters. + url = "https://github.com/crate/cratedb-datasets/raw/main/timeseries/yc.2019.07-tiny.parquet.gz" + resource = InputOutputResource(url=url) + target = TableAddress(schema="testdrive", table="yc-201907") + + # Invoke import job. The destination table name is explicitly specified. + response, success = cluster.load_table(resource=resource, target=target) + if not success: + sys.exit(1) + + +def obtain_cluster_id() -> str: + """ + Obtain the CrateDB Cloud Cluster identifier from the environment. + + - Use first positional argument from command line. + - Fall back to `CRATEDB_CLOUD_CLUSTER_ID` environment variable. + """ + load_dotenv(find_dotenv()) + + try: + cluster_id = sys.argv[1] + except IndexError: + cluster_id = os.environ.get("CRATEDB_CLOUD_CLUSTER_ID") + + if not cluster_id: + raise ValueError( + "Unable to obtain cluster identifier from command line or " + "`CRATEDB_CLOUD_CLUSTER_ID` environment variable" + ) + + return cluster_id + + +def main(): + """ + Obtain cluster identifier, and run program. + """ + try: + cluster_id = obtain_cluster_id() + except ValueError as ex: + logger.error(ex) + sys.exit(1) + + import_csv(cluster_id) + import_parquet(cluster_id) + + +if __name__ == "__main__": + setup_logging() + main() diff --git a/pyproject.toml b/pyproject.toml index f7da36f6..2d5d4be9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -92,6 +92,7 @@ dependencies = [ "crate[sqlalchemy]>=0.34", "croud==1.8", 'importlib-metadata; python_version <= "3.7"', + "python-dotenv<2", "sqlalchemy>=2", "sqlparse<0.5", ] diff --git a/tests/retention/test_examples.py b/tests/retention/test_examples.py index 010289e6..95d2be8c 100644 --- a/tests/retention/test_examples.py +++ b/tests/retention/test_examples.py @@ -1,5 +1,8 @@ # Copyright (c) 2023, Crate.io Inc. # Distributed under the terms of the AGPLv3 license, see LICENSE. +from unittest.mock import patch + +import responses def test_example_edit(store): @@ -18,3 +21,16 @@ def test_example_retire_cutoff(store): from examples.retention_retire_cutoff import main main(dburi=store.database.dburi) + + +@responses.activate +def test_example_cloud_import(store, cloud_cluster_mock): + """ + Verify that the program `examples/cloud_import.py` works. + """ + + from examples.cloud_import import main + + cluster_id = "e1e38d92-a650-48f1-8a70-8133f2d5c400" + with patch("examples.cloud_import.obtain_cluster_id", return_value=cluster_id): + main()