sink: add run method to the interface, called by the dispatcher manager #846

Merged
merged 12 commits on Jan 14, 2025
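
Before the per-file hunks, a rough orientation: the PR moves sink lifecycle ownership into the dispatcher manager, so every sink now exposes a blocking Run(ctx) and a Close(removeChangefeed) that no longer returns an error, and the API layer verifies a sink URI through a dedicated helper instead of constructing and closing a throwaway sink. The sketch below is inferred from the hunks on this page; only Run and Close(bool) are directly confirmed by the diff, and the remaining methods and package layout are assumptions.

```go
// Sketch of the sink contract implied by this PR (not the repository's
// authoritative definition). The dispatcher manager owns the lifecycle:
// it calls Run in a goroutine and Close during shutdown.
package sink

import "context"

type Sink interface {
	// Run drives the sink's internal workers and blocks until the context
	// is cancelled or an unrecoverable error occurs.
	Run(ctx context.Context) error
	// Close releases downstream resources. removeChangefeed reports whether
	// the changefeed is being removed rather than merely paused.
	Close(removeChangefeed bool)
	// IsNormal reports whether the sink is still healthy (assumed from the
	// isNormal flag used by KafkaSink below).
	IsNormal() bool
}
```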
6 changes: 2 additions & 4 deletions api/v2/changefeed.go
@@ -161,12 +161,11 @@ func (h *OpenAPIV2) createChangefeed(c *gin.Context) {
// verify sinkURI
tempChangefeedID := common.NewChangeFeedIDWithName("sink-uri-verify-changefeed-id")
cfConfig := info.ToChangefeedConfig()
sink, err := sink.NewSink(ctx, cfConfig, tempChangefeedID, nil)
err = sink.VerifySink(ctx, cfConfig, tempChangefeedID)
if err != nil {
_ = c.Error(errors.WrapError(errors.ErrSinkURIInvalid, err))
return
}
sink.Close(true)

needRemoveGCSafePoint := false
defer func() {
@@ -597,12 +596,11 @@ func (h *OpenAPIV2) updateChangefeed(c *gin.Context) {

// verify sink
tempChangefeedID := common.NewChangeFeedIDWithName("sink-uri-verify-changefeed-id")
sink, err := sink.NewSink(ctx, oldCfInfo.ToChangefeedConfig(), tempChangefeedID, nil)
err = sink.VerifySink(ctx, oldCfInfo.ToChangefeedConfig(), tempChangefeedID)
if err != nil {
_ = c.Error(errors.WrapError(errors.ErrSinkURIInvalid, err))
return
}
sink.Close(true)

if err := coordinator.UpdateChangefeed(ctx, oldCfInfo); err != nil {
_ = c.Error(err)
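The handler changes above swap the build-then-Close(true) dance for a single sink.VerifySink call. That helper's body is outside this diff; judging from the verifyKafkaSink function added further down, a plausible shape is scheme-based dispatch, as in the self-contained sketch below (every name here is illustrative, not the actual implementation).

```go
package sinksketch

import (
	"context"
	"fmt"
	"net/url"
)

// verifier checks that the downstream for one URI scheme is reachable and
// well-configured, then tears down whatever clients it created.
type verifier func(ctx context.Context, sinkURI *url.URL) error

// verifySinkURI is an illustrative stand-in for sink.VerifySink: parse the
// URI and delegate to the scheme-specific check.
func verifySinkURI(ctx context.Context, rawURI string, verifiers map[string]verifier) error {
	u, err := url.Parse(rawURI)
	if err != nil {
		return fmt.Errorf("invalid sink URI %q: %w", rawURI, err)
	}
	v, ok := verifiers[u.Scheme]
	if !ok {
		return fmt.Errorf("unsupported sink scheme %q", u.Scheme)
	}
	return v(ctx, u)
}
```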
5 changes: 4 additions & 1 deletion downstreamadapter/dispatcher/dispatcher_test.go
@@ -14,6 +14,7 @@
package dispatcher

import (
"context"
"testing"
"time"

@@ -55,7 +56,9 @@ func (s *mockSink) AddCheckpointTs(ts uint64) {
func (s *mockSink) SetTableSchemaStore(tableSchemaStore *sinkutil.TableSchemaStore) {
}

func (s *mockSink) Close(bool) error {
func (s *mockSink) Close(bool) {}

func (s *mockSink) Run(context.Context) error {
return nil
}

129 changes: 68 additions & 61 deletions downstreamadapter/dispatchermanager/event_dispatcher_manager.go
@@ -120,13 +120,15 @@ func NewEventDispatcherManager(
maintainerID node.ID,
) (*EventDispatcherManager, uint64, error) {
ctx, cancel := context.WithCancel(context.Background())
var wg sync.WaitGroup
manager := &EventDispatcherManager{
dispatcherMap: newDispatcherMap(),
changefeedID: changefeedID,
maintainerID: maintainerID,
statusesChan: make(chan TableSpanStatusWithSeq, 8192),
blockStatusesChan: make(chan *heartbeatpb.TableSpanBlockStatus, 1024*1024),
errCh: make(chan error, 1),
wg: wg,
cancel: cancel,
config: cfConfig,
filterConfig: toFilterConfigPB(cfConfig.Filter),
@@ -150,7 +152,8 @@
}
}

err := manager.startSink(ctx)
var err error
manager.sink, err = sink.NewSink(ctx, manager.config, manager.changefeedID)
if err != nil {
return nil, 0, errors.Trace(err)
}
@@ -162,35 +165,51 @@
return nil, 0, errors.Trace(err)
}

var tableTriggerStartTs uint64 = 0
// init table trigger event dispatcher when tableTriggerEventDispatcherID is not nil
if tableTriggerEventDispatcherID != nil {
tableTriggerStartTs, err = manager.NewTableTriggerEventDispatcher(tableTriggerEventDispatcherID, startTs)
if err != nil {
return nil, 0, errors.Trace(err)
}
}

wg.Add(1)
go func() {
defer wg.Done()
err = manager.sink.Run(ctx)
if err != nil && errors.Cause(err) != context.Canceled {
select {
case <-ctx.Done():
return
case manager.errCh <- err:
default:
log.Error("error channel is full, discard error", zap.Any("changefeedID", changefeedID.String()), zap.Error(err))
}
}
}()

// collect errors from error channel
manager.wg.Add(1)
wg.Add(1)
go func() {
defer manager.wg.Done()
defer wg.Done()
manager.collectErrors(ctx)
}()

// collect heart beat info from all dispatchers
manager.wg.Add(1)
wg.Add(1)
go func() {
defer manager.wg.Done()
defer wg.Done()
manager.collectComponentStatusWhenChanged(ctx)
}()

// collect block status from all dispatchers
manager.wg.Add(1)
wg.Add(1)
go func() {
defer manager.wg.Done()
defer wg.Done()
manager.collectBlockStatusRequest(ctx)
}()

var tableTriggerStartTs uint64 = 0
// init table trigger event dispatcher when tableTriggerEventDispatcherID is not nil
if tableTriggerEventDispatcherID != nil {
tableTriggerStartTs, err = manager.NewTableTriggerEventDispatcher(tableTriggerEventDispatcherID, startTs)
if err != nil {
return nil, 0, errors.Trace(err)
}
}
log.Info("event dispatcher manager created",
zap.Stringer("changefeedID", changefeedID),
zap.Stringer("maintainerID", maintainerID),
@@ -199,15 +218,6 @@ func NewEventDispatcherManager(
return manager, tableTriggerStartTs, nil
}

func (e *EventDispatcherManager) startSink(ctx context.Context) error {
sink, err := sink.NewSink(ctx, e.config, e.changefeedID, e.errCh)
if err != nil {
return err
}
e.sink = sink
return nil
}

func (e *EventDispatcherManager) TryClose(removeChangefeed bool) bool {
if e.closed.Load() {
return true
@@ -263,12 +273,7 @@ func (e *EventDispatcherManager) close(removeChangefeed bool) {
e.heartBeatTask.Cancel()
}

err = e.sink.Close(removeChangefeed)
if err != nil && errors.Cause(err) != context.Canceled {
log.Error("close sink failed", zap.Error(err))
return
}

e.sink.Close(removeChangefeed)
e.cancel()
e.wg.Wait()

@@ -428,7 +433,7 @@ func (e *EventDispatcherManager) newDispatchers(infos []dispatcherCreateInfo) er
zap.Any("startTs", newStartTsList[idx]))

}
e.metricCreateDispatcherDuration.Observe(float64(time.Since(start).Seconds()) / float64(len(dispatcherIds)))
e.metricCreateDispatcherDuration.Observe(time.Since(start).Seconds() / float64(len(dispatcherIds)))
log.Info("batch create new dispatchers",
zap.Any("changefeedID", e.changefeedID.Name()),
zap.Any("namespace", e.changefeedID.Namespace()),
@@ -439,36 +444,38 @@

// collectErrors collects errors from the error channel and reports them to the maintainer.
func (e *EventDispatcherManager) collectErrors(ctx context.Context) {
select {
case <-ctx.Done():
return
case err := <-e.errCh:
if errors.Cause(err) != context.Canceled {
log.Error("Event Dispatcher Manager Meets Error",
zap.String("changefeedID", e.changefeedID.String()),
zap.Error(err))

// report error to maintainer
var message heartbeatpb.HeartBeatRequest
message.ChangefeedID = e.changefeedID.ToPB()
message.Err = &heartbeatpb.RunningError{
Time: time.Now().String(),
Node: appcontext.GetID(),
Code: string(apperror.ErrorCode(err)),
Message: err.Error(),
}
e.heartbeatRequestQueue.Enqueue(&HeartBeatRequestWithTargetID{TargetID: e.GetMaintainerID(), Request: &message})

// resend message until the event dispatcher manager is closed
// the first error matters most, so we just need to resend it continually and ignore the other errors.
ticker := time.NewTicker(time.Second * 5)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
e.heartbeatRequestQueue.Enqueue(&HeartBeatRequestWithTargetID{TargetID: e.GetMaintainerID(), Request: &message})
for {
select {
case <-ctx.Done():
return
case err := <-e.errCh:
if errors.Cause(err) != context.Canceled {
log.Error("Event Dispatcher Manager Meets Error",
zap.String("changefeedID", e.changefeedID.String()),
zap.Error(err))

// report error to maintainer
var message heartbeatpb.HeartBeatRequest
message.ChangefeedID = e.changefeedID.ToPB()
message.Err = &heartbeatpb.RunningError{
Time: time.Now().String(),
Node: appcontext.GetID(),
Code: string(apperror.ErrorCode(err)),
Message: err.Error(),
}
e.heartbeatRequestQueue.Enqueue(&HeartBeatRequestWithTargetID{TargetID: e.GetMaintainerID(), Request: &message})

// resend message until the event dispatcher manager is closed
// the first error matters most, so we just need to resend it continually and ignore the other errors.
ticker := time.NewTicker(time.Second * 5)
for {
select {
case <-ctx.Done():
ticker.Stop()
return
case <-ticker.C:
e.heartbeatRequestQueue.Enqueue(&HeartBeatRequestWithTargetID{TargetID: e.GetMaintainerID(), Request: &message})
}
}
}
}
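To summarize the manager changes above: NewEventDispatcherManager now starts the sink's Run in its own goroutine and forwards any error that is not a plain cancellation into the bounded errCh, dropping the error (with a log line) if the channel is already full, while collectErrors loops over the channel instead of handling a single error. A self-contained sketch of that forwarding pattern, with illustrative names rather than the repository's:

```go
package dispatchersketch

import (
	"context"
	"errors"
	"log"
	"sync"
)

// startSink runs the sink's blocking Run in a goroutine and forwards any
// non-cancellation error into errCh without ever blocking the goroutine.
func startSink(ctx context.Context, wg *sync.WaitGroup, run func(context.Context) error, errCh chan<- error) {
	wg.Add(1)
	go func() {
		defer wg.Done()
		err := run(ctx)
		if err == nil || errors.Is(err, context.Canceled) {
			return
		}
		select {
		case <-ctx.Done(): // shutting down anyway, drop the error
		case errCh <- err: // picked up by the error collector
		default:
			log.Printf("error channel is full, discarding: %v", err)
		}
	}()
}
```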
7 changes: 5 additions & 2 deletions downstreamadapter/sink/blackhole.go
@@ -14,6 +14,7 @@
package sink

import (
"context"
"github.com/pingcap/log"
"github.com/pingcap/ticdc/pkg/common"
commonEvent "github.com/pingcap/ticdc/pkg/common/event"
@@ -25,7 +26,7 @@ import (
// Including DDL and DML.
type BlackHoleSink struct{}

func NewBlackHoleSink() (*BlackHoleSink, error) {
func newBlackHoleSink() (*BlackHoleSink, error) {
blackholeSink := BlackHoleSink{}
return &blackholeSink, nil
}
@@ -78,6 +79,8 @@ func (s *BlackHoleSink) GetStartTsList(tableIds []int64, startTsList []int64) ([
return []int64{}, nil
}

func (s *BlackHoleSink) Close(removeChangefeed bool) error {
func (s *BlackHoleSink) Close(_ bool) {}

func (s *BlackHoleSink) Run(_ context.Context) error {
return nil
}
2 changes: 1 addition & 1 deletion downstreamadapter/sink/blackhole_test.go
@@ -23,7 +23,7 @@ import (

// Test callback and tableProgress works as expected after AddDMLEvent
func TestBlacHoleSinkBasicFunctionality(t *testing.T) {
sink, err := NewBlackHoleSink()
sink, err := newBlackHoleSink()
require.NoError(t, err)

count := 0
48 changes: 17 additions & 31 deletions downstreamadapter/sink/kafka_sink.go
@@ -27,7 +27,6 @@ import (
"github.com/pingcap/ticdc/pkg/common"
commonEvent "github.com/pingcap/ticdc/pkg/common/event"
"github.com/pingcap/ticdc/pkg/config"
ticonfig "github.com/pingcap/ticdc/pkg/config"
"github.com/pingcap/ticdc/pkg/metrics"
"github.com/pingcap/ticdc/pkg/sink/kafka"
"github.com/pingcap/ticdc/pkg/sink/util"
@@ -50,15 +49,23 @@ type KafkaSink struct {
metricsCollector kafka.MetricsCollector

errgroup *errgroup.Group
errCh chan error
isNormal uint32 // if sink is normal, isNormal is 1, otherwise is 0
}

func (s *KafkaSink) SinkType() common.SinkType {
return common.KafkaSinkType
}

func NewKafkaSink(ctx context.Context, changefeedID common.ChangeFeedID, sinkURI *url.URL, sinkConfig *ticonfig.SinkConfig, errCh chan error) (*KafkaSink, error) {
func verifyKafkaSink(ctx context.Context, changefeedID common.ChangeFeedID, uri *url.URL, sinkConfig *config.SinkConfig) error {
components, _, err := worker.GetKafkaSinkComponent(ctx, changefeedID, uri, sinkConfig)
components.AdminClient.Close()
components.TopicManager.Close()
return err
}

func newKafkaSink(
ctx context.Context, changefeedID common.ChangeFeedID, sinkURI *url.URL, sinkConfig *config.SinkConfig,
) (*KafkaSink, error) {
errGroup, ctx := errgroup.WithContext(ctx)
statistics := metrics.NewStatistics(changefeedID, "KafkaSink")
kafkaComponent, protocol, err := worker.GetKafkaSinkComponent(ctx, changefeedID, sinkURI, sinkConfig)
@@ -114,30 +121,20 @@ func NewKafkaSink(ctx context.Context, changefeedID common.ChangeFeedID, sinkURI
statistics: statistics,
metricsCollector: kafkaComponent.Factory.MetricsCollector(kafkaComponent.AdminClient),
errgroup: errGroup,
errCh: errCh,
}
go sink.run(ctx)
return sink, nil
}

func (s *KafkaSink) run(ctx context.Context) {
func (s *KafkaSink) Run(ctx context.Context) error {
s.dmlWorker.Run(ctx)
s.ddlWorker.Run()
s.errgroup.Go(func() error {
s.metricsCollector.Run(ctx)
return nil
})
err := s.errgroup.Wait()
if errors.Cause(err) != context.Canceled {
atomic.StoreUint32(&s.isNormal, 0)
select {
case s.errCh <- err:
default:
log.Error("error channel is full, discard error",
zap.Any("changefeedID", s.changefeedID.String()),
zap.Error(err))
}
}
atomic.StoreUint32(&s.isNormal, 0)
return errors.Trace(err)
}

func (s *KafkaSink) IsNormal() bool {
@@ -187,22 +184,12 @@ func (s *KafkaSink) SetTableSchemaStore(tableSchemaStore *util.TableSchemaStore)
s.ddlWorker.SetTableSchemaStore(tableSchemaStore)
}

func (s *KafkaSink) Close(_ bool) error {
err := s.ddlWorker.Close()
if err != nil {
return errors.Trace(err)
}

err = s.dmlWorker.Close()
if err != nil {
return errors.Trace(err)
}

func (s *KafkaSink) Close(_ bool) {
s.ddlWorker.Close()
s.dmlWorker.Close()
s.adminClient.Close()
s.topicManager.Close()
s.statistics.Close()

return nil
}

func newKafkaSinkForTest() (*KafkaSink, producer.DMLProducer, producer.DDLProducer, error) {
@@ -268,8 +255,7 @@ func newKafkaSinkForTest() (*KafkaSink, producer.DMLProducer, producer.DDLProduc
statistics: statistics,
metricsCollector: kafkaComponent.Factory.MetricsCollector(kafkaComponent.AdminClient),
errgroup: errGroup,
errCh: make(chan error, 1),
}
go sink.run(ctx)
go sink.Run(ctx)
return sink, dmlMockProducer, ddlMockProducer, nil
}
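
The KafkaSink hunks above replace the background run goroutine with a blocking Run that supervises its workers through the existing errgroup and flips the atomic isNormal flag when the group returns. A hedged, stand-alone sketch of that supervision pattern (the worker functions are placeholders, not the repository's DML/DDL workers or metrics collector):

```go
package kafkasketch

import (
	"context"
	"sync/atomic"

	"golang.org/x/sync/errgroup"
)

// supervisor mirrors the shape of KafkaSink.Run: launch every worker under
// one errgroup, block until the first failure or cancellation, then mark
// the sink unhealthy before returning the error to the caller.
type supervisor struct {
	isNormal uint32 // 1 while healthy, 0 once Run has returned
	workers  []func(context.Context) error
}

func (s *supervisor) Run(ctx context.Context) error {
	atomic.StoreUint32(&s.isNormal, 1)
	g, gctx := errgroup.WithContext(ctx)
	for _, w := range s.workers {
		w := w // capture the loop variable for the closure
		g.Go(func() error { return w(gctx) })
	}
	err := g.Wait() // blocks until all workers return or one of them fails
	atomic.StoreUint32(&s.isNormal, 0)
	return err
}

func (s *supervisor) IsNormal() bool {
	return atomic.LoadUint32(&s.isNormal) == 1
}
```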