Skip to content

Commit

Permalink
[ISSUE alibaba#4364] Fix Cluster member state isn't updated to 'DOWN'…
Browse files Browse the repository at this point in the history
… after the node becomes down (alibaba#4371)

* fix: fix issue alibaba#4364

* refactor: fixed node changes that could not trigger event publishing

* fix: fixed a problem with frequent releases of events
  • Loading branch information
chuntaojun committed Dec 16, 2020
1 parent 97efd23 commit 5b2d76f
Show file tree
Hide file tree
Showing 12 changed files with 317 additions and 165 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@
*/
public class ExceptionUtil {

/**
* Represents an empty exception, that is, no exception occurs, only a constant.
*/
public static final Exception NONE_EXCEPTION = new RuntimeException("");

public static String getAllExceptionMsg(Throwable e) {
Throwable cause = e;
StringBuilder strBuilder = new StringBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.alibaba.nacos.common.utils.ExceptionUtil;
import com.alibaba.nacos.common.utils.IPUtil;
import com.alibaba.nacos.common.utils.Objects;
import com.alibaba.nacos.core.utils.Loggers;
import com.alibaba.nacos.sys.env.EnvUtil;
import org.apache.commons.lang3.StringUtils;
Expand All @@ -29,7 +30,6 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Predicate;
Expand All @@ -40,16 +40,10 @@
*
* @author <a href="mailto:liaochuntao@live.com">liaochuntao</a>
*/
public class MemberUtils {

private static final String TARGET_MEMBER_CONNECT_REFUSE_ERRMSG = "Connection refused";

private static ServerMemberManager manager;

public static void setManager(ServerMemberManager manager) {
MemberUtils.manager = manager;
}
public class MemberUtil {

protected static final String TARGET_MEMBER_CONNECT_REFUSE_ERRMSG = "Connection refused";

/**
* Information copy.
*
Expand Down Expand Up @@ -117,17 +111,19 @@ public static Collection<Member> multiParse(Collection<String> addresses) {
*
* @param member {@link Member}
*/
public static void onSuccess(Member member) {
Member cloneMember = new Member();
copy(member, cloneMember);
public static void onSuccess(final ServerMemberManager manager, final Member member) {
final NodeState old = member.getState();
manager.getMemberAddressInfos().add(member.getAddress());
cloneMember.setState(NodeState.UP);
cloneMember.setFailAccessCnt(0);
manager.update(cloneMember);
member.setState(NodeState.UP);
member.setFailAccessCnt(0);
if (!Objects.equals(old, member.getState())) {
manager.notifyMemberChange();
}
}

public static void onFail(Member member) {
onFail(member, null);
public static void onFail(final ServerMemberManager manager, final Member member) {
// To avoid null pointer judgments, pass in one NONE_EXCEPTION
onFail(manager, member, ExceptionUtil.NONE_EXCEPTION);
}

/**
Expand All @@ -136,21 +132,22 @@ public static void onFail(Member member) {
* @param member {@link Member}
* @param ex {@link Throwable}
*/
public static void onFail(Member member, Throwable ex) {
Member cloneMember = new Member();
copy(member, cloneMember);
public static void onFail(final ServerMemberManager manager, final Member member, Throwable ex) {
manager.getMemberAddressInfos().remove(member.getAddress());
cloneMember.setState(NodeState.SUSPICIOUS);
cloneMember.setFailAccessCnt(member.getFailAccessCnt() + 1);
final NodeState old = member.getState();
member.setState(NodeState.SUSPICIOUS);
member.setFailAccessCnt(member.getFailAccessCnt() + 1);
int maxFailAccessCnt = EnvUtil.getProperty("nacos.core.member.fail-access-cnt", Integer.class, 3);

// If the number of consecutive failures to access the target node reaches
// a maximum, or the link request is rejected, the state is directly down
if (cloneMember.getFailAccessCnt() > maxFailAccessCnt || StringUtils
if (member.getFailAccessCnt() > maxFailAccessCnt || StringUtils
.containsIgnoreCase(ex.getMessage(), TARGET_MEMBER_CONNECT_REFUSE_ERRMSG)) {
cloneMember.setState(NodeState.DOWN);
member.setState(NodeState.DOWN);
}
if (!Objects.equals(old, member.getState())) {
manager.notifyMemberChange();
}
manager.update(cloneMember);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,6 @@ public class ServerMemberManager implements ApplicationListener<WebServerInitial
public ServerMemberManager(ServletContext servletContext) throws Exception {
this.serverList = new ConcurrentSkipListMap<>();
EnvUtil.setContextPath(servletContext.getContextPath());
MemberUtils.setManager(this);

init();
}
Expand All @@ -133,7 +132,7 @@ protected void init() throws NacosException {
Loggers.CORE.info("Nacos-related cluster resource initialization");
this.port = EnvUtil.getProperty("server.port", Integer.class, 8848);
this.localAddress = InetUtils.getSelfIP() + ":" + port;
this.self = MemberUtils.singleParse(this.localAddress);
this.self = MemberUtil.singleParse(this.localAddress);
this.self.setExtendVal(MemberMetaDataConstants.VERSION, VersionUtils.version);
serverList.put(self.getAddress(), self);

Expand Down Expand Up @@ -196,7 +195,7 @@ public Class<? extends Event> subscribeType() {
* member information update.
*
* @param newMember {@link Member}
* @return update is success
* @return update is successw
*/
public boolean update(Member newMember) {
Loggers.CLUSTER.debug("member information update : {}", newMember);
Expand All @@ -210,19 +209,23 @@ public boolean update(Member newMember) {
if (NodeState.DOWN.equals(newMember.getState())) {
memberAddressInfos.remove(newMember.getAddress());
}
boolean isPublishChangeEvent = MemberUtils.isBasicInfoChanged(newMember, member);
boolean isPublishChangeEvent = MemberUtil.isBasicInfoChanged(newMember, member);
newMember.setExtendVal(MemberMetaDataConstants.LAST_REFRESH_TIME, System.currentTimeMillis());
MemberUtils.copy(newMember, member);
MemberUtil.copy(newMember, member);
if (isPublishChangeEvent) {
// member basic data changes and all listeners need to be notified
NotifyCenter.publishEvent(MembersChangeEvent.builder().members(allMembers()).build());
notifyMemberChange();
}
return member;
});

return true;
}

void notifyMemberChange() {
NotifyCenter.publishEvent(MembersChangeEvent.builder().members(allMembers()).build());
}

/**
* Whether the node exists within the cluster.
*
Expand Down Expand Up @@ -324,7 +327,7 @@ synchronized boolean memberChange(Collection<Member> members) {
// <important> need to put the event publication into a synchronized block to ensure
// that the event publication is sequential
if (hasChange) {
MemberUtils.syncToFile(finalMembers);
MemberUtil.syncToFile(finalMembers);
Event event = MembersChangeEvent.builder().members(finalMembers).build();
NotifyCenter.publishEvent(event);
}
Expand Down Expand Up @@ -466,12 +469,12 @@ public void onReceive(RestResult<String> result) {
return;
}
if (result.ok()) {
MemberUtils.onSuccess(target);
MemberUtil.onSuccess(ServerMemberManager.this, target);
} else {
Loggers.CLUSTER
.warn("failed to report new info to target node : {}, result : {}",
target.getAddress(), result);
MemberUtils.onFail(target);
MemberUtil.onFail(ServerMemberManager.this, target);
}
}

Expand All @@ -481,7 +484,7 @@ public void onError(Throwable throwable) {
.error("failed to report new info to target node : {}, error : {}",
target.getAddress(),
ExceptionUtil.getAllExceptionMsg(throwable));
MemberUtils.onFail(target, throwable);
MemberUtil.onFail(ServerMemberManager.this, target, throwable);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import com.alibaba.nacos.common.model.RestResult;
import com.alibaba.nacos.common.utils.ExceptionUtil;
import com.alibaba.nacos.core.cluster.AbstractMemberLookup;
import com.alibaba.nacos.core.cluster.MemberUtils;
import com.alibaba.nacos.core.cluster.MemberUtil;
import com.alibaba.nacos.core.utils.GenericType;
import com.alibaba.nacos.core.utils.GlobalExecutor;
import com.alibaba.nacos.core.utils.Loggers;
Expand Down Expand Up @@ -147,7 +147,7 @@ private void syncFromAddressUrl() throws Exception {
isAddressServerHealth = true;
Reader reader = new StringReader(result.getData());
try {
afterLookup(MemberUtils.readServerConf(EnvUtil.analyzeClusterConf(reader)));
afterLookup(MemberUtil.readServerConf(EnvUtil.analyzeClusterConf(reader)));
} catch (Throwable e) {
Loggers.CLUSTER.error("[serverlist] exception for analyzeClusterConf, error : {}",
ExceptionUtil.getAllExceptionMsg(e));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.core.cluster.AbstractMemberLookup;
import com.alibaba.nacos.core.cluster.Member;
import com.alibaba.nacos.core.cluster.MemberUtils;
import com.alibaba.nacos.core.cluster.MemberUtil;
import com.alibaba.nacos.sys.env.EnvUtil;
import com.alibaba.nacos.sys.file.FileChangeEvent;
import com.alibaba.nacos.sys.file.FileWatcher;
Expand Down Expand Up @@ -74,7 +74,7 @@ private void readClusterConfFromDisk() {
Collection<Member> tmpMembers = new ArrayList<>();
try {
List<String> tmp = EnvUtil.readClusterConf();
tmpMembers = MemberUtils.readServerConf(tmp);
tmpMembers = MemberUtil.readServerConf(tmp);
} catch (Throwable e) {
Loggers.CLUSTER
.error("nacos-XXXX [serverlist] failed to get serverlist from disk!, error : {}", e.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package com.alibaba.nacos.core.cluster.lookup;

import com.alibaba.nacos.core.cluster.AbstractMemberLookup;
import com.alibaba.nacos.core.cluster.MemberUtils;
import com.alibaba.nacos.core.cluster.MemberUtil;
import com.alibaba.nacos.sys.env.EnvUtil;
import com.alibaba.nacos.sys.utils.InetUtils;

Expand All @@ -34,7 +34,7 @@ public class StandaloneMemberLookup extends AbstractMemberLookup {
public void start() {
if (start.compareAndSet(false, true)) {
String url = InetUtils.getSelfIP() + ":" + EnvUtil.getPort();
afterLookup(MemberUtils.readServerConf(Collections.singletonList(url)));
afterLookup(MemberUtil.readServerConf(Collections.singletonList(url)));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import com.alibaba.nacos.common.model.RestResultUtils;
import com.alibaba.nacos.common.utils.LoggerUtils;
import com.alibaba.nacos.core.cluster.Member;
import com.alibaba.nacos.core.cluster.MemberUtils;
import com.alibaba.nacos.core.cluster.MemberUtil;
import com.alibaba.nacos.core.cluster.NodeState;
import com.alibaba.nacos.core.cluster.ServerMemberManager;
import com.alibaba.nacos.sys.env.EnvUtil;
Expand Down Expand Up @@ -150,7 +150,7 @@ public RestResult<String> switchLookup(@RequestParam(name = "type") String type)
*/
@PostMapping("/server/leave")
public RestResult<String> leave(@RequestBody Collection<String> params) throws Exception {
Collection<Member> memberList = MemberUtils.multiParse(params);
Collection<Member> memberList = MemberUtil.multiParse(params);
memberManager.memberLeave(memberList);
final NacosAsyncRestTemplate nacosAsyncRestTemplate = HttpClientBeanHolder.getNacosAsyncRestTemplate(Loggers.CLUSTER);
final GenericType<RestResult<String>> genericType = new GenericType<RestResult<String>>() {
Expand All @@ -169,12 +169,12 @@ public void onReceive(RestResult<String> result) {
if (result.ok()) {
LoggerUtils.printIfDebugEnabled(Loggers.CLUSTER,
"The node : [{}] success to process the request", member);
MemberUtils.onSuccess(member);
MemberUtil.onSuccess(memberManager, member);
} else {
Loggers.CLUSTER
.warn("The node : [{}] failed to process the request, response is : {}", member,
result);
MemberUtils.onFail(member);
MemberUtil.onFail(memberManager, member);
}
} finally {
latch.countDown();
Expand All @@ -185,7 +185,7 @@ public void onReceive(RestResult<String> result) {
public void onError(Throwable throwable) {
try {
Loggers.CLUSTER.error("Failed to communicate with the node : {}", member);
MemberUtils.onFail(member);
MemberUtil.onFail(memberManager, member);
} finally {
latch.countDown();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import com.alibaba.nacos.core.cluster.Member;
import com.alibaba.nacos.core.cluster.MemberChangeListener;
import com.alibaba.nacos.core.cluster.MemberMetaDataConstants;
import com.alibaba.nacos.core.cluster.MemberUtils;
import com.alibaba.nacos.core.cluster.MemberUtil;
import com.alibaba.nacos.core.cluster.MembersChangeEvent;
import com.alibaba.nacos.core.cluster.ServerMemberManager;
import com.alibaba.nacos.core.utils.ClassUtils;
Expand Down Expand Up @@ -73,7 +73,7 @@ public static Set<String> toCPMembersInfo(Collection<Member> members) {
Set<String> nodes = new HashSet<>();
members.forEach(member -> {
final String ip = member.getIp();
final int raftPort = MemberUtils.calculateRaftPort(member);
final int raftPort = MemberUtil.calculateRaftPort(member);
nodes.add(ip + ":" + raftPort);
});
return nodes;
Expand Down
Loading

0 comments on commit 5b2d76f

Please sign in to comment.