Skip to content

Commit

Permalink
DUBBO-431 zookeeper注册中心增加curator集成
Browse files Browse the repository at this point in the history
git-svn-id: http://code.alibabatech.com/svn/dubbo/trunk@1966 1a56cb94-b969-4eaa-88fa-be21384802f2
  • Loading branch information
william.liangf committed Jun 12, 2012
1 parent 1e4bf76 commit 9144446
Show file tree
Hide file tree
Showing 19 changed files with 548 additions and 72 deletions.
29 changes: 29 additions & 0 deletions dubbo-common/src/main/java/com/alibaba/dubbo/common/URL.java
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,35 @@ public int getPort(int defaultPort) {
public String getAddress() {
return port <= 0 ? host : host + ":" + port;
}

public String getBackupAddress() {
return getBackupAddress(0);
}

public String getBackupAddress(int defaultPort) {
StringBuilder address = new StringBuilder(appendDefaultPort(getAddress(), defaultPort));
String[] backups = getParameter(Constants.BACKUP_KEY, new String[0]);
if (backups != null && backups.length > 0) {
for (String backup : backups) {
address.append(",");
address.append(appendDefaultPort(backup, defaultPort));
}
}
return address.toString();
}

private String appendDefaultPort(String address, int defaultPort) {
if (address != null && address.length() > 0
&& defaultPort > 0) {
int i = address.indexOf(':');
if (i < 0) {
return address + ":" + defaultPort;
} else if (Integer.parseInt(address.substring(i + 1)) == 0) {
return address.substring(0, i + 1) + defaultPort;
}
}
return address;
}

public String getPath() {
return path;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,8 @@
import java.util.Map;

import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.extension.ExtensionLoader;
import com.alibaba.dubbo.config.support.Parameter;
import com.alibaba.dubbo.registry.support.AbstractRegistryFactory;
import com.alibaba.dubbo.remoting.Transporter;

/**
* RegistryConfig
Expand Down Expand Up @@ -208,9 +206,9 @@ public String getTransporter() {

public void setTransporter(String transporter) {
checkName("transporter", transporter);
if(transporter != null && transporter.length() > 0 && ! ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(transporter)){
/*if(transporter != null && transporter.length() > 0 && ! ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(transporter)){
throw new IllegalStateException("No such transporter type : " + transporter);
}
}*/
this.transporter = transporter;
}

Expand All @@ -220,9 +218,9 @@ public String getServer() {

public void setServer(String server) {
checkName("server", server);
if(server != null && server.length() > 0 && ! ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(server)){
/*if(server != null && server.length() > 0 && ! ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(server)){
throw new IllegalStateException("No such server type : " + server);
}
}*/
this.server = server;
}

Expand All @@ -232,9 +230,9 @@ public String getClient() {

public void setClient(String client) {
checkName("client", client);
if(client != null && client.length() > 0 && ! ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(client)){
/*if(client != null && client.length() > 0 && ! ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(client)){
throw new IllegalStateException("No such client type : " + client);
}
}*/
this.client = client;
}

Expand Down
9 changes: 3 additions & 6 deletions dubbo-registry/dubbo-registry-zookeeper/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,9 @@
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</dependency>
<dependency>
<groupId>com.github.sgroschupf</groupId>
<artifactId>zkclient</artifactId>
<groupId>com.alibaba</groupId>
<artifactId>dubbo-remoting-zookeeper</artifactId>
<version>${project.parent.version}</version>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.IZkStateListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkNoNodeException;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher.Event.KeeperState;

import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.URL;
Expand All @@ -36,6 +31,10 @@
import com.alibaba.dubbo.common.utils.UrlUtils;
import com.alibaba.dubbo.registry.NotifyListener;
import com.alibaba.dubbo.registry.support.FailbackRegistry;
import com.alibaba.dubbo.remoting.zookeeper.ChildListener;
import com.alibaba.dubbo.remoting.zookeeper.ZookeeperClient;
import com.alibaba.dubbo.remoting.zookeeper.StateListener;
import com.alibaba.dubbo.remoting.zookeeper.ZookeeperTransporter;
import com.alibaba.dubbo.rpc.RpcException;

/**
Expand All @@ -53,49 +52,35 @@ public class ZookeeperRegistry extends FailbackRegistry {

private final String root;

// private final boolean auth;

// private final List<ACL> acl; // zkclient 0.1.0 unsupported

private final Set<String> anyServices = new ConcurrentHashSet<String>();

private final ConcurrentMap<URL, ConcurrentMap<NotifyListener, IZkChildListener>> zkListeners = new ConcurrentHashMap<URL, ConcurrentMap<NotifyListener, IZkChildListener>>();
private final ConcurrentMap<URL, ConcurrentMap<NotifyListener, ChildListener>> zkListeners = new ConcurrentHashMap<URL, ConcurrentMap<NotifyListener, ChildListener>>();

private final ZkClient zkClient;
private final ZookeeperClient zkClient;

private volatile KeeperState zkState = KeeperState.SyncConnected;

public ZookeeperRegistry(URL url) {
public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
super(url);
// this.auth = url.getUsername() != null && url.getUsername().length() > 0
// && url.getPassword() != null && url.getPassword().length() > 0;
// this.acl = auth ? Ids.CREATOR_ALL_ACL : Ids.OPEN_ACL_UNSAFE;
String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
if (! group.startsWith(Constants.PATH_SEPARATOR)) {
group = Constants.PATH_SEPARATOR + group;
}
this.root = group;
StringBuilder address = new StringBuilder(appendDefaultPort(url.getAddress()));
String[] backups = url.getParameter(Constants.BACKUP_KEY, new String[0]);
if (backups != null && backups.length > 0) {
for (String backup : backups) {
address.append(",");
address.append(appendDefaultPort(backup));
}
}
zkClient = new ZkClient(address.toString());
zkClient.subscribeStateChanges(new IZkStateListener() {
public void handleStateChanged(KeeperState state) throws Exception {
ZookeeperRegistry.this.zkState = state;
}
public void handleNewSession() throws Exception {
recover();
zkClient = zookeeperTransporter.connect(url);
zkClient.addStateListener(new StateListener() {
public void stateChanged(int state) {
if (state == RECONNECTED) {
try {
recover();
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
}
});
}

public boolean isAvailable() {
return zkState == KeeperState.SyncConnected;
return zkClient.isConnected();
}

public void destroy() {
Expand All @@ -109,12 +94,7 @@ public void destroy() {

protected void doRegister(URL url) {
try {
if (url.getParameter(Constants.DYNAMIC_KEY, true)) {
zkClient.createPersistent(toCategoryPath(url), true);
zkClient.createEphemeral(toUrlPath(url));
} else {
zkClient.createPersistent(toUrlPath(url), true);
}
zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
} catch (Throwable e) {
throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
Expand All @@ -132,15 +112,15 @@ protected void doSubscribe(final URL url, final NotifyListener listener) {
try {
if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
String root = toRootPath();
ConcurrentMap<NotifyListener, IZkChildListener> listeners = zkListeners.get(url);
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
if (listeners == null) {
zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, IZkChildListener>());
zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
listeners = zkListeners.get(url);
}
IZkChildListener zkListener = listeners.get(listener);
ChildListener zkListener = listeners.get(listener);
if (zkListener == null) {
listeners.putIfAbsent(listener, new IZkChildListener() {
public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
listeners.putIfAbsent(listener, new ChildListener() {
public void childChanged(String parentPath, List<String> currentChilds) {
for (String child : currentChilds) {
if (! anyServices.contains(child)) {
anyServices.add(child);
Expand All @@ -152,7 +132,7 @@ public void handleChildChange(String parentPath, List<String> currentChilds) thr
});
zkListener = listeners.get(listener);
}
List<String> services = zkClient.subscribeChildChanges(root, zkListener);
List<String> services = zkClient.addChildListener(root, zkListener);
if (services != null && services.size() > 0) {
anyServices.addAll(services);
for (String service : services) {
Expand All @@ -163,21 +143,21 @@ public void handleChildChange(String parentPath, List<String> currentChilds) thr
} else {
List<String> providers = new ArrayList<String>();
for (String path : toCategoriesPath(url)) {
ConcurrentMap<NotifyListener, IZkChildListener> listeners = zkListeners.get(url);
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
if (listeners == null) {
zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, IZkChildListener>());
zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
listeners = zkListeners.get(url);
}
IZkChildListener zkListener = listeners.get(listener);
ChildListener zkListener = listeners.get(listener);
if (zkListener == null) {
listeners.putIfAbsent(listener, new IZkChildListener() {
public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
ZookeeperRegistry.this.notify(url, listener, toUrls(url, currentChilds));
listeners.putIfAbsent(listener, new ChildListener() {
public void childChanged(String parentPath, List<String> currentChilds) {
ZookeeperRegistry.this.notify(url, listener, toUrls(url, currentChilds));
}
});
zkListener = listeners.get(listener);
}
List<String> children = zkClient.subscribeChildChanges(path, zkListener);
List<String> children = zkClient.addChildListener(path, zkListener);
if (children != null) {
providers.addAll(children);
}
Expand All @@ -191,11 +171,11 @@ public void handleChildChange(String parentPath, List<String> currentChilds) thr
}

protected void doUnsubscribe(URL url, NotifyListener listener) {
ConcurrentMap<NotifyListener, IZkChildListener> listeners = zkListeners.get(url);
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
if (listeners != null) {
IZkChildListener zkListener = listeners.get(listener);
ChildListener zkListener = listeners.get(listener);
if (zkListener != null) {
zkClient.unsubscribeChildChanges(toUrlPath(url), zkListener);
zkClient.removeChildListener(toUrlPath(url), zkListener);
}
}
}
Expand Down Expand Up @@ -264,7 +244,7 @@ private String toUrlPath(URL url) {
return toCategoryPath(url) + Constants.PATH_SEPARATOR + URL.encode(url.toFullString());
}

private List<URL> toUrls(URL consumer, List<String> providers) throws KeeperException, InterruptedException {
private List<URL> toUrls(URL consumer, List<String> providers) {
List<URL> urls = new ArrayList<URL>();
if (providers != null && providers.size() > 0) {
for (String provider : providers) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,23 @@
import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.registry.Registry;
import com.alibaba.dubbo.registry.support.AbstractRegistryFactory;
import com.alibaba.dubbo.remoting.zookeeper.ZookeeperTransporter;

/**
* ZookeeperRegistryFactory.
*
* @author william.liangf
*/
public class ZookeeperRegistryFactory extends AbstractRegistryFactory {

private ZookeeperTransporter zookeeperTransporter;

public Registry createRegistry(URL url) {
return new ZookeeperRegistry(url);
public void setZookeeperTransporter(ZookeeperTransporter zookeeperTransporter) {
this.zookeeperTransporter = zookeeperTransporter;
}

public Registry createRegistry(URL url) {
return new ZookeeperRegistry(url, zookeeperTransporter);
}

}
50 changes: 50 additions & 0 deletions dubbo-remoting/dubbo-remoting-zookeeper/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
<!--
- Copyright 1999-2011 Alibaba Group.
-
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.alibaba</groupId>
<artifactId>dubbo-remoting</artifactId>
<version>2.3.0-SNAPSHOT</version>
</parent>
<artifactId>dubbo-remoting-zookeeper</artifactId>
<packaging>jar</packaging>
<name>${project.artifactId}</name>
<description>The zookeeper remoting module of dubbo project</description>
<properties>
<skip_maven_deploy>true</skip_maven_deploy>
</properties>
<dependencies>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>dubbo-common</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</dependency>
<dependency>
<groupId>com.github.sgroschupf</groupId>
<artifactId>zkclient</artifactId>
</dependency>
<dependency>
<groupId>com.netflix.curator</groupId>
<artifactId>curator-framework</artifactId>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package com.alibaba.dubbo.remoting.zookeeper;

import java.util.List;

public interface ChildListener {

void childChanged(String path, List<String> children);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package com.alibaba.dubbo.remoting.zookeeper;

public interface StateListener {

int DISCONNECTED = 0;

int CONNECTED = 1;

int RECONNECTED = 2;

void stateChanged(int connected);

}
Loading

0 comments on commit 9144446

Please sign in to comment.