From 29ed381f717dfd5b47798065c01646b3e089c12e Mon Sep 17 00:00:00 2001 From: Zeph Grunschlag Date: Mon, 28 Aug 2023 19:49:35 -0500 Subject: [PATCH 1/9] privatize RetriesXYZ() + retriesNoInput() --- conduit/pipeline/common.go | 14 +++++- conduit/pipeline/common_test.go | 86 ++++++++++++++++++++++++++++++--- 2 files changed, 90 insertions(+), 10 deletions(-) diff --git a/conduit/pipeline/common.go b/conduit/pipeline/common.go index 4c31a3ff..15bd58c2 100644 --- a/conduit/pipeline/common.go +++ b/conduit/pipeline/common.go @@ -7,6 +7,8 @@ import ( log "github.com/sirupsen/logrus" + sdk "github.com/algorand/go-algorand-sdk/v2/types" + "github.com/algorand/conduit/conduit/data" ) @@ -20,11 +22,11 @@ func HandlePanic(logger *log.Logger) { type empty struct{} type pluginInput interface { - uint64 | data.BlockData | string + uint64 | data.BlockData | string | empty } type pluginOutput interface { - pluginInput | empty + pluginInput | * sdk.Genesis } // Retries is a wrapper for retrying a function call f() with a cancellation context, @@ -81,3 +83,11 @@ func RetriesNoOutput[X pluginInput](f func(x X) error, a X, p *pipelineImpl, msg }, a, p, msg) return d, err } + +// RetriesNoInput applies the same logic as Retries, but for functions that take no input. +func RetriesNoInput[Y pluginOutput](f func() (Y, error), p *pipelineImpl, msg string) (Y, time.Duration, error) { + return Retries(func(x empty) (Y, error) { + return f() + }, empty{}, p, msg) +} + diff --git a/conduit/pipeline/common_test.go b/conduit/pipeline/common_test.go index cd9dcd4a..dec9f322 100644 --- a/conduit/pipeline/common_test.go +++ b/conduit/pipeline/common_test.go @@ -41,6 +41,19 @@ func TestRetries(t *testing.T) { } } + succeedAfterFactoryNoInput := func(succeedAfter uint64, never bool) func() (uint64, error) { + tries := uint64(0) + + return func() (uint64, error) { + if tries >= succeedAfter && !never { + return tries + 1, nil + } + tries++ + return 0, fmt.Errorf("%w: tries=%d", errSentinelCause, tries-1) + } + } + + cases := []struct { name string retryCount uint64 @@ -106,8 +119,8 @@ func TestRetries(t *testing.T) { for _, tc := range cases { tc := tc - // run cases for Retries() - t.Run("Retries() "+tc.name, func(t *testing.T) { + // run cases for retries() + t.Run("retries() "+tc.name, func(t *testing.T) { t.Parallel() ctx, ccf := context.WithCancelCause(context.Background()) p := &pipelineImpl{ @@ -127,7 +140,7 @@ func TestRetries(t *testing.T) { yChan := make(chan uint64) errChan := make(chan error) go func() { - y, _, err := Retries(succeedAfter, 0, p, "test") + y, _, err := retries(succeedAfter, 0, p, "test") yChan <- y errChan <- err }() @@ -144,7 +157,7 @@ func TestRetries(t *testing.T) { return } - y, _, err := Retries(succeedAfter, 0, p, "test") + y, _, err := retries(succeedAfter, 0, p, "test") if tc.retryCount == 0 { // WLOG tc.neverSucceed == false require.NoError(t, err, tc.name) @@ -163,8 +176,8 @@ func TestRetries(t *testing.T) { } }) - // run cases for RetriesNoOutput() - t.Run("RetriesNoOutput() "+tc.name, func(t *testing.T) { + // run cases for retriesNoOutput() + t.Run("retriesNoOutput() "+tc.name, func(t *testing.T) { t.Parallel() ctx, ccf := context.WithCancelCause(context.Background()) p := &pipelineImpl{ @@ -183,7 +196,56 @@ func TestRetries(t *testing.T) { errChan := make(chan error) go func() { - _, err := RetriesNoOutput(succeedAfterNoOutput, 0, p, "test") + _, err := retriesNoOutput(succeedAfterNoOutput, 0, p, "test") + errChan <- err + }() + time.Sleep(5 * time.Millisecond) + errTestCancelled := errors.New("test cancelled") + go func() { + ccf(errTestCancelled) + }() + err := <-errChan + require.ErrorIs(t, err, errTestCancelled, tc.name) + require.ErrorIs(t, err, errSentinelCause, tc.name) + return + } + + _, err := retriesNoOutput(succeedAfterNoOutput, 0, p, "test") + if tc.retryCount == 0 { // WLOG tc.neverSucceed == false + require.NoError(t, err, tc.name) + } else { // retryCount > 0 so doesn't retry forever + if tc.neverSucceed || tc.succeedAfter > tc.retryCount { + require.ErrorContains(t, err, fmt.Sprintf("%d", tc.retryCount), tc.name) + require.ErrorIs(t, err, errSentinelCause, tc.name) + } else { // !tc.neverSucceed && succeedAfter <= retryCount + require.NoError(t, err, tc.name) + } + } + }) + + // run case for retriesNoInput() + t.Run("retriesNoInput() "+tc.name, func(t *testing.T) { + t.Parallel() + ctx, ccf := context.WithCancelCause(context.Background()) + p := &pipelineImpl{ + ctx: ctx, + ccf: ccf, + logger: log.New(), + cfg: &data.Config{ + RetryCount: tc.retryCount, + RetryDelay: 1 * time.Millisecond, + }, + } + succeedAfterNoInput := succeedAfterFactoryNoInput(tc.succeedAfter, tc.neverSucceed) + + if tc.retryCount == 0 && tc.neverSucceed { + // avoid infinite loop by cancelling the context + + errChan := make(chan error) + yChan := make(chan uint64) + go func() { + out, _, err := retriesNoInput(succeedAfterNoInput, p, "test") + yChan <- out errChan <- err }() time.Sleep(5 * time.Millisecond) @@ -191,23 +253,31 @@ func TestRetries(t *testing.T) { go func() { ccf(errTestCancelled) }() + y := <-yChan err := <-errChan require.ErrorIs(t, err, errTestCancelled, tc.name) require.ErrorIs(t, err, errSentinelCause, tc.name) + require.Zero(t, y, tc.name) return } - _, err := RetriesNoOutput(succeedAfterNoOutput, 0, p, "test") + y, _, err := retriesNoInput(succeedAfterNoInput, p, "test") if tc.retryCount == 0 { // WLOG tc.neverSucceed == false require.NoError(t, err, tc.name) + + // note we subtract 1 from y below because succeedAfter has added 1 to its output + require.Equal(t, tc.succeedAfter, y-1, tc.name) } else { // retryCount > 0 so doesn't retry forever if tc.neverSucceed || tc.succeedAfter > tc.retryCount { require.ErrorContains(t, err, fmt.Sprintf("%d", tc.retryCount), tc.name) require.ErrorIs(t, err, errSentinelCause, tc.name) + require.Zero(t, y, tc.name) } else { // !tc.neverSucceed && succeedAfter <= retryCount require.NoError(t, err, tc.name) + require.Equal(t, tc.succeedAfter, y-1, tc.name) } } }) + } } From 78b8447f8187d1ef49ad9f6c61c503e4c2ef342d Mon Sep 17 00:00:00 2001 From: Zeph Grunschlag Date: Mon, 28 Aug 2023 20:03:08 -0500 Subject: [PATCH 2/9] don't block inside of Start() --- conduit/pipeline/common.go | 16 ++++++++-------- conduit/pipeline/pipeline.go | 10 ++++------ pkg/cli/cli.go | 3 +++ 3 files changed, 15 insertions(+), 14 deletions(-) diff --git a/conduit/pipeline/common.go b/conduit/pipeline/common.go index 15bd58c2..ceaf2df4 100644 --- a/conduit/pipeline/common.go +++ b/conduit/pipeline/common.go @@ -29,7 +29,7 @@ type pluginOutput interface { pluginInput | * sdk.Genesis } -// Retries is a wrapper for retrying a function call f() with a cancellation context, +// retries is a wrapper for retrying a function call f() with a cancellation context, // a delay and a max retry count. It attempts to call the wrapped function at least once // and only after the first attempt will pay attention to a context cancellation. // This can allow the pipeline to receive a cancellation and guarantee attempting to finish @@ -47,7 +47,7 @@ type pluginOutput interface { // - when p.cfg.retryCount > 0, the error will be a join of all the errors encountered during the retries // - when p.cfg.retryCount == 0, the error will be the last error encountered // - the returned duration dur is the total time spent in the function, including retries -func Retries[X pluginInput, Y pluginOutput](f func(x X) (Y, error), x X, p *pipelineImpl, msg string) (y Y, dur time.Duration, err error) { +func retries[X pluginInput, Y pluginOutput](f func(x X) (Y, error), x X, p *pipelineImpl, msg string) (y Y, dur time.Duration, err error) { start := time.Now() for i := uint64(0); p.cfg.RetryCount == 0 || i <= p.cfg.RetryCount; i++ { @@ -76,17 +76,17 @@ func Retries[X pluginInput, Y pluginOutput](f func(x X) (Y, error), x X, p *pipe return } -// RetriesNoOutput applies the same logic as Retries, but for functions that return no output. -func RetriesNoOutput[X pluginInput](f func(x X) error, a X, p *pipelineImpl, msg string) (time.Duration, error) { - _, d, err := Retries(func(x X) (empty, error) { +// retriesNoOutput applies the same logic as Retries, but for functions that return no output. +func retriesNoOutput[X pluginInput](f func(x X) error, a X, p *pipelineImpl, msg string) (time.Duration, error) { + _, d, err := retries(func(x X) (empty, error) { return empty{}, f(x) }, a, p, msg) return d, err } -// RetriesNoInput applies the same logic as Retries, but for functions that take no input. -func RetriesNoInput[Y pluginOutput](f func() (Y, error), p *pipelineImpl, msg string) (Y, time.Duration, error) { - return Retries(func(x empty) (Y, error) { +// retriesNoInput applies the same logic as Retries, but for functions that take no input. +func retriesNoInput[Y pluginOutput](f func() (Y, error), p *pipelineImpl, msg string) (Y, time.Duration, error) { + return retries(func(x empty) (Y, error) { return f() }, empty{}, p, msg) } diff --git a/conduit/pipeline/pipeline.go b/conduit/pipeline/pipeline.go index 61e95da6..476cc8a0 100644 --- a/conduit/pipeline/pipeline.go +++ b/conduit/pipeline/pipeline.go @@ -337,7 +337,7 @@ func (p *pipelineImpl) Init() error { if err != nil { return fmt.Errorf("Pipeline.GetGenesis(): could not obtain Genesis from the importer (%s): %w", p.cfg.Importer.Name, err) } - (*p.initProvider).SetGenesis(genesis) + (*p.initProvider).SetGenesis(genesis) // write pipeline metadata gh := genesis.Hash() @@ -481,7 +481,7 @@ func (p *pipelineImpl) importerHandler(importer importers.Importer, roundChan <- totalSelectWait += waitTime p.logger.Tracef("importer handler waited %dms to receive round %d", waitTime.Milliseconds(), rnd) - blkData, importTime, lastError := Retries(importer.GetBlock, rnd, p, importer.Metadata().Name) + blkData, importTime, lastError := retries(importer.GetBlock, rnd, p, importer.Metadata().Name) if lastError != nil { p.cancelWithProblem(fmt.Errorf("importer %s handler (%w): failed to import round %d after %dms: %w", importer.Metadata().Name, errImporterCause, rnd, importTime.Milliseconds(), lastError)) return @@ -533,7 +533,7 @@ func (p *pipelineImpl) processorHandler(idx int, proc processors.Processor, blkI var procTime time.Duration var lastError error - blk, procTime, lastError = Retries(proc.Process, blk, p, proc.Metadata().Name) + blk, procTime, lastError = retries(proc.Process, blk, p, proc.Metadata().Name) if lastError != nil { p.cancelWithProblem(fmt.Errorf("processor[%d] %s handler (%w): failed to process round %d after %dms: %w", idx, proc.Metadata().Name, errProcessorCause, lastRnd, procTime.Milliseconds(), lastError)) return @@ -598,7 +598,7 @@ func (p *pipelineImpl) exporterHandler(exporter exporters.Exporter, blkChan plug } var exportTime time.Duration - exportTime, lastError = RetriesNoOutput(exporter.Receive, blk, p, eName) + exportTime, lastError = retriesNoOutput(exporter.Receive, blk, p, eName) if lastError != nil { lastError = fmt.Errorf("aborting after failing to export round %d: %w", lastRound, lastError) return @@ -696,8 +696,6 @@ func (p *pipelineImpl) Start() { } } }(p.pipelineMetadata.NextRound) - - <-p.ctx.Done() } func (p *pipelineImpl) Wait() { diff --git a/pkg/cli/cli.go b/pkg/cli/cli.go index de9cb2f9..1aafeee0 100644 --- a/pkg/cli/cli.go +++ b/pkg/cli/cli.go @@ -105,6 +105,7 @@ func runConduitCmdWithConfig(args *data.Args) error { // Start server if pCfg.API.Address != "" { + logger.Infof("starting API server on %s", pCfg.API.Address) shutdown, err := api.StartServer(logger, pline, pCfg.API.Address) if err != nil { // Suppress log, it is about to be printed to stderr. @@ -114,6 +115,8 @@ func runConduitCmdWithConfig(args *data.Args) error { return fmt.Errorf("failed to start API server: %w", err) } defer shutdown(context.Background()) + } else { + logger.Info("API server is disabled") } pline.Wait() From a4d7376918413b270b11d0f891a9c469e6ff2e04 Mon Sep 17 00:00:00 2001 From: Zeph Grunschlag Date: Mon, 28 Aug 2023 20:06:54 -0500 Subject: [PATCH 3/9] trim the special end of round log --- conduit/pipeline/pipeline.go | 3 +-- conduit/pipeline/pipeline_test.go | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/conduit/pipeline/pipeline.go b/conduit/pipeline/pipeline.go index 476cc8a0..3536cf10 100644 --- a/conduit/pipeline/pipeline.go +++ b/conduit/pipeline/pipeline.go @@ -648,8 +648,7 @@ func (p *pipelineImpl) exporterHandler(exporter exporters.Exporter, blkChan plug func logstatsE2Elog(nextRound, lastRound uint64, topLevelTxnCount int, exportTime time.Duration) string { return fmt.Sprintf( - "UPDATED Pipeline NextRound=%d. FINISHED Pipeline round r=%d (%d txn) exported in %s", - nextRound, + "FINISHED Pipeline round r=%d (%d txn) exported in %s", lastRound, topLevelTxnCount, exportTime, diff --git a/conduit/pipeline/pipeline_test.go b/conduit/pipeline/pipeline_test.go index ea8db42c..3cdfda09 100644 --- a/conduit/pipeline/pipeline_test.go +++ b/conduit/pipeline/pipeline_test.go @@ -995,7 +995,7 @@ func TestLogStatsE2Elog(t *testing.T) { numTxns := 13 duration := 12345600 * time.Microsecond - expectedLog := "UPDATED Pipeline NextRound=1337. FINISHED Pipeline round r=42 (13 txn) exported in 12.3456s" + expectedLog := "FINISHED Pipeline round r=42 (13 txn) exported in 12.3456s" log := logstatsE2Elog(nextRound, round, numTxns, duration) require.Equal(t, expectedLog, log) From f96dbc82f038473d315d641194ec46187851c913 Mon Sep 17 00:00:00 2001 From: Zeph Grunschlag Date: Mon, 28 Aug 2023 21:27:45 -0500 Subject: [PATCH 4/9] noop importer --- conduit/plugins/importers/all/all.go | 1 + .../plugins/importers/noop/noop_importer.go | 76 +++++++++++++++++++ .../importers/noop/noop_importer_config.go | 7 ++ conduit/plugins/importers/noop/sample.yaml | 3 + 4 files changed, 87 insertions(+) create mode 100644 conduit/plugins/importers/noop/noop_importer.go create mode 100644 conduit/plugins/importers/noop/noop_importer_config.go create mode 100644 conduit/plugins/importers/noop/sample.yaml diff --git a/conduit/plugins/importers/all/all.go b/conduit/plugins/importers/all/all.go index 7222c549..a6450a47 100644 --- a/conduit/plugins/importers/all/all.go +++ b/conduit/plugins/importers/all/all.go @@ -4,4 +4,5 @@ import ( // Call package wide init function _ "github.com/algorand/conduit/conduit/plugins/importers/algod" _ "github.com/algorand/conduit/conduit/plugins/importers/filereader" + _ "github.com/algorand/conduit/conduit/plugins/importers/noop" ) diff --git a/conduit/plugins/importers/noop/noop_importer.go b/conduit/plugins/importers/noop/noop_importer.go new file mode 100644 index 00000000..cb6aaf16 --- /dev/null +++ b/conduit/plugins/importers/noop/noop_importer.go @@ -0,0 +1,76 @@ +package noop + +import ( + "context" + _ "embed" // used to embed config + "fmt" + + "github.com/sirupsen/logrus" + + sdk "github.com/algorand/go-algorand-sdk/v2/types" + + "github.com/algorand/conduit/conduit/data" + "github.com/algorand/conduit/conduit/plugins" + "github.com/algorand/conduit/conduit/plugins/importers" +) + +// PluginName to use when configuring. +var PluginName = "noop" + +// `noopImporter`s will function without ever erroring. This means they will also process out of order blocks +// which may or may not be desirable for different use cases--it can hide errors in actual importers expecting in order +// block processing. +// The `noopImporter` will maintain `Round` state according to the round of the last block it processed. +type noopImporter struct { + round uint64 + cfg ImporterConfig +} + +//go:embed sample.yaml +var sampleConfig string + +var metadata = plugins.Metadata{ + Name: PluginName, + Description: "noop importer", + Deprecated: false, + SampleConfig: sampleConfig, +} + +func (exp *noopImporter) Metadata() plugins.Metadata { + return metadata +} + +func (exp *noopImporter) Init(_ context.Context, _ data.InitProvider, cfg plugins.PluginConfig, _ *logrus.Logger) error { + if err := cfg.UnmarshalConfig(&exp.cfg); err != nil { + return fmt.Errorf("init failure in unmarshalConfig: %v", err) + } + exp.round = exp.cfg.Round + return nil +} + +func (exp *noopImporter) Close() error { + return nil +} + +func (i *noopImporter) GetGenesis() (*sdk.Genesis, error) { + return &sdk.Genesis{}, nil +} + +func (exp *noopImporter) GetBlock(rnd uint64) (data.BlockData, error) { + exp.round = rnd + return data.BlockData{ + BlockHeader: sdk.BlockHeader{ + Round: sdk.Round(rnd), + }, + }, nil +} + +func (exp *noopImporter) Round() uint64 { + return exp.round +} + +func init() { + importers.Register(PluginName, importers.ImporterConstructorFunc(func() importers.Importer { + return &noopImporter{} + })) +} diff --git a/conduit/plugins/importers/noop/noop_importer_config.go b/conduit/plugins/importers/noop/noop_importer_config.go new file mode 100644 index 00000000..f49964e5 --- /dev/null +++ b/conduit/plugins/importers/noop/noop_importer_config.go @@ -0,0 +1,7 @@ +package noop + +// ImporterConfig specific to the noop importer +type ImporterConfig struct { + // Optionally specify the round to start on + Round uint64 `yaml:"round"` +} diff --git a/conduit/plugins/importers/noop/sample.yaml b/conduit/plugins/importers/noop/sample.yaml new file mode 100644 index 00000000..dace3e9c --- /dev/null +++ b/conduit/plugins/importers/noop/sample.yaml @@ -0,0 +1,3 @@ +name: noop +# noop has no config +config: \ No newline at end of file From d65afb494cc83bf431ccd437aee9b54ba6adfb88 Mon Sep 17 00:00:00 2001 From: Zeph Grunschlag Date: Mon, 28 Aug 2023 21:29:17 -0500 Subject: [PATCH 5/9] test CLI for the health endpoint --- conduit/pipeline/pipeline.go | 2 +- e2e_tests/src/e2e_conduit/subslurp.py | 2 +- pkg/cli/cli_test.go | 50 +++++++++++++++++++++++++++ 3 files changed, 52 insertions(+), 2 deletions(-) diff --git a/conduit/pipeline/pipeline.go b/conduit/pipeline/pipeline.go index 3536cf10..9b98f19a 100644 --- a/conduit/pipeline/pipeline.go +++ b/conduit/pipeline/pipeline.go @@ -337,7 +337,7 @@ func (p *pipelineImpl) Init() error { if err != nil { return fmt.Errorf("Pipeline.GetGenesis(): could not obtain Genesis from the importer (%s): %w", p.cfg.Importer.Name, err) } - (*p.initProvider).SetGenesis(genesis) + (*p.initProvider).SetGenesis(genesis) // write pipeline metadata gh := genesis.Hash() diff --git a/e2e_tests/src/e2e_conduit/subslurp.py b/e2e_tests/src/e2e_conduit/subslurp.py index 796982ce..97598394 100644 --- a/e2e_tests/src/e2e_conduit/subslurp.py +++ b/e2e_tests/src/e2e_conduit/subslurp.py @@ -7,7 +7,7 @@ logger = logging.getLogger(__name__) # Matches conduit log output: -# "UPDATED Pipeline NextRound=1337. FINISHED Pipeline round r=42 (13 txn) exported in 12.3456s" +# "FINISHED Pipeline round r=42 (13 txn) exported in 12.3456s" FINISH_ROUND: re.Pattern = re.compile(b"FINISHED Pipeline round r=(\d+)") diff --git a/pkg/cli/cli_test.go b/pkg/cli/cli_test.go index a46f12e9..640f5f31 100644 --- a/pkg/cli/cli_test.go +++ b/pkg/cli/cli_test.go @@ -2,15 +2,20 @@ package cli import ( _ "embed" + "fmt" + "net/http" "os" "path" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "gopkg.in/yaml.v3" "github.com/algorand/conduit/conduit/data" + _ "github.com/algorand/conduit/conduit/plugins/exporters/noop" + _ "github.com/algorand/conduit/conduit/plugins/importers/noop" ) // Fills in a temp data dir and creates files @@ -135,3 +140,48 @@ func TestLogFile(t *testing.T) { require.Contains(t, dataStr, "\nWriting logs to file:") }) } + +func TestHealthEndpoint(t *testing.T) { + healthPort := 7777 + healthNet := fmt.Sprintf("http://localhost:%d/health", healthPort) + + test := func(t *testing.T, address string) { + // Capture stdout. + stdout := os.Stdout + defer func() { + os.Stdout = stdout + }() + cfg := data.Config{ + ConduitArgs: &data.Args{ConduitDataDir: t.TempDir()}, + API: data.API{Address: address}, + Importer: data.NameConfigPair{Name: "noop", Config: map[string]interface{}{}}, + Processors: nil, + Exporter: data.NameConfigPair{Name: "noop", Config: map[string]interface{}{}}, + } + args := setupDataDir(t, cfg) + + + go func() { + runConduitCmdWithConfig(args) + }() + time.Sleep(1 * time.Second) + + + resp, err := http.Get(healthNet) + if address != "" { + require.NoError(t, err) + require.Equal(t, http.StatusOK, resp.StatusCode) + } else { + require.ErrorContains(t, err, "connection refused") + require.Nil(t, resp) + } + } + + t.Run("API_OFF", func(t *testing.T) { + test(t, "") + }) + + t.Run("API_ON", func(t *testing.T) { + test(t, fmt.Sprintf(":%d", healthPort)) + }) +} From f12f4e69a2359beda1fba86986a3c9de5eafefd3 Mon Sep 17 00:00:00 2001 From: Zeph Grunschlag Date: Mon, 28 Aug 2023 22:13:55 -0500 Subject: [PATCH 6/9] lint --- conduit/pipeline/common.go | 20 ++++++++------- conduit/pipeline/pipeline.go | 4 +-- conduit/pipeline/pipeline_test.go | 3 +-- .../plugins/importers/noop/noop_importer.go | 25 +++++++++++-------- 4 files changed, 29 insertions(+), 23 deletions(-) diff --git a/conduit/pipeline/common.go b/conduit/pipeline/common.go index ceaf2df4..c88d37e3 100644 --- a/conduit/pipeline/common.go +++ b/conduit/pipeline/common.go @@ -26,7 +26,7 @@ type pluginInput interface { } type pluginOutput interface { - pluginInput | * sdk.Genesis + pluginInput | *sdk.Genesis } // retries is a wrapper for retrying a function call f() with a cancellation context, @@ -76,18 +76,20 @@ func retries[X pluginInput, Y pluginOutput](f func(x X) (Y, error), x X, p *pipe return } -// retriesNoOutput applies the same logic as Retries, but for functions that return no output. -func retriesNoOutput[X pluginInput](f func(x X) error, a X, p *pipelineImpl, msg string) (time.Duration, error) { - _, d, err := retries(func(x X) (empty, error) { - return empty{}, f(x) - }, a, p, msg) - return d, err -} - +// TODO: probly the following function and its unit test should be axed // retriesNoInput applies the same logic as Retries, but for functions that take no input. + +//nolint:unused func retriesNoInput[Y pluginOutput](f func() (Y, error), p *pipelineImpl, msg string) (Y, time.Duration, error) { return retries(func(x empty) (Y, error) { return f() }, empty{}, p, msg) } +// retriesNoOutput applies the same logic as Retries, but for functions that return no output. +func retriesNoOutput[X pluginInput](f func(x X) error, a X, p *pipelineImpl, msg string) (time.Duration, error) { + _, d, err := retries(func(x X) (empty, error) { + return empty{}, f(x) + }, a, p, msg) + return d, err +} diff --git a/conduit/pipeline/pipeline.go b/conduit/pipeline/pipeline.go index 9b98f19a..5a1adf43 100644 --- a/conduit/pipeline/pipeline.go +++ b/conduit/pipeline/pipeline.go @@ -640,13 +640,13 @@ func (p *pipelineImpl) exporterHandler(exporter exporters.Exporter, blkChan plug // WARNING: removing/re-log-levelling the following will BREAK: // - the E2E test (Search for "Pipeline round" in subslurp.py) // - the internal tools logstats collector (See func ConduitCollector in logstats.go of internal-tools repo) - p.logger.Infof(logstatsE2Elog(nextRound, lastRound, len(blk.Payset), exportTime)) + p.logger.Infof(logstatsE2Elog(lastRound, len(blk.Payset), exportTime)) } } }() } -func logstatsE2Elog(nextRound, lastRound uint64, topLevelTxnCount int, exportTime time.Duration) string { +func logstatsE2Elog(lastRound uint64, topLevelTxnCount int, exportTime time.Duration) string { return fmt.Sprintf( "FINISHED Pipeline round r=%d (%d txn) exported in %s", lastRound, diff --git a/conduit/pipeline/pipeline_test.go b/conduit/pipeline/pipeline_test.go index 3cdfda09..eac08f84 100644 --- a/conduit/pipeline/pipeline_test.go +++ b/conduit/pipeline/pipeline_test.go @@ -990,13 +990,12 @@ func TestMetrics(t *testing.T) { } func TestLogStatsE2Elog(t *testing.T) { - nextRound := uint64(1337) round := uint64(42) numTxns := 13 duration := 12345600 * time.Microsecond expectedLog := "FINISHED Pipeline round r=42 (13 txn) exported in 12.3456s" - log := logstatsE2Elog(nextRound, round, numTxns, duration) + log := logstatsE2Elog(round, numTxns, duration) require.Equal(t, expectedLog, log) logstatsRex, err := regexp.Compile(`round r=(\d+) \((\d+) txn\) exported in (.*)`) diff --git a/conduit/plugins/importers/noop/noop_importer.go b/conduit/plugins/importers/noop/noop_importer.go index cb6aaf16..8fb429e3 100644 --- a/conduit/plugins/importers/noop/noop_importer.go +++ b/conduit/plugins/importers/noop/noop_importer.go @@ -4,6 +4,7 @@ import ( "context" _ "embed" // used to embed config "fmt" + "time" "github.com/sirupsen/logrus" @@ -17,10 +18,13 @@ import ( // PluginName to use when configuring. var PluginName = "noop" +const sleepForGetBlock = 100 * time.Millisecond + // `noopImporter`s will function without ever erroring. This means they will also process out of order blocks // which may or may not be desirable for different use cases--it can hide errors in actual importers expecting in order // block processing. // The `noopImporter` will maintain `Round` state according to the round of the last block it processed. +// It also sleeps 100 milliseconds between blocks to slow down the pipeline. type noopImporter struct { round uint64 cfg ImporterConfig @@ -36,28 +40,29 @@ var metadata = plugins.Metadata{ SampleConfig: sampleConfig, } -func (exp *noopImporter) Metadata() plugins.Metadata { +func (imp *noopImporter) Metadata() plugins.Metadata { return metadata } -func (exp *noopImporter) Init(_ context.Context, _ data.InitProvider, cfg plugins.PluginConfig, _ *logrus.Logger) error { - if err := cfg.UnmarshalConfig(&exp.cfg); err != nil { +func (imp *noopImporter) Init(_ context.Context, _ data.InitProvider, cfg plugins.PluginConfig, _ *logrus.Logger) error { + if err := cfg.UnmarshalConfig(&imp.cfg); err != nil { return fmt.Errorf("init failure in unmarshalConfig: %v", err) } - exp.round = exp.cfg.Round + imp.round = imp.cfg.Round return nil } -func (exp *noopImporter) Close() error { +func (imp *noopImporter) Close() error { return nil } -func (i *noopImporter) GetGenesis() (*sdk.Genesis, error) { +func (imp *noopImporter) GetGenesis() (*sdk.Genesis, error) { return &sdk.Genesis{}, nil } -func (exp *noopImporter) GetBlock(rnd uint64) (data.BlockData, error) { - exp.round = rnd +func (imp *noopImporter) GetBlock(rnd uint64) (data.BlockData, error) { + time.Sleep(sleepForGetBlock) + imp.round = rnd return data.BlockData{ BlockHeader: sdk.BlockHeader{ Round: sdk.Round(rnd), @@ -65,8 +70,8 @@ func (exp *noopImporter) GetBlock(rnd uint64) (data.BlockData, error) { }, nil } -func (exp *noopImporter) Round() uint64 { - return exp.round +func (imp *noopImporter) Round() uint64 { + return imp.round } func init() { From 4d0386a4cf92e404df84d20e4773a0a9fd0b4b60 Mon Sep 17 00:00:00 2001 From: Zeph Grunschlag Date: Mon, 28 Aug 2023 22:44:30 -0500 Subject: [PATCH 7/9] gofmt --- conduit/pipeline/common_test.go | 1 - pkg/cli/cli_test.go | 2 -- 2 files changed, 3 deletions(-) diff --git a/conduit/pipeline/common_test.go b/conduit/pipeline/common_test.go index dec9f322..7083b72b 100644 --- a/conduit/pipeline/common_test.go +++ b/conduit/pipeline/common_test.go @@ -53,7 +53,6 @@ func TestRetries(t *testing.T) { } } - cases := []struct { name string retryCount uint64 diff --git a/pkg/cli/cli_test.go b/pkg/cli/cli_test.go index 640f5f31..d5baf1c7 100644 --- a/pkg/cli/cli_test.go +++ b/pkg/cli/cli_test.go @@ -160,13 +160,11 @@ func TestHealthEndpoint(t *testing.T) { } args := setupDataDir(t, cfg) - go func() { runConduitCmdWithConfig(args) }() time.Sleep(1 * time.Second) - resp, err := http.Get(healthNet) if address != "" { require.NoError(t, err) From f38ed13360ca30bb415eb06675afbcc28503631d Mon Sep 17 00:00:00 2001 From: Zeph Grunschlag Date: Tue, 29 Aug 2023 10:23:02 -0500 Subject: [PATCH 8/9] Update conduit/plugins/importers/noop/sample.yaml --- conduit/plugins/importers/noop/sample.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/conduit/plugins/importers/noop/sample.yaml b/conduit/plugins/importers/noop/sample.yaml index dace3e9c..a4e99563 100644 --- a/conduit/plugins/importers/noop/sample.yaml +++ b/conduit/plugins/importers/noop/sample.yaml @@ -1,3 +1,3 @@ name: noop # noop has no config -config: \ No newline at end of file +config: From f68c929efc7a979885256da9d96b410fa7beaf48 Mon Sep 17 00:00:00 2001 From: Zeph Grunschlag Date: Tue, 29 Aug 2023 10:25:12 -0500 Subject: [PATCH 9/9] Update pkg/cli/cli_test.go --- pkg/cli/cli_test.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/pkg/cli/cli_test.go b/pkg/cli/cli_test.go index d5baf1c7..3ec1b202 100644 --- a/pkg/cli/cli_test.go +++ b/pkg/cli/cli_test.go @@ -146,11 +146,6 @@ func TestHealthEndpoint(t *testing.T) { healthNet := fmt.Sprintf("http://localhost:%d/health", healthPort) test := func(t *testing.T, address string) { - // Capture stdout. - stdout := os.Stdout - defer func() { - os.Stdout = stdout - }() cfg := data.Config{ ConduitArgs: &data.Args{ConduitDataDir: t.TempDir()}, API: data.API{Address: address},