Skip to content

Commit

Permalink
DUBBO-166 增强 codec 避免数据 copy,重构类名,添加单元测试,测试 CodecAdapter 以及 mina 实现
Browse files Browse the repository at this point in the history
  • Loading branch information
kimi committed Oct 18, 2012
1 parent 955a1a7 commit ae56284
Show file tree
Hide file tree
Showing 20 changed files with 1,214 additions and 430 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
* @author <a href="mailto:gang.lvg@taobao.com">kimi</a>
*/
@SPI
public interface ChannelCodec {
public interface Codec2 {

@Adaptive({Constants.CODEC_KEY})
void encode(Channel channel, ChannelBuffer buffer, Object message) throws IOException;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,78 +1,78 @@
/*
* Copyright 1999-2011 Alibaba Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.dubbo.remoting.transport;

import java.io.IOException;
import java.net.InetSocketAddress;

import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.logger.Logger;
import com.alibaba.dubbo.common.logger.LoggerFactory;
import com.alibaba.dubbo.common.serialize.Serialization;
import com.alibaba.dubbo.common.utils.NetUtils;
import com.alibaba.dubbo.remoting.Channel;
import com.alibaba.dubbo.remoting.ChannelCodec;

/**
* AbstractCodec
*
* @author william.liangf
*/
public abstract class AbstractCodec implements ChannelCodec {

private static final Logger logger = LoggerFactory.getLogger(AbstractCodec.class);

protected Serialization getSerialization(Channel channel) {
return CodecSupport.getSerialization(channel.getUrl());
}

protected static void checkPayload(Channel channel, long size) throws IOException {
int payload = Constants.DEFAULT_PAYLOAD;
if (channel != null && channel.getUrl() != null) {
payload = channel.getUrl().getPositiveParameter(Constants.PAYLOAD_KEY, Constants.DEFAULT_PAYLOAD);
}
if (size > payload) {
IOException e = new IOException("Data length too large: " + size + ", max payload: " + payload + ", channel: " + channel);
logger.error(e);
throw e;
}
}

protected boolean isClientSide(Channel channel) {
String side = (String) channel.getAttribute(Constants.SIDE_KEY);
if ("client".equals(side)) {
return true;
} else if ("server".equals(side)) {
return false;
} else {
InetSocketAddress address = channel.getRemoteAddress();
URL url = channel.getUrl();
boolean client = url.getPort() == address.getPort()
&& NetUtils.filterLocalHost(url.getIp()).equals(
NetUtils.filterLocalHost(address.getAddress()
.getHostAddress()));
channel.setAttribute(Constants.SIDE_KEY, client ? "client"
: "server");
return client;
}
}

protected boolean isServerSide(Channel channel) {
return !isClientSide(channel);
}

/*
* Copyright 1999-2011 Alibaba Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.dubbo.remoting.transport;

import java.io.IOException;
import java.net.InetSocketAddress;

import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.logger.Logger;
import com.alibaba.dubbo.common.logger.LoggerFactory;
import com.alibaba.dubbo.common.serialize.Serialization;
import com.alibaba.dubbo.common.utils.NetUtils;
import com.alibaba.dubbo.remoting.Channel;
import com.alibaba.dubbo.remoting.Codec2;

/**
* AbstractCodec
*
* @author william.liangf
*/
public abstract class AbstractCodec implements Codec2 {

private static final Logger logger = LoggerFactory.getLogger(AbstractCodec.class);

protected Serialization getSerialization(Channel channel) {
return CodecSupport.getSerialization(channel.getUrl());
}

protected static void checkPayload(Channel channel, long size) throws IOException {
int payload = Constants.DEFAULT_PAYLOAD;
if (channel != null && channel.getUrl() != null) {
payload = channel.getUrl().getPositiveParameter(Constants.PAYLOAD_KEY, Constants.DEFAULT_PAYLOAD);
}
if (size > payload) {
IOException e = new IOException("Data length too large: " + size + ", max payload: " + payload + ", channel: " + channel);
logger.error(e);
throw e;
}
}

protected boolean isClientSide(Channel channel) {
String side = (String) channel.getAttribute(Constants.SIDE_KEY);
if ("client".equals(side)) {
return true;
} else if ("server".equals(side)) {
return false;
} else {
InetSocketAddress address = channel.getRemoteAddress();
URL url = channel.getUrl();
boolean client = url.getPort() == address.getPort()
&& NetUtils.filterLocalHost(url.getIp()).equals(
NetUtils.filterLocalHost(address.getAddress()
.getHostAddress()));
channel.setAttribute(Constants.SIDE_KEY, client ? "client"
: "server");
return client;
}
}

protected boolean isServerSide(Channel channel) {
return !isClientSide(channel);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@
import com.alibaba.dubbo.common.extension.ExtensionLoader;
import com.alibaba.dubbo.common.logger.Logger;
import com.alibaba.dubbo.common.logger.LoggerFactory;
import com.alibaba.dubbo.remoting.ChannelCodec;
import com.alibaba.dubbo.remoting.Codec2;
import com.alibaba.dubbo.remoting.ChannelHandler;
import com.alibaba.dubbo.remoting.Codec;
import com.alibaba.dubbo.remoting.transport.codec.ChannelCodecAdapter;
import com.alibaba.dubbo.remoting.transport.codec.CodecAdapter;

/**
* AbstractEndpoint
Expand All @@ -35,7 +35,7 @@ public abstract class AbstractEndpoint extends AbstractPeer implements Resetable

private static final Logger logger = LoggerFactory.getLogger(AbstractEndpoint.class);

private ChannelCodec codec;
private Codec2 codec;

private int timeout;

Expand Down Expand Up @@ -87,7 +87,7 @@ public void reset(com.alibaba.dubbo.common.Parameters parameters){
reset(getUrl().addParameters(parameters.getParameters()));
}

protected ChannelCodec getCodec() {
protected Codec2 getCodec() {
return codec;
}

Expand All @@ -99,12 +99,12 @@ protected int getConnectTimeout() {
return connectTimeout;
}

protected static ChannelCodec getChannelCodec(URL url) {
protected static Codec2 getChannelCodec(URL url) {
String codecName = url.getParameter(Constants.CODEC_KEY, "telnet");
if (ExtensionLoader.getExtensionLoader(ChannelCodec.class).hasExtension(codecName)) {
return ExtensionLoader.getExtensionLoader(ChannelCodec.class).getExtension(codecName);
if (ExtensionLoader.getExtensionLoader(Codec2.class).hasExtension(codecName)) {
return ExtensionLoader.getExtensionLoader(Codec2.class).getExtension(codecName);
} else {
return new ChannelCodecAdapter(ExtensionLoader.getExtensionLoader(Codec.class)
return new CodecAdapter(ExtensionLoader.getExtensionLoader(Codec.class)
.getExtension(codecName));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,17 @@
import com.alibaba.dubbo.common.utils.Assert;
import com.alibaba.dubbo.remoting.Channel;
import com.alibaba.dubbo.remoting.Codec;
import com.alibaba.dubbo.remoting.ChannelCodec;
import com.alibaba.dubbo.remoting.Codec2;
import com.alibaba.dubbo.remoting.buffer.ChannelBuffer;

/**
* @author <a href="mailto:gang.lvg@alibaba-inc.com">kimi</a>
*/
public class ChannelCodecAdapter implements ChannelCodec {
public class CodecAdapter implements Codec2 {

private Codec codec;

public ChannelCodecAdapter(Codec codec) {
public CodecAdapter(Codec codec) {
Assert.notNull(codec, "codec == null");
this.codec = codec;
}
Expand All @@ -51,11 +51,7 @@ public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
buffer.readBytes(bytes);
UnsafeByteArrayInputStream is = new UnsafeByteArrayInputStream(bytes);
Object result = codec.decode(channel, is);
if (result == Codec.NEED_MORE_INPUT) {
buffer.readerIndex(savedReaderIndex);
} else {
buffer.readerIndex(savedReaderIndex + is.position());
}
buffer.readerIndex(savedReaderIndex + is.position());
return result == Codec.NEED_MORE_INPUT ? DecodeResult.NEED_MORE_INPUT : result;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
transport=com.alibaba.dubbo.remoting.transport.codec.TransportCodec
telnet=com.alibaba.dubbo.remoting.telnet.codec.TelnetCodec
transport=com.alibaba.dubbo.remoting.transport.codec.TransportCodec
telnet=com.alibaba.dubbo.remoting.telnet.codec.TelnetCodec
exchange=com.alibaba.dubbo.remoting.exchange.codec.ExchangeCodec
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@

import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.remoting.Channel;
import com.alibaba.dubbo.remoting.ChannelCodec;
import com.alibaba.dubbo.remoting.Codec2;
import com.alibaba.dubbo.remoting.buffer.ChannelBuffer;
import com.alibaba.dubbo.remoting.buffer.ChannelBuffers;
import com.alibaba.dubbo.remoting.telnet.codec.TelnetCodec;
Expand All @@ -41,7 +41,7 @@
*
*/
public class TelnetCodecTest {
protected ChannelCodec codec ;
protected Codec2 codec ;
byte[] UP = new byte[] {27, 91, 65};
byte[] DOWN = new byte[] {27, 91, 66};
/**
Expand Down Expand Up @@ -210,7 +210,7 @@ private void testDecode_PersonWithEnterByte(byte[] enterbytes ,boolean isNeedmor
//decode
Object obj = codec.decode(channel, buffer);
if (isNeedmore){
Assert.assertEquals(ChannelCodec.DecodeResult.NEED_MORE_INPUT , obj);
Assert.assertEquals(Codec2.DecodeResult.NEED_MORE_INPUT , obj);
}else {
Assert.assertTrue("return must string ", obj instanceof String);
}
Expand All @@ -237,12 +237,12 @@ public void testDecode_String_ClientSide() throws IOException{

@Test
public void testDecode_BlankMessage() throws IOException{
testDecode_assertEquals(new byte[]{}, ChannelCodec.DecodeResult.NEED_MORE_INPUT);
testDecode_assertEquals(new byte[]{}, Codec2.DecodeResult.NEED_MORE_INPUT);
}

@Test
public void testDecode_String_NoEnter() throws IOException{
testDecode_assertEquals("aaa", ChannelCodec.DecodeResult.NEED_MORE_INPUT);
testDecode_assertEquals("aaa", Codec2.DecodeResult.NEED_MORE_INPUT);
}

@Test
Expand All @@ -251,12 +251,12 @@ public void testDecode_String_WithEnter() throws IOException{
}
@Test
public void testDecode_String_MiddleWithEnter() throws IOException{
testDecode_assertEquals("aaa\r\naaa", ChannelCodec.DecodeResult.NEED_MORE_INPUT);
testDecode_assertEquals("aaa\r\naaa", Codec2.DecodeResult.NEED_MORE_INPUT);
}

@Test
public void testDecode_Person_ObjectOnly() throws IOException{
testDecode_assertEquals(new Person(), ChannelCodec.DecodeResult.NEED_MORE_INPUT);
testDecode_assertEquals(new Person(), Codec2.DecodeResult.NEED_MORE_INPUT);
}
@Test
public void testDecode_Person_WithEnter() throws IOException{
Expand All @@ -282,14 +282,14 @@ public void testDecode_WithExitByte() throws IOException{
@Test
public void testDecode_Backspace() throws IOException{
//32 8 先加空格在补退格.
testDecode_assertEquals(new byte[]{'\b'}, ChannelCodec.DecodeResult.NEED_MORE_INPUT, new String(new byte[] {32, 8}));
testDecode_assertEquals(new byte[]{'\b'}, Codec2.DecodeResult.NEED_MORE_INPUT, new String(new byte[] {32, 8}));

//测试中文
byte[] chineseBytes = "中".getBytes();
byte[] request = join(chineseBytes, new byte[]{'\b'});
testDecode_assertEquals(request, ChannelCodec.DecodeResult.NEED_MORE_INPUT, new String(new byte[] {32, 32, 8, 8}));
testDecode_assertEquals(request, Codec2.DecodeResult.NEED_MORE_INPUT, new String(new byte[] {32, 32, 8, 8}));
//中文会带来此问题 (-数判断) 忽略此问题,退格键只有在真的telnet程序中才输入有意义.
testDecode_assertEquals(new byte[]{'a', 'x', -1, 'x', '\b'}, ChannelCodec.DecodeResult.NEED_MORE_INPUT, new String(new byte[] {32, 32, 8, 8}));
testDecode_assertEquals(new byte[]{'a', 'x', -1, 'x', '\b'}, Codec2.DecodeResult.NEED_MORE_INPUT, new String(new byte[] {32, 32, 8, 8}));
}

@Test(expected = IOException.class)
Expand All @@ -304,14 +304,14 @@ public void testDecode_History_UP() throws IOException{
//init channel
AbstractMockChannel channel = getServerSideChannel(url);

testDecode_assertEquals(channel, UP, ChannelCodec.DecodeResult.NEED_MORE_INPUT, null);
testDecode_assertEquals(channel, UP, Codec2.DecodeResult.NEED_MORE_INPUT, null);

String request1 = "aaa\n";
Object expected1 = "aaa";
//init history
testDecode_assertEquals(channel, request1, expected1, null);

testDecode_assertEquals(channel, UP, ChannelCodec.DecodeResult.NEED_MORE_INPUT, expected1);
testDecode_assertEquals(channel, UP, Codec2.DecodeResult.NEED_MORE_INPUT, expected1);
}

@Test(expected = IOException.class)
Expand All @@ -321,14 +321,14 @@ public void testDecode_UPorDOWN_WithError() throws IOException{
//init channel
AbstractMockChannel channel = getServerSideChannel(url);

testDecode_assertEquals(channel, UP, ChannelCodec.DecodeResult.NEED_MORE_INPUT, null);
testDecode_assertEquals(channel, UP, Codec2.DecodeResult.NEED_MORE_INPUT, null);

String request1 = "aaa\n";
Object expected1 = "aaa";
//init history
testDecode_assertEquals(channel, request1, expected1, null);

testDecode_assertEquals(channel, UP, ChannelCodec.DecodeResult.NEED_MORE_INPUT, expected1);
testDecode_assertEquals(channel, UP, Codec2.DecodeResult.NEED_MORE_INPUT, expected1);

url = url.removeParameter(AbstractMockChannel.ERROR_WHEN_SEND);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package com.alibaba.dubbo.remoting.transport.codec;

import java.io.IOException;

import org.junit.Before;
import org.junit.Test;

import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.io.UnsafeByteArrayInputStream;
import com.alibaba.dubbo.remoting.Channel;
import com.alibaba.dubbo.remoting.Codec;
import com.alibaba.dubbo.remoting.buffer.ChannelBuffer;
import com.alibaba.dubbo.remoting.buffer.ChannelBuffers;
import com.alibaba.dubbo.remoting.codec.ExchangeCodecTest;
import com.alibaba.dubbo.remoting.telnet.codec.TelnetCodec;

import junit.framework.Assert;

/**
* @author <a href="mailto:gang.lvg@taobao.com">kimi</a>
*/
public class CodecAdapterTest extends ExchangeCodecTest {

@Before
public void setUp() throws Exception {
codec = new CodecAdapter(new DeprecatedExchangeCodec());
}

}
Loading

0 comments on commit ae56284

Please sign in to comment.