Skip to content

Commit

Permalink
Support for database/sql in migrations + framework for multi-driver…
Browse files Browse the repository at this point in the history
… River

Here, add a new minimal driver called `riverdriver/riversql` that
supports Go's built-in `database/sql` package, but only for purposes of
migrations. The idea here is to fully complete #57 by providing a way of
making `rivermigrate` interoperable with Go migration frameworks that
support Go-based migrations like Goose, which provides hooks for
`*sql.Tx` [1] rather than pgx.

`riverdriver/riversql` is not a full driver and is only meant to be used
with `rivermigrate`. We document this clearly in a number of places.

To make a multi-driver world possible with River, we have to start the
work of building a platform that does more than `riverpgxv5`'s "cheat"
workaround. This works by having each driver implement specific database
operations like `MigrationGetAll`, which target their wrapped database
package of choice.

This is accomplished by having each driver bundle in its own sqlc that
targets its package. So `riverpgxv5` has an `sqlc.yaml` that targets
`pgx/v5`, while `riversql` has one that targets `database/sql`. There's
some `sqlc.yaml` duplication involved here, but luckily both drivers can
share a `river_migration.sql` file that contains all queries involved,
so you only need to change one place. `river_migration.sql` also migrates
entirely out of the main `./internal/dbsqlc`.

