gsfish/celery-dispatcher

celery-dispatcher

An extension for Celery that dispatches a large number of subtasks from within a main task and processes their results separately.

Installation

pip install celery-dispatcher

Usage

NOTICE: celery-dispatcher uses thread-local storage (TLS) to store its running state, so coroutine-based pool implementations (such as eventlet or gevent) cannot be used.

First, yield each subtask and its parameters as a tuple from the main task, in the following order:

  1. task: signature
  2. args: tuple/list
  3. kwargs: dict
  4. options: dict

Then register the result handler for the subtasks using the subtask_success signal:

from celery import shared_task
from celery_dispatcher import dispatch
from celery_dispatcher.signals import subtask_success

@shared_task
def sqrt(i):
    return i ** 0.5  # square root, matching the task name

@dispatch
@shared_task
def calc():
    for i in range(10):
        yield sqrt, (i,)

@subtask_success.connect(sender=calc)
def handle_result(root_id, task_id, retval, **kwargs):
    print(retval)

Or pass the handler to the decorator directly:

from celery import shared_task
from celery_dispatcher import dispatch

@shared_task
def sqrt(i):
    return i ** 0.5  # square root, matching the task name

def handle_result(root_id, task_id, retval, **kwargs):
    print(retval)

@dispatch(receiver=handle_result)
@shared_task
def calc():
    for i in range(10):
        yield sqrt, (i,)
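
Both examples yield only the task and its positional arguments, so the trailing positions can evidently be left out. As a rough illustration of the four-position order (task, args, kwargs, options), the sketch below pads a yielded tuple to its full form. `SubtaskSpec` and `normalize_subtask` are hypothetical names invented for this example and are not part of celery-dispatcher; treating `options` as Celery execution options such as `countdown` is likewise an assumption.

```python
from typing import Any, Callable, NamedTuple


class SubtaskSpec(NamedTuple):
    """Hypothetical container for one yielded subtask (not a library type)."""
    task: Callable[..., Any]
    args: tuple
    kwargs: dict
    options: dict


def normalize_subtask(yielded) -> SubtaskSpec:
    """Pad a yielded value to the full (task, args, kwargs, options) order."""
    # Assumption: a bare task that is not wrapped in a tuple counts as a 1-tuple.
    if not isinstance(yielded, tuple):
        yielded = (yielded,)
    if not 1 <= len(yielded) <= 4:
        raise ValueError("expected between 1 and 4 elements")
    task, *rest = yielded
    # Fill the missing trailing positions with empty defaults.
    defaults = [(), {}, {}]
    args, kwargs, options = rest + defaults[len(rest):]
    return SubtaskSpec(task, tuple(args), dict(kwargs), dict(options))


def sqrt(i):
    # Plain function standing in for the Celery task from the examples.
    return i ** 0.5


short = normalize_subtask((sqrt, (4,)))
full = normalize_subtask((sqrt, (9,), {}, {"countdown": 5}))
```

Here `short` carries empty `kwargs` and `options`, while `full` shows where each of the four positions ends up.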

To revoke all subtasks, call revoke on the main task's result with terminate=True and the SIGUSR1 signal:

import signal
from celery import shared_task
from celery_dispatcher import dispatch

@shared_task
def subtask(i):
    ...

@dispatch
@shared_task
def maintask():
    for i in range(10):
        yield subtask, (i,)

result = maintask.delay()
result.revoke(terminate=True, signal=signal.SIGUSR1)

Options

The dispatch decorator accepts the following parameters:

  • options: dict
  • receiver: callable
  • backend: str
  • auto_ignore: bool

General settings
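
As an overview, the settings in this section and their documented defaults can be collected in a plain mapping. This is only a sketch: `DISPATCHER_DEFAULTS` and `build_settings` are hypothetical names, and applying the result with `app.conf.update(**settings)` assumes these settings are read like other lowercase Celery settings, which this README does not state.

```python
# Documented defaults from this README, keyed by setting name.
DISPATCHER_DEFAULTS = {
    "dispatcher_result_backend": None,  # no result backend enabled by default
    "dispatcher_batch_size": 100,
    "dispatcher_poll_size": 100,
    "dispatcher_poll_timeout": 10,
    "dispatcher_subtask_timeout": 3600,  # seconds
    "dispatcher_failure_on_subtask_timeout": False,
    "dispatcher_failure_on_subtask_exception": False,
    "dispatcher_progress_update_frequency": 1,
}


def build_settings(**overrides):
    """Return the defaults with overrides applied, rejecting unknown names."""
    unknown = set(overrides) - set(DISPATCHER_DEFAULTS)
    if unknown:
        raise KeyError(f"unknown dispatcher settings: {sorted(unknown)}")
    return {**DISPATCHER_DEFAULTS, **overrides}


settings = build_settings(
    dispatcher_batch_size=500,
    dispatcher_failure_on_subtask_exception=True,
)
# Assumption: the mapping could then be applied with app.conf.update(**settings).
```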

dispatcher_result_backend

Default: no result backend is enabled.

The backend used to store subtask info. Can be one of the following:

dispatcher_batch_size

Default: 100

The batch size used when dispatching subtasks and when retrieving their results.

dispatcher_poll_size

Default: 100

The size of the queue used when retrieving results. celery-dispatcher puts unfinished subtasks into a queue and polls it continuously until they complete.
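
The polling behavior can be pictured with a small simulation. This is a sketch of the general idea (a bounded queue whose unfinished entries are re-queued), not celery-dispatcher's actual implementation; `poll_until_done`, `is_ready`, and `max_rounds` are hypothetical names, and a real poller would presumably wait between passes rather than loop immediately.

```python
from collections import deque


def poll_until_done(subtask_ids, is_ready, poll_size=100, max_rounds=1000):
    """Return subtask ids in completion order using a bounded polling queue."""
    pending = iter(subtask_ids)
    queue = deque()
    finished = []
    rounds = 0
    while True:
        # Top up the queue so at most poll_size subtasks are polled at once.
        while len(queue) < poll_size:
            try:
                queue.append(next(pending))
            except StopIteration:
                break
        if not queue:
            return finished
        if rounds >= max_rounds:
            raise TimeoutError("subtasks did not finish within max_rounds")
        rounds += 1
        # One polling pass: finished ids are collected, the rest are re-queued.
        for _ in range(len(queue)):
            task_id = queue.popleft()
            if is_ready(task_id):
                finished.append(task_id)
            else:
                queue.append(task_id)


checks = {}


def is_ready(task_id):
    # Fake backend: task n reports ready on its (n % 3 + 1)-th check.
    checks[task_id] = checks.get(task_id, 0) + 1
    return checks[task_id] > task_id % 3


done = poll_until_done(range(6), is_ready, poll_size=2)
```

With a small `poll_size`, slow subtasks hold their queue slots, so later subtasks are only polled once earlier ones finish.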

dispatcher_poll_timeout

Default: 10

The default timeout for polling subtasks.

dispatcher_subtask_timeout

Default: 3600

The default timeout in seconds before celery-dispatcher gives up retrieving the result of each subtask.

dispatcher_failure_on_subtask_timeout

Default: False

Whether to raise an exception when a subtask times out.

dispatcher_failure_on_subtask_exception

Default: False

Whether to raise the exception from a failed subtask.

dispatcher_progress_update_frequency

Default: 1

The number of steps between progress records. celery-dispatcher stores the updated progress in the background at most once every N steps.
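
The "at most once every N steps" behavior can be sketched as a simple throttle. `ProgressRecorder` is a hypothetical helper written for this illustration and is not celery-dispatcher's code; the final `flush` call is also an assumption about how the last partial interval might be handled.

```python
class ProgressRecorder:
    """Hypothetical helper: persist progress at most once every `frequency` steps."""

    def __init__(self, store, frequency=1):
        if frequency < 1:
            raise ValueError("frequency must be at least 1")
        self._store = store          # callable that persists a progress value
        self._frequency = frequency  # N in "every N steps"
        self.completed = 0

    def step(self):
        """Count one finished subtask and persist only on every N-th step."""
        self.completed += 1
        if self.completed % self._frequency == 0:
            self._store(self.completed)

    def flush(self):
        """Persist the current count regardless of the frequency."""
        self._store(self.completed)


writes = []
recorder = ProgressRecorder(writes.append, frequency=4)
for _ in range(10):
    recorder.step()
recorder.flush()
```

A larger frequency reduces the number of writes to the backend at the cost of coarser progress reporting.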
