diff --git a/raft/raft.go b/raft/raft.go
index 1a599557c0d5..c585328b381a 100644
--- a/raft/raft.go
+++ b/raft/raft.go
@@ -160,6 +160,16 @@ type Config struct {
 	// overflowing that sending buffer. TODO (xiangli): feedback to application to
 	// limit the proposal rate?
 	MaxInflightMsgs int
+	// MaxInflightBytes limits the number of in-flight bytes in append messages.
+	// Complements MaxInflightMsgs. Ignored if zero.
+	//
+	// This effectively bounds the bandwidth-delay product. Note that especially
+	// in high-latency deployments setting this too low can lead to a dramatic
+	// reduction in throughput. For example, with a peer that has a round-trip
+	// latency of 100ms to the leader and this setting is set to 1 MB, there is a
+	// throughput limit of 10 MB/s for this group. With RTT of 400ms, this drops
+	// to 2.5 MB/s.
+	MaxInflightBytes uint64
 
 	// CheckQuorum specifies if the leader should check quorum activity. Leader
 	// steps down when quorum is not active for an electionTimeout.
@@ -332,7 +342,7 @@ func newRaft(c *Config) *raft {
 		raftLog:            raftlog,
 		maxMsgSize:         c.MaxSizePerMsg,
 		maxUncommittedSize: c.MaxUncommittedEntriesSize,
-		prs:                tracker.MakeProgressTracker(c.MaxInflightMsgs, 0), // TODO: set maxBytes
+		prs:                tracker.MakeProgressTracker(c.MaxInflightMsgs, c.MaxInflightBytes),
 		electionTimeout:    c.ElectionTick,
 		heartbeatTimeout:   c.HeartbeatTick,
 		logger:             c.Logger,
@@ -484,8 +494,7 @@ func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {
 
 	// Send the actual MsgApp otherwise, and update the progress accordingly.
 	next := pr.Next // save Next for later, as the progress update can change it
-	// TODO(pavelkalinnikov): set bytes to sum(Entries[].Size())
-	if err := pr.UpdateOnEntriesSend(len(ents), 0 /* bytes */, next); err != nil {
+	if err := pr.UpdateOnEntriesSend(len(ents), payloadsSize(ents), next); err != nil {
 		r.logger.Panicf("%x: %v", r.id, err)
 	}
 	r.send(pb.Message{
diff --git a/raft/raft_test.go b/raft/raft_test.go
index 2acafc4bf6f0..3599dfa7f9f8 100644
--- a/raft/raft_test.go
+++ b/raft/raft_test.go
@@ -130,6 +130,7 @@ func TestProgressFlowControl(t *testing.T) {
 	cfg := newTestConfig(1, 5, 1, newTestMemoryStorage(withPeers(1, 2)))
 	cfg.MaxInflightMsgs = 3
 	cfg.MaxSizePerMsg = 2048
+	cfg.MaxInflightBytes = 9000 // A little over MaxInflightMsgs * MaxSizePerMsg.
 	r := newRaft(cfg)
 	r.becomeCandidate()
 	r.becomeLeader()
@@ -140,7 +141,12 @@ func TestProgressFlowControl(t *testing.T) {
 	// While node 2 is in probe state, propose a bunch of entries.
 	r.prs.Progress[2].BecomeProbe()
 	blob := []byte(strings.Repeat("a", 1000))
-	for i := 0; i < 10; i++ {
+	large := []byte(strings.Repeat("b", 5000))
+	for i := 0; i < 22; i++ {
+		blob := blob
+		if i >= 10 && i < 16 { // Temporarily send large messages.
+			blob = large
+		}
 		r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: blob}}})
 	}
 
@@ -158,40 +164,40 @@ func TestProgressFlowControl(t *testing.T) {
 		t.Fatalf("unexpected entry sizes: %v", ms[0].Entries)
 	}
 
-	// When this append is acked, we change to replicate state and can
-	// send multiple messages at once.
-	r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: ms[0].Entries[1].Index})
-	ms = r.readMessages()
-	if len(ms) != 3 {
-		t.Fatalf("expected 3 messages, got %d", len(ms))
-	}
-	for i, m := range ms {
-		if m.Type != pb.MsgApp {
-			t.Errorf("%d: expected MsgApp, got %s", i, m.Type)
+	ackAndVerify := func(index uint64, expEntries ...int) uint64 {
+		r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: index})
+		ms := r.readMessages()
+		if got, want := len(ms), len(expEntries); got != want {
+			t.Fatalf("expected %d messages, got %d", want, got)
 		}
-		if len(m.Entries) != 2 {
-			t.Errorf("%d: expected 2 entries, got %d", i, len(m.Entries))
+		for i, m := range ms {
+			if got, want := m.Type, pb.MsgApp; got != want {
+				t.Errorf("%d: expected MsgApp, got %s", i, got)
+			}
+			if got, want := len(m.Entries), expEntries[i]; got != want {
+				t.Errorf("%d: expected %d entries, got %d", i, want, got)
+			}
 		}
-	}
-
-	// Ack all three of those messages together and get the last two
-	// messages (containing three entries).
-	r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: ms[2].Entries[1].Index})
-	ms = r.readMessages()
-	if len(ms) != 2 {
-		t.Fatalf("expected 2 messages, got %d", len(ms))
-	}
-	for i, m := range ms {
-		if m.Type != pb.MsgApp {
-			t.Errorf("%d: expected MsgApp, got %s", i, m.Type)
+		last := ms[len(ms)-1].Entries
+		if len(last) == 0 {
+			return index
 		}
+		return last[len(last)-1].Index
 	}
-	if len(ms[0].Entries) != 2 {
-		t.Errorf("%d: expected 2 entries, got %d", 0, len(ms[0].Entries))
-	}
-	if len(ms[1].Entries) != 1 {
-		t.Errorf("%d: expected 1 entry, got %d", 1, len(ms[1].Entries))
-	}
+
+	// When this append is acked, we change to replicate state and can
+	// send multiple messages at once.
+	index := ackAndVerify(ms[0].Entries[1].Index, 2, 2, 2)
+	// Ack all three of those messages together and get another 3 messages. The
+	// third message contains a single large entry, in contrast to 2 before.
+	index = ackAndVerify(index, 2, 1, 1)
+	// All subsequent messages contain one large entry, and we cap at 2 messages
+	// because it overflows MaxInflightBytes.
+	index = ackAndVerify(index, 1, 1)
+	index = ackAndVerify(index, 1, 1)
+	// Start getting small messages again.
+	index = ackAndVerify(index, 1, 2, 2)
+	ackAndVerify(index, 2)
 }
 
 func TestUncommittedEntryLimit(t *testing.T) {
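
Note (not part of the diff): the throughput figures in the new MaxInflightBytes comment follow from dividing the in-flight byte budget by the round-trip time. Below is a minimal, standalone Go sketch of that arithmetic; the helper name inflightThroughput is illustrative only and does not exist in the raft package.

package main

import (
	"fmt"
	"time"
)

// inflightThroughput estimates the steady-state replication throughput to one
// follower when at most maxInflightBytes of append payloads may be in flight
// and acknowledgements arrive one round-trip after sending. Illustrative only.
func inflightThroughput(maxInflightBytes uint64, rtt time.Duration) float64 {
	return float64(maxInflightBytes) / rtt.Seconds() // bytes per second
}

func main() {
	const mb = 1 << 20
	// Matches the example in the Config comment: 1 MB in flight at 100ms RTT
	// is bounded at roughly 10 MB/s; at 400ms RTT this drops to 2.5 MB/s.
	fmt.Printf("%.1f MB/s\n", inflightThroughput(1*mb, 100*time.Millisecond)/mb)
	fmt.Printf("%.1f MB/s\n", inflightThroughput(1*mb, 400*time.Millisecond)/mb)
}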