From 6025355ce0a4e77602593ffd25daec098f6d91e3 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Thu, 2 Feb 2023 11:37:54 +0100 Subject: [PATCH 1/3] tests: Allow configuring progress notify interval in e2e tests Signed-off-by: Marek Siarkowicz --- tests/e2e/cluster_test.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/tests/e2e/cluster_test.go b/tests/e2e/cluster_test.go index 87bc376f16a..48536292ed2 100644 --- a/tests/e2e/cluster_test.go +++ b/tests/e2e/cluster_test.go @@ -20,6 +20,7 @@ import ( "net/url" "os" "strings" + "time" "go.etcd.io/etcd/etcdserver" ) @@ -135,7 +136,8 @@ type etcdProcessClusterConfig struct { initialCorruptCheck bool authTokenOpts string - MaxConcurrentStreams uint32 // default is math.MaxUint32 + MaxConcurrentStreams uint32 // default is math.MaxUint32 + WatchProcessNotifyInterval time.Duration } // newEtcdProcessCluster launches a new cluster from etcd processes, returning @@ -269,6 +271,10 @@ func (cfg *etcdProcessClusterConfig) etcdServerProcessConfigs() []*etcdServerPro args = append(args, "--max-concurrent-streams", fmt.Sprintf("%d", cfg.MaxConcurrentStreams)) } + if cfg.WatchProcessNotifyInterval != 0 { + args = append(args, "--experimental-watch-progress-notify-interval", cfg.WatchProcessNotifyInterval.String()) + } + etcdCfgs[i] = &etcdServerProcessConfig{ execPath: cfg.execPath, args: args, From e818b5fac88e6893a5adf4f01e0c62b3a564b862 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Thu, 2 Mar 2023 12:31:13 +0100 Subject: [PATCH 2/3] test: Test etcd watch stream starvation under high read response load when sharing the same connection Signed-off-by: Marek Siarkowicz --- go.mod | 1 + go.sum | 2 + pkg/stringutil/rand.go | 6 +- tests/e2e/watch_delay_test.go | 277 ++++++++++++++++++++++++++++++++++ 4 files changed, 283 insertions(+), 3 deletions(-) create mode 100644 tests/e2e/watch_delay_test.go diff --git a/go.mod b/go.mod index e9437244dd1..34c2198a39e 100644 --- a/go.mod +++ b/go.mod @@ -63,6 +63,7 @@ require ( github.com/sirupsen/logrus v1.6.0 // indirect go.uber.org/atomic v1.3.2 // indirect go.uber.org/multierr v1.1.0 // indirect + golang.org/x/sync v0.1.0 // indirect golang.org/x/text v0.7.0 // indirect google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55 // indirect google.golang.org/protobuf v1.26.0-rc.1 // indirect diff --git a/go.sum b/go.sum index de185824e7c..53f138ad4f3 100644 --- a/go.sum +++ b/go.sum @@ -233,6 +233,8 @@ golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o= +golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181107165924-66b7b1311ac8/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= diff --git a/pkg/stringutil/rand.go b/pkg/stringutil/rand.go index a15b0de0c08..96d9df311cf 100644 --- a/pkg/stringutil/rand.go +++ b/pkg/stringutil/rand.go @@ -24,7 +24,7 @@ func UniqueStrings(slen uint, n int) (ss 
[]string) {
 	exist := make(map[string]struct{})
 	ss = make([]string, 0, n)
 	for len(ss) < n {
-		s := randString(slen)
+		s := RandString(slen)
 		if _, ok := exist[s]; !ok {
 			ss = append(ss, s)
 			exist[s] = struct{}{}
 		}
@@ -37,14 +37,14 @@ func UniqueStrings(slen uint, n int) (ss []string) {
 func RandomStrings(slen uint, n int) (ss []string) {
 	ss = make([]string, 0, n)
 	for i := 0; i < n; i++ {
-		ss = append(ss, randString(slen))
+		ss = append(ss, RandString(slen))
 	}
 	return ss
 }
 
 const chars = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
 
-func randString(l uint) string {
+func RandString(l uint) string {
 	rand.Seed(time.Now().UnixNano())
 	s := make([]byte, l)
 	for i := 0; i < int(l); i++ {
diff --git a/tests/e2e/watch_delay_test.go b/tests/e2e/watch_delay_test.go
new file mode 100644
index 00000000000..c9ccc002bb3
--- /dev/null
+++ b/tests/e2e/watch_delay_test.go
@@ -0,0 +1,277 @@
+// Copyright 2023 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// These tests are performance sensitive; the addition of the cluster proxy makes them unstable.
+//go:build !cluster_proxy
+
+package e2e
+
+import (
+	"context"
+	"fmt"
+	"strings"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+	"go.etcd.io/etcd/clientv3"
+	"go.etcd.io/etcd/pkg/stringutil"
+	"go.etcd.io/etcd/pkg/testutil"
+	"go.etcd.io/etcd/pkg/transport"
+	"go.uber.org/zap"
+	"golang.org/x/sync/errgroup"
+	"google.golang.org/grpc"
+)
+
+const (
+	watchResponsePeriod = 100 * time.Millisecond
+	watchTestDuration   = 5 * time.Second
+	// TODO: Reduce maxWatchDelay when https://github.com/etcd-io/etcd/issues/15402 is addressed.
+	maxWatchDelay = 2 * time.Second
+	// Configure enough read load to trigger the starvation described in https://github.com/etcd-io/etcd/issues/15402.
+	// Tuned to pass on a GitHub runner; for local runs, increase these parameters.
+	// TODO: Increase when https://github.com/etcd-io/etcd/issues/15402 is fully addressed.
+ numberOfPreexistingKeys = 100 + sizeOfPreexistingValues = 5000 + readLoadConcurrency = 10 +) + +type testCase struct { + name string + config etcdProcessClusterConfig +} + +var tcs = []testCase{ + { + name: "NoTLS", + config: etcdProcessClusterConfig{clusterSize: 1}, + }, + { + name: "ClientTLS", + config: etcdProcessClusterConfig{clusterSize: 1, isClientAutoTLS: true, clientTLS: clientTLS}, + }, +} + +func TestWatchDelayForPeriodicProgressNotification(t *testing.T) { + defer testutil.AfterTest(t) + for _, tc := range tcs { + tc := tc + tc.config.WatchProcessNotifyInterval = watchResponsePeriod + t.Run(tc.name, func(t *testing.T) { + clus, err := newEtcdProcessCluster(&tc.config) + require.NoError(t, err) + defer clus.Close() + c := newClient(t, clus, tc.config) + require.NoError(t, fillEtcdWithData(context.Background(), c, numberOfPreexistingKeys, sizeOfPreexistingValues)) + + ctx, cancel := context.WithTimeout(context.Background(), watchTestDuration) + defer cancel() + g := errgroup.Group{} + continuouslyExecuteGetAll(ctx, t, &g, c) + validateWatchDelay(t, c.Watch(ctx, "fake-key", clientv3.WithProgressNotify())) + require.NoError(t, g.Wait()) + }) + } +} + +func TestWatchDelayForManualProgressNotification(t *testing.T) { + defer testutil.AfterTest(t) + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + clus, err := newEtcdProcessCluster(&tc.config) + require.NoError(t, err) + defer clus.Close() + c := newClient(t, clus, tc.config) + require.NoError(t, fillEtcdWithData(context.Background(), c, numberOfPreexistingKeys, sizeOfPreexistingValues)) + + ctx, cancel := context.WithTimeout(context.Background(), watchTestDuration) + defer cancel() + g := errgroup.Group{} + continuouslyExecuteGetAll(ctx, t, &g, c) + g.Go(func() error { + for { + err := c.RequestProgress(ctx) + if err != nil { + if strings.Contains(err.Error(), "context deadline exceeded") { + return nil + } + return err + } + time.Sleep(watchResponsePeriod) + } + }) + validateWatchDelay(t, c.Watch(ctx, "fake-key")) + require.NoError(t, g.Wait()) + }) + } +} + +func TestWatchDelayForEvent(t *testing.T) { + defer testutil.AfterTest(t) + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + clus, err := newEtcdProcessCluster(&tc.config) + require.NoError(t, err) + defer clus.Close() + c := newClient(t, clus, tc.config) + require.NoError(t, fillEtcdWithData(context.Background(), c, numberOfPreexistingKeys, sizeOfPreexistingValues)) + + ctx, cancel := context.WithTimeout(context.Background(), watchTestDuration) + defer cancel() + g := errgroup.Group{} + g.Go(func() error { + i := 0 + for { + _, err := c.Put(ctx, "key", fmt.Sprintf("%d", i)) + if err != nil { + if strings.Contains(err.Error(), "context deadline exceeded") { + return nil + } + return err + } + time.Sleep(watchResponsePeriod) + } + }) + continuouslyExecuteGetAll(ctx, t, &g, c) + validateWatchDelay(t, c.Watch(ctx, "key")) + require.NoError(t, g.Wait()) + }) + } +} + +func validateWatchDelay(t *testing.T, watch clientv3.WatchChan) { + start := time.Now() + var maxDelay time.Duration + for range watch { + sinceLast := time.Since(start) + if sinceLast > watchResponsePeriod+maxWatchDelay { + t.Errorf("Unexpected watch response delayed over allowed threshold %s, delay: %s", maxWatchDelay, sinceLast-watchResponsePeriod) + } else { + t.Logf("Got watch response, since last: %s", sinceLast) + } + if sinceLast > maxDelay { + maxDelay = sinceLast + } + start = time.Now() + } + sinceLast := time.Since(start) + if sinceLast > maxDelay && sinceLast > 
watchResponsePeriod+maxWatchDelay { + t.Errorf("Unexpected watch response delayed over allowed threshold %s, delay: unknown", maxWatchDelay) + t.Errorf("Test finished while in middle of delayed response, measured delay: %s", sinceLast-watchResponsePeriod) + t.Logf("Please increase the test duration to measure delay") + } else { + t.Logf("Max delay: %s", maxDelay-watchResponsePeriod) + } +} + +func fillEtcdWithData(ctx context.Context, c *clientv3.Client, keyCount int, valueSize uint) error { + g := errgroup.Group{} + concurrency := 10 + keysPerRoutine := keyCount / concurrency + for i := 0; i < concurrency; i++ { + i := i + g.Go(func() error { + for j := 0; j < keysPerRoutine; j++ { + _, err := c.Put(ctx, fmt.Sprintf("%d", i*keysPerRoutine+j), stringutil.RandString(valueSize)) + if err != nil { + return err + } + } + return nil + }) + } + return g.Wait() +} + +func continuouslyExecuteGetAll(ctx context.Context, t *testing.T, g *errgroup.Group, c *clientv3.Client) { + mux := sync.RWMutex{} + size := 0 + for i := 0; i < readLoadConcurrency; i++ { + g.Go(func() error { + for { + _, err := c.Get(ctx, "", clientv3.WithPrefix()) + if err != nil { + if strings.Contains(err.Error(), "context deadline exceeded") { + return nil + } + return err + } + mux.Lock() + size += numberOfPreexistingKeys * sizeOfPreexistingValues + mux.Unlock() + } + }) + } + g.Go(func() error { + lastSize := size + for range time.Tick(time.Second) { + select { + case <-ctx.Done(): + return nil + default: + } + mux.RLock() + t.Logf("Generating read load around %.1f MB/s", float64(size-lastSize)/1000/1000) + lastSize = size + mux.RUnlock() + } + return nil + }) +} + +func newClient(t *testing.T, clus *etcdProcessCluster, cfg etcdProcessClusterConfig) *clientv3.Client { + tlscfg, err := tlsInfo(t, cfg) + if err != nil { + t.Fatal(err) + } + ccfg := clientv3.Config{ + Endpoints: clus.EndpointsV3(), + DialTimeout: 5 * time.Second, + DialOptions: []grpc.DialOption{grpc.WithBlock()}, + } + if tlscfg != nil { + tls, err := tlscfg.ClientConfig() + if err != nil { + t.Fatal(err) + } + ccfg.TLS = tls + } + c, err := clientv3.New(ccfg) + if err != nil { + t.Fatal(err) + } + t.Cleanup(func() { + c.Close() + }) + return c +} + +func tlsInfo(t testing.TB, cfg etcdProcessClusterConfig) (*transport.TLSInfo, error) { + switch cfg.clientTLS { + case clientNonTLS, clientTLSAndNonTLS: + return nil, nil + case clientTLS: + if cfg.isClientAutoTLS { + tls, err := transport.SelfCert(zap.NewNop(), t.TempDir(), []string{"localhost"}, 1) + if err != nil { + return nil, fmt.Errorf("failed to generate cert: %s", err) + } + return &tls, nil + } + panic("Unsupported non-auto tls") + default: + return nil, fmt.Errorf("config %v not supported", cfg) + } +} From 60e381aaa92f4dc23ddf04ffb7c6032b11f9b046 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Wed, 8 Mar 2023 15:07:25 +0100 Subject: [PATCH 3/3] server: Switch back to random scheduler to improve resilience to watch starvation Signed-off-by: Marek Siarkowicz --- embed/serve.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/embed/serve.go b/embed/serve.go index 2dfe567c0e8..7e5d774728e 100644 --- a/embed/serve.go +++ b/embed/serve.go @@ -215,6 +215,8 @@ func configureHttpServer(srv *http.Server, cfg etcdserver.ServerConfig) error { // todo (ahrtr): should we support configuring other parameters in the future as well? 
return http2.ConfigureServer(srv, &http2.Server{
 		MaxConcurrentStreams: cfg.MaxConcurrentStreams,
+		// Override to avoid using the priority scheduler, which is affected by https://github.com/golang/go/issues/58804.
+		NewWriteScheduler: http2.NewRandomWriteScheduler,
 	})
 }
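
For context on the override in the last patch: when http2.Server.NewWriteScheduler is nil, golang.org/x/net/http2 (at the time of this series) falls back to its priority write scheduler, which is the scheduler affected by https://github.com/golang/go/issues/58804. The mitigation is not etcd-specific; the following is a minimal sketch of applying it to a plain net/http server, where the address, handler, and certificate paths are placeholders, not anything from the patches above.

package main

import (
	"log"
	"net/http"

	"golang.org/x/net/http2"
)

func main() {
	srv := &http.Server{Addr: ":8443"}

	// Same idea as embed/serve.go in the patch: replace the default
	// priority write scheduler (affected by golang/go#58804) with the
	// random scheduler, so a stream flooded with large read responses
	// cannot starve watch streams multiplexed on the same connection.
	if err := http2.ConfigureServer(srv, &http2.Server{
		NewWriteScheduler: http2.NewRandomWriteScheduler,
	}); err != nil {
		log.Fatalf("failed to configure HTTP/2 server: %v", err)
	}

	// HTTP/2 is negotiated over TLS here; server.crt and server.key are
	// placeholder paths.
	log.Fatal(srv.ListenAndServeTLS("server.crt", "server.key"))
}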