Pipeline architecture v2 (preview) (#1913)
* proof of concept for funnel (new stream engine)

* tie code into the lifecycle service to test end-to-end

* expand the implementation

* improve performance

* minimize allocations

* start implementing processor

* ensure graceful stop

* implement processor task

* build worker tasks correctly

* do not use a sandbox for Run (#1886)

Co-authored-by: Haris Osmanagić <haris@meroxa.io>

* make graceful shutdown work

* fix example

* update tests

* fix linter warnings

* rename introduced lifecycle package

* restore original lifecycle package

* fix code after merge

* add feature flag

* go mod tidy

* make acknowledgment fetching from destination stricter

* documentation

* make generate

---------

Co-authored-by: Haris Osmanagić <haris@meroxa.io>
Co-authored-by: Raúl Barroso <ra.barroso@gmail.com>
3 people authored Nov 7, 2024
1 parent 9a216a4 commit 58284dc
Showing 26 changed files with 3,915 additions and 53 deletions.
2 changes: 1 addition & 1 deletion go.mod
@@ -40,6 +40,7 @@ require (
github.com/prometheus/client_model v0.6.1
github.com/prometheus/common v0.60.1
github.com/rs/zerolog v1.33.0
github.com/sourcegraph/conc v0.3.0
github.com/spf13/cobra v1.8.1
github.com/stealthrocket/wazergo v0.19.1
github.com/tetratelabs/wazero v1.8.1
@@ -312,7 +313,6 @@ require (
github.com/sivchari/containedctx v1.0.3 // indirect
github.com/sivchari/tenv v1.10.0 // indirect
github.com/sonatard/noctx v0.0.2 // indirect
github.com/sourcegraph/conc v0.3.0 // indirect
github.com/sourcegraph/go-diff v0.7.0 // indirect
github.com/spf13/afero v1.11.0 // indirect
github.com/spf13/cast v1.7.0 // indirect
5 changes: 5 additions & 0 deletions pkg/conduit/config.go
@@ -112,6 +112,11 @@ type Config struct {
}
}

Preview struct {
// PipelineArchV2 enables the new pipeline architecture.
PipelineArchV2 bool
}

dev struct {
cpuprofile string
memprofile string
2 changes: 2 additions & 0 deletions pkg/conduit/entrypoint.go
@@ -161,6 +161,8 @@ func Flags(cfg *Config) *flag.FlagSet {
flags.StringVar(&cfg.SchemaRegistry.Type, "schema-registry.type", cfg.SchemaRegistry.Type, "schema registry type; accepts builtin,confluent")
flags.StringVar(&cfg.SchemaRegistry.Confluent.ConnectionString, "schema-registry.confluent.connection-string", cfg.SchemaRegistry.Confluent.ConnectionString, "confluent schema registry connection string")

flags.BoolVar(&cfg.Preview.PipelineArchV2, "preview.pipeline-arch-v2", cfg.Preview.PipelineArchV2, "enables experimental pipeline architecture v2 (note that the new architecture currently supports only 1 source and 1 destination per pipeline)")

// NB: flags with prefix dev.* are hidden from help output by default, they only show up using '-dev -help'
showDevHelp := flags.Bool("dev", false, "used together with the dev flag it shows dev flags")
flags.StringVar(&cfg.dev.cpuprofile, "dev.cpuprofile", "", "write cpu profile to file")
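For illustration, a minimal sketch of enabling the preview architecture when embedding Conduit programmatically rather than via the `-preview.pipeline-arch-v2` flag (it assumes `conduit.DefaultConfig` and `Runtime.Run` behave as in the project's own entrypoint):

```go
package main

import (
	"context"

	"github.com/conduitio/conduit/pkg/conduit"
)

func main() {
	// Sketch only: DefaultConfig and Run are assumed to work as in cmd/conduit.
	cfg := conduit.DefaultConfig()
	cfg.Preview.PipelineArchV2 = true // opt in to the experimental v2 engine

	runtime, err := conduit.NewRuntime(cfg)
	if err != nil {
		panic(err)
	}

	// Run blocks until the context is canceled or the runtime shuts down.
	if err := runtime.Run(context.Background()); err != nil {
		panic(err)
	}
}
```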
139 changes: 116 additions & 23 deletions pkg/conduit/runtime.go
@@ -46,6 +46,7 @@ import (
"github.com/conduitio/conduit/pkg/foundation/metrics/measure"
"github.com/conduitio/conduit/pkg/foundation/metrics/prometheus"
"github.com/conduitio/conduit/pkg/lifecycle"
lifecycle_v2 "github.com/conduitio/conduit/pkg/lifecycle-poc"
"github.com/conduitio/conduit/pkg/orchestrator"
"github.com/conduitio/conduit/pkg/pipeline"
conn_plugin "github.com/conduitio/conduit/pkg/plugin/connector"
@@ -77,7 +78,7 @@ import (
)

const (
exitTimeout = 10 * time.Second
exitTimeout = 30 * time.Second
)

// Runtime sets up all services for serving and monitoring a Conduit instance.
@@ -95,7 +96,7 @@ type Runtime struct {
pipelineService *pipeline.Service
connectorService *connector.Service
processorService *processor.Service
lifecycleService *lifecycle.Service
lifecycleService lifecycleService

connectorPluginService *conn_plugin.PluginService
processorPluginService *proc_plugin.PluginService
@@ -107,6 +108,14 @@ type Runtime struct {
logger log.CtxLogger
}

// lifecycleService is an interface that we use temporarily to allow for
// both the old and new lifecycle services to be used interchangeably.
type lifecycleService interface {
Start(ctx context.Context, pipelineID string) error
Stop(ctx context.Context, pipelineID string, force bool) error
Init(ctx context.Context) error
}

// NewRuntime sets up a Runtime instance and primes it for start.
func NewRuntime(cfg Config) (*Runtime, error) {
if err := cfg.Validate(); err != nil {
@@ -203,21 +212,28 @@ func createServices(r *Runtime) error {
tokenService,
)

// Error recovery configuration
errRecoveryCfg := &lifecycle.ErrRecoveryCfg{
MinDelay: r.Config.Pipelines.ErrorRecovery.MinDelay,
MaxDelay: r.Config.Pipelines.ErrorRecovery.MaxDelay,
BackoffFactor: r.Config.Pipelines.ErrorRecovery.BackoffFactor,
MaxRetries: r.Config.Pipelines.ErrorRecovery.MaxRetries,
MaxRetriesWindow: r.Config.Pipelines.ErrorRecovery.MaxRetriesWindow,
}

plService := pipeline.NewService(r.logger, r.DB)
connService := connector.NewService(r.logger, r.DB, r.connectorPersister)
procService := processor.NewService(r.logger, r.DB, procPluginService)
lifecycleService := lifecycle.NewService(r.logger, errRecoveryCfg, connService, procService, connPluginService, plService)
provisionService := provisioning.NewService(r.DB, r.logger, plService, connService, procService, connPluginService, lifecycleService, r.Config.Pipelines.Path)

var lifecycleService lifecycleService
if r.Config.Preview.PipelineArchV2 {
r.logger.Info(context.Background()).Msg("using lifecycle service v2")
lifecycleService = lifecycle_v2.NewService(r.logger, connService, procService, connPluginService, plService)
} else {
// Error recovery configuration
errRecoveryCfg := &lifecycle.ErrRecoveryCfg{
MinDelay: r.Config.Pipelines.ErrorRecovery.MinDelay,
MaxDelay: r.Config.Pipelines.ErrorRecovery.MaxDelay,
BackoffFactor: r.Config.Pipelines.ErrorRecovery.BackoffFactor,
MaxRetries: r.Config.Pipelines.ErrorRecovery.MaxRetries,
MaxRetriesWindow: r.Config.Pipelines.ErrorRecovery.MaxRetriesWindow,
}

lifecycleService = lifecycle.NewService(r.logger, errRecoveryCfg, connService, procService, connPluginService, plService)
}

provisionService := provisioning.NewService(r.DB, r.logger, plService, connService, procService, connPluginService, lifecycleService, r.Config.Pipelines.Path)
orc := orchestrator.NewOrchestrator(r.DB, r.logger, plService, connService, procService, connPluginService, procPluginService, lifecycleService)

r.Orchestrator = orc
@@ -415,6 +431,15 @@ func (r *Runtime) initProfiling(ctx context.Context) (deferred func(), err error
}

func (r *Runtime) registerCleanup(t *tomb.Tomb) {
if r.Config.Preview.PipelineArchV2 {
r.registerCleanupV2(t)
} else {
r.registerCleanupV1(t)
}
}

func (r *Runtime) registerCleanupV1(t *tomb.Tomb) {
ls := r.lifecycleService.(*lifecycle.Service)
t.Go(func() error {
<-t.Dying()
// start cleanup with a fresh context
@@ -423,12 +448,12 @@ func (r *Runtime) registerCleanup(t *tomb.Tomb) {
// t.Err() can be nil, when we had a call: t.Kill(nil)
// t.Err() will be context.Canceled, if the tomb's context was canceled
if t.Err() == nil || cerrors.Is(t.Err(), context.Canceled) {
r.lifecycleService.StopAll(ctx, pipeline.ErrGracefulShutdown)
ls.StopAll(ctx, pipeline.ErrGracefulShutdown)
} else {
// tomb died due to a real error
r.lifecycleService.StopAll(ctx, cerrors.Errorf("conduit experienced an error: %w", t.Err()))
ls.StopAll(ctx, cerrors.Errorf("conduit experienced an error: %w", t.Err()))
}
err := r.lifecycleService.Wait(exitTimeout)
err := ls.Wait(exitTimeout)
t.Go(func() error {
r.connectorPersister.Wait()
return r.DB.Close()
@@ -437,6 +462,62 @@ func (r *Runtime) registerCleanup(t *tomb.Tomb) {
})
}

func (r *Runtime) registerCleanupV2(t *tomb.Tomb) {
ls := r.lifecycleService.(*lifecycle_v2.Service)
t.Go(func() error {
<-t.Dying()
// start cleanup with a fresh context
ctx := context.Background()

err := ls.StopAll(ctx, false)
if err != nil {
r.logger.Err(ctx, err).Msg("some pipelines stopped with an error")
}

// Wait for the pipelines to stop
const (
count = 6
interval = exitTimeout / count
)

pipelinesStopped := make(chan struct{})
go func() {
for i := count; i > 0; i-- {
if i == 1 {
// on last try, stop forcefully
_ = ls.StopAll(ctx, true)
}

r.logger.Info(ctx).Msgf("waiting for pipelines to stop running (time left: %s)", time.Duration(i)*interval)
select {
case <-time.After(interval):
case <-pipelinesStopped:
return
}
}
}()

err = ls.Wait(exitTimeout)
switch {
case err != nil && err != context.DeadlineExceeded:
r.logger.Warn(ctx).Err(err).Msg("some pipelines stopped with an error")
case err == context.DeadlineExceeded:
r.logger.Warn(ctx).Msg("some pipelines did not stop in time")
default:
r.logger.Info(ctx).Msg("all pipelines stopped gracefully")
}

pipelinesStopped <- struct{}{}

t.Go(func() error {
r.connectorPersister.Wait()
return r.DB.Close()
})

return nil
})
}

func (r *Runtime) newHTTPMetricsHandler() http.Handler {
return promhttp.Handler()
}
@@ -770,13 +851,25 @@ func (r *Runtime) initServices(ctx context.Context, t *tomb.Tomb) error {
}

if r.Config.Pipelines.ExitOnDegraded {
r.lifecycleService.OnFailure(func(e lifecycle.FailureEvent) {
r.logger.Warn(ctx).
Err(e.Error).
Str(log.PipelineIDField, e.ID).
Msg("Conduit will shut down due to a pipeline failure and 'exit-on-degraded' enabled")
t.Kill(cerrors.Errorf("shut down due to 'exit-on-degraded' error: %w", e.Error))
})
if r.Config.Preview.PipelineArchV2 {
ls := r.lifecycleService.(*lifecycle_v2.Service)
ls.OnFailure(func(e lifecycle_v2.FailureEvent) {
r.logger.Warn(ctx).
Err(e.Error).
Str(log.PipelineIDField, e.ID).
Msg("Conduit will shut down due to a pipeline failure and 'exit-on-degraded' enabled")
t.Kill(cerrors.Errorf("shut down due to 'exit-on-degraded' error: %w", e.Error))
})
} else {
ls := r.lifecycleService.(*lifecycle.Service)
ls.OnFailure(func(e lifecycle.FailureEvent) {
r.logger.Warn(ctx).
Err(e.Error).
Str(log.PipelineIDField, e.ID).
Msg("Conduit will shut down due to a pipeline failure and 'exit-on-degraded' enabled")
t.Kill(cerrors.Errorf("shut down due to 'exit-on-degraded' error: %w", e.Error))
})
}
}
err = r.pipelineService.Init(ctx)
if err != nil {
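As a hedged illustration of the pattern above (the helper name and placement are hypothetical, not part of the change): the shared `lifecycleService` interface covers only `Start`, `Stop`, and `Init`, so version-specific calls such as `StopAll` are reached through a type assertion on the concrete service, mirroring what `registerCleanupV1`/`V2` do.

```go
package conduit

import (
	"context"

	"github.com/conduitio/conduit/pkg/lifecycle"
	lifecycle_v2 "github.com/conduitio/conduit/pkg/lifecycle-poc"
	"github.com/conduitio/conduit/pkg/pipeline"
)

// stopAllPipelines is an illustrative helper only: it shows why the cleanup
// and exit-on-degraded paths assert the concrete type instead of relying on
// the narrow lifecycleService interface.
func stopAllPipelines(ctx context.Context, ls lifecycleService, archV2 bool) error {
	if archV2 {
		// v2 takes a force flag and returns an error.
		return ls.(*lifecycle_v2.Service).StopAll(ctx, false)
	}
	// v1 takes the reason for stopping; failures surface via OnFailure callbacks.
	ls.(*lifecycle.Service).StopAll(ctx, pipeline.ErrGracefulShutdown)
	return nil
}
```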
23 changes: 17 additions & 6 deletions pkg/connector/source.go
@@ -16,7 +16,9 @@ package connector

import (
"context"
"strconv"
"sync"
"time"

"github.com/conduitio/conduit-commons/opencdc"
"github.com/conduitio/conduit-connector-protocol/pconnector"
@@ -153,6 +155,7 @@ func (s *Source) Teardown(ctx context.Context) error {
return plugin.ErrPluginNotRunning
}

s.Instance.logger.Debug(ctx).Msg("closing stream")
// close stream
if s.stopStream != nil {
s.stopStream()
@@ -192,8 +195,9 @@ func (s *Source) Read(ctx context.Context) ([]opencdc.Record, error) {
return nil, err
}

now := strconv.FormatInt(time.Now().UnixNano(), 10)
for _, r := range resp.Records {
s.sanitizeRecord(&r)
s.sanitizeRecord(&r, now)
}

s.Instance.inspector.Send(ctx, resp.Records)
@@ -375,7 +379,7 @@ func (s *Source) triggerLifecycleEvent(ctx context.Context, oldConfig, newConfig
}
}

func (s *Source) sanitizeRecord(r *opencdc.Record) {
func (s *Source) sanitizeRecord(r *opencdc.Record, now string) {
if r.Key == nil {
r.Key = opencdc.RawData{}
}
@@ -385,12 +389,19 @@ func (s *Source) sanitizeRecord(r *opencdc.Record) {
if r.Payload.After == nil {
r.Payload.After = opencdc.RawData{}
}

if r.Metadata == nil {
r.Metadata = opencdc.Metadata{}
r.Metadata = opencdc.Metadata{
opencdc.MetadataReadAt: now,
opencdc.MetadataConduitSourceConnectorID: s.Instance.ID,
}
} else {
if r.Metadata[opencdc.MetadataReadAt] == "" {
r.Metadata[opencdc.MetadataReadAt] = now
}
if r.Metadata[opencdc.MetadataConduitSourceConnectorID] == "" {
r.Metadata[opencdc.MetadataConduitSourceConnectorID] = s.Instance.ID
}
}
// source connector ID is added to all records
r.Metadata.SetConduitSourceConnectorID(s.Instance.ID)
}

func (*Source) isEqual(cfg1, cfg2 map[string]string) bool {
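The metadata defaulting above formats the timestamp once per `Read` batch instead of once per record; a small self-contained sketch of the same idea (the helper name is hypothetical):

```go
package example

import (
	"strconv"
	"time"

	"github.com/conduitio/conduit-commons/opencdc"
)

// ensureReadAt mirrors the defaulting behavior sketched above: MetadataReadAt
// is set only when missing, and the timestamp is computed once for the batch.
func ensureReadAt(records []opencdc.Record) {
	now := strconv.FormatInt(time.Now().UnixNano(), 10)
	for i := range records {
		if records[i].Metadata == nil {
			records[i].Metadata = opencdc.Metadata{}
		}
		if records[i].Metadata[opencdc.MetadataReadAt] == "" {
			records[i].Metadata[opencdc.MetadataReadAt] = now
		}
	}
}
```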
10 changes: 7 additions & 3 deletions pkg/foundation/metrics/metrics.go
@@ -405,14 +405,18 @@ func (mt *labeledHistogram) WithValues(vs ...string) Histogram {
// RecordBytesHistogram wraps a histogram metric and allows observing record
// sizes in bytes.
type RecordBytesHistogram struct {
h Histogram
H Histogram
}

func NewRecordBytesHistogram(h Histogram) RecordBytesHistogram {
return RecordBytesHistogram{h}
return RecordBytesHistogram{H: h}
}

func (m RecordBytesHistogram) Observe(r opencdc.Record) {
m.H.Observe(m.SizeOf(r))
}

func (m RecordBytesHistogram) SizeOf(r opencdc.Record) float64 {
// TODO for now we call method Bytes() on key and payload to get the
// bytes representation. In case of a structured payload or key it
// is marshaled into JSON, which might not be the correct way to
@@ -429,5 +433,5 @@ func (m RecordBytesHistogram) Observe(r opencdc.Record) {
if r.Payload.After != nil {
bytes += len(r.Payload.After.Bytes())
}
m.h.Observe(float64(bytes))
return float64(bytes)
}