Skip to content

Commit

Permalink
feature : support least active load balance (apache#2676)
Browse files Browse the repository at this point in the history
  • Loading branch information
ph3636 committed Oct 20, 2020
1 parent fc3640a commit 7ea7399
Show file tree
Hide file tree
Showing 9 changed files with 315 additions and 3 deletions.
93 changes: 93 additions & 0 deletions common/src/main/java/io/seata/common/rpc/RpcStatus.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Copyright 1999-2019 Seata.io Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seata.common.rpc;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;

/**
* The state statistics.
*
* @author ph3636
*/
public class RpcStatus {

private static final ConcurrentMap<String, RpcStatus> SERVICE_STATUS_MAP = new ConcurrentHashMap<>();
private final AtomicLong active = new AtomicLong();
private final LongAdder total = new LongAdder();

private RpcStatus() {
}

/**
* get the RpcStatus of this service
*
* @param service the service
* @return RpcStatus
*/
public static RpcStatus getStatus(String service) {
return SERVICE_STATUS_MAP.computeIfAbsent(service, key -> new RpcStatus());
}

/**
* remove the RpcStatus of this service
*
* @param service the service
*/
public static void removeStatus(String service) {
SERVICE_STATUS_MAP.remove(service);
}

/**
* begin count
*
* @param service the service
*/
public static void beginCount(String service) {
getStatus(service).active.incrementAndGet();
}

/**
* end count
*
* @param service the service
*/
public static void endCount(String service) {
RpcStatus rpcStatus = getStatus(service);
rpcStatus.active.decrementAndGet();
rpcStatus.total.increment();
}

/**
* get active.
*
* @return active
*/
public long getActive() {
return active.get();
}

/**
* get total.
*
* @return total
*/
public long getTotal() {
return total.longValue();
}
}
42 changes: 42 additions & 0 deletions common/src/test/java/io/seata/common/rpc/RpcStatusTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package io.seata.common.rpc;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

/**
* The state statistics test.
*
* @author ph3636
*/
public class RpcStatusTest {

public static final String SERVICE = "127.0.0.1:80";

@Test
public void getStatus() {
RpcStatus rpcStatus1 = RpcStatus.getStatus(SERVICE);
Assertions.assertNotNull(rpcStatus1);
RpcStatus rpcStatus2 = RpcStatus.getStatus(SERVICE);
Assertions.assertEquals(rpcStatus1, rpcStatus2);
}

@Test
public void removeStatus() {
RpcStatus old = RpcStatus.getStatus(SERVICE);
RpcStatus.removeStatus(SERVICE);
Assertions.assertNotEquals(RpcStatus.getStatus(SERVICE), old);
}

@Test
public void beginCount() {
RpcStatus.beginCount(SERVICE);
Assertions.assertEquals(RpcStatus.getStatus(SERVICE).getActive(), 1);
}

@Test
public void endCount() {
RpcStatus.endCount(SERVICE);
Assertions.assertEquals(RpcStatus.getStatus(SERVICE).getActive(), 0);
Assertions.assertEquals(RpcStatus.getStatus(SERVICE).getTotal(), 1);
}
}
28 changes: 28 additions & 0 deletions core/src/main/java/io/seata/core/rpc/hook/RpcHook.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright 1999-2019 Seata.io Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seata.core.rpc.hook;

import io.seata.core.protocol.RpcMessage;

/**
* @author ph3636
*/
public interface RpcHook {

void doBeforeRequest(String remoteAddr, RpcMessage request);

void doAfterResponse(String remoteAddr, RpcMessage request, Object response);
}
35 changes: 35 additions & 0 deletions core/src/main/java/io/seata/core/rpc/hook/StatusRpcHook.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright 1999-2019 Seata.io Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seata.core.rpc.hook;

import io.seata.common.rpc.RpcStatus;
import io.seata.core.protocol.RpcMessage;

/**
* @author ph3636
*/
public class StatusRpcHook implements RpcHook {

@Override
public void doBeforeRequest(String remoteAddr, RpcMessage request) {
RpcStatus.beginCount(remoteAddr);
}

@Override
public void doAfterResponse(String remoteAddr, RpcMessage request, Object response) {
RpcStatus.endCount(remoteAddr);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.netty.channel.ChannelHandlerContext;
import io.seata.common.exception.FrameworkErrorCode;
import io.seata.common.exception.FrameworkException;
import io.seata.common.loader.EnhancedServiceLoader;
import io.seata.common.thread.NamedThreadFactory;
import io.seata.common.thread.PositiveAtomicCounter;
import io.seata.core.protocol.MessageFuture;
Expand All @@ -28,6 +29,7 @@
import io.seata.core.protocol.ProtocolConstants;
import io.seata.core.protocol.RpcMessage;
import io.seata.core.rpc.Disposable;
import io.seata.core.rpc.hook.RpcHook;
import io.seata.core.rpc.processor.Pair;
import io.seata.core.rpc.processor.RemotingProcessor;
import org.slf4j.Logger;
Expand All @@ -37,6 +39,7 @@
import java.lang.management.ManagementFactory;
import java.net.SocketAddress;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -99,6 +102,8 @@ public abstract class AbstractNettyRemoting implements Disposable {
*/
protected final HashMap<Integer/*MessageType*/, Pair<RemotingProcessor, ExecutorService>> processorTable = new HashMap<>(32);

protected final List<RpcHook> rpcHooks = EnhancedServiceLoader.loadAll(RpcHook.class);

public void init() {
timerExecutor.scheduleAtFixedRate(new Runnable() {
@Override
Expand Down Expand Up @@ -174,6 +179,9 @@ protected Object sendSync(Channel channel, RpcMessage rpcMessage, long timeoutMi

channelWritableCheck(channel, rpcMessage.getBody());

String remoteAddr = ChannelUtil.getAddressFromChannel(channel);
doBeforeRpcHooks(remoteAddr, rpcMessage);

channel.writeAndFlush(rpcMessage).addListener((ChannelFutureListener) future -> {
if (!future.isSuccess()) {
MessageFuture messageFuture1 = futures.remove(rpcMessage.getId());
Expand All @@ -185,7 +193,9 @@ protected Object sendSync(Channel channel, RpcMessage rpcMessage, long timeoutMi
});

try {
return messageFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
Object result = messageFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
doAfterRpcHooks(remoteAddr, rpcMessage, result);
return result;
} catch (Exception exx) {
LOGGER.error("wait response error:{},ip:{},request:{}", exx.getMessage(), channel.remoteAddress(),
rpcMessage.getBody());
Expand All @@ -209,6 +219,9 @@ protected void sendAsync(Channel channel, RpcMessage rpcMessage) {
LOGGER.debug("write message:" + rpcMessage.getBody() + ", channel:" + channel + ",active?"
+ channel.isActive() + ",writable?" + channel.isWritable() + ",isopen?" + channel.isOpen());
}

doBeforeRpcHooks(ChannelUtil.getAddressFromChannel(channel), rpcMessage);

channel.writeAndFlush(rpcMessage).addListener((ChannelFutureListener) future -> {
if (!future.isSuccess()) {
destroyChannel(future.channel());
Expand Down Expand Up @@ -349,4 +362,15 @@ private void channelWritableCheck(Channel channel, Object msg) {
*/
public abstract void destroyChannel(String serverAddress, Channel channel);

protected void doBeforeRpcHooks(String remoteAddr, RpcMessage request) {
for (RpcHook rpcHook: rpcHooks) {
rpcHook.doBeforeRequest(remoteAddr, request);
}
}

protected void doAfterRpcHooks(String remoteAddr, RpcMessage request, Object response) {
for (RpcHook rpcHook: rpcHooks) {
rpcHook.doAfterResponse(remoteAddr, request, response);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
io.seata.core.rpc.hook.StatusRpcHook
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright 1999-2019 Seata.io Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seata.discovery.loadbalance;

import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

import io.seata.common.loader.LoadLevel;
import io.seata.common.rpc.RpcStatus;

/**
* The type Least Active load balance.
*
* @author ph3636
*/
@LoadLevel(name = "LeastActiveLoadBalance")
public class LeastActiveLoadBalance extends AbstractLoadBalance {

@Override
protected <T> T doSelect(List<T> invokers, String xid) {
int length = invokers.size();
long leastActive = -1;
int leastCount = 0;
int[] leastIndexes = new int[length];
for (int i = 0; i < length; i++) {
long active = RpcStatus.getStatus(invokers.get(i).toString()).getActive();
if (leastActive == -1 || active < leastActive) {
leastActive = active;
leastCount = 1;
leastIndexes[0] = i;
} else if (active == leastActive) {
leastIndexes[leastCount++] = i;
}
}
if (leastCount == 1) {
return invokers.get(leastIndexes[0]);
}
return invokers.get(leastIndexes[ThreadLocalRandom.current().nextInt(leastCount)]);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,5 @@

io.seata.discovery.loadbalance.RoundRobinLoadBalance
io.seata.discovery.loadbalance.RandomLoadBalance
io.seata.discovery.loadbalance.ConsistentHashLoadBalance
io.seata.discovery.loadbalance.ConsistentHashLoadBalance
io.seata.discovery.loadbalance.LeastActiveLoadBalance
Loading

0 comments on commit 7ea7399

Please sign in to comment.