From 98224afa8cfe0e48df0fc4bdff94ef2b6b43be01 Mon Sep 17 00:00:00 2001 From: Brandur Date: Wed, 7 Aug 2024 19:48:21 -0700 Subject: [PATCH] Add `migrate-list` subcommand that lists available migrations Here, add a new `migrate-list` command to the River CLI. It prints the versions and names of available migrations along with an indicator as to the current state of the database: $ go run ./cmd/river migrate-list --database-url postgres:///river_dev 001 create river migration 002 initial schema 003 river job tags non null 004 pending and more * 005 migration unique client It's pretty far from a high priority feature, but I started it a few weeks ago and I figured I may as well finish it. I started because I realized that despite having a `migrate-get` command, there was no way to figure out what migrations actually existed before you ran it. I bring in a test convention for CLI commands so we can start trying to check with more authority that things like expected output stay stable. (Previously, we weren't testing commands except to vet that they will run successfully in CI.) This approach uses mocks because we still have no way of reusing database-based testing infrastructure outside the main package (eventually some of it should go into `rivershared`), but despite that, it should give us reasonable assuredness for now. --- .github/workflows/ci.yaml | 4 + CHANGELOG.md | 4 + cmd/river/rivercli/command.go | 9 +- cmd/river/rivercli/river_cli.go | 63 ++++++++++++- cmd/river/rivercli/river_cli_test.go | 135 +++++++++++++++++++++++++++ rivermigrate/river_migrate.go | 46 +++++++++ rivermigrate/river_migrate_test.go | 46 +++++++++ 7 files changed, 302 insertions(+), 5 deletions(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index a0b01e8e..a4e144fe 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -160,6 +160,10 @@ jobs: run: river migrate-up --database-url $DATABASE_URL shell: bash + - name: river migrate-list + run: river migrate-list --database-url $DATABASE_URL + shell: bash + - name: river validate run: river validate --database-url $DATABASE_URL shell: bash diff --git a/CHANGELOG.md b/CHANGELOG.md index bfa4c979..9f1d4973 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added + +- A new `river migrate-list` command is available which lists available migrations and which version a target database is migrated to. [PR #XXX](https://github.com/riverqueue/river/pull/XXX). + ## [0.11.2] - 2024-08-08 ### Fixed diff --git a/cmd/river/rivercli/command.go b/cmd/river/rivercli/command.go index b2ee032d..362a17a3 100644 --- a/cmd/river/rivercli/command.go +++ b/cmd/river/rivercli/command.go @@ -29,6 +29,7 @@ type BenchmarkerInterface interface { // around without having to know the transaction type. type MigratorInterface interface { AllVersions() []rivermigrate.Migration + ExistingVersions(ctx context.Context) ([]rivermigrate.Migration, error) GetVersion(version int) (rivermigrate.Migration, error) Migrate(ctx context.Context, direction rivermigrate.Direction, opts *rivermigrate.MigrateOpts) (*rivermigrate.MigrateResult, error) Validate(ctx context.Context) (*rivermigrate.ValidateResult, error) @@ -39,6 +40,7 @@ type MigratorInterface interface { // CommandBase. type Command[TOpts CommandOpts] interface { Run(ctx context.Context, opts TOpts) (bool, error) + GetCommandBase() *CommandBase SetCommandBase(b *CommandBase) } @@ -53,9 +55,8 @@ type CommandBase struct { GetMigrator func(config *rivermigrate.Config) MigratorInterface } -func (b *CommandBase) SetCommandBase(base *CommandBase) { - *b = *base -} +func (b *CommandBase) GetCommandBase() *CommandBase { return b } +func (b *CommandBase) SetCommandBase(base *CommandBase) { *b = *base } // CommandOpts are options for a command options. It makes sure that options // provide a way of validating themselves. @@ -103,7 +104,7 @@ func RunCommand[TOpts CommandOpts](ctx context.Context, bundle *RunCommandBundle commandBase.GetMigrator = func(config *rivermigrate.Config) MigratorInterface { return rivermigrate.New(driver, config) } default: - return false, errors.New("unsupport database URL; try one with a prefix of `postgres://...`") + return false, errors.New("unsupported database URL; try one with a prefix of `postgres://...`") } command.SetCommandBase(commandBase) diff --git a/cmd/river/rivercli/river_cli.go b/cmd/river/rivercli/river_cli.go index 3749e27a..5a8668ed 100644 --- a/cmd/river/rivercli/river_cli.go +++ b/cmd/river/rivercli/river_cli.go @@ -94,7 +94,7 @@ Provides command line facilities for the River job queue. } addDatabaseURLFlag := func(cmd *cobra.Command, databaseURL *string) { - cmd.Flags().StringVar(databaseURL, "database-url", "", "URL of the database to benchmark (should look like `postgres://...`") + cmd.Flags().StringVar(databaseURL, "database-url", "", "URL of the database (should look like `postgres://...`") mustMarkFlagRequired(cmd, "database-url") } addLineFlag := func(cmd *cobra.Command, line *string) { @@ -215,6 +215,25 @@ framework, which aren't necessary if using an external framework: rootCmd.AddCommand(cmd) } + // migrate-list + { + var opts migrateListOpts + + cmd := &cobra.Command{ + Use: "migrate-list", + Short: "List River schema migrations", + Long: strings.TrimSpace(` +TODO + `), + Run: func(cmd *cobra.Command, args []string) { + RunCommand(ctx, makeCommandBundle(&opts.DatabaseURL), &migrateList{}, &opts) + }, + } + addDatabaseURLFlag(cmd, &opts.DatabaseURL) + cmd.Flags().StringVar(&opts.Line, "line", "", "migration line to operate on (default: main)") + rootCmd.AddCommand(cmd) + } + // migrate-up { var opts migrateOpts @@ -259,6 +278,7 @@ migrations that need to be run, but without running them. }, } addDatabaseURLFlag(cmd, &opts.DatabaseURL) + mustMarkFlagRequired(cmd, "database-url") cmd.Flags().StringVar(&opts.Line, "line", "", "migration line to operate on (default: main)") rootCmd.AddCommand(cmd) } @@ -450,6 +470,47 @@ func (c *migrateGet) Run(_ context.Context, opts *migrateGetOpts) (bool, error) return true, nil } +type migrateListOpts struct { + DatabaseURL string + Line string +} + +func (o *migrateListOpts) Validate() error { return nil } + +type migrateList struct { + CommandBase +} + +func (c *migrateList) Run(ctx context.Context, opts *migrateListOpts) (bool, error) { + migrator := c.GetMigrator(&rivermigrate.Config{Line: opts.Line, Logger: c.Logger}) + + allMigrations := migrator.AllVersions() + + existingMigrations, err := migrator.ExistingVersions(ctx) + if err != nil { + return false, err + } + + var maxExistingVersion int + if len(existingMigrations) > 0 { + maxExistingVersion = existingMigrations[len(existingMigrations)-1].Version + } + + for _, migration := range allMigrations { + var currentVersionPrefix string + switch { + case migration.Version == maxExistingVersion: + currentVersionPrefix = "* " + case maxExistingVersion > 0: + currentVersionPrefix = " " + } + + fmt.Fprintf(c.Out, "%s%03d %s\n", currentVersionPrefix, migration.Version, migration.Name) + } + + return true, nil +} + type migrateUp struct { CommandBase } diff --git a/cmd/river/rivercli/river_cli_test.go b/cmd/river/rivercli/river_cli_test.go index 1ae93fbf..cbb8a988 100644 --- a/cmd/river/rivercli/river_cli_test.go +++ b/cmd/river/rivercli/river_cli_test.go @@ -1,14 +1,149 @@ package rivercli import ( + "bytes" + "context" + "strings" "testing" "time" "github.com/stretchr/testify/require" "github.com/riverqueue/river/rivermigrate" + "github.com/riverqueue/river/rivershared/riversharedtest" ) +type MigratorStub struct { + allVersionsStub func() []rivermigrate.Migration + existingVersionsStub func(ctx context.Context) ([]rivermigrate.Migration, error) + getVersionStub func(version int) (rivermigrate.Migration, error) + migrateStub func(ctx context.Context, direction rivermigrate.Direction, opts *rivermigrate.MigrateOpts) (*rivermigrate.MigrateResult, error) + validateStub func(ctx context.Context) (*rivermigrate.ValidateResult, error) +} + +func (m *MigratorStub) AllVersions() []rivermigrate.Migration { + if m.allVersionsStub == nil { + panic("AllVersions is not stubbed") + } + + return m.allVersionsStub() +} + +func (m *MigratorStub) ExistingVersions(ctx context.Context) ([]rivermigrate.Migration, error) { + if m.allVersionsStub == nil { + panic("ExistingVersions is not stubbed") + } + + return m.existingVersionsStub(ctx) +} + +func (m *MigratorStub) GetVersion(version int) (rivermigrate.Migration, error) { + if m.allVersionsStub == nil { + panic("GetVersion is not stubbed") + } + + return m.getVersionStub(version) +} + +func (m *MigratorStub) Migrate(ctx context.Context, direction rivermigrate.Direction, opts *rivermigrate.MigrateOpts) (*rivermigrate.MigrateResult, error) { + if m.allVersionsStub == nil { + panic("Migrate is not stubbed") + } + + return m.migrateStub(ctx, direction, opts) +} + +func (m *MigratorStub) Validate(ctx context.Context) (*rivermigrate.ValidateResult, error) { + if m.allVersionsStub == nil { + panic("Validate is not stubbed") + } + + return m.validateStub(ctx) +} + +var ( + testMigration01 = rivermigrate.Migration{Name: "1st migration", SQLDown: "SELECT 1", SQLUp: "SELECT 1", Version: 1} //nolint:gochecknoglobals + testMigration02 = rivermigrate.Migration{Name: "2nd migration", SQLDown: "SELECT 1", SQLUp: "SELECT 1", Version: 2} //nolint:gochecknoglobals + testMigration03 = rivermigrate.Migration{Name: "3rd migration", SQLDown: "SELECT 1", SQLUp: "SELECT 1", Version: 3} //nolint:gochecknoglobals + + testMigrationAll = []rivermigrate.Migration{testMigration01, testMigration02, testMigration03} //nolint:gochecknoglobals +) + +func TestMigrateList(t *testing.T) { + t.Parallel() + + ctx := context.Background() + + type testBundle struct { + buf *bytes.Buffer + migratorStub *MigratorStub + } + + setup := func(t *testing.T) (*migrateList, *testBundle) { + t.Helper() + + cmd, buf := withMigrateBase(t, &migrateList{}) + + migratorStub := &MigratorStub{} + migratorStub.allVersionsStub = func() []rivermigrate.Migration { return testMigrationAll } + migratorStub.existingVersionsStub = func(ctx context.Context) ([]rivermigrate.Migration, error) { return nil, nil } + + cmd.GetCommandBase().GetMigrator = func(config *rivermigrate.Config) MigratorInterface { return migratorStub } + + return cmd, &testBundle{ + buf: buf, + migratorStub: migratorStub, + } + } + + t.Run("NoExistingMigrations", func(t *testing.T) { + t.Parallel() + + migrateList, bundle := setup(t) + + _, err := migrateList.Run(ctx, &migrateListOpts{}) + require.NoError(t, err) + + require.Equal(t, strings.TrimSpace(` +001 1st migration +002 2nd migration +003 3rd migration + `), strings.TrimSpace(bundle.buf.String())) + }) + + t.Run("WithExistingMigrations", func(t *testing.T) { + t.Parallel() + + migrateList, bundle := setup(t) + + bundle.migratorStub.existingVersionsStub = func(ctx context.Context) ([]rivermigrate.Migration, error) { + return []rivermigrate.Migration{testMigration01, testMigration02}, nil + } + + _, err := migrateList.Run(ctx, &migrateListOpts{}) + require.NoError(t, err) + + require.Equal(t, strings.TrimSpace(` + 001 1st migration +* 002 2nd migration + 003 3rd migration + `), strings.TrimSpace(bundle.buf.String())) + }) +} + +func withMigrateBase[TCommand Command[TOpts], TOpts CommandOpts](t *testing.T, cmd TCommand) (TCommand, *bytes.Buffer) { + t.Helper() + + var buf bytes.Buffer + cmd.SetCommandBase(&CommandBase{ + Logger: riversharedtest.Logger(t), + Out: &buf, + + GetMigrator: func(config *rivermigrate.Config) MigratorInterface { return &MigratorStub{} }, + }) + return cmd, &buf +} + func TestMigrationComment(t *testing.T) { t.Parallel() diff --git a/rivermigrate/river_migrate.go b/rivermigrate/river_migrate.go index 78b9c6fc..204767b2 100644 --- a/rivermigrate/river_migrate.go +++ b/rivermigrate/river_migrate.go @@ -132,6 +132,52 @@ func New[TTx any](driver riverdriver.Driver[TTx], config *Config) *Migrator[TTx] }) } +// ExistingVersions gets the existing set of versions that have been migrated in +// the database, ordered by version. +func (m *Migrator[TTx]) ExistingVersions(ctx context.Context) ([]Migration, error) { + migrations, err := m.existingMigrations(ctx, m.driver.GetExecutor()) + if err != nil { + return nil, err + } + + versions, err := m.versionsFromDriver(migrations) + if err != nil { + return nil, err + } + + return versions, nil +} + +// ExistingVersions gets the existing set of versions that have been migrated in +// the database, ordered by version. +// +// This variant checks for existing versions in a transaction. +func (m *Migrator[TTx]) ExistingVersionsTx(ctx context.Context, tx TTx) ([]Migration, error) { + migrations, err := m.existingMigrations(ctx, m.driver.UnwrapExecutor(tx)) + if err != nil { + return nil, err + } + + versions, err := m.versionsFromDriver(migrations) + if err != nil { + return nil, err + } + + return versions, nil +} + +func (m *Migrator[TTx]) versionsFromDriver(migrations []*riverdriver.Migration) ([]Migration, error) { + versions := make([]Migration, len(migrations)) + for i, existingMigration := range migrations { + migration, ok := m.migrations[existingMigration.Version] + if !ok { + return nil, fmt.Errorf("migration %d not found in migrator bundle", existingMigration.Version) + } + versions[i] = migration + } + return versions, nil +} + // MigrateOpts are options for a migrate operation. type MigrateOpts struct { DryRun bool diff --git a/rivermigrate/river_migrate_test.go b/rivermigrate/river_migrate_test.go index 919fee25..601115f0 100644 --- a/rivermigrate/river_migrate_test.go +++ b/rivermigrate/river_migrate_test.go @@ -127,6 +127,52 @@ func TestMigrator(t *testing.T) { require.Equal(t, seqOneTo(migrationsBundle.WithTestVersionsMaxVersion), sliceutil.Map(migrations, migrationToInt)) }) + t.Run("ExistingMigrationsDefault", func(t *testing.T) { + t.Parallel() + + migrator, _ := setup(t) + + migrations, err := migrator.ExistingVersions(ctx) + require.NoError(t, err) + require.Equal(t, seqOneTo(migrationsBundle.MaxVersion), sliceutil.Map(migrations, migrationToInt)) + }) + + t.Run("ExistingMigrationsTxDefault", func(t *testing.T) { + t.Parallel() + + migrator, bundle := setup(t) + + migrations, err := migrator.ExistingVersionsTx(ctx, bundle.tx) + require.NoError(t, err) + require.Equal(t, seqOneTo(migrationsBundle.MaxVersion), sliceutil.Map(migrations, migrationToInt)) + }) + + t.Run("ExistingMigrationsTxEmpty", func(t *testing.T) { + t.Parallel() + + migrator, bundle := setup(t) + + _, err := migrator.MigrateTx(ctx, bundle.tx, DirectionDown, &MigrateOpts{TargetVersion: -1}) + require.NoError(t, err) + + migrations, err := migrator.ExistingVersionsTx(ctx, bundle.tx) + require.NoError(t, err) + require.Equal(t, []int{}, sliceutil.Map(migrations, migrationToInt)) + }) + + t.Run("ExistingMigrationsTxFullyMigrated", func(t *testing.T) { + t.Parallel() + + migrator, bundle := setup(t) + + _, err := migrator.MigrateTx(ctx, bundle.tx, DirectionUp, &MigrateOpts{}) + require.NoError(t, err) + + migrations, err := migrator.ExistingVersionsTx(ctx, bundle.tx) + require.NoError(t, err) + require.Equal(t, seqOneTo(migrationsBundle.WithTestVersionsMaxVersion), sliceutil.Map(migrations, migrationToInt)) + }) + t.Run("MigrateDownDefault", func(t *testing.T) { t.Parallel()