diff --git a/conduit/pipeline/common.go b/conduit/pipeline/common.go index 4c31a3ff..c88d37e3 100644 --- a/conduit/pipeline/common.go +++ b/conduit/pipeline/common.go @@ -7,6 +7,8 @@ import ( log "github.com/sirupsen/logrus" + sdk "github.com/algorand/go-algorand-sdk/v2/types" + "github.com/algorand/conduit/conduit/data" ) @@ -20,14 +22,14 @@ func HandlePanic(logger *log.Logger) { type empty struct{} type pluginInput interface { - uint64 | data.BlockData | string + uint64 | data.BlockData | string | empty } type pluginOutput interface { - pluginInput | empty + pluginInput | *sdk.Genesis } -// Retries is a wrapper for retrying a function call f() with a cancellation context, +// retries is a wrapper for retrying a function call f() with a cancellation context, // a delay and a max retry count. It attempts to call the wrapped function at least once // and only after the first attempt will pay attention to a context cancellation. // This can allow the pipeline to receive a cancellation and guarantee attempting to finish @@ -45,7 +47,7 @@ type pluginOutput interface { // - when p.cfg.retryCount > 0, the error will be a join of all the errors encountered during the retries // - when p.cfg.retryCount == 0, the error will be the last error encountered // - the returned duration dur is the total time spent in the function, including retries -func Retries[X pluginInput, Y pluginOutput](f func(x X) (Y, error), x X, p *pipelineImpl, msg string) (y Y, dur time.Duration, err error) { +func retries[X pluginInput, Y pluginOutput](f func(x X) (Y, error), x X, p *pipelineImpl, msg string) (y Y, dur time.Duration, err error) { start := time.Now() for i := uint64(0); p.cfg.RetryCount == 0 || i <= p.cfg.RetryCount; i++ { @@ -74,9 +76,19 @@ func Retries[X pluginInput, Y pluginOutput](f func(x X) (Y, error), x X, p *pipe return } -// RetriesNoOutput applies the same logic as Retries, but for functions that return no output. -func RetriesNoOutput[X pluginInput](f func(x X) error, a X, p *pipelineImpl, msg string) (time.Duration, error) { - _, d, err := Retries(func(x X) (empty, error) { +// TODO: probly the following function and its unit test should be axed +// retriesNoInput applies the same logic as Retries, but for functions that take no input. + +//nolint:unused +func retriesNoInput[Y pluginOutput](f func() (Y, error), p *pipelineImpl, msg string) (Y, time.Duration, error) { + return retries(func(x empty) (Y, error) { + return f() + }, empty{}, p, msg) +} + +// retriesNoOutput applies the same logic as Retries, but for functions that return no output. +func retriesNoOutput[X pluginInput](f func(x X) error, a X, p *pipelineImpl, msg string) (time.Duration, error) { + _, d, err := retries(func(x X) (empty, error) { return empty{}, f(x) }, a, p, msg) return d, err diff --git a/conduit/pipeline/common_test.go b/conduit/pipeline/common_test.go index cd9dcd4a..7083b72b 100644 --- a/conduit/pipeline/common_test.go +++ b/conduit/pipeline/common_test.go @@ -41,6 +41,18 @@ func TestRetries(t *testing.T) { } } + succeedAfterFactoryNoInput := func(succeedAfter uint64, never bool) func() (uint64, error) { + tries := uint64(0) + + return func() (uint64, error) { + if tries >= succeedAfter && !never { + return tries + 1, nil + } + tries++ + return 0, fmt.Errorf("%w: tries=%d", errSentinelCause, tries-1) + } + } + cases := []struct { name string retryCount uint64 @@ -106,8 +118,8 @@ func TestRetries(t *testing.T) { for _, tc := range cases { tc := tc - // run cases for Retries() - t.Run("Retries() "+tc.name, func(t *testing.T) { + // run cases for retries() + t.Run("retries() "+tc.name, func(t *testing.T) { t.Parallel() ctx, ccf := context.WithCancelCause(context.Background()) p := &pipelineImpl{ @@ -127,7 +139,7 @@ func TestRetries(t *testing.T) { yChan := make(chan uint64) errChan := make(chan error) go func() { - y, _, err := Retries(succeedAfter, 0, p, "test") + y, _, err := retries(succeedAfter, 0, p, "test") yChan <- y errChan <- err }() @@ -144,7 +156,7 @@ func TestRetries(t *testing.T) { return } - y, _, err := Retries(succeedAfter, 0, p, "test") + y, _, err := retries(succeedAfter, 0, p, "test") if tc.retryCount == 0 { // WLOG tc.neverSucceed == false require.NoError(t, err, tc.name) @@ -163,8 +175,8 @@ func TestRetries(t *testing.T) { } }) - // run cases for RetriesNoOutput() - t.Run("RetriesNoOutput() "+tc.name, func(t *testing.T) { + // run cases for retriesNoOutput() + t.Run("retriesNoOutput() "+tc.name, func(t *testing.T) { t.Parallel() ctx, ccf := context.WithCancelCause(context.Background()) p := &pipelineImpl{ @@ -183,7 +195,7 @@ func TestRetries(t *testing.T) { errChan := make(chan error) go func() { - _, err := RetriesNoOutput(succeedAfterNoOutput, 0, p, "test") + _, err := retriesNoOutput(succeedAfterNoOutput, 0, p, "test") errChan <- err }() time.Sleep(5 * time.Millisecond) @@ -197,7 +209,7 @@ func TestRetries(t *testing.T) { return } - _, err := RetriesNoOutput(succeedAfterNoOutput, 0, p, "test") + _, err := retriesNoOutput(succeedAfterNoOutput, 0, p, "test") if tc.retryCount == 0 { // WLOG tc.neverSucceed == false require.NoError(t, err, tc.name) } else { // retryCount > 0 so doesn't retry forever @@ -209,5 +221,62 @@ func TestRetries(t *testing.T) { } } }) + + // run case for retriesNoInput() + t.Run("retriesNoInput() "+tc.name, func(t *testing.T) { + t.Parallel() + ctx, ccf := context.WithCancelCause(context.Background()) + p := &pipelineImpl{ + ctx: ctx, + ccf: ccf, + logger: log.New(), + cfg: &data.Config{ + RetryCount: tc.retryCount, + RetryDelay: 1 * time.Millisecond, + }, + } + succeedAfterNoInput := succeedAfterFactoryNoInput(tc.succeedAfter, tc.neverSucceed) + + if tc.retryCount == 0 && tc.neverSucceed { + // avoid infinite loop by cancelling the context + + errChan := make(chan error) + yChan := make(chan uint64) + go func() { + out, _, err := retriesNoInput(succeedAfterNoInput, p, "test") + yChan <- out + errChan <- err + }() + time.Sleep(5 * time.Millisecond) + errTestCancelled := errors.New("test cancelled") + go func() { + ccf(errTestCancelled) + }() + y := <-yChan + err := <-errChan + require.ErrorIs(t, err, errTestCancelled, tc.name) + require.ErrorIs(t, err, errSentinelCause, tc.name) + require.Zero(t, y, tc.name) + return + } + + y, _, err := retriesNoInput(succeedAfterNoInput, p, "test") + if tc.retryCount == 0 { // WLOG tc.neverSucceed == false + require.NoError(t, err, tc.name) + + // note we subtract 1 from y below because succeedAfter has added 1 to its output + require.Equal(t, tc.succeedAfter, y-1, tc.name) + } else { // retryCount > 0 so doesn't retry forever + if tc.neverSucceed || tc.succeedAfter > tc.retryCount { + require.ErrorContains(t, err, fmt.Sprintf("%d", tc.retryCount), tc.name) + require.ErrorIs(t, err, errSentinelCause, tc.name) + require.Zero(t, y, tc.name) + } else { // !tc.neverSucceed && succeedAfter <= retryCount + require.NoError(t, err, tc.name) + require.Equal(t, tc.succeedAfter, y-1, tc.name) + } + } + }) + } } diff --git a/conduit/pipeline/pipeline.go b/conduit/pipeline/pipeline.go index 61e95da6..5a1adf43 100644 --- a/conduit/pipeline/pipeline.go +++ b/conduit/pipeline/pipeline.go @@ -481,7 +481,7 @@ func (p *pipelineImpl) importerHandler(importer importers.Importer, roundChan <- totalSelectWait += waitTime p.logger.Tracef("importer handler waited %dms to receive round %d", waitTime.Milliseconds(), rnd) - blkData, importTime, lastError := Retries(importer.GetBlock, rnd, p, importer.Metadata().Name) + blkData, importTime, lastError := retries(importer.GetBlock, rnd, p, importer.Metadata().Name) if lastError != nil { p.cancelWithProblem(fmt.Errorf("importer %s handler (%w): failed to import round %d after %dms: %w", importer.Metadata().Name, errImporterCause, rnd, importTime.Milliseconds(), lastError)) return @@ -533,7 +533,7 @@ func (p *pipelineImpl) processorHandler(idx int, proc processors.Processor, blkI var procTime time.Duration var lastError error - blk, procTime, lastError = Retries(proc.Process, blk, p, proc.Metadata().Name) + blk, procTime, lastError = retries(proc.Process, blk, p, proc.Metadata().Name) if lastError != nil { p.cancelWithProblem(fmt.Errorf("processor[%d] %s handler (%w): failed to process round %d after %dms: %w", idx, proc.Metadata().Name, errProcessorCause, lastRnd, procTime.Milliseconds(), lastError)) return @@ -598,7 +598,7 @@ func (p *pipelineImpl) exporterHandler(exporter exporters.Exporter, blkChan plug } var exportTime time.Duration - exportTime, lastError = RetriesNoOutput(exporter.Receive, blk, p, eName) + exportTime, lastError = retriesNoOutput(exporter.Receive, blk, p, eName) if lastError != nil { lastError = fmt.Errorf("aborting after failing to export round %d: %w", lastRound, lastError) return @@ -640,16 +640,15 @@ func (p *pipelineImpl) exporterHandler(exporter exporters.Exporter, blkChan plug // WARNING: removing/re-log-levelling the following will BREAK: // - the E2E test (Search for "Pipeline round" in subslurp.py) // - the internal tools logstats collector (See func ConduitCollector in logstats.go of internal-tools repo) - p.logger.Infof(logstatsE2Elog(nextRound, lastRound, len(blk.Payset), exportTime)) + p.logger.Infof(logstatsE2Elog(lastRound, len(blk.Payset), exportTime)) } } }() } -func logstatsE2Elog(nextRound, lastRound uint64, topLevelTxnCount int, exportTime time.Duration) string { +func logstatsE2Elog(lastRound uint64, topLevelTxnCount int, exportTime time.Duration) string { return fmt.Sprintf( - "UPDATED Pipeline NextRound=%d. FINISHED Pipeline round r=%d (%d txn) exported in %s", - nextRound, + "FINISHED Pipeline round r=%d (%d txn) exported in %s", lastRound, topLevelTxnCount, exportTime, @@ -696,8 +695,6 @@ func (p *pipelineImpl) Start() { } } }(p.pipelineMetadata.NextRound) - - <-p.ctx.Done() } func (p *pipelineImpl) Wait() { diff --git a/conduit/pipeline/pipeline_test.go b/conduit/pipeline/pipeline_test.go index ea8db42c..eac08f84 100644 --- a/conduit/pipeline/pipeline_test.go +++ b/conduit/pipeline/pipeline_test.go @@ -990,13 +990,12 @@ func TestMetrics(t *testing.T) { } func TestLogStatsE2Elog(t *testing.T) { - nextRound := uint64(1337) round := uint64(42) numTxns := 13 duration := 12345600 * time.Microsecond - expectedLog := "UPDATED Pipeline NextRound=1337. FINISHED Pipeline round r=42 (13 txn) exported in 12.3456s" - log := logstatsE2Elog(nextRound, round, numTxns, duration) + expectedLog := "FINISHED Pipeline round r=42 (13 txn) exported in 12.3456s" + log := logstatsE2Elog(round, numTxns, duration) require.Equal(t, expectedLog, log) logstatsRex, err := regexp.Compile(`round r=(\d+) \((\d+) txn\) exported in (.*)`) diff --git a/conduit/plugins/importers/all/all.go b/conduit/plugins/importers/all/all.go index 7222c549..a6450a47 100644 --- a/conduit/plugins/importers/all/all.go +++ b/conduit/plugins/importers/all/all.go @@ -4,4 +4,5 @@ import ( // Call package wide init function _ "github.com/algorand/conduit/conduit/plugins/importers/algod" _ "github.com/algorand/conduit/conduit/plugins/importers/filereader" + _ "github.com/algorand/conduit/conduit/plugins/importers/noop" ) diff --git a/conduit/plugins/importers/noop/noop_importer.go b/conduit/plugins/importers/noop/noop_importer.go new file mode 100644 index 00000000..8fb429e3 --- /dev/null +++ b/conduit/plugins/importers/noop/noop_importer.go @@ -0,0 +1,81 @@ +package noop + +import ( + "context" + _ "embed" // used to embed config + "fmt" + "time" + + "github.com/sirupsen/logrus" + + sdk "github.com/algorand/go-algorand-sdk/v2/types" + + "github.com/algorand/conduit/conduit/data" + "github.com/algorand/conduit/conduit/plugins" + "github.com/algorand/conduit/conduit/plugins/importers" +) + +// PluginName to use when configuring. +var PluginName = "noop" + +const sleepForGetBlock = 100 * time.Millisecond + +// `noopImporter`s will function without ever erroring. This means they will also process out of order blocks +// which may or may not be desirable for different use cases--it can hide errors in actual importers expecting in order +// block processing. +// The `noopImporter` will maintain `Round` state according to the round of the last block it processed. +// It also sleeps 100 milliseconds between blocks to slow down the pipeline. +type noopImporter struct { + round uint64 + cfg ImporterConfig +} + +//go:embed sample.yaml +var sampleConfig string + +var metadata = plugins.Metadata{ + Name: PluginName, + Description: "noop importer", + Deprecated: false, + SampleConfig: sampleConfig, +} + +func (imp *noopImporter) Metadata() plugins.Metadata { + return metadata +} + +func (imp *noopImporter) Init(_ context.Context, _ data.InitProvider, cfg plugins.PluginConfig, _ *logrus.Logger) error { + if err := cfg.UnmarshalConfig(&imp.cfg); err != nil { + return fmt.Errorf("init failure in unmarshalConfig: %v", err) + } + imp.round = imp.cfg.Round + return nil +} + +func (imp *noopImporter) Close() error { + return nil +} + +func (imp *noopImporter) GetGenesis() (*sdk.Genesis, error) { + return &sdk.Genesis{}, nil +} + +func (imp *noopImporter) GetBlock(rnd uint64) (data.BlockData, error) { + time.Sleep(sleepForGetBlock) + imp.round = rnd + return data.BlockData{ + BlockHeader: sdk.BlockHeader{ + Round: sdk.Round(rnd), + }, + }, nil +} + +func (imp *noopImporter) Round() uint64 { + return imp.round +} + +func init() { + importers.Register(PluginName, importers.ImporterConstructorFunc(func() importers.Importer { + return &noopImporter{} + })) +} diff --git a/conduit/plugins/importers/noop/noop_importer_config.go b/conduit/plugins/importers/noop/noop_importer_config.go new file mode 100644 index 00000000..f49964e5 --- /dev/null +++ b/conduit/plugins/importers/noop/noop_importer_config.go @@ -0,0 +1,7 @@ +package noop + +// ImporterConfig specific to the noop importer +type ImporterConfig struct { + // Optionally specify the round to start on + Round uint64 `yaml:"round"` +} diff --git a/conduit/plugins/importers/noop/sample.yaml b/conduit/plugins/importers/noop/sample.yaml new file mode 100644 index 00000000..a4e99563 --- /dev/null +++ b/conduit/plugins/importers/noop/sample.yaml @@ -0,0 +1,3 @@ +name: noop +# noop has no config +config: diff --git a/e2e_tests/src/e2e_conduit/subslurp.py b/e2e_tests/src/e2e_conduit/subslurp.py index 796982ce..97598394 100644 --- a/e2e_tests/src/e2e_conduit/subslurp.py +++ b/e2e_tests/src/e2e_conduit/subslurp.py @@ -7,7 +7,7 @@ logger = logging.getLogger(__name__) # Matches conduit log output: -# "UPDATED Pipeline NextRound=1337. FINISHED Pipeline round r=42 (13 txn) exported in 12.3456s" +# "FINISHED Pipeline round r=42 (13 txn) exported in 12.3456s" FINISH_ROUND: re.Pattern = re.compile(b"FINISHED Pipeline round r=(\d+)") diff --git a/pkg/cli/cli.go b/pkg/cli/cli.go index de9cb2f9..1aafeee0 100644 --- a/pkg/cli/cli.go +++ b/pkg/cli/cli.go @@ -105,6 +105,7 @@ func runConduitCmdWithConfig(args *data.Args) error { // Start server if pCfg.API.Address != "" { + logger.Infof("starting API server on %s", pCfg.API.Address) shutdown, err := api.StartServer(logger, pline, pCfg.API.Address) if err != nil { // Suppress log, it is about to be printed to stderr. @@ -114,6 +115,8 @@ func runConduitCmdWithConfig(args *data.Args) error { return fmt.Errorf("failed to start API server: %w", err) } defer shutdown(context.Background()) + } else { + logger.Info("API server is disabled") } pline.Wait() diff --git a/pkg/cli/cli_test.go b/pkg/cli/cli_test.go index a46f12e9..3ec1b202 100644 --- a/pkg/cli/cli_test.go +++ b/pkg/cli/cli_test.go @@ -2,15 +2,20 @@ package cli import ( _ "embed" + "fmt" + "net/http" "os" "path" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "gopkg.in/yaml.v3" "github.com/algorand/conduit/conduit/data" + _ "github.com/algorand/conduit/conduit/plugins/exporters/noop" + _ "github.com/algorand/conduit/conduit/plugins/importers/noop" ) // Fills in a temp data dir and creates files @@ -135,3 +140,41 @@ func TestLogFile(t *testing.T) { require.Contains(t, dataStr, "\nWriting logs to file:") }) } + +func TestHealthEndpoint(t *testing.T) { + healthPort := 7777 + healthNet := fmt.Sprintf("http://localhost:%d/health", healthPort) + + test := func(t *testing.T, address string) { + cfg := data.Config{ + ConduitArgs: &data.Args{ConduitDataDir: t.TempDir()}, + API: data.API{Address: address}, + Importer: data.NameConfigPair{Name: "noop", Config: map[string]interface{}{}}, + Processors: nil, + Exporter: data.NameConfigPair{Name: "noop", Config: map[string]interface{}{}}, + } + args := setupDataDir(t, cfg) + + go func() { + runConduitCmdWithConfig(args) + }() + time.Sleep(1 * time.Second) + + resp, err := http.Get(healthNet) + if address != "" { + require.NoError(t, err) + require.Equal(t, http.StatusOK, resp.StatusCode) + } else { + require.ErrorContains(t, err, "connection refused") + require.Nil(t, resp) + } + } + + t.Run("API_OFF", func(t *testing.T) { + test(t, "") + }) + + t.Run("API_ON", func(t *testing.T) { + test(t, fmt.Sprintf(":%d", healthPort)) + }) +}