Skip to content

Commit

Permalink
=htc akka#2067 fix memory leak in server terminators (akka#2070)
Browse files Browse the repository at this point in the history
  • Loading branch information
ktoso authored and raboof committed Jun 14, 2018
1 parent 7546223 commit 04ece9f
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,21 @@ package akka.http.impl.engine.server

import java.util.concurrent.atomic.AtomicReference

import akka.Done
import akka.actor.{ ActorSystem, Terminated }
import akka.annotation.{ DoNotInherit, InternalApi }
import akka.actor.ActorSystem
import akka.annotation.InternalApi
import akka.event.LoggingAdapter
import akka.http.scaladsl.Http
import akka.http.scaladsl.Http.{ HttpConnectionTerminated, HttpServerTerminated, HttpTerminated }
import akka.http.scaladsl.settings.{ RoutingSettings, ServerSettings }
import akka.http.scaladsl.model.{ HttpRequest, HttpResponse, StatusCodes }
import akka.stream.scaladsl.BidiFlow

import scala.concurrent.duration._
import akka.http.scaladsl.model.{ HttpRequest, HttpResponse }
import akka.http.scaladsl.settings.ServerSettings
import akka.stream._
import akka.stream.scaladsl.BidiFlow
import akka.stream.stage._
import akka.util.PrettyDuration

import scala.annotation.tailrec
import scala.concurrent.{ ExecutionContext, Future, Promise }
import scala.concurrent.duration.{ Deadline, FiniteDuration }
import scala.concurrent.{ ExecutionContext, Future, Promise }
import scala.util.{ Failure, Success }

/**
Expand Down Expand Up @@ -77,6 +74,28 @@ private[http] final class MasterServerTerminator(log: LoggingAdapter) extends Se
}
}

/**
* Removes a previously registered per-connection terminator.
* Terminators must remove themselves like this once their respective connection is closed,
* otherwise they would leak and remain in the set indefinitely.
*
* @return true if the terminator has been successfully removed.
*/
@tailrec def removeConnection(terminator: ServerTerminator): Unit = {
terminators.get() match {
case v @ AliveConnectionTerminators(ts)
if (!terminators.compareAndSet(v, v.copy(ts = ts - terminator)))
removeConnection(terminator) // retry

case _: Terminating
// the `terminator` that we are being called with can only be one that already was registered,
// due to the register call happening during materialization, and the remove call happening during
// connection stream completion. Since we are in Terminating state, this means `terminate()` was called,
// and all existing terminators were invoked to `terminate()`. So if this is an existing one, we must not call
// terminate on it, as it would be the 2nd invocation.
}
}

// If a connection attempts to register once termination has started, it will immediately be rejected (though
// since termination also implies unbinding such new connections should not really happen).
def terminate(timeout: FiniteDuration)(implicit ex: ExecutionContext): Future[HttpTerminated] = {
Expand All @@ -92,7 +111,13 @@ private[http] final class MasterServerTerminator(log: LoggingAdapter) extends Se
if (terminators.compareAndSet(v, Terminating(timeout.fromNow))) {
// cause the termination for all connections
val connectionsTerminated = Future.sequence(ts.map { t
t.terminate(timeout)
// termination in general always succeeds, but we make sure here in order
// to not accidentally short-circuit terminating the other connection-terminators -- all must be terminated
t.terminate(timeout).recover {
case ex
log.warning("Ignoring termination failure of {}, failure was: {}", t, ex.getMessage)
HttpServerTerminated
}
})
val serverTerminated = connectionsTerminated.map(_ HttpServerTerminated)
termination.completeWith(serverTerminated)
Expand Down
8 changes: 8 additions & 0 deletions akka-http-core/src/main/scala/akka/http/scaladsl/Http.scala
Original file line number Diff line number Diff line change
Expand Up @@ -213,13 +213,21 @@ class HttpExt private[http] (private val config: Config)(implicit val system: Ex
.mapAsyncUnordered(settings.maxConnections) { incoming
try {
fullLayer
.watchTermination() {
case ((done, connectionTerminator), whenTerminates)
whenTerminates.onComplete({ _
masterTerminator.removeConnection(connectionTerminator)
})(fm.executionContext)
(done, connectionTerminator)
}
.addAttributes(prepareAttributes(settings, incoming))
.joinMat(incoming.flow)(Keep.left)
.mapMaterializedValue {
case (future, connectionTerminator)
masterTerminator.registerConnection(connectionTerminator)(fm.executionContext)
future // drop the terminator matValue, we already registered is which is all we need to do here
}

.run()
.recover {
// Ignore incoming errors from the connection as they will cancel the binding.
Expand Down

0 comments on commit 04ece9f

Please sign in to comment.