diff --git a/db_test.go b/db_test.go index 3912f146b..1b705785b 100644 --- a/db_test.go +++ b/db_test.go @@ -71,513 +71,435 @@ func txnDelete(t *testing.T, kv *DB, key []byte) { require.NoError(t, txn.Commit(nil)) } -func TestWrite(t *testing.T) { +// Opens a badger db and runs a a test on it. +func runBadgerTest(t *testing.T, opts *Options, test func(t *testing.T, db *DB)) { dir, err := ioutil.TempDir("", "badger") require.NoError(t, err) defer os.RemoveAll(dir) - kv, err := Open(getTestOptions(dir)) - require.NoError(t, err) - defer kv.Close() - - for i := 0; i < 100; i++ { - txnSet(t, kv, []byte(fmt.Sprintf("key%d", i)), []byte(fmt.Sprintf("val%d", i)), 0x00) + if opts == nil { + opts = new(Options) + *opts = getTestOptions(dir) } -} - -func TestUpdateAndView(t *testing.T) { - dir, err := ioutil.TempDir("", "badger") - require.NoError(t, err) - defer os.RemoveAll(dir) - db, err := Open(getTestOptions(dir)) + db, err := Open(*opts) require.NoError(t, err) defer db.Close() + test(t, db) +} - err = db.Update(func(txn *Txn) error { - for i := 0; i < 10; i++ { - err := txn.Set([]byte(fmt.Sprintf("key%d", i)), []byte(fmt.Sprintf("val%d", i))) - if err != nil { - return err - } +func TestWrite(t *testing.T) { + runBadgerTest(t, nil, func(t *testing.T, db *DB) { + for i := 0; i < 100; i++ { + txnSet(t, db, []byte(fmt.Sprintf("key%d", i)), []byte(fmt.Sprintf("val%d", i)), 0x00) } - return nil }) - require.NoError(t, err) +} - err = db.View(func(txn *Txn) error { - for i := 0; i < 10; i++ { - item, err := txn.Get([]byte(fmt.Sprintf("key%d", i))) - if err != nil { - return err +func TestUpdateAndView(t *testing.T) { + runBadgerTest(t, nil, func(t *testing.T, db *DB) { + err := db.Update(func(txn *Txn) error { + for i := 0; i < 10; i++ { + err := txn.Set([]byte(fmt.Sprintf("key%d", i)), []byte(fmt.Sprintf("val%d", i))) + if err != nil { + return err + } } + return nil + }) + require.NoError(t, err) - val, err := item.Value() - if err != nil { - return err + err = db.View(func(txn *Txn) error { + for i := 0; i < 10; i++ { + item, err := txn.Get([]byte(fmt.Sprintf("key%d", i))) + if err != nil { + return err + } + + val, err := item.Value() + if err != nil { + return err + } + expected := []byte(fmt.Sprintf("val%d", i)) + require.Equal(t, expected, val, + "Invalid value for key %q. expected: %q, actual: %q", + item.Key(), expected, val) } - expected := []byte(fmt.Sprintf("val%d", i)) - require.Equal(t, expected, val, - "Invalid value for key %q. expected: %q, actual: %q", - item.Key(), expected, val) - } - return nil + return nil + }) + require.NoError(t, err) }) - require.NoError(t, err) } func TestConcurrentWrite(t *testing.T) { - dir, err := ioutil.TempDir("", "badger") - require.NoError(t, err) - defer os.RemoveAll(dir) - kv, _ := Open(getTestOptions(dir)) - defer kv.Close() - - // Not a benchmark. Just a simple test for concurrent writes. - n := 20 - m := 500 - var wg sync.WaitGroup - for i := 0; i < n; i++ { - wg.Add(1) - go func(i int) { - defer wg.Done() - for j := 0; j < m; j++ { - txnSet(t, kv, []byte(fmt.Sprintf("k%05d_%08d", i, j)), - []byte(fmt.Sprintf("v%05d_%08d", i, j)), byte(j%127)) - } - }(i) - } - wg.Wait() + runBadgerTest(t, nil, func(t *testing.T, db *DB) { + // Not a benchmark. Just a simple test for concurrent writes. + n := 20 + m := 500 + var wg sync.WaitGroup + for i := 0; i < n; i++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + for j := 0; j < m; j++ { + txnSet(t, db, []byte(fmt.Sprintf("k%05d_%08d", i, j)), + []byte(fmt.Sprintf("v%05d_%08d", i, j)), byte(j%127)) + } + }(i) + } + wg.Wait() - t.Log("Starting iteration") + t.Log("Starting iteration") - opt := IteratorOptions{} - opt.Reverse = false - opt.PrefetchSize = 10 - opt.PrefetchValues = true + opt := IteratorOptions{} + opt.Reverse = false + opt.PrefetchSize = 10 + opt.PrefetchValues = true - txn := kv.NewTransaction(true) - it := txn.NewIterator(opt) - defer it.Close() - var i, j int - for it.Rewind(); it.Valid(); it.Next() { - item := it.Item() - k := item.Key() - if k == nil { - break // end of iteration. - } + txn := db.NewTransaction(true) + it := txn.NewIterator(opt) + defer it.Close() + var i, j int + for it.Rewind(); it.Valid(); it.Next() { + item := it.Item() + k := item.Key() + if k == nil { + break // end of iteration. + } - require.EqualValues(t, fmt.Sprintf("k%05d_%08d", i, j), string(k)) - v := getItemValue(t, item) - require.EqualValues(t, fmt.Sprintf("v%05d_%08d", i, j), string(v)) - require.Equal(t, item.UserMeta(), byte(j%127)) - j++ - if j == m { - i++ - j = 0 + require.EqualValues(t, fmt.Sprintf("k%05d_%08d", i, j), string(k)) + v := getItemValue(t, item) + require.EqualValues(t, fmt.Sprintf("v%05d_%08d", i, j), string(v)) + require.Equal(t, item.UserMeta(), byte(j%127)) + j++ + if j == m { + i++ + j = 0 + } } - } - require.EqualValues(t, n, i) - require.EqualValues(t, 0, j) + require.EqualValues(t, n, i) + require.EqualValues(t, 0, j) + }) } func TestGet(t *testing.T) { - dir, err := ioutil.TempDir("", "badger") - require.NoError(t, err) - defer os.RemoveAll(dir) - kv, err := Open(getTestOptions(dir)) - if err != nil { - t.Error(err) - } - defer kv.Close() - txnSet(t, kv, []byte("key1"), []byte("val1"), 0x08) - - txn := kv.NewTransaction(false) - item, err := txn.Get([]byte("key1")) - require.NoError(t, err) - require.EqualValues(t, "val1", getItemValue(t, item)) - require.Equal(t, byte(0x08), item.UserMeta()) - txn.Discard() - - txnSet(t, kv, []byte("key1"), []byte("val2"), 0x09) + runBadgerTest(t, nil, func(t *testing.T, db *DB) { + txnSet(t, db, []byte("key1"), []byte("val1"), 0x08) - txn = kv.NewTransaction(false) - item, err = txn.Get([]byte("key1")) - require.NoError(t, err) - require.EqualValues(t, "val2", getItemValue(t, item)) - require.Equal(t, byte(0x09), item.UserMeta()) - txn.Discard() + txn := db.NewTransaction(false) + item, err := txn.Get([]byte("key1")) + require.NoError(t, err) + require.EqualValues(t, "val1", getItemValue(t, item)) + require.Equal(t, byte(0x08), item.UserMeta()) + txn.Discard() - txnDelete(t, kv, []byte("key1")) + txnSet(t, db, []byte("key1"), []byte("val2"), 0x09) - txn = kv.NewTransaction(false) - _, err = txn.Get([]byte("key1")) - require.Equal(t, ErrKeyNotFound, err) - txn.Discard() + txn = db.NewTransaction(false) + item, err = txn.Get([]byte("key1")) + require.NoError(t, err) + require.EqualValues(t, "val2", getItemValue(t, item)) + require.Equal(t, byte(0x09), item.UserMeta()) + txn.Discard() - txnSet(t, kv, []byte("key1"), []byte("val3"), 0x01) + txnDelete(t, db, []byte("key1")) - txn = kv.NewTransaction(false) - item, err = txn.Get([]byte("key1")) - require.NoError(t, err) - require.EqualValues(t, "val3", getItemValue(t, item)) - require.Equal(t, byte(0x01), item.UserMeta()) + txn = db.NewTransaction(false) + _, err = txn.Get([]byte("key1")) + require.Equal(t, ErrKeyNotFound, err) + txn.Discard() - longVal := make([]byte, 1000) - txnSet(t, kv, []byte("key1"), longVal, 0x00) + txnSet(t, db, []byte("key1"), []byte("val3"), 0x01) - txn = kv.NewTransaction(false) - item, err = txn.Get([]byte("key1")) - require.NoError(t, err) - require.EqualValues(t, longVal, getItemValue(t, item)) - txn.Discard() -} + txn = db.NewTransaction(false) + item, err = txn.Get([]byte("key1")) + require.NoError(t, err) + require.EqualValues(t, "val3", getItemValue(t, item)) + require.Equal(t, byte(0x01), item.UserMeta()) -func TestGetAfterDelete(t *testing.T) { - dir, err := ioutil.TempDir("", "badger") - require.NoError(t, err) - defer os.RemoveAll(dir) - kv, err := Open(getTestOptions(dir)) - if err != nil { - t.Error(err) - } - defer kv.Close() + longVal := make([]byte, 1000) + txnSet(t, db, []byte("key1"), longVal, 0x00) - // populate with one entry - key := []byte("key") - txnSet(t, kv, key, []byte("val1"), 0x00) - require.NoError(t, kv.Update(func(txn *Txn) error { - err := txn.Delete(key) + txn = db.NewTransaction(false) + item, err = txn.Get([]byte("key1")) require.NoError(t, err) - - _, err = txn.Get(key) - require.Equal(t, ErrKeyNotFound, err) - return nil - })) + require.EqualValues(t, longVal, getItemValue(t, item)) + txn.Discard() + }) } -func TestExists(t *testing.T) { - dir, err := ioutil.TempDir("", "badger") - require.NoError(t, err) - defer os.RemoveAll(dir) - kv, err := Open(getTestOptions(dir)) - if err != nil { - t.Error(err) - } - defer kv.Close() - - // populate with one entry - txnSet(t, kv, []byte("key1"), []byte("val1"), 0x00) - - tt := []struct { - key []byte - exists bool - }{ - { - key: []byte("key1"), - exists: true, - }, - { - key: []byte("non-exits"), - exists: false, - }, - } +func TestGetAfterDelete(t *testing.T) { + runBadgerTest(t, nil, func(t *testing.T, db *DB) { + // populate with one entry + key := []byte("key") + txnSet(t, db, key, []byte("val1"), 0x00) + require.NoError(t, db.Update(func(txn *Txn) error { + err := txn.Delete(key) + require.NoError(t, err) - for _, test := range tt { - require.NoError(t, kv.View(func(tx *Txn) error { - _, err := tx.Get(test.key) - if test.exists { - require.NoError(t, err) - return nil - } + _, err = txn.Get(key) require.Equal(t, ErrKeyNotFound, err) return nil })) - } - + }) } func TestTxnTooBig(t *testing.T) { - dir, err := ioutil.TempDir("", "badger") - require.NoError(t, err) - defer os.RemoveAll(dir) - kv, err := Open(getTestOptions(dir)) - if err != nil { - t.Error(err) - t.Fail() - } - defer kv.Close() - data := func(i int) []byte { - return []byte(fmt.Sprintf("%b", i)) - } - // n := 500000 - n := 1000 - txn := kv.NewTransaction(true) - for i := 0; i < n; { - if err := txn.Set(data(i), data(i)); err != nil { - require.NoError(t, txn.Commit(nil)) - txn = kv.NewTransaction(true) - } else { - i++ + runBadgerTest(t, nil, func(t *testing.T, db *DB) { + data := func(i int) []byte { + return []byte(fmt.Sprintf("%b", i)) } - } - require.NoError(t, txn.Commit(nil)) + // n := 500000 + n := 1000 + txn := db.NewTransaction(true) + for i := 0; i < n; { + if err := txn.Set(data(i), data(i)); err != nil { + require.NoError(t, txn.Commit(nil)) + txn = db.NewTransaction(true) + } else { + i++ + } + } + require.NoError(t, txn.Commit(nil)) - txn = kv.NewTransaction(true) - for i := 0; i < n; { - if err := txn.Delete(data(i)); err != nil { - require.NoError(t, txn.Commit(nil)) - txn = kv.NewTransaction(true) - } else { - i++ + txn = db.NewTransaction(true) + for i := 0; i < n; { + if err := txn.Delete(data(i)); err != nil { + require.NoError(t, txn.Commit(nil)) + txn = db.NewTransaction(true) + } else { + i++ + } } - } - require.NoError(t, txn.Commit(nil)) + require.NoError(t, txn.Commit(nil)) + }) } // Put a lot of data to move some data to disk. // WARNING: This test might take a while but it should pass! func TestGetMore(t *testing.T) { - dir, err := ioutil.TempDir("", "badger") - require.NoError(t, err) - defer os.RemoveAll(dir) - kv, err := Open(getTestOptions(dir)) - if err != nil { - t.Error(err) - t.Fail() - } - defer kv.Close() + runBadgerTest(t, nil, func(t *testing.T, db *DB) { - data := func(i int) []byte { - return []byte(fmt.Sprintf("%b", i)) - } - // n := 500000 - n := 10000 - m := 45 // Increasing would cause ErrTxnTooBig - for i := 0; i < n; i += m { - txn := kv.NewTransaction(true) - for j := i; j < i+m && j < n; j++ { - require.NoError(t, txn.Set(data(j), data(j))) + data := func(i int) []byte { + return []byte(fmt.Sprintf("%b", i)) } - require.NoError(t, txn.Commit(nil)) - } - require.NoError(t, kv.validate()) - - for i := 0; i < n; i++ { - txn := kv.NewTransaction(false) - item, err := txn.Get(data(i)) - if err != nil { - t.Error(err) + // n := 500000 + n := 10000 + m := 45 // Increasing would cause ErrTxnTooBig + for i := 0; i < n; i += m { + txn := db.NewTransaction(true) + for j := i; j < i+m && j < n; j++ { + require.NoError(t, txn.Set(data(j), data(j))) + } + require.NoError(t, txn.Commit(nil)) } - require.EqualValues(t, string(data(i)), string(getItemValue(t, item))) - txn.Discard() - } + require.NoError(t, db.validate()) - // Overwrite - for i := 0; i < n; i += m { - txn := kv.NewTransaction(true) - for j := i; j < i+m && j < n; j++ { - require.NoError(t, txn.Set(data(j), - // Use a long value that will certainly exceed value threshold. - []byte(fmt.Sprintf("zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz%9d", j)))) + for i := 0; i < n; i++ { + txn := db.NewTransaction(false) + item, err := txn.Get(data(i)) + if err != nil { + t.Error(err) + } + require.EqualValues(t, string(data(i)), string(getItemValue(t, item))) + txn.Discard() } - require.NoError(t, txn.Commit(nil)) - } - require.NoError(t, kv.validate()) - for i := 0; i < n; i++ { - expectedValue := fmt.Sprintf("zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz%9d", i) - k := data(i) - txn := kv.NewTransaction(false) - item, err := txn.Get(k) - if err != nil { - t.Error(err) + // Overwrite + for i := 0; i < n; i += m { + txn := db.NewTransaction(true) + for j := i; j < i+m && j < n; j++ { + require.NoError(t, txn.Set(data(j), + // Use a long value that will certainly exceed value threshold. + []byte(fmt.Sprintf("zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz%9d", j)))) + } + require.NoError(t, txn.Commit(nil)) } - got := string(getItemValue(t, item)) - if expectedValue != got { + require.NoError(t, db.validate()) - vs, err := kv.get(y.KeyWithTs(k, math.MaxUint64)) - require.NoError(t, err) - fmt.Printf("wanted=%q Item: %s\n", k, item.ToString()) - fmt.Printf("on re-run, got version: %+v\n", vs) - - txn := kv.NewTransaction(false) - itr := txn.NewIterator(DefaultIteratorOptions) - for itr.Seek(k); itr.Valid(); itr.Next() { - item := itr.Item() - fmt.Printf("item=%s\n", item.ToString()) - if !bytes.Equal(item.Key(), k) { - break + for i := 0; i < n; i++ { + expectedValue := fmt.Sprintf("zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz%9d", i) + k := data(i) + txn := db.NewTransaction(false) + item, err := txn.Get(k) + if err != nil { + t.Error(err) + } + got := string(getItemValue(t, item)) + if expectedValue != got { + + vs, err := db.get(y.KeyWithTs(k, math.MaxUint64)) + require.NoError(t, err) + fmt.Printf("wanted=%q Item: %s\n", k, item.ToString()) + fmt.Printf("on re-run, got version: %+v\n", vs) + + txn := db.NewTransaction(false) + itr := txn.NewIterator(DefaultIteratorOptions) + for itr.Seek(k); itr.Valid(); itr.Next() { + item := itr.Item() + fmt.Printf("item=%s\n", item.ToString()) + if !bytes.Equal(item.Key(), k) { + break + } } + itr.Close() + txn.Discard() } - itr.Close() + require.EqualValues(t, expectedValue, string(getItemValue(t, item)), "wanted=%q Item: %s\n", k, item.ToString()) txn.Discard() } - require.EqualValues(t, expectedValue, string(getItemValue(t, item)), "wanted=%q Item: %s\n", k, item.ToString()) - txn.Discard() - } - // "Delete" key. - for i := 0; i < n; i += m { - if (i % 10000) == 0 { - fmt.Printf("Deleting i=%d\n", i) - } - txn := kv.NewTransaction(true) - for j := i; j < i+m && j < n; j++ { - require.NoError(t, txn.Delete(data(j))) + // "Delete" key. + for i := 0; i < n; i += m { + if (i % 10000) == 0 { + fmt.Printf("Deleting i=%d\n", i) + } + txn := db.NewTransaction(true) + for j := i; j < i+m && j < n; j++ { + require.NoError(t, txn.Delete(data(j))) + } + require.NoError(t, txn.Commit(nil)) } - require.NoError(t, txn.Commit(nil)) - } - kv.validate() - for i := 0; i < n; i++ { - if (i % 10000) == 0 { - // Display some progress. Right now, it's not very fast with no caching. - fmt.Printf("Testing i=%d\n", i) + db.validate() + for i := 0; i < n; i++ { + if (i % 10000) == 0 { + // Display some progress. Right now, it's not very fast with no caching. + fmt.Printf("Testing i=%d\n", i) + } + k := data(i) + txn := db.NewTransaction(false) + _, err := txn.Get([]byte(k)) + require.Equal(t, ErrKeyNotFound, err, "should not have found k: %q", k) + txn.Discard() } - k := data(i) - txn := kv.NewTransaction(false) - _, err := txn.Get([]byte(k)) - require.Equal(t, ErrKeyNotFound, err, "should not have found k: %q", k) - txn.Discard() - } + }) } // Put a lot of data to move some data to disk. // WARNING: This test might take a while but it should pass! func TestExistsMore(t *testing.T) { - dir, err := ioutil.TempDir("", "badger") - require.NoError(t, err) - defer os.RemoveAll(dir) - kv, err := Open(getTestOptions(dir)) - if err != nil { - t.Error(err) - t.Fail() - } - defer kv.Close() - - // n := 500000 - n := 10000 - m := 45 - for i := 0; i < n; i += m { - if (i % 1000) == 0 { - t.Logf("Putting i=%d\n", i) - } - txn := kv.NewTransaction(true) - for j := i; j < i+m && j < n; j++ { - require.NoError(t, txn.Set([]byte(fmt.Sprintf("%09d", j)), - []byte(fmt.Sprintf("%09d", j)))) + runBadgerTest(t, nil, func(t *testing.T, db *DB) { + // n := 500000 + n := 10000 + m := 45 + for i := 0; i < n; i += m { + if (i % 1000) == 0 { + t.Logf("Putting i=%d\n", i) + } + txn := db.NewTransaction(true) + for j := i; j < i+m && j < n; j++ { + require.NoError(t, txn.Set([]byte(fmt.Sprintf("%09d", j)), + []byte(fmt.Sprintf("%09d", j)))) + } + require.NoError(t, txn.Commit(nil)) } - require.NoError(t, txn.Commit(nil)) - } - kv.validate() + db.validate() - for i := 0; i < n; i++ { - if (i % 1000) == 0 { - fmt.Printf("Testing i=%d\n", i) + for i := 0; i < n; i++ { + if (i % 1000) == 0 { + fmt.Printf("Testing i=%d\n", i) + } + k := fmt.Sprintf("%09d", i) + require.NoError(t, db.View(func(txn *Txn) error { + _, err := txn.Get([]byte(k)) + require.NoError(t, err) + return nil + })) } - k := fmt.Sprintf("%09d", i) - require.NoError(t, kv.View(func(txn *Txn) error { - _, err := txn.Get([]byte(k)) - require.NoError(t, err) + require.NoError(t, db.View(func(txn *Txn) error { + _, err := txn.Get([]byte("non-exists")) + require.Error(t, err) return nil })) - } - require.NoError(t, kv.View(func(txn *Txn) error { - _, err := txn.Get([]byte("non-exists")) - require.Error(t, err) - return nil - })) - // "Delete" key. - for i := 0; i < n; i += m { - if (i % 1000) == 0 { - fmt.Printf("Deleting i=%d\n", i) - } - txn := kv.NewTransaction(true) - for j := i; j < i+m && j < n; j++ { - require.NoError(t, txn.Delete([]byte(fmt.Sprintf("%09d", j)))) - } - require.NoError(t, txn.Commit(nil)) - } - kv.validate() - for i := 0; i < n; i++ { - if (i % 10000) == 0 { - // Display some progress. Right now, it's not very fast with no caching. - fmt.Printf("Testing i=%d\n", i) + // "Delete" key. + for i := 0; i < n; i += m { + if (i % 1000) == 0 { + fmt.Printf("Deleting i=%d\n", i) + } + txn := db.NewTransaction(true) + for j := i; j < i+m && j < n; j++ { + require.NoError(t, txn.Delete([]byte(fmt.Sprintf("%09d", j)))) + } + require.NoError(t, txn.Commit(nil)) } - k := fmt.Sprintf("%09d", i) + db.validate() + for i := 0; i < n; i++ { + if (i % 10000) == 0 { + // Display some progress. Right now, it's not very fast with no caching. + fmt.Printf("Testing i=%d\n", i) + } + k := fmt.Sprintf("%09d", i) - require.NoError(t, kv.View(func(txn *Txn) error { - _, err := txn.Get([]byte(k)) - require.Error(t, err) - return nil - })) - } - fmt.Println("Done and closing") + require.NoError(t, db.View(func(txn *Txn) error { + _, err := txn.Get([]byte(k)) + require.Error(t, err) + return nil + })) + } + fmt.Println("Done and closing") + }) } func TestIterate2Basic(t *testing.T) { - dir, err := ioutil.TempDir("", "badger") - require.NoError(t, err) - defer os.RemoveAll(dir) - kv, _ := Open(getTestOptions(dir)) - defer kv.Close() + runBadgerTest(t, nil, func(t *testing.T, db *DB) { - bkey := func(i int) []byte { - return []byte(fmt.Sprintf("%09d", i)) - } - bval := func(i int) []byte { - return []byte(fmt.Sprintf("%025d", i)) - } + bkey := func(i int) []byte { + return []byte(fmt.Sprintf("%09d", i)) + } + bval := func(i int) []byte { + return []byte(fmt.Sprintf("%025d", i)) + } - // n := 500000 - n := 10000 - for i := 0; i < n; i++ { - if (i % 1000) == 0 { - t.Logf("Put i=%d\n", i) + // n := 500000 + n := 10000 + for i := 0; i < n; i++ { + if (i % 1000) == 0 { + t.Logf("Put i=%d\n", i) + } + txnSet(t, db, bkey(i), bval(i), byte(i%127)) } - txnSet(t, kv, bkey(i), bval(i), byte(i%127)) - } - opt := IteratorOptions{} - opt.PrefetchValues = true - opt.PrefetchSize = 10 + opt := IteratorOptions{} + opt.PrefetchValues = true + opt.PrefetchSize = 10 - txn := kv.NewTransaction(false) - it := txn.NewIterator(opt) - { - var count int - rewind := true - t.Log("Starting first basic iteration") - for it.Rewind(); it.Valid(); it.Next() { - item := it.Item() - key := item.Key() - if rewind && count == 5000 { - // Rewind would skip /head/ key, and it.Next() would skip 0. - count = 1 - it.Rewind() - t.Log("Rewinding from 5000 to zero.") - rewind = false - continue + txn := db.NewTransaction(false) + it := txn.NewIterator(opt) + { + var count int + rewind := true + t.Log("Starting first basic iteration") + for it.Rewind(); it.Valid(); it.Next() { + item := it.Item() + key := item.Key() + if rewind && count == 5000 { + // Rewind would skip /head/ key, and it.Next() would skip 0. + count = 1 + it.Rewind() + t.Log("Rewinding from 5000 to zero.") + rewind = false + continue + } + require.EqualValues(t, bkey(count), string(key)) + val := getItemValue(t, item) + require.EqualValues(t, bval(count), string(val)) + require.Equal(t, byte(count%127), item.UserMeta()) + count++ } - require.EqualValues(t, bkey(count), string(key)) - val := getItemValue(t, item) - require.EqualValues(t, bval(count), string(val)) - require.Equal(t, byte(count%127), item.UserMeta()) - count++ + require.EqualValues(t, n, count) } - require.EqualValues(t, n, count) - } - { - t.Log("Starting second basic iteration") - idx := 5030 - for it.Seek(bkey(idx)); it.Valid(); it.Next() { - item := it.Item() - require.EqualValues(t, bkey(idx), string(item.Key())) - require.EqualValues(t, bval(idx), string(getItemValue(t, item))) - idx++ + { + t.Log("Starting second basic iteration") + idx := 5030 + for it.Seek(bkey(idx)); it.Valid(); it.Next() { + item := it.Item() + require.EqualValues(t, bkey(idx), string(item.Key())) + require.EqualValues(t, bval(idx), string(getItemValue(t, item))) + idx++ + } } - } - it.Close() + it.Close() + }) } func TestLoad(t *testing.T) { @@ -634,62 +556,53 @@ func TestLoad(t *testing.T) { } func TestIterateDeleted(t *testing.T) { - dir, err := ioutil.TempDir("", "badger") - require.NoError(t, err) - defer os.RemoveAll(dir) - - opt := DefaultOptions - opt.SyncWrites = true - opt.Dir = dir - opt.ValueDir = dir - ps, err := Open(opt) - require.NoError(t, err) - defer ps.Close() - txnSet(t, ps, []byte("Key1"), []byte("Value1"), 0x00) - txnSet(t, ps, []byte("Key2"), []byte("Value2"), 0x00) - - iterOpt := DefaultIteratorOptions - iterOpt.PrefetchValues = false - txn := ps.NewTransaction(false) - idxIt := txn.NewIterator(iterOpt) - defer idxIt.Close() - - count := 0 - txn2 := ps.NewTransaction(true) - prefix := []byte("Key") - for idxIt.Seek(prefix); idxIt.ValidForPrefix(prefix); idxIt.Next() { - key := idxIt.Item().Key() - count++ - newKey := make([]byte, len(key)) - copy(newKey, key) - require.NoError(t, txn2.Delete(newKey)) - } - require.Equal(t, 2, count) - require.NoError(t, txn2.Commit(nil)) - - for _, prefetch := range [...]bool{true, false} { - t.Run(fmt.Sprintf("Prefetch=%t", prefetch), func(t *testing.T) { - txn := ps.NewTransaction(false) - iterOpt = DefaultIteratorOptions - iterOpt.PrefetchValues = prefetch - idxIt = txn.NewIterator(iterOpt) - - var estSize int64 - var idxKeys []string - for idxIt.Seek(prefix); idxIt.Valid(); idxIt.Next() { - item := idxIt.Item() - key := item.Key() - estSize += item.EstimatedSize() - if !bytes.HasPrefix(key, prefix) { - break + runBadgerTest(t, nil, func(t *testing.T, db *DB) { + txnSet(t, db, []byte("Key1"), []byte("Value1"), 0x00) + txnSet(t, db, []byte("Key2"), []byte("Value2"), 0x00) + + iterOpt := DefaultIteratorOptions + iterOpt.PrefetchValues = false + txn := db.NewTransaction(false) + idxIt := txn.NewIterator(iterOpt) + defer idxIt.Close() + + count := 0 + txn2 := db.NewTransaction(true) + prefix := []byte("Key") + for idxIt.Seek(prefix); idxIt.ValidForPrefix(prefix); idxIt.Next() { + key := idxIt.Item().Key() + count++ + newKey := make([]byte, len(key)) + copy(newKey, key) + require.NoError(t, txn2.Delete(newKey)) + } + require.Equal(t, 2, count) + require.NoError(t, txn2.Commit(nil)) + + for _, prefetch := range [...]bool{true, false} { + t.Run(fmt.Sprintf("Prefetch=%t", prefetch), func(t *testing.T) { + txn := db.NewTransaction(false) + iterOpt = DefaultIteratorOptions + iterOpt.PrefetchValues = prefetch + idxIt = txn.NewIterator(iterOpt) + + var estSize int64 + var idxKeys []string + for idxIt.Seek(prefix); idxIt.Valid(); idxIt.Next() { + item := idxIt.Item() + key := item.Key() + estSize += item.EstimatedSize() + if !bytes.HasPrefix(key, prefix) { + break + } + idxKeys = append(idxKeys, string(key)) + t.Logf("%+v\n", idxIt.Item()) } - idxKeys = append(idxKeys, string(key)) - t.Logf("%+v\n", idxIt.Item()) - } - require.Equal(t, 0, len(idxKeys)) - require.Equal(t, int64(0), estSize) - }) - } + require.Equal(t, 0, len(idxKeys)) + require.Equal(t, int64(0), estSize) + }) + } + }) } func TestDeleteWithoutSyncWrite(t *testing.T) { @@ -727,91 +640,77 @@ func TestDeleteWithoutSyncWrite(t *testing.T) { } func TestPidFile(t *testing.T) { - dir, err := ioutil.TempDir("", "badger") - require.NoError(t, err) - defer os.RemoveAll(dir) - options := getTestOptions(dir) - kv1, err := Open(options) - require.NoError(t, err) - defer kv1.Close() - _, err = Open(options) - require.Error(t, err) - require.Contains(t, err.Error(), "Another process is using this Badger database") + runBadgerTest(t, nil, func(t *testing.T, db *DB) { + // Reopen database + _, err := Open(getTestOptions(db.opt.Dir)) + require.Error(t, err) + require.Contains(t, err.Error(), "Another process is using this Badger database") + }) } func TestBigKeyValuePairs(t *testing.T) { - dir, err := ioutil.TempDir("", "badger") - require.NoError(t, err) - defer os.RemoveAll(dir) + runBadgerTest(t, nil, func(t *testing.T, db *DB) { + bigK := make([]byte, maxKeySize+1) + bigV := make([]byte, db.opt.ValueLogFileSize+1) + small := make([]byte, 10) - opt := getTestOptions(dir) - kv, err := Open(opt) - require.NoError(t, err) + txn := db.NewTransaction(true) + require.Regexp(t, regexp.MustCompile("Key.*exceeded"), txn.Set(bigK, small)) + require.Regexp(t, regexp.MustCompile("Value.*exceeded"), txn.Set(small, bigV)) - bigK := make([]byte, maxKeySize+1) - bigV := make([]byte, opt.ValueLogFileSize+1) - small := make([]byte, 10) + require.NoError(t, txn.Set(small, small)) + require.Regexp(t, regexp.MustCompile("Key.*exceeded"), txn.Set(bigK, bigV)) - txn := kv.NewTransaction(true) - require.Regexp(t, regexp.MustCompile("Key.*exceeded"), txn.Set(bigK, small)) - require.Regexp(t, regexp.MustCompile("Value.*exceeded"), txn.Set(small, bigV)) - - require.NoError(t, txn.Set(small, small)) - require.Regexp(t, regexp.MustCompile("Key.*exceeded"), txn.Set(bigK, bigV)) - - require.NoError(t, kv.View(func(txn *Txn) error { - _, err := txn.Get(small) - require.Equal(t, ErrKeyNotFound, err) - return nil - })) - require.NoError(t, kv.Close()) + require.NoError(t, db.View(func(txn *Txn) error { + _, err := txn.Get(small) + require.Equal(t, ErrKeyNotFound, err) + return nil + })) + }) } func TestIteratorPrefetchSize(t *testing.T) { - dir, err := ioutil.TempDir("", "badger") - require.NoError(t, err) - defer os.RemoveAll(dir) - kv, _ := Open(getTestOptions(dir)) - defer kv.Close() + runBadgerTest(t, nil, func(t *testing.T, db *DB) { - bkey := func(i int) []byte { - return []byte(fmt.Sprintf("%09d", i)) - } - bval := func(i int) []byte { - return []byte(fmt.Sprintf("%025d", i)) - } - - n := 100 - for i := 0; i < n; i++ { - // if (i % 10) == 0 { - // t.Logf("Put i=%d\n", i) - // } - txnSet(t, kv, bkey(i), bval(i), byte(i%127)) - } + bkey := func(i int) []byte { + return []byte(fmt.Sprintf("%09d", i)) + } + bval := func(i int) []byte { + return []byte(fmt.Sprintf("%025d", i)) + } - getIteratorCount := func(prefetchSize int) int { - opt := IteratorOptions{} - opt.PrefetchValues = true - opt.PrefetchSize = prefetchSize + n := 100 + for i := 0; i < n; i++ { + // if (i % 10) == 0 { + // t.Logf("Put i=%d\n", i) + // } + txnSet(t, db, bkey(i), bval(i), byte(i%127)) + } - var count int - txn := kv.NewTransaction(false) - it := txn.NewIterator(opt) - { - t.Log("Starting first basic iteration") - for it.Rewind(); it.Valid(); it.Next() { - count++ + getIteratorCount := func(prefetchSize int) int { + opt := IteratorOptions{} + opt.PrefetchValues = true + opt.PrefetchSize = prefetchSize + + var count int + txn := db.NewTransaction(false) + it := txn.NewIterator(opt) + { + t.Log("Starting first basic iteration") + for it.Rewind(); it.Valid(); it.Next() { + count++ + } + require.EqualValues(t, n, count) } - require.EqualValues(t, n, count) + return count } - return count - } - var sizes = []int{-10, 0, 1, 10} - for _, size := range sizes { - c := getIteratorCount(size) - require.Equal(t, 100, c) - } + var sizes = []int{-10, 0, 1, 10} + for _, size := range sizes { + c := getIteratorCount(size) + require.Equal(t, 100, c) + } + }) } func TestSetIfAbsentAsync(t *testing.T) { @@ -858,215 +757,327 @@ func TestSetIfAbsentAsync(t *testing.T) { } func TestGetSetRace(t *testing.T) { - dir, err := ioutil.TempDir("", "badger") - require.NoError(t, err) - defer os.RemoveAll(dir) - kv, _ := Open(getTestOptions(dir)) + runBadgerTest(t, nil, func(t *testing.T, db *DB) { - data := make([]byte, 4096) - _, err = rand.Read(data) - require.NoError(t, err) + data := make([]byte, 4096) + _, err := rand.Read(data) + require.NoError(t, err) - var ( - numOp = 100 - wg sync.WaitGroup - keyCh = make(chan string) - ) - - // writer - wg.Add(1) - go func() { - defer func() { - wg.Done() - close(keyCh) + var ( + numOp = 100 + wg sync.WaitGroup + keyCh = make(chan string) + ) + + // writer + wg.Add(1) + go func() { + defer func() { + wg.Done() + close(keyCh) + }() + + for i := 0; i < numOp; i++ { + key := fmt.Sprintf("%d", i) + txnSet(t, db, []byte(key), data, 0x00) + keyCh <- key + } }() - for i := 0; i < numOp; i++ { - key := fmt.Sprintf("%d", i) - txnSet(t, kv, []byte(key), data, 0x00) - keyCh <- key - } - }() + // reader + wg.Add(1) + go func() { + defer wg.Done() + + for key := range keyCh { + require.NoError(t, db.View(func(txn *Txn) error { + item, err := txn.Get([]byte(key)) + require.NoError(t, err) + _, err = item.Value() + require.NoError(t, err) + return nil + })) + } + }() - // reader - wg.Add(1) - go func() { - defer wg.Done() + wg.Wait() + }) +} - for key := range keyCh { - require.NoError(t, kv.View(func(txn *Txn) error { - item, err := txn.Get([]byte(key)) - require.NoError(t, err) - _, err = item.Value() - require.NoError(t, err) - return nil - })) +func TestPurgeVersionsBelow(t *testing.T) { + runBadgerTest(t, nil, func(t *testing.T, db *DB) { + // Write 4 versions of the same key + for i := 0; i < 4; i++ { + err := db.Update(func(txn *Txn) error { + return txn.Set([]byte("answer"), []byte(fmt.Sprintf("%25d", i))) + }) + require.NoError(t, err) } - }() - wg.Wait() + opts := DefaultIteratorOptions + opts.AllVersions = true + opts.PrefetchValues = false + + // Verify that there are 4 versions, and record 3rd version (2nd from top in iteration) + var ts uint64 + db.View(func(txn *Txn) error { + it := txn.NewIterator(opts) + var count int + for it.Rewind(); it.Valid(); it.Next() { + count++ + item := it.Item() + if count == 2 { + ts = item.Version() + } + require.Equal(t, []byte("answer"), item.Key()) + } + require.Equal(t, 4, count) + return nil + }) + + // Delete all versions below the 3rd version + err := db.PurgeVersionsBelow([]byte("answer"), ts) + require.NoError(t, err) + require.NotEmpty(t, db.vlog.lfDiscardStats.m) + + // Verify that there are only 2 versions left + db.View(func(txn *Txn) error { + it := txn.NewIterator(opts) + var count int + for it.Rewind(); it.Valid(); it.Next() { + count++ + item := it.Item() + require.True(t, item.Version() >= ts, + "item version: %d older than ts: %d", + item.Version(), ts) + require.Equal(t, []byte("answer"), item.Key()) + } + require.Equal(t, 2, count) + return nil + }) + }) } -func TestPurgeVersionsBelow(t *testing.T) { - dir, err := ioutil.TempDir("", "badger") - require.NoError(t, err) - defer os.RemoveAll(dir) - db, err := Open(getTestOptions(dir)) - require.NoError(t, err) +func TestPurgeOlderVersions(t *testing.T) { + runBadgerTest(t, nil, func(t *testing.T, db *DB) { + // Write two versions of a key + err := db.Update(func(txn *Txn) error { + return txn.Set([]byte("answer"), []byte("42")) + }) + require.NoError(t, err) - // Write 4 versions of the same key - for i := 0; i < 4; i++ { err = db.Update(func(txn *Txn) error { - return txn.Set([]byte("answer"), []byte(fmt.Sprintf("%25d", i))) + return txn.Set([]byte("answer"), []byte("43")) }) require.NoError(t, err) - } - opts := DefaultIteratorOptions - opts.AllVersions = true - opts.PrefetchValues = false + opts := DefaultIteratorOptions + opts.AllVersions = true + opts.PrefetchValues = false - // Verify that there are 4 versions, and record 3rd version (2nd from top in iteration) - var ts uint64 - db.View(func(txn *Txn) error { - it := txn.NewIterator(opts) - var count int - for it.Rewind(); it.Valid(); it.Next() { - count++ - item := it.Item() - if count == 2 { - ts = item.Version() + // Verify that two versions are found during iteration + err = db.View(func(txn *Txn) error { + it := txn.NewIterator(opts) + var count int + for it.Rewind(); it.Valid(); it.Next() { + count++ + item := it.Item() + require.Equal(t, []byte("answer"), item.Key()) } - require.Equal(t, []byte("answer"), item.Key()) - } - require.Equal(t, 4, count) - return nil + require.Equal(t, 2, count) + return nil + }) + require.NoError(t, err) + + // Invoke DeleteOlderVersions() to delete older version + err = db.PurgeOlderVersions() + require.NoError(t, err) + + // Verify that only one version is found + err = db.View(func(txn *Txn) error { + it := txn.NewIterator(opts) + var count int + for it.Rewind(); it.Valid(); it.Next() { + count++ + item := it.Item() + require.Equal(t, []byte("answer"), item.Key()) + val, err := item.Value() + require.NoError(t, err) + t.Logf("Item value is %q", val) + //require.Equal(t, []byte("43"), val) + } + require.Equal(t, 1, count) + return nil + }) + require.NoError(t, err) }) +} - // Delete all versions below the 3rd version - err = db.PurgeVersionsBelow([]byte("answer"), ts) - require.NoError(t, err) - require.NotEmpty(t, db.vlog.lfDiscardStats.m) +func TestExpiry(t *testing.T) { + runBadgerTest(t, nil, func(t *testing.T, db *DB) { + // Write two keys, one with a TTL + err := db.Update(func(txn *Txn) error { + return txn.Set([]byte("answer1"), []byte("42")) + }) + require.NoError(t, err) - // Verify that there are only 2 versions left - db.View(func(txn *Txn) error { - it := txn.NewIterator(opts) - var count int - for it.Rewind(); it.Valid(); it.Next() { - count++ - item := it.Item() - require.True(t, item.Version() >= ts, - "item version: %d older than ts: %d", - item.Version(), ts) - require.Equal(t, []byte("answer"), item.Key()) - } - require.Equal(t, 2, count) - return nil + err = db.Update(func(txn *Txn) error { + return txn.SetWithTTL([]byte("answer2"), []byte("43"), 1*time.Second) + }) + require.NoError(t, err) + + time.Sleep(2 * time.Second) + + // Verify that only unexpired key is found during iteration + err = db.View(func(txn *Txn) error { + _, err := txn.Get([]byte("answer1")) + require.NoError(t, err) + + _, err = txn.Get([]byte("answer2")) + require.Error(t, ErrKeyNotFound, err) + return nil + }) + require.NoError(t, err) + + // Verify that only one key is found during iteration + opts := DefaultIteratorOptions + opts.PrefetchValues = false + err = db.View(func(txn *Txn) error { + it := txn.NewIterator(opts) + var count int + for it.Rewind(); it.Valid(); it.Next() { + count++ + item := it.Item() + require.Equal(t, []byte("answer1"), item.Key()) + } + require.Equal(t, 1, count) + return nil + }) + require.NoError(t, err) }) } -func TestPurgeOlderVersions(t *testing.T) { +func randBytes(n int) []byte { + recv := make([]byte, n) + in, err := rand.Read(recv) + if err != nil { + log.Fatal(err) + } + return recv[:in] +} + +var benchmarkData = []struct { + key, value []byte +}{ + {randBytes(100), nil}, + {randBytes(1000), []byte("foo")}, + {[]byte("foo"), randBytes(1000)}, + {[]byte(""), randBytes(1000)}, + {nil, randBytes(1000000)}, + {randBytes(100000), nil}, + {randBytes(1000000), nil}, +} + +func TestLargeKeys(t *testing.T) { dir, err := ioutil.TempDir("", "badger") require.NoError(t, err) defer os.RemoveAll(dir) - db, err := Open(getTestOptions(dir)) - require.NoError(t, err) - - // Write two versions of a key - err = db.Update(func(txn *Txn) error { - return txn.Set([]byte("answer"), []byte("42")) - }) - require.NoError(t, err) - err = db.Update(func(txn *Txn) error { - return txn.Set([]byte("answer"), []byte("43")) - }) - require.NoError(t, err) + opts := new(Options) + *opts = DefaultOptions + opts.ValueLogFileSize = 1024 * 1024 * 1024 + opts.Dir = dir + opts.ValueDir = dir - opts := DefaultIteratorOptions - opts.AllVersions = true - opts.PrefetchValues = false + db, err := Open(*opts) + if err != nil { + t.Fatal(err) + } + for i := 0; i < 1000; i++ { + tx := db.NewTransaction(true) + for _, kv := range benchmarkData { + k := make([]byte, len(kv.key)) + copy(k, kv.key) - // Verify that two versions are found during iteration - err = db.View(func(txn *Txn) error { - it := txn.NewIterator(opts) - var count int - for it.Rewind(); it.Valid(); it.Next() { - count++ - item := it.Item() - require.Equal(t, []byte("answer"), item.Key()) + v := make([]byte, len(kv.value)) + copy(v, kv.value) + if err := tx.Set(k, v); err != nil { + // Skip over this record. + } } - require.Equal(t, 2, count) - return nil - }) - require.NoError(t, err) + if err := tx.Commit(nil); err != nil { + t.Fatalf("#%d: batchSet err: %v", i, err) + } + } +} - // Invoke DeleteOlderVersions() to delete older version - err = db.PurgeOlderVersions() +func TestCreateDirs(t *testing.T) { + dir, err := ioutil.TempDir("", "parent") require.NoError(t, err) + defer os.RemoveAll(dir) - // Verify that only one version is found - err = db.View(func(txn *Txn) error { - it := txn.NewIterator(opts) - var count int - for it.Rewind(); it.Valid(); it.Next() { - count++ - item := it.Item() - require.Equal(t, []byte("answer"), item.Key()) - val, err := item.Value() - require.NoError(t, err) - t.Logf("Item value is %q", val) - //require.Equal(t, []byte("43"), val) - } - require.Equal(t, 1, count) - return nil - }) + opts := DefaultOptions + dir = filepath.Join(dir, "badger") + opts.Dir = dir + opts.ValueDir = dir + db, err := Open(opts) + require.NoError(t, err) + db.Close() + _, err = os.Stat(dir) require.NoError(t, err) } - -func TestExpiry(t *testing.T) { +func TestWriteDeadlock(t *testing.T) { dir, err := ioutil.TempDir("", "badger") + fmt.Println(dir) require.NoError(t, err) defer os.RemoveAll(dir) - db, err := Open(getTestOptions(dir)) - require.NoError(t, err) - - // Write two keys, one with a TTL - err = db.Update(func(txn *Txn) error { - return txn.Set([]byte("answer1"), []byte("42")) - }) - require.NoError(t, err) - err = db.Update(func(txn *Txn) error { - return txn.SetWithTTL([]byte("answer2"), []byte("43"), 1*time.Second) - }) + opt := DefaultOptions + opt.Dir = dir + opt.ValueDir = dir + opt.ValueLogFileSize = 10 << 20 + db, err := Open(opt) require.NoError(t, err) - time.Sleep(2 * time.Second) - - // Verify that only unexpired key is found during iteration - err = db.View(func(txn *Txn) error { - _, err := txn.Get([]byte("answer1")) - require.NoError(t, err) + print := func(count *int) { + *count++ + if *count%100 == 0 { + fmt.Printf("%05d\r", *count) + } + } - _, err = txn.Get([]byte("answer2")) - require.Error(t, ErrKeyNotFound, err) + var count int + val := make([]byte, 10000) + require.NoError(t, db.Update(func(txn *Txn) error { + for i := 0; i < 1500; i++ { + key := fmt.Sprintf("%d", i) + rand.Read(val) + require.NoError(t, txn.Set([]byte(key), val)) + print(&count) + } return nil - }) - require.NoError(t, err) + })) - // Verify that only one key is found during iteration - opts := DefaultIteratorOptions - opts.PrefetchValues = false - err = db.View(func(txn *Txn) error { - it := txn.NewIterator(opts) - var count int + count = 0 + fmt.Println("\nWrites done. Iteration and updates starting...") + err = db.Update(func(txn *Txn) error { + opt := DefaultIteratorOptions + opt.PrefetchValues = false + it := txn.NewIterator(opt) for it.Rewind(); it.Valid(); it.Next() { - count++ item := it.Item() - require.Equal(t, []byte("answer1"), item.Key()) + + // Using Value() would cause deadlock. + // item.Value() + out, err := item.ValueCopy(nil) + require.NoError(t, err) + require.Equal(t, len(val), len(out)) + + key := y.Copy(item.Key()) + rand.Read(val) + require.NoError(t, txn.Set(key, val)) + print(&count) } - require.Equal(t, 1, count) return nil }) require.NoError(t, err) @@ -1189,130 +1200,3 @@ func ExampleTxn_NewIterator() { // Output: // Counted 1000 elements } - -func randBytes(n int) []byte { - recv := make([]byte, n) - in, err := rand.Read(recv) - if err != nil { - log.Fatal(err) - } - return recv[:in] -} - -var benchmarkData = []struct { - key, value []byte -}{ - {randBytes(100), nil}, - {randBytes(1000), []byte("foo")}, - {[]byte("foo"), randBytes(1000)}, - {[]byte(""), randBytes(1000)}, - {nil, randBytes(1000000)}, - {randBytes(100000), nil}, - {randBytes(1000000), nil}, -} - -func TestLargeKeys(t *testing.T) { - dir, err := ioutil.TempDir("", "badger") - require.NoError(t, err) - defer os.RemoveAll(dir) - - opts := new(Options) - *opts = DefaultOptions - opts.ValueLogFileSize = 1024 * 1024 * 1024 - opts.Dir = dir - opts.ValueDir = dir - - db, err := Open(*opts) - if err != nil { - t.Fatal(err) - } - for i := 0; i < 1000; i++ { - tx := db.NewTransaction(true) - for _, kv := range benchmarkData { - k := make([]byte, len(kv.key)) - copy(k, kv.key) - - v := make([]byte, len(kv.value)) - copy(v, kv.value) - if err := tx.Set(k, v); err != nil { - // Skip over this record. - } - } - if err := tx.Commit(nil); err != nil { - t.Fatalf("#%d: batchSet err: %v", i, err) - } - } -} - -func TestCreateDirs(t *testing.T) { - dir, err := ioutil.TempDir("", "parent") - require.NoError(t, err) - defer os.RemoveAll(dir) - - opts := DefaultOptions - dir = filepath.Join(dir, "badger") - opts.Dir = dir - opts.ValueDir = dir - db, err := Open(opts) - require.NoError(t, err) - db.Close() - _, err = os.Stat(dir) - require.NoError(t, err) -} - -func TestWriteDeadlock(t *testing.T) { - dir, err := ioutil.TempDir("", "badger") - fmt.Println(dir) - require.NoError(t, err) - defer os.RemoveAll(dir) - - opt := DefaultOptions - opt.Dir = dir - opt.ValueDir = dir - opt.ValueLogFileSize = 10 << 20 - db, err := Open(opt) - require.NoError(t, err) - - print := func(count *int) { - *count++ - if *count%100 == 0 { - fmt.Printf("%05d\r", *count) - } - } - - var count int - val := make([]byte, 10000) - require.NoError(t, db.Update(func(txn *Txn) error { - for i := 0; i < 1500; i++ { - key := fmt.Sprintf("%d", i) - rand.Read(val) - require.NoError(t, txn.Set([]byte(key), val)) - print(&count) - } - return nil - })) - - count = 0 - fmt.Println("\nWrites done. Iteration and updates starting...") - err = db.Update(func(txn *Txn) error { - opt := DefaultIteratorOptions - opt.PrefetchValues = false - it := txn.NewIterator(opt) - for it.Rewind(); it.Valid(); it.Next() { - item := it.Item() - - // Using Value() would cause deadlock. - // item.Value() - out, err := item.ValueCopy(nil) - require.NoError(t, err) - require.Equal(t, len(val), len(out)) - - key := y.Copy(item.Key()) - rand.Read(val) - require.NoError(t, txn.Set(key, val)) - print(&count) - } - return nil - }) - require.NoError(t, err) -} diff --git a/transaction_test.go b/transaction_test.go index f408757a2..fe1566ce9 100644 --- a/transaction_test.go +++ b/transaction_test.go @@ -34,269 +34,249 @@ import ( ) func TestTxnSimple(t *testing.T) { - dir, err := ioutil.TempDir("", "badger") - require.NoError(t, err) - defer os.RemoveAll(dir) - kv, err := Open(getTestOptions(dir)) - require.NoError(t, err) - defer kv.Close() + runBadgerTest(t, nil, func(t *testing.T, db *DB) { - txn := kv.NewTransaction(true) + txn := db.NewTransaction(true) - for i := 0; i < 10; i++ { - k := []byte(fmt.Sprintf("key=%d", i)) - v := []byte(fmt.Sprintf("val=%d", i)) - txn.Set(k, v) - } + for i := 0; i < 10; i++ { + k := []byte(fmt.Sprintf("key=%d", i)) + v := []byte(fmt.Sprintf("val=%d", i)) + txn.Set(k, v) + } - item, err := txn.Get([]byte("key=8")) - require.NoError(t, err) + item, err := txn.Get([]byte("key=8")) + require.NoError(t, err) - val, err := item.Value() - require.NoError(t, err) - require.Equal(t, []byte("val=8"), val) + val, err := item.Value() + require.NoError(t, err) + require.Equal(t, []byte("val=8"), val) - require.Error(t, ErrManagedTxn, txn.CommitAt(100, nil)) - require.NoError(t, txn.Commit(nil)) + require.Error(t, ErrManagedTxn, txn.CommitAt(100, nil)) + require.NoError(t, txn.Commit(nil)) + }) } func TestTxnCommitAsync(t *testing.T) { - dir, err := ioutil.TempDir("", "badger") - require.NoError(t, err) - defer os.RemoveAll(dir) - kv, err := Open(getTestOptions(dir)) - require.NoError(t, err) - defer kv.Close() + runBadgerTest(t, nil, func(t *testing.T, db *DB) { - txn := kv.NewTransaction(true) - key := func(i int) []byte { - return []byte(fmt.Sprintf("key=%d", i)) - } - for i := 0; i < 40; i++ { - err := txn.Set(key(i), []byte(strconv.Itoa(100))) - require.NoError(t, err) - } - require.NoError(t, txn.Commit(nil)) - txn.Discard() - - var done uint64 - go func() { - // Keep checking balance variant - for atomic.LoadUint64(&done) == 0 { - txn := kv.NewTransaction(false) - totalBalance := 0 - for i := 0; i < 40; i++ { - item, err := txn.Get(key(i)) - require.NoError(t, err) - val, err := item.Value() - require.NoError(t, err) - bal, err := strconv.Atoi(string(val)) - require.NoError(t, err) - totalBalance += bal - } - require.Equal(t, totalBalance, 4000) - txn.Discard() + txn := db.NewTransaction(true) + key := func(i int) []byte { + return []byte(fmt.Sprintf("key=%d", i)) } - }() + for i := 0; i < 40; i++ { + err := txn.Set(key(i), []byte(strconv.Itoa(100))) + require.NoError(t, err) + } + require.NoError(t, txn.Commit(nil)) + txn.Discard() - var wg sync.WaitGroup - wg.Add(100) - for i := 0; i < 100; i++ { + var done uint64 go func() { - txn := kv.NewTransaction(true) - delta := rand.Intn(100) - for i := 0; i < 20; i++ { - err := txn.Set(key(i), []byte(strconv.Itoa(100-delta))) - require.NoError(t, err) - } - for i := 20; i < 40; i++ { - err := txn.Set(key(i), []byte(strconv.Itoa(100+delta))) - require.NoError(t, err) + // Keep checking balance variant + for atomic.LoadUint64(&done) == 0 { + txn := db.NewTransaction(false) + totalBalance := 0 + for i := 0; i < 40; i++ { + item, err := txn.Get(key(i)) + require.NoError(t, err) + val, err := item.Value() + require.NoError(t, err) + bal, err := strconv.Atoi(string(val)) + require.NoError(t, err) + totalBalance += bal + } + require.Equal(t, totalBalance, 4000) + txn.Discard() } - // We are only doing writes, so there won't be any conflicts. - require.NoError(t, txn.Commit(func(err error) {})) - txn.Discard() - wg.Done() }() - } - wg.Wait() - atomic.StoreUint64(&done, 1) - time.Sleep(time.Millisecond * 10) // allow goroutine to complete. + + var wg sync.WaitGroup + wg.Add(100) + for i := 0; i < 100; i++ { + go func() { + txn := db.NewTransaction(true) + delta := rand.Intn(100) + for i := 0; i < 20; i++ { + err := txn.Set(key(i), []byte(strconv.Itoa(100-delta))) + require.NoError(t, err) + } + for i := 20; i < 40; i++ { + err := txn.Set(key(i), []byte(strconv.Itoa(100+delta))) + require.NoError(t, err) + } + // We are only doing writes, so there won't be any conflicts. + require.NoError(t, txn.Commit(func(err error) {})) + txn.Discard() + wg.Done() + }() + } + wg.Wait() + atomic.StoreUint64(&done, 1) + time.Sleep(time.Millisecond * 10) // allow goroutine to complete. + }) } func TestTxnVersions(t *testing.T) { - dir, err := ioutil.TempDir("", "badger") - require.NoError(t, err) - defer os.RemoveAll(dir) - kv, err := Open(getTestOptions(dir)) - require.NoError(t, err) - defer kv.Close() + runBadgerTest(t, nil, func(t *testing.T, db *DB) { + k := []byte("key") + for i := 1; i < 10; i++ { + txn := db.NewTransaction(true) + + txn.Set(k, []byte(fmt.Sprintf("valversion=%d", i))) + require.NoError(t, txn.Commit(nil)) + require.Equal(t, uint64(i), db.orc.readTs()) + } - k := []byte("key") - for i := 1; i < 10; i++ { - txn := kv.NewTransaction(true) + checkIterator := func(itr *Iterator, i int) { + count := 0 + for itr.Rewind(); itr.Valid(); itr.Next() { + item := itr.Item() + require.Equal(t, k, item.Key()) - txn.Set(k, []byte(fmt.Sprintf("valversion=%d", i))) - require.NoError(t, txn.Commit(nil)) - require.Equal(t, uint64(i), kv.orc.readTs()) - } + val, err := item.Value() + require.NoError(t, err) + exp := fmt.Sprintf("valversion=%d", i) + require.Equal(t, exp, string(val), "i=%d", i) + count++ + } + require.Equal(t, 1, count, "i=%d", i) // Should only loop once. + } + + checkAllVersions := func(itr *Iterator, i int) { + var version uint64 + if itr.opt.Reverse { + version = 1 + } else { + version = uint64(i) + } - checkIterator := func(itr *Iterator, i int) { - count := 0 - for itr.Rewind(); itr.Valid(); itr.Next() { - item := itr.Item() - require.Equal(t, k, item.Key()) + count := 0 + for itr.Rewind(); itr.Valid(); itr.Next() { + item := itr.Item() + require.Equal(t, k, item.Key()) + require.Equal(t, version, item.Version()) - val, err := item.Value() - require.NoError(t, err) - exp := fmt.Sprintf("valversion=%d", i) - require.Equal(t, exp, string(val), "i=%d", i) - count++ + val, err := item.Value() + require.NoError(t, err) + exp := fmt.Sprintf("valversion=%d", version) + require.Equal(t, exp, string(val), "v=%d", version) + count++ + + if itr.opt.Reverse { + version++ + } else { + version-- + } + } + require.Equal(t, i, count, "i=%d", i) // Should loop as many times as i. } - require.Equal(t, 1, count, "i=%d", i) // Should only loop once. - } - checkAllVersions := func(itr *Iterator, i int) { - var version uint64 - if itr.opt.Reverse { - version = 1 - } else { - version = uint64(i) - } + for i := 1; i < 10; i++ { + txn := db.NewTransaction(true) + txn.readTs = uint64(i) // Read version at i. - count := 0 - for itr.Rewind(); itr.Valid(); itr.Next() { - item := itr.Item() - require.Equal(t, k, item.Key()) - require.Equal(t, version, item.Version()) + item, err := txn.Get(k) + require.NoError(t, err) val, err := item.Value() require.NoError(t, err) - exp := fmt.Sprintf("valversion=%d", version) - require.Equal(t, exp, string(val), "v=%d", version) - count++ + require.Equal(t, []byte(fmt.Sprintf("valversion=%d", i)), val, + "Expected versions to match up at i=%d", i) + + // Try retrieving the latest version forward and reverse. + itr := txn.NewIterator(DefaultIteratorOptions) + checkIterator(itr, i) + + opt := DefaultIteratorOptions + opt.Reverse = true + itr = txn.NewIterator(opt) + checkIterator(itr, i) + + // Now try retrieving all versions forward and reverse. + opt = DefaultIteratorOptions + opt.AllVersions = true + itr = txn.NewIterator(opt) + checkAllVersions(itr, i) + + opt = DefaultIteratorOptions + opt.AllVersions = true + opt.Reverse = true + itr = txn.NewIterator(opt) + checkAllVersions(itr, i) - if itr.opt.Reverse { - version++ - } else { - version-- - } + txn.Discard() } - require.Equal(t, i, count, "i=%d", i) // Should loop as many times as i. - } - - for i := 1; i < 10; i++ { - txn := kv.NewTransaction(true) - require.NoError(t, err) - txn.readTs = uint64(i) // Read version at i. - + txn := db.NewTransaction(true) + defer txn.Discard() item, err := txn.Get(k) require.NoError(t, err) val, err := item.Value() require.NoError(t, err) - require.Equal(t, []byte(fmt.Sprintf("valversion=%d", i)), val, - "Expected versions to match up at i=%d", i) - - // Try retrieving the latest version forward and reverse. - itr := txn.NewIterator(DefaultIteratorOptions) - checkIterator(itr, i) - - opt := DefaultIteratorOptions - opt.Reverse = true - itr = txn.NewIterator(opt) - checkIterator(itr, i) - - // Now try retrieving all versions forward and reverse. - opt = DefaultIteratorOptions - opt.AllVersions = true - itr = txn.NewIterator(opt) - checkAllVersions(itr, i) - - opt = DefaultIteratorOptions - opt.AllVersions = true - opt.Reverse = true - itr = txn.NewIterator(opt) - checkAllVersions(itr, i) - - txn.Discard() - } - txn := kv.NewTransaction(true) - defer txn.Discard() - require.NoError(t, err) - item, err := txn.Get(k) - require.NoError(t, err) - - val, err := item.Value() - require.NoError(t, err) - require.Equal(t, []byte("valversion=9"), val) + require.Equal(t, []byte("valversion=9"), val) + }) } func TestTxnWriteSkew(t *testing.T) { - dir, err := ioutil.TempDir("", "badger") - require.NoError(t, err) - defer os.RemoveAll(dir) - kv, err := Open(getTestOptions(dir)) - require.NoError(t, err) - defer kv.Close() + runBadgerTest(t, nil, func(t *testing.T, db *DB) { + // Accounts + ax := []byte("x") + ay := []byte("y") + + // Set balance to $100 in each account. + txn := db.NewTransaction(true) + defer txn.Discard() + val := []byte(strconv.Itoa(100)) + txn.Set(ax, val) + txn.Set(ay, val) + require.NoError(t, txn.Commit(nil)) + require.Equal(t, uint64(1), db.orc.readTs()) - // Accounts - ax := []byte("x") - ay := []byte("y") - - // Set balance to $100 in each account. - txn := kv.NewTransaction(true) - defer txn.Discard() - val := []byte(strconv.Itoa(100)) - txn.Set(ax, val) - txn.Set(ay, val) - require.NoError(t, txn.Commit(nil)) - require.Equal(t, uint64(1), kv.orc.readTs()) - - getBal := func(txn *Txn, key []byte) (bal int) { - item, err := txn.Get(key) - require.NoError(t, err) + getBal := func(txn *Txn, key []byte) (bal int) { + item, err := txn.Get(key) + require.NoError(t, err) - val, err := item.Value() - require.NoError(t, err) - bal, err = strconv.Atoi(string(val)) - require.NoError(t, err) - return bal - } + val, err := item.Value() + require.NoError(t, err) + bal, err = strconv.Atoi(string(val)) + require.NoError(t, err) + return bal + } - // Start two transactions, each would read both accounts and deduct from one account. - txn1 := kv.NewTransaction(true) + // Start two transactions, each would read both accounts and deduct from one account. + txn1 := db.NewTransaction(true) - sum := getBal(txn1, ax) - sum += getBal(txn1, ay) - require.Equal(t, 200, sum) - txn1.Set(ax, []byte("0")) // Deduct 100 from ax. + sum := getBal(txn1, ax) + sum += getBal(txn1, ay) + require.Equal(t, 200, sum) + txn1.Set(ax, []byte("0")) // Deduct 100 from ax. - // Let's read this back. - sum = getBal(txn1, ax) - require.Equal(t, 0, sum) - sum += getBal(txn1, ay) - require.Equal(t, 100, sum) - // Don't commit yet. + // Let's read this back. + sum = getBal(txn1, ax) + require.Equal(t, 0, sum) + sum += getBal(txn1, ay) + require.Equal(t, 100, sum) + // Don't commit yet. - txn2 := kv.NewTransaction(true) + txn2 := db.NewTransaction(true) - sum = getBal(txn2, ax) - sum += getBal(txn2, ay) - require.Equal(t, 200, sum) - txn2.Set(ay, []byte("0")) // Deduct 100 from ay. + sum = getBal(txn2, ax) + sum += getBal(txn2, ay) + require.Equal(t, 200, sum) + txn2.Set(ay, []byte("0")) // Deduct 100 from ay. - // Let's read this back. - sum = getBal(txn2, ax) - require.Equal(t, 100, sum) - sum += getBal(txn2, ay) - require.Equal(t, 100, sum) + // Let's read this back. + sum = getBal(txn2, ax) + require.Equal(t, 100, sum) + sum += getBal(txn2, ay) + require.Equal(t, 100, sum) - // Commit both now. - require.NoError(t, txn1.Commit(nil)) - require.Error(t, txn2.Commit(nil)) // This should fail. + // Commit both now. + require.NoError(t, txn1.Commit(nil)) + require.Error(t, txn2.Commit(nil)) // This should fail. - require.Equal(t, uint64(2), kv.orc.readTs()) + require.Equal(t, uint64(2), db.orc.readTs()) + }) } // a3, a2, b4 (del), b3, c2, c1 @@ -306,92 +286,87 @@ func TestTxnWriteSkew(t *testing.T) { // Read at ts=2 -> a2, c2 // Read at ts=1 -> c1 func TestTxnIterationEdgeCase(t *testing.T) { - dir, err := ioutil.TempDir("", "badger") - require.NoError(t, err) - defer os.RemoveAll(dir) - kv, err := Open(getTestOptions(dir)) - require.NoError(t, err) - defer kv.Close() + runBadgerTest(t, nil, func(t *testing.T, db *DB) { + ka := []byte("a") + kb := []byte("b") + kc := []byte("c") + + // c1 + txn := db.NewTransaction(true) + txn.Set(kc, []byte("c1")) + require.NoError(t, txn.Commit(nil)) + require.Equal(t, uint64(1), db.orc.readTs()) - ka := []byte("a") - kb := []byte("b") - kc := []byte("c") - - // c1 - txn := kv.NewTransaction(true) - txn.Set(kc, []byte("c1")) - require.NoError(t, txn.Commit(nil)) - require.Equal(t, uint64(1), kv.orc.readTs()) - - // a2, c2 - txn = kv.NewTransaction(true) - txn.Set(ka, []byte("a2")) - txn.Set(kc, []byte("c2")) - require.NoError(t, txn.Commit(nil)) - require.Equal(t, uint64(2), kv.orc.readTs()) - - // b3 - txn = kv.NewTransaction(true) - txn.Set(ka, []byte("a3")) - txn.Set(kb, []byte("b3")) - require.NoError(t, txn.Commit(nil)) - require.Equal(t, uint64(3), kv.orc.readTs()) - - // b4, c4(del) (Uncomitted) - txn4 := kv.NewTransaction(true) - require.NoError(t, txn4.Set(kb, []byte("b4"))) - require.NoError(t, txn4.Delete(kc)) - require.Equal(t, uint64(3), kv.orc.readTs()) - - // b4 (del) - txn = kv.NewTransaction(true) - txn.Delete(kb) - require.NoError(t, txn.Commit(nil)) - require.Equal(t, uint64(4), kv.orc.readTs()) - - checkIterator := func(itr *Iterator, expected []string) { - var i int - for itr.Rewind(); itr.Valid(); itr.Next() { - item := itr.Item() - val, err := item.Value() - require.NoError(t, err) - require.Equal(t, expected[i], string(val), "readts=%d", itr.readTs) - i++ + // a2, c2 + txn = db.NewTransaction(true) + txn.Set(ka, []byte("a2")) + txn.Set(kc, []byte("c2")) + require.NoError(t, txn.Commit(nil)) + require.Equal(t, uint64(2), db.orc.readTs()) + + // b3 + txn = db.NewTransaction(true) + txn.Set(ka, []byte("a3")) + txn.Set(kb, []byte("b3")) + require.NoError(t, txn.Commit(nil)) + require.Equal(t, uint64(3), db.orc.readTs()) + + // b4, c4(del) (Uncomitted) + txn4 := db.NewTransaction(true) + require.NoError(t, txn4.Set(kb, []byte("b4"))) + require.NoError(t, txn4.Delete(kc)) + require.Equal(t, uint64(3), db.orc.readTs()) + + // b4 (del) + txn = db.NewTransaction(true) + txn.Delete(kb) + require.NoError(t, txn.Commit(nil)) + require.Equal(t, uint64(4), db.orc.readTs()) + + checkIterator := func(itr *Iterator, expected []string) { + var i int + for itr.Rewind(); itr.Valid(); itr.Next() { + item := itr.Item() + val, err := item.Value() + require.NoError(t, err) + require.Equal(t, expected[i], string(val), "readts=%d", itr.readTs) + i++ + } + require.Equal(t, len(expected), i) } - require.Equal(t, len(expected), i) - } - txn = kv.NewTransaction(true) - defer txn.Discard() - itr := txn.NewIterator(DefaultIteratorOptions) - itr5 := txn4.NewIterator(DefaultIteratorOptions) - checkIterator(itr, []string{"a3", "c2"}) - checkIterator(itr5, []string{"a3", "b4"}) - - rev := DefaultIteratorOptions - rev.Reverse = true - itr = txn.NewIterator(rev) - itr5 = txn4.NewIterator(rev) - checkIterator(itr, []string{"c2", "a3"}) - checkIterator(itr5, []string{"b4", "a3"}) - - txn.readTs = 3 - itr = txn.NewIterator(DefaultIteratorOptions) - checkIterator(itr, []string{"a3", "b3", "c2"}) - itr = txn.NewIterator(rev) - checkIterator(itr, []string{"c2", "b3", "a3"}) - - txn.readTs = 2 - itr = txn.NewIterator(DefaultIteratorOptions) - checkIterator(itr, []string{"a2", "c2"}) - itr = txn.NewIterator(rev) - checkIterator(itr, []string{"c2", "a2"}) - - txn.readTs = 1 - itr = txn.NewIterator(DefaultIteratorOptions) - checkIterator(itr, []string{"c1"}) - itr = txn.NewIterator(rev) - checkIterator(itr, []string{"c1"}) + txn = db.NewTransaction(true) + defer txn.Discard() + itr := txn.NewIterator(DefaultIteratorOptions) + itr5 := txn4.NewIterator(DefaultIteratorOptions) + checkIterator(itr, []string{"a3", "c2"}) + checkIterator(itr5, []string{"a3", "b4"}) + + rev := DefaultIteratorOptions + rev.Reverse = true + itr = txn.NewIterator(rev) + itr5 = txn4.NewIterator(rev) + checkIterator(itr, []string{"c2", "a3"}) + checkIterator(itr5, []string{"b4", "a3"}) + + txn.readTs = 3 + itr = txn.NewIterator(DefaultIteratorOptions) + checkIterator(itr, []string{"a3", "b3", "c2"}) + itr = txn.NewIterator(rev) + checkIterator(itr, []string{"c2", "b3", "a3"}) + + txn.readTs = 2 + itr = txn.NewIterator(DefaultIteratorOptions) + checkIterator(itr, []string{"a2", "c2"}) + itr = txn.NewIterator(rev) + checkIterator(itr, []string{"c2", "a2"}) + + txn.readTs = 1 + itr = txn.NewIterator(DefaultIteratorOptions) + checkIterator(itr, []string{"c1"}) + itr = txn.NewIterator(rev) + checkIterator(itr, []string{"c1"}) + }) } // a2, a3, b4 (del), b3, c2, c1 @@ -400,258 +375,244 @@ func TestTxnIterationEdgeCase(t *testing.T) { // Read at ts=2 -> a2, c2 // Read at ts=1 -> c1 func TestTxnIterationEdgeCase2(t *testing.T) { - dir, err := ioutil.TempDir("", "badger") - require.NoError(t, err) - defer os.RemoveAll(dir) - kv, err := Open(getTestOptions(dir)) - require.NoError(t, err) - defer kv.Close() + runBadgerTest(t, nil, func(t *testing.T, db *DB) { + ka := []byte("a") + kb := []byte("aa") + kc := []byte("aaa") + + // c1 + txn := db.NewTransaction(true) + txn.Set(kc, []byte("c1")) + require.NoError(t, txn.Commit(nil)) + require.Equal(t, uint64(1), db.orc.readTs()) - ka := []byte("a") - kb := []byte("aa") - kc := []byte("aaa") - - // c1 - txn := kv.NewTransaction(true) - txn.Set(kc, []byte("c1")) - require.NoError(t, txn.Commit(nil)) - require.Equal(t, uint64(1), kv.orc.readTs()) - - // a2, c2 - txn = kv.NewTransaction(true) - txn.Set(ka, []byte("a2")) - txn.Set(kc, []byte("c2")) - require.NoError(t, txn.Commit(nil)) - require.Equal(t, uint64(2), kv.orc.readTs()) - - // b3 - txn = kv.NewTransaction(true) - txn.Set(ka, []byte("a3")) - txn.Set(kb, []byte("b3")) - require.NoError(t, txn.Commit(nil)) - require.Equal(t, uint64(3), kv.orc.readTs()) - - // b4 (del) - txn = kv.NewTransaction(true) - txn.Delete(kb) - require.NoError(t, txn.Commit(nil)) - require.Equal(t, uint64(4), kv.orc.readTs()) - - checkIterator := func(itr *Iterator, expected []string) { - var i int - for itr.Rewind(); itr.Valid(); itr.Next() { - item := itr.Item() - val, err := item.Value() - require.NoError(t, err) - require.Equal(t, expected[i], string(val), "readts=%d", itr.readTs) - i++ + // a2, c2 + txn = db.NewTransaction(true) + txn.Set(ka, []byte("a2")) + txn.Set(kc, []byte("c2")) + require.NoError(t, txn.Commit(nil)) + require.Equal(t, uint64(2), db.orc.readTs()) + + // b3 + txn = db.NewTransaction(true) + txn.Set(ka, []byte("a3")) + txn.Set(kb, []byte("b3")) + require.NoError(t, txn.Commit(nil)) + require.Equal(t, uint64(3), db.orc.readTs()) + + // b4 (del) + txn = db.NewTransaction(true) + txn.Delete(kb) + require.NoError(t, txn.Commit(nil)) + require.Equal(t, uint64(4), db.orc.readTs()) + + checkIterator := func(itr *Iterator, expected []string) { + var i int + for itr.Rewind(); itr.Valid(); itr.Next() { + item := itr.Item() + val, err := item.Value() + require.NoError(t, err) + require.Equal(t, expected[i], string(val), "readts=%d", itr.readTs) + i++ + } + require.Equal(t, len(expected), i) } - require.Equal(t, len(expected), i) - } - txn = kv.NewTransaction(true) - defer txn.Discard() - rev := DefaultIteratorOptions - rev.Reverse = true - - itr := txn.NewIterator(DefaultIteratorOptions) - checkIterator(itr, []string{"a3", "c2"}) - itr = txn.NewIterator(rev) - checkIterator(itr, []string{"c2", "a3"}) - - txn.readTs = 5 - itr = txn.NewIterator(DefaultIteratorOptions) - itr.Seek(ka) - require.True(t, itr.Valid()) - require.Equal(t, itr.item.Key(), ka) - itr.Seek(kc) - require.True(t, itr.Valid()) - require.Equal(t, itr.item.Key(), kc) - - itr = txn.NewIterator(rev) - itr.Seek(ka) - require.True(t, itr.Valid()) - require.Equal(t, itr.item.Key(), ka) - itr.Seek(kc) - require.True(t, itr.Valid()) - require.Equal(t, itr.item.Key(), kc) - - txn.readTs = 3 - itr = txn.NewIterator(DefaultIteratorOptions) - checkIterator(itr, []string{"a3", "b3", "c2"}) - itr = txn.NewIterator(rev) - checkIterator(itr, []string{"c2", "b3", "a3"}) - - txn.readTs = 2 - itr = txn.NewIterator(DefaultIteratorOptions) - checkIterator(itr, []string{"a2", "c2"}) - itr = txn.NewIterator(rev) - checkIterator(itr, []string{"c2", "a2"}) - - txn.readTs = 1 - itr = txn.NewIterator(DefaultIteratorOptions) - checkIterator(itr, []string{"c1"}) - itr = txn.NewIterator(rev) - checkIterator(itr, []string{"c1"}) + txn = db.NewTransaction(true) + defer txn.Discard() + rev := DefaultIteratorOptions + rev.Reverse = true + + itr := txn.NewIterator(DefaultIteratorOptions) + checkIterator(itr, []string{"a3", "c2"}) + itr = txn.NewIterator(rev) + checkIterator(itr, []string{"c2", "a3"}) + + txn.readTs = 5 + itr = txn.NewIterator(DefaultIteratorOptions) + itr.Seek(ka) + require.True(t, itr.Valid()) + require.Equal(t, itr.item.Key(), ka) + itr.Seek(kc) + require.True(t, itr.Valid()) + require.Equal(t, itr.item.Key(), kc) + + itr = txn.NewIterator(rev) + itr.Seek(ka) + require.True(t, itr.Valid()) + require.Equal(t, itr.item.Key(), ka) + itr.Seek(kc) + require.True(t, itr.Valid()) + require.Equal(t, itr.item.Key(), kc) + + txn.readTs = 3 + itr = txn.NewIterator(DefaultIteratorOptions) + checkIterator(itr, []string{"a3", "b3", "c2"}) + itr = txn.NewIterator(rev) + checkIterator(itr, []string{"c2", "b3", "a3"}) + + txn.readTs = 2 + itr = txn.NewIterator(DefaultIteratorOptions) + checkIterator(itr, []string{"a2", "c2"}) + itr = txn.NewIterator(rev) + checkIterator(itr, []string{"c2", "a2"}) + + txn.readTs = 1 + itr = txn.NewIterator(DefaultIteratorOptions) + checkIterator(itr, []string{"c1"}) + itr = txn.NewIterator(rev) + checkIterator(itr, []string{"c1"}) + }) } func TestTxnIterationEdgeCase3(t *testing.T) { - dir, err := ioutil.TempDir("", "badger") - require.NoError(t, err) - defer os.RemoveAll(dir) - kv, err := Open(getTestOptions(dir)) - require.NoError(t, err) - defer kv.Close() + runBadgerTest(t, nil, func(t *testing.T, db *DB) { + kb := []byte("abc") + kc := []byte("acd") + kd := []byte("ade") + + // c1 + txn := db.NewTransaction(true) + txn.Set(kc, []byte("c1")) + require.NoError(t, txn.Commit(nil)) + require.Equal(t, uint64(1), db.orc.readTs()) - kb := []byte("abc") - kc := []byte("acd") - kd := []byte("ade") - - // c1 - txn := kv.NewTransaction(true) - txn.Set(kc, []byte("c1")) - require.NoError(t, txn.Commit(nil)) - require.Equal(t, uint64(1), kv.orc.readTs()) - - // b2 - txn = kv.NewTransaction(true) - txn.Set(kb, []byte("b2")) - require.NoError(t, txn.Commit(nil)) - require.Equal(t, uint64(2), kv.orc.readTs()) - - txn2 := kv.NewTransaction(true) - require.NoError(t, txn2.Set(kd, []byte("d2"))) - require.NoError(t, txn2.Delete(kc)) - - txn = kv.NewTransaction(true) - defer txn.Discard() - rev := DefaultIteratorOptions - rev.Reverse = true - - itr := txn.NewIterator(DefaultIteratorOptions) - itr.Seek([]byte("ab")) - require.True(t, itr.Valid()) - require.Equal(t, itr.item.Key(), kb) - itr.Seek([]byte("ac")) - require.True(t, itr.Valid()) - require.Equal(t, itr.item.Key(), kc) - itr.Seek(nil) - require.True(t, itr.Valid()) - require.Equal(t, itr.item.Key(), kb) - itr.Seek([]byte("ac")) - itr.Rewind() - itr.Seek(nil) - require.True(t, itr.Valid()) - require.Equal(t, itr.item.Key(), kb) - itr.Seek([]byte("ac")) - require.True(t, itr.Valid()) - require.Equal(t, itr.item.Key(), kc) - - // Keys: "abc", "ade" - // Read pending writes. - itr = txn2.NewIterator(DefaultIteratorOptions) - itr.Seek([]byte("ab")) - require.True(t, itr.Valid()) - require.Equal(t, itr.item.Key(), kb) - itr.Seek([]byte("ac")) - require.True(t, itr.Valid()) - require.Equal(t, itr.item.Key(), kd) - itr.Seek(nil) - require.True(t, itr.Valid()) - require.Equal(t, itr.item.Key(), kb) - itr.Seek([]byte("ac")) - itr.Rewind() - itr.Seek(nil) - require.True(t, itr.Valid()) - require.Equal(t, itr.item.Key(), kb) - itr.Seek([]byte("ad")) - require.True(t, itr.Valid()) - require.Equal(t, itr.item.Key(), kd) - - itr = txn.NewIterator(rev) - itr.Seek([]byte("ac")) - require.True(t, itr.Valid()) - require.Equal(t, itr.item.Key(), kb) - itr.Seek([]byte("ad")) - require.True(t, itr.Valid()) - require.Equal(t, itr.item.Key(), kc) - itr.Seek(nil) - require.True(t, itr.Valid()) - require.Equal(t, itr.item.Key(), kc) - itr.Seek([]byte("ac")) - itr.Rewind() - require.True(t, itr.Valid()) - require.Equal(t, itr.item.Key(), kc) - itr.Seek([]byte("ad")) - require.True(t, itr.Valid()) - require.Equal(t, itr.item.Key(), kc) - - // Keys: "abc", "ade" - itr = txn2.NewIterator(rev) - itr.Seek([]byte("ad")) - require.True(t, itr.Valid()) - require.Equal(t, itr.item.Key(), kb) - itr.Seek([]byte("ae")) - require.True(t, itr.Valid()) - require.Equal(t, itr.item.Key(), kd) - itr.Seek(nil) - require.True(t, itr.Valid()) - require.Equal(t, itr.item.Key(), kd) - itr.Seek([]byte("ab")) - itr.Rewind() - require.True(t, itr.Valid()) - require.Equal(t, itr.item.Key(), kd) - itr.Seek([]byte("ac")) - require.True(t, itr.Valid()) - require.Equal(t, itr.item.Key(), kb) -} + // b2 + txn = db.NewTransaction(true) + txn.Set(kb, []byte("b2")) + require.NoError(t, txn.Commit(nil)) + require.Equal(t, uint64(2), db.orc.readTs()) -func TestIteratorAllVersionsButDeleted(t *testing.T) { - dir, err := ioutil.TempDir("", "badger") - require.NoError(t, err) - defer os.RemoveAll(dir) - db, err := Open(getTestOptions(dir)) - require.NoError(t, err) + txn2 := db.NewTransaction(true) + require.NoError(t, txn2.Set(kd, []byte("d2"))) + require.NoError(t, txn2.Delete(kc)) - // Write two keys - err = db.Update(func(txn *Txn) error { - txn.Set([]byte("answer1"), []byte("42")) - txn.Set([]byte("answer2"), []byte("43")) - return nil + txn = db.NewTransaction(true) + defer txn.Discard() + rev := DefaultIteratorOptions + rev.Reverse = true + + itr := txn.NewIterator(DefaultIteratorOptions) + itr.Seek([]byte("ab")) + require.True(t, itr.Valid()) + require.Equal(t, itr.item.Key(), kb) + itr.Seek([]byte("ac")) + require.True(t, itr.Valid()) + require.Equal(t, itr.item.Key(), kc) + itr.Seek(nil) + require.True(t, itr.Valid()) + require.Equal(t, itr.item.Key(), kb) + itr.Seek([]byte("ac")) + itr.Rewind() + itr.Seek(nil) + require.True(t, itr.Valid()) + require.Equal(t, itr.item.Key(), kb) + itr.Seek([]byte("ac")) + require.True(t, itr.Valid()) + require.Equal(t, itr.item.Key(), kc) + + // Keys: "abc", "ade" + // Read pending writes. + itr = txn2.NewIterator(DefaultIteratorOptions) + itr.Seek([]byte("ab")) + require.True(t, itr.Valid()) + require.Equal(t, itr.item.Key(), kb) + itr.Seek([]byte("ac")) + require.True(t, itr.Valid()) + require.Equal(t, itr.item.Key(), kd) + itr.Seek(nil) + require.True(t, itr.Valid()) + require.Equal(t, itr.item.Key(), kb) + itr.Seek([]byte("ac")) + itr.Rewind() + itr.Seek(nil) + require.True(t, itr.Valid()) + require.Equal(t, itr.item.Key(), kb) + itr.Seek([]byte("ad")) + require.True(t, itr.Valid()) + require.Equal(t, itr.item.Key(), kd) + + itr = txn.NewIterator(rev) + itr.Seek([]byte("ac")) + require.True(t, itr.Valid()) + require.Equal(t, itr.item.Key(), kb) + itr.Seek([]byte("ad")) + require.True(t, itr.Valid()) + require.Equal(t, itr.item.Key(), kc) + itr.Seek(nil) + require.True(t, itr.Valid()) + require.Equal(t, itr.item.Key(), kc) + itr.Seek([]byte("ac")) + itr.Rewind() + require.True(t, itr.Valid()) + require.Equal(t, itr.item.Key(), kc) + itr.Seek([]byte("ad")) + require.True(t, itr.Valid()) + require.Equal(t, itr.item.Key(), kc) + + // Keys: "abc", "ade" + itr = txn2.NewIterator(rev) + itr.Seek([]byte("ad")) + require.True(t, itr.Valid()) + require.Equal(t, itr.item.Key(), kb) + itr.Seek([]byte("ae")) + require.True(t, itr.Valid()) + require.Equal(t, itr.item.Key(), kd) + itr.Seek(nil) + require.True(t, itr.Valid()) + require.Equal(t, itr.item.Key(), kd) + itr.Seek([]byte("ab")) + itr.Rewind() + require.True(t, itr.Valid()) + require.Equal(t, itr.item.Key(), kd) + itr.Seek([]byte("ac")) + require.True(t, itr.Valid()) + require.Equal(t, itr.item.Key(), kb) }) - require.NoError(t, err) +} - // Delete the specific key version from underlying db directly - err = db.View(func(txn *Txn) error { - item, err := txn.Get([]byte("answer1")) +func TestIteratorAllVersionsButDeleted(t *testing.T) { + runBadgerTest(t, nil, func(t *testing.T, db *DB) { + // Write two keys + err := db.Update(func(txn *Txn) error { + txn.Set([]byte("answer1"), []byte("42")) + txn.Set([]byte("answer2"), []byte("43")) + return nil + }) require.NoError(t, err) - err = txn.db.batchSet([]*entry{ - { - Key: y.KeyWithTs(item.key, item.version), - meta: bitDelete, - }, + + // Delete the specific key version from underlying db directly + err = db.View(func(txn *Txn) error { + item, err := txn.Get([]byte("answer1")) + require.NoError(t, err) + err = txn.db.batchSet([]*entry{ + { + Key: y.KeyWithTs(item.key, item.version), + meta: bitDelete, + }, + }) + require.NoError(t, err) + return err }) require.NoError(t, err) - return err - }) - require.NoError(t, err) - opts := DefaultIteratorOptions - opts.AllVersions = true - opts.PrefetchValues = false - - // Verify that deleted key does not show up when AllVersions is set. - err = db.View(func(txn *Txn) error { - it := txn.NewIterator(opts) - var count int - for it.Rewind(); it.Valid(); it.Next() { - count++ - item := it.Item() - require.Equal(t, []byte("answer2"), item.Key()) - } - require.Equal(t, 1, count) - return nil + opts := DefaultIteratorOptions + opts.AllVersions = true + opts.PrefetchValues = false + + // Verify that deleted key does not show up when AllVersions is set. + err = db.View(func(txn *Txn) error { + it := txn.NewIterator(opts) + var count int + for it.Rewind(); it.Valid(); it.Next() { + count++ + item := it.Item() + require.Equal(t, []byte("answer2"), item.Key()) + } + require.Equal(t, 1, count) + return nil + }) + require.NoError(t, err) }) - require.NoError(t, err) } func TestManagedDB(t *testing.T) {