Get a basic README in place #21

Merged 1 commit on Nov 13, 2023
213 changes: 99 additions & 114 deletions doc.go
@@ -1,169 +1,154 @@
/*
Package river is a robust high-performance job processing system for Go.
Package river is a robust high-performance job processing system for Go and
Postgres.

Because it is built using Postgres, River enables you to use the same database
for both your application data and your job queue. This simplifies operations,
but perhaps more importantly it makes it possible to enqueue jobs
transactionally with other database changes. This avoids a whole class of
distributed systems issues like jobs that execute before the database
transaction that enqueued them has even committed, or jobs that attempt to
utilize database changes which were rolled back. It also makes it possible for
your job to make database changes atomically with the job being marked as
complete.
See [homepage], [docs], and [godoc].

# Job args
Being built for Postgres, River encourages the use of the same database for
application data and job queue. By enqueueing jobs transactionally along with
other database changes, whole classes of distributed systems problems are
avoided. Jobs are guaranteed to be enqueued if their transaction commits, are
removed if their transaction rolls back, and aren't visible for work _until_
commit. See [transactional enqueueing] for more background on this philosophy.

Jobs need to be able to serialize their state to JSON so that they can be round
tripped to the database and back. Each job has an args struct with JSON tags
on its properties to allow for this:
# Job args and workers

Jobs are defined in struct pairs, with an implementation of [`JobArgs`] and one
of [`Worker`].

Job args contain `json` annotations and define how jobs are serialized to and
from the database, along with a "kind", a stable string that uniquely identifies
the job.

// SortArgs are arguments for SortWorker.
type SortArgs struct {
// Strings is a slice of strings to sort.
Strings []string `json:"strings"`
}

func (SortArgs) Kind() string { return "sort_job" }

Args are created to enqueue a new job and are what a worker receives to work
one. Each one implements [JobArgs].Kind, which returns a unique string that's
used to recognize the job as it round trips from the database.

# Job workers
func (SortArgs) Kind() string { return "sort" }
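
As a rough illustration of the round trip described above, the following standalone sketch serializes `SortArgs` to JSON together with its kind and then restores it. It uses only the standard library; the `storedJob` type and the `encodeJob` and `decodeSortArgs` helpers are hypothetical names invented for this example and are not part of River's API or schema.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// SortArgs mirrors the args struct shown in the docs above.
type SortArgs struct {
	// Strings is a slice of strings to sort.
	Strings []string `json:"strings"`
}

// Kind returns the stable string that identifies this job type.
func (SortArgs) Kind() string { return "sort" }

// storedJob is a hypothetical stand-in for a persisted job row. River's
// actual table layout is not shown here.
type storedJob struct {
	Kind        string
	EncodedArgs []byte
}

// encodeJob serializes args to JSON and records their kind next to them.
func encodeJob(args SortArgs) (storedJob, error) {
	data, err := json.Marshal(args)
	if err != nil {
		return storedJob{}, err
	}
	return storedJob{Kind: args.Kind(), EncodedArgs: data}, nil
}

// decodeSortArgs checks the stored kind and then restores SortArgs from JSON.
func decodeSortArgs(row storedJob) (SortArgs, error) {
	var args SortArgs
	if row.Kind != args.Kind() {
		return args, fmt.Errorf("unexpected kind %q", row.Kind)
	}
	err := json.Unmarshal(row.EncodedArgs, &args)
	return args, err
}

func main() {
	row, err := encodeJob(SortArgs{Strings: []string{"whale", "tiger", "bear"}})
	if err != nil {
		panic(err)
	}
	fmt.Printf("%s %s\n", row.Kind, row.EncodedArgs)

	args, err := decodeSortArgs(row)
	if err != nil {
		panic(err)
	}
	fmt.Println(args.Strings)
}
```

The kind is stored outside the JSON payload in this sketch so that a reader of the row can pick the right args type before decoding, which is the reason the kind string needs to stay stable over time.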

Each job kind also has a corresponding worker struct where its core work
function is defined:
Workers expose a `Work` function that dictates how jobs run.

// SortWorker is a job worker for sorting strings.
type SortWorker struct {
river.WorkerDefaults[SortArgs]
// An embedded WorkerDefaults sets up default methods to fulfill the rest of
// the Worker interface:
river.WorkerDefaults[SortArgs]
}

func (w *SortWorker) Work(ctx context.Context, job *river.Job[SortArgs]) error {
sort.Strings(job.Args.Strings)
fmt.Printf("Sorted strings: %+v\n", job.Args.Strings)
return nil
sort.Strings(job.Args.Strings)
fmt.Printf("Sorted strings: %+v\n", job.Args.Strings)
return nil
}

A few details to notice:

- Although not strictly necessary, workers embed [WorkerDefaults] with a
reference to their args type. This allows them to inherit defaults for the
[Worker] interface, and helps futureproof in case it's ever expanded.

- Each worker implements [Worker].Work, which is where the async heavy-lifting
for a background job is done. Work implementations receive a generic like
river.Job[SortArgs] for easy access to job arguments.

# Registering workers

As a program is initially starting up, worker structs are registered so that
River can know how to work them:
Jobs are uniquely identified by their "kind" string. Workers are registered on
start up so that River knows how to assign jobs to workers:

workers := river.NewWorkers()
// AddWorker panics if the worker is already registered or invalid:
river.AddWorker(workers, &SortWorker{})

# River client
# Starting a client

The main River client takes a [pgx] connection pool wrapped with River's Pgx v5
driver using [riverpgxv5.New] and a set of registered workers (see above). Each
queue can receive configuration like the maximum number of goroutines that'll be
used to work it:
A River [`Client`] provides an interface for job insertion and manages job
processing and [maintenance services]. A client is created with a database pool,
[driver], and config struct containing a `Workers` bundle and other settings.
Here's a `Client` working one queue (`"default"`) with up to 100 worker
goroutines at a time:

dbConfig, err := pgxpool.ParseConfig("postgres://localhost/river")
if err != nil {
return err
}
riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
Queues: map[string]river.QueueConfig{
river.DefaultQueue: {MaxWorkers: 100},
},
Workers: workers,
})

dbPool, err := pgxpool.NewWithConfig(ctx, dbConfig)
if err != nil {
return err
panic(err)
}
defer dbPool.Close()

riverClient, err := river.NewClient(&river.Config{
Driver: riverpgxv5.New(dbPool),
Queues: map[string]river.QueueConfig{
river.DefaultQueue: {MaxWorkers: 100},
},
Workers: workers,
})

// Run the client inline. All executed jobs will inherit from ctx:
if err := riverClient.Start(ctx); err != nil {
...
panic(err)
}

...
## Stopping

// Before program exit, try to shut down cleanly.
if err := riverClient.Shutdown(ctx); err != nil {
return err
}

For programs that'll be inserting jobs only, the Queues and Workers
configuration keys can be omitted for brevity:

riverClient, err := river.NewClient(&river.Config{
DBPool: dbPool,
})
The client should also be stopped on program shutdown:

However, if Workers is specified, the client can validate that an inserted job
has a worker that's registered with the workers bundle, so it's recommended that
Workers is configured anyway if your project is set up to easily allow it.
// Stop fetching new work and wait for active jobs to finish.
if err := riverClient.Stop(ctx); err != nil {
panic(err)
}

See [Config] for details on all configuration options.
There are some complexities around ensuring clients stop cleanly, but also in a
timely manner. See [graceful shutdown] for more details on River's stop modes.

# Inserting jobs

Insert jobs by opening a transaction and calling [Client.InsertTx] with a job
args instance (a non-transactional [Client.Insert] is also available) and the
transaction wrapped with [riverpgxv5Tx]:

tx, err := dbPool.Begin(ctx)
if err != nil {
return err
}
defer tx.Rollback(ctx)
[`Client.InsertTx`] takes an instance of job args and inserts a job as part of
an open transaction:

_, err = riverClient.InsertTx(ctx, tx, SortArgs{
Strings: []string{
"whale", "tiger", "bear",
},
Strings: []string{
"whale", "tiger", "bear",
},
}, nil)

if err != nil {
return err
panic(err)
}

if err := tx.Commit(ctx); err != nil {
return err
}
See the [`InsertAndWork` example] for complete code.

# Other features

Due to rules around transaction visibility, inserted jobs aren't visible to
workers until the transaction that inserted them is committed. This prevents
a whole host of problems like workers trying to work a job before it's viable to
do so because not all its requisite data has been persisted yet.
- [Batch job insertion] for efficiently inserting many jobs at once using
Postgres `COPY FROM`.

See the InsertAndWork example for all this code in one place.
- [Cancelling jobs] from inside a work function.

# Other features
- [Error and panic handling].

- Periodic jobs that run on a predefined interval. See the PeriodicJob example
below.
- [Periodic and cron jobs].

# Verifying inserted jobs
- [Snoozing jobs] from inside a work function.

See the rivertest package for test helpers that can be used to easily verify
inserted jobs in a test suite. For example:
- [Subscriptions] to queue activity and statistics, providing easy hooks for
telemetry like logging and metrics.

job := rivertest.RequireInserted(ctx, t, dbPool, &RequiredArgs{}, nil)
fmt.Printf("Test passed with message: %s\n", job.Args.Message)
- [Transactional job completion] to guarantee job completion commits with
other changes in a transaction.

[pgx]: https://github.com/jackc/pgx
*/
package river
- [Unique jobs] by args, period, queue, and state.

import "github.com/riverqueue/river/riverdriver/riverpgxv5"
- [Work functions] for simplified worker implementation.

// This is really dumb, but the package must be imported to make it linkable in
// the Godoc above, so this is a trivial import to make sure it is.
var _ = riverpgxv5.New(nil)
See also [developing River].

[`Client`]: https://pkg.go.dev/github.com/riverqueue/river#Client
[`Client.InsertTx`]: https://pkg.go.dev/github.com/riverqueue/river#Client.InsertTx
[`InsertAndWork` example]: https://pkg.go.dev/github.com/riverqueue/river#example-package-InsertAndWork
[`JobArgs`]: https://pkg.go.dev/github.com/riverqueue/river#JobArgs
[`Worker`]: https://pkg.go.dev/github.com/riverqueue/river#Worker
[Batch job insertion]: https://riverqueue.com/docs/batch-job-insertion
[Cancelling jobs]: https://riverqueue.com/docs/cancelling-jobs
[Error and panic handling]: https://riverqueue.com/docs/error-handling
[Periodic and cron jobs]: https://riverqueue.com/docs/periodic-jobs
[Snoozing jobs]: https://riverqueue.com/docs/snoozing-jobs
[Subscriptions]: https://riverqueue.com/docs/subscriptions
[Transactional job completion]: https://riverqueue.com/docs/transactional-job-completion
[Unique jobs]: https://riverqueue.com/docs/unique-jobs
[Work functions]: https://riverqueue.com/docs/work-functions
[docs]: https://riverqueue.com/docs
[developing River]: https://github.com/riverqueue/river/blob/master/docs/development.md
[driver]: https://riverqueue.com/docs/database-drivers
[godoc]: https://pkg.go.dev/github.com/riverqueue/river
[graceful shutdown]: https://riverqueue.com/docs/graceful-shutdown
[homepage]: https://riverqueue.com
[maintenance services]: https://riverqueue.com/docs/maintenance-services
[transactional enqueueing]: https://riverqueue.com/docs/transactional-enqueueing
*/
package river