-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Again inspired heavily by Distributed Services by Go, this adds service discovery with Serf/SWIM with an agent which is repsonsible for configuring the server, service discovery, Raft etc. Server now takes a store interface, allowing us to swap in a DistributedStore instead of a Store. cmux is used to multiplex Raft and gRPC on the same port.
- Loading branch information
Showing
9 changed files
with
364 additions
and
114 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,179 @@ | ||
package agent | ||
|
||
import ( | ||
"bytes" | ||
"fmt" | ||
"io" | ||
"net" | ||
"sync" | ||
"time" | ||
|
||
"github.com/antw/violin/internal/server" | ||
"github.com/hashicorp/raft" | ||
"github.com/soheilhy/cmux" | ||
"google.golang.org/grpc" | ||
|
||
"github.com/antw/violin/internal/discovery" | ||
"github.com/antw/violin/internal/storage" | ||
) | ||
|
||
type Agent struct { | ||
Config | ||
|
||
mux cmux.CMux | ||
store *storage.DistributedStore | ||
server *grpc.Server | ||
membership *discovery.Membership | ||
|
||
shutdown bool | ||
shutdowns chan struct{} | ||
shutdownLock sync.Mutex | ||
} | ||
|
||
type Config struct { | ||
DataDir string | ||
BindAddr string | ||
RPCPort int | ||
NodeName string | ||
StartJoinAddrs []string | ||
Bootstrap bool | ||
} | ||
|
||
func (c Config) RPCAddr() (string, error) { | ||
host, _, err := net.SplitHostPort(c.BindAddr) | ||
if err != nil { | ||
return "", err | ||
} | ||
|
||
return fmt.Sprintf("%s:%d", host, c.RPCPort), nil | ||
} | ||
|
||
func New(config Config) (*Agent, error) { | ||
a := &Agent{ | ||
Config: config, | ||
shutdowns: make(chan struct{}), | ||
} | ||
|
||
setup := []func() error{ | ||
a.setupMux, | ||
a.setupStore, | ||
a.setupServer, | ||
a.setupMembership, | ||
} | ||
for _, fn := range setup { | ||
if err := fn(); err != nil { | ||
return nil, err | ||
} | ||
} | ||
|
||
go func() { | ||
_ = a.serve() | ||
}() | ||
|
||
return a, nil | ||
} | ||
|
||
// setupStore configures cmux to allow multiplexing Raft and the gRPC server on the same port. | ||
func (a *Agent) setupMux() error { | ||
rpcAddr := fmt.Sprintf(":%d", a.Config.RPCPort) | ||
|
||
listener, err := net.Listen("tcp", rpcAddr) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
a.mux = cmux.New(listener) | ||
return nil | ||
} | ||
|
||
// setupStore configures the distributed store. | ||
func (a *Agent) setupStore() error { | ||
raftListener := a.mux.Match(func(reader io.Reader) bool { | ||
b := make([]byte, 1) | ||
if _, err := reader.Read(b); err != nil { | ||
return false | ||
} | ||
return bytes.Compare(b, []byte{storage.RaftRPC}) == 0 | ||
}) | ||
|
||
storeConfig := storage.Config{} | ||
storeConfig.Raft.StreamLayer = storage.NewStreamLayer(raftListener) | ||
storeConfig.Raft.LocalID = raft.ServerID(a.Config.NodeName) | ||
storeConfig.Raft.Bootstrap = a.Config.Bootstrap | ||
|
||
var err error | ||
|
||
a.store, err = storage.NewDistributedStore(a.Config.DataDir, storeConfig) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
if a.Config.Bootstrap { | ||
err = a.store.WaitForLeader(3 * time.Second) | ||
} | ||
|
||
return nil | ||
} | ||
|
||
// setupServer configures the server and starts a listener. | ||
func (a *Agent) setupServer() error { | ||
a.server = server.New(a.store) | ||
|
||
grpcListener := a.mux.Match(cmux.Any()) | ||
|
||
go func() { | ||
if err := a.server.Serve(grpcListener); err != nil { | ||
_ = a.Shutdown() | ||
} | ||
}() | ||
|
||
return nil | ||
} | ||
|
||
func (a *Agent) setupMembership() error { | ||
rpcAddr, err := a.RPCAddr() | ||
if err != nil { | ||
return err | ||
} | ||
|
||
a.membership, err = discovery.New(a.store, discovery.Config{ | ||
NodeName: a.Config.NodeName, | ||
BindAddr: a.Config.BindAddr, | ||
Tags: map[string]string{ | ||
"rpc_addr": rpcAddr, | ||
}, | ||
StartJoinAddrs: a.Config.StartJoinAddrs, | ||
}) | ||
|
||
return err | ||
} | ||
|
||
func (a *Agent) Shutdown() error { | ||
a.shutdownLock.Lock() | ||
defer a.shutdownLock.Unlock() | ||
|
||
if a.shutdown { | ||
return nil | ||
} | ||
|
||
a.shutdown = true | ||
close(a.shutdowns) | ||
|
||
if err := a.membership.Leave(); err != nil { | ||
return err | ||
} | ||
|
||
a.server.GracefulStop() | ||
|
||
return nil | ||
} | ||
|
||
// serve runs the server | ||
func (a *Agent) serve() error { | ||
if err := a.mux.Serve(); err != nil { | ||
_ = a.Shutdown() | ||
return err | ||
} | ||
|
||
return nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,104 @@ | ||
package agent | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"io/ioutil" | ||
"os" | ||
"testing" | ||
"time" | ||
|
||
"github.com/phayes/freeport" | ||
"github.com/stretchr/testify/require" | ||
"google.golang.org/grpc" | ||
|
||
"github.com/antw/violin/api" | ||
) | ||
|
||
func TestAgent(t *testing.T) { | ||
var agents []*Agent | ||
for i := 0; i < 3; i++ { | ||
ports, err := freeport.GetFreePorts(2) | ||
require.NoError(t, err) | ||
|
||
bindAddr := fmt.Sprintf("%s:%d", "127.0.0.1", ports[0]) | ||
|
||
dataDir, err := ioutil.TempDir("", "agent-test") | ||
require.NoError(t, err) | ||
|
||
var startJoinAddrs []string | ||
if i > 0 { | ||
startJoinAddrs = append(startJoinAddrs, agents[0].Config.BindAddr) | ||
} | ||
|
||
fmt.Printf("!!! Server %d addr %s\n", i, bindAddr) | ||
|
||
agent, err := New(Config{ | ||
NodeName: fmt.Sprintf("%d", i), | ||
Bootstrap: i == 0, | ||
StartJoinAddrs: startJoinAddrs, | ||
BindAddr: bindAddr, | ||
RPCPort: ports[1], | ||
DataDir: dataDir, | ||
}) | ||
require.NoError(t, err) | ||
|
||
agents = append(agents, agent) | ||
} | ||
|
||
defer func() { | ||
for _, agent := range agents { | ||
err := agent.Shutdown() | ||
require.NoError(t, err) | ||
require.NoError(t, os.RemoveAll(agent.Config.DataDir)) | ||
} | ||
}() | ||
|
||
time.Sleep(3 * time.Second) | ||
|
||
leaderClient := client(t, agents[0]) | ||
_, err := leaderClient.Set( | ||
context.Background(), | ||
&api.SetRequest{Register: &api.KV{Key: "foo", Value: []byte("bar")}}, | ||
) | ||
require.NoError(t, err) | ||
|
||
consumeResponse, err := leaderClient.Get( | ||
context.Background(), | ||
&api.GetRequest{Key: "foo"}, | ||
) | ||
require.NoError(t, err) | ||
require.Equal(t, "bar", string(consumeResponse.GetRegister().GetValue())) | ||
|
||
// wait until replication has finished | ||
time.Sleep(3 * time.Second) | ||
|
||
followerClient := client(t, agents[1]) | ||
consumeResponse, err = followerClient.Get( | ||
context.Background(), | ||
&api.GetRequest{Key: "foo"}, | ||
) | ||
require.NoError(t, err) | ||
require.Equal(t, "bar", string(consumeResponse.GetRegister().GetValue())) | ||
|
||
// Check accessing a register which does not exist. | ||
consumeResponse, err = leaderClient.Get( | ||
context.Background(), | ||
&api.GetRequest{Key: "baz"}, | ||
) | ||
require.Nil(t, consumeResponse) | ||
require.Error(t, err) | ||
|
||
// TODO: Check the gRPC error code? | ||
} | ||
|
||
// client creates a client for interacting with a server. | ||
func client(t *testing.T, agent *Agent) api.RegisterClient { | ||
rpcAddr, err := agent.Config.RPCAddr() | ||
require.NoError(t, err) | ||
|
||
conn, err := grpc.Dial(rpcAddr, grpc.WithInsecure()) | ||
require.NoError(t, err) | ||
|
||
return api.NewRegisterClient(conn) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.