From 05fa39749143dc1ec37ed3207bc9f08107f7940e Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Thu, 12 Sep 2024 16:58:02 +0200 Subject: [PATCH] MongoDB: Improve dispatching of server- vs. file-based processing --- cratedb_toolkit/api/main.py | 52 ++++++++----------------------------- 1 file changed, 11 insertions(+), 41 deletions(-) diff --git a/cratedb_toolkit/api/main.py b/cratedb_toolkit/api/main.py index 5fb5489..db51691 100644 --- a/cratedb_toolkit/api/main.py +++ b/cratedb_toolkit/api/main.py @@ -125,24 +125,6 @@ def load_table(self, resource: InputOutputResource, target: TableAddress, transf logger.error("Data loading failed or incomplete") return False - elif source_url_obj.scheme.startswith("file"): - if "+bson" in source_url_obj.scheme or "+mongodb" in source_url_obj.scheme: - return mongodb_copy_generic( - str(source_url_obj), - target_url, - transformation=transformation, - progress=True, - ) - - elif source_url_obj.scheme.startswith("http"): - if "+bson" in source_url_obj.scheme or "+mongodb" in source_url_obj.scheme: - return mongodb_copy_generic( - str(source_url_obj), - target_url, - transformation=transformation, - progress=True, - ) - elif source_url_obj.scheme.startswith("influxdb"): from cratedb_toolkit.io.influxdb import influxdb_copy @@ -156,37 +138,25 @@ def load_table(self, resource: InputOutputResource, target: TableAddress, transf logger.error("Data loading failed or incomplete") return False - elif source_url_obj.scheme.startswith("mongodb"): + elif source_url_obj.scheme in ["file+bson", "http+bson", "https+bson", "mongodb", "mongodb+srv"]: if "+cdc" in source_url_obj.scheme: source_url_obj.scheme = source_url_obj.scheme.replace("+cdc", "") from cratedb_toolkit.io.mongodb.api import mongodb_relay_cdc return mongodb_relay_cdc(str(source_url_obj), target_url, progress=True) else: - return mongodb_copy_generic( - str(source_url_obj), + from cratedb_toolkit.io.mongodb.api import mongodb_copy + + if mongodb_copy( + source_url_obj, target_url, transformation=transformation, progress=True, - ) + ): + return True + else: + logger.error("Data loading failed or incomplete") + return False + else: raise NotImplementedError("Importing resource not implemented yet") - - return False - - -def mongodb_copy_generic( - source_url: str, target_url: str, transformation: t.Union[Path, None] = None, progress: bool = False -): - from cratedb_toolkit.io.mongodb.api import mongodb_copy - - if mongodb_copy( - source_url, - target_url, - transformation=transformation, - progress=progress, - ): - return True - else: - logger.error("Data loading failed or incomplete") - return False