Skip to content

Commit

Permalink
Drop client Schema configuration (#18)
Browse files Browse the repository at this point in the history
As discussed in #15, the `Schema` configuration parameter doesn't
actually do anything, and we realized that it's possible to support this
feature by using an alternate configuration for pgxpool in `search_path`.

Here, drop `Schema` and add a test that verifies that this technique
works. So as to not have to create and migrate an alternate schema, we
cheat a little bit by repointing the schema and then just verifying that
we can't find a `river_job` table there.

I'm finding the test hierarchy in `client_test.go` to be pretty
confusing in determining on what best practice is for what should go
where, so I'm starting out with a new `Test_Client` which I'm hoping we
can start using as a base for most of the general tests in here in a
more reusable way. I leverage `JobArgsReflectKind` and `WorkFunc` in its
subtests so that it's not dependent on our various global job args that
are defined all over the place. Instead, args and the work routine are
colocated right inside the test.

Configuring an alternate schema is definitely going to need its own
documentation page somewhere because it's not obvious, but that'll come
separately.

Fixes #15.
  • Loading branch information
brandur authored Nov 13, 2023
1 parent 714e59c commit 58a44c3
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 22 deletions.
6 changes: 0 additions & 6 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ const (
DefaultMaxAttempts = rivercommon.DefaultMaxAttempts
DefaultQueue = rivercommon.DefaultQueue
DefaultPriority = rivercommon.DefaultPriority
DefaultSchema = "public"
)

// Config is the configuration for a Client.
Expand Down Expand Up @@ -155,10 +154,6 @@ type Config struct {
// Defaults to DefaultRetryPolicy.
RetryPolicy ClientRetryPolicy

// Schema is the name of the database schema to use for this Client. If
// unspecified, it defaults to DefaultSchema.
Schema string

// Workers is a bundle of registered job workers.
//
// This field may be omitted for a program that's only enqueueing jobs
Expand Down Expand Up @@ -391,7 +386,6 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
ReindexerSchedule: config.ReindexerSchedule,
RescueStuckJobsAfter: valutil.ValOrDefault(config.RescueStuckJobsAfter, rescueAfter),
RetryPolicy: retryPolicy,
Schema: valutil.ValOrDefault(config.Schema, DefaultSchema),
Workers: config.Workers,
disableSleep: config.disableSleep,
}
Expand Down
98 changes: 83 additions & 15 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/riverqueue/river/internal/maintenance"
"github.com/riverqueue/river/internal/rivercommon"
"github.com/riverqueue/river/internal/riverinternaltest"
"github.com/riverqueue/river/internal/rivertest"
"github.com/riverqueue/river/internal/util/ptrutil"
"github.com/riverqueue/river/internal/util/sliceutil"
"github.com/riverqueue/river/internal/util/valutil"
Expand Down Expand Up @@ -141,7 +142,7 @@ func newTestClient(ctx context.Context, t *testing.T, config *Config) *Client[pg
return client
}

func runClient(ctx context.Context, t *testing.T, client *Client[pgx.Tx]) {
func startClient(ctx context.Context, t *testing.T, client *Client[pgx.Tx]) {
t.Helper()

if err := client.Start(ctx); err != nil {
Expand All @@ -158,10 +159,80 @@ func runClient(ctx context.Context, t *testing.T, client *Client[pgx.Tx]) {
func runNewTestClient(ctx context.Context, t *testing.T, config *Config) *Client[pgx.Tx] {
t.Helper()
client := newTestClient(ctx, t, config)
runClient(ctx, t, client)
startClient(ctx, t, client)
return client
}

func Test_Client(t *testing.T) {
t.Parallel()

ctx := context.Background()

type testBundle struct{}

setup := func(t *testing.T) (*Client[pgx.Tx], *testBundle) {
t.Helper()

config := newTestConfig(t, nil)
client := newTestClient(ctx, t, config)

return client, &testBundle{}
}

t.Run("StartInsertAndWork", func(t *testing.T) {
t.Parallel()

client, _ := setup(t)

type JobArgs struct {
JobArgsReflectKind[JobArgs]
}

workedChan := make(chan struct{})

AddWorker(client.config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error {
workedChan <- struct{}{}
return nil
}))

startClient(ctx, t, client)

_, err := client.Insert(ctx, &JobArgs{}, nil)
require.NoError(t, err)

rivertest.WaitOrTimeout(t, workedChan)
})

t.Run("AlternateSchema", func(t *testing.T) {
t.Parallel()

client, _ := setup(t)

// Reconfigure the pool with an alternate schema, initialize a new pool
dbPoolConfig := client.driver.GetDBPool().Config() // a copy of the original config
dbPoolConfig.ConnConfig.RuntimeParams["search_path"] = "alternate_schema"

dbPool, err := pgxpool.NewWithConfig(ctx, dbPoolConfig)
require.NoError(t, err)
t.Cleanup(dbPool.Close)

client, err = NewClient(riverpgxv5.New(dbPool), newTestConfig(t, nil))
require.NoError(t, err)

// We don't actually verify that River's functional on another schema so
// that we don't have to raise and migrate it. We cheat a little by
// configuring a different schema and then verifying that we can't find
// a `river_job` to confirm we're point there.
_, err = client.Insert(ctx, &noOpArgs{}, nil)
var pgErr *pgconn.PgError
require.ErrorAs(t, err, &pgErr)
require.Equal(t, pgerrcode.UndefinedTable, pgErr.Code)
// PgError has SchemaName and TableName properties, but unfortunately
// neither contain a useful value in this case.
require.Equal(t, `relation "river_job" does not exist`, pgErr.Message)
})
}

func Test_Client_Stop(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -1024,7 +1095,7 @@ func Test_Client_Maintenance(t *testing.T) {
jobWithinHorizon2 := insertJob(ctx, client.driver.GetDBPool(), insertJobParams{State: dbsqlc.JobStateCompleted, FinalizedAt: ptrutil.Ptr(deleteHorizon.Add(1 * time.Hour))})
jobWithinHorizon3 := insertJob(ctx, client.driver.GetDBPool(), insertJobParams{State: dbsqlc.JobStateDiscarded, FinalizedAt: ptrutil.Ptr(deleteHorizon.Add(1 * time.Hour))})

runClient(ctx, t, client)
startClient(ctx, t, client)

client.testSignals.electedLeader.WaitOrTimeout()
jc := maintenance.GetService[*maintenance.JobCleaner](client.queueMaintainer)
Expand Down Expand Up @@ -1147,7 +1218,7 @@ func Test_Client_Maintenance(t *testing.T) {
jobNotYetStuck2 := insertJob(ctx, client.driver.GetDBPool(), insertJobParams{Kind: "noOp", State: dbsqlc.JobStateRunning, AttemptedAt: ptrutil.Ptr(now.Add(-1 * time.Minute))})
jobNotYetStuck3 := insertJob(ctx, client.driver.GetDBPool(), insertJobParams{Kind: "noOp", State: dbsqlc.JobStateRunning, AttemptedAt: ptrutil.Ptr(now.Add(-10 * time.Second))})

runClient(ctx, t, client)
startClient(ctx, t, client)

client.testSignals.electedLeader.WaitOrTimeout()
svc := maintenance.GetService[*rescuer](client.queueMaintainer)
Expand Down Expand Up @@ -1207,7 +1278,7 @@ func Test_Client_Maintenance(t *testing.T) {
jobInFuture2 := insertJob(ctx, client.driver.GetDBPool(), insertJobParams{State: dbsqlc.JobStateCompleted, FinalizedAt: ptrutil.Ptr(now.Add(1 * time.Minute))})
jobInFuture3 := insertJob(ctx, client.driver.GetDBPool(), insertJobParams{State: dbsqlc.JobStateDiscarded, FinalizedAt: ptrutil.Ptr(now.Add(10 * time.Second))})

runClient(ctx, t, client)
startClient(ctx, t, client)

client.testSignals.electedLeader.WaitOrTimeout()
scheduler := maintenance.GetService[*maintenance.Scheduler](client.queueMaintainer)
Expand Down Expand Up @@ -1289,7 +1360,7 @@ func Test_Client_RetryPolicy(t *testing.T) {
originalJobs[i] = updatedJob
}

runClient(ctx, t, client)
startClient(ctx, t, client)

// Wait for the expected number of jobs to be finished.
for i := 0; i < len(originalJobs); i++ {
Expand Down Expand Up @@ -1399,7 +1470,7 @@ func Test_Client_Subscribe(t *testing.T) {
jobFailed2,
}

runClient(ctx, t, client)
startClient(ctx, t, client)

events := make([]*Event, len(expectedJobs))

Expand Down Expand Up @@ -1460,7 +1531,7 @@ func Test_Client_Subscribe(t *testing.T) {
jobCompleted,
}

runClient(ctx, t, client)
startClient(ctx, t, client)

events := make([]*Event, len(expectedJobs))

Expand Down Expand Up @@ -1501,7 +1572,7 @@ func Test_Client_Subscribe(t *testing.T) {
jobFailed,
}

runClient(ctx, t, client)
startClient(ctx, t, client)

events := make([]*Event, len(expectedJobs))

Expand Down Expand Up @@ -1554,7 +1625,7 @@ func Test_Client_Subscribe(t *testing.T) {
_ = riverinternaltest.WaitOrTimeoutN(t, subscribeChan, subscribeChanSize+1)
}()

runClient(ctx, t, client)
startClient(ctx, t, client)

wg.Wait()

Expand Down Expand Up @@ -1620,7 +1691,7 @@ func Test_Client_InsertTriggersImmediateWork(t *testing.T) {
insertedJob, err := client.Insert(ctx, callbackArgs{}, nil)
require.NoError(err)

runClient(ctx, t, client)
startClient(ctx, t, client)

// Wait for the client to be ready by waiting for a job to be executed:
select {
Expand Down Expand Up @@ -1896,7 +1967,7 @@ func Test_Client_UnknownJobKindErrorsTheJob(t *testing.T) {
require.NoError(client.Stop(ctx))
}

func Test_Client_Run_Error(t *testing.T) {
func Test_Client_Start_Error(t *testing.T) {
t.Parallel()

ctx := context.Background()
Expand Down Expand Up @@ -1998,7 +2069,6 @@ func Test_NewClient_Defaults(t *testing.T) {
require.Equal(t, DefaultJobTimeout, client.config.JobTimeout)
require.NotZero(t, client.baseService.Logger)
require.IsType(t, &DefaultClientRetryPolicy{}, client.config.RetryPolicy)
require.Equal(t, DefaultSchema, client.config.Schema)
require.False(t, client.baseService.DisableSleep)
require.False(t, client.config.disableSleep)
}
Expand Down Expand Up @@ -2029,7 +2099,6 @@ func Test_NewClient_Overrides(t *testing.T) {
Logger: logger,
Queues: map[string]QueueConfig{DefaultQueue: {MaxWorkers: 1}},
RetryPolicy: retryPolicy,
Schema: "custom_schema",
Workers: workers,
disableSleep: true,
})
Expand All @@ -2048,7 +2117,6 @@ func Test_NewClient_Overrides(t *testing.T) {
require.Equal(t, 125*time.Millisecond, client.config.JobTimeout)
require.Equal(t, logger, client.baseService.Logger)
require.Equal(t, retryPolicy, client.config.RetryPolicy)
require.Equal(t, "custom_schema", client.config.Schema)
require.True(t, client.baseService.DisableSleep)
require.True(t, client.config.disableSleep)
}
Expand Down
2 changes: 1 addition & 1 deletion worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func TestWorkFunc(t *testing.T) {
t.Helper()

client := newTestClient(ctx, t, newTestConfig(t, nil))
runClient(ctx, t, client)
startClient(ctx, t, client)

return client, &testBundle{}
}
Expand Down

0 comments on commit 58a44c3

Please sign in to comment.