diff --git a/.github/workflows/robustness-template.yaml b/.github/workflows/robustness-template.yaml index 78de5b78bbd4..aaf895de83aa 100644 --- a/.github/workflows/robustness-template.yaml +++ b/.github/workflows/robustness-template.yaml @@ -38,6 +38,10 @@ jobs: run: | set -euo pipefail go clean -testcache + + # Build LazyFS + sudo apt-get -y install cmake libfuse3-dev libfuse3-3 fuse3 + make install-lazyfs # Use --failfast to avoid overriding report generated by failed test GO_TEST_FLAGS="-v --count ${{ inputs.count }} --timeout ${{ inputs.testTimeout }} --failfast --run TestRobustness" diff --git a/Makefile b/Makefile index 200a1fbd1486..38a71cad037a 100644 --- a/Makefile +++ b/Makefile @@ -149,6 +149,20 @@ ifeq (, $(shell which yamlfmt)) endif yamlfmt -conf tools/.yamlfmt . +# Tools + +.PHONY: install-lazyfs +install-lazyfs: bin/lazyfs + +bin/lazyfs: + rm /tmp/lazyfs -rf + git clone https://github.com/dsrhaslab/lazyfs /tmp/lazyfs + cd /tmp/lazyfs/; git checkout 94ef5e60117f2a6c6d12b29e09e287c3893150ca + cd /tmp/lazyfs/libs/libpcache; ./build.sh + cd /tmp/lazyfs/lazyfs; ./build.sh + mkdir -p ./bin + cp /tmp/lazyfs/lazyfs/build/lazyfs ./bin/lazyfs + # Cleanup clean: diff --git a/tests/framework/e2e/cluster.go b/tests/framework/e2e/cluster.go index ac8243d9d7e8..17c3c37a0d3e 100644 --- a/tests/framework/e2e/cluster.go +++ b/tests/framework/e2e/cluster.go @@ -185,6 +185,7 @@ type EtcdProcessClusterConfig struct { CompactHashCheckEnabled bool CompactHashCheckTime time.Duration GoFailEnabled bool + LazyFSEnabled bool CompactionBatchLimit int CompactionSleepInterval time.Duration @@ -344,6 +345,10 @@ func WithGoFailEnabled(enabled bool) EPClusterOption { return func(c *EtcdProcessClusterConfig) { c.GoFailEnabled = enabled } } +func WithLazyFSEnabled(enabled bool) EPClusterOption { + return func(c *EtcdProcessClusterConfig) { c.LazyFSEnabled = enabled } +} + func WithWarningUnaryRequestDuration(time time.Duration) EPClusterOption { return func(c *EtcdProcessClusterConfig) { c.WarningUnaryRequestDuration = time } } @@ -407,7 +412,7 @@ func InitEtcdProcessCluster(t testing.TB, cfg *EtcdProcessClusterConfig) (*EtcdP // launch etcd processes for i := range etcdCfgs { - proc, err := NewEtcdProcess(etcdCfgs[i]) + proc, err := NewEtcdProcess(t, etcdCfgs[i]) if err != nil { epc.Close() return nil, fmt.Errorf("cannot configure: %v", err) @@ -659,6 +664,7 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in InitialToken: cfg.InitialToken, GoFailPort: gofailPort, Proxy: proxyCfg, + LazyFSEnabled: cfg.LazyFSEnabled, } } @@ -826,7 +832,7 @@ func (epc *EtcdProcessCluster) StartNewProc(ctx context.Context, cfg *EtcdProces // Then start process tb.Log("start new member") - proc, err := NewEtcdProcess(serverCfg) + proc, err := NewEtcdProcess(tb, serverCfg) if err != nil { epc.Close() return 0, fmt.Errorf("cannot configure: %v", err) @@ -855,7 +861,7 @@ func (epc *EtcdProcessCluster) UpdateProcOptions(i int, tb testing.TB, opts ...E } epc.Cfg.SetInitialOrDiscovery(serverCfg, initialCluster, "new") - proc, err := NewEtcdProcess(serverCfg) + proc, err := NewEtcdProcess(tb, serverCfg) if err != nil { return err } diff --git a/tests/framework/e2e/cluster_direct.go b/tests/framework/e2e/cluster_direct.go index ac659bd6bcc4..70c60dbf4c0a 100644 --- a/tests/framework/e2e/cluster_direct.go +++ b/tests/framework/e2e/cluster_direct.go @@ -16,6 +16,8 @@ package e2e -func NewEtcdProcess(cfg *EtcdServerProcessConfig) (EtcdProcess, error) { - return NewEtcdServerProcess(cfg) +import "testing" + +func NewEtcdProcess(t testing.TB, cfg *EtcdServerProcessConfig) (EtcdProcess, error) { + return NewEtcdServerProcess(t, cfg) } diff --git a/tests/framework/e2e/etcd_process.go b/tests/framework/e2e/etcd_process.go index 1d8f941bf84a..1f7722ef09b6 100644 --- a/tests/framework/e2e/etcd_process.go +++ b/tests/framework/e2e/etcd_process.go @@ -56,6 +56,7 @@ type EtcdProcess interface { Config() *EtcdServerProcessConfig PeerProxy() proxy.Server Failpoints() *BinaryFailpoints + //LazyFS() *LazyFS Logs() LogsExpect Kill() error } @@ -70,6 +71,7 @@ type EtcdServerProcess struct { cfg *EtcdServerProcessConfig proc *expect.ExpectProcess proxy proxy.Server + lazyfs *LazyFS failpoints *BinaryFailpoints donec chan struct{} // closed when Interact() terminates } @@ -96,10 +98,11 @@ type EtcdServerProcessConfig struct { InitialCluster string GoFailPort int - Proxy *proxy.ServerConfig + LazyFSEnabled bool + Proxy *proxy.ServerConfig } -func NewEtcdServerProcess(cfg *EtcdServerProcessConfig) (*EtcdServerProcess, error) { +func NewEtcdServerProcess(t testing.TB, cfg *EtcdServerProcessConfig) (*EtcdServerProcess, error) { if !fileutil.Exist(cfg.ExecPath) { return nil, fmt.Errorf("could not find etcd binary: %s", cfg.ExecPath) } @@ -112,6 +115,9 @@ func NewEtcdServerProcess(cfg *EtcdServerProcessConfig) (*EtcdServerProcess, err if cfg.GoFailPort != 0 { ep.failpoints = &BinaryFailpoints{member: ep} } + if cfg.LazyFSEnabled { + ep.lazyfs = newLazyFS(cfg.DataDirPath, t) + } return ep, nil } @@ -146,6 +152,18 @@ func (ep *EtcdServerProcess) Start(ctx context.Context) error { return err } } + if ep.lazyfs != nil { + ep.cfg.lg.Info("starting lazyfs...", zap.String("name", ep.cfg.Name)) + err := ep.lazyfs.Start() + if err != nil { + return err + } + err = ep.lazyfs.ClearCache() + if err != nil { + return err + } + } + ep.cfg.lg.Info("starting server...", zap.String("name", ep.cfg.Name)) proc, err := SpawnCmdWithLogger(ep.cfg.lg, append([]string{ep.cfg.ExecPath}, ep.cfg.Args...), ep.cfg.EnvVars, ep.cfg.Name) if err != nil { @@ -205,6 +223,14 @@ func (ep *EtcdServerProcess) Stop() (err error) { return err } } + if ep.lazyfs != nil { + ep.cfg.lg.Info("stopping lazyfs...", zap.String("name", ep.cfg.Name)) + err = ep.lazyfs.Stop() + ep.lazyfs = nil + if err != nil { + return err + } + } return nil } @@ -298,6 +324,10 @@ func (ep *EtcdServerProcess) PeerProxy() proxy.Server { return ep.proxy } +func (ep *EtcdServerProcess) LazyFS() *LazyFS { + return ep.lazyfs +} + func (ep *EtcdServerProcess) Failpoints() *BinaryFailpoints { return ep.failpoints } diff --git a/tests/framework/e2e/lazyfs.go b/tests/framework/e2e/lazyfs.go new file mode 100644 index 000000000000..5d3e4222a3d6 --- /dev/null +++ b/tests/framework/e2e/lazyfs.go @@ -0,0 +1,94 @@ +// Copyright 2017 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package e2e + +import ( + "fmt" + "go.etcd.io/etcd/pkg/v3/expect" + "os" + "path/filepath" +) + +func newLazyFS(dataDir string, tmp TempDirProvider) *LazyFS { + return &LazyFS{ + DataDir: dataDir, + LazyFSDir: tmp.TempDir(), + } +} + +type TempDirProvider interface { + TempDir() string +} + +type LazyFS struct { + DataDir string + LazyFSDir string + + ep *expect.ExpectProcess +} + +func (fs *LazyFS) Start() (err error) { + if fs.ep == nil { + return nil + } + err = os.WriteFile(fs.configPath(), fs.config(), 0666) + if err != nil { + return err + } + dataPath := filepath.Join(fs.LazyFSDir, "data") + err = os.Mkdir(dataPath, 0777) + if err != nil { + return err + } + + fs.ep, err = expect.NewExpect("./bin/lazyfs", fs.DataDir, "--config-path", fs.configPath(), "-o", "allow_other", "-o", "modules=subdir", "-o", "subdir="+dataPath, "-f") + if err != nil { + return err + } + return err +} + +func (fs *LazyFS) configPath() string { + return filepath.Join(fs.LazyFSDir, "config.toml") +} + +func (fs *LazyFS) socketPath() string { + return filepath.Join(fs.LazyFSDir, "sock.fifo") +} + +func (fs *LazyFS) config() []byte { + return []byte(fmt.Sprintf(`[faults] + fifo_path=%q + [cache] + apply_eviction=false + [cache.simple] + custom_size="1gb" + blocks_per_page=1 + [filesystem] + log_all_operations=false`, fs.socketPath())) +} + +func (fs *LazyFS) Stop() error { + if fs.ep == nil { + return nil + } + fmt.Printf("LAZYFS %q", fs.ep.Lines()) + defer func() { fs.ep = nil }() + return fs.ep.Close() +} + +func (fs *LazyFS) ClearCache() error { + return os.WriteFile(fs.socketPath(), []byte("lazyfs::clear-cache\n"), 0666) +} diff --git a/tests/robustness/linearizability_test.go b/tests/robustness/linearizability_test.go index fc568c22859b..c22d3686cf62 100644 --- a/tests/robustness/linearizability_test.go +++ b/tests/robustness/linearizability_test.go @@ -49,6 +49,7 @@ func TestRobustness(t *testing.T) { e2e.WithClusterSize(1), e2e.WithSnapshotCount(100), e2e.WithGoFailEnabled(true), + e2e.WithLazyFSEnabled(true), e2e.WithCompactionBatchLimit(100), // required for compactBeforeCommitBatch and compactAfterCommitBatch failpoints e2e.WithWatchProcessNotifyInterval(100*time.Millisecond), ), @@ -58,6 +59,7 @@ func TestRobustness(t *testing.T) { e2e.WithSnapshotCount(100), e2e.WithPeerProxy(true), e2e.WithGoFailEnabled(true), + e2e.WithLazyFSEnabled(true), e2e.WithCompactionBatchLimit(100), // required for compactBeforeCommitBatch and compactAfterCommitBatch failpoints e2e.WithWatchProcessNotifyInterval(100 * time.Millisecond), }