Skip to content

Commit

Permalink
MongoDB: Improve dispatching of server- vs. file-based processing
Browse files Browse the repository at this point in the history
  • Loading branch information
amotl committed Sep 12, 2024
1 parent 86b8024 commit 05fa397
Showing 1 changed file with 11 additions and 41 deletions.
52 changes: 11 additions & 41 deletions cratedb_toolkit/api/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,24 +125,6 @@ def load_table(self, resource: InputOutputResource, target: TableAddress, transf
logger.error("Data loading failed or incomplete")
return False

elif source_url_obj.scheme.startswith("file"):
if "+bson" in source_url_obj.scheme or "+mongodb" in source_url_obj.scheme:
return mongodb_copy_generic(
str(source_url_obj),
target_url,
transformation=transformation,
progress=True,
)

elif source_url_obj.scheme.startswith("http"):
if "+bson" in source_url_obj.scheme or "+mongodb" in source_url_obj.scheme:
return mongodb_copy_generic(
str(source_url_obj),
target_url,
transformation=transformation,
progress=True,
)

elif source_url_obj.scheme.startswith("influxdb"):
from cratedb_toolkit.io.influxdb import influxdb_copy

Expand All @@ -156,37 +138,25 @@ def load_table(self, resource: InputOutputResource, target: TableAddress, transf
logger.error("Data loading failed or incomplete")
return False

elif source_url_obj.scheme.startswith("mongodb"):
elif source_url_obj.scheme in ["file+bson", "http+bson", "https+bson", "mongodb", "mongodb+srv"]:
if "+cdc" in source_url_obj.scheme:
source_url_obj.scheme = source_url_obj.scheme.replace("+cdc", "")
from cratedb_toolkit.io.mongodb.api import mongodb_relay_cdc

return mongodb_relay_cdc(str(source_url_obj), target_url, progress=True)
else:
return mongodb_copy_generic(
str(source_url_obj),
from cratedb_toolkit.io.mongodb.api import mongodb_copy

if mongodb_copy(
source_url_obj,
target_url,
transformation=transformation,
progress=True,
)
):
return True
else:
logger.error("Data loading failed or incomplete")
return False

else:
raise NotImplementedError("Importing resource not implemented yet")

return False


def mongodb_copy_generic(
source_url: str, target_url: str, transformation: t.Union[Path, None] = None, progress: bool = False
):
from cratedb_toolkit.io.mongodb.api import mongodb_copy

if mongodb_copy(
source_url,
target_url,
transformation=transformation,
progress=progress,
):
return True
else:
logger.error("Data loading failed or incomplete")
return False

0 comments on commit 05fa397

Please sign in to comment.