Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add migrate-list subcommand that lists available migrations #534

Merged
merged 1 commit into from
Aug 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,10 @@ jobs:
run: river migrate-up --database-url $DATABASE_URL
shell: bash

- name: river migrate-list
run: river migrate-list --database-url $DATABASE_URL
shell: bash

- name: river validate
run: river validate --database-url $DATABASE_URL
shell: bash
Expand Down
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added

- A new `river migrate-list` command is available which lists available migrations and which version a target database is migrated to. [PR #534](https://github.com/riverqueue/river/pull/534).

## [0.11.3] - 2024-08-19

### Changed
Expand Down
7 changes: 4 additions & 3 deletions cmd/river/rivercli/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type BenchmarkerInterface interface {
// around without having to know the transaction type.
type MigratorInterface interface {
AllVersions() []rivermigrate.Migration
ExistingVersions(ctx context.Context) ([]rivermigrate.Migration, error)
GetVersion(version int) (rivermigrate.Migration, error)
Migrate(ctx context.Context, direction rivermigrate.Direction, opts *rivermigrate.MigrateOpts) (*rivermigrate.MigrateResult, error)
Validate(ctx context.Context) (*rivermigrate.ValidateResult, error)
Expand All @@ -43,6 +44,7 @@ type MigratorInterface interface {
// CommandBase.
type Command[TOpts CommandOpts] interface {
Run(ctx context.Context, opts TOpts) (bool, error)
GetCommandBase() *CommandBase
SetCommandBase(b *CommandBase)
}

Expand All @@ -57,9 +59,8 @@ type CommandBase struct {
GetMigrator func(config *rivermigrate.Config) MigratorInterface
}

func (b *CommandBase) SetCommandBase(base *CommandBase) {
*b = *base
}
func (b *CommandBase) GetCommandBase() *CommandBase { return b }
func (b *CommandBase) SetCommandBase(base *CommandBase) { *b = *base }

// CommandOpts are options for a command options. It makes sure that options
// provide a way of validating themselves.
Expand Down
63 changes: 62 additions & 1 deletion cmd/river/rivercli/river_cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ Provides command line facilities for the River job queue.
}

addDatabaseURLFlag := func(cmd *cobra.Command, databaseURL *string) {
cmd.Flags().StringVar(databaseURL, "database-url", "", "URL of the database to benchmark (should look like `postgres://...`")
cmd.Flags().StringVar(databaseURL, "database-url", "", "URL of the database (should look like `postgres://...`")
mustMarkFlagRequired(cmd, "database-url")
}
addLineFlag := func(cmd *cobra.Command, line *string) {
Expand Down Expand Up @@ -215,6 +215,25 @@ framework, which aren't necessary if using an external framework:
rootCmd.AddCommand(cmd)
}

// migrate-list
{
var opts migrateListOpts

cmd := &cobra.Command{
Use: "migrate-list",
Short: "List River schema migrations",
Long: strings.TrimSpace(`
TODO
`),
Run: func(cmd *cobra.Command, args []string) {
RunCommand(ctx, makeCommandBundle(&opts.DatabaseURL), &migrateList{}, &opts)
},
}
addDatabaseURLFlag(cmd, &opts.DatabaseURL)
cmd.Flags().StringVar(&opts.Line, "line", "", "migration line to operate on (default: main)")
rootCmd.AddCommand(cmd)
}

// migrate-up
{
var opts migrateOpts
Expand Down Expand Up @@ -259,6 +278,7 @@ migrations that need to be run, but without running them.
},
}
addDatabaseURLFlag(cmd, &opts.DatabaseURL)
mustMarkFlagRequired(cmd, "database-url")
cmd.Flags().StringVar(&opts.Line, "line", "", "migration line to operate on (default: main)")
rootCmd.AddCommand(cmd)
}
Expand Down Expand Up @@ -450,6 +470,47 @@ func (c *migrateGet) Run(_ context.Context, opts *migrateGetOpts) (bool, error)
return true, nil
}

type migrateListOpts struct {
DatabaseURL string
Line string
}

func (o *migrateListOpts) Validate() error { return nil }

type migrateList struct {
CommandBase
}

func (c *migrateList) Run(ctx context.Context, opts *migrateListOpts) (bool, error) {
migrator := c.GetMigrator(&rivermigrate.Config{Line: opts.Line, Logger: c.Logger})

allMigrations := migrator.AllVersions()

existingMigrations, err := migrator.ExistingVersions(ctx)
if err != nil {
return false, err
}

var maxExistingVersion int
if len(existingMigrations) > 0 {
maxExistingVersion = existingMigrations[len(existingMigrations)-1].Version
}

for _, migration := range allMigrations {
var currentVersionPrefix string
switch {
case migration.Version == maxExistingVersion:
currentVersionPrefix = "* "
case maxExistingVersion > 0:
currentVersionPrefix = " "
}

fmt.Fprintf(c.Out, "%s%03d %s\n", currentVersionPrefix, migration.Version, migration.Name)
}

return true, nil
}

