BatchCompleter: batch all ops, not just completed
This adds a `JobSetStateIfRunningMany` query and corresponding driver
API, with implementations for both pgxv5 and `database/sql`.

The `BatchCompleter` was updated to use this new query and to batch
_all_ operations, not only those moving to a `completed` state. This
means that both the `AsyncCompleter` and the `InlineCompleter` are now
unused and could be deleted, along with their underlying queries.

The intention is not just to improve performance on snoozes, retries,
errors, cancellations, etc., but also to get down to a single path for
completions (similar to now having a single path for insertions).
bgentry committed Sep 25, 2024
1 parent e9b751f commit 2eb03e2
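
For orientation before the diff, here is the shape of the new driver-level API as it can be inferred from this commit. The field names and types are taken from how the `BatchCompleter` populates them below, so treat this as a sketch rather than the canonical `riverdriver` definition:

```go
// Sketch of the new driver-level API, inferred from this diff; the canonical
// definitions live in the riverdriver package and may differ in detail.
package riverdriver

import (
	"context"
	"time"

	"github.com/riverqueue/river/rivertype"
)

// JobSetStateIfRunningManyParams describes a batch of job state transitions,
// one entry per job across parallel slices. Fields that don't apply to a given
// job's target state (e.g. ScheduledAt for a completed job) are left nil.
type JobSetStateIfRunningManyParams struct {
	ID          []int64
	ErrData     [][]byte
	FinalizedAt []*time.Time
	MaxAttempts []*int
	ScheduledAt []*time.Time
	State       []rivertype.JobState
}

// Executor gains the batch method below (the interface is trimmed to just that
// method for illustration). Judging by the name, each transition is applied
// only if the job is still `running`, and the updated rows are returned.
type Executor interface {
	JobSetStateIfRunningMany(ctx context.Context, params *JobSetStateIfRunningManyParams) ([]*rivertype.JobRow, error)
}
```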
Showing 12 changed files with 776 additions and 70 deletions.
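
To make the headline change concrete before the file-by-file diff: with the new query, jobs that finish in different states can share one batched database call. A minimal sketch, assuming the `riverdriver.Executor` surface exercised in the test changes below; the IDs, error payload, and timings are invented for illustration:

```go
// Illustrative only: three jobs finishing in different states flushed in a
// single JobSetStateIfRunningMany round trip. Previously only the completed
// job would have been batched; the other two would each have issued their own
// UPDATE. Real error payloads carry more fields than shown here.
package jobcompleterexample

import (
	"context"
	"time"

	"github.com/riverqueue/river/riverdriver"
	"github.com/riverqueue/river/rivertype"
)

func exampleBatch(ctx context.Context, exec riverdriver.Executor) error {
	now := time.Now().UTC()
	retryAt := now.Add(30 * time.Second)

	_, err := exec.JobSetStateIfRunningMany(ctx, &riverdriver.JobSetStateIfRunningManyParams{
		ID:          []int64{101, 102, 103},
		ErrData:     [][]byte{nil, []byte(`{"error": "boom"}`), nil},
		FinalizedAt: []*time.Time{&now, nil, &now},      // completed/cancelled jobs are finalized
		MaxAttempts: []*int{nil, nil, nil},
		ScheduledAt: []*time.Time{nil, &retryAt, nil},   // the retryable job is pushed into the future
		State: []rivertype.JobState{
			rivertype.JobStateCompleted,
			rivertype.JobStateRetryable,
			rivertype.JobStateCancelled,
		},
	})
	return err
}
```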
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Changed

- The `BatchCompleter` that marks jobs as completed can now batch database updates for _all_ states of jobs that have finished execution. Prior to this change, only `completed` jobs were batched into a single `UPDATE` call, while jobs moving to any other state were updated with one `UPDATE` per job. This change should significantly reduce database and pool contention on high-volume systems when jobs are retried, snoozed, cancelled, or discarded following execution. [PR #617](https://github.com/riverqueue/river/pull/617).

## [0.12.0] - 2024-09-23

⚠️ Version 0.12.0 contains a new database migration, version 6. See [documentation on running River migrations](https://riverqueue.com/docs/migrations). If migrating with the CLI, make sure to update it to its latest version:
106 changes: 55 additions & 51 deletions internal/jobcompleter/job_completer.go
@@ -48,7 +48,7 @@ type CompleterJobUpdated struct {
// but is a minimal interface with the functions needed for completers to work
// to more easily facilitate mocking.
type PartialExecutor interface {
JobSetCompleteIfRunningMany(ctx context.Context, params *riverdriver.JobSetCompleteIfRunningManyParams) ([]*rivertype.JobRow, error)
JobSetStateIfRunningMany(ctx context.Context, params *riverdriver.JobSetStateIfRunningManyParams) ([]*rivertype.JobRow, error)
JobSetStateIfRunning(ctx context.Context, params *riverdriver.JobSetStateIfRunningParams) (*rivertype.JobRow, error)
}

@@ -220,13 +220,13 @@ type BatchCompleter struct {
baseservice.BaseService
startstop.BaseStartStop

asyncCompleter *AsyncCompleter // used for non-complete completions
completionMaxSize int // configurable for testing purposes; max jobs to complete in single database operation
disableSleep bool // disable sleep in testing
maxBacklog int // configurable for testing purposes; max backlog allowed before no more completions accepted
completionMaxSize int // configurable for testing purposes; max jobs to complete in single database operation
disableSleep bool // disable sleep in testing
maxBacklog int // configurable for testing purposes; max backlog allowed before no more completions accepted
exec PartialExecutor
setStateParams map[int64]*batchCompleterSetState
setStateParamsMu sync.RWMutex
setStateStartTimes map[int64]time.Time
subscribeCh SubscribeChan
waitOnBacklogChan chan struct{}
waitOnBacklogWaiting bool
@@ -239,18 +239,17 @@ func NewBatchCompleter(archetype *baseservice.Archetype, exec PartialExecutor, s
)

return baseservice.Init(archetype, &BatchCompleter{
asyncCompleter: NewAsyncCompleter(archetype, exec, subscribeCh),
completionMaxSize: completionMaxSize,
exec: exec,
maxBacklog: maxBacklog,
setStateParams: make(map[int64]*batchCompleterSetState),
subscribeCh: subscribeCh,
completionMaxSize: completionMaxSize,
exec: exec,
maxBacklog: maxBacklog,
setStateParams: make(map[int64]*batchCompleterSetState),
setStateStartTimes: make(map[int64]time.Time),
subscribeCh: subscribeCh,
})
}

func (c *BatchCompleter) ResetSubscribeChan(subscribeCh SubscribeChan) {
c.subscribeCh = subscribeCh
c.asyncCompleter.subscribeCh = subscribeCh
}

func (c *BatchCompleter) Start(ctx context.Context) error {
@@ -263,13 +262,10 @@ func (c *BatchCompleter) Start(ctx context.Context) error {
panic("subscribeCh must be non-nil")
}

if err := c.asyncCompleter.Start(ctx); err != nil {
return err
}

go func() {
started()
defer stopped() // this defer should come first so it's first out
defer close(c.subscribeCh)

c.Logger.DebugContext(ctx, c.Name+": Run loop started")
defer c.Logger.DebugContext(ctx, c.Name+": Run loop stopped")
@@ -327,17 +323,22 @@ func (c *BatchCompleter) Start(ctx context.Context) error {
}

func (c *BatchCompleter) handleBatch(ctx context.Context) error {
var setStateBatch map[int64]*batchCompleterSetState
var (
setStateBatch map[int64]*batchCompleterSetState
setStateStartTimes map[int64]time.Time
)
func() {
c.setStateParamsMu.Lock()
defer c.setStateParamsMu.Unlock()

setStateBatch = c.setStateParams
setStateStartTimes = c.setStateStartTimes

// Don't bother resetting the map if there's nothing to process,
// allowing the completer to idle efficiently.
if len(setStateBatch) > 0 {
c.setStateParams = make(map[int64]*batchCompleterSetState)
c.setStateStartTimes = make(map[int64]time.Time)
} else {
// Set nil to avoid a data race below in case the map is set as a
// new job comes in.
@@ -351,34 +352,39 @@ func (c *BatchCompleter) handleBatch(ctx context.Context) error {

// Complete a sub-batch with retries. Also helps reduce visual noise and
// increase readability of loop below.
completeSubBatch := func(batchID []int64, batchFinalizedAt []time.Time) ([]*rivertype.JobRow, error) {
completeSubBatch := func(batchParams *riverdriver.JobSetStateIfRunningManyParams) ([]*rivertype.JobRow, error) {
start := time.Now()
defer func() {
c.Logger.DebugContext(ctx, c.Name+": Completed sub-batch of job(s)", "duration", time.Since(start), "num_jobs", len(batchID))
c.Logger.DebugContext(ctx, c.Name+": Completed sub-batch of job(s)", "duration", time.Since(start), "num_jobs", len(batchParams.ID))
}()

return withRetries(ctx, &c.BaseService, c.disableSleep, func(ctx context.Context) ([]*rivertype.JobRow, error) {
return c.exec.JobSetCompleteIfRunningMany(ctx, &riverdriver.JobSetCompleteIfRunningManyParams{
ID: batchID,
FinalizedAt: batchFinalizedAt,
})
return c.exec.JobSetStateIfRunningMany(ctx, batchParams)
})
}

// This could be written more simply using multiple `sliceutil.Map`s, but
// it's done this way to allocate as few new slices as necessary.
mapIDsAndFinalizedAt := func(setStateBatch map[int64]*batchCompleterSetState) ([]int64, []time.Time) {
var (
batchIDs = make([]int64, len(setStateBatch))
batchFinalizedAt = make([]time.Time, len(setStateBatch))
i int
)
mapBatch := func(setStateBatch map[int64]*batchCompleterSetState) *riverdriver.JobSetStateIfRunningManyParams {
params := &riverdriver.JobSetStateIfRunningManyParams{
ID: make([]int64, len(setStateBatch)),
ErrData: make([][]byte, len(setStateBatch)),
FinalizedAt: make([]*time.Time, len(setStateBatch)),
MaxAttempts: make([]*int, len(setStateBatch)),
ScheduledAt: make([]*time.Time, len(setStateBatch)),
State: make([]rivertype.JobState, len(setStateBatch)),
}
var i int
for _, setState := range setStateBatch {
batchIDs[i] = setState.Params.ID
batchFinalizedAt[i] = *setState.Params.FinalizedAt
params.ID[i] = setState.Params.ID
params.ErrData[i] = setState.Params.ErrData
params.FinalizedAt[i] = setState.Params.FinalizedAt
params.MaxAttempts[i] = setState.Params.MaxAttempts
params.ScheduledAt[i] = setState.Params.ScheduledAt
params.State[i] = setState.Params.State
i++
}
return batchIDs, batchFinalizedAt
return params
}

// Tease apart enormous batches into sub-batches.
@@ -387,31 +393,40 @@ func (c *BatchCompleter) handleBatch(ctx context.Context) error {
// doesn't allocate any additional memory in case the entire batch is
// smaller than the sub-batch maximum size (which will be the common case).
var (
batchID, batchFinalizedAt = mapIDsAndFinalizedAt(setStateBatch)
jobRows []*rivertype.JobRow
params = mapBatch(setStateBatch)
jobRows []*rivertype.JobRow
)
c.Logger.DebugContext(ctx, c.Name+": Completing batch of job(s)", "num_jobs", len(setStateBatch))
if len(setStateBatch) > c.completionMaxSize {
jobRows = make([]*rivertype.JobRow, 0, len(setStateBatch))
for i := 0; i < len(setStateBatch); i += c.completionMaxSize {
endIndex := min(i+c.completionMaxSize, len(batchID)) // beginning of next sub-batch or end of slice
jobRowsSubBatch, err := completeSubBatch(batchID[i:endIndex], batchFinalizedAt[i:endIndex])
endIndex := min(i+c.completionMaxSize, len(params.ID)) // beginning of next sub-batch or end of slice
subBatch := &riverdriver.JobSetStateIfRunningManyParams{
ID: params.ID[i:endIndex],
ErrData: params.ErrData[i:endIndex],
FinalizedAt: params.FinalizedAt[i:endIndex],
MaxAttempts: params.MaxAttempts[i:endIndex],
ScheduledAt: params.ScheduledAt[i:endIndex],
State: params.State[i:endIndex],
}
jobRowsSubBatch, err := completeSubBatch(subBatch)
if err != nil {
return err
}
jobRows = append(jobRows, jobRowsSubBatch...)
}
} else {
var err error
jobRows, err = completeSubBatch(batchID, batchFinalizedAt)
jobRows, err = completeSubBatch(params)
if err != nil {
return err
}
}

events := sliceutil.Map(jobRows, func(jobRow *rivertype.JobRow) CompleterJobUpdated {
setState := setStateBatch[jobRow.ID]
setState.Stats.CompleteDuration = c.Time.NowUTC().Sub(*setState.Params.FinalizedAt)
startTime := setStateStartTimes[jobRow.ID]
setState.Stats.CompleteDuration = c.Time.NowUTC().Sub(startTime)
return CompleterJobUpdated{Job: jobRow, JobStats: setState.Stats}
})

@@ -432,13 +447,7 @@ func (c *BatchCompleter) handleBatch(ctx context.Context) error {
}

func (c *BatchCompleter) JobSetStateIfRunning(ctx context.Context, stats *jobstats.JobStatistics, params *riverdriver.JobSetStateIfRunningParams) error {
// Send completions other than setting to `complete` to an async completer.
// We consider this okay because these are expected to be much more rare, so
// only optimizing `complete` will yield huge speed gains.
if params.State != rivertype.JobStateCompleted {
return c.asyncCompleter.JobSetStateIfRunning(ctx, stats, params)
}

now := c.Time.NowUTC()
// If we've built up too much of a backlog because the completer's fallen
// behind, block completions until the complete loop's had a chance to catch
// up.
@@ -448,16 +457,11 @@ func (c *BatchCompleter) JobSetStateIfRunning(ctx context.Context, stats *jobsta
defer c.setStateParamsMu.Unlock()

c.setStateParams[params.ID] = &batchCompleterSetState{params, stats}
c.setStateStartTimes[params.ID] = now

return nil
}

func (c *BatchCompleter) Stop() {
c.BaseStartStop.Stop()
c.asyncCompleter.Stop()
// subscribeCh already closed by asyncCompleter.Stop ^
}

func (c *BatchCompleter) waitOrInitBacklogChannel(ctx context.Context) {
c.setStateParamsMu.RLock()
var (
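
One detail worth calling out from the `handleBatch` changes above: when a batch exceeds `completionMaxSize`, the params are split into sub-batches by slicing every parallel field with the same index range, which keeps entries aligned per job. A standalone sketch of that slicing (the helper and package name are hypothetical; the commit does this inline):

```go
// Hypothetical helper mirroring the inline sub-batch slicing in handleBatch:
// every parallel slice is cut with identical bounds so index i still refers to
// the same job in every field.
package jobcompleterexample

import "github.com/riverqueue/river/riverdriver"

func subBatch(params *riverdriver.JobSetStateIfRunningManyParams, start, end int) *riverdriver.JobSetStateIfRunningManyParams {
	return &riverdriver.JobSetStateIfRunningManyParams{
		ID:          params.ID[start:end],
		ErrData:     params.ErrData[start:end],
		FinalizedAt: params.FinalizedAt[start:end],
		MaxAttempts: params.MaxAttempts[start:end],
		ScheduledAt: params.ScheduledAt[start:end],
		State:       params.State[start:end],
	}
}
```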
40 changes: 21 additions & 19 deletions internal/jobcompleter/job_completer_test.go
@@ -25,25 +25,25 @@ import (
)

type partialExecutorMock struct {
JobSetCompleteIfRunningManyCalled bool
JobSetCompleteIfRunningManyFunc func(ctx context.Context, params *riverdriver.JobSetCompleteIfRunningManyParams) ([]*rivertype.JobRow, error)
JobSetStateIfRunningCalled bool
JobSetStateIfRunningFunc func(ctx context.Context, params *riverdriver.JobSetStateIfRunningParams) (*rivertype.JobRow, error)
mu sync.Mutex
JobSetStateIfRunningManyCalled bool
JobSetStateIfRunningManyFunc func(ctx context.Context, params *riverdriver.JobSetStateIfRunningManyParams) ([]*rivertype.JobRow, error)
JobSetStateIfRunningCalled bool
JobSetStateIfRunningFunc func(ctx context.Context, params *riverdriver.JobSetStateIfRunningParams) (*rivertype.JobRow, error)
mu sync.Mutex
}

// NewPartialExecutorMock returns a new mock with all mock functions set to call
// down into the given real executor.
func NewPartialExecutorMock(exec riverdriver.Executor) *partialExecutorMock {
return &partialExecutorMock{
JobSetCompleteIfRunningManyFunc: exec.JobSetCompleteIfRunningMany,
JobSetStateIfRunningFunc: exec.JobSetStateIfRunning,
JobSetStateIfRunningManyFunc: exec.JobSetStateIfRunningMany,
JobSetStateIfRunningFunc: exec.JobSetStateIfRunning,
}
}

func (m *partialExecutorMock) JobSetCompleteIfRunningMany(ctx context.Context, params *riverdriver.JobSetCompleteIfRunningManyParams) ([]*rivertype.JobRow, error) {
m.setCalled(func() { m.JobSetCompleteIfRunningManyCalled = true })
return m.JobSetCompleteIfRunningManyFunc(ctx, params)
func (m *partialExecutorMock) JobSetStateIfRunningMany(ctx context.Context, params *riverdriver.JobSetStateIfRunningManyParams) ([]*rivertype.JobRow, error) {
m.setCalled(func() { m.JobSetStateIfRunningManyCalled = true })
return m.JobSetStateIfRunningManyFunc(ctx, params)
}

func (m *partialExecutorMock) JobSetStateIfRunning(ctx context.Context, params *riverdriver.JobSetStateIfRunningParams) (*rivertype.JobRow, error) {
@@ -325,7 +325,8 @@ func TestAsyncCompleter(t *testing.T) {
return NewAsyncCompleter(riversharedtest.BaseServiceArchetype(t), exec, subscribeCh)
},
func(completer *AsyncCompleter) { completer.disableSleep = true },
func(completer *AsyncCompleter, exec PartialExecutor) { completer.exec = exec })
func(completer *AsyncCompleter, exec PartialExecutor) { completer.exec = exec },
)
}

func TestBatchCompleter(t *testing.T) {
@@ -336,7 +337,8 @@ func TestBatchCompleter(t *testing.T) {
return NewBatchCompleter(riversharedtest.BaseServiceArchetype(t), exec, subscribeCh)
},
func(completer *BatchCompleter) { completer.disableSleep = true },
func(completer *BatchCompleter, exec PartialExecutor) { completer.exec = exec })
func(completer *BatchCompleter, exec PartialExecutor) { completer.exec = exec },
)

ctx := context.Background()

@@ -728,11 +730,11 @@ func testCompleter[TCompleter JobCompleter](
}

execMock := NewPartialExecutorMock(bundle.exec)
execMock.JobSetCompleteIfRunningManyFunc = func(ctx context.Context, params *riverdriver.JobSetCompleteIfRunningManyParams) ([]*rivertype.JobRow, error) {
execMock.JobSetStateIfRunningManyFunc = func(ctx context.Context, params *riverdriver.JobSetStateIfRunningManyParams) ([]*rivertype.JobRow, error) {
if err := maybeError(); err != nil {
return nil, err
}
return bundle.exec.JobSetCompleteIfRunningMany(ctx, params)
return bundle.exec.JobSetStateIfRunningMany(ctx, params)
}
execMock.JobSetStateIfRunningFunc = func(ctx context.Context, params *riverdriver.JobSetStateIfRunningParams) (*rivertype.JobRow, error) {
if err := maybeError(); err != nil {
@@ -751,7 +753,7 @@ func testCompleter[TCompleter JobCompleter](
// Make sure our mocks were really called. The specific function called
// will depend on the completer under test, so okay as long as one or
// the other was.
require.True(t, execMock.JobSetCompleteIfRunningManyCalled || execMock.JobSetStateIfRunningCalled)
require.True(t, execMock.JobSetStateIfRunningManyCalled || execMock.JobSetStateIfRunningCalled)

// Job still managed to complete despite the errors.
requireState(t, bundle.exec, job.ID, rivertype.JobStateCompleted)
@@ -767,7 +769,7 @@ func testCompleter[TCompleter JobCompleter](
disableSleep(completer)

execMock := NewPartialExecutorMock(bundle.exec)
execMock.JobSetCompleteIfRunningManyFunc = func(ctx context.Context, params *riverdriver.JobSetCompleteIfRunningManyParams) ([]*rivertype.JobRow, error) {
execMock.JobSetStateIfRunningManyFunc = func(ctx context.Context, params *riverdriver.JobSetStateIfRunningManyParams) ([]*rivertype.JobRow, error) {
return nil, context.Canceled
}
execMock.JobSetStateIfRunningFunc = func(ctx context.Context, params *riverdriver.JobSetStateIfRunningParams) (*rivertype.JobRow, error) {
@@ -788,7 +790,7 @@ func testCompleter[TCompleter JobCompleter](
// Make sure our mocks were really called. The specific function called
// will depend on the completer under test, so okay as long as one or
// the other was.
require.True(t, execMock.JobSetCompleteIfRunningManyCalled || execMock.JobSetStateIfRunningCalled)
require.True(t, execMock.JobSetStateIfRunningManyCalled || execMock.JobSetStateIfRunningCalled)

// Job is still running because the completer is forced to give up
// immediately on certain types of errors like where a pool is closed.
@@ -805,7 +807,7 @@ func testCompleter[TCompleter JobCompleter](
disableSleep(completer)

execMock := NewPartialExecutorMock(bundle.exec)
execMock.JobSetCompleteIfRunningManyFunc = func(ctx context.Context, params *riverdriver.JobSetCompleteIfRunningManyParams) ([]*rivertype.JobRow, error) {
execMock.JobSetStateIfRunningManyFunc = func(ctx context.Context, params *riverdriver.JobSetStateIfRunningManyParams) ([]*rivertype.JobRow, error) {
return nil, puddle.ErrClosedPool
}
execMock.JobSetStateIfRunningFunc = func(ctx context.Context, params *riverdriver.JobSetStateIfRunningParams) (*rivertype.JobRow, error) {
@@ -826,7 +828,7 @@ func testCompleter[TCompleter JobCompleter](
// Make sure our mocks were really called. The specific function called
// will depend on the completer under test, so okay as long as one or
// the other was.
require.True(t, execMock.JobSetCompleteIfRunningManyCalled || execMock.JobSetStateIfRunningCalled)
require.True(t, execMock.JobSetStateIfRunningManyCalled || execMock.JobSetStateIfRunningCalled)

// Job is still running because the completer is forced to give up
// immediately on certain types of errors like where a pool is closed.