Skip to content

Commit

Permalink
[ISSUE alibaba#2856]Adjust the use of thread pools (naming module) (a…
Browse files Browse the repository at this point in the history
…libaba#3136)

Change-Id: I626179bd9ee8e852d9d51787950ad80744ec71cd
  • Loading branch information
wangweizZZ committed Jun 28, 2020
1 parent b1f0d63 commit 3f145e6
Show file tree
Hide file tree
Showing 12 changed files with 278 additions and 368 deletions.
183 changes: 88 additions & 95 deletions istio/src/main/java/com/alibaba/nacos/istio/mcp/NacosMcpService.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,19 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.istio.mcp;

package com.alibaba.nacos.istio.mcp;

import com.alibaba.nacos.api.common.Constants;
import com.alibaba.nacos.api.naming.utils.NamingUtils;
import com.alibaba.nacos.istio.misc.IstioConfig;
import com.alibaba.nacos.istio.misc.Loggers;
import com.alibaba.nacos.istio.model.Port;
import com.alibaba.nacos.istio.model.mcp.*;
import com.alibaba.nacos.istio.model.mcp.Metadata;
import com.alibaba.nacos.istio.model.mcp.RequestResources;
import com.alibaba.nacos.istio.model.mcp.Resource;
import com.alibaba.nacos.istio.model.mcp.ResourceSourceGrpc;
import com.alibaba.nacos.istio.model.mcp.Resources;
import com.alibaba.nacos.istio.model.naming.ServiceEntry;
import com.alibaba.nacos.naming.core.Instance;
import com.alibaba.nacos.naming.core.Service;
Expand All @@ -39,213 +43,202 @@
import java.util.concurrent.atomic.AtomicInteger;

/**
* nacos mcp service.
*
* @author nkorange
* @since 1.1.4
*/
@org.springframework.stereotype.Service
public class NacosMcpService extends ResourceSourceGrpc.ResourceSourceImplBase {

private AtomicInteger connectIdGenerator = new AtomicInteger(0);

private Map<Integer, StreamObserver<Resources>> connnections = new ConcurrentHashMap<>(16);

private Map<String, Resource> resourceMap = new ConcurrentHashMap<>(16);

private Map<String, String> checksumMap = new ConcurrentHashMap<>(16);

private static final String SERVICE_NAME_SPLITTER = "nacos";

private static final String MESSAGE_TYPE_URL = "type.googleapis.com/istio.networking.v1alpha3.ServiceEntry";

private static final long MCP_PUSH_PERIOD_MILLISECONDS = 10000L;

@Autowired
private ServiceManager serviceManager;

@Autowired
private IstioConfig istioConfig;


/**
* start mcpPushTask{@link McpPushTask}.
*/
@PostConstruct
public void init() {
if (!istioConfig.isMcpServerEnabled()) {
return;
}
GlobalExecutor.schedule(new McpPushTask(), MCP_PUSH_PERIOD_MILLISECONDS * 2, MCP_PUSH_PERIOD_MILLISECONDS);
GlobalExecutor
.scheduleMcpPushTask(new McpPushTask(), MCP_PUSH_PERIOD_MILLISECONDS * 2, MCP_PUSH_PERIOD_MILLISECONDS);
}

private class McpPushTask implements Runnable {

@Override
public void run() {

boolean changed = false;

// Query all services to see if any of them have changes:
Set<String> namespaces = serviceManager.getAllNamespaces();

for (String namespace : namespaces) {

Map<String, Service> services = serviceManager.getServiceMap(namespace);

if (services.isEmpty()) {
continue;
}

for (Service service : services.values()) {

String convertedName = convertName(service);

// Service not changed:
if (checksumMap.containsKey(convertedName) && checksumMap.get(convertedName).equals(service.getChecksum())) {
if (checksumMap.containsKey(convertedName) && checksumMap.get(convertedName)
.equals(service.getChecksum())) {
continue;
}

if (service.allIPs().isEmpty()) {
resourceMap.remove(convertedName);
continue;
}

// Update the resource:
changed = true;
resourceMap.put(convertedName, convertService(service));
checksumMap.put(convertedName, service.getChecksum());
}
}

if (!changed) {
// If no service changed, just return:
return;
}

Resources resources = Resources.newBuilder()
.addAllResources(resourceMap.values())
.setCollection(CollectionTypes.SERVICE_ENTRY)
.setNonce(String.valueOf(System.currentTimeMillis()))
.build();


Resources resources = Resources.newBuilder().addAllResources(resourceMap.values())
.setCollection(CollectionTypes.SERVICE_ENTRY).setNonce(String.valueOf(System.currentTimeMillis()))
.build();

if (connnections.isEmpty()) {
return;
}

Loggers.MAIN.info("MCP push, resource count is: {}", resourceMap.size());

if (Loggers.MAIN.isDebugEnabled()) {
Loggers.MAIN.debug("MCP push, sending resources: {}", resources);
}

for (StreamObserver<Resources> observer : connnections.values()) {
observer.onNext(resources);
}
}
}

private String convertName(Service service) {

String serviceName = NamingUtils.getServiceName(service.getName()) + ".sn";

if (!Constants.DEFAULT_GROUP.equals(NamingUtils.getGroupName(service.getName()))) {
serviceName = serviceName + NamingUtils.getGroupName(service.getName()) + ".gn";
}

if (!Constants.DEFAULT_NAMESPACE_ID.equals(service.getNamespaceId())) {
serviceName = serviceName + service.getNamespaceId() + ".ns";
}
return serviceName;
}

private Resource convertService(Service service) {

String serviceName = convertName(service);

ServiceEntry.Builder serviceEntryBuilder = ServiceEntry.newBuilder()
.setResolution(ServiceEntry.Resolution.STATIC)
.setLocation(ServiceEntry.Location.MESH_INTERNAL)
.addHosts(serviceName + "." + SERVICE_NAME_SPLITTER)
.addPorts(Port.newBuilder().setNumber(8848).setName("http").setProtocol("HTTP").build());
.setResolution(ServiceEntry.Resolution.STATIC).setLocation(ServiceEntry.Location.MESH_INTERNAL)
.addHosts(serviceName + "." + SERVICE_NAME_SPLITTER)
.addPorts(Port.newBuilder().setNumber(8848).setName("http").setProtocol("HTTP").build());

for (Instance instance : service.allIPs()) {

if (!instance.isHealthy() || !instance.isEnabled()) {
continue;
}

ServiceEntry.Endpoint endpoint =
ServiceEntry.Endpoint.newBuilder()
.setAddress(instance.getIp())
.setWeight((int) instance.getWeight())
.putAllLabels(instance.getMetadata())
.putPorts("http", instance.getPort())
.build();


ServiceEntry.Endpoint endpoint = ServiceEntry.Endpoint.newBuilder().setAddress(instance.getIp())
.setWeight((int) instance.getWeight()).putAllLabels(instance.getMetadata())
.putPorts("http", instance.getPort()).build();

serviceEntryBuilder.addEndpoints(endpoint);
}

ServiceEntry serviceEntry = serviceEntryBuilder.build();

Any any = Any.newBuilder()
.setValue(serviceEntry.toByteString())
.setTypeUrl(MESSAGE_TYPE_URL)
.build();

Metadata metadata = Metadata.newBuilder()
.setName(SERVICE_NAME_SPLITTER + "/" + serviceName)
.putAllAnnotations(service.getMetadata())
.putAnnotations("virtual", "1")
.build();

Resource resource = Resource.newBuilder()
.setBody(any)
.setMetadata(metadata)
.build();


Any any = Any.newBuilder().setValue(serviceEntry.toByteString()).setTypeUrl(MESSAGE_TYPE_URL).build();

Metadata metadata = Metadata.newBuilder().setName(SERVICE_NAME_SPLITTER + "/" + serviceName)
.putAllAnnotations(service.getMetadata()).putAnnotations("virtual", "1").build();

Resource resource = Resource.newBuilder().setBody(any).setMetadata(metadata).build();

return resource;
}

@Override
public StreamObserver<RequestResources> establishResourceStream(StreamObserver<Resources> responseObserver) {

int id = connectIdGenerator.incrementAndGet();
connnections.put(id, responseObserver);

return new StreamObserver<RequestResources>() {

private int connectionId = id;

@Override
public void onNext(RequestResources value) {

Loggers.MAIN.info("receiving request, sink: {}, type: {}", value.getSinkNode(), value.getCollection());

if (value.getErrorDetail() != null && value.getErrorDetail().getCode() != 0) {

Loggers.MAIN.error("NACK error code: {}, message: {}", value.getErrorDetail().getCode()
, value.getErrorDetail().getMessage());
Loggers.MAIN.error("NACK error code: {}, message: {}", value.getErrorDetail().getCode(),
value.getErrorDetail().getMessage());
return;
}

if (StringUtils.isNotBlank(value.getResponseNonce())) {
// This is a response:
Loggers.MAIN.info("ACK nonce: {}, type: {}", value.getResponseNonce(), value.getCollection());
return;
}

if (!CollectionTypes.SERVICE_ENTRY.equals(value.getCollection())) {
// Return empty resources for other types:
Resources resources = Resources.newBuilder()
.setCollection(value.getCollection())
.setNonce(String.valueOf(System.currentTimeMillis()))
.build();

Resources resources = Resources.newBuilder().setCollection(value.getCollection())
.setNonce(String.valueOf(System.currentTimeMillis())).build();

responseObserver.onNext(resources);
}
}

@Override
public void onError(Throwable t) {
Loggers.MAIN.error("stream error.", t);
connnections.remove(connectionId);
}

@Override
public void onCompleted() {
responseObserver.onCompleted();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public DistroConsistencyServiceImpl(DistroMapper distroMapper, DataStore dataSto

@PostConstruct
public void init() {
GlobalExecutor.submit(loadDataTask);
GlobalExecutor.submitLoadDataTask(loadDataTask);
GlobalExecutor.submitDistroNotifyTask(notifier);
}

Expand All @@ -117,7 +117,7 @@ public void run() {
try {
load();
if (!initialized) {
GlobalExecutor.submit(this, globalConfig.getLoadDataRetryDelayMillis());
GlobalExecutor.submitLoadDataTask(this, globalConfig.getLoadDataRetryDelayMillis());
} else {
Loggers.DISTRO.info("load data success");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,12 @@

package com.alibaba.nacos.naming.consistency.persistent.raft;

import com.alibaba.nacos.common.executor.ExecutorFactory;
import com.alibaba.nacos.common.executor.NameThreadFactory;
import com.alibaba.nacos.common.utils.JacksonUtils;
import com.alibaba.nacos.core.utils.ApplicationUtils;
import com.alibaba.nacos.core.utils.ClassUtils;
import com.alibaba.nacos.naming.NamingApp;
import com.alibaba.nacos.naming.consistency.ApplyAction;
import com.alibaba.nacos.naming.consistency.Datum;
import com.alibaba.nacos.naming.consistency.KeyBuilder;
Expand Down Expand Up @@ -67,8 +71,6 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
Expand Down Expand Up @@ -99,17 +101,9 @@ public class RaftCore {

public static final String API_GET_PEER = UtilsAndCommons.NACOS_NAMING_CONTEXT + "/raft/peer";

private final ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);

t.setDaemon(true);
t.setName("com.alibaba.nacos.naming.raft.notifier");

return t;
}
});
private final ScheduledExecutorService executor = ExecutorFactory.Managed
.newSingleScheduledExecutorService(ClassUtils.getCanonicalName(NamingApp.class),
new NameThreadFactory("com.alibaba.nacos.naming.raft.notifier"));

public static final Lock OPERATE_LOCK = new ReentrantLock();

Expand Down
Loading

0 comments on commit 3f145e6

Please sign in to comment.