From ea1a36d9a442b809d6dd0953513c073bcf96666b Mon Sep 17 00:00:00 2001 From: ZhouYiMing Date: Mon, 21 Oct 2013 09:07:40 +0800 Subject: [PATCH] clean up --- .../loadbalance/AbstractLoadBalance.java | 63 ++++++++ .../ConsistentHashLoadBalance.java | 149 ++++++++++++++++++ .../loadbalance/LeastActiveLoadBalance.java | 86 ++++++++++ .../loadbalance/RandomLoadBalance.java | 64 ++++++++ .../loadbalance/RoundRobinLoadBalance.java | 93 +++++++++++ 5 files changed, 455 insertions(+) create mode 100644 dubbo-cluster/src/main/java/com/alibaba/dubbo/rpc/cluster/loadbalance/AbstractLoadBalance.java create mode 100644 dubbo-cluster/src/main/java/com/alibaba/dubbo/rpc/cluster/loadbalance/ConsistentHashLoadBalance.java create mode 100644 dubbo-cluster/src/main/java/com/alibaba/dubbo/rpc/cluster/loadbalance/LeastActiveLoadBalance.java create mode 100644 dubbo-cluster/src/main/java/com/alibaba/dubbo/rpc/cluster/loadbalance/RandomLoadBalance.java create mode 100644 dubbo-cluster/src/main/java/com/alibaba/dubbo/rpc/cluster/loadbalance/RoundRobinLoadBalance.java diff --git a/dubbo-cluster/src/main/java/com/alibaba/dubbo/rpc/cluster/loadbalance/AbstractLoadBalance.java b/dubbo-cluster/src/main/java/com/alibaba/dubbo/rpc/cluster/loadbalance/AbstractLoadBalance.java new file mode 100644 index 00000000000..8f53582e2e0 --- /dev/null +++ b/dubbo-cluster/src/main/java/com/alibaba/dubbo/rpc/cluster/loadbalance/AbstractLoadBalance.java @@ -0,0 +1,63 @@ +/* + * Copyright 1999-2011 Alibaba Group. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.dubbo.rpc.cluster.loadbalance; + +import java.util.List; + +import com.alibaba.dubbo.common.Constants; +import com.alibaba.dubbo.common.URL; +import com.alibaba.dubbo.rpc.Invoker; +import com.alibaba.dubbo.rpc.Invocation; +import com.alibaba.dubbo.rpc.cluster.LoadBalance; + +/** + * AbstractLoadBalance + * + * @author william.liangf + */ +public abstract class AbstractLoadBalance implements LoadBalance { + + public Invoker select(List> invokers, URL url, Invocation invocation) { + if (invokers == null || invokers.size() == 0) + return null; + if (invokers.size() == 1) + return invokers.get(0); + return doSelect(invokers, url, invocation); + } + + protected abstract Invoker doSelect(List> invokers, URL url, Invocation invocation); + + protected int getWeight(Invoker invoker, Invocation invocation) { + int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT); + if (weight > 0) { + long timestamp = invoker.getUrl().getParameter(Constants.TIMESTAMP_KEY, 0L); + if (timestamp > 0L) { + int uptime = (int) (System.currentTimeMillis() - timestamp); + int warmup = invoker.getUrl().getParameter(Constants.WARMUP_KEY, Constants.DEFAULT_WARMUP); + if (uptime > 0 && uptime < warmup) { + weight = calculateWarmupWeight(uptime, warmup, weight); + } + } + } + return weight; + } + + static int calculateWarmupWeight(int uptime, int warmup, int weight) { + int ww = (int) ( (float) uptime / ( (float) warmup / (float) weight ) ); + return ww < 1 ? 1 : (ww > weight ? weight : ww); + } + +} \ No newline at end of file diff --git a/dubbo-cluster/src/main/java/com/alibaba/dubbo/rpc/cluster/loadbalance/ConsistentHashLoadBalance.java b/dubbo-cluster/src/main/java/com/alibaba/dubbo/rpc/cluster/loadbalance/ConsistentHashLoadBalance.java new file mode 100644 index 00000000000..30512f90bf1 --- /dev/null +++ b/dubbo-cluster/src/main/java/com/alibaba/dubbo/rpc/cluster/loadbalance/ConsistentHashLoadBalance.java @@ -0,0 +1,149 @@ +/* + * Copyright 1999-2012 Alibaba Group. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.dubbo.rpc.cluster.loadbalance; + +import java.io.UnsupportedEncodingException; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.List; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import com.alibaba.dubbo.common.Constants; +import com.alibaba.dubbo.common.URL; +import com.alibaba.dubbo.rpc.Invocation; +import com.alibaba.dubbo.rpc.Invoker; + +/** + * ConsistentHashLoadBalance + * + * @author william.liangf + */ +public class ConsistentHashLoadBalance extends AbstractLoadBalance { + + private final ConcurrentMap> selectors = new ConcurrentHashMap>(); + + @SuppressWarnings("unchecked") + @Override + protected Invoker doSelect(List> invokers, URL url, Invocation invocation) { + String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName(); + int identityHashCode = System.identityHashCode(invokers); + ConsistentHashSelector selector = (ConsistentHashSelector) selectors.get(key); + if (selector == null || selector.getIdentityHashCode() != identityHashCode) { + selectors.put(key, new ConsistentHashSelector(invokers, invocation.getMethodName(), identityHashCode)); + selector = (ConsistentHashSelector) selectors.get(key); + } + return selector.select(invocation); + } + + private static final class ConsistentHashSelector { + + private final TreeMap> virtualInvokers; + + private final int replicaNumber; + + private final int identityHashCode; + + private final int[] argumentIndex; + + public ConsistentHashSelector(List> invokers, String methodName, int identityHashCode) { + this.virtualInvokers = new TreeMap>(); + this.identityHashCode = System.identityHashCode(invokers); + URL url = invokers.get(0).getUrl(); + this.replicaNumber = url.getMethodParameter(methodName, "hash.nodes", 160); + String[] index = Constants.COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, "hash.arguments", "0")); + argumentIndex = new int[index.length]; + for (int i = 0; i < index.length; i ++) { + argumentIndex[i] = Integer.parseInt(index[i]); + } + for (Invoker invoker : invokers) { + for (int i = 0; i < replicaNumber / 4; i++) { + byte[] digest = md5(invoker.getUrl().toFullString() + i); + for (int h = 0; h < 4; h++) { + long m = hash(digest, h); + virtualInvokers.put(m, invoker); + } + } + } + } + + public int getIdentityHashCode() { + return identityHashCode; + } + + public Invoker select(Invocation invocation) { + String key = toKey(invocation.getArguments()); + byte[] digest = md5(key); + Invoker invoker = sekectForKey(hash(digest, 0)); + return invoker; + } + + private String toKey(Object[] args) { + StringBuilder buf = new StringBuilder(); + for (int i : argumentIndex) { + if (i >= 0 && i < args.length) { + buf.append(args[i]); + } + } + return buf.toString(); + } + + private Invoker sekectForKey(long hash) { + Invoker invoker; + Long key = hash; + if (!virtualInvokers.containsKey(key)) { + SortedMap> tailMap = virtualInvokers.tailMap(key); + if (tailMap.isEmpty()) { + key = virtualInvokers.firstKey(); + } else { + key = tailMap.firstKey(); + } + } + invoker = virtualInvokers.get(key); + return invoker; + } + + private long hash(byte[] digest, int number) { + return (((long) (digest[3 + number * 4] & 0xFF) << 24) + | ((long) (digest[2 + number * 4] & 0xFF) << 16) + | ((long) (digest[1 + number * 4] & 0xFF) << 8) + | (digest[0 + number * 4] & 0xFF)) + & 0xFFFFFFFFL; + } + + private byte[] md5(String value) { + MessageDigest md5; + try { + md5 = MessageDigest.getInstance("MD5"); + } catch (NoSuchAlgorithmException e) { + throw new IllegalStateException(e.getMessage(), e); + } + md5.reset(); + byte[] bytes = null; + try { + bytes = value.getBytes("UTF-8"); + } catch (UnsupportedEncodingException e) { + throw new IllegalStateException(e.getMessage(), e); + } + md5.update(bytes); + return md5.digest(); + } + + } + +} diff --git a/dubbo-cluster/src/main/java/com/alibaba/dubbo/rpc/cluster/loadbalance/LeastActiveLoadBalance.java b/dubbo-cluster/src/main/java/com/alibaba/dubbo/rpc/cluster/loadbalance/LeastActiveLoadBalance.java new file mode 100644 index 00000000000..c9ebf79eb7a --- /dev/null +++ b/dubbo-cluster/src/main/java/com/alibaba/dubbo/rpc/cluster/loadbalance/LeastActiveLoadBalance.java @@ -0,0 +1,86 @@ +/* + * Copyright 1999-2011 Alibaba Group. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.dubbo.rpc.cluster.loadbalance; + +import java.util.List; +import java.util.Random; + +import com.alibaba.dubbo.common.Constants; +import com.alibaba.dubbo.common.URL; +import com.alibaba.dubbo.rpc.Invocation; +import com.alibaba.dubbo.rpc.Invoker; +import com.alibaba.dubbo.rpc.RpcStatus; + +/** + * LeastActiveLoadBalance + * + * @author william.liangf + */ +public class LeastActiveLoadBalance extends AbstractLoadBalance { + + public static final String NAME = "leastactive"; + + private final Random random = new Random(); + + protected Invoker doSelect(List> invokers, URL url, Invocation invocation) { + int length = invokers.size(); // 总个数 + int leastActive = -1; // 最小的活跃数 + int leastCount = 0; // 相同最小活跃数的个数 + int[] leastIndexs = new int[length]; // 相同最小活跃数的下标 + int totalWeight = 0; // 总权重 + int firstWeight = 0; // 第一个权重,用于于计算是否相同 + boolean sameWeight = true; // 是否所有权重相同 + for (int i = 0; i < length; i++) { + Invoker invoker = invokers.get(i); + int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive(); // 活跃数 + int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT); // 权重 + if (leastActive == -1 || active < leastActive) { // 发现更小的活跃数,重新开始 + leastActive = active; // 记录最小活跃数 + leastCount = 1; // 重新统计相同最小活跃数的个数 + leastIndexs[0] = i; // 重新记录最小活跃数下标 + totalWeight = weight; // 重新累计总权重 + firstWeight = weight; // 记录第一个权重 + sameWeight = true; // 还原权重相同标识 + } else if (active == leastActive) { // 累计相同最小的活跃数 + leastIndexs[leastCount ++] = i; // 累计相同最小活跃数下标 + totalWeight += weight; // 累计总权重 + // 判断所有权重是否一样 + if (sameWeight && i > 0 + && weight != firstWeight) { + sameWeight = false; + } + } + } + // assert(leastCount > 0) + if (leastCount == 1) { + // 如果只有一个最小则直接返回 + return invokers.get(leastIndexs[0]); + } + if (! sameWeight && totalWeight > 0) { + // 如果权重不相同且权重大于0则按总权重数随机 + int offsetWeight = random.nextInt(totalWeight); + // 并确定随机值落在哪个片断上 + for (int i = 0; i < leastCount; i++) { + int leastIndex = leastIndexs[i]; + offsetWeight -= getWeight(invokers.get(leastIndex), invocation); + if (offsetWeight <= 0) + return invokers.get(leastIndex); + } + } + // 如果权重相同或权重为0则均等随机 + return invokers.get(leastIndexs[random.nextInt(leastCount)]); + } +} \ No newline at end of file diff --git a/dubbo-cluster/src/main/java/com/alibaba/dubbo/rpc/cluster/loadbalance/RandomLoadBalance.java b/dubbo-cluster/src/main/java/com/alibaba/dubbo/rpc/cluster/loadbalance/RandomLoadBalance.java new file mode 100644 index 00000000000..de5002c489a --- /dev/null +++ b/dubbo-cluster/src/main/java/com/alibaba/dubbo/rpc/cluster/loadbalance/RandomLoadBalance.java @@ -0,0 +1,64 @@ +/* + * Copyright 1999-2011 Alibaba Group. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.dubbo.rpc.cluster.loadbalance; + +import java.util.List; +import java.util.Random; + +import com.alibaba.dubbo.common.URL; +import com.alibaba.dubbo.rpc.Invocation; +import com.alibaba.dubbo.rpc.Invoker; + +/** + * random load balance. + * + * @author qianlei + * @author william.liangf + */ +public class RandomLoadBalance extends AbstractLoadBalance { + + public static final String NAME = "random"; + + private final Random random = new Random(); + + protected Invoker doSelect(List> invokers, URL url, Invocation invocation) { + int length = invokers.size(); // 总个数 + int totalWeight = 0; // 总权重 + boolean sameWeight = true; // 权重是否都一样 + for (int i = 0; i < length; i++) { + int weight = getWeight(invokers.get(i), invocation); + totalWeight += weight; // 累计总权重 + if (sameWeight && i > 0 + && weight != getWeight(invokers.get(i - 1), invocation)) { + sameWeight = false; // 计算所有权重是否一样 + } + } + if (totalWeight > 0 && ! sameWeight) { + // 如果权重不相同且权重大于0则按总权重数随机 + int offset = random.nextInt(totalWeight); + // 并确定随机值落在哪个片断上 + for (int i = 0; i < length; i++) { + offset -= getWeight(invokers.get(i), invocation); + if (offset < 0) { + return invokers.get(i); + } + } + } + // 如果权重相同或权重为0则均等随机 + return invokers.get(random.nextInt(length)); + } + +} \ No newline at end of file diff --git a/dubbo-cluster/src/main/java/com/alibaba/dubbo/rpc/cluster/loadbalance/RoundRobinLoadBalance.java b/dubbo-cluster/src/main/java/com/alibaba/dubbo/rpc/cluster/loadbalance/RoundRobinLoadBalance.java new file mode 100644 index 00000000000..db2dd0d94a6 --- /dev/null +++ b/dubbo-cluster/src/main/java/com/alibaba/dubbo/rpc/cluster/loadbalance/RoundRobinLoadBalance.java @@ -0,0 +1,93 @@ +/* + * Copyright 1999-2011 Alibaba Group. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.dubbo.rpc.cluster.loadbalance; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import com.alibaba.dubbo.common.URL; +import com.alibaba.dubbo.common.utils.AtomicPositiveInteger; +import com.alibaba.dubbo.rpc.Invocation; +import com.alibaba.dubbo.rpc.Invoker; + +/** + * Round robin load balance. + * + * @author qian.lei + * @author william.liangf + * @author zhou.yiming + */ +public class RoundRobinLoadBalance extends AbstractLoadBalance { + + public static final String NAME = "roundrobin"; + + private final ConcurrentMap sequences = new ConcurrentHashMap(); + + private final ConcurrentMap weightSequences = new ConcurrentHashMap(); + + protected Invoker doSelect(List> invokers, URL url, Invocation invocation) { + String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName(); + int length = invokers.size(); // 总个数 + int maxWeight = 0; // 最大权重 + int minWeight = Integer.MAX_VALUE; // 最小权重 + for (int i = 0; i < length; i++) { + int weight = getWeight(invokers.get(i), invocation); + maxWeight = Math.max(maxWeight, weight); // 累计最大权重 + minWeight = Math.min(minWeight, weight); // 累计最小权重 + } + if (maxWeight > 0 && minWeight < maxWeight) { // 权重不一样 + AtomicPositiveInteger weightSequence = weightSequences.get(key); + if (weightSequence == null) { + weightSequences.putIfAbsent(key, new AtomicPositiveInteger()); + weightSequence = weightSequences.get(key); + } + + int currentWeight; + if(sequences.get(key).get() % length == 0) + { + currentWeight = weightSequence.getAndIncrement() % maxWeight; + } + else + { + currentWeight = weightSequence.get() % maxWeight; + } + + List> weightInvokers = new ArrayList>(); + for (Invoker invoker : invokers) { // 筛选权重大于当前权重基数的Invoker + if (getWeight(invoker, invocation) > currentWeight) { + weightInvokers.add(invoker); + } + } + int weightLength = weightInvokers.size(); + if (weightLength == 1) { + return weightInvokers.get(0); + } else if (weightLength > 1) { + invokers = weightInvokers; + length = invokers.size(); + } + } + AtomicPositiveInteger sequence = sequences.get(key); + if (sequence == null) { + sequences.putIfAbsent(key, new AtomicPositiveInteger()); + sequence = sequences.get(key); + } + // 取模轮循 + return invokers.get(sequence.getAndIncrement() % length); + } + +}