From d75d2607e4a22e037bfde40dcdefcf49d9095c72 Mon Sep 17 00:00:00 2001 From: Allan Stockman Rugano Date: Fri, 13 Dec 2024 20:16:24 +0100 Subject: [PATCH 1/2] Updated tasks --- src/recognizeapp/c.py | 10 ++++++++++ src/recognizeapp/tasks.py | 1 + 2 files changed, 11 insertions(+) diff --git a/src/recognizeapp/c.py b/src/recognizeapp/c.py index a6ec33b..e661232 100644 --- a/src/recognizeapp/c.py +++ b/src/recognizeapp/c.py @@ -7,6 +7,16 @@ ) app.config_from_object("recognizeapp.c", namespace="CELERY_") +app.conf.update( + task_serializer="json", # Use JSON for task arguments + accept_content=["json"], # Accept only JSON + # result_serializer="json", # Use JSON for results + timezone="UTC", # Ensure consistent timezone + enable_utc=True, # Use UTC timestamps + task_track_started=True, # Track task start + task_time_limit=3600, # Set a timeout for tasks (in seconds) + task_acks_late=True, # Ensure task acknowledgment only after completion +) CELERY_DEBUG = True diff --git a/src/recognizeapp/tasks.py b/src/recognizeapp/tasks.py index 3bff185..ffe58f9 100644 --- a/src/recognizeapp/tasks.py +++ b/src/recognizeapp/tasks.py @@ -34,6 +34,7 @@ def convert_dict_keys_to_str(input_dict: Dict[Path, Any]) -> Dict[str, Any]: def encode_chunk( self: Task, files: List[str], + chunk_id: str, config: Dict[str, Any], pre_encodings: Dict[str, Any], ) -> Dict[str, Any]: From b96b2de4e6c336f6b3cd4cc332e1b952af347e36 Mon Sep 17 00:00:00 2001 From: Allan Stockman Rugano Date: Sat, 14 Dec 2024 14:02:20 +0100 Subject: [PATCH 2/2] Task fix --- README.md | 2 +- src/recognizeapp/tasks.py | 7 +++++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index b88802e..23b5948 100644 --- a/README.md +++ b/README.md @@ -62,7 +62,7 @@ dedupe data/IMAGES -p 4 #### 1. Start a Celery Worker In the first terminal, start a Celery worker: ```bash -watchmedo auto-restart --directory=./src/ --pattern *.py --recursive -- celery -E -A recognizeapp.c.app worker +watchmedo auto-restart --directory=./src/ --pattern *.py --recursive -- celery -A recognizeapp.c.app worker ``` #### 2. Run Deduplication with Queueing diff --git a/src/recognizeapp/tasks.py b/src/recognizeapp/tasks.py index ffe58f9..d462541 100644 --- a/src/recognizeapp/tasks.py +++ b/src/recognizeapp/tasks.py @@ -71,8 +71,10 @@ def dedupe_chunk( @app.task(bind=True) def get_findings(self: Task, results: List[Dict[str, Any]], config: Dict[str, Any]) -> Dict[str, Any]: """Aggregate and save findings.""" + # Extract only the first element (dictionary) from each tuple in results + dictionaries = [result[0] for result in results if isinstance(result, tuple) and isinstance(result[0], dict)] ds = Dataset(config) - findings = dict(ChainMap(*results)) + findings = dict(ChainMap(*dictionaries)) ds.update_findings(findings) end_time = datetime.now() @@ -104,7 +106,8 @@ def get_findings(self: Task, results: List[Dict[str, Any]], config: Dict[str, An def get_encodings(self: Task, results: List[Dict[str, Any]], config: Dict[str, Any]) -> Dict[str, Any]: """Aggregate and save encodings.""" ds = Dataset(config) - encodings = dict(ChainMap(*results)) + dictionaries = [result[0] for result in results if isinstance(result, tuple) and isinstance(result[0], dict)] + encodings = dict(ChainMap(*dictionaries)) ds.update_encodings(encodings) end_time = datetime.now()