Add support for TTL
We add an additional field in the LSM key structure to capture a Unix timestamp; once that timestamp has passed, the key is considered expired and is treated as if it had been deleted.

This changes the on-disk format, so we also need to increment the manifest version number.
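As a rough illustration of the semantics above (a sketch only; the helper name and the way the timestamp is passed around are assumptions, not necessarily the identifiers this commit introduces), the expiry check amounts to comparing the stored timestamp against the current Unix time:

```go
package main

import (
	"fmt"
	"time"
)

// isExpired sketches the rule described above: a zero timestamp means the key
// has no TTL and never expires; otherwise the key counts as expired (and hence
// as deleted) once the stored Unix time has passed.
// NOTE: illustrative only; not the exact function added in this commit.
func isExpired(expiresAt uint64) bool {
	if expiresAt == 0 {
		return false
	}
	return expiresAt <= uint64(time.Now().Unix())
}

func main() {
	fmt.Println(isExpired(0))                                         // false: no TTL set
	fmt.Println(isExpired(uint64(time.Now().Add(-time.Hour).Unix()))) // true: expired an hour ago
	fmt.Println(isExpired(uint64(time.Now().Add(time.Hour).Unix())))  // false: still live
}
```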

Fixes dgraph-io#298
deepakjois committed Nov 2, 2017
1 parent c02ea15 commit a5499e5
Showing 17 changed files with 383 additions and 219 deletions.
17 changes: 15 additions & 2 deletions README.md
@@ -26,6 +26,7 @@ We are currently gearing up for a [v1.0 release][v1-milestone].
- [Read-write transactions](#read-write-transactions)
- [Managing transactions manually](#managing-transactions-manually)
+ [Using key/value pairs](#using-keyvalue-pairs)
+ [Setting Time To Live (TTL) and User Metadata on Keys](#setting-time-to-live-ttl-and-user-metadata-on-keys)
+ [Iterating over keys](#iterating-over-keys)
- [Prefix scans](#prefix-scans)
- [Key-only iteration](#key-only-iteration)
@@ -150,7 +151,7 @@ if err != nil {
defer txn.Discard()

// Use the transaction...
-err := txn.Set([]byte("answer"), []byte("42"), 0)
+err := txn.Set([]byte("answer"), []byte("42"))
if err != nil {
return err
}
@@ -178,7 +179,7 @@ To save a key/value pair, use the `Txn.Set()` method:

```go
err := db.Update(func(txn *badger.Txn) error {
-err := txn.Set([]byte("answer"), []byte("42"), 0)
+err := txn.Set([]byte("answer"), []byte("42"))
return err
})
```
@@ -209,6 +210,17 @@ then you must use `copy()` to copy it to another byte slice.

Use the `Txn.Delete()` method to delete a key.
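For instance, a minimal sketch using the same `db.Update` pattern as the examples above (assumed here; this snippet is not part of the commit's diff):

```go
err := db.Update(func(txn *badger.Txn) error {
	// Remove the key; once the transaction commits, reads no longer see it.
	return txn.Delete([]byte("answer"))
})
```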

### Setting Time To Live (TTL) and User Metadata on Keys
Badger allows setting an optional Time to Live (TTL) value on keys. Once the TTL has
elapsed, the key will no longer be retrievable and will be eligible for garbage
collection. A TTL can be set as a `time.Duration` value using the `Txn.SetWithTTL()`
API method.

An optional user metadata value can also be set on each key. User metadata is
represented by a single byte that is stored alongside the key, and it can be
used to carry flag bits that help in interpreting or decoding the key-value
pair. User metadata is set using the `Txn.SetWithMeta()` API method.
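A short usage sketch (not part of this commit's README changes; it assumes the `db` handle from the earlier examples plus an import of `time`) might look like:

```go
err := db.Update(func(txn *badger.Txn) error {
	// Key expires one hour from now; after that it reads as deleted.
	if err := txn.SetWithTTL([]byte("session"), []byte("token"), time.Hour); err != nil {
		return err
	}
	// Attach a single user metadata byte (here 0x01) for the reader to interpret.
	return txn.SetWithMeta([]byte("answer"), []byte("42"), byte(0x01))
})
```

On reads, the expiry timestamp and metadata byte surface through `item.ExpiresAt()` and `item.UserMeta()`, which is what the `backup.go` changes below rely on.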

### Iterating over keys
To iterate over keys, we can use an `Iterator`, which can be obtained using the
`Txn.NewIterator()` method.
@@ -386,6 +398,7 @@ Values in SSD-conscious Storage][wisckey]_.
| Pure Go (no Cgo) | Yes | No | Yes |
| Transactions | Yes, ACID, concurrent with SSI<sup>3</sup> | Yes (but non-ACID) | Yes, ACID |
| Snapshots | Yes | Yes | Yes |
| TTL support | Yes | Yes | No |

<sup>1</sup> The [WISCKEY paper][wisckey] (on which Badger is based) saw big
wins with separating values from keys, significantly reducing the write
16 changes: 9 additions & 7 deletions backup.go
@@ -48,10 +48,11 @@ func (db *DB) Backup(w io.Writer, since uint64) (uint64, error) {
}

entry := &protos.KVPair{
-Key: y.Copy(item.Key()),
-Value: y.Copy(val),
-UserMeta: []byte{item.UserMeta()},
-Version: item.Version(),
+Key: y.Copy(item.Key()),
+Value: y.Copy(val),
+UserMeta: []byte{item.UserMeta()},
+Version: item.Version(),
+ExpiresAt: item.ExpiresAt(),
}

// Write entries to disk
@@ -118,9 +119,10 @@ func (db *DB) Load(r io.Reader) error {
return err
}
entries = append(entries, &entry{
-Key: y.KeyWithTs(e.Key, e.Version),
-Value: e.Value,
-UserMeta: e.UserMeta[0],
+Key: y.KeyWithTs(e.Key, e.Version),
+Value: e.Value,
+UserMeta: e.UserMeta[0],
+ExpiresAt: e.ExpiresAt,
})

if len(entries) == 1000 {
2 changes: 1 addition & 1 deletion backup_test.go
@@ -44,7 +44,7 @@ func TestDumpLoad(t *testing.T) {

err = db.Update(func(txn *Txn) error {
for _, e := range entries {
-err := txn.Set(e.key, e.val, e.userMeta)
+err := txn.SetWithMeta(e.key, e.val, e.userMeta)
if err != nil {
return err
}
70 changes: 36 additions & 34 deletions db.go
@@ -109,7 +109,7 @@ func replayFunction(out *DB) func(entry, valuePointer) error {
nk := make([]byte, len(e.Key))
copy(nk, e.Key)
var nv []byte
-meta := e.Meta
+meta := e.meta
if out.shouldWriteValueToLSM(e) {
nv = make([]byte, len(e.Value))
copy(nv, e.Value)
@@ -125,7 +125,7 @@ func replayFunction(out *DB) func(entry, valuePointer) error {
UserMeta: e.UserMeta,
}

-if e.Meta&bitFinTxn > 0 {
+if e.meta&bitFinTxn > 0 {
txnTs, err := strconv.ParseUint(string(e.Value), 10, 64)
if err != nil {
return errors.Wrapf(err, "Unable to parse txn fin: %q", e.Value)
@@ -139,7 +139,7 @@ func replayFunction(out *DB) func(entry, valuePointer) error {
txn = txn[:0]
lastCommit = 0

-} else if e.Meta&bitTxn == 0 {
+} else if e.meta&bitTxn == 0 {
// This entry is from a rewrite.
toLSM(nk, v)

@@ -488,23 +488,25 @@ func (db *DB) writeToLSM(b *request) error {
}

for i, entry := range b.Entries {
-if entry.Meta&bitFinTxn != 0 {
+if entry.meta&bitFinTxn != 0 {
continue
}
if db.shouldWriteValueToLSM(*entry) { // Will include deletion / tombstone case.
db.mt.Put(entry.Key,
y.ValueStruct{
-Value: entry.Value,
-Meta: entry.Meta,
-UserMeta: entry.UserMeta,
+Value: entry.Value,
+Meta: entry.meta,
+UserMeta: entry.UserMeta,
+ExpiresAt: entry.ExpiresAt,
})
} else {
var offsetBuf [vptrSize]byte
db.mt.Put(entry.Key,
y.ValueStruct{
-Value: b.Ptrs[i].Encode(offsetBuf[:]),
-Meta: entry.Meta | bitValuePointer,
-UserMeta: entry.UserMeta,
+Value: b.Ptrs[i].Encode(offsetBuf[:]),
+Meta: entry.meta | bitValuePointer,
+UserMeta: entry.UserMeta,
+ExpiresAt: entry.ExpiresAt,
})
}
}
@@ -561,6 +563,28 @@ func (db *DB) writeRequests(reqs []*request) error {
return nil
}

+func (db *DB) sendToWriteCh(entries []*entry) (*request, error) {
+var count, size int64
+for _, e := range entries {
+size += int64(db.opt.estimateSize(e))
+count++
+}
+if count >= db.opt.maxBatchCount || size >= db.opt.maxBatchSize {
+return nil, ErrTxnTooBig
+}
+
+// We can only service one request because we need each txn to be stored in a contigous section.
+// Txns should not interleave among other txns or rewrites.
+req := requestPool.Get().(*request)
+req.Entries = entries
+req.Wg = sync.WaitGroup{}
+req.Wg.Add(1)
+db.writeCh <- req // Handled in doWrites.
+y.NumPuts.Add(int64(len(entries)))
+
+return req, nil
+}

func (db *DB) doWrites(lc *y.Closer) {
defer lc.Done()
pendingCh := make(chan struct{}, 1)
@@ -621,28 +645,6 @@ func (db *DB) doWrites(lc *y.Closer) {
}
}

-func (db *DB) sendToWriteCh(entries []*entry) (*request, error) {
-var count, size int64
-for _, e := range entries {
-size += int64(db.opt.estimateSize(e))
-count++
-}
-if count >= db.opt.maxBatchCount || size >= db.opt.maxBatchSize {
-return nil, ErrTxnTooBig
-}
-
-// We can only service one request because we need each txn to be stored in a contigous section.
-// Txns should not interleave among other txns or rewrites.
-req := requestPool.Get().(*request)
-req.Entries = entries
-req.Wg = sync.WaitGroup{}
-req.Wg.Add(1)
-db.writeCh <- req
-y.NumPuts.Add(int64(len(entries)))
-
-return req, nil
-}

// batchSet applies a list of badger.Entry. If a request level error occurs it
// will be returned.
// Check(kv.BatchSet(entries))
@@ -887,7 +889,7 @@ func (db *DB) purgeVersionsBelow(txn *Txn, key []byte, ts uint64) error {
entries = append(entries,
&entry{
Key: y.KeyWithTs(key, item.version),
-Meta: bitDelete,
+meta: bitDelete,
})
db.vlog.updateGCStats(item)
}
@@ -942,7 +944,7 @@ func (db *DB) PurgeOlderVersions() error {
entries = append(entries,
&entry{
Key: y.KeyWithTs(lastKey, item.version),
-Meta: bitDelete,
+meta: bitDelete,
})
db.vlog.updateGCStats(item)
count++