31556: importccl: re-enable job control tests r=mjibson a=mjibson

I tracked down the problem. After the CANCEL JOB was issued, the second
stage of the IMPORT (the shuffle) sometimes would have started, and
sometimes it would not have. If it had not started, RunJob would block
forever trying to send on the allowResponse channel. Fix this by
starting a draining goroutine after the first receive.
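
For reference, a minimal, self-contained sketch of the draining pattern
(the channel name follows the test; the wrapper function itself is
illustrative, not the actual code):

package sketch

import "sync"

// drainAfterFirst gates the first request on allowResponse, then starts
// a goroutine that drains every subsequent send. A sender can therefore
// never deadlock: the post-CANCEL send succeeds whether or not the
// second IMPORT stage ever issues another request.
func drainAfterFirst(allowResponse chan struct{}) func() {
	var once sync.Once
	return func() {
		once.Do(func() {
			<-allowResponse // first request waits for the test's go-ahead
			go func() {
				for range allowResponse { // drain until the channel is closed
				}
			}()
		})
	}
}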

Closes #24623
Closes #24658

Release note: None

31627: storage: remove spurious call to maybeInlineSideloadedRaftCommand r=nvanbenschoten,benesch a=tschottdorf

Entries are "thinned" only when passed to `r.append()` (i.e. written to
disk) and they are always returned "fat" from `Entries()` (i.e. Raft's way
to get entries from disk). Consequently Raft never sees thin entries and
won't ask us to commit them.
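
A purely illustrative model of that invariant (none of these types or
functions exist in the real code): the payload is stripped only on the
write path and always restored on the read path, so readers never
observe a thin entry.

package sketch

// entry stands in for a Raft log entry whose large payload may be
// stored out of band ("sideloaded") on disk.
type entry struct {
	inlined bool
	payload []byte
}

// appendToDisk thins the entry: the payload moves to sideloaded storage.
func appendToDisk(e entry, sideloaded map[uint64][]byte, idx uint64) entry {
	sideloaded[idx] = e.payload
	return entry{inlined: false}
}

// readEntries always re-inlines ("fattens") the payload before
// returning, so callers, including Raft, only ever see fat entries.
func readEntries(sideloaded map[uint64][]byte, idx uint64) entry {
	return entry{inlined: true, payload: sideloaded[idx]}
}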

Touches #31618.

Release note: None

31695: bitarray: don't allow FromEncodingParts to return invalid bit array r=knz a=benesch

It is invalid for a bit array's lastBitsUsed field to be greater than
64. The FromEncodingParts function, however, would happily construct
an invalid bit array when given a too-large lastBitsUsed value. Teach
FromEncodingParts to return an error instead.

This manifested as a panic when attempting to pretty-print a key
containing a bit array whose encoded lastBitsUsed value was 65. Such a
key can be created by calling PrefixEnd on a key containing a bit array
whose lastBitsUsed value is 64. Now that an error is returned instead,
the pretty-printing code retries after calling UndoPrefixEnd and is
able to print the key.
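
An illustrative use of the new contract (the import path is assumed;
the error text matches the new test below):

package main

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/util/bitarray"
)

func main() {
	// 65 used bits in the last word is invalid. Instead of constructing
	// a malformed BitArray, the function now reports an error, which
	// callers such as the pretty-printer can handle before retrying.
	if _, err := bitarray.FromEncodingParts([]uint64{42}, 65); err != nil {
		fmt.Println(err)
		// FromEncodingParts: lastBitsUsed must not exceed 64, got 65
	}
}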

Fix #31115.

Release note: None

31697: partitionccl: deflake TestDropIndexWithZoneConfigCCL r=danhhz,eriktrinh a=benesch

A particularly adversarial goroutine schedule can cause this test to
observe the moment at which the data has been dropped but the zone
config has not. Deflake by retrying the check for the dropped zone
config until it succeeds (or times out).
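
The fix moves the check inside a retry loop. Below is a simplified,
standalone stand-in for the helper the test relies on (CockroachDB's
testutils.SucceedsSoon; the timeout and poll interval here are
arbitrary):

package sketch

import (
	"fmt"
	"time"
)

// succeedsSoon retries fn until it returns nil or the timeout elapses,
// mirroring the retry-until-success deflaking approach described above.
func succeedsSoon(timeout time.Duration, fn func() error) error {
	deadline := time.Now().Add(timeout)
	for {
		err := fn()
		if err == nil {
			return nil
		}
		if time.Now().After(deadline) {
			return fmt.Errorf("condition not met before timeout: %w", err)
		}
		time.Sleep(10 * time.Millisecond)
	}
}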

Fix #31678.

Release note: None

Co-authored-by: Matt Jibson <matt.jibson@gmail.com>
Co-authored-by: Tobias Schottdorf <tobias.schottdorf@gmail.com>
Co-authored-by: Nikhil Benesch <nikhil.benesch@gmail.com>
4 people committed Oct 22, 2018
5 parents 9f994ab + c6095ad + 5500cb5 + eaf5808 + 92ce01b commit b849528
Showing 8 changed files with 106 additions and 42 deletions.
36 changes: 25 additions & 11 deletions pkg/ccl/importccl/import_stmt_test.go
@@ -22,6 +22,7 @@ import (
"path/filepath"
"runtime"
"strings"
"sync"
"testing"
"time"

@@ -1498,7 +1499,6 @@ func BenchmarkConvertRecord(b *testing.B) {
// work as intended on import jobs.
func TestImportControlJob(t *testing.T) {
defer leaktest.AfterTest(t)()
t.Skipf("#24658")

defer func(oldInterval time.Duration) {
jobs.DefaultAdoptInterval = oldInterval
@@ -1527,15 +1527,33 @@ func TestImportControlJob(t *testing.T) {
sqlDB := sqlutils.MakeSQLRunner(tc.Conns[0])
sqlDB.Exec(t, `CREATE DATABASE data`)

t.Run("cancel", func(t *testing.T) {
sqlDB.Exec(t, `CREATE DATABASE cancelimport`)

srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
makeSrv := func() *httptest.Server {
var once sync.Once
return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method == "GET" {
<-allowResponse
// The following code handles both cases: after the CANCEL JOB is
// issued, the second stage of the IMPORT (the shuffle, after the
// sampling) may or may not have started. If it started, a second GET
// request is made. The once here prevents that request from blocking.
// The draining for loop below lets jobutils.RunJob's second send on
// allowResponse (made after issuing the CANCEL JOB) succeed.
once.Do(func() {
<-allowResponse
go func() {
for range allowResponse {
}
}()
})

_, _ = w.Write([]byte(r.URL.Path[1:]))
}
}))
}

t.Run("cancel", func(t *testing.T) {
sqlDB.Exec(t, `CREATE DATABASE cancelimport`)

srv := makeSrv()
defer srv.Close()

var urls []string
@@ -1567,11 +1585,7 @@ func TestImportControlJob(t *testing.T) {

sqlDB.Exec(t, `CREATE DATABASE pauseimport`)

srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method == "GET" {
_, _ = w.Write([]byte(r.URL.Path[1:]))
}
}))
srv := makeSrv()
defer srv.Close()

count := 100
26 changes: 13 additions & 13 deletions pkg/ccl/partitionccl/drop_test.go
@@ -10,6 +10,7 @@ package partitionccl

import (
"context"
"fmt"
"testing"

"github.com/pkg/errors"
@@ -124,19 +125,18 @@ func TestDropIndexWithZoneConfigCCL(t *testing.T) {
} else if l := 0; len(kvs) != l {
return errors.Errorf("expected %d key value pairs, but got %d", l, len(kvs))
}
sqlDB.QueryRow(t, "SELECT config FROM system.zones WHERE id = $1", tableDesc.ID).Scan(&buf)
if err := protoutil.Unmarshal(buf, cfg); err != nil {
return err
}
if exists := subzoneExists(cfg, 1, ""); !exists {
return errors.New("zone config for primary index removed after dropping secondary index")
}
for _, target := range subzones[1:] {
if exists := subzoneExists(cfg, target.index, target.partition); exists {
return fmt.Errorf(`zone config for %v still exists`, target)
}
}
return nil
})

sqlDB.QueryRow(t, "SELECT config FROM system.zones WHERE id = $1", tableDesc.ID).Scan(&buf)
if err := protoutil.Unmarshal(buf, cfg); err != nil {
t.Fatal(err)
}
if exists := subzoneExists(cfg, 1, ""); !exists {
t.Fatal("zone config for primary index removed after dropping secondary index")
}
for _, target := range subzones[1:] {
if exists := subzoneExists(cfg, target.index, target.partition); exists {
t.Fatalf(`zone config for %v still exists`, target)
}
}
}
5 changes: 5 additions & 0 deletions pkg/keys/printer_test.go
@@ -156,6 +156,11 @@ func TestPrettyPrint(t *testing.T) {
{makeKey(MakeTablePrefix(42),
roachpb.RKey(encoding.EncodeBitArrayDescending(nil, bitArray))),
"/Table/42/B00111010"},
// Regression test for #31115.
{roachpb.Key(makeKey(MakeTablePrefix(42),
roachpb.RKey(encoding.EncodeBitArrayAscending(nil, bitarray.MakeZeroBitArray(64))),
)).PrefixEnd(),
"/Table/42/B0000000000000000000000000000000000000000000000000000000000000000/PrefixEnd"},
{makeKey(MakeTablePrefix(42),
roachpb.RKey(durationAsc)),
"/Table/42/1mon1d1s"},
13 changes: 3 additions & 10 deletions pkg/storage/replica.go
@@ -4323,16 +4323,9 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
for _, e := range rd.CommittedEntries {
switch e.Type {
case raftpb.EntryNormal:
// Committed entries come straight from the Raft log. Consequently,
// sideloaded SSTables are not usually inlined.
if newEnt, err := maybeInlineSideloadedRaftCommand(
ctx, r.RangeID, e, r.raftMu.sideloaded, r.store.raftEntryCache,
); err != nil {
const expl = "maybeInlineSideloadedRaftCommand"
return stats, expl, errors.Wrap(err, expl)
} else if newEnt != nil {
e = *newEnt
}
// NB: Committed entries are handed to us by Raft. Raft does not
// know about sideloading. Consequently the entries here are all
// already inlined.

var commandID storagebase.CmdIDKey
var command storagepb.RaftCommand
3 changes: 3 additions & 0 deletions pkg/storage/replica_raftstorage.go
@@ -84,6 +84,9 @@ func (r *replicaRaftStorage) Entries(lo, hi, maxBytes uint64) ([]raftpb.Entry, e
readonly := r.store.Engine().NewReadOnly()
defer readonly.Close()
ctx := r.AnnotateCtx(context.TODO())
if r.raftMu.sideloaded == nil {
return nil, errors.New("sideloaded storage is uninitialized")
}
return entries(ctx, r.mu.stateLoader, readonly, r.RangeID, r.store.raftEntryCache,
r.raftMu.sideloaded, lo, hi, maxBytes)
}
23 changes: 18 additions & 5 deletions pkg/util/bitarray/bitarray.go
@@ -107,7 +107,7 @@ func (d BitArray) Clone() BitArray {
// MakeZeroBitArray creates a bit array with the specified bit size.
func MakeZeroBitArray(bitLen uint) BitArray {
a, b := EncodingPartsForBitLen(bitLen)
return FromEncodingParts(a, b)
return mustFromEncodingParts(a, b)
}

// ToWidth resizes the bit array to the specified size.
@@ -128,7 +128,7 @@ func (d BitArray) ToWidth(desiredLen uint) BitArray {
words, lastBitsUsed := EncodingPartsForBitLen(desiredLen)
copy(words, d.words[:len(words)])
words[len(words)-1] &= (^word(0) << (numBitsPerWord - lastBitsUsed))
return FromEncodingParts(words, lastBitsUsed)
return mustFromEncodingParts(words, lastBitsUsed)
}

// New length is larger.
@@ -140,7 +140,7 @@ func (d BitArray) ToWidth(desiredLen uint) BitArray {
words = make([]word, numWords)
copy(words, d.words)
}
return FromEncodingParts(words, lastBitsUsed)
return mustFromEncodingParts(words, lastBitsUsed)
}

// Sizeof returns the size in bytes of the bit array and its components.
@@ -346,7 +346,7 @@ func Parse(s string) (res BitArray, err error) {
words[wordIdx] = curWord
}

return FromEncodingParts(words, lastBitsUsed), nil
return FromEncodingParts(words, lastBitsUsed)
}

// Concat concatenates two bit arrays.
@@ -481,11 +481,24 @@ func (d BitArray) EncodingParts() ([]uint64, uint64) {
}

// FromEncodingParts creates a bit array from the encoding parts.
func FromEncodingParts(words []uint64, lastBitsUsed uint64) BitArray {
func FromEncodingParts(words []uint64, lastBitsUsed uint64) (BitArray, error) {
if lastBitsUsed > numBitsPerWord {
return BitArray{}, fmt.Errorf("FromEncodingParts: lastBitsUsed must not exceed %d, got %d",
numBitsPerWord, lastBitsUsed)
}
return BitArray{
words: words,
lastBitsUsed: uint8(lastBitsUsed),
}, nil
}

// mustFromEncodingParts is like FromEncodingParts but errors cause a panic.
func mustFromEncodingParts(words []uint64, lastBitsUsed uint64) BitArray {
ba, err := FromEncodingParts(words, lastBitsUsed)
if err != nil {
panic(err)
}
return ba
}

// Rand generates a random bit array of the specified length.
27 changes: 27 additions & 0 deletions pkg/util/bitarray/bitarray_test.go
@@ -59,6 +59,33 @@ func TestParseFormat(t *testing.T) {
}
}

func TestFromEncodingParts(t *testing.T) {
testData := []struct {
words []uint64
lastBitsUsed uint64
ba BitArray
err string
}{
{nil, 0, BitArray{words: nil, lastBitsUsed: 0}, ""},
{[]uint64{0}, 0, BitArray{words: []word{0}, lastBitsUsed: 0}, ""},
{[]uint64{42}, 3, BitArray{words: []word{42}, lastBitsUsed: 3}, ""},
{[]uint64{42}, 65, BitArray{}, "FromEncodingParts: lastBitsUsed must not exceed 64, got 65"},
}

for _, test := range testData {
t.Run(fmt.Sprintf("{%v,%d}", test.words, test.lastBitsUsed), func(t *testing.T) {
ba, err := FromEncodingParts(test.words, test.lastBitsUsed)
if test.err != "" && (err == nil || test.err != err.Error()) {
t.Errorf("expected %q error, but got: %+v", test.err, err)
} else if test.err == "" && err != nil {
t.Errorf("unexpected error: %s", err)
} else if !reflect.DeepEqual(ba, test.ba) {
t.Errorf("expected %+v, got %+v", test.ba, ba)
}
})
}
}

func TestToWidth(t *testing.T) {
testData := []struct {
str string
15 changes: 12 additions & 3 deletions pkg/util/encoding/encoding.go
@@ -1109,7 +1109,11 @@ func DecodeBitArrayAscending(b []byte) ([]byte, bitarray.BitArray, error) {
}
b = b[1:]
b, lastVal, err := DecodeUvarintAscending(b)
return b, bitarray.FromEncodingParts(words, lastVal), err
if err != nil {
return b, bitarray.BitArray{}, err
}
ba, err := bitarray.FromEncodingParts(words, lastVal)
return b, ba, err
}

var errBitArrayTerminatorMissing = errors.New("cannot find bit array data terminator")
@@ -1165,7 +1169,11 @@ func DecodeBitArrayDescending(b []byte) ([]byte, bitarray.BitArray, error) {
}
b = b[1:]
b, lastVal, err := DecodeUvarintDescending(b)
return b, bitarray.FromEncodingParts(words, lastVal), err
if err != nil {
return b, bitarray.BitArray{}, err
}
ba, err := bitarray.FromEncodingParts(words, lastVal)
return b, ba, err
}

// Type represents the type of a value encoded by
@@ -2121,7 +2129,8 @@ func DecodeUntaggedBitArrayValue(b []byte) (remaining []byte, d bitarray.BitArra
}
words[i] = val
}
return b, bitarray.FromEncodingParts(words, lastBitsUsed), nil
ba, err := bitarray.FromEncodingParts(words, lastBitsUsed)
return b, ba, err
}

const uuidValueEncodedLength = 16
