Skip to content

Commit

Permalink
Merge pull request #83 from xmtp/e2e-convergence
Browse files Browse the repository at this point in the history
e2e: convergence test
  • Loading branch information
snormore authored May 11, 2023
2 parents b13f040 + d0d0b2a commit a4d05e0
Show file tree
Hide file tree
Showing 9 changed files with 467 additions and 248 deletions.
3 changes: 2 additions & 1 deletion dev/e2e/run
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ set -eou pipefail

GIT_COMMIT="$(git rev-parse HEAD)"

# shellcheck disable=SC2068
go run \
-ldflags="-X 'main.GitCommit=${GIT_COMMIT}'" \
cmd/xmtpd-e2e/main.go \
"$@"
$@
9 changes: 5 additions & 4 deletions dev/e2e/run-spray-local
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ set -eou pipefail

. dev/net/k8s-env

nodes="$(kubectl -n xmtp-nodes get pods -l "app.kubernetes.io/part-of=xmtp-nodes" -o=json | jq -r '.items[].metadata.labels["app.kubernetes.io/name"]')"
opts=""
nodes="$(dev/terraform/tf output -json | jq -r '.nodes.value[].name')"
opts=()
while read -r node; do
opts="${opts} --api-url=${node}.localhost"
opts+=("--api-url=http://${node}.localhost")
done <<< "$(echo -e "$nodes")"
opts+=("$@")

dev/e2e/run "${opts}" "$@"
dev/e2e/run "${opts[*]}"
1 change: 1 addition & 0 deletions dev/terraform/plans/devnet-local/_variables.tf
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ variable "node_keys" {
sensitive = true
}
variable "enable_chat_app" { default = true }
variable "enable_e2e" { default = true }
variable "enable_monitoring" { default = true }
variable "e2e_delay" { default = "" }
variable "node_container_cpu_limit" { default = "500m" }
Expand Down
3 changes: 2 additions & 1 deletion dev/terraform/plans/devnet-local/main.tf
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
module "cluster" {
source = "git@github.com:xmtp-labs/xmtpd-terraform.git//modules/xmtp-cluster-kind?ref=12d1e46"
source = "git@github.com:xmtp-labs/xmtpd-terraform.git//modules/xmtp-cluster-kind?ref=0d6dce9"

# Uncomment this line and comment out the previous source line to use a
# local instance of xmtpd-modules living in the parent directory of xmtpd.
Expand All @@ -12,6 +12,7 @@ module "cluster" {
e2e_container_image = var.e2e_container_image
e2e_delay = var.e2e_delay
enable_chat_app = var.enable_chat_app
enable_e2e = var.enable_e2e
enable_monitoring = var.enable_monitoring
node_container_cpu_limit = var.node_container_cpu_limit
node_container_memory_limit = var.node_container_memory_limit
Expand Down
23 changes: 12 additions & 11 deletions pkg/e2e/e2e.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,19 @@ type E2E struct {
}

type Options struct {
APIURLs []string `long:"api-url" env:"XMTP_API_URLS" description:"XMTP node API URLs" default:"http://localhost"`
ClientsPerURL int `long:"clients-per-url" description:"Number of clients for each API URL" default:"1"`
MessagePerClient int `long:"messages-per-client" description:"Number of messages to publish for each client" default:"3"`
Continuous bool `long:"continuous" description:"Run continuously"`
ExitOnError bool `long:"exit-on-error" description:"Exit on error if running continuously"`
RunDelay time.Duration `long:"delay" description:"Delay between runs (in seconds)" default:"5s"`
AdminPort uint `long:"admin-port" description:"Admin HTTP server listen port" default:"7777"`
APIURLs []string `long:"api-url" env:"XMTP_API_URLS" description:"XMTP node API URLs" default:"http://localhost"`
ClientsPerURL int `long:"clients-per-url" description:"Number of clients for each API URL" default:"1"`
MessagePerClient int `long:"messages-per-client" description:"Number of messages to publish for each client" default:"3"`
Continuous bool `long:"continuous" description:"Run continuously"`
ExitOnError bool `long:"exit-on-error" description:"Exit on error if running continuously"`
RunDelay time.Duration `long:"delay" description:"Delay between runs" default:"5s"`
AdminPort uint `long:"admin-port" description:"Admin HTTP server listen port" default:"7777"`
QueryConvergenceDelay time.Duration `long:"query-convergence-delay" description:"Delay between query convergence checks" default:"10ms"`

GitCommit string
}

type testRunFunc func() error
type testRunFunc func(name string) error

type Test struct {
Name string
Expand All @@ -47,7 +48,7 @@ type Test struct {

func (e *E2E) Tests() []*Test {
return []*Test{
e.newTest("messagev1 publish subscribe query", e.testMessageV1PublishSubscribeQuery),
e.newTest("convergence", e.testConvergence),
}
}

Expand All @@ -59,7 +60,7 @@ func New(ctx context.Context, opts *Options) (*E2E, error) {
rand: rand.New(rand.NewSource(time.Now().UTC().UnixNano())),
opts: opts,
}
e.log.Info("running", zap.String("git-commit", opts.GitCommit))
e.log.Info("running", zap.String("git-commit", opts.GitCommit), zap.Strings("nodes", opts.APIURLs))

if e.opts.Continuous {
go func() {
Expand Down Expand Up @@ -102,7 +103,7 @@ func (e *E2E) runTest(test *Test) error {
started := time.Now().UTC()
log := e.log.With(zap.String("test", test.Name))

err := test.Run()
err := test.Run(test.Name)
duration := time.Since(started)
log = log.With(zap.Duration("duration", duration))
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/e2e/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func TestE2E(t *testing.T) {
t.Run(test.Name, func(t *testing.T) {
t.Parallel()

err := test.Run()
err := test.Run(test.Name)
require.NoError(t, err)
})
}
Expand Down
118 changes: 117 additions & 1 deletion pkg/e2e/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,11 @@ import (
)

type Metrics struct {
runDuration *prometheus.HistogramVec
runDuration *prometheus.HistogramVec
subscribeDuration *prometheus.HistogramVec
publishDuration *prometheus.HistogramVec
subscribeConvergenceDuration *prometheus.HistogramVec
queryConvergenceDuration *prometheus.HistogramVec
}

func newMetrics() *Metrics {
Expand All @@ -25,6 +29,46 @@ func newMetrics() *Metrics {
},
[]string{"test", "status"},
),
subscribeDuration: promauto.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "xmtpd",
Subsystem: "e2e",
Name: "subscribe_duration_us",
Help: "duration of test case subscribe (microseconds)",
Buckets: prometheus.ExponentialBuckets(10, 10, 10),
},
[]string{"test", "node", "status"},
),
publishDuration: promauto.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "xmtpd",
Subsystem: "e2e",
Name: "publish_duration_us",
Help: "duration of test case publish (microseconds)",
Buckets: prometheus.ExponentialBuckets(10, 10, 10),
},
[]string{"test", "node", "status"},
),
subscribeConvergenceDuration: promauto.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "xmtpd",
Subsystem: "e2e",
Name: "subscribe_convergence_duration_us",
Help: "duration of test case subscribe convergence (microseconds)",
Buckets: prometheus.ExponentialBuckets(10, 10, 10),
},
[]string{"test", "node", "status"},
),
queryConvergenceDuration: promauto.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "xmtpd",
Subsystem: "e2e",
Name: "query_convergence_duration_us",
Help: "duration of test case query convergence (microseconds)",
Buckets: prometheus.ExponentialBuckets(10, 10, 10),
},
[]string{"test", "node", "status"},
),
}
}

