Skip to content

Commit

Permalink
Fix a dead lock in ServerBootstrap as described in netty#1175
Browse files Browse the repository at this point in the history
- Reduce code duplication between bootstrap implementations
  • Loading branch information
trustin committed Mar 21, 2013
1 parent 9b20802 commit 8fb80e9
Show file tree
Hide file tree
Showing 4 changed files with 120 additions and 121 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@
import io.netty.channel.Channel;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoopGroup;
import io.netty.util.AttributeKey;

Expand Down Expand Up @@ -265,7 +267,75 @@ public ChannelFuture bind(SocketAddress localAddress) {
return doBind(localAddress);
}

abstract ChannelFuture doBind(SocketAddress localAddress);
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regPromise = initAndRegister();
final Channel channel = regPromise.channel();
final ChannelPromise promise = channel.newPromise();
if (regPromise.isDone()) {
doBind0(regPromise, channel, localAddress, promise);
} else {
regPromise.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
doBind0(future, channel, localAddress, promise);
}
});
}

return promise;
}

final ChannelFuture initAndRegister() {
final Channel channel = channelFactory().newChannel();
try {
init(channel);
} catch (Throwable t) {
channel.unsafe().closeForcibly();
return channel.newFailedFuture(t);
}

ChannelPromise regPromise = channel.newPromise();
group().register(channel, regPromise);
if (regPromise.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}

// If we are here and the promise is not failed, it's one of the following cases:
// 1) If we attempted registration from the event loop, the registration has been completed at this point.
// i.e. It's safe to attempt bind() or connect() now beause the channel has been registered.
// 2) If we attempted registration from the other thread, the registration request has been successfully
// added to the event loop's task queue for later execution.
// i.e. It's safe to attempt bind() or connect() now:
// because bind() or connect() will be executed *after* the scheduled registration task is executed
// because register(), bind(), and connect() are all bound to the same thread.

return regPromise;
}

abstract void init(Channel channel) throws Exception;

private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {

// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
// the pipeline in its channelRegistered() implementation.

channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}

