diff --git a/go.mod b/go.mod index 23a4a558d..a92c1a135 100644 --- a/go.mod +++ b/go.mod @@ -38,6 +38,7 @@ require ( golang.org/x/net v0.0.0-20220225172249-27dd8689420f golang.org/x/sync v0.0.0-20210220032951-036812b2e83c golang.org/x/sys v0.0.0-20220317061510-51cd9980dadf + golang.org/x/time v0.0.0-20220224211638-0e9765cccd65 google.golang.org/grpc v1.44.0 ) @@ -158,7 +159,6 @@ require ( golang.org/x/crypto v0.0.0-20210921155107-089bfa567519 // indirect golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8 // indirect golang.org/x/text v0.3.7 // indirect - golang.org/x/time v0.0.0-20220224211638-0e9765cccd65 // indirect golang.org/x/tools v0.1.10 // indirect golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect google.golang.org/api v0.69.0 // indirect diff --git a/pump/storage/metrics.go b/pump/storage/metrics.go index 97db9439f..59ca47c30 100644 --- a/pump/storage/metrics.go +++ b/pump/storage/metrics.go @@ -108,6 +108,15 @@ var ( Help: "How long the catch up step takes to run.", Buckets: prometheus.ExponentialBuckets(0.001, 2, 22), }) + + commitTsLagHistogram = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "binlog", + Subsystem: "pump_storage", + Name: "commit_ts_lag_time", + Help: "Bucketed histogram of the lag of currently handled maximum commit-ts", + Buckets: prometheus.ExponentialBuckets(0.00005, 2, 18), + }, []string{"type"}) ) // InitMetircs register the metrics to registry @@ -123,4 +132,5 @@ func InitMetircs(registry *prometheus.Registry) { registry.MustRegister(storageSizeGauge) registry.MustRegister(slowChaserCount) registry.MustRegister(slowChaserCatchUpTimeHistogram) + registry.MustRegister(commitTsLagHistogram) } diff --git a/pump/storage/sorter.go b/pump/storage/sorter.go index 7b712a4e2..4ec8baa89 100644 --- a/pump/storage/sorter.go +++ b/pump/storage/sorter.go @@ -16,6 +16,10 @@ package storage import ( "container/list" "fmt" + "github.com/prometheus/client_golang/prometheus" + "github.com/tikv/client-go/v2/oracle" + "go.uber.org/zap" + "golang.org/x/time/rate" "math/rand" "sync" "sync/atomic" @@ -105,6 +109,9 @@ type sorter struct { // save the startTS of txn which we need to wait for the C binlog waitStartTS map[int64]struct{} + commitTsLagMetrics prometheus.Observer + maxPushItemCommitTs int64 // atomic. Used for metrics only + lock sync.Mutex cond *sync.Cond items *list.List @@ -114,14 +121,16 @@ type sorter struct { func newSorter(fn func(item sortItem)) *sorter { sorter := &sorter{ - maxTSItemCB: fn, - items: list.New(), - waitStartTS: make(map[int64]struct{}), + maxTSItemCB: fn, + items: list.New(), + waitStartTS: make(map[int64]struct{}), + commitTsLagMetrics: commitTsLagHistogram.With(map[string]string{"type": "sorter"}), } sorter.cond = sync.NewCond(&sorter.lock) - sorter.wg.Add(1) + sorter.wg.Add(2) go sorter.run() + go sorter.updateMetrics() return sorter } @@ -148,6 +157,12 @@ func (s *sorter) pushTSItem(item sortItem) { if item.tp == pb.BinlogType_Prewrite { s.waitStartTS[item.start] = struct{}{} } else { + if item.commit > atomic.LoadInt64(&s.maxPushItemCommitTs) { + // No need to CAS because we are inside a lock, and + // this is the only place where s.maxPushItemCommitTs + // is written to. + atomic.StoreInt64(&s.maxPushItemCommitTs, item.commit) + } delete(s.waitStartTS, item.start) s.cond.Signal() } @@ -225,6 +240,35 @@ func (s *sorter) run() { } } +func (s *sorter) updateMetrics() { + defer s.wg.Done() + + tick := time.NewTicker(1 * time.Second) + defer tick.Stop() + + rl := rate.NewLimiter(rate.Every(time.Second*10), 1) + + for range tick.C { + if s.isClosed() { + return + } + + curMaxPushItemCommitTs := atomic.LoadInt64(&s.maxPushItemCommitTs) + maxCommitTsPhysicalMs := oracle.ExtractPhysical(uint64(curMaxPushItemCommitTs)) + millisecond := time.Now().UnixMilli() - maxCommitTsPhysicalMs + + // Update metrics + s.commitTsLagMetrics.Observe(float64(millisecond) / 1000.0) + + if millisecond > 10000 /* 10s */ && rl.Allow() { + // Prints a log if the lag is more than 10 seconds, and + // if the rate limiter allows. + duration := time.Duration(millisecond) * time.Millisecond + log.Info("sorter commit ts lag", zap.Duration("duration", duration)) + } + } +} + func (s *sorter) isClosed() bool { return atomic.LoadInt32(&s.closed) == 1 } diff --git a/pump/storage/storage.go b/pump/storage/storage.go index f43c5aa33..968666652 100644 --- a/pump/storage/storage.go +++ b/pump/storage/storage.go @@ -16,6 +16,7 @@ package storage import ( "context" "encoding/binary" + "golang.org/x/time/rate" "math" "os" "path" @@ -119,6 +120,8 @@ type Append struct { options *Options + maxWriteToKVCommitTs int64 // atomic. Used for metrics only. + close chan struct{} wg sync.WaitGroup } @@ -362,6 +365,14 @@ func (a *Append) updateStatus() { logStatsTicker := time.NewTicker(time.Second * 10) defer logStatsTicker.Stop() + updateMetricsTicker := time.NewTicker(time.Second * 1) + defer updateMetricsTicker.Stop() + + kvCommitTsLagMetrics := commitTsLagHistogram.With(map[string]string{ + "type": "kv", + }) + logRL := rate.NewLimiter(rate.Every(time.Second*10), 1) + for { select { case <-a.close: @@ -389,6 +400,20 @@ func (a *Append) updateStatus() { log.Warn("in WritePaused stat") } } + case <-updateMetricsTicker.C: + curKVCommitTs := atomic.LoadInt64(&a.maxWriteToKVCommitTs) + maxCommitTsPhysicalMs := oracle.ExtractPhysical(uint64(curKVCommitTs)) + millisecond := time.Now().UnixMilli() - maxCommitTsPhysicalMs + + // Update metrics + kvCommitTsLagMetrics.Observe(float64(millisecond) / 1000.0) + + if millisecond > 10000 /* 10s */ && logRL.Allow() { + // Prints a log if the lag is more than 10 seconds, and + // if the rate limiter allows. + duration := time.Duration(millisecond) * time.Millisecond + log.Info("kv commit ts lag", zap.Duration("duration", duration)) + } } } } @@ -888,6 +913,10 @@ func (a *Append) writeToKV(reqs chan *request) chan *request { return } for _, req := range bufReqs { + if req.commitTS > atomic.LoadInt64(&a.maxWriteToKVCommitTs) { + atomic.StoreInt64(&a.maxWriteToKVCommitTs, req.commitTS) + } + done <- req } }