Skip to content

Commit

Permalink
Add migrate-list subcommand that lists available migrations (#534)
Browse files Browse the repository at this point in the history
Here, add a new `migrate-list` command to the River CLI. It prints the
versions and names of available migrations along with an indicator as to
the current state of the database:

    $ go run ./cmd/river migrate-list --database-url postgres:///river_dev
      001 create river migration
      002 initial schema
      003 river job tags non null
      004 pending and more
    * 005 migration unique client

It's pretty far from a high priority feature, but I started it a few
weeks ago and I figured I may as well finish it. I started because I
realized that despite having a `migrate-get` command, there was no way
to figure out what migrations actually existed before you ran it.

`migrate-list` currently requires a `--database-url`, but later I want
to put in an alternate version that can work without one too. A
complication with that is that I want to build a system that can hint on
the desired database system (currently detected based off URL scheme) in
case we add SQLite later. I'm thinking that we'll be able to add an
option like `--database postgres` or `--engine postgres` that can act as
an alternative to `--database-url`. This will also be useful for
`migrate-get`, which also currently has no answer for this.

I bring in a test convention for CLI commands so we can start trying to
check with more authority that things like expected output stay stable.
(Previously, we weren't testing commands except to vet that they will
run successfully in CI.) This approach uses mocks because we still have
no way of reusing database-based testing infrastructure outside the main
package (eventually some of it should go into `rivershared`), but
despite that, it should give us reasonable assuredness for now.
  • Loading branch information
brandur authored Aug 20, 2024
1 parent a3c841b commit 94b0a78
Show file tree
Hide file tree
Showing 7 changed files with 301 additions and 4 deletions.
4 changes: 4 additions & 0 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,10 @@ jobs:
run: river migrate-up --database-url $DATABASE_URL
shell: bash

- name: river migrate-list
run: river migrate-list --database-url $DATABASE_URL
shell: bash

- name: river validate
run: river validate --database-url $DATABASE_URL
shell: bash
Expand Down
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added

- A new `river migrate-list` command is available which lists available migrations and which version a target database is migrated to. [PR #534](https://github.com/riverqueue/river/pull/534).

## [0.11.3] - 2024-08-19

### Changed
Expand Down
7 changes: 4 additions & 3 deletions cmd/river/rivercli/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type BenchmarkerInterface interface {
// around without having to know the transaction type.
type MigratorInterface interface {
AllVersions() []rivermigrate.Migration
ExistingVersions(ctx context.Context) ([]rivermigrate.Migration, error)
GetVersion(version int) (rivermigrate.Migration, error)
Migrate(ctx context.Context, direction rivermigrate.Direction, opts *rivermigrate.MigrateOpts) (*rivermigrate.MigrateResult, error)
Validate(ctx context.Context) (*rivermigrate.ValidateResult, error)
Expand All @@ -43,6 +44,7 @@ type MigratorInterface interface {
// CommandBase.
type Command[TOpts CommandOpts] interface {
Run(ctx context.Context, opts TOpts) (bool, error)
GetCommandBase() *CommandBase
SetCommandBase(b *CommandBase)
}

Expand All @@ -57,9 +59,8 @@ type CommandBase struct {
GetMigrator func(config *rivermigrate.Config) MigratorInterface
}

func (b *CommandBase) SetCommandBase(base *CommandBase) {
*b = *base
}
func (b *CommandBase) GetCommandBase() *CommandBase { return b }
func (b *CommandBase) SetCommandBase(base *CommandBase) { *b = *base }

// CommandOpts are options for a command options. It makes sure that options
// provide a way of validating themselves.
Expand Down
63 changes: 62 additions & 1 deletion cmd/river/rivercli/river_cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ Provides command line facilities for the River job queue.
}

addDatabaseURLFlag := func(cmd *cobra.Command, databaseURL *string) {
cmd.Flags().StringVar(databaseURL, "database-url", "", "URL of the database to benchmark (should look like `postgres://...`")
cmd.Flags().StringVar(databaseURL, "database-url", "", "URL of the database (should look like `postgres://...`")
mustMarkFlagRequired(cmd, "database-url")
}
addLineFlag := func(cmd *cobra.Command, line *string) {
Expand Down Expand Up @@ -215,6 +215,25 @@ framework, which aren't necessary if using an external framework:
rootCmd.AddCommand(cmd)
}

// migrate-list
{
var opts migrateListOpts

cmd := &cobra.Command{
Use: "migrate-list",
Short: "List River schema migrations",
Long: strings.TrimSpace(`
TODO
`),
Run: func(cmd *cobra.Command, args []string) {
RunCommand(ctx, makeCommandBundle(&opts.DatabaseURL), &migrateList{}, &opts)
},
}
addDatabaseURLFlag(cmd, &opts.DatabaseURL)
cmd.Flags().StringVar(&opts.Line, "line", "", "migration line to operate on (default: main)")
rootCmd.AddCommand(cmd)
}

// migrate-up
{
var opts migrateOpts
Expand Down Expand Up @@ -259,6 +278,7 @@ migrations that need to be run, but without running them.
},
}
addDatabaseURLFlag(cmd, &opts.DatabaseURL)
mustMarkFlagRequired(cmd, "database-url")
cmd.Flags().StringVar(&opts.Line, "line", "", "migration line to operate on (default: main)")
rootCmd.AddCommand(cmd)
}
Expand Down Expand Up @@ -450,6 +470,47 @@ func (c *migrateGet) Run(_ context.Context, opts *migrateGetOpts) (bool, error)
return true, nil
}

type migrateListOpts struct {
DatabaseURL string
Line string
}

func (o *migrateListOpts) Validate() error { return nil }

type migrateList struct {
CommandBase
}

func (c *migrateList) Run(ctx context.Context, opts *migrateListOpts) (bool, error) {
migrator := c.GetMigrator(&rivermigrate.Config{Line: opts.Line, Logger: c.Logger})

allMigrations := migrator.AllVersions()

existingMigrations, err := migrator.ExistingVersions(ctx)
if err != nil {
return false, err
}

var maxExistingVersion int
if len(existingMigrations) > 0 {
maxExistingVersion = existingMigrations[len(existingMigrations)-1].Version
}

for _, migration := range allMigrations {
var currentVersionPrefix string
switch {
case migration.Version == maxExistingVersion:
currentVersionPrefix = "* "
case maxExistingVersion > 0:
currentVersionPrefix = " "
}

fmt.Fprintf(c.Out, "%s%03d %s\n", currentVersionPrefix, migration.Version, migration.Name)
}

return true, nil
}

type migrateUp struct {
CommandBase
}
Expand Down
135 changes: 135 additions & 0 deletions cmd/river/rivercli/river_cli_test.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,149 @@
package rivercli

import (
"bytes"
"context"
"strings"
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/riverqueue/river/rivermigrate"
"github.com/riverqueue/river/rivershared/riversharedtest"
)

type MigratorStub struct {
allVersionsStub func() []rivermigrate.Migration
existingVersionsStub func(ctx context.Context) ([]rivermigrate.Migration, error)
getVersionStub func(version int) (rivermigrate.Migration, error)
migrateStub func(ctx context.Context, direction rivermigrate.Direction, opts *rivermigrate.MigrateOpts) (*rivermigrate.MigrateResult, error)
validateStub func(ctx context.Context) (*rivermigrate.ValidateResult, error)
}

func (m *MigratorStub) AllVersions() []rivermigrate.Migration {
if m.allVersionsStub == nil {
panic("AllVersions is not stubbed")
}

return m.allVersionsStub()
}

func (m *MigratorStub) ExistingVersions(ctx context.Context) ([]rivermigrate.Migration, error) {
if m.allVersionsStub == nil {
panic("ExistingVersions is not stubbed")
}

return m.existingVersionsStub(ctx)
}

func (m *MigratorStub) GetVersion(version int) (rivermigrate.Migration, error) {
if m.allVersionsStub == nil {
panic("GetVersion is not stubbed")
}

return m.getVersionStub(version)
}

func (m *MigratorStub) Migrate(ctx context.Context, direction rivermigrate.Direction, opts *rivermigrate.MigrateOpts) (*rivermigrate.MigrateResult, error) {
if m.allVersionsStub == nil {
panic("Migrate is not stubbed")
}

return m.migrateStub(ctx, direction, opts)
}

func (m *MigratorStub) Validate(ctx context.Context) (*rivermigrate.ValidateResult, error) {
if m.allVersionsStub == nil {
panic("Validate is not stubbed")
}

return m.validateStub(ctx)
}

var (
testMigration01 = rivermigrate.Migration{Name: "1st migration", SQLDown: "SELECT 1", SQLUp: "SELECT 1", Version: 1} //nolint:gochecknoglobals
testMigration02 = rivermigrate.Migration{Name: "2nd migration", SQLDown: "SELECT 1", SQLUp: "SELECT 1", Version: 2} //nolint:gochecknoglobals
testMigration03 = rivermigrate.Migration{Name: "3rd migration", SQLDown: "SELECT 1", SQLUp: "SELECT 1", Version: 3} //nolint:gochecknoglobals

testMigrationAll = []rivermigrate.Migration{testMigration01, testMigration02, testMigration03} //nolint:gochecknoglobals
)

func TestMigrateList(t *testing.T) {
t.Parallel()

ctx := context.Background()

type testBundle struct {
buf *bytes.Buffer
migratorStub *MigratorStub
}

setup := func(t *testing.T) (*migrateList, *testBundle) {
t.Helper()

cmd, buf := withMigrateBase(t, &migrateList{})

migratorStub := &MigratorStub{}
migratorStub.allVersionsStub = func() []rivermigrate.Migration { return testMigrationAll }
migratorStub.existingVersionsStub = func(ctx context.Context) ([]rivermigrate.Migration, error) { return nil, nil }

cmd.GetCommandBase().GetMigrator = func(config *rivermigrate.Config) MigratorInterface { return migratorStub }

return cmd, &testBundle{
buf: buf,
migratorStub: migratorStub,
}
}

t.Run("NoExistingMigrations", func(t *testing.T) {
t.Parallel()

migrateList, bundle := setup(t)

_, err := migrateList.Run(ctx, &migrateListOpts{})
require.NoError(t, err)

require.Equal(t, strings.TrimSpace(`
001 1st migration
002 2nd migration
003 3rd migration
`), strings.TrimSpace(bundle.buf.String()))
})

t.Run("WithExistingMigrations", func(t *testing.T) {
t.Parallel()

migrateList, bundle := setup(t)

bundle.migratorStub.existingVersionsStub = func(ctx context.Context) ([]rivermigrate.Migration, error) {
return []rivermigrate.Migration{testMigration01, testMigration02}, nil
}

_, err := migrateList.Run(ctx, &migrateListOpts{})
require.NoError(t, err)

require.Equal(t, strings.TrimSpace(`
001 1st migration
* 002 2nd migration
003 3rd migration
`), strings.TrimSpace(bundle.buf.String()))
})
}

func withMigrateBase[TCommand Command[TOpts], TOpts CommandOpts](t *testing.T, cmd TCommand) (TCommand, *bytes.Buffer) {
t.Helper()

var buf bytes.Buffer
cmd.SetCommandBase(&CommandBase{
Logger: riversharedtest.Logger(t),
Out: &buf,

GetMigrator: func(config *rivermigrate.Config) MigratorInterface { return &MigratorStub{} },
})
return cmd, &buf
}

func TestMigrationComment(t *testing.T) {
t.Parallel()

Expand Down
46 changes: 46 additions & 0 deletions rivermigrate/river_migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,52 @@ func New[TTx any](driver riverdriver.Driver[TTx], config *Config) *Migrator[TTx]
})
}

