Skip to content

Commit

Permalink
rowblk: support synthetic prefix in FragmentIter
Browse files Browse the repository at this point in the history
We rely on the blockIter to prepend the prefix for start keys, and the
fragment iterator adjusts the end keys.

We still don't support range keys in external files because of an
outstanding issue which I will start tackling.
  • Loading branch information
RaduBerinde committed Jul 3, 2024
1 parent 4981bd0 commit fad89cf
Show file tree
Hide file tree
Showing 9 changed files with 203 additions and 35 deletions.
8 changes: 7 additions & 1 deletion data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1344,7 +1344,7 @@ func runIngestExternalCmd(
usageErr := func(info interface{}) {
t.Helper()
td.Fatalf(t, "error parsing %q: %v; "+
"usage: obj bounds=(smallest,largest) [size=x] [synthetic-prefix=prefix] [synthetic-suffix=suffix]",
"usage: obj bounds=(smallest,largest) [size=x] [synthetic-prefix=prefix] [synthetic-suffix=suffix] [no-point-keys] [has-range-keys]",
line, info,
)
}
Expand Down Expand Up @@ -1393,6 +1393,12 @@ func runIngestExternalCmd(
nArgs(1)
ef.SyntheticSuffix = []byte(arg.Vals[0])

case "no-point-keys":
ef.HasPointKey = false

case "has-range-keys":
ef.HasRangeKey = true

default:
usageErr(fmt.Sprintf("unknown argument %v", arg.Key))
}
Expand Down
4 changes: 2 additions & 2 deletions internal/manifest/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,9 +340,9 @@ func (m *FileMetadata) IterTransforms() sstable.IterTransforms {
func (m *FileMetadata) FragmentIterTransforms() sstable.FragmentIterTransforms {
return sstable.FragmentIterTransforms{
SyntheticSeqNum: m.SyntheticSeqNum(),
// TODO(radu): support these.
// TODO(radu): support this
//SyntheticSuffix: m.SyntheticSuffix,
//SyntheticPrefix: m.SyntheticPrefix,
SyntheticPrefix: m.SyntheticPrefix,
}
}

Expand Down
4 changes: 1 addition & 3 deletions metamorphic/generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1358,9 +1358,7 @@ func (g *generator) writerIngestExternalFiles() {
}
// Randomly set up synthetic prefix.
var syntheticPrefix sstable.SyntheticPrefix
// We can only use a synthetic prefix if we don't have range dels.
// TODO(radu): we will want to support this at some point.
if !g.keyManager.objKeyMeta(id).hasRangeDels && g.rng.Intn(2) == 0 {
if g.rng.Intn(2) == 0 {
syntheticPrefix = randBytes(g.rng, 1, 5)
start = syntheticPrefix.Apply(start)
end = syntheticPrefix.Apply(end)
Expand Down
1 change: 1 addition & 0 deletions sstable/block/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ type FragmentIterTransforms struct {
// ElideSameSeqNum, if true, returns only the first-occurring (in forward
// order) keyspan.Key for each sequence number.
ElideSameSeqNum bool
SyntheticPrefix SyntheticPrefix
}

// NoFragmentTransforms is the default value for IterTransforms.
Expand Down
83 changes: 60 additions & 23 deletions sstable/rowblk/rowblk_fragment_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package rowblk

import (
"bytes"
"fmt"
"os"
"sync"
Expand Down Expand Up @@ -44,7 +45,16 @@ type fragmentIter struct {
// elideSameSeqnum, if true, returns only the first-occurring (in forward
// order) Key for each sequence number.
elideSameSeqnum bool
closeCheck invariants.CloseChecker

syntheticPrefix block.SyntheticPrefix
// startKeyBuf is a buffer that is reused to store the start key of the span
// when a synthetic prefix is used.
startKeyBuf []byte
// endKeyBuf is a buffer that is reused to generate the end key of the span
// when a synthetic prefix is set. It always starts with syntheticPrefix.
endKeyBuf []byte

closeCheck invariants.CloseChecker
}

var _ keyspan.FragmentIterator = (*fragmentIter)(nil)
Expand Down Expand Up @@ -72,10 +82,19 @@ func NewFragmentIter(
// when the spans contain few keys.
i.span.Keys = i.keyBuf[:0]
i.elideSameSeqnum = transforms.ElideSameSeqNum
i.syntheticPrefix = transforms.SyntheticPrefix
if transforms.SyntheticPrefix.IsSet() {
i.endKeyBuf = append(i.endKeyBuf[:0], transforms.SyntheticPrefix...)
}
i.closeCheck = invariants.CloseChecker{}

if err := i.blockIter.InitHandle(cmp, split, blockHandle, block.IterTransforms{
SyntheticSeqNum: transforms.SyntheticSeqNum,
// We let the blockIter prepend the prefix to span start keys; the fragment
// iterator will prepend it for end keys. We could do everything in the
// fragment iterator, but we'd have to duplicate the logic for adjusting the
// seek key for SeekGE/SeekLT.
SyntheticPrefix: transforms.SyntheticPrefix,
// It's okay for HideObsoletePoints to be false here, even for shared
// ingested sstables. This is because rangedels do not apply to points in
// the same sstable at the same sequence number anyway, so exposing obsolete
Expand All @@ -94,13 +113,22 @@ func NewFragmentIter(
// prefix compression (and we don't perform any transforms), so the key/value
// will be pointing directly into the buffer data.
func (i *fragmentIter) initSpan(ik base.InternalKey, internalValue []byte) error {
var err error
if ik.Kind() == base.InternalKeyKindRangeDelete {
i.span = rangedel.Decode(ik, internalValue, i.span.Keys[:0])
} else {
var err error
i.span, err = rangekey.Decode(ik, internalValue, i.span.Keys[:0])
if err != nil {
return err
}
}
return err
// When synthetic prefix is used in the blockIter, the keys cannot be used
// across multiple blockIter operations; we have to make a copy in this case.
if i.syntheticPrefix.IsSet() || invariants.Sometimes(10) {
i.startKeyBuf = append(i.startKeyBuf[:0], i.span.Start...)
i.span.Start = i.startKeyBuf
}
return nil
}

// addToSpan adds a fragment to the existing span. The fragment must be for the
Expand All @@ -117,22 +145,33 @@ func (i *fragmentIter) addToSpan(
return err
}

func (i *fragmentIter) elideKeysOfSameSeqNum() {
if invariants.Enabled {
if !i.elideSameSeqnum || len(i.span.Keys) == 0 {
panic("elideKeysOfSameSeqNum called when it should not be")
// applySpanTransforms applies changes to the span that we decoded, if
// appropriate.
func (i *fragmentIter) applySpanTransforms() {
if i.elideSameSeqnum && len(i.span.Keys) > 0 {
lastSeqNum := i.span.Keys[0].SeqNum()
k := 1
for j := 1; j < len(i.span.Keys); j++ {
if lastSeqNum != i.span.Keys[j].SeqNum() {
lastSeqNum = i.span.Keys[j].SeqNum()
i.span.Keys[k] = i.span.Keys[j]
k++
}
}
i.span.Keys = i.span.Keys[:k]
}
lastSeqNum := i.span.Keys[0].SeqNum()
k := 1
for j := 1; j < len(i.span.Keys); j++ {
if lastSeqNum != i.span.Keys[j].SeqNum() {
lastSeqNum = i.span.Keys[j].SeqNum()
i.span.Keys[k] = i.span.Keys[j]
k++

if i.syntheticPrefix.IsSet() || invariants.Sometimes(10) {
// We have to make a copy of the start key because it will not stay valid
// across multiple blockIter operations.
i.startKeyBuf = append(i.startKeyBuf[:0], i.span.Start...)
i.span.Start = i.startKeyBuf
if invariants.Enabled && !bytes.Equal(i.syntheticPrefix, i.endKeyBuf[:len(i.syntheticPrefix)]) {
panic("pebble: invariant violation: synthetic prefix mismatch")
}
i.endKeyBuf = append(i.endKeyBuf[:len(i.syntheticPrefix)], i.span.End...)
i.span.End = i.endKeyBuf
}
i.span.Keys = i.span.Keys[:k]
}

// gatherForward gathers internal keys with identical bounds. Keys defined over
Expand Down Expand Up @@ -168,9 +207,7 @@ func (i *fragmentIter) gatherForward(kv *base.InternalKV) (*keyspan.Span, error)
return nil, err
}
}
if i.elideSameSeqnum && len(i.span.Keys) > 0 {
i.elideKeysOfSameSeqNum()
}
i.applySpanTransforms()
// i.blockIter is positioned over the first internal key for the next span.
return &i.span, nil
}
Expand Down Expand Up @@ -211,9 +248,7 @@ func (i *fragmentIter) gatherBackward(kv *base.InternalKV) (*keyspan.Span, error
// Backwards iteration encounters internal keys in the wrong order.
keyspan.SortKeysByTrailer(&i.span.Keys)

if i.elideSameSeqnum && len(i.span.Keys) > 0 {
i.elideKeysOfSameSeqNum()
}
i.applySpanTransforms()
return &i.span, nil
}

Expand All @@ -230,8 +265,10 @@ func (i *fragmentIter) Close() {
}

*i = fragmentIter{
blockIter: i.blockIter.ResetForReuse(),
closeCheck: i.closeCheck,
blockIter: i.blockIter.ResetForReuse(),
closeCheck: i.closeCheck,
startKeyBuf: i.startKeyBuf[:0],
endKeyBuf: i.endKeyBuf[:0],
}
fragmentBlockIterPool.Put(i)
}
Expand Down
3 changes: 3 additions & 0 deletions sstable/rowblk/rowblk_fragment_iter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ func TestBlockFragmentIterator(t *testing.T) {
var seqNum uint64
d.MaybeScanArgs(t, "synthetic-seq-num", &seqNum)
transforms.SyntheticSeqNum = block.SyntheticSeqNum(seqNum)
var syntheticPrefix string
d.MaybeScanArgs(t, "synthetic-prefix", &syntheticPrefix)
transforms.SyntheticPrefix = []byte(syntheticPrefix)

blockHandle := block.CacheBufferHandle(c.Get(1, 0, 0))
i, err := NewFragmentIter(comparer.Compare, comparer.Split, blockHandle, transforms)
Expand Down
14 changes: 8 additions & 6 deletions sstable/rowblk/rowblk_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -532,10 +532,11 @@ func (i *Iter) SeekGE(key []byte, flags base.SeekGEFlags) *base.InternalKV {
}
searchKey := key
if i.transforms.SyntheticPrefix != nil {
// The seek key is before or after the entire block of keys that start with
// SyntheticPrefix. To determine which, we need to compare against a valid
// key in the block. We use firstUserKey which has the synthetic prefix.
if !bytes.HasPrefix(key, i.transforms.SyntheticPrefix) {
// The seek key is before or after the entire block of keys that start
// with SyntheticPrefix. To determine which, we need to compare against a
// valid key in the block. We use firstUserKey which has the synthetic
// prefix.
if i.cmp(i.firstUserKey, key) >= 0 {
return i.First()
}
Expand Down Expand Up @@ -712,10 +713,11 @@ func (i *Iter) SeekLT(key []byte, flags base.SeekLTFlags) *base.InternalKV {
}
searchKey := key
if i.transforms.SyntheticPrefix != nil {
// The seek key is before or after the entire block of keys that start with
// SyntheticPrefix. To determine which, we need to compare against a valid
// key in the block. We use firstUserKey which has the synthetic prefix.
if !bytes.HasPrefix(key, i.transforms.SyntheticPrefix) {
// The seek key is before or after the entire block of keys that start
// with SyntheticPrefix. To determine which, we need to compare against a
// valid key in the block. We use firstUserKey which has the synthetic
// prefix.
if i.cmp(i.firstUserKey, key) < 0 {
return i.Last()
}
Expand Down
34 changes: 34 additions & 0 deletions sstable/rowblk/testdata/rowblk_fragment_iter
Original file line number Diff line number Diff line change
Expand Up @@ -81,3 +81,37 @@ next
first: a-b:{(#10,RANGEDEL)}
next: b-d:{(#10,RANGEDEL)}
next: d-e:{(#10,RANGEDEL)}

# Tests with synthetic prefix.
iter synthetic-prefix=foo_
first
next
prev
seek-ge foo_a
seek-ge foo_b
seek-lt foo_b
seek-lt foo_z
----
first: foo_a-foo_b:{(#11,RANGEDEL)}
next: foo_b-foo_d:{(#12,RANGEDEL) (#11,RANGEDEL) (#11,RANGEDEL)}
prev: foo_a-foo_b:{(#11,RANGEDEL)}
seek-ge: foo_a-foo_b:{(#11,RANGEDEL)}
seek-ge: foo_b-foo_d:{(#12,RANGEDEL) (#11,RANGEDEL) (#11,RANGEDEL)}
seek-lt: foo_a-foo_b:{(#11,RANGEDEL)}
seek-lt: foo_d-foo_e:{(#12,RANGEDEL) (#11,RANGEDEL)}

# Try seeks that don't have the synthetic prefix.
iter synthetic-prefix=foo_
seek-ge fon_a
seek-ge fop_a
prev
seek-lt fon_a
next
seek-lt fop_a
----
seek-ge: foo_a-foo_b:{(#11,RANGEDEL)}
seek-ge: <nil>
prev: foo_d-foo_e:{(#12,RANGEDEL) (#11,RANGEDEL)}
seek-lt: <nil>
next: foo_a-foo_b:{(#11,RANGEDEL)}
seek-lt: foo_d-foo_e:{(#12,RANGEDEL) (#11,RANGEDEL)}
87 changes: 87 additions & 0 deletions testdata/ingest_external
Original file line number Diff line number Diff line change
Expand Up @@ -909,3 +909,90 @@ next
a@10: (a, .)
b@11: (b, .)
.


# Tests with range tombstones.

reset
----

build-remote points.sst
set a@1 va
set b@1 vb
set c@1 vc
set d@1 vc
----

build-remote ranges.sst
del-range b b1
----

ingest-external
points.sst bounds=(a,d)
----

ingest-external
ranges.sst bounds=(a,d)
----

iter
first
next
next
next
----
a@1: (va, .)
c@1: (vc, .)
.
.

ingest-external
points.sst bounds=(p1_a,p1_d) synthetic-prefix=p1_
points.sst bounds=(p2_b,p2_c) synthetic-prefix=p2_
----

ingest-external
ranges.sst bounds=(p1_a,p1_d) synthetic-prefix=p1_
ranges.sst bounds=(p3_a,p3_d) synthetic-prefix=p3_
----

iter
first
next
next
next
next
next
----
a@1: (va, .)
c@1: (vc, .)
p1_a@1: (va, .)
p1_c@1: (vc, .)
p2_b@1: (vb, .)
.

iter
seek-ge p
next
next
next
----
p1_a@1: (va, .)
p1_c@1: (vc, .)
p2_b@1: (vb, .)
.

iter
seek-lt p3
prev
prev
prev
prev
prev
----
p2_b@1: (vb, .)
p1_c@1: (vc, .)
p1_a@1: (va, .)
c@1: (vc, .)
a@1: (va, .)
.

0 comments on commit fad89cf

Please sign in to comment.