Skip to content

Commit

Permalink
Use mmap for skiplist (dgraph-io#1571)
Browse files Browse the repository at this point in the history
Add support for memory-mapping in skiplist and expose the APIs 
for Uint64 values, i.e. GetUint64 and PutUint64.
  • Loading branch information
ahsanbarkati committed Oct 23, 2020
1 parent c3d012e commit a8d1f08
Show file tree
Hide file tree
Showing 4 changed files with 118 additions and 78 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ go 1.12
require (
github.com/DataDog/zstd v1.4.1
github.com/cespare/xxhash v1.1.0
github.com/dgraph-io/ristretto v0.0.4-0.20201020115802-f071429c1049
github.com/dgraph-io/ristretto v0.0.4-0.20201022105248-f32a01612740
github.com/dustin/go-humanize v1.0.0
github.com/golang/protobuf v1.3.1
github.com/golang/snappy v0.0.1
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgraph-io/ristretto v0.0.4-0.20201020115802-f071429c1049 h1:lT8vahI6E7R84KeSsXvj3QST/OusCP8g5rEctzsjUIA=
github.com/dgraph-io/ristretto v0.0.4-0.20201020115802-f071429c1049/go.mod h1:bDI4cDaalvYSji3vBVDKrn9ouDZrwN974u8ZO/AhYXs=
github.com/dgraph-io/ristretto v0.0.4-0.20201022105248-f32a01612740 h1:TzbxnxH3PoFUWx5024RX1+uqLnUVbfdHANjrHMb5Xnc=
github.com/dgraph-io/ristretto v0.0.4-0.20201022105248-f32a01612740/go.mod h1:bDI4cDaalvYSji3vBVDKrn9ouDZrwN974u8ZO/AhYXs=
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczCTSixgIKmwPv6+wP5DGjqLYw5SUiA=
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo=
Expand Down
70 changes: 27 additions & 43 deletions skl/arena.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@
package skl

import (
"sync/atomic"
"unsafe"

"github.com/dgraph-io/badger/v2/y"
"github.com/dgraph-io/ristretto/z"
)

const (
Expand All @@ -35,23 +35,18 @@ const (

// Arena should be lock-free.
type Arena struct {
n uint32
buf []byte
*z.Buffer
}

// newArena returns a new arena.
func newArena(n int64) *Arena {
// Don't store data at position 0 in order to reserve offset=0 as a kind
// of nil pointer.
out := &Arena{
n: 1,
buf: make([]byte, n),
}
return out
func (s *Arena) size() int64 {
return int64(s.LenNoPadding())
}

func (s *Arena) size() int64 {
return int64(atomic.LoadUint32(&s.n))
// allocateValue encodes valueStruct and put it in the arena buffer.
// It returns the encoded uint64 => | size (32 bits) | offset (32 bits) |
func (s *Arena) allocateValue(v y.ValueStruct) uint64 {
valOffset := s.putVal(v)
return encodeValue(valOffset, v.EncodedSize())
}

// putNode allocates a node in the arena. The node is aligned on a pointer-sized
Expand All @@ -63,13 +58,9 @@ func (s *Arena) putNode(height int) uint32 {

// Pad the allocation with enough bytes to ensure pointer alignment.
l := uint32(MaxNodeSize - unusedSize + nodeAlign)
n := atomic.AddUint32(&s.n, l)
y.AssertTruef(int(n) <= len(s.buf),
"Arena too small, toWrite:%d newTotal:%d limit:%d",
l, n, len(s.buf))

n := s.IncrementOffset(int(l))
// Return the aligned offset.
m := (n - l + uint32(nodeAlign)) & ^uint32(nodeAlign)
m := (uint32(n) - l + uint32(nodeAlign)) & ^uint32(nodeAlign)
return m
}

Expand All @@ -79,27 +70,20 @@ func (s *Arena) putNode(height int) uint32 {
// decoding will incur some overhead.
func (s *Arena) putVal(v y.ValueStruct) uint32 {
l := uint32(v.EncodedSize())
n := atomic.AddUint32(&s.n, l)
y.AssertTruef(int(n) <= len(s.buf),
"Arena too small, toWrite:%d newTotal:%d limit:%d",
l, n, len(s.buf))
m := n - l
v.Encode(s.buf[m:])
return m
m := s.IncrementOffset(int(l))
buf := s.Bytes()[uint32(m)-l : m]
v.Encode(buf)
return uint32(m) - l
}

// putKey puts the key and returns its offset
func (s *Arena) putKey(key []byte) uint32 {
l := uint32(len(key))
n := atomic.AddUint32(&s.n, l)
y.AssertTruef(int(n) <= len(s.buf),
"Arena too small, toWrite:%d newTotal:%d limit:%d",
l, n, len(s.buf))
// m is the offset where you should write.
// n = new len - key len give you the offset at which you should write.
m := n - l
// Copy to buffer from m:n
y.AssertTrue(len(key) == copy(s.buf[m:n], key))
return m
keySz := uint32(len(key))
n := s.IncrementOffset(int(keySz))
offset := uint32(n) - keySz
buf := s.Bytes()[offset : offset+keySz]
y.AssertTrue(len(key) == copy(buf, key))
return offset
}

// getNode returns a pointer to the node located at offset. If the offset is
Expand All @@ -108,19 +92,18 @@ func (s *Arena) getNode(offset uint32) *node {
if offset == 0 {
return nil
}

return (*node)(unsafe.Pointer(&s.buf[offset]))
return (*node)(unsafe.Pointer(&s.Bytes()[offset]))
}

// getKey returns byte slice at offset.
func (s *Arena) getKey(offset uint32, size uint16) []byte {
return s.buf[offset : offset+uint32(size)]
return s.Bytes()[offset : offset+uint32(size)]
}

// getVal returns byte slice at offset. The given size should be just the value
// size and should NOT include the meta bytes.
func (s *Arena) getVal(offset uint32, size uint32) (ret y.ValueStruct) {
ret.Decode(s.buf[offset : offset+size])
ret.Decode(s.Bytes()[offset : offset+size])
return
}

Expand All @@ -130,6 +113,7 @@ func (s *Arena) getNodeOffset(nd *node) uint32 {
if nd == nil {
return 0
}
val := uint32(uintptr(unsafe.Pointer(nd)) - uintptr(unsafe.Pointer(&s.Bytes()[0])))
return val

return uint32(uintptr(unsafe.Pointer(nd)) - uintptr(unsafe.Pointer(&s.buf[0])))
}
122 changes: 88 additions & 34 deletions skl/skl.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ Key differences:
package skl

import (
"bytes"
"math"
"sync/atomic"
"unsafe"
Expand Down Expand Up @@ -73,12 +74,16 @@ type node struct {
tower [maxHeight]uint32
}

type comparatorFunc func([]byte, []byte) int

type Skiplist struct {
height int32 // Current height. 1 <= height <= kMaxHeight. CAS.
head *node
ref int32
arena *Arena
OnClose func()
height int32 // Current height. 1 <= height <= kMaxHeight. CAS.
head *node
ref int32
arena *Arena
hasVersions bool
comparator comparatorFunc
OnClose func()
}

// IncrRef increases the refcount
Expand All @@ -95,7 +100,7 @@ func (s *Skiplist) DecrRef() {
if s.OnClose != nil {
s.OnClose()
}

s.arena.Release()
// Indicate we are closed. Good for testing. Also, lets GC reclaim memory. Race condition
// here would suggest we are accessing skiplist when we are supposed to have no reference!
s.arena = nil
Expand All @@ -104,14 +109,14 @@ func (s *Skiplist) DecrRef() {
s.head = nil
}

func newNode(arena *Arena, key []byte, v y.ValueStruct, height int) *node {
func newNode(arena *Arena, key []byte, u uint64, height int) *node {
// The base level is already allocated in the node struct.
offset := arena.putNode(height)
node := arena.getNode(offset)
node.keyOffset = arena.putKey(key)
node.keySize = uint16(len(key))
node.height = uint16(height)
node.value = encodeValue(arena.putVal(v), v.EncodedSize())
node.value = u
return node
}

Expand All @@ -125,16 +130,32 @@ func decodeValue(value uint64) (valOffset uint32, valSize uint32) {
return
}

// NewSkiplist makes a new empty skiplist, with a given arena size
// NewSkiplist makes a new empty skiplist, with a given arena size.
func NewSkiplist(arenaSize int64) *Skiplist {
arena := newArena(arenaSize)
head := newNode(arena, nil, y.ValueStruct{}, maxHeight)
return &Skiplist{
height: 1,
head: head,
arena: arena,
ref: 1,
buf, err := z.NewBufferWith(int(arenaSize), int(arenaSize), z.UseCalloc)
y.Check(err)
skl := NewSkiplistWithBuffer(buf, true)
return skl
}

// NewSkiplistWithBuffer makes a new skiplist, with a given buffer.
func NewSkiplistWithBuffer(buf *z.Buffer, hasVersions bool) *Skiplist {
arena := new(Arena)
arena.Buffer = buf
offset := arena.allocateValue(y.ValueStruct{})
head := newNode(arena, nil, offset, maxHeight)
sl := &Skiplist{
height: 1,
head: head,
arena: arena,
ref: 1,
hasVersions: hasVersions,
comparator: bytes.Compare,
}
if sl.hasVersions {
sl.comparator = y.CompareKeys
}
return sl
}

func (s *node) getValueOffset() (uint32, uint32) {
Expand All @@ -146,10 +167,8 @@ func (s *node) key(arena *Arena) []byte {
return arena.getKey(s.keyOffset, s.keySize)
}

func (s *node) setValue(arena *Arena, v y.ValueStruct) {
valOffset := arena.putVal(v)
value := encodeValue(valOffset, v.EncodedSize())
atomic.StoreUint64(&s.value, value)
func (s *node) setUint64(u uint64) {
atomic.StoreUint64(&s.value, u)
}

func (s *node) getNextOffset(h int) uint32 {
Expand Down Expand Up @@ -208,9 +227,9 @@ func (s *Skiplist) findNear(key []byte, less bool, allowEqual bool) (*node, bool
}
return x, false
}

nextKey := next.key(s.arena)
cmp := y.CompareKeys(key, nextKey)

cmp := s.comparator(key, nextKey)
if cmp > 0 {
// x.key < next.key < key. We can continue to move right.
x = next
Expand Down Expand Up @@ -265,7 +284,7 @@ func (s *Skiplist) findSpliceForLevel(key []byte, before *node, level int) (*nod
return before, next
}
nextKey := next.key(s.arena)
cmp := y.CompareKeys(key, nextKey)
cmp := s.comparator(key, nextKey)
if cmp == 0 {
// Equality case.
return next, next
Expand All @@ -284,6 +303,12 @@ func (s *Skiplist) getHeight() int32 {

// Put inserts the key-value pair.
func (s *Skiplist) Put(key []byte, v y.ValueStruct) {
val := s.arena.allocateValue(v)
s.PutUint64(key, val)
}

// PutUint64 inserts the key-value pair, with a uint64 value.
func (s *Skiplist) PutUint64(key []byte, u uint64) {
// Since we allow overwrite, we may not need to create a new node. We might not even need to
// increase the height. Let's defer these actions.

Expand All @@ -296,14 +321,14 @@ func (s *Skiplist) Put(key []byte, v y.ValueStruct) {
// Use higher level to speed up for current level.
prev[i], next[i] = s.findSpliceForLevel(key, prev[i+1], i)
if prev[i] == next[i] {
prev[i].setValue(s.arena, v)
prev[i].setUint64(u)
return
}
}

// We do need to create a new node.
height := s.randomHeight()
x := newNode(s.arena, key, v, height)
x := newNode(s.arena, key, u, height)

// Try to increase s.height via CAS.
listHeight = s.getHeight()
Expand Down Expand Up @@ -340,7 +365,7 @@ func (s *Skiplist) Put(key []byte, v y.ValueStruct) {
prev[i], next[i] = s.findSpliceForLevel(key, prev[i], i)
if prev[i] == next[i] {
y.AssertTruef(i == 0, "Equality can happen only on base level: %d", i)
prev[i].setValue(s.arena, v)
prev[i].setUint64(u)
return
}
}
Expand Down Expand Up @@ -376,20 +401,49 @@ func (s *Skiplist) findLast() *node {
// Get gets the value associated with the key. It returns a valid value if it finds equal or earlier
// version of the same key.
func (s *Skiplist) Get(key []byte) y.ValueStruct {
n, _ := s.findNear(key, false, true) // findGreaterOrEqual.
n, version := s.getInternal(key)
if n == nil {
return y.ValueStruct{}
}

nextKey := s.arena.getKey(n.keyOffset, n.keySize)
if !y.SameKey(key, nextKey) {
return y.ValueStruct{}
}

valOffset, valSize := n.getValueOffset()
vs := s.arena.getVal(valOffset, valSize)
vs.Version = y.ParseTs(nextKey)
vs.Version = version
return vs

}

func (s *Skiplist) GetUint64(key []byte) (uint64, bool) {
n, _ := s.getInternal(key)
if n == nil {
return 0, false
}
return n.value, true
}

// getInternal finds the node which is greater than or equal to the given key
// from the skiplist. It returns the fetched node and it's version.
func (s *Skiplist) getInternal(key []byte) (*node, uint64) {
n, _ := s.findNear(key, false, true) // findGreaterOrEqual.
if n == nil {
return nil, 0
}
fetchedKey := s.arena.getKey(n.keyOffset, n.keySize)

// If the node has version, it means it is a badger kv and we should
// compare it's key without the version.
if s.hasVersions {
if !y.SameKey(key, fetchedKey) {
return nil, 0
}
version := y.ParseTs(fetchedKey)
return n, version
}

// This is a key without version.
if !bytes.Equal(key, fetchedKey) {
return nil, 0
}
return n, 0
}

// NewIterator returns a skiplist iterator. You have to Close() the iterator.
Expand Down

0 comments on commit a8d1f08

Please sign in to comment.