Skip to content

Commit

Permalink
[ISSUE alibaba#2856]Adjust the use of thread pools (config module) (a…
Browse files Browse the repository at this point in the history
…libaba#3206)

* [ISSUE alibaba#2856]Adjust the use of thread pools (config、cmdb module)

* add CmdbExecutor class
  • Loading branch information
wangweizZZ committed Jul 13, 2020
1 parent 3a4e21b commit 891c1f3
Show file tree
Hide file tree
Showing 16 changed files with 230 additions and 307 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
import com.alibaba.nacos.cmdb.core.SwitchAndOptions;
import com.alibaba.nacos.cmdb.service.CmdbReader;
import com.alibaba.nacos.cmdb.service.CmdbWriter;
import com.alibaba.nacos.cmdb.utils.CmdbExecutor;
import com.alibaba.nacos.cmdb.utils.Loggers;
import com.alibaba.nacos.cmdb.utils.UtilsAndCommons;
import com.alibaba.nacos.common.utils.JacksonUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
Expand Down Expand Up @@ -117,11 +117,9 @@ public void init() throws NacosException {
initCmdbService();
load();

UtilsAndCommons.GLOBAL_EXECUTOR.schedule(new CmdbDumpTask(), switches.getDumpTaskInterval(), TimeUnit.SECONDS);
UtilsAndCommons.GLOBAL_EXECUTOR
.schedule(new CmdbLabelTask(), switches.getLabelTaskInterval(), TimeUnit.SECONDS);
UtilsAndCommons.GLOBAL_EXECUTOR
.schedule(new CmdbEventTask(), switches.getEventTaskInterval(), TimeUnit.SECONDS);
CmdbExecutor.scheduleCmdbTask(new CmdbDumpTask(), switches.getDumpTaskInterval(), TimeUnit.SECONDS);
CmdbExecutor.scheduleCmdbTask(new CmdbLabelTask(), switches.getLabelTaskInterval(), TimeUnit.SECONDS);
CmdbExecutor.scheduleCmdbTask(new CmdbEventTask(), switches.getEventTaskInterval(), TimeUnit.SECONDS);
}

@Override
Expand Down Expand Up @@ -205,7 +203,7 @@ public void run() {
} catch (Exception e) {
Loggers.MAIN.error("CMDB-LABEL-TASK {}", "dump failed!", e);
} finally {
UtilsAndCommons.GLOBAL_EXECUTOR.schedule(this, switches.getLabelTaskInterval(), TimeUnit.SECONDS);
CmdbExecutor.scheduleCmdbTask(this, switches.getLabelTaskInterval(), TimeUnit.SECONDS);
}
}
}
Expand All @@ -227,7 +225,7 @@ public void run() {
} catch (Exception e) {
Loggers.MAIN.error("DUMP-TASK {}", "dump failed!", e);
} finally {
UtilsAndCommons.GLOBAL_EXECUTOR.schedule(this, switches.getDumpTaskInterval(), TimeUnit.SECONDS);
CmdbExecutor.scheduleCmdbTask(this, switches.getDumpTaskInterval(), TimeUnit.SECONDS);
}
}
}
Expand Down Expand Up @@ -271,7 +269,7 @@ public void run() {
} catch (Exception e) {
Loggers.MAIN.error("CMDB-EVENT {}", "event task failed!", e);
} finally {
UtilsAndCommons.GLOBAL_EXECUTOR.schedule(this, switches.getEventTaskInterval(), TimeUnit.SECONDS);
CmdbExecutor.scheduleCmdbTask(this, switches.getEventTaskInterval(), TimeUnit.SECONDS);
}
}
}
Expand Down
43 changes: 43 additions & 0 deletions cmdb/src/main/java/com/alibaba/nacos/cmdb/utils/CmdbExecutor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.nacos.cmdb.utils;

import com.alibaba.nacos.cmdb.CmdbApp;
import com.alibaba.nacos.common.executor.ExecutorFactory;
import com.alibaba.nacos.common.executor.NameThreadFactory;
import com.alibaba.nacos.core.utils.ClassUtils;