// ExistingVersions gets the existing set of versions that have been migrated in
// the database, ordered by version.
func (m *Migrator[TTx]) ExistingVersions(ctx context.Context) ([]Migration, error) {
migrations, err := m.existingMigrations(ctx, m.driver.GetExecutor())
if err != nil {
return nil, err
}

versions, err := m.versionsFromDriver(migrations)
if err != nil {
return nil, err
}

return versions, nil
}

// ExistingVersions gets the existing set of versions that have been migrated in
// the database, ordered by version.
//
// This variant checks for existing versions in a transaction.
func (m *Migrator[TTx]) ExistingVersionsTx(ctx context.Context, tx TTx) ([]Migration, error) {
migrations, err := m.existingMigrations(ctx, m.driver.UnwrapExecutor(tx))
if err != nil {
return nil, err
}

versions, err := m.versionsFromDriver(migrations)
if err != nil {
return nil, err
}

return versions, nil
}

func (m *Migrator[TTx]) versionsFromDriver(migrations []*riverdriver.Migration) ([]Migration, error) {
versions := make([]Migration, len(migrations))
for i, existingMigration := range migrations {
migration, ok := m.migrations[existingMigration.Version]
if !ok {
return nil, fmt.Errorf("migration %d not found in migrator bundle", existingMigration.Version)
}
versions[i] = migration
}
return versions, nil
}

