Skip to content

Commit

Permalink
Support inline job execution for testing
Browse files Browse the repository at this point in the history
  • Loading branch information
Adam Steel authored and mperham committed Jan 19, 2024
1 parent e4ed398 commit a1dfffe
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 9 deletions.
48 changes: 44 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -107,14 +107,14 @@ func main() {
//mgr.ProcessWeightedPriorityQueues(map[string]int{"critical":3, "default":2, "bulk":1})

go func(){
// Start processing jobs in background routine, this method does not return
// Start processing jobs in background routine, this method does not return
// unless an error is returned or cancel() is called
mgr.RunWithContext(ctx)
}()

go func() {
stopSignals := []os.Signal{
syscall.SIGTERM,
syscall.SIGTERM,
syscall.SIGINT,
}
stop := make(chan os.Signal, len(stopSignals))
Expand All @@ -131,13 +131,53 @@ func main() {
}
}
}()

<-ctx.Done()
}
```

See `test/main.go` for a working example.

# Testing

`faktory_worker_go` provides helpers that allow you to configure tests to execute jobs inline if you prefer. In this example, the application has defined its own wrapper function for `client.Push`.

```go
import (
faktory "github.com/contribsys/faktory/client"
worker "github.com/contribsys/faktory_worker_go"
)

func Push(mgr worker.Manager, job *faktory.Job) error {
if viper.GetBool("faktory_inline") {
return syntheticPush(mgr worker.Manager, job)
}
return realPush(job)
}

func syntheticPush(mgr worker.Manager, job *faktory.Job) error {
if mgr.IsRegistered(job.Type) {
return mgr.Dispatch(job)
}

return fmt.Errorf("inline job execution failed, unregistered job type %s", job.Type)
}

func realPush(job *faktory.Job) error {
client, err := faktory.Open()
if err != nil {
return errors.Wrap(err, "failed to open Faktory client connection")
}

err = client.Push(job)
if err != nil {
return errors.Wrap(err, "failed to enqueue Faktory job")
}

return nil
}
```

# FAQ

* How do I specify the Faktory server location?
Expand Down
18 changes: 17 additions & 1 deletion manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,29 @@ type Manager struct {
// Register a handler for the given jobtype. It is expected that all jobtypes
// are registered upon process startup.
//
// mgr.Register("ImportantJob", ImportantFunc)
// mgr.Register("ImportantJob", ImportantFunc)
func (mgr *Manager) Register(name string, fn Perform) {
mgr.jobHandlers[name] = func(ctx context.Context, job *faktory.Job) error {
return fn(ctx, job.Args...)
}
}

// IsRegistered checks if a given job name is registered with the manager.
//
// mgr.IsRegistered("SomeJob")
func (mgr *Manager) IsRegistered(name string) bool {
_, ok := mgr.jobHandlers[name]

return ok
}

// Dispatch immediately executes a job, including all middleware.
func (mgr *Manager) Dispatch(job *faktory.Job) error {
perform := mgr.jobHandlers[job.Type]

return dispatch(mgr.middleware, jobContext(mgr.Pool, job), job, perform)
}

// Register a callback to be fired when a process lifecycle event occurs.
// These are useful for hooking into process startup or shutdown.
func (mgr *Manager) On(event lifecycleEventType, fn LifecycleEventHandler) {
Expand Down
6 changes: 2 additions & 4 deletions runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,9 +144,7 @@ func processOne(mgr *Manager) error {
}
}

perform := mgr.jobHandlers[job.Type]

if perform == nil {
if !mgr.IsRegistered(job.Type) {
je := &NoHandlerError{JobType: job.Type}
err := mgr.with(func(c *faktory.Client) error {
return c.Fail(job.Jid, je, nil)
Expand All @@ -157,7 +155,7 @@ func processOne(mgr *Manager) error {
return je
}

joberr := dispatch(mgr.middleware, jobContext(mgr.Pool, job), job, perform)
joberr := mgr.Dispatch(job)
if joberr != nil {
// job errors are normal and expected, we don't return early from them
mgr.Logger.Errorf("Error running %s job %s: %v", job.Type, job.Jid, joberr)
Expand Down

0 comments on commit a1dfffe

Please sign in to comment.