From 392c702e9e8872b52e275ff852aa224c5776b864 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Wed, 17 Aug 2016 22:08:40 -0700 Subject: [PATCH] etcdserver: initial read index implementation --- etcdserver/raft.go | 10 +++++ etcdserver/server.go | 29 ++++++++++---- etcdserver/v3_server.go | 84 +++++++++++++++++++++++++++++++++-------- 3 files changed, 101 insertions(+), 22 deletions(-) diff --git a/etcdserver/raft.go b/etcdserver/raft.go index 95e95fd22a4..7b10a0c97b0 100644 --- a/etcdserver/raft.go +++ b/etcdserver/raft.go @@ -103,6 +103,9 @@ type raftNode struct { // a chan to send out apply applyc chan apply + // a chan to send out readState + readStateC chan raft.ReadState + // TODO: remove the etcdserver related logic from raftNode // TODO: add a state machine interface to apply the commit entries // and do snapshot/recover @@ -196,6 +199,13 @@ func (r *raftNode) start(s *EtcdServer) { } } + if rd.ReadState.Index != 0 { + select { + case r.readStateC <- rd.ReadState: + default: + } + } + raftDone := make(chan struct{}, 1) ap := apply{ entries: rd.CommittedEntries, diff --git a/etcdserver/server.go b/etcdserver/server.go index 3466355f027..35ae1fa8fdd 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -173,7 +173,9 @@ type EtcdServer struct { snapCount uint64 - w wait.Wait + w wait.Wait + readwaitc chan chan struct{} + stop chan struct{} done chan struct{} errorc chan error @@ -190,12 +192,15 @@ type EtcdServer struct { applyV3 applierV3 // applyV3Base is the core applier without auth or quotas applyV3Base applierV3 - kv mvcc.ConsistentWatchableKV - lessor lease.Lessor - bemu sync.Mutex - be backend.Backend - authStore auth.AuthStore - alarmStore *alarm.AlarmStore + + applyNotify chan struct{} + + kv mvcc.ConsistentWatchableKV + lessor lease.Lessor + bemu sync.Mutex + be backend.Backend + authStore auth.AuthStore + alarmStore *alarm.AlarmStore stats *stats.ServerStats lstats *stats.LeaderStats @@ -382,6 +387,7 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) { ticker: time.Tick(time.Duration(cfg.TickMs) * time.Millisecond), raftStorage: s, storage: NewStorage(w, ss), + readStateC: make(chan raft.ReadState, 1), }, id: id, attributes: membership.Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()}, @@ -393,6 +399,7 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) { reqIDGen: idutil.NewGenerator(uint16(id), time.Now()), forceVersionC: make(chan struct{}), msgSnapC: make(chan raftpb.Message, maxInFlightMsgSnap), + applyNotify: make(chan struct{}, 0), } srv.applyV2 = &applierV2store{store: srv.store, cluster: srv.cluster} @@ -464,6 +471,7 @@ func (s *EtcdServer) Start() { go s.purgeFile() go monitorFileDescriptor(s.done) go s.monitorVersions() + go s.linearizableReadLoop() } // start prepares and starts server in a new goroutine. It is no longer safe to @@ -475,6 +483,7 @@ func (s *EtcdServer) start() { s.snapCount = DefaultSnapCount } s.w = wait.New() + s.readwaitc = make(chan chan struct{}, 1024) s.done = make(chan struct{}) s.stop = make(chan struct{}) if s.ClusterVersion() != nil { @@ -633,6 +642,12 @@ func (s *EtcdServer) applyAll(ep *etcdProgress, apply *apply) { // snapshot. or applied index might be greater than the last index in raft // storage, since the raft routine might be slower than apply routine. <-apply.raftDone + + select { + case s.applyNotify <- struct{}{}: + default: + } + s.triggerSnapshot(ep) select { // snapshot requested via send() diff --git a/etcdserver/v3_server.go b/etcdserver/v3_server.go index e044dbe9dda..6404f6ba31e 100644 --- a/etcdserver/v3_server.go +++ b/etcdserver/v3_server.go @@ -15,6 +15,8 @@ package etcdserver import ( + "bytes" + "encoding/binary" "strconv" "strings" "time" @@ -24,6 +26,7 @@ import ( "github.com/coreos/etcd/lease" "github.com/coreos/etcd/lease/leasehttp" "github.com/coreos/etcd/mvcc" + "github.com/coreos/etcd/raft" "golang.org/x/net/context" "google.golang.org/grpc/metadata" ) @@ -84,26 +87,31 @@ type Authenticator interface { } func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) { - if r.Serializable { - var resp *pb.RangeResponse - var err error - chk := func(ai *auth.AuthInfo) error { - return s.authStore.IsRangePermitted(ai, r.Key, r.RangeEnd) + if !r.Serializable { + ok := make(chan struct{}) + + select { + case s.readwaitc <- ok: + case <-ctx.Done(): + return nil, ctx.Err() } - get := func() { resp, err = s.applyV3Base.Range(noTxn, r) } - if serr := s.doSerialize(ctx, chk, get); serr != nil { - return nil, serr + + select { + case <-ok: + case <-ctx.Done(): + return nil, ctx.Err() } - return resp, err } - result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{Range: r}) - if err != nil { - return nil, err + var resp *pb.RangeResponse + var err error + chk := func(ai *auth.AuthInfo) error { + return s.authStore.IsRangePermitted(ai, r.Key, r.RangeEnd) } - if result.err != nil { - return nil, result.err + get := func() { resp, err = s.applyV3Base.Range(noTxn, r) } + if serr := s.doSerialize(ctx, chk, get); serr != nil { + return nil, serr } - return result.resp.(*pb.RangeResponse), nil + return resp, err } func (s *EtcdServer) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) { @@ -604,3 +612,49 @@ func (s *EtcdServer) processInternalRaftRequest(ctx context.Context, r pb.Intern // Watchable returns a watchable interface attached to the etcdserver. func (s *EtcdServer) Watchable() mvcc.WatchableKV { return s.KV() } + +func (s *EtcdServer) linearizableReadLoop() { + var rs raft.ReadState + oks := make([]chan struct{}, 0, 256) + ctx := make([]byte, 8) + + for lc := uint64(0); ; lc++ { + binary.BigEndian.PutUint64(ctx, lc) + + for (len(s.readwaitc) > 0 && len(oks) < 256) || (len(s.readwaitc) == 0 && len(oks) == 0) { + select { + case ok := <-s.readwaitc: + oks = append(oks, ok) + } + } + + if err := s.r.ReadIndex(context.Background(), ctx); err != nil { + continue + } + select { + case rs = <-s.r.readStateC: + if !bytes.Equal(rs.RequestCtx, ctx) { + continue + } + case <-time.After(time.Second): + continue + } + + for { + ai := s.getAppliedIndex() + if ai >= rs.Index { + // unblock all the l-read that happened before we requested the + // read index + for _, ok := range oks { + close(ok) + } + oks = make([]chan struct{}, 0, 256) + break + } + select { + case <-s.applyNotify: + case <-time.After(10 * time.Millisecond): + } + } + } +}