diff --git a/Gopkg.lock b/Gopkg.lock index e407137dc32e..987878d02244 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -1489,14 +1489,17 @@ [[projects]] branch = "master" - digest = "1:71e03c7f9fbead2dd42b87d696a37282dd47ebfb6cdcd7d70ecb8e7bc4f61434" + digest = "1:51f3f946cfff94e96c8d0a8705a2bcd88d17a70408676ff740acd5255f40a537" name = "go.etcd.io/etcd" packages = [ "raft", + "raft/confchange", + "raft/quorum", "raft/raftpb", + "raft/tracker", ] pruneopts = "UT" - revision = "1eee465a43720d713bb69f7b7f5e120135fdb1ac" + revision = "62f4fb3c5eaed2a7614258083b4504f18bf44269" [[projects]] digest = "1:3b5a3bc35810830ded5e26ef9516e933083a2380d8e57371fdfde3c70d7c6952" @@ -2000,6 +2003,7 @@ "github.com/wadey/gocovmerge", "go.etcd.io/etcd/raft", "go.etcd.io/etcd/raft/raftpb", + "go.etcd.io/etcd/raft/tracker", "golang.org/x/crypto/bcrypt", "golang.org/x/crypto/ssh", "golang.org/x/crypto/ssh/agent", diff --git a/docs/generated/sql/bnf/stmt_block.bnf b/docs/generated/sql/bnf/stmt_block.bnf index c69211f57493..3efba1b1acae 100644 --- a/docs/generated/sql/bnf/stmt_block.bnf +++ b/docs/generated/sql/bnf/stmt_block.bnf @@ -1694,7 +1694,7 @@ partition_by ::= | 'PARTITION' 'BY' 'NOTHING' create_as_table_defs ::= - ( column_name create_as_col_qual_list ) ( ( ',' create_as_constraint_def | ',' column_name create_as_col_qual_list ) )* + ( column_name create_as_col_qual_list ) ( ( ',' column_name create_as_col_qual_list | ',' family_def | ',' create_as_constraint_def ) )* common_table_expr ::= table_alias_name opt_column_list 'AS' '(' preparable_stmt ')' @@ -2136,6 +2136,7 @@ range_partition ::= create_as_col_qualification ::= create_as_col_qualification_elem + | 'FAMILY' family_name create_as_constraint_elem ::= 'PRIMARY' 'KEY' '(' create_as_params ')' diff --git a/pkg/roachpb/data_test.go b/pkg/roachpb/data_test.go index c1eacce1657a..1ca9df452bb2 100644 --- a/pkg/roachpb/data_test.go +++ b/pkg/roachpb/data_test.go @@ -28,10 +28,13 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/duration" "github.com/cockroachdb/cockroach/pkg/util/encoding" "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/cockroach/pkg/util/randutil" "github.com/cockroachdb/cockroach/pkg/util/uuid" + "github.com/gogo/protobuf/proto" "github.com/kr/pretty" + "github.com/stretchr/testify/require" ) func makeTS(walltime int64, logical int32) hlc.Timestamp { @@ -1559,3 +1562,31 @@ func TestUpdateObservedTimestamps(t *testing.T) { }) } } + +func TestChangeReplicasTrigger_String(t *testing.T) { + defer leaktest.AfterTest(t)() + + l := ReplicaType_LEARNER + v := ReplicaType_VOTER + repl := ReplicaDescriptor{NodeID: 1, StoreID: 2, ReplicaID: 3, Type: &l} + crt := ChangeReplicasTrigger{ + ChangeType: ADD_REPLICA, + Replica: repl, + Desc: &RangeDescriptor{ + RangeID: 1, + StartKey: RKey("a"), + EndKey: RKey("b"), + InternalReplicas: []ReplicaDescriptor{ + repl, + {NodeID: 4, StoreID: 5, ReplicaID: 6, Type: &v}, + {NodeID: 7, StoreID: 8, ReplicaID: 9, Type: &l}, + }, + NextReplicaID: 10, + Generation: proto.Int64(5), + GenerationComparable: proto.Bool(true), + }, + } + act := crt.String() + exp := `ADD_REPLICA((n1,s2):3LEARNER): updated=(n4,s5):6,(n1,s2):3LEARNER,(n7,s8):9LEARNER next=10` + require.Equal(t, exp, act) +} diff --git a/pkg/roachpb/metadata_replicas.go b/pkg/roachpb/metadata_replicas.go index 1430e12fc8b1..c8d2a6e26e6d 100644 --- a/pkg/roachpb/metadata_replicas.go +++ b/pkg/roachpb/metadata_replicas.go @@ -10,7 +10,11 @@ package roachpb -import "sort" +import ( + "fmt" + "sort" + "strings" +) // ReplicaDescriptors is a set of replicas, usually the nodes/stores on which // replicas of a range are stored. @@ -29,6 +33,17 @@ func MakeReplicaDescriptors(replicas []ReplicaDescriptor) ReplicaDescriptors { return ReplicaDescriptors{wrapped: replicas} } +func (d ReplicaDescriptors) String() string { + var buf strings.Builder + for i, desc := range d.wrapped { + if i > 0 { + buf.WriteByte(',') + } + fmt.Fprint(&buf, desc) + } + return buf.String() +} + // Unwrap returns every replica in the set. It is a placeholder for code that // used to work on a slice of replicas until learner replicas are added. At that // point, all uses of Unwrap will be migrated to All/Voters/Learners. diff --git a/pkg/server/status.go b/pkg/server/status.go index 87ee58abe7cb..cb13ad75085f 100644 --- a/pkg/server/status.go +++ b/pkg/server/status.go @@ -1205,7 +1205,7 @@ func (s *statusServer) Ranges( state.Progress[id] = serverpb.RaftState_Progress{ Match: progress.Match, Next: progress.Next, - Paused: progress.Paused, + Paused: progress.IsPaused(), PendingSnapshot: progress.PendingSnapshot, State: progress.State.String(), } diff --git a/pkg/sql/execpb/stats.go b/pkg/sql/execpb/stats.go index 7e4d87ecf781..7a6707970d3a 100644 --- a/pkg/sql/execpb/stats.go +++ b/pkg/sql/execpb/stats.go @@ -38,10 +38,14 @@ func (vs *VectorizedStats) Stats() map[string]string { } else { timeSuffix = executionTimeTagSuffix } + selectivity := float64(0) + if vs.NumBatches > 0 { + selectivity = float64(vs.NumTuples) / float64(coldata.BatchSize*vs.NumBatches) + } return map[string]string{ batchesOutputTagSuffix: fmt.Sprintf("%d", vs.NumBatches), tuplesOutputTagSuffix: fmt.Sprintf("%d", vs.NumTuples), - selectivityTagSuffix: fmt.Sprintf("%.2f", float64(vs.NumTuples)/float64(coldata.BatchSize*vs.NumBatches)), + selectivityTagSuffix: fmt.Sprintf("%.2f", selectivity), timeSuffix: fmt.Sprintf("%v", vs.Time.Round(time.Microsecond)), } } @@ -62,10 +66,14 @@ func (vs *VectorizedStats) StatsForQueryPlan() []string { } else { timeSuffix = executionTimeQueryPlanSuffix } + selectivity := float64(0) + if vs.NumBatches > 0 { + selectivity = float64(vs.NumTuples) / float64(coldata.BatchSize*vs.NumBatches) + } return []string{ fmt.Sprintf("%s: %d", batchesOutputQueryPlanSuffix, vs.NumBatches), fmt.Sprintf("%s: %d", tuplesOutputQueryPlanSuffix, vs.NumTuples), - fmt.Sprintf("%s: %.2f", selectivityQueryPlanSuffix, float64(vs.NumTuples)/float64(coldata.BatchSize*vs.NumBatches)), + fmt.Sprintf("%s: %.2f", selectivityQueryPlanSuffix, selectivity), fmt.Sprintf("%s: %v", timeSuffix, vs.Time.Round(time.Microsecond)), } } diff --git a/pkg/sql/logictest/testdata/logic_test/create_as b/pkg/sql/logictest/testdata/logic_test/create_as index d43a40765d37..23c228aaff8d 100644 --- a/pkg/sql/logictest/testdata/logic_test/create_as +++ b/pkg/sql/logictest/testdata/logic_test/create_as @@ -236,3 +236,65 @@ foo11 CREATE TABLE foo11 ( statement error pq: multiple primary keys for table "foo12" are not allowed CREATE TABLE foo12 (x PRIMARY KEY, y, PRIMARY KEY(y)) AS VALUES (1, 2), (3, 4); +# Check that CREATE TABLE AS allows users to specify column families. +statement ok +CREATE TABLE abcd( + a INT PRIMARY KEY, + b INT, + c INT, + d INT +); + +# Test column qualifiers to define column families. +statement ok +CREATE TABLE foo12 (a PRIMARY KEY FAMILY f1, b, c FAMILY fam_1_c, d) AS SELECT * FROM abcd; + +query TT +SHOW CREATE TABLE foo12 +---- +foo12 CREATE TABLE foo12 ( + a INT8 NOT NULL, + b INT8 NULL, + c INT8 NULL, + d INT8 NULL, + CONSTRAINT "primary" PRIMARY KEY (a ASC), + FAMILY f1 (a, b, d), + FAMILY fam_1_c (c) +) + +# Test constraint style definition of column families. +statement ok +CREATE TABLE foo13 (a, b, c, d, PRIMARY KEY(a, b), FAMILY pk (a, b), FAMILY (c, d)) AS SELECT * FROM abcd; + +query TT +SHOW CREATE TABLE foo13 +---- +foo13 CREATE TABLE foo13 ( + a INT8 NOT NULL, + b INT8 NOT NULL, + c INT8 NULL, + d INT8 NULL, + CONSTRAINT "primary" PRIMARY KEY (a ASC, b ASC), + FAMILY pk (a, b), + FAMILY fam_1_c_d (c, d) +) + +# Test renaming columns still preserves the column families. +statement ok +ALTER TABLE foo13 RENAME d TO z + +statement ok +ALTER TABLE foo13 RENAME c TO e + +query TT +SHOW CREATE TABLE foo13 +---- +foo13 CREATE TABLE foo13 ( + a INT8 NOT NULL, + b INT8 NOT NULL, + e INT8 NULL, + z INT8 NULL, + CONSTRAINT "primary" PRIMARY KEY (a ASC, b ASC), + FAMILY pk (a, b), + FAMILY fam_1_c_d (e, z) +) diff --git a/pkg/sql/parser/parse_test.go b/pkg/sql/parser/parse_test.go index 372af167edaa..560f25f52167 100644 --- a/pkg/sql/parser/parse_test.go +++ b/pkg/sql/parser/parse_test.go @@ -257,6 +257,11 @@ func TestParse(t *testing.T) { {`CREATE TABLE IF NOT EXISTS a (z PRIMARY KEY) AS SELECT * FROM b`}, {`CREATE TABLE a (x, y, z, PRIMARY KEY (x, y, z)) AS SELECT * FROM b`}, {`CREATE TABLE IF NOT EXISTS a (x, y, z, PRIMARY KEY (x, y, z)) AS SELECT * FROM b`}, + {`CREATE TABLE a (x, FAMILY (x)) AS SELECT * FROM b`}, + {`CREATE TABLE IF NOT EXISTS a (x, FAMILY (x)) AS SELECT * FROM b`}, + {`CREATE TABLE a (x, y FAMILY f1) AS SELECT * FROM b`}, + {`CREATE TABLE IF NOT EXISTS a (x, y FAMILY f1) AS SELECT * FROM b`}, + {`CREATE TABLE a (b STRING COLLATE de)`}, {`CREATE TABLE a (b STRING(3) COLLATE de)`}, {`CREATE TABLE a (b STRING[] COLLATE de)`}, diff --git a/pkg/sql/parser/sql.y b/pkg/sql/parser/sql.y index fea8b3edceb1..0ede2b201474 100644 --- a/pkg/sql/parser/sql.y +++ b/pkg/sql/parser/sql.y @@ -4443,11 +4443,6 @@ create_as_table_defs: var colToTableDef tree.TableDef = tableDef $$.val = tree.TableDefs{colToTableDef} } -| create_as_table_defs ',' create_as_constraint_def - { - var constraintToTableDef tree.TableDef = $3.constraintDef() - $$.val = append($1.tblDefs(), constraintToTableDef) - } | create_as_table_defs ',' column_name create_as_col_qual_list { tableDef, err := tree.NewColumnTableDef(tree.Name($3), nil, false, $4.colQuals()) @@ -4459,6 +4454,15 @@ create_as_table_defs: $$.val = append($1.tblDefs(), colToTableDef) } +| create_as_table_defs ',' family_def + { + $$.val = append($1.tblDefs(), $3.tblDef()) + } +| create_as_table_defs ',' create_as_constraint_def +{ + var constraintToTableDef tree.TableDef = $3.constraintDef() + $$.val = append($1.tblDefs(), constraintToTableDef) +} create_as_constraint_def: create_as_constraint_elem @@ -4508,6 +4512,10 @@ create_as_col_qualification: { $$.val = tree.NamedColumnQualification{Qualification: $1.colQualElem()} } +| FAMILY family_name + { + $$.val = tree.NamedColumnQualification{Qualification: &tree.ColumnFamilyConstraint{Family: tree.Name($2)}} + } create_as_col_qualification_elem: PRIMARY KEY diff --git a/pkg/storage/allocator.go b/pkg/storage/allocator.go index 39fe69142a18..e900955c5811 100644 --- a/pkg/storage/allocator.go +++ b/pkg/storage/allocator.go @@ -27,6 +27,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/pkg/errors" "go.etcd.io/etcd/raft" + "go.etcd.io/etcd/raft/tracker" ) const ( @@ -1196,7 +1197,7 @@ func replicaIsBehind(raftStatus *raft.Status, replicaID roachpb.ReplicaID) bool // behind the actual commit index of the range. if progress, ok := raftStatus.Progress[uint64(replicaID)]; ok { if uint64(replicaID) == raftStatus.Lead || - (progress.State == raft.ProgressStateReplicate && + (progress.State == tracker.StateReplicate && progress.Match >= raftStatus.Commit) { return false } @@ -1214,8 +1215,8 @@ func simulateFilterUnremovableReplicas( brandNewReplicaID roachpb.ReplicaID, ) []roachpb.ReplicaDescriptor { status := *raftStatus - status.Progress[uint64(brandNewReplicaID)] = raft.Progress{ - State: raft.ProgressStateReplicate, + status.Progress[uint64(brandNewReplicaID)] = tracker.Progress{ + State: tracker.StateReplicate, Match: status.Commit, } return filterUnremovableReplicas(&status, replicas, brandNewReplicaID) diff --git a/pkg/storage/allocator_test.go b/pkg/storage/allocator_test.go index ccdc1089dac6..c75440510402 100644 --- a/pkg/storage/allocator_test.go +++ b/pkg/storage/allocator_test.go @@ -44,6 +44,7 @@ import ( "github.com/olekukonko/tablewriter" "github.com/pkg/errors" "go.etcd.io/etcd/raft" + "go.etcd.io/etcd/raft/tracker" ) const firstRange = roachpb.RangeID(1) @@ -825,10 +826,10 @@ func TestAllocatorRebalanceTarget(t *testing.T) { rangeInfo := rangeInfoForRepl(repl, desc) status := &raft.Status{ - Progress: make(map[uint64]raft.Progress), + Progress: make(map[uint64]tracker.Progress), } for _, replica := range replicas { - status.Progress[uint64(replica.NodeID)] = raft.Progress{ + status.Progress[uint64(replica.NodeID)] = tracker.Progress{ Match: 10, } } @@ -5154,18 +5155,18 @@ func TestFilterBehindReplicas(t *testing.T) { for _, c := range testCases { t.Run("", func(t *testing.T) { status := &raft.Status{ - Progress: make(map[uint64]raft.Progress), + Progress: make(map[uint64]tracker.Progress), } status.Lead = c.leader status.Commit = c.commit var replicas []roachpb.ReplicaDescriptor for j, v := range c.progress { - p := raft.Progress{ + p := tracker.Progress{ Match: v, - State: raft.ProgressStateReplicate, + State: tracker.StateReplicate, } if v == 0 { - p.State = raft.ProgressStateProbe + p.State = tracker.StateProbe } replicaID := uint64(j + 1) status.Progress[replicaID] = p @@ -5222,7 +5223,7 @@ func TestFilterUnremovableReplicas(t *testing.T) { for _, c := range testCases { t.Run("", func(t *testing.T) { status := &raft.Status{ - Progress: make(map[uint64]raft.Progress), + Progress: make(map[uint64]tracker.Progress), } // Use an invalid replica ID for the leader. TestFilterBehindReplicas covers // valid replica IDs. @@ -5230,12 +5231,12 @@ func TestFilterUnremovableReplicas(t *testing.T) { status.Commit = c.commit var replicas []roachpb.ReplicaDescriptor for j, v := range c.progress { - p := raft.Progress{ + p := tracker.Progress{ Match: v, - State: raft.ProgressStateReplicate, + State: tracker.StateReplicate, } if v == 0 { - p.State = raft.ProgressStateProbe + p.State = tracker.StateProbe } replicaID := uint64(j + 1) status.Progress[replicaID] = p @@ -5277,7 +5278,7 @@ func TestSimulateFilterUnremovableReplicas(t *testing.T) { for _, c := range testCases { t.Run("", func(t *testing.T) { status := &raft.Status{ - Progress: make(map[uint64]raft.Progress), + Progress: make(map[uint64]tracker.Progress), } // Use an invalid replica ID for the leader. TestFilterBehindReplicas covers // valid replica IDs. @@ -5285,12 +5286,12 @@ func TestSimulateFilterUnremovableReplicas(t *testing.T) { status.Commit = c.commit var replicas []roachpb.ReplicaDescriptor for j, v := range c.progress { - p := raft.Progress{ + p := tracker.Progress{ Match: v, - State: raft.ProgressStateReplicate, + State: tracker.StateReplicate, } if v == 0 { - p.State = raft.ProgressStateProbe + p.State = tracker.StateProbe } replicaID := uint64(j + 1) status.Progress[replicaID] = p diff --git a/pkg/storage/raft_log_queue.go b/pkg/storage/raft_log_queue.go index 3632d399698e..1519fb1fccc7 100644 --- a/pkg/storage/raft_log_queue.go +++ b/pkg/storage/raft_log_queue.go @@ -28,6 +28,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/pkg/errors" "go.etcd.io/etcd/raft" + "go.etcd.io/etcd/raft/tracker" ) const ( @@ -209,7 +210,7 @@ func newTruncateDecision(ctx context.Context, r *Replica) (truncateDecision, err if pr, ok := raftStatus.Progress[raftStatus.Lead]; ok { // TODO(tschottdorf): remove this line once we have picked up // https://github.com/etcd-io/etcd/pull/10279 - pr.State = raft.ProgressStateReplicate + pr.State = tracker.StateReplicate raftStatus.Progress[raftStatus.Lead] = pr } @@ -229,7 +230,7 @@ func newTruncateDecision(ctx context.Context, r *Replica) (truncateDecision, err func updateRaftProgressFromActivity( ctx context.Context, - prs map[uint64]raft.Progress, + prs map[uint64]tracker.Progress, replicas []roachpb.ReplicaDescriptor, lastUpdate lastUpdateTimesMap, now time.Time, @@ -285,7 +286,7 @@ type truncateDecision struct { func (td *truncateDecision) raftSnapshotsForIndex(index uint64) int { var n int for _, p := range td.Input.RaftStatus.Progress { - if p.State != raft.ProgressStateReplicate { + if p.State != tracker.StateReplicate { // If the follower isn't replicating, we can't trust its Match in // the first place. But note that this shouldn't matter in practice // as we already take care to not cut off these followers when @@ -427,7 +428,7 @@ func computeTruncateDecision(input truncateDecisionInput) truncateDecision { // overlapping bounds that put significant stress on the Raft snapshot // queue. if progress.RecentActive { - if progress.State == raft.ProgressStateProbe { + if progress.State == tracker.StateProbe { decision.ProtectIndex(decision.Input.FirstIndex, truncatableIndexChosenViaProbingFollower) } else { decision.ProtectIndex(progress.Match, truncatableIndexChosenViaFollowers) @@ -475,7 +476,7 @@ func computeTruncateDecision(input truncateDecisionInput) truncateDecision { func getQuorumIndex(raftStatus *raft.Status) uint64 { match := make([]uint64, 0, len(raftStatus.Progress)) for _, progress := range raftStatus.Progress { - if progress.State == raft.ProgressStateReplicate { + if progress.State == tracker.StateReplicate { match = append(match, progress.Match) } else { match = append(match, 0) diff --git a/pkg/storage/raft_log_queue_test.go b/pkg/storage/raft_log_queue_test.go index 0b188b1152bf..56edc99b2a62 100644 --- a/pkg/storage/raft_log_queue_test.go +++ b/pkg/storage/raft_log_queue_test.go @@ -32,6 +32,7 @@ import ( "github.com/pkg/errors" "github.com/stretchr/testify/assert" "go.etcd.io/etcd/raft" + "go.etcd.io/etcd/raft/tracker" ) func TestShouldTruncate(t *testing.T) { @@ -85,10 +86,13 @@ func TestGetQuorumIndex(t *testing.T) { } for i, c := range testCases { status := &raft.Status{ - Progress: make(map[uint64]raft.Progress), + Progress: make(map[uint64]tracker.Progress), } for j, v := range c.progress { - status.Progress[uint64(j)] = raft.Progress{State: raft.ProgressStateReplicate, Match: v} + status.Progress[uint64(j)] = tracker.Progress{ + State: tracker.StateReplicate, + Match: v, + } } quorumMatchedIndex := getQuorumIndex(status) if c.expected != quorumMatchedIndex { @@ -99,10 +103,10 @@ func TestGetQuorumIndex(t *testing.T) { // Verify that only replicating followers are taken into account (i.e. others // are treated as Match == 0). status := &raft.Status{ - Progress: map[uint64]raft.Progress{ - 1: {State: raft.ProgressStateReplicate, Match: 100}, - 2: {State: raft.ProgressStateSnapshot, Match: 100}, - 3: {State: raft.ProgressStateReplicate, Match: 90}, + Progress: map[uint64]tracker.Progress{ + 1: {State: tracker.StateReplicate, Match: 100}, + 2: {State: tracker.StateSnapshot, Match: 100}, + 3: {State: tracker.StateReplicate, Match: 90}, }, } assert.Equal(t, uint64(90), getQuorumIndex(status)) @@ -211,10 +215,15 @@ func TestComputeTruncateDecision(t *testing.T) { for i, c := range testCases { t.Run("", func(t *testing.T) { status := raft.Status{ - Progress: make(map[uint64]raft.Progress), + Progress: make(map[uint64]tracker.Progress), } for j, v := range c.progress { - status.Progress[uint64(j)] = raft.Progress{RecentActive: true, State: raft.ProgressStateReplicate, Match: v, Next: v + 1} + status.Progress[uint64(j)] = tracker.Progress{ + RecentActive: true, + State: tracker.StateReplicate, + Match: v, + Next: v + 1, + } } input := truncateDecisionInput{ RaftStatus: status, @@ -277,26 +286,26 @@ func TestComputeTruncateDecisionProgressStatusProbe(t *testing.T) { testutils.RunTrueAndFalse(t, "tooLarge", func(t *testing.T, tooLarge bool) { testutils.RunTrueAndFalse(t, "active", func(t *testing.T, active bool) { status := raft.Status{ - Progress: make(map[uint64]raft.Progress), + Progress: make(map[uint64]tracker.Progress), } for j, v := range []uint64{100, 200, 300, 400, 500} { - var pr raft.Progress + var pr tracker.Progress if v == 100 { // A probing follower is probed with some index (Next) but // it has a zero Match (i.e. no idea how much of its log // agrees with ours). - pr = raft.Progress{ + pr = tracker.Progress{ RecentActive: active, - State: raft.ProgressStateProbe, + State: tracker.StateProbe, Match: 0, Next: v, } } else { // everyone else - pr = raft.Progress{ + pr = tracker.Progress{ Match: v, Next: v + 1, RecentActive: true, - State: raft.ProgressStateReplicate, + State: tracker.StateReplicate, } } status.Progress[uint64(j)] = pr @@ -335,15 +344,15 @@ func TestTruncateDecisionNumSnapshots(t *testing.T) { defer leaktest.AfterTest(t)() status := raft.Status{ - Progress: map[uint64]raft.Progress{ + Progress: map[uint64]tracker.Progress{ // Fully caught up. - 5: {State: raft.ProgressStateReplicate, Match: 11, Next: 12}, + 5: {State: tracker.StateReplicate, Match: 11, Next: 12}, // Behind. - 6: {State: raft.ProgressStateReplicate, Match: 10, Next: 11}, + 6: {State: tracker.StateReplicate, Match: 10, Next: 11}, // Last MsgApp in flight, so basically caught up. - 7: {State: raft.ProgressStateReplicate, Match: 10, Next: 12}, - 8: {State: raft.ProgressStateProbe}, // irrelevant - 9: {State: raft.ProgressStateSnapshot}, // irrelevant + 7: {State: tracker.StateReplicate, Match: 10, Next: 12}, + 8: {State: tracker.StateProbe}, // irrelevant + 9: {State: tracker.StateSnapshot}, // irrelevant }, } @@ -374,12 +383,12 @@ func TestUpdateRaftStatusActivity(t *testing.T) { defer leaktest.AfterTest(t)() type testCase struct { - prs []raft.Progress + prs []tracker.Progress replicas []roachpb.ReplicaDescriptor lastUpdate lastUpdateTimesMap now time.Time - exp []raft.Progress + exp []tracker.Progress } now := timeutil.Now() @@ -388,20 +397,20 @@ func TestUpdateRaftStatusActivity(t *testing.T) { // No data, no crash. {}, // No knowledge = no update. - {prs: []raft.Progress{{RecentActive: true}}, exp: []raft.Progress{{RecentActive: true}}}, - {prs: []raft.Progress{{RecentActive: false}}, exp: []raft.Progress{{RecentActive: false}}}, + {prs: []tracker.Progress{{RecentActive: true}}, exp: []tracker.Progress{{RecentActive: true}}}, + {prs: []tracker.Progress{{RecentActive: false}}, exp: []tracker.Progress{{RecentActive: false}}}, // See replica in descriptor but then don't find it in the map. Assumes the follower is not // active. { replicas: []roachpb.ReplicaDescriptor{{ReplicaID: 1}}, - prs: []raft.Progress{{RecentActive: true}}, - exp: []raft.Progress{{RecentActive: false}}, + prs: []tracker.Progress{{RecentActive: true}}, + exp: []tracker.Progress{{RecentActive: false}}, }, // Three replicas in descriptor. The first one responded recently, the second didn't, // the third did but it doesn't have a Progress. { replicas: []roachpb.ReplicaDescriptor{{ReplicaID: 1}, {ReplicaID: 2}, {ReplicaID: 3}}, - prs: []raft.Progress{{RecentActive: false}, {RecentActive: true}}, + prs: []tracker.Progress{{RecentActive: false}, {RecentActive: true}}, lastUpdate: map[roachpb.ReplicaID]time.Time{ 1: now.Add(-1 * MaxQuotaReplicaLivenessDuration / 2), 2: now.Add(-1 - MaxQuotaReplicaLivenessDuration), @@ -409,7 +418,7 @@ func TestUpdateRaftStatusActivity(t *testing.T) { }, now: now, - exp: []raft.Progress{{RecentActive: true}, {RecentActive: false}}, + exp: []tracker.Progress{{RecentActive: true}, {RecentActive: false}}, }, } @@ -417,11 +426,11 @@ func TestUpdateRaftStatusActivity(t *testing.T) { for _, tc := range tcs { t.Run("", func(t *testing.T) { - prs := make(map[uint64]raft.Progress) + prs := make(map[uint64]tracker.Progress) for i, pr := range tc.prs { prs[uint64(i+1)] = pr } - expPRs := make(map[uint64]raft.Progress) + expPRs := make(map[uint64]tracker.Progress) for i, pr := range tc.exp { expPRs[uint64(i+1)] = pr } diff --git a/pkg/storage/raft_snapshot_queue.go b/pkg/storage/raft_snapshot_queue.go index 22b6db067f1c..32b6d2aa9328 100644 --- a/pkg/storage/raft_snapshot_queue.go +++ b/pkg/storage/raft_snapshot_queue.go @@ -20,7 +20,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/pkg/errors" - "go.etcd.io/etcd/raft" + "go.etcd.io/etcd/raft/tracker" ) const ( @@ -66,7 +66,7 @@ func (rq *raftSnapshotQueue) shouldQueue( if status := repl.RaftStatus(); status != nil { // raft.Status.Progress is only populated on the Raft group leader. for _, p := range status.Progress { - if p.State == raft.ProgressStateSnapshot { + if p.State == tracker.StateSnapshot { if log.V(2) { log.Infof(ctx, "raft snapshot needed, enqueuing") } @@ -84,7 +84,7 @@ func (rq *raftSnapshotQueue) process( if status := repl.RaftStatus(); status != nil { // raft.Status.Progress is only populated on the Raft group leader. for id, p := range status.Progress { - if p.State == raft.ProgressStateSnapshot { + if p.State == tracker.StateSnapshot { if log.V(1) { log.Infof(ctx, "sending raft snapshot") } diff --git a/pkg/storage/replica.go b/pkg/storage/replica.go index 0b95f8fe99ee..834e1860bf67 100644 --- a/pkg/storage/replica.go +++ b/pkg/storage/replica.go @@ -870,7 +870,8 @@ func (r *Replica) RaftStatus() *raft.Status { func (r *Replica) raftStatusRLocked() *raft.Status { if rg := r.mu.internalRaftGroup; rg != nil { - return rg.Status() + s := rg.Status() + return &s } return nil } diff --git a/pkg/storage/replica_command.go b/pkg/storage/replica_command.go index 96bc69780982..5e045f2ee110 100644 --- a/pkg/storage/replica_command.go +++ b/pkg/storage/replica_command.go @@ -38,6 +38,7 @@ import ( "github.com/pkg/errors" "go.etcd.io/etcd/raft" "go.etcd.io/etcd/raft/raftpb" + "go.etcd.io/etcd/raft/tracker" ) // AdminSplit divides the range into into two ranges using args.SplitKey. @@ -80,17 +81,17 @@ func splitSnapshotWarningStr(rangeID roachpb.RangeID, status *raft.Status) strin // https://github.com/etcd-io/etcd/pull/10279 continue } - if pr.State == raft.ProgressStateReplicate { + if pr.State == tracker.StateReplicate { // This follower is in good working order. continue } s += fmt.Sprintf("; r%d/%d is ", rangeID, replicaID) switch pr.State { - case raft.ProgressStateSnapshot: + case tracker.StateSnapshot: // If the Raft snapshot queue is backed up, replicas can spend // minutes or worse until they are caught up. s += "waiting for a Raft snapshot" - case raft.ProgressStateProbe: + case tracker.StateProbe: // Assuming the split has already been delayed for a little bit, // seeing a follower that is probing hints at some problem with // Raft or Raft message delivery. (Of course it's possible that diff --git a/pkg/storage/replica_proposal_quota.go b/pkg/storage/replica_proposal_quota.go index 4272bdbea534..0f06b01226fd 100644 --- a/pkg/storage/replica_proposal_quota.go +++ b/pkg/storage/replica_proposal_quota.go @@ -20,6 +20,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "go.etcd.io/etcd/raft" + "go.etcd.io/etcd/raft/tracker" ) // MaxQuotaReplicaLivenessDuration is the maximum duration that a replica @@ -144,9 +145,9 @@ func (r *Replica) updateProposalQuotaRaftMuLocked( // Find the minimum index that active followers have acknowledged. now := timeutil.Now() - status := r.mu.internalRaftGroup.StatusWithoutProgress() + status := r.mu.internalRaftGroup.BasicStatus() commitIndex, minIndex := status.Commit, status.Commit - r.mu.internalRaftGroup.WithProgress(func(id uint64, _ raft.ProgressType, progress raft.Progress) { + r.mu.internalRaftGroup.WithProgress(func(id uint64, _ raft.ProgressType, progress tracker.Progress) { rep, ok := r.mu.state.Desc.GetReplicaDescriptorByID(roachpb.ReplicaID(id)) if !ok { return diff --git a/pkg/storage/replica_raft.go b/pkg/storage/replica_raft.go index c3bc51c7c424..9bf63cfc0dd8 100644 --- a/pkg/storage/replica_raft.go +++ b/pkg/storage/replica_raft.go @@ -37,6 +37,7 @@ import ( "github.com/pkg/errors" "go.etcd.io/etcd/raft" "go.etcd.io/etcd/raft/raftpb" + "go.etcd.io/etcd/raft/tracker" ) func makeIDKey() storagebase.CmdIDKey { @@ -1059,8 +1060,8 @@ func (r *Replica) sendRaftMessage(ctx context.Context, msg raftpb.Message) { // below for more context: _ = maybeDropMsgApp // NB: this code is allocation free. - r.mu.internalRaftGroup.WithProgress(func(id uint64, _ raft.ProgressType, pr raft.Progress) { - if id == msg.To && pr.State == raft.ProgressStateProbe { + r.mu.internalRaftGroup.WithProgress(func(id uint64, _ raft.ProgressType, pr tracker.Progress) { + if id == msg.To && pr.State == tracker.StateProbe { // It is moderately expensive to attach a full key to the message, but note that // a probing follower will only be appended to once per heartbeat interval (i.e. // on the order of seconds). See: @@ -1338,7 +1339,7 @@ func (r *Replica) maybeCampaignOnWakeLocked(ctx context.Context) { leaseStatus := r.leaseStatus(*r.mu.state.Lease, r.store.Clock().Now(), r.mu.minLeaseProposedTS) raftStatus := r.mu.internalRaftGroup.Status() - if shouldCampaignOnWake(leaseStatus, *r.mu.state.Lease, r.store.StoreID(), *raftStatus) { + if shouldCampaignOnWake(leaseStatus, *r.mu.state.Lease, r.store.StoreID(), raftStatus) { log.VEventf(ctx, 3, "campaigning") if err := r.mu.internalRaftGroup.Campaign(); err != nil { log.VEventf(ctx, 1, "failed to campaign: %s", err) @@ -1367,10 +1368,10 @@ func (m lastUpdateTimesMap) update(replicaID roachpb.ReplicaID, now time.Time) { // a suitable pattern of quiesce and unquiesce operations (and this in turn // can interfere with Raft log truncations). func (m lastUpdateTimesMap) updateOnUnquiesce( - descs []roachpb.ReplicaDescriptor, prs map[uint64]raft.Progress, now time.Time, + descs []roachpb.ReplicaDescriptor, prs map[uint64]tracker.Progress, now time.Time, ) { for _, desc := range descs { - if prs[uint64(desc.ReplicaID)].State == raft.ProgressStateReplicate { + if prs[uint64(desc.ReplicaID)].State == tracker.StateReplicate { m.update(desc.ReplicaID, now) } } diff --git a/pkg/storage/replica_raft_test.go b/pkg/storage/replica_raft_test.go index 6206a9f94503..d1604d1fd545 100644 --- a/pkg/storage/replica_raft_test.go +++ b/pkg/storage/replica_raft_test.go @@ -17,7 +17,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/stretchr/testify/assert" - "go.etcd.io/etcd/raft" + "go.etcd.io/etcd/raft/tracker" ) func TestLastUpdateTimesMap(t *testing.T) { @@ -37,14 +37,14 @@ func TestLastUpdateTimesMap(t *testing.T) { t4 := t3.Add(time.Second) descs = append(descs, []roachpb.ReplicaDescriptor{{ReplicaID: 5}, {ReplicaID: 6}}...) - prs := map[uint64]raft.Progress{ - 1: {State: raft.ProgressStateReplicate}, // should be updated + prs := map[uint64]tracker.Progress{ + 1: {State: tracker.StateReplicate}, // should be updated // 2 is missing because why not - 3: {State: raft.ProgressStateProbe}, // should be ignored - 4: {State: raft.ProgressStateSnapshot}, // should be ignored - 5: {State: raft.ProgressStateProbe}, // should be ignored - 6: {State: raft.ProgressStateReplicate}, // should be added - 7: {State: raft.ProgressStateReplicate}, // ignored, not in descs + 3: {State: tracker.StateProbe}, // should be ignored + 4: {State: tracker.StateSnapshot}, // should be ignored + 5: {State: tracker.StateProbe}, // should be ignored + 6: {State: tracker.StateReplicate}, // should be added + 7: {State: tracker.StateReplicate}, // ignored, not in descs } m.updateOnUnquiesce(descs, prs, t4) assert.EqualValues(t, map[roachpb.ReplicaID]time.Time{ diff --git a/pkg/storage/replica_test.go b/pkg/storage/replica_test.go index 020ac12f56a9..4e2d28c0e225 100644 --- a/pkg/storage/replica_test.go +++ b/pkg/storage/replica_test.go @@ -63,6 +63,7 @@ import ( "github.com/stretchr/testify/require" "go.etcd.io/etcd/raft" "go.etcd.io/etcd/raft/raftpb" + "go.etcd.io/etcd/raft/tracker" ) // allSpans is a SpanSet that covers *everything* for use in tests that don't @@ -133,17 +134,19 @@ func leaseExpiry(repl *Replica) int64 { // Create a Raft status that shows everyone fully up to date. func upToDateRaftStatus(repls []roachpb.ReplicaDescriptor) *raft.Status { - prs := make(map[uint64]raft.Progress) + prs := make(map[uint64]tracker.Progress) for _, repl := range repls { - prs[uint64(repl.ReplicaID)] = raft.Progress{ - State: raft.ProgressStateReplicate, + prs[uint64(repl.ReplicaID)] = tracker.Progress{ + State: tracker.StateReplicate, Match: 100, } } return &raft.Status{ - HardState: raftpb.HardState{Commit: 100}, - SoftState: raft.SoftState{Lead: 1, RaftState: raft.StateLeader}, - Progress: prs, + BasicStatus: raft.BasicStatus{ + HardState: raftpb.HardState{Commit: 100}, + SoftState: raft.SoftState{Lead: 1, RaftState: raft.StateLeader}, + }, + Progress: prs, } } @@ -8143,14 +8146,14 @@ func TestReplicaEvaluationNotTxnMutation(t *testing.T) { func TestReplicaMetrics(t *testing.T) { defer leaktest.AfterTest(t)() - progress := func(vals ...uint64) map[uint64]raft.Progress { - m := make(map[uint64]raft.Progress) + progress := func(vals ...uint64) map[uint64]tracker.Progress { + m := make(map[uint64]tracker.Progress) for i, v := range vals { - m[uint64(i+1)] = raft.Progress{Match: v} + m[uint64(i+1)] = tracker.Progress{Match: v} } return m } - status := func(lead uint64, progress map[uint64]raft.Progress) *raft.Status { + status := func(lead uint64, progress map[uint64]tracker.Progress) *raft.Status { status := &raft.Status{ Progress: progress, } @@ -9076,20 +9079,22 @@ func TestShouldReplicaQuiesce(t *testing.T) { }, }, status: &raft.Status{ - ID: 1, - HardState: raftpb.HardState{ - Commit: logIndex, - }, - SoftState: raft.SoftState{ - RaftState: raft.StateLeader, + BasicStatus: raft.BasicStatus{ + ID: 1, + HardState: raftpb.HardState{ + Commit: logIndex, + }, + SoftState: raft.SoftState{ + RaftState: raft.StateLeader, + }, + Applied: logIndex, + LeadTransferee: 0, }, - Applied: logIndex, - Progress: map[uint64]raft.Progress{ + Progress: map[uint64]tracker.Progress{ 1: {Match: logIndex}, 2: {Match: logIndex}, 3: {Match: logIndex}, }, - LeadTransferee: 0, }, lastIndex: logIndex, raftReady: false, @@ -9153,7 +9158,7 @@ func TestShouldReplicaQuiesce(t *testing.T) { }) for _, i := range []uint64{1, 2, 3} { test(false, func(q *testQuiescer) *testQuiescer { - q.status.Progress[i] = raft.Progress{Match: invalidIndex} + q.status.Progress[i] = tracker.Progress{Match: invalidIndex} return q }) } @@ -9187,7 +9192,7 @@ func TestShouldReplicaQuiesce(t *testing.T) { for _, i := range []uint64{1, 2, 3} { test(true, func(q *testQuiescer) *testQuiescer { q.livenessMap[roachpb.NodeID(i)] = IsLiveMapEntry{IsLive: false} - q.status.Progress[i] = raft.Progress{Match: invalidIndex} + q.status.Progress[i] = tracker.Progress{Match: invalidIndex} return q }) } @@ -9715,30 +9720,30 @@ func TestReplicaShouldCampaignOnWake(t *testing.T) { }, } - followerWithoutLeader := raft.Status{ + followerWithoutLeader := raft.Status{BasicStatus: raft.BasicStatus{ SoftState: raft.SoftState{ RaftState: raft.StateFollower, Lead: 0, }, - } - followerWithLeader := raft.Status{ + }} + followerWithLeader := raft.Status{BasicStatus: raft.BasicStatus{ SoftState: raft.SoftState{ RaftState: raft.StateFollower, Lead: 1, }, - } - candidate := raft.Status{ + }} + candidate := raft.Status{BasicStatus: raft.BasicStatus{ SoftState: raft.SoftState{ RaftState: raft.StateCandidate, Lead: 0, }, - } - leader := raft.Status{ + }} + leader := raft.Status{BasicStatus: raft.BasicStatus{ SoftState: raft.SoftState{ RaftState: raft.StateLeader, Lead: 1, }, - } + }} tests := []struct { leaseStatus storagepb.LeaseStatus @@ -11455,7 +11460,7 @@ func TestSplitSnapshotWarningStr(t *testing.T) { assert.Equal(t, "", splitSnapshotWarningStr(12, status)) pr := status.Progress[2] - pr.State = raft.ProgressStateProbe + pr.State = tracker.StateProbe status.Progress[2] = pr assert.Equal( @@ -11464,7 +11469,7 @@ func TestSplitSnapshotWarningStr(t *testing.T) { splitSnapshotWarningStr(12, status), ) - pr.State = raft.ProgressStateSnapshot + pr.State = tracker.StateSnapshot assert.Equal( t, diff --git a/pkg/storage/split_delay_helper.go b/pkg/storage/split_delay_helper.go index 2a451d232f62..de180f82cce2 100644 --- a/pkg/storage/split_delay_helper.go +++ b/pkg/storage/split_delay_helper.go @@ -18,6 +18,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "go.etcd.io/etcd/raft" + "go.etcd.io/etcd/raft/tracker" ) type splitDelayHelperI interface { @@ -106,13 +107,7 @@ func maybeDelaySplitToAvoidSnapshot(ctx context.Context, sdh splitDelayHelperI) done := true for replicaID, pr := range raftStatus.Progress { - if replicaID == raftStatus.Lead { - // TODO(tschottdorf): remove this once we have picked up - // https://github.com/etcd-io/etcd/pull/10279 - continue - } - - if pr.State != raft.ProgressStateReplicate { + if pr.State != tracker.StateReplicate { if !pr.RecentActive { if ticks == 0 { // Having set done = false, we make sure we're not exiting early. diff --git a/pkg/storage/split_delay_helper_test.go b/pkg/storage/split_delay_helper_test.go index e628af04ccab..8e13bd061dd3 100644 --- a/pkg/storage/split_delay_helper_test.go +++ b/pkg/storage/split_delay_helper_test.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/stretchr/testify/assert" "go.etcd.io/etcd/raft" + "go.etcd.io/etcd/raft/tracker" ) type testSplitDelayHelper struct { @@ -84,8 +85,8 @@ func TestSplitDelayToAvoidSnapshot(t *testing.T) { numAttempts: 5, rangeID: 1, raftStatus: &raft.Status{ - Progress: map[uint64]raft.Progress{ - 2: {State: raft.ProgressStateProbe}, + Progress: map[uint64]tracker.Progress{ + 2: {State: tracker.StateProbe}, }, }, } @@ -96,23 +97,27 @@ func TestSplitDelayToAvoidSnapshot(t *testing.T) { assert.Equal(t, 1, h.emptyProposed) }) - for _, state := range []raft.ProgressStateType{raft.ProgressStateProbe, raft.ProgressStateSnapshot} { + for _, state := range []tracker.StateType{tracker.StateProbe, tracker.StateSnapshot} { t.Run(state.String(), func(t *testing.T) { h := &testSplitDelayHelper{ numAttempts: 5, rangeID: 1, raftStatus: &raft.Status{ - Progress: map[uint64]raft.Progress{ - 2: {State: state, RecentActive: true, Paused: true /* unifies string output below */}, + Progress: map[uint64]tracker.Progress{ + 2: { + State: state, + RecentActive: true, + ProbeSent: true, // Unifies string output below. + Inflights: &tracker.Inflights{}, + }, // Healthy follower just for kicks. - 3: {State: raft.ProgressStateReplicate}, + 3: {State: tracker.StateReplicate}, }, }, } s := maybeDelaySplitToAvoidSnapshot(ctx, h) - assert.Equal(t, "; replica r1/2 not caught up: next = 0, match = 0, state = "+ - state.String()+ - ", waiting = true, pendingSnapshot = 0; delayed split for 5.0s to avoid Raft snapshot (without success)", s) + assert.Equal(t, "; replica r1/2 not caught up: "+state.String()+ + " match=0 next=0 paused; delayed split for 5.0s to avoid Raft snapshot (without success)", s) assert.Equal(t, 5, h.slept) assert.Equal(t, 5, h.emptyProposed) }) @@ -123,8 +128,8 @@ func TestSplitDelayToAvoidSnapshot(t *testing.T) { numAttempts: 5, rangeID: 1, raftStatus: &raft.Status{ - Progress: map[uint64]raft.Progress{ - 2: {State: raft.ProgressStateReplicate}, // intentionally not recently active + Progress: map[uint64]tracker.Progress{ + 2: {State: tracker.StateReplicate}, // intentionally not recently active }, }, } @@ -139,8 +144,8 @@ func TestSplitDelayToAvoidSnapshot(t *testing.T) { numAttempts: 5, rangeID: 1, raftStatus: &raft.Status{ - Progress: map[uint64]raft.Progress{ - 2: {State: raft.ProgressStateProbe, RecentActive: true}, + Progress: map[uint64]tracker.Progress{ + 2: {State: tracker.StateProbe, RecentActive: true, Inflights: &tracker.Inflights{}}, }, }, } @@ -148,7 +153,7 @@ func TestSplitDelayToAvoidSnapshot(t *testing.T) { h.sleep = func() { if h.slept == 2 { pr := h.raftStatus.Progress[2] - pr.State = raft.ProgressStateReplicate + pr.State = tracker.StateReplicate h.raftStatus.Progress[2] = pr } } diff --git a/pkg/storage/store_rebalancer_test.go b/pkg/storage/store_rebalancer_test.go index a4b9eeef543a..affacafc26e6 100644 --- a/pkg/storage/store_rebalancer_test.go +++ b/pkg/storage/store_rebalancer_test.go @@ -24,6 +24,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/gogo/protobuf/proto" "go.etcd.io/etcd/raft" + "go.etcd.io/etcd/raft/tracker" ) var ( @@ -135,14 +136,14 @@ func TestChooseLeaseToTransfer(t *testing.T) { // raft status with one that always returns all replicas as up to date. sr.getRaftStatusFn = func(r *Replica) *raft.Status { status := &raft.Status{ - Progress: make(map[uint64]raft.Progress), + Progress: make(map[uint64]tracker.Progress), } status.Lead = uint64(r.ReplicaID()) status.Commit = 1 for _, replica := range r.Desc().InternalReplicas { - status.Progress[uint64(replica.ReplicaID)] = raft.Progress{ + status.Progress[uint64(replica.ReplicaID)] = tracker.Progress{ Match: 1, - State: raft.ProgressStateReplicate, + State: tracker.StateReplicate, } } return status @@ -218,14 +219,14 @@ func TestChooseReplicaToRebalance(t *testing.T) { // raft status with one that always returns all replicas as up to date. sr.getRaftStatusFn = func(r *Replica) *raft.Status { status := &raft.Status{ - Progress: make(map[uint64]raft.Progress), + Progress: make(map[uint64]tracker.Progress), } status.Lead = uint64(r.ReplicaID()) status.Commit = 1 for _, replica := range r.Desc().InternalReplicas { - status.Progress[uint64(replica.ReplicaID)] = raft.Progress{ + status.Progress[uint64(replica.ReplicaID)] = tracker.Progress{ Match: 1, - State: raft.ProgressStateReplicate, + State: tracker.StateReplicate, } } return status @@ -331,7 +332,7 @@ func TestNoLeaseTransferToBehindReplicas(t *testing.T) { // are caught up). We thus shouldn't transfer a lease to s5. sr.getRaftStatusFn = func(r *Replica) *raft.Status { status := &raft.Status{ - Progress: make(map[uint64]raft.Progress), + Progress: make(map[uint64]tracker.Progress), } status.Lead = uint64(r.ReplicaID()) status.Commit = 1 @@ -340,9 +341,9 @@ func TestNoLeaseTransferToBehindReplicas(t *testing.T) { if replica.StoreID == roachpb.StoreID(5) { match = 0 } - status.Progress[uint64(replica.ReplicaID)] = raft.Progress{ + status.Progress[uint64(replica.ReplicaID)] = tracker.Progress{ Match: match, - State: raft.ProgressStateReplicate, + State: tracker.StateReplicate, } } return status diff --git a/pkg/storage/store_snapshot.go b/pkg/storage/store_snapshot.go index 20ff98fb09d3..c91e20102b61 100644 --- a/pkg/storage/store_snapshot.go +++ b/pkg/storage/store_snapshot.go @@ -14,7 +14,6 @@ import ( "context" "fmt" "io" - "math" "time" "github.com/cockroachdb/cockroach/pkg/base" @@ -35,10 +34,6 @@ import ( ) const ( - // preemptiveSnapshotRaftGroupID is a bogus ID for which a Raft group is - // temporarily created during the application of a preemptive snapshot. - preemptiveSnapshotRaftGroupID = math.MaxUint64 - // Messages that provide detail about why a preemptive snapshot was rejected. snapshotStoreTooFullMsg = "store almost out of disk space" snapshotApplySemBusyMsg = "store busy applying snapshots" diff --git a/pkg/storage/store_snapshot_preemptive.go b/pkg/storage/store_snapshot_preemptive.go index 93304c9107c8..207438203eb7 100644 --- a/pkg/storage/store_snapshot_preemptive.go +++ b/pkg/storage/store_snapshot_preemptive.go @@ -282,6 +282,16 @@ func (s *Store) processPreemptiveSnapshotRequest( if err != nil { return roachpb.NewError(err) } + // We need to create a temporary RawNode to process the snapshot. Raft + // internally runs safety checks on the snapshot, among them one that + // verifies that the peer is actually part of the configuration encoded + // in the snapshot. Awkwardly, it isn't actually a peer (preemptive + // snapshot...). To get around this, pretend the RawNode has the ID of a + // peer we know exists, namely the one that sent us the snap. This won't + // be persisted anywhere, and since we're only using the RawNode for + // this one snapshot, everything is ok. However, we'll make sure that + // no messages are sent in the resulting Ready. + preemptiveSnapshotRaftGroupID := uint64(snapHeader.RaftMessageRequest.FromReplica.ReplicaID) raftGroup, err := raft.NewRawNode( newRaftConfig( raft.Storage((*replicaRaftStorage)(r)), @@ -308,6 +318,9 @@ func (s *Store) processPreemptiveSnapshotRequest( var ready raft.Ready if raftGroup.HasReady() { ready = raftGroup.Ready() + // See the comment above - we don't want this temporary Raft group + // to contact the outside world. Apply the snapshot and that's it. + ready.Messages = nil } if needTombstone { diff --git a/vendor b/vendor index 40ed5ce4c6da..ab736d516a48 160000 --- a/vendor +++ b/vendor @@ -1 +1 @@ -Subproject commit 40ed5ce4c6daba74cb03d977f3bde0f6e60aa3d4 +Subproject commit ab736d516a4867f3179c0d703cdca9c29a9ec4b3