From fbf13872e6f920e35ab614e11588b628017c412d Mon Sep 17 00:00:00 2001 From: kimi Date: Sun, 1 Jul 2012 17:48:33 +0800 Subject: [PATCH] =?UTF-8?q?DUBBO-8=20=E9=92=88=E5=AF=B9=20DubboCodec=20?= =?UTF-8?q?=E4=BC=98=E5=8C=96=E7=BA=BF=E7=A8=8B=E6=A8=A1=E5=9E=8B=20?= =?UTF-8?q?=E6=94=B6=E5=88=B0=E4=B8=80=E4=B8=AA=20Request/Response=20?= =?UTF-8?q?=E6=97=B6=EF=BC=8C=E6=8A=8A=E6=B6=88=E6=81=AF=E4=BD=93=E4=BF=9D?= =?UTF-8?q?=E5=AD=98=E8=B5=B7=E6=9D=A5=E4=B8=8D=E7=AB=8B=E5=8D=B3=E5=81=9A?= =?UTF-8?q?=E5=8F=8D=E5=BA=8F=E5=88=97=E5=8C=96=EF=BC=8C=E6=B4=BE=E5=8F=91?= =?UTF-8?q?=E5=88=B0=E4=B8=9A=E5=8A=A1=E7=BA=BF=E7=A8=8B=E5=90=8E=E5=86=8D?= =?UTF-8?q?=E6=8A=8A=E6=B6=88=E6=81=AF=E4=BD=93=E8=BF=9B=E8=A1=8C=E5=8F=8D?= =?UTF-8?q?=E5=BA=8F=E5=88=97=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../alibaba/dubbo/common/utils/Assert.java | 32 +++ .../alibaba/dubbo/remoting/Decodeable.java | 26 ++ .../exchange/codec/ExchangeCodec.java | 37 +-- .../support/header/HeaderExchanger.java | 5 +- .../remoting/transport/AbstractCodec.java | 4 +- .../remoting/transport/CodecSupport.java | 75 +++++ .../remoting/transport/DecodeHandler.java | 102 +++++++ .../dubbo/rpc/protocol/dubbo/DubboCodec.java | 261 +++++++++--------- .../rpc/protocol/dubbo/RpcInvocationExt.java | 128 +++++++++ .../rpc/protocol/dubbo/RpcResultExt.java | 111 ++++++++ 10 files changed, 611 insertions(+), 170 deletions(-) create mode 100644 dubbo-common/src/main/java/com/alibaba/dubbo/common/utils/Assert.java create mode 100644 dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/Decodeable.java create mode 100644 dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/transport/CodecSupport.java create mode 100644 dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/transport/DecodeHandler.java create mode 100644 dubbo-rpc/dubbo-rpc-default/src/main/java/com/alibaba/dubbo/rpc/protocol/dubbo/RpcInvocationExt.java create mode 100644 dubbo-rpc/dubbo-rpc-default/src/main/java/com/alibaba/dubbo/rpc/protocol/dubbo/RpcResultExt.java diff --git a/dubbo-common/src/main/java/com/alibaba/dubbo/common/utils/Assert.java b/dubbo-common/src/main/java/com/alibaba/dubbo/common/utils/Assert.java new file mode 100644 index 00000000000..ff4dd37f642 --- /dev/null +++ b/dubbo-common/src/main/java/com/alibaba/dubbo/common/utils/Assert.java @@ -0,0 +1,32 @@ +/* + * Copyright 1999-2011 Alibaba Group. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.dubbo.common.utils; + +/** + * @author kimi + */ +public abstract class Assert { + + protected Assert() {} + + public static void notNull(Object obj, String message) { + if (obj == null) { + throw new IllegalArgumentException(message); + } + } + +} diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/Decodeable.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/Decodeable.java new file mode 100644 index 00000000000..7020ee851f2 --- /dev/null +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/Decodeable.java @@ -0,0 +1,26 @@ +/* + * Copyright 1999-2011 Alibaba Group. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.dubbo.remoting; + +/** + * @author kimi + */ +public interface Decodeable { + + public void decode() throws Exception; + +} diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/exchange/codec/ExchangeCodec.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/exchange/codec/ExchangeCodec.java index 3462ca107ca..62e80d51f26 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/exchange/codec/ExchangeCodec.java +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/exchange/codec/ExchangeCodec.java @@ -18,11 +18,7 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.util.HashMap; -import java.util.Map; -import java.util.Set; -import com.alibaba.dubbo.common.extension.ExtensionLoader; import com.alibaba.dubbo.common.io.Bytes; import com.alibaba.dubbo.common.io.StreamUtils; import com.alibaba.dubbo.common.io.UnsafeByteArrayInputStream; @@ -39,6 +35,7 @@ import com.alibaba.dubbo.remoting.exchange.Response; import com.alibaba.dubbo.remoting.exchange.support.DefaultFuture; import com.alibaba.dubbo.remoting.telnet.codec.TelnetCodec; +import com.alibaba.dubbo.remoting.transport.CodecSupport; /** * ExchangeCodec. @@ -69,23 +66,6 @@ public class ExchangeCodec extends TelnetCodec { protected static final int SERIALIZATION_MASK = 0x1f; - private static Map ID_SERIALIZATION_MAP = new HashMap(); - static { - Set supportedExtensions = ExtensionLoader.getExtensionLoader(Serialization.class).getSupportedExtensions(); - for (String name : supportedExtensions) { - Serialization serialization = ExtensionLoader.getExtensionLoader(Serialization.class).getExtension(name); - byte idByte = serialization.getContentTypeId(); - if (ID_SERIALIZATION_MAP.containsKey(idByte)) { - logger.error("Serialization extension " + serialization.getClass().getName() - + " has duplicate id to Serialization extension " - + ID_SERIALIZATION_MAP.get(idByte).getClass().getName() - + ", ignore this Serialization extension"); - continue; - } - ID_SERIALIZATION_MAP.put(idByte, serialization); - } - } - public Short getMagicCode() { return MAGIC; } @@ -144,12 +124,13 @@ protected Object decode(Channel channel, InputStream is, int readable, byte[] he if( readable != tt ) is = StreamUtils.limitedInputStream(is, len); + return decodeBody(channel, is, header); + } + + protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException { try { byte flag = header[2], proto = (byte)( flag & SERIALIZATION_MASK ); - Serialization s = getSerializationById(proto); - if (s == null) { - s = getSerialization(channel); - } + Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto); ObjectInput in = s.deserialize(channel.getUrl(), is); // get request id. long id = Bytes.bytes2long(header, 4); @@ -332,10 +313,6 @@ protected void encodeResponse(Channel channel, OutputStream os, Response res) th } } - private static final Serialization getSerializationById(Byte id) { - return ID_SERIALIZATION_MAP.get(id); - } - @Override protected Object decodeData(ObjectInput in) throws IOException { return decodeRequestData(in); @@ -393,7 +370,7 @@ protected Object decodeData(Channel channel, ObjectInput in) throws IOException return decodeRequestData(channel ,in); } - private Object decodeEventData(Channel channel, ObjectInput in) throws IOException { + protected Object decodeEventData(Channel channel, ObjectInput in) throws IOException { try { return in.readObject(); } catch (ClassNotFoundException e) { diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/exchange/support/header/HeaderExchanger.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/exchange/support/header/HeaderExchanger.java index 76badec6d87..786bf2f425b 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/exchange/support/header/HeaderExchanger.java +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/exchange/support/header/HeaderExchanger.java @@ -22,6 +22,7 @@ import com.alibaba.dubbo.remoting.exchange.ExchangeHandler; import com.alibaba.dubbo.remoting.exchange.ExchangeServer; import com.alibaba.dubbo.remoting.exchange.Exchanger; +import com.alibaba.dubbo.remoting.transport.DecodeHandler; /** * DefaultMessenger @@ -33,11 +34,11 @@ public class HeaderExchanger implements Exchanger { public static final String NAME = "header"; public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException { - return new HeaderExchangeClient(Transporters.connect(url, new HeaderExchangeHandler(handler))); + return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler)))); } public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException { - return new HeaderExchangeServer(Transporters.bind(url, new HeaderExchangeHandler(handler))); + return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))); } } \ No newline at end of file diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/transport/AbstractCodec.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/transport/AbstractCodec.java index 4082b86aded..62d36961865 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/transport/AbstractCodec.java +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/transport/AbstractCodec.java @@ -18,7 +18,6 @@ import java.io.IOException; import com.alibaba.dubbo.common.Constants; -import com.alibaba.dubbo.common.extension.ExtensionLoader; import com.alibaba.dubbo.common.logger.Logger; import com.alibaba.dubbo.common.logger.LoggerFactory; import com.alibaba.dubbo.common.serialize.Serialization; @@ -35,8 +34,7 @@ public abstract class AbstractCodec implements Codec { private final Logger logger = LoggerFactory.getLogger(getClass()); protected Serialization getSerialization(Channel channel) { - Serialization serialization = ExtensionLoader.getExtensionLoader(Serialization.class).getExtension(channel.getUrl().getParameter(Constants.SERIALIZATION_KEY, Constants.DEFAULT_REMOTING_SERIALIZATION)); - return serialization; + return CodecSupport.getSerialization(channel.getUrl()); } protected void checkPayload(Channel channel, long size) throws IOException { diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/transport/CodecSupport.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/transport/CodecSupport.java new file mode 100644 index 00000000000..f4b918d5ab7 --- /dev/null +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/transport/CodecSupport.java @@ -0,0 +1,75 @@ +/* + * Copyright 1999-2011 Alibaba Group. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.dubbo.remoting.transport; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import com.alibaba.dubbo.common.Constants; +import com.alibaba.dubbo.common.URL; +import com.alibaba.dubbo.common.extension.ExtensionLoader; +import com.alibaba.dubbo.common.logger.Logger; +import com.alibaba.dubbo.common.logger.LoggerFactory; +import com.alibaba.dubbo.common.serialize.Serialization; + +/** + * @author kimi + */ +public class CodecSupport { + + private static final Logger logger = LoggerFactory.getLogger(CodecSupport.class); + + private CodecSupport() { + } + + private static Map ID_SERIALIZATION_MAP = new HashMap(); + + static { + Set supportedExtensions = ExtensionLoader.getExtensionLoader(Serialization.class).getSupportedExtensions(); + for (String name : supportedExtensions) { + Serialization serialization = ExtensionLoader.getExtensionLoader(Serialization.class).getExtension(name); + byte idByte = serialization.getContentTypeId(); + if (ID_SERIALIZATION_MAP.containsKey(idByte)) { + logger.error("Serialization extension " + serialization.getClass().getName() + + " has duplicate id to Serialization extension " + + ID_SERIALIZATION_MAP.get(idByte).getClass().getName() + + ", ignore this Serialization extension"); + continue; + } + ID_SERIALIZATION_MAP.put(idByte, serialization); + } + } + + public static Serialization getSerializationById(Byte id) { + return ID_SERIALIZATION_MAP.get(id); + } + + public static Serialization getSerialization(URL url) { + return ExtensionLoader.getExtensionLoader(Serialization.class).getExtension( + url.getParameter(Constants.SERIALIZATION_KEY, Constants.DEFAULT_REMOTING_SERIALIZATION)); + } + + public static Serialization getSerialization(URL url, Byte id) { + Serialization result = getSerializationById(id); + if (result == null) { + result = getSerialization(url); + } + return result; + } + +} diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/transport/DecodeHandler.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/transport/DecodeHandler.java new file mode 100644 index 00000000000..837de8a153f --- /dev/null +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/transport/DecodeHandler.java @@ -0,0 +1,102 @@ +/* + * Copyright 1999-2011 Alibaba Group. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.dubbo.remoting.transport; + +import com.alibaba.dubbo.common.logger.Logger; +import com.alibaba.dubbo.common.logger.LoggerFactory; +import com.alibaba.dubbo.common.utils.Assert; +import com.alibaba.dubbo.remoting.Channel; +import com.alibaba.dubbo.remoting.ChannelHandler; +import com.alibaba.dubbo.remoting.Decodeable; +import com.alibaba.dubbo.remoting.RemotingException; +import com.alibaba.dubbo.remoting.exchange.Request; +import com.alibaba.dubbo.remoting.exchange.Response; + +/** + * @author kimi + */ +public class DecodeHandler implements ChannelHandlerDelegate { + + private static final Logger log = LoggerFactory.getLogger(DecodeHandler.class); + + private ChannelHandler handler; + + public DecodeHandler(ChannelHandler handler) { + Assert.notNull(handler, "handler == null"); + this.handler = handler; + } + + public ChannelHandler getHandler() { + if (handler instanceof ChannelHandlerDelegate) { + return ((ChannelHandlerDelegate)handler).getHandler(); + } + return handler; + } + + public void connected(Channel channel) throws RemotingException { + handler.connected(channel); + } + + public void disconnected(Channel channel) throws RemotingException { + handler.disconnected(channel); + } + + public void sent(Channel channel, Object message) throws RemotingException { + handler.sent(channel, message); + } + + public void received(Channel channel, Object message) throws RemotingException { + if (message instanceof Decodeable) { + decode(message); + } + + if (message instanceof Request) { + decode(((Request)message).getData()); + } + + if (message instanceof Response) { + decode( ((Response)message).getResult()); + } + + handler.received(channel, message); + } + + public void caught(Channel channel, Throwable exception) throws RemotingException { + handler.caught(channel, exception); + } + + private void decode(Object message) { + if (message != null && message instanceof Decodeable) { + try { + ((Decodeable)message).decode(); + if (log.isDebugEnabled()) { + log.debug(new StringBuilder(32).append("Decode decodeable message ") + .append(message.getClass().getName()).toString()); + } + } catch (Throwable e) { + if (log.isWarnEnabled()) { + log.warn( + new StringBuilder(32) + .append("Call Decodeable.decode failed: ") + .append(e.getMessage()).toString(), + e); + } + } // ~ end of catch + } // ~ end of if + } // ~ end of method decode + +} diff --git a/dubbo-rpc/dubbo-rpc-default/src/main/java/com/alibaba/dubbo/rpc/protocol/dubbo/DubboCodec.java b/dubbo-rpc/dubbo-rpc-default/src/main/java/com/alibaba/dubbo/rpc/protocol/dubbo/DubboCodec.java index dcb67be9a2c..42e27e6b389 100644 --- a/dubbo-rpc/dubbo-rpc-default/src/main/java/com/alibaba/dubbo/rpc/protocol/dubbo/DubboCodec.java +++ b/dubbo-rpc/dubbo-rpc-default/src/main/java/com/alibaba/dubbo/rpc/protocol/dubbo/DubboCodec.java @@ -1,66 +1,143 @@ -/* - * Copyright 1999-2011 Alibaba Group. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ +/* + * Copyright 1999-2011 Alibaba Group. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package com.alibaba.dubbo.rpc.protocol.dubbo; -import static com.alibaba.dubbo.rpc.protocol.dubbo.CallbackServiceCodec.decodeInvocationArgument; -import static com.alibaba.dubbo.rpc.protocol.dubbo.CallbackServiceCodec.encodeInvocationArgument; - -import java.io.IOException; -import java.lang.reflect.Type; -import java.util.HashMap; -import java.util.Map; - -import com.alibaba.dubbo.common.Constants; -import com.alibaba.dubbo.common.Version; -import com.alibaba.dubbo.common.serialize.ObjectInput; -import com.alibaba.dubbo.common.serialize.ObjectOutput; -import com.alibaba.dubbo.common.utils.ReflectUtils; -import com.alibaba.dubbo.common.utils.StringUtils; -import com.alibaba.dubbo.remoting.Channel; -import com.alibaba.dubbo.remoting.Codec; -import com.alibaba.dubbo.remoting.exchange.codec.ExchangeCodec; -import com.alibaba.dubbo.rpc.Invocation; -import com.alibaba.dubbo.rpc.Result; -import com.alibaba.dubbo.rpc.RpcInvocation; -import com.alibaba.dubbo.rpc.RpcResult; -import com.alibaba.dubbo.rpc.support.RpcUtils; +import java.io.IOException; +import java.io.InputStream; + +import com.alibaba.dubbo.common.Constants; +import com.alibaba.dubbo.common.URL; +import com.alibaba.dubbo.common.Version; +import com.alibaba.dubbo.common.io.Bytes; +import com.alibaba.dubbo.common.io.UnsafeByteArrayInputStream; +import com.alibaba.dubbo.common.serialize.ObjectInput; +import com.alibaba.dubbo.common.serialize.ObjectOutput; +import com.alibaba.dubbo.common.serialize.Serialization; +import com.alibaba.dubbo.common.utils.ReflectUtils; +import com.alibaba.dubbo.common.utils.StringUtils; +import com.alibaba.dubbo.remoting.Channel; +import com.alibaba.dubbo.remoting.Codec; +import com.alibaba.dubbo.remoting.exchange.Request; +import com.alibaba.dubbo.remoting.exchange.Response; +import com.alibaba.dubbo.remoting.exchange.codec.ExchangeCodec; +import com.alibaba.dubbo.remoting.transport.CodecSupport; +import com.alibaba.dubbo.rpc.Invocation; +import com.alibaba.dubbo.rpc.Result; +import com.alibaba.dubbo.rpc.RpcInvocation; + +import static com.alibaba.dubbo.rpc.protocol.dubbo.CallbackServiceCodec.encodeInvocationArgument; /** * Dubbo codec. - * + * * @author qianlei * @author chao.liuc */ -public class DubboCodec extends ExchangeCodec implements Codec { - - public static final String NAME = "dubbo"; +public class DubboCodec extends ExchangeCodec implements Codec { + + public static final String NAME = "dubbo"; + + public static final String DUBBO_VERSION = Version.getVersion(DubboCodec.class, Version.getVersion()); + + public static final byte RESPONSE_WITH_EXCEPTION = 0; + + public static final byte RESPONSE_VALUE = 1; - private static final String DUBBO_VERSION = Version.getVersion(DubboCodec.class, Version.getVersion()); + public static final byte RESPONSE_NULL_VALUE = 2; - private static final byte RESPONSE_WITH_EXCEPTION = 0; + public static final Object[] EMPTY_OBJECT_ARRAY = new Object[0]; - private static final byte RESPONSE_VALUE = 1; + public static final Class[] EMPTY_CLASS_ARRAY = new Class[0]; - private static final byte RESPONSE_NULL_VALUE = 2; + protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException { + byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK); + Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto); + // get request id. + long id = Bytes.bytes2long(header, 4); + if ((flag & FLAG_REQUEST) == 0) { + // decode response. + Response res = new Response(id); + if ((flag & FLAG_EVENT) != 0) { + res.setEvent(Response.HEARTBEAT_EVENT); + } + // get status. + byte status = header[3]; + res.setStatus(status); + if (status == Response.OK) { + try { + Object data; + if (res.isHeartbeat()) { + data = decodeHeartbeatData(channel, deserialize(s, channel.getUrl(), is)); + } else if (res.isEvent()) { + data = decodeEventData(channel, deserialize(s, channel.getUrl(), is)); + } else { + data = new RpcResultExt(channel, res, + new UnsafeByteArrayInputStream(readMessageData(is)), (Invocation) getRequestData(id), proto); + } + res.setResult(data); + } catch (Throwable t) { + res.setStatus(Response.CLIENT_ERROR); + res.setErrorMessage(StringUtils.toString(t)); + } + } else { + res.setErrorMessage(deserialize(s, channel.getUrl(), is).readUTF()); + } + return res; + } else { + // decode request. + Request req = new Request(id); + req.setVersion("2.0.0"); + req.setTwoWay((flag & FLAG_TWOWAY) != 0); + if ((flag & FLAG_EVENT) != 0) { + req.setEvent(Request.HEARTBEAT_EVENT); + } + try { + Object data; + if (req.isHeartbeat()) { + data = decodeHeartbeatData(channel, deserialize(s, channel.getUrl(), is)); + } else if (req.isEvent()) { + data = decodeEventData(channel, deserialize(s, channel.getUrl(), is)); + } else { + data = new RpcInvocationExt(channel, req, + new UnsafeByteArrayInputStream(readMessageData(is)), proto); + } + req.setData(data); + } catch (Throwable t) { + // bad request + req.setBroken(true); + req.setData(t); + } + return req; + } + } + + private ObjectInput deserialize(Serialization serialization, URL url, InputStream is) + throws IOException { + return serialization.deserialize(url, is); + } - private static final Object[] EMPTY_OBJECT_ARRAY = new Object[0]; + private byte[] readMessageData(InputStream is) throws IOException { + if (is.available() > 0) { + byte[] result = new byte[is.available()]; + is.read(result); + return result; + } + return new byte[]{}; + } - private static final Class[] EMPTY_CLASS_ARRAY = new Class[0]; - - @Override protected void encodeRequestData(Channel channel, ObjectOutput out, Object data) throws IOException { RpcInvocation inv = (RpcInvocation) data; @@ -79,58 +156,6 @@ protected void encodeRequestData(Channel channel, ObjectOutput out, Object data) out.writeObject(inv.getAttachments()); } - @Override - @SuppressWarnings("unchecked") - protected Object decodeRequestData(Channel channel, ObjectInput in) throws IOException { - RpcInvocation inv = new RpcInvocation(); - - inv.setAttachment(Constants.DUBBO_VERSION_KEY, in.readUTF()); - inv.setAttachment(Constants.PATH_KEY, in.readUTF()); - inv.setAttachment(Constants.VERSION_KEY, in.readUTF()); - - inv.setMethodName(in.readUTF()); - try { - Object[] args; - Class[] pts; - String desc = in.readUTF(); - if (desc.length() == 0) { - pts = EMPTY_CLASS_ARRAY; - args = EMPTY_OBJECT_ARRAY; - } else { - pts = ReflectUtils.desc2classArray(desc); - args = new Object[pts.length]; - for (int i = 0; i < args.length; i++){ - try{ - args[i] = in.readObject(pts[i]); - }catch (Exception e) { - e.printStackTrace(); - } - } - } - inv.setParameterTypes(pts); - - Map map = (Map) in.readObject(Map.class); - if (map != null && map.size() > 0) { - Map attachment = inv.getAttachments(); - if (attachment == null) { - attachment = new HashMap(); - } - attachment.putAll(map); - inv.setAttachments(attachment); - } - //decode argument ,may be callback - for (int i = 0; i < args.length; i++){ - args[i] = decodeInvocationArgument(channel, inv, pts, i, args[i]); - } - - inv.setArguments(args); - - } catch (ClassNotFoundException e) { - throw new IOException(StringUtils.toString("Read invocation data failed.", e)); - } - return inv; - } - @Override protected void encodeResponseData(Channel channel, ObjectOutput out, Object data) throws IOException { Result result = (Result) data; @@ -148,39 +173,5 @@ protected void encodeResponseData(Channel channel, ObjectOutput out, Object data out.writeByte(RESPONSE_WITH_EXCEPTION); out.writeObject(th); } - } - - @Override - protected Object decodeResponseData(Channel channel, ObjectInput in, Object request) throws IOException { - Invocation invocation = (Invocation) request; - RpcResult result = new RpcResult(); - - byte flag = in.readByte(); - switch (flag) { - case RESPONSE_NULL_VALUE: - break; - case RESPONSE_VALUE: - try { - Type[] returnType = RpcUtils.getReturnTypes(invocation); - result.setValue(returnType == null || returnType.length == 0 ? in.readObject() : - (returnType.length == 1 ? in.readObject((Class)returnType[0]) - : in.readObject((Class)returnType[0], returnType[1]))); - } catch (ClassNotFoundException e) { - throw new IOException(StringUtils.toString("Read response data failed.", e)); - } - break; - case RESPONSE_WITH_EXCEPTION: - try { - Object obj = in.readObject(); - if (obj instanceof Throwable == false) throw new IOException("Response data error, expect Throwable, but get " + obj); - result.setException((Throwable) obj); - } catch (ClassNotFoundException e) { - throw new IOException(StringUtils.toString("Read response data failed.", e)); - } - break; - default: - throw new IOException("Unknown result flag, expect '0' '1' '2', get " + flag); - } - return result; } } \ No newline at end of file diff --git a/dubbo-rpc/dubbo-rpc-default/src/main/java/com/alibaba/dubbo/rpc/protocol/dubbo/RpcInvocationExt.java b/dubbo-rpc/dubbo-rpc-default/src/main/java/com/alibaba/dubbo/rpc/protocol/dubbo/RpcInvocationExt.java new file mode 100644 index 00000000000..27e75dd77d5 --- /dev/null +++ b/dubbo-rpc/dubbo-rpc-default/src/main/java/com/alibaba/dubbo/rpc/protocol/dubbo/RpcInvocationExt.java @@ -0,0 +1,128 @@ +/* + * Copyright 1999-2011 Alibaba Group. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.dubbo.rpc.protocol.dubbo; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.HashMap; +import java.util.Map; + +import com.alibaba.dubbo.common.Constants; +import com.alibaba.dubbo.common.serialize.ObjectInput; +import com.alibaba.dubbo.common.utils.Assert; +import com.alibaba.dubbo.common.utils.ReflectUtils; +import com.alibaba.dubbo.common.utils.StringUtils; +import com.alibaba.dubbo.remoting.Channel; +import com.alibaba.dubbo.remoting.Codec; +import com.alibaba.dubbo.remoting.Decodeable; +import com.alibaba.dubbo.remoting.exchange.Request; +import com.alibaba.dubbo.remoting.transport.CodecSupport; +import com.alibaba.dubbo.rpc.RpcInvocation; + +import static com.alibaba.dubbo.rpc.protocol.dubbo.CallbackServiceCodec.decodeInvocationArgument; + +/** + * @author kimi + */ +public class RpcInvocationExt extends RpcInvocation implements Codec, Decodeable { + + private Channel channel; + + private byte serializationType; + + private InputStream inputStream; + + private Request request; + + public RpcInvocationExt(Channel channel, Request request, InputStream is, byte id) { + Assert.notNull(channel, "channel == null"); + Assert.notNull(request, "request == null"); + Assert.notNull(is, "inputStream == null"); + this.channel = channel; + this.request = request; + this.inputStream = is; + this.serializationType = id; + } + + public void decode() throws Exception { + if (channel != null && inputStream != null) { + try { + decode(channel, inputStream); + } catch (Throwable e) { + request.setBroken(true); + request.setData(e); + } + } + } + + public void encode(Channel channel, OutputStream output, Object message) throws IOException { + throw new UnsupportedOperationException(); + } + + public Object decode(Channel channel, InputStream input) throws IOException { + ObjectInput in = CodecSupport.getSerialization(channel.getUrl(), serializationType) + .deserialize(channel.getUrl(), input); + + setAttachment(Constants.DUBBO_VERSION_KEY, in.readUTF()); + setAttachment(Constants.PATH_KEY, in.readUTF()); + setAttachment(Constants.VERSION_KEY, in.readUTF()); + + setMethodName(in.readUTF()); + try { + Object[] args; + Class[] pts; + String desc = in.readUTF(); + if (desc.length() == 0) { + pts = DubboCodec.EMPTY_CLASS_ARRAY; + args = DubboCodec.EMPTY_OBJECT_ARRAY; + } else { + pts = ReflectUtils.desc2classArray(desc); + args = new Object[pts.length]; + for (int i = 0; i < args.length; i++) { + try { + args[i] = in.readObject(pts[i]); + } catch (Exception e) { + e.printStackTrace(); + } + } + } + setParameterTypes(pts); + + Map map = (Map) in.readObject(Map.class); + if (map != null && map.size() > 0) { + Map attachment = getAttachments(); + if (attachment == null) { + attachment = new HashMap(); + } + attachment.putAll(map); + setAttachments(attachment); + } + //decode argument ,may be callback + for (int i = 0; i < args.length; i++) { + args[i] = decodeInvocationArgument(channel, this, pts, i, args[i]); + } + + setArguments(args); + + } catch (ClassNotFoundException e) { + throw new IOException(StringUtils.toString("Read invocation data failed.", e)); + } + return this; + } + +} diff --git a/dubbo-rpc/dubbo-rpc-default/src/main/java/com/alibaba/dubbo/rpc/protocol/dubbo/RpcResultExt.java b/dubbo-rpc/dubbo-rpc-default/src/main/java/com/alibaba/dubbo/rpc/protocol/dubbo/RpcResultExt.java new file mode 100644 index 00000000000..4d394c3c232 --- /dev/null +++ b/dubbo-rpc/dubbo-rpc-default/src/main/java/com/alibaba/dubbo/rpc/protocol/dubbo/RpcResultExt.java @@ -0,0 +1,111 @@ +/* + * Copyright 1999-2011 Alibaba Group. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.dubbo.rpc.protocol.dubbo; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.lang.reflect.Type; + +import com.alibaba.dubbo.common.serialize.ObjectInput; +import com.alibaba.dubbo.common.utils.Assert; +import com.alibaba.dubbo.common.utils.StringUtils; +import com.alibaba.dubbo.remoting.Channel; +import com.alibaba.dubbo.remoting.Codec; +import com.alibaba.dubbo.remoting.Decodeable; +import com.alibaba.dubbo.remoting.exchange.Response; +import com.alibaba.dubbo.remoting.transport.CodecSupport; +import com.alibaba.dubbo.rpc.Invocation; +import com.alibaba.dubbo.rpc.RpcResult; +import com.alibaba.dubbo.rpc.support.RpcUtils; + +/** + * @author kimi + */ +public class RpcResultExt extends RpcResult implements Codec, Decodeable { + + private Channel channel; + + private byte serializationType; + + private InputStream inputStream; + + private Response response; + + private Invocation invocation; + + public RpcResultExt(Channel channel, Response response, InputStream is, Invocation invocation, byte id) { + Assert.notNull(channel, "channel == null"); + Assert.notNull(response, "response == null"); + Assert.notNull(is, "inputStream == null"); + this.channel = channel; + this.response = response; + this.inputStream = is; + this.invocation = invocation; + this.serializationType = id; + } + + public void encode(Channel channel, OutputStream output, Object message) throws IOException { + throw new UnsupportedOperationException(); + } + + public Object decode(Channel channel, InputStream input) throws IOException { + ObjectInput in = CodecSupport.getSerialization(channel.getUrl(), serializationType) + .deserialize(channel.getUrl(), input); + + byte flag = in.readByte(); + switch (flag) { + case DubboCodec.RESPONSE_NULL_VALUE: + break; + case DubboCodec.RESPONSE_VALUE: + try { + Type[] returnType = RpcUtils.getReturnTypes(invocation); + setValue(returnType == null || returnType.length == 0 ? in.readObject() : + (returnType.length == 1 ? in.readObject((Class) returnType[0]) + : in.readObject((Class) returnType[0], returnType[1]))); + } catch (ClassNotFoundException e) { + throw new IOException(StringUtils.toString("Read response data failed.", e)); + } + break; + case DubboCodec.RESPONSE_WITH_EXCEPTION: + try { + Object obj = in.readObject(); + if (obj instanceof Throwable == false) + throw new IOException("Response data error, expect Throwable, but get " + obj); + setException((Throwable) obj); + } catch (ClassNotFoundException e) { + throw new IOException(StringUtils.toString("Read response data failed.", e)); + } + break; + default: + throw new IOException("Unknown result flag, expect '0' '1' '2', get " + flag); + } + return this; + } + + public void decode() throws Exception { + if (channel != null && inputStream != null) { + try { + decode(channel, inputStream); + } catch (Throwable e) { + response.setStatus(Response.CLIENT_ERROR); + response.setErrorMessage(StringUtils.toString(e)); + } + } + } + +}