From a2503371a7cf598c9cc0941e335022b06dcb00e4 Mon Sep 17 00:00:00 2001 From: Brad Davidson Date: Tue, 31 Oct 2023 19:44:52 +0000 Subject: [PATCH] Fix multiple issues with Watch * Fixes an error where watching a compacted revision would return events, followed by an error. The watch is now immediately cancelled with an appropriate error, and the compact/current revision fields set. * Fixes an error where a high event rate could flood the GRPC stream with single-event watch responses. The event channel reader now merges all available events, and sends them in a single watch message. This was most frequently an issue on sqlite, where all inserts are done locally and immediately wake the polling goroutine to send the event without any opportunity for batching. Signed-off-by: Brad Davidson --- pkg/drivers/nats/nats.go | 22 +++++--- pkg/logstructured/logstructured.go | 38 ++++++++----- pkg/logstructured/sqllog/sql.go | 25 +++++++- pkg/server/list.go | 3 + pkg/server/types.go | 8 ++- pkg/server/watch.go | 91 +++++++++++++++++++++--------- 6 files changed, 136 insertions(+), 51 deletions(-) diff --git a/pkg/drivers/nats/nats.go b/pkg/drivers/nats/nats.go index dd707ced..ee5f6b37 100644 --- a/pkg/drivers/nats/nats.go +++ b/pkg/drivers/nats/nats.go @@ -843,23 +843,29 @@ func (d *Driver) Update(ctx context.Context, key string, value []byte, revision, } -func (d *Driver) Watch(ctx context.Context, prefix string, revision int64) <-chan []*server.Event { - +func (d *Driver) Watch(ctx context.Context, prefix string, revision int64) server.WatchResult { + ctx, cancel := context.WithCancel(ctx) watcher, err := d.kv.(*kv.EncodedKV).Watch(prefix, nats.IgnoreDeletes(), nats.Context(ctx)) if revision > 0 { revision-- } - _, events, err := d.listAfter(ctx, prefix, revision) + result := make(chan []*server.Event, 100) + wr := server.WatchResult{Events: result} + + rev, events, err := d.listAfter(ctx, prefix, revision) if err != nil { logrus.Errorf("failed to create watcher %s for revision %d", prefix, revision) + if err == server.ErrCompacted { + compact, _ := d.compactRevision() + wr.CompactRevision = compact + wr.CurrentRevision = rev + } + cancel() } - result := make(chan []*server.Event, 100) - go func() { - if len(events) > 0 { result <- events revision = events[len(events)-1].KV.ModRevision @@ -915,11 +921,13 @@ func (d *Driver) Watch(ctx context.Context, prefix string, revision int64) <-cha if err := watcher.Stop(); err != nil && err != nats.ErrBadSubscription { logrus.Warnf("error stopping %s watcher: %v", prefix, err) } + close(result) + cancel() return } } }() - return result + return wr } // getPreviousEntry returns the nats.KeyValueEntry previous to the one provided, if the previous entry is a nats.KeyValuePut diff --git a/pkg/logstructured/logstructured.go b/pkg/logstructured/logstructured.go index 2b5b51f6..9a1f57ad 100644 --- a/pkg/logstructured/logstructured.go +++ b/pkg/logstructured/logstructured.go @@ -17,6 +17,7 @@ const ( type Log interface { Start(ctx context.Context) error + CompactRevision(ctx context.Context) (int64, error) CurrentRevision(ctx context.Context) (int64, error) List(ctx context.Context, prefix, startKey string, limit, revision int64, includeDeletes bool) (int64, []*server.Event, error) After(ctx context.Context, prefix string, revision, limit int64) (int64, []*server.Event, error) @@ -288,13 +289,13 @@ func (l *LogStructured) ttl(ctx context.Context) { eventKV := loadTTLEventKV(rwMutex, ttlEventKVMap, event.KV.Key) if eventKV == nil { - logrus.Tracef("Add ttl event key %v, modRev %v", event.KV.Key, event.KV.ModRevision) expires := storeTTLEventKV(rwMutex, ttlEventKVMap, event.KV) + logrus.Tracef("Add ttl event key=%v, modRev=%v, ttl=%v", event.KV.Key, event.KV.ModRevision, expires) queue.AddAfter(event.KV.Key, expires) } else { if event.KV.ModRevision > eventKV.modRevision { - logrus.Tracef("Update ttl event key %v, modRev %v", event.KV.Key, event.KV.ModRevision) expires := storeTTLEventKV(rwMutex, ttlEventKVMap, event.KV) + logrus.Tracef("Update ttl event key=%v, modRev=%v, ttl=%v", event.KV.Key, event.KV.ModRevision, expires) queue.AddAfter(event.KV.Key, expires) } } @@ -316,9 +317,9 @@ func (l *LogStructured) handleTTLEvents(ctx context.Context, rwMutex *sync.RWMut return true } - if eventKV.expiredAt.After(time.Now()) { - logrus.Tracef("TTL event key %v has not expired yet, the latest expiration time is %v, requeuing", key, eventKV.expiredAt) - queue.AddAfter(key, time.Until(eventKV.expiredAt)) + if expires := time.Until(eventKV.expiredAt); expires > 0 { + logrus.Tracef("TTL event key %v expires in %v, requeuing", key, expires) + queue.AddAfter(key, expires) return true } @@ -333,9 +334,9 @@ func (l *LogStructured) deleteTTLEvent(ctx context.Context, rwMutex *sync.RWMute rwMutex.Lock() defer rwMutex.Unlock() curEventKV := store[preEventKV.key] - if curEventKV.expiredAt.After(preEventKV.expiredAt) { - logrus.Tracef("TTL event key %v has updated, requeuing", curEventKV.key) - queue.AddAfter(curEventKV.key, time.Until(curEventKV.expiredAt)) + if expires := time.Until(preEventKV.expiredAt); expires > 0 { + logrus.Tracef("TTL event key was updated and now %v expires in %v, requeuing", curEventKV.key, expires) + queue.AddAfter(curEventKV.key, expires) return } if err != nil { @@ -389,10 +390,15 @@ func (l *LogStructured) ttlEvents(ctx context.Context) chan *server.Event { defer wg.Done() revision := <-lastListRevision if revision == 0 { - logrus.Error("TTL events last list revision is zero, retry to process ttl events") + logrus.Errorf("Failed to get start revision for ttl watch") return } - for events := range l.Watch(ctx, "/", revision) { + wr := l.Watch(ctx, "/", revision) + if wr.CompactRevision != 0 { + logrus.Errorf("Failed to watch for ttl: %v", server.ErrCompacted) + return + } + for events := range wr.Events { for _, event := range events { if event.KV.Lease > 0 { result <- event @@ -423,7 +429,7 @@ func storeTTLEventKV(rwMutex *sync.RWMutex, store map[string]*ttlEventKV, eventK return expires } -func (l *LogStructured) Watch(ctx context.Context, prefix string, revision int64) <-chan []*server.Event { +func (l *LogStructured) Watch(ctx context.Context, prefix string, revision int64) server.WatchResult { logrus.Tracef("WATCH %s, revision=%d", prefix, revision) // starting watching right away so we don't miss anything @@ -436,10 +442,16 @@ func (l *LogStructured) Watch(ctx context.Context, prefix string, revision int64 } result := make(chan []*server.Event, 100) + wr := server.WatchResult{Events: result} rev, kvs, err := l.log.After(ctx, prefix, revision, 0) if err != nil { - logrus.Errorf("failed to list %s for revision %d", prefix, revision) + logrus.Errorf("Failed to list %s for revision %d: %v", prefix, revision, err) + if err == server.ErrCompacted { + compact, _ := l.log.CompactRevision(ctx) + wr.CompactRevision = compact + wr.CurrentRevision = rev + } cancel() } @@ -463,7 +475,7 @@ func (l *LogStructured) Watch(ctx context.Context, prefix string, revision int64 cancel() }() - return result + return wr } func filter(events []*server.Event, rev int64) []*server.Event { diff --git a/pkg/logstructured/sqllog/sql.go b/pkg/logstructured/sqllog/sql.go index bf9c4462..a2201df1 100644 --- a/pkg/logstructured/sqllog/sql.go +++ b/pkg/logstructured/sqllog/sql.go @@ -236,6 +236,10 @@ func (s *SQLLog) CurrentRevision(ctx context.Context) (int64, error) { return s.d.CurrentRevision(ctx) } +func (s *SQLLog) CompactRevision(ctx context.Context) (int64, error) { + return s.d.GetCompactRevision(ctx) +} + func (s *SQLLog) After(ctx context.Context, prefix string, revision, limit int64) (int64, []*server.Event, error) { if strings.HasSuffix(prefix, "/") { prefix += "%" @@ -247,8 +251,21 @@ func (s *SQLLog) After(ctx context.Context, prefix string, revision, limit int64 } rev, compact, result, err := RowsToEvents(rows) + + if revision > 0 && len(result) == 0 { + // a zero length result won't have the compact or current revisions so get them manually + rev, err = s.d.CurrentRevision(ctx) + if err != nil { + return 0, nil, err + } + compact, err = s.d.GetCompactRevision(ctx) + if err != nil { + return 0, nil, err + } + } + if revision > 0 && revision < compact { - return rev, result, server.ErrCompacted + return rev, nil, server.ErrCompacted } return rev, result, err @@ -299,11 +316,11 @@ func (s *SQLLog) List(ctx context.Context, prefix, startKey string, limit, revis } if revision > rev { - return rev, result, server.ErrFutureRev + return rev, nil, server.ErrFutureRev } if revision > 0 && revision < compact { - return rev, result, server.ErrCompacted + return rev, nil, server.ErrCompacted } select { @@ -420,6 +437,8 @@ func (s *SQLLog) poll(result chan interface{}, pollStart int64) { continue } + logrus.Tracef("POLL AFTER %d, limit=%d, events=%d", last, pollBatchSize, len(events)) + if len(events) == 0 { continue } diff --git a/pkg/server/list.go b/pkg/server/list.go index 5c98cf2e..ebec0df7 100644 --- a/pkg/server/list.go +++ b/pkg/server/list.go @@ -6,6 +6,7 @@ import ( "fmt" "strings" + "github.com/sirupsen/logrus" "go.etcd.io/etcd/api/v3/etcdserverpb" ) @@ -25,6 +26,7 @@ func (l *LimitedServer) list(ctx context.Context, r *etcdserverpb.RangeRequest) if err != nil { return nil, err } + logrus.Tracef("LIST COUNT key=%s, end=%s, revision=%d, currentRev=%d count=%d", r.Key, r.RangeEnd, r.Revision, rev, count) return &RangeResponse{ Header: txnHeader(rev), Count: count, @@ -41,6 +43,7 @@ func (l *LimitedServer) list(ctx context.Context, r *etcdserverpb.RangeRequest) return nil, err } + logrus.Tracef("LIST key=%s, end=%s, revision=%d, currentRev=%d count=%d, limit=%d", r.Key, r.RangeEnd, r.Revision, rev, len(kvs), r.Limit) resp := &RangeResponse{ Header: txnHeader(rev), Count: int64(len(kvs)), diff --git a/pkg/server/types.go b/pkg/server/types.go index 4cf55680..797e1dd8 100644 --- a/pkg/server/types.go +++ b/pkg/server/types.go @@ -25,7 +25,7 @@ type Backend interface { List(ctx context.Context, prefix, startKey string, limit, revision int64) (int64, []*KeyValue, error) Count(ctx context.Context, prefix string) (int64, int64, error) Update(ctx context.Context, key string, value []byte, revision, lease int64) (int64, *KeyValue, bool, error) - Watch(ctx context.Context, key string, revision int64) <-chan []*Event + Watch(ctx context.Context, key string, revision int64) WatchResult DbSize(ctx context.Context) (int64, error) } @@ -77,6 +77,12 @@ type Event struct { PrevKV *KeyValue } +type WatchResult struct { + CurrentRevision int64 + CompactRevision int64 + Events <-chan []*Event +} + func unsupported(field string) error { return status.New(codes.Unimplemented, field+" is not implemented by kine").Err() } diff --git a/pkg/server/watch.go b/pkg/server/watch.go index 51b8894f..b6a7af15 100644 --- a/pkg/server/watch.go +++ b/pkg/server/watch.go @@ -24,17 +24,20 @@ func (s *KVServerBridge) Watch(ws etcdserverpb.Watch_WatchServer) error { } defer w.Close() + logrus.Tracef("WATCH SERVER CREATE") + for { msg, err := ws.Recv() if err != nil { return err } - if msg.GetCreateRequest() != nil { - w.Start(ws.Context(), msg.GetCreateRequest()) - } else if msg.GetCancelRequest() != nil { - logrus.Tracef("WATCH CANCEL REQ id=%d", msg.GetCancelRequest().GetWatchId()) - w.Cancel(msg.GetCancelRequest().WatchId, nil) + if cr := msg.GetCreateRequest(); cr != nil { + w.Start(ws.Context(), cr) + } + if cr := msg.GetCancelRequest(); cr != nil { + logrus.Tracef("WATCH CANCEL REQ id=%d", cr.WatchId) + w.Cancel(cr.WatchId, 0, 0, nil) } } } @@ -69,31 +72,62 @@ func (w *watcher) Start(ctx context.Context, r *etcdserverpb.WatchCreateRequest) Created: true, WatchId: id, }); err != nil { - w.Cancel(id, err) + w.Cancel(id, 0, 0, err) return } - for events := range w.backend.Watch(ctx, key, r.StartRevision) { - if len(events) == 0 { - continue - } + wr := w.backend.Watch(ctx, key, r.StartRevision) + + // If the watch result has a non-zero CompactRevision, then the watch request failed due to + // the requested start revision having been compacted. Pass the current and and compact + // revision to the client via the cancel response, along with the correct error message. + if wr.CompactRevision != 0 { + w.Cancel(id, wr.CurrentRevision, wr.CompactRevision, ErrCompacted) + return + } - if logrus.IsLevelEnabled(logrus.DebugLevel) { - for _, event := range events { - logrus.Tracef("WATCH READ id=%d, key=%s, revision=%d", id, event.KV.Key, event.KV.ModRevision) + outer := true + for outer { + // Block on initial read from channel + reads := 1 + events := <-wr.Events + + // Collect additional queued events from the channel + inner := true + for inner { + select { + case e, ok := <-wr.Events: + reads++ + events = append(events, e...) + if !ok { + // channel was closed, break out of both loops + inner = false + outer = false + } + default: + inner = false } } - if err := w.server.Send(&etcdserverpb.WatchResponse{ - Header: txnHeader(events[len(events)-1].KV.ModRevision), - WatchId: id, - Events: toEvents(events...), - }); err != nil { - w.Cancel(id, err) - continue + // Send collected events in a single response + if len(events) > 0 { + if logrus.IsLevelEnabled(logrus.TraceLevel) { + for _, event := range events { + logrus.Tracef("WATCH READ id=%d, key=%s, revision=%d", id, event.KV.Key, event.KV.ModRevision) + } + } + wr := &etcdserverpb.WatchResponse{ + Header: txnHeader(events[len(events)-1].KV.ModRevision), + WatchId: id, + Events: toEvents(events...), + } + logrus.Tracef("WATCH SEND id=%d, events=%d, size=%d reads=%d", id, len(wr.Events), wr.Size(), reads) + if err := w.server.Send(wr); err != nil { + w.Cancel(id, 0, 0, err) + } } } - w.Cancel(id, nil) + w.Cancel(id, 0, 0, nil) logrus.Tracef("WATCH CLOSE id=%d, key=%s", id, key) }() } @@ -120,7 +154,7 @@ func toEvent(event *Event) *mvccpb.Event { return e } -func (w *watcher) Cancel(watchID int64, err error) { +func (w *watcher) Cancel(watchID, revision, compactRev int64, err error) { w.Lock() if cancel, ok := w.watches[watchID]; ok { cancel() @@ -132,12 +166,14 @@ func (w *watcher) Cancel(watchID int64, err error) { if err != nil { reason = err.Error() } - logrus.Tracef("WATCH CANCEL id=%d reason=%s", watchID, reason) + logrus.Tracef("WATCH CANCEL id=%d, reason=%s, compactRev=%d", watchID, reason, compactRev) + serr := w.server.Send(&etcdserverpb.WatchResponse{ - Header: &etcdserverpb.ResponseHeader{}, - Canceled: true, - CancelReason: "watch closed", - WatchId: watchID, + Header: txnHeader(revision), + Canceled: err != nil, + CancelReason: reason, + WatchId: watchID, + CompactRevision: compactRev, }) if serr != nil && err != nil && !clientv3.IsConnCanceled(serr) { logrus.Errorf("WATCH Failed to send cancel response for watchID %d: %v", watchID, serr) @@ -145,6 +181,7 @@ func (w *watcher) Cancel(watchID int64, err error) { } func (w *watcher) Close() { + logrus.Tracef("WATCH SERVER CLOSE") w.Lock() for _, v := range w.watches { v()