Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tests: Migrate watch test to common framework #14345

Merged
merged 1 commit into from
Aug 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions pkg/expect/expect.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type ExpectProcess struct {
mu sync.Mutex // protects lines and err
lines []string
count int // increment whenever new line gets added
cur int // current read position
err error

// StopSignal is the signal Stop sends to the process; defaults to SIGTERM.
Expand Down Expand Up @@ -198,3 +199,15 @@ func (ep *ExpectProcess) Lines() []string {
defer ep.mu.Unlock()
return ep.lines
}

// ReadLine returns the line at the current read position and advances that
// position by one. When the read position has caught up with the number of
// lines recorded so far, it returns an empty string rather than waiting for
// more output. The mutex is held for the duration of the call.
func (ep *ExpectProcess) ReadLine() string {
	ep.mu.Lock()
	defer ep.mu.Unlock()

	// Nothing unread: the cursor has reached the recorded line count.
	if ep.cur >= ep.count {
		return ""
	}

	idx := ep.cur
	text := ep.lines[idx]
	ep.cur = idx + 1
	return text
}
82 changes: 82 additions & 0 deletions tests/common/watch_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package common

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/assert"
"go.etcd.io/etcd/tests/v3/framework/config"
"go.etcd.io/etcd/tests/v3/framework/testutils"
)