The idea here is that eventually `./internal/dbsqlc` will disappear
completely, usurped entirely by driver-specific versions. As this is
done, all references to `pgx` will disappear from the top-level package.
There are some complications here to figure out like `LISTEN`/`NOTIFY`
though, and I'm not clear whether `database/sql` could ever become a
fully functional driver as it might be missing some needed functionality
(e.g. subtransactions are still not supported after talking about them
for ten f*ing years [2]. However, even if it's not, the system would let
us support other fully functional packages or future major versions of
pgx (or even past ones like `pgx/v4` if there's demand).

`river/riverdriver` becomes a package as it now has types in it that
need to be referenced by driver implementations, and this would
otherwise not be possible without introducing a circular dependency.

Notably, this development branch has to use some `go.mod` `replace`
directives to demonstrate that it works correctly. If we go this
direction, we'll need to break it into chunks to release it without
them:

1. Break out changes to `river/riverdriver`. Tag and release it.

2. Break out changes to `riverdriver/river*` drivers. Have them target
   the release in (1), comment out `replace`s, then tag and release them.

3. Target the remaining River changes to the releases in (1) and (2),
   comment out `replace`s, then tag and release the top-level driver.

Unfortunately future deep incisions to drivers will require similar
gymnastics, but I don't think there's a way around it (we already have
this process except it's currently two steps instead of three). The hope
is that these will change relatively rarely, so it won't be too painful.

[1] https://github.com/pressly/goose#go-migrations
[2] golang/go#7898
  • Loading branch information
brandur committed Dec 3, 2023
1 parent 0bbcd0c commit 2e4c2d5
Show file tree
Hide file tree
Showing 31 changed files with 996 additions and 114 deletions.
3 changes: 1 addition & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,6 @@ jobs:
sqlc-version: "1.22.0"

- name: Run sqlc diff
working-directory: ./internal/dbsqlc
run: |
echo "Please make sure that all sqlc changes are checked in!"
sqlc diff
make verify
14 changes: 13 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,16 @@ generate: generate/sqlc

.PHONY: generate/sqlc
generate/sqlc:
cd internal/dbsqlc && sqlc generate
cd internal/dbsqlc && sqlc generate
cd riverdriver/riversql/internal/dbsqlc && sqlc generate
cd riverdriver/riverpgxv5/internal/dbsqlc && sqlc generate

.PHONY: verify
verify:
verify: verify/sqlc

.PHONY: verify/sqlc
verify/sqlc:
cd internal/dbsqlc && sqlc diff
cd riverdriver/riversql/internal/dbsqlc && sqlc diff
cd riverdriver/riverpgxv5/internal/dbsqlc && sqlc diff
2 changes: 2 additions & 0 deletions docs/development.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ queries. After changing an sqlc `.sql` file, generate Go with:
```shell
git checkout master && git pull --rebase
VERSION=v0.0.x
git tag riverdriver/VERSION -m "release riverdriver/VERSION"
git tag riverdriver/riverpgxv5/$VERSION -m "release riverdriver/riverpgxv5/$VERSION"
git tag riverdriver/riversql/$VERSION -m "release riverdriver/riversql/$VERSION"
git tag $VERSION
git push --tags
```
13 changes: 10 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
module github.com/riverqueue/river

go 1.21.0
go 1.21.4

// replace github.com/riverqueue/river/riverdriver/riverpgxv5 => ./riverdriver/riverpgxv5
replace github.com/riverqueue/river/riverdriver => ./riverdriver

Check failure on line 5 in go.mod

View workflow job for this annotation

GitHub Actions / lint

local replacement are not allowed: github.com/riverqueue/river/riverdriver (gomoddirectives)

replace github.com/riverqueue/river/riverdriver/riverpgxv5 => ./riverdriver/riverpgxv5

Check failure on line 7 in go.mod

View workflow job for this annotation

GitHub Actions / lint

local replacement are not allowed: github.com/riverqueue/river/riverdriver/riverpgxv5 (gomoddirectives)

replace github.com/riverqueue/river/riverdriver/riversql => ./riverdriver/riversql

Check failure on line 9 in go.mod

View workflow job for this annotation

GitHub Actions / lint

local replacement are not allowed: github.com/riverqueue/river/riverdriver/riversql (gomoddirectives)

require (
github.com/jackc/pgerrcode v0.0.0-20220416144525-469b46aa5efa
github.com/jackc/pgx/v5 v5.5.0
github.com/jackc/puddle/v2 v2.2.1
github.com/oklog/ulid/v2 v2.1.0
github.com/riverqueue/river/riverdriver/riverpgxv5 v0.0.10
github.com/riverqueue/river/riverdriver v0.0.0-00010101000000-000000000000
github.com/riverqueue/river/riverdriver/riverpgxv5 v0.0.12
github.com/robfig/cron/v3 v3.0.1
github.com/spf13/cobra v1.8.0
github.com/stretchr/testify v1.8.4
Expand All @@ -23,7 +28,9 @@ require (
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/lib/pq v1.10.9 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/riverqueue/river/riverdriver/riversql v0.0.0-00010101000000-000000000000 // indirect
github.com/spf13/pflag v1.0.5 // indirect
golang.org/x/crypto v0.15.0 // indirect
golang.org/x/text v0.14.0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@ github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0=
github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/oklog/ulid/v2 v2.1.0 h1:+9lhoxAP56we25tyYETBBY1YLA2SaoLvUFgrP2miPJU=
github.com/oklog/ulid/v2 v2.1.0/go.mod h1:rcEKHmBBKfef9DhnvX7y1HZBYxjXb0cP5ExxNsTT1QQ=
github.com/pborman/getopt v0.0.0-20170112200414-7148bc3a4c30/go.mod h1:85jBQOZwpVEaDAr341tbn15RS4fCAsIst0qp7i8ex1o=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/riverqueue/river/riverdriver/riverpgxv5 v0.0.10 h1:t5fUWmH/uYQfepli2cMfDRjaanVfb7yXtx1ca1G3uW4=
github.com/riverqueue/river/riverdriver/riverpgxv5 v0.0.10/go.mod h1:k6hsPkW9Fl3qURzyLHbvxUCqWDpit0WrZ3oEaKezD3E=
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M=
Expand Down
6 changes: 0 additions & 6 deletions internal/dbsqlc/models.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 0 additions & 2 deletions internal/dbsqlc/sqlc.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,9 @@ sql:
queries:
- river_job.sql
- river_leader.sql
- river_migration.sql
schema:
- river_job.sql
- river_leader.sql
- river_migration.sql
gen:
go:
package: "dbsqlc"
Expand Down
33 changes: 33 additions & 0 deletions internal/util/dbutil/db_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/jackc/pgx/v5"

"github.com/riverqueue/river/internal/dbsqlc"
"github.com/riverqueue/river/riverdriver"
)

// Executor is an interface for a type that can begin a transaction and also
Expand Down Expand Up @@ -56,3 +57,35 @@ func WithTxV[T any](ctx context.Context, txBeginner TxBeginner, innerFunc func(c

return res, nil
}

// WithExecutorTx starts and commits a transaction on a driver executor around
// the given function, allowing the return of a generic value.
func WithExecutorTx(ctx context.Context, exec riverdriver.Executor, innerFunc func(ctx context.Context, tx riverdriver.ExecutorTx) error) error {
_, err := WithExecutorTxV(ctx, exec, func(ctx context.Context, tx riverdriver.ExecutorTx) (struct{}, error) {
return struct{}{}, innerFunc(ctx, tx)
})
return err
}

// WithExecutorTxV starts and commits a transaction on a driver executor around
// the given function, allowing the return of a generic value.
func WithExecutorTxV[T any](ctx context.Context, exec riverdriver.Executor, innerFunc func(ctx context.Context, tx riverdriver.ExecutorTx) (T, error)) (T, error) {
var defaultRes T

tx, err := exec.Begin(ctx)
if err != nil {
return defaultRes, fmt.Errorf("error beginning transaction: %w", err)
}
defer tx.Rollback(ctx)

res, err := innerFunc(ctx, tx)
if err != nil {
return defaultRes, err
}

if err := tx.Commit(ctx); err != nil {
return defaultRes, fmt.Errorf("error committing transaction: %w", err)
}

return res, nil
}
35 changes: 35 additions & 0 deletions internal/util/dbutil/db_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"github.com/stretchr/testify/require"

"github.com/riverqueue/river/internal/riverinternaltest"
"github.com/riverqueue/river/riverdriver"
"github.com/riverqueue/river/riverdriver/riverpgxv5"
)

func TestWithTx(t *testing.T) {
Expand Down Expand Up @@ -40,3 +42,36 @@ func TestWithTxV(t *testing.T) {
require.NoError(t, err)
require.Equal(t, 7, ret)
}

func TestWithExecutorTx(t *testing.T) {
t.Parallel()

ctx := context.Background()
dbPool := riverinternaltest.TestDB(ctx, t)
driver := riverpgxv5.New(dbPool)

err := WithExecutorTx(ctx, driver.GetExecutor(), func(ctx context.Context, tx riverdriver.ExecutorTx) error {
_, err := tx.Exec(ctx, "SELECT 1")
require.NoError(t, err)

return nil
})
require.NoError(t, err)
}

func TestWithExecutorTxV(t *testing.T) {
t.Parallel()

ctx := context.Background()
dbPool := riverinternaltest.TestDB(ctx, t)
driver := riverpgxv5.New(dbPool)

ret, err := WithExecutorTxV(ctx, driver.GetExecutor(), func(ctx context.Context, tx riverdriver.ExecutorTx) (int, error) {
_, err := tx.Exec(ctx, "SELECT 1")
require.NoError(t, err)

return 7, nil
})
require.NoError(t, err)
require.Equal(t, 7, ret)
}
14 changes: 14 additions & 0 deletions riverdriver/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
module github.com/riverqueue/river/riverdriver

go 1.21.4

require github.com/jackc/pgx/v5 v5.5.0

require (
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/jackc/puddle/v2 v2.2.1 // indirect
golang.org/x/crypto v0.15.0 // indirect
golang.org/x/sync v0.5.0 // indirect
golang.org/x/text v0.14.0 // indirect
)
28 changes: 28 additions & 0 deletions riverdriver/go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk=
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM=
github.com/jackc/pgx/v5 v5.5.0 h1:NxstgwndsTRy7eq9/kqYc/BZh5w2hHJV86wjvO+1xPw=
github.com/jackc/pgx/v5 v5.5.0/go.mod h1:Ig06C2Vu0t5qXC60W8sqIthScaEnFvojjj9dSljmHRA=
github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk=
github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
golang.org/x/crypto v0.15.0 h1:frVn1TEaCEaZcn3Tmd7Y2b5KKPaZ+I32Q2OA3kYp5TA=
golang.org/x/crypto v0.15.0/go.mod h1:4ChreQoLWfG3xLDer1WdlH5NdlQ3+mwnQq1YTKY+72g=
golang.org/x/sync v0.5.0 h1:60k92dhOjHxJkrqnwsfl8KuaHbn/5dl0lUPUklKo3qE=
golang.org/x/sync v0.5.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
95 changes: 95 additions & 0 deletions riverdriver/river_driver_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,20 @@
package riverdriver

import (
"context"
"errors"
"time"

"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
)

var (
ErrNotImplemented = errors.New("driver does not implement this functionality")
ErrNoRows = errors.New("no rows found")
ErrSubTxNotSupported = errors.New("subtransactions not supported for this driver")
)

// Driver provides a database driver for use with river.Client.
//
// Its purpose is to wrap the interface of a third party database package, with
Expand All @@ -32,10 +42,95 @@ import (
type Driver[TTx any] interface {
// GetDBPool returns a database pool.This doesn't make sense in a world
// where multiple drivers are supported and is subject to change.
//
// API is not stable. DO NOT USE.
GetDBPool() *pgxpool.Pool

// GetExecutor gets an executor for the driver.
//
// API is not stable. DO NOT USE.
GetExecutor() Executor

// UnwrapExecutor gets unwraps executor from a driver transaction.
//
// API is not stable. DO NOT USE.
UnwrapExecutor(tx TTx) Executor

// UnwrapTx turns a generically typed transaction into a pgx.Tx for use with
// internal infrastructure. This doesn't make sense in a world where
// multiple drivers are supported and is subject to change.
//
// API is not stable. DO NOT USE.
UnwrapTx(tx TTx) pgx.Tx
}

// Executor provides River operations against a database. It may be a database
// pool or transaction.
type Executor interface {
// Begin begins a new subtransaction. ErrSubTxNotSupported may be returned
// if the executor is a transaction and the driver doesn't support
// subtransactions (like riverdriver/riversql for database/sql).
//
// API is not stable. DO NOT USE.
Begin(ctx context.Context) (ExecutorTx, error)

// Exec executes raw SQL. Used for migrations.
//
// API is not stable. DO NOT USE.
Exec(ctx context.Context, sql string) (struct{}, error)

// MigrationDeleteByVersionMany deletes many migration versions.
//
// API is not stable. DO NOT USE.
MigrationDeleteByVersionMany(ctx context.Context, versions []int) ([]*Migration, error)

// MigrationGetAll gets all currently applied migrations.
//
// API is not stable. DO NOT USE.
MigrationGetAll(ctx context.Context) ([]*Migration, error)

// MigrationInsertMany inserts many migration versions.
//
// API is not stable. DO NOT USE.
MigrationInsertMany(ctx context.Context, versions []int) ([]*Migration, error)

// TableExists checks whether a table exists for the schema in the current
// search schema.
//
// API is not stable. DO NOT USE.
TableExists(ctx context.Context, tableName string) (bool, error)
}

// ExecutorTx is an executor which is a transaction. In addition to standard
// Executor operations, it may be commited or rolled back.
type ExecutorTx interface {
Executor

// Commit commits the transaction.
//
// API is not stable. DO NOT USE.
Commit(ctx context.Context) error

// Rollback rolls back the transaction.
//
// API is not stable. DO NOT USE.
Rollback(ctx context.Context) error
}

// Migration represents a River migration.
type Migration struct {
// ID is an automatically generated primary key for the migration.
//
// API is not stable. DO NOT USE.
ID int

// CreatedAt is when the migration was initially created.
//
// API is not stable. DO NOT USE.
CreatedAt time.Time

// Version is the version of the migration.
//
// API is not stable. DO NOT USE.
Version int
}
9 changes: 0 additions & 9 deletions riverdriver/river_driver_interface_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1 @@
package riverdriver

import (
"github.com/jackc/pgx/v5"

"github.com/riverqueue/river/riverdriver/riverpgxv5"
)

// Verify interface compliance.
var _ Driver[pgx.Tx] = &riverpgxv5.Driver{}
5 changes: 4 additions & 1 deletion riverdriver/riverpgxv5/go.mod
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
module github.com/riverqueue/river/riverdriver/riverpgxv5

go 1.21.0
go 1.21.4

replace github.com/riverqueue/river/riverdriver => ../

require (
github.com/jackc/pgx/v5 v5.5.0
github.com/riverqueue/river/riverdriver v0.0.0-00010101000000-000000000000
github.com/stretchr/testify v1.8.1
)

Expand Down
Loading

0 comments on commit 2e4c2d5

Please sign in to comment.