Skip to content
This repository has been archived by the owner on Jun 24, 2024. It is now read-only.

Commit

Permalink
Bump etcd
Browse files Browse the repository at this point in the history
  • Loading branch information
tbg committed Dec 18, 2018
1 parent e564efe commit 7384d66
Show file tree
Hide file tree
Showing 7 changed files with 146 additions and 35 deletions.
19 changes: 19 additions & 0 deletions go.etcd.io/etcd/raft/OWNERS
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
approvers:
- heyitsanthony
- philips
- fanminshi
- gyuho
- mitake
- jpbetz
- xiang90
- bdarnell
reviewers:
- heyitsanthony
- philips
- fanminshi
- gyuho
- mitake
- jpbetz
- xiang90
- bdarnell
- tschottdorf
14 changes: 8 additions & 6 deletions go.etcd.io/etcd/raft/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ type raftLog struct {

logger Logger

maxMsgSize uint64
// maxNextEntsSize is the maximum number aggregate byte size of the messages
// returned from calls to nextEnts.
maxNextEntsSize uint64
}

// newLog returns log using the given storage and default options. It
Expand All @@ -51,14 +53,14 @@ func newLog(storage Storage, logger Logger) *raftLog {

// newLogWithSize returns a log using the given storage and max
// message size.
func newLogWithSize(storage Storage, logger Logger, maxMsgSize uint64) *raftLog {
func newLogWithSize(storage Storage, logger Logger, maxNextEntsSize uint64) *raftLog {
if storage == nil {
log.Panic("storage must not be nil")
}
log := &raftLog{
storage: storage,
logger: logger,
maxMsgSize: maxMsgSize,
storage: storage,
logger: logger,
maxNextEntsSize: maxNextEntsSize,
}
firstIndex, err := storage.FirstIndex()
if err != nil {
Expand Down Expand Up @@ -149,7 +151,7 @@ func (l *raftLog) unstableEntries() []pb.Entry {
func (l *raftLog) nextEnts() (ents []pb.Entry) {
off := max(l.applied+1, l.firstIndex())
if l.committed+1 > off {
ents, err := l.slice(off, l.committed+1, l.maxMsgSize)
ents, err := l.slice(off, l.committed+1, l.maxNextEntsSize)
if err != nil {
l.logger.Panicf("unexpected error when getting unapplied entries (%v)", err)
}
Expand Down
14 changes: 12 additions & 2 deletions go.etcd.io/etcd/raft/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,8 @@ type Node interface {
Tick()
// Campaign causes the Node to transition to candidate state and start campaigning to become leader.
Campaign(ctx context.Context) error
// Propose proposes that data be appended to the log.
// Propose proposes that data be appended to the log. Note that proposals can be lost without
// notice, therefore it is user's job to ensure proposal retries.
Propose(ctx context.Context, data []byte) error
// ProposeConfChange proposes config change.
// At most one ConfChange can be in the process of going through consensus.
Expand Down Expand Up @@ -174,7 +175,16 @@ type Node interface {
Status() Status
// ReportUnreachable reports the given node is not reachable for the last send.
ReportUnreachable(id uint64)
// ReportSnapshot reports the status of the sent snapshot.
// ReportSnapshot reports the status of the sent snapshot. The id is the raft ID of the follower
// who is meant to receive the snapshot, and the status is SnapshotFinish or SnapshotFailure.
// Calling ReportSnapshot with SnapshotFinish is a no-op. But, any failure in applying a
// snapshot (for e.g., while streaming it from leader to follower), should be reported to the
// leader with SnapshotFailure. When leader sends a snapshot to a follower, it pauses any raft
// log probes until the follower can apply the snapshot and advance its state. If the follower
// can't do that, for e.g., due to a crash, it could end up in a limbo, never getting any
// updates from the leader. Therefore, it is crucial that the application ensures that any
// failure in snapshot sending is caught and reported back to the leader; so it can resume raft
// log probing in the follower.
ReportSnapshot(id uint64, status SnapshotStatus)
// Stop performs any necessary termination of the Node.
Stop()
Expand Down
58 changes: 48 additions & 10 deletions go.etcd.io/etcd/raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,9 @@ type Config struct {
// throughput during normal replication. Note: math.MaxUint64 for unlimited,
// 0 for at most one entry per message.
MaxSizePerMsg uint64
// MaxCommittedSizePerReady limits the size of the committed entries which
// can be applied.
MaxCommittedSizePerReady uint64
// MaxUncommittedEntriesSize limits the aggregate byte size of the
// uncommitted entries that may be appended to a leader's log. Once this
// limit is exceeded, proposals will begin to return ErrProposalDropped
Expand Down Expand Up @@ -224,6 +227,12 @@ func (c *Config) validate() error {
c.MaxUncommittedEntriesSize = noLimit
}

// default MaxCommittedSizePerReady to MaxSizePerMsg because they were
// previously the same parameter.
if c.MaxCommittedSizePerReady == 0 {
c.MaxCommittedSizePerReady = c.MaxSizePerMsg
}

if c.MaxInflightMsgs <= 0 {
return errors.New("max inflight messages must be greater than 0")
}
Expand Down Expand Up @@ -316,7 +325,7 @@ func newRaft(c *Config) *raft {
if err := c.validate(); err != nil {
panic(err.Error())
}
raftlog := newLogWithSize(c.Storage, c.Logger, c.MaxSizePerMsg)
raftlog := newLogWithSize(c.Storage, c.Logger, c.MaxCommittedSizePerReady)
hs, cs, err := c.Storage.InitialState()
if err != nil {
panic(err) // TODO(bdarnell)
Expand Down Expand Up @@ -635,17 +644,27 @@ func (r *raft) reset(term uint64) {
r.readOnly = newReadOnly(r.readOnly.option)
}

func (r *raft) appendEntry(es ...pb.Entry) {
func (r *raft) appendEntry(es ...pb.Entry) (accepted bool) {
li := r.raftLog.lastIndex()
for i := range es {
es[i].Term = r.Term
es[i].Index = li + 1 + uint64(i)
}
// Track the size of this uncommitted proposal.
if !r.increaseUncommittedSize(es) {
r.logger.Debugf(
"%x appending new entries to log would exceed uncommitted entry size limit; dropping proposal",
r.id,
)
// Drop the proposal.
return false
}
// use latest "last" index after truncate/append
li = r.raftLog.append(es...)
r.getProgress(r.id).maybeUpdate(li)
// Regardless of maybeCommit's return, our caller will call bcastAppend.
r.maybeCommit()
return true
}

// tickElection is run by followers and candidates after r.electionTimeout.
Expand Down Expand Up @@ -731,6 +750,11 @@ func (r *raft) becomeLeader() {
r.tick = r.tickHeartbeat
r.lead = r.id
r.state = StateLeader
// Followers enter replicate mode when they've been successfully probed
// (perhaps after having received a snapshot as a result). The leader is
// trivially in this state. Note that r.reset() has initialized this
// progress with the last index already.
r.prs[r.id].becomeReplicate()

// Conservatively set the pendingConfIndex to the last index in the
// log. There may or may not be a pending config change, but it's
Expand All @@ -739,7 +763,16 @@ func (r *raft) becomeLeader() {
// could be expensive.
r.pendingConfIndex = r.raftLog.lastIndex()

r.appendEntry(pb.Entry{Data: nil})
emptyEnt := pb.Entry{Data: nil}
if !r.appendEntry(emptyEnt) {
// This won't happen because we just called reset() above.
r.logger.Panic("empty entry was dropped")
}
// As a special case, don't count the initial empty entry towards the
// uncommitted log quota. This is because we want to preserve the
// behavior of allowing one entry larger than quota if the current
// usage is zero.
r.reduceUncommittedSize([]pb.Entry{emptyEnt})
r.logger.Infof("%x became leader at term %d", r.id, r.Term)
}

Expand Down Expand Up @@ -970,10 +1003,6 @@ func stepLeader(r *raft, m pb.Message) error {
r.logger.Debugf("%x [term %d] transfer leadership to %x is in progress; dropping proposal", r.id, r.Term, r.leadTransferee)
return ErrProposalDropped
}
if !r.increaseUncommittedSize(m.Entries) {
r.logger.Debugf("%x appending new entries to log would exceed uncommitted entry size limit; dropping proposal", r.id)
return ErrProposalDropped
}

for i, e := range m.Entries {
if e.Type == pb.EntryConfChange {
Expand All @@ -986,7 +1015,10 @@ func stepLeader(r *raft, m pb.Message) error {
}
}
}
r.appendEntry(m.Entries...)

if !r.appendEntry(m.Entries...) {
return ErrProposalDropped
}
r.bcastAppend()
return nil
case pb.MsgReadIndex:
Expand Down Expand Up @@ -1046,7 +1078,13 @@ func stepLeader(r *raft, m pb.Message) error {
pr.becomeReplicate()
case pr.State == ProgressStateSnapshot && pr.needSnapshotAbort():
r.logger.Debugf("%x snapshot aborted, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
// Transition back to replicating state via probing state
// (which takes the snapshot into account). If we didn't
// move to replicating state, that would only happen with
// the next round of appends (but there may not be a next
// round for a while, exposing an inconsistent RaftStatus).
pr.becomeProbe()
pr.becomeReplicate()
case pr.State == ProgressStateReplicate:
pr.ins.freeTo(m.Index)
}
Expand Down Expand Up @@ -1490,7 +1528,7 @@ func (r *raft) abortLeaderTransfer() {
func (r *raft) increaseUncommittedSize(ents []pb.Entry) bool {
var s uint64
for _, e := range ents {
s += uint64(e.Size())
s += uint64(PayloadSize(e))
}

if r.uncommittedSize > 0 && r.uncommittedSize+s > r.maxUncommittedSize {
Expand All @@ -1513,7 +1551,7 @@ func (r *raft) reduceUncommittedSize(ents []pb.Entry) {

var s uint64
for _, e := range ents {
s += uint64(e.Size())
s += uint64(PayloadSize(e))
}
if s > r.uncommittedSize {
// uncommittedSize may underestimate the size of the uncommitted Raft
Expand Down
33 changes: 33 additions & 0 deletions go.etcd.io/etcd/raft/rawnode.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,39 @@ func (rn *RawNode) Status() *Status {
return &status
}

// StatusWithoutProgress returns a Status without populating the Progress field
// (and returns the Status as a value to avoid forcing it onto the heap). This
// is more performant if the Progress is not required. See WithProgress for an
// allocation-free way to introspect the Progress.
func (rn *RawNode) StatusWithoutProgress() Status {
return getStatusWithoutProgress(rn.raft)
}

// ProgressType indicates the type of replica a Progress corresponds to.
type ProgressType byte

const (
// ProgressTypePeer accompanies a Progress for a regular peer replica.
ProgressTypePeer ProgressType = iota
// ProgressTypeLearner accompanies a Progress for a learner replica.
ProgressTypeLearner
)

// WithProgress is a helper to introspect the Progress for this node and its
// peers.
func (rn *RawNode) WithProgress(visitor func(id uint64, typ ProgressType, pr Progress)) {
for id, pr := range rn.raft.prs {
pr := *pr
pr.ins = nil
visitor(id, ProgressTypePeer, pr)
}
for id, pr := range rn.raft.learnerPrs {
pr := *pr
pr.ins = nil
visitor(id, ProgressTypeLearner, pr)
}
}

// ReportUnreachable reports the given node is not reachable for the last send.
func (rn *RawNode) ReportUnreachable(id uint64) {
_ = rn.raft.Step(pb.Message{Type: pb.MsgUnreachable, From: id})
Expand Down
32 changes: 19 additions & 13 deletions go.etcd.io/etcd/raft/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,29 +32,35 @@ type Status struct {
LeadTransferee uint64
}

// getStatus gets a copy of the current raft status.
func getStatus(r *raft) Status {
func getProgressCopy(r *raft) map[uint64]Progress {
prs := make(map[uint64]Progress)
for id, p := range r.prs {
prs[id] = *p
}

for id, p := range r.learnerPrs {
prs[id] = *p
}
return prs
}

func getStatusWithoutProgress(r *raft) Status {
s := Status{
ID: r.id,
LeadTransferee: r.leadTransferee,
}

s.HardState = r.hardState()
s.SoftState = *r.softState()

s.Applied = r.raftLog.applied
return s
}

// getStatus gets a copy of the current raft status.
func getStatus(r *raft) Status {
s := getStatusWithoutProgress(r)
if s.RaftState == StateLeader {
s.Progress = make(map[uint64]Progress)
for id, p := range r.prs {
s.Progress[id] = *p
}

for id, p := range r.learnerPrs {
s.Progress[id] = *p
}
s.Progress = getProgressCopy(r)
}

return s
}

Expand Down
11 changes: 7 additions & 4 deletions go.etcd.io/etcd/raft/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,7 @@ func DescribeMessage(m pb.Message, f EntryFormatter) string {
var buf bytes.Buffer
fmt.Fprintf(&buf, "%x->%x %v Term:%d Log:%d/%d", m.From, m.To, m.Type, m.Term, m.LogTerm, m.Index)
if m.Reject {
fmt.Fprintf(&buf, " Rejected")
if m.RejectHint != 0 {
fmt.Fprintf(&buf, "(Hint:%d)", m.RejectHint)
}
fmt.Fprintf(&buf, " Rejected (Hint: %d)", m.RejectHint)
}
if m.Commit != 0 {
fmt.Fprintf(&buf, " Commit:%d", m.Commit)
Expand All @@ -101,6 +98,12 @@ func DescribeMessage(m pb.Message, f EntryFormatter) string {
return buf.String()
}

// PayloadSize is the size of the payload of this Entry. Notably, it does not
// depend on its Index or Term.
func PayloadSize(e pb.Entry) int {
return len(e.Data)
}

// DescribeEntry returns a concise human-readable description of an
// Entry for debugging.
func DescribeEntry(e pb.Entry, f EntryFormatter) string {
Expand Down

0 comments on commit 7384d66

Please sign in to comment.