Skip to content

Commit

Permalink
[SPARK-6955] Perform port retries at NettyBlockTransferService level
Browse files Browse the repository at this point in the history
Currently we're doing port retries in the TransportServer level, but this is not specified by the TransportContext API and it has other further-reaching impacts like causing undesirable behvior for the Yarn and Standalone shuffle services.
  • Loading branch information
aarondav committed May 8, 2015
1 parent 008a60d commit 59e5e38
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,23 @@ class NettyBlockTransferService(conf: SparkConf, securityManager: SecurityManage
securityManager.isSaslEncryptionEnabled()))
}
transportContext = new TransportContext(transportConf, rpcHandler)
clientFactory = transportContext.createClientFactory(clientBootstrap.toList)
server = transportContext.createServer(conf.getInt("spark.blockManager.port", 0),
serverBootstrap.toList)
clientFactory = transportContext.createClientFactory(bootstrap.toList)
server = createServer(serverBootstrap.toList)
appId = conf.getAppId
logInfo("Server created on " + server.getPort)
}

/** Creates and binds the TransportServer, possibly trying multiple ports. */
private def createServer(bootstraps: List[TransportServerBootstrap]): TransportServer = {
def startService(port: Int): (TransportServer, Int) = {
val server = transportContext.createServer(port, bootstraps)
(server, server.getPort)
}

val portToTry = conf.getInt("spark.blockManager.port", 0)
Utils.startServiceOnPort(portToTry, startService, conf, getClass.getName)._1
}

override def fetchBlocks(
host: String,
port: Int,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.network.netty

import org.apache.spark.network.BlockDataManager
import org.apache.spark.{SecurityManager, SparkConf}
import org.mockito.Mockito.mock
import org.scalatest._

class NettyBlockTransferServiceSuite extends FunSuite with BeforeAndAfterEach with ShouldMatchers {
private var service0: NettyBlockTransferService = _
private var service1: NettyBlockTransferService = _

override def afterEach() {
if (service0 != null) {
service0.close()
service0 = null
}

if (service1 != null) {
service1.close()
service1 = null
}
}

test("can bind to a random port") {
service0 = createService(port = 0)
service0.port should not be 0
}

test("can bind to two random ports") {
service0 = createService(port = 0)
service1 = createService(port = 0)
service0.port should not be service1.port
}

test("can bind to a specific port") {
val port = 17634
service0 = createService(port)
service0.port should be >= port
service0.port should be <= (port + 10) // avoid testing equality in case of simultaneous tests
}

test("can bind to a specific port twice and the second increments") {
val port = 17634
service0 = createService(port)
service1 = createService(port)
service0.port should be >= port
service0.port should be <= (port + 10)
service1.port should be (service0.port + 1)
}

private def createService(port: Int): NettyBlockTransferService = {
val conf = new SparkConf()
.set("spark.app.id", s"test-${getClass.getName}")
.set("spark.blockManager.port", port.toString)
val securityManager = new SecurityManager(conf)
val blockDataManager = mock(classOf[BlockDataManager])
val service = new NettyBlockTransferService(conf, securityManager, numCores = 1)
service.init(blockDataManager)
service
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import org.apache.spark.network.util.JavaUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -65,7 +66,12 @@ public TransportServer(
this.appRpcHandler = appRpcHandler;
this.bootstraps = Lists.newArrayList(Preconditions.checkNotNull(bootstraps));

init(portToBind);
try {
init(portToBind);
} catch (RuntimeException e) {
JavaUtils.closeQuietly(this);
throw e;
}
}

public int getPort() {
Expand Down Expand Up @@ -114,7 +120,8 @@ protected void initChannel(SocketChannel ch) throws Exception {
}
});

bindRightPort(portToBind);
channelFuture = bootstrap.bind(new InetSocketAddress(portToBind));
channelFuture.syncUninterruptibly();

port = ((InetSocketAddress) channelFuture.channel().localAddress()).getPort();
logger.debug("Shuffle server started on port :" + port);
Expand All @@ -135,38 +142,4 @@ public void close() {
}
bootstrap = null;
}

/**
* Attempt to bind to the specified port up to a fixed number of retries.
* If all attempts fail after the max number of retries, exit.
*/
private void bindRightPort(int portToBind) {
int maxPortRetries = conf.portMaxRetries();

for (int i = 0; i <= maxPortRetries; i++) {
int tryPort = -1;
if (0 == portToBind) {
// Do not increment port if tryPort is 0, which is treated as a special port
tryPort = 0;
} else {
// If the new port wraps around, do not try a privilege port
tryPort = ((portToBind + i - 1024) % (65536 - 1024)) + 1024;
}
try {
channelFuture = bootstrap.bind(new InetSocketAddress(tryPort));
channelFuture.syncUninterruptibly();
return;
} catch (Exception e) {
logger.warn("Netty service could not bind on port " + tryPort +
". Attempting the next port.");
if (i >= maxPortRetries) {
logger.error(e.getMessage() + ": Netty server failed after "
+ maxPortRetries + " retries.");

// If it can't find a right port, it should exit directly.
System.exit(-1);
}
}
}
}
}

0 comments on commit 59e5e38

Please sign in to comment.