add integration test for governance upgrades
mkaczanowski committed Nov 9, 2024
1 parent 40d8768 commit 3ddaa8d
Showing 1 changed file with 203 additions and 93 deletions.
296 changes: 203 additions & 93 deletions internal/pkg/daemon/daemon_test.go
@@ -28,6 +28,7 @@ import (
urproto "blazar/internal/pkg/proto/upgrades_registry"
vrproto "blazar/internal/pkg/proto/version_resolver"
"blazar/internal/pkg/provider"
"blazar/internal/pkg/provider/chain"
"blazar/internal/pkg/provider/database"
"blazar/internal/pkg/provider/local"
"blazar/internal/pkg/state_machine"
@@ -77,7 +78,7 @@ func TestIntegrationDaemon(t *testing.T) {
metrics, err := metrics.NewMetrics("/path/to/docker-compose.yml", "dummy", "test")
require.NoError(t, err)

ports := getFreePorts(t, 4)
ports := getFreePorts(t, 6)
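// six ports: a gRPC/CometBFT pair for each of the three provider subtests below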

t.Run("LocalProvider", func(t *testing.T) {
name := fmt.Sprintf("blazar-e2e-test-local-simapp-%d", rand.Uint64())
@@ -93,7 +94,7 @@ func TestIntegrationDaemon(t *testing.T) {
t.Fatalf("failed to create local provider: %v", err)
}

run(t, metrics, provider, urproto.ProviderType_LOCAL, tempDir, name, ports[0], ports[1])
runNonChain(t, metrics, provider, urproto.ProviderType_LOCAL, tempDir, name, ports[0], ports[1])
})

t.Run("DatabaseProvider", func(t *testing.T) {
@@ -106,18 +107,19 @@ func TestIntegrationDaemon(t *testing.T) {
t.Fatalf("failed to create database provider: %v", err)
}

run(t, metrics, provider, urproto.ProviderType_DATABASE, tempDir, name, ports[2], ports[3])
runNonChain(t, metrics, provider, urproto.ProviderType_DATABASE, tempDir, name, ports[2], ports[3])
})
}

// The integration test for the daemon asserts that all 3 types of upgrades are successfully executed (for a given provider). This is:
// 1. GOVERNANCE
// 2. NON_GOVERNANCE_UNCOORDINATED
// 3. NON_GOVERNANCE_COORDINATED
func run(t *testing.T, metrics *metrics.Metrics, prvdr provider.UpgradeProvider, source urproto.ProviderType, tempDir, serviceName string, grpcPort, cometbftPort int) {
// ------ PREPARE ENVIRONMENT ------ //
cfg := generateConfig(t, tempDir, serviceName, grpcPort, cometbftPort)
t.Run("ChainProvider", func(t *testing.T) {
name := fmt.Sprintf("blazar-e2e-test-chain-simapp-%d", rand.Uint64())
t.Parallel()
tempDir := testutils.PrepareTestData(t, "", "daemon", name)

runChain(t, metrics, tempDir, name, ports[4], ports[5])
})
}

func injectTestLogger(cfg *config.Config) (*threadSafeBuffer, context.Context) {
// inject test logger
outBuffer := &threadSafeBuffer{}
output := zerolog.ConsoleWriter{Out: outBuffer, TimeFormat: time.Kitchen, NoColor: true}
@@ -126,30 +128,165 @@ func run(t *testing.T, metrics *metrics.Metrics, prvdr provider.UpgradeProvider,
ctx := logger.WithContext(context.Background(), &log)
fallbackNotifier := notification.NewFallbackNotifier(cfg, nil, &log, "test")
ctx = notification.WithContextFallback(ctx, fallbackNotifier)
return outBuffer, ctx
}

// compose client with logger
dcc, err := docker.NewDefaultComposeClient(ctx, nil, cfg.VersionFile, cfg.ComposeFile, cfg.UpgradeMode)
require.NoError(t, err)

func setUIDEnv(t *testing.T) {
// ensure we run container with current user (not root!)
currentUser, err := user.Current()
require.NoError(t, err)
err = os.Setenv("MY_UID", currentUser.Uid)
require.NoError(t, err)
}

func initUrSm(t *testing.T, source urproto.ProviderType, prvdr provider.UpgradeProvider, tempDir string) (*upgrades_registry.UpgradeRegistry, *state_machine.StateMachine) {
// initialize a new upgrade registry
sm := state_machine.NewStateMachine(nil)
versionResolvers := []urproto.ProviderType{source}
upgradeProviders := map[urproto.ProviderType]provider.UpgradeProvider{source: prvdr}
// we will attach a local provider in the chain case, to serve as the version resolver
if source == urproto.ProviderType_CHAIN {
versionResolvers = append(versionResolvers, urproto.ProviderType_LOCAL)
provider, err := local.NewProvider(
path.Join(tempDir, "blazar", "local.db.json"),
"test",
1,
)
require.NoError(t, err)
upgradeProviders[urproto.ProviderType_LOCAL] = provider
}
ur := upgrades_registry.NewUpgradeRegistry(
map[urproto.ProviderType]provider.UpgradeProvider{source: prvdr},
[]urproto.ProviderType{source},
upgradeProviders,
versionResolvers,
sm,
"test",
)
return ur, sm
}

func startAndTestGovUpgrade(ctx context.Context, t *testing.T, daemon *Daemon, cfg *config.Config, outBuffer *threadSafeBuffer, versionProvider urproto.ProviderType) {
// start the simapp node
_, _, err := cmd.CheckOutputWithDeadline(ctx, 5*time.Second, nil, "docker", "compose", "-f", cfg.ComposeFile, "up", "-d", "--force-recreate")
require.NoError(t, err)

// start cosmos client and wait for it to be ready
cosmosClient, err := cosmos.NewClient(cfg.Clients.Host, cfg.Clients.GrpcPort, cfg.Clients.CometbftPort, cfg.Clients.Timeout)
require.NoError(t, err)

for range 20 {
if err = cosmosClient.StartCometbftClient(); err != nil {
time.Sleep(500 * time.Millisecond)
continue
}
break // client started successfully, stop retrying
}

require.NoError(t, err)
daemon.cosmosClient = cosmosClient

// wait just in case the rpc is not responsive yet
time.Sleep(2 * time.Second)

// the governance proposal passes by around height 7 in the chain provider case
heightIncreased := false
for range 50 {
height, err := cosmosClient.GetLatestBlockHeight(ctx)
require.NoError(t, err)
if height > 7 {
heightIncreased = true
break
}
time.Sleep(500 * time.Millisecond)
}

if !heightIncreased {
t.Fatal("Test chain height did not cross 7 in the expected time")
}
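// register the target image tag with the version resolver; upgrades are stored without a tag,
// so the resolver is what maps height 10 to a concrete image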

err = daemon.ur.RegisterVersion(ctx, &vrproto.Version{
Height: 10,
Tag: strings.Split(simd2RepoTag, ":")[1],
Network: "test",
Source: versionProvider,
Priority: 1,
}, false)

require.NoError(t, err)

// refresh the upgrade registry cache;
// in the governance (chain provider) case the upgrade should have been registered by the test scripts by now
_, _, _, _, err = daemon.ur.Update(ctx, 0, true)
require.NoError(t, err)

// ------ TEST GOVERNANCE UPGRADE ------ //
// we expect the chain to upgrade to simd2RepoTag at height 10 //
latestHeight, err := cosmosClient.GetLatestBlockHeight(ctx)
require.NoError(t, err)
require.LessOrEqual(t, latestHeight, int64(8), "the test is faulty, the chain is already at height >= 8")

height, err := daemon.waitForUpgrade(ctx, cfg)
require.NoError(t, err)
require.Equal(t, int64(10), height)

// get simapp container logs
var stdout bytes.Buffer
cmd := exec.Command("docker", "compose", "-f", cfg.ComposeFile, "logs")
cmd.Stdout = &stdout
err = cmd.Run()
require.NoError(t, err)

// chain process must have logged upgrade height being hit
require.Contains(t, stdout.String(), "UPGRADE \"test1\" NEEDED at height: 10")

requirePreCheckStatus(t, daemon.stateMachine, 10)

// perform the upgrade
err = daemon.performUpgrade(ctx, &cfg.Compose, cfg.ComposeService, height)
require.NoError(t, err)

// ensure the upgrade was successful
isImageContainerRunning, err := daemon.dcc.IsServiceRunning(ctx, cfg.ComposeService, time.Second*2)
require.NoError(t, err)
require.True(t, isImageContainerRunning)

// blazar should have logged all this
require.Contains(t, outBuffer.String(), fmt.Sprintf("Monitoring %s for new upgrades", cfg.UpgradeInfoFilePath()))
require.Contains(t, outBuffer.String(), "Received upgrade data from upgrade-info.json")
require.Contains(t, outBuffer.String(), "Executing compose up")
require.Contains(t, outBuffer.String(), fmt.Sprintf("Upgrade completed. New image: %s", simd2RepoTag))

// let's see if the post-upgrade checks pass
err = daemon.postUpgradeChecks(ctx, daemon.stateMachine, &cfg.Checks.PostUpgrade, height)
require.NoError(t, err)

requirePostCheckStatus(t, daemon.stateMachine, 10)

outBuffer.Reset()
}

// The integration test for the daemon asserts that all three types of upgrades are successfully executed (for a given provider):
// 1. GOVERNANCE
// 2. NON_GOVERNANCE_UNCOORDINATED
// 3. NON_GOVERNANCE_COORDINATED
func runNonChain(t *testing.T, metrics *metrics.Metrics, prvdr provider.UpgradeProvider, source urproto.ProviderType, tempDir, serviceName string, grpcPort, cometbftPort int) {
// ------ PREPARE ENVIRONMENT ------ //
cfg := generateConfig(t, tempDir, serviceName, grpcPort, cometbftPort)

outBuffer, ctx := injectTestLogger(cfg)

// compose client with logger
dcc, err := docker.NewDefaultComposeClient(ctx, nil, cfg.VersionFile, cfg.ComposeFile, cfg.UpgradeMode)
require.NoError(t, err)

// ensure we run container with current user (not root!)
setUIDEnv(t)

// initialize new upgrade registry
ur, sm := initUrSm(t, source, prvdr, tempDir)

// add test upgrades
require.NoError(t, ur.AddUpgrade(ctx, &urproto.Upgrade{
Height: 10,
Tag: strings.Split(simd2RepoTag, ":")[1],
Tag: "", // the version resolver will get this
Network: "test",
Name: "test",
Type: urproto.UpgradeType_GOVERNANCE,
@@ -195,10 +332,6 @@ func run(t *testing.T, metrics *metrics.Metrics, prvdr provider.UpgradeProvider,
ProposalId: nil,
}, false))

// refresh the upgrade registry cache
_, _, _, _, err = ur.Update(ctx, 0, true)
require.NoError(t, err)

daemon := Daemon{
dcc: dcc,
ur: ur,
@@ -211,85 +344,20 @@ func run(t *testing.T, metrics *metrics.Metrics, prvdr provider.UpgradeProvider,
},
},
}
require.NoError(t, err)

// start the siapp node
_, _, err = cmd.CheckOutputWithDeadline(ctx, 5*time.Second, nil, "docker", "compose", "-f", cfg.ComposeFile, "up", "-d", "--force-recreate")
require.NoError(t, err)

// start cosmos client and wait for it to be ready
cosmosClient, err := cosmos.NewClient(cfg.Clients.Host, cfg.Clients.GrpcPort, cfg.Clients.CometbftPort, cfg.Clients.Timeout)
require.NoError(t, err)

for range 20 {
if err = cosmosClient.StartCometbftClient(); err != nil {
time.Sleep(500 * time.Millisecond)
continue
}
}

require.NoError(t, err)
daemon.cosmosClient = cosmosClient

// wait just in case the rpc is not responsive yet
time.Sleep(2 * time.Second)

// ------ TEST GOVERNANCE UPGRADE ------ //
// we expect the chain to upgrade to simd2RepoTag at height 10 //
latestHeight, err := cosmosClient.GetLatestBlockHeight(ctx)
require.NoError(t, err)
require.LessOrEqual(t, latestHeight, int64(8), "the test is faulty, the chain is already at height >= 8")

height, err := daemon.waitForUpgrade(ctx, cfg)
require.NoError(t, err)
require.Equal(t, int64(10), height)

// get simapp container logs
var stdout bytes.Buffer
cmd := exec.Command("docker", "compose", "-f", cfg.ComposeFile, "logs")
cmd.Stdout = &stdout
err = cmd.Run()
require.NoError(t, err)

// chain process must have logged upgrade height being hit
require.Contains(t, stdout.String(), "UPGRADE \"test1\" NEEDED at height: 10")

requirePreCheckStatus(t, sm, 10)

// perform the upgrade
err = daemon.performUpgrade(ctx, &cfg.Compose, cfg.ComposeService, height)
require.NoError(t, err)

// ensure the upgrade was successful
isImageContainerRunning, err := dcc.IsServiceRunning(ctx, cfg.ComposeService, time.Second*2)
require.NoError(t, err)
require.True(t, isImageContainerRunning)

// blazar should have logged all this
require.Contains(t, outBuffer.String(), fmt.Sprintf("Monitoring %s for new upgrades", cfg.UpgradeInfoFilePath()))
require.Contains(t, outBuffer.String(), "Received upgrade data from upgrade-info.json")
require.Contains(t, outBuffer.String(), "Executing compose up")
require.Contains(t, outBuffer.String(), fmt.Sprintf("Upgrade completed. New image: %s", simd2RepoTag))

// lets see if post upgrade checks pass
err = daemon.postUpgradeChecks(ctx, sm, &cfg.Checks.PostUpgrade, height)
require.NoError(t, err)

requirePreCheckStatus(t, sm, 10)

outBuffer.Reset()
startAndTestGovUpgrade(ctx, t, &daemon, cfg, outBuffer, source)

// ------ TEST NON_GOVERNANCE_UNCOORDINATED UPGRADE ------ //
// we expect the chain to upgrade to simd2RepoTag at height 13 //
latestHeight, err = cosmosClient.GetLatestBlockHeight(ctx)
latestHeight, err := daemon.cosmosClient.GetLatestBlockHeight(ctx)
require.NoError(t, err)
require.LessOrEqual(t, latestHeight, int64(11), "the test is faulty, the chain is already at height >= 11")

upgrades, err := ur.GetUpcomingUpgrades(ctx, false, 11, urproto.UpgradeStatus_SCHEDULED, urproto.UpgradeStatus_ACTIVE, urproto.UpgradeStatus_EXECUTING)
require.NoError(t, err)
require.Len(t, upgrades, 2)

height, err = daemon.waitForUpgrade(ctx, cfg)
height, err := daemon.waitForUpgrade(ctx, cfg)
require.NoError(t, err)
require.Equal(t, int64(13), height)

@@ -314,7 +382,7 @@ func run(t *testing.T, metrics *metrics.Metrics, prvdr provider.UpgradeProvider,

// ------ TEST NON_GOVERNANCE_COORDINATED UPGRADE (with HALT_HEIGHT) ------ //
// we expect the chain to upgrade to simd2RepoTag at height 19 //
latestHeight, err = cosmosClient.GetLatestBlockHeight(ctx)
latestHeight, err = daemon.cosmosClient.GetLatestBlockHeight(ctx)
require.NoError(t, err)
require.LessOrEqual(t, latestHeight, int64(14), "the test is faulty, the chain is already at height > 14")

@@ -327,8 +395,8 @@ func run(t *testing.T, metrics *metrics.Metrics, prvdr provider.UpgradeProvider,
require.Equal(t, int64(19), height)

// get container logs
stdout.Reset()
cmd = exec.Command("docker", "compose", "-f", cfg.ComposeFile, "logs")
var stdout bytes.Buffer
cmd := exec.Command("docker", "compose", "-f", cfg.ComposeFile, "logs")
cmd.Stdout = &stdout
err = cmd.Run()
require.NoError(t, err)
@@ -359,6 +427,48 @@ func run(t *testing.T, metrics *metrics.Metrics, prvdr provider.UpgradeProvider,
require.NoError(t, err)
}

// Similar to the above, but only runs the GOVERNANCE upgrade test for the chain provider,
// since the chain provider doesn't support the other upgrade types
func runChain(t *testing.T, metrics *metrics.Metrics, tempDir, serviceName string, grpcPort, cometbftPort int) {
// ------ PREPARE ENVIRONMENT ------ //
cfg := generateConfig(t, tempDir, serviceName, grpcPort, cometbftPort)

outBuffer, ctx := injectTestLogger(cfg)

// compose client with logger
dcc, err := docker.NewDefaultComposeClient(ctx, nil, cfg.VersionFile, cfg.ComposeFile, cfg.UpgradeMode)
require.NoError(t, err)

setUIDEnv(t)

cosmosClient, err := cosmos.NewClient(cfg.Clients.Host, cfg.Clients.GrpcPort, cfg.Clients.CometbftPort, cfg.Clients.Timeout)
require.NoError(t, err)

prvdr := chain.NewProvider(cosmosClient, "test", 1)

// initialize new upgrade registry
ur, sm := initUrSm(t, urproto.ProviderType_CHAIN, prvdr, tempDir)

daemon := Daemon{
dcc: dcc,
ur: ur,
stateMachine: sm,
metrics: metrics,
observedBlockSpeeds: make([]time.Duration, 5),
nodeInfo: &tmservice.GetNodeInfoResponse{
DefaultNodeInfo: &p2p.DefaultNodeInfo{
Network: "test",
},
},
}

startAndTestGovUpgrade(ctx, t, &daemon, cfg, outBuffer, urproto.ProviderType_LOCAL)

// cleanup
err = dcc.Down(ctx, cfg.ComposeService, cfg.Compose.DownTimeout)
require.NoError(t, err)
}

func requirePreCheckStatus(t *testing.T, sm *state_machine.StateMachine, height int64) {
require.Equal(t, checksproto.CheckStatus_FINISHED, sm.GetPreCheckStatus(height, checksproto.PreCheck_PULL_DOCKER_IMAGE))
require.Equal(t, checksproto.CheckStatus_FINISHED, sm.GetPreCheckStatus(height, checksproto.PreCheck_SET_HALT_HEIGHT))
