diff --git a/pkg/drivers/nats/nats.go b/pkg/drivers/nats/nats.go index dd707ced..ee5f6b37 100644 --- a/pkg/drivers/nats/nats.go +++ b/pkg/drivers/nats/nats.go @@ -843,23 +843,29 @@ func (d *Driver) Update(ctx context.Context, key string, value []byte, revision, } -func (d *Driver) Watch(ctx context.Context, prefix string, revision int64) <-chan []*server.Event { - +func (d *Driver) Watch(ctx context.Context, prefix string, revision int64) server.WatchResult { + ctx, cancel := context.WithCancel(ctx) watcher, err := d.kv.(*kv.EncodedKV).Watch(prefix, nats.IgnoreDeletes(), nats.Context(ctx)) if revision > 0 { revision-- } - _, events, err := d.listAfter(ctx, prefix, revision) + result := make(chan []*server.Event, 100) + wr := server.WatchResult{Events: result} + + rev, events, err := d.listAfter(ctx, prefix, revision) if err != nil { logrus.Errorf("failed to create watcher %s for revision %d", prefix, revision) + if err == server.ErrCompacted { + compact, _ := d.compactRevision() + wr.CompactRevision = compact + wr.CurrentRevision = rev + } + cancel() } - result := make(chan []*server.Event, 100) - go func() { - if len(events) > 0 { result <- events revision = events[len(events)-1].KV.ModRevision @@ -915,11 +921,13 @@ func (d *Driver) Watch(ctx context.Context, prefix string, revision int64) <-cha if err := watcher.Stop(); err != nil && err != nats.ErrBadSubscription { logrus.Warnf("error stopping %s watcher: %v", prefix, err) } + close(result) + cancel() return } } }() - return result + return wr } // getPreviousEntry returns the nats.KeyValueEntry previous to the one provided, if the previous entry is a nats.KeyValuePut diff --git a/pkg/logstructured/logstructured.go b/pkg/logstructured/logstructured.go index 2b5b51f6..9a1f57ad 100644 --- a/pkg/logstructured/logstructured.go +++ b/pkg/logstructured/logstructured.go @@ -17,6 +17,7 @@ const ( type Log interface { Start(ctx context.Context) error + CompactRevision(ctx context.Context) (int64, error) CurrentRevision(ctx context.Context) (int64, error) List(ctx context.Context, prefix, startKey string, limit, revision int64, includeDeletes bool) (int64, []*server.Event, error) After(ctx context.Context, prefix string, revision, limit int64) (int64, []*server.Event, error) @@ -288,13 +289,13 @@ func (l *LogStructured) ttl(ctx context.Context) { eventKV := loadTTLEventKV(rwMutex, ttlEventKVMap, event.KV.Key) if eventKV == nil { - logrus.Tracef("Add ttl event key %v, modRev %v", event.KV.Key, event.KV.ModRevision) expires := storeTTLEventKV(rwMutex, ttlEventKVMap, event.KV) + logrus.Tracef("Add ttl event key=%v, modRev=%v, ttl=%v", event.KV.Key, event.KV.ModRevision, expires) queue.AddAfter(event.KV.Key, expires) } else { if event.KV.ModRevision > eventKV.modRevision { - logrus.Tracef("Update ttl event key %v, modRev %v", event.KV.Key, event.KV.ModRevision) expires := storeTTLEventKV(rwMutex, ttlEventKVMap, event.KV) + logrus.Tracef("Update ttl event key=%v, modRev=%v, ttl=%v", event.KV.Key, event.KV.ModRevision, expires) queue.AddAfter(event.KV.Key, expires) } } @@ -316,9 +317,9 @@ func (l *LogStructured) handleTTLEvents(ctx context.Context, rwMutex *sync.RWMut return true } - if eventKV.expiredAt.After(time.Now()) { - logrus.Tracef("TTL event key %v has not expired yet, the latest expiration time is %v, requeuing", key, eventKV.expiredAt) - queue.AddAfter(key, time.Until(eventKV.expiredAt)) + if expires := time.Until(eventKV.expiredAt); expires > 0 { + logrus.Tracef("TTL event key %v expires in %v, requeuing", key, expires) + queue.AddAfter(key, expires) return true } @@ -333,9 +334,9 @@ func (l *LogStructured) deleteTTLEvent(ctx context.Context, rwMutex *sync.RWMute rwMutex.Lock() defer rwMutex.Unlock() curEventKV := store[preEventKV.key] - if curEventKV.expiredAt.After(preEventKV.expiredAt) { - logrus.Tracef("TTL event key %v has updated, requeuing", curEventKV.key) - queue.AddAfter(curEventKV.key, time.Until(curEventKV.expiredAt)) + if expires := time.Until(preEventKV.expiredAt); expires > 0 { + logrus.Tracef("TTL event key was updated and now %v expires in %v, requeuing", curEventKV.key, expires) + queue.AddAfter(curEventKV.key, expires) return } if err != nil { @@ -389,10 +390,15 @@ func (l *LogStructured) ttlEvents(ctx context.Context) chan *server.Event { defer wg.Done() revision := <-lastListRevision if revision == 0 { - logrus.Error("TTL events last list revision is zero, retry to process ttl events") + logrus.Errorf("Failed to get start revision for ttl watch") return } - for events := range l.Watch(ctx, "/", revision) { + wr := l.Watch(ctx, "/", revision) + if wr.CompactRevision != 0 { + logrus.Errorf("Failed to watch for ttl: %v", server.ErrCompacted) + return + } + for events := range wr.Events { for _, event := range events { if event.KV.Lease > 0 { result <- event @@ -423,7 +429,7 @@ func storeTTLEventKV(rwMutex *sync.RWMutex, store map[string]*ttlEventKV, eventK return expires } -func (l *LogStructured) Watch(ctx context.Context, prefix string, revision int64) <-chan []*server.Event { +func (l *LogStructured) Watch(ctx context.Context, prefix string, revision int64) server.WatchResult { logrus.Tracef("WATCH %s, revision=%d", prefix, revision) // starting watching right away so we don't miss anything @@ -436,10 +442,16 @@ func (l *LogStructured) Watch(ctx context.Context, prefix string, revision int64 } result := make(chan []*server.Event, 100) + wr := server.WatchResult{Events: result} rev, kvs, err := l.log.After(ctx, prefix, revision, 0) if err != nil { - logrus.Errorf("failed to list %s for revision %d", prefix, revision) + logrus.Errorf("Failed to list %s for revision %d: %v", prefix, revision, err) + if err == server.ErrCompacted { + compact, _ := l.log.CompactRevision(ctx) + wr.CompactRevision = compact + wr.CurrentRevision = rev + } cancel() } @@ -463,7 +475,7 @@ func (l *LogStructured) Watch(ctx context.Context, prefix string, revision int64 cancel() }() - return result + return wr } func filter(events []*server.Event, rev int64) []*server.Event { diff --git a/pkg/logstructured/sqllog/sql.go b/pkg/logstructured/sqllog/sql.go index bf9c4462..a2201df1 100644 --- a/pkg/logstructured/sqllog/sql.go +++ b/pkg/logstructured/sqllog/sql.go @@ -236,6 +236,10 @@ func (s *SQLLog) CurrentRevision(ctx context.Context) (int64, error) { return s.d.CurrentRevision(ctx) } +func (s *SQLLog) CompactRevision(ctx context.Context) (int64, error) { + return s.d.GetCompactRevision(ctx) +} + func (s *SQLLog) After(ctx context.Context, prefix string, revision, limit int64) (int64, []*server.Event, error) { if strings.HasSuffix(prefix, "/") { prefix += "%" @@ -247,8 +251,21 @@ func (s *SQLLog) After(ctx context.Context, prefix string, revision, limit int64 } rev, compact, result, err := RowsToEvents(rows) + + if revision > 0 && len(result) == 0 { + // a zero length result won't have the compact or current revisions so get them manually + rev, err = s.d.CurrentRevision(ctx) + if err != nil { + return 0, nil, err + } + compact, err = s.d.GetCompactRevision(ctx) + if err != nil { + return 0, nil, err + } + } + if revision > 0 && revision < compact { - return rev, result, server.ErrCompacted + return rev, nil, server.ErrCompacted } return rev, result, err @@ -299,11 +316,11 @@ func (s *SQLLog) List(ctx context.Context, prefix, startKey string, limit, revis } if revision > rev { - return rev, result, server.ErrFutureRev + return rev, nil, server.ErrFutureRev } if revision > 0 && revision < compact { - return rev, result, server.ErrCompacted + return rev, nil, server.ErrCompacted } select { @@ -420,6 +437,8 @@ func (s *SQLLog) poll(result chan interface{}, pollStart int64) { continue } + logrus.Tracef("POLL AFTER %d, limit=%d, events=%d", last, pollBatchSize, len(events)) + if len(events) == 0 { continue } diff --git a/pkg/server/list.go b/pkg/server/list.go index 5c98cf2e..ebec0df7 100644 --- a/pkg/server/list.go +++ b/pkg/server/list.go @@ -6,6 +6,7 @@ import ( "fmt" "strings" + "github.com/sirupsen/logrus" "go.etcd.io/etcd/api/v3/etcdserverpb" ) @@ -25,6 +26,7 @@ func (l *LimitedServer) list(ctx context.Context, r *etcdserverpb.RangeRequest) if err != nil { return nil, err } + logrus.Tracef("LIST COUNT key=%s, end=%s, revision=%d, currentRev=%d count=%d", r.Key, r.RangeEnd, r.Revision, rev, count) return &RangeResponse{ Header: txnHeader(rev), Count: count, @@ -41,6 +43,7 @@ func (l *LimitedServer) list(ctx context.Context, r *etcdserverpb.RangeRequest) return nil, err } + logrus.Tracef("LIST key=%s, end=%s, revision=%d, currentRev=%d count=%d, limit=%d", r.Key, r.RangeEnd, r.Revision, rev, len(kvs), r.Limit) resp := &RangeResponse{ Header: txnHeader(rev), Count: int64(len(kvs)), diff --git a/pkg/server/types.go b/pkg/server/types.go index 4cf55680..797e1dd8 100644 --- a/pkg/server/types.go +++ b/pkg/server/types.go @@ -25,7 +25,7 @@ type Backend interface { List(ctx context.Context, prefix, startKey string, limit, revision int64) (int64, []*KeyValue, error) Count(ctx context.Context, prefix string) (int64, int64, error) Update(ctx context.Context, key string, value []byte, revision, lease int64) (int64, *KeyValue, bool, error) - Watch(ctx context.Context, key string, revision int64) <-chan []*Event + Watch(ctx context.Context, key string, revision int64) WatchResult DbSize(ctx context.Context) (int64, error) } @@ -77,6 +77,12 @@ type Event struct { PrevKV *KeyValue } +type WatchResult struct { + CurrentRevision int64 + CompactRevision int64 + Events <-chan []*Event +} + func unsupported(field string) error { return status.New(codes.Unimplemented, field+" is not implemented by kine").Err() } diff --git a/pkg/server/watch.go b/pkg/server/watch.go index 51b8894f..b6a7af15 100644 --- a/pkg/server/watch.go +++ b/pkg/server/watch.go @@ -24,17 +24,20 @@ func (s *KVServerBridge) Watch(ws etcdserverpb.Watch_WatchServer) error { } defer w.Close() + logrus.Tracef("WATCH SERVER CREATE") + for { msg, err := ws.Recv() if err != nil { return err } - if msg.GetCreateRequest() != nil { - w.Start(ws.Context(), msg.GetCreateRequest()) - } else if msg.GetCancelRequest() != nil { - logrus.Tracef("WATCH CANCEL REQ id=%d", msg.GetCancelRequest().GetWatchId()) - w.Cancel(msg.GetCancelRequest().WatchId, nil) + if cr := msg.GetCreateRequest(); cr != nil { + w.Start(ws.Context(), cr) + } + if cr := msg.GetCancelRequest(); cr != nil { + logrus.Tracef("WATCH CANCEL REQ id=%d", cr.WatchId) + w.Cancel(cr.WatchId, 0, 0, nil) } } } @@ -69,31 +72,62 @@ func (w *watcher) Start(ctx context.Context, r *etcdserverpb.WatchCreateRequest) Created: true, WatchId: id, }); err != nil { - w.Cancel(id, err) + w.Cancel(id, 0, 0, err) return } - for events := range w.backend.Watch(ctx, key, r.StartRevision) { - if len(events) == 0 { - continue - } + wr := w.backend.Watch(ctx, key, r.StartRevision) + + // If the watch result has a non-zero CompactRevision, then the watch request failed due to + // the requested start revision having been compacted. Pass the current and and compact + // revision to the client via the cancel response, along with the correct error message. + if wr.CompactRevision != 0 { + w.Cancel(id, wr.CurrentRevision, wr.CompactRevision, ErrCompacted) + return + } - if logrus.IsLevelEnabled(logrus.DebugLevel) { - for _, event := range events { - logrus.Tracef("WATCH READ id=%d, key=%s, revision=%d", id, event.KV.Key, event.KV.ModRevision) + outer := true + for outer { + // Block on initial read from channel + reads := 1 + events := <-wr.Events + + // Collect additional queued events from the channel + inner := true + for inner { + select { + case e, ok := <-wr.Events: + reads++ + events = append(events, e...) + if !ok { + // channel was closed, break out of both loops + inner = false + outer = false + } + default: + inner = false } } - if err := w.server.Send(&etcdserverpb.WatchResponse{ - Header: txnHeader(events[len(events)-1].KV.ModRevision), - WatchId: id, - Events: toEvents(events...), - }); err != nil { - w.Cancel(id, err) - continue + // Send collected events in a single response + if len(events) > 0 { + if logrus.IsLevelEnabled(logrus.TraceLevel) { + for _, event := range events { + logrus.Tracef("WATCH READ id=%d, key=%s, revision=%d", id, event.KV.Key, event.KV.ModRevision) + } + } + wr := &etcdserverpb.WatchResponse{ + Header: txnHeader(events[len(events)-1].KV.ModRevision), + WatchId: id, + Events: toEvents(events...), + } + logrus.Tracef("WATCH SEND id=%d, events=%d, size=%d reads=%d", id, len(wr.Events), wr.Size(), reads) + if err := w.server.Send(wr); err != nil { + w.Cancel(id, 0, 0, err) + } } } - w.Cancel(id, nil) + w.Cancel(id, 0, 0, nil) logrus.Tracef("WATCH CLOSE id=%d, key=%s", id, key) }() } @@ -120,7 +154,7 @@ func toEvent(event *Event) *mvccpb.Event { return e } -func (w *watcher) Cancel(watchID int64, err error) { +func (w *watcher) Cancel(watchID, revision, compactRev int64, err error) { w.Lock() if cancel, ok := w.watches[watchID]; ok { cancel() @@ -132,12 +166,14 @@ func (w *watcher) Cancel(watchID int64, err error) { if err != nil { reason = err.Error() } - logrus.Tracef("WATCH CANCEL id=%d reason=%s", watchID, reason) + logrus.Tracef("WATCH CANCEL id=%d, reason=%s, compactRev=%d", watchID, reason, compactRev) + serr := w.server.Send(&etcdserverpb.WatchResponse{ - Header: &etcdserverpb.ResponseHeader{}, - Canceled: true, - CancelReason: "watch closed", - WatchId: watchID, + Header: txnHeader(revision), + Canceled: err != nil, + CancelReason: reason, + WatchId: watchID, + CompactRevision: compactRev, }) if serr != nil && err != nil && !clientv3.IsConnCanceled(serr) { logrus.Errorf("WATCH Failed to send cancel response for watchID %d: %v", watchID, serr) @@ -145,6 +181,7 @@ func (w *watcher) Cancel(watchID int64, err error) { } func (w *watcher) Close() { + logrus.Tracef("WATCH SERVER CLOSE") w.Lock() for _, v := range w.watches { v()