Expand All @@ -44,3 +88,75 @@ func (m *Metrics) recordRun(ctx context.Context, test, status string, duration t
}
met.Observe(float64(duration.Microseconds()))
}

func (m *Metrics) recordSubscribe(ctx context.Context, test, node, status string, duration time.Duration) {
if m == nil || m.subscribeDuration == nil {
return
}
met, err := m.subscribeDuration.GetMetricWithLabelValues(test, node, status)
if err != nil {
ctx.Logger().Warn("error observing metric",
zap.Error(err),
zap.String("metric", "subscribe_duration_us"),
zap.String("test", test),
zap.String("node", node),
zap.String("status", status),
)
return
}
met.Observe(float64(duration.Microseconds()))
}

func (m *Metrics) recordPublish(ctx context.Context, test, node, status string, duration time.Duration) {
if m == nil || m.publishDuration == nil {
return
}
met, err := m.publishDuration.GetMetricWithLabelValues(test, node, status)
if err != nil {
ctx.Logger().Warn("error observing metric",
zap.Error(err),
zap.String("metric", "publish_duration_us"),
zap.String("test", test),
zap.String("node", node),
zap.String("status", status),
)
return
}
met.Observe(float64(duration.Microseconds()))
}

func (m *Metrics) recordSubscribeConvergence(ctx context.Context, test, node, status string, duration time.Duration) {
if m == nil || m.subscribeConvergenceDuration == nil {
return
}
met, err := m.subscribeConvergenceDuration.GetMetricWithLabelValues(test, node, status)
if err != nil {
ctx.Logger().Warn("error observing metric",
zap.Error(err),
zap.String("metric", "subscribe_convergence_duration_us"),
zap.String("test", test),
zap.String("node", node),
zap.String("status", status),
)
return
}
met.Observe(float64(duration.Microseconds()))
}

func (m *Metrics) recordQueryConvergence(ctx context.Context, test, node, status string, duration time.Duration) {
if m == nil || m.queryConvergenceDuration == nil {
return
}
met, err := m.queryConvergenceDuration.GetMetricWithLabelValues(test, node, status)
if err != nil {
ctx.Logger().Warn("error observing metric",
zap.Error(err),
zap.String("metric", "query_convergence_duration_us"),
zap.String("test", test),
zap.String("node", node),
zap.String("status", status),
)
return
}
met.Observe(float64(duration.Microseconds()))
}
Loading

0 comments on commit a4d05e0

Please sign in to comment.