Skip to content

Commit

Permalink
etcdserver: initial read index implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
xiang90 committed Aug 18, 2016
1 parent 14f6dd4 commit 392c702
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 22 deletions.
10 changes: 10 additions & 0 deletions etcdserver/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,9 @@ type raftNode struct {
// a chan to send out apply
applyc chan apply

// a chan to send out readState
readStateC chan raft.ReadState

// TODO: remove the etcdserver related logic from raftNode
// TODO: add a state machine interface to apply the commit entries
// and do snapshot/recover
Expand Down Expand Up @@ -196,6 +199,13 @@ func (r *raftNode) start(s *EtcdServer) {
}
}

if rd.ReadState.Index != 0 {
select {
case r.readStateC <- rd.ReadState:
default:
}
}

raftDone := make(chan struct{}, 1)
ap := apply{
entries: rd.CommittedEntries,
Expand Down
29 changes: 22 additions & 7 deletions etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,9 @@ type EtcdServer struct {

snapCount uint64

w wait.Wait
w wait.Wait
readwaitc chan chan struct{}

stop chan struct{}
done chan struct{}
errorc chan error
Expand All @@ -190,12 +192,15 @@ type EtcdServer struct {
applyV3 applierV3
// applyV3Base is the core applier without auth or quotas
applyV3Base applierV3
kv mvcc.ConsistentWatchableKV
lessor lease.Lessor
bemu sync.Mutex
be backend.Backend
authStore auth.AuthStore
alarmStore *alarm.AlarmStore

applyNotify chan struct{}

kv mvcc.ConsistentWatchableKV
lessor lease.Lessor
bemu sync.Mutex
be backend.Backend
authStore auth.AuthStore
alarmStore *alarm.AlarmStore

stats *stats.ServerStats
lstats *stats.LeaderStats
Expand Down Expand Up @@ -382,6 +387,7 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
ticker: time.Tick(time.Duration(cfg.TickMs) * time.Millisecond),
raftStorage: s,
storage: NewStorage(w, ss),
readStateC: make(chan raft.ReadState, 1),
},
id: id,
attributes: membership.Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()},
Expand All @@ -393,6 +399,7 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
reqIDGen: idutil.NewGenerator(uint16(id), time.Now()),
forceVersionC: make(chan struct{}),
msgSnapC: make(chan raftpb.Message, maxInFlightMsgSnap),
applyNotify: make(chan struct{}, 0),
}

srv.applyV2 = &applierV2store{store: srv.store, cluster: srv.cluster}
Expand Down Expand Up @@ -464,6 +471,7 @@ func (s *EtcdServer) Start() {
go s.purgeFile()
go monitorFileDescriptor(s.done)
go s.monitorVersions()
go s.linearizableReadLoop()
}

// start prepares and starts server in a new goroutine. It is no longer safe to
Expand All @@ -475,6 +483,7 @@ func (s *EtcdServer) start() {
s.snapCount = DefaultSnapCount
}
s.w = wait.New()
s.readwaitc = make(chan chan struct{}, 1024)
s.done = make(chan struct{})
s.stop = make(chan struct{})
if s.ClusterVersion() != nil {
Expand Down Expand Up @@ -633,6 +642,12 @@ func (s *EtcdServer) applyAll(ep *etcdProgress, apply *apply) {
// snapshot. or applied index might be greater than the last index in raft
// storage, since the raft routine might be slower than apply routine.
<-apply.raftDone

select {
case s.applyNotify <- struct{}{}:
default:
}

s.triggerSnapshot(ep)
select {
// snapshot requested via send()
Expand Down
84 changes: 69 additions & 15 deletions etcdserver/v3_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package etcdserver

import (
"bytes"
"encoding/binary"
"strconv"
"strings"
"time"
Expand All @@ -24,6 +26,7 @@ import (
"github.com/coreos/etcd/lease"
"github.com/coreos/etcd/lease/leasehttp"
"github.com/coreos/etcd/mvcc"
"github.com/coreos/etcd/raft"
"golang.org/x/net/context"
"google.golang.org/grpc/metadata"
)
Expand Down Expand Up @@ -84,26 +87,31 @@ type Authenticator interface {
}

func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) {
if r.Serializable {
var resp *pb.RangeResponse
var err error
chk := func(ai *auth.AuthInfo) error {
return s.authStore.IsRangePermitted(ai, r.Key, r.RangeEnd)
if !r.Serializable {
ok := make(chan struct{})

select {
case s.readwaitc <- ok:
case <-ctx.Done():
return nil, ctx.Err()
}
get := func() { resp, err = s.applyV3Base.Range(noTxn, r) }
if serr := s.doSerialize(ctx, chk, get); serr != nil {
return nil, serr

select {
case <-ok:
case <-ctx.Done():
return nil, ctx.Err()
}
return resp, err
}
result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{Range: r})
if err != nil {
return nil, err
var resp *pb.RangeResponse
var err error
chk := func(ai *auth.AuthInfo) error {
return s.authStore.IsRangePermitted(ai, r.Key, r.RangeEnd)
}
if result.err != nil {
return nil, result.err
get := func() { resp, err = s.applyV3Base.Range(noTxn, r) }
if serr := s.doSerialize(ctx, chk, get); serr != nil {
return nil, serr
}
return result.resp.(*pb.RangeResponse), nil
return resp, err
}

func (s *EtcdServer) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) {
Expand Down Expand Up @@ -604,3 +612,49 @@ func (s *EtcdServer) processInternalRaftRequest(ctx context.Context, r pb.Intern

// Watchable returns a watchable interface attached to the etcdserver.
func (s *EtcdServer) Watchable() mvcc.WatchableKV { return s.KV() }

func (s *EtcdServer) linearizableReadLoop() {
var rs raft.ReadState
oks := make([]chan struct{}, 0, 256)
ctx := make([]byte, 8)

for lc := uint64(0); ; lc++ {
binary.BigEndian.PutUint64(ctx, lc)

for (len(s.readwaitc) > 0 && len(oks) < 256) || (len(s.readwaitc) == 0 && len(oks) == 0) {
select {
case ok := <-s.readwaitc:
oks = append(oks, ok)
}
}

if err := s.r.ReadIndex(context.Background(), ctx); err != nil {
continue
}
select {
case rs = <-s.r.readStateC:
if !bytes.Equal(rs.RequestCtx, ctx) {
continue
}
case <-time.After(time.Second):
continue
}

for {
ai := s.getAppliedIndex()
if ai >= rs.Index {
// unblock all the l-read that happened before we requested the
// read index
for _, ok := range oks {
close(ok)
}
oks = make([]chan struct{}, 0, 256)
break
}
select {
case <-s.applyNotify:
case <-time.After(10 * time.Millisecond):
}
}
}
}

0 comments on commit 392c702

Please sign in to comment.