Skip to content

Commit

Permalink
chore: Fix Redis worker pointers
Browse files Browse the repository at this point in the history
  • Loading branch information
avitova committed Aug 24, 2023
1 parent cab3e1a commit ed8eb3a
Showing 1 changed file with 15 additions and 2 deletions.
17 changes: 15 additions & 2 deletions pkg/worker/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type RedisWorker struct {
concurrency int
loopWG sync.WaitGroup

// number of in-flight jobs (must be use via atomic functions)
// number of in-flight jobs (must be used via atomic functions)
inFlight int64
}

Expand Down Expand Up @@ -70,6 +70,12 @@ func (w *RedisWorker) RegisterHandler(jtype JobType, handler JobHandler, args an
}

func loggerWithJob(ctx context.Context, job *Job) *zerolog.Logger {
if job == nil {
logger := zerolog.Ctx(ctx)
logger.Warn().Msgf("Invalid job, logging without the job ID")
return logger
}

logger := zerolog.Ctx(ctx).With().
Str("job_id", job.ID.String()).
Str("job_type", string(job.Type)).
Expand All @@ -79,6 +85,10 @@ func loggerWithJob(ctx context.Context, job *Job) *zerolog.Logger {

func (w *RedisWorker) Enqueue(ctx context.Context, job *Job) error {
var err error
if job == nil {
return fmt.Errorf("unable to enqueue job")

Check failure on line 89 in pkg/worker/redis.go

View workflow job for this annotation

GitHub Actions / 🎯 Go linter

err113: do not define dynamic errors, use wrapped static errors instead: "fmt.Errorf(\"unable to enqueue job\")" (goerr113)
}

if job.ID == uuid.Nil {
job.ID, err = uuid.NewRandom()
if err != nil {
Expand Down Expand Up @@ -193,6 +203,9 @@ func (w *RedisWorker) fetchJob(ctx context.Context) {

func (w *RedisWorker) processJob(ctx context.Context, job *Job) {
defer recoverAndLog(ctx)
if job == nil {
return
}

defer atomic.AddInt64(&w.inFlight, -1)
logger := loggerWithJob(ctx, job)
Expand All @@ -203,7 +216,7 @@ func (w *RedisWorker) processJob(ctx context.Context, job *Job) {
cCtx, cFunc := context.WithTimeout(ctx, config.Worker.Timeout)
defer func() {
if c := cCtx.Err(); c != nil {
zerolog.Ctx(ctx).Error().Err(c).Msg("Job was either cancelled or timeout occured")
zerolog.Ctx(ctx).Error().Err(c).Msg("Job was either cancelled or timeout occurred")
}
cFunc()
}()
Expand Down

0 comments on commit ed8eb3a

Please sign in to comment.