Skip to content

Commit

Permalink
tests: use copy of SocketUtil that does not use 127.x.y.255 addresses (
Browse files Browse the repository at this point in the history
  • Loading branch information
jrudolph authored Sep 3, 2020
1 parent 5904760 commit d68eaaf
Show file tree
Hide file tree
Showing 10 changed files with 150 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -536,7 +536,7 @@ class NewConnectionPoolSpec extends AkkaSpecWithMaterializer("""
"The superPool client infrastructure" should {

"route incoming requests to the right cached host connection pool" in new TestSetup(autoAccept = true) {
val (serverHostName2, serverPort2) = SocketUtil.temporaryServerHostnameAndPort()
val (serverHostName2, serverPort2) = SocketUtil2.temporaryServerHostnameAndPort()
Http().newServerAt(serverHostName2, serverPort2).bindSync(testServerHandler(0))

val (requestIn, responseOut, responseOutSub, _) = superPool[Int]()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ class ClientServerSpec extends AkkaSpecWithMaterializer(
}

"report failure if bind fails" in EventFilter[BindException](occurrences = 2).intercept {
val (hostname, port) = SocketUtil.temporaryServerHostnameAndPort()
val (hostname, port) = SocketUtil2.temporaryServerHostnameAndPort()
val binding = Http().newServerAt(hostname, port).connectionSource()
val probe1 = TestSubscriber.manualProbe[Http.IncomingConnection]()
// Bind succeeded, we have a local address
Expand Down Expand Up @@ -124,7 +124,7 @@ class ClientServerSpec extends AkkaSpecWithMaterializer(
}

"run with bindSync" in {
val (hostname, port) = SocketUtil.temporaryServerHostnameAndPort()
val (hostname, port) = SocketUtil2.temporaryServerHostnameAndPort()
val binding = Http().newServerAt(hostname, port).bindSync(_ => HttpResponse())
val b1 = Await.result(binding, 3.seconds.dilated)

Expand All @@ -136,7 +136,7 @@ class ClientServerSpec extends AkkaSpecWithMaterializer(
}

"prevent more than the configured number of max-connections with bind" in {
val (hostname, port) = SocketUtil.temporaryServerHostnameAndPort()
val (hostname, port) = SocketUtil2.temporaryServerHostnameAndPort()
val settings = ServerSettings(system).withMaxConnections(1)

val receivedSlow = Promise[Long]()
Expand Down Expand Up @@ -203,7 +203,7 @@ class ClientServerSpec extends AkkaSpecWithMaterializer(
}

abstract class RemoteAddressTestScenario {
val (hostname, port) = SocketUtil.temporaryServerHostnameAndPort()
val (hostname, port) = SocketUtil2.temporaryServerHostnameAndPort()

val settings = ServerSettings(system).withRemoteAddressHeader(true)
def createBinding(): Future[ServerBinding]
Expand Down Expand Up @@ -245,7 +245,7 @@ class ClientServerSpec extends AkkaSpecWithMaterializer(
"support server timeouts" should {
"close connection with idle client after idleTimeout" in {
val serverIdleTimeout = 300.millis
val (hostname, port) = SocketUtil.temporaryServerHostnameAndPort()
val (hostname, port) = SocketUtil2.temporaryServerHostnameAndPort()
val (receivedRequest: Promise[Long], b1: ServerBinding) = bindServer(hostname, port, serverIdleTimeout)

try {
Expand Down Expand Up @@ -311,7 +311,7 @@ class ClientServerSpec extends AkkaSpecWithMaterializer(
val clientTimeout = 345.millis.dilated
val clientPoolSettings = cs.withIdleTimeout(clientTimeout)

val (hostname, port) = SocketUtil.temporaryServerHostnameAndPort()
val (hostname, port) = SocketUtil2.temporaryServerHostnameAndPort()
val (receivedRequest: Promise[Long], b1: ServerBinding) = bindServer(hostname, port, serverTimeout)

try {
Expand Down Expand Up @@ -346,7 +346,7 @@ class ClientServerSpec extends AkkaSpecWithMaterializer(
val clientTimeout = 345.millis.dilated
val clientPoolSettings = cs.withIdleTimeout(clientTimeout)

val (hostname, port) = SocketUtil.temporaryServerHostnameAndPort()
val (hostname, port) = SocketUtil2.temporaryServerHostnameAndPort()
val (receivedRequest: Promise[Long], b1: ServerBinding) = bindServer(hostname, port, serverTimeout)

try {
Expand Down Expand Up @@ -376,7 +376,7 @@ class ClientServerSpec extends AkkaSpecWithMaterializer(
"are triggered in `mapMaterialized`" in Utils.assertAllStagesStopped {
// FIXME racy feature, needs https://github.com/akka/akka/issues/17849 to be fixed
pending
val (hostname, port) = SocketUtil.temporaryServerHostnameAndPort()
val (hostname, port) = SocketUtil2.temporaryServerHostnameAndPort()
val flow = Flow[HttpRequest].map(_ => HttpResponse()).mapMaterializedValue(_ => sys.error("BOOM"))
val binding = Http(system2).newServerAt(hostname, port).bindFlow(flow)
val b1 = Await.result(binding, 1.seconds.dilated)
Expand Down Expand Up @@ -608,7 +608,7 @@ class ClientServerSpec extends AkkaSpecWithMaterializer(
val serverToClientNetworkBufferSize = 1000
val responseSize = 200000

val (hostname, port) = SocketUtil.temporaryServerHostnameAndPort()
val (hostname, port) = SocketUtil2.temporaryServerHostnameAndPort()
def request(i: Int) = HttpRequest(uri = s"http://$hostname:$port/$i", headers = headers.Connection("close") :: Nil)
def response(req: HttpRequest) = HttpResponse(entity = HttpEntity.Strict(ContentTypes.`text/plain(UTF-8)`, ByteString(req.uri.path.toString.takeRight(1) * responseSize)))

Expand Down Expand Up @@ -701,7 +701,7 @@ Host: example.com
"complete a request/response over https when request has `Connection: close` set" in Utils.assertAllStagesStopped {
// akka/akka-http#1219
val serverToClientNetworkBufferSize = 1000
val (hostname, port) = SocketUtil.temporaryServerHostnameAndPort()
val (hostname, port) = SocketUtil2.temporaryServerHostnameAndPort()
val request = HttpRequest(uri = s"https://akka.example.org", headers = headers.Connection("close") :: Nil)

// settings adapting network buffer sizes
Expand Down Expand Up @@ -899,7 +899,7 @@ Host: example.com
}

class TestSetup {
val (hostname, port) = SocketUtil.temporaryServerHostnameAndPort()
val (hostname, port) = SocketUtil2.temporaryServerHostnameAndPort()
def configOverrides = ""

// automatically bind a server
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class ClientSpec extends AnyWordSpec with Matchers with BeforeAndAfterAll {
"HTTP Client" should {

"reuse connection pool" in {
val (hostname, port) = SocketUtil.temporaryServerHostnameAndPort()
val (hostname, port) = SocketUtil2.temporaryServerHostnameAndPort()
val bindingFuture = Http().newServerAt(hostname, port).bindSync(_ => HttpResponse())
val binding = Await.result(bindingFuture, 3.seconds.dilated)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class ClientTransportWithCustomResolverSpec extends AkkaSpecWithMaterializer("ak
"change to the desired destination" in {
val hostnameToFind = "some-name-out-there"
val portToFind = 21345
val (hostnameToUse, portToUse) = SocketUtil.temporaryServerHostnameAndPort()
val (hostnameToUse, portToUse) = SocketUtil2.temporaryServerHostnameAndPort()
val bindingFuture = Http().newServerAt(hostnameToUse, portToUse).bindSync(_ => HttpResponse())
val binding = Await.result(bindingFuture, 3.seconds.dilated)

Expand All @@ -50,7 +50,7 @@ class ClientTransportWithCustomResolverSpec extends AkkaSpecWithMaterializer("ak
"resolve not before a connection is needed" in {
val hostnameToFind = "some-name-out-there"
val portToFind = 21345
val (hostnameToUse, portToUse) = SocketUtil.temporaryServerHostnameAndPort()
val (hostnameToUse, portToUse) = SocketUtil2.temporaryServerHostnameAndPort()
val bindingFuture = Http().newServerAt(hostnameToUse, portToUse).bindSync(_ => HttpResponse())
val binding = Await.result(bindingFuture, 3.seconds.dilated)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class EntityDiscardingSpec extends AkkaSpecWithMaterializer {
// TODO consider improving this by storing a mutable "already materialized" flag somewhere
// TODO likely this is going to inter-op with the auto-draining as described in #18716
"should not allow draining a second time" in {
val (host, port) = SocketUtil.temporaryServerHostnameAndPort()
val (host, port) = SocketUtil2.temporaryServerHostnameAndPort()
val bound = Http().newServerAt(host, port).bindSync(req =>
HttpResponse(entity = HttpEntity(
ContentTypes.`text/csv(UTF-8)`, Source.fromIterator[ByteString](() => testData.iterator)))).futureValue
Expand Down
128 changes: 128 additions & 0 deletions akka-http-core/src/test/scala/akka/testkit/SocketUtil2.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/*
* Copyright (C) 2009-2020 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.testkit

import scala.collection.immutable
import scala.util.Random
import java.net.{ DatagramSocket, InetSocketAddress, NetworkInterface, StandardProtocolFamily }
import java.nio.channels.DatagramChannel
import java.nio.channels.ServerSocketChannel

import scala.util.control.NonFatal

/**
* Utilities to get free socket address.
*/
object SocketUtil2 {

val RANDOM_LOOPBACK_ADDRESS = "RANDOM_LOOPBACK_ADDRESS"

private val canBindOnAlternativeLoopbackAddresses = {
try {
SocketUtil2.temporaryServerAddress(address = "127.20.0.0")
true
} catch {
case _: java.net.BindException =>
false
}
}

sealed trait Protocol
final case object Tcp extends Protocol
final case object Udp extends Protocol
final case object Both extends Protocol

/** @return A port on 'localhost' that is currently available */
def temporaryLocalPort(udp: Boolean = false): Int = temporaryServerAddress("localhost", udp).getPort

/**
* Find a free local post on 'localhost' that is available on the given protocol
* If both UDP and TCP need to be free specify `Both`
*/
def temporaryLocalPort(protocol: Protocol): Int = {
def findBoth(tries: Int): Int = {
if (tries == 0) {
throw new RuntimeException("Unable to find a port that is free for tcp and udp")
}
val tcpPort = SocketUtil2.temporaryLocalPort(udp = false)
val ds: DatagramSocket = DatagramChannel.open().socket()
try {
ds.bind(new InetSocketAddress("localhost", tcpPort))
tcpPort
} catch {
case NonFatal(_) => findBoth(tries - 1)
} finally {
ds.close()
}
}

protocol match {
case Tcp => temporaryLocalPort(udp = false)
case Udp => temporaryLocalPort(udp = true)
case Both => findBoth(5)
}
}

/**
* @param address host address. If not set, a loopback IP from the 127.20.0.0/16 range is picked
* @param udp if true, select a port that is free for running a UDP server. Otherwise TCP.
* @return an address (host+port) that is currently available to bind on
*/
def temporaryServerAddress(address: String = RANDOM_LOOPBACK_ADDRESS, udp: Boolean = false): InetSocketAddress =
temporaryServerAddresses(1, address, udp).head

def temporaryServerAddresses(
numberOfAddresses: Int,
hostname: String = RANDOM_LOOPBACK_ADDRESS,
udp: Boolean = false): immutable.IndexedSeq[InetSocketAddress] = {
Vector
.fill(numberOfAddresses) {

val address = hostname match {
case RANDOM_LOOPBACK_ADDRESS =>
// JDK limitation? You cannot bind on addresses matching the pattern 127.x.y.255,
// that's why the last component must be < 255
if (canBindOnAlternativeLoopbackAddresses) s"127.20.${Random.nextInt(256)}.${Random.nextInt(255)}"
else "127.0.0.1"
case other =>
other
}

val addr = new InetSocketAddress(address, 0)
try
if (udp) {
val ds = DatagramChannel.open().socket()
ds.bind(addr)
(ds, new InetSocketAddress(address, ds.getLocalPort))
} else {
val ss = ServerSocketChannel.open().socket()
ss.bind(addr)
(ss, new InetSocketAddress(address, ss.getLocalPort))
}
catch {
case NonFatal(ex) =>
throw new RuntimeException(s"Binding to $addr failed with ${ex.getMessage}", ex)
}
}
.collect { case (socket, address) => socket.close(); address }
}

def temporaryServerHostnameAndPort(interface: String = RANDOM_LOOPBACK_ADDRESS): (String, Int) = {
val socketAddress = temporaryServerAddress(interface)
socketAddress.getHostString -> socketAddress.getPort
}

def temporaryUdpIpv6Port(iface: NetworkInterface) = {
val serverSocket = DatagramChannel.open(StandardProtocolFamily.INET6).socket()
serverSocket.bind(new InetSocketAddress(iface.getInetAddresses.nextElement(), 0))
val port = serverSocket.getLocalPort
serverSocket.close()
port
}

def notBoundServerAddress(address: String): InetSocketAddress = new InetSocketAddress(address, 0)

def notBoundServerAddress(): InetSocketAddress = notBoundServerAddress("127.0.0.1")
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import akka.http.scaladsl.server.{ Directives, Route }
import akka.http.scaladsl.Http
import akka.remote.testkit.{ MultiNodeConfig, MultiNodeSpec }
import akka.stream.scaladsl.Source
import akka.testkit.{ ImplicitSender, LongRunningTest, SocketUtil }
import akka.testkit.{ ImplicitSender, LongRunningTest, SocketUtil2 }
import akka.util.{ ByteString, Timeout }
import com.typesafe.config.ConfigFactory
import org.scalatest.concurrent.ScalaFutures
Expand Down Expand Up @@ -176,7 +176,7 @@ class AkkaHttpServerLatencyMultiNodeSpec extends MultiNodeSpec(AkkaHttpServerLat
enterBarrier("load-gen-ready")

runOn(server) {
val (_, port) = SocketUtil.temporaryServerHostnameAndPort()
val (_, port) = SocketUtil2.temporaryServerHostnameAndPort()
info(s"Binding Akka HTTP Server to port: $port @ ${myself}")
val futureBinding = Http().newServerAt("0.0.0.0", port).bind(routes)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class CustomMediaTypesSpec extends AkkaSpec with ScalaFutures

"allow registering custom media type" in {
import system.dispatcher
val (host, port) = SocketUtil.temporaryServerHostnameAndPort()
val (host, port) = SocketUtil2.temporaryServerHostnameAndPort()

//#application-custom

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import akka.http.scaladsl.model.HttpProtocols._
import akka.http.scaladsl.model.RequestEntityAcceptance.Expected
import akka.http.scaladsl.model._
import akka.http.scaladsl.server.Directives
import akka.testkit.{ AkkaSpec, SocketUtil }
import akka.testkit.{ AkkaSpec, SocketUtil2 }
import org.scalatest.concurrent.ScalaFutures

import scala.concurrent.duration._
Expand All @@ -20,7 +20,7 @@ class CustomHttpMethodSpec extends AkkaSpec with ScalaFutures
"Http" should {
"allow registering custom method" in {
import system.dispatcher
val (host, port) = SocketUtil.temporaryServerHostnameAndPort()
val (host, port) = SocketUtil2.temporaryServerHostnameAndPort()

//#application-custom
import akka.http.scaladsl.settings.{ ParserSettings, ServerSettings }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import org.scalatest.concurrent.ScalaFutures

import scala.concurrent.duration._
import scala.concurrent.{ ExecutionContext, Future, Promise }
import akka.testkit.{ AkkaSpec, SocketUtil }
import akka.testkit.{ AkkaSpec, SocketUtil2 }

class TimeoutDirectivesExamplesSpec extends RoutingSpec
with ScalaFutures with CompileOnlySpec {
Expand Down

0 comments on commit d68eaaf

Please sign in to comment.