diff --git a/trolldb/cli.py b/trolldb/cli.py index d5e1a01..91536b2 100644 --- a/trolldb/cli.py +++ b/trolldb/cli.py @@ -12,6 +12,30 @@ from trolldb.database.mongodb import MongoDB, mongodb_context +async def delete_uri_from_collection(collection, uri) -> int: + """Deletes a document from collection and logs the deletion. + + Args: + collection: + The collection object which includes the document to delete. + uri: + The URI used to query the collection. It can be either a URI of a previously recorded file message or + a dataset message. + + Returns: + Number of deleted documents. + """ + del_result_file = await collection.delete_many({"uri": uri}) + if del_result_file.deleted_count == 1: + logger.info(f"Deleted one document (file) with uri: {uri}") + + del_result_dataset = await collection.delete_many({"dataset.uri": uri}) + if del_result_dataset.deleted_count == 1: + logger.info(f"Deleted one document (dataset) with uri: {uri}") + + return del_result_file.deleted_count + del_result_dataset.deleted_count + + async def record_messages(config: AppConfig): """Record the metadata of messages into the database.""" async with mongodb_context(config.database): @@ -20,16 +44,19 @@ async def record_messages(config: AppConfig): ) for m in create_subscriber_from_dict_config(config.subscriber).recv(): msg = Message.decode(str(m)) - if msg.type in ["file", "dataset"]: - await collection.insert_one(msg.data) - logger.info(f"Inserted file with uri: {msg.data["uri"]}") - elif msg.type == "del": - deletion_result = await collection.delete_many({"uri": msg.data["uri"]}) - logger.info(f"Deleted document with uri: {msg.data["uri"]}") - if deletion_result.deleted_count != 1: - logger.error(f"Recorder found multiple deletions for uri: {msg.data["uri"]}!") - else: - logger.debug(f"Don't know what to do with {msg.type} message.") + match msg.type: + case "file": + await collection.insert_one(msg.data) + logger.info(f"Inserted file with uri: {msg.data["uri"]}") + case "dataset": + await collection.insert_one(msg.data) + logger.info(f"Inserted dataset with {len(msg.data["dataset"])} elements.") + case "del": + deletion_count = await delete_uri_from_collection(collection, msg.data["uri"]) + if deletion_count >= 1: + logger.error(f"Recorder found multiple deletions for uri: {msg.data["uri"]}!") + case _: + logger.debug(f"Don't know what to do with {msg.type} message.") async def record_messages_from_config(config_file: FilePath): diff --git a/trolldb/tests/test_recorder.py b/trolldb/tests/test_recorder.py index d54bb6e..5f3a20f 100644 --- a/trolldb/tests/test_recorder.py +++ b/trolldb/tests/test_recorder.py @@ -5,7 +5,12 @@ from posttroll.testing import patched_subscriber_recv from pytest_lazy_fixtures import lf -from trolldb.cli import record_messages, record_messages_from_command_line, record_messages_from_config +from trolldb.cli import ( + delete_uri_from_collection, + record_messages, + record_messages_from_command_line, + record_messages_from_config, +) from trolldb.database.mongodb import MongoDB, mongodb_context from trolldb.test_utils.common import AppConfig, create_config_file, make_test_app_config, test_app_config from trolldb.test_utils.mongodb_instance import running_prepared_database_context @@ -63,8 +68,12 @@ async def message_in_database_and_delete_count_is_one(msg) -> bool: collection = await MongoDB.get_collection("test_database", "test_collection") result = await collection.find_one(dict(scan_mode="EW")) result.pop("_id") - deletion_result = await collection.delete_many({"uri": msg.data["uri"]}) - return result == msg.data and deletion_result.deleted_count == 1 + uri = msg.data.get("uri") + if not uri: + uri = msg.data["dataset"][0]["uri"] + deletion_count = await delete_uri_from_collection(collection, uri) + + return result == msg.data and deletion_count == 1 @pytest.mark.parametrize(("function", "args"), [ @@ -101,13 +110,12 @@ async def test_record_deletes_message(tmp_path, file_message, del_message): result = await collection.find_one(dict(scan_mode="EW")) assert result is None + async def test_record_dataset_messages(tmp_path, dataset_message): - """Test recording a dataset message and deleting the file.""" + """Tests recording a dataset message and deleting the file.""" config = AppConfig(**make_test_app_config(tmp_path)) + msg = Message.decode(dataset_message) with running_prepared_database_context(): with patched_subscriber_recv([dataset_message]): await record_messages(config) - async with mongodb_context(config.database): - collection = await MongoDB.get_collection("mock_database", "mock_collection") - result = await collection.find_one(dict(scan_mode="EW")) - assert result is not None + assert await message_in_database_and_delete_count_is_one(msg)