/**
* the {@link ChannelHandler} to use for serving the requests.
Expand Down
139 changes: 44 additions & 95 deletions transport/src/main/java/io/netty/bootstrap/Bootstrap.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,39 +76,6 @@ public Bootstrap remoteAddress(InetAddress inetHost, int inetPort) {
return this;
}

@Override
ChannelFuture doBind(final SocketAddress localAddress) {
final Channel channel = channelFactory().newChannel();
ChannelPromise initPromise = init(channel);
if (initPromise.cause() != null) {
return initPromise;
}

final ChannelPromise promise = channel.newPromise();
if (initPromise.isDone()) {
doBind0(initPromise, channel, localAddress, promise);
} else {
initPromise.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
doBind0(future, channel, localAddress, promise);
}
});
}

return promise;
}

private static void doBind0(
ChannelFuture initFuture, Channel channel, SocketAddress localAddress, ChannelPromise promise) {

if (initFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(initFuture.cause());
}
}

/**
* Connect a {@link Channel} to the remote peer.
*/
Expand Down Expand Up @@ -163,20 +130,20 @@ public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAdd
* @see {@link #connect()}
*/
private ChannelFuture doConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
final Channel channel = channelFactory().newChannel();
ChannelPromise initPromise = init(channel);
if (initPromise.cause() != null) {
return initPromise;
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}

final ChannelPromise promise = channel.newPromise();
if (initPromise.isDone()) {
doConnect0(initPromise, channel, remoteAddress, localAddress, promise);
if (regFuture.isDone()) {
doConnect0(regFuture, channel, remoteAddress, localAddress, promise);
} else {
initPromise.addListener(new ChannelFutureListener() {
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
doConnect0(future, channel, remoteAddress, localAddress, promise);
doConnect0(regFuture, channel, remoteAddress, localAddress, promise);
}
});
}
Expand All @@ -185,71 +152,53 @@ public void operationComplete(ChannelFuture future) throws Exception {
}

private static void doConnect0(
ChannelFuture initFuture, Channel channel,
SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {

if (initFuture.isSuccess()) {
if (localAddress == null) {
channel.connect(remoteAddress, promise);
} else {
channel.connect(remoteAddress, localAddress, promise);
final ChannelFuture regFuture, final Channel channel,
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {

// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
// the pipeline in its channelRegistered() implementation.
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
if (localAddress == null) {
channel.connect(remoteAddress, promise);
} else {
channel.connect(remoteAddress, localAddress, promise);
}
promise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
promise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(initFuture.cause());
}
});
}

@Override
@SuppressWarnings("unchecked")
private ChannelPromise init(Channel channel) {
ChannelPromise promise = channel.newPromise();
try {
ChannelPipeline p = channel.pipeline();
p.addLast(handler());

final Map<ChannelOption<?>, Object> options = options();
synchronized (options) {
for (Entry<ChannelOption<?>, Object> e: options.entrySet()) {
try {
if (!channel.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
logger.warn("Unknown channel option: " + e);
}
} catch (Throwable t) {
logger.warn("Failed to set a channel option: " + channel, t);
void init(Channel channel) throws Exception {
ChannelPipeline p = channel.pipeline();
p.addLast(handler());

final Map<ChannelOption<?>, Object> options = options();
synchronized (options) {
for (Entry<ChannelOption<?>, Object> e: options.entrySet()) {
try {
if (!channel.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
logger.warn("Unknown channel option: " + e);
}
} catch (Throwable t) {
logger.warn("Failed to set a channel option: " + channel, t);
}
}

final Map<AttributeKey<?>, Object> attrs = attrs();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
channel.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
}

group().register(channel, promise);
} catch (Throwable t) {
promise.setFailure(t);
}

if (promise.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
final Map<AttributeKey<?>, Object> attrs = attrs();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
channel.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
}

// If we are here and the promise is not failed, it's one of the following cases:
// 1) If we attempted registration from the event loop, the registration has been completed at this point.
// i.e. It's safe to attempt bind() or connect() now beause the channel has been registered.
// 2) If we attempted registration from the other thread, the registration request has been successfully
// added to the event loop's task queue for later execution.
// i.e. It's safe to attempt bind() or connect() now:
// because bind() or connect() will be executed *after* the scheduled registration task is executed
// because register(), bind(), and connect() are all bound to the same thread.

return promise;
}

@Override
Expand Down
25 changes: 4 additions & 21 deletions transport/src/main/java/io/netty/bootstrap/ServerBootstrap.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
import io.netty.buffer.MessageBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundMessageHandler;
Expand All @@ -34,7 +32,6 @@
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;

import java.net.SocketAddress;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Map.Entry;
Expand Down Expand Up @@ -140,17 +137,10 @@ public ServerBootstrap childHandler(ChannelHandler childHandler) {
}

@Override
ChannelFuture doBind(SocketAddress localAddress) {
Channel channel = channelFactory().newChannel();

try {
final Map<ChannelOption<?>, Object> options = options();
synchronized (options) {
channel.config().setOptions(options);
}
} catch (Exception e) {
channel.close();
return channel.newFailedFuture(e);
void init(Channel channel) throws Exception {
final Map<ChannelOption<?>, Object> options = options();
synchronized (options) {
channel.config().setOptions(options);
}

final Map<AttributeKey<?>, Object> attrs = attrs();
Expand Down Expand Up @@ -185,13 +175,6 @@ public void initChannel(Channel ch) throws Exception {
currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});

ChannelFuture f = group().register(channel).awaitUninterruptibly();
if (!f.isSuccess()) {
return f;
}

return channel.bind(localAddress).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,11 +156,8 @@ public void connectSuccess() {
assert eventLoop().inEventLoop();
assert connectPromise != null;
try {
boolean wasActive = isActive();
connectPromise.setSuccess();
if (!wasActive && isActive()) {
pipeline().fireChannelActive();
}
pipeline().fireChannelActive();
} catch (Throwable t) {
connectPromise.setFailure(t);
closeIfClosed();
Expand Down

0 comments on commit 8fb80e9

Please sign in to comment.