type migrateUp struct {
CommandBase
}
Expand Down
135 changes: 135 additions & 0 deletions cmd/river/rivercli/river_cli_test.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,149 @@
package rivercli

import (
"bytes"
"context"
"strings"
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/riverqueue/river/rivermigrate"
"github.com/riverqueue/river/rivershared/riversharedtest"
)

type MigratorStub struct {
allVersionsStub func() []rivermigrate.Migration
existingVersionsStub func(ctx context.Context) ([]rivermigrate.Migration, error)
getVersionStub func(version int) (rivermigrate.Migration, error)
migrateStub func(ctx context.Context, direction rivermigrate.Direction, opts *rivermigrate.MigrateOpts) (*rivermigrate.MigrateResult, error)
validateStub func(ctx context.Context) (*rivermigrate.ValidateResult, error)
}

func (m *MigratorStub) AllVersions() []rivermigrate.Migration {
if m.allVersionsStub == nil {
panic("AllVersions is not stubbed")
}

return m.allVersionsStub()
}

func (m *MigratorStub) ExistingVersions(ctx context.Context) ([]rivermigrate.Migration, error) {
if m.allVersionsStub == nil {
panic("ExistingVersions is not stubbed")
}

return m.existingVersionsStub(ctx)
}

func (m *MigratorStub) GetVersion(version int) (rivermigrate.Migration, error) {
if m.allVersionsStub == nil {
panic("GetVersion is not stubbed")
}

return m.getVersionStub(version)
}

func (m *MigratorStub) Migrate(ctx context.Context, direction rivermigrate.Direction, opts *rivermigrate.MigrateOpts) (*rivermigrate.MigrateResult, error) {
if m.allVersionsStub == nil {
panic("Migrate is not stubbed")
}

return m.migrateStub(ctx, direction, opts)
}

func (m *MigratorStub) Validate(ctx context.Context) (*rivermigrate.ValidateResult, error) {
if m.allVersionsStub == nil {
panic("Validate is not stubbed")
}

return m.validateStub(ctx)
}

var (
testMigration01 = rivermigrate.Migration{Name: "1st migration", SQLDown: "SELECT 1", SQLUp: "SELECT 1", Version: 1} //nolint:gochecknoglobals
testMigration02 = rivermigrate.Migration{Name: "2nd migration", SQLDown: "SELECT 1", SQLUp: "SELECT 1", Version: 2} //nolint:gochecknoglobals
testMigration03 = rivermigrate.Migration{Name: "3rd migration", SQLDown: "SELECT 1", SQLUp: "SELECT 1", Version: 3} //nolint:gochecknoglobals

testMigrationAll = []rivermigrate.Migration{testMigration01, testMigration02, testMigration03} //nolint:gochecknoglobals
)

func TestMigrateList(t *testing.T) {
t.Parallel()

ctx := context.Background()

type testBundle struct {
buf *bytes.Buffer
migratorStub *MigratorStub
}

setup := func(t *testing.T) (*migrateList, *testBundle) {
t.Helper()

cmd, buf := withMigrateBase(t, &migrateList{})

migratorStub := &MigratorStub{}
migratorStub.allVersionsStub = func() []rivermigrate.Migration { return testMigrationAll }
migratorStub.existingVersionsStub = func(ctx context.Context) ([]rivermigrate.Migration, error) { return nil, nil }

cmd.GetCommandBase().GetMigrator = func(config *rivermigrate.Config) MigratorInterface { return migratorStub }

return cmd, &testBundle{
buf: buf,
migratorStub: migratorStub,
}
}

t.Run("NoExistingMigrations", func(t *testing.T) {
t.Parallel()

migrateList, bundle := setup(t)

_, err := migrateList.Run(ctx, &migrateListOpts{})
require.NoError(t, err)

require.Equal(t, strings.TrimSpace(`
001 1st migration
002 2nd migration
003 3rd migration
`), strings.TrimSpace(bundle.buf.String()))
})

t.Run("WithExistingMigrations", func(t *testing.T) {
t.Parallel()

migrateList, bundle := setup(t)

bundle.migratorStub.existingVersionsStub = func(ctx context.Context) ([]rivermigrate.Migration, error) {
return []rivermigrate.Migration{testMigration01, testMigration02}, nil
}

_, err := migrateList.Run(ctx, &migrateListOpts{})
require.NoError(t, err)

require.Equal(t, strings.TrimSpace(`
001 1st migration
* 002 2nd migration
003 3rd migration
`), strings.TrimSpace(bundle.buf.String()))
})
}

