Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

etcdserver: forward member promote to leader #31

Merged
merged 1 commit into from
May 7, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 88 additions & 10 deletions etcdserver/api/etcdhttp/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,56 +16,82 @@ package etcdhttp

import (
"encoding/json"
"fmt"
"net/http"
"strconv"
"strings"

"go.etcd.io/etcd/etcdserver"
"go.etcd.io/etcd/etcdserver/api"
"go.etcd.io/etcd/etcdserver/api/membership"
"go.etcd.io/etcd/etcdserver/api/rafthttp"
"go.etcd.io/etcd/lease/leasehttp"
"go.etcd.io/etcd/pkg/types"

"go.uber.org/zap"
)

const (
peerMembersPrefix = "/members"
peerMembersPath = "/members"
peerMemberPromotePrefix = "/members/promote/"
)

// NewPeerHandler generates an http.Handler to handle etcd peer requests.
func NewPeerHandler(lg *zap.Logger, s etcdserver.ServerPeer) http.Handler {
return newPeerHandler(lg, s.Cluster(), s.RaftHandler(), s.LeaseHandler())
return newPeerHandler(lg, s, s.RaftHandler(), s.LeaseHandler())
}

func newPeerHandler(lg *zap.Logger, cluster api.Cluster, raftHandler http.Handler, leaseHandler http.Handler) http.Handler {
mh := &peerMembersHandler{
lg: lg,
cluster: cluster,
}
func newPeerHandler(lg *zap.Logger, s etcdserver.Server, raftHandler http.Handler, leaseHandler http.Handler) http.Handler {
peerMembersHandler := newPeerMembersHandler(lg, s.Cluster())
peerMemberPromoteHandler := newPeerMemberPromoteHandler(lg, s)

mux := http.NewServeMux()
mux.HandleFunc("/", http.NotFound)
mux.Handle(rafthttp.RaftPrefix, raftHandler)
mux.Handle(rafthttp.RaftPrefix+"/", raftHandler)
mux.Handle(peerMembersPrefix, mh)
mux.Handle(peerMembersPath, peerMembersHandler)
mux.Handle(peerMemberPromotePrefix, peerMemberPromoteHandler)
if leaseHandler != nil {
mux.Handle(leasehttp.LeasePrefix, leaseHandler)
mux.Handle(leasehttp.LeaseInternalPrefix, leaseHandler)
}
mux.HandleFunc(versionPath, versionHandler(cluster, serveVersion))
mux.HandleFunc(versionPath, versionHandler(s.Cluster(), serveVersion))
return mux
}

func newPeerMembersHandler(lg *zap.Logger, cluster api.Cluster) http.Handler {
return &peerMembersHandler{
lg: lg,
cluster: cluster,
}
}

type peerMembersHandler struct {
lg *zap.Logger
cluster api.Cluster
}

func newPeerMemberPromoteHandler(lg *zap.Logger, s etcdserver.Server) http.Handler {
return &peerMemberPromoteHandler{
lg: lg,
cluster: s.Cluster(),
server: s,
}
}

type peerMemberPromoteHandler struct {
lg *zap.Logger
cluster api.Cluster
server etcdserver.Server
}

func (h *peerMembersHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if !allowMethod(w, r, "GET") {
return
}
w.Header().Set("X-Etcd-Cluster-ID", h.cluster.ID().String())

if r.URL.Path != peerMembersPrefix {
if r.URL.Path != peerMembersPath {
http.Error(w, "bad path", http.StatusBadRequest)
return
}
Expand All @@ -79,3 +105,55 @@ func (h *peerMembersHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
}
}

func (h *peerMemberPromoteHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if !allowMethod(w, r, "POST") {
return
jingyih marked this conversation as resolved.
Show resolved Hide resolved
}
w.Header().Set("X-Etcd-Cluster-ID", h.cluster.ID().String())

if !strings.HasPrefix(r.URL.Path, peerMemberPromotePrefix) {
http.Error(w, "bad path", http.StatusBadRequest)
return
}
idStr := strings.TrimPrefix(r.URL.Path, peerMemberPromotePrefix)
id, err := strconv.ParseUint(idStr, 10, 64)
if err != nil {
http.Error(w, fmt.Sprintf("member %s not found in cluster", idStr), http.StatusNotFound)
return
}

resp, err := h.server.PromoteMember(r.Context(), id)
if err != nil {
switch err {
case membership.ErrIDNotFound:
http.Error(w, err.Error(), http.StatusNotFound)
case membership.ErrMemberNotLearner:
http.Error(w, err.Error(), http.StatusPreconditionFailed)
case membership.ErrLearnerNotReady:
http.Error(w, err.Error(), http.StatusPreconditionFailed)
default:
WriteError(h.lg, w, r, err)
}
if h.lg != nil {
h.lg.Warn(
"failed to promote a member",
zap.String("member-id", types.ID(id).String()),
zap.Error(err),
)
} else {
plog.Errorf("error promoting member %s (%v)", types.ID(id).String(), err)
}
return
}

w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
if err := json.NewEncoder(w).Encode(resp); err != nil {
if h.lg != nil {
h.lg.Warn("failed to encode members response", zap.Error(err))
} else {
plog.Warningf("failed to encode members response (%v)", err)
}
}
}
140 changes: 131 additions & 9 deletions etcdserver/api/etcdhttp/peer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,24 @@
package etcdhttp

import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"net/http/httptest"
"path"
"sort"
"strings"
"testing"

"go.uber.org/zap"

"github.com/coreos/go-semver/semver"
"go.etcd.io/etcd/etcdserver/api"
"go.etcd.io/etcd/etcdserver/api/membership"
"go.etcd.io/etcd/etcdserver/api/rafthttp"
pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
"go.etcd.io/etcd/pkg/testutil"
"go.etcd.io/etcd/pkg/types"
)
Expand All @@ -51,13 +56,34 @@ func (c *fakeCluster) Members() []*membership.Member {
func (c *fakeCluster) Member(id types.ID) *membership.Member { return c.members[uint64(id)] }
func (c *fakeCluster) Version() *semver.Version { return nil }

type fakeServer struct {
cluster api.Cluster
}

func (s *fakeServer) AddMember(ctx context.Context, memb membership.Member) ([]*membership.Member, error) {
return nil, fmt.Errorf("AddMember not implemented in fakeServer")
}
func (s *fakeServer) RemoveMember(ctx context.Context, id uint64) ([]*membership.Member, error) {
return nil, fmt.Errorf("RemoveMember not implemented in fakeServer")
}
func (s *fakeServer) UpdateMember(ctx context.Context, updateMemb membership.Member) ([]*membership.Member, error) {
return nil, fmt.Errorf("UpdateMember not implemented in fakeServer")
}
func (s *fakeServer) PromoteMember(ctx context.Context, id uint64) ([]*membership.Member, error) {
return nil, fmt.Errorf("PromoteMember not implemented in fakeServer")
}
func (s *fakeServer) ClusterVersion() *semver.Version { return nil }
func (s *fakeServer) Cluster() api.Cluster { return s.cluster }
func (s *fakeServer) Alarms() []*pb.AlarmMember { return nil }

var fakeRaftHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte("test data"))
})

// TestNewPeerHandlerOnRaftPrefix tests that NewPeerHandler returns a handler that
// handles raft-prefix requests well.
func TestNewPeerHandlerOnRaftPrefix(t *testing.T) {
h := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte("test data"))
})
ph := newPeerHandler(zap.NewExample(), &fakeCluster{}, h, nil)
ph := newPeerHandler(zap.NewExample(), &fakeServer{cluster: &fakeCluster{}}, fakeRaftHandler, nil)
srv := httptest.NewServer(ph)
defer srv.Close()

Expand All @@ -80,6 +106,7 @@ func TestNewPeerHandlerOnRaftPrefix(t *testing.T) {
}
}

// TestServeMembersFails ensures peerMembersHandler only accepts GET request
func TestServeMembersFails(t *testing.T) {
tests := []struct {
method string
Expand All @@ -89,6 +116,10 @@ func TestServeMembersFails(t *testing.T) {
"POST",
http.StatusMethodNotAllowed,
},
{
"PUT",
http.StatusMethodNotAllowed,
},
{
"DELETE",
http.StatusMethodNotAllowed,
Expand All @@ -100,8 +131,12 @@ func TestServeMembersFails(t *testing.T) {
}
for i, tt := range tests {
rw := httptest.NewRecorder()
h := &peerMembersHandler{cluster: nil}
h.ServeHTTP(rw, &http.Request{Method: tt.method})
h := newPeerMembersHandler(nil, &fakeCluster{})
req, err := http.NewRequest(tt.method, "", nil)
if err != nil {
t.Fatalf("#%d: failed to create http request: %v", i, err)
}
h.ServeHTTP(rw, req)
if rw.Code != tt.wcode {
t.Errorf("#%d: code=%d, want %d", i, rw.Code, tt.wcode)
}
Expand All @@ -115,7 +150,7 @@ func TestServeMembersGet(t *testing.T) {
id: 1,
members: map[uint64]*membership.Member{1: &memb1, 2: &memb2},
}
h := &peerMembersHandler{cluster: cluster}
h := newPeerMembersHandler(nil, cluster)
msb, err := json.Marshal([]membership.Member{memb1, memb2})
if err != nil {
t.Fatal(err)
Expand All @@ -128,8 +163,8 @@ func TestServeMembersGet(t *testing.T) {
wct string
wbody string
}{
{peerMembersPrefix, http.StatusOK, "application/json", wms},
{path.Join(peerMembersPrefix, "bad"), http.StatusBadRequest, "text/plain; charset=utf-8", "bad path\n"},
{peerMembersPath, http.StatusOK, "application/json", wms},
{path.Join(peerMembersPath, "bad"), http.StatusBadRequest, "text/plain; charset=utf-8", "bad path\n"},
}

for i, tt := range tests {
Expand All @@ -156,3 +191,90 @@ func TestServeMembersGet(t *testing.T) {
}
}
}

// TestServeMemberPromoteFails ensures peerMemberPromoteHandler only accepts POST request
func TestServeMemberPromoteFails(t *testing.T) {
tests := []struct {
method string
wcode int
}{
{
"GET",
http.StatusMethodNotAllowed,
},
{
"PUT",
http.StatusMethodNotAllowed,
},
{
"DELETE",
http.StatusMethodNotAllowed,
},
{
"BAD",
http.StatusMethodNotAllowed,
},
}
for i, tt := range tests {
rw := httptest.NewRecorder()
h := newPeerMemberPromoteHandler(nil, &fakeServer{cluster: &fakeCluster{}})
req, err := http.NewRequest(tt.method, "", nil)
if err != nil {
t.Fatalf("#%d: failed to create http request: %v", i, err)
}
h.ServeHTTP(rw, req)
if rw.Code != tt.wcode {
t.Errorf("#%d: code=%d, want %d", i, rw.Code, tt.wcode)
}
}
}

// TestNewPeerHandlerOnMembersPromotePrefix verifies the request with members promote prefix is routed correctly
func TestNewPeerHandlerOnMembersPromotePrefix(t *testing.T) {
ph := newPeerHandler(zap.NewExample(), &fakeServer{cluster: &fakeCluster{}}, fakeRaftHandler, nil)
srv := httptest.NewServer(ph)
defer srv.Close()

tests := []struct {
path string
wcode int
checkBody bool
wKeyWords string
}{
{
// does not contain member id in path
peerMemberPromotePrefix,
http.StatusNotFound,
false,
"",
},
{
jingyih marked this conversation as resolved.
Show resolved Hide resolved
// try to promote member id = 1
peerMemberPromotePrefix + "1",
http.StatusInternalServerError,
true,
"PromoteMember not implemented in fakeServer",
},
}
for i, tt := range tests {
req, err := http.NewRequest("POST", srv.URL+tt.path, nil)
if err != nil {
t.Fatalf("failed to create request: %v", err)
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
t.Fatalf("failed to get http response: %v", err)
}
body, err := ioutil.ReadAll(resp.Body)
resp.Body.Close()
if err != nil {
t.Fatalf("unexpected ioutil.ReadAll error: %v", err)
}
if resp.StatusCode != tt.wcode {
t.Fatalf("#%d: code = %d, want %d", i, resp.StatusCode, tt.wcode)
}
if tt.checkBody && strings.Contains(string(body), tt.wKeyWords) {
t.Errorf("#%d: body: %s, want body to contain keywords: %s", i, string(body), tt.wKeyWords)
}
}
}
Loading