diff --git a/README.md b/README.md index aa3109a..a82e0b8 100644 --- a/README.md +++ b/README.md @@ -156,11 +156,22 @@ func Push(mgr worker.Manager, job *faktory.Job) error { } func syntheticPush(mgr worker.Manager, job *faktory.Job) error { - if mgr.IsRegistered(job.Type) { - return mgr.Dispatch(job) + if !mgr.IsRegistered(job.Type) { + return fmt.Errorf("failed to execute job type %s inline, job not registered", job.Type) } - return fmt.Errorf("inline job execution failed, unregistered job type %s", job.Type) + // Manually setting the worker defaults is a threadsafe alternative to mgr.Run/mgr.RunWithContext, which can trigger data races + err := mgr.SetUpWorkerProcess() + if err != nil { + return fmt.Errorf("couldn't set up Faktory worker process in synthetic push: %w", err) + } + + err := mgr.Dispatch(job) + if err != nil { + return errors.Wrap(err, fmt.Sprintf("job was immediately executed but failed. Job type %s, with args %+v", job.Type, job.Args)) + } + + return nil } func realPush(job *faktory.Job) error { diff --git a/manager.go b/manager.go index 6337b3f..3bb2993 100644 --- a/manager.go +++ b/manager.go @@ -130,7 +130,7 @@ func NewManager() *Manager { } } -func (mgr *Manager) setUpWorkerProcess() error { +func (mgr *Manager) SetUpWorkerProcess() error { mgr.mut.Lock() defer mgr.mut.Unlock() @@ -174,7 +174,7 @@ func (mgr *Manager) RunWithContext(ctx context.Context) error { } func (mgr *Manager) boot() error { - err := mgr.setUpWorkerProcess() + err := mgr.SetUpWorkerProcess() if err != nil { return err } diff --git a/manager_test.go b/manager_test.go index cd6f2b8..28ce9a4 100644 --- a/manager_test.go +++ b/manager_test.go @@ -24,7 +24,7 @@ func TestManagerSetup(t *testing.T) { } mgr := NewManager() - err = mgr.setUpWorkerProcess() + err = mgr.SetUpWorkerProcess() assert.NoError(t, err) startupCalled := false diff --git a/runner_test.go b/runner_test.go index a84874b..7243c33 100644 --- a/runner_test.go +++ b/runner_test.go @@ -59,7 +59,7 @@ func TestLiveServer(t *testing.T) { mgr := NewManager() mgr.ProcessStrictPriorityQueues("fwgtest") mgr.Concurrency = 1 - err := mgr.setUpWorkerProcess() + err := mgr.SetUpWorkerProcess() assert.NoError(t, err) mgr.Register("aworker", func(ctx context.Context, args ...interface{}) error {