diff --git a/pkg/client/client.go b/pkg/client/client.go index a686d0d8..f3915545 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -27,6 +27,7 @@ type Client interface { Create(ctx context.Context, key string, value []byte) error Update(ctx context.Context, key string, revision int64, value []byte) error Delete(ctx context.Context, key string, revision int64) error + Compact(ctx context.Context, revision int64) (int64, error) Close() error } @@ -144,6 +145,14 @@ func (c *client) Delete(ctx context.Context, key string, revision int64) error { return nil } +func (c *client) Compact(ctx context.Context, revision int64) (int64, error) { + resp, err := c.c.Compact(ctx, revision) + if resp != nil { + return resp.Header.GetRevision(), err + } + return 0, err +} + func (c *client) Close() error { return c.c.Close() } diff --git a/pkg/drivers/nats/backend.go b/pkg/drivers/nats/backend.go index d094e200..5feb6b54 100644 --- a/pkg/drivers/nats/backend.go +++ b/pkg/drivers/nats/backend.go @@ -427,3 +427,8 @@ func (b *Backend) Watch(ctx context.Context, prefix string, startRevision int64) CurrentRevision: rev, } } + +// Compact is a no-op / not implemented. Revision history is managed by the jetstream bucket. +func (b *Backend) Compact(ctx context.Context, revision int64) (int64, error) { + return revision, nil +} diff --git a/pkg/drivers/nats/logger.go b/pkg/drivers/nats/logger.go index fbfb0d0f..b288aa85 100644 --- a/pkg/drivers/nats/logger.go +++ b/pkg/drivers/nats/logger.go @@ -120,3 +120,8 @@ func (b *BackendLogger) DbSize(ctx context.Context) (int64, error) { func (b *BackendLogger) CurrentRevision(ctx context.Context) (int64, error) { return b.backend.CurrentRevision(ctx) } + +// Compact is a no-op / not implemented. Revision history is managed by the jetstream bucket. +func (b *BackendLogger) Compact(ctx context.Context, revision int64) (int64, error) { + return revision, nil +} diff --git a/pkg/logstructured/logstructured.go b/pkg/logstructured/logstructured.go index fed96e21..cfeb106f 100644 --- a/pkg/logstructured/logstructured.go +++ b/pkg/logstructured/logstructured.go @@ -26,6 +26,7 @@ type Log interface { Watch(ctx context.Context, prefix string) <-chan []*server.Event Append(ctx context.Context, event *server.Event) (int64, error) DbSize(ctx context.Context) (int64, error) + Compact(ctx context.Context, revision int64) (int64, error) } type ttlEventKV struct { @@ -496,3 +497,7 @@ func (l *LogStructured) DbSize(ctx context.Context) (int64, error) { func (l *LogStructured) CurrentRevision(ctx context.Context) (int64, error) { return l.log.CurrentRevision(ctx) } + +func (l *LogStructured) Compact(ctx context.Context, revision int64) (int64, error) { + return l.log.Compact(ctx, revision) +} diff --git a/pkg/logstructured/sqllog/sql.go b/pkg/logstructured/sqllog/sql.go index 8305e20f..afd62fbc 100644 --- a/pkg/logstructured/sqllog/sql.go +++ b/pkg/logstructured/sqllog/sql.go @@ -612,3 +612,7 @@ func safeCompactRev(targetCompactRev int64, currentRev int64) int64 { func (s *SQLLog) DbSize(ctx context.Context) (int64, error) { return s.d.GetSize(ctx) } + +func (s *SQLLog) Compact(ctx context.Context, revision int64) (int64, error) { + return s.d.Compact(ctx, revision) +} diff --git a/pkg/server/compact.go b/pkg/server/compact.go index 340b9ae1..f2fc83a6 100644 --- a/pkg/server/compact.go +++ b/pkg/server/compact.go @@ -7,6 +7,15 @@ import ( "go.etcd.io/etcd/api/v3/mvccpb" ) +func (l *LimitedServer) Compact(ctx context.Context, r *etcdserverpb.CompactionRequest) (*etcdserverpb.CompactionResponse, error) { + rev, err := l.backend.Compact(ctx, r.Revision) + return &etcdserverpb.CompactionResponse{ + Header: &etcdserverpb.ResponseHeader{ + Revision: rev, + }, + }, err +} + func isCompact(txn *etcdserverpb.TxnRequest) bool { // See https://github.com/kubernetes/kubernetes/blob/442a69c3bdf6fe8e525b05887e57d89db1e2f3a5/staging/src/k8s.io/apiserver/pkg/storage/etcd3/compact.go#L72 return len(txn.Compare) == 1 && @@ -19,7 +28,7 @@ func isCompact(txn *etcdserverpb.TxnRequest) bool { string(txn.Compare[0].Key) == "compact_rev_key" } -func (l *LimitedServer) compact(ctx context.Context) (*etcdserverpb.TxnResponse, error) { +func (l *LimitedServer) compact() (*etcdserverpb.TxnResponse, error) { // return comparison failure so that the apiserver does not bother compacting return &etcdserverpb.TxnResponse{ Header: &etcdserverpb.ResponseHeader{}, diff --git a/pkg/server/create.go b/pkg/server/create.go index af6f43c1..00ce3c43 100644 --- a/pkg/server/create.go +++ b/pkg/server/create.go @@ -19,7 +19,7 @@ func isCreate(txn *etcdserverpb.TxnRequest) *etcdserverpb.PutRequest { return nil } -func (l *LimitedServer) create(ctx context.Context, put *etcdserverpb.PutRequest, txn *etcdserverpb.TxnRequest) (*etcdserverpb.TxnResponse, error) { +func (l *LimitedServer) create(ctx context.Context, put *etcdserverpb.PutRequest) (*etcdserverpb.TxnResponse, error) { if put.IgnoreLease { return nil, unsupported("ignoreLease") } else if put.IgnoreValue { diff --git a/pkg/server/kv.go b/pkg/server/kv.go index bf2ea020..fda2a373 100644 --- a/pkg/server/kv.go +++ b/pkg/server/kv.go @@ -109,9 +109,9 @@ func (k *KVServerBridge) Txn(ctx context.Context, r *etcdserverpb.TxnRequest) (* } func (k *KVServerBridge) Compact(ctx context.Context, r *etcdserverpb.CompactionRequest) (*etcdserverpb.CompactionResponse, error) { - return &etcdserverpb.CompactionResponse{ - Header: &etcdserverpb.ResponseHeader{ - Revision: r.Revision, - }, - }, nil + res, err := k.limited.Compact(ctx, r) + if err != nil { + logrus.Errorf("error in compact %s: %v", r, err) + } + return res, err } diff --git a/pkg/server/limited.go b/pkg/server/limited.go index 7957c84d..45bc6771 100644 --- a/pkg/server/limited.go +++ b/pkg/server/limited.go @@ -28,7 +28,7 @@ func txnHeader(rev int64) *etcdserverpb.ResponseHeader { func (l *LimitedServer) Txn(ctx context.Context, txn *etcdserverpb.TxnRequest) (*etcdserverpb.TxnResponse, error) { if put := isCreate(txn); put != nil { - return l.create(ctx, put, txn) + return l.create(ctx, put) } if rev, key, ok := isDelete(txn); ok { return l.delete(ctx, key, rev) @@ -37,7 +37,7 @@ func (l *LimitedServer) Txn(ctx context.Context, txn *etcdserverpb.TxnRequest) ( return l.update(ctx, rev, key, value, lease) } if isCompact(txn) { - return l.compact(ctx) + return l.compact() } return nil, ErrNotSupported } diff --git a/pkg/server/types.go b/pkg/server/types.go index 4afdba0c..83a87322 100644 --- a/pkg/server/types.go +++ b/pkg/server/types.go @@ -28,6 +28,7 @@ type Backend interface { Watch(ctx context.Context, key string, revision int64) WatchResult DbSize(ctx context.Context) (int64, error) CurrentRevision(ctx context.Context) (int64, error) + Compact(ctx context.Context, revision int64) (int64, error) } type Dialect interface {