Skip to content

Commit

Permalink
proof out a "pilot" layer between Client and Driver
Browse files Browse the repository at this point in the history
This builds out a new higher level layer to sit between the Client and
Driver, focused more on higher level operations than individual database
queries. This allows those operations to be implemented with a
combination of multiple database queries in a transaction, rather than
just a single one.

Naming here was tough, I really didn't like any of the options I came up
with and ultimately landed on "pilot", but I'm pretty indifferent to it
if a truly good one can be found.

For now, the only operations implemented in the pilot are `InsertMany`
and `JobSetStateIfRunningMany` (from the completer). I also made some
changes within River to use these at the correct places and to propagate
the pilot as needed. `JobCompleteTx` was updated to pull a `Client` out
of the context in order to access this and use the same underlying query
(via the pilot) as the `BatchCompleter`.
  • Loading branch information
bgentry committed Sep 30, 2024
1 parent 2b5eff2 commit daac06a
Show file tree
Hide file tree
Showing 14 changed files with 249 additions and 57 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,7 @@ Although it comes with a number of improvements, there's nothing particularly no
### Added
- Added `JobCancel` and `JobCancelTx` to the `Client` to enable cancellation of jobs. [PR #141](https://github.com/riverqueue/river/pull/141) and [PR #152](https://github.com/riverqueue/river/pull/152).
- Added `ClientFromContext` and `ClientWithContextSafely` helpers to extract the `Client` from the worker's context where it is now available to workers. This simplifies making the River client available within your workers for i.e. enqueueing additional jobs. [PR #145](https://github.com/riverqueue/river/pull/145).
- Added `ClientFromContext` and `ClientFromContextSafely` helpers to extract the `Client` from the worker's context where it is now available to workers. This simplifies making the River client available within your workers for i.e. enqueueing additional jobs. [PR #145](https://github.com/riverqueue/river/pull/145).
- Add `JobList` API for listing jobs. [PR #117](https://github.com/riverqueue/river/pull/117).
- Added `river validate` command which fails with a non-zero exit code unless all migrations are applied. [PR #170](https://github.com/riverqueue/river/pull/170).

Expand Down
18 changes: 16 additions & 2 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/riverqueue/river/internal/workunit"
"github.com/riverqueue/river/riverdriver"
"github.com/riverqueue/river/rivershared/baseservice"
"github.com/riverqueue/river/rivershared/riverpilot"
"github.com/riverqueue/river/rivershared/startstop"
"github.com/riverqueue/river/rivershared/testsignal"
"github.com/riverqueue/river/rivershared/util/maputil"
Expand Down Expand Up @@ -339,6 +340,7 @@ type Client[TTx any] struct {
insertNotifyLimiter *notifylimiter.Limiter
notifier *notifier.Notifier // may be nil in poll-only mode
periodicJobs *PeriodicJobBundle
pilot riverpilot.Pilot
producersByQueueName map[string]*producer
queueMaintainer *maintenance.QueueMaintainer
queues *QueueBundle
Expand Down Expand Up @@ -505,7 +507,12 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
plugin, _ := driver.(driverPlugin[TTx])
if plugin != nil {
plugin.PluginInit(archetype, client)
client.pilot = plugin.PluginPilot()
}
if client.pilot == nil {
client.pilot = &riverpilot.StandardPilot{}
}
client.pilot.PilotInit(archetype)

// There are a number of internal components that are only needed/desired if
// we're actually going to be working jobs (as opposed to just enqueueing
Expand All @@ -515,7 +522,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
return nil, errMissingDatabasePoolWithQueues
}

client.completer = jobcompleter.NewBatchCompleter(archetype, driver.GetExecutor(), nil)
client.completer = jobcompleter.NewBatchCompleter(archetype, driver.GetExecutor(), client.pilot, nil)
client.subscriptionManager = newSubscriptionManager(archetype, nil)
client.services = append(client.services, client.completer, client.subscriptionManager)

Expand Down Expand Up @@ -1409,7 +1416,7 @@ func (c *Client[TTx]) InsertManyTx(ctx context.Context, tx TTx, params []InsertM

func (c *Client[TTx]) insertMany(ctx context.Context, tx riverdriver.ExecutorTx, params []InsertManyParams) ([]*rivertype.JobInsertResult, error) {
return c.insertManyShared(ctx, tx, params, func(ctx context.Context, insertParams []*riverdriver.JobInsertFastParams) ([]*rivertype.JobInsertResult, error) {
results, err := tx.JobInsertFastMany(ctx, insertParams)
results, err := c.pilot.JobInsertMany(ctx, tx, insertParams)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1763,6 +1770,13 @@ func (c *Client[TTx]) JobListTx(ctx context.Context, tx TTx, params *JobListPara
// client, and can be used to add new ones or remove existing ones.
func (c *Client[TTx]) PeriodicJobs() *PeriodicJobBundle { return c.periodicJobs }

// Driver exposes the underlying pilot used by the client.
//
// API is not stable. DO NOT USE.
func (c *Client[TTx]) Pilot() riverpilot.Pilot {
return c.pilot
}

// Queues returns the currently configured set of queues for the client, and can
// be used to add new ones.
func (c *Client[TTx]) Queues() *QueueBundle { return c.queues }
Expand Down
41 changes: 25 additions & 16 deletions internal/jobcompleter/job_completer.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/riverqueue/river/internal/jobstats"
"github.com/riverqueue/river/riverdriver"
"github.com/riverqueue/river/rivershared/baseservice"
"github.com/riverqueue/river/rivershared/riverpilot"
"github.com/riverqueue/river/rivershared/startstop"
"github.com/riverqueue/river/rivershared/util/serviceutil"
"github.com/riverqueue/river/rivershared/util/sliceutil"
Expand Down Expand Up @@ -44,20 +45,12 @@ type CompleterJobUpdated struct {
JobStats *jobstats.JobStatistics
}

// PartialExecutor is always a riverdriver.Executor under normal circumstances,
// but is a minimal interface with the functions needed for completers to work
// to more easily facilitate mocking.
type PartialExecutor interface {
JobSetStateIfRunningMany(ctx context.Context, params *riverdriver.JobSetStateIfRunningManyParams) ([]*rivertype.JobRow, error)
JobSetStateIfRunning(ctx context.Context, params *riverdriver.JobSetStateIfRunningParams) (*rivertype.JobRow, error)
}

type InlineCompleter struct {
baseservice.BaseService
startstop.BaseStartStop

disableSleep bool // disable sleep in testing
exec PartialExecutor
exec riverdriver.Executor
subscribeCh SubscribeChan

// A waitgroup is not actually needed for the inline completer because as
Expand All @@ -68,7 +61,7 @@ type InlineCompleter struct {
wg sync.WaitGroup
}

func NewInlineCompleter(archetype *baseservice.Archetype, exec PartialExecutor, subscribeCh SubscribeChan) *InlineCompleter {
func NewInlineCompleter(archetype *baseservice.Archetype, exec riverdriver.Executor, subscribeCh SubscribeChan) *InlineCompleter {
return baseservice.Init(archetype, &InlineCompleter{
exec: exec,
subscribeCh: subscribeCh,
Expand Down Expand Up @@ -135,15 +128,15 @@ type AsyncCompleter struct {
concurrency int
disableSleep bool // disable sleep in testing
errGroup *errgroup.Group
exec PartialExecutor
exec riverdriver.Executor
subscribeCh SubscribeChan
}

func NewAsyncCompleter(archetype *baseservice.Archetype, exec PartialExecutor, subscribeCh SubscribeChan) *AsyncCompleter {
func NewAsyncCompleter(archetype *baseservice.Archetype, exec riverdriver.Executor, subscribeCh SubscribeChan) *AsyncCompleter {
return newAsyncCompleterWithConcurrency(archetype, exec, asyncCompleterDefaultConcurrency, subscribeCh)
}

func newAsyncCompleterWithConcurrency(archetype *baseservice.Archetype, exec PartialExecutor, concurrency int, subscribeCh SubscribeChan) *AsyncCompleter {
func newAsyncCompleterWithConcurrency(archetype *baseservice.Archetype, exec riverdriver.Executor, concurrency int, subscribeCh SubscribeChan) *AsyncCompleter {
errGroup := &errgroup.Group{}
errGroup.SetLimit(concurrency)

Expand Down Expand Up @@ -223,7 +216,8 @@ type BatchCompleter struct {
completionMaxSize int // configurable for testing purposes; max jobs to complete in single database operation
disableSleep bool // disable sleep in testing
maxBacklog int // configurable for testing purposes; max backlog allowed before no more completions accepted
exec PartialExecutor
exec riverdriver.Executor
pilot riverpilot.Pilot
setStateParams map[int64]*batchCompleterSetState
setStateParamsMu sync.RWMutex
setStateStartTimes map[int64]time.Time
Expand All @@ -232,7 +226,7 @@ type BatchCompleter struct {
waitOnBacklogWaiting bool
}

func NewBatchCompleter(archetype *baseservice.Archetype, exec PartialExecutor, subscribeCh SubscribeChan) *BatchCompleter {
func NewBatchCompleter(archetype *baseservice.Archetype, exec riverdriver.Executor, pilot riverpilot.Pilot, subscribeCh SubscribeChan) *BatchCompleter {
const (
completionMaxSize = 5_000
maxBacklog = 20_000
Expand All @@ -242,6 +236,7 @@ func NewBatchCompleter(archetype *baseservice.Archetype, exec PartialExecutor, s
completionMaxSize: completionMaxSize,
exec: exec,
maxBacklog: maxBacklog,
pilot: pilot,
setStateParams: make(map[int64]*batchCompleterSetState),
setStateStartTimes: make(map[int64]time.Time),
subscribeCh: subscribeCh,
Expand Down Expand Up @@ -359,7 +354,21 @@ func (c *BatchCompleter) handleBatch(ctx context.Context) error {
}()

return withRetries(ctx, &c.BaseService, c.disableSleep, func(ctx context.Context) ([]*rivertype.JobRow, error) {
return c.exec.JobSetStateIfRunningMany(ctx, batchParams)
tx, err := c.exec.Begin(ctx)
if err != nil {
return nil, err
}
defer tx.Rollback(ctx)

rows, err := c.pilot.JobSetStateIfRunningMany(ctx, tx, batchParams)
if err != nil {
return nil, err
}
if err := tx.Commit(ctx); err != nil {
return nil, err
}

return rows, nil
})
}

Expand Down
Loading

0 comments on commit daac06a

Please sign in to comment.