Skip to content

Commit

Permalink
raft: add MaxInflightBytes to Config
Browse files Browse the repository at this point in the history
This commit introduces the max inflight bytes setting at the Config level, and
tests that raft flow control honours it.

Signed-off-by: Pavel Kalinnikov <pavel@cockroachlabs.com>
  • Loading branch information
pav-kv committed Nov 8, 2022
1 parent b93cb7d commit 30c9fdb
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 36 deletions.
15 changes: 12 additions & 3 deletions raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,16 @@ type Config struct {
// overflowing that sending buffer. TODO (xiangli): feedback to application to
// limit the proposal rate?
MaxInflightMsgs int
// MaxInflightBytes limits the number of in-flight bytes in append messages.
// Complements MaxInflightMsgs. Ignored if zero.
//
// This effectively bounds the bandwidth-delay product. Note that especially
// in high-latency deployments setting this too low can lead to a dramatic
// reduction in throughput. For example, with a peer that has a round-trip
// latency of 100ms to the leader and this setting is set to 1 MB, there is a
// throughput limit of 10 MB/s for this group. With RTT of 400ms, this drops
// to 2.5 MB/s.
MaxInflightBytes uint64

// CheckQuorum specifies if the leader should check quorum activity. Leader
// steps down when quorum is not active for an electionTimeout.
Expand Down Expand Up @@ -332,7 +342,7 @@ func newRaft(c *Config) *raft {
raftLog: raftlog,
maxMsgSize: c.MaxSizePerMsg,
maxUncommittedSize: c.MaxUncommittedEntriesSize,
prs: tracker.MakeProgressTracker(c.MaxInflightMsgs, 0), // TODO: set maxBytes
prs: tracker.MakeProgressTracker(c.MaxInflightMsgs, c.MaxInflightBytes),
electionTimeout: c.ElectionTick,
heartbeatTimeout: c.HeartbeatTick,
logger: c.Logger,
Expand Down Expand Up @@ -484,8 +494,7 @@ func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {

// Send the actual MsgApp otherwise, and update the progress accordingly.
next := pr.Next // save Next for later, as the progress update can change it
// TODO(pavelkalinnikov): set bytes to sum(Entries[].Size())
if err := pr.UpdateOnEntriesSend(len(ents), 0 /* bytes */, next); err != nil {
if err := pr.UpdateOnEntriesSend(len(ents), payloadsSize(ents), next); err != nil {
r.logger.Panicf("%x: %v", r.id, err)
}
r.send(pb.Message{
Expand Down
68 changes: 37 additions & 31 deletions raft/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ func TestProgressFlowControl(t *testing.T) {
cfg := newTestConfig(1, 5, 1, newTestMemoryStorage(withPeers(1, 2)))
cfg.MaxInflightMsgs = 3
cfg.MaxSizePerMsg = 2048
cfg.MaxInflightBytes = 9000 // A little over MaxInflightMsgs * MaxSizePerMsg.
r := newRaft(cfg)
r.becomeCandidate()
r.becomeLeader()
Expand All @@ -140,7 +141,12 @@ func TestProgressFlowControl(t *testing.T) {
// While node 2 is in probe state, propose a bunch of entries.
r.prs.Progress[2].BecomeProbe()
blob := []byte(strings.Repeat("a", 1000))
for i := 0; i < 10; i++ {
large := []byte(strings.Repeat("b", 5000))
for i := 0; i < 22; i++ {
blob := blob
if i >= 10 && i < 16 { // Temporarily send large messages.
blob = large
}
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: blob}}})
}

Expand All @@ -158,40 +164,40 @@ func TestProgressFlowControl(t *testing.T) {
t.Fatalf("unexpected entry sizes: %v", ms[0].Entries)
}

// When this append is acked, we change to replicate state and can
// send multiple messages at once.
r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: ms[0].Entries[1].Index})
ms = r.readMessages()
if len(ms) != 3 {
t.Fatalf("expected 3 messages, got %d", len(ms))
}
for i, m := range ms {
if m.Type != pb.MsgApp {
t.Errorf("%d: expected MsgApp, got %s", i, m.Type)
ackAndVerify := func(index uint64, expEntries ...int) uint64 {
r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: index})
ms := r.readMessages()
if got, want := len(ms), len(expEntries); got != want {
t.Fatalf("expected %d messages, got %d", want, got)
}
if len(m.Entries) != 2 {
t.Errorf("%d: expected 2 entries, got %d", i, len(m.Entries))
for i, m := range ms {
if got, want := m.Type, pb.MsgApp; got != want {
t.Errorf("%d: expected MsgApp, got %s", i, got)
}
if got, want := len(m.Entries), expEntries[i]; got != want {
t.Errorf("%d: expected %d entries, got %d", i, want, got)
}
}
}

// Ack all three of those messages together and get the last two
// messages (containing three entries).
r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: ms[2].Entries[1].Index})
ms = r.readMessages()
if len(ms) != 2 {
t.Fatalf("expected 2 messages, got %d", len(ms))
}
for i, m := range ms {
if m.Type != pb.MsgApp {
t.Errorf("%d: expected MsgApp, got %s", i, m.Type)
last := ms[len(ms)-1].Entries
if len(last) == 0 {
return index
}
return last[len(last)-1].Index
}
if len(ms[0].Entries) != 2 {
t.Errorf("%d: expected 2 entries, got %d", 0, len(ms[0].Entries))
}
if len(ms[1].Entries) != 1 {
t.Errorf("%d: expected 1 entry, got %d", 1, len(ms[1].Entries))
}

// When this append is acked, we change to replicate state and can
// send multiple messages at once.
index := ackAndVerify(ms[0].Entries[1].Index, 2, 2, 2)
// Ack all three of those messages together and get another 3 messages. The
// third message contains a single large entry, in contrast to 2 before.
index = ackAndVerify(index, 2, 1, 1)
// All subsequent messages contain one large entry, and we cap at 2 messages
// because it overflows MaxInflightBytes.
index = ackAndVerify(index, 1, 1)
index = ackAndVerify(index, 1, 1)
// Start getting small messages again.
index = ackAndVerify(index, 1, 2, 2)
ackAndVerify(index, 2)
}

func TestUncommittedEntryLimit(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions raft/tracker/progress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,9 @@ func TestProgressIsPaused(t *testing.T) {
}
for i, tt := range tests {
p := &Progress{
State: tt.state,
State: tt.state,
MsgAppFlowPaused: tt.paused,
Inflights: NewInflights(256, 0),
Inflights: NewInflights(256, 0),
}
if g := p.IsPaused(); g != tt.w {
t.Errorf("#%d: paused= %t, want %t", i, g, tt.w)
Expand Down

0 comments on commit 30c9fdb

Please sign in to comment.