From 8d727c94b2e070c64bd89b00bfed79896b580e39 Mon Sep 17 00:00:00 2001 From: Steven Sheehy Date: Mon, 19 Jul 2021 11:46:47 -0500 Subject: [PATCH 1/6] Add a cluster health endpoint to monitor Signed-off-by: Steven Sheehy --- charts/hedera-mirror-monitor/values.yaml | 4 +- docs/configuration.md | 2 +- ...ubscription.java => AbstractScenario.java} | 39 +- .../monitor/ClusterHealthIndicator.java | 75 ++++ .../hedera/mirror/monitor/HederaNetwork.java | 9 +- .../hedera/mirror/monitor/NodeProperties.java | 3 + .../mirror/monitor/ScenarioProperties.java | 62 +++ ...berProtocol.java => ScenarioProtocol.java} | 4 +- ...riptionStatus.java => ScenarioStatus.java} | 4 +- .../monitor/config/MonitorConfiguration.java | 2 +- .../expression/ExpressionConverterImpl.java | 12 +- .../monitor/publish/PublishMetrics.java | 68 ++-- .../monitor/publish/PublishProperties.java | 4 +- .../monitor/publish/PublishRequest.java | 7 +- .../monitor/publish/PublishScenario.java | 46 +++ .../PublishScenarioProperties.java} | 24 +- .../monitor/publish/TransactionPublisher.java | 137 +++---- .../CompositeTransactionGenerator.java | 23 +- .../ConfigurableTransactionGenerator.java | 31 +- .../generator/ScenarioException.java | 10 +- .../generator/TransactionGenerator.java | 10 +- .../AbstractSubscriberProperties.java | 37 +- .../subscribe/CompositeSubscriber.java | 2 +- .../monitor/subscribe/MirrorSubscriber.java | 2 +- .../{Subscription.java => Scenario.java} | 21 +- .../monitor/subscribe/SubscribeMetrics.java | 32 +- .../monitor/subscribe/SubscribeResponse.java | 2 +- .../controller/SubscriberController.java | 16 +- .../monitor/subscribe/grpc/GrpcClientSDK.java | 4 +- .../subscribe/grpc/GrpcSubscription.java | 16 +- .../subscribe/rest/RestSubscriber.java | 7 +- .../subscribe/rest/RestSubscription.java | 16 +- .../src/main/resources/application.yml | 2 + .../monitor/ClusterHealthIndicatorTest.java | 103 +++++ .../ExpressionConverterImplTest.java | 14 +- .../monitor/publish/PublishMetricsTest.java | 20 +- .../publish/PublishPropertiesTest.java | 14 +- .../publish/TransactionPublisherTest.java | 384 ++++++++++++++++++ .../CompositeTransactionGeneratorTest.java | 66 +-- .../ConfigurableTransactionGeneratorTest.java | 15 +- .../subscribe/CompositeSubscriberTest.java | 4 +- .../subscribe/SubscribeMetricsTest.java | 23 +- ...estSubscription.java => TestScenario.java} | 29 +- .../controller/SubscriberControllerTest.java | 38 +- .../subscribe/grpc/GrpcSubscriberTest.java | 16 +- .../subscribe/rest/RestSubscriberTest.java | 87 ++-- 46 files changed, 1121 insertions(+), 425 deletions(-) rename hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/{subscribe/AbstractSubscription.java => AbstractScenario.java} (76%) create mode 100644 hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/ClusterHealthIndicator.java create mode 100644 hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/ScenarioProperties.java rename hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/{subscribe/SubscriberProtocol.java => ScenarioProtocol.java} (89%) rename hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/{subscribe/SubscriptionStatus.java => ScenarioStatus.java} (92%) create mode 100644 hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/publish/PublishScenario.java rename hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/{generator/ScenarioProperties.java => publish/PublishScenarioProperties.java} (82%) rename hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/{ => publish}/generator/CompositeTransactionGenerator.java (85%) rename hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/{ => publish}/generator/ConfigurableTransactionGenerator.java (82%) rename hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/{ => publish}/generator/ScenarioException.java (76%) rename hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/{ => publish}/generator/TransactionGenerator.java (77%) rename hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/subscribe/{Subscription.java => Scenario.java} (76%) create mode 100644 hedera-mirror-monitor/src/test/java/com/hedera/mirror/monitor/ClusterHealthIndicatorTest.java create mode 100644 hedera-mirror-monitor/src/test/java/com/hedera/mirror/monitor/publish/TransactionPublisherTest.java rename hedera-mirror-monitor/src/test/java/com/hedera/mirror/monitor/{ => publish}/generator/CompositeTransactionGeneratorTest.java (75%) rename hedera-mirror-monitor/src/test/java/com/hedera/mirror/monitor/{ => publish}/generator/ConfigurableTransactionGeneratorTest.java (94%) rename hedera-mirror-monitor/src/test/java/com/hedera/mirror/monitor/subscribe/{TestSubscription.java => TestScenario.java} (66%) diff --git a/charts/hedera-mirror-monitor/values.yaml b/charts/hedera-mirror-monitor/values.yaml index 794fc9c6e51..da51129c029 100644 --- a/charts/hedera-mirror-monitor/values.yaml +++ b/charts/hedera-mirror-monitor/values.yaml @@ -183,8 +183,8 @@ prometheusRules: description: "Averaging {{ $value | humanizePercentage }} error rate publishing '{{ $labels.scenario }}' scenario from {{ $labels.namespace }}/{{ $labels.pod }}" summary: "Publish error rate exceeds 15%" enabled: true - expr: sum(rate(hedera_mirror_monitor_publish_submit_seconds_sum{application="hedera-mirror-monitor",status!="SUCCESS"}[2m])) by (namespace, pod, scenario) / sum(rate(hedera_mirror_monitor_publish_submit_seconds_count{application="hedera-mirror-monitor"}[2m])) by (namespace, pod, scenario) > 0.15 - for: 2m + expr: expr: sum(rate(hedera_mirror_monitor_publish_submit_seconds_count{application="hedera-mirror-monitor",status!="SUCCESS"}[2m])) by (namespace, pod, scenario) / sum(rate(hedera_mirror_monitor_publish_submit_seconds_count{application="hedera-mirror-monitor"}[2m])) by (namespace, pod, scenario) > 0.33 + for: 3m labels: severity: critical application: hedera-mirror-monitor diff --git a/docs/configuration.md b/docs/configuration.md index 66ce8e81c10..7a6f30f3099 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -249,10 +249,10 @@ Name | Default | D `hedera.mirror.monitor.publish.scenarios..enabled` | true | Whether this publish scenario is enabled `hedera.mirror.monitor.publish.scenarios..limit` | 0 | How many transactions to publish before halting. 0 for unlimited `hedera.mirror.monitor.publish.scenarios..logResponse` | false | Whether to log the response from HAPI -`hedera.mirror.monitor.publish.scenarios..maxAttempts` | 1 | The maximum number of times a scenario transaction will be attempted `hedera.mirror.monitor.publish.scenarios..properties` | {} | Key/value pairs used to configure the [`TransactionSupplier`](/hedera-mirror-datagenerator/src/main/java/com/hedera/datagenerator/sdk/supplier) associated with this scenario type `hedera.mirror.monitor.publish.scenarios..receiptPercent` | 0.0 | The percentage of receipts to retrieve from HAPI. Accepts values between 0-1 `hedera.mirror.monitor.publish.scenarios..recordPercent` | 0.0 | The percentage of records to retrieve from HAPI. Accepts values between 0-1 +`hedera.mirror.monitor.publish.scenarios..retry.maxAttempts` | 1 | The maximum number of times a scenario transaction will be attempted `hedera.mirror.monitor.publish.scenarios..timeout` | 12s | How long to wait for the transaction result `hedera.mirror.monitor.publish.scenarios..tps` | 1.0 | The rate at which transactions will publish `hedera.mirror.monitor.publish.scenarios..type` | | The type of transaction to publish. See the [`TransactionType`](/hedera-mirror-datagenerator/src/main/java/com/hedera/datagenerator/sdk/supplier/TransactionType.java) enum for a list of possible values diff --git a/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/subscribe/AbstractSubscription.java b/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/AbstractScenario.java similarity index 76% rename from hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/subscribe/AbstractSubscription.java rename to hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/AbstractScenario.java index c0f1c960705..fac61fb5b20 100644 --- a/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/subscribe/AbstractSubscription.java +++ b/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/AbstractScenario.java @@ -1,4 +1,4 @@ -package com.hedera.mirror.monitor.subscribe; +package com.hedera.mirror.monitor; /*- * ‌ @@ -21,6 +21,7 @@ */ import com.google.common.base.Stopwatch; +import com.google.common.base.Throwables; import com.google.common.collect.ConcurrentHashMultiset; import com.google.common.collect.Multiset; import io.micrometer.core.instrument.Clock; @@ -38,11 +39,13 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import com.hedera.mirror.monitor.subscribe.Scenario; + @Data @EqualsAndHashCode(onlyExplicitlyIncluded = true) -public abstract class AbstractSubscription

implements Subscription { +public abstract class AbstractScenario

implements Scenario { - private static final long UPDATE_INTERVAL = 10_000L; // 10s + private static final long UPDATE_INTERVAL = 20_000L; // 20s measured in milliseconds @EqualsAndHashCode.Include protected final int id; @@ -85,27 +88,36 @@ public double getRate() { } @Override - public SubscriptionStatus getStatus() { - if (!stopwatch.isRunning()) { - return SubscriptionStatus.COMPLETED; + public ScenarioStatus getStatus() { + if (!isRunning()) { + return ScenarioStatus.COMPLETED; } else if (getRate() <= 0.0) { - return SubscriptionStatus.IDLE; + return ScenarioStatus.IDLE; } else { - return SubscriptionStatus.RUNNING; + return ScenarioStatus.RUNNING; } } + @Override + public boolean isRunning() { + return stopwatch.isRunning(); + } + + @Override public void onComplete() { - if (stopwatch.isRunning()) { + if (isRunning()) { stopwatch.stop(); - log.info("Stopping '{}' subscription", this); + log.info("Stopping '{}' scenario", this); } } - public void onError(Throwable t) { - errors.add(t.getClass().getSimpleName()); + @Override + public void onError(Throwable throwable) { + Throwable rootCause = Throwables.getRootCause(throwable); + errors.add(rootCause.getClass().getSimpleName()); } + @Override public void onNext(T response) { counter.incrementAndGet(); intervalCounter.getCurrent().increment(); @@ -115,7 +127,6 @@ public void onNext(T response) { @Override public String toString() { - String name = getName(); - return getProperties().getSubscribers() <= 1 ? name : name + " #" + getId(); + return getName(); } } diff --git a/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/ClusterHealthIndicator.java b/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/ClusterHealthIndicator.java new file mode 100644 index 00000000000..cd3aef31501 --- /dev/null +++ b/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/ClusterHealthIndicator.java @@ -0,0 +1,75 @@ +package com.hedera.mirror.monitor; + +/*- + * ‌ + * Hedera Mirror Node + * ​ + * Copyright (C) 2019 - 2021 Hedera Hashgraph, LLC + * ​ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ‍ + */ + +import javax.inject.Named; +import lombok.RequiredArgsConstructor; +import org.apache.commons.lang3.StringUtils; +import org.springframework.boot.actuate.health.Health; +import org.springframework.boot.actuate.health.ReactiveHealthIndicator; +import org.springframework.boot.actuate.health.Status; +import reactor.core.publisher.Mono; + +import com.hedera.mirror.monitor.publish.generator.TransactionGenerator; +import com.hedera.mirror.monitor.subscribe.MirrorSubscriber; +import com.hedera.mirror.monitor.subscribe.Scenario; + +@Named +@RequiredArgsConstructor +public class ClusterHealthIndicator implements ReactiveHealthIndicator { + + private static final Mono DOWN = health(Status.DOWN, "Subscribing is inactive"); + private static final Mono UNKNOWN = health(Status.UNKNOWN, "Publishing is inactive"); + private static final Mono UP = health(Status.UP, ""); + + private final MirrorSubscriber mirrorSubscriber; + private final TransactionGenerator transactionGenerator; + + private static Mono health(Status status, String reason) { + Health.Builder health = Health.status(status); + if (StringUtils.isNotBlank(reason)) { + health.withDetail("reason", reason); + } + return Mono.just(health.build()); + } + + @Override + public Mono health() { + return publishing().switchIfEmpty(subscribing()); + } + + private Mono publishing() { + return transactionGenerator.scenarios() + .map(Scenario::getRate) + .reduce(0.0, (c, n) -> c + n) + .filter(sum -> sum <= 0) + .flatMap(n -> UNKNOWN); + } + + private Mono subscribing() { + return mirrorSubscriber.getSubscriptions() + .map(Scenario::getRate) + .reduce(0.0, (cur, next) -> cur + next) + .filter(sum -> sum > 0) + .flatMap(n -> UP) + .switchIfEmpty(DOWN); + } +} diff --git a/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/HederaNetwork.java b/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/HederaNetwork.java index 69c6a918f8b..5783ea3bc0a 100644 --- a/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/HederaNetwork.java +++ b/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/HederaNetwork.java @@ -29,7 +29,7 @@ @RequiredArgsConstructor public enum HederaNetwork { - MAINNET(mainnet(), mirrorNode("mainnet")), + MAINNET(mainnet(), mirrorNode("mainnet-public")), PREVIEWNET(previewnet(), mirrorNode("previewnet")), TESTNET(testnet(), mirrorNode("testnet")), OTHER(Collections.emptySet(), null); @@ -42,6 +42,10 @@ private static MirrorNodeProperties mirrorNode(String environment) { MirrorNodeProperties mirrorNodeProperties = new MirrorNodeProperties(); mirrorNodeProperties.getGrpc().setHost("hcs." + host); mirrorNodeProperties.getRest().setHost(host); + if ("mainnet-public".equals(environment)) { + mirrorNodeProperties.getGrpc().setHost(host); + mirrorNodeProperties.getGrpc().setPort(443); + } return mirrorNodeProperties; } @@ -76,7 +80,8 @@ private static Set previewnet() { new NodeProperties("0.0.3", "0.previewnet.hedera.com"), new NodeProperties("0.0.4", "1.previewnet.hedera.com"), new NodeProperties("0.0.5", "2.previewnet.hedera.com"), - new NodeProperties("0.0.6", "3.previewnet.hedera.com") + new NodeProperties("0.0.6", "3.previewnet.hedera.com"), + new NodeProperties("0.0.7", "4.previewnet.hedera.com") ); } diff --git a/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/NodeProperties.java b/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/NodeProperties.java index 5fc8511276d..55bab34e744 100644 --- a/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/NodeProperties.java +++ b/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/NodeProperties.java @@ -46,6 +46,9 @@ public NodeProperties(String accountId, String host) { private int port = 50211; public String getEndpoint() { + if (host.startsWith("in-process:")) { + return host; + } return host + ":" + port; } } diff --git a/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/ScenarioProperties.java b/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/ScenarioProperties.java new file mode 100644 index 00000000000..00aec2d60af --- /dev/null +++ b/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/ScenarioProperties.java @@ -0,0 +1,62 @@ +package com.hedera.mirror.monitor; + +/*- + * ‌ + * Hedera Mirror Node + * ​ + * Copyright (C) 2019 - 2021 Hedera Hashgraph, LLC + * ​ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ‍ + */ + +import java.time.Duration; +import javax.validation.constraints.Min; +import javax.validation.constraints.NotNull; +import lombok.Data; +import org.hibernate.validator.constraints.time.DurationMin; +import org.springframework.validation.annotation.Validated; + +@Data +public abstract class ScenarioProperties { + + @NotNull + @DurationMin(seconds = 30) + protected Duration duration = Duration.ofNanos(Long.MAX_VALUE); + + protected boolean enabled = true; + + @Min(0) + protected long limit = 0; // 0 for unlimited + + protected String name; + + @NotNull + protected RetryProperties retry = new RetryProperties(); + + @Data + @Validated + public static class RetryProperties { + + @Min(0) + private long maxAttempts = 16L; + + @NotNull + @DurationMin(millis = 500L) + private Duration maxBackoff = Duration.ofSeconds(8L); + + @NotNull + @DurationMin(millis = 100L) + private Duration minBackoff = Duration.ofMillis(250L); + } +} diff --git a/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/subscribe/SubscriberProtocol.java b/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/ScenarioProtocol.java similarity index 89% rename from hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/subscribe/SubscriberProtocol.java rename to hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/ScenarioProtocol.java index 49b03f990f4..6b8072ed6ad 100644 --- a/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/subscribe/SubscriberProtocol.java +++ b/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/ScenarioProtocol.java @@ -1,4 +1,4 @@ -package com.hedera.mirror.monitor.subscribe; +package com.hedera.mirror.monitor; /*- * ‌ @@ -20,7 +20,7 @@ * ‍ */ -public enum SubscriberProtocol { +public enum ScenarioProtocol { GRPC, REST } diff --git a/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/subscribe/SubscriptionStatus.java b/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/ScenarioStatus.java similarity index 92% rename from hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/subscribe/SubscriptionStatus.java rename to hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/ScenarioStatus.java index 1c0a9bba53b..d3e8926c5c8 100644 --- a/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/subscribe/SubscriptionStatus.java +++ b/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/ScenarioStatus.java @@ -1,4 +1,4 @@ -package com.hedera.mirror.monitor.subscribe; +package com.hedera.mirror.monitor; /*- * ‌ @@ -20,7 +20,7 @@ * ‍ */ -public enum SubscriptionStatus { +public enum ScenarioStatus { COMPLETED, // The scenario has completed normally due to reaching the configured duration or limit IDLE, // The scenario has not completed but is not currently receiving any responses RUNNING, // The scenario is still actively receiving responses diff --git a/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/config/MonitorConfiguration.java b/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/config/MonitorConfiguration.java index 7ae39f49e26..20b108c4b90 100644 --- a/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/config/MonitorConfiguration.java +++ b/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/config/MonitorConfiguration.java @@ -31,12 +31,12 @@ import reactor.core.publisher.Flux; import reactor.core.scheduler.Schedulers; -import com.hedera.mirror.monitor.generator.TransactionGenerator; import com.hedera.mirror.monitor.publish.PublishException; import com.hedera.mirror.monitor.publish.PublishMetrics; import com.hedera.mirror.monitor.publish.PublishProperties; import com.hedera.mirror.monitor.publish.PublishRequest; import com.hedera.mirror.monitor.publish.TransactionPublisher; +import com.hedera.mirror.monitor.publish.generator.TransactionGenerator; import com.hedera.mirror.monitor.subscribe.MirrorSubscriber; import com.hedera.mirror.monitor.subscribe.SubscribeMetrics; diff --git a/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/expression/ExpressionConverterImpl.java b/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/expression/ExpressionConverterImpl.java index 178abfab78c..738f9386317 100644 --- a/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/expression/ExpressionConverterImpl.java +++ b/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/expression/ExpressionConverterImpl.java @@ -49,6 +49,8 @@ import com.hedera.mirror.monitor.NodeProperties; import com.hedera.mirror.monitor.publish.PublishRequest; import com.hedera.mirror.monitor.publish.PublishResponse; +import com.hedera.mirror.monitor.publish.PublishScenario; +import com.hedera.mirror.monitor.publish.PublishScenarioProperties; import com.hedera.mirror.monitor.publish.TransactionPublisher; @Log4j2 @@ -112,14 +114,16 @@ private synchronized String doConvert(Expression expression) { .setOperatorAccountId(monitorProperties.getOperator().getAccountId()); } + PublishScenarioProperties publishScenarioProperties = new PublishScenarioProperties(); + publishScenarioProperties.setName(expression.toString()); + publishScenarioProperties.setTimeout(Duration.ofSeconds(30L)); + publishScenarioProperties.setType(type.getTransactionType()); + PublishScenario scenario = new PublishScenario(publishScenarioProperties); PublishRequest request = PublishRequest.builder() - .logResponse(true) .receipt(true) - .scenarioName(expression.toString()) - .timeout(Duration.ofSeconds(30L)) + .scenario(scenario) .timestamp(Instant.now()) .transaction(transactionSupplier.get()) - .type(type.getTransactionType()) .build(); PublishResponse publishResponse = Mono.defer(() -> transactionPublisher.publish(request)) diff --git a/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/publish/PublishMetrics.java b/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/publish/PublishMetrics.java index 14a943a61a1..50d0b42d1ef 100644 --- a/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/publish/PublishMetrics.java +++ b/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/publish/PublishMetrics.java @@ -20,27 +20,19 @@ * ‍ */ -import com.google.common.base.Stopwatch; import com.google.common.base.Throwables; -import com.google.common.collect.ConcurrentHashMultiset; -import com.google.common.collect.Multiset; import io.grpc.StatusRuntimeException; -import io.micrometer.core.instrument.Clock; import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.TimeGauge; import io.micrometer.core.instrument.Timer; -import io.micrometer.core.instrument.step.StepLong; -import java.util.HashMap; import java.util.Map; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; import javax.inject.Named; import lombok.RequiredArgsConstructor; import lombok.Value; import lombok.extern.log4j.Log4j2; -import org.apache.commons.math3.util.Precision; import org.springframework.scheduling.annotation.Scheduled; import com.hedera.datagenerator.sdk.supplier.TransactionType; @@ -59,28 +51,21 @@ public class PublishMetrics { static final String METRIC_SUBMIT = "hedera.mirror.monitor.publish.submit"; static final String SUCCESS = "SUCCESS"; static final String UNKNOWN = "unknown"; - private static final long UPDATE_INTERVAL = 10_000L; // 10s - private final PublishProperties publishProperties; - private final AtomicLong counter = new AtomicLong(0L); - private final Multiset errors = ConcurrentHashMultiset.create(); - private final StepLong intervalCounter = new StepLong(Clock.SYSTEM, UPDATE_INTERVAL); - private final Stopwatch stopwatch = Stopwatch.createStarted(); + private final Map durationGauges = new ConcurrentHashMap<>(); private final Map handleTimers = new ConcurrentHashMap<>(); private final Map submitTimers = new ConcurrentHashMap<>(); - private final Map durationGauges = new ConcurrentHashMap<>(); private final MeterRegistry meterRegistry; + private final PublishProperties publishProperties; public void onSuccess(PublishResponse response) { - counter.incrementAndGet(); - intervalCounter.getCurrent().increment(); recordMetric(response.getRequest(), response, SUCCESS); } public void onError(PublishException publishException) { PublishRequest request = publishException.getPublishRequest(); String status; - TransactionType type = request.getType(); + TransactionType type = request.getScenario().getProperties().getType(); Throwable throwable = Throwables.getRootCause(publishException); if (throwable instanceof PrecheckStatusException) { @@ -101,7 +86,6 @@ public void onError(PublishException publishException) { log.debug("{} submitting {} transaction: {}", status, type, throwable.getMessage()); } - errors.add(status); recordMetric(request, null, status); } @@ -112,13 +96,12 @@ private void recordMetric(PublishRequest request, PublishResponse response, Stri .map(AccountId::toString) .orElse(UNKNOWN); long startTime = request.getTimestamp().toEpochMilli(); - String scenarioName = request.getScenarioName(); - TransactionType type = request.getType(); - long endTime = response != null ? response.getTimestamp().toEpochMilli() : System.currentTimeMillis(); - Tags tags = new Tags(node, scenarioName, status, type); + Tags tags = new Tags(node, request.getScenario(), status); + Timer submitTimer = submitTimers.computeIfAbsent(tags, this::newSubmitMetric); submitTimer.record(endTime - startTime, TimeUnit.MILLISECONDS); + durationGauges.computeIfAbsent(tags, this::newDurationMetric); if (response != null && response.getReceipt() != null) { @@ -130,11 +113,11 @@ private void recordMetric(PublishRequest request, PublishResponse response, Stri private TimeGauge newDurationMetric(Tags tags) { TimeUnit unit = TimeUnit.NANOSECONDS; - return TimeGauge.builder(METRIC_DURATION, stopwatch, unit, s -> s.elapsed(unit)) + return TimeGauge.builder(METRIC_DURATION, tags.getScenario(), unit, s -> s.getElapsed().toNanos()) .description("The amount of time this scenario has been publishing transactions") .tag(Tags.TAG_NODE, tags.getNode()) - .tag(Tags.TAG_SCENARIO, tags.getScenarioName()) - .tag(Tags.TAG_TYPE, tags.getType().toString()) + .tag(Tags.TAG_SCENARIO, tags.getScenario().getName()) + .tag(Tags.TAG_TYPE, tags.getType()) .register(meterRegistry); } @@ -142,9 +125,9 @@ private Timer newHandleMetric(Tags tags) { return Timer.builder(METRIC_HANDLE) .description("The time it takes from submit to being handled by the main nodes") .tag(Tags.TAG_NODE, tags.getNode()) - .tag(Tags.TAG_SCENARIO, tags.getScenarioName()) + .tag(Tags.TAG_SCENARIO, tags.getScenario().getName()) .tag(Tags.TAG_STATUS, tags.getStatus()) - .tag(Tags.TAG_TYPE, tags.getType().toString()) + .tag(Tags.TAG_TYPE, tags.getType()) .register(meterRegistry); } @@ -152,25 +135,25 @@ private Timer newSubmitMetric(Tags tags) { return Timer.builder(METRIC_SUBMIT) .description("The time it takes to submit a transaction") .tag(Tags.TAG_NODE, tags.getNode()) - .tag(Tags.TAG_SCENARIO, tags.getScenarioName()) + .tag(Tags.TAG_SCENARIO, tags.getScenario().getName()) .tag(Tags.TAG_STATUS, tags.getStatus()) - .tag(Tags.TAG_TYPE, tags.getType().toString()) + .tag(Tags.TAG_TYPE, tags.getType()) .register(meterRegistry); } @Scheduled(fixedDelayString = "${hedera.mirror.monitor.publish.statusFrequency:10000}") public void status() { - if (!publishProperties.isEnabled()) { - return; + if (publishProperties.isEnabled()) { + durationGauges.keySet().stream().map(Tags::getScenario).distinct().forEach(this::status); } + } - long count = counter.get(); - long intervalCount = intervalCounter.poll(); - double rate = Precision.round((intervalCount * 1000.0) / UPDATE_INTERVAL, 1); - Map errorCounts = new HashMap<>(); - errors.forEachEntry((k, v) -> errorCounts.put(k, v)); - String elapsedStr = DurationToStringSerializer.convert(stopwatch.elapsed()); - log.info("Published {} transactions in {} at {}/s. Errors: {}", count, elapsedStr, rate, errorCounts); + private void status(PublishScenario scenario) { + if (scenario.isRunning()) { + String elapsed = DurationToStringSerializer.convert(scenario.getElapsed()); + log.info("{}: {} transactions in {} at {}/s. Errors: {}", + scenario, scenario.getCount(), elapsed, scenario.getRate(), scenario.getErrors()); + } } @Value @@ -181,8 +164,11 @@ class Tags { static final String TAG_TYPE = "type"; private final String node; - private final String scenarioName; + private final PublishScenario scenario; private final String status; - private final TransactionType type; + + private String getType() { + return scenario.getProperties().getType().toString(); + } } } diff --git a/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/publish/PublishProperties.java b/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/publish/PublishProperties.java index b61f86025f5..f6bde1e4dfd 100644 --- a/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/publish/PublishProperties.java +++ b/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/publish/PublishProperties.java @@ -32,8 +32,6 @@ import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.validation.annotation.Validated; -import com.hedera.mirror.monitor.generator.ScenarioProperties; - @Data @Validated @ConfigurationProperties("hedera.mirror.monitor.publish") @@ -48,7 +46,7 @@ public class PublishProperties { private boolean enabled = true; @NotNull - private Map scenarios = new LinkedHashMap<>(); + private Map scenarios = new LinkedHashMap<>(); @DurationMin(seconds = 1L) @NotNull diff --git a/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/publish/PublishRequest.java b/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/publish/PublishRequest.java index 6eada72a56d..c43c3b17efc 100644 --- a/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/publish/PublishRequest.java +++ b/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/publish/PublishRequest.java @@ -20,23 +20,18 @@ * ‍ */ -import java.time.Duration; import java.time.Instant; import lombok.Builder; import lombok.Value; -import com.hedera.datagenerator.sdk.supplier.TransactionType; import com.hedera.hashgraph.sdk.Transaction; @Builder @Value public class PublishRequest { - private final boolean logResponse; - private final String scenarioName; private final boolean receipt; private final boolean record; - private final Duration timeout; + private final PublishScenario scenario; private final Instant timestamp; private final Transaction transaction; - private final TransactionType type; } diff --git a/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/publish/PublishScenario.java b/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/publish/PublishScenario.java new file mode 100644 index 00000000000..db7db5761d3 --- /dev/null +++ b/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/publish/PublishScenario.java @@ -0,0 +1,46 @@ +package com.hedera.mirror.monitor.publish; + +/*- + * ‌ + * Hedera Mirror Node + * ​ + * Copyright (C) 2019 - 2021 Hedera Hashgraph, LLC + * ​ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ‍ + */ + +import java.util.Objects; + +import com.hedera.mirror.monitor.AbstractScenario; +import com.hedera.mirror.monitor.ScenarioProtocol; + +public class PublishScenario extends AbstractScenario { + + private final String memo; + + public PublishScenario(PublishScenarioProperties properties) { + super(1, properties); + String hostname = Objects.requireNonNullElse(System.getenv("HOSTNAME"), "unknown"); + this.memo = String.format("Monitor %s on %s", properties.getName(), hostname); + } + + public String getMemo() { + return memo; + } + + @Override + public ScenarioProtocol getProtocol() { + return ScenarioProtocol.GRPC; + } +} diff --git a/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/generator/ScenarioProperties.java b/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/publish/PublishScenarioProperties.java similarity index 82% rename from hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/generator/ScenarioProperties.java rename to hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/publish/PublishScenarioProperties.java index 5d174825cf3..8c794a48142 100644 --- a/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/generator/ScenarioProperties.java +++ b/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/publish/PublishScenarioProperties.java @@ -1,4 +1,4 @@ -package com.hedera.mirror.monitor.generator; +package com.hedera.mirror.monitor.publish; /*- * ‌ @@ -31,27 +31,14 @@ import org.springframework.validation.annotation.Validated; import com.hedera.datagenerator.sdk.supplier.TransactionType; +import com.hedera.mirror.monitor.ScenarioProperties; @Data @Validated -public class ScenarioProperties { - - @NotNull - @DurationMin(seconds = 30) - private Duration duration = Duration.ofNanos(Long.MAX_VALUE); - - private boolean enabled = true; - - @Min(0) - private long limit = 0; +public class PublishScenarioProperties extends ScenarioProperties { private boolean logResponse = false; - @Min(1) - private int maxAttempts = 1; - - private String name; - @NotNull private Map properties = new LinkedHashMap<>(); @@ -73,6 +60,11 @@ public class ScenarioProperties { @NotNull private TransactionType type; + public PublishScenarioProperties() { + getRetry().setMaxAttempts(1L); + } + + @Override public long getLimit() { return limit > 0 ? limit : Long.MAX_VALUE; } diff --git a/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/publish/TransactionPublisher.java b/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/publish/TransactionPublisher.java index b6e1257085a..12db23f6b25 100644 --- a/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/publish/TransactionPublisher.java +++ b/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/publish/TransactionPublisher.java @@ -20,32 +20,29 @@ * ‍ */ -import com.google.common.base.Suppliers; import java.security.SecureRandom; import java.time.Duration; import java.time.Instant; import java.util.ArrayList; -import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Supplier; -import java.util.stream.Collectors; import javax.inject.Named; import lombok.RequiredArgsConstructor; import lombok.extern.log4j.Log4j2; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import com.hedera.hashgraph.sdk.AccountBalanceQuery; import com.hedera.hashgraph.sdk.AccountId; import com.hedera.hashgraph.sdk.Client; +import com.hedera.hashgraph.sdk.Hbar; +import com.hedera.hashgraph.sdk.HbarUnit; import com.hedera.hashgraph.sdk.PrivateKey; -import com.hedera.hashgraph.sdk.Transaction; import com.hedera.hashgraph.sdk.TransactionId; +import com.hedera.hashgraph.sdk.TransactionRecordQuery; import com.hedera.hashgraph.sdk.TransactionResponse; +import com.hedera.hashgraph.sdk.TransferTransaction; import com.hedera.mirror.monitor.MonitorProperties; import com.hedera.mirror.monitor.NodeProperties; @@ -56,55 +53,44 @@ public class TransactionPublisher implements AutoCloseable { private final MonitorProperties monitorProperties; private final PublishProperties publishProperties; - private final Supplier> clients = Suppliers.memoize(this::getClients); - private final Supplier> nodeAccountIds = Suppliers.memoize(this::getNodeAccountIds); - private final AtomicInteger counter = new AtomicInteger(0); + private final Flux clients = Flux.defer(this::getClients).cache(); private final SecureRandom secureRandom = new SecureRandom(); - private final AtomicBoolean initialized = new AtomicBoolean(false); @Override public void close() { - if (initialized.get()) { - log.info("Closing {} clients", clients.get().size()); - - for (Client client : clients.get()) { + if (publishProperties.isEnabled()) { + log.warn("Closing {} clients", publishProperties.getClients()); + clients.subscribe(client -> { try { client.close(); } catch (Exception e) { // Ignore } - } + }); } } public Mono publish(PublishRequest request) { log.trace("Publishing: {}", request); - int clientIndex = counter.getAndUpdate(n -> (n + 1 < clients.get().size()) ? n + 1 : 0); - Client client = clients.get().get(clientIndex); - - return getTransactionResponse(request, client) - .flatMap(transactionResponse -> processTransactionResponse(client, request, transactionResponse)) - .map(PublishResponse.PublishResponseBuilder::build) - .doOnNext(response -> { - if (log.isTraceEnabled() || request.isLogResponse()) { - log.info("Received response : {}", response); - } - }) - .timeout(request.getTimeout()) - .onErrorMap(t -> new PublishException(request, t)); - } - - private Mono getTransactionResponse(PublishRequest request, Client client) { - Transaction transaction = request.getTransaction(); - - // set transaction node where applicable - if (transaction.getNodeAccountIds() == null) { - int nodeIndex = secureRandom.nextInt(nodeAccountIds.get().size()); - List nodeAccountId = List.of(nodeAccountIds.get().get(nodeIndex)); - transaction.setNodeAccountIds(nodeAccountId); - } - - return Mono.fromFuture(transaction.executeAsync(client)); + int clientIndex = secureRandom.nextInt(publishProperties.getClients()); + PublishScenario scenario = request.getScenario(); + PublishScenarioProperties properties = scenario.getProperties(); + + return clients.elementAt(clientIndex) + .flatMap(client -> Mono.fromFuture(request.getTransaction() + .setTransactionMemo(scenario.getMemo()) + .executeAsync(client)) + .flatMap(r -> processTransactionResponse(client, request, r)) + .map(PublishResponse.PublishResponseBuilder::build) + .doOnNext(response -> { + if (log.isTraceEnabled() || properties.isLogResponse()) { + log.info("Received response : {}", response); + } + }) + .timeout(properties.getTimeout()) + .doOnNext(scenario::onNext) + .doOnError(scenario::onError) + .onErrorMap(t -> new PublishException(request, t))); } private Mono processTransactionResponse(Client client, @@ -117,55 +103,48 @@ private Mono processTransactionResponse( .transactionId(transactionId); if (request.isRecord()) { - return Mono.fromFuture(transactionId.getRecordAsync(client)) + // TransactionId.getRecordAsync() is inefficient doing a get receipt, a cost query, then the get record + TransactionRecordQuery transactionRecordQuery = new TransactionRecordQuery() + .setQueryPayment(Hbar.from(1, HbarUnit.HBAR)) + .setTransactionId(transactionId); + return Mono.fromFuture(transactionRecordQuery.executeAsync(client)) .map(r -> builder.record(r).receipt(r.receipt)); } else if (request.isReceipt()) { - // TODO: Implement a faster retry for get receipt for more accurate metrics - return Mono.fromFuture(transactionId.getReceiptAsync(client)) - .map(builder::receipt); + return Mono.fromFuture(transactionId.getReceiptAsync(client)).map(builder::receipt); } return Mono.just(builder); } - private List getClients() { - Collection validNodes = validateNodes(); + private Flux getClients() { + List validNodes = validateNodes(); if (validNodes.isEmpty()) { throw new IllegalArgumentException("No valid nodes found"); } - List validatedClients = new ArrayList<>(); - Map network = toNetwork(validNodes); - for (int i = 0; i < publishProperties.getClients(); ++i) { - Client client = toClient(network); - validatedClients.add(client); - } - - initialized.set(true); - return validatedClients; + return Flux.range(0, publishProperties.getClients()) + .map(i -> i % validNodes.size()) + .flatMap(i -> Flux.defer(() -> Mono.just(toClient(validNodes.get(i))))); } - private List getNodeAccountIds() { - return new ArrayList<>(clients.get().get(0).getNetwork().values()); - } - - private Collection validateNodes() { + private List validateNodes() { Set nodes = monitorProperties.getNodes(); if (!monitorProperties.isValidateNodes()) { - return nodes; + return new ArrayList<>(nodes); } List validNodes = new ArrayList<>(); - try (Client client = toClient(toNetwork(nodes))) { - for (NodeProperties node : nodes) { + + for (NodeProperties node : nodes) { + try (Client client = toClient(node)) { if (validateNode(client, node)) { validNodes.add(node); } + } catch (Exception e) { + log.warn("Error validating nodes: {}", e.getMessage()); } - } catch (Exception e) { - log.warn("Error validating nodes: {}", e.getMessage()); } log.info("{} of {} nodes are functional", validNodes.size(), nodes.size()); @@ -174,12 +153,15 @@ private Collection validateNodes() { private boolean validateNode(Client client, NodeProperties node) { boolean valid = false; + try { AccountId nodeAccountId = AccountId.fromString(node.getAccountId()); - new AccountBalanceQuery() - .setAccountId(nodeAccountId) - .setNodeAccountIds(List.of(nodeAccountId)) - .execute(client, Duration.ofSeconds(10L)); + Hbar hbar = Hbar.fromTinybars(1L); + new TransferTransaction() + .addHbarTransfer(nodeAccountId, hbar) + .addHbarTransfer(client.getOperatorAccountId(), hbar.negated()) + .execute(client, Duration.ofSeconds(10L)) + .getReceipt(client, Duration.ofSeconds(10L)); log.info("Validated node: {}", node); valid = true; } catch (TimeoutException e) { @@ -191,16 +173,13 @@ private boolean validateNode(Client client, NodeProperties node) { return valid; } - private Client toClient(Map network) { + private Client toClient(NodeProperties nodeProperties) { + AccountId nodeAccount = AccountId.fromString(nodeProperties.getAccountId()); AccountId operatorId = AccountId.fromString(monitorProperties.getOperator().getAccountId()); PrivateKey operatorPrivateKey = PrivateKey.fromString(monitorProperties.getOperator().getPrivateKey()); - Client client = Client.forNetwork(network); + + Client client = Client.forNetwork(Map.of(nodeProperties.getEndpoint(), nodeAccount)); client.setOperator(operatorId, operatorPrivateKey); return client; } - - private Map toNetwork(Collection nodes) { - return nodes.stream() - .collect(Collectors.toMap(NodeProperties::getEndpoint, p -> AccountId.fromString(p.getAccountId()))); - } } diff --git a/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/generator/CompositeTransactionGenerator.java b/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/publish/generator/CompositeTransactionGenerator.java similarity index 85% rename from hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/generator/CompositeTransactionGenerator.java rename to hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/publish/generator/CompositeTransactionGenerator.java index 96d25a37190..171e8ae5ac7 100644 --- a/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/generator/CompositeTransactionGenerator.java +++ b/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/publish/generator/CompositeTransactionGenerator.java @@ -1,4 +1,4 @@ -package com.hedera.mirror.monitor.generator; +package com.hedera.mirror.monitor.publish.generator; /*- * ‌ @@ -32,11 +32,14 @@ import lombok.extern.log4j.Log4j2; import org.apache.commons.math3.distribution.EnumeratedDistribution; import org.apache.commons.math3.util.Pair; +import reactor.core.publisher.Flux; import com.hedera.mirror.monitor.expression.ExpressionConverter; import com.hedera.mirror.monitor.properties.ScenarioPropertiesAggregator; import com.hedera.mirror.monitor.publish.PublishProperties; import com.hedera.mirror.monitor.publish.PublishRequest; +import com.hedera.mirror.monitor.publish.PublishScenario; +import com.hedera.mirror.monitor.publish.PublishScenarioProperties; @Log4j2 @Named @@ -63,7 +66,7 @@ public CompositeTransactionGenerator(ExpressionConverter expressionConverter, this.transactionGenerators = properties.getScenarios() .values() .stream() - .filter(ScenarioProperties::isEnabled) + .filter(PublishScenarioProperties::isEnabled) .map(scenarioProperties -> new ConfigurableTransactionGenerator(expressionConverter, scenarioPropertiesAggregator, scenarioProperties)) .collect(Collectors.toList()); @@ -84,7 +87,8 @@ public List next(int count) { i++; } catch (ScenarioException e) { log.warn(e.getMessage()); - e.getProperties().setEnabled(false); + e.getScenario().getProperties().setEnabled(false); + e.getScenario().onComplete(); rebuild(); if (rateLimiter.get().equals(INACTIVE_RATE_LIMITER)) { break; @@ -98,15 +102,20 @@ public List next(int count) { return publishRequests; } + @Override + public Flux scenarios() { + return Flux.fromIterable(transactionGenerators).flatMap(TransactionGenerator::scenarios); + } + private synchronized void rebuild() { double total = 0.0; List> pairs = new ArrayList<>(); for (Iterator iter = transactionGenerators.iterator(); iter.hasNext(); ) { ConfigurableTransactionGenerator transactionGenerator = iter.next(); - ScenarioProperties scenarioProperties = transactionGenerator.getProperties(); - if (scenarioProperties.isEnabled()) { - total += scenarioProperties.getTps(); - pairs.add(Pair.create(transactionGenerator, scenarioProperties.getTps())); + PublishScenarioProperties publishScenarioProperties = transactionGenerator.getProperties(); + if (publishScenarioProperties.isEnabled()) { + total += publishScenarioProperties.getTps(); + pairs.add(Pair.create(transactionGenerator, publishScenarioProperties.getTps())); } else { iter.remove(); } diff --git a/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/generator/ConfigurableTransactionGenerator.java b/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/publish/generator/ConfigurableTransactionGenerator.java similarity index 82% rename from hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/generator/ConfigurableTransactionGenerator.java rename to hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/publish/generator/ConfigurableTransactionGenerator.java index 5eaf8bd63c9..a1b732b6210 100644 --- a/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/generator/ConfigurableTransactionGenerator.java +++ b/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/publish/generator/ConfigurableTransactionGenerator.java @@ -1,4 +1,4 @@ -package com.hedera.mirror.monitor.generator; +package com.hedera.mirror.monitor.publish.generator; /*- * ‌ @@ -20,6 +20,7 @@ * ‍ */ +import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Suppliers; import java.security.SecureRandom; @@ -37,11 +38,14 @@ import lombok.Getter; import lombok.extern.log4j.Log4j2; import org.hibernate.validator.messageinterpolation.ParameterMessageInterpolator; +import reactor.core.publisher.Flux; import com.hedera.datagenerator.sdk.supplier.TransactionSupplier; import com.hedera.mirror.monitor.expression.ExpressionConverter; import com.hedera.mirror.monitor.properties.ScenarioPropertiesAggregator; import com.hedera.mirror.monitor.publish.PublishRequest; +import com.hedera.mirror.monitor.publish.PublishScenario; +import com.hedera.mirror.monitor.publish.PublishScenarioProperties; @Log4j2 public class ConfigurableTransactionGenerator implements TransactionGenerator { @@ -51,26 +55,24 @@ public class ConfigurableTransactionGenerator implements TransactionGenerator { private final ExpressionConverter expressionConverter; private final ScenarioPropertiesAggregator scenarioPropertiesAggregator; @Getter - private final ScenarioProperties properties; + private final PublishScenarioProperties properties; private final Supplier> transactionSupplier; private final AtomicLong remaining; private final long stopTime; private final PublishRequest.PublishRequestBuilder builder; + private final PublishScenario scenario; public ConfigurableTransactionGenerator(ExpressionConverter expressionConverter, ScenarioPropertiesAggregator scenarioPropertiesAggregator, - ScenarioProperties properties) { + PublishScenarioProperties properties) { this.expressionConverter = expressionConverter; this.scenarioPropertiesAggregator = scenarioPropertiesAggregator; this.properties = properties; transactionSupplier = Suppliers.memoize(this::convert); remaining = new AtomicLong(properties.getLimit()); stopTime = System.nanoTime() + properties.getDuration().toNanos(); - builder = PublishRequest.builder() - .logResponse(properties.isLogResponse()) - .scenarioName(properties.getName()) - .timeout(properties.getTimeout()) - .type(properties.getType()); + scenario = new PublishScenario(properties); + builder = PublishRequest.builder().scenario(scenario); } @Override @@ -82,11 +84,11 @@ public List next(int count) { long left = remaining.getAndAdd(-count); long actual = Math.min(left, count); if (actual <= 0) { - throw new ScenarioException(properties, "Reached publish limit of " + properties.getLimit()); + throw new ScenarioException(scenario, "Reached publish limit"); } if (stopTime - System.nanoTime() <= 0) { - throw new ScenarioException(properties, "Reached publish duration of " + properties.getDuration()); + throw new ScenarioException(scenario, "Reached publish duration"); } List publishRequests = new ArrayList<>(); @@ -94,7 +96,8 @@ public List next(int count) { PublishRequest publishRequest = builder.receipt(shouldGenerate(properties.getReceiptPercent())) .record(shouldGenerate(properties.getRecordPercent())) .timestamp(Instant.now()) - .transaction(transactionSupplier.get().get().setMaxAttempts(properties.getMaxAttempts())) + .transaction(transactionSupplier.get().get() + .setMaxAttempts((int) properties.getRetry().getMaxAttempts())) .build(); publishRequests.add(publishRequest); } @@ -102,10 +105,16 @@ public List next(int count) { return publishRequests; } + @Override + public Flux scenarios() { + return Flux.just(scenario); + } + private TransactionSupplier convert() { Map convertedProperties = expressionConverter.convert(properties.getProperties()); Map correctedProperties = scenarioPropertiesAggregator.aggregateProperties(convertedProperties); TransactionSupplier supplier = new ObjectMapper() + .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) .convertValue(correctedProperties, properties.getType().getSupplier()); validateSupplier(supplier); diff --git a/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/generator/ScenarioException.java b/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/publish/generator/ScenarioException.java similarity index 76% rename from hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/generator/ScenarioException.java rename to hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/publish/generator/ScenarioException.java index a9775dfea2b..35089bc5ce9 100644 --- a/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/generator/ScenarioException.java +++ b/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/publish/generator/ScenarioException.java @@ -1,4 +1,4 @@ -package com.hedera.mirror.monitor.generator; +package com.hedera.mirror.monitor.publish.generator; /*- * ‌ @@ -22,15 +22,17 @@ import lombok.Getter; +import com.hedera.mirror.monitor.subscribe.Scenario; + public class ScenarioException extends RuntimeException { private static final long serialVersionUID = 1690349494197296387L; @Getter - private final transient ScenarioProperties properties; + private final transient Scenario scenario; - public ScenarioException(ScenarioProperties properties, String message) { + public ScenarioException(Scenario scenario, String message) { super(message); - this.properties = properties; + this.scenario = scenario; } } diff --git a/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/generator/TransactionGenerator.java b/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/publish/generator/TransactionGenerator.java similarity index 77% rename from hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/generator/TransactionGenerator.java rename to hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/publish/generator/TransactionGenerator.java index 79c6b854202..9b810f637dd 100644 --- a/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/generator/TransactionGenerator.java +++ b/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/publish/generator/TransactionGenerator.java @@ -1,4 +1,4 @@ -package com.hedera.mirror.monitor.generator; +package com.hedera.mirror.monitor.publish.generator; /*- * ‌ @@ -21,14 +21,16 @@ */ import java.util.List; +import reactor.core.publisher.Flux; import com.hedera.mirror.monitor.publish.PublishRequest; +import com.hedera.mirror.monitor.publish.PublishScenario; public interface TransactionGenerator { /** - * Gets the next count publish requests. If count > 0, up to count publish requests will be generated; - * if count <= 0, the generator will determine the actual count. + * Gets the next count publish requests. If count > 0, up to count publish requests will be generated; if count <= + * 0, the generator will determine the actual count. * * @param count * @return @@ -38,4 +40,6 @@ public interface TransactionGenerator { default List next() { return next(1); } + + Flux scenarios(); } diff --git a/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/subscribe/AbstractSubscriberProperties.java b/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/subscribe/AbstractSubscriberProperties.java index d0e35676a48..29d1cad310d 100644 --- a/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/subscribe/AbstractSubscriberProperties.java +++ b/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/subscribe/AbstractSubscriberProperties.java @@ -20,51 +20,20 @@ * ‍ */ -import java.time.Duration; import javax.validation.constraints.Max; import javax.validation.constraints.Min; -import javax.validation.constraints.NotNull; import lombok.Data; -import org.hibernate.validator.constraints.time.DurationMin; import org.springframework.validation.annotation.Validated; +import com.hedera.mirror.monitor.ScenarioProperties; + @Data @Validated -public abstract class AbstractSubscriberProperties { - - @NotNull - @DurationMin(seconds = 30) - protected Duration duration = Duration.ofNanos(Long.MAX_VALUE); - - protected boolean enabled = true; - - @Min(0) - protected long limit = 0; // 0 for unlimited - - protected String name; - - @NotNull - protected RetryProperties retry = new RetryProperties(); +public abstract class AbstractSubscriberProperties extends ScenarioProperties { @Min(1) @Max(1024) protected int subscribers = 1; - - @Data - @Validated - public static class RetryProperties { - - @Min(0) - private long maxAttempts = 16L; - - @NotNull - @DurationMin(millis = 500L) - private Duration maxBackoff = Duration.ofSeconds(8L); - - @NotNull - @DurationMin(millis = 100L) - private Duration minBackoff = Duration.ofMillis(250L); - } } diff --git a/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/subscribe/CompositeSubscriber.java b/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/subscribe/CompositeSubscriber.java index 4795b3d5f27..4ebba4327c7 100644 --- a/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/subscribe/CompositeSubscriber.java +++ b/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/subscribe/CompositeSubscriber.java @@ -46,7 +46,7 @@ public Flux subscribe() { } @Override - public Flux getSubscriptions() { + public Flux getSubscriptions() { return Flux.fromIterable(subscribers).flatMap(MirrorSubscriber::getSubscriptions); } } diff --git a/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/subscribe/MirrorSubscriber.java b/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/subscribe/MirrorSubscriber.java index f28a6354886..7b935af1926 100644 --- a/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/subscribe/MirrorSubscriber.java +++ b/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/subscribe/MirrorSubscriber.java @@ -31,5 +31,5 @@ default void onPublish(PublishResponse response) { Flux subscribe(); - Flux getSubscriptions(); + Flux getSubscriptions(); } diff --git a/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/subscribe/Subscription.java b/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/subscribe/Scenario.java similarity index 76% rename from hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/subscribe/Subscription.java rename to hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/subscribe/Scenario.java index 20469840256..867870959ba 100644 --- a/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/subscribe/Subscription.java +++ b/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/subscribe/Scenario.java @@ -26,11 +26,14 @@ import java.time.Duration; import java.util.Map; +import com.hedera.mirror.monitor.ScenarioProperties; +import com.hedera.mirror.monitor.ScenarioProtocol; +import com.hedera.mirror.monitor.ScenarioStatus; import com.hedera.mirror.monitor.converter.DurationToStringSerializer; import com.hedera.mirror.monitor.converter.StringToDurationDeserializer; -@JsonSerialize(as = Subscription.class) -public interface Subscription { +@JsonSerialize(as = Scenario.class) +public interface Scenario

{ long getCount(); @@ -47,11 +50,19 @@ default String getName() { } @JsonIgnore - T getProperties(); + P getProperties(); - SubscriberProtocol getProtocol(); + ScenarioProtocol getProtocol(); double getRate(); - SubscriptionStatus getStatus(); + ScenarioStatus getStatus(); + + boolean isRunning(); + + void onComplete(); + + void onError(Throwable t); + + void onNext(T response); } diff --git a/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/subscribe/SubscribeMetrics.java b/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/subscribe/SubscribeMetrics.java index bd09d035e5d..c2edb129ece 100644 --- a/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/subscribe/SubscribeMetrics.java +++ b/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/subscribe/SubscribeMetrics.java @@ -46,38 +46,38 @@ public class SubscribeMetrics { static final String TAG_SCENARIO = "scenario"; static final String TAG_SUBSCRIBER = "subscriber"; - private final Map durationMetrics = new ConcurrentHashMap<>(); - private final Map latencyMetrics = new ConcurrentHashMap<>(); + private final Map durationMetrics = new ConcurrentHashMap<>(); + private final Map latencyMetrics = new ConcurrentHashMap<>(); private final MeterRegistry meterRegistry; private final SubscribeProperties subscribeProperties; public void onNext(SubscribeResponse response) { log.trace("Response: {}", response); - Subscription subscription = response.getSubscription(); + Scenario scenario = response.getScenario(); Instant publishedTimestamp = response.getPublishedTimestamp(); - durationMetrics.computeIfAbsent(subscription, this::newDurationGauge); + durationMetrics.computeIfAbsent(scenario, this::newDurationGauge); if (publishedTimestamp != null) { Duration latency = Duration.between(publishedTimestamp, response.getReceivedTimestamp()); - latencyMetrics.computeIfAbsent(subscription, this::newLatencyTimer).record(latency); + latencyMetrics.computeIfAbsent(scenario, this::newLatencyTimer).record(latency); } } - private TimeGauge newDurationGauge(Subscription subscription) { - return TimeGauge.builder(METRIC_DURATION, subscription, TimeUnit.NANOSECONDS, s -> s.getElapsed().toNanos()) + private TimeGauge newDurationGauge(Scenario scenario) { + return TimeGauge.builder(METRIC_DURATION, scenario, TimeUnit.NANOSECONDS, s -> s.getElapsed().toNanos()) .description("How long the subscriber has been running") - .tag(TAG_PROTOCOL, subscription.getProtocol().toString()) - .tag(TAG_SCENARIO, subscription.getName()) - .tag(TAG_SUBSCRIBER, String.valueOf(subscription.getId())) + .tag(TAG_PROTOCOL, scenario.getProtocol().toString()) + .tag(TAG_SCENARIO, scenario.getName()) + .tag(TAG_SUBSCRIBER, String.valueOf(scenario.getId())) .register(meterRegistry); } - private final Timer newLatencyTimer(Subscription subscription) { + private final Timer newLatencyTimer(Scenario scenario) { return Timer.builder(METRIC_E2E) .description("The end to end transaction latency starting from publish and ending at receive") - .tag(TAG_PROTOCOL, subscription.getProtocol().toString()) - .tag(TAG_SCENARIO, subscription.getName()) - .tag(TAG_SUBSCRIBER, String.valueOf(subscription.getId())) + .tag(TAG_PROTOCOL, scenario.getProtocol().toString()) + .tag(TAG_SCENARIO, scenario.getName()) + .tag(TAG_SUBSCRIBER, String.valueOf(scenario.getId())) .register(meterRegistry); } @@ -88,8 +88,8 @@ public void status() { } } - private void status(Subscription s) { - if (s.getStatus() == SubscriptionStatus.RUNNING) { + private void status(Scenario s) { + if (s.isRunning()) { String elapsed = DurationToStringSerializer.convert(s.getElapsed()); log.info("{} {}: {} transactions in {} at {}/s. Errors: {}", s.getProtocol(), s, s.getCount(), elapsed, s.getRate(), s.getErrors()); diff --git a/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/subscribe/SubscribeResponse.java b/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/subscribe/SubscribeResponse.java index 08d0058046d..31957240f74 100644 --- a/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/subscribe/SubscribeResponse.java +++ b/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/subscribe/SubscribeResponse.java @@ -30,5 +30,5 @@ public class SubscribeResponse { private final Instant consensusTimestamp; private final Instant publishedTimestamp; private final Instant receivedTimestamp; - private final Subscription subscription; + private final Scenario scenario; } diff --git a/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/subscribe/controller/SubscriberController.java b/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/subscribe/controller/SubscriberController.java index c5f740f3413..ae0a72a3798 100644 --- a/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/subscribe/controller/SubscriberController.java +++ b/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/subscribe/controller/SubscriberController.java @@ -36,10 +36,10 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import com.hedera.mirror.monitor.ScenarioProtocol; +import com.hedera.mirror.monitor.ScenarioStatus; import com.hedera.mirror.monitor.subscribe.MirrorSubscriber; -import com.hedera.mirror.monitor.subscribe.SubscriberProtocol; -import com.hedera.mirror.monitor.subscribe.Subscription; -import com.hedera.mirror.monitor.subscribe.SubscriptionStatus; +import com.hedera.mirror.monitor.subscribe.Scenario; @Log4j2 @RequestMapping("/api/v1/subscriber") @@ -50,8 +50,8 @@ class SubscriberController { private final MirrorSubscriber mirrorSubscriber; @GetMapping - public Flux subscriptions(@RequestParam Optional protocol, - @RequestParam Optional> status) { + public Flux subscriptions(@RequestParam Optional protocol, + @RequestParam Optional> status) { return mirrorSubscriber.getSubscriptions() .filter(s -> !protocol.isPresent() || protocol.get() == s.getProtocol()) .filter(s -> !status.isPresent() || status.get().contains(s.getStatus())) @@ -59,15 +59,15 @@ public Flux subscriptions(@RequestParam Optional subscriptions(@PathVariable String name, - @RequestParam Optional> status) { + public Flux subscriptions(@PathVariable String name, + @RequestParam Optional> status) { return subscriptions(Optional.empty(), status) .filter(subscription -> subscription.getName().equals(name)) .switchIfEmpty(Mono.error(new NoSuchElementException())); } @GetMapping("/{name}/{id}") - public Mono subscription(@PathVariable String name, @PathVariable int id) { + public Mono subscription(@PathVariable String name, @PathVariable int id) { return subscriptions(name, Optional.empty()) .filter(s -> s.getId() == id) .last(); diff --git a/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/subscribe/grpc/GrpcClientSDK.java b/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/subscribe/grpc/GrpcClientSDK.java index ac6eed05e38..318dfcf522c 100644 --- a/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/subscribe/grpc/GrpcClientSDK.java +++ b/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/subscribe/grpc/GrpcClientSDK.java @@ -64,7 +64,7 @@ class GrpcClientSDK implements GrpcClient { @Override public Flux subscribe(GrpcSubscription subscription) { int clientIndex = secureRandom.nextInt(subscribeProperties.getClients()); - log.info("Starting '{}' subscription to client {}", subscription, clientIndex); + log.info("Starting '{}' scenario to client {}", subscription, clientIndex); return clients.elementAt(clientIndex) .flatMapMany(client -> subscribeToClient(client, subscription)); } @@ -100,7 +100,7 @@ private SubscribeResponse toResponse(GrpcSubscription subscription, TopicMessage .consensusTimestamp(topicMessage.consensusTimestamp) .publishedTimestamp(publishedTimestamp) .receivedTimestamp(receivedTimestamp) - .subscription(subscription) + .scenario(subscription) .build(); } diff --git a/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/subscribe/grpc/GrpcSubscription.java b/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/subscribe/grpc/GrpcSubscription.java index 19a4f1d7a2c..7ef473a260a 100644 --- a/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/subscribe/grpc/GrpcSubscription.java +++ b/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/subscribe/grpc/GrpcSubscription.java @@ -27,18 +27,18 @@ import com.hedera.hashgraph.sdk.TopicId; import com.hedera.hashgraph.sdk.TopicMessage; import com.hedera.hashgraph.sdk.TopicMessageQuery; -import com.hedera.mirror.monitor.subscribe.AbstractSubscription; -import com.hedera.mirror.monitor.subscribe.SubscriberProtocol; +import com.hedera.mirror.monitor.AbstractScenario; +import com.hedera.mirror.monitor.ScenarioProtocol; -class GrpcSubscription extends AbstractSubscription { +class GrpcSubscription extends AbstractScenario { GrpcSubscription(int id, GrpcSubscriberProperties properties) { super(id, properties); } @Override - public SubscriberProtocol getProtocol() { - return SubscriberProtocol.GRPC; + public ScenarioProtocol getProtocol() { + return ScenarioProtocol.GRPC; } TopicMessageQuery getTopicMessageQuery() { @@ -78,4 +78,10 @@ public void onError(Throwable t) { } errors.add(statusCode.name()); } + + @Override + public String toString() { + String name = getName(); + return getProperties().getSubscribers() <= 1 ? name : name + " #" + getId(); + } } diff --git a/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/subscribe/rest/RestSubscriber.java b/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/subscribe/rest/RestSubscriber.java index 99072f572a3..4028b911ec0 100644 --- a/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/subscribe/rest/RestSubscriber.java +++ b/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/subscribe/rest/RestSubscriber.java @@ -43,7 +43,6 @@ import com.hedera.mirror.monitor.subscribe.MirrorSubscriber; import com.hedera.mirror.monitor.subscribe.SubscribeProperties; import com.hedera.mirror.monitor.subscribe.SubscribeResponse; -import com.hedera.mirror.monitor.subscribe.SubscriptionStatus; import com.hedera.mirror.monitor.subscribe.rest.response.MirrorTransaction; @Log4j2 @@ -75,14 +74,14 @@ public void onPublish(PublishResponse response) { } private boolean shouldSample(RestSubscription subscription, PublishResponse response) { - if (subscription.getStatus() == SubscriptionStatus.COMPLETED) { + if (!subscription.isRunning()) { return false; } RestSubscriberProperties properties = subscription.getProperties(); Set publishers = properties.getPublishers(); - if (!publishers.isEmpty() && !publishers.contains(response.getRequest().getScenarioName())) { + if (!publishers.isEmpty() && !publishers.contains(response.getRequest().getScenario().getName())) { return false; } @@ -151,7 +150,7 @@ private SubscribeResponse toResponse(RestSubscription subscription, PublishRespo .consensusTimestamp(transaction.getConsensusTimestamp()) .publishedTimestamp(publishResponse.getRequest().getTimestamp()) .receivedTimestamp(receivedTimestamp) - .subscription(subscription) + .scenario(subscription) .build(); } diff --git a/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/subscribe/rest/RestSubscription.java b/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/subscribe/rest/RestSubscription.java index 524ff7bd62f..649c2b5b23b 100644 --- a/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/subscribe/rest/RestSubscription.java +++ b/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/subscribe/rest/RestSubscription.java @@ -25,13 +25,13 @@ import reactor.core.Exceptions; import reactor.core.publisher.Sinks; +import com.hedera.mirror.monitor.AbstractScenario; +import com.hedera.mirror.monitor.ScenarioProtocol; import com.hedera.mirror.monitor.publish.PublishResponse; -import com.hedera.mirror.monitor.subscribe.AbstractSubscription; -import com.hedera.mirror.monitor.subscribe.SubscriberProtocol; import com.hedera.mirror.monitor.subscribe.rest.response.MirrorTransaction; @Getter -class RestSubscription extends AbstractSubscription { +class RestSubscription extends AbstractScenario { private final Sinks.Many sink; @@ -41,8 +41,8 @@ class RestSubscription extends AbstractSubscription subscribeScenario(double rate) { + TestScenario testScenario = new TestScenario(); + testScenario.setRate(rate); + return testScenario; + } + + @Getter + private class TestPublishScenario extends PublishScenario { + + private final double rate; + + private TestPublishScenario(double rate) { + super(new PublishScenarioProperties()); + this.rate = rate; + } + } +} diff --git a/hedera-mirror-monitor/src/test/java/com/hedera/mirror/monitor/expression/ExpressionConverterImplTest.java b/hedera-mirror-monitor/src/test/java/com/hedera/mirror/monitor/expression/ExpressionConverterImplTest.java index 20566b1ca54..0b32f93857f 100644 --- a/hedera-mirror-monitor/src/test/java/com/hedera/mirror/monitor/expression/ExpressionConverterImplTest.java +++ b/hedera-mirror-monitor/src/test/java/com/hedera/mirror/monitor/expression/ExpressionConverterImplTest.java @@ -130,7 +130,7 @@ void account() throws InvalidProtocolBufferException { assertThat(expressionConverter.convert("${account.foo}")).isEqualTo("0.0.100"); verify(transactionPublisher).publish(request.capture()); - assertThat(request.getValue().getType()).isEqualTo(type); + assertThat(request.getValue().getScenario().getProperties().getType()).isEqualTo(type); } @Test @@ -140,7 +140,7 @@ void token() throws InvalidProtocolBufferException { assertThat(expressionConverter.convert("${token.foo}")).isEqualTo("0.0.101"); verify(transactionPublisher).publish(request.capture()); - assertThat(request.getValue().getType()).isEqualTo(type); + assertThat(request.getValue().getScenario().getProperties().getType()).isEqualTo(type); } @Test @@ -150,7 +150,7 @@ void nft() throws InvalidProtocolBufferException { assertThat(expressionConverter.convert("${nft.foo}")).isEqualTo("0.0.101"); verify(transactionPublisher).publish(request.capture()); - assertThat(request.getValue().getType()).isEqualTo(type); + assertThat(request.getValue().getScenario().getProperties().getType()).isEqualTo(type); } @Test @@ -160,7 +160,7 @@ void topic() throws InvalidProtocolBufferException { assertThat(expressionConverter.convert("${topic.foo}")).isEqualTo("0.0.100"); verify(transactionPublisher).publish(request.capture()); - assertThat(request.getValue().getType()).isEqualTo(type); + assertThat(request.getValue().getScenario().getProperties().getType()).isEqualTo(type); } @Test @@ -171,7 +171,7 @@ void schedule() throws InvalidProtocolBufferException { assertThat(expressionConverter.convert("${schedule.foo}")).isEqualTo("0.0.100"); verify(transactionPublisher).publish(request.capture()); - assertThat(request.getValue().getType()).isEqualTo(type); + assertThat(request.getValue().getScenario().getProperties().getType()).isEqualTo(type); } @Test @@ -184,7 +184,7 @@ void cached() throws InvalidProtocolBufferException { assertThat(expressionConverter.convert("${topic.foo}")).isEqualTo("0.0.100"); verify(transactionPublisher).publish(request.capture()); - assertThat(request.getValue().getType()).isEqualTo(type); + assertThat(request.getValue().getScenario().getProperties().getType()).isEqualTo(type); } @Test @@ -199,7 +199,7 @@ void map() throws InvalidProtocolBufferException { .containsEntry("topicId", "0.0.101"); verify(transactionPublisher).publish(request.capture()); - assertThat(request.getValue().getType()).isEqualTo(type); + assertThat(request.getValue().getScenario().getProperties().getType()).isEqualTo(type); } private Mono response(TransactionType type, long id) throws InvalidProtocolBufferException { diff --git a/hedera-mirror-monitor/src/test/java/com/hedera/mirror/monitor/publish/PublishMetricsTest.java b/hedera-mirror-monitor/src/test/java/com/hedera/mirror/monitor/publish/PublishMetricsTest.java index 555db59713c..fe37166cff1 100644 --- a/hedera-mirror-monitor/src/test/java/com/hedera/mirror/monitor/publish/PublishMetricsTest.java +++ b/hedera-mirror-monitor/src/test/java/com/hedera/mirror/monitor/publish/PublishMetricsTest.java @@ -54,6 +54,7 @@ class PublishMetricsTest { private MeterRegistry meterRegistry; private PublishMetrics publishMetrics; private PublishProperties publishProperties; + private PublishScenario publishScenario; private StringWriter logOutput; private WriterAppender writerAppender; @@ -61,7 +62,12 @@ class PublishMetricsTest { void setup() { meterRegistry = new SimpleMeterRegistry(); publishProperties = new PublishProperties(); - publishMetrics = new PublishMetrics(publishProperties, meterRegistry); + publishMetrics = new PublishMetrics(meterRegistry, publishProperties); + + PublishScenarioProperties publishScenarioProperties = new PublishScenarioProperties(); + publishScenarioProperties.setName(SCENARIO_NAME); + publishScenarioProperties.setType(TransactionType.CONSENSUS_SUBMIT_MESSAGE); + publishScenario = new PublishScenario(publishScenarioProperties); logOutput = new StringWriter(); writerAppender = WriterAppender.newBuilder() @@ -133,23 +139,26 @@ void onError(Throwable throwable, String status) { @Test void statusError() { PublishException publishException = new PublishException(request(), new TimeoutException()); + publishScenario.onError(publishException); publishMetrics.onError(publishException); publishMetrics.status(); assertThat(logOutput) .asString() .hasLineCount(1) - .contains("Published 0 transactions in") + .contains(SCENARIO_NAME + ": 0 transactions in") .contains("Errors: {TimeoutException=1}"); } @Test void statusSuccess() throws Exception { - publishMetrics.onSuccess(response()); + PublishResponse response = response(); + publishScenario.onNext(response); + publishMetrics.onSuccess(response); publishMetrics.status(); assertThat(logOutput) .asString() .hasLineCount(1) - .contains("Published 1 transactions in") + .contains(SCENARIO_NAME + ": 1 transactions in") .contains("Errors: {}"); } @@ -176,10 +185,9 @@ private ObjectAssert assertMetric(Iterable meters) { private PublishRequest request() { List nodeAccountIds = List.of(AccountId.fromString(NODE_ACCOUNT_ID)); return PublishRequest.builder() - .scenarioName(SCENARIO_NAME) + .scenario(publishScenario) .timestamp(Instant.now().minusSeconds(5L)) .transaction(new TopicMessageSubmitTransaction().setNodeAccountIds(nodeAccountIds)) - .type(TransactionType.CONSENSUS_SUBMIT_MESSAGE) .build(); } diff --git a/hedera-mirror-monitor/src/test/java/com/hedera/mirror/monitor/publish/PublishPropertiesTest.java b/hedera-mirror-monitor/src/test/java/com/hedera/mirror/monitor/publish/PublishPropertiesTest.java index 32878a9cca4..65802824cc7 100644 --- a/hedera-mirror-monitor/src/test/java/com/hedera/mirror/monitor/publish/PublishPropertiesTest.java +++ b/hedera-mirror-monitor/src/test/java/com/hedera/mirror/monitor/publish/PublishPropertiesTest.java @@ -28,36 +28,34 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; -import com.hedera.mirror.monitor.generator.ScenarioProperties; - class PublishPropertiesTest { - private ScenarioProperties scenarioProperties; + private PublishScenarioProperties publishScenarioProperties; private PublishProperties publishProperties; @BeforeEach void setup() { - scenarioProperties = new ScenarioProperties(); + publishScenarioProperties = new PublishScenarioProperties(); publishProperties = new PublishProperties(); - publishProperties.getScenarios().put("test1", scenarioProperties); + publishProperties.getScenarios().put("test1", publishScenarioProperties); } @Test void validate() { publishProperties.validate(); - assertThat(scenarioProperties.getName()).isEqualTo("test1"); + assertThat(publishScenarioProperties.getName()).isEqualTo("test1"); } @ParameterizedTest @ValueSource(strings = {"", " "}) void emptyName(String name) { - publishProperties.getScenarios().put(name, scenarioProperties); + publishProperties.getScenarios().put(name, publishScenarioProperties); assertThrows(IllegalArgumentException.class, publishProperties::validate); } @Test void nullName() { - publishProperties.getScenarios().put(null, scenarioProperties); + publishProperties.getScenarios().put(null, publishScenarioProperties); assertThrows(IllegalArgumentException.class, publishProperties::validate); } diff --git a/hedera-mirror-monitor/src/test/java/com/hedera/mirror/monitor/publish/TransactionPublisherTest.java b/hedera-mirror-monitor/src/test/java/com/hedera/mirror/monitor/publish/TransactionPublisherTest.java new file mode 100644 index 00000000000..9d1368074af --- /dev/null +++ b/hedera-mirror-monitor/src/test/java/com/hedera/mirror/monitor/publish/TransactionPublisherTest.java @@ -0,0 +1,384 @@ +package com.hedera.mirror.monitor.publish; + +/*- + * ‌ + * Hedera Mirror Node + * ​ + * Copyright (C) 2019 - 2021 Hedera Hashgraph, LLC + * ​ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ‍ + */ + +import static com.hedera.hashgraph.sdk.proto.ResponseCodeEnum.OK; +import static com.hedera.hashgraph.sdk.proto.ResponseCodeEnum.SUCCESS; +import static org.assertj.core.api.Assertions.assertThat; + +import io.grpc.Server; +import io.grpc.Status; +import io.grpc.inprocess.InProcessServerBuilder; +import io.grpc.stub.StreamObserver; +import java.io.IOException; +import java.time.Duration; +import java.time.Instant; +import java.util.Arrays; +import java.util.LinkedList; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.TimeoutException; +import lombok.Data; +import lombok.extern.log4j.Log4j2; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; + +import com.hedera.datagenerator.sdk.supplier.TransactionType; +import com.hedera.hashgraph.sdk.PrivateKey; +import com.hedera.hashgraph.sdk.TransferTransaction; +import com.hedera.hashgraph.sdk.proto.CryptoServiceGrpc; +import com.hedera.hashgraph.sdk.proto.Query; +import com.hedera.hashgraph.sdk.proto.Response; +import com.hedera.hashgraph.sdk.proto.ResponseCodeEnum; +import com.hedera.hashgraph.sdk.proto.ResponseHeader; +import com.hedera.hashgraph.sdk.proto.Transaction; +import com.hedera.hashgraph.sdk.proto.TransactionGetReceiptResponse; +import com.hedera.hashgraph.sdk.proto.TransactionGetRecordResponse; +import com.hedera.hashgraph.sdk.proto.TransactionReceipt; +import com.hedera.hashgraph.sdk.proto.TransactionRecord; +import com.hedera.hashgraph.sdk.proto.TransactionResponse; +import com.hedera.mirror.monitor.MonitorProperties; +import com.hedera.mirror.monitor.NodeProperties; +import com.hedera.mirror.monitor.OperatorProperties; + +@Log4j2 +class TransactionPublisherTest { + + private CryptoServiceStub cryptoServiceStub; + private MonitorProperties monitorProperties; + private PublishProperties publishProperties; + private PublishScenarioProperties publishScenarioProperties; + private Server server; + private TransactionPublisher transactionPublisher; + + @BeforeEach + void setup() throws IOException { + OperatorProperties operatorProperties = new OperatorProperties(); + operatorProperties.setAccountId("0.0.100"); + operatorProperties.setPrivateKey(PrivateKey.generate().toString()); + publishScenarioProperties = new PublishScenarioProperties(); + publishScenarioProperties.setName("test"); + publishScenarioProperties.setType(TransactionType.CRYPTO_TRANSFER); + monitorProperties = new MonitorProperties(); + monitorProperties.setNodes(Set.of(new NodeProperties("0.0.3", "in-process:test"))); + monitorProperties.setOperator(operatorProperties); + publishProperties = new PublishProperties(); + transactionPublisher = new TransactionPublisher(monitorProperties, publishProperties); + + cryptoServiceStub = new CryptoServiceStub(); + server = InProcessServerBuilder.forName("test") + .addService(cryptoServiceStub) + .directExecutor() + .build() + .start(); + } + + @AfterEach + void teardown() throws InterruptedException { + cryptoServiceStub.verify(); + transactionPublisher.close(); + if (server != null) { + server.shutdown(); + server.awaitTermination(); + } + } + + @Test + @Timeout(3) + void publish() { + PublishRequest request = request().build(); + cryptoServiceStub.addQueries(Mono.just(receipt(SUCCESS))); + cryptoServiceStub.addTransactions(Mono.just(response(OK)), Mono.just(response(OK))); + + transactionPublisher.publish(request) + .as(StepVerifier::create) + .expectNextMatches(r -> { + assertThat(r) + .isNotNull() + .returns(request, PublishResponse::getRequest) + .returns(request.getTransaction().getTransactionId(), PublishResponse::getTransactionId) + .extracting(PublishResponse::getTimestamp) + .isNotNull(); + return true; + }) + .expectComplete() + .verify(Duration.ofSeconds(1L)); + } + + @Test + @Timeout(3) + void publishWithLogResponse() { + publishScenarioProperties.setLogResponse(true); + cryptoServiceStub.addQueries(Mono.just(receipt(SUCCESS))); + cryptoServiceStub.addTransactions(Mono.just(response(OK)), Mono.just(response(OK))); + + transactionPublisher.publish(request().build()) + .as(StepVerifier::create) + .expectNextCount(1L) + .expectComplete() + .verify(Duration.ofSeconds(1L)); + } + + @Test + @Timeout(3) + void publishWithReceipt() { + cryptoServiceStub.addQueries(Mono.just(receipt(SUCCESS)), Mono.just(receipt(SUCCESS))); + cryptoServiceStub.addTransactions(Mono.just(response(OK)), Mono.just(response(OK))); + + transactionPublisher.publish(request().receipt(true).build()) + .as(StepVerifier::create) + .expectNextMatches(r -> { + assertThat(r).extracting(PublishResponse::getReceipt).isNotNull(); + assertThat(r).extracting(PublishResponse::getRecord).isNull(); + return true; + }) + .expectComplete() + .verify(Duration.ofSeconds(1L)); + } + + @Test + @Timeout(3) + void publishWithRecord() { + cryptoServiceStub.addQueries(Mono.just(receipt(SUCCESS)), Mono.just(record(SUCCESS))); + cryptoServiceStub.addTransactions(Mono.just(response(OK)), Mono.just(response(OK))); + + transactionPublisher.publish(request().record(true).build()) + .as(StepVerifier::create) + .expectNextMatches(r -> { + assertThat(r).extracting(PublishResponse::getReceipt).isNotNull(); + assertThat(r).extracting(PublishResponse::getRecord).isNotNull(); + return true; + }) + .expectComplete() + .verify(Duration.ofSeconds(1L)); + } + + @Test + @Timeout(3) + void publishPreCheckError() { + ResponseCodeEnum errorResponseCode = ResponseCodeEnum.INSUFFICIENT_ACCOUNT_BALANCE; + cryptoServiceStub.addQueries(Mono.just(receipt(SUCCESS))); + cryptoServiceStub.addTransactions(Mono.just(response(OK)), Mono.just(response(errorResponseCode))); + + transactionPublisher.publish(request().build()) + .as(StepVerifier::create) + .expectErrorSatisfies(t -> assertThat(t) + .isInstanceOf(PublishException.class) + .hasMessageContaining(errorResponseCode.toString())) + .verify(Duration.ofSeconds(1L)); + } + + @Test + @Timeout(3) + void publishRetrySuccessful() { + cryptoServiceStub.addQueries(Mono.just(receipt(SUCCESS))); + cryptoServiceStub.addTransactions(Mono.just(response(OK)), + Mono.just(response(ResponseCodeEnum.PLATFORM_TRANSACTION_NOT_CREATED)), + Mono.just(response(ResponseCodeEnum.OK))); + + transactionPublisher.publish(request().build()) + .as(StepVerifier::create) + .expectNextCount(1L) + .expectComplete() + .verify(Duration.ofSeconds(1L)); + } + + @Test + @Timeout(3) + void publishRetryError() { + ResponseCodeEnum errorResponseCode = ResponseCodeEnum.PLATFORM_TRANSACTION_NOT_CREATED; + cryptoServiceStub.addQueries(Mono.just(receipt(SUCCESS))); + cryptoServiceStub.addTransactions(Mono.just(response(OK)), + Mono.just(response(errorResponseCode)), + Mono.just(response(errorResponseCode))); + + transactionPublisher.publish(request().build()) + .as(StepVerifier::create) + .expectErrorSatisfies(t -> assertThat(t) + .isInstanceOf(PublishException.class) + .hasMessageContaining("Failed to get gRPC response within maximum retry count") + .getRootCause() + .hasMessageContaining(errorResponseCode.toString())) + .verify(Duration.ofSeconds(1L)); + } + + @Test + @Timeout(3) + void publishTimeout() { + publishScenarioProperties.setTimeout(Duration.ofMillis(100L)); + cryptoServiceStub.addQueries(Mono.just(receipt(SUCCESS))); + cryptoServiceStub.addTransactions(Mono.just(response(OK)), + Mono.delay(Duration.ofMillis(500L)).thenReturn(response(OK))); + + transactionPublisher.publish(request().build()) + .as(StepVerifier::create) + .expectErrorSatisfies(t -> assertThat(t) + .isInstanceOf(PublishException.class) + .hasMessageContaining("Did not observe any item or terminal signal within 100ms") + .hasCauseInstanceOf(TimeoutException.class)) + .verify(Duration.ofSeconds(1L)); + } + + @Test + @Timeout(3) + void noValidNodes() { + cryptoServiceStub.addTransactions(Mono.error(Status.INTERNAL.asRuntimeException())); + transactionPublisher.publish(request().build()) + .as(StepVerifier::create) + .expectError(IllegalArgumentException.class) + .verify(Duration.ofSeconds(1L)); + } + + @Test + @Timeout(3) + void skipNodeValidation() { + monitorProperties.setValidateNodes(false); + cryptoServiceStub.addTransactions(Mono.just(response(OK))); + + transactionPublisher.publish(request().build()) + .as(StepVerifier::create) + .expectNextCount(1L) + .expectComplete() + .verify(Duration.ofSeconds(1L)); + } + + @Test + @Timeout(3) + void nodeValidationFailsReceipt() { + cryptoServiceStub.addQueries(Mono.just(receipt(ResponseCodeEnum.ACCOUNT_DELETED))); + cryptoServiceStub.addTransactions(Mono.just(response(OK))); + + transactionPublisher.publish(request().build()) + .as(StepVerifier::create) + .expectError(IllegalArgumentException.class) + .verify(Duration.ofSeconds(1L)); + } + + @Test + @Timeout(3) + void someValidNodes() { + monitorProperties.setNodes(Set.of(new NodeProperties("0.0.3", "in-process:test"), + new NodeProperties("0.0.4", "invalid:1"))); // Illegal DNS to avoid SDK retry + cryptoServiceStub.addQueries(Mono.just(receipt(SUCCESS))); + cryptoServiceStub.addTransactions(Mono.just(response(OK)), Mono.just(response(OK))); + + transactionPublisher.publish(request().build()) + .as(StepVerifier::create) + .expectNextCount(1L) + .expectComplete() + .verify(Duration.ofSeconds(1L)); + } + + @Test + @Timeout(3) + void closeWhenDisabled() { + publishProperties.setEnabled(false); + transactionPublisher.close(); + } + + private PublishRequest.PublishRequestBuilder request() { + return PublishRequest.builder() + .scenario(new PublishScenario(publishScenarioProperties)) + .timestamp(Instant.now()) + .transaction(new TransferTransaction().setMaxAttempts(2)); + } + + private TransactionResponse response(ResponseCodeEnum responseCode) { + return TransactionResponse.newBuilder() + .setNodeTransactionPrecheckCode(responseCode) + .build(); + } + + private Response receipt(ResponseCodeEnum responseCode) { + ResponseHeader responseHeader = ResponseHeader.newBuilder() + .setNodeTransactionPrecheckCode(OK) + .build(); + return Response.newBuilder() + .setTransactionGetReceipt(TransactionGetReceiptResponse.newBuilder() + .setHeader(responseHeader) + .setReceipt(TransactionReceipt.newBuilder().setStatus(responseCode).build()) + .build()) + .build(); + } + + private Response record(ResponseCodeEnum responseCode) { + ResponseHeader.Builder responseHeader = ResponseHeader.newBuilder() + .setNodeTransactionPrecheckCode(OK); + TransactionReceipt.Builder transactionReceipt = TransactionReceipt.newBuilder().setStatus(responseCode); + return Response.newBuilder() + .setTransactionGetRecord(TransactionGetRecordResponse.newBuilder() + .setHeader(responseHeader) + .setTransactionRecord(TransactionRecord.newBuilder().setReceipt(transactionReceipt))) + .build(); + } + + @Data + private class CryptoServiceStub extends CryptoServiceGrpc.CryptoServiceImplBase { + + private Queue> transactions = new LinkedList<>(); + private Queue> queries = new LinkedList<>(); + + void addQueries(Mono... query) { + queries.addAll(Arrays.asList(query)); + } + + void addTransactions(Mono... transaction) { + transactions.addAll(Arrays.asList(transaction)); + } + + @Override + public void cryptoTransfer(Transaction request, StreamObserver responseObserver) { + log.debug("cryptoTransfer: {}", request); + send(responseObserver, transactions.poll()); + } + + @Override + public void getTransactionReceipts(Query request, StreamObserver responseObserver) { + log.debug("getTransactionReceipts: {}", request); + send(responseObserver, queries.poll()); + } + + @Override + public void getTxRecordByTxID(Query request, StreamObserver responseObserver) { + log.debug("getTxRecordByTxID: {}", request); + send(responseObserver, queries.poll()); + } + + private void send(StreamObserver responseObserver, Mono response) { + assertThat(response).isNotNull(); + response.delayElement(Duration.ofMillis(100L)) + .doOnError(responseObserver::onError) + .doOnNext(responseObserver::onNext) + .doOnNext(t -> log.trace("Next: {}", t)) + .doOnSuccess(r -> responseObserver.onCompleted()) + .subscribe(); + } + + void verify() { + assertThat(queries).isEmpty(); + assertThat(transactions).isEmpty(); + } + } +} diff --git a/hedera-mirror-monitor/src/test/java/com/hedera/mirror/monitor/generator/CompositeTransactionGeneratorTest.java b/hedera-mirror-monitor/src/test/java/com/hedera/mirror/monitor/publish/generator/CompositeTransactionGeneratorTest.java similarity index 75% rename from hedera-mirror-monitor/src/test/java/com/hedera/mirror/monitor/generator/CompositeTransactionGeneratorTest.java rename to hedera-mirror-monitor/src/test/java/com/hedera/mirror/monitor/publish/generator/CompositeTransactionGeneratorTest.java index 2f80794fe08..73b9d439b48 100644 --- a/hedera-mirror-monitor/src/test/java/com/hedera/mirror/monitor/generator/CompositeTransactionGeneratorTest.java +++ b/hedera-mirror-monitor/src/test/java/com/hedera/mirror/monitor/publish/generator/CompositeTransactionGeneratorTest.java @@ -1,4 +1,4 @@ -package com.hedera.mirror.monitor.generator; +package com.hedera.mirror.monitor.publish.generator; /*- * ‌ @@ -42,35 +42,38 @@ import org.junit.jupiter.params.provider.ValueSource; import com.hedera.datagenerator.sdk.supplier.TransactionType; +import com.hedera.mirror.monitor.ScenarioStatus; import com.hedera.mirror.monitor.publish.PublishProperties; import com.hedera.mirror.monitor.publish.PublishRequest; +import com.hedera.mirror.monitor.publish.PublishScenario; +import com.hedera.mirror.monitor.publish.PublishScenarioProperties; class CompositeTransactionGeneratorTest { - private ScenarioProperties scenarioProperties1; - private ScenarioProperties scenarioProperties2; + private PublishScenarioProperties publishScenarioProperties1; + private PublishScenarioProperties publishScenarioProperties2; private PublishProperties properties; private Supplier supplier; private double totalTps; @BeforeEach void init() { - scenarioProperties1 = new ScenarioProperties(); - scenarioProperties1.setName("test1"); - scenarioProperties1.setProperties(Map.of("topicId", "0.0.1000")); - scenarioProperties1.setTps(750); - scenarioProperties1.setType(TransactionType.CONSENSUS_SUBMIT_MESSAGE); - totalTps = scenarioProperties1.getTps(); - - scenarioProperties2 = new ScenarioProperties(); - scenarioProperties2.setName("test2"); - scenarioProperties2.setTps(250); - scenarioProperties2.setType(TransactionType.ACCOUNT_CREATE); - totalTps += scenarioProperties2.getTps(); + publishScenarioProperties1 = new PublishScenarioProperties(); + publishScenarioProperties1.setName("test1"); + publishScenarioProperties1.setProperties(Map.of("topicId", "0.0.1000")); + publishScenarioProperties1.setTps(750); + publishScenarioProperties1.setType(TransactionType.CONSENSUS_SUBMIT_MESSAGE); + totalTps = publishScenarioProperties1.getTps(); + + publishScenarioProperties2 = new PublishScenarioProperties(); + publishScenarioProperties2.setName("test2"); + publishScenarioProperties2.setTps(250); + publishScenarioProperties2.setType(TransactionType.ACCOUNT_CREATE); + totalTps += publishScenarioProperties2.getTps(); properties = new PublishProperties(); - properties.getScenarios().put(scenarioProperties1.getName(), scenarioProperties1); - properties.getScenarios().put(scenarioProperties2.getName(), scenarioProperties2); + properties.getScenarios().put(publishScenarioProperties1.getName(), publishScenarioProperties1); + properties.getScenarios().put(publishScenarioProperties2.getName(), publishScenarioProperties2); supplier = Suppliers.memoize(() -> new CompositeTransactionGenerator(p -> p, p -> p.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)), properties)); @@ -117,21 +120,21 @@ void distribution() { double seconds = 5; for (int i = 0; i < totalTps * seconds; ) { List requests = generator.next(); - requests.stream().map(PublishRequest::getType).forEach(types::add); + requests.stream().map(r -> r.getScenario().getProperties().getType()).forEach(types::add); i += requests.size(); } - for (ScenarioProperties scenarioProperties : properties.getScenarios().values()) { - assertThat(types.count(scenarioProperties.getType())) + for (PublishScenarioProperties publishScenarioProperties : properties.getScenarios().values()) { + assertThat(types.count(publishScenarioProperties.getType())) .isNotNegative() .isNotZero() - .isCloseTo((int) (scenarioProperties.getTps() * seconds), withinPercentage(10)); + .isCloseTo((int) (publishScenarioProperties.getTps() * seconds), withinPercentage(10)); } } @Test void disabledScenario() { - scenarioProperties1.setEnabled(false); + publishScenarioProperties1.setEnabled(false); CompositeTransactionGenerator generator = supplier.get(); assertThat(generator.distribution.get().getPmf()) .hasSize(1) @@ -167,15 +170,18 @@ void publishDisabled() { @Test void scenariosComplete() { - properties.getScenarios().remove(scenarioProperties2.getName()); - scenarioProperties1.setLimit(1L); + properties.getScenarios().remove(publishScenarioProperties2.getName()); + publishScenarioProperties1.setLimit(1L); CompositeTransactionGenerator generator = supplier.get(); + List scenarios = generator.scenarios().collectList().block(); assertThat(generator.next()).hasSize(1); assertThat(generator.next()).isEmpty(); assertInactive(); assertThat(properties.getScenarios().values()) - .extracting(ScenarioProperties::isEnabled) + .extracting(PublishScenarioProperties::isEnabled) .containsExactly(false); + assertThat(scenarios).extracting(PublishScenario::getStatus).containsOnly(ScenarioStatus.COMPLETED); + assertThat(generator.scenarios().count().block()).isZero(); } @Test @@ -211,8 +217,8 @@ void warmupBatchRequest() { @Test @Timeout(10) void scenariosDurationAfterFirstFinish() { - scenarioProperties1.setDuration(Duration.ofSeconds(3)); - scenarioProperties2.setDuration(Duration.ofSeconds(5)); + publishScenarioProperties1.setDuration(Duration.ofSeconds(3)); + publishScenarioProperties2.setDuration(Duration.ofSeconds(5)); properties.setWarmupPeriod(Duration.ZERO); CompositeTransactionGenerator generator = supplier.get(); @@ -224,14 +230,14 @@ void scenariosDurationAfterFirstFinish() { assertThat(generator.transactionGenerators) .hasSize(1) .first() - .returns(scenarioProperties2, from(ConfigurableTransactionGenerator::getProperties)); + .returns(publishScenarioProperties2, from(ConfigurableTransactionGenerator::getProperties)); } @Test @Timeout(10) void scenariosDurationAfterBothFinish() { - scenarioProperties1.setDuration(Duration.ofSeconds(3)); - scenarioProperties2.setDuration(Duration.ofSeconds(5)); + publishScenarioProperties1.setDuration(Duration.ofSeconds(3)); + publishScenarioProperties2.setDuration(Duration.ofSeconds(5)); properties.setWarmupPeriod(Duration.ZERO); CompositeTransactionGenerator generator = supplier.get(); diff --git a/hedera-mirror-monitor/src/test/java/com/hedera/mirror/monitor/generator/ConfigurableTransactionGeneratorTest.java b/hedera-mirror-monitor/src/test/java/com/hedera/mirror/monitor/publish/generator/ConfigurableTransactionGeneratorTest.java similarity index 94% rename from hedera-mirror-monitor/src/test/java/com/hedera/mirror/monitor/generator/ConfigurableTransactionGeneratorTest.java rename to hedera-mirror-monitor/src/test/java/com/hedera/mirror/monitor/publish/generator/ConfigurableTransactionGeneratorTest.java index 84a3188340e..0c7f2bbe9a4 100644 --- a/hedera-mirror-monitor/src/test/java/com/hedera/mirror/monitor/generator/ConfigurableTransactionGeneratorTest.java +++ b/hedera-mirror-monitor/src/test/java/com/hedera/mirror/monitor/publish/generator/ConfigurableTransactionGeneratorTest.java @@ -1,4 +1,4 @@ -package com.hedera.mirror.monitor.generator; +package com.hedera.mirror.monitor.publish.generator; /*- * ‌ @@ -42,18 +42,19 @@ import com.hedera.datagenerator.sdk.supplier.TransactionType; import com.hedera.hashgraph.sdk.TopicId; import com.hedera.mirror.monitor.publish.PublishRequest; +import com.hedera.mirror.monitor.publish.PublishScenarioProperties; class ConfigurableTransactionGeneratorTest { private static final int SAMPLE_SIZE = 10_000; private static final String TOPIC_ID = "0.0.1000"; - private ScenarioProperties properties; + private PublishScenarioProperties properties; private Supplier generator; @BeforeEach void init() { - properties = new ScenarioProperties(); + properties = new PublishScenarioProperties(); properties.setReceiptPercent(1); properties.setRecordPercent(1); properties.setName("test"); @@ -90,12 +91,6 @@ void nextCountMoreThanLimit() { .hasMessageContaining("Reached publish limit"); } - @Test - void logResponse() { - properties.setLogResponse(true); - assertRequests(generator.get().next()); - } - @Test void unknownField() { properties.setProperties(Map.of("foo", "bar", "topicId", TOPIC_ID)); @@ -202,10 +197,8 @@ private void assertRequests(List publishRequests, int size) { assertThat(publishRequests).hasSize(size).allSatisfy(publishRequest -> assertThat(publishRequest) .isNotNull() .hasNoNullFieldsOrProperties() - .hasFieldOrPropertyWithValue("logResponse", properties.isLogResponse()) .hasFieldOrPropertyWithValue("receipt", true) .hasFieldOrPropertyWithValue("record", true) - .hasFieldOrPropertyWithValue("type", properties.getType()) .hasFieldOrPropertyWithValue("transaction.topicId", TopicId.fromString(TOPIC_ID)) ); } diff --git a/hedera-mirror-monitor/src/test/java/com/hedera/mirror/monitor/subscribe/CompositeSubscriberTest.java b/hedera-mirror-monitor/src/test/java/com/hedera/mirror/monitor/subscribe/CompositeSubscriberTest.java index 379ff8bd9c6..1e3b96b6fdc 100644 --- a/hedera-mirror-monitor/src/test/java/com/hedera/mirror/monitor/subscribe/CompositeSubscriberTest.java +++ b/hedera-mirror-monitor/src/test/java/com/hedera/mirror/monitor/subscribe/CompositeSubscriberTest.java @@ -73,8 +73,8 @@ void subscribe() { @Test void subscriptions() { - TestSubscription subscription1 = new TestSubscription(); - TestSubscription subscription2 = new TestSubscription(); + TestScenario subscription1 = new TestScenario(); + TestScenario subscription2 = new TestScenario(); when(mirrorSubscriber1.getSubscriptions()).thenReturn(Flux.just(subscription1)); when(mirrorSubscriber2.getSubscriptions()).thenReturn(Flux.just(subscription2)); diff --git a/hedera-mirror-monitor/src/test/java/com/hedera/mirror/monitor/subscribe/SubscribeMetricsTest.java b/hedera-mirror-monitor/src/test/java/com/hedera/mirror/monitor/subscribe/SubscribeMetricsTest.java index fef6fbf34f6..159c3992536 100644 --- a/hedera-mirror-monitor/src/test/java/com/hedera/mirror/monitor/subscribe/SubscribeMetricsTest.java +++ b/hedera-mirror-monitor/src/test/java/com/hedera/mirror/monitor/subscribe/SubscribeMetricsTest.java @@ -39,6 +39,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import com.hedera.mirror.monitor.ScenarioStatus; import com.hedera.mirror.monitor.subscribe.grpc.GrpcSubscriberProperties; class SubscribeMetricsTest { @@ -77,7 +78,7 @@ void after() { @Test void recordDuration() { - TestSubscription subscription = new TestSubscription(); + TestScenario subscription = new TestScenario(); SubscribeResponse response1 = response(subscription); SubscribeResponse response2 = response(subscription); subscribeMetrics.onNext(response1); @@ -94,7 +95,7 @@ void recordDuration() { @Test void recordE2E() { - TestSubscription subscription = new TestSubscription(); + TestScenario subscription = new TestScenario(); SubscribeResponse response1 = response(subscription); subscribeMetrics.onNext(response1); @@ -125,8 +126,8 @@ void recordE2E() { @Test void status() { - TestSubscription testSubscription1 = new TestSubscription(); - TestSubscription testSubscription2 = new TestSubscription(); + TestScenario testSubscription1 = new TestScenario(); + TestScenario testSubscription2 = new TestScenario(); testSubscription2.setName("Test2"); subscribeMetrics.onNext(response(testSubscription1)); @@ -143,7 +144,7 @@ void status() { void statusDisabled() { subscribeProperties.setEnabled(false); - subscribeMetrics.onNext(response(new TestSubscription())); + subscribeMetrics.onNext(response(new TestScenario())); subscribeMetrics.status(); assertThat(logOutput).asString().isEmpty(); @@ -151,8 +152,8 @@ void statusDisabled() { @Test void statusNotRunning() { - TestSubscription testSubscription = new TestSubscription(); - testSubscription.setStatus(SubscriptionStatus.COMPLETED); + TestScenario testSubscription = new TestScenario(); + testSubscription.setStatus(ScenarioStatus.COMPLETED); subscribeMetrics.onNext(response(testSubscription)); subscribeMetrics.status(); @@ -160,13 +161,13 @@ void statusNotRunning() { assertThat(logOutput).asString().isEmpty(); } - private SubscribeResponse response(Subscription subscription) { + private SubscribeResponse response(Scenario scenario) { Instant timestamp = Instant.now().minusSeconds(5L); return SubscribeResponse.builder() .publishedTimestamp(timestamp) - .consensusTimestamp(timestamp.plusSeconds(1L * subscription.getCount())) - .receivedTimestamp(timestamp.plusSeconds(2L * subscription.getCount())) - .subscription(subscription) + .consensusTimestamp(timestamp.plusSeconds(1L * scenario.getCount())) + .receivedTimestamp(timestamp.plusSeconds(2L * scenario.getCount())) + .scenario(scenario) .build(); } } diff --git a/hedera-mirror-monitor/src/test/java/com/hedera/mirror/monitor/subscribe/TestSubscription.java b/hedera-mirror-monitor/src/test/java/com/hedera/mirror/monitor/subscribe/TestScenario.java similarity index 66% rename from hedera-mirror-monitor/src/test/java/com/hedera/mirror/monitor/subscribe/TestSubscription.java rename to hedera-mirror-monitor/src/test/java/com/hedera/mirror/monitor/subscribe/TestScenario.java index 7dfdfe7438f..0f520d27764 100644 --- a/hedera-mirror-monitor/src/test/java/com/hedera/mirror/monitor/subscribe/TestSubscription.java +++ b/hedera-mirror-monitor/src/test/java/com/hedera/mirror/monitor/subscribe/TestScenario.java @@ -25,8 +25,11 @@ import java.util.Map; import lombok.Data; +import com.hedera.mirror.monitor.ScenarioProtocol; +import com.hedera.mirror.monitor.ScenarioStatus; + @Data -public class TestSubscription implements Subscription { +public class TestScenario implements Scenario { private long count = 1; private Duration elapsed = Duration.ofSeconds(1L); @@ -34,12 +37,32 @@ public class TestSubscription implements Subscription { private int id = 1; private String name = "Test"; private AbstractSubscriberProperties properties; - private SubscriberProtocol protocol = SubscriberProtocol.GRPC; + private ScenarioProtocol protocol = ScenarioProtocol.GRPC; private double rate = 1.0; - private SubscriptionStatus status = SubscriptionStatus.RUNNING; + private ScenarioStatus status = ScenarioStatus.RUNNING; + + @Override + public boolean isRunning() { + return status == ScenarioStatus.RUNNING; + } @Override public String toString() { return name; } + + @Override + public void onComplete() { + // Ignore + } + + @Override + public void onError(Throwable t) { + // Ignore + } + + @Override + public void onNext(Object response) { + // Ignore + } } diff --git a/hedera-mirror-monitor/src/test/java/com/hedera/mirror/monitor/subscribe/controller/SubscriberControllerTest.java b/hedera-mirror-monitor/src/test/java/com/hedera/mirror/monitor/subscribe/controller/SubscriberControllerTest.java index 52593d1cbe7..0abc9170b10 100644 --- a/hedera-mirror-monitor/src/test/java/com/hedera/mirror/monitor/subscribe/controller/SubscriberControllerTest.java +++ b/hedera-mirror-monitor/src/test/java/com/hedera/mirror/monitor/subscribe/controller/SubscriberControllerTest.java @@ -32,11 +32,11 @@ import org.springframework.test.web.reactive.server.WebTestClient; import reactor.core.publisher.Flux; +import com.hedera.mirror.monitor.ScenarioProtocol; +import com.hedera.mirror.monitor.ScenarioStatus; import com.hedera.mirror.monitor.config.LoggingFilter; import com.hedera.mirror.monitor.subscribe.MirrorSubscriber; -import com.hedera.mirror.monitor.subscribe.SubscriberProtocol; -import com.hedera.mirror.monitor.subscribe.SubscriptionStatus; -import com.hedera.mirror.monitor.subscribe.TestSubscription; +import com.hedera.mirror.monitor.subscribe.TestScenario; @Log4j2 @ExtendWith(MockitoExtension.class) @@ -47,22 +47,22 @@ class SubscriberControllerTest { private WebTestClient webTestClient; - private TestSubscription subscription1; - private TestSubscription subscription2; + private TestScenario subscription1; + private TestScenario subscription2; @BeforeEach void setup() { - subscription1 = new TestSubscription(); + subscription1 = new TestScenario(); subscription1.setName("grpc1"); subscription1.setId(1); - subscription1.setProtocol(SubscriberProtocol.GRPC); - subscription1.setStatus(SubscriptionStatus.COMPLETED); + subscription1.setProtocol(ScenarioProtocol.GRPC); + subscription1.setStatus(ScenarioStatus.COMPLETED); - subscription2 = new TestSubscription(); + subscription2 = new TestScenario(); subscription2.setName("rest1"); subscription2.setId(1); - subscription2.setProtocol(SubscriberProtocol.REST); - subscription2.setStatus(SubscriptionStatus.RUNNING); + subscription2.setProtocol(ScenarioProtocol.REST); + subscription2.setStatus(ScenarioStatus.RUNNING); SubscriberController subscriberController = new SubscriberController(mirrorSubscriber); webTestClient = WebTestClient.bindToController(subscriberController).webFilter(new LoggingFilter()).build(); @@ -76,7 +76,7 @@ void subscriptions() { .exchange() .expectStatus() .is2xxSuccessful() - .expectBodyList(TestSubscription.class) + .expectBodyList(TestScenario.class) .isEqualTo(Arrays.asList(subscription1, subscription2)); } @@ -87,7 +87,7 @@ void subscriptionsWithProtocol() { .exchange() .expectStatus() .is2xxSuccessful() - .expectBodyList(TestSubscription.class) + .expectBodyList(TestScenario.class) .isEqualTo(Arrays.asList(subscription2)); } @@ -98,7 +98,7 @@ void subscriptionsWithStatus() { .exchange() .expectStatus() .is2xxSuccessful() - .expectBodyList(TestSubscription.class) + .expectBodyList(TestScenario.class) .isEqualTo(Arrays.asList(subscription1)); } @@ -109,7 +109,7 @@ void subscriptionsWithEmptyStatus() { .exchange() .expectStatus() .is2xxSuccessful() - .expectBodyList(TestSubscription.class) + .expectBodyList(TestScenario.class) .isEqualTo(Arrays.asList(subscription1, subscription2)); } @@ -120,7 +120,7 @@ void subscriptionsWithStatuses() { .exchange() .expectStatus() .is2xxSuccessful() - .expectBodyList(TestSubscription.class) + .expectBodyList(TestScenario.class) .isEqualTo(Arrays.asList(subscription1, subscription2)); } @@ -143,7 +143,7 @@ void subscriptionsByName() { .exchange() .expectStatus() .is2xxSuccessful() - .expectBodyList(TestSubscription.class) + .expectBodyList(TestScenario.class) .isEqualTo(Arrays.asList(subscription1, subscription2)); } @@ -156,7 +156,7 @@ void subscriptionsByNameWithStatus() { .exchange() .expectStatus() .is2xxSuccessful() - .expectBodyList(TestSubscription.class) + .expectBodyList(TestScenario.class) .isEqualTo(Arrays.asList(subscription1)); } @@ -178,7 +178,7 @@ void subscription() { .exchange() .expectStatus() .is2xxSuccessful() - .expectBodyList(TestSubscription.class) + .expectBodyList(TestScenario.class) .isEqualTo(Arrays.asList(subscription2)); } diff --git a/hedera-mirror-monitor/src/test/java/com/hedera/mirror/monitor/subscribe/grpc/GrpcSubscriberTest.java b/hedera-mirror-monitor/src/test/java/com/hedera/mirror/monitor/subscribe/grpc/GrpcSubscriberTest.java index 5a22bdba7a3..7cb3df99b58 100644 --- a/hedera-mirror-monitor/src/test/java/com/hedera/mirror/monitor/subscribe/grpc/GrpcSubscriberTest.java +++ b/hedera-mirror-monitor/src/test/java/com/hedera/mirror/monitor/subscribe/grpc/GrpcSubscriberTest.java @@ -37,12 +37,12 @@ import reactor.core.publisher.Flux; import reactor.test.StepVerifier; +import com.hedera.mirror.monitor.ScenarioProtocol; import com.hedera.mirror.monitor.expression.ExpressionConverter; import com.hedera.mirror.monitor.publish.PublishResponse; +import com.hedera.mirror.monitor.subscribe.Scenario; import com.hedera.mirror.monitor.subscribe.SubscribeProperties; import com.hedera.mirror.monitor.subscribe.SubscribeResponse; -import com.hedera.mirror.monitor.subscribe.SubscriberProtocol; -import com.hedera.mirror.monitor.subscribe.Subscription; @ExtendWith(MockitoExtension.class) class GrpcSubscriberTest { @@ -80,7 +80,7 @@ void subscribe() { .verify(Duration.ofMillis(500L)); assertThat(grpcSubscriber.getSubscriptions().blockFirst()) .isNotNull() - .returns(SubscriberProtocol.GRPC, Subscription::getProtocol); + .returns(ScenarioProtocol.GRPC, Scenario::getProtocol); } @Test @@ -95,9 +95,9 @@ void multipleSubscribers() { .hasSize(2) .doesNotHaveDuplicates() .allSatisfy(s -> assertThat(s).isNotNull() - .returns(grpcSubscriberProperties, Subscription::getProperties) - .returns(SubscriberProtocol.GRPC, Subscription::getProtocol)) - .extracting(Subscription::getId) + .returns(grpcSubscriberProperties, Scenario::getProperties) + .returns(ScenarioProtocol.GRPC, Scenario::getProtocol)) + .extracting(Scenario::getId) .containsExactly(1, 2); } @@ -117,8 +117,8 @@ void multipleScenarios() { .hasSize(2) .doesNotHaveDuplicates() .allSatisfy(s -> assertThat(s).isNotNull() - .returns(SubscriberProtocol.GRPC, Subscription::getProtocol)) - .extracting(Subscription::getName) + .returns(ScenarioProtocol.GRPC, Scenario::getProtocol)) + .extracting(Scenario::getName) .doesNotHaveDuplicates(); } diff --git a/hedera-mirror-monitor/src/test/java/com/hedera/mirror/monitor/subscribe/rest/RestSubscriberTest.java b/hedera-mirror-monitor/src/test/java/com/hedera/mirror/monitor/subscribe/rest/RestSubscriberTest.java index 7e38595652f..2d5a323c650 100644 --- a/hedera-mirror-monitor/src/test/java/com/hedera/mirror/monitor/subscribe/rest/RestSubscriberTest.java +++ b/hedera-mirror-monitor/src/test/java/com/hedera/mirror/monitor/subscribe/rest/RestSubscriberTest.java @@ -60,13 +60,15 @@ import com.hedera.hashgraph.sdk.TransactionId; import com.hedera.mirror.monitor.MirrorNodeProperties; import com.hedera.mirror.monitor.MonitorProperties; +import com.hedera.mirror.monitor.ScenarioProtocol; +import com.hedera.mirror.monitor.ScenarioStatus; import com.hedera.mirror.monitor.publish.PublishRequest; import com.hedera.mirror.monitor.publish.PublishResponse; +import com.hedera.mirror.monitor.publish.PublishScenario; +import com.hedera.mirror.monitor.publish.PublishScenarioProperties; +import com.hedera.mirror.monitor.subscribe.Scenario; import com.hedera.mirror.monitor.subscribe.SubscribeProperties; import com.hedera.mirror.monitor.subscribe.SubscribeResponse; -import com.hedera.mirror.monitor.subscribe.SubscriberProtocol; -import com.hedera.mirror.monitor.subscribe.Subscription; -import com.hedera.mirror.monitor.subscribe.SubscriptionStatus; import com.hedera.mirror.monitor.subscribe.rest.response.MirrorTransaction; @MockitoSettings(strictness = Strictness.STRICT_STUBS) @@ -84,6 +86,7 @@ class RestSubscriberTest { @Mock private ExchangeFunction exchangeFunction; + private PublishScenario publishScenario; private SubscribeProperties subscribeProperties; private RestSubscriberProperties restSubscriberProperties; private RestSubscriber restSubscriber; @@ -101,6 +104,11 @@ void setup() { restSubscriberProperties.getRetry().setMinBackoff(Duration.ofNanos(1L)); restSubscriberProperties.getRetry().setMaxBackoff(Duration.ofNanos(2L)); + PublishScenarioProperties publishScenarioProperties = new PublishScenarioProperties(); + publishScenarioProperties.setName(SCENARIO); + publishScenarioProperties.setType(TransactionType.CONSENSUS_SUBMIT_MESSAGE); + publishScenario = new PublishScenario(publishScenarioProperties); + subscribeProperties = new SubscribeProperties(); subscribeProperties.getRest().put(restSubscriberProperties.getName(), restSubscriberProperties); @@ -177,14 +185,14 @@ void subscribe() { RestSubscription subscription = restSubscriber.getSubscriptions().blockFirst(); assertThat(subscription) .isNotNull() - .returns(2L, Subscription::getCount) - .returns(Map.of(), Subscription::getErrors) - .returns(SubscriberProtocol.REST, Subscription::getProtocol); + .returns(2L, Scenario::getCount) + .returns(Map.of(), Scenario::getErrors) + .returns(ScenarioProtocol.REST, Scenario::getProtocol); assertThat(responses).hasSize(2).allSatisfy(s -> { assertThat(s.getConsensusTimestamp()).isNotNull(); assertThat(s.getPublishedTimestamp()).isNotNull(); assertThat(s.getReceivedTimestamp()).isNotNull(); - assertThat(s.getSubscription()).isNotNull().isEqualTo(subscription); + assertThat(s.getScenario()).isNotNull().isEqualTo(subscription); }); } @@ -205,11 +213,11 @@ void multipleSubscribers() { assertThat(restSubscriber.getSubscriptions().collectList().block()) .hasSize(2) .allSatisfy(s -> assertThat(s).isNotNull() - .returns(2L, Subscription::getCount) - .returns(Map.of(), Subscription::getErrors) - .returns(restSubscriberProperties, Subscription::getProperties) - .returns(SubscriberProtocol.REST, Subscription::getProtocol)) - .extracting(Subscription::getId) + .returns(2L, Scenario::getCount) + .returns(Map.of(), Scenario::getErrors) + .returns(restSubscriberProperties, Scenario::getProperties) + .returns(ScenarioProtocol.REST, Scenario::getProtocol)) + .extracting(Scenario::getId) .containsExactly(1, 2); } @@ -233,9 +241,9 @@ void multipleScenarios() { .hasSize(2) .doesNotHaveDuplicates() .allSatisfy(s -> assertThat(s).isNotNull() - .returns(2L, Subscription::getCount) - .returns(Map.of(), Subscription::getErrors)) - .extracting(Subscription::getName) + .returns(2L, Scenario::getCount) + .returns(Map.of(), Scenario::getErrors)) + .extracting(Scenario::getName) .doesNotHaveDuplicates(); } @@ -278,10 +286,10 @@ void subscribeEmpty() { verifyNoInteractions(exchangeFunction); assertThat(restSubscriber.getSubscriptions().blockFirst()) .isNotNull() - .returns(0L, Subscription::getCount) - .returns(Map.of(), Subscription::getErrors) - .returns(SubscriberProtocol.REST, Subscription::getProtocol) - .returns(SubscriptionStatus.IDLE, Subscription::getStatus); + .returns(0L, Scenario::getCount) + .returns(Map.of(), Scenario::getErrors) + .returns(ScenarioProtocol.REST, Scenario::getProtocol) + .returns(ScenarioStatus.IDLE, Scenario::getStatus); } @Test @@ -303,10 +311,10 @@ void duration() { verify(exchangeFunction, times(2)).exchange(Mockito.isA(ClientRequest.class)); assertThat(restSubscriber.getSubscriptions().blockFirst()) .isNotNull() - .returns(1L, Subscription::getCount) - .returns(Map.of(), Subscription::getErrors) - .returns(SubscriberProtocol.REST, Subscription::getProtocol) - .returns(SubscriptionStatus.COMPLETED, Subscription::getStatus); + .returns(1L, Scenario::getCount) + .returns(Map.of(), Scenario::getErrors) + .returns(ScenarioProtocol.REST, Scenario::getProtocol) + .returns(ScenarioStatus.COMPLETED, Scenario::getStatus); } @Test @@ -325,8 +333,8 @@ void limitReached() { verify(exchangeFunction, times(3)).exchange(Mockito.isA(ClientRequest.class)); assertThat(restSubscriber.getSubscriptions().blockFirst()) .isNotNull() - .returns(3L, Subscription::getCount) - .returns(SubscriptionStatus.COMPLETED, Subscription::getStatus); + .returns(3L, Scenario::getCount) + .returns(ScenarioStatus.COMPLETED, Scenario::getStatus); } @Test @@ -346,7 +354,7 @@ void limitExceeded() { verify(exchangeFunction, times(3)).exchange(Mockito.isA(ClientRequest.class)); assertThat(restSubscriber.getSubscriptions().blockFirst()) .isNotNull() - .returns(3L, Subscription::getCount); + .returns(3L, Scenario::getCount); } @Test @@ -365,8 +373,8 @@ void nonRetryableError() { verify(exchangeFunction).exchange(Mockito.isA(ClientRequest.class)); assertThat(restSubscriber.getSubscriptions().blockFirst()) .isNotNull() - .returns(0L, Subscription::getCount) - .returns(Map.of("500", 1), Subscription::getErrors); + .returns(0L, Scenario::getCount) + .returns(Map.of("500", 1), Scenario::getErrors); } @Test @@ -386,8 +394,8 @@ void recovers() { verify(exchangeFunction, times(2)).exchange(Mockito.isA(ClientRequest.class)); assertThat(restSubscriber.getSubscriptions().blockFirst()) .isNotNull() - .returns(1L, Subscription::getCount) - .returns(Map.of(), Subscription::getErrors); + .returns(1L, Scenario::getCount) + .returns(Map.of(), Scenario::getErrors); } @Test @@ -407,8 +415,8 @@ void neverRecovers() { verify(exchangeFunction, times(3)).exchange(Mockito.isA(ClientRequest.class)); assertThat(restSubscriber.getSubscriptions().blockFirst()) .isNotNull() - .returns(0L, Subscription::getCount) - .returns(Map.of("404", 1), Subscription::getErrors); + .returns(0L, Scenario::getCount) + .returns(Map.of("404", 1), Scenario::getErrors); } @Test @@ -430,8 +438,8 @@ void samplePercent0() { verifyNoInteractions(exchangeFunction); assertThat(restSubscriber.getSubscriptions().blockFirst()) .isNotNull() - .returns(0L, Subscription::getCount) - .returns(Map.of(), Subscription::getErrors); + .returns(0L, Scenario::getCount) + .returns(Map.of(), Scenario::getErrors); } @Test @@ -460,8 +468,8 @@ void samplePercent75() { verify(exchangeFunction, atMost(max)).exchange(Mockito.isA(ClientRequest.class)); assertThat(restSubscriber.getSubscriptions().blockFirst()) .isNotNull() - .returns(Map.of(), Subscription::getErrors) - .extracting(Subscription::getCount) + .returns(Map.of(), Scenario::getErrors) + .extracting(Scenario::getCount) .isNotNull() .matches(count -> count >= min && count <= max); } @@ -490,17 +498,16 @@ void samplePercent100() { .exchange(Mockito.isA(ClientRequest.class)); assertThat(restSubscriber.getSubscriptions().blockFirst()) .isNotNull() - .returns(Map.of(), Subscription::getErrors) - .extracting(Subscription::getCount) + .returns(Map.of(), Scenario::getErrors) + .extracting(Scenario::getCount) .isEqualTo(restSubscriberProperties.getLimit()); } private PublishResponse publishResponse() { return PublishResponse.builder() .request(PublishRequest.builder() - .scenarioName(SCENARIO) + .scenario(publishScenario) .timestamp(Instant.now()) - .type(TransactionType.CONSENSUS_SUBMIT_MESSAGE) .build()) .timestamp(Instant.now()) .transactionId(TransactionId.withValidStart(AccountId.fromString("0.0.1000"), Instant.ofEpochSecond(1))) From bb3f7400f536dfa9528bde5dbd9e53af3265ee41 Mon Sep 17 00:00:00 2001 From: Steven Sheehy Date: Mon, 2 Aug 2021 13:28:27 -0500 Subject: [PATCH 2/6] Fix tests Signed-off-by: Steven Sheehy --- charts/hedera-mirror-monitor/values.yaml | 2 +- .../generator/ConfigurableTransactionGeneratorTest.java | 5 ++--- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/charts/hedera-mirror-monitor/values.yaml b/charts/hedera-mirror-monitor/values.yaml index da51129c029..f31665e07db 100644 --- a/charts/hedera-mirror-monitor/values.yaml +++ b/charts/hedera-mirror-monitor/values.yaml @@ -183,7 +183,7 @@ prometheusRules: description: "Averaging {{ $value | humanizePercentage }} error rate publishing '{{ $labels.scenario }}' scenario from {{ $labels.namespace }}/{{ $labels.pod }}" summary: "Publish error rate exceeds 15%" enabled: true - expr: expr: sum(rate(hedera_mirror_monitor_publish_submit_seconds_count{application="hedera-mirror-monitor",status!="SUCCESS"}[2m])) by (namespace, pod, scenario) / sum(rate(hedera_mirror_monitor_publish_submit_seconds_count{application="hedera-mirror-monitor"}[2m])) by (namespace, pod, scenario) > 0.33 + expr: sum(rate(hedera_mirror_monitor_publish_submit_seconds_count{application="hedera-mirror-monitor",status!="SUCCESS"}[2m])) by (namespace, pod, scenario) / sum(rate(hedera_mirror_monitor_publish_submit_seconds_count{application="hedera-mirror-monitor"}[2m])) by (namespace, pod, scenario) > 0.33 for: 3m labels: severity: critical diff --git a/hedera-mirror-monitor/src/test/java/com/hedera/mirror/monitor/publish/generator/ConfigurableTransactionGeneratorTest.java b/hedera-mirror-monitor/src/test/java/com/hedera/mirror/monitor/publish/generator/ConfigurableTransactionGeneratorTest.java index 0c7f2bbe9a4..4abcd0270f8 100644 --- a/hedera-mirror-monitor/src/test/java/com/hedera/mirror/monitor/publish/generator/ConfigurableTransactionGeneratorTest.java +++ b/hedera-mirror-monitor/src/test/java/com/hedera/mirror/monitor/publish/generator/ConfigurableTransactionGeneratorTest.java @@ -94,9 +94,8 @@ void nextCountMoreThanLimit() { @Test void unknownField() { properties.setProperties(Map.of("foo", "bar", "topicId", TOPIC_ID)); - assertThatThrownBy(() -> generator.get().next()) - .isInstanceOf(IllegalArgumentException.class) - .hasMessageContaining("Unrecognized field"); + List publishRequests = generator.get().next(); + assertThat(publishRequests).hasSize(1); // No error } @Test From d90d8ed745f5856ecfc117f99164e09f2d5b2d1a Mon Sep 17 00:00:00 2001 From: Steven Sheehy Date: Mon, 2 Aug 2021 13:56:49 -0500 Subject: [PATCH 3/6] Fix code smells Signed-off-by: Steven Sheehy --- .../monitor/publish/TransactionPublisher.java | 4 ++++ .../monitor/subscribe/MirrorSubscriber.java | 2 +- .../monitor/subscribe/SubscribeMetrics.java | 12 ++++++------ .../monitor/subscribe/SubscribeResponse.java | 2 +- .../controller/SubscriberController.java | 10 +++++----- .../monitor/publish/TransactionPublisherTest.java | 5 +++++ .../ConfigurableTransactionGeneratorTest.java | 15 ++++++++------- 7 files changed, 30 insertions(+), 20 deletions(-) diff --git a/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/publish/TransactionPublisher.java b/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/publish/TransactionPublisher.java index 12db23f6b25..fe4454d475a 100644 --- a/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/publish/TransactionPublisher.java +++ b/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/publish/TransactionPublisher.java @@ -71,6 +71,10 @@ public void close() { } public Mono publish(PublishRequest request) { + if (!publishProperties.isEnabled()) { + return Mono.empty(); + } + log.trace("Publishing: {}", request); int clientIndex = secureRandom.nextInt(publishProperties.getClients()); PublishScenario scenario = request.getScenario(); diff --git a/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/subscribe/MirrorSubscriber.java b/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/subscribe/MirrorSubscriber.java index 7b935af1926..1ec7f3a2cc2 100644 --- a/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/subscribe/MirrorSubscriber.java +++ b/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/subscribe/MirrorSubscriber.java @@ -31,5 +31,5 @@ default void onPublish(PublishResponse response) { Flux subscribe(); - Flux getSubscriptions(); + > Flux getSubscriptions(); } diff --git a/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/subscribe/SubscribeMetrics.java b/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/subscribe/SubscribeMetrics.java index c2edb129ece..0a962f0aa03 100644 --- a/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/subscribe/SubscribeMetrics.java +++ b/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/subscribe/SubscribeMetrics.java @@ -46,14 +46,14 @@ public class SubscribeMetrics { static final String TAG_SCENARIO = "scenario"; static final String TAG_SUBSCRIBER = "subscriber"; - private final Map durationMetrics = new ConcurrentHashMap<>(); - private final Map latencyMetrics = new ConcurrentHashMap<>(); + private final Map, TimeGauge> durationMetrics = new ConcurrentHashMap<>(); + private final Map, Timer> latencyMetrics = new ConcurrentHashMap<>(); private final MeterRegistry meterRegistry; private final SubscribeProperties subscribeProperties; public void onNext(SubscribeResponse response) { log.trace("Response: {}", response); - Scenario scenario = response.getScenario(); + Scenario scenario = response.getScenario(); Instant publishedTimestamp = response.getPublishedTimestamp(); durationMetrics.computeIfAbsent(scenario, this::newDurationGauge); @@ -63,7 +63,7 @@ public void onNext(SubscribeResponse response) { } } - private TimeGauge newDurationGauge(Scenario scenario) { + private TimeGauge newDurationGauge(Scenario scenario) { return TimeGauge.builder(METRIC_DURATION, scenario, TimeUnit.NANOSECONDS, s -> s.getElapsed().toNanos()) .description("How long the subscriber has been running") .tag(TAG_PROTOCOL, scenario.getProtocol().toString()) @@ -72,7 +72,7 @@ private TimeGauge newDurationGauge(Scenario scenario) { .register(meterRegistry); } - private final Timer newLatencyTimer(Scenario scenario) { + private final Timer newLatencyTimer(Scenario scenario) { return Timer.builder(METRIC_E2E) .description("The end to end transaction latency starting from publish and ending at receive") .tag(TAG_PROTOCOL, scenario.getProtocol().toString()) @@ -88,7 +88,7 @@ public void status() { } } - private void status(Scenario s) { + private void status(Scenario s) { if (s.isRunning()) { String elapsed = DurationToStringSerializer.convert(s.getElapsed()); log.info("{} {}: {} transactions in {} at {}/s. Errors: {}", diff --git a/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/subscribe/SubscribeResponse.java b/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/subscribe/SubscribeResponse.java index 31957240f74..53c15e3b064 100644 --- a/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/subscribe/SubscribeResponse.java +++ b/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/subscribe/SubscribeResponse.java @@ -30,5 +30,5 @@ public class SubscribeResponse { private final Instant consensusTimestamp; private final Instant publishedTimestamp; private final Instant receivedTimestamp; - private final Scenario scenario; + private final Scenario scenario; } diff --git a/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/subscribe/controller/SubscriberController.java b/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/subscribe/controller/SubscriberController.java index ae0a72a3798..49492b67f0e 100644 --- a/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/subscribe/controller/SubscriberController.java +++ b/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/subscribe/controller/SubscriberController.java @@ -50,8 +50,8 @@ class SubscriberController { private final MirrorSubscriber mirrorSubscriber; @GetMapping - public Flux subscriptions(@RequestParam Optional protocol, - @RequestParam Optional> status) { + public Flux> subscriptions(@RequestParam Optional protocol, + @RequestParam Optional> status) { return mirrorSubscriber.getSubscriptions() .filter(s -> !protocol.isPresent() || protocol.get() == s.getProtocol()) .filter(s -> !status.isPresent() || status.get().contains(s.getStatus())) @@ -59,15 +59,15 @@ public Flux subscriptions(@RequestParam Optional pro } @GetMapping("/{name}") - public Flux subscriptions(@PathVariable String name, - @RequestParam Optional> status) { + public Flux> subscriptions(@PathVariable String name, + @RequestParam Optional> status) { return subscriptions(Optional.empty(), status) .filter(subscription -> subscription.getName().equals(name)) .switchIfEmpty(Mono.error(new NoSuchElementException())); } @GetMapping("/{name}/{id}") - public Mono subscription(@PathVariable String name, @PathVariable int id) { + public Mono> subscription(@PathVariable String name, @PathVariable int id) { return subscriptions(name, Optional.empty()) .filter(s -> s.getId() == id) .last(); diff --git a/hedera-mirror-monitor/src/test/java/com/hedera/mirror/monitor/publish/TransactionPublisherTest.java b/hedera-mirror-monitor/src/test/java/com/hedera/mirror/monitor/publish/TransactionPublisherTest.java index 9d1368074af..d7881753b29 100644 --- a/hedera-mirror-monitor/src/test/java/com/hedera/mirror/monitor/publish/TransactionPublisherTest.java +++ b/hedera-mirror-monitor/src/test/java/com/hedera/mirror/monitor/publish/TransactionPublisherTest.java @@ -296,6 +296,11 @@ void someValidNodes() { void closeWhenDisabled() { publishProperties.setEnabled(false); transactionPublisher.close(); + transactionPublisher.publish(request().build()) + .as(StepVerifier::create) + .expectNextCount(0L) + .expectComplete() + .verify(Duration.ofSeconds(1L)); } private PublishRequest.PublishRequestBuilder request() { diff --git a/hedera-mirror-monitor/src/test/java/com/hedera/mirror/monitor/publish/generator/ConfigurableTransactionGeneratorTest.java b/hedera-mirror-monitor/src/test/java/com/hedera/mirror/monitor/publish/generator/ConfigurableTransactionGeneratorTest.java index 4abcd0270f8..4268e13b296 100644 --- a/hedera-mirror-monitor/src/test/java/com/hedera/mirror/monitor/publish/generator/ConfigurableTransactionGeneratorTest.java +++ b/hedera-mirror-monitor/src/test/java/com/hedera/mirror/monitor/publish/generator/ConfigurableTransactionGeneratorTest.java @@ -32,7 +32,6 @@ import java.util.List; import java.util.Map; import java.util.function.Supplier; -import java.util.stream.Collectors; import javax.validation.ConstraintViolationException; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -62,8 +61,7 @@ void init() { properties.setTps(100_000); properties.setType(TransactionType.CONSENSUS_SUBMIT_MESSAGE); generator = Suppliers.memoize(() -> new ConfigurableTransactionGenerator(p -> p, - p -> p.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)), - properties)); + p -> Collections.unmodifiableMap(p), properties)); } @Test @@ -85,8 +83,9 @@ void nextTwo() { @Test void nextCountMoreThanLimit() { properties.setLimit(4); + ConfigurableTransactionGenerator transactionGenerator = generator.get(); assertRequests(generator.get().next(5), 4); - assertThatThrownBy(() -> generator.get().next()) + assertThatThrownBy(transactionGenerator::next) .isInstanceOf(ScenarioException.class) .hasMessageContaining("Reached publish limit"); } @@ -103,7 +102,7 @@ void reachedLimit() { properties.setLimit(1); TransactionGenerator transactionGenerator = generator.get(); assertRequests(transactionGenerator.next()); - assertThatThrownBy(() -> transactionGenerator.next()) + assertThatThrownBy(transactionGenerator::next) .isInstanceOf(ScenarioException.class) .hasMessageContaining("Reached publish limit"); } @@ -111,7 +110,8 @@ void reachedLimit() { @Test void reachedDuration() { properties.setDuration(Duration.ofSeconds(-5L)); - assertThatThrownBy(() -> generator.get().next()) + ConfigurableTransactionGenerator transactionGenerator = generator.get(); + assertThatThrownBy(transactionGenerator::next) .isInstanceOf(ScenarioException.class) .hasMessageContaining("Reached publish duration"); } @@ -189,7 +189,8 @@ void recordPercent() { @Test void missingRequiredField() { properties.setProperties(Collections.emptyMap()); - assertThatThrownBy(() -> generator.get().next()).isInstanceOf(ConstraintViolationException.class); + ConfigurableTransactionGenerator transactionGenerator = generator.get(); + assertThatThrownBy(transactionGenerator::next).isInstanceOf(ConstraintViolationException.class); } private void assertRequests(List publishRequests, int size) { From 1c185cfd07b39d929ead638002d850227c37627e Mon Sep 17 00:00:00 2001 From: Steven Sheehy Date: Tue, 3 Aug 2021 09:27:59 -0500 Subject: [PATCH 4/6] Improve performance Signed-off-by: Steven Sheehy --- .../expression/ExpressionConverterImpl.java | 17 +++++--- .../monitor/publish/TransactionPublisher.java | 39 ++++++++++++++----- 2 files changed, 42 insertions(+), 14 deletions(-) diff --git a/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/expression/ExpressionConverterImpl.java b/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/expression/ExpressionConverterImpl.java index 738f9386317..c83a4e4e4a2 100644 --- a/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/expression/ExpressionConverterImpl.java +++ b/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/expression/ExpressionConverterImpl.java @@ -35,6 +35,7 @@ import lombok.extern.log4j.Log4j2; import org.apache.commons.lang3.StringUtils; import reactor.core.publisher.Mono; +import reactor.util.retry.RetryBackoffSpec; import reactor.util.retry.RetrySpec; import com.hedera.datagenerator.sdk.supplier.AdminKeyable; @@ -126,11 +127,17 @@ private synchronized String doConvert(Expression expression) { .transaction(transactionSupplier.get()) .build(); - PublishResponse publishResponse = Mono.defer(() -> transactionPublisher.publish(request)) - .retryWhen(RetrySpec.backoff(Long.MAX_VALUE, Duration.ofMillis(500L)) - .maxBackoff(Duration.ofSeconds(8L))) - .toFuture().join(); - String createdId = type.getIdExtractor().apply(publishResponse.getReceipt()); + RetryBackoffSpec retrySpec = RetrySpec.backoff(Long.MAX_VALUE, Duration.ofMillis(500L)) + .maxBackoff(Duration.ofSeconds(8L)) + .doBeforeRetry(r -> log.warn("Retry attempt #{} after failure: {}", + r.totalRetries() + 1, r.failure().getMessage())); + + String createdId = Mono.defer(() -> transactionPublisher.publish(request)) + .retryWhen(retrySpec) + .map(PublishResponse::getReceipt) + .map(type.getIdExtractor()::apply) + .toFuture() + .join(); log.info("Created {} entity {}", type, createdId); return createdId; } catch (Exception e) { diff --git a/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/publish/TransactionPublisher.java b/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/publish/TransactionPublisher.java index fe4454d475a..b831bc6f03f 100644 --- a/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/publish/TransactionPublisher.java +++ b/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/publish/TransactionPublisher.java @@ -27,7 +27,9 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; import javax.inject.Named; import lombok.RequiredArgsConstructor; import lombok.extern.log4j.Log4j2; @@ -39,6 +41,7 @@ import com.hedera.hashgraph.sdk.Hbar; import com.hedera.hashgraph.sdk.HbarUnit; import com.hedera.hashgraph.sdk.PrivateKey; +import com.hedera.hashgraph.sdk.Transaction; import com.hedera.hashgraph.sdk.TransactionId; import com.hedera.hashgraph.sdk.TransactionRecordQuery; import com.hedera.hashgraph.sdk.TransactionResponse; @@ -54,6 +57,7 @@ public class TransactionPublisher implements AutoCloseable { private final MonitorProperties monitorProperties; private final PublishProperties publishProperties; private final Flux clients = Flux.defer(this::getClients).cache(); + private final List nodeAccountIds = new CopyOnWriteArrayList<>(); private final SecureRandom secureRandom = new SecureRandom(); @Override @@ -81,9 +85,7 @@ public Mono publish(PublishRequest request) { PublishScenarioProperties properties = scenario.getProperties(); return clients.elementAt(clientIndex) - .flatMap(client -> Mono.fromFuture(request.getTransaction() - .setTransactionMemo(scenario.getMemo()) - .executeAsync(client)) + .flatMap(client -> getTransactionResponse(request, client) .flatMap(r -> processTransactionResponse(client, request, r)) .map(PublishResponse.PublishResponseBuilder::build) .doOnNext(response -> { @@ -97,6 +99,20 @@ public Mono publish(PublishRequest request) { .onErrorMap(t -> new PublishException(request, t))); } + private Mono getTransactionResponse(PublishRequest request, Client client) { + Transaction transaction = request.getTransaction(); + transaction.setTransactionMemo(request.getScenario().getMemo()); + + // set transaction node where applicable + if (transaction.getNodeAccountIds() == null) { + int nodeIndex = secureRandom.nextInt(nodeAccountIds.size()); + List nodeAccountId = List.of(nodeAccountIds.get(nodeIndex)); + transaction.setNodeAccountIds(nodeAccountId); + } + + return Mono.fromFuture(transaction.executeAsync(client)); + } + private Mono processTransactionResponse(Client client, PublishRequest request, TransactionResponse transactionResponse) { @@ -127,9 +143,13 @@ private Flux getClients() { throw new IllegalArgumentException("No valid nodes found"); } + validNodes.forEach(n -> nodeAccountIds.add(AccountId.fromString(n.getAccountId()))); + Map nodeMap = validNodes.stream() + .collect(Collectors.toMap(NodeProperties::getEndpoint, p -> AccountId.fromString(p.getAccountId()))); + log.info("Creating {} connections to {} nodes", publishProperties.getClients(), validNodes.size()); + return Flux.range(0, publishProperties.getClients()) - .map(i -> i % validNodes.size()) - .flatMap(i -> Flux.defer(() -> Mono.just(toClient(validNodes.get(i))))); + .flatMap(i -> Flux.defer(() -> Mono.just(toClient(nodeMap)))); } private List validateNodes() { @@ -142,7 +162,9 @@ private List validateNodes() { List validNodes = new ArrayList<>(); for (NodeProperties node : nodes) { - try (Client client = toClient(node)) { + AccountId nodeAccountId = AccountId.fromString(node.getAccountId()); + + try (Client client = toClient(Map.of(node.getEndpoint(), nodeAccountId))) { if (validateNode(client, node)) { validNodes.add(node); } @@ -177,12 +199,11 @@ private boolean validateNode(Client client, NodeProperties node) { return valid; } - private Client toClient(NodeProperties nodeProperties) { - AccountId nodeAccount = AccountId.fromString(nodeProperties.getAccountId()); + private Client toClient(Map nodes) { AccountId operatorId = AccountId.fromString(monitorProperties.getOperator().getAccountId()); PrivateKey operatorPrivateKey = PrivateKey.fromString(monitorProperties.getOperator().getPrivateKey()); - Client client = Client.forNetwork(Map.of(nodeProperties.getEndpoint(), nodeAccount)); + Client client = Client.forNetwork(nodes); client.setOperator(operatorId, operatorPrivateKey); return client; } From 55aa5b6ba0a1549236e2d04ee6c3078af3398d09 Mon Sep 17 00:00:00 2001 From: Steven Sheehy Date: Tue, 3 Aug 2021 09:46:29 -0500 Subject: [PATCH 5/6] Address review comments Signed-off-by: Steven Sheehy --- charts/hedera-mirror-monitor/values.yaml | 4 ++-- .../mirror/monitor/ClusterHealthIndicator.java | 2 ++ .../com/hedera/mirror/monitor/NodeProperties.java | 1 + .../mirror/monitor/publish/PublishMetrics.java | 15 +++++++++------ .../ConfigurableTransactionGenerator.java | 2 ++ .../monitor/subscribe/SubscribeMetrics.java | 13 +++++++------ .../ConfigurableTransactionGeneratorTest.java | 8 ++++++++ 7 files changed, 31 insertions(+), 14 deletions(-) diff --git a/charts/hedera-mirror-monitor/values.yaml b/charts/hedera-mirror-monitor/values.yaml index f31665e07db..a59ba7ef0aa 100644 --- a/charts/hedera-mirror-monitor/values.yaml +++ b/charts/hedera-mirror-monitor/values.yaml @@ -181,7 +181,7 @@ prometheusRules: MonitorPublishErrors: annotations: description: "Averaging {{ $value | humanizePercentage }} error rate publishing '{{ $labels.scenario }}' scenario from {{ $labels.namespace }}/{{ $labels.pod }}" - summary: "Publish error rate exceeds 15%" + summary: "Publish error rate exceeds 33%" enabled: true expr: sum(rate(hedera_mirror_monitor_publish_submit_seconds_count{application="hedera-mirror-monitor",status!="SUCCESS"}[2m])) by (namespace, pod, scenario) / sum(rate(hedera_mirror_monitor_publish_submit_seconds_count{application="hedera-mirror-monitor"}[2m])) by (namespace, pod, scenario) > 0.33 for: 3m @@ -207,7 +207,7 @@ prometheusRules: description: "Averaging {{ $value | humanizePercentage }} PLATFORM_NOT_ACTIVE or UNAVAILABLE errors while attempting to publish in {{ $labels.namespace }}" summary: "Platform is not active" enabled: true - expr: sum(rate(hedera_mirror_monitor_publish_submit_seconds_count{application="hedera-mirror-monitor",status=~"(PLATFORM_NOT_ACTIVE|UNAVAILABLE)"}[2m])) by (namespace) / sum(rate(hedera_mirror_monitor_publish_submit_seconds_count{application="hedera-mirror-monitor"}[2m])) by (namespace) > 0 + expr: sum(rate(hedera_mirror_monitor_publish_submit_seconds_count{application="hedera-mirror-monitor",status=~"(PLATFORM_NOT_ACTIVE|UNAVAILABLE)"}[2m])) by (namespace) / sum(rate(hedera_mirror_monitor_publish_submit_seconds_count{application="hedera-mirror-monitor"}[2m])) by (namespace) > 0.33 for: 1m labels: application: hedera-mirror-monitor diff --git a/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/ClusterHealthIndicator.java b/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/ClusterHealthIndicator.java index cd3aef31501..9931acc9a98 100644 --- a/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/ClusterHealthIndicator.java +++ b/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/ClusterHealthIndicator.java @@ -56,6 +56,7 @@ public Mono health() { return publishing().switchIfEmpty(subscribing()); } + // Returns unknown if all publish scenarios aggregated rate has dropped to zero, otherwise returns an empty flux private Mono publishing() { return transactionGenerator.scenarios() .map(Scenario::getRate) @@ -64,6 +65,7 @@ private Mono publishing() { .flatMap(n -> UNKNOWN); } + // Returns up if any subscription is running and its rate is above zero, otherwise returns down private Mono subscribing() { return mirrorSubscriber.getSubscriptions() .map(Scenario::getRate) diff --git a/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/NodeProperties.java b/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/NodeProperties.java index 55bab34e744..d3a17921b86 100644 --- a/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/NodeProperties.java +++ b/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/NodeProperties.java @@ -46,6 +46,7 @@ public NodeProperties(String accountId, String host) { private int port = 50211; public String getEndpoint() { + // Allow for in-process testing of gRPC stubs if (host.startsWith("in-process:")) { return host; } diff --git a/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/publish/PublishMetrics.java b/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/publish/PublishMetrics.java index 50d0b42d1ef..738fc12bdb1 100644 --- a/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/publish/PublishMetrics.java +++ b/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/publish/PublishMetrics.java @@ -144,16 +144,19 @@ private Timer newSubmitMetric(Tags tags) { @Scheduled(fixedDelayString = "${hedera.mirror.monitor.publish.statusFrequency:10000}") public void status() { if (publishProperties.isEnabled()) { - durationGauges.keySet().stream().map(Tags::getScenario).distinct().forEach(this::status); + durationGauges.keySet() + .stream() + .map(Tags::getScenario) + .distinct() + .filter(PublishScenario::isRunning) + .forEach(this::status); } } private void status(PublishScenario scenario) { - if (scenario.isRunning()) { - String elapsed = DurationToStringSerializer.convert(scenario.getElapsed()); - log.info("{}: {} transactions in {} at {}/s. Errors: {}", - scenario, scenario.getCount(), elapsed, scenario.getRate(), scenario.getErrors()); - } + String elapsed = DurationToStringSerializer.convert(scenario.getElapsed()); + log.info("{}: {} transactions in {} at {}/s. Errors: {}", + scenario, scenario.getCount(), elapsed, scenario.getRate(), scenario.getErrors()); } @Value diff --git a/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/publish/generator/ConfigurableTransactionGenerator.java b/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/publish/generator/ConfigurableTransactionGenerator.java index a1b732b6210..f8b77af8c40 100644 --- a/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/publish/generator/ConfigurableTransactionGenerator.java +++ b/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/publish/generator/ConfigurableTransactionGenerator.java @@ -38,6 +38,7 @@ import lombok.Getter; import lombok.extern.log4j.Log4j2; import org.hibernate.validator.messageinterpolation.ParameterMessageInterpolator; +import org.springframework.util.Assert; import reactor.core.publisher.Flux; import com.hedera.datagenerator.sdk.supplier.TransactionSupplier; @@ -73,6 +74,7 @@ public ConfigurableTransactionGenerator(ExpressionConverter expressionConverter, stopTime = System.nanoTime() + properties.getDuration().toNanos(); scenario = new PublishScenario(properties); builder = PublishRequest.builder().scenario(scenario); + Assert.state(properties.getRetry().getMaxAttempts() > 0, "maxAttempts must be positive"); } @Override diff --git a/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/subscribe/SubscribeMetrics.java b/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/subscribe/SubscribeMetrics.java index 0a962f0aa03..a7496529b74 100644 --- a/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/subscribe/SubscribeMetrics.java +++ b/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/subscribe/SubscribeMetrics.java @@ -84,15 +84,16 @@ private final Timer newLatencyTimer(Scenario scenario) { @Scheduled(fixedDelayString = "${hedera.mirror.monitor.subscribe.statusFrequency:10000}") public void status() { if (subscribeProperties.isEnabled()) { - durationMetrics.keySet().forEach(this::status); + durationMetrics.keySet() + .stream() + .filter(Scenario::isRunning) + .forEach(this::status); } } private void status(Scenario s) { - if (s.isRunning()) { - String elapsed = DurationToStringSerializer.convert(s.getElapsed()); - log.info("{} {}: {} transactions in {} at {}/s. Errors: {}", - s.getProtocol(), s, s.getCount(), elapsed, s.getRate(), s.getErrors()); - } + String elapsed = DurationToStringSerializer.convert(s.getElapsed()); + log.info("{} {}: {} transactions in {} at {}/s. Errors: {}", + s.getProtocol(), s, s.getCount(), elapsed, s.getRate(), s.getErrors()); } } diff --git a/hedera-mirror-monitor/src/test/java/com/hedera/mirror/monitor/publish/generator/ConfigurableTransactionGeneratorTest.java b/hedera-mirror-monitor/src/test/java/com/hedera/mirror/monitor/publish/generator/ConfigurableTransactionGeneratorTest.java index 4268e13b296..6def9db6ebe 100644 --- a/hedera-mirror-monitor/src/test/java/com/hedera/mirror/monitor/publish/generator/ConfigurableTransactionGeneratorTest.java +++ b/hedera-mirror-monitor/src/test/java/com/hedera/mirror/monitor/publish/generator/ConfigurableTransactionGeneratorTest.java @@ -90,6 +90,14 @@ void nextCountMoreThanLimit() { .hasMessageContaining("Reached publish limit"); } + @Test + void invalidMaxAttempts() { + properties.getRetry().setMaxAttempts(0L); + assertThatThrownBy(generator::get) + .isInstanceOf(IllegalStateException.class) + .hasMessage("maxAttempts must be positive"); + } + @Test void unknownField() { properties.setProperties(Map.of("foo", "bar", "topicId", TOPIC_ID)); From 69e22d064bbed4a2ac9f64609a7babb1a8e53d6a Mon Sep 17 00:00:00 2001 From: Steven Sheehy Date: Tue, 3 Aug 2021 12:49:48 -0500 Subject: [PATCH 6/6] Fix one code smell Signed-off-by: Steven Sheehy --- .../mirror/monitor/expression/ExpressionConverterImpl.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/expression/ExpressionConverterImpl.java b/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/expression/ExpressionConverterImpl.java index c83a4e4e4a2..450c357fe73 100644 --- a/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/expression/ExpressionConverterImpl.java +++ b/hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/expression/ExpressionConverterImpl.java @@ -35,8 +35,7 @@ import lombok.extern.log4j.Log4j2; import org.apache.commons.lang3.StringUtils; import reactor.core.publisher.Mono; -import reactor.util.retry.RetryBackoffSpec; -import reactor.util.retry.RetrySpec; +import reactor.util.retry.Retry; import com.hedera.datagenerator.sdk.supplier.AdminKeyable; import com.hedera.datagenerator.sdk.supplier.TransactionSupplier; @@ -127,7 +126,7 @@ private synchronized String doConvert(Expression expression) { .transaction(transactionSupplier.get()) .build(); - RetryBackoffSpec retrySpec = RetrySpec.backoff(Long.MAX_VALUE, Duration.ofMillis(500L)) + Retry retrySpec = Retry.backoff(Long.MAX_VALUE, Duration.ofMillis(500L)) .maxBackoff(Duration.ofSeconds(8L)) .doBeforeRetry(r -> log.warn("Retry attempt #{} after failure: {}", r.totalRetries() + 1, r.failure().getMessage()));