diff --git a/CHANGELOG.md b/CHANGELOG.md index cfe4a322..87734316 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -438,7 +438,7 @@ Although it comes with a number of improvements, there's nothing particularly no ### Added - Added `JobCancel` and `JobCancelTx` to the `Client` to enable cancellation of jobs. [PR #141](https://github.com/riverqueue/river/pull/141) and [PR #152](https://github.com/riverqueue/river/pull/152). -- Added `ClientFromContext` and `ClientWithContextSafely` helpers to extract the `Client` from the worker's context where it is now available to workers. This simplifies making the River client available within your workers for i.e. enqueueing additional jobs. [PR #145](https://github.com/riverqueue/river/pull/145). +- Added `ClientFromContext` and `ClientFromContextSafely` helpers to extract the `Client` from the worker's context where it is now available to workers. This simplifies making the River client available within your workers for i.e. enqueueing additional jobs. [PR #145](https://github.com/riverqueue/river/pull/145). - Add `JobList` API for listing jobs. [PR #117](https://github.com/riverqueue/river/pull/117). - Added `river validate` command which fails with a non-zero exit code unless all migrations are applied. [PR #170](https://github.com/riverqueue/river/pull/170). diff --git a/client.go b/client.go index 4fd26aee..7fd6506b 100644 --- a/client.go +++ b/client.go @@ -23,6 +23,7 @@ import ( "github.com/riverqueue/river/internal/workunit" "github.com/riverqueue/river/riverdriver" "github.com/riverqueue/river/rivershared/baseservice" + "github.com/riverqueue/river/rivershared/riverpilot" "github.com/riverqueue/river/rivershared/startstop" "github.com/riverqueue/river/rivershared/testsignal" "github.com/riverqueue/river/rivershared/util/maputil" @@ -339,6 +340,7 @@ type Client[TTx any] struct { insertNotifyLimiter *notifylimiter.Limiter notifier *notifier.Notifier // may be nil in poll-only mode periodicJobs *PeriodicJobBundle + pilot riverpilot.Pilot producersByQueueName map[string]*producer queueMaintainer *maintenance.QueueMaintainer queues *QueueBundle @@ -505,7 +507,12 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client plugin, _ := driver.(driverPlugin[TTx]) if plugin != nil { plugin.PluginInit(archetype, client) + client.pilot = plugin.PluginPilot() } + if client.pilot == nil { + client.pilot = &riverpilot.StandardPilot{} + } + client.pilot.PilotInit(archetype) // There are a number of internal components that are only needed/desired if // we're actually going to be working jobs (as opposed to just enqueueing @@ -515,7 +522,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client return nil, errMissingDatabasePoolWithQueues } - client.completer = jobcompleter.NewBatchCompleter(archetype, driver.GetExecutor(), nil) + client.completer = jobcompleter.NewBatchCompleter(archetype, driver.GetExecutor(), client.pilot, nil) client.subscriptionManager = newSubscriptionManager(archetype, nil) client.services = append(client.services, client.completer, client.subscriptionManager) @@ -1409,7 +1416,7 @@ func (c *Client[TTx]) InsertManyTx(ctx context.Context, tx TTx, params []InsertM func (c *Client[TTx]) insertMany(ctx context.Context, tx riverdriver.ExecutorTx, params []InsertManyParams) ([]*rivertype.JobInsertResult, error) { return c.insertManyShared(ctx, tx, params, func(ctx context.Context, insertParams []*riverdriver.JobInsertFastParams) ([]*rivertype.JobInsertResult, error) { - results, err := tx.JobInsertFastMany(ctx, insertParams) + results, err := c.pilot.JobInsertMany(ctx, tx, insertParams) if err != nil { return nil, err } @@ -1763,6 +1770,13 @@ func (c *Client[TTx]) JobListTx(ctx context.Context, tx TTx, params *JobListPara // client, and can be used to add new ones or remove existing ones. func (c *Client[TTx]) PeriodicJobs() *PeriodicJobBundle { return c.periodicJobs } +// Driver exposes the underlying pilot used by the client. +// +// API is not stable. DO NOT USE. +func (c *Client[TTx]) Pilot() riverpilot.Pilot { + return c.pilot +} + // Queues returns the currently configured set of queues for the client, and can // be used to add new ones. func (c *Client[TTx]) Queues() *QueueBundle { return c.queues } diff --git a/internal/jobcompleter/job_completer.go b/internal/jobcompleter/job_completer.go index 60a771ec..da0aecca 100644 --- a/internal/jobcompleter/job_completer.go +++ b/internal/jobcompleter/job_completer.go @@ -11,6 +11,7 @@ import ( "github.com/riverqueue/river/internal/jobstats" "github.com/riverqueue/river/riverdriver" "github.com/riverqueue/river/rivershared/baseservice" + "github.com/riverqueue/river/rivershared/riverpilot" "github.com/riverqueue/river/rivershared/startstop" "github.com/riverqueue/river/rivershared/util/serviceutil" "github.com/riverqueue/river/rivershared/util/sliceutil" @@ -44,20 +45,12 @@ type CompleterJobUpdated struct { JobStats *jobstats.JobStatistics } -// PartialExecutor is always a riverdriver.Executor under normal circumstances, -// but is a minimal interface with the functions needed for completers to work -// to more easily facilitate mocking. -type PartialExecutor interface { - JobSetStateIfRunningMany(ctx context.Context, params *riverdriver.JobSetStateIfRunningManyParams) ([]*rivertype.JobRow, error) - JobSetStateIfRunning(ctx context.Context, params *riverdriver.JobSetStateIfRunningParams) (*rivertype.JobRow, error) -} - type InlineCompleter struct { baseservice.BaseService startstop.BaseStartStop disableSleep bool // disable sleep in testing - exec PartialExecutor + exec riverdriver.Executor subscribeCh SubscribeChan // A waitgroup is not actually needed for the inline completer because as @@ -68,7 +61,7 @@ type InlineCompleter struct { wg sync.WaitGroup } -func NewInlineCompleter(archetype *baseservice.Archetype, exec PartialExecutor, subscribeCh SubscribeChan) *InlineCompleter { +func NewInlineCompleter(archetype *baseservice.Archetype, exec riverdriver.Executor, subscribeCh SubscribeChan) *InlineCompleter { return baseservice.Init(archetype, &InlineCompleter{ exec: exec, subscribeCh: subscribeCh, @@ -135,15 +128,15 @@ type AsyncCompleter struct { concurrency int disableSleep bool // disable sleep in testing errGroup *errgroup.Group - exec PartialExecutor + exec riverdriver.Executor subscribeCh SubscribeChan } -func NewAsyncCompleter(archetype *baseservice.Archetype, exec PartialExecutor, subscribeCh SubscribeChan) *AsyncCompleter { +func NewAsyncCompleter(archetype *baseservice.Archetype, exec riverdriver.Executor, subscribeCh SubscribeChan) *AsyncCompleter { return newAsyncCompleterWithConcurrency(archetype, exec, asyncCompleterDefaultConcurrency, subscribeCh) } -func newAsyncCompleterWithConcurrency(archetype *baseservice.Archetype, exec PartialExecutor, concurrency int, subscribeCh SubscribeChan) *AsyncCompleter { +func newAsyncCompleterWithConcurrency(archetype *baseservice.Archetype, exec riverdriver.Executor, concurrency int, subscribeCh SubscribeChan) *AsyncCompleter { errGroup := &errgroup.Group{} errGroup.SetLimit(concurrency) @@ -223,7 +216,8 @@ type BatchCompleter struct { completionMaxSize int // configurable for testing purposes; max jobs to complete in single database operation disableSleep bool // disable sleep in testing maxBacklog int // configurable for testing purposes; max backlog allowed before no more completions accepted - exec PartialExecutor + exec riverdriver.Executor + pilot riverpilot.Pilot setStateParams map[int64]*batchCompleterSetState setStateParamsMu sync.RWMutex setStateStartTimes map[int64]time.Time @@ -232,7 +226,7 @@ type BatchCompleter struct { waitOnBacklogWaiting bool } -func NewBatchCompleter(archetype *baseservice.Archetype, exec PartialExecutor, subscribeCh SubscribeChan) *BatchCompleter { +func NewBatchCompleter(archetype *baseservice.Archetype, exec riverdriver.Executor, pilot riverpilot.Pilot, subscribeCh SubscribeChan) *BatchCompleter { const ( completionMaxSize = 5_000 maxBacklog = 20_000 @@ -242,6 +236,7 @@ func NewBatchCompleter(archetype *baseservice.Archetype, exec PartialExecutor, s completionMaxSize: completionMaxSize, exec: exec, maxBacklog: maxBacklog, + pilot: pilot, setStateParams: make(map[int64]*batchCompleterSetState), setStateStartTimes: make(map[int64]time.Time), subscribeCh: subscribeCh, @@ -359,7 +354,21 @@ func (c *BatchCompleter) handleBatch(ctx context.Context) error { }() return withRetries(ctx, &c.BaseService, c.disableSleep, func(ctx context.Context) ([]*rivertype.JobRow, error) { - return c.exec.JobSetStateIfRunningMany(ctx, batchParams) + tx, err := c.exec.Begin(ctx) + if err != nil { + return nil, err + } + defer tx.Rollback(ctx) + + rows, err := c.pilot.JobSetStateIfRunningMany(ctx, tx, batchParams) + if err != nil { + return nil, err + } + if err := tx.Commit(ctx); err != nil { + return nil, err + } + + return rows, nil }) } diff --git a/internal/jobcompleter/job_completer_test.go b/internal/jobcompleter/job_completer_test.go index 8e237fe4..beea82ce 100644 --- a/internal/jobcompleter/job_completer_test.go +++ b/internal/jobcompleter/job_completer_test.go @@ -17,6 +17,7 @@ import ( "github.com/riverqueue/river/internal/riverinternaltest" "github.com/riverqueue/river/riverdriver" "github.com/riverqueue/river/riverdriver/riverpgxv5" + "github.com/riverqueue/river/rivershared/riverpilot" "github.com/riverqueue/river/rivershared/riversharedtest" "github.com/riverqueue/river/rivershared/startstop" "github.com/riverqueue/river/rivershared/testfactory" @@ -25,6 +26,7 @@ import ( ) type partialExecutorMock struct { + riverdriver.Executor JobSetStateIfRunningManyCalled bool JobSetStateIfRunningManyFunc func(ctx context.Context, params *riverdriver.JobSetStateIfRunningManyParams) ([]*rivertype.JobRow, error) JobSetStateIfRunningCalled bool @@ -36,11 +38,20 @@ type partialExecutorMock struct { // down into the given real executor. func NewPartialExecutorMock(exec riverdriver.Executor) *partialExecutorMock { return &partialExecutorMock{ + Executor: exec, JobSetStateIfRunningManyFunc: exec.JobSetStateIfRunningMany, JobSetStateIfRunningFunc: exec.JobSetStateIfRunning, } } +func (m *partialExecutorMock) Begin(ctx context.Context) (riverdriver.ExecutorTx, error) { + tx, err := m.Executor.Begin(ctx) + if err != nil { + return nil, err + } + return &partialExecutorTxMock{ExecutorTx: tx, partial: m}, nil +} + func (m *partialExecutorMock) JobSetStateIfRunningMany(ctx context.Context, params *riverdriver.JobSetStateIfRunningManyParams) ([]*rivertype.JobRow, error) { m.setCalled(func() { m.JobSetStateIfRunningManyCalled = true }) return m.JobSetStateIfRunningManyFunc(ctx, params) @@ -57,6 +68,19 @@ func (m *partialExecutorMock) setCalled(setCalledFunc func()) { setCalledFunc() } +type partialExecutorTxMock struct { + riverdriver.ExecutorTx + partial *partialExecutorMock +} + +func (m *partialExecutorTxMock) JobSetStateIfRunningMany(ctx context.Context, params *riverdriver.JobSetStateIfRunningManyParams) ([]*rivertype.JobRow, error) { + return m.partial.JobSetStateIfRunningMany(ctx, params) +} + +func (m *partialExecutorTxMock) JobSetStateIfRunning(ctx context.Context, params *riverdriver.JobSetStateIfRunningParams) (*rivertype.JobRow, error) { + return m.partial.JobSetStateIfRunning(ctx, params) +} + func TestInlineJobCompleter_Complete(t *testing.T) { t.Parallel() @@ -91,7 +115,7 @@ func TestInlineJobCompleter_Complete(t *testing.T) { func TestInlineJobCompleter_Subscribe(t *testing.T) { t.Parallel() - testCompleterSubscribe(t, func(exec PartialExecutor, subscribeCh SubscribeChan) JobCompleter { + testCompleterSubscribe(t, func(exec riverdriver.Executor, subscribeCh SubscribeChan) JobCompleter { return NewInlineCompleter(riversharedtest.BaseServiceArchetype(t), exec, subscribeCh) }) } @@ -99,7 +123,7 @@ func TestInlineJobCompleter_Subscribe(t *testing.T) { func TestInlineJobCompleter_Wait(t *testing.T) { t.Parallel() - testCompleterWait(t, func(exec PartialExecutor, subscribeChan SubscribeChan) JobCompleter { + testCompleterWait(t, func(exec riverdriver.Executor, subscribeChan SubscribeChan) JobCompleter { return NewInlineCompleter(riversharedtest.BaseServiceArchetype(t), exec, subscribeChan) }) } @@ -197,7 +221,7 @@ func TestAsyncJobCompleter_Complete(t *testing.T) { func TestAsyncJobCompleter_Subscribe(t *testing.T) { t.Parallel() - testCompleterSubscribe(t, func(exec PartialExecutor, subscribeCh SubscribeChan) JobCompleter { + testCompleterSubscribe(t, func(exec riverdriver.Executor, subscribeCh SubscribeChan) JobCompleter { return newAsyncCompleterWithConcurrency(riversharedtest.BaseServiceArchetype(t), exec, 4, subscribeCh) }) } @@ -205,12 +229,12 @@ func TestAsyncJobCompleter_Subscribe(t *testing.T) { func TestAsyncJobCompleter_Wait(t *testing.T) { t.Parallel() - testCompleterWait(t, func(exec PartialExecutor, subscribeCh SubscribeChan) JobCompleter { + testCompleterWait(t, func(exec riverdriver.Executor, subscribeCh SubscribeChan) JobCompleter { return newAsyncCompleterWithConcurrency(riversharedtest.BaseServiceArchetype(t), exec, 4, subscribeCh) }) } -func testCompleterSubscribe(t *testing.T, constructor func(PartialExecutor, SubscribeChan) JobCompleter) { +func testCompleterSubscribe(t *testing.T, constructor func(riverdriver.Executor, SubscribeChan) JobCompleter) { t.Helper() ctx := context.Background() @@ -254,7 +278,7 @@ func testCompleterSubscribe(t *testing.T, constructor func(PartialExecutor, Subs } } -func testCompleterWait(t *testing.T, constructor func(PartialExecutor, SubscribeChan) JobCompleter) { +func testCompleterWait(t *testing.T, constructor func(riverdriver.Executor, SubscribeChan) JobCompleter) { t.Helper() ctx := context.Background() @@ -320,24 +344,24 @@ func testCompleterWait(t *testing.T, constructor func(PartialExecutor, Subscribe func TestAsyncCompleter(t *testing.T) { t.Parallel() - testCompleter(t, func(t *testing.T, exec riverdriver.Executor, subscribeCh chan<- []CompleterJobUpdated) *AsyncCompleter { + testCompleter(t, func(t *testing.T, exec riverdriver.Executor, pilot riverpilot.Pilot, subscribeCh chan<- []CompleterJobUpdated) *AsyncCompleter { t.Helper() return NewAsyncCompleter(riversharedtest.BaseServiceArchetype(t), exec, subscribeCh) }, func(completer *AsyncCompleter) { completer.disableSleep = true }, - func(completer *AsyncCompleter, exec PartialExecutor) { completer.exec = exec }, + func(completer *AsyncCompleter, exec riverdriver.Executor) { completer.exec = exec }, ) } func TestBatchCompleter(t *testing.T) { t.Parallel() - testCompleter(t, func(t *testing.T, exec riverdriver.Executor, subscribeCh chan<- []CompleterJobUpdated) *BatchCompleter { + testCompleter(t, func(t *testing.T, exec riverdriver.Executor, pilot riverpilot.Pilot, subscribeCh chan<- []CompleterJobUpdated) *BatchCompleter { t.Helper() - return NewBatchCompleter(riversharedtest.BaseServiceArchetype(t), exec, subscribeCh) + return NewBatchCompleter(riversharedtest.BaseServiceArchetype(t), exec, pilot, subscribeCh) }, func(completer *BatchCompleter) { completer.disableSleep = true }, - func(completer *BatchCompleter, exec PartialExecutor) { completer.exec = exec }, + func(completer *BatchCompleter, exec riverdriver.Executor) { completer.exec = exec }, ) ctx := context.Background() @@ -353,8 +377,9 @@ func TestBatchCompleter(t *testing.T) { var ( driver = riverpgxv5.New(riverinternaltest.TestDB(ctx, t)) exec = driver.GetExecutor() + pilot = &riverpilot.StandardPilot{} subscribeCh = make(chan []CompleterJobUpdated, 10) - completer = NewBatchCompleter(riversharedtest.BaseServiceArchetype(t), exec, subscribeCh) + completer = NewBatchCompleter(riversharedtest.BaseServiceArchetype(t), exec, pilot, subscribeCh) ) require.NoError(t, completer.Start(ctx)) @@ -432,24 +457,24 @@ func TestBatchCompleter(t *testing.T) { func TestInlineCompleter(t *testing.T) { t.Parallel() - testCompleter(t, func(t *testing.T, exec riverdriver.Executor, subscribeCh chan<- []CompleterJobUpdated) *InlineCompleter { + testCompleter(t, func(t *testing.T, exec riverdriver.Executor, pilot riverpilot.Pilot, subscribeCh chan<- []CompleterJobUpdated) *InlineCompleter { t.Helper() return NewInlineCompleter(riversharedtest.BaseServiceArchetype(t), exec, subscribeCh) }, func(completer *InlineCompleter) { completer.disableSleep = true }, - func(completer *InlineCompleter, exec PartialExecutor) { completer.exec = exec }) + func(completer *InlineCompleter, exec riverdriver.Executor) { completer.exec = exec }) } func testCompleter[TCompleter JobCompleter]( t *testing.T, - newCompleter func(t *testing.T, exec riverdriver.Executor, subscribeCh chan<- []CompleterJobUpdated) TCompleter, + newCompleter func(t *testing.T, exec riverdriver.Executor, pilot riverpilot.Pilot, subscribeCh chan<- []CompleterJobUpdated) TCompleter, // These functions are here to help us inject test behavior that's not part // of the JobCompleter interface. We could alternatively define a second // interface like jobCompleterWithTestFacilities to expose the additional // functionality, although that's not particularly beautiful either. disableSleep func(completer TCompleter), - setExec func(completer TCompleter, exec PartialExecutor), + setExec func(completer TCompleter, exec riverdriver.Executor), ) { t.Helper() @@ -466,8 +491,9 @@ func testCompleter[TCompleter JobCompleter]( var ( driver = riverpgxv5.New(riverinternaltest.TestDB(ctx, t)) exec = driver.GetExecutor() + pilot = &riverpilot.StandardPilot{} subscribeCh = make(chan []CompleterJobUpdated, 10) - completer = newCompleter(t, exec, subscribeCh) + completer = newCompleter(t, exec, pilot, subscribeCh) ) require.NoError(t, completer.Start(ctx)) @@ -850,28 +876,28 @@ func testCompleter[TCompleter JobCompleter]( } func BenchmarkAsyncCompleter_Concurrency10(b *testing.B) { - benchmarkCompleter(b, func(b *testing.B, exec riverdriver.Executor, subscribeCh chan<- []CompleterJobUpdated) JobCompleter { + benchmarkCompleter(b, func(b *testing.B, exec riverdriver.Executor, pilot riverpilot.Pilot, subscribeCh chan<- []CompleterJobUpdated) JobCompleter { b.Helper() return newAsyncCompleterWithConcurrency(riversharedtest.BaseServiceArchetype(b), exec, 10, subscribeCh) }) } func BenchmarkAsyncCompleter_Concurrency100(b *testing.B) { - benchmarkCompleter(b, func(b *testing.B, exec riverdriver.Executor, subscribeCh chan<- []CompleterJobUpdated) JobCompleter { + benchmarkCompleter(b, func(b *testing.B, exec riverdriver.Executor, pilot riverpilot.Pilot, subscribeCh chan<- []CompleterJobUpdated) JobCompleter { b.Helper() return newAsyncCompleterWithConcurrency(riversharedtest.BaseServiceArchetype(b), exec, 100, subscribeCh) }) } func BenchmarkBatchCompleter(b *testing.B) { - benchmarkCompleter(b, func(b *testing.B, exec riverdriver.Executor, subscribeCh chan<- []CompleterJobUpdated) JobCompleter { + benchmarkCompleter(b, func(b *testing.B, exec riverdriver.Executor, pilot riverpilot.Pilot, subscribeCh chan<- []CompleterJobUpdated) JobCompleter { b.Helper() - return NewBatchCompleter(riversharedtest.BaseServiceArchetype(b), exec, subscribeCh) + return NewBatchCompleter(riversharedtest.BaseServiceArchetype(b), exec, pilot, subscribeCh) }) } func BenchmarkInlineCompleter(b *testing.B) { - benchmarkCompleter(b, func(b *testing.B, exec riverdriver.Executor, subscribeCh chan<- []CompleterJobUpdated) JobCompleter { + benchmarkCompleter(b, func(b *testing.B, exec riverdriver.Executor, pilot riverpilot.Pilot, subscribeCh chan<- []CompleterJobUpdated) JobCompleter { b.Helper() return NewInlineCompleter(riversharedtest.BaseServiceArchetype(b), exec, subscribeCh) }) @@ -879,7 +905,7 @@ func BenchmarkInlineCompleter(b *testing.B) { func benchmarkCompleter( b *testing.B, - newCompleter func(b *testing.B, exec riverdriver.Executor, subscribeCh chan<- []CompleterJobUpdated) JobCompleter, + newCompleter func(b *testing.B, exec riverdriver.Executor, pilot riverpilot.Pilot, subscribeCh chan<- []CompleterJobUpdated) JobCompleter, ) { b.Helper() @@ -888,6 +914,7 @@ func benchmarkCompleter( type testBundle struct { exec riverdriver.Executor jobs []*rivertype.JobRow + pilot riverpilot.Pilot stats []jobstats.JobStatistics } @@ -897,8 +924,9 @@ func benchmarkCompleter( var ( driver = riverpgxv5.New(riverinternaltest.TestDB(ctx, b)) exec = driver.GetExecutor() + pilot = &riverpilot.StandardPilot{} subscribeCh = make(chan []CompleterJobUpdated, 100) - completer = newCompleter(b, exec, subscribeCh) + completer = newCompleter(b, exec, pilot, subscribeCh) ) b.Cleanup(riverinternaltest.DiscardContinuously(subscribeCh)) @@ -931,6 +959,7 @@ func benchmarkCompleter( return completer, &testBundle{ exec: exec, jobs: jobs, + pilot: pilot, stats: make([]jobstats.JobStatistics, b.N), } } diff --git a/internal/maintenance/queue_maintainer.go b/internal/maintenance/queue_maintainer.go index dcc2f4be..9a6c494b 100644 --- a/internal/maintenance/queue_maintainer.go +++ b/internal/maintenance/queue_maintainer.go @@ -49,7 +49,8 @@ type QueueMaintainer struct { func NewQueueMaintainer(archetype *baseservice.Archetype, services []startstop.Service) *QueueMaintainer { servicesByName := make(map[string]startstop.Service, len(services)) for _, service := range services { - servicesByName[reflect.TypeOf(service).Elem().Name()] = service + elem := reflect.TypeOf(service).Elem() + servicesByName[elem.PkgPath()+"."+elem.Name()] = service } return baseservice.Init(archetype, &QueueMaintainer{ servicesByName: servicesByName, diff --git a/job_complete_tx.go b/job_complete_tx.go index d7eef9f6..b4f4d00f 100644 --- a/job_complete_tx.go +++ b/job_complete_tx.go @@ -28,13 +28,28 @@ func JobCompleteTx[TDriver riverdriver.Driver[TTx], TTx any, TArgs JobArgs](ctx return nil, errors.New("job must be running") } - var driver TDriver - jobRow, err := driver.UnwrapExecutor(tx).JobSetStateIfRunning(ctx, riverdriver.JobSetStateCompleted(job.ID, time.Now())) + client := ClientFromContext[TTx](ctx) + if client == nil { + return nil, errors.New("client not found in context, can only work within a River worker") + } + + driver := client.Driver() + pilot := client.Pilot() + + execTx := driver.UnwrapExecutor(tx) + params := riverdriver.JobSetStateCompleted(job.ID, time.Now()) + rows, err := pilot.JobSetStateIfRunningMany(ctx, execTx, &riverdriver.JobSetStateIfRunningManyParams{ + ID: []int64{params.ID}, + ErrData: [][]byte{params.ErrData}, + FinalizedAt: []*time.Time{params.FinalizedAt}, + MaxAttempts: []*int{params.MaxAttempts}, + ScheduledAt: []*time.Time{params.ScheduledAt}, + State: []rivertype.JobState{params.State}, + }) if err != nil { return nil, err } - - updatedJob := &Job[TArgs]{JobRow: jobRow} + updatedJob := &Job[TArgs]{JobRow: rows[0]} if err := json.Unmarshal(updatedJob.EncodedArgs, &updatedJob.Args); err != nil { return nil, err diff --git a/job_complete_tx_test.go b/job_complete_tx_test.go index f08d51c2..497688fc 100644 --- a/job_complete_tx_test.go +++ b/job_complete_tx_test.go @@ -8,9 +8,11 @@ import ( "github.com/jackc/pgx/v5" "github.com/stretchr/testify/require" + "github.com/riverqueue/river/internal/rivercommon" "github.com/riverqueue/river/internal/riverinternaltest" "github.com/riverqueue/river/riverdriver" "github.com/riverqueue/river/riverdriver/riverpgxv5" + "github.com/riverqueue/river/rivershared/riversharedtest" "github.com/riverqueue/river/rivershared/testfactory" "github.com/riverqueue/river/rivershared/util/ptrutil" "github.com/riverqueue/river/rivertype" @@ -26,25 +28,32 @@ func TestJobCompleteTx(t *testing.T) { } type testBundle struct { - exec riverdriver.Executor - tx pgx.Tx + client *Client[pgx.Tx] + exec riverdriver.Executor + tx pgx.Tx } - setup := func(t *testing.T) *testBundle { + setup := func(ctx context.Context, t *testing.T) (context.Context, *testBundle) { t.Helper() tx := riverinternaltest.TestTx(ctx, t) + client, err := NewClient(riverpgxv5.New(nil), &Config{ + Logger: riversharedtest.Logger(t), + }) + require.NoError(t, err) + ctx = context.WithValue(ctx, rivercommon.ContextKeyClient{}, client) - return &testBundle{ - exec: riverpgxv5.New(nil).UnwrapExecutor(tx), - tx: tx, + return ctx, &testBundle{ + client: client, + exec: riverpgxv5.New(nil).UnwrapExecutor(tx), + tx: tx, } } t.Run("CompletesJob", func(t *testing.T) { t.Parallel() - bundle := setup(t) + ctx, bundle := setup(ctx, t) job := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{ State: ptrutil.Ptr(rivertype.JobStateRunning), @@ -63,7 +72,7 @@ func TestJobCompleteTx(t *testing.T) { t.Run("ErrorIfNotRunning", func(t *testing.T) { t.Parallel() - bundle := setup(t) + ctx, bundle := setup(ctx, t) job := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{}) diff --git a/plugin.go b/plugin.go index d0676ab6..8e5284ff 100644 --- a/plugin.go +++ b/plugin.go @@ -2,6 +2,7 @@ package river import ( "github.com/riverqueue/river/rivershared/baseservice" + "github.com/riverqueue/river/rivershared/riverpilot" "github.com/riverqueue/river/rivershared/startstop" ) @@ -19,6 +20,8 @@ type driverPlugin[TTx any] interface { // only run on an elected leader) for a River client. PluginMaintenanceServices() []startstop.Service + PluginPilot() riverpilot.Pilot + // PluginServices returns additional non-maintenance services (will run on // all clients) for a River client. PluginServices() []startstop.Service diff --git a/plugin_test.go b/plugin_test.go index b7654ba1..8cecab8d 100644 --- a/plugin_test.go +++ b/plugin_test.go @@ -11,6 +11,7 @@ import ( "github.com/riverqueue/river/internal/riverinternaltest" "github.com/riverqueue/river/riverdriver/riverpgxv5" "github.com/riverqueue/river/rivershared/baseservice" + "github.com/riverqueue/river/rivershared/riverpilot" "github.com/riverqueue/river/rivershared/riversharedtest" "github.com/riverqueue/river/rivershared/startstop" ) @@ -101,6 +102,14 @@ func (d *TestDriverWithPlugin) PluginMaintenanceServices() []startstop.Service { return []startstop.Service{d.maintenanceService} } +func (d *TestDriverWithPlugin) PluginPilot() riverpilot.Pilot { + if !d.initCalled { + panic("expected PluginInit to be called before this function") + } + + return nil +} + func (d *TestDriverWithPlugin) PluginServices() []startstop.Service { if !d.initCalled { panic("expected PluginInit to be called before this function") diff --git a/rivershared/clientconfig/client_config.go b/rivershared/clientconfig/client_config.go new file mode 100644 index 00000000..c239eda0 --- /dev/null +++ b/rivershared/clientconfig/client_config.go @@ -0,0 +1,19 @@ +package clientconfig + +import ( + "time" + + "github.com/riverqueue/river/internal/dbunique" +) + +// InsertOpts is a mirror of river.InsertOpts usable by other internal packages. +type InsertOpts struct { + MaxAttempts int + Metadata []byte + Pending bool + Priority int + Queue string + ScheduledAt time.Time + Tags []string + UniqueOpts dbunique.UniqueOpts +} diff --git a/rivershared/go.mod b/rivershared/go.mod index 40aa228d..d25ef238 100644 --- a/rivershared/go.mod +++ b/rivershared/go.mod @@ -18,6 +18,10 @@ require ( github.com/kr/pretty v0.3.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/rogpeppe/go-internal v1.12.0 // indirect + github.com/tidwall/gjson v1.17.3 // indirect + github.com/tidwall/match v1.1.1 // indirect + github.com/tidwall/pretty v1.2.0 // indirect + github.com/tidwall/sjson v1.2.5 // indirect gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/rivershared/go.sum b/rivershared/go.sum index 16353bb8..819a11a5 100644 --- a/rivershared/go.sum +++ b/rivershared/go.sum @@ -1,6 +1,14 @@ github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= +github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= +github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo= +github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= +github.com/jackc/pgx/v5 v5.7.1 h1:x7SYsPBYDkHDksogeSmZZ5xzThcTgRz++I5E+ePFUcs= +github.com/jackc/pgx/v5 v5.7.1/go.mod h1:e7O26IywZZ+naJtWWos6i6fvWK+29etgITqrqHLfoZA= +github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo= +github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= @@ -22,10 +30,25 @@ github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/tidwall/gjson v1.14.2/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= +github.com/tidwall/gjson v1.17.3 h1:bwWLZU7icoKRG+C+0PNwIKC6FCJO/Q3p2pZvuP0jN94= +github.com/tidwall/gjson v1.17.3/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= +github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA= +github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM= +github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs= +github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= +github.com/tidwall/sjson v1.2.5 h1:kLy8mja+1c9jlljvWTlSazM7cKDRfJuR/bOJhcY5NcY= +github.com/tidwall/sjson v1.2.5/go.mod h1:Fvgq9kS/6ociJEDnK0Fk1cpYF4FIW6ZF7LAe+6jwd28= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +golang.org/x/crypto v0.27.0 h1:GXm2NjJrPaiv/h1tb2UH8QfgC/hOf/+z0p6PT8o1w7A= +golang.org/x/crypto v0.27.0/go.mod h1:1Xngt8kV6Dvbssa53Ziq6Eqn0HqbZi5Z6R0ZpwQzt70= golang.org/x/mod v0.17.0 h1:zY54UmvipHiNd+pm+m0x9KhZ9hl1/7QNMyxXbc6ICqA= golang.org/x/mod v0.17.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= +golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= +golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224= +golang.org/x/text v0.18.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= diff --git a/rivershared/riverpilot/pilot.go b/rivershared/riverpilot/pilot.go new file mode 100644 index 00000000..a281d2fe --- /dev/null +++ b/rivershared/riverpilot/pilot.go @@ -0,0 +1,25 @@ +package riverpilot + +import ( + "context" + + "github.com/riverqueue/river/riverdriver" + "github.com/riverqueue/river/rivershared/baseservice" + "github.com/riverqueue/river/rivertype" +) + +// A Pilot bridges the gap between the River client and the driver, implementing +// higher level functionality on top of the driver's underlying queries. It +// tracks closely to the underlying driver's API, but may add additional +// functionality or logic wrapping the queries. +type Pilot interface { + JobInsertMany( + ctx context.Context, + tx riverdriver.ExecutorTx, + params []*riverdriver.JobInsertFastParams, + ) ([]*riverdriver.JobInsertFastResult, error) + + JobSetStateIfRunningMany(ctx context.Context, tx riverdriver.ExecutorTx, params *riverdriver.JobSetStateIfRunningManyParams) ([]*rivertype.JobRow, error) + + PilotInit(archetype *baseservice.Archetype) +} diff --git a/rivershared/riverpilot/standard.go b/rivershared/riverpilot/standard.go new file mode 100644 index 00000000..483792c6 --- /dev/null +++ b/rivershared/riverpilot/standard.go @@ -0,0 +1,28 @@ +package riverpilot + +import ( + "context" + + "github.com/riverqueue/river/riverdriver" + "github.com/riverqueue/river/rivershared/baseservice" + "github.com/riverqueue/river/rivertype" +) + +type StandardPilot struct { +} + +func (p *StandardPilot) JobInsertMany( + ctx context.Context, + tx riverdriver.ExecutorTx, + params []*riverdriver.JobInsertFastParams, +) ([]*riverdriver.JobInsertFastResult, error) { + return tx.JobInsertFastMany(ctx, params) +} + +func (p *StandardPilot) JobSetStateIfRunningMany(ctx context.Context, tx riverdriver.ExecutorTx, params *riverdriver.JobSetStateIfRunningManyParams) ([]*rivertype.JobRow, error) { + return tx.JobSetStateIfRunningMany(ctx, params) +} + +func (p *StandardPilot) PilotInit(archetype *baseservice.Archetype) { + // Noop +}