Skip to content

Commit

Permalink
Merge pull request #7 from srugano/develop
Browse files Browse the repository at this point in the history
Updated tasks
  • Loading branch information
srugano authored Dec 16, 2024
2 parents 62d4c64 + b96b2de commit 869dfaa
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 3 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ dedupe data/IMAGES -p 4
#### 1. Start a Celery Worker
In the first terminal, start a Celery worker:
```bash
watchmedo auto-restart --directory=./src/ --pattern *.py --recursive -- celery -E -A recognizeapp.c.app worker
# Quote the glob so the shell passes it to watchmedo unexpanded
# (unquoted, bash may expand it and zsh fails with "no matches found").
watchmedo auto-restart --directory=./src/ --pattern "*.py" --recursive -- celery -A recognizeapp.c.app worker
```

#### 2. Run Deduplication with Queueing
Expand Down
10 changes: 10 additions & 0 deletions src/recognizeapp/c.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,16 @@
)

# Load settings from the "recognizeapp.c" module, using "CELERY_" as the
# settings namespace.
app.config_from_object("recognizeapp.c", namespace="CELERY_")
# Explicit settings applied after the config_from_object() call above.
# NOTE(review): JSON has no tuple type, so a tuple passed through a JSON
# serializer is expected to arrive as a list -- confirm that task code which
# type-checks values received from other tasks accounts for this.
app.conf.update(
task_serializer="json", # Serialize task arguments as JSON
accept_content=["json"], # Accept only JSON-encoded content
# result_serializer="json", # Disabled: would select JSON for results; not set by this call
timezone="UTC", # Ensure consistent timezone
enable_utc=True, # Use UTC timestamps
task_track_started=True, # Track task start
task_time_limit=3600, # Timeout for tasks, in seconds (3600 s = 1 hour)
task_acks_late=True, # Acknowledge a task only after it completes
)

# NOTE(review): module-level name carrying the "CELERY_" prefix; presumably
# read as a setting through the config_from_object() call above -- confirm it
# has an effect.
CELERY_DEBUG = True

Expand Down
8 changes: 6 additions & 2 deletions src/recognizeapp/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ def convert_dict_keys_to_str(input_dict: Dict[Path, Any]) -> Dict[str, Any]:
def encode_chunk(
self: Task,
files: List[str],
chunk_id: str,
config: Dict[str, Any],
pre_encodings: Dict[str, Any],
) -> Dict[str, Any]:
Expand Down Expand Up @@ -70,8 +71,10 @@ def dedupe_chunk(
@app.task(bind=True)
def get_findings(self: Task, results: List[Dict[str, Any]], config: Dict[str, Any]) -> Dict[str, Any]:
"""Aggregate and save findings."""
# Extract only the first element (dictionary) from each tuple in results
dictionaries = [result[0] for result in results if isinstance(result, tuple) and isinstance(result[0], dict)]
ds = Dataset(config)
findings = dict(ChainMap(*results))
findings = dict(ChainMap(*dictionaries))
ds.update_findings(findings)

end_time = datetime.now()
Expand Down Expand Up @@ -103,7 +106,8 @@ def get_findings(self: Task, results: List[Dict[str, Any]], config: Dict[str, An
def get_encodings(self: Task, results: List[Dict[str, Any]], config: Dict[str, Any]) -> Dict[str, Any]:
"""Aggregate and save encodings."""
ds = Dataset(config)
encodings = dict(ChainMap(*results))
dictionaries = [result[0] for result in results if isinstance(result, tuple) and isinstance(result[0], dict)]
encodings = dict(ChainMap(*dictionaries))
ds.update_encodings(encodings)

end_time = datetime.now()
Expand Down

0 comments on commit 869dfaa

Please sign in to comment.