Skip to content

Commit

Permalink
Rename constants to make adjectives like "Default" or "Min" suffix in…
Browse files Browse the repository at this point in the history
…stead of prefix

One last last-second API change I wanted to float: rename constants so
that adjectives like "Default" and "Min" become name suffixes instead of
prefixes. So e.g. `DefaultFetchCooldown` becomes `FetchCooldownDefault`.

The primary benefit in my mind is that constants that are related to
each other in referencing the same value, now sort together
alphabetically. Take for example:

    const (
            FetchCooldownDefault = 100 * time.Millisecond
            FetchCooldownMin     = 1 * time.Millisecond

Previously, these were organized together, but didn't actually line up
if the list were to be sorted.

Another benefit is better autocomplete: when trying to autocomplete a
default value, previously to disambiguate from all the other ones you'd
have to type the whole word "Default" which is common to all of them.
But with the prefix switched to a suffix, if looking for say the default
for max attempts, you'd get a match within just a couple characters.

Probably the most contentious part of this is that `QueueDefault` flips,
which is the one constant that's actually referenced a fair bit from
user code. e.g.

    Queues: map[string]river.QueueConfig{
            river.QueueDefault: {MaxWorkers: 100},
    },

This may be a tiny bit worse in terms of how it rolls off the tongue,
but I've been staring at it a while now, and I think it's something
you'd get used to. Also, it's kind of nice how it fits better with names
like `QueueConfig` which are present in the same invocation.
  • Loading branch information
brandur committed Nov 20, 2023
1 parent 62b5ae3 commit 3937b36
Show file tree
Hide file tree
Showing 37 changed files with 171 additions and 173 deletions.
54 changes: 26 additions & 28 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,19 +33,17 @@ import (
)

const (
MaxQueueNumWorkers = 10_000
FetchCooldownDefault = 100 * time.Millisecond
FetchCooldownMin = 1 * time.Millisecond

DefaultFetchCooldown = 100 * time.Millisecond
MinFetchCooldown = 1 * time.Millisecond
FetchPollIntervalDefault = 1 * time.Second
FetchPollIntervalMin = 1 * time.Millisecond

DefaultFetchPollInterval = 1 * time.Second
MinFetchPollInterval = 1 * time.Millisecond

DefaultJobTimeout = 1 * time.Minute

DefaultMaxAttempts = rivercommon.DefaultMaxAttempts
DefaultQueue = rivercommon.DefaultQueue
DefaultPriority = rivercommon.DefaultPriority
JobTimeoutDefault = 1 * time.Minute
MaxAttemptsDefault = rivercommon.MaxAttemptsDefault
PriorityDefault = rivercommon.PriorityDefault
QueueDefault = rivercommon.QueueDefault
QueueNumWorkersMax = 10_000
)

// Config is the configuration for a Client.
Expand Down Expand Up @@ -105,7 +103,7 @@ type Config struct {
FetchPollInterval time.Duration

// JobTimeout is the maximum amount of time a job is allowed to run before its
// context is cancelled. A timeout of zero means DefaultJobTimeout will be
// context is cancelled. A timeout of zero means JobTimeoutDefault will be
// used, whereas a value of -1 means the job's context will not be cancelled
// unless the Client is shutting down.
//
Expand Down Expand Up @@ -181,11 +179,11 @@ func (c *Config) validate() error {
if c.DiscardedJobRetentionPeriod < 0 {
return fmt.Errorf("DiscardedJobRetentionPeriod cannot be less than zero")
}
if c.FetchCooldown < MinFetchCooldown {
return fmt.Errorf("FetchCooldown must be at least %s", MinFetchCooldown)
if c.FetchCooldown < FetchCooldownMin {
return fmt.Errorf("FetchCooldown must be at least %s", FetchCooldownMin)
}
if c.FetchPollInterval < MinFetchPollInterval {
return fmt.Errorf("FetchPollInterval must be at least %s", MinFetchPollInterval)
if c.FetchPollInterval < FetchPollIntervalMin {
return fmt.Errorf("FetchPollInterval must be at least %s", FetchPollIntervalMin)
}
if c.FetchPollInterval < c.FetchCooldown {
return fmt.Errorf("FetchPollInterval cannot be shorter than FetchCooldown (%s)", c.FetchCooldown)
Expand All @@ -201,7 +199,7 @@ func (c *Config) validate() error {
}

for queue, queueConfig := range c.Queues {
if queueConfig.MaxWorkers < 1 || queueConfig.MaxWorkers > MaxQueueNumWorkers {
if queueConfig.MaxWorkers < 1 || queueConfig.MaxWorkers > QueueNumWorkersMax {
return fmt.Errorf("invalid number of workers for queue %q: %d", queue, queueConfig.MaxWorkers)
}
if err := validateQueueName(queue); err != nil {
Expand Down Expand Up @@ -368,23 +366,23 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
// For convenience, in case the user's specified a large JobTimeout but no
// RescueStuckJobsAfter, since RescueStuckJobsAfter must be greater than
// JobTimeout, set a reasonable default value that's longer thah JobTimeout.
rescueAfter := maintenance.DefaultRescueAfter
rescueAfter := maintenance.RescueAfterDefault
if config.JobTimeout > 0 && config.RescueStuckJobsAfter < 1 && config.JobTimeout > config.RescueStuckJobsAfter {
rescueAfter = config.JobTimeout + maintenance.DefaultRescueAfter
rescueAfter = config.JobTimeout + maintenance.RescueAfterDefault
}

// Create a new version of config with defaults filled in. This replaces the
// original object, so everything that we care about must be initialized
// here, even if it's only carrying over the original value.
config = &Config{
AdvisoryLockPrefix: config.AdvisoryLockPrefix,
CancelledJobRetentionPeriod: valutil.ValOrDefault(config.CancelledJobRetentionPeriod, maintenance.DefaultCancelledJobRetentionPeriod),
CompletedJobRetentionPeriod: valutil.ValOrDefault(config.CompletedJobRetentionPeriod, maintenance.DefaultCompletedJobRetentionPeriod),
DiscardedJobRetentionPeriod: valutil.ValOrDefault(config.DiscardedJobRetentionPeriod, maintenance.DefaultDiscardedJobRetentionPeriod),
CancelledJobRetentionPeriod: valutil.ValOrDefault(config.CancelledJobRetentionPeriod, maintenance.CancelledJobRetentionPeriodDefault),
CompletedJobRetentionPeriod: valutil.ValOrDefault(config.CompletedJobRetentionPeriod, maintenance.CompletedJobRetentionPeriodDefault),
DiscardedJobRetentionPeriod: valutil.ValOrDefault(config.DiscardedJobRetentionPeriod, maintenance.DiscardedJobRetentionPeriodDefault),
ErrorHandler: config.ErrorHandler,
FetchCooldown: valutil.ValOrDefault(config.FetchCooldown, DefaultFetchCooldown),
FetchPollInterval: valutil.ValOrDefault(config.FetchPollInterval, DefaultFetchPollInterval),
JobTimeout: valutil.ValOrDefault(config.JobTimeout, DefaultJobTimeout),
FetchCooldown: valutil.ValOrDefault(config.FetchCooldown, FetchCooldownDefault),
FetchPollInterval: valutil.ValOrDefault(config.FetchPollInterval, FetchPollIntervalDefault),
JobTimeout: valutil.ValOrDefault(config.JobTimeout, JobTimeoutDefault),
Logger: logger,
PeriodicJobs: config.PeriodicJobs,
Queues: config.Queues,
Expand Down Expand Up @@ -933,9 +931,9 @@ func insertParamsFromArgsAndOptions(args JobArgs, insertOpts *InsertOpts) (*dbad
jobInsertOpts = argsWithOpts.InsertOpts()
}

maxAttempts := valutil.FirstNonZero(insertOpts.MaxAttempts, jobInsertOpts.MaxAttempts, rivercommon.DefaultMaxAttempts)
priority := valutil.FirstNonZero(insertOpts.Priority, jobInsertOpts.Priority, rivercommon.DefaultPriority)
queue := valutil.FirstNonZero(insertOpts.Queue, jobInsertOpts.Queue, rivercommon.DefaultQueue)
maxAttempts := valutil.FirstNonZero(insertOpts.MaxAttempts, jobInsertOpts.MaxAttempts, rivercommon.MaxAttemptsDefault)
priority := valutil.FirstNonZero(insertOpts.Priority, jobInsertOpts.Priority, rivercommon.PriorityDefault)
queue := valutil.FirstNonZero(insertOpts.Queue, jobInsertOpts.Queue, rivercommon.QueueDefault)

tags := insertOpts.Tags
if insertOpts.Tags == nil {
Expand Down
76 changes: 38 additions & 38 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func newTestConfig(t *testing.T, callback callbackFunc) *Config {
FetchCooldown: 20 * time.Millisecond,
FetchPollInterval: 50 * time.Millisecond,
Logger: riverinternaltest.Logger(t),
Queues: map[string]QueueConfig{DefaultQueue: {MaxWorkers: 50}},
Queues: map[string]QueueConfig{QueueDefault: {MaxWorkers: 50}},
Workers: workers,
disableSleep: true,
}
Expand Down Expand Up @@ -654,10 +654,10 @@ func Test_Client_Insert(t *testing.T) {
jobRow, err := client.Insert(ctx, &noOpArgs{}, nil)
require.NoError(t, err)
require.Equal(t, 0, jobRow.Attempt)
require.Equal(t, rivercommon.DefaultMaxAttempts, jobRow.MaxAttempts)
require.Equal(t, rivercommon.MaxAttemptsDefault, jobRow.MaxAttempts)
require.Equal(t, (&noOpArgs{}).Kind(), jobRow.Kind)
require.Equal(t, DefaultPriority, jobRow.Priority)
require.Equal(t, DefaultQueue, jobRow.Queue)
require.Equal(t, PriorityDefault, jobRow.Priority)
require.Equal(t, QueueDefault, jobRow.Queue)
require.Equal(t, []string{}, jobRow.Tags)
})

Expand Down Expand Up @@ -751,10 +751,10 @@ func Test_Client_InsertTx(t *testing.T) {
jobRow, err := client.InsertTx(ctx, bundle.tx, &noOpArgs{}, nil)
require.NoError(t, err)
require.Equal(t, 0, jobRow.Attempt)
require.Equal(t, rivercommon.DefaultMaxAttempts, jobRow.MaxAttempts)
require.Equal(t, rivercommon.MaxAttemptsDefault, jobRow.MaxAttempts)
require.Equal(t, (&noOpArgs{}).Kind(), jobRow.Kind)
require.Equal(t, DefaultPriority, jobRow.Priority)
require.Equal(t, DefaultQueue, jobRow.Queue)
require.Equal(t, PriorityDefault, jobRow.Priority)
require.Equal(t, QueueDefault, jobRow.Queue)
require.Equal(t, []string{}, jobRow.Tags)

// Job is not visible outside of the transaction.
Expand Down Expand Up @@ -1195,9 +1195,9 @@ func Test_Client_Maintenance(t *testing.T) {
Errors: errorsBytes,
FinalizedAt: params.FinalizedAt,
Kind: valutil.FirstNonZero(params.Kind, "test_kind"),
MaxAttempts: valutil.FirstNonZero(params.MaxAttempts, int16(rivercommon.DefaultMaxAttempts)),
Priority: int16(rivercommon.DefaultPriority),
Queue: DefaultQueue,
MaxAttempts: valutil.FirstNonZero(params.MaxAttempts, int16(rivercommon.MaxAttemptsDefault)),
Priority: int16(rivercommon.PriorityDefault),
Queue: QueueDefault,
ScheduledAt: params.ScheduledAt,
State: params.State,
})
Expand Down Expand Up @@ -1473,11 +1473,11 @@ func Test_Client_RetryPolicy(t *testing.T) {
subscribeChan, cancel := client.Subscribe(EventKindJobCompleted, EventKindJobFailed)
t.Cleanup(cancel)

originalJobs := make([]*dbsqlc.RiverJob, rivercommon.DefaultMaxAttempts)
originalJobs := make([]*dbsqlc.RiverJob, rivercommon.MaxAttemptsDefault)
for i := 0; i < len(originalJobs); i++ {
job := requireInsert(ctx, client)
// regression protection to ensure we're testing the right number of jobs:
require.Equal(t, rivercommon.DefaultMaxAttempts, job.MaxAttempts)
require.Equal(t, rivercommon.MaxAttemptsDefault, job.MaxAttempts)

updatedJob, err := queries.JobUpdate(ctx, client.driver.GetDBPool(), dbsqlc.JobUpdateParams{
ID: job.ID,
Expand Down Expand Up @@ -1820,7 +1820,7 @@ func Test_Client_InsertTriggersImmediateWork(t *testing.T) {
config := newTestConfig(t, makeAwaitCallback(startedCh, doneCh))
config.FetchCooldown = 20 * time.Millisecond
config.FetchPollInterval = 20 * time.Second // essentially disable polling
config.Queues = map[string]QueueConfig{DefaultQueue: {MaxWorkers: 2}}
config.Queues = map[string]QueueConfig{QueueDefault: {MaxWorkers: 2}}

client := newTestClient(ctx, t, config)
statusUpdateCh := client.monitor.RegisterUpdates()
Expand Down Expand Up @@ -2188,22 +2188,22 @@ func Test_NewClient_Defaults(t *testing.T) {
AddWorker(workers, &noOpWorker{})

client, err := NewClient(riverpgxv5.New(dbPool), &Config{
Queues: map[string]QueueConfig{DefaultQueue: {MaxWorkers: 1}},
Queues: map[string]QueueConfig{QueueDefault: {MaxWorkers: 1}},
Workers: workers,
})
require.NoError(t, err)

require.Zero(t, client.adapter.(*dbadapter.StandardAdapter).Config.AdvisoryLockPrefix) //nolint:forcetypeassert

jobCleaner := maintenance.GetService[*maintenance.JobCleaner](client.queueMaintainer)
require.Equal(t, maintenance.DefaultCancelledJobRetentionPeriod, jobCleaner.Config.CancelledJobRetentionPeriod)
require.Equal(t, maintenance.DefaultCompletedJobRetentionPeriod, jobCleaner.Config.CompletedJobRetentionPeriod)
require.Equal(t, maintenance.DefaultDiscardedJobRetentionPeriod, jobCleaner.Config.DiscardedJobRetentionPeriod)
require.Equal(t, maintenance.CancelledJobRetentionPeriodDefault, jobCleaner.Config.CancelledJobRetentionPeriod)
require.Equal(t, maintenance.CompletedJobRetentionPeriodDefault, jobCleaner.Config.CompletedJobRetentionPeriod)
require.Equal(t, maintenance.DiscardedJobRetentionPeriodDefault, jobCleaner.Config.DiscardedJobRetentionPeriod)

require.Nil(t, client.config.ErrorHandler)
require.Equal(t, DefaultFetchCooldown, client.config.FetchCooldown)
require.Equal(t, DefaultFetchPollInterval, client.config.FetchPollInterval)
require.Equal(t, DefaultJobTimeout, client.config.JobTimeout)
require.Equal(t, FetchCooldownDefault, client.config.FetchCooldown)
require.Equal(t, FetchPollIntervalDefault, client.config.FetchPollInterval)
require.Equal(t, JobTimeoutDefault, client.config.JobTimeout)
require.NotZero(t, client.baseService.Logger)
require.IsType(t, &DefaultClientRetryPolicy{}, client.config.RetryPolicy)
require.False(t, client.baseService.DisableSleep)
Expand Down Expand Up @@ -2234,7 +2234,7 @@ func Test_NewClient_Overrides(t *testing.T) {
FetchPollInterval: 124 * time.Millisecond,
JobTimeout: 125 * time.Millisecond,
Logger: logger,
Queues: map[string]QueueConfig{DefaultQueue: {MaxWorkers: 1}},
Queues: map[string]QueueConfig{QueueDefault: {MaxWorkers: 1}},
RetryPolicy: retryPolicy,
Workers: workers,
disableSleep: true,
Expand Down Expand Up @@ -2298,7 +2298,7 @@ func Test_NewClient_Validations(t *testing.T) {
wantErr: errors.New("CompletedJobRetentionPeriod cannot be less than zero"),
},
{
name: "FetchCooldown cannot be less than MinFetchCooldown",
name: "FetchCooldown cannot be less than FetchCooldownMin",
configFunc: func(config *Config) { config.FetchCooldown = time.Millisecond - 1 },
wantErr: errors.New("FetchCooldown must be at least 1ms"),
},
Expand All @@ -2308,11 +2308,11 @@ func Test_NewClient_Validations(t *testing.T) {
wantErr: errors.New("FetchCooldown must be at least 1ms"),
},
{
name: "FetchCooldown defaults to DefaultFetchCooldown",
name: "FetchCooldown defaults to FetchCooldownDefault",
configFunc: func(config *Config) { config.FetchCooldown = 0 },
wantErr: nil,
validateResult: func(t *testing.T, client *Client[pgx.Tx]) { //nolint:thelper
require.Equal(t, DefaultFetchCooldown, client.config.FetchCooldown)
require.Equal(t, FetchCooldownDefault, client.config.FetchCooldown)
},
},
{
Expand All @@ -2338,7 +2338,7 @@ func Test_NewClient_Validations(t *testing.T) {
configFunc: func(config *Config) { config.FetchPollInterval = 0 },
wantErr: nil,
validateResult: func(t *testing.T, client *Client[pgx.Tx]) { //nolint:thelper
require.Equal(t, DefaultFetchPollInterval, client.config.FetchPollInterval)
require.Equal(t, FetchPollIntervalDefault, client.config.FetchPollInterval)
},
},
{
Expand All @@ -2364,7 +2364,7 @@ func Test_NewClient_Validations(t *testing.T) {
},
validateResult: func(t *testing.T, client *Client[pgx.Tx]) { //nolint:thelper
// A client config value of zero gets interpreted as the default timeout:
require.Equal(t, DefaultJobTimeout, client.config.JobTimeout)
require.Equal(t, JobTimeoutDefault, client.config.JobTimeout)
},
},
{
Expand Down Expand Up @@ -2396,7 +2396,7 @@ func Test_NewClient_Validations(t *testing.T) {
config.JobTimeout = 23 * time.Hour
},
validateResult: func(t *testing.T, client *Client[pgx.Tx]) { //nolint:thelper
require.Equal(t, 23*time.Hour+maintenance.DefaultRescueAfter, client.config.RescueStuckJobsAfter)
require.Equal(t, 23*time.Hour+maintenance.RescueAfterDefault, client.config.RescueStuckJobsAfter)
},
},
{
Expand All @@ -2419,16 +2419,16 @@ func Test_NewClient_Validations(t *testing.T) {
{
name: "Queues MaxWorkers can't be negative",
configFunc: func(config *Config) {
config.Queues = map[string]QueueConfig{DefaultQueue: {MaxWorkers: -1}}
config.Queues = map[string]QueueConfig{QueueDefault: {MaxWorkers: -1}}
},
wantErr: fmt.Errorf("invalid number of workers for queue \"default\": -1"),
},
{
name: "Queues can't have limits larger than MaxQueueNumWorkers",
configFunc: func(config *Config) {
config.Queues = map[string]QueueConfig{DefaultQueue: {MaxWorkers: MaxQueueNumWorkers + 1}}
config.Queues = map[string]QueueConfig{QueueDefault: {MaxWorkers: QueueNumWorkersMax + 1}}
},
wantErr: fmt.Errorf("invalid number of workers for queue \"default\": %d", MaxQueueNumWorkers+1),
wantErr: fmt.Errorf("invalid number of workers for queue \"default\": %d", QueueNumWorkersMax+1),
},
{
name: "Queues queue names can't be empty",
Expand Down Expand Up @@ -2493,7 +2493,7 @@ func Test_NewClient_Validations(t *testing.T) {
AddWorker(workers, &noOpWorker{})

config := &Config{
Queues: map[string]QueueConfig{DefaultQueue: {MaxWorkers: 1}},
Queues: map[string]QueueConfig{QueueDefault: {MaxWorkers: 1}},
Workers: workers,
}
tt.configFunc(config)
Expand Down Expand Up @@ -2564,7 +2564,7 @@ func TestClient_JobTimeout(t *testing.T) {
name: "DefaultJobTimeoutIsUsedIfBothAreZero",
jobArgTimeout: 0,
clientJobTimeout: 0,
wantDuration: DefaultJobTimeout,
wantDuration: JobTimeoutDefault,
},
{
name: "NoJobTimeoutIfClientIsNegativeOneAndJobArgIsZero",
Expand Down Expand Up @@ -2594,7 +2594,7 @@ func TestClient_JobTimeout(t *testing.T) {

config := newTestConfig(t, nil)
config.JobTimeout = tt.clientJobTimeout
config.Queues = map[string]QueueConfig{DefaultQueue: {MaxWorkers: 1}}
config.Queues = map[string]QueueConfig{QueueDefault: {MaxWorkers: 1}}
config.Workers = workers

client := runNewTestClient(ctx, t, config)
Expand Down Expand Up @@ -2622,9 +2622,9 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) {
require.NoError(t, err)
require.Equal(t, `{"name":""}`, string(insertParams.EncodedArgs))
require.Equal(t, (noOpArgs{}).Kind(), insertParams.Kind)
require.Equal(t, rivercommon.DefaultMaxAttempts, insertParams.MaxAttempts)
require.Equal(t, rivercommon.DefaultPriority, insertParams.Priority)
require.Equal(t, DefaultQueue, insertParams.Queue)
require.Equal(t, rivercommon.MaxAttemptsDefault, insertParams.MaxAttempts)
require.Equal(t, rivercommon.PriorityDefault, insertParams.Priority)
require.Equal(t, QueueDefault, insertParams.Queue)
require.Equal(t, time.Time{}, insertParams.ScheduledAt)
require.Equal(t, []string(nil), insertParams.Tags)
require.False(t, insertParams.Unique)
Expand Down Expand Up @@ -2718,7 +2718,7 @@ func TestInsert(t *testing.T) {

config := &Config{
FetchCooldown: 2 * time.Millisecond,
Queues: map[string]QueueConfig{DefaultQueue: {MaxWorkers: 1}},
Queues: map[string]QueueConfig{QueueDefault: {MaxWorkers: 1}},
Workers: workers,
}

Expand Down Expand Up @@ -2786,7 +2786,7 @@ func TestInsert(t *testing.T) {
require.Equal(JobStateAvailable, insertedJob.State)
require.Equal("noOp", insertedJob.Kind)
// default state:
require.Equal(DefaultQueue, insertedJob.Queue)
require.Equal(QueueDefault, insertedJob.Queue)
require.Equal(1, insertedJob.Priority)
// Default comes from database now(), and we can't know the exact value:
require.WithinDuration(time.Now(), insertedJob.ScheduledAt, 2*time.Second)
Expand Down
2 changes: 1 addition & 1 deletion doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ goroutines at a time:
riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
Queues: map[string]river.QueueConfig{
river.DefaultQueue: {MaxWorkers: 100},
river.QueueDefault: {MaxWorkers: 100},
},
Workers: workers,
})
Expand Down
2 changes: 1 addition & 1 deletion example_batch_insert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func Example_batchInsert() {
riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
Logger: slog.New(&slogutil.SlogMessageOnlyHandler{Level: slog.LevelWarn}),
Queues: map[string]river.QueueConfig{
river.DefaultQueue: {MaxWorkers: 100},
river.QueueDefault: {MaxWorkers: 100},
},
Workers: workers,
})
Expand Down
2 changes: 1 addition & 1 deletion example_complete_job_within_tx_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func Example_completeJobWithinTx() {
riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
Logger: slog.New(&slogutil.SlogMessageOnlyHandler{Level: slog.LevelWarn}),
Queues: map[string]river.QueueConfig{
river.DefaultQueue: {MaxWorkers: 100},
river.QueueDefault: {MaxWorkers: 100},
},
Workers: workers,
})
Expand Down
2 changes: 1 addition & 1 deletion example_cron_job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func Example_cronJob() {
),
},
Queues: map[string]river.QueueConfig{
river.DefaultQueue: {MaxWorkers: 100},
river.QueueDefault: {MaxWorkers: 100},
},
Workers: workers,
})
Expand Down
2 changes: 1 addition & 1 deletion example_custom_insert_opts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func Example_customInsertOpts() {
riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
Logger: slog.New(&slogutil.SlogMessageOnlyHandler{Level: slog.LevelWarn}),
Queues: map[string]river.QueueConfig{
river.DefaultQueue: {MaxWorkers: 100},
river.QueueDefault: {MaxWorkers: 100},
"high_priority": {MaxWorkers: 100},
},
Workers: workers,
Expand Down
Loading

0 comments on commit 3937b36

Please sign in to comment.