// MigrateOpts are options for a migrate operation.
type MigrateOpts struct {
DryRun bool
Expand Down
46 changes: 46 additions & 0 deletions rivermigrate/river_migrate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,52 @@ func TestMigrator(t *testing.T) {
require.Equal(t, seqOneTo(migrationsBundle.WithTestVersionsMaxVersion), sliceutil.Map(migrations, migrationToInt))
})

t.Run("ExistingMigrationsDefault", func(t *testing.T) {
t.Parallel()

migrator, _ := setup(t)

migrations, err := migrator.ExistingVersions(ctx)
require.NoError(t, err)
require.Equal(t, seqOneTo(migrationsBundle.MaxVersion), sliceutil.Map(migrations, migrationToInt))
})

t.Run("ExistingMigrationsTxDefault", func(t *testing.T) {
t.Parallel()

migrator, bundle := setup(t)

migrations, err := migrator.ExistingVersionsTx(ctx, bundle.tx)
require.NoError(t, err)
require.Equal(t, seqOneTo(migrationsBundle.MaxVersion), sliceutil.Map(migrations, migrationToInt))
})

t.Run("ExistingMigrationsTxEmpty", func(t *testing.T) {
t.Parallel()

migrator, bundle := setup(t)

_, err := migrator.MigrateTx(ctx, bundle.tx, DirectionDown, &MigrateOpts{TargetVersion: -1})
require.NoError(t, err)

migrations, err := migrator.ExistingVersionsTx(ctx, bundle.tx)
require.NoError(t, err)
require.Equal(t, []int{}, sliceutil.Map(migrations, migrationToInt))
})

t.Run("ExistingMigrationsTxFullyMigrated", func(t *testing.T) {
t.Parallel()

migrator, bundle := setup(t)

_, err := migrator.MigrateTx(ctx, bundle.tx, DirectionUp, &MigrateOpts{})
require.NoError(t, err)

migrations, err := migrator.ExistingVersionsTx(ctx, bundle.tx)
require.NoError(t, err)
require.Equal(t, seqOneTo(migrationsBundle.WithTestVersionsMaxVersion), sliceutil.Map(migrations, migrationToInt))
})

t.Run("MigrateDownDefault", func(t *testing.T) {
t.Parallel()

Expand Down

0 comments on commit 94b0a78

Please sign in to comment.