Skip to content

Commit

Permalink
*: add support for socket options
Browse files Browse the repository at this point in the history
Signed-off-by: Sam Batschelet <sbatsche@redhat.com>
  • Loading branch information
hexfusion committed Feb 19, 2021
1 parent 1302e1e commit 49078c6
Show file tree
Hide file tree
Showing 12 changed files with 232 additions and 12 deletions.
34 changes: 31 additions & 3 deletions pkg/transport/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package transport

import (
"context"
"crypto/ecdsa"
"crypto/elliptic"
"crypto/rand"
Expand All @@ -39,18 +40,34 @@ import (

// NewListener creates a new listener on addr for the given scheme and
// passes it through wrapTLS together with tlsinfo. No socket options are
// applied: the underlying newListener call receives nil options.
func NewListener(addr, scheme string, tlsinfo *TLSInfo) (l net.Listener, err error) {
if l, err = newListener(addr, scheme); err != nil {
if l, err = newListener(addr, scheme, nil); err != nil {
return nil, err
}
return wrapTLS(scheme, tlsinfo, l)
}

func newListener(addr string, scheme string) (net.Listener, error) {
// NewListenerWithSocketOpts creates a new listener with support for socket
// options.
//
// The listener is built by newListener using sopts. When tlsinfo is non-nil
// the listener is handed to wrapTLS and the wrapped listener (or wrapTLS's
// error) is returned; when tlsinfo is nil the plain listener is returned
// untouched, whatever the scheme.
func NewListenerWithSocketOpts(addr, scheme string, tlsinfo *TLSInfo, sopts *SocketOpts) (net.Listener, error) {
	ln, err := newListener(addr, scheme, sopts)
	if err != nil {
		return nil, err
	}
	if tlsinfo == nil {
		return ln, nil
	}
	// The result of wrapTLS must be returned: discarding it would hand back
	// the unwrapped listener and silently drop any TLS setup error.
	return wrapTLS(scheme, tlsinfo, ln)
}

// newListener creates the underlying listener for addr. For the "unix" and
// "unixs" schemes it delegates to NewUnixListener and sopts is not used;
// otherwise it listens on TCP through the net.ListenConfig that
// newListenConfig builds from sopts.
func newListener(addr string, scheme string, sopts *SocketOpts) (net.Listener, error) {
if scheme == "unix" || scheme == "unixs" {
// unix sockets via unix://laddr
return NewUnixListener(addr)
}
return net.Listen("tcp", addr)
config, err := newListenConfig(sopts)
if err != nil {
return nil, err
}
return config.Listen(context.TODO(), "tcp", addr)
}

func wrapTLS(scheme string, tlsinfo *TLSInfo, l net.Listener) (net.Listener, error) {
Expand All @@ -63,6 +80,17 @@ func wrapTLS(scheme string, tlsinfo *TLSInfo, l net.Listener) (net.Listener, err
return newTLSListener(l, tlsinfo, checkSAN)
}

// newListenConfig builds the net.ListenConfig used for TCP listeners. A
// Control hook is installed only when sopts is non-nil and getControls
// yields at least one control function. The returned error is always nil.
func newListenConfig(sopts *SocketOpts) (net.ListenConfig, error) {
	var cfg net.ListenConfig
	if sopts == nil {
		return cfg, nil
	}
	if controls := getControls(sopts); len(controls) > 0 {
		cfg.Control = controls.Control
	}
	return cfg, nil
}

type TLSInfo struct {
CertFile string
KeyFile string
Expand Down
70 changes: 70 additions & 0 deletions pkg/transport/listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,58 @@ func TestNewListenerTLSInfo(t *testing.T) {
testNewListenerTLSInfoAccept(t, *tlsInfo)
}

// TestNewListenerWithSocketOpts binds a listener with each socket option
// combination and then tries to bind a second listener to the very same
// address, checking whether that second bind fails or succeeds as expected.
func TestNewListenerWithSocketOpts(t *testing.T) {
	tlsInfo, del, err := createSelfCert()
	if err != nil {
		t.Fatalf("unable to create cert: %v", err)
	}
	defer del()
	tests := map[string]struct {
		socketOpts  *SocketOpts
		expectedErr bool
	}{
		"nil": {
			socketOpts:  nil,
			expectedErr: true,
		},
		"empty": {
			socketOpts:  &SocketOpts{},
			expectedErr: true,
		},
		"reuse address": {
			socketOpts:  &SocketOpts{ReuseAddress: true},
			expectedErr: true,
		},
		"reuse address and reuse port": {
			socketOpts:  &SocketOpts{ReuseAddress: true, ReusePort: true},
			expectedErr: false,
		},
		"reuse port": {
			socketOpts:  &SocketOpts{ReusePort: true},
			expectedErr: false,
		},
	}
	for testName, test := range tests {
		t.Run(testName, func(t *testing.T) {
			ln, err := NewListenerWithSocketOpts("127.0.0.1:0", "https", tlsInfo, test.socketOpts)
			if err != nil {
				t.Fatalf("unexpected NewListenerWithSocketOpts error: %v", err)
			}
			defer ln.Close()
			ln2, err := NewListenerWithSocketOpts(ln.Addr().String(), "https", tlsInfo, test.socketOpts)
			if ln2 != nil {
				// Deferred so the second listener is released even when one
				// of the t.Fatalf calls below ends the subtest early.
				defer ln2.Close()
			}
			if test.expectedErr && err == nil {
				t.Fatalf("expected error")
			}
			if !test.expectedErr && err != nil {
				t.Fatalf("unexpected NewListenerWithSocketOpts error: %v", err)
			}
		})
	}
}

func testNewListenerTLSInfoAccept(t *testing.T, tlsInfo TLSInfo) {
ln, err := NewListener("127.0.0.1:0", "https", &tlsInfo)
if err != nil {
Expand Down Expand Up @@ -401,3 +453,21 @@ func TestIsClosedConnError(t *testing.T) {
t.Fatalf("expect true, got false (%v)", err)
}
}

// TestSocktOptsEmpty checks that SocketOpts.Empty reports true only when
// neither ReuseAddress nor ReusePort is set.
func TestSocktOptsEmpty(t *testing.T) {
	cases := []struct {
		sopts SocketOpts
		want  bool
	}{
		{sopts: SocketOpts{}, want: true},
		{sopts: SocketOpts{ReuseAddress: true, ReusePort: false}, want: false},
		{sopts: SocketOpts{ReusePort: true}, want: false},
	}

	for idx := range cases {
		tc := &cases[idx]
		if got := tc.sopts.Empty(); got != tc.want {
			t.Errorf("#%d: result of Empty() incorrect: want=%t got=%t", idx, tc.want, got)
		}
	}
}
45 changes: 45 additions & 0 deletions pkg/transport/sockopt.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package transport

import (
"syscall"
)

// Controls is an ordered list of socket control functions. Each entry has
// the same signature as the Control method defined on the list itself.
type Controls []func(network, addr string, conn syscall.RawConn) error

// Control invokes every function in ctls, in order, with the given
// arguments. It stops at, and returns, the first non-nil error; it returns
// nil when every function succeeds or when ctls is empty.
func (ctls Controls) Control(network, addr string, conn syscall.RawConn) error {
	for i := 0; i < len(ctls); i++ {
		err := ctls[i](network, addr, conn)
		if err != nil {
			return err
		}
	}
	return nil
}

// SocketOpts holds the socket options that can be applied to listeners.
type SocketOpts struct {
// ReusePort enables socket option SO_REUSEPORT [1] which allows rebind of
// a port already in use. User should keep in mind that flock can fail
// in which case lock on data file could result in unexpected
// condition. User should take caution to protect against lock race.
// [1] https://man7.org/linux/man-pages/man7/socket.7.html
ReusePort bool
// ReuseAddress enables socket option SO_REUSEADDR [1] which allows
// binding to an address in `TIME_WAIT` state. Useful to improve MTTR
// in cases where etcd is slow to restart due to excessive `TIME_WAIT`.
// [1] https://man7.org/linux/man-pages/man7/socket.7.html
ReuseAddress bool
}

// getControls translates the enabled fields of sopts into control
// functions: setReuseAddress first (when ReuseAddress is set), then
// setReusePort (when ReusePort is set). The result is an empty, non-nil
// Controls when no option is enabled. sopts must not be nil.
func getControls(sopts *SocketOpts) Controls {
	candidates := []struct {
		enabled bool
		control func(network, addr string, conn syscall.RawConn) error
	}{
		{enabled: sopts.ReuseAddress, control: setReuseAddress},
		{enabled: sopts.ReusePort, control: setReusePort},
	}

	selected := Controls{}
	for _, c := range candidates {
		if c.enabled {
			selected = append(selected, c.control)
		}
	}
	return selected
}

// Empty reports whether no socket option is enabled, i.e. both ReuseAddress
// and ReusePort are false.
func (sopts *SocketOpts) Empty() bool {
	return !(sopts.ReuseAddress || sopts.ReusePort)
}
20 changes: 20 additions & 0 deletions pkg/transport/sockopt_unix.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// +build !windows

package transport

import (
"golang.org/x/sys/unix"
"syscall"
)

// setReusePort sets SO_REUSEPORT to 1 on the socket behind conn.
//
// It returns the error from conn.Control if the descriptor could not be
// accessed, otherwise the error (if any) from the setsockopt call itself.
// network and address are unused; they are present to match the control
// function signature.
func setReusePort(network, address string, conn syscall.RawConn) error {
	// The callback passed to Control cannot return a value, so the
	// setsockopt result is captured here instead of being dropped.
	var sockErr error
	ctlErr := conn.Control(func(fd uintptr) {
		sockErr = syscall.SetsockoptInt(int(fd), syscall.SOL_SOCKET, unix.SO_REUSEPORT, 1)
	})
	if ctlErr != nil {
		return ctlErr
	}
	return sockErr
}

// setReuseAddress sets SO_REUSEADDR to 1 on the socket behind conn.
//
// It returns the error from conn.Control if the descriptor could not be
// accessed, otherwise the error (if any) from the setsockopt call itself.
// network and address are unused; they are present to match the control
// function signature.
func setReuseAddress(network, address string, conn syscall.RawConn) error {
	// The callback passed to Control cannot return a value, so the
	// setsockopt result is captured here instead of being dropped.
	var sockErr error
	ctlErr := conn.Control(func(fd uintptr) {
		sockErr = syscall.SetsockoptInt(int(fd), syscall.SOL_SOCKET, unix.SO_REUSEADDR, 1)
	})
	if ctlErr != nil {
		return ctlErr
	}
	return sockErr
}
18 changes: 18 additions & 0 deletions pkg/transport/sockopt_windows.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// +build windows

package transport

import (
"fmt"
"syscall"
)

// setReusePort always returns an error stating that port reuse is not
// supported on Windows; none of its arguments are used.
func setReusePort(network, address string, c syscall.RawConn) error {
	err := fmt.Errorf("port reuse is not supported on Windows")
	return err
}

// setReuseAddress always returns an error stating that address reuse is not
// supported on Windows; none of its arguments are used.
//
// Windows supports SO_REUSEADDR, but it may cause undefined behavior, as
// there is no protection against port hijacking.
func setReuseAddress(network, addr string, conn syscall.RawConn) error {
	err := fmt.Errorf("address reuse is not supported on Windows")
	return err
}
25 changes: 19 additions & 6 deletions pkg/transport/timeout_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,32 @@ import (
// If read/write on the accepted connection blocks longer than its time limit,
// it will return timeout error.
func NewTimeoutListener(addr string, scheme string, tlsinfo *TLSInfo, rdtimeoutd, wtimeoutd time.Duration) (net.Listener, error) {
ln, err := newListener(addr, scheme)
// nil socket options: this constructor applies none.
ln, err := newListener(addr, scheme, nil)
if err != nil {
return nil, err
}
ln = &rwTimeoutListener{
// Timeout wrapping and TLS wrapping are delegated to newTimeoutListener.
return newTimeoutListener(ln, scheme, rdtimeoutd, wtimeoutd, tlsinfo)
}

// NewTimeoutListerWithSocketOpts returns a listener bound to addr whose
// accepted connections fail with a timeout error when a read or write blocks
// longer than rdtimeoutd or wtimeoutd respectively. sopts carries the socket
// options handed to newListener when the base listener is created; the base
// listener is then wrapped by newTimeoutListener.
func NewTimeoutListerWithSocketOpts(addr string, scheme string, tlsinfo *TLSInfo, rdtimeoutd, wtimeoutd time.Duration, sopts *SocketOpts) (net.Listener, error) {
	base, listenErr := newListener(addr, scheme, sopts)
	if listenErr != nil {
		return nil, listenErr
	}
	wrapped, wrapErr := newTimeoutListener(base, scheme, rdtimeoutd, wtimeoutd, tlsinfo)
	return wrapped, wrapErr
}

// newTimeoutListener wraps ln in an rwTimeoutListener carrying the given
// read and write timeouts, then passes the result through wrapTLS together
// with scheme and tlsinfo and returns what wrapTLS returns.
func newTimeoutListener(ln net.Listener, scheme string, rdtimeoutd, wtimeoutd time.Duration, tlsinfo *TLSInfo) (net.Listener, error) {
timeoutListener := &rwTimeoutListener{
Listener: ln,
rdtimeoutd: rdtimeoutd,
wtimeoutd: wtimeoutd,
}
if ln, err = wrapTLS(scheme, tlsinfo, ln); err != nil {
return nil, err
}
return ln, nil
return wrapTLS(scheme, tlsinfo, timeoutListener)
}

type rwTimeoutListener struct {
Expand Down
5 changes: 5 additions & 0 deletions server/embed/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,9 @@ type Config struct {
// before closing a non-responsive connection. 0 to disable.
GRPCKeepAliveTimeout time.Duration `json:"grpc-keepalive-timeout"`

// SocketOpts are socket options passed to listener config.
SocketOpts transport.SocketOpts

// PreVote is true to enable Raft Pre-Vote.
// If enabled, Raft runs an additional election phase
// to check whether it would get enough votes to win
Expand Down Expand Up @@ -398,6 +401,8 @@ func NewConfig() *Config {
GRPCKeepAliveInterval: DefaultGRPCKeepAliveInterval,
GRPCKeepAliveTimeout: DefaultGRPCKeepAliveTimeout,

SocketOpts: transport.SocketOpts{},

TickMs: 100,
ElectionMs: 1000,
InitialElectionTickAdvance: true,
Expand Down
14 changes: 11 additions & 3 deletions server/embed/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,13 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
e = nil
}()

if !cfg.SocketOpts.Empty() {
cfg.logger.Info(
"configuring socket options",
zap.Bool("reuse-address", cfg.SocketOpts.ReuseAddress),
zap.Bool("reuse-port", cfg.SocketOpts.ReusePort),
)
}
e.cfg.logger.Info(
"configuring peer listeners",
zap.Strings("listen-peer-urls", e.cfg.getLPURLs()),
Expand Down Expand Up @@ -181,6 +188,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
BackendBatchInterval: cfg.BackendBatchInterval,
MaxTxnOps: cfg.MaxTxnOps,
MaxRequestBytes: cfg.MaxRequestBytes,
SocketOpts: cfg.SocketOpts,
StrictReconfigCheck: cfg.StrictReconfigCheck,
ClientCertAuthEnabled: cfg.ClientTLSInfo.ClientCertAuth,
AuthToken: cfg.AuthToken,
Expand Down Expand Up @@ -458,7 +466,7 @@ func configurePeerListeners(cfg *Config) (peers []*peerListener, err error) {
}
}
peers[i] = &peerListener{close: func(context.Context) error { return nil }}
peers[i].Listener, err = rafthttp.NewListener(u, &cfg.PeerTLSInfo)
peers[i].Listener, err = rafthttp.NewListenerWithSocketOpts(u, &cfg.PeerTLSInfo, &cfg.SocketOpts)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -565,7 +573,7 @@ func configureClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err erro
continue
}

if sctx.l, err = net.Listen(network, addr); err != nil {
if sctx.l, err = transport.NewListenerWithSocketOpts(addr, u.Scheme, nil, &cfg.SocketOpts); err != nil {
return nil, err
}
// net.Listener will rewrite ipv4 0.0.0.0 to ipv6 [::], breaking
Expand Down Expand Up @@ -678,7 +686,7 @@ func (e *Etcd) serveMetrics() (err error) {
if murl.Scheme == "http" {
tlsInfo = nil
}
ml, err := transport.NewListener(murl.Host, murl.Scheme, tlsInfo)
ml, err := transport.NewListenerWithSocketOpts(murl.Host, murl.Scheme, tlsInfo, &e.cfg.SocketOpts)
if err != nil {
return err
}
Expand Down
2 changes: 2 additions & 0 deletions server/etcdmain/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,8 @@ func newConfig() *config {
fs.DurationVar(&cfg.ec.GRPCKeepAliveMinTime, "grpc-keepalive-min-time", cfg.ec.GRPCKeepAliveMinTime, "Minimum interval duration that a client should wait before pinging server.")
fs.DurationVar(&cfg.ec.GRPCKeepAliveInterval, "grpc-keepalive-interval", cfg.ec.GRPCKeepAliveInterval, "Frequency duration of server-to-client ping to check if a connection is alive (0 to disable).")
fs.DurationVar(&cfg.ec.GRPCKeepAliveTimeout, "grpc-keepalive-timeout", cfg.ec.GRPCKeepAliveTimeout, "Additional duration of wait before closing a non-responsive connection (0 to disable).")
fs.BoolVar(&cfg.ec.SocketOpts.ReusePort, "socket-reuse-port", cfg.ec.SocketOpts.ReusePort, "Enable to set socket option SO_REUSEPORT on listeners allowing rebinding of a port already in use.")
fs.BoolVar(&cfg.ec.SocketOpts.ReuseAddress, "socket-reuse-address", cfg.ec.SocketOpts.ReuseAddress, "Enable to set socket option SO_REUSEADDR on listeners allowing binding to an address in `TIME_WAIT` state.")

// clustering
fs.Var(
Expand Down
4 changes: 4 additions & 0 deletions server/etcdmain/help.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ Member:
Frequency duration of server-to-client ping to check if a connection is alive (0 to disable).
--grpc-keepalive-timeout '20s'
Additional duration of wait before closing a non-responsive connection (0 to disable).
--socket-reuse-port 'false'
Enable to set socket option SO_REUSEPORT on listeners allowing rebinding of a port already in use.
--socket-reuse-address 'false'
Enable to set socket option SO_REUSEADDR on listeners allowing binding to an address in TIME_WAIT state.
Clustering:
--initial-advertise-peer-urls 'http://localhost:2380'
Expand Down
4 changes: 4 additions & 0 deletions server/etcdserver/api/rafthttp/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ func NewListener(u url.URL, tlsinfo *transport.TLSInfo) (net.Listener, error) {
return transport.NewTimeoutListener(u.Host, u.Scheme, tlsinfo, ConnReadTimeout, ConnWriteTimeout)
}

// NewListenerWithSocketOpts returns a timeout listener for the host and
// scheme of u, using ConnReadTimeout and ConnWriteTimeout as the read and
// write limits and forwarding tlsinfo and sopts to the transport package.
func NewListenerWithSocketOpts(u url.URL, tlsinfo *transport.TLSInfo, sopts *transport.SocketOpts) (net.Listener, error) {
	host, scheme := u.Host, u.Scheme
	return transport.NewTimeoutListerWithSocketOpts(host, scheme, tlsinfo, ConnReadTimeout, ConnWriteTimeout, sopts)
}

// NewRoundTripper returns a roundTripper used to send requests
// to rafthttp listener of remote peers.
func NewRoundTripper(tlsInfo transport.TLSInfo, dialTimeout time.Duration) (http.RoundTripper, error) {
Expand Down
3 changes: 3 additions & 0 deletions server/etcdserver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,9 @@ type ServerConfig struct {
// PreVote is true to enable Raft Pre-Vote.
PreVote bool

// SocketOpts are socket options passed to listener config.
SocketOpts transport.SocketOpts

// Logger logs server-side operations.
// If not nil, it disables "capnslog" and uses the given logger.
Logger *zap.Logger
Expand Down

0 comments on commit 49078c6

Please sign in to comment.