Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

allow users to store usermetadata in lsm #123

Merged
merged 1 commit into from
Jul 27, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions compact_log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,9 @@ func TestCompactLogBasic(t *testing.T) {
fmt.Printf("Putting i=%d\n", i)
}
k := []byte(fmt.Sprintf("%16x", rand.Int63()))
kv.Set(k, k)
kv.Set(k, k, 0x00)
}
kv.Set([]byte("testkey"), []byte("testval"))
kv.Set([]byte("testkey"), []byte("testval"), 0x05)
kv.validate()
require.NoError(t, kv.Close())
}
Expand All @@ -99,6 +99,7 @@ func TestCompactLogBasic(t *testing.T) {
t.Error(err)
}
require.EqualValues(t, "testval", string(item.Value()))
require.EqualValues(t, byte(0x05), item.UserMeta())
require.NoError(t, kv.Close())
}

Expand Down Expand Up @@ -137,7 +138,7 @@ func buildTable(t *testing.T, keyValues [][]string) *os.File {
})
for i, kv := range keyValues {
y.AssertTrue(len(kv) == 2)
err := b.Add([]byte(kv[0]), y.ValueStruct{[]byte(kv[1]), 'A', uint16(i)})
err := b.Add([]byte(kv[0]), y.ValueStruct{[]byte(kv[1]), 'A', uint16(i), 0})
if t != nil {
require.NoError(t, err)
} else {
Expand Down
2 changes: 1 addition & 1 deletion doc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func Example() {

key := []byte("hello")

kv.Set(key, []byte("world"))
kv.Set(key, []byte("world"), 0x00)
fmt.Printf("SET %s world\n", key)

var item badger.KVItem
Expand Down
7 changes: 7 additions & 0 deletions iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type KVItem struct {
key []byte
vptr []byte
meta byte
userMeta byte
val []byte
casCounter uint16
slice *y.Slice
Expand All @@ -54,6 +55,11 @@ func (item *KVItem) Counter() uint16 {
return item.casCounter
}

// UserMeta returns the userMeta set by the user
func (item *KVItem) UserMeta() byte {
return item.userMeta
}

type list struct {
head *KVItem
tail *KVItem
Expand Down Expand Up @@ -165,6 +171,7 @@ func (it *Iterator) Next() {
func (it *Iterator) fill(item *KVItem) {
vs := it.iitr.Value()
item.meta = vs.Meta
item.userMeta = vs.UserMeta
item.casCounter = vs.CASCounter
item.key = y.Safecopy(item.key, it.iitr.Key())
item.vptr = y.Safecopy(item.vptr, vs.Value)
Expand Down
33 changes: 20 additions & 13 deletions kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,10 @@ var DefaultOptions = Options{

func (opt *Options) estimateSize(entry *Entry) int {
if len(entry.Value) < opt.ValueThreshold {
// 3 is for cas + meta
return len(entry.Key) + len(entry.Value) + 3
// 4 is for cas + meta
return len(entry.Key) + len(entry.Value) + 4
}
return len(entry.Key) + 16 + 3
return len(entry.Key) + 16 + 4
}

// KV provides the various functions required to interact with Badger.
Expand Down Expand Up @@ -264,6 +264,7 @@ func NewKV(opt *Options) (out *KV, err error) {
v := y.ValueStruct{
Value: nv,
Meta: meta,
UserMeta: e.UserMeta,
CASCounter: e.casCounter,
}
for err := out.ensureRoomForWrite(); err != nil; err = out.ensureRoomForWrite() {
Expand Down Expand Up @@ -474,6 +475,7 @@ func (s *KV) Get(key []byte, item *KVItem) error {
item.slice = new(y.Slice)
}
item.meta = vs.Meta
item.userMeta = vs.UserMeta
item.casCounter = vs.CASCounter
item.key = key
item.vptr = vs.Value
Expand Down Expand Up @@ -566,12 +568,14 @@ func (s *KV) writeToLSM(b *request) error {
y.ValueStruct{
Value: entry.Value,
Meta: entry.Meta,
UserMeta: entry.UserMeta,
CASCounter: entry.casCounter})
} else {
s.mt.Put(entry.Key,
y.ValueStruct{
Value: b.Ptrs[i].Encode(offsetBuf[:]),
Meta: entry.Meta | BitValuePointer,
UserMeta: entry.UserMeta,
CASCounter: entry.casCounter})
}
}
Expand Down Expand Up @@ -751,28 +755,30 @@ func (s *KV) BatchSetAsync(entries []*Entry, f func(error)) {

// Set sets the provided value for a given key. If key is not present, it is created.
// If it is present, the existing value is overwritten with the one provided.
func (s *KV) Set(key, val []byte) error {
func (s *KV) Set(key, val []byte, userMeta byte) error {
e := &Entry{
Key: key,
Value: val,
Key: key,
Value: val,
UserMeta: userMeta,
}
return s.BatchSet([]*Entry{e})
}

// SetAsync is the asynchronous version of Set. It accepts a callback function which is called
// when the set is complete. Any error encountered during execution is passed as an argument
// to the callback function.
func (s *KV) SetAsync(key, val []byte, f func(error)) {
func (s *KV) SetAsync(key, val []byte, userMeta byte, f func(error)) {
e := &Entry{
Key: key,
Value: val,
Key: key,
Value: val,
UserMeta: userMeta,
}
s.BatchSetAsync([]*Entry{e}, f)
}

// SetIfAbsent sets value of key if key is not present.
// If it is present, it returns the KeyExists error.
func (s *KV) SetIfAbsent(key, val []byte) error {
func (s *KV) SetIfAbsent(key, val []byte, userMeta byte) error {
exists, err := s.Exists(key)
if err != nil {
return err
Expand All @@ -783,9 +789,10 @@ func (s *KV) SetIfAbsent(key, val []byte) error {
}

e := &Entry{
Key: key,
Meta: BitSetIfAbsent,
Value: val,
Key: key,
Meta: BitSetIfAbsent,
Value: val,
UserMeta: userMeta,
}
if err := s.BatchSet([]*Entry{e}); err != nil {
return err
Expand Down
31 changes: 18 additions & 13 deletions kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func TestConcurrentWrite(t *testing.T) {
defer wg.Done()
for j := 0; j < m; j++ {
kv.Set([]byte(fmt.Sprintf("k%05d_%08d", i, j)),
[]byte(fmt.Sprintf("v%05d_%08d", i, j)))
[]byte(fmt.Sprintf("v%05d_%08d", i, j)), byte(j%127))
}
}(i)
}
Expand All @@ -105,6 +105,7 @@ func TestConcurrentWrite(t *testing.T) {
require.EqualValues(t, fmt.Sprintf("k%05d_%08d", i, j), string(k))
v := item.Value()
require.EqualValues(t, fmt.Sprintf("v%05d_%08d", i, j), string(v))
require.Equal(t, item.UserMeta(), byte(j%127))
j++
if j == m {
i++
Expand Down Expand Up @@ -218,19 +219,21 @@ func TestGet(t *testing.T) {
defer kv.Close()

var item KVItem
kv.Set([]byte("key1"), []byte("val1"))
kv.Set([]byte("key1"), []byte("val1"), 0x08)

if err := kv.Get([]byte("key1"), &item); err != nil {
t.Error(err)
}
require.EqualValues(t, "val1", item.Value())
require.Equal(t, byte(0x08), item.UserMeta())
require.True(t, item.Counter() != 0)

kv.Set([]byte("key1"), []byte("val2"))
kv.Set([]byte("key1"), []byte("val2"), 0x09)
if err := kv.Get([]byte("key1"), &item); err != nil {
t.Error(err)
}
require.EqualValues(t, "val2", item.Value())
require.Equal(t, byte(0x09), item.UserMeta())
require.True(t, item.Counter() != 0)

kv.Delete([]byte("key1"))
Expand All @@ -240,15 +243,16 @@ func TestGet(t *testing.T) {
require.Nil(t, item.Value())
require.True(t, item.Counter() != 0)

kv.Set([]byte("key1"), []byte("val3"))
kv.Set([]byte("key1"), []byte("val3"), 0x01)
if err := kv.Get([]byte("key1"), &item); err != nil {
t.Error(err)
}
require.EqualValues(t, "val3", item.Value())
require.Equal(t, byte(0x01), item.UserMeta())
require.True(t, item.Counter() != 0)

longVal := make([]byte, 1000)
kv.Set([]byte("key1"), longVal)
kv.Set([]byte("key1"), longVal, 0x00)
if err := kv.Get([]byte("key1"), &item); err != nil {
t.Error(err)
}
Expand All @@ -267,7 +271,7 @@ func TestExists(t *testing.T) {
defer kv.Close()

// populate with one entry
err = kv.Set([]byte("key1"), []byte("val1"))
err = kv.Set([]byte("key1"), []byte("val1"), 0x00)
require.NoError(t, err)

tt := []struct {
Expand Down Expand Up @@ -511,7 +515,7 @@ func TestIterate2Basic(t *testing.T) {
if (i % 1000) == 0 {
t.Logf("Put i=%d\n", i)
}
kv.Set(bkey(i), bval(i))
kv.Set(bkey(i), bval(i), byte(i%127))
}

opt := IteratorOptions{}
Expand All @@ -537,6 +541,7 @@ func TestIterate2Basic(t *testing.T) {
require.EqualValues(t, bkey(count), string(key))
val := item.Value()
require.EqualValues(t, bval(count), string(val))
require.Equal(t, byte(count%127), item.UserMeta())
count++
}
require.EqualValues(t, n, count)
Expand Down Expand Up @@ -569,7 +574,7 @@ func TestLoad(t *testing.T) {
fmt.Printf("Putting i=%d\n", i)
}
k := []byte(fmt.Sprintf("%09d", i))
kv.Set(k, k)
kv.Set(k, k, 0x00)
}
kv.Close()
}
Expand Down Expand Up @@ -617,8 +622,8 @@ func TestIterateDeleted(t *testing.T) {
ps, err := NewKV(&opt)
require.NoError(t, err)
defer ps.Close()
ps.Set([]byte("Key1"), []byte("Value1"))
ps.Set([]byte("Key2"), []byte("Value2"))
ps.Set([]byte("Key1"), []byte("Value1"), 0x00)
ps.Set([]byte("Key2"), []byte("Value2"), 0x00)

iterOpt := DefaultIteratorOptions
iterOpt.FetchValues = false
Expand Down Expand Up @@ -682,7 +687,7 @@ func TestDeleteWithoutSyncWrite(t *testing.T) {

key := []byte("k1")
// Set a value with size > value threshold so that its written to value log.
require.NoError(t, kv.Set(key, []byte("ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789FOOBARZOGZOG")))
require.NoError(t, kv.Set(key, []byte("ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789FOOBARZOGZOG"), 0x00))
require.NoError(t, kv.Delete(key))
kv.Close()

Expand All @@ -706,10 +711,10 @@ func TestSetIfAbsent(t *testing.T) {
require.NoError(t, err)

key := []byte("k1")
err = kv.SetIfAbsent(key, []byte("val"))
err = kv.SetIfAbsent(key, []byte("val"), 0x00)
require.NoError(t, err)

err = kv.SetIfAbsent(key, []byte("val2"))
err = kv.SetIfAbsent(key, []byte("val2"), 0x00)
require.EqualError(t, err, KeyExists.Error())
}

Expand Down
12 changes: 7 additions & 5 deletions skl/arena.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,15 +50,16 @@ func (s *Arena) Reset() {
// size of val. We could also store this size inside arena but the encoding and
// decoding will incur some overhead.
func (s *Arena) PutVal(v y.ValueStruct) uint32 {
l := uint32(len(v.Value)) + 3
l := uint32(len(v.Value)) + 4
n := atomic.AddUint32(&s.n, l)
y.AssertTruef(int(n) <= len(s.buf),
"Arena too small, toWrite:%d newTotal:%d limit:%d",
l, n, len(s.buf))
m := n - l
s.buf[m] = v.Meta
binary.BigEndian.PutUint16(s.buf[m+1:m+3], v.CASCounter)
copy(s.buf[m+3:n], v.Value)
s.buf[m+1] = v.UserMeta
binary.BigEndian.PutUint16(s.buf[m+2:m+4], v.CASCounter)
copy(s.buf[m+4:n], v.Value)
return m
}

Expand All @@ -82,9 +83,10 @@ func (s *Arena) GetKey(offset uint32, size uint16) []byte {
// size and should NOT include the meta byte.
func (s *Arena) GetVal(offset uint32, size uint16) y.ValueStruct {
out := y.ValueStruct{
Value: s.buf[offset+3 : offset+3+uint32(size)],
Value: s.buf[offset+4 : offset+4+uint32(size)],
Meta: s.buf[offset],
CASCounter: binary.BigEndian.Uint16(s.buf[offset+1 : offset+3]),
UserMeta: s.buf[offset+1],
CASCounter: binary.BigEndian.Uint16(s.buf[offset+2 : offset+4]),
}
return out
}
Loading