diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index a0b01e8e..a4e144fe 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -160,6 +160,10 @@ jobs: run: river migrate-up --database-url $DATABASE_URL shell: bash + - name: river migrate-list + run: river migrate-list --database-url $DATABASE_URL + shell: bash + - name: river validate run: river validate --database-url $DATABASE_URL shell: bash diff --git a/CHANGELOG.md b/CHANGELOG.md index bfa4c979..9f1d4973 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added + +- A new `river migrate-list` command is available which lists available migrations and which version a target database is migrated to. [PR #XXX](https://github.com/riverqueue/river/pull/XXX). + ## [0.11.2] - 2024-08-08 ### Fixed diff --git a/cmd/river/rivercli/command.go b/cmd/river/rivercli/command.go index b2ee032d..362a17a3 100644 --- a/cmd/river/rivercli/command.go +++ b/cmd/river/rivercli/command.go @@ -29,6 +29,7 @@ type BenchmarkerInterface interface { // around without having to know the transaction type. type MigratorInterface interface { AllVersions() []rivermigrate.Migration + ExistingVersions(ctx context.Context) ([]rivermigrate.Migration, error) GetVersion(version int) (rivermigrate.Migration, error) Migrate(ctx context.Context, direction rivermigrate.Direction, opts *rivermigrate.MigrateOpts) (*rivermigrate.MigrateResult, error) Validate(ctx context.Context) (*rivermigrate.ValidateResult, error) @@ -39,6 +40,7 @@ type MigratorInterface interface { // CommandBase. type Command[TOpts CommandOpts] interface { Run(ctx context.Context, opts TOpts) (bool, error) + GetCommandBase() *CommandBase SetCommandBase(b *CommandBase) } @@ -53,9 +55,8 @@ type CommandBase struct { GetMigrator func(config *rivermigrate.Config) MigratorInterface } -func (b *CommandBase) SetCommandBase(base *CommandBase) { - *b = *base -} +func (b *CommandBase) GetCommandBase() *CommandBase { return b } +func (b *CommandBase) SetCommandBase(base *CommandBase) { *b = *base } // CommandOpts are options for a command options. It makes sure that options // provide a way of validating themselves. @@ -103,7 +104,7 @@ func RunCommand[TOpts CommandOpts](ctx context.Context, bundle *RunCommandBundle commandBase.GetMigrator = func(config *rivermigrate.Config) MigratorInterface { return rivermigrate.New(driver, config) } default: - return false, errors.New("unsupport database URL; try one with a prefix of `postgres://...`") + return false, errors.New("unsupported database URL; try one with a prefix of `postgres://...`") } command.SetCommandBase(commandBase) diff --git a/cmd/river/rivercli/river_cli.go b/cmd/river/rivercli/river_cli.go index 3749e27a..5a8668ed 100644 --- a/cmd/river/rivercli/river_cli.go +++ b/cmd/river/rivercli/river_cli.go @@ -94,7 +94,7 @@ Provides command line facilities for the River job queue. } addDatabaseURLFlag := func(cmd *cobra.Command, databaseURL *string) { - cmd.Flags().StringVar(databaseURL, "database-url", "", "URL of the database to benchmark (should look like `postgres://...`") + cmd.Flags().StringVar(databaseURL, "database-url", "", "URL of the database (should look like `postgres://...`") mustMarkFlagRequired(cmd, "database-url") } addLineFlag := func(cmd *cobra.Command, line *string) { @@ -215,6 +215,25 @@ framework, which aren't necessary if using an external framework: rootCmd.AddCommand(cmd) } + // migrate-list + { + var opts migrateListOpts + + cmd := &cobra.Command{ + Use: "migrate-list", + Short: "List River schema migrations", + Long: strings.TrimSpace(` +TODO + `), + Run: func(cmd *cobra.Command, args []string) { + RunCommand(ctx, makeCommandBundle(&opts.DatabaseURL), &migrateList{}, &opts) + }, + } + addDatabaseURLFlag(cmd, &opts.DatabaseURL) + cmd.Flags().StringVar(&opts.Line, "line", "", "migration line to operate on (default: main)") + rootCmd.AddCommand(cmd) + } + // migrate-up { var opts migrateOpts @@ -259,6 +278,7 @@ migrations that need to be run, but without running them. }, } addDatabaseURLFlag(cmd, &opts.DatabaseURL) + mustMarkFlagRequired(cmd, "database-url") cmd.Flags().StringVar(&opts.Line, "line", "", "migration line to operate on (default: main)") rootCmd.AddCommand(cmd) } @@ -450,6 +470,47 @@ func (c *migrateGet) Run(_ context.Context, opts *migrateGetOpts) (bool, error) return true, nil } +type migrateListOpts struct { + DatabaseURL string + Line string +} + +func (o *migrateListOpts) Validate() error { return nil } + +type migrateList struct { + CommandBase +} + +func (c *migrateList) Run(ctx context.Context, opts *migrateListOpts) (bool, error) { + migrator := c.GetMigrator(&rivermigrate.Config{Line: opts.Line, Logger: c.Logger}) + + allMigrations := migrator.AllVersions() + + existingMigrations, err := migrator.ExistingVersions(ctx) + if err != nil { + return false, err + } + + var maxExistingVersion int + if len(existingMigrations) > 0 { + maxExistingVersion = existingMigrations[len(existingMigrations)-1].Version + } + + for _, migration := range allMigrations { + var currentVersionPrefix string + switch { + case migration.Version == maxExistingVersion: + currentVersionPrefix = "* " + case maxExistingVersion > 0: + currentVersionPrefix = " " + } + + fmt.Fprintf(c.Out, "%s%03d %s\n", currentVersionPrefix, migration.Version, migration.Name) + } + + return true, nil +} + type migrateUp struct { CommandBase } diff --git a/cmd/river/rivercli/river_cli_test.go b/cmd/river/rivercli/river_cli_test.go index 1ae93fbf..cbb8a988 100644 --- a/cmd/river/rivercli/river_cli_test.go +++ b/cmd/river/rivercli/river_cli_test.go @@ -1,14 +1,149 @@ package rivercli import ( + "bytes" + "context" + "strings" "testing" "time" "github.com/stretchr/testify/require" "github.com/riverqueue/river/rivermigrate" + "github.com/riverqueue/river/rivershared/riversharedtest" ) +type MigratorStub struct { + allVersionsStub func() []rivermigrate.Migration + existingVersionsStub func(ctx context.Context) ([]rivermigrate.Migration, error) + getVersionStub func(version int) (rivermigrate.Migration, error) + migrateStub func(ctx context.Context, direction rivermigrate.Direction, opts *rivermigrate.MigrateOpts) (*rivermigrate.MigrateResult, error) + validateStub func(ctx context.Context) (*rivermigrate.ValidateResult, error) +} + +func (m *MigratorStub) AllVersions() []rivermigrate.Migration { + if m.allVersionsStub == nil { + panic("AllVersions is not stubbed") + } + + return m.allVersionsStub() +} + +func (m *MigratorStub) ExistingVersions(ctx context.Context) ([]rivermigrate.Migration, error) { + if m.allVersionsStub == nil { + panic("ExistingVersions is not stubbed") + } + + return m.existingVersionsStub(ctx) +} + +func (m *MigratorStub) GetVersion(version int) (rivermigrate.Migration, error) { + if m.allVersionsStub == nil { + panic("GetVersion is not stubbed") + } + + return m.getVersionStub(version) +} + +func (m *MigratorStub) Migrate(ctx context.Context, direction rivermigrate.Direction, opts *rivermigrate.MigrateOpts) (*rivermigrate.MigrateResult, error) { + if m.allVersionsStub == nil { + panic("Migrate is not stubbed") + } + + return m.migrateStub(ctx, direction, opts) +} + +func (m *MigratorStub) Validate(ctx context.Context) (*rivermigrate.ValidateResult, error) { + if m.allVersionsStub == nil { + panic("Validate is not stubbed") + } + + return m.validateStub(ctx) +} + +var ( + testMigration01 = rivermigrate.Migration{Name: "1st migration", SQLDown: "SELECT 1", SQLUp: "SELECT 1", Version: 1} //nolint:gochecknoglobals + testMigration02 = rivermigrate.Migration{Name: "2nd migration", SQLDown: "SELECT 1", SQLUp: "SELECT 1", Version: 2} //nolint:gochecknoglobals + testMigration03 = rivermigrate.Migration{Name: "3rd migration", SQLDown: "SELECT 1", SQLUp: "SELECT 1", Version: 3} //nolint:gochecknoglobals + + testMigrationAll = []rivermigrate.Migration{testMigration01, testMigration02, testMigration03} //nolint:gochecknoglobals +) + +func TestMigrateList(t *testing.T) { + t.Parallel() + + ctx := context.Background() + + type testBundle struct { + buf *bytes.Buffer + migratorStub *MigratorStub + } + + setup := func(t *testing.T) (*migrateList, *testBundle) { + t.Helper() + + cmd, buf := withMigrateBase(t, &migrateList{}) + + migratorStub := &MigratorStub{} + migratorStub.allVersionsStub = func() []rivermigrate.Migration { return testMigrationAll } + migratorStub.existingVersionsStub = func(ctx context.Context) ([]rivermigrate.Migration, error) { return nil, nil } + + cmd.GetCommandBase().GetMigrator = func(config *rivermigrate.Config) MigratorInterface { return migratorStub } + + return cmd, &testBundle{ + buf: buf, + migratorStub: migratorStub, + } + } + + t.Run("NoExistingMigrations", func(t *testing.T) { + t.Parallel() + + migrateList, bundle := setup(t) + + _, err := migrateList.Run(ctx, &migrateListOpts{}) + require.NoError(t, err) + + require.Equal(t, strings.TrimSpace(` +001 1st migration +002 2nd migration +003 3rd migration + `), strings.TrimSpace(bundle.buf.String())) + }) + + t.Run("WithExistingMigrations", func(t *testing.T) { + t.Parallel() + + migrateList, bundle := setup(t) + + bundle.migratorStub.existingVersionsStub = func(ctx context.Context) ([]rivermigrate.Migration, error) { + return []rivermigrate.Migration{testMigration01, testMigration02}, nil + } + + _, err := migrateList.Run(ctx, &migrateListOpts{}) + require.NoError(t, err) + + require.Equal(t, strings.TrimSpace(` + 001 1st migration +* 002 2nd migration + 003 3rd migration + `), strings.TrimSpace(bundle.buf.String())) + }) +} + +func withMigrateBase[TCommand Command[TOpts], TOpts CommandOpts](t *testing.T, cmd TCommand) (TCommand, *bytes.Buffer) { + t.Helper() + + var buf bytes.Buffer + cmd.SetCommandBase(&CommandBase{ + Logger: riversharedtest.Logger(t), + Out: &buf, + + GetMigrator: func(config *rivermigrate.Config) MigratorInterface { return &MigratorStub{} }, + }) + return cmd, &buf +} + func TestMigrationComment(t *testing.T) { t.Parallel() diff --git a/rivermigrate/river_migrate.go b/rivermigrate/river_migrate.go index 78b9c6fc..204767b2 100644 --- a/rivermigrate/river_migrate.go +++ b/rivermigrate/river_migrate.go @@ -132,6 +132,52 @@ func New[TTx any](driver riverdriver.Driver[TTx], config *Config) *Migrator[TTx] }) } +// ExistingVersions gets the existing set of versions that have been migrated in +// the database, ordered by version. +func (m *Migrator[TTx]) ExistingVersions(ctx context.Context) ([]Migration, error) { + migrations, err := m.existingMigrations(ctx, m.driver.GetExecutor()) + if err != nil { + return nil, err + } + + versions, err := m.versionsFromDriver(migrations) + if err != nil { + return nil, err + } + + return versions, nil +} + +// ExistingVersions gets the existing set of versions that have been migrated in +// the database, ordered by version. +// +// This variant checks for existing versions in a transaction. +func (m *Migrator[TTx]) ExistingVersionsTx(ctx context.Context, tx TTx) ([]Migration, error) { + migrations, err := m.existingMigrations(ctx, m.driver.UnwrapExecutor(tx)) + if err != nil { + return nil, err + } + + versions, err := m.versionsFromDriver(migrations) + if err != nil { + return nil, err + } + + return versions, nil +} + +func (m *Migrator[TTx]) versionsFromDriver(migrations []*riverdriver.Migration) ([]Migration, error) { + versions := make([]Migration, len(migrations)) + for i, existingMigration := range migrations { + migration, ok := m.migrations[existingMigration.Version] + if !ok { + return nil, fmt.Errorf("migration %d not found in migrator bundle", existingMigration.Version) + } + versions[i] = migration + } + return versions, nil +} + // MigrateOpts are options for a migrate operation. type MigrateOpts struct { DryRun bool diff --git a/rivermigrate/river_migrate_test.go b/rivermigrate/river_migrate_test.go index 919fee25..601115f0 100644 --- a/rivermigrate/river_migrate_test.go +++ b/rivermigrate/river_migrate_test.go @@ -127,6 +127,52 @@ func TestMigrator(t *testing.T) { require.Equal(t, seqOneTo(migrationsBundle.WithTestVersionsMaxVersion), sliceutil.Map(migrations, migrationToInt)) }) + t.Run("ExistingMigrationsDefault", func(t *testing.T) { + t.Parallel() + + migrator, _ := setup(t) + + migrations, err := migrator.ExistingVersions(ctx) + require.NoError(t, err) + require.Equal(t, seqOneTo(migrationsBundle.MaxVersion), sliceutil.Map(migrations, migrationToInt)) + }) + + t.Run("ExistingMigrationsTxDefault", func(t *testing.T) { + t.Parallel() + + migrator, bundle := setup(t) + + migrations, err := migrator.ExistingVersionsTx(ctx, bundle.tx) + require.NoError(t, err) + require.Equal(t, seqOneTo(migrationsBundle.MaxVersion), sliceutil.Map(migrations, migrationToInt)) + }) + + t.Run("ExistingMigrationsTxEmpty", func(t *testing.T) { + t.Parallel() + + migrator, bundle := setup(t) + + _, err := migrator.MigrateTx(ctx, bundle.tx, DirectionDown, &MigrateOpts{TargetVersion: -1}) + require.NoError(t, err) + + migrations, err := migrator.ExistingVersionsTx(ctx, bundle.tx) + require.NoError(t, err) + require.Equal(t, []int{}, sliceutil.Map(migrations, migrationToInt)) + }) + + t.Run("ExistingMigrationsTxFullyMigrated", func(t *testing.T) { + t.Parallel() + + migrator, bundle := setup(t) + + _, err := migrator.MigrateTx(ctx, bundle.tx, DirectionUp, &MigrateOpts{}) + require.NoError(t, err) + + migrations, err := migrator.ExistingVersionsTx(ctx, bundle.tx) + require.NoError(t, err) + require.Equal(t, seqOneTo(migrationsBundle.WithTestVersionsMaxVersion), sliceutil.Map(migrations, migrationToInt)) + }) + t.Run("MigrateDownDefault", func(t *testing.T) { t.Parallel()