diff --git a/README.md b/README.md index a82e0b8..3c307d8 100644 --- a/README.md +++ b/README.md @@ -156,22 +156,12 @@ func Push(mgr worker.Manager, job *faktory.Job) error { } func syntheticPush(mgr worker.Manager, job *faktory.Job) error { - if !mgr.IsRegistered(job.Type) { - return fmt.Errorf("failed to execute job type %s inline, job not registered", job.Type) - } - - // Manually setting the worker defaults is a threadsafe alternative to mgr.Run/mgr.RunWithContext, which can trigger data races - err := mgr.SetUpWorkerProcess() + err := mgr.InlineDispatch(job) if err != nil { - return fmt.Errorf("couldn't set up Faktory worker process in synthetic push: %w", err) + return errors.Wrap(err, "syntheticPush failed") } - err := mgr.Dispatch(job) - if err != nil { - return errors.Wrap(err, fmt.Sprintf("job was immediately executed but failed. Job type %s, with args %+v", job.Type, job.Args)) - } - - return nil + return nil } func realPush(job *faktory.Job) error { diff --git a/manager.go b/manager.go index 3bb2993..6ef453e 100644 --- a/manager.go +++ b/manager.go @@ -48,22 +48,42 @@ func (mgr *Manager) Register(name string, fn Perform) { } } -// IsRegistered checks if a given job name is registered with the manager. +// isRegistered checks if a given job name is registered with the manager. // -// mgr.IsRegistered("SomeJob") -func (mgr *Manager) IsRegistered(name string) bool { +// mgr.isRegistered("SomeJob") +func (mgr *Manager) isRegistered(name string) bool { _, ok := mgr.jobHandlers[name] return ok } -// Dispatch immediately executes a job, including all middleware. -func (mgr *Manager) Dispatch(job *faktory.Job) error { +// dispatch immediately executes a job, including all middleware. +func (mgr *Manager) dispatch(job *faktory.Job) error { perform := mgr.jobHandlers[job.Type] return dispatch(mgr.middleware, jobContext(mgr.Pool, job), job, perform) } +// InlineDispatch is designed for testing. It immediate executes a job, including all middleware, +// as well as performs manager setup if needed. +func (mgr *Manager) InlineDispatch(job *faktory.Job) error { + if !mgr.isRegistered(job.Type) { + return fmt.Errorf("failed to dispatch inline for job type %s; job not registered", job.Type) + } + + err := mgr.setUpWorkerProcess() + if err != nil { + return fmt.Errorf("couldn't set up worker process for inline dispatch - %w", err) + } + + err = mgr.dispatch(job) + if err != nil { + return fmt.Errorf("job was dispatched inline but failed. Job type %s, with args %+v - %w", job.Type, job.Args, err) + } + + return nil +} + // Register a callback to be fired when a process lifecycle event occurs. // These are useful for hooking into process startup or shutdown. func (mgr *Manager) On(event lifecycleEventType, fn LifecycleEventHandler) { @@ -130,7 +150,7 @@ func NewManager() *Manager { } } -func (mgr *Manager) SetUpWorkerProcess() error { +func (mgr *Manager) setUpWorkerProcess() error { mgr.mut.Lock() defer mgr.mut.Unlock() @@ -174,7 +194,7 @@ func (mgr *Manager) RunWithContext(ctx context.Context) error { } func (mgr *Manager) boot() error { - err := mgr.SetUpWorkerProcess() + err := mgr.setUpWorkerProcess() if err != nil { return err } diff --git a/manager_test.go b/manager_test.go index 28ce9a4..cd6f2b8 100644 --- a/manager_test.go +++ b/manager_test.go @@ -24,7 +24,7 @@ func TestManagerSetup(t *testing.T) { } mgr := NewManager() - err = mgr.SetUpWorkerProcess() + err = mgr.setUpWorkerProcess() assert.NoError(t, err) startupCalled := false diff --git a/runner.go b/runner.go index f552324..2482c25 100644 --- a/runner.go +++ b/runner.go @@ -144,7 +144,7 @@ func processOne(mgr *Manager) error { } } - if !mgr.IsRegistered(job.Type) { + if !mgr.isRegistered(job.Type) { je := &NoHandlerError{JobType: job.Type} err := mgr.with(func(c *faktory.Client) error { return c.Fail(job.Jid, je, nil) @@ -155,7 +155,7 @@ func processOne(mgr *Manager) error { return je } - joberr := mgr.Dispatch(job) + joberr := mgr.dispatch(job) if joberr != nil { // job errors are normal and expected, we don't return early from them mgr.Logger.Errorf("Error running %s job %s: %v", job.Type, job.Jid, joberr) diff --git a/runner_test.go b/runner_test.go index 7243c33..a84874b 100644 --- a/runner_test.go +++ b/runner_test.go @@ -59,7 +59,7 @@ func TestLiveServer(t *testing.T) { mgr := NewManager() mgr.ProcessStrictPriorityQueues("fwgtest") mgr.Concurrency = 1 - err := mgr.SetUpWorkerProcess() + err := mgr.setUpWorkerProcess() assert.NoError(t, err) mgr.Register("aworker", func(ctx context.Context, args ...interface{}) error {