Skip to content

Commit

Permalink
Update unique job example to be a little more realistic
Browse files Browse the repository at this point in the history
As I was writing the docs for unique jobs, I found that I wanted to have
an example that was a little more elaborate than what we already had
which demonstrated the use of uniqueness along multiple properties
instead of just duration.

Here, update the unique opts example so that the job becomes reconciling
a single account that should run every 24 hours, so jobs for account 1
and account 2 are inserted separately, and we show that a duplicate for
account 1 isn't allowed. I suspect that this change makes the example a
little more realistic along the lines of what users will want to do so
it has that benefit in addition to being more conducive for the docs.
  • Loading branch information
brandur committed Nov 11, 2023
1 parent 2a398cf commit 7cd5ccf
Showing 1 changed file with 36 additions and 22 deletions.
58 changes: 36 additions & 22 deletions example_unique_job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,44 +14,48 @@ import (
"github.com/riverqueue/river/riverdriver/riverpgxv5"
)

// Account represents a minimal account containing a unique identifier, recent
// expenditures, and a remaining total.
// Account represents a minimal account including recent expenditures and a
// remaining total.
type Account struct {
ID int
RecentExpenditures int
AccountTotal int
}

var allAccounts = []Account{ //nolint:gochecknoglobals
{ID: 1, RecentExpenditures: 100, AccountTotal: 1_000},
{ID: 2, RecentExpenditures: 999, AccountTotal: 1_000},
// Map of account ID -> account.
var allAccounts = map[int]Account{ //nolint:gochecknoglobals
1: {RecentExpenditures: 100, AccountTotal: 1_000},
2: {RecentExpenditures: 999, AccountTotal: 1_000},
}

type ReconcileAllAccountsArgs struct{}
type ReconcileAccountArgs struct {
AccountID int `json:"account_id"`
}

func (ReconcileAllAccountsArgs) Kind() string { return "reconcile_all_accounts" }
func (ReconcileAccountArgs) Kind() string { return "reconcile_account" }

// InsertOpts returns custom insert options that every job of this type will
// inherit by default, including unique options.
func (ReconcileAllAccountsArgs) InsertOpts() river.InsertOpts {
// inherit, including unique options.
func (ReconcileAccountArgs) InsertOpts() river.InsertOpts {
return river.InsertOpts{
UniqueOpts: river.UniqueOpts{
ByArgs: true,
ByPeriod: 24 * time.Hour,
},
}
}

type ReconcileAllAccountsWorker struct {
river.WorkerDefaults[ReconcileAllAccountsArgs]
type ReconcileAccountWorker struct {
river.WorkerDefaults[ReconcileAccountArgs]
}

func (w *ReconcileAllAccountsWorker) Work(ctx context.Context, job *river.Job[ReconcileAllAccountsArgs]) error {
for _, account := range allAccounts {
account.AccountTotal -= account.RecentExpenditures
account.RecentExpenditures = 0
func (w *ReconcileAccountWorker) Work(ctx context.Context, job *river.Job[ReconcileAccountArgs]) error {
account := allAccounts[job.Args.AccountID]

account.AccountTotal -= account.RecentExpenditures
account.RecentExpenditures = 0

fmt.Printf("Reconciled account %d; new total: %d\n", job.Args.AccountID, account.AccountTotal)

fmt.Printf("Reconciled account %d; new total: %d\n", account.ID, account.AccountTotal)
}
return nil
}

Expand All @@ -72,7 +76,7 @@ func Example_uniqueJob() {
}

workers := river.NewWorkers()
river.AddWorker(workers, &ReconcileAllAccountsWorker{})
river.AddWorker(workers, &ReconcileAccountWorker{})

riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
Logger: slog.New(&slogutil.SlogMessageOnlyHandler{Level: slog.LevelWarn}),
Expand All @@ -93,14 +97,24 @@ func Example_uniqueJob() {
panic(err)
}

_, err = riverClient.Insert(ctx, ReconcileAllAccountsArgs{}, nil)
_, err = riverClient.Insert(ctx, ReconcileAccountArgs{AccountID: 1}, nil)
if err != nil {
panic(err)
}

// Job is inserted a second time, but it doesn't matter because its unique
// args ensure that it'll only run once in a 24 hour period.
_, err = riverClient.Insert(ctx, ReconcileAllAccountsArgs{}, nil)
// args ensure that it'll only run once per account per 24 hour period.
_, err = riverClient.Insert(ctx, ReconcileAccountArgs{AccountID: 1}, nil)
if err != nil {
panic(err)
}

// Cheat a little by waiting for the first job to come back so we can
// guarantee that this example's output comes out in order.
waitForNJobs(subscribeChan, 1)

// Because the job is unique ByArgs, another job for account 2 is allowed.
_, err = riverClient.Insert(ctx, ReconcileAccountArgs{AccountID: 2}, nil)
if err != nil {
panic(err)
}
Expand Down

0 comments on commit 7cd5ccf

Please sign in to comment.