Add internal "pilot" concept #627

Open · wants to merge 1 commit into bg-remove-advisory-lock-unique-jobs
CHANGELOG.md (2 changes: 1 addition & 1 deletion)
@@ -445,7 +445,7 @@ Although it comes with a number of improvements, there's nothing particularly no
### Added

- Added `JobCancel` and `JobCancelTx` to the `Client` to enable cancellation of jobs. [PR #141](https://github.com/riverqueue/river/pull/141) and [PR #152](https://github.com/riverqueue/river/pull/152).
- Added `ClientFromContext` and `ClientWithContextSafely` helpers to extract the `Client` from the worker's context where it is now available to workers. This simplifies making the River client available within your workers for i.e. enqueueing additional jobs. [PR #145](https://github.com/riverqueue/river/pull/145).
- Added `ClientFromContext` and `ClientFromContextSafely` helpers to extract the `Client` from the worker's context where it is now available to workers. This simplifies making the River client available within your workers for i.e. enqueueing additional jobs. [PR #145](https://github.com/riverqueue/river/pull/145).
- Add `JobList` API for listing jobs. [PR #117](https://github.com/riverqueue/river/pull/117).
- Added `river validate` command which fails with a non-zero exit code unless all migrations are applied. [PR #170](https://github.com/riverqueue/river/pull/170).

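As an aside on the changelog entry above: the `ClientFromContext` helper it describes is typically used inside a worker to enqueue follow-up work. A minimal sketch of that use case (not part of this diff; `MyArgs`, `FollowUpArgs`, and `MyWorker` are hypothetical names):

```go
package example

import (
	"context"

	"github.com/jackc/pgx/v5"
	"github.com/riverqueue/river"
)

// MyArgs and FollowUpArgs are hypothetical job args for illustration only.
type MyArgs struct{ ID int64 }

func (MyArgs) Kind() string { return "my_job" }

type FollowUpArgs struct{ ParentJobID int64 }

func (FollowUpArgs) Kind() string { return "follow_up" }

type MyWorker struct {
	river.WorkerDefaults[MyArgs]
}

// Work pulls the client back out of the worker context and uses it to enqueue
// a follow-up job, which is the use case the changelog entry describes.
func (w *MyWorker) Work(ctx context.Context, job *river.Job[MyArgs]) error {
	client := river.ClientFromContext[pgx.Tx](ctx)

	_, err := client.Insert(ctx, FollowUpArgs{ParentJobID: job.ID}, nil)
	return err
}
```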
client.go (18 changes: 16 additions & 2 deletions)
@@ -23,6 +23,7 @@ import (
"github.com/riverqueue/river/internal/workunit"
"github.com/riverqueue/river/riverdriver"
"github.com/riverqueue/river/rivershared/baseservice"
"github.com/riverqueue/river/rivershared/riverpilot"
"github.com/riverqueue/river/rivershared/startstop"
"github.com/riverqueue/river/rivershared/testsignal"
"github.com/riverqueue/river/rivershared/util/maputil"
@@ -339,6 +340,7 @@ type Client[TTx any] struct {
insertNotifyLimiter *notifylimiter.Limiter
notifier *notifier.Notifier // may be nil in poll-only mode
periodicJobs *PeriodicJobBundle
pilot riverpilot.Pilot
producersByQueueName map[string]*producer
queueMaintainer *maintenance.QueueMaintainer
queues *QueueBundle
@@ -505,7 +507,12 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
plugin, _ := driver.(driverPlugin[TTx])
if plugin != nil {
plugin.PluginInit(archetype, client)
client.pilot = plugin.PluginPilot()
}
if client.pilot == nil {
client.pilot = &riverpilot.StandardPilot{}
}
client.pilot.PilotInit(archetype)

// There are a number of internal components that are only needed/desired if
// we're actually going to be working jobs (as opposed to just enqueueing
@@ -515,7 +522,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
return nil, errMissingDatabasePoolWithQueues
}

client.completer = jobcompleter.NewBatchCompleter(archetype, driver.GetExecutor(), nil)
client.completer = jobcompleter.NewBatchCompleter(archetype, driver.GetExecutor(), client.pilot, nil)
client.subscriptionManager = newSubscriptionManager(archetype, nil)
client.services = append(client.services, client.completer, client.subscriptionManager)

@@ -1409,7 +1416,7 @@ func (c *Client[TTx]) InsertManyTx(ctx context.Context, tx TTx, params []InsertM

func (c *Client[TTx]) insertMany(ctx context.Context, tx riverdriver.ExecutorTx, params []InsertManyParams) ([]*rivertype.JobInsertResult, error) {
return c.insertManyShared(ctx, tx, params, func(ctx context.Context, insertParams []*riverdriver.JobInsertFastParams) ([]*rivertype.JobInsertResult, error) {
results, err := tx.JobInsertFastMany(ctx, insertParams)
results, err := c.pilot.JobInsertMany(ctx, tx, insertParams)
if err != nil {
return nil, err
}
@@ -1763,6 +1770,13 @@ func (c *Client[TTx]) JobListTx(ctx context.Context, tx TTx, params *JobListPara
// client, and can be used to add new ones or remove existing ones.
func (c *Client[TTx]) PeriodicJobs() *PeriodicJobBundle { return c.periodicJobs }

// Pilot exposes the underlying pilot used by the client.
//
// API is not stable. DO NOT USE.
func (c *Client[TTx]) Pilot() riverpilot.Pilot {
return c.pilot
}

// Queues returns the currently configured set of queues for the client, and can
// be used to add new ones.
func (c *Client[TTx]) Queues() *QueueBundle { return c.queues }
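The `riverpilot` package itself isn't included in this diff, so the shape of the `Pilot` interface has to be inferred from its call sites: `PilotInit` in `NewClient`, `JobInsertMany` in `insertMany`, and `JobSetStateIfRunningMany` in the batch completer below. A rough sketch under those assumptions (exact signatures are guesses taken from the surrounding types):

```go
// Sketch only: inferred from call sites in this diff, not the real riverpilot
// package. Parameter and return types are assumptions based on the
// surrounding code (ExecutorTx, JobInsertFastParams, etc.).
package riverpilot

import (
	"context"

	"github.com/riverqueue/river/riverdriver"
	"github.com/riverqueue/river/rivershared/baseservice"
	"github.com/riverqueue/river/rivertype"
)

// Pilot abstracts the database operations a driver plugin may want to
// customize: job insertion and batch completion.
type Pilot interface {
	// PilotInit is called once from NewClient after the pilot is chosen.
	PilotInit(archetype *baseservice.Archetype)

	// JobInsertMany inserts a batch of jobs within the given transaction.
	JobInsertMany(ctx context.Context, tx riverdriver.ExecutorTx, params []*riverdriver.JobInsertFastParams) ([]*rivertype.JobInsertResult, error)

	// JobSetStateIfRunningMany batch-completes jobs within the given
	// transaction (see BatchCompleter.handleBatch below).
	JobSetStateIfRunningMany(ctx context.Context, tx riverdriver.ExecutorTx, params *riverdriver.JobSetStateIfRunningManyParams) ([]*rivertype.JobRow, error)
}
```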
internal/jobcompleter/job_completer.go (41 changes: 25 additions & 16 deletions)
@@ -11,6 +11,7 @@ import (
"github.com/riverqueue/river/internal/jobstats"
"github.com/riverqueue/river/riverdriver"
"github.com/riverqueue/river/rivershared/baseservice"
"github.com/riverqueue/river/rivershared/riverpilot"
"github.com/riverqueue/river/rivershared/startstop"
"github.com/riverqueue/river/rivershared/util/serviceutil"
"github.com/riverqueue/river/rivershared/util/sliceutil"
@@ -44,20 +45,12 @@ type CompleterJobUpdated struct {
JobStats *jobstats.JobStatistics
}

// PartialExecutor is always a riverdriver.Executor under normal circumstances,
// but is a minimal interface with the functions needed for completers to work
// to more easily facilitate mocking.
type PartialExecutor interface {
JobSetStateIfRunningMany(ctx context.Context, params *riverdriver.JobSetStateIfRunningManyParams) ([]*rivertype.JobRow, error)
JobSetStateIfRunning(ctx context.Context, params *riverdriver.JobSetStateIfRunningParams) (*rivertype.JobRow, error)
}

type InlineCompleter struct {
baseservice.BaseService
startstop.BaseStartStop

disableSleep bool // disable sleep in testing
exec PartialExecutor
exec riverdriver.Executor
subscribeCh SubscribeChan

// A waitgroup is not actually needed for the inline completer because as
@@ -68,7 +61,7 @@ type InlineCompleter struct {
wg sync.WaitGroup
}

func NewInlineCompleter(archetype *baseservice.Archetype, exec PartialExecutor, subscribeCh SubscribeChan) *InlineCompleter {
func NewInlineCompleter(archetype *baseservice.Archetype, exec riverdriver.Executor, subscribeCh SubscribeChan) *InlineCompleter {
return baseservice.Init(archetype, &InlineCompleter{
exec: exec,
subscribeCh: subscribeCh,
@@ -135,15 +128,15 @@ type AsyncCompleter struct {
concurrency int
disableSleep bool // disable sleep in testing
errGroup *errgroup.Group
exec PartialExecutor
exec riverdriver.Executor
subscribeCh SubscribeChan
}

func NewAsyncCompleter(archetype *baseservice.Archetype, exec PartialExecutor, subscribeCh SubscribeChan) *AsyncCompleter {
func NewAsyncCompleter(archetype *baseservice.Archetype, exec riverdriver.Executor, subscribeCh SubscribeChan) *AsyncCompleter {
return newAsyncCompleterWithConcurrency(archetype, exec, asyncCompleterDefaultConcurrency, subscribeCh)
}

func newAsyncCompleterWithConcurrency(archetype *baseservice.Archetype, exec PartialExecutor, concurrency int, subscribeCh SubscribeChan) *AsyncCompleter {
func newAsyncCompleterWithConcurrency(archetype *baseservice.Archetype, exec riverdriver.Executor, concurrency int, subscribeCh SubscribeChan) *AsyncCompleter {
errGroup := &errgroup.Group{}
errGroup.SetLimit(concurrency)

@@ -223,7 +216,8 @@ type BatchCompleter struct {
completionMaxSize int // configurable for testing purposes; max jobs to complete in single database operation
disableSleep bool // disable sleep in testing
maxBacklog int // configurable for testing purposes; max backlog allowed before no more completions accepted
exec PartialExecutor
exec riverdriver.Executor
pilot riverpilot.Pilot
setStateParams map[int64]*batchCompleterSetState
setStateParamsMu sync.RWMutex
setStateStartTimes map[int64]time.Time
@@ -232,7 +226,7 @@ func NewBatchCompleter(archetype *baseservice.Archetype, exec PartialExecutor, s
waitOnBacklogWaiting bool
}

func NewBatchCompleter(archetype *baseservice.Archetype, exec PartialExecutor, subscribeCh SubscribeChan) *BatchCompleter {
func NewBatchCompleter(archetype *baseservice.Archetype, exec riverdriver.Executor, pilot riverpilot.Pilot, subscribeCh SubscribeChan) *BatchCompleter {
const (
completionMaxSize = 5_000
maxBacklog = 20_000
@@ -242,6 +236,7 @@ func NewBatchCompleter(archetype *baseservice.Archetype, exec PartialExecutor, s
completionMaxSize: completionMaxSize,
exec: exec,
maxBacklog: maxBacklog,
pilot: pilot,
setStateParams: make(map[int64]*batchCompleterSetState),
setStateStartTimes: make(map[int64]time.Time),
subscribeCh: subscribeCh,
@@ -359,7 +354,21 @@ func (c *BatchCompleter) handleBatch(ctx context.Context) error {
}()

return withRetries(ctx, &c.BaseService, c.disableSleep, func(ctx context.Context) ([]*rivertype.JobRow, error) {
return c.exec.JobSetStateIfRunningMany(ctx, batchParams)
tx, err := c.exec.Begin(ctx)
if err != nil {
return nil, err
}
defer tx.Rollback(ctx)

rows, err := c.pilot.JobSetStateIfRunningMany(ctx, tx, batchParams)
if err != nil {
return nil, err
}
if err := tx.Commit(ctx); err != nil {
return nil, err
}

return rows, nil
})
}

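`StandardPilot` (the fallback installed by `NewClient` when the driver plugin doesn't supply a pilot) also isn't shown in this diff. Since the old completer called the executor directly, the standard implementation presumably just preserves that behavior by delegating to the same executor methods. A partial sketch of the completion path, using the signature from the removed `PartialExecutor` interface; the insert path would wrap `JobInsertFastMany` similarly, but its exact return type isn't visible here:

```go
// Sketch only: StandardPilot is not part of this diff. The delegation below
// assumes the previous behavior is preserved, i.e. the pilot simply forwards
// to the executor/transaction methods the completer used to call directly.
package riverpilot

import (
	"context"

	"github.com/riverqueue/river/riverdriver"
	"github.com/riverqueue/river/rivershared/baseservice"
	"github.com/riverqueue/river/rivertype"
)

type StandardPilot struct{}

// PilotInit presumably has nothing to set up in the standard case.
func (p *StandardPilot) PilotInit(archetype *baseservice.Archetype) {}

// JobSetStateIfRunningMany forwards to the same executor method the batch
// completer called before this change, now inside the caller's transaction.
func (p *StandardPilot) JobSetStateIfRunningMany(ctx context.Context, tx riverdriver.ExecutorTx, params *riverdriver.JobSetStateIfRunningManyParams) ([]*rivertype.JobRow, error) {
	return tx.JobSetStateIfRunningMany(ctx, params)
}
```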