From 8be427d439194eaa21b2418c88ea2b75214f01a3 Mon Sep 17 00:00:00 2001 From: Brandur Date: Wed, 7 Aug 2024 19:48:21 -0700 Subject: [PATCH] Add `migrate-list` subcommand that lists available migrations Here, add a new `migrate-list` command to the River CLI. It prints the versions and names of available migrations along with an indicator as to the current state of the database: $ go run ./cmd/river migrate-list --database-url postgres:///river_dev 001 create river migration 002 initial schema 003 river job tags non null 004 pending and more * 005 migration unique client It's pretty far from a high priority feature, but I started it a few weeks ago and I figured I may as well finish it. I started because I realized that despite having a `migrate-get` command, there was no way to figure out what migrations actually existed before you ran it. `migrate-list` currently requires a `--database-url`, but later I want to put in an alternate version that can work without one too. A complication with that is that I want to build a system that can hint on the desired database system (currently detected based off URL scheme) in case we add SQLite later. I'm thinking that we'll be able to add an option like `--database postgres` or `--engine postgres` that can act as an alternative to `--database-url`. This will also be useful for `migrate-get`, which also currently has no answer for this. I bring in a test convention for CLI commands so we can start trying to check with more authority that things like expected output stay stable. (Previously, we weren't testing commands except to vet that they will run successfully in CI.) This approach uses mocks because we still have no way of reusing database-based testing infrastructure outside the main package (eventually some of it should go into `rivershared`), but despite that, it should give us reasonable assuredness for now. --- .github/workflows/ci.yaml | 4 + CHANGELOG.md | 4 + cmd/river/rivercli/command.go | 7 +- cmd/river/rivercli/river_cli.go | 63 ++++++++++++- cmd/river/rivercli/river_cli_test.go | 135 +++++++++++++++++++++++++++ rivermigrate/river_migrate.go | 46 +++++++++ rivermigrate/river_migrate_test.go | 46 +++++++++ 7 files changed, 301 insertions(+), 4 deletions(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 9119a420..9c5b1e26 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -161,6 +161,10 @@ jobs: run: river migrate-up --database-url $DATABASE_URL shell: bash + - name: river migrate-list + run: river migrate-list --database-url $DATABASE_URL + shell: bash + - name: river validate run: river validate --database-url $DATABASE_URL shell: bash diff --git a/CHANGELOG.md b/CHANGELOG.md index 1e3a77a4..bdee96f3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added + +- A new `river migrate-list` command is available which lists available migrations and which version a target database is migrated to. [PR #534](https://github.com/riverqueue/river/pull/534). + ## [0.11.3] - 2024-08-19 ### Changed diff --git a/cmd/river/rivercli/command.go b/cmd/river/rivercli/command.go index 1ac0df94..00206cd1 100644 --- a/cmd/river/rivercli/command.go +++ b/cmd/river/rivercli/command.go @@ -33,6 +33,7 @@ type BenchmarkerInterface interface { // around without having to know the transaction type. type MigratorInterface interface { AllVersions() []rivermigrate.Migration + ExistingVersions(ctx context.Context) ([]rivermigrate.Migration, error) GetVersion(version int) (rivermigrate.Migration, error) Migrate(ctx context.Context, direction rivermigrate.Direction, opts *rivermigrate.MigrateOpts) (*rivermigrate.MigrateResult, error) Validate(ctx context.Context) (*rivermigrate.ValidateResult, error) @@ -43,6 +44,7 @@ type MigratorInterface interface { // CommandBase. type Command[TOpts CommandOpts] interface { Run(ctx context.Context, opts TOpts) (bool, error) + GetCommandBase() *CommandBase SetCommandBase(b *CommandBase) } @@ -57,9 +59,8 @@ type CommandBase struct { GetMigrator func(config *rivermigrate.Config) MigratorInterface } -func (b *CommandBase) SetCommandBase(base *CommandBase) { - *b = *base -} +func (b *CommandBase) GetCommandBase() *CommandBase { return b } +func (b *CommandBase) SetCommandBase(base *CommandBase) { *b = *base } // CommandOpts are options for a command options. It makes sure that options // provide a way of validating themselves. diff --git a/cmd/river/rivercli/river_cli.go b/cmd/river/rivercli/river_cli.go index 3749e27a..5a8668ed 100644 --- a/cmd/river/rivercli/river_cli.go +++ b/cmd/river/rivercli/river_cli.go @@ -94,7 +94,7 @@ Provides command line facilities for the River job queue. } addDatabaseURLFlag := func(cmd *cobra.Command, databaseURL *string) { - cmd.Flags().StringVar(databaseURL, "database-url", "", "URL of the database to benchmark (should look like `postgres://...`") + cmd.Flags().StringVar(databaseURL, "database-url", "", "URL of the database (should look like `postgres://...`") mustMarkFlagRequired(cmd, "database-url") } addLineFlag := func(cmd *cobra.Command, line *string) { @@ -215,6 +215,25 @@ framework, which aren't necessary if using an external framework: rootCmd.AddCommand(cmd) } + // migrate-list + { + var opts migrateListOpts + + cmd := &cobra.Command{ + Use: "migrate-list", + Short: "List River schema migrations", + Long: strings.TrimSpace(` +TODO + `), + Run: func(cmd *cobra.Command, args []string) { + RunCommand(ctx, makeCommandBundle(&opts.DatabaseURL), &migrateList{}, &opts) + }, + } + addDatabaseURLFlag(cmd, &opts.DatabaseURL) + cmd.Flags().StringVar(&opts.Line, "line", "", "migration line to operate on (default: main)") + rootCmd.AddCommand(cmd) + } + // migrate-up { var opts migrateOpts @@ -259,6 +278,7 @@ migrations that need to be run, but without running them. }, } addDatabaseURLFlag(cmd, &opts.DatabaseURL) + mustMarkFlagRequired(cmd, "database-url") cmd.Flags().StringVar(&opts.Line, "line", "", "migration line to operate on (default: main)") rootCmd.AddCommand(cmd) } @@ -450,6 +470,47 @@ func (c *migrateGet) Run(_ context.Context, opts *migrateGetOpts) (bool, error) return true, nil } +type migrateListOpts struct { + DatabaseURL string + Line string +} + +func (o *migrateListOpts) Validate() error { return nil } + +type migrateList struct { + CommandBase +} + +func (c *migrateList) Run(ctx context.Context, opts *migrateListOpts) (bool, error) { + migrator := c.GetMigrator(&rivermigrate.Config{Line: opts.Line, Logger: c.Logger}) + + allMigrations := migrator.AllVersions() + + existingMigrations, err := migrator.ExistingVersions(ctx) + if err != nil { + return false, err + } + + var maxExistingVersion int + if len(existingMigrations) > 0 { + maxExistingVersion = existingMigrations[len(existingMigrations)-1].Version + } + + for _, migration := range allMigrations { + var currentVersionPrefix string + switch { + case migration.Version == maxExistingVersion: + currentVersionPrefix = "* " + case maxExistingVersion > 0: + currentVersionPrefix = " " + } + + fmt.Fprintf(c.Out, "%s%03d %s\n", currentVersionPrefix, migration.Version, migration.Name) + } + + return true, nil +} + type migrateUp struct { CommandBase } diff --git a/cmd/river/rivercli/river_cli_test.go b/cmd/river/rivercli/river_cli_test.go index 1ae93fbf..cbb8a988 100644 --- a/cmd/river/rivercli/river_cli_test.go +++ b/cmd/river/rivercli/river_cli_test.go @@ -1,14 +1,149 @@ package rivercli import ( + "bytes" + "context" + "strings" "testing" "time" "github.com/stretchr/testify/require" "github.com/riverqueue/river/rivermigrate" + "github.com/riverqueue/river/rivershared/riversharedtest" ) +type MigratorStub struct { + allVersionsStub func() []rivermigrate.Migration + existingVersionsStub func(ctx context.Context) ([]rivermigrate.Migration, error) + getVersionStub func(version int) (rivermigrate.Migration, error) + migrateStub func(ctx context.Context, direction rivermigrate.Direction, opts *rivermigrate.MigrateOpts) (*rivermigrate.MigrateResult, error) + validateStub func(ctx context.Context) (*rivermigrate.ValidateResult, error) +} + +func (m *MigratorStub) AllVersions() []rivermigrate.Migration { + if m.allVersionsStub == nil { + panic("AllVersions is not stubbed") + } + + return m.allVersionsStub() +} + +func (m *MigratorStub) ExistingVersions(ctx context.Context) ([]rivermigrate.Migration, error) { + if m.allVersionsStub == nil { + panic("ExistingVersions is not stubbed") + } + + return m.existingVersionsStub(ctx) +} + +func (m *MigratorStub) GetVersion(version int) (rivermigrate.Migration, error) { + if m.allVersionsStub == nil { + panic("GetVersion is not stubbed") + } + + return m.getVersionStub(version) +} + +func (m *MigratorStub) Migrate(ctx context.Context, direction rivermigrate.Direction, opts *rivermigrate.MigrateOpts) (*rivermigrate.MigrateResult, error) { + if m.allVersionsStub == nil { + panic("Migrate is not stubbed") + } + + return m.migrateStub(ctx, direction, opts) +} + +func (m *MigratorStub) Validate(ctx context.Context) (*rivermigrate.ValidateResult, error) { + if m.allVersionsStub == nil { + panic("Validate is not stubbed") + } + + return m.validateStub(ctx) +} + +var ( + testMigration01 = rivermigrate.Migration{Name: "1st migration", SQLDown: "SELECT 1", SQLUp: "SELECT 1", Version: 1} //nolint:gochecknoglobals + testMigration02 = rivermigrate.Migration{Name: "2nd migration", SQLDown: "SELECT 1", SQLUp: "SELECT 1", Version: 2} //nolint:gochecknoglobals + testMigration03 = rivermigrate.Migration{Name: "3rd migration", SQLDown: "SELECT 1", SQLUp: "SELECT 1", Version: 3} //nolint:gochecknoglobals + + testMigrationAll = []rivermigrate.Migration{testMigration01, testMigration02, testMigration03} //nolint:gochecknoglobals +) + +func TestMigrateList(t *testing.T) { + t.Parallel() + + ctx := context.Background() + + type testBundle struct { + buf *bytes.Buffer + migratorStub *MigratorStub + } + + setup := func(t *testing.T) (*migrateList, *testBundle) { + t.Helper() + + cmd, buf := withMigrateBase(t, &migrateList{}) + + migratorStub := &MigratorStub{} + migratorStub.allVersionsStub = func() []rivermigrate.Migration { return testMigrationAll } + migratorStub.existingVersionsStub = func(ctx context.Context) ([]rivermigrate.Migration, error) { return nil, nil } + + cmd.GetCommandBase().GetMigrator = func(config *rivermigrate.Config) MigratorInterface { return migratorStub } + + return cmd, &testBundle{ + buf: buf, + migratorStub: migratorStub, + } + } + + t.Run("NoExistingMigrations", func(t *testing.T) { + t.Parallel() + + migrateList, bundle := setup(t) + + _, err := migrateList.Run(ctx, &migrateListOpts{}) + require.NoError(t, err) + + require.Equal(t, strings.TrimSpace(` +001 1st migration +002 2nd migration +003 3rd migration + `), strings.TrimSpace(bundle.buf.String())) + }) + + t.Run("WithExistingMigrations", func(t *testing.T) { + t.Parallel() + + migrateList, bundle := setup(t) + + bundle.migratorStub.existingVersionsStub = func(ctx context.Context) ([]rivermigrate.Migration, error) { + return []rivermigrate.Migration{testMigration01, testMigration02}, nil + } + + _, err := migrateList.Run(ctx, &migrateListOpts{}) + require.NoError(t, err) + + require.Equal(t, strings.TrimSpace(` + 001 1st migration +* 002 2nd migration + 003 3rd migration + `), strings.TrimSpace(bundle.buf.String())) + }) +} + +func withMigrateBase[TCommand Command[TOpts], TOpts CommandOpts](t *testing.T, cmd TCommand) (TCommand, *bytes.Buffer) { + t.Helper() + + var buf bytes.Buffer + cmd.SetCommandBase(&CommandBase{ + Logger: riversharedtest.Logger(t), + Out: &buf, + + GetMigrator: func(config *rivermigrate.Config) MigratorInterface { return &MigratorStub{} }, + }) + return cmd, &buf +} + func TestMigrationComment(t *testing.T) { t.Parallel() diff --git a/rivermigrate/river_migrate.go b/rivermigrate/river_migrate.go index 78b9c6fc..204767b2 100644 --- a/rivermigrate/river_migrate.go +++ b/rivermigrate/river_migrate.go @@ -132,6 +132,52 @@ func New[TTx any](driver riverdriver.Driver[TTx], config *Config) *Migrator[TTx] }) } +// ExistingVersions gets the existing set of versions that have been migrated in +// the database, ordered by version. +func (m *Migrator[TTx]) ExistingVersions(ctx context.Context) ([]Migration, error) { + migrations, err := m.existingMigrations(ctx, m.driver.GetExecutor()) + if err != nil { + return nil, err + } + + versions, err := m.versionsFromDriver(migrations) + if err != nil { + return nil, err + } + + return versions, nil +} + +// ExistingVersions gets the existing set of versions that have been migrated in +// the database, ordered by version. +// +// This variant checks for existing versions in a transaction. +func (m *Migrator[TTx]) ExistingVersionsTx(ctx context.Context, tx TTx) ([]Migration, error) { + migrations, err := m.existingMigrations(ctx, m.driver.UnwrapExecutor(tx)) + if err != nil { + return nil, err + } + + versions, err := m.versionsFromDriver(migrations) + if err != nil { + return nil, err + } + + return versions, nil +} + +func (m *Migrator[TTx]) versionsFromDriver(migrations []*riverdriver.Migration) ([]Migration, error) { + versions := make([]Migration, len(migrations)) + for i, existingMigration := range migrations { + migration, ok := m.migrations[existingMigration.Version] + if !ok { + return nil, fmt.Errorf("migration %d not found in migrator bundle", existingMigration.Version) + } + versions[i] = migration + } + return versions, nil +} + // MigrateOpts are options for a migrate operation. type MigrateOpts struct { DryRun bool diff --git a/rivermigrate/river_migrate_test.go b/rivermigrate/river_migrate_test.go index 919fee25..601115f0 100644 --- a/rivermigrate/river_migrate_test.go +++ b/rivermigrate/river_migrate_test.go @@ -127,6 +127,52 @@ func TestMigrator(t *testing.T) { require.Equal(t, seqOneTo(migrationsBundle.WithTestVersionsMaxVersion), sliceutil.Map(migrations, migrationToInt)) }) + t.Run("ExistingMigrationsDefault", func(t *testing.T) { + t.Parallel() + + migrator, _ := setup(t) + + migrations, err := migrator.ExistingVersions(ctx) + require.NoError(t, err) + require.Equal(t, seqOneTo(migrationsBundle.MaxVersion), sliceutil.Map(migrations, migrationToInt)) + }) + + t.Run("ExistingMigrationsTxDefault", func(t *testing.T) { + t.Parallel() + + migrator, bundle := setup(t) + + migrations, err := migrator.ExistingVersionsTx(ctx, bundle.tx) + require.NoError(t, err) + require.Equal(t, seqOneTo(migrationsBundle.MaxVersion), sliceutil.Map(migrations, migrationToInt)) + }) + + t.Run("ExistingMigrationsTxEmpty", func(t *testing.T) { + t.Parallel() + + migrator, bundle := setup(t) + + _, err := migrator.MigrateTx(ctx, bundle.tx, DirectionDown, &MigrateOpts{TargetVersion: -1}) + require.NoError(t, err) + + migrations, err := migrator.ExistingVersionsTx(ctx, bundle.tx) + require.NoError(t, err) + require.Equal(t, []int{}, sliceutil.Map(migrations, migrationToInt)) + }) + + t.Run("ExistingMigrationsTxFullyMigrated", func(t *testing.T) { + t.Parallel() + + migrator, bundle := setup(t) + + _, err := migrator.MigrateTx(ctx, bundle.tx, DirectionUp, &MigrateOpts{}) + require.NoError(t, err) + + migrations, err := migrator.ExistingVersionsTx(ctx, bundle.tx) + require.NoError(t, err) + require.Equal(t, seqOneTo(migrationsBundle.WithTestVersionsMaxVersion), sliceutil.Map(migrations, migrationToInt)) + }) + t.Run("MigrateDownDefault", func(t *testing.T) { t.Parallel()