import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
* Cmdb executor.
*
* @author wangweizZZ
* @date 2020/7/13 1:54 PM
*/
public class CmdbExecutor {

private static final ScheduledExecutorService GLOBAL_EXECUTOR = ExecutorFactory.Managed
.newScheduledExecutorService(ClassUtils.getCanonicalName(CmdbApp.class),
Runtime.getRuntime().availableProcessors(),
new NameThreadFactory("com.alibaba.nacos.cmdb.global.executor"));

public static void scheduleCmdbTask(Runnable runnable, long delay, TimeUnit unit) {
GLOBAL_EXECUTOR.schedule(runnable, delay, unit);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,6 @@

package com.alibaba.nacos.cmdb.utils;

import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;

/**
* Utils and constants.
*
Expand All @@ -32,19 +28,4 @@ public class UtilsAndCommons {

public static final String NACOS_CMDB_CONTEXT = NACOS_SERVER_VERSION + "/cmdb";

public static final ScheduledExecutorService GLOBAL_EXECUTOR;

static {

GLOBAL_EXECUTOR = new ScheduledThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("nacos.cmdb.global.executor");
t.setDaemon(true);
return t;
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,6 @@
"checkstyle:missingjavadocmethod"})
public final class ExecutorFactory {

private static final String DEFAULT_NAMESPACE = "nacos";

private static final ThreadPoolManager THREAD_POOL_MANAGER = ThreadPoolManager.getInstance();

public static ExecutorService newSingleExecutorService() {
return Executors.newFixedThreadPool(1);
}
Expand Down Expand Up @@ -73,61 +69,6 @@ public static ThreadPoolExecutor newCustomerThreadExecutor(final int coreThreads
new LinkedBlockingQueue<Runnable>(), threadFactory);
}

//TODO remove Deprecated function after replace all module
@Deprecated
public static ExecutorService newSingleExecutorService(final String group) {
ExecutorService executorService = Executors.newFixedThreadPool(1);
THREAD_POOL_MANAGER.register(DEFAULT_NAMESPACE, group, executorService);
return executorService;
}

@Deprecated
public static ExecutorService newSingleExecutorService(final String group, final ThreadFactory threadFactory) {
ExecutorService executorService = Executors.newFixedThreadPool(1, threadFactory);
THREAD_POOL_MANAGER.register(DEFAULT_NAMESPACE, group, executorService);
return executorService;
}

@Deprecated
public static ExecutorService newFixedExecutorService(final String group, final int nThreads) {
ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
THREAD_POOL_MANAGER.register(DEFAULT_NAMESPACE, group, executorService);
return executorService;
}

@Deprecated
public static ExecutorService newFixedExecutorService(final String group, final int nThreads,
final ThreadFactory threadFactory) {
ExecutorService executorService = Executors.newFixedThreadPool(nThreads, threadFactory);
THREAD_POOL_MANAGER.register(DEFAULT_NAMESPACE, group, executorService);
return executorService;
}

@Deprecated
public static ScheduledExecutorService newSingleScheduledExecutorService(final String group,
final ThreadFactory threadFactory) {
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1, threadFactory);
THREAD_POOL_MANAGER.register(DEFAULT_NAMESPACE, group, executorService);
return executorService;
}

@Deprecated
public static ScheduledExecutorService newScheduledExecutorService(final String group, final int nThreads,
final ThreadFactory threadFactory) {
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(nThreads, threadFactory);
THREAD_POOL_MANAGER.register(DEFAULT_NAMESPACE, group, executorService);
return executorService;
}

@Deprecated
public static ThreadPoolExecutor newCustomerThreadExecutor(final String group, final int coreThreads,
final int maxThreads, final long keepAliveTimeMs, final ThreadFactory threadFactory) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(coreThreads, maxThreads, keepAliveTimeMs,
TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory);
THREAD_POOL_MANAGER.register(DEFAULT_NAMESPACE, group, executor);
return executor;
}

public static final class Managed {

private static final String DEFAULT_NAMESPACE = "nacos";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,14 @@ public class MemoryMonitor {
@Autowired
public MemoryMonitor(AsyncNotifyService notifySingleService) {

ConfigExecutor.scheduleWithFixedDelay(new PrintMemoryTask(), DELAY_SECONDS, DELAY_SECONDS, TimeUnit.SECONDS);
ConfigExecutor.scheduleConfigTask(new PrintMemoryTask(), DELAY_SECONDS, DELAY_SECONDS, TimeUnit.SECONDS);

ConfigExecutor.scheduleWithFixedDelay(new PrintGetConfigResponeTask(), DELAY_SECONDS, DELAY_SECONDS,
TimeUnit.SECONDS);
ConfigExecutor
.scheduleConfigTask(new PrintGetConfigResponeTask(), DELAY_SECONDS, DELAY_SECONDS, TimeUnit.SECONDS);

ConfigExecutor.scheduleWithFixedDelay(new NotifyTaskQueueMonitorTask(notifySingleService), DELAY_SECONDS,
DELAY_SECONDS, TimeUnit.SECONDS);
ConfigExecutor
.scheduleConfigTask(new NotifyTaskQueueMonitorTask(notifySingleService), DELAY_SECONDS, DELAY_SECONDS,
TimeUnit.SECONDS);

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@
package com.alibaba.nacos.config.server.monitor;

import com.alibaba.nacos.config.server.service.notify.AsyncNotifyService;

import java.util.concurrent.ScheduledThreadPoolExecutor;
import com.alibaba.nacos.config.server.utils.ConfigExecutor;

import static com.alibaba.nacos.config.server.utils.LogUtil.MEMORY_LOG;

Expand All @@ -37,7 +36,7 @@ public class NotifyTaskQueueMonitorTask implements Runnable {

@Override
public void run() {
int size = ((ScheduledThreadPoolExecutor) notifySingleService.getExecutor()).getQueue().size();
int size = ConfigExecutor.asyncNotifyQueueSize();
MEMORY_LOG.info("toNotifyTaskSize = {}", size);
MetricsMonitor.getNotifyTaskMonitor().set(size);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,17 @@

package com.alibaba.nacos.config.server.service;

import com.alibaba.nacos.common.utils.ThreadUtils;
import com.alibaba.nacos.config.server.constant.Constants;
import com.alibaba.nacos.config.server.model.SampleResult;
import com.alibaba.nacos.config.server.service.notify.NotifyService;
import com.alibaba.nacos.config.server.utils.ConfigExecutor;
import com.alibaba.nacos.config.server.utils.JSONUtils;
import com.alibaba.nacos.config.server.utils.LogUtil;
import com.alibaba.nacos.core.cluster.Member;
import com.alibaba.nacos.core.cluster.ServerMemberManager;
import com.alibaba.nacos.core.utils.ApplicationUtils;
import org.apache.commons.lang3.StringUtils;
import com.fasterxml.jackson.core.type.TypeReference;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

Expand All @@ -37,18 +37,15 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.Future;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.Callable;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
* Config sub service.
Expand All @@ -58,24 +55,12 @@
@Service
public class ConfigSubService {

private ScheduledExecutorService scheduler;

private ServerMemberManager memberManager;

@Autowired
@SuppressWarnings("PMD.ThreadPoolCreationRule")
public ConfigSubService(ServerMemberManager memberManager) {
this.memberManager = memberManager;

scheduler = Executors.newScheduledThreadPool(ThreadUtils.getSuitableThreadCount(), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setDaemon(true);
t.setName("com.alibaba.nacos.ConfigSubService");
return t;
}
});
}

protected ConfigSubService() {
Expand Down Expand Up @@ -140,7 +125,7 @@ private List<SampleResult> runCollectionJob(String url, Map<String, String> para
* Merge SampleResult.
*
* @param sampleCollectResult sampleCollectResult.
* @param sampleResults sampleResults.
* @param sampleResults sampleResults.
* @return SampleResult.
*/
public SampleResult mergeSampleResult(SampleResult sampleCollectResult, List<SampleResult> sampleResults) {
Expand Down Expand Up @@ -195,7 +180,7 @@ public SampleResult call() throws Exception {
String urlAll = getUrl(ip, url) + "?" + paramUrl;
com.alibaba.nacos.config.server.service.notify.NotifyService.HttpResult result = NotifyService
.invokeURL(urlAll, null, Constants.ENCODE);

// Http code 200
if (result.code == HttpURLConnection.HTTP_OK) {
String json = result.content;
Expand Down Expand Up @@ -227,8 +212,8 @@ public SampleResult getCollectSampleResult(String dataId, String group, String t
}
BlockingQueue<Future<SampleResult>> queue = new LinkedBlockingDeque<Future<SampleResult>>(
memberManager.getServerList().size());
CompletionService<SampleResult> completionService = new ExecutorCompletionService<SampleResult>(scheduler,
queue);
CompletionService<SampleResult> completionService = new ExecutorCompletionService<SampleResult>(
ConfigExecutor.getConfigSubServiceExecutor(), queue);

SampleResult sampleCollectResult = new SampleResult();
for (int i = 0; i < sampleTime; i++) {
Expand All @@ -247,8 +232,8 @@ public SampleResult getCollectSampleResultByIp(String ip, int sampleTime) throws
params.put("ip", ip);
BlockingQueue<Future<SampleResult>> queue = new LinkedBlockingDeque<Future<SampleResult>>(
memberManager.getServerList().size());
CompletionService<SampleResult> completionService = new ExecutorCompletionService<SampleResult>(scheduler,
queue);
CompletionService<SampleResult> completionService = new ExecutorCompletionService<SampleResult>(
ConfigExecutor.getConfigSubServiceExecutor(), queue);

SampleResult sampleCollectResult = new SampleResult();
for (int i = 0; i < sampleTime; i++) {
Expand Down
Loading

0 comments on commit 891c1f3

Please sign in to comment.