// TestWatch checks, for every cluster configuration in clusterTestCases, that
// a watch opened through the common Client interface delivers the expected
// key-values. Each table entry opens a watch, performs its puts, and then
// compares the events read from the watch channel against the wanted list.
func TestWatch(t *testing.T) {
	testRunner.BeforeTest(t)
	// Upper bound handed to KeyValuesFromWatchChan when collecting events.
	watchTimeout := 1 * time.Second
	for _, tc := range clusterTestCases {
		t.Run(tc.name, func(t *testing.T) {
			ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
			defer cancel()
			clus := testRunner.NewCluster(ctx, t, tc.config)

			defer clus.Close()
			cc := clus.Client()
			testutils.ExecuteUntil(ctx, t, func() {
				tests := []struct {
					puts     []testutils.KV
					watchKey string
					opts     config.WatchOptions
					wanted   []testutils.KV
				}{
					{ // watch by revision
						// NOTE(review): this case presumably relies on running first on a
						// fresh cluster so that the second put lands at revision 3 — confirm.
						puts:     []testutils.KV{{Key: "bar", Val: "revision_1"}, {Key: "bar", Val: "revision_2"}, {Key: "bar", Val: "revision_3"}},
						watchKey: "bar",
						opts:     config.WatchOptions{Revision: 3},
						wanted:   []testutils.KV{{Key: "bar", Val: "revision_2"}, {Key: "bar", Val: "revision_3"}},
					},
					{ // watch 1 key
						puts:     []testutils.KV{{Key: "sample", Val: "value"}},
						watchKey: "sample",
						opts:     config.WatchOptions{Revision: 1},
						wanted:   []testutils.KV{{Key: "sample", Val: "value"}},
					},
					{ // watch 3 keys by prefix
						puts:     []testutils.KV{{Key: "foo1", Val: "val1"}, {Key: "foo2", Val: "val2"}, {Key: "foo3", Val: "val3"}},
						watchKey: "foo",
						opts:     config.WatchOptions{Revision: 1, Prefix: true},
						wanted:   []testutils.KV{{Key: "foo1", Val: "val1"}, {Key: "foo2", Val: "val2"}, {Key: "foo3", Val: "val3"}},
					},
					{ // watch 3 keys by range
						puts:     []testutils.KV{{Key: "key1", Val: "val1"}, {Key: "key3", Val: "val3"}, {Key: "key2", Val: "val2"}},
						watchKey: "key",
						opts:     config.WatchOptions{Revision: 1, RangeEnd: "key3"},
						wanted:   []testutils.KV{{Key: "key1", Val: "val1"}, {Key: "key2", Val: "val2"}},
					},
				}

				for _, tt := range tests {
					// Each case gets its own cancelable context so its watch can be
					// torn down before the next case starts.
					wCtx, wCancel := context.WithCancel(ctx)
					wch := cc.Watch(wCtx, tt.watchKey, tt.opts)
					if wch == nil {
						// Release the watch context on this failure path too, matching
						// the error path below.
						wCancel()
						t.Fatalf("failed to watch %s", tt.watchKey)
					}

					for j := range tt.puts {
						if err := cc.Put(tt.puts[j].Key, tt.puts[j].Val, config.PutOptions{}); err != nil {
							wCancel()
							t.Fatalf("can't put key %q, err: %s", tt.puts[j].Key, err)
						}
					}

					kvs, err := testutils.KeyValuesFromWatchChan(wch, len(tt.wanted), watchTimeout)
					if err != nil {
						wCancel()
						t.Fatalf("failed to get key-values from watch channel %s", err)
					}

					wCancel()
					assert.Equal(t, tt.wanted, kvs)
				}
			})
		})
	}
}
22 changes: 0 additions & 22 deletions tests/e2e/ctl_v3_watch_cov_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,43 +52,21 @@ func watchTest(cx ctlCtx) {

wkv []kvExec
}{
{ // watch 1 key
puts: []kv{{"sample", "value"}},
args: []string{"sample", "--rev", "1"},
wkv: []kvExec{{key: "sample", val: "value"}},
},
{ // watch 1 key with env
puts: []kv{{"sample", "value"}},
envKey: "sample",
args: []string{"--rev", "1"},
wkv: []kvExec{{key: "sample", val: "value"}},
},

// coverage tests get extra arguments:
// ./bin/etcdctl_test -test.coverprofile=e2e.1525392462795198897.coverprofile -test.outputdir=../..
// do not test watch exec commands

{ // watch 3 keys by prefix
puts: []kv{{"key1", "val1"}, {"key2", "val2"}, {"key3", "val3"}},
args: []string{"key", "--rev", "1", "--prefix"},
wkv: []kvExec{{key: "key1", val: "val1"}, {key: "key2", val: "val2"}, {key: "key3", val: "val3"}},
},
{ // watch 3 keys by prefix, with env
puts: []kv{{"key1", "val1"}, {"key2", "val2"}, {"key3", "val3"}},
envKey: "key",
args: []string{"--rev", "1", "--prefix"},
wkv: []kvExec{{key: "key1", val: "val1"}, {key: "key2", val: "val2"}, {key: "key3", val: "val3"}},
},
{ // watch by revision
puts: []kv{{"etcd", "revision_1"}, {"etcd", "revision_2"}, {"etcd", "revision_3"}},
args: []string{"etcd", "--rev", "2"},
wkv: []kvExec{{key: "etcd", val: "revision_2"}, {key: "etcd", val: "revision_3"}},
},
{ // watch 3 keys by range
puts: []kv{{"key1", "val1"}, {"key3", "val3"}, {"key2", "val2"}},
args: []string{"key", "key3", "--rev", "1"},
wkv: []kvExec{{key: "key1", val: "val1"}, {key: "key2", val: "val2"}},
},
{ // watch 3 keys by range, with env
puts: []kv{{"key1", "val1"}, {"key3", "val3"}, {"key2", "val2"}},
envKey: "key",
Expand Down
20 changes: 0 additions & 20 deletions tests/e2e/ctl_v3_watch_no_cov_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,6 @@ func watchTest(cx ctlCtx) {

wkv []kvExec
}{
{ // watch 1 key
puts: []kv{{"sample", "value"}},
args: []string{"sample", "--rev", "1"},
wkv: []kvExec{{key: "sample", val: "value"}},
},
{ // watch 1 key with env
puts: []kv{{"sample", "value"}},
envKey: "sample",
Expand Down Expand Up @@ -101,27 +96,12 @@ func watchTest(cx ctlCtx) {
args: []string{"sample", "--rev", "1", "samplx", "--", "echo", "watch event received"},
wkv: []kvExec{{key: "sample", val: "value", execOutput: "watch event received"}},
},
{ // watch 3 keys by prefix
puts: []kv{{"key1", "val1"}, {"key2", "val2"}, {"key3", "val3"}},
args: []string{"key", "--rev", "1", "--prefix"},
wkv: []kvExec{{key: "key1", val: "val1"}, {key: "key2", val: "val2"}, {key: "key3", val: "val3"}},
},
{ // watch 3 keys by prefix, with env
puts: []kv{{"key1", "val1"}, {"key2", "val2"}, {"key3", "val3"}},
envKey: "key",
args: []string{"--rev", "1", "--prefix"},
wkv: []kvExec{{key: "key1", val: "val1"}, {key: "key2", val: "val2"}, {key: "key3", val: "val3"}},
},
{ // watch by revision
puts: []kv{{"etcd", "revision_1"}, {"etcd", "revision_2"}, {"etcd", "revision_3"}},
args: []string{"etcd", "--rev", "2"},
wkv: []kvExec{{key: "etcd", val: "revision_2"}, {key: "etcd", val: "revision_3"}},
},
{ // watch 3 keys by range
puts: []kv{{"key1", "val1"}, {"key3", "val3"}, {"key2", "val2"}},
args: []string{"key", "key3", "--rev", "1"},
wkv: []kvExec{{key: "key1", val: "val1"}, {key: "key2", val: "val2"}},
},
{ // watch 3 keys by range, with env
puts: []kv{{"key1", "val1"}, {"key3", "val3"}, {"key2", "val2"}},
envKey: "key",
Expand Down
6 changes: 6 additions & 0 deletions tests/framework/config/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,3 +63,9 @@ type LeaseOption struct {
type UserAddOptions struct {
NoPassword bool
}

// WatchOptions carries the optional settings accepted by Client.Watch.
type WatchOptions struct {
// Prefix, when true, requests a prefix watch (passed on as etcdctl
// "--prefix" or clientv3.WithPrefix).
Prefix bool
// Revision is passed on as etcdctl "--rev" or clientv3.WithRev; the zero
// value means the option is not passed at all.
Revision int64
// RangeEnd, when non-empty, is passed on as the second positional etcdctl
// argument or clientv3.WithRange.
RangeEnd string
}
57 changes: 56 additions & 1 deletion tests/framework/e2e/etcdctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package e2e

import (
"context"
"encoding/json"
"fmt"
"io"
Expand Down Expand Up @@ -102,6 +103,7 @@ func (ctl *EtcdctlV3) Get(key string, o config.GetOptions) (*clientv3.GetRespons
if err != nil {
return nil, err
}
defer cmd.Close()
_, err = cmd.Expect("Count")
return &resp, err
}
Expand Down Expand Up @@ -145,6 +147,7 @@ func (ctl *EtcdctlV3) Txn(compares, ifSucess, ifFail []string, o config.TxnOptio
if err != nil {
return nil, err
}
defer cmd.Close()
_, err = cmd.Expect("compares:")
if err != nil {
return nil, err
Expand Down Expand Up @@ -336,6 +339,7 @@ func (ctl *EtcdctlV3) Grant(ttl int64) (*clientv3.LeaseGrantResponse, error) {
if err != nil {
return nil, err
}
defer cmd.Close()
var resp clientv3.LeaseGrantResponse
line, err := cmd.Expect("ID")
if err != nil {
Expand All @@ -355,6 +359,7 @@ func (ctl *EtcdctlV3) TimeToLive(id clientv3.LeaseID, o config.LeaseOption) (*cl
if err != nil {
return nil, err
}
defer cmd.Close()
var resp clientv3.LeaseTimeToLiveResponse
line, err := cmd.Expect("id")
if err != nil {
Expand Down Expand Up @@ -383,6 +388,7 @@ func (ctl *EtcdctlV3) LeaseList() (*clientv3.LeaseLeasesResponse, error) {
if err != nil {
return nil, err
}
defer cmd.Close()
var resp clientv3.LeaseLeasesResponse
line, err := cmd.Expect("id")
if err != nil {
Expand All @@ -398,6 +404,7 @@ func (ctl *EtcdctlV3) LeaseKeepAliveOnce(id clientv3.LeaseID) (*clientv3.LeaseKe
if err != nil {
return nil, err
}
defer cmd.Close()
var resp clientv3.LeaseKeepAliveResponse
line, err := cmd.Expect("ID")
if err != nil {
Expand Down Expand Up @@ -426,6 +433,7 @@ func (ctl *EtcdctlV3) AlarmDisarm(_ *clientv3.AlarmMember) (*clientv3.AlarmRespo
if err != nil {
return nil, err
}
defer ep.Close()
var resp clientv3.AlarmResponse
line, err := ep.Expect("alarm")
if err != nil {
Expand Down Expand Up @@ -454,6 +462,7 @@ func (ctl *EtcdctlV3) UserAdd(name, password string, opts config.UserAddOptions)
if err != nil {
return nil, err
}
defer cmd.Close()

// If no password is provided, and NoPassword isn't set, the CLI will always
// wait for a password, send an enter in this case for an "empty" password.
Expand Down Expand Up @@ -492,7 +501,7 @@ func (ctl *EtcdctlV3) UserChangePass(user, newPass string) error {
if err != nil {
return err
}

defer cmd.Close()
err = cmd.Send(newPass + "\n")
if err != nil {
return err
Expand Down Expand Up @@ -545,9 +554,55 @@ func (ctl *EtcdctlV3) spawnJsonCmd(output interface{}, args ...string) error {
if err != nil {
return err
}
defer cmd.Close()
line, err := cmd.Expect("header")
if err != nil {
return err
}
return json.Unmarshal([]byte(line), output)
}

func (ctl *EtcdctlV3) Watch(ctx context.Context, key string, opts config.WatchOptions) clientv3.WatchChan {
args := ctl.cmdArgs()
args = append(args, "watch", key)
if opts.RangeEnd != "" {
args = append(args, opts.RangeEnd)
}
args = append(args, "-w", "json")
if opts.Prefix {
args = append(args, "--prefix")
}
if opts.Revision != 0 {
args = append(args, "--rev", fmt.Sprint(opts.Revision))
}
proc, err := SpawnCmd(args, nil)
if err != nil {
return nil
}

ch := make(chan clientv3.WatchResponse)
go func() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have you considered waiting for the goroutine to exit when the test case finishes?

Copy link
Contributor Author

@nic-chen nic-chen Aug 24, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's possible (although unlikely) that the test case finishes before the goroutine exits, in which case the test might eventually report goroutine leak errors. Our pipeline fails due to this kind of error from time to time. So, for safety, we need to make sure the goroutine exits before the test case returns/exits.

Copy link
Member

@serathius serathius Aug 24, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is a problem. We should be good as long as we propagate signal to goroutine that causes it to exit. It's up to leak detection to wait for this signal to propagate. If you read the etcd test goroutine leak implementation, it gives test up to 5 seconds to cleanup its goroutines and uses runtime.Gosched() to give goroutine time to execute and exit.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we could merge it first. If there is a real problem, I will fix it. What do you think? @serathius @ahrtr

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think @ahrtr is worried that the problem will not show up immediately after merging. Goroutine leak issues like this one are very subtle (for example, I cannot reproduce this issue locally at all), so I understand his position.

I still think my argument holds and this should not be a problem, but I would prefer to wait for confirmation from @ahrtr

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you read the etcd test goroutine leak implementation, it gives test up to 5 seconds to cleanup its goroutines and uses runtime.Gosched() to give goroutine time to execute and exit.

The goroutine leak checker indeed waits up to 1 second in the afterTest check.

To me, this is just a mitigation rather than a best practice. We should always ensure a graceful shutdown of all threads/goroutines in almost all cases, in both production code and test code, except for the cases that we create intentionally.

Previously we also saw a situation where a test case (in test code) or the main goroutine (in production code) had already finished/exited, but a goroutine it created was still printing logs, which Go does not allow. Of course, I do not see such an issue in this PR.

I just searched the repo; there are lots of similar cases that create goroutines but do not shut them down gracefully. It's OK to merge this PR for now, but I'd like to revisit all such cases afterwards in separate PR(s). WDYT? @serathius @spzala

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree this is a problem worth looking into.

defer proc.Stop()
for {
select {
case <-ctx.Done():
close(ch)
return
default:
if line := proc.ReadLine(); line != "" {
var resp clientv3.WatchResponse
json.Unmarshal([]byte(line), &resp)
if resp.Canceled {
close(ch)
return
}
if len(resp.Events) > 0 {
ch <- resp
}
}
}
}
}()

return ch
}
1 change: 1 addition & 0 deletions tests/framework/e2e/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ func SpawnWithExpectLines(args []string, envVars map[string]string, xs ...string
if err != nil {
return nil, err
}
defer proc.Close()
// process until either stdout or stderr contains
// the expected string
var (
Expand Down
15 changes: 15 additions & 0 deletions tests/framework/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,3 +376,18 @@ func getOps(ss []string) ([]clientv3.Op, error) {
// MemberList forwards to the embedded client's MemberList, using the context
// stored on the integration client, and returns its response and error as-is.
func (c integrationClient) MemberList() (*clientv3.MemberListResponse, error) {
	members, err := c.Client.MemberList(c.ctx)
	return members, err
}

// Watch translates the framework-level WatchOptions into clientv3 operation
// options and opens a watch on key through the embedded client. An option is
// only forwarded when its field is set: Prefix when true, Revision when
// non-zero, and RangeEnd when non-empty. Options are appended in that order.
func (c integrationClient) Watch(ctx context.Context, key string, opts config.WatchOptions) clientv3.WatchChan {
	// At most three options can ever be added.
	watchOpts := make([]clientv3.OpOption, 0, 3)

	if opts.Prefix {
		watchOpts = append(watchOpts, clientv3.WithPrefix())
	}
	if rev := opts.Revision; rev != 0 {
		watchOpts = append(watchOpts, clientv3.WithRev(rev))
	}
	if end := opts.RangeEnd; end != "" {
		watchOpts = append(watchOpts, clientv3.WithRange(end))
	}

	return c.Client.Watch(ctx, key, watchOpts...)
}
3 changes: 2 additions & 1 deletion tests/framework/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ type Client interface {
LeaseList() (*clientv3.LeaseLeasesResponse, error)
LeaseKeepAliveOnce(id clientv3.LeaseID) (*clientv3.LeaseKeepAliveResponse, error)
LeaseRevoke(id clientv3.LeaseID) (*clientv3.LeaseRevokeResponse, error)

UserAdd(name, password string, opts config.UserAddOptions) (*clientv3.AuthUserAddResponse, error)
UserList() (*clientv3.AuthUserListResponse, error)
UserDelete(name string) (*clientv3.AuthUserDeleteResponse, error)
Expand All @@ -73,4 +72,6 @@ type Client interface {
Txn(compares, ifSucess, ifFail []string, o config.TxnOptions) (*clientv3.TxnResponse, error)

MemberList() (*clientv3.MemberListResponse, error)

Watch(ctx context.Context, key string, opts config.WatchOptions) clientv3.WatchChan
}
Loading