diff --git a/internal/pkg/daemon/daemon_test.go b/internal/pkg/daemon/daemon_test.go
index fd34d0c..cd773e1 100644
--- a/internal/pkg/daemon/daemon_test.go
+++ b/internal/pkg/daemon/daemon_test.go
@@ -28,6 +28,7 @@ import (
 	urproto "blazar/internal/pkg/proto/upgrades_registry"
 	vrproto "blazar/internal/pkg/proto/version_resolver"
 	"blazar/internal/pkg/provider"
+	"blazar/internal/pkg/provider/chain"
 	"blazar/internal/pkg/provider/database"
 	"blazar/internal/pkg/provider/local"
 	"blazar/internal/pkg/state_machine"
@@ -77,7 +78,7 @@ func TestIntegrationDaemon(t *testing.T) {
 	metrics, err := metrics.NewMetrics("/path/to/docker-compose.yml", "dummy", "test")
 	require.NoError(t, err)
 
-	ports := getFreePorts(t, 4)
+	ports := getFreePorts(t, 6)
 
 	t.Run("LocalProvider", func(t *testing.T) {
 		name := fmt.Sprintf("blazar-e2e-test-local-simapp-%d", rand.Uint64())
@@ -93,7 +94,7 @@ func TestIntegrationDaemon(t *testing.T) {
 			t.Fatalf("failed to create local provider: %v", err)
 		}
 
-		run(t, metrics, provider, urproto.ProviderType_LOCAL, tempDir, name, ports[0], ports[1])
+		runNonChain(t, metrics, provider, urproto.ProviderType_LOCAL, tempDir, name, ports[0], ports[1])
 	})
 
 	t.Run("DatabaseProvider", func(t *testing.T) {
@@ -106,18 +107,19 @@ func TestIntegrationDaemon(t *testing.T) {
 			t.Fatalf("failed to create database provider: %v", err)
 		}
 
-		run(t, metrics, provider, urproto.ProviderType_DATABASE, tempDir, name, ports[2], ports[3])
+		runNonChain(t, metrics, provider, urproto.ProviderType_DATABASE, tempDir, name, ports[2], ports[3])
 	})
-}
 
-// The integration test for the daemon asserts that all 3 types of upgrades are successfully executed (for a given provider). This is:
-// 1. GOVERNANCE
-// 2. NON_GOVERNANCE_UNCOORDINATED
-// 3. NON_GOVERNANCE_COORDINATED
-func run(t *testing.T, metrics *metrics.Metrics, prvdr provider.UpgradeProvider, source urproto.ProviderType, tempDir, serviceName string, grpcPort, cometbftPort int) {
-	// ------ PREPARE ENVIRONMENT ------ //
-	cfg := generateConfig(t, tempDir, serviceName, grpcPort, cometbftPort)
+	t.Run("ChainProvider", func(t *testing.T) {
+		name := fmt.Sprintf("blazar-e2e-test-chain-simapp-%d", rand.Uint64())
+		t.Parallel()
+		tempDir := testutils.PrepareTestData(t, "", "daemon", name)
+		runChain(t, metrics, tempDir, name, ports[4], ports[5])
+	})
+}
 
+func injectTestLogger(cfg *config.Config) (*threadSafeBuffer, context.Context) {
 	// inject test logger
 	outBuffer := &threadSafeBuffer{}
 	output := zerolog.ConsoleWriter{Out: outBuffer, TimeFormat: time.Kitchen, NoColor: true}
@@ -126,30 +128,165 @@ func run(t *testing.T, metrics *metrics.Metrics, prvdr provider.UpgradeProvider,
 	ctx := logger.WithContext(context.Background(), &log)
 	fallbackNotifier := notification.NewFallbackNotifier(cfg, nil, &log, "test")
 	ctx = notification.WithContextFallback(ctx, fallbackNotifier)
+	return outBuffer, ctx
+}
 
-	// compose client with logger
-	dcc, err := docker.NewDefaultComposeClient(ctx, nil, cfg.VersionFile, cfg.ComposeFile, cfg.UpgradeMode)
-	require.NoError(t, err)
-
+func setUIDEnv(t *testing.T) {
 	// ensure we run container with current user (not root!)
 	currentUser, err := user.Current()
 	require.NoError(t, err)
 
 	err = os.Setenv("MY_UID", currentUser.Uid)
 	require.NoError(t, err)
+}
+
+func initUrSm(t *testing.T, source urproto.ProviderType, prvdr provider.UpgradeProvider, tempDir string) (*upgrades_registry.UpgradeRegistry, *state_machine.StateMachine) {
 	// initialize new upgrade registry
 	sm := state_machine.NewStateMachine(nil)
+	versionResolvers := []urproto.ProviderType{source}
+	upgradeProviders := map[urproto.ProviderType]provider.UpgradeProvider{source: prvdr}
+	// we will attach a local provider in the chain case, to serve as the version resolver
+	if source == urproto.ProviderType_CHAIN {
+		versionResolvers = append(versionResolvers, urproto.ProviderType_LOCAL)
+		provider, err := local.NewProvider(
+			path.Join(tempDir, "blazar", "local.db.json"),
+			"test",
+			1,
+		)
+		require.NoError(t, err)
+		upgradeProviders[urproto.ProviderType_LOCAL] = provider
+	}
 	ur := upgrades_registry.NewUpgradeRegistry(
-		map[urproto.ProviderType]provider.UpgradeProvider{source: prvdr},
-		[]urproto.ProviderType{source},
+		upgradeProviders,
+		versionResolvers,
 		sm,
 		"test",
 	)
+	return ur, sm
+}
+
+func startAndTestGovUpgrade(ctx context.Context, t *testing.T, daemon *Daemon, cfg *config.Config, outBuffer *threadSafeBuffer, versionProvider urproto.ProviderType) {
+	// start the simapp node
+	_, _, err := cmd.CheckOutputWithDeadline(ctx, 5*time.Second, nil, "docker", "compose", "-f", cfg.ComposeFile, "up", "-d", "--force-recreate")
+	require.NoError(t, err)
+
+	// start cosmos client and wait for it to be ready
+	cosmosClient, err := cosmos.NewClient(cfg.Clients.Host, cfg.Clients.GrpcPort, cfg.Clients.CometbftPort, cfg.Clients.Timeout)
+	require.NoError(t, err)
+
+	for range 20 {
+		if err = cosmosClient.StartCometbftClient(); err != nil {
+			time.Sleep(500 * time.Millisecond)
+			continue
+		}
+		break
+	}
+
+	require.NoError(t, err)
+	daemon.cosmosClient = cosmosClient
+
+	// wait just in case the rpc is not responsive yet
+	time.Sleep(2 * time.Second)
+
+	// the governance proposal passes by height ~7 in the chain provider case
+	heightIncreased := false
+	for range 50 {
+		height, err := cosmosClient.GetLatestBlockHeight(ctx)
+		require.NoError(t, err)
+		if height > 7 {
+			heightIncreased = true
+			break
+		}
+		time.Sleep(500 * time.Millisecond)
+	}
+
+	if !heightIncreased {
+		t.Fatal("Test chain height did not cross 7 in expected time")
+	}
+
+	err = daemon.ur.RegisterVersion(ctx, &vrproto.Version{
+		Height: 10,
+		Tag: strings.Split(simd2RepoTag, ":")[1],
+		Network: "test",
+		Source: versionProvider,
+		Priority: 1,
+	}, false)
+
+	require.NoError(t, err)
+
+	// refresh the upgrade registry cache
+	// in the gov case the upgrade should have been registered by the test scripts by now
+	_, _, _, _, err = daemon.ur.Update(ctx, 0, true)
+	require.NoError(t, err)
+
+	// ------ TEST GOVERNANCE UPGRADE ------ //
+	// we expect the chain to upgrade to simd2RepoTag at height 10 //
+	latestHeight, err := cosmosClient.GetLatestBlockHeight(ctx)
+	require.NoError(t, err)
+	require.LessOrEqual(t, latestHeight, int64(8), "the test is faulty, the chain is already at height > 8")
+
+	height, err := daemon.waitForUpgrade(ctx, cfg)
+	require.NoError(t, err)
+	require.Equal(t, int64(10), height)
+
+	// get simapp container logs
+	var stdout bytes.Buffer
+	cmd := exec.Command("docker", "compose", "-f", cfg.ComposeFile, "logs")
+	cmd.Stdout = &stdout
+	err = cmd.Run()
+	require.NoError(t, err)
+
+	// chain process must have logged upgrade height being hit
+	require.Contains(t, stdout.String(), "UPGRADE \"test1\" NEEDED at height: 10")
+
+	requirePreCheckStatus(t, daemon.stateMachine, 10)
+
+	// perform the upgrade
+	err = daemon.performUpgrade(ctx, &cfg.Compose, cfg.ComposeService, height)
+	require.NoError(t, err)
+
+	// ensure the upgrade was successful
+	isImageContainerRunning, err := daemon.dcc.IsServiceRunning(ctx, cfg.ComposeService, time.Second*2)
+	require.NoError(t, err)
+	require.True(t, isImageContainerRunning)
+
+	// blazar should have logged all this
+	require.Contains(t, outBuffer.String(), fmt.Sprintf("Monitoring %s for new upgrades", cfg.UpgradeInfoFilePath()))
+	require.Contains(t, outBuffer.String(), "Received upgrade data from upgrade-info.json")
+	require.Contains(t, outBuffer.String(), "Executing compose up")
+	require.Contains(t, outBuffer.String(), fmt.Sprintf("Upgrade completed. New image: %s", simd2RepoTag))
+
+	// let's see if post-upgrade checks pass
+	err = daemon.postUpgradeChecks(ctx, daemon.stateMachine, &cfg.Checks.PostUpgrade, height)
+	require.NoError(t, err)
+
+	requirePostCheckStatus(t, daemon.stateMachine, 10)
+
+	outBuffer.Reset()
+}
+
+// The integration test for the daemon asserts that all 3 types of upgrades are successfully executed (for a given provider). These are:
+// 1. GOVERNANCE
+// 2. NON_GOVERNANCE_UNCOORDINATED
+// 3. NON_GOVERNANCE_COORDINATED
+func runNonChain(t *testing.T, metrics *metrics.Metrics, prvdr provider.UpgradeProvider, source urproto.ProviderType, tempDir, serviceName string, grpcPort, cometbftPort int) {
+	// ------ PREPARE ENVIRONMENT ------ //
+	cfg := generateConfig(t, tempDir, serviceName, grpcPort, cometbftPort)
+
+	outBuffer, ctx := injectTestLogger(cfg)
+
+	// compose client with logger
+	dcc, err := docker.NewDefaultComposeClient(ctx, nil, cfg.VersionFile, cfg.ComposeFile, cfg.UpgradeMode)
+	require.NoError(t, err)
+
+	// ensure we run container with current user (not root!)
+	setUIDEnv(t)
+
+	// initialize new upgrade registry
+	ur, sm := initUrSm(t, source, prvdr, tempDir)
 
 	// add test upgrades
 	require.NoError(t, ur.AddUpgrade(ctx, &urproto.Upgrade{
 		Height: 10,
-		Tag: strings.Split(simd2RepoTag, ":")[1],
+		Tag: "", // the version resolver will fill this in
 		Network: "test",
 		Name: "test",
 		Type: urproto.UpgradeType_GOVERNANCE,
@@ -195,10 +332,6 @@ func run(t *testing.T, metrics *metrics.Metrics, prvdr provider.UpgradeProvider,
 		ProposalId: nil,
 	}, false))
 
-	// refresh the upgrade registry cache
-	_, _, _, _, err = ur.Update(ctx, 0, true)
-	require.NoError(t, err)
-
 	daemon := Daemon{
 		dcc: dcc,
 		ur: ur,
@@ -211,77 +344,12 @@ func run(t *testing.T, metrics *metrics.Metrics, prvdr provider.UpgradeProvider,
 		},
 	}
-	require.NoError(t, err)
-
-	// start the siapp node
-	_, _, err = cmd.CheckOutputWithDeadline(ctx, 5*time.Second, nil, "docker", "compose", "-f", cfg.ComposeFile, "up", "-d", "--force-recreate")
-	require.NoError(t, err)
-
-	// start cosmos client and wait for it to be ready
-	cosmosClient, err := cosmos.NewClient(cfg.Clients.Host, cfg.Clients.GrpcPort, cfg.Clients.CometbftPort, cfg.Clients.Timeout)
-	require.NoError(t, err)
-
-	for range 20 {
-		if err = cosmosClient.StartCometbftClient(); err != nil {
-			time.Sleep(500 * time.Millisecond)
-			continue
-		}
-	}
-
-	require.NoError(t, err)
-	daemon.cosmosClient = cosmosClient
-
-	// wait just in case the rpc is not responsive yet
-	time.Sleep(2 * time.Second)
-
-	// ------ TEST GOVERNANCE UPGRADE ------ //
-	// we expect the chain to upgrade to simd2RepoTag at height 10 //
-	latestHeight, err := cosmosClient.GetLatestBlockHeight(ctx)
-	require.NoError(t, err)
-	require.LessOrEqual(t, latestHeight, int64(8), "the test is faulty, the chain is already at height >= 8")
-
-	height, err := daemon.waitForUpgrade(ctx, cfg)
-	require.NoError(t, err)
-	require.Equal(t, int64(10), height)
-
-	// get simapp container logs
-	var stdout bytes.Buffer
-	cmd := exec.Command("docker", "compose", "-f", cfg.ComposeFile, "logs")
-	cmd.Stdout = &stdout
-	err = cmd.Run()
-	require.NoError(t, err)
-
-	// chain process must have logged upgrade height being hit
-	require.Contains(t, stdout.String(), "UPGRADE \"test1\" NEEDED at height: 10")
-
-	requirePreCheckStatus(t, sm, 10)
-
-	// perform the upgrade
-	err = daemon.performUpgrade(ctx, &cfg.Compose, cfg.ComposeService, height)
-	require.NoError(t, err)
-
-	// ensure the upgrade was successful
-	isImageContainerRunning, err := dcc.IsServiceRunning(ctx, cfg.ComposeService, time.Second*2)
-	require.NoError(t, err)
-	require.True(t, isImageContainerRunning)
-
-	// blazar should have logged all this
-	require.Contains(t, outBuffer.String(), fmt.Sprintf("Monitoring %s for new upgrades", cfg.UpgradeInfoFilePath()))
-	require.Contains(t, outBuffer.String(), "Received upgrade data from upgrade-info.json")
-	require.Contains(t, outBuffer.String(), "Executing compose up")
-	require.Contains(t, outBuffer.String(), fmt.Sprintf("Upgrade completed. New image: %s", simd2RepoTag))
-
-	// lets see if post upgrade checks pass
-	err = daemon.postUpgradeChecks(ctx, sm, &cfg.Checks.PostUpgrade, height)
-	require.NoError(t, err)
-
-	requirePreCheckStatus(t, sm, 10)
-
-	outBuffer.Reset()
+	startAndTestGovUpgrade(ctx, t, &daemon, cfg, outBuffer, source)
 
 	// ------ TEST NON_GOVERNANCE_UNCOORDINATED UPGRADE ------ //
 	// we expect the chain to upgrade to simd2RepoTag at height 13 //
-	latestHeight, err = cosmosClient.GetLatestBlockHeight(ctx)
+	latestHeight, err := daemon.cosmosClient.GetLatestBlockHeight(ctx)
 	require.NoError(t, err)
 	require.LessOrEqual(t, latestHeight, int64(11), "the test is faulty, the chain is already at height >= 11")
@@ -289,7 +357,7 @@ func run(t *testing.T, metrics *metrics.Metrics, prvdr provider.UpgradeProvider,
 	require.NoError(t, err)
 	require.Len(t, upgrades, 2)
 
-	height, err = daemon.waitForUpgrade(ctx, cfg)
+	height, err := daemon.waitForUpgrade(ctx, cfg)
 	require.NoError(t, err)
 	require.Equal(t, int64(13), height)
@@ -314,7 +382,7 @@ func run(t *testing.T, metrics *metrics.Metrics, prvdr provider.UpgradeProvider,
 	// ------ TEST NON_GOVERNANCE_COORDINATED UPGRADE (with HALT_HEIGHT) ------ //
 	// we expect the chain to upgrade to simd2RepoTag at height 19 //
-	latestHeight, err = cosmosClient.GetLatestBlockHeight(ctx)
+	latestHeight, err = daemon.cosmosClient.GetLatestBlockHeight(ctx)
 	require.NoError(t, err)
 	require.LessOrEqual(t, latestHeight, int64(14), "the test is faulty, the chain is already at height > 14")
@@ -327,8 +395,8 @@ func run(t *testing.T, metrics *metrics.Metrics, prvdr provider.UpgradeProvider,
 	require.Equal(t, int64(19), height)
 
 	// get container logs
-	stdout.Reset()
-	cmd = exec.Command("docker", "compose", "-f", cfg.ComposeFile, "logs")
+	var stdout bytes.Buffer
+	cmd := exec.Command("docker", "compose", "-f", cfg.ComposeFile, "logs")
 	cmd.Stdout = &stdout
 	err = cmd.Run()
 	require.NoError(t, err)
@@ -359,6 +427,48 @@ func run(t *testing.T, metrics *metrics.Metrics, prvdr provider.UpgradeProvider,
 	require.NoError(t, err)
 }
 
+// Similar to the above, but runs only the GOVERNANCE upgrade test for the chain provider,
+// since the chain provider doesn't support the other upgrade types
+func runChain(t *testing.T, metrics *metrics.Metrics, tempDir, serviceName string, grpcPort, cometbftPort int) {
+	// ------ PREPARE ENVIRONMENT ------ //
+	cfg := generateConfig(t, tempDir, serviceName, grpcPort, cometbftPort)
+
+	outBuffer, ctx := injectTestLogger(cfg)
+
+	// compose client with logger
+	dcc, err := docker.NewDefaultComposeClient(ctx, nil, cfg.VersionFile, cfg.ComposeFile, cfg.UpgradeMode)
+	require.NoError(t, err)
+
+	setUIDEnv(t)
+
+	cosmosClient, err := cosmos.NewClient(cfg.Clients.Host, cfg.Clients.GrpcPort, cfg.Clients.CometbftPort, cfg.Clients.Timeout)
+	require.NoError(t, err)
+
+	prvdr := chain.NewProvider(cosmosClient, "test", 1)
+
+	// initialize new upgrade registry
+	ur, sm := initUrSm(t, urproto.ProviderType_CHAIN, prvdr, tempDir)
+
+	daemon := Daemon{
+		dcc: dcc,
+		ur: ur,
+		stateMachine: sm,
+		metrics: metrics,
+		observedBlockSpeeds: make([]time.Duration, 5),
+		nodeInfo: &tmservice.GetNodeInfoResponse{
+			DefaultNodeInfo: &p2p.DefaultNodeInfo{
+				Network: "test",
+			},
+		},
+	}
+
+	startAndTestGovUpgrade(ctx, t, &daemon, cfg, outBuffer, urproto.ProviderType_LOCAL)
+
+	// cleanup
+	err = dcc.Down(ctx, cfg.ComposeService, cfg.Compose.DownTimeout)
+	require.NoError(t, err)
+}
+
 func requirePreCheckStatus(t *testing.T, sm *state_machine.StateMachine, height int64) {
 	require.Equal(t, checksproto.CheckStatus_FINISHED, sm.GetPreCheckStatus(height, checksproto.PreCheck_PULL_DOCKER_IMAGE))
 	require.Equal(t, checksproto.CheckStatus_FINISHED, sm.GetPreCheckStatus(height, checksproto.PreCheck_SET_HALT_HEIGHT))
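
Reviewer note: the refactored helpers route all of blazar's log output into the threadSafeBuffer returned by injectTestLogger, and startAndTestGovUpgrade then reads it via outBuffer.String() while the daemon keeps writing from its own goroutines, so the buffer must be safe for concurrent use. The type itself is defined elsewhere in the test package and is not part of this diff; a minimal sketch of what it presumably looks like (a bytes.Buffer behind a mutex, implementing io.Writer so it can back zerolog.ConsoleWriter's Out field; assumes "bytes" and "sync" imports) is:

	type threadSafeBuffer struct {
		mu  sync.Mutex
		buf bytes.Buffer
	}

	// Write implements io.Writer for zerolog.ConsoleWriter.
	func (b *threadSafeBuffer) Write(p []byte) (int, error) {
		b.mu.Lock()
		defer b.mu.Unlock()
		return b.buf.Write(p)
	}

	// String snapshots the accumulated log output for assertions.
	func (b *threadSafeBuffer) String() string {
		b.mu.Lock()
		defer b.mu.Unlock()
		return b.buf.String()
	}

	// Reset clears the buffer between test phases.
	func (b *threadSafeBuffer) Reset() {
		b.mu.Lock()
		defer b.mu.Unlock()
		b.buf.Reset()
	}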
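Similarly, getFreePorts (now asked for 6 ports so the new ChainProvider subtest gets its own gRPC/CometBFT pair) is a pre-existing helper in this file. The sketch below is only an assumption about how such a helper is commonly written, not the actual implementation: it reserves ephemeral ports by listening on port 0 and letting the kernel pick (assumes a "net" import):

	func getFreePorts(t *testing.T, n int) []int {
		t.Helper()
		ports := make([]int, 0, n)
		for range n {
			// port 0 asks the kernel for any currently free TCP port
			l, err := net.Listen("tcp", "127.0.0.1:0")
			require.NoError(t, err)
			// keep the listener open until this helper returns so the
			// same port is not handed out twice within one call
			defer l.Close()
			ports = append(ports, l.Addr().(*net.TCPAddr).Port)
		}
		return ports
	}

There is an inherent race in this pattern (the port is released before the container binds it), which is generally accepted in integration tests.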