func withMigrateBase[TCommand Command[TOpts], TOpts CommandOpts](t *testing.T, cmd TCommand) (TCommand, *bytes.Buffer) {
t.Helper()

var buf bytes.Buffer
cmd.SetCommandBase(&CommandBase{
Logger: riversharedtest.Logger(t),
Out: &buf,

GetMigrator: func(config *rivermigrate.Config) MigratorInterface { return &MigratorStub{} },
})
return cmd, &buf
}

func TestMigrationComment(t *testing.T) {
t.Parallel()

Expand Down
46 changes: 46 additions & 0 deletions rivermigrate/river_migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,52 @@ func New[TTx any](driver riverdriver.Driver[TTx], config *Config) *Migrator[TTx]
})
}

// ExistingVersions gets the existing set of versions that have been migrated in
// the database, ordered by version.
func (m *Migrator[TTx]) ExistingVersions(ctx context.Context) ([]Migration, error) {
migrations, err := m.existingMigrations(ctx, m.driver.GetExecutor())
if err != nil {
return nil, err
}

versions, err := m.versionsFromDriver(migrations)
if err != nil {
return nil, err
}

return versions, nil
}

// ExistingVersions gets the existing set of versions that have been migrated in
// the database, ordered by version.
//
// This variant checks for existing versions in a transaction.
func (m *Migrator[TTx]) ExistingVersionsTx(ctx context.Context, tx TTx) ([]Migration, error) {
migrations, err := m.existingMigrations(ctx, m.driver.UnwrapExecutor(tx))
if err != nil {
return nil, err
}

versions, err := m.versionsFromDriver(migrations)
if err != nil {
return nil, err
}

return versions, nil
}

func (m *Migrator[TTx]) versionsFromDriver(migrations []*riverdriver.Migration) ([]Migration, error) {
versions := make([]Migration, len(migrations))
for i, existingMigration := range migrations {
migration, ok := m.migrations[existingMigration.Version]
if !ok {
return nil, fmt.Errorf("migration %d not found in migrator bundle", existingMigration.Version)
}
versions[i] = migration
}
return versions, nil
}

// MigrateOpts are options for a migrate operation.
type MigrateOpts struct {
DryRun bool
Expand Down
46 changes: 46 additions & 0 deletions rivermigrate/river_migrate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,52 @@ func TestMigrator(t *testing.T) {
require.Equal(t, seqOneTo(migrationsBundle.WithTestVersionsMaxVersion), sliceutil.Map(migrations, migrationToInt))
})

t.Run("ExistingMigrationsDefault", func(t *testing.T) {
t.Parallel()

migrator, _ := setup(t)

migrations, err := migrator.ExistingVersions(ctx)
require.NoError(t, err)
require.Equal(t, seqOneTo(migrationsBundle.MaxVersion), sliceutil.Map(migrations, migrationToInt))
})

t.Run("ExistingMigrationsTxDefault", func(t *testing.T) {
t.Parallel()

migrator, bundle := setup(t)

migrations, err := migrator.ExistingVersionsTx(ctx, bundle.tx)
require.NoError(t, err)
require.Equal(t, seqOneTo(migrationsBundle.MaxVersion), sliceutil.Map(migrations, migrationToInt))
})

t.Run("ExistingMigrationsTxEmpty", func(t *testing.T) {
t.Parallel()

migrator, bundle := setup(t)

_, err := migrator.MigrateTx(ctx, bundle.tx, DirectionDown, &MigrateOpts{TargetVersion: -1})
require.NoError(t, err)

migrations, err := migrator.ExistingVersionsTx(ctx, bundle.tx)
require.NoError(t, err)
require.Equal(t, []int{}, sliceutil.Map(migrations, migrationToInt))
})

t.Run("ExistingMigrationsTxFullyMigrated", func(t *testing.T) {
t.Parallel()

migrator, bundle := setup(t)

_, err := migrator.MigrateTx(ctx, bundle.tx, DirectionUp, &MigrateOpts{})
require.NoError(t, err)

migrations, err := migrator.ExistingVersionsTx(ctx, bundle.tx)
require.NoError(t, err)
require.Equal(t, seqOneTo(migrationsBundle.WithTestVersionsMaxVersion), sliceutil.Map(migrations, migrationToInt))
})

t.Run("MigrateDownDefault", func(t *testing.T) {
t.Parallel()

Expand Down
Loading