From 81d8e8f7113a64ee8f46b9f0f6506f91e77d9316 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Fri, 10 Mar 2023 13:35:18 +0100 Subject: [PATCH 1/7] refactor: Use proper variable names for urls Signed-off-by: Marek Siarkowicz --- server/embed/config.go | 134 +++++++++--------- server/embed/config_test.go | 32 ++--- server/embed/etcd.go | 36 ++--- server/embed/serve_test.go | 4 +- server/etcdmain/config.go | 10 +- server/etcdmain/config_test.go | 50 +++---- server/etcdmain/etcd.go | 8 +- .../clientv3/snapshot/v3_snapshot_test.go | 6 +- tests/integration/embed/embed_test.go | 10 +- tests/integration/snapshot/member_test.go | 4 +- .../integration/snapshot/v3_snapshot_test.go | 16 +-- tools/etcd-dump-metrics/etcd.go | 4 +- 12 files changed, 157 insertions(+), 157 deletions(-) diff --git a/server/embed/config.go b/server/embed/config.go index aacd1ed9897..6c76d56e4be 100644 --- a/server/embed/config.go +++ b/server/embed/config.go @@ -206,12 +206,12 @@ type Config struct { // streams that each client can open at a time. MaxConcurrentStreams uint32 `json:"max-concurrent-streams"` - LPUrls, LCUrls []url.URL - APUrls, ACUrls []url.URL - ClientTLSInfo transport.TLSInfo - ClientAutoTLS bool - PeerTLSInfo transport.TLSInfo - PeerAutoTLS bool + ListenPeerUrls, ListenClientUrls []url.URL + AdvertisePeerUrls, AdvertiseClientUrls []url.URL + ClientTLSInfo transport.TLSInfo + ClientAutoTLS bool + PeerTLSInfo transport.TLSInfo + PeerAutoTLS bool // SelfSignedCertValidity specifies the validity period of the client and peer certificates // that are automatically generated by etcd when you specify ClientAutoTLS and PeerAutoTLS, // the unit is year, and the default is 1 @@ -426,10 +426,10 @@ type configYAML struct { // configJSON has file options that are translated into Config options type configJSON struct { - LPUrlsJSON string `json:"listen-peer-urls"` - LCUrlsJSON string `json:"listen-client-urls"` - APUrlsJSON string `json:"initial-advertise-peer-urls"` - ACUrlsJSON string `json:"advertise-client-urls"` + ListenPeerUrls string `json:"listen-peer-urls"` + ListenClientUrls string `json:"listen-client-urls"` + AdvertisePeerUrls string `json:"initial-advertise-peer-urls"` + AdvertiseClientUrls string `json:"advertise-client-urls"` CORSJSON string `json:"cors"` HostWhitelistJSON string `json:"host-whitelist"` @@ -478,10 +478,10 @@ func NewConfig() *Config { ElectionMs: 1000, InitialElectionTickAdvance: true, - LPUrls: []url.URL{*lpurl}, - LCUrls: []url.URL{*lcurl}, - APUrls: []url.URL{*apurl}, - ACUrls: []url.URL{*acurl}, + ListenPeerUrls: []url.URL{*lpurl}, + ListenClientUrls: []url.URL{*lcurl}, + AdvertisePeerUrls: []url.URL{*apurl}, + AdvertiseClientUrls: []url.URL{*acurl}, ClusterState: ClusterStateFlagNew, InitialClusterToken: "etcd-cluster", @@ -543,40 +543,40 @@ func (cfg *configYAML) configFromFile(path string) error { return err } - if cfg.LPUrlsJSON != "" { - u, err := types.NewURLs(strings.Split(cfg.LPUrlsJSON, ",")) + if cfg.configJSON.ListenPeerUrls != "" { + u, err := types.NewURLs(strings.Split(cfg.configJSON.ListenPeerUrls, ",")) if err != nil { fmt.Fprintf(os.Stderr, "unexpected error setting up listen-peer-urls: %v\n", err) os.Exit(1) } - cfg.LPUrls = []url.URL(u) + cfg.Config.ListenPeerUrls = u } - if cfg.LCUrlsJSON != "" { - u, err := types.NewURLs(strings.Split(cfg.LCUrlsJSON, ",")) + if cfg.configJSON.ListenClientUrls != "" { + u, err := types.NewURLs(strings.Split(cfg.configJSON.ListenClientUrls, ",")) if err != nil { fmt.Fprintf(os.Stderr, "unexpected error setting up listen-client-urls: %v\n", err) os.Exit(1) } - cfg.LCUrls = []url.URL(u) + cfg.Config.ListenClientUrls = u } - if cfg.APUrlsJSON != "" { - u, err := types.NewURLs(strings.Split(cfg.APUrlsJSON, ",")) + if cfg.configJSON.AdvertisePeerUrls != "" { + u, err := types.NewURLs(strings.Split(cfg.configJSON.AdvertisePeerUrls, ",")) if err != nil { fmt.Fprintf(os.Stderr, "unexpected error setting up initial-advertise-peer-urls: %v\n", err) os.Exit(1) } - cfg.APUrls = []url.URL(u) + cfg.Config.AdvertisePeerUrls = u } - if cfg.ACUrlsJSON != "" { - u, err := types.NewURLs(strings.Split(cfg.ACUrlsJSON, ",")) + if cfg.configJSON.AdvertiseClientUrls != "" { + u, err := types.NewURLs(strings.Split(cfg.configJSON.AdvertiseClientUrls, ",")) if err != nil { fmt.Fprintf(os.Stderr, "unexpected error setting up advertise-peer-urls: %v\n", err) os.Exit(1) } - cfg.ACUrls = []url.URL(u) + cfg.Config.AdvertiseClientUrls = u } if cfg.ListenMetricsUrlsJSON != "" { @@ -654,21 +654,21 @@ func (cfg *Config) Validate() error { if err := cfg.setupLogging(); err != nil { return err } - if err := checkBindURLs(cfg.LPUrls); err != nil { + if err := checkBindURLs(cfg.ListenPeerUrls); err != nil { return err } - if err := checkBindURLs(cfg.LCUrls); err != nil { + if err := checkBindURLs(cfg.ListenClientUrls); err != nil { return err } if err := checkBindURLs(cfg.ListenMetricsUrls); err != nil { return err } - if err := checkHostURLs(cfg.APUrls); err != nil { - addrs := cfg.getAPURLs() + if err := checkHostURLs(cfg.AdvertisePeerUrls); err != nil { + addrs := cfg.getAdvertisePeerUrls() return fmt.Errorf(`--initial-advertise-peer-urls %q must be "host:port" (%v)`, strings.Join(addrs, ","), err) } - if err := checkHostURLs(cfg.ACUrls); err != nil { - addrs := cfg.getACURLs() + if err := checkHostURLs(cfg.AdvertiseClientUrls); err != nil { + addrs := cfg.getAdvertiseClientUrls() return fmt.Errorf(`--advertise-client-urls %q must be "host:port" (%v)`, strings.Join(addrs, ","), err) } // Check if conflicting flags are passed. @@ -701,7 +701,7 @@ func (cfg *Config) Validate() error { } // check this last since proxying in etcdmain may make this OK - if cfg.LCUrls != nil && cfg.ACUrls == nil { + if cfg.ListenClientUrls != nil && cfg.AdvertiseClientUrls == nil { return ErrUnsetAdvertiseClientURLsFlag } @@ -754,7 +754,7 @@ func (cfg *Config) PeerURLsMapAndToken(which string) (urlsmap types.URLsMap, tok urlsmap = types.URLsMap{} // If using discovery, generate a temporary cluster based on // self's advertised peer URLs - urlsmap[cfg.Name] = cfg.APUrls + urlsmap[cfg.Name] = cfg.AdvertisePeerUrls token = cfg.Durl case cfg.DNSCluster != "": @@ -808,7 +808,7 @@ func (cfg *Config) GetDNSClusterNames() ([]string, error) { // Use both etcd-server-ssl and etcd-server for discovery. // Combine the results if both are available. - clusterStrs, cerr = getCluster("https", "etcd-server-ssl"+serviceNameSuffix, cfg.Name, cfg.DNSCluster, cfg.APUrls) + clusterStrs, cerr = getCluster("https", "etcd-server-ssl"+serviceNameSuffix, cfg.Name, cfg.DNSCluster, cfg.AdvertisePeerUrls) if cerr != nil { clusterStrs = make([]string, 0) } @@ -818,12 +818,12 @@ func (cfg *Config) GetDNSClusterNames() ([]string, error) { zap.String("service-name", "etcd-server-ssl"+serviceNameSuffix), zap.String("server-name", cfg.Name), zap.String("discovery-srv", cfg.DNSCluster), - zap.Strings("advertise-peer-urls", cfg.getAPURLs()), + zap.Strings("advertise-peer-urls", cfg.getAdvertisePeerUrls()), zap.Strings("found-cluster", clusterStrs), zap.Error(cerr), ) - defaultHTTPClusterStrs, httpCerr := getCluster("http", "etcd-server"+serviceNameSuffix, cfg.Name, cfg.DNSCluster, cfg.APUrls) + defaultHTTPClusterStrs, httpCerr := getCluster("http", "etcd-server"+serviceNameSuffix, cfg.Name, cfg.DNSCluster, cfg.AdvertisePeerUrls) if httpCerr == nil { clusterStrs = append(clusterStrs, defaultHTTPClusterStrs...) } @@ -833,7 +833,7 @@ func (cfg *Config) GetDNSClusterNames() ([]string, error) { zap.String("service-name", "etcd-server"+serviceNameSuffix), zap.String("server-name", cfg.Name), zap.String("discovery-srv", cfg.DNSCluster), - zap.Strings("advertise-peer-urls", cfg.getAPURLs()), + zap.Strings("advertise-peer-urls", cfg.getAdvertisePeerUrls()), zap.Strings("found-cluster", clusterStrs), zap.Error(httpCerr), ) @@ -842,15 +842,15 @@ func (cfg *Config) GetDNSClusterNames() ([]string, error) { } func (cfg Config) InitialClusterFromName(name string) (ret string) { - if len(cfg.APUrls) == 0 { + if len(cfg.AdvertisePeerUrls) == 0 { return "" } n := name if name == "" { n = DefaultName } - for i := range cfg.APUrls { - ret = ret + "," + n + "=" + cfg.APUrls[i].String() + for i := range cfg.AdvertisePeerUrls { + ret = ret + "," + n + "=" + cfg.AdvertisePeerUrls[i].String() } return ret[1:] } @@ -866,11 +866,11 @@ func (cfg Config) V2DeprecationEffective() config.V2DeprecationEnum { } func (cfg Config) defaultPeerHost() bool { - return len(cfg.APUrls) == 1 && cfg.APUrls[0].String() == DefaultInitialAdvertisePeerURLs + return len(cfg.AdvertisePeerUrls) == 1 && cfg.AdvertisePeerUrls[0].String() == DefaultInitialAdvertisePeerURLs } func (cfg Config) defaultClientHost() bool { - return len(cfg.ACUrls) == 1 && cfg.ACUrls[0].String() == DefaultAdvertiseClientURLs + return len(cfg.AdvertiseClientUrls) == 1 && cfg.AdvertiseClientUrls[0].String() == DefaultAdvertiseClientURLs } func (cfg *Config) ClientSelfCert() (err error) { @@ -881,8 +881,8 @@ func (cfg *Config) ClientSelfCert() (err error) { cfg.logger.Warn("ignoring client auto TLS since certs given") return nil } - chosts := make([]string, len(cfg.LCUrls)) - for i, u := range cfg.LCUrls { + chosts := make([]string, len(cfg.ListenClientUrls)) + for i, u := range cfg.ListenClientUrls { chosts[i] = u.Host } cfg.ClientTLSInfo, err = transport.SelfCert(cfg.logger, filepath.Join(cfg.Dir, "fixtures", "client"), chosts, cfg.SelfSignedCertValidity) @@ -900,8 +900,8 @@ func (cfg *Config) PeerSelfCert() (err error) { cfg.logger.Warn("ignoring peer auto TLS since certs given") return nil } - phosts := make([]string, len(cfg.LPUrls)) - for i, u := range cfg.LPUrls { + phosts := make([]string, len(cfg.ListenPeerUrls)) + for i, u := range cfg.ListenPeerUrls { phosts[i] = u.Host } cfg.PeerTLSInfo, err = transport.SelfCert(cfg.logger, filepath.Join(cfg.Dir, "fixtures", "peer"), phosts, cfg.SelfSignedCertValidity) @@ -929,9 +929,9 @@ func (cfg *Config) UpdateDefaultClusterFromName(defaultInitialCluster string) (s } used := false - pip, pport := cfg.LPUrls[0].Hostname(), cfg.LPUrls[0].Port() + pip, pport := cfg.ListenPeerUrls[0].Hostname(), cfg.ListenPeerUrls[0].Port() if cfg.defaultPeerHost() && pip == "0.0.0.0" { - cfg.APUrls[0] = url.URL{Scheme: cfg.APUrls[0].Scheme, Host: fmt.Sprintf("%s:%s", defaultHostname, pport)} + cfg.AdvertisePeerUrls[0] = url.URL{Scheme: cfg.AdvertisePeerUrls[0].Scheme, Host: fmt.Sprintf("%s:%s", defaultHostname, pport)} used = true } // update 'initial-cluster' when only the name is specified (e.g. 'etcd --name=abc') @@ -939,9 +939,9 @@ func (cfg *Config) UpdateDefaultClusterFromName(defaultInitialCluster string) (s cfg.InitialCluster = cfg.InitialClusterFromName(cfg.Name) } - cip, cport := cfg.LCUrls[0].Hostname(), cfg.LCUrls[0].Port() + cip, cport := cfg.ListenClientUrls[0].Hostname(), cfg.ListenClientUrls[0].Port() if cfg.defaultClientHost() && cip == "0.0.0.0" { - cfg.ACUrls[0] = url.URL{Scheme: cfg.ACUrls[0].Scheme, Host: fmt.Sprintf("%s:%s", defaultHostname, cport)} + cfg.AdvertiseClientUrls[0] = url.URL{Scheme: cfg.AdvertiseClientUrls[0].Scheme, Host: fmt.Sprintf("%s:%s", defaultHostname, cport)} used = true } dhost := defaultHostname @@ -986,34 +986,34 @@ func checkHostURLs(urls []url.URL) error { return nil } -func (cfg *Config) getAPURLs() (ss []string) { - ss = make([]string, len(cfg.APUrls)) - for i := range cfg.APUrls { - ss[i] = cfg.APUrls[i].String() +func (cfg *Config) getAdvertisePeerUrls() (ss []string) { + ss = make([]string, len(cfg.AdvertisePeerUrls)) + for i := range cfg.AdvertisePeerUrls { + ss[i] = cfg.AdvertisePeerUrls[i].String() } return ss } -func (cfg *Config) getLPURLs() (ss []string) { - ss = make([]string, len(cfg.LPUrls)) - for i := range cfg.LPUrls { - ss[i] = cfg.LPUrls[i].String() +func (cfg *Config) getListenPeerUrls() (ss []string) { + ss = make([]string, len(cfg.ListenPeerUrls)) + for i := range cfg.ListenPeerUrls { + ss[i] = cfg.ListenPeerUrls[i].String() } return ss } -func (cfg *Config) getACURLs() (ss []string) { - ss = make([]string, len(cfg.ACUrls)) - for i := range cfg.ACUrls { - ss[i] = cfg.ACUrls[i].String() +func (cfg *Config) getAdvertiseClientUrls() (ss []string) { + ss = make([]string, len(cfg.AdvertiseClientUrls)) + for i := range cfg.AdvertiseClientUrls { + ss[i] = cfg.AdvertiseClientUrls[i].String() } return ss } -func (cfg *Config) getLCURLs() (ss []string) { - ss = make([]string, len(cfg.LCUrls)) - for i := range cfg.LCUrls { - ss[i] = cfg.LCUrls[i].String() +func (cfg *Config) getListenClientUrls() (ss []string) { + ss = make([]string, len(cfg.ListenClientUrls)) + for i := range cfg.ListenClientUrls { + ss[i] = cfg.ListenClientUrls[i].String() } return ss } diff --git a/server/embed/config_test.go b/server/embed/config_test.go index 584ef3553f2..a0550a6a288 100644 --- a/server/embed/config_test.go +++ b/server/embed/config_test.go @@ -86,12 +86,12 @@ func TestConfigFileOtherFields(t *testing.T) { func TestUpdateDefaultClusterFromName(t *testing.T) { cfg := NewConfig() defaultInitialCluster := cfg.InitialCluster - oldscheme := cfg.APUrls[0].Scheme - origpeer := cfg.APUrls[0].String() - origadvc := cfg.ACUrls[0].String() + oldscheme := cfg.AdvertisePeerUrls[0].Scheme + origpeer := cfg.AdvertisePeerUrls[0].String() + origadvc := cfg.AdvertiseClientUrls[0].String() cfg.Name = "abc" - lpport := cfg.LPUrls[0].Port() + lpport := cfg.ListenPeerUrls[0].Port() // in case of 'etcd --name=abc' exp := fmt.Sprintf("%s=%s://localhost:%s", cfg.Name, oldscheme, lpport) @@ -100,12 +100,12 @@ func TestUpdateDefaultClusterFromName(t *testing.T) { t.Fatalf("initial-cluster expected %q, got %q", exp, cfg.InitialCluster) } // advertise peer URL should not be affected - if origpeer != cfg.APUrls[0].String() { - t.Fatalf("advertise peer url expected %q, got %q", origadvc, cfg.APUrls[0].String()) + if origpeer != cfg.AdvertisePeerUrls[0].String() { + t.Fatalf("advertise peer url expected %q, got %q", origadvc, cfg.AdvertisePeerUrls[0].String()) } // advertise client URL should not be affected - if origadvc != cfg.ACUrls[0].String() { - t.Fatalf("advertise client url expected %q, got %q", origadvc, cfg.ACUrls[0].String()) + if origadvc != cfg.AdvertiseClientUrls[0].String() { + t.Fatalf("advertise client url expected %q, got %q", origadvc, cfg.AdvertiseClientUrls[0].String()) } } @@ -118,17 +118,17 @@ func TestUpdateDefaultClusterFromNameOverwrite(t *testing.T) { cfg := NewConfig() defaultInitialCluster := cfg.InitialCluster - oldscheme := cfg.APUrls[0].Scheme - origadvc := cfg.ACUrls[0].String() + oldscheme := cfg.AdvertisePeerUrls[0].Scheme + origadvc := cfg.AdvertiseClientUrls[0].String() cfg.Name = "abc" - lpport := cfg.LPUrls[0].Port() - cfg.LPUrls[0] = url.URL{Scheme: cfg.LPUrls[0].Scheme, Host: fmt.Sprintf("0.0.0.0:%s", lpport)} + lpport := cfg.ListenPeerUrls[0].Port() + cfg.ListenPeerUrls[0] = url.URL{Scheme: cfg.ListenPeerUrls[0].Scheme, Host: fmt.Sprintf("0.0.0.0:%s", lpport)} dhost, _ := cfg.UpdateDefaultClusterFromName(defaultInitialCluster) if dhost != defaultHostname { t.Fatalf("expected default host %q, got %q", defaultHostname, dhost) } - aphost, apport := cfg.APUrls[0].Hostname(), cfg.APUrls[0].Port() + aphost, apport := cfg.AdvertisePeerUrls[0].Hostname(), cfg.AdvertisePeerUrls[0].Port() if apport != lpport { t.Fatalf("advertise peer url got different port %s, expected %s", apport, lpport) } @@ -141,8 +141,8 @@ func TestUpdateDefaultClusterFromNameOverwrite(t *testing.T) { } // advertise client URL should not be affected - if origadvc != cfg.ACUrls[0].String() { - t.Fatalf("advertise-client-url expected %q, got %q", origadvc, cfg.ACUrls[0].String()) + if origadvc != cfg.AdvertiseClientUrls[0].String() { + t.Fatalf("advertise-client-url expected %q, got %q", origadvc, cfg.AdvertiseClientUrls[0].String()) } } @@ -276,7 +276,7 @@ func TestPeerURLsMapAndTokenFromSRV(t *testing.T) { cfg.InitialCluster = "" cfg.InitialClusterToken = "" cfg.DNSCluster = "example.com" - cfg.APUrls = types.MustNewURLs(tt.apurls) + cfg.AdvertisePeerUrls = types.MustNewURLs(tt.apurls) if err := cfg.Validate(); err != nil { t.Errorf("#%d: failed to validate test Config: %v", i, err) diff --git a/server/embed/etcd.go b/server/embed/etcd.go index 7af8d0471a9..51bd60d4492 100644 --- a/server/embed/etcd.go +++ b/server/embed/etcd.go @@ -123,7 +123,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) { } e.cfg.logger.Info( "configuring peer listeners", - zap.Strings("listen-peer-urls", e.cfg.getLPURLs()), + zap.Strings("listen-peer-urls", e.cfg.getListenPeerUrls()), ) if e.Peers, err = configurePeerListeners(cfg); err != nil { return e, err @@ -131,7 +131,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) { e.cfg.logger.Info( "configuring client listeners", - zap.Strings("listen-client-urls", e.cfg.getLCURLs()), + zap.Strings("listen-client-urls", e.cfg.getListenClientUrls()), ) if e.sctxs, err = configureClientListeners(cfg); err != nil { return e, err @@ -167,8 +167,8 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) { srvcfg := config.ServerConfig{ Name: cfg.Name, - ClientURLs: cfg.ACUrls, - PeerURLs: cfg.APUrls, + ClientURLs: cfg.AdvertiseClientUrls, + PeerURLs: cfg.AdvertisePeerUrls, DataDir: cfg.Dir, DedicatedWALDir: cfg.WalDir, SnapshotCount: cfg.SnapshotCount, @@ -276,10 +276,10 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) { e.cfg.logger.Info( "now serving peer/client/metrics", zap.String("local-member-id", e.Server.ID().String()), - zap.Strings("initial-advertise-peer-urls", e.cfg.getAPURLs()), - zap.Strings("listen-peer-urls", e.cfg.getLPURLs()), - zap.Strings("advertise-client-urls", e.cfg.getACURLs()), - zap.Strings("listen-client-urls", e.cfg.getLCURLs()), + zap.Strings("initial-advertise-peer-urls", e.cfg.getAdvertisePeerUrls()), + zap.Strings("listen-peer-urls", e.cfg.getListenPeerUrls()), + zap.Strings("advertise-client-urls", e.cfg.getAdvertiseClientUrls()), + zap.Strings("listen-client-urls", e.cfg.getListenClientUrls()), zap.Strings("listen-metrics-urls", e.cfg.getMetricsURLs()), ) serving = true @@ -327,10 +327,10 @@ func print(lg *zap.Logger, ec Config, sc config.ServerConfig, memberInitialized zap.Uint("max-wals", sc.MaxWALFiles), zap.Uint("max-snapshots", sc.MaxSnapFiles), zap.Uint64("snapshot-catchup-entries", sc.SnapshotCatchUpEntries), - zap.Strings("initial-advertise-peer-urls", ec.getAPURLs()), - zap.Strings("listen-peer-urls", ec.getLPURLs()), - zap.Strings("advertise-client-urls", ec.getACURLs()), - zap.Strings("listen-client-urls", ec.getLCURLs()), + zap.Strings("initial-advertise-peer-urls", ec.getAdvertisePeerUrls()), + zap.Strings("listen-peer-urls", ec.getListenPeerUrls()), + zap.Strings("advertise-client-urls", ec.getAdvertiseClientUrls()), + zap.Strings("listen-client-urls", ec.getListenClientUrls()), zap.Strings("listen-metrics-urls", ec.getMetricsURLs()), zap.Strings("cors", cors), zap.Strings("host-whitelist", hss), @@ -368,8 +368,8 @@ func (e *Etcd) Close() { fields := []zap.Field{ zap.String("name", e.cfg.Name), zap.String("data-dir", e.cfg.Dir), - zap.Strings("advertise-peer-urls", e.cfg.getAPURLs()), - zap.Strings("advertise-client-urls", e.cfg.getACURLs()), + zap.Strings("advertise-peer-urls", e.cfg.getAdvertisePeerUrls()), + zap.Strings("advertise-client-urls", e.cfg.getAdvertiseClientUrls()), } lg := e.GetLogger() lg.Info("closing etcd server", fields...) @@ -493,7 +493,7 @@ func configurePeerListeners(cfg *Config) (peers []*peerListener, err error) { ) } - peers = make([]*peerListener, len(cfg.LPUrls)) + peers = make([]*peerListener, len(cfg.ListenPeerUrls)) defer func() { if err == nil { return @@ -502,7 +502,7 @@ func configurePeerListeners(cfg *Config) (peers []*peerListener, err error) { if peers[i] != nil && peers[i].close != nil { cfg.logger.Warn( "closing peer listener", - zap.String("address", cfg.LPUrls[i].String()), + zap.String("address", cfg.ListenPeerUrls[i].String()), zap.Error(err), ) ctx, cancel := context.WithTimeout(context.Background(), time.Second) @@ -512,7 +512,7 @@ func configurePeerListeners(cfg *Config) (peers []*peerListener, err error) { } }() - for i, u := range cfg.LPUrls { + for i, u := range cfg.ListenPeerUrls { if u.Scheme == "http" { if !cfg.PeerTLSInfo.Empty() { cfg.logger.Warn("scheme is HTTP while key and cert files are present; ignoring key and cert files", zap.String("peer-url", u.String())) @@ -613,7 +613,7 @@ func configureClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err erro } sctxs = make(map[string]*serveCtx) - for _, u := range cfg.LCUrls { + for _, u := range cfg.ListenClientUrls { sctx := newServeCtx(cfg.logger) if u.Scheme == "http" || u.Scheme == "unix" { if !cfg.ClientTLSInfo.Empty() { diff --git a/server/embed/serve_test.go b/server/embed/serve_test.go index aada585f07e..6a1991119ee 100644 --- a/server/embed/serve_test.go +++ b/server/embed/serve_test.go @@ -38,8 +38,8 @@ func TestStartEtcdWrongToken(t *testing.T) { urls := newEmbedURLs(2) curls := []url.URL{urls[0]} purls := []url.URL{urls[1]} - cfg.LCUrls, cfg.ACUrls = curls, curls - cfg.LPUrls, cfg.APUrls = purls, purls + cfg.ListenClientUrls, cfg.AdvertiseClientUrls = curls, curls + cfg.ListenPeerUrls, cfg.AdvertisePeerUrls = purls, purls cfg.InitialCluster = "" for i := range purls { cfg.InitialCluster += ",default=" + purls[i].String() diff --git a/server/etcdmain/config.go b/server/etcdmain/config.go index f5ef586b802..0390f8dd774 100644 --- a/server/etcdmain/config.go +++ b/server/etcdmain/config.go @@ -394,10 +394,10 @@ func (cfg *config) configFromCmdLine() error { lg.Info(fmt.Sprintf("raft-write-timeout increased to minimum value: %v", rafthttp.DefaultConnWriteTimeout)) } - cfg.ec.LPUrls = flags.UniqueURLsFromFlag(cfg.cf.flagSet, "listen-peer-urls") - cfg.ec.APUrls = flags.UniqueURLsFromFlag(cfg.cf.flagSet, "initial-advertise-peer-urls") - cfg.ec.LCUrls = flags.UniqueURLsFromFlag(cfg.cf.flagSet, "listen-client-urls") - cfg.ec.ACUrls = flags.UniqueURLsFromFlag(cfg.cf.flagSet, "advertise-client-urls") + cfg.ec.ListenPeerUrls = flags.UniqueURLsFromFlag(cfg.cf.flagSet, "listen-peer-urls") + cfg.ec.AdvertisePeerUrls = flags.UniqueURLsFromFlag(cfg.cf.flagSet, "initial-advertise-peer-urls") + cfg.ec.ListenClientUrls = flags.UniqueURLsFromFlag(cfg.cf.flagSet, "listen-client-urls") + cfg.ec.AdvertiseClientUrls = flags.UniqueURLsFromFlag(cfg.cf.flagSet, "advertise-client-urls") cfg.ec.ListenMetricsUrls = flags.UniqueURLsFromFlag(cfg.cf.flagSet, "listen-metrics-urls") cfg.ec.CORS = flags.UniqueURLsMapFromFlag(cfg.cf.flagSet, "cors") @@ -418,7 +418,7 @@ func (cfg *config) configFromCmdLine() error { // disable default advertise-client-urls if lcurls is set missingAC := flags.IsSet(cfg.cf.flagSet, "listen-client-urls") && !flags.IsSet(cfg.cf.flagSet, "advertise-client-urls") if !cfg.mayBeProxy() && missingAC { - cfg.ec.ACUrls = nil + cfg.ec.AdvertiseClientUrls = nil } // disable default initial-cluster if discovery is set diff --git a/server/etcdmain/config_test.go b/server/etcdmain/config_test.go index 0dd4db97ec2..df3da61c583 100644 --- a/server/etcdmain/config_test.go +++ b/server/etcdmain/config_test.go @@ -51,14 +51,14 @@ func TestConfigParsingMemberFlags(t *testing.T) { func TestConfigFileMemberFields(t *testing.T) { yc := struct { - Dir string `json:"data-dir"` - MaxSnapFiles uint `json:"max-snapshots"` - MaxWalFiles uint `json:"max-wals"` - Name string `json:"name"` - SnapshotCount uint64 `json:"snapshot-count"` - LPUrls string `json:"listen-peer-urls"` - LCUrls string `json:"listen-client-urls"` - AcurlsCfgFile string `json:"advertise-client-urls"` + Dir string `json:"data-dir"` + MaxSnapFiles uint `json:"max-snapshots"` + MaxWalFiles uint `json:"max-wals"` + Name string `json:"name"` + SnapshotCount uint64 `json:"snapshot-count"` + ListenPeerUrls string `json:"listen-peer-urls"` + ListenClientUrls string `json:"listen-client-urls"` + AdvertiseClientUrls string `json:"advertise-client-urls"` }{ "testdir", 10, @@ -513,13 +513,13 @@ func mustCreateCfgFile(t *testing.T, b []byte) *os.File { func validateMemberFlags(t *testing.T, cfg *config) { wcfg := &embed.Config{ - Dir: "testdir", - LPUrls: []url.URL{{Scheme: "http", Host: "localhost:8000"}, {Scheme: "https", Host: "localhost:8001"}}, - LCUrls: []url.URL{{Scheme: "http", Host: "localhost:7000"}, {Scheme: "https", Host: "localhost:7001"}}, - MaxSnapFiles: 10, - MaxWalFiles: 10, - Name: "testname", - SnapshotCount: 10, + Dir: "testdir", + ListenPeerUrls: []url.URL{{Scheme: "http", Host: "localhost:8000"}, {Scheme: "https", Host: "localhost:8001"}}, + ListenClientUrls: []url.URL{{Scheme: "http", Host: "localhost:7000"}, {Scheme: "https", Host: "localhost:7001"}}, + MaxSnapFiles: 10, + MaxWalFiles: 10, + Name: "testname", + SnapshotCount: 10, } if cfg.ec.Dir != wcfg.Dir { @@ -537,18 +537,18 @@ func validateMemberFlags(t *testing.T, cfg *config) { if cfg.ec.SnapshotCount != wcfg.SnapshotCount { t.Errorf("snapcount = %v, want %v", cfg.ec.SnapshotCount, wcfg.SnapshotCount) } - if !reflect.DeepEqual(cfg.ec.LPUrls, wcfg.LPUrls) { - t.Errorf("listen-peer-urls = %v, want %v", cfg.ec.LPUrls, wcfg.LPUrls) + if !reflect.DeepEqual(cfg.ec.ListenPeerUrls, wcfg.ListenPeerUrls) { + t.Errorf("listen-peer-urls = %v, want %v", cfg.ec.ListenPeerUrls, wcfg.ListenPeerUrls) } - if !reflect.DeepEqual(cfg.ec.LCUrls, wcfg.LCUrls) { - t.Errorf("listen-client-urls = %v, want %v", cfg.ec.LCUrls, wcfg.LCUrls) + if !reflect.DeepEqual(cfg.ec.ListenClientUrls, wcfg.ListenClientUrls) { + t.Errorf("listen-client-urls = %v, want %v", cfg.ec.ListenClientUrls, wcfg.ListenClientUrls) } } func validateClusteringFlags(t *testing.T, cfg *config) { wcfg := newConfig() - wcfg.ec.APUrls = []url.URL{{Scheme: "http", Host: "localhost:8000"}, {Scheme: "https", Host: "localhost:8001"}} - wcfg.ec.ACUrls = []url.URL{{Scheme: "http", Host: "localhost:7000"}, {Scheme: "https", Host: "localhost:7001"}} + wcfg.ec.AdvertisePeerUrls = []url.URL{{Scheme: "http", Host: "localhost:8000"}, {Scheme: "https", Host: "localhost:8001"}} + wcfg.ec.AdvertiseClientUrls = []url.URL{{Scheme: "http", Host: "localhost:7000"}, {Scheme: "https", Host: "localhost:7001"}} wcfg.ec.ClusterState = embed.ClusterStateFlagExisting wcfg.cf.fallback.Set(fallbackFlagExit) wcfg.ec.InitialCluster = "0=http://localhost:8000" @@ -566,11 +566,11 @@ func validateClusteringFlags(t *testing.T, cfg *config) { if cfg.ec.InitialClusterToken != wcfg.ec.InitialClusterToken { t.Errorf("initialClusterToken = %v, want %v", cfg.ec.InitialClusterToken, wcfg.ec.InitialClusterToken) } - if !reflect.DeepEqual(cfg.ec.APUrls, wcfg.ec.APUrls) { - t.Errorf("initial-advertise-peer-urls = %v, want %v", cfg.ec.APUrls, wcfg.ec.APUrls) + if !reflect.DeepEqual(cfg.ec.AdvertisePeerUrls, wcfg.ec.AdvertisePeerUrls) { + t.Errorf("initial-advertise-peer-urls = %v, want %v", cfg.ec.AdvertisePeerUrls, wcfg.ec.AdvertisePeerUrls) } - if !reflect.DeepEqual(cfg.ec.ACUrls, wcfg.ec.ACUrls) { - t.Errorf("advertise-client-urls = %v, want %v", cfg.ec.ACUrls, wcfg.ec.ACUrls) + if !reflect.DeepEqual(cfg.ec.AdvertiseClientUrls, wcfg.ec.AdvertiseClientUrls) { + t.Errorf("advertise-client-urls = %v, want %v", cfg.ec.AdvertiseClientUrls, wcfg.ec.AdvertiseClientUrls) } } diff --git a/server/etcdmain/etcd.go b/server/etcdmain/etcd.go index 2ff8d9173a5..6a79f0a787c 100644 --- a/server/etcdmain/etcd.go +++ b/server/etcdmain/etcd.go @@ -193,7 +193,7 @@ func startEtcdOrProxyV2(args []string) { if cfg.ec.InitialCluster == cfg.ec.InitialClusterFromName(cfg.ec.Name) { lg.Warn("forgot to set --initial-cluster?") } - if types.URLs(cfg.ec.APUrls).String() == embed.DefaultInitialAdvertisePeerURLs { + if types.URLs(cfg.ec.AdvertisePeerUrls).String() == embed.DefaultInitialAdvertisePeerURLs { lg.Warn("forgot to set --initial-advertise-peer-urls?") } if cfg.ec.InitialCluster == cfg.ec.InitialClusterFromName(cfg.ec.Name) && len(cfg.ec.Durl) == 0 { @@ -389,11 +389,11 @@ func startProxy(cfg *config) error { // setup self signed certs when serving https cHosts, cTLS := []string{}, false - for _, u := range cfg.ec.LCUrls { + for _, u := range cfg.ec.ListenClientUrls { cHosts = append(cHosts, u.Host) cTLS = cTLS || u.Scheme == "https" } - for _, u := range cfg.ec.ACUrls { + for _, u := range cfg.ec.AdvertiseClientUrls { cHosts = append(cHosts, u.Host) cTLS = cTLS || u.Scheme == "https" } @@ -406,7 +406,7 @@ func startProxy(cfg *config) error { } // Start a proxy server goroutine for each listen address - for _, u := range cfg.ec.LCUrls { + for _, u := range cfg.ec.ListenClientUrls { l, err := transport.NewListener(u.Host, u.Scheme, &listenerTLS) if err != nil { return err diff --git a/tests/integration/clientv3/snapshot/v3_snapshot_test.go b/tests/integration/clientv3/snapshot/v3_snapshot_test.go index 82b03214f5a..770a73430aa 100644 --- a/tests/integration/clientv3/snapshot/v3_snapshot_test.go +++ b/tests/integration/clientv3/snapshot/v3_snapshot_test.go @@ -66,8 +66,8 @@ func createSnapshotFile(t *testing.T, kvs []kv) string { cfg := integration.NewEmbedConfig(t, "default") cfg.ClusterState = "new" - cfg.LCUrls, cfg.ACUrls = cURLs, cURLs - cfg.LPUrls, cfg.APUrls = pURLs, pURLs + cfg.ListenClientUrls, cfg.AdvertiseClientUrls = cURLs, cURLs + cfg.ListenPeerUrls, cfg.AdvertisePeerUrls = pURLs, pURLs cfg.InitialCluster = fmt.Sprintf("%s=%s", cfg.Name, pURLs[0].String()) srv, err := embed.StartEtcd(cfg) if err != nil { @@ -82,7 +82,7 @@ func createSnapshotFile(t *testing.T, kvs []kv) string { t.Fatalf("failed to start embed.Etcd for creating snapshots") } - ccfg := clientv3.Config{Endpoints: []string{cfg.ACUrls[0].String()}} + ccfg := clientv3.Config{Endpoints: []string{cfg.AdvertiseClientUrls[0].String()}} cli, err := integration.NewClient(t, ccfg) if err != nil { t.Fatal(err) diff --git a/tests/integration/embed/embed_test.go b/tests/integration/embed/embed_test.go index c04bf97c961..27da5bf473b 100644 --- a/tests/integration/embed/embed_test.go +++ b/tests/integration/embed/embed_test.go @@ -78,7 +78,7 @@ func TestEmbedEtcd(t *testing.T) { tests[0].cfg.Durl = "abc" setupEmbedCfg(&tests[1].cfg, []url.URL{urls[0]}, []url.URL{urls[1]}) - tests[1].cfg.ACUrls = nil + tests[1].cfg.AdvertiseClientUrls = nil tests[2].cfg.TickMs = tests[2].cfg.ElectionMs - 1 tests[3].cfg.ElectionMs = 999999 setupEmbedCfg(&tests[4].cfg, []url.URL{urls[2]}, []url.URL{urls[3]}) @@ -86,8 +86,8 @@ func TestEmbedEtcd(t *testing.T) { setupEmbedCfg(&tests[6].cfg, []url.URL{urls[7], urls[8]}, []url.URL{urls[9]}) dnsURL, _ := url.Parse("http://whatever.test:12345") - tests[7].cfg.LCUrls = []url.URL{*dnsURL} - tests[8].cfg.LPUrls = []url.URL{*dnsURL} + tests[7].cfg.ListenClientUrls = []url.URL{*dnsURL} + tests[8].cfg.ListenPeerUrls = []url.URL{*dnsURL} dir := filepath.Join(t.TempDir(), fmt.Sprintf("embed-etcd")) @@ -202,8 +202,8 @@ func setupEmbedCfg(cfg *embed.Config, curls []url.URL, purls []url.URL) { cfg.LogOutputs = []string{"/dev/null"} cfg.ClusterState = "new" - cfg.LCUrls, cfg.ACUrls = curls, curls - cfg.LPUrls, cfg.APUrls = purls, purls + cfg.ListenClientUrls, cfg.AdvertiseClientUrls = curls, curls + cfg.ListenPeerUrls, cfg.AdvertisePeerUrls = purls, purls cfg.InitialCluster = "" for i := range purls { cfg.InitialCluster += ",default=" + purls[i].String() diff --git a/tests/integration/snapshot/member_test.go b/tests/integration/snapshot/member_test.go index 076d928bbc3..61ce235f5c5 100644 --- a/tests/integration/snapshot/member_test.go +++ b/tests/integration/snapshot/member_test.go @@ -66,8 +66,8 @@ func TestSnapshotV3RestoreMultiMemberAdd(t *testing.T) { cfg := integration.NewEmbedConfig(t, "3") cfg.InitialClusterToken = testClusterTkn cfg.ClusterState = "existing" - cfg.LCUrls, cfg.ACUrls = newCURLs, newCURLs - cfg.LPUrls, cfg.APUrls = newPURLs, newPURLs + cfg.ListenClientUrls, cfg.AdvertiseClientUrls = newCURLs, newCURLs + cfg.ListenPeerUrls, cfg.AdvertisePeerUrls = newPURLs, newPURLs cfg.InitialCluster = "" for i := 0; i < clusterN; i++ { cfg.InitialCluster += fmt.Sprintf(",%d=%s", i, pURLs[i].String()) diff --git a/tests/integration/snapshot/v3_snapshot_test.go b/tests/integration/snapshot/v3_snapshot_test.go index 36886c40bf9..4de0f251d6b 100644 --- a/tests/integration/snapshot/v3_snapshot_test.go +++ b/tests/integration/snapshot/v3_snapshot_test.go @@ -48,8 +48,8 @@ func TestSnapshotV3RestoreSingle(t *testing.T) { cfg := integration.NewEmbedConfig(t, "s1") cfg.InitialClusterToken = testClusterTkn cfg.ClusterState = "existing" - cfg.LCUrls, cfg.ACUrls = cURLs, cURLs - cfg.LPUrls, cfg.APUrls = pURLs, pURLs + cfg.ListenClientUrls, cfg.AdvertiseClientUrls = cURLs, cURLs + cfg.ListenPeerUrls, cfg.AdvertisePeerUrls = pURLs, pURLs cfg.InitialCluster = fmt.Sprintf("%s=%s", cfg.Name, pURLs[0].String()) sp := snapshot.NewV3(zaptest.NewLogger(t)) @@ -82,7 +82,7 @@ func TestSnapshotV3RestoreSingle(t *testing.T) { } var cli *clientv3.Client - cli, err = integration.NewClient(t, clientv3.Config{Endpoints: []string{cfg.ACUrls[0].String()}}) + cli, err = integration.NewClient(t, clientv3.Config{Endpoints: []string{cfg.AdvertiseClientUrls[0].String()}}) if err != nil { t.Fatal(err) } @@ -177,8 +177,8 @@ func createSnapshotFile(t *testing.T, kvs []kv) string { cfg := integration.NewEmbedConfig(t, "default") cfg.ClusterState = "new" - cfg.LCUrls, cfg.ACUrls = cURLs, cURLs - cfg.LPUrls, cfg.APUrls = pURLs, pURLs + cfg.ListenClientUrls, cfg.AdvertiseClientUrls = cURLs, cURLs + cfg.ListenPeerUrls, cfg.AdvertisePeerUrls = pURLs, pURLs cfg.InitialCluster = fmt.Sprintf("%s=%s", cfg.Name, pURLs[0].String()) srv, err := embed.StartEtcd(cfg) if err != nil { @@ -193,7 +193,7 @@ func createSnapshotFile(t *testing.T, kvs []kv) string { t.Fatalf("failed to start embed.Etcd for creating snapshots") } - ccfg := clientv3.Config{Endpoints: []string{cfg.ACUrls[0].String()}} + ccfg := clientv3.Config{Endpoints: []string{cfg.AdvertiseClientUrls[0].String()}} cli, err := integration.NewClient(t, ccfg) if err != nil { t.Fatal(err) @@ -237,8 +237,8 @@ func restoreCluster(t *testing.T, clusterN int, dbPath string) ( cfg := integration.NewEmbedConfig(t, fmt.Sprintf("m%d", i)) cfg.InitialClusterToken = testClusterTkn cfg.ClusterState = "existing" - cfg.LCUrls, cfg.ACUrls = []url.URL{cURLs[i]}, []url.URL{cURLs[i]} - cfg.LPUrls, cfg.APUrls = []url.URL{pURLs[i]}, []url.URL{pURLs[i]} + cfg.ListenClientUrls, cfg.AdvertiseClientUrls = []url.URL{cURLs[i]}, []url.URL{cURLs[i]} + cfg.ListenPeerUrls, cfg.AdvertisePeerUrls = []url.URL{pURLs[i]}, []url.URL{pURLs[i]} cfg.InitialCluster = ics sp := snapshot.NewV3( diff --git a/tools/etcd-dump-metrics/etcd.go b/tools/etcd-dump-metrics/etcd.go index 7997e283237..5b4db2c25ab 100644 --- a/tools/etcd-dump-metrics/etcd.go +++ b/tools/etcd-dump-metrics/etcd.go @@ -51,8 +51,8 @@ func setupEmbedCfg(cfg *embed.Config, curls, purls, ics []url.URL) { os.RemoveAll(cfg.Dir) cfg.ClusterState = "new" - cfg.LCUrls, cfg.ACUrls = curls, curls - cfg.LPUrls, cfg.APUrls = purls, purls + cfg.ListenClientUrls, cfg.AdvertiseClientUrls = curls, curls + cfg.ListenPeerUrls, cfg.AdvertisePeerUrls = purls, purls cfg.InitialCluster = "" for i := range ics { From 5cb32feeccc84709668f41444a24a413d4cfcdd0 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Fri, 10 Mar 2023 14:06:44 +0100 Subject: [PATCH 2/7] server: Separate client listener grouping from serving Signed-off-by: Marek Siarkowicz --- server/embed/etcd.go | 38 +++++++++++++++++++++----------------- server/embed/serve.go | 6 ++++-- 2 files changed, 25 insertions(+), 19 deletions(-) diff --git a/server/embed/etcd.go b/server/embed/etcd.go index 51bd60d4492..02341c60477 100644 --- a/server/embed/etcd.go +++ b/server/embed/etcd.go @@ -614,7 +614,6 @@ func configureClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err erro sctxs = make(map[string]*serveCtx) for _, u := range cfg.ListenClientUrls { - sctx := newServeCtx(cfg.logger) if u.Scheme == "http" || u.Scheme == "unix" { if !cfg.ClientTLSInfo.Empty() { cfg.logger.Warn("scheme is HTTP while key and cert files are present; ignoring key and cert files", zap.String("client-url", u.String())) @@ -626,24 +625,31 @@ func configureClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err erro if (u.Scheme == "https" || u.Scheme == "unixs") && cfg.ClientTLSInfo.Empty() { return nil, fmt.Errorf("TLS key/cert (--cert-file, --key-file) must be provided for client url %s with HTTPS scheme", u.String()) } + } - network := "tcp" + for _, u := range cfg.ListenClientUrls { addr := u.Host + network := "tcp" if u.Scheme == "unix" || u.Scheme == "unixs" { - network = "unix" addr = u.Host + u.Path + network = "unix" } - sctx.network = network + secure := u.Scheme == "https" || u.Scheme == "unixs" + insecure := !secure - sctx.secure = u.Scheme == "https" || u.Scheme == "unixs" - sctx.insecure = !sctx.secure - if oldctx := sctxs[addr]; oldctx != nil { - oldctx.secure = oldctx.secure || sctx.secure - oldctx.insecure = oldctx.insecure || sctx.insecure - continue + sctx := sctxs[addr] + if sctx == nil { + sctx = newServeCtx(cfg.logger) + sctxs[addr] = sctx } - - if sctx.l, err = transport.NewListenerWithOpts(addr, u.Scheme, + sctx.secure = sctx.secure || secure + sctx.insecure = sctx.insecure || insecure + sctx.scheme = u.Scheme + sctx.addr = addr + sctx.network = network + } + for _, sctx := range sctxs { + if sctx.l, err = transport.NewListenerWithOpts(sctx.addr, sctx.scheme, transport.WithSocketOpts(&cfg.SocketOpts), transport.WithSkipTLSInfoCheck(true), ); err != nil { @@ -651,7 +657,6 @@ func configureClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err erro } // net.Listener will rewrite ipv4 0.0.0.0 to ipv6 [::], breaking // hosts that disable ipv6. So, use the address given by the user. - sctx.addr = addr if fdLimit, fderr := runtimeutil.FDLimit(); fderr == nil { if fdLimit <= reservedInternalFDNum { @@ -664,17 +669,17 @@ func configureClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err erro sctx.l = transport.LimitListener(sctx.l, int(fdLimit-reservedInternalFDNum)) } - defer func(u url.URL) { + defer func(addr string) { if err == nil { return } sctx.l.Close() cfg.logger.Warn( "closing peer listener", - zap.String("address", u.Host), + zap.String("address", addr), zap.Error(err), ) - }(u) + }(sctx.addr) for k := range cfg.UserHandlers { sctx.userHandlers[k] = cfg.UserHandlers[k] } @@ -685,7 +690,6 @@ func configureClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err erro if cfg.LogLevel == "debug" { sctx.registerTrace() } - sctxs[addr] = sctx } return sctxs, nil } diff --git a/server/embed/serve.go b/server/embed/serve.go index 4989e1ca521..6cfaeb01e79 100644 --- a/server/embed/serve.go +++ b/server/embed/serve.go @@ -50,8 +50,10 @@ import ( ) type serveCtx struct { - lg *zap.Logger - l net.Listener + lg *zap.Logger + l net.Listener + + scheme string addr string network string secure bool From f7e460d1b0bac4f8c937ea3bed845e1c3b3b0357 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Tue, 28 Mar 2023 11:47:57 +0200 Subject: [PATCH 3/7] server: Extract resolveUrl helper function Signed-off-by: Marek Siarkowicz --- server/embed/etcd.go | 23 +++++++++++++---------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/server/embed/etcd.go b/server/embed/etcd.go index 02341c60477..c0f5abe649f 100644 --- a/server/embed/etcd.go +++ b/server/embed/etcd.go @@ -628,22 +628,14 @@ func configureClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err erro } for _, u := range cfg.ListenClientUrls { - addr := u.Host - network := "tcp" - if u.Scheme == "unix" || u.Scheme == "unixs" { - addr = u.Host + u.Path - network = "unix" - } - secure := u.Scheme == "https" || u.Scheme == "unixs" - insecure := !secure - + addr, secure, network := resolveUrl(u) sctx := sctxs[addr] if sctx == nil { sctx = newServeCtx(cfg.logger) sctxs[addr] = sctx } sctx.secure = sctx.secure || secure - sctx.insecure = sctx.insecure || insecure + sctx.insecure = sctx.insecure || !secure sctx.scheme = u.Scheme sctx.addr = addr sctx.network = network @@ -694,6 +686,17 @@ func configureClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err erro return sctxs, nil } +func resolveUrl(u url.URL) (addr string, secure bool, network string) { + addr = u.Host + network = "tcp" + if u.Scheme == "unix" || u.Scheme == "unixs" { + addr = u.Host + u.Path + network = "unix" + } + secure = u.Scheme == "https" || u.Scheme == "unixs" + return addr, secure, network +} + func (e *Etcd) serveClients() (err error) { if !e.cfg.ClientTLSInfo.Empty() { e.cfg.logger.Info( From a4d61fa69c34781ce8e71c87e465ba42b1681e38 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Mon, 13 Mar 2023 15:46:42 +0100 Subject: [PATCH 4/7] server: Pick one address that all grpc gateways connect to Signed-off-by: Marek Siarkowicz --- server/embed/etcd.go | 50 +++++++++++++++++++++++++++++++++++++++++- server/embed/serve.go | 51 +++++++++++++------------------------------ 2 files changed, 64 insertions(+), 37 deletions(-) diff --git a/server/embed/etcd.go b/server/embed/etcd.go index c0f5abe649f..be15802e9d7 100644 --- a/server/embed/etcd.go +++ b/server/embed/etcd.go @@ -20,6 +20,7 @@ import ( "fmt" "io/ioutil" defaultLog "log" + "math" "net" "net/http" "net/url" @@ -32,6 +33,7 @@ import ( "go.etcd.io/etcd/api/v3/version" "go.etcd.io/etcd/client/pkg/v3/transport" "go.etcd.io/etcd/client/pkg/v3/types" + "go.etcd.io/etcd/client/v3/credentials" "go.etcd.io/etcd/pkg/v3/debugutil" runtimeutil "go.etcd.io/etcd/pkg/v3/runtime" "go.etcd.io/etcd/server/v3/config" @@ -48,6 +50,7 @@ import ( "github.com/soheilhy/cmux" "go.uber.org/zap" "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/keepalive" ) @@ -745,12 +748,57 @@ func (e *Etcd) serveClients() (err error) { // start client servers in each goroutine for _, sctx := range e.sctxs { go func(s *serveCtx) { - e.errHandler(s.serve(e.Server, &e.cfg.ClientTLSInfo, h, e.errHandler, gopts...)) + e.errHandler(s.serve(e.Server, &e.cfg.ClientTLSInfo, h, e.errHandler, e.grpcGatewayDial(), gopts...)) }(sctx) } return nil } +func (e *Etcd) grpcGatewayDial() (grpcDial func(ctx context.Context) (*grpc.ClientConn, error)) { + if !e.cfg.EnableGRPCGateway { + return nil + } + sctx := e.pickGrpcGatewayServeContext() + addr := sctx.addr + if network := sctx.network; network == "unix" { + // explicitly define unix network for gRPC socket support + addr = fmt.Sprintf("%s://%s", network, addr) + } + + opts := []grpc.DialOption{grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(math.MaxInt32))} + if sctx.secure { + tlscfg, tlsErr := e.cfg.ClientTLSInfo.ServerConfig() + if tlsErr != nil { + return func(ctx context.Context) (*grpc.ClientConn, error) { + return nil, tlsErr + } + } + dtls := tlscfg.Clone() + // trust local server + dtls.InsecureSkipVerify = true + bundle := credentials.NewBundle(credentials.Config{TLSConfig: dtls}) + opts = append(opts, grpc.WithTransportCredentials(bundle.TransportCredentials())) + } else { + opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials())) + } + + return func(ctx context.Context) (*grpc.ClientConn, error) { + conn, err := grpc.DialContext(ctx, addr, opts...) + if err != nil { + sctx.lg.Error("grpc gateway failed to dial", zap.String("addr", addr), zap.Error(err)) + return nil, err + } + return conn, err + } +} + +func (e *Etcd) pickGrpcGatewayServeContext() *serveCtx { + for _, sctx := range e.sctxs { + return sctx + } + panic("Expect at least one context able to serve grpc") +} + func (e *Etcd) serveMetrics() (err error) { if e.cfg.Metrics == "extensive" { grpc_prometheus.EnableHandlingTimeHistogram() diff --git a/server/embed/serve.go b/server/embed/serve.go index 6cfaeb01e79..a53cb38e9a1 100644 --- a/server/embed/serve.go +++ b/server/embed/serve.go @@ -19,14 +19,12 @@ import ( "fmt" "io/ioutil" defaultLog "log" - "math" "net" "net/http" "strings" etcdservergw "go.etcd.io/etcd/api/v3/etcdserverpb/gw" "go.etcd.io/etcd/client/pkg/v3/transport" - "go.etcd.io/etcd/client/v3/credentials" "go.etcd.io/etcd/pkg/v3/debugutil" "go.etcd.io/etcd/pkg/v3/httputil" "go.etcd.io/etcd/server/v3/config" @@ -95,6 +93,7 @@ func (sctx *serveCtx) serve( tlsinfo *transport.TLSInfo, handler http.Handler, errHandler func(error), + grpcDialForRestGatewayBackends func(ctx context.Context) (*grpc.ClientConn, error), gopts ...grpc.ServerOption) (err error) { logger := defaultLog.New(ioutil.Discard, "etcdhttp", 0) <-s.ReadyNotify() @@ -106,6 +105,18 @@ func (sctx *serveCtx) serve( servElection := v3election.NewElectionServer(v3c) servLock := v3lock.NewLockServer(v3c) + // Make sure serversC is closed even if we prematurely exit the function. + defer close(sctx.serversC) + var gwmux *gw.ServeMux + if s.Cfg.EnableGRPCGateway { + // GRPC gateway connects to grpc server via connection provided by grpc dial. + gwmux, err = sctx.registerGateway(grpcDialForRestGatewayBackends) + if err != nil { + sctx.lg.Error("registerGateway failed", zap.Error(err)) + return err + } + } + if sctx.insecure { gs := v3rpc.Server(s, nil, nil, gopts...) v3electionpb.RegisterElectionServer(gs, servElection) @@ -127,14 +138,6 @@ func (sctx *serveCtx) serve( errHandler(gs.Serve(grpcLis)) }(gs, grpcl) - var gwmux *gw.ServeMux - if s.Cfg.EnableGRPCGateway { - gwmux, err = sctx.registerGateway([]grpc.DialOption{grpc.WithInsecure()}) - if err != nil { - return err - } - } - httpmux := sctx.createMux(gwmux, handler) srvhttp := &http.Server{ @@ -180,20 +183,6 @@ func (sctx *serveCtx) serve( }(gs) handler = grpcHandlerFunc(gs, handler) - - var gwmux *gw.ServeMux - if s.Cfg.EnableGRPCGateway { - dtls := tlscfg.Clone() - // trust local server - dtls.InsecureSkipVerify = true - bundle := credentials.NewBundle(credentials.Config{TLSConfig: dtls}) - opts := []grpc.DialOption{grpc.WithTransportCredentials(bundle.TransportCredentials())} - gwmux, err = sctx.registerGateway(opts) - if err != nil { - return err - } - } - var tlsl net.Listener tlsl, err = transport.NewTLSListener(m.Match(cmux.Any()), tlsinfo) if err != nil { @@ -255,20 +244,10 @@ func grpcHandlerFunc(grpcServer *grpc.Server, otherHandler http.Handler) http.Ha type registerHandlerFunc func(context.Context, *gw.ServeMux, *grpc.ClientConn) error -func (sctx *serveCtx) registerGateway(opts []grpc.DialOption) (*gw.ServeMux, error) { +func (sctx *serveCtx) registerGateway(dial func(ctx context.Context) (*grpc.ClientConn, error)) (*gw.ServeMux, error) { ctx := sctx.ctx - addr := sctx.addr - if network := sctx.network; network == "unix" { - // explicitly define unix network for gRPC socket support - addr = fmt.Sprintf("%s://%s", network, addr) - } - - opts = append(opts, grpc.WithDefaultCallOptions([]grpc.CallOption{ - grpc.MaxCallRecvMsgSize(math.MaxInt32), - }...)) - - conn, err := grpc.DialContext(ctx, addr, opts...) + conn, err := dial(ctx) if err != nil { return nil, err } From 792f700774069370a18da924cd0a3399c69d97b0 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Fri, 10 Mar 2023 17:33:46 +0100 Subject: [PATCH 5/7] server: Add --listen-client-http-urls flag to allow running grpc server separate from http server Difference in load configuration for watch delay tests show how huge the impact is. Even with random write scheduler grpc under http server can only handle 500 KB with 2 seconds delay. On the other hand, separate grpc server easily hits 10, 100 or even 1000 MB within 100 miliseconds. Priority write scheduler that was used in most previous releases is far worse than random one. Tests configured to only 5 MB to avoid flakes and taking too long to fill etcd. Signed-off-by: Marek Siarkowicz --- server/embed/config.go | 53 +++++++--- server/embed/etcd.go | 52 ++++++++-- server/embed/serve.go | 171 ++++++++++++++++++++------------- server/etcdmain/config.go | 7 +- server/etcdmain/config_test.go | 37 ++++--- server/etcdmain/help.go | 4 +- tests/e2e/cluster_test.go | 6 ++ tests/e2e/etcdctl.go | 1 + tests/e2e/utils.go | 6 +- tests/e2e/watch_delay_test.go | 69 ++++++++----- 10 files changed, 276 insertions(+), 130 deletions(-) diff --git a/server/embed/config.go b/server/embed/config.go index 6c76d56e4be..ac61846ca80 100644 --- a/server/embed/config.go +++ b/server/embed/config.go @@ -206,12 +206,12 @@ type Config struct { // streams that each client can open at a time. MaxConcurrentStreams uint32 `json:"max-concurrent-streams"` - ListenPeerUrls, ListenClientUrls []url.URL - AdvertisePeerUrls, AdvertiseClientUrls []url.URL - ClientTLSInfo transport.TLSInfo - ClientAutoTLS bool - PeerTLSInfo transport.TLSInfo - PeerAutoTLS bool + ListenPeerUrls, ListenClientUrls, ListenClientHttpUrls []url.URL + AdvertisePeerUrls, AdvertiseClientUrls []url.URL + ClientTLSInfo transport.TLSInfo + ClientAutoTLS bool + PeerTLSInfo transport.TLSInfo + PeerAutoTLS bool // SelfSignedCertValidity specifies the validity period of the client and peer certificates // that are automatically generated by etcd when you specify ClientAutoTLS and PeerAutoTLS, // the unit is year, and the default is 1 @@ -426,10 +426,11 @@ type configYAML struct { // configJSON has file options that are translated into Config options type configJSON struct { - ListenPeerUrls string `json:"listen-peer-urls"` - ListenClientUrls string `json:"listen-client-urls"` - AdvertisePeerUrls string `json:"initial-advertise-peer-urls"` - AdvertiseClientUrls string `json:"advertise-client-urls"` + ListenPeerUrls string `json:"listen-peer-urls"` + ListenClientUrls string `json:"listen-client-urls"` + ListenClientHttpUrls string `json:"listen-client-http-urls"` + AdvertisePeerUrls string `json:"initial-advertise-peer-urls"` + AdvertiseClientUrls string `json:"advertise-client-urls"` CORSJSON string `json:"cors"` HostWhitelistJSON string `json:"host-whitelist"` @@ -561,6 +562,15 @@ func (cfg *configYAML) configFromFile(path string) error { cfg.Config.ListenClientUrls = u } + if cfg.configJSON.ListenClientHttpUrls != "" { + u, err := types.NewURLs(strings.Split(cfg.configJSON.ListenClientHttpUrls, ",")) + if err != nil { + fmt.Fprintf(os.Stderr, "unexpected error setting up listen-client-http-urls: %v\n", err) + os.Exit(1) + } + cfg.Config.ListenClientHttpUrls = u + } + if cfg.configJSON.AdvertisePeerUrls != "" { u, err := types.NewURLs(strings.Split(cfg.configJSON.AdvertisePeerUrls, ",")) if err != nil { @@ -660,6 +670,12 @@ func (cfg *Config) Validate() error { if err := checkBindURLs(cfg.ListenClientUrls); err != nil { return err } + if err := checkBindURLs(cfg.ListenClientHttpUrls); err != nil { + return err + } + if len(cfg.ListenClientHttpUrls) == 0 { + cfg.logger.Warn("Running http and grpc server on single port. This is not recommended for production.") + } if err := checkBindURLs(cfg.ListenMetricsUrls); err != nil { return err } @@ -881,9 +897,12 @@ func (cfg *Config) ClientSelfCert() (err error) { cfg.logger.Warn("ignoring client auto TLS since certs given") return nil } - chosts := make([]string, len(cfg.ListenClientUrls)) - for i, u := range cfg.ListenClientUrls { - chosts[i] = u.Host + chosts := make([]string, 0, len(cfg.ListenClientUrls)+len(cfg.ListenClientHttpUrls)) + for _, u := range cfg.ListenClientUrls { + chosts = append(chosts, u.Host) + } + for _, u := range cfg.ListenClientHttpUrls { + chosts = append(chosts, u.Host) } cfg.ClientTLSInfo, err = transport.SelfCert(cfg.logger, filepath.Join(cfg.Dir, "fixtures", "client"), chosts, cfg.SelfSignedCertValidity) if err != nil { @@ -1018,6 +1037,14 @@ func (cfg *Config) getListenClientUrls() (ss []string) { return ss } +func (cfg *Config) getListenClientHttpUrls() (ss []string) { + ss = make([]string, len(cfg.ListenClientHttpUrls)) + for i := range cfg.ListenClientHttpUrls { + ss[i] = cfg.ListenClientHttpUrls[i].String() + } + return ss +} + func (cfg *Config) getMetricsURLs() (ss []string) { ss = make([]string, len(cfg.ListenMetricsUrls)) for i := range cfg.ListenMetricsUrls { diff --git a/server/embed/etcd.go b/server/embed/etcd.go index be15802e9d7..089b332c379 100644 --- a/server/embed/etcd.go +++ b/server/embed/etcd.go @@ -442,11 +442,16 @@ func (e *Etcd) Close() { func stopServers(ctx context.Context, ss *servers) { // first, close the http.Server - ss.http.Shutdown(ctx) - // do not grpc.Server.GracefulStop with TLS enabled etcd server + if ss.http != nil { + ss.http.Shutdown(ctx) + } + if ss.grpc == nil { + return + } + // do not grpc.Server.GracefulStop when grpc runs under http server // See https://github.com/grpc/grpc-go/issues/1384#issuecomment-317124531 // and https://github.com/etcd-io/etcd/issues/8916 - if ss.secure { + if ss.secure && ss.http != nil { ss.grpc.Stop() return } @@ -616,7 +621,7 @@ func configureClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err erro } sctxs = make(map[string]*serveCtx) - for _, u := range cfg.ListenClientUrls { + for _, u := range append(cfg.ListenClientUrls, cfg.ListenClientHttpUrls...) { if u.Scheme == "http" || u.Scheme == "unix" { if !cfg.ClientTLSInfo.Empty() { cfg.logger.Warn("scheme is HTTP while key and cert files are present; ignoring key and cert files", zap.String("client-url", u.String())) @@ -643,6 +648,24 @@ func configureClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err erro sctx.addr = addr sctx.network = network } + for _, u := range cfg.ListenClientHttpUrls { + addr, secure, network := resolveUrl(u) + + sctx := sctxs[addr] + if sctx == nil { + sctx = newServeCtx(cfg.logger) + sctxs[addr] = sctx + } else if !sctx.httpOnly { + return nil, fmt.Errorf("cannot bind both --client-listen-urls and --client-listen-http-urls on the same url %s", u.String()) + } + sctx.secure = sctx.secure || secure + sctx.insecure = sctx.insecure || !secure + sctx.scheme = u.Scheme + sctx.addr = addr + sctx.network = network + sctx.httpOnly = true + } + for _, sctx := range sctxs { if sctx.l, err = transport.NewListenerWithOpts(sctx.addr, sctx.scheme, transport.WithSocketOpts(&cfg.SocketOpts), @@ -665,7 +688,7 @@ func configureClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err erro } defer func(addr string) { - if err == nil { + if err == nil || sctx.l == nil { return } sctx.l.Close() @@ -745,20 +768,27 @@ func (e *Etcd) serveClients() (err error) { })) } + splitHttp := false + for _, sctx := range e.sctxs { + if sctx.httpOnly { + splitHttp = true + } + } + // start client servers in each goroutine for _, sctx := range e.sctxs { go func(s *serveCtx) { - e.errHandler(s.serve(e.Server, &e.cfg.ClientTLSInfo, h, e.errHandler, e.grpcGatewayDial(), gopts...)) + e.errHandler(s.serve(e.Server, &e.cfg.ClientTLSInfo, h, e.errHandler, e.grpcGatewayDial(splitHttp), splitHttp, gopts...)) }(sctx) } return nil } -func (e *Etcd) grpcGatewayDial() (grpcDial func(ctx context.Context) (*grpc.ClientConn, error)) { +func (e *Etcd) grpcGatewayDial(splitHttp bool) (grpcDial func(ctx context.Context) (*grpc.ClientConn, error)) { if !e.cfg.EnableGRPCGateway { return nil } - sctx := e.pickGrpcGatewayServeContext() + sctx := e.pickGrpcGatewayServeContext(splitHttp) addr := sctx.addr if network := sctx.network; network == "unix" { // explicitly define unix network for gRPC socket support @@ -792,9 +822,11 @@ func (e *Etcd) grpcGatewayDial() (grpcDial func(ctx context.Context) (*grpc.Clie } } -func (e *Etcd) pickGrpcGatewayServeContext() *serveCtx { +func (e *Etcd) pickGrpcGatewayServeContext(splitHttp bool) *serveCtx { for _, sctx := range e.sctxs { - return sctx + if !splitHttp || !sctx.httpOnly { + return sctx + } } panic("Expect at least one context able to serve grpc") } diff --git a/server/embed/serve.go b/server/embed/serve.go index a53cb38e9a1..30c975c3768 100644 --- a/server/embed/serve.go +++ b/server/embed/serve.go @@ -56,6 +56,7 @@ type serveCtx struct { network string secure bool insecure bool + httpOnly bool ctx context.Context cancel context.CancelFunc @@ -94,6 +95,7 @@ func (sctx *serveCtx) serve( handler http.Handler, errHandler func(error), grpcDialForRestGatewayBackends func(ctx context.Context) (*grpc.ClientConn, error), + splitHttp bool, gopts ...grpc.ServerOption) (err error) { logger := defaultLog.New(ioutil.Discard, "etcdhttp", 0) <-s.ReadyNotify() @@ -101,6 +103,12 @@ func (sctx *serveCtx) serve( sctx.lg.Info("ready to serve client requests") m := cmux.New(sctx.l) + var server func() error + onlyGRPC := splitHttp && !sctx.httpOnly + onlyHttp := splitHttp && sctx.httpOnly + grpcEnabled := !onlyHttp + httpEnabled := !onlyGRPC + v3c := v3client.New(s) servElection := v3election.NewElectionServer(v3c) servLock := v3lock.NewLockServer(v3c) @@ -116,104 +124,137 @@ func (sctx *serveCtx) serve( return err } } + var traffic string + switch { + case onlyGRPC: + traffic = "grpc" + case onlyHttp: + traffic = "http" + default: + traffic = "grpc+http" + } if sctx.insecure { - gs := v3rpc.Server(s, nil, nil, gopts...) - v3electionpb.RegisterElectionServer(gs, servElection) - v3lockpb.RegisterLockServer(gs, servLock) - if sctx.serviceRegister != nil { - sctx.serviceRegister(gs) + var gs *grpc.Server + var srv *http.Server + if httpEnabled { + httpmux := sctx.createMux(gwmux, handler) + srv = &http.Server{ + Handler: createAccessController(sctx.lg, s, httpmux), + ErrorLog: logger, // do not log user error + } + if err := configureHttpServer(srv, s.Cfg); err != nil { + sctx.lg.Error("Configure http server failed", zap.Error(err)) + return err + } } - - defer func(gs *grpc.Server) { - if err != nil { - sctx.lg.Warn("stopping insecure grpc server due to error", zap.Error(err)) - gs.Stop() - sctx.lg.Warn("stopped insecure grpc server due to error", zap.Error(err)) + if grpcEnabled { + gs = v3rpc.Server(s, nil, nil, gopts...) + v3electionpb.RegisterElectionServer(gs, servElection) + v3lockpb.RegisterLockServer(gs, servLock) + if sctx.serviceRegister != nil { + sctx.serviceRegister(gs) } - }(gs) - - grpcl := m.Match(cmux.HTTP2()) - go func(gs *grpc.Server, grpcLis net.Listener) { - errHandler(gs.Serve(grpcLis)) - }(gs, grpcl) - - httpmux := sctx.createMux(gwmux, handler) - - srvhttp := &http.Server{ - Handler: createAccessController(sctx.lg, s, httpmux), - ErrorLog: logger, // do not log user error + defer func(gs *grpc.Server) { + if err != nil { + sctx.lg.Warn("stopping insecure grpc server due to error", zap.Error(err)) + gs.Stop() + sctx.lg.Warn("stopped insecure grpc server due to error", zap.Error(err)) + } + }(gs) } - if err := configureHttpServer(srvhttp, s.Cfg); err != nil { - sctx.lg.Error("Configure http server failed", zap.Error(err)) - return err + if onlyGRPC { + server = func() error { + return gs.Serve(sctx.l) + } + } else { + server = m.Serve + + httpl := m.Match(cmux.HTTP1()) + go func(srvhttp *http.Server, tlsLis net.Listener) { + errHandler(srvhttp.Serve(tlsLis)) + }(srv, httpl) + + if grpcEnabled { + grpcl := m.Match(cmux.HTTP2()) + go func(gs *grpc.Server, l net.Listener) { + errHandler(gs.Serve(l)) + }(gs, grpcl) + } } - httpl := m.Match(cmux.HTTP1()) - - go func(srvhttp *http.Server, httpLis net.Listener) { - errHandler(srvhttp.Serve(httpLis)) - }(srvhttp, httpl) - sctx.serversC <- &servers{grpc: gs, http: srvhttp} + sctx.serversC <- &servers{grpc: gs, http: srv} sctx.lg.Info( "serving client traffic insecurely; this is strongly discouraged!", + zap.String("traffic", traffic), zap.String("address", sctx.l.Addr().String()), ) } if sctx.secure { + var gs *grpc.Server + var srv *http.Server + tlscfg, tlsErr := tlsinfo.ServerConfig() if tlsErr != nil { return tlsErr } - gs := v3rpc.Server(s, tlscfg, nil, gopts...) - v3electionpb.RegisterElectionServer(gs, servElection) - v3lockpb.RegisterLockServer(gs, servLock) - if sctx.serviceRegister != nil { - sctx.serviceRegister(gs) + if grpcEnabled { + gs = v3rpc.Server(s, tlscfg, nil, gopts...) + v3electionpb.RegisterElectionServer(gs, servElection) + v3lockpb.RegisterLockServer(gs, servLock) + if sctx.serviceRegister != nil { + sctx.serviceRegister(gs) + } + defer func(gs *grpc.Server) { + if err != nil { + sctx.lg.Warn("stopping secure grpc server due to error", zap.Error(err)) + gs.Stop() + sctx.lg.Warn("stopped secure grpc server due to error", zap.Error(err)) + } + }(gs) } - - defer func(gs *grpc.Server) { - if err != nil { - sctx.lg.Warn("stopping secure grpc server due to error", zap.Error(err)) - gs.Stop() - sctx.lg.Warn("stopped secure grpc server due to error", zap.Error(err)) + if httpEnabled { + if grpcEnabled { + handler = grpcHandlerFunc(gs, handler) } - }(gs) + httpmux := sctx.createMux(gwmux, handler) - handler = grpcHandlerFunc(gs, handler) - var tlsl net.Listener - tlsl, err = transport.NewTLSListener(m.Match(cmux.Any()), tlsinfo) - if err != nil { - return err + srv = &http.Server{ + Handler: createAccessController(sctx.lg, s, httpmux), + TLSConfig: tlscfg, + ErrorLog: logger, // do not log user error + } + if err := configureHttpServer(srv, s.Cfg); err != nil { + sctx.lg.Error("Configure https server failed", zap.Error(err)) + return err + } } - // TODO: add debug flag; enable logging when debug flag is set - httpmux := sctx.createMux(gwmux, handler) - srv := &http.Server{ - Handler: createAccessController(sctx.lg, s, httpmux), - TLSConfig: tlscfg, - ErrorLog: logger, // do not log user error - } - if err := configureHttpServer(srv, s.Cfg); err != nil { - sctx.lg.Error("Configure https server failed", zap.Error(err)) - return err - } + if onlyGRPC { + server = func() error { return gs.Serve(sctx.l) } + } else { + server = m.Serve - go func(srvhttp *http.Server, tlsLis net.Listener) { - errHandler(srvhttp.Serve(tlsLis)) - }(srv, tlsl) + tlsl, err := transport.NewTLSListener(m.Match(cmux.Any()), tlsinfo) + if err != nil { + return err + } + go func(srvhttp *http.Server, tlsl net.Listener) { + errHandler(srvhttp.Serve(tlsl)) + }(srv, tlsl) + } sctx.serversC <- &servers{secure: true, grpc: gs, http: srv} sctx.lg.Info( "serving client traffic securely", + zap.String("traffic", traffic), zap.String("address", sctx.l.Addr().String()), ) } - close(sctx.serversC) - return m.Serve() + return server() } func configureHttpServer(srv *http.Server, cfg config.ServerConfig) error { diff --git a/server/etcdmain/config.go b/server/etcdmain/config.go index 0390f8dd774..d8460af88b2 100644 --- a/server/etcdmain/config.go +++ b/server/etcdmain/config.go @@ -147,7 +147,11 @@ func newConfig() *config { ) fs.Var( flags.NewUniqueURLsWithExceptions(embed.DefaultListenClientURLs, ""), "listen-client-urls", - "List of URLs to listen on for client traffic.", + "List of URLs to listen on for client grpc traffic and http as long as --listen-client-http-urls is not specified.", + ) + fs.Var( + flags.NewUniqueURLsWithExceptions("", ""), "listen-client-http-urls", + "List of URLs to listen on for http only client traffic. Enabling this flag removes http services from --listen-client-urls.", ) fs.Var( flags.NewUniqueURLsWithExceptions("", ""), @@ -397,6 +401,7 @@ func (cfg *config) configFromCmdLine() error { cfg.ec.ListenPeerUrls = flags.UniqueURLsFromFlag(cfg.cf.flagSet, "listen-peer-urls") cfg.ec.AdvertisePeerUrls = flags.UniqueURLsFromFlag(cfg.cf.flagSet, "initial-advertise-peer-urls") cfg.ec.ListenClientUrls = flags.UniqueURLsFromFlag(cfg.cf.flagSet, "listen-client-urls") + cfg.ec.ListenClientHttpUrls = flags.UniqueURLsFromFlag(cfg.cf.flagSet, "listen-client-http-urls") cfg.ec.AdvertiseClientUrls = flags.UniqueURLsFromFlag(cfg.cf.flagSet, "advertise-client-urls") cfg.ec.ListenMetricsUrls = flags.UniqueURLsFromFlag(cfg.cf.flagSet, "listen-metrics-urls") diff --git a/server/etcdmain/config_test.go b/server/etcdmain/config_test.go index df3da61c583..faaf250a48f 100644 --- a/server/etcdmain/config_test.go +++ b/server/etcdmain/config_test.go @@ -36,6 +36,7 @@ func TestConfigParsingMemberFlags(t *testing.T) { "-snapshot-count=10", "-listen-peer-urls=http://localhost:8000,https://localhost:8001", "-listen-client-urls=http://localhost:7000,https://localhost:7001", + "-listen-client-http-urls=http://localhost:7002,https://localhost:7003", // it should be set if -listen-client-urls is set "-advertise-client-urls=http://localhost:7000,https://localhost:7001", } @@ -51,14 +52,15 @@ func TestConfigParsingMemberFlags(t *testing.T) { func TestConfigFileMemberFields(t *testing.T) { yc := struct { - Dir string `json:"data-dir"` - MaxSnapFiles uint `json:"max-snapshots"` - MaxWalFiles uint `json:"max-wals"` - Name string `json:"name"` - SnapshotCount uint64 `json:"snapshot-count"` - ListenPeerUrls string `json:"listen-peer-urls"` - ListenClientUrls string `json:"listen-client-urls"` - AdvertiseClientUrls string `json:"advertise-client-urls"` + Dir string `json:"data-dir"` + MaxSnapFiles uint `json:"max-snapshots"` + MaxWalFiles uint `json:"max-wals"` + Name string `json:"name"` + SnapshotCount uint64 `json:"snapshot-count"` + ListenPeerUrls string `json:"listen-peer-urls"` + ListenClientUrls string `json:"listen-client-urls"` + ListenClientHttpUrls string `json:"listen-client-http-urls"` + AdvertiseClientUrls string `json:"advertise-client-urls"` }{ "testdir", 10, @@ -67,6 +69,7 @@ func TestConfigFileMemberFields(t *testing.T) { 10, "http://localhost:8000,https://localhost:8001", "http://localhost:7000,https://localhost:7001", + "http://localhost:7002,https://localhost:7003", "http://localhost:7000,https://localhost:7001", } @@ -513,13 +516,14 @@ func mustCreateCfgFile(t *testing.T, b []byte) *os.File { func validateMemberFlags(t *testing.T, cfg *config) { wcfg := &embed.Config{ - Dir: "testdir", - ListenPeerUrls: []url.URL{{Scheme: "http", Host: "localhost:8000"}, {Scheme: "https", Host: "localhost:8001"}}, - ListenClientUrls: []url.URL{{Scheme: "http", Host: "localhost:7000"}, {Scheme: "https", Host: "localhost:7001"}}, - MaxSnapFiles: 10, - MaxWalFiles: 10, - Name: "testname", - SnapshotCount: 10, + Dir: "testdir", + ListenPeerUrls: []url.URL{{Scheme: "http", Host: "localhost:8000"}, {Scheme: "https", Host: "localhost:8001"}}, + ListenClientUrls: []url.URL{{Scheme: "http", Host: "localhost:7000"}, {Scheme: "https", Host: "localhost:7001"}}, + ListenClientHttpUrls: []url.URL{{Scheme: "http", Host: "localhost:7002"}, {Scheme: "https", Host: "localhost:7003"}}, + MaxSnapFiles: 10, + MaxWalFiles: 10, + Name: "testname", + SnapshotCount: 10, } if cfg.ec.Dir != wcfg.Dir { @@ -543,6 +547,9 @@ func validateMemberFlags(t *testing.T, cfg *config) { if !reflect.DeepEqual(cfg.ec.ListenClientUrls, wcfg.ListenClientUrls) { t.Errorf("listen-client-urls = %v, want %v", cfg.ec.ListenClientUrls, wcfg.ListenClientUrls) } + if !reflect.DeepEqual(cfg.ec.ListenClientHttpUrls, wcfg.ListenClientHttpUrls) { + t.Errorf("listen-client-http-urls = %v, want %v", cfg.ec.ListenClientHttpUrls, wcfg.ListenClientHttpUrls) + } } func validateClusteringFlags(t *testing.T, cfg *config) { diff --git a/server/etcdmain/help.go b/server/etcdmain/help.go index 0e5345a1cf2..fdb55d887b0 100644 --- a/server/etcdmain/help.go +++ b/server/etcdmain/help.go @@ -63,7 +63,9 @@ Member: --listen-peer-urls 'http://localhost:2380' List of URLs to listen on for peer traffic. --listen-client-urls 'http://localhost:2379' - List of URLs to listen on for client traffic. + List of URLs to listen on for client grpc traffic and http as long as --listen-client-http-urls is not specified. + --listen-client-http-urls '' + List of URLs to listen on for http only client traffic. Enabling this flag removes http services from --listen-client-urls. --max-snapshots '` + strconv.Itoa(embed.DefaultMaxSnapshots) + `' Maximum number of snapshot files to retain (0 is unlimited). --max-wals '` + strconv.Itoa(embed.DefaultMaxWALs) + `' diff --git a/tests/e2e/cluster_test.go b/tests/e2e/cluster_test.go index 6b293673f72..c5d752e7026 100644 --- a/tests/e2e/cluster_test.go +++ b/tests/e2e/cluster_test.go @@ -149,6 +149,7 @@ type etcdProcessClusterConfig struct { clientTLS clientConnType clientCertAuthEnabled bool + clientHttpSeparate bool isPeerTLS bool isPeerAutoTLS bool isClientAutoTLS bool @@ -247,6 +248,7 @@ func (cfg *etcdProcessClusterConfig) etcdServerProcessConfigs(tb testing.TB) []* var curls []string var curl, curltls string port := cfg.basePort + 5*i + clientHttpPort := port + 4 curlHost := fmt.Sprintf("localhost:%d", port) switch cfg.clientTLS { @@ -277,6 +279,10 @@ func (cfg *etcdProcessClusterConfig) etcdServerProcessConfigs(tb testing.TB) []* "--data-dir", dataDirPath, "--snapshot-count", fmt.Sprintf("%d", cfg.snapshotCount), } + if cfg.clientHttpSeparate { + clientHttpUrl := url.URL{Scheme: cfg.clientScheme(), Host: fmt.Sprintf("localhost:%d", clientHttpPort)} + args = append(args, "--listen-client-http-urls", clientHttpUrl.String()) + } args = addV2Args(args) if cfg.forceNewCluster { args = append(args, "--force-new-cluster") diff --git a/tests/e2e/etcdctl.go b/tests/e2e/etcdctl.go index 05e8c4f4a4d..854e61234c8 100644 --- a/tests/e2e/etcdctl.go +++ b/tests/e2e/etcdctl.go @@ -121,6 +121,7 @@ func (ctl *Etcdctl) cmdArgs(args ...string) []string { func (ctl *Etcdctl) flags() map[string]string { fmap := make(map[string]string) if ctl.v2 { + fmap["no-sync"] = "true" if ctl.connType == clientTLS { fmap["ca-file"] = integration.TestTLSInfo.TrustedCAFile fmap["cert-file"] = integration.TestTLSInfo.CertFile diff --git a/tests/e2e/utils.go b/tests/e2e/utils.go index 4023fc63e9a..d05b3ad4641 100644 --- a/tests/e2e/utils.go +++ b/tests/e2e/utils.go @@ -101,15 +101,17 @@ func tlsInfo(t testing.TB, connType clientConnType, isAutoTLS bool) (*transport. } } -func fillEtcdWithData(ctx context.Context, c *clientv3.Client, keyCount int, valueSize uint) error { +func fillEtcdWithData(ctx context.Context, c *clientv3.Client, dbSize int) error { g := errgroup.Group{} concurrency := 10 + keyCount := 100 keysPerRoutine := keyCount / concurrency + valueSize := dbSize / keyCount for i := 0; i < concurrency; i++ { i := i g.Go(func() error { for j := 0; j < keysPerRoutine; j++ { - _, err := c.Put(ctx, fmt.Sprintf("%d", i*keysPerRoutine+j), stringutil.RandString(valueSize)) + _, err := c.Put(ctx, fmt.Sprintf("%d", i*keysPerRoutine+j), stringutil.RandString(uint(valueSize))) if err != nil { return err } diff --git a/tests/e2e/watch_delay_test.go b/tests/e2e/watch_delay_test.go index c5bf0375ec1..b1ee9c8a0be 100644 --- a/tests/e2e/watch_delay_test.go +++ b/tests/e2e/watch_delay_test.go @@ -33,29 +33,48 @@ import ( const ( watchResponsePeriod = 100 * time.Millisecond watchTestDuration = 5 * time.Second - // TODO: Reduce maxWatchDelay when https://github.com/etcd-io/etcd/issues/15402 is addressed. - maxWatchDelay = 2 * time.Second - // Configure enough read load to cause starvation from https://github.com/etcd-io/etcd/issues/15402. - // Tweaked to pass on GitHub runner. For local runs please increase parameters. - // TODO: Increase when https://github.com/etcd-io/etcd/issues/15402 is fully addressed. - numberOfPreexistingKeys = 100 - sizeOfPreexistingValues = 5000 - readLoadConcurrency = 10 + readLoadConcurrency = 10 ) type testCase struct { - name string - config etcdProcessClusterConfig + name string + config etcdProcessClusterConfig + maxWatchDelay time.Duration + dbSizeBytes int } +const ( + Kilo = 1000 + Mega = 1000 * Kilo +) + +// 10 MB is not a bottleneck of grpc server, but filling up etcd with data. +// Keeping it lower so tests don't take too long. +// If we implement reuse of db we could increase the dbSize. var tcs = []testCase{ { - name: "NoTLS", - config: etcdProcessClusterConfig{clusterSize: 1}, + name: "NoTLS", + config: etcdProcessClusterConfig{clusterSize: 1}, + maxWatchDelay: 100 * time.Millisecond, + dbSizeBytes: 5 * Mega, + }, + { + name: "TLS", + config: etcdProcessClusterConfig{clusterSize: 1, isClientAutoTLS: true, clientTLS: clientTLS}, + maxWatchDelay: 2 * time.Second, + dbSizeBytes: 500 * Kilo, + }, + { + name: "SeparateHttpNoTLS", + config: etcdProcessClusterConfig{clusterSize: 1, clientHttpSeparate: true}, + maxWatchDelay: 100 * time.Millisecond, + dbSizeBytes: 5 * Mega, }, { - name: "ClientTLS", - config: etcdProcessClusterConfig{clusterSize: 1, isClientAutoTLS: true, clientTLS: clientTLS}, + name: "SeparateHttpTLS", + config: etcdProcessClusterConfig{clusterSize: 1, isClientAutoTLS: true, clientTLS: clientTLS, clientHttpSeparate: true}, + maxWatchDelay: 100 * time.Millisecond, + dbSizeBytes: 5 * Mega, }, } @@ -69,13 +88,13 @@ func TestWatchDelayForPeriodicProgressNotification(t *testing.T) { require.NoError(t, err) defer clus.Close() c := newClient(t, clus.EndpointsV3(), tc.config.clientTLS, tc.config.isClientAutoTLS) - require.NoError(t, fillEtcdWithData(context.Background(), c, numberOfPreexistingKeys, sizeOfPreexistingValues)) + require.NoError(t, fillEtcdWithData(context.Background(), c, tc.dbSizeBytes)) ctx, cancel := context.WithTimeout(context.Background(), watchTestDuration) defer cancel() g := errgroup.Group{} continuouslyExecuteGetAll(ctx, t, &g, c) - validateWatchDelay(t, c.Watch(ctx, "fake-key", clientv3.WithProgressNotify())) + validateWatchDelay(t, c.Watch(ctx, "fake-key", clientv3.WithProgressNotify()), tc.maxWatchDelay) require.NoError(t, g.Wait()) }) } @@ -89,7 +108,7 @@ func TestWatchDelayForManualProgressNotification(t *testing.T) { require.NoError(t, err) defer clus.Close() c := newClient(t, clus.EndpointsV3(), tc.config.clientTLS, tc.config.isClientAutoTLS) - require.NoError(t, fillEtcdWithData(context.Background(), c, numberOfPreexistingKeys, sizeOfPreexistingValues)) + require.NoError(t, fillEtcdWithData(context.Background(), c, tc.dbSizeBytes)) ctx, cancel := context.WithTimeout(context.Background(), watchTestDuration) defer cancel() @@ -107,7 +126,7 @@ func TestWatchDelayForManualProgressNotification(t *testing.T) { time.Sleep(watchResponsePeriod) } }) - validateWatchDelay(t, c.Watch(ctx, "fake-key")) + validateWatchDelay(t, c.Watch(ctx, "fake-key"), tc.maxWatchDelay) require.NoError(t, g.Wait()) }) } @@ -121,7 +140,7 @@ func TestWatchDelayForEvent(t *testing.T) { require.NoError(t, err) defer clus.Close() c := newClient(t, clus.EndpointsV3(), tc.config.clientTLS, tc.config.isClientAutoTLS) - require.NoError(t, fillEtcdWithData(context.Background(), c, numberOfPreexistingKeys, sizeOfPreexistingValues)) + require.NoError(t, fillEtcdWithData(context.Background(), c, tc.dbSizeBytes)) ctx, cancel := context.WithTimeout(context.Background(), watchTestDuration) defer cancel() @@ -140,13 +159,13 @@ func TestWatchDelayForEvent(t *testing.T) { } }) continuouslyExecuteGetAll(ctx, t, &g, c) - validateWatchDelay(t, c.Watch(ctx, "key")) + validateWatchDelay(t, c.Watch(ctx, "key"), tc.maxWatchDelay) require.NoError(t, g.Wait()) }) } } -func validateWatchDelay(t *testing.T, watch clientv3.WatchChan) { +func validateWatchDelay(t *testing.T, watch clientv3.WatchChan, maxWatchDelay time.Duration) { start := time.Now() var maxDelay time.Duration for range watch { @@ -177,15 +196,19 @@ func continuouslyExecuteGetAll(ctx context.Context, t *testing.T, g *errgroup.Gr for i := 0; i < readLoadConcurrency; i++ { g.Go(func() error { for { - _, err := c.Get(ctx, "", clientv3.WithPrefix()) + resp, err := c.Get(ctx, "", clientv3.WithPrefix()) if err != nil { if strings.Contains(err.Error(), "context deadline exceeded") { return nil } return err } + respSize := 0 + for _, kv := range resp.Kvs { + respSize += kv.Size() + } mux.Lock() - size += numberOfPreexistingKeys * sizeOfPreexistingValues + size += respSize mux.Unlock() } }) From 0ae068461396748642d3c4e2bc15668909e6fc08 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Mon, 27 Mar 2023 13:35:29 +0200 Subject: [PATCH 6/7] tests: Test separate http port connection multiplexing Signed-off-by: Marek Siarkowicz --- tests/e2e/cluster_proxy_test.go | 6 ++-- tests/e2e/cluster_test.go | 58 ++++++++++++++++++++------------- tests/e2e/cmux_test.go | 45 ++++++++++++++++--------- tests/e2e/etcd_process.go | 18 +++++++--- 4 files changed, 82 insertions(+), 45 deletions(-) diff --git a/tests/e2e/cluster_proxy_test.go b/tests/e2e/cluster_proxy_test.go index fd7924835b3..beca84cfdea 100644 --- a/tests/e2e/cluster_proxy_test.go +++ b/tests/e2e/cluster_proxy_test.go @@ -55,8 +55,10 @@ func newProxyEtcdProcess(cfg *etcdServerProcessConfig) (*proxyEtcdProcess, error func (p *proxyEtcdProcess) Config() *etcdServerProcessConfig { return p.etcdProc.Config() } -func (p *proxyEtcdProcess) EndpointsV2() []string { return p.proxyV2.endpoints() } -func (p *proxyEtcdProcess) EndpointsV3() []string { return p.proxyV3.endpoints() } +func (p *proxyEtcdProcess) EndpointsV2() []string { return p.EndpointsHTTP() } +func (p *proxyEtcdProcess) EndpointsV3() []string { return p.EndpointsGRPC() } +func (p *proxyEtcdProcess) EndpointsHTTP() []string { return p.proxyV2.endpoints() } +func (p *proxyEtcdProcess) EndpointsGRPC() []string { return p.proxyV3.endpoints() } func (p *proxyEtcdProcess) EndpointsMetrics() []string { panic("not implemented; proxy doesn't provide health information") } diff --git a/tests/e2e/cluster_test.go b/tests/e2e/cluster_test.go index c5d752e7026..e63e9a5d88d 100644 --- a/tests/e2e/cluster_test.go +++ b/tests/e2e/cluster_test.go @@ -246,19 +246,17 @@ func (cfg *etcdProcessClusterConfig) etcdServerProcessConfigs(tb testing.TB) []* initialCluster := make([]string, cfg.clusterSize) for i := 0; i < cfg.clusterSize; i++ { var curls []string - var curl, curltls string + var curl string port := cfg.basePort + 5*i + clientPort := port clientHttpPort := port + 4 - curlHost := fmt.Sprintf("localhost:%d", port) - switch cfg.clientTLS { - case clientNonTLS, clientTLS: - curl = (&url.URL{Scheme: cfg.clientScheme(), Host: curlHost}).String() + if cfg.clientTLS == clientTLSAndNonTLS { + curl = clientURL(clientPort, clientNonTLS) + curls = []string{curl, clientURL(clientPort, clientTLS)} + } else { + curl = clientURL(clientPort, cfg.clientTLS) curls = []string{curl} - case clientTLSAndNonTLS: - curl = (&url.URL{Scheme: "http", Host: curlHost}).String() - curltls = (&url.URL{Scheme: "https", Host: curlHost}).String() - curls = []string{curl, curltls} } purl := url.URL{Scheme: cfg.peerScheme(), Host: fmt.Sprintf("localhost:%d", port+1)} @@ -279,9 +277,10 @@ func (cfg *etcdProcessClusterConfig) etcdServerProcessConfigs(tb testing.TB) []* "--data-dir", dataDirPath, "--snapshot-count", fmt.Sprintf("%d", cfg.snapshotCount), } + var clientHttpUrl string if cfg.clientHttpSeparate { - clientHttpUrl := url.URL{Scheme: cfg.clientScheme(), Host: fmt.Sprintf("localhost:%d", clientHttpPort)} - args = append(args, "--listen-client-http-urls", clientHttpUrl.String()) + clientHttpUrl = clientURL(clientHttpPort, cfg.clientTLS) + args = append(args, "--listen-client-http-urls", clientHttpUrl) } args = addV2Args(args) if cfg.forceNewCluster { @@ -342,18 +341,19 @@ func (cfg *etcdProcessClusterConfig) etcdServerProcessConfigs(tb testing.TB) []* } etcdCfgs[i] = &etcdServerProcessConfig{ - lg: lg, - execPath: cfg.execPath, - args: args, - envVars: cfg.envVars, - tlsArgs: cfg.tlsArgs(), - dataDirPath: dataDirPath, - keepDataDir: cfg.keepDataDir, - name: name, - purl: purl, - acurl: curl, - murl: murl, - initialToken: cfg.initialToken, + lg: lg, + execPath: cfg.execPath, + args: args, + envVars: cfg.envVars, + tlsArgs: cfg.tlsArgs(), + dataDirPath: dataDirPath, + keepDataDir: cfg.keepDataDir, + name: name, + purl: purl, + acurl: curl, + murl: murl, + initialToken: cfg.initialToken, + clientHttpUrl: clientHttpUrl, } } @@ -366,6 +366,18 @@ func (cfg *etcdProcessClusterConfig) etcdServerProcessConfigs(tb testing.TB) []* return etcdCfgs } +func clientURL(port int, connType clientConnType) string { + curlHost := fmt.Sprintf("localhost:%d", port) + switch connType { + case clientNonTLS: + return (&url.URL{Scheme: "http", Host: curlHost}).String() + case clientTLS: + return (&url.URL{Scheme: "https", Host: curlHost}).String() + default: + panic(fmt.Sprintf("Unsupported connection type %v", connType)) + } +} + func (cfg *etcdProcessClusterConfig) tlsArgs() (args []string) { if cfg.clientTLS != clientNonTLS { if cfg.isClientAutoTLS { diff --git a/tests/e2e/cmux_test.go b/tests/e2e/cmux_test.go index bf3f5ac43e9..22f95297d2a 100644 --- a/tests/e2e/cmux_test.go +++ b/tests/e2e/cmux_test.go @@ -37,8 +37,9 @@ import ( func TestConnectionMultiplexing(t *testing.T) { BeforeTest(t) for _, tc := range []struct { - name string - serverTLS clientConnType + name string + serverTLS clientConnType + separateHttpPort bool }{ { name: "ServerTLS", @@ -52,10 +53,20 @@ func TestConnectionMultiplexing(t *testing.T) { name: "ServerTLSAndNonTLS", serverTLS: clientTLSAndNonTLS, }, + { + name: "SeparateHTTP/ServerTLS", + serverTLS: clientTLS, + separateHttpPort: true, + }, + { + name: "SeparateHTTP/ServerNonTLS", + serverTLS: clientNonTLS, + separateHttpPort: true, + }, } { t.Run(tc.name, func(t *testing.T) { ctx := context.Background() - cfg := etcdProcessClusterConfig{clusterSize: 1, clientTLS: tc.serverTLS, enableV2: true} + cfg := etcdProcessClusterConfig{clusterSize: 1, clientTLS: tc.serverTLS, enableV2: true, clientHttpSeparate: tc.separateHttpPort} clus, err := newEtcdProcessCluster(t, &cfg) require.NoError(t, err) defer clus.Close() @@ -76,43 +87,45 @@ func TestConnectionMultiplexing(t *testing.T) { name = "ClientTLS" } t.Run(name, func(t *testing.T) { - testConnectionMultiplexing(ctx, t, clus.EndpointsV3()[0], connType) + testConnectionMultiplexing(ctx, t, clus.procs[0], connType) }) } }) } - } -func testConnectionMultiplexing(ctx context.Context, t *testing.T, endpoint string, connType clientConnType) { +func testConnectionMultiplexing(ctx context.Context, t *testing.T, member etcdProcess, connType clientConnType) { + httpEndpoint := member.EndpointsHTTP()[0] + grpcEndpoint := member.EndpointsGRPC()[0] switch connType { case clientTLS: - endpoint = toTLS(endpoint) + httpEndpoint = toTLS(httpEndpoint) + grpcEndpoint = toTLS(grpcEndpoint) case clientNonTLS: default: panic(fmt.Sprintf("Unsupported conn type %v", connType)) } t.Run("etcdctl", func(t *testing.T) { t.Run("v2", func(t *testing.T) { - etcdctl := NewEtcdctl([]string{endpoint}, connType, false, true) + etcdctl := NewEtcdctl([]string{httpEndpoint}, connType, false, true) err := etcdctl.Set("a", "1") assert.NoError(t, err) }) t.Run("v3", func(t *testing.T) { - etcdctl := NewEtcdctl([]string{endpoint}, connType, false, false) + etcdctl := NewEtcdctl([]string{grpcEndpoint}, connType, false, false) err := etcdctl.Put("a", "1") assert.NoError(t, err) }) }) t.Run("clientv2", func(t *testing.T) { - c, err := newClientV2(t, []string{endpoint}, connType, false) + c, err := newClientV2(t, []string{httpEndpoint}, connType, false) require.NoError(t, err) kv := clientv2.NewKeysAPI(c) _, err = kv.Set(ctx, "a", "1", nil) assert.NoError(t, err) }) t.Run("clientv3", func(t *testing.T) { - c := newClient(t, []string{endpoint}, connType, false) + c := newClient(t, []string{grpcEndpoint}, connType, false) _, err := c.Get(ctx, "a") assert.NoError(t, err) }) @@ -123,11 +136,11 @@ func testConnectionMultiplexing(ctx context.Context, t *testing.T, endpoint stri tname = "default" } t.Run(tname, func(t *testing.T) { - assert.NoError(t, fetchGrpcGateway(endpoint, httpVersion, connType)) - assert.NoError(t, fetchMetrics(endpoint, httpVersion, connType)) - assert.NoError(t, fetchVersion(endpoint, httpVersion, connType)) - assert.NoError(t, fetchHealth(endpoint, httpVersion, connType)) - assert.NoError(t, fetchDebugVars(endpoint, httpVersion, connType)) + assert.NoError(t, fetchGrpcGateway(httpEndpoint, httpVersion, connType)) + assert.NoError(t, fetchMetrics(httpEndpoint, httpVersion, connType)) + assert.NoError(t, fetchVersion(httpEndpoint, httpVersion, connType)) + assert.NoError(t, fetchHealth(httpEndpoint, httpVersion, connType)) + assert.NoError(t, fetchDebugVars(httpEndpoint, httpVersion, connType)) }) } }) diff --git a/tests/e2e/etcd_process.go b/tests/e2e/etcd_process.go index 027b7d6aa14..2c5a408e1af 100644 --- a/tests/e2e/etcd_process.go +++ b/tests/e2e/etcd_process.go @@ -35,6 +35,8 @@ var ( type etcdProcess interface { EndpointsV2() []string EndpointsV3() []string + EndpointsGRPC() []string + EndpointsHTTP() []string EndpointsMetrics() []string Start() error @@ -72,8 +74,9 @@ type etcdServerProcessConfig struct { purl url.URL - acurl string - murl string + acurl string + murl string + clientHttpUrl string initialToken string initialCluster string @@ -91,8 +94,15 @@ func newEtcdServerProcess(cfg *etcdServerProcessConfig) (*etcdServerProcess, err return &etcdServerProcess{cfg: cfg, donec: make(chan struct{})}, nil } -func (ep *etcdServerProcess) EndpointsV2() []string { return []string{ep.cfg.acurl} } -func (ep *etcdServerProcess) EndpointsV3() []string { return ep.EndpointsV2() } +func (ep *etcdServerProcess) EndpointsV2() []string { return ep.EndpointsHTTP() } +func (ep *etcdServerProcess) EndpointsV3() []string { return ep.EndpointsGRPC() } +func (ep *etcdServerProcess) EndpointsGRPC() []string { return []string{ep.cfg.acurl} } +func (ep *etcdServerProcess) EndpointsHTTP() []string { + if ep.cfg.clientHttpUrl == "" { + return []string{ep.cfg.acurl} + } + return []string{ep.cfg.clientHttpUrl} +} func (ep *etcdServerProcess) EndpointsMetrics() []string { return []string{ep.cfg.murl} } func (ep *etcdServerProcess) Start() error { From 3023b096934a86b4390fb8dce9d0de384eeca662 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Thu, 30 Mar 2023 13:37:19 +0200 Subject: [PATCH 7/7] server: Fix defer function closure escape Signed-off-by: Marek Siarkowicz --- server/embed/etcd.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/server/embed/etcd.go b/server/embed/etcd.go index 089b332c379..c15670c7004 100644 --- a/server/embed/etcd.go +++ b/server/embed/etcd.go @@ -687,17 +687,17 @@ func configureClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err erro sctx.l = transport.LimitListener(sctx.l, int(fdLimit-reservedInternalFDNum)) } - defer func(addr string) { + defer func(sctx *serveCtx) { if err == nil || sctx.l == nil { return } sctx.l.Close() cfg.logger.Warn( "closing peer listener", - zap.String("address", addr), + zap.String("address", sctx.addr), zap.Error(err), ) - }(sctx.addr) + }(sctx) for k := range cfg.UserHandlers { sctx.userHandlers[k] = cfg.UserHandlers[k] }