Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix multiple issues with Watch #238

Merged
merged 1 commit into from
Nov 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 15 additions & 7 deletions pkg/drivers/nats/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -843,23 +843,29 @@ func (d *Driver) Update(ctx context.Context, key string, value []byte, revision,

}

func (d *Driver) Watch(ctx context.Context, prefix string, revision int64) <-chan []*server.Event {

func (d *Driver) Watch(ctx context.Context, prefix string, revision int64) server.WatchResult {
ctx, cancel := context.WithCancel(ctx)
watcher, err := d.kv.(*kv.EncodedKV).Watch(prefix, nats.IgnoreDeletes(), nats.Context(ctx))

if revision > 0 {
revision--
}
_, events, err := d.listAfter(ctx, prefix, revision)

result := make(chan []*server.Event, 100)
wr := server.WatchResult{Events: result}

rev, events, err := d.listAfter(ctx, prefix, revision)
if err != nil {
logrus.Errorf("failed to create watcher %s for revision %d", prefix, revision)
if err == server.ErrCompacted {
compact, _ := d.compactRevision()
wr.CompactRevision = compact
wr.CurrentRevision = rev
}
cancel()
}

result := make(chan []*server.Event, 100)

go func() {

if len(events) > 0 {
result <- events
revision = events[len(events)-1].KV.ModRevision
Expand Down Expand Up @@ -915,11 +921,13 @@ func (d *Driver) Watch(ctx context.Context, prefix string, revision int64) <-cha
if err := watcher.Stop(); err != nil && err != nats.ErrBadSubscription {
logrus.Warnf("error stopping %s watcher: %v", prefix, err)
}
close(result)
cancel()
return
}
}
}()
return result
return wr
}

