Skip to content

Commit

Permalink
DUBBO-8 针对 DubboCodec 优化线程模型
Browse files Browse the repository at this point in the history
收到一个 Request/Response 时,把消息体保存起来不立即做反序列化,派发到业务线程后再把消息体进行反序列化
  • Loading branch information
kimi committed Jul 1, 2012
1 parent 20c1f7f commit fbf1387
Show file tree
Hide file tree
Showing 10 changed files with 611 additions and 170 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright 1999-2011 Alibaba Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.dubbo.common.utils;

/**
* @author <a href="mailto:gang.lvg@alibaba-inc.com">kimi</a>
*/
public abstract class Assert {

protected Assert() {}

public static void notNull(Object obj, String message) {
if (obj == null) {
throw new IllegalArgumentException(message);
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright 1999-2011 Alibaba Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.dubbo.remoting;

/**
* @author <a href="mailto:gang.lvg@alibaba-inc.com">kimi</a>
*/
public interface Decodeable {

public void decode() throws Exception;

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import com.alibaba.dubbo.common.extension.ExtensionLoader;
import com.alibaba.dubbo.common.io.Bytes;
import com.alibaba.dubbo.common.io.StreamUtils;
import com.alibaba.dubbo.common.io.UnsafeByteArrayInputStream;
Expand All @@ -39,6 +35,7 @@
import com.alibaba.dubbo.remoting.exchange.Response;
import com.alibaba.dubbo.remoting.exchange.support.DefaultFuture;
import com.alibaba.dubbo.remoting.telnet.codec.TelnetCodec;
import com.alibaba.dubbo.remoting.transport.CodecSupport;

/**
* ExchangeCodec.
Expand Down Expand Up @@ -69,23 +66,6 @@ public class ExchangeCodec extends TelnetCodec {

protected static final int SERIALIZATION_MASK = 0x1f;

private static Map<Byte, Serialization> ID_SERIALIZATION_MAP = new HashMap<Byte, Serialization>();
static {
Set<String> supportedExtensions = ExtensionLoader.getExtensionLoader(Serialization.class).getSupportedExtensions();
for (String name : supportedExtensions) {
Serialization serialization = ExtensionLoader.getExtensionLoader(Serialization.class).getExtension(name);
byte idByte = serialization.getContentTypeId();
if (ID_SERIALIZATION_MAP.containsKey(idByte)) {
logger.error("Serialization extension " + serialization.getClass().getName()
+ " has duplicate id to Serialization extension "
+ ID_SERIALIZATION_MAP.get(idByte).getClass().getName()
+ ", ignore this Serialization extension");
continue;
}
ID_SERIALIZATION_MAP.put(idByte, serialization);
}
}

public Short getMagicCode() {
return MAGIC;
}
Expand Down Expand Up @@ -144,12 +124,13 @@ protected Object decode(Channel channel, InputStream is, int readable, byte[] he
if( readable != tt )
is = StreamUtils.limitedInputStream(is, len);

return decodeBody(channel, is, header);
}

protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
try {
byte flag = header[2], proto = (byte)( flag & SERIALIZATION_MASK );
Serialization s = getSerializationById(proto);
if (s == null) {
s = getSerialization(channel);
}
Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto);
ObjectInput in = s.deserialize(channel.getUrl(), is);
// get request id.
long id = Bytes.bytes2long(header, 4);
Expand Down Expand Up @@ -332,10 +313,6 @@ protected void encodeResponse(Channel channel, OutputStream os, Response res) th
}
}

private static final Serialization getSerializationById(Byte id) {
return ID_SERIALIZATION_MAP.get(id);
}

@Override
protected Object decodeData(ObjectInput in) throws IOException {
return decodeRequestData(in);
Expand Down Expand Up @@ -393,7 +370,7 @@ protected Object decodeData(Channel channel, ObjectInput in) throws IOException
return decodeRequestData(channel ,in);
}

private Object decodeEventData(Channel channel, ObjectInput in) throws IOException {
protected Object decodeEventData(Channel channel, ObjectInput in) throws IOException {
try {
return in.readObject();
} catch (ClassNotFoundException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.alibaba.dubbo.remoting.exchange.ExchangeHandler;
import com.alibaba.dubbo.remoting.exchange.ExchangeServer;
import com.alibaba.dubbo.remoting.exchange.Exchanger;
import com.alibaba.dubbo.remoting.transport.DecodeHandler;

/**
* DefaultMessenger
Expand All @@ -33,11 +34,11 @@ public class HeaderExchanger implements Exchanger {
public static final String NAME = "header";

public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeClient(Transporters.connect(url, new HeaderExchangeHandler(handler)));
return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}

public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeServer(Transporters.bind(url, new HeaderExchangeHandler(handler)));
return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import java.io.IOException;

import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.extension.ExtensionLoader;
import com.alibaba.dubbo.common.logger.Logger;
import com.alibaba.dubbo.common.logger.LoggerFactory;
import com.alibaba.dubbo.common.serialize.Serialization;
Expand All @@ -35,8 +34,7 @@ public abstract class AbstractCodec implements Codec {
private final Logger logger = LoggerFactory.getLogger(getClass());

protected Serialization getSerialization(Channel channel) {
Serialization serialization = ExtensionLoader.getExtensionLoader(Serialization.class).getExtension(channel.getUrl().getParameter(Constants.SERIALIZATION_KEY, Constants.DEFAULT_REMOTING_SERIALIZATION));
return serialization;
return CodecSupport.getSerialization(channel.getUrl());
}

protected void checkPayload(Channel channel, long size) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright 1999-2011 Alibaba Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.dubbo.remoting.transport;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.extension.ExtensionLoader;
import com.alibaba.dubbo.common.logger.Logger;
import com.alibaba.dubbo.common.logger.LoggerFactory;
import com.alibaba.dubbo.common.serialize.Serialization;

/**
* @author <a href="mailto:gang.lvg@alibaba-inc.com">kimi</a>
*/
public class CodecSupport {

private static final Logger logger = LoggerFactory.getLogger(CodecSupport.class);

private CodecSupport() {
}

private static Map<Byte, Serialization> ID_SERIALIZATION_MAP = new HashMap<Byte, Serialization>();

static {
Set<String> supportedExtensions = ExtensionLoader.getExtensionLoader(Serialization.class).getSupportedExtensions();
for (String name : supportedExtensions) {
Serialization serialization = ExtensionLoader.getExtensionLoader(Serialization.class).getExtension(name);
byte idByte = serialization.getContentTypeId();
if (ID_SERIALIZATION_MAP.containsKey(idByte)) {
logger.error("Serialization extension " + serialization.getClass().getName()
+ " has duplicate id to Serialization extension "
+ ID_SERIALIZATION_MAP.get(idByte).getClass().getName()
+ ", ignore this Serialization extension");
continue;
}
ID_SERIALIZATION_MAP.put(idByte, serialization);
}
}

public static Serialization getSerializationById(Byte id) {
return ID_SERIALIZATION_MAP.get(id);
}

public static Serialization getSerialization(URL url) {
return ExtensionLoader.getExtensionLoader(Serialization.class).getExtension(
url.getParameter(Constants.SERIALIZATION_KEY, Constants.DEFAULT_REMOTING_SERIALIZATION));
}

public static Serialization getSerialization(URL url, Byte id) {
Serialization result = getSerializationById(id);
if (result == null) {
result = getSerialization(url);
}
return result;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Copyright 1999-2011 Alibaba Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.dubbo.remoting.transport;

import com.alibaba.dubbo.common.logger.Logger;
import com.alibaba.dubbo.common.logger.LoggerFactory;
import com.alibaba.dubbo.common.utils.Assert;
import com.alibaba.dubbo.remoting.Channel;
import com.alibaba.dubbo.remoting.ChannelHandler;
import com.alibaba.dubbo.remoting.Decodeable;
import com.alibaba.dubbo.remoting.RemotingException;
import com.alibaba.dubbo.remoting.exchange.Request;
import com.alibaba.dubbo.remoting.exchange.Response;

/**
* @author <a href="mailto:gang.lvg@alibaba-inc.com">kimi</a>
*/
public class DecodeHandler implements ChannelHandlerDelegate {

private static final Logger log = LoggerFactory.getLogger(DecodeHandler.class);

private ChannelHandler handler;

public DecodeHandler(ChannelHandler handler) {
Assert.notNull(handler, "handler == null");
this.handler = handler;
}

public ChannelHandler getHandler() {
if (handler instanceof ChannelHandlerDelegate) {
return ((ChannelHandlerDelegate)handler).getHandler();
}
return handler;
}

public void connected(Channel channel) throws RemotingException {
handler.connected(channel);
}

public void disconnected(Channel channel) throws RemotingException {
handler.disconnected(channel);
}

public void sent(Channel channel, Object message) throws RemotingException {
handler.sent(channel, message);
}

public void received(Channel channel, Object message) throws RemotingException {
if (message instanceof Decodeable) {
decode(message);
}

if (message instanceof Request) {
decode(((Request)message).getData());
}

if (message instanceof Response) {
decode( ((Response)message).getResult());
}

handler.received(channel, message);
}

public void caught(Channel channel, Throwable exception) throws RemotingException {
handler.caught(channel, exception);
}

private void decode(Object message) {
if (message != null && message instanceof Decodeable) {
try {
((Decodeable)message).decode();
if (log.isDebugEnabled()) {
log.debug(new StringBuilder(32).append("Decode decodeable message ")
.append(message.getClass().getName()).toString());
}
} catch (Throwable e) {
if (log.isWarnEnabled()) {
log.warn(
new StringBuilder(32)
.append("Call Decodeable.decode failed: ")
.append(e.getMessage()).toString(),
e);
}
} // ~ end of catch
} // ~ end of if
} // ~ end of method decode

}
Loading

0 comments on commit fbf1387

Please sign in to comment.