Skip to content

Commit

Permalink
Support for database/sql in migrations + framework for multi-driver…
Browse files Browse the repository at this point in the history
… River (#98)

Here, add a new minimal driver called `riverdriver/riversql` that
supports Go's built-in `database/sql` package, but only for purposes of
migrations. The idea here is to fully complete #57 by providing a way of
making `rivermigrate` interoperable with Go migration frameworks that
support Go-based migrations like Goose, which provides hooks for
`*sql.Tx` [1] rather than pgx.

`riverdriver/riversql` is not a full driver and is only meant to be used
with `rivermigrate`. We document this clearly in a number of places.

To make a multi-driver world possible with River, we have to start the
work of building a platform that does more than `riverpgxv5`'s "cheat"
workaround. This works by having each driver implement specific database
operations like `MigrationGetAll`, which target their wrapped database
package of choice.

This is accomplished by having each driver bundle in its own sqlc that
targets its package. So `riverpgxv5` has an `sqlc.yaml` that targets
`pgx/v5`, while `riversql` has one that targets `database/sql`. There's
some `sqlc.yaml` duplication involved here, but luckily both drivers can
share a `river_migration.sql` file that contains all queries involved,
so you only need to change one place. `river_migration.sql` also migrates
entirely out of the main `./internal/dbsqlc`.

The idea here is that eventually `./internal/dbsqlc` will disappear
completely, usurped entirely by driver-specific versions. As this is
done, all references to `pgx` will disappear from the top-level package.
There are some complications here to figure out like `LISTEN`/`NOTIFY`
though, and I'm not clear whether `database/sql` could ever become a
fully functional driver as it might be missing some needed functionality
(e.g. subtransactions are still not supported after talking about them
for ten f*ing years [2]. However, even if it's not, the system would let
us support other fully functional packages or future major versions of
pgx (or even past ones like `pgx/v4` if there's demand).

`river/riverdriver` becomes a package as it now has types in it that
need to be referenced by driver implementations, and this would
otherwise not be possible without introducing a circular dependency.

[1] https://github.com/pressly/goose#go-migrations
[2] golang/go#7898
  • Loading branch information
brandur authored Dec 13, 2023
1 parent 1468298 commit 9852b52
Show file tree
Hide file tree
Showing 36 changed files with 1,162 additions and 148 deletions.
44 changes: 37 additions & 7 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,15 @@ jobs:
env:
TEST_DATABASE_URL: postgres://postgres:postgres@127.0.0.1:5432/river_testdb?sslmode=disable

- name: Test riverpgxv5
- name: Test riverdriver
working-directory: ./riverdriver
run: go test -race ./...

- name: Test riverdriver/riverdatabasesql
working-directory: ./riverdriver/riverdatabasesql
run: go test -race ./...

- name: Test riverdriver/riverpgxv5
working-directory: ./riverdriver/riverpgxv5
run: go test -race ./...

Expand Down Expand Up @@ -117,10 +125,13 @@ jobs:
golangci:
name: lint
runs-on: ubuntu-latest
env:
GOLANGCI_LINT_VERSION: v1.55.2
permissions:
contents: read
# allow read access to pull request. Use with `only-new-issues` option.
pull-requests: read

steps:
- uses: actions/setup-go@v4
with:
Expand All @@ -130,13 +141,33 @@ jobs:
- name: Checkout
uses: actions/checkout@v3

- name: golangci-lint
- name: Lint
uses: golangci/golangci-lint-action@v3
with:
# Optional: show only new issues if it's a pull request. The default value is `false`.
only-new-issues: true
only-new-issues: true # Optional: show only new issues if it's a pull request. The default value is `false`.
version: ${{ env.GOLANGCI_LINT_VERSION }}
working-directory: .

version: v1.55.2
- name: Lint riverdriver
uses: golangci/golangci-lint-action@v3
with:
only-new-issues: true # Optional: show only new issues if it's a pull request. The default value is `false`.
version: ${{ env.GOLANGCI_LINT_VERSION }}
working-directory: ./riverdriver

- name: Lint riverdriver/riverdatabasesql
uses: golangci/golangci-lint-action@v3
with:
only-new-issues: true # Optional: show only new issues if it's a pull request. The default value is `false`.
version: ${{ env.GOLANGCI_LINT_VERSION }}
working-directory: ./riverdriver/riverdatabasesql

- name: Lint riverdriver/riverpgxv5
uses: golangci/golangci-lint-action@v3
with:
only-new-issues: true # Optional: show only new issues if it's a pull request. The default value is `false`.
version: ${{ env.GOLANGCI_LINT_VERSION }}
working-directory: ./riverdriver/riverpgxv5

producer_sample:
runs-on: ubuntu-latest
Expand Down Expand Up @@ -204,7 +235,6 @@ jobs:
sqlc-version: "1.22.0"

- name: Run sqlc diff
working-directory: ./internal/dbsqlc
run: |
echo "Please make sure that all sqlc changes are checked in!"
sqlc diff
make verify
3 changes: 3 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ linters-settings:
- Default
- Prefix(github.com/riverqueue)

gomoddirectives:
replace-local: true

gosec:
excludes:
- G404 # use of non-crypto random; overly broad for our use case
Expand Down
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added

- Added `riverdriver/riverdatabasesql` driver to enable River Go migrations through Go's built in `database/sql` package. [PR #98](https://github.com/riverqueue/river/pull/98).

### Changed

- Errored jobs that have a very short duration before their next retry (<5 seconds) are set to `available` immediately instead of being made `scheduled` and having to wait for the scheduler to make a pass to make them workable. [PR #105](https://github.com/riverqueue/river/pull/105).
- `riverdriver` becomes its own submodule. It contains types that `riverdriver/riverdatabasesql` and `riverdriver/riverpgxv5` need to reference. [PR #98](https://github.com/riverqueue/river/pull/98).

## [0.0.12] - 2023-12-02

Expand Down
14 changes: 13 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,16 @@ generate: generate/sqlc

.PHONY: generate/sqlc
generate/sqlc:
cd internal/dbsqlc && sqlc generate
cd internal/dbsqlc && sqlc generate
cd riverdriver/riverdatabasesql/internal/dbsqlc && sqlc generate
cd riverdriver/riverpgxv5/internal/dbsqlc && sqlc generate

.PHONY: verify
verify:
verify: verify/sqlc

.PHONY: verify/sqlc
verify/sqlc:
cd internal/dbsqlc && sqlc diff
cd riverdriver/riverdatabasesql/internal/dbsqlc && sqlc diff
cd riverdriver/riverpgxv5/internal/dbsqlc && sqlc diff
2 changes: 2 additions & 0 deletions docs/development.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ queries. After changing an sqlc `.sql` file, generate Go with:
```shell
git checkout master && git pull --rebase
VERSION=v0.0.x
git tag riverdriver/VERSION -m "release riverdriver/VERSION"
git tag riverdriver/riverpgxv5/$VERSION -m "release riverdriver/riverpgxv5/$VERSION"
git tag riverdriver/riverdatabasesql/$VERSION -m "release riverdriver/riverdatabasesql/$VERSION"
git tag $VERSION
git push --tags
```
11 changes: 9 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,15 +1,21 @@
module github.com/riverqueue/river

go 1.21.0
go 1.21.4

// replace github.com/riverqueue/river/riverdriver/riverpgxv5 => ./riverdriver/riverpgxv5
replace github.com/riverqueue/river/riverdriver => ./riverdriver

replace github.com/riverqueue/river/riverdriver/riverpgxv5 => ./riverdriver/riverpgxv5

replace github.com/riverqueue/river/riverdriver/riverdatabasesql => ./riverdriver/riverdatabasesql

require (
github.com/jackc/pgerrcode v0.0.0-20220416144525-469b46aa5efa
github.com/jackc/pgx/v5 v5.5.1
github.com/jackc/puddle/v2 v2.2.1
github.com/oklog/ulid/v2 v2.1.0
github.com/riverqueue/river/riverdriver v0.0.0-00010101000000-000000000000
github.com/riverqueue/river/riverdriver/riverpgxv5 v0.0.12
github.com/riverqueue/river/riverdriver/riverdatabasesql v0.0.0-00010101000000-000000000000
github.com/robfig/cron/v3 v3.0.1
github.com/spf13/cobra v1.8.0
github.com/stretchr/testify v1.8.4
Expand All @@ -23,6 +29,7 @@ require (
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/lib/pq v1.10.9 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
golang.org/x/crypto v0.15.0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@ github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0=
github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/oklog/ulid/v2 v2.1.0 h1:+9lhoxAP56we25tyYETBBY1YLA2SaoLvUFgrP2miPJU=
github.com/oklog/ulid/v2 v2.1.0/go.mod h1:rcEKHmBBKfef9DhnvX7y1HZBYxjXb0cP5ExxNsTT1QQ=
github.com/pborman/getopt v0.0.0-20170112200414-7148bc3a4c30/go.mod h1:85jBQOZwpVEaDAr341tbn15RS4fCAsIst0qp7i8ex1o=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/riverqueue/river/riverdriver/riverpgxv5 v0.0.12 h1:mcDBnqwzEXY9WDOwbkd8xmFdSr/H6oHb1F3NCNCmLDY=
github.com/riverqueue/river/riverdriver/riverpgxv5 v0.0.12/go.mod h1:k6hsPkW9Fl3qURzyLHbvxUCqWDpit0WrZ3oEaKezD3E=
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M=
Expand Down
6 changes: 0 additions & 6 deletions internal/dbsqlc/models.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 0 additions & 2 deletions internal/dbsqlc/sqlc.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,9 @@ sql:
queries:
- river_job.sql
- river_leader.sql
- river_migration.sql
schema:
- river_job.sql
- river_leader.sql
- river_migration.sql
gen:
go:
package: "dbsqlc"
Expand Down
29 changes: 25 additions & 4 deletions internal/riverinternaltest/riverinternaltest.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"
"log"
"log/slog"
"net/url"
"os"
"runtime"
"sync"
Expand Down Expand Up @@ -62,19 +63,39 @@ func BaseServiceArchetype(tb testing.TB) *baseservice.Archetype {
}

func DatabaseConfig(databaseName string) *pgxpool.Config {
databaseURL := valutil.ValOrDefault(os.Getenv("TEST_DATABASE_URL"), "postgres:///river_testdb?sslmode=disable")

config, err := pgxpool.ParseConfig(databaseURL)
config, err := pgxpool.ParseConfig(DatabaseURL(databaseName))
if err != nil {
panic(fmt.Sprintf("error parsing database URL: %v", err))
}
config.MaxConns = dbPoolMaxConns
config.ConnConfig.ConnectTimeout = 10 * time.Second
config.ConnConfig.Database = databaseName
config.ConnConfig.RuntimeParams["timezone"] = "UTC"
return config
}

// DatabaseURL gets a test database URL from TEST_DATABASE_URL or falls back on
// a default pointing to `river_testdb`. If databaseName is set, it replaces the
// database in the URL, although the host and other parameters are preserved.
//
// Most of the time DatabaseConfig should be used instead of this function, but
// it may be useful in non-pgx situations like for examples showing the use of
// `database/sql`.
func DatabaseURL(databaseName string) string {
u, err := url.Parse(valutil.ValOrDefault(

Check failure on line 84 in internal/riverinternaltest/riverinternaltest.go

View workflow job for this annotation

GitHub Actions / lint

variable name 'u' is too short for the scope of its usage (varnamelen)
os.Getenv("TEST_DATABASE_URL"),
"postgres://localhost/river_testdb?sslmode=disable"),
)
if err != nil {
panic(err)
}

if databaseName != "" {
u.Path = databaseName
}

return u.String()
}

// DiscardContinuously drains continuously out of the given channel and discards
// anything that comes out of it. Returns a stop function that should be invoked
// to stop draining. Stop must be invoked before tests finish to stop an
Expand Down
33 changes: 33 additions & 0 deletions internal/util/dbutil/db_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/jackc/pgx/v5"

"github.com/riverqueue/river/internal/dbsqlc"
"github.com/riverqueue/river/riverdriver"
)

// Executor is an interface for a type that can begin a transaction and also
Expand Down Expand Up @@ -56,3 +57,35 @@ func WithTxV[T any](ctx context.Context, txBeginner TxBeginner, innerFunc func(c

return res, nil
}

// WithExecutorTx starts and commits a transaction on a driver executor around
// the given function, allowing the return of a generic value.
func WithExecutorTx(ctx context.Context, exec riverdriver.Executor, innerFunc func(ctx context.Context, tx riverdriver.ExecutorTx) error) error {
_, err := WithExecutorTxV(ctx, exec, func(ctx context.Context, tx riverdriver.ExecutorTx) (struct{}, error) {
return struct{}{}, innerFunc(ctx, tx)
})
return err
}

// WithExecutorTxV starts and commits a transaction on a driver executor around
// the given function, allowing the return of a generic value.
func WithExecutorTxV[T any](ctx context.Context, exec riverdriver.Executor, innerFunc func(ctx context.Context, tx riverdriver.ExecutorTx) (T, error)) (T, error) {
var defaultRes T

tx, err := exec.Begin(ctx)
if err != nil {
return defaultRes, fmt.Errorf("error beginning transaction: %w", err)
}
defer tx.Rollback(ctx)

res, err := innerFunc(ctx, tx)
if err != nil {
return defaultRes, err
}

if err := tx.Commit(ctx); err != nil {
return defaultRes, fmt.Errorf("error committing transaction: %w", err)
}

return res, nil
}
35 changes: 35 additions & 0 deletions internal/util/dbutil/db_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"github.com/stretchr/testify/require"

"github.com/riverqueue/river/internal/riverinternaltest"
"github.com/riverqueue/river/riverdriver"
"github.com/riverqueue/river/riverdriver/riverpgxv5"
)

func TestWithTx(t *testing.T) {
Expand Down Expand Up @@ -40,3 +42,36 @@ func TestWithTxV(t *testing.T) {
require.NoError(t, err)
require.Equal(t, 7, ret)
}

func TestWithExecutorTx(t *testing.T) {
t.Parallel()

ctx := context.Background()
dbPool := riverinternaltest.TestDB(ctx, t)
driver := riverpgxv5.New(dbPool)

err := WithExecutorTx(ctx, driver.GetExecutor(), func(ctx context.Context, tx riverdriver.ExecutorTx) error {
_, err := tx.Exec(ctx, "SELECT 1")
require.NoError(t, err)

return nil
})
require.NoError(t, err)
}

func TestWithExecutorTxV(t *testing.T) {
t.Parallel()

ctx := context.Background()
dbPool := riverinternaltest.TestDB(ctx, t)
driver := riverpgxv5.New(dbPool)

ret, err := WithExecutorTxV(ctx, driver.GetExecutor(), func(ctx context.Context, tx riverdriver.ExecutorTx) (int, error) {
_, err := tx.Exec(ctx, "SELECT 1")
require.NoError(t, err)

return 7, nil
})
require.NoError(t, err)
require.Equal(t, 7, ret)
}
14 changes: 14 additions & 0 deletions riverdriver/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
module github.com/riverqueue/river/riverdriver

go 1.21.4

require github.com/jackc/pgx/v5 v5.5.0

require (
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/jackc/puddle/v2 v2.2.1 // indirect
golang.org/x/crypto v0.15.0 // indirect
golang.org/x/sync v0.5.0 // indirect
golang.org/x/text v0.14.0 // indirect
)
28 changes: 28 additions & 0 deletions riverdriver/go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk=
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM=
github.com/jackc/pgx/v5 v5.5.0 h1:NxstgwndsTRy7eq9/kqYc/BZh5w2hHJV86wjvO+1xPw=
github.com/jackc/pgx/v5 v5.5.0/go.mod h1:Ig06C2Vu0t5qXC60W8sqIthScaEnFvojjj9dSljmHRA=
github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk=
github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
golang.org/x/crypto v0.15.0 h1:frVn1TEaCEaZcn3Tmd7Y2b5KKPaZ+I32Q2OA3kYp5TA=
golang.org/x/crypto v0.15.0/go.mod h1:4ChreQoLWfG3xLDer1WdlH5NdlQ3+mwnQq1YTKY+72g=
golang.org/x/sync v0.5.0 h1:60k92dhOjHxJkrqnwsfl8KuaHbn/5dl0lUPUklKo3qE=
golang.org/x/sync v0.5.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
Loading

0 comments on commit 9852b52

Please sign in to comment.