// getPreviousEntry returns the nats.KeyValueEntry previous to the one provided, if the previous entry is a nats.KeyValuePut
Expand Down
48 changes: 30 additions & 18 deletions pkg/logstructured/logstructured.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ const (

type Log interface {
Start(ctx context.Context) error
CompactRevision(ctx context.Context) (int64, error)
CurrentRevision(ctx context.Context) (int64, error)
List(ctx context.Context, prefix, startKey string, limit, revision int64, includeDeletes bool) (int64, []*server.Event, error)
After(ctx context.Context, prefix string, revision, limit int64) (int64, []*server.Event, error)
Expand Down Expand Up @@ -288,13 +289,13 @@ func (l *LogStructured) ttl(ctx context.Context) {

eventKV := loadTTLEventKV(rwMutex, ttlEventKVMap, event.KV.Key)
if eventKV == nil {
logrus.Tracef("Add ttl event key %v, modRev %v", event.KV.Key, event.KV.ModRevision)
expires := storeTTLEventKV(rwMutex, ttlEventKVMap, event.KV)
logrus.Tracef("TTL add event key=%v, modRev=%v, ttl=%v", event.KV.Key, event.KV.ModRevision, expires)
queue.AddAfter(event.KV.Key, expires)
} else {
if event.KV.ModRevision > eventKV.modRevision {
logrus.Tracef("Update ttl event key %v, modRev %v", event.KV.Key, event.KV.ModRevision)
expires := storeTTLEventKV(rwMutex, ttlEventKVMap, event.KV)
logrus.Tracef("TTL update event key=%v, modRev=%v, ttl=%v", event.KV.Key, event.KV.ModRevision, expires)
queue.AddAfter(event.KV.Key, expires)
}
}
Expand All @@ -312,13 +313,13 @@ func (l *LogStructured) handleTTLEvents(ctx context.Context, rwMutex *sync.RWMut

eventKV := loadTTLEventKV(rwMutex, store, key.(string))
if eventKV == nil {
logrus.Errorf("Failed to find ttl event for key %v", key)
logrus.Errorf("TTL event not found for key=%v", key)
return true
}

if eventKV.expiredAt.After(time.Now()) {
logrus.Tracef("TTL event key %v has not expired yet, the latest expiration time is %v, requeuing", key, eventKV.expiredAt)
queue.AddAfter(key, time.Until(eventKV.expiredAt))
if expires := time.Until(eventKV.expiredAt); expires > 0 {
logrus.Tracef("TTL has not expired for key=%v, ttl=%v, requeuing", key, expires)
queue.AddAfter(key, expires)
return true
}

Expand All @@ -327,19 +328,19 @@ func (l *LogStructured) handleTTLEvents(ctx context.Context, rwMutex *sync.RWMut
}

func (l *LogStructured) deleteTTLEvent(ctx context.Context, rwMutex *sync.RWMutex, queue workqueue.DelayingInterface, store map[string]*ttlEventKV, preEventKV *ttlEventKV) {
logrus.Tracef("Delete ttl event key %v, modRev %v", preEventKV.key, preEventKV.modRevision)
logrus.Tracef("TTL delete key=%v, modRev=%v", preEventKV.key, preEventKV.modRevision)
_, _, _, err := l.Delete(ctx, preEventKV.key, preEventKV.modRevision)

rwMutex.Lock()
defer rwMutex.Unlock()
curEventKV := store[preEventKV.key]
if curEventKV.expiredAt.After(preEventKV.expiredAt) {
logrus.Tracef("TTL event key %v has updated, requeuing", curEventKV.key)
queue.AddAfter(curEventKV.key, time.Until(curEventKV.expiredAt))
if expires := time.Until(preEventKV.expiredAt); expires > 0 {
logrus.Tracef("TTL changed for key=%v, ttl=%v, requeuing", curEventKV.key, expires)
queue.AddAfter(curEventKV.key, expires)
return
}
if err != nil {
logrus.Errorf("Failed to delete key %v at end of lease: %v, requeuing", curEventKV.key, err)
logrus.Errorf("TTL delete trigger failed for key=%v: %v, requeuing", curEventKV.key, err)
queue.AddAfter(curEventKV.key, retryInterval)
return
}
Expand All @@ -366,7 +367,7 @@ func (l *LogStructured) ttlEvents(ctx context.Context) chan *server.Event {
rev, events, err := l.log.List(ctx, "/", "", 1000, 0, false)
for len(events) > 0 {
if err != nil {
logrus.Errorf("Failed to read old events for ttl: %v", err)
logrus.Errorf("TTL event list failed: %v", err)
break
}

Expand All @@ -389,17 +390,22 @@ func (l *LogStructured) ttlEvents(ctx context.Context) chan *server.Event {
defer wg.Done()
revision := <-lastListRevision
if revision == 0 {
logrus.Error("TTL events last list revision is zero, retry to process ttl events")
logrus.Error("TTL event watch failed to get start revision")
return
}
for events := range l.Watch(ctx, "/", revision) {
wr := l.Watch(ctx, "/", revision)
if wr.CompactRevision != 0 {
logrus.Errorf("TTL event watch failed: %v", server.ErrCompacted)
return
}
for events := range wr.Events {
for _, event := range events {
if event.KV.Lease > 0 {
result <- event
}
}
}
logrus.Info("TTL events watch channel was closed")
logrus.Info("TTL events watch channel closed")
}()

return result
Expand All @@ -423,7 +429,7 @@ func storeTTLEventKV(rwMutex *sync.RWMutex, store map[string]*ttlEventKV, eventK
return expires
}

func (l *LogStructured) Watch(ctx context.Context, prefix string, revision int64) <-chan []*server.Event {
func (l *LogStructured) Watch(ctx context.Context, prefix string, revision int64) server.WatchResult {
logrus.Tracef("WATCH %s, revision=%d", prefix, revision)

// starting watching right away so we don't miss anything
Expand All @@ -436,10 +442,16 @@ func (l *LogStructured) Watch(ctx context.Context, prefix string, revision int64
}

result := make(chan []*server.Event, 100)
wr := server.WatchResult{Events: result}

rev, kvs, err := l.log.After(ctx, prefix, revision, 0)
if err != nil {
logrus.Errorf("failed to list %s for revision %d", prefix, revision)
logrus.Errorf("Failed to list %s for revision %d: %v", prefix, revision, err)
if err == server.ErrCompacted {
compact, _ := l.log.CompactRevision(ctx)
wr.CompactRevision = compact
wr.CurrentRevision = rev
}
cancel()
}

Expand All @@ -463,7 +475,7 @@ func (l *LogStructured) Watch(ctx context.Context, prefix string, revision int64
cancel()
}()

return result
return wr
}

func filter(events []*server.Event, rev int64) []*server.Event {
Expand Down
25 changes: 22 additions & 3 deletions pkg/logstructured/sqllog/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,10 @@ func (s *SQLLog) CurrentRevision(ctx context.Context) (int64, error) {
return s.d.CurrentRevision(ctx)
}

// CompactRevision returns the compact revision reported by the underlying
// s.d.GetCompactRevision call, passing ctx through unchanged. Both the
// revision and any error are handed back to the caller as-is.
func (s *SQLLog) CompactRevision(ctx context.Context) (int64, error) {
	compactRev, err := s.d.GetCompactRevision(ctx)
	return compactRev, err
}

func (s *SQLLog) After(ctx context.Context, prefix string, revision, limit int64) (int64, []*server.Event, error) {
if strings.HasSuffix(prefix, "/") {
prefix += "%"
Expand All @@ -247,8 +251,21 @@ func (s *SQLLog) After(ctx context.Context, prefix string, revision, limit int64
}

rev, compact, result, err := RowsToEvents(rows)

if revision > 0 && len(result) == 0 {
// a zero length result won't have the compact or current revisions so get them manually
rev, err = s.d.CurrentRevision(ctx)
if err != nil {
return 0, nil, err
}
compact, err = s.d.GetCompactRevision(ctx)
if err != nil {
return 0, nil, err
}
}

if revision > 0 && revision < compact {
return rev, result, server.ErrCompacted
return rev, nil, server.ErrCompacted
}

return rev, result, err
Expand Down Expand Up @@ -299,11 +316,11 @@ func (s *SQLLog) List(ctx context.Context, prefix, startKey string, limit, revis
}

if revision > rev {
return rev, result, server.ErrFutureRev
return rev, nil, server.ErrFutureRev
}

if revision > 0 && revision < compact {
return rev, result, server.ErrCompacted
return rev, nil, server.ErrCompacted
}

select {
Expand Down Expand Up @@ -420,6 +437,8 @@ func (s *SQLLog) poll(result chan interface{}, pollStart int64) {
continue
}

logrus.Tracef("POLL AFTER %d, limit=%d, events=%d", last, pollBatchSize, len(events))

if len(events) == 0 {
continue
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/server/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"strings"

"github.com/sirupsen/logrus"
"go.etcd.io/etcd/api/v3/etcdserverpb"
)

Expand All @@ -25,6 +26,7 @@ func (l *LimitedServer) list(ctx context.Context, r *etcdserverpb.RangeRequest)
if err != nil {
return nil, err
}
logrus.Tracef("LIST COUNT key=%s, end=%s, revision=%d, currentRev=%d count=%d", r.Key, r.RangeEnd, r.Revision, rev, count)
return &RangeResponse{
Header: txnHeader(rev),
Count: count,
Expand All @@ -41,6 +43,7 @@ func (l *LimitedServer) list(ctx context.Context, r *etcdserverpb.RangeRequest)
return nil, err
}

logrus.Tracef("LIST key=%s, end=%s, revision=%d, currentRev=%d count=%d, limit=%d", r.Key, r.RangeEnd, r.Revision, rev, len(kvs), r.Limit)
resp := &RangeResponse{
Header: txnHeader(rev),
Count: int64(len(kvs)),
Expand Down
8 changes: 7 additions & 1 deletion pkg/server/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type Backend interface {
List(ctx context.Context, prefix, startKey string, limit, revision int64) (int64, []*KeyValue, error)
Count(ctx context.Context, prefix string) (int64, int64, error)
Update(ctx context.Context, key string, value []byte, revision, lease int64) (int64, *KeyValue, bool, error)
Watch(ctx context.Context, key string, revision int64) <-chan []*Event
Watch(ctx context.Context, key string, revision int64) WatchResult
DbSize(ctx context.Context) (int64, error)
}

Expand Down Expand Up @@ -77,6 +77,12 @@ type Event struct {
PrevKV *KeyValue
}

// WatchResult is the value returned by Backend.Watch.
//
// Events carries batches of events for the watched key. CurrentRevision and
// CompactRevision stay zero on a normal watch; the nats and logstructured
// backends populate them only when the requested start revision has been
// compacted (ErrCompacted), and consumers check for a non-zero
// CompactRevision to detect that case.
type WatchResult struct {
// CurrentRevision is the revision returned by the backend's list-after call
// when the watch could not start because of compaction.
CurrentRevision int64
// CompactRevision is the backend's compact revision; non-zero signals that
// the requested revision was compacted.
CompactRevision int64
// Events is the receive-only channel on which event batches are delivered.
Events <-chan []*Event
}

// unsupported builds a status error with code Unimplemented whose message
// names the given field as not implemented by kine.
func unsupported(field string) error {
	msg := field + " is not implemented by kine"
	st := status.New(codes.Unimplemented, msg)
	return st.Err()
}
Loading