Skip to content

Commit

Permalink
add runnable/callable class in async invoke method (sofastack#1327)
Browse files Browse the repository at this point in the history
Co-authored-by: 致节 <hzj266771@antgroup.com>
  • Loading branch information
HzjNeverStop and 致节 authored May 28, 2024
1 parent 5473c76 commit 88347b5
Show file tree
Hide file tree
Showing 4 changed files with 197 additions and 87 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -111,19 +112,8 @@ public boolean readinessHealthCheck(Map<String, Health> healthMap) {
if (isParallelCheck()) {
CountDownLatch countDownLatch = new CountDownLatch(readinessHealthCheckers.size());
AtomicBoolean parallelResult = new AtomicBoolean(true);
readinessHealthCheckers.forEach((String key, HealthChecker value) -> healthCheckExecutor.execute(() -> {
try {
if (!doHealthCheck(key, value, false, healthMap, true, false)) {
parallelResult.set(false);
}
} catch (Throwable t) {
parallelResult.set(false);
logger.error(ErrorCode.convert("01-22004"), t);
healthMap.put(key, new Health.Builder().withException(t).status(Status.DOWN).build());
} finally {
countDownLatch.countDown();
}
}));
readinessHealthCheckers.forEach((String key, HealthChecker value) -> healthCheckExecutor.execute(
new AsyncHealthCheckRunnable(key, value, healthMap, parallelResult, countDownLatch)));
boolean finished = false;
try {
finished = countDownLatch.await(getParallelCheckTimeout(), TimeUnit.MILLISECONDS);
Expand Down Expand Up @@ -160,7 +150,7 @@ public boolean readinessHealthCheck(Map<String, Health> healthMap) {
* @return health check passes or not
*/
private boolean doHealthCheck(String beanId, HealthChecker healthChecker, boolean isRetry,
Map<String, Health> healthMap, boolean isReadiness, boolean wait) {
Map<String, Health> healthMap, boolean isReadiness, boolean wait) {
Assert.notNull(healthMap, "HealthMap must not be null");

Health health;
Expand All @@ -180,20 +170,23 @@ private boolean doHealthCheck(String beanId, HealthChecker healthChecker, boolea
do {
try {
if (wait) {
Future<Health> future = healthCheckExecutor.submit(healthChecker::isHealthy);
Future<Health> future = healthCheckExecutor
.submit(new AsyncHealthCheckCallable(healthChecker));
health = future.get(timeout, TimeUnit.MILLISECONDS);
} else {
health = healthChecker.isHealthy();
}
} catch (TimeoutException e) {
logger.error(
} catch (TimeoutException e) {
logger
.error(
"Timeout occurred while doing HealthChecker[{}] {} check, the timeout value is: {}ms.",
beanId, checkType, timeout);
health = new Health.Builder().withException(e).withDetail("timeout", timeout).status(Status.UNKNOWN).build();
health = new Health.Builder().withException(e).withDetail("timeout", timeout)
.status(Status.UNKNOWN).build();
} catch (Throwable e) {
logger.error(String.format(
"Exception occurred while wait the result of HealthChecker[%s] %s check.",
beanId, checkType), e);
"Exception occurred while wait the result of HealthChecker[%s] %s check.",
beanId, checkType), e);
health = new Health.Builder().withException(e).status(Status.DOWN).build();
}
result = health.getStatus().equals(Status.UP);
Expand All @@ -208,9 +201,7 @@ private boolean doHealthCheck(String beanId, HealthChecker healthChecker, boolea
retryCount += 1;
TimeUnit.MILLISECONDS.sleep(healthChecker.getRetryTimeInterval());
} catch (InterruptedException e) {
logger
.error(ErrorCode.convert("01-23002", retryCount, beanId,
checkType), e);
logger.error(ErrorCode.convert("01-23002", retryCount, beanId, checkType), e);
}
}
} while (isRetry && retryCount < healthChecker.getRetryCount());
Expand All @@ -223,12 +214,12 @@ private boolean doHealthCheck(String beanId, HealthChecker healthChecker, boolea
if (!result) {
if (healthChecker.isStrictCheck()) {
logger.error(ErrorCode.convert("01-23001", beanId, checkType, retryCount,
objectMapper.writeValueAsString(health.getDetails()),
healthChecker.isStrictCheck()));
objectMapper.writeValueAsString(health.getDetails()),
healthChecker.isStrictCheck()));
} else {
logger.warn(ErrorCode.convert("01-23001", beanId, checkType, retryCount,
objectMapper.writeValueAsString(health.getDetails()),
healthChecker.isStrictCheck()));
objectMapper.writeValueAsString(health.getDetails()),
healthChecker.isStrictCheck()));
}
}
} catch (JsonProcessingException ex) {
Expand Down Expand Up @@ -364,4 +355,55 @@ public int getTimeout() {
}

}

private class AsyncHealthCheckRunnable implements Runnable {
private final String key;
private final HealthChecker value;
private final Map<String, Health> healthMap;

private final AtomicBoolean parallelResult;

private final CountDownLatch countDownLatch;

public AsyncHealthCheckRunnable(String key, HealthChecker value,
Map<String, Health> healthMap,
AtomicBoolean parallelResult, CountDownLatch countDownLatch) {
this.key = key;
this.value = value;
this.healthMap = healthMap;
this.parallelResult = parallelResult;
this.countDownLatch = countDownLatch;
}

@Override
public void run() {
try {
if (!HealthCheckerProcessor.this.doHealthCheck(key, value, false, healthMap, true,
false)) {
parallelResult.set(false);
}
} catch (Throwable t) {
parallelResult.set(false);
logger.error(ErrorCode.convert("01-22004"), t);
healthMap.put(key, new Health.Builder().withException(t).status(Status.DOWN)
.build());
} finally {
countDownLatch.countDown();
}
}
}

private class AsyncHealthCheckCallable implements Callable<Health> {

private final HealthChecker healthChecker;

public AsyncHealthCheckCallable(HealthChecker healthChecker) {
this.healthChecker = healthChecker;
}

@Override
public Health call() throws Exception {
return healthChecker.isHealthy();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -167,19 +168,8 @@ public boolean readinessHealthCheck(Map<String, Health> healthMap) {
if (isParallelCheck()) {
CountDownLatch countDownLatch = new CountDownLatch(healthIndicators.size());
AtomicBoolean parallelResult = new AtomicBoolean(true);
healthIndicators.forEach((key, value) -> healthCheckExecutor.execute(() -> {
try {
if (!doHealthCheck(key, value, healthMap, false)) {
parallelResult.set(false);
}
} catch (Throwable t) {
parallelResult.set(false);
logger.error(ErrorCode.convert("01-21003"), t);
healthMap.put(key, new Health.Builder().withException(t).status(Status.DOWN).build());
} finally {
countDownLatch.countDown();
}
}));
healthIndicators.forEach((key, value) -> healthCheckExecutor.execute(
new AsyncHealthIndicatorRunnable(key, value, healthMap, parallelResult, countDownLatch)));
boolean finished = false;
try {
finished = countDownLatch.await(getParallelCheckTimeout(), TimeUnit.MILLISECONDS);
Expand Down Expand Up @@ -226,7 +216,7 @@ public boolean doHealthCheck(String beanId, HealthIndicator healthIndicator,
try {
if (wait) {
Future<Health> future = healthCheckExecutor
.submit(healthIndicator::health);
.submit(new AsyncHealthIndicatorCallable(healthIndicator));
health = future.get(timeout, TimeUnit.MILLISECONDS);
} else {
health = healthIndicator.health();
Expand Down Expand Up @@ -315,4 +305,55 @@ public void setHealthIndicatorConfig(Map<String, HealthCheckerConfig> healthIndi
public List<BaseStat> getHealthIndicatorStartupStatList() {
return healthIndicatorStartupStatList;
}

private class AsyncHealthIndicatorRunnable implements Runnable {
private final String key;
private final HealthIndicator value;
private final Map<String, Health> healthMap;

private final AtomicBoolean parallelResult;

private final CountDownLatch countDownLatch;

public AsyncHealthIndicatorRunnable(String key, HealthIndicator value,
Map<String, Health> healthMap,
AtomicBoolean parallelResult,
CountDownLatch countDownLatch) {
this.key = key;
this.value = value;
this.healthMap = healthMap;
this.parallelResult = parallelResult;
this.countDownLatch = countDownLatch;
}

@Override
public void run() {
try {
if (!HealthIndicatorProcessor.this.doHealthCheck(key, value, healthMap, false)) {
parallelResult.set(false);
}
} catch (Throwable t) {
parallelResult.set(false);
logger.error(ErrorCode.convert("01-21003"), t);
healthMap.put(key, new Health.Builder().withException(t).status(Status.DOWN)
.build());
} finally {
countDownLatch.countDown();
}
}
}

private class AsyncHealthIndicatorCallable implements Callable<Health> {

private final HealthIndicator healthIndicator;

public AsyncHealthIndicatorCallable(HealthIndicator healthIndicator) {
this.healthIndicator = healthIndicator;
}

@Override
public Health call() throws Exception {
return healthIndicator.health();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -184,38 +184,12 @@ private void doRefreshSpringContextParallel() {
/**
* Refresh all {@link ApplicationContext} recursively
*/
private void refreshRecursively(DeploymentDescriptor deployment,
CountDownLatch latch, List<Future<?>> futures) {
private void refreshRecursively(DeploymentDescriptor deployment, CountDownLatch latch,
List<Future<?>> futures) {
// if interrupted, moduleRefreshExecutorService will be null;
if (moduleRefreshExecutorService != null) {
futures.add(moduleRefreshExecutorService.submit(() -> {
String oldName = Thread.currentThread().getName();
try {
Thread.currentThread().setName(
"sofa-module-refresh-" + deployment.getModuleName());
if (deployment.isSpringPowered() && !application.getFailed().contains(deployment)) {
refreshAndCollectCost(deployment);
}
DependencyTree.Entry<String, DeploymentDescriptor> entry = application
.getDeployRegistry().getEntry(deployment.getModuleName());
if (entry != null && entry.getDependsOnMe() != null) {
for (DependencyTree.Entry<String, DeploymentDescriptor> child : entry
.getDependsOnMe()) {
child.getDependencies().remove(entry);
if (child.getDependencies().size() == 0) {
refreshRecursively(child.get(), latch, futures);
}
}
}
} catch (Throwable t) {
LOGGER.error(ErrorCode.convert("01-11002", deployment.getName()), t);
throw new RuntimeException(ErrorCode.convert("01-11002", deployment.getName()),
t);
} finally {
latch.countDown();
Thread.currentThread().setName(oldName);
}
}));
futures.add(moduleRefreshExecutorService.submit(new AsyncSpringContextRunnable(
deployment, latch, futures)));
}
}

Expand Down Expand Up @@ -291,4 +265,46 @@ public String getName() {
public int getOrder() {
return 20000;
}

private class AsyncSpringContextRunnable implements Runnable {
private final DeploymentDescriptor deployment;
private final CountDownLatch latch;
private final List<Future<?>> futures;

public AsyncSpringContextRunnable(DeploymentDescriptor deployment, CountDownLatch latch,
List<Future<?>> futures) {
this.deployment = deployment;
this.latch = latch;
this.futures = futures;
}

@Override
public void run() {
String oldName = Thread.currentThread().getName();
try {
Thread.currentThread().setName("sofa-module-refresh-" + deployment.getModuleName());
if (deployment.isSpringPowered() && !application.getFailed().contains(deployment)) {
SpringContextInstallStage.this.refreshAndCollectCost(deployment);
}
DependencyTree.Entry<String, DeploymentDescriptor> entry = application
.getDeployRegistry().getEntry(deployment.getModuleName());
if (entry != null && entry.getDependsOnMe() != null) {
for (DependencyTree.Entry<String, DeploymentDescriptor> child : entry
.getDependsOnMe()) {
child.getDependencies().remove(entry);
if (child.getDependencies().size() == 0) {
SpringContextInstallStage.this.refreshRecursively(child.get(), latch,
futures);
}
}
}
} catch (Throwable t) {
LOGGER.error(ErrorCode.convert("01-11002", deployment.getName()), t);
throw new RuntimeException(ErrorCode.convert("01-11002", deployment.getName()), t);
} finally {
latch.countDown();
Thread.currentThread().setName(oldName);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,28 +75,15 @@ public Object invoke(final MethodInvocation invocation) throws Throwable {
if (!isAsyncCalled && methodName.equals(asyncMethodName)) {
isAsyncCalled = true;
isAsyncCalling = true;
asyncInitMethodManager.submitTask(() -> {
try {
long startTime = System.currentTimeMillis();
invocation.getMethod().invoke(targetObject, invocation.getArguments());
LOGGER.info("{}({}) {} method execute {}dms.", targetObject
.getClass().getName(), beanName, methodName, (System
.currentTimeMillis() - startTime));
} catch (Throwable e) {
throw new RuntimeException(e);
} finally {
asyncMethodFinish();
}
});
asyncInitMethodManager.submitTask(new AsyncBeanInitRunnable(invocation));
return null;
}

if (isAsyncCalling) {
long startTime = System.currentTimeMillis();
initCountDownLatch.await();
LOGGER.info("{}({}) {} method wait {}ms.",
targetObject.getClass().getName(), beanName, methodName,
(System.currentTimeMillis() - startTime));
LOGGER.info("{}({}) {} method wait {}ms.", targetObject.getClass().getName(), beanName,
methodName, (System.currentTimeMillis() - startTime));
}
return invocation.getMethod().invoke(targetObject, invocation.getArguments());
}
Expand All @@ -105,4 +92,28 @@ void asyncMethodFinish() {
this.initCountDownLatch.countDown();
this.isAsyncCalling = false;
}

private class AsyncBeanInitRunnable implements Runnable {

private final MethodInvocation invocation;

public AsyncBeanInitRunnable(MethodInvocation invocation) {
this.invocation = invocation;
}

@Override
public void run() {
try {
long startTime = System.currentTimeMillis();
invocation.getMethod().invoke(targetObject, invocation.getArguments());
LOGGER.info("{}({}) {} method execute {}dms.", targetObject.getClass().getName(),
beanName, invocation.getMethod().getName(),
(System.currentTimeMillis() - startTime));
} catch (Throwable e) {
throw new RuntimeException(e);
} finally {
AsyncInitializeBeanMethodInvoker.this.asyncMethodFinish();
}
}
}
}

0 comments on commit 88347b5

Please sign in to comment.