Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

accept machine list to join cluster #12

Merged
merged 2 commits into from
Jul 11, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 11 additions & 5 deletions client_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func SetHttpHandler(w *http.ResponseWriter, req *http.Request) {
(*w).WriteHeader(http.StatusInternalServerError)
}

dispatch(command, w, req)
dispatch(command, w, req, true)

}

Expand All @@ -77,7 +77,7 @@ func TestAndSetHttpHandler(w http.ResponseWriter, req *http.Request) {
w.WriteHeader(http.StatusInternalServerError)
}

dispatch(command, &w, req)
dispatch(command, &w, req, true)

}

Expand All @@ -90,11 +90,11 @@ func DeleteHttpHandler(w *http.ResponseWriter, req *http.Request) {
command := &DeleteCommand{}
command.Key = key

dispatch(command, w, req)
dispatch(command, w, req, true)
}

// Dispatch the command to leader
func dispatch(c Command, w *http.ResponseWriter, req *http.Request) {
func dispatch(c Command, w *http.ResponseWriter, req *http.Request, client bool) {
if raftServer.State() == "leader" {
if body, err := raftServer.Do(c); err != nil {
warn("Commit failed %v", err)
Expand Down Expand Up @@ -132,7 +132,13 @@ func dispatch(c Command, w *http.ResponseWriter, req *http.Request) {
scheme = "http://"
}

url := scheme + raftTransporter.GetLeaderClientAddress() + path
var url string

if client {
url = scheme + raftTransporter.GetLeaderClientAddress() + path
} else {
url = scheme + raftServer.Leader() + path
}

debug("Redirect to %s", url)

Expand Down
43 changes: 32 additions & 11 deletions etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ import (

var verbose bool

var cluster string
var machines string
var cluster []string

var address string
var clientPort int
Expand All @@ -51,7 +52,7 @@ var maxSize int
func init() {
flag.BoolVar(&verbose, "v", false, "verbose logging")

flag.StringVar(&cluster, "C", "", "the ip address and port of a existing cluster")
flag.StringVar(&machines, "C", "", "the ip address and port of a existing machines in cluster, sepearate by comma")

flag.StringVar(&address, "a", "0.0.0.0", "the ip address of the local machine")
flag.IntVar(&clientPort, "c", 4001, "the port to communicate with clients")
Expand Down Expand Up @@ -135,6 +136,8 @@ var info *Info
func main() {
flag.Parse()

cluster = strings.Split(machines, ",")

// Setup commands.
registerCommands()

Expand Down Expand Up @@ -203,7 +206,7 @@ func startRaft(securityType int) {
if raftServer.IsLogEmpty() {

// start as a leader in a new cluster
if cluster == "" {
if len(cluster) == 0 {
raftServer.StartLeader()

time.Sleep(time.Millisecond * 20)
Expand All @@ -223,9 +226,17 @@ func startRaft(securityType int) {
} else {
raftServer.StartFollower()

err := joinCluster(raftServer, cluster)
for _, machine := range cluster {

err := joinCluster(raftServer, machine)
if err != nil {
debug("cannot join to cluster via machine %s", machine)
} else {
break
}
}
if err != nil {
fatal(fmt.Sprintln(err))
fatal("cannot join to cluster via all given machines!")
}
debug("%s success join to the cluster", raftServer.Name())
}
Expand Down Expand Up @@ -414,6 +425,7 @@ func getInfo(path string) *Info {

// Delete the old configuration if exist
if ignore {

logPath := fmt.Sprintf("%s/log", path)
snapshotPath := fmt.Sprintf("%s/snapshotPath", path)
os.Remove(infoPath)
Expand Down Expand Up @@ -496,11 +508,20 @@ func joinCluster(s *raft.Server, serverName string) error {
json.NewEncoder(&b).Encode(command)

// t must be ok
t, _ := raftServer.Transporter().(transporter)
t, ok := raftServer.Transporter().(transporter)

if !ok {
panic("wrong type")
}

debug("Send Join Request to %s", serverName)

resp, err := t.Post(fmt.Sprintf("%s/join", serverName), &b)

debug("Finish Join Request to %s", serverName)

for {
fmt.Println(err, resp)
if err != nil {
return fmt.Errorf("Unable to join: %v", err)
}
Expand All @@ -509,17 +530,17 @@ func joinCluster(s *raft.Server, serverName string) error {
if resp.StatusCode == http.StatusOK {
return nil
}
if resp.StatusCode == http.StatusServiceUnavailable {
address, err := ioutil.ReadAll(resp.Body)
if err != nil {
warn("Cannot Read Leader info: %v", err)
}

if resp.StatusCode == http.StatusTemporaryRedirect {
fmt.Println("redirect")
address = resp.Header.Get("Location")
debug("Leader is %s", address)
debug("Send Join Request to %s", address)
json.NewEncoder(&b).Encode(command)
resp, err = t.Post(fmt.Sprintf("%s/join", address), &b)
}
}

}
return fmt.Errorf("Unable to join: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion raft_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func JoinHttpHandler(w http.ResponseWriter, req *http.Request) {

if err := decodeJsonRequest(req, command); err == nil {
debug("Receive Join Request from %s", command.Name)
dispatch(command, &w, req)
dispatch(command, &w, req, false)
} else {
w.WriteHeader(http.StatusInternalServerError)
return
Expand Down