From 902619529f4c4b690400e46308e7e594a49b5e34 Mon Sep 17 00:00:00 2001 From: Zahari Dichev Date: Wed, 24 Jul 2019 19:50:52 +0300 Subject: [PATCH] Remove maxRequestKB maxResponseKB and set finagle's maxRequestSize and maxResponseSize to max allowed(2GB) (#2298) This PR's purpose is to remove `maxRequestKB` `maxResponseKB` and rely on finagle's maximum allowed size for fixed size messages which is 2GB. The semantics now are: - **Fixed size messages** (`Content-Length` set) are allowed up to 2GB and will be streamed if they are above the threshold specified in `streamAfterContentLengthKB`. Note that id these messages are larger than 2GB or have their `Content-Length` set to a larger value they will be rejected. - **Chunk encoded messages** (Transfer-Encoding: chunked`) will always be streamed. - **Streaming messages** (`Content-Length` not set) will be streamed. Fixes: #2196 Signed-off-by: Zahari Dichev --- linkerd/docs/protocol-http.md | 6 +- linkerd/examples/http.yaml | 3 +- .../linkerd/protocol/HttpEndToEndTest.scala | 5 +- .../linkerd/protocol/HttpStreamingTest.scala | 146 ++++++++++++++++++ .../buoyant/linkerd/protocol/HttpConfig.scala | 21 ++- .../protocol/http/HttpConfigTest.scala | 7 +- 6 files changed, 167 insertions(+), 21 deletions(-) create mode 100644 linkerd/protocol/http/src/e2e/scala/io/buoyant/linkerd/protocol/HttpStreamingTest.scala diff --git a/linkerd/docs/protocol-http.md b/linkerd/docs/protocol-http.md index 9e18c54da9..6bc464e9c9 100644 --- a/linkerd/docs/protocol-http.md +++ b/linkerd/docs/protocol-http.md @@ -13,8 +13,6 @@ routers: kind: io.l5d.methodAndHost maxHeadersKB: 8 maxInitialLineKB: 4 - maxRequestKB: 5120 - maxResponseKB: 5120 servers: - port: 5000 addForwardedHeader: @@ -36,11 +34,9 @@ httpAccessLogRollPolicy | never | When to roll the logfile. Possible values: Nev httpAccessLogAppend | true | Append to an existing logfile, or truncate it? httpAccessLogRotateCount | -1 | How many rotated logfiles to keep around, maximum. -1 means to keep them all. identifier | The `io.l5d.header.token` identifier | An identifier or list of identifiers. See [Http-specific identifiers](#http-1-1-identifiers). -streamAfterContentLengthKB | 5 | The threshold at which HTTP messages will be streamed if exceeded. +streamAfterContentLengthKB | 5 | The threshold at which HTTP messages will be streamed if exceeded. You can use this to allow for sufficiently small messages to be buffered, instead of always streamed maxHeadersKB | 8 | The maximum size of all headers in an HTTP message. maxInitialLineKB | 4 | The maximum size of an initial HTTP message line. -maxRequestKB | 5120 | The maximum size of a non-chunked HTTP request payload. -maxResponseKB | 5120 | The maximum size of a non-chunked HTTP response payload. compressionLevel | `-1`, automatically compresses textual content types with compression level 6 | The compression level to use (on 0-9). streamingEnabled | `true` | Streaming allows Linkerd to work with HTTP messages that have large (or infinite) content bodies using chunked encoding. Disabling this is highly discouraged. tracePropagator | `io.l5d.default` | A trace propagator. See [Http-specific trace propagator](#http-1-1-trace-propagators). diff --git a/linkerd/examples/http.yaml b/linkerd/examples/http.yaml index 11a0a1827e..55e0e83fb9 100644 --- a/linkerd/examples/http.yaml +++ b/linkerd/examples/http.yaml @@ -11,8 +11,7 @@ routers: httpAccessLogRotateCount: -1 maxHeadersKB: 16 maxInitialLineKB: 16 - maxRequestKB: 102400 # 100MB - maxResponseKB: 102400 # 100MB + streamAfterContentLengthKB: 102400 # 100MB compressionLevel: 9 identifier: {kind: io.l5d.header} dtab: | diff --git a/linkerd/protocol/http/src/e2e/scala/io/buoyant/linkerd/protocol/HttpEndToEndTest.scala b/linkerd/protocol/http/src/e2e/scala/io/buoyant/linkerd/protocol/HttpEndToEndTest.scala index 0dc4d7064a..51087e6021 100644 --- a/linkerd/protocol/http/src/e2e/scala/io/buoyant/linkerd/protocol/HttpEndToEndTest.scala +++ b/linkerd/protocol/http/src/e2e/scala/io/buoyant/linkerd/protocol/HttpEndToEndTest.scala @@ -74,9 +74,10 @@ class HttpEndToEndTest .newClient(name, "upstream").toService } - def basicConfig(dtab: Dtab) = + def basicConfig(dtab: Dtab, streaming: Boolean = false) = s"""|routers: |- protocol: http + | streamingEnabled: $streaming | dtab: ${dtab.show} | servers: | - port: 0 @@ -922,7 +923,7 @@ class HttpEndToEndTest /svc => /srv; """) - val linker = Linker.Initializers(Seq(HttpInitializer)).load(basicConfig(dtab)) + val linker = Linker.Initializers(Seq(HttpInitializer)).load(basicConfig(dtab, streaming = true)) val router = linker.routers.head.initialize() val server = router.servers.head.serve() val client = upstream(server) diff --git a/linkerd/protocol/http/src/e2e/scala/io/buoyant/linkerd/protocol/HttpStreamingTest.scala b/linkerd/protocol/http/src/e2e/scala/io/buoyant/linkerd/protocol/HttpStreamingTest.scala new file mode 100644 index 0000000000..bfcfec0596 --- /dev/null +++ b/linkerd/protocol/http/src/e2e/scala/io/buoyant/linkerd/protocol/HttpStreamingTest.scala @@ -0,0 +1,146 @@ +package io.buoyant.linkerd.protocol + +import com.twitter.conversions.StorageUnitOps._ +import com.twitter.finagle.buoyant._ +import com.twitter.finagle.http.param.{MaxRequestSize, Streaming} +import com.twitter.finagle.http.{Method, Request, Response, Version} +import com.twitter.finagle.{Http, Service, Stack} +import com.twitter.io.{Buf, Reader} +import com.twitter.util.{Future, Promise, StorageUnit} +import io.buoyant.test.{Awaits, BudgetedRetries} +import java.net.InetSocketAddress +import org.scalatest.{FunSuite, MustMatchers, OptionValues} + +/** + * + * This test is to illustrate the behavior of `fixedLengthStreamedAfter` parameter. The following + * cases are covered: + * + * chunked message - a message with `Transfer-Encoding: chunked`. These are never aggregated and are + * always streamed + * + * non-chunked message - a message that has a known fixed size set via the "Content-Length" header. + * There are two options here. If the size of the message is less than the + * value set for streamAfterContentLengthKB, then this message will be + * buffered. If the size is more, then the message will be streamed. + * + * non-chunked streaming message - a message that is not chunk-encoded and does not have + * "Content-Length" would fall under this category. If that is the + * case, the message will be streamed. + * + * + * NB: A very important detail is that a non chunked message(i.e. "Content-Length" is set) that + * is larger than maxRequestKb cannot be processed. (even when the "Content-Length" value is + * larger than fixedLengthStreamedAfter) + * + */ + + + +class HttpStreamingTest extends FunSuite + with Awaits + with MustMatchers + with OptionValues + with BudgetedRetries { + + + def withServerAndClient( + fixedLengthStreamedAfter: Option[StorageUnit], + messageBody: String, + maxRequestSize: Option[StorageUnit] = None, + withContentLength: Boolean = false, + withChunkedEncoding: Boolean = false + )(f: Request => Any) = { + + val Address = "127.0.0.1:8080" + val params = Stack.Params.empty.maybeWith(fixedLengthStreamedAfter.map(v => Streaming(v))) + .maybeWith(maxRequestSize.map(MaxRequestSize(_))) + + val receivedFut: Promise[Request] = Promise() + val svc = Service.mk[Request, Response] { req => + receivedFut.setValue(req) + Future.value(Response(req)) + } + + val server = Http.server.withParams(params).serve(Address, svc) + val addr = server.boundAddress.asInstanceOf[InetSocketAddress] + val client = Http.client.newService(s"${addr.getHostName}:${addr.getPort}", "client") + + + val req = Request( + Version.Http11, + Method.Get, + Address, + Reader.fromBuf(Buf.Utf8(messageBody), 1) + ) + + + if (withContentLength) { + req.headerMap.put("Content-Length", messageBody.length.toString) + } + if (withChunkedEncoding) { + req.headerMap.put("Transfer-Encoding", "chunked") + } + + client(req) + f(await(receivedFut)) + await(server.close()) + await(client.close()) + + } + + + test("server will stream fixed size messages exceeding streamAfterContentLengthKB") { + val messageBody = "must-stream" + withServerAndClient(Some((messageBody.length - 1).bytes), messageBody, withContentLength = true) + { receivedRequest => + assert(receivedRequest.isChunked) + val receivedBody = await(Reader.readAll(receivedRequest.reader)) + assert(receivedBody == Buf.Utf8(messageBody)) + assert(receivedRequest.contentString.isEmpty) + } + + } + + test("server will buffer fixed size messages less than streamAfterContentLengthKB in size") { + val messageBody = "must-not-stream" + withServerAndClient(Some((messageBody.length + 1).bytes), messageBody, withContentLength = true) + { receivedRequest => + assert(!receivedRequest.isChunked) + assert(receivedRequest.contentString == messageBody) + } + + } + + + + test( + "server will stream messages missing Content-Length (even if below streamAfterContentLengthKB in size)" + ) { + val messageBody = "must-stream" + withServerAndClient(Some(1.gigabyte), messageBody) { receivedRequest => + val receivedBody = await(Reader.readAll(receivedRequest.reader)) + assert(receivedBody == Buf.Utf8(messageBody)) + assert(receivedRequest.isChunked) + assert(receivedRequest.contentString.isEmpty) + } + } + + + test("server will always stream messages that have Content-Length and Transfer-Encoding:chunked") + { + val messageBody = "must-stream" + withServerAndClient( + Some(0.bytes), + messageBody, + withChunkedEncoding = true, + withContentLength = true + ) { receivedRequest => + val receivedBody = await(Reader.readAll(receivedRequest.reader)) + assert(receivedBody == Buf.Utf8(messageBody)) + assert(receivedRequest.isChunked) + assert(receivedRequest.contentString.isEmpty) + } + } + +} diff --git a/linkerd/protocol/http/src/main/scala/io/buoyant/linkerd/protocol/HttpConfig.scala b/linkerd/protocol/http/src/main/scala/io/buoyant/linkerd/protocol/HttpConfig.scala index fa4e943880..15af6a1dfd 100644 --- a/linkerd/protocol/http/src/main/scala/io/buoyant/linkerd/protocol/HttpConfig.scala +++ b/linkerd/protocol/http/src/main/scala/io/buoyant/linkerd/protocol/HttpConfig.scala @@ -11,13 +11,12 @@ import com.twitter.finagle.buoyant.{ParamsMaybeWith, PathMatcher} import com.twitter.finagle.client.{AddrMetadataExtraction, StackClient} import com.twitter.finagle.filter.DtabStatsFilter import com.twitter.finagle.http.filter.{ClientDtabContextFilter, ServerDtabContextFilter, StatsFilter} -import com.twitter.finagle.http.param.FixedLengthStreamedAfter import com.twitter.finagle.http.{Request, Response, param => hparam} import com.twitter.finagle.liveness.FailureAccrualFactory import com.twitter.finagle.service.Retries import com.twitter.finagle.stack.nilStack import com.twitter.finagle.tracing.TraceInitializerFilter -import com.twitter.finagle.{ServiceFactory, Stack, param => fparam} +import com.twitter.finagle.{ServiceFactory, Stack} import com.twitter.logging.Policy import io.buoyant.linkerd.protocol.HttpRequestAuthorizerConfig.param import io.buoyant.linkerd.protocol.http._ @@ -214,8 +213,6 @@ case class HttpConfig( streamAfterContentLengthKB: Option[Int], maxHeadersKB: Option[Int], maxInitialLineKB: Option[Int], - maxRequestKB: Option[Int], - maxResponseKB: Option[Int], streamingEnabled: Option[Boolean], compressionLevel: Option[Int], tracePropagator: Option[HttpTracePropagatorConfig] @@ -225,6 +222,15 @@ case class HttpConfig( var servers: Seq[HttpServerConfig] = Nil var service: Option[HttpSvc] = None + private val streaming = streamingEnabled -> streamAfterContentLengthKB match { + case (Some(true), None) => hparam.Streaming(true) + case (_, Some(streamAfter)) => hparam.Streaming(streamAfter.kilobytes) + case _ => hparam.Streaming(false) + } + + // imposed by finagle (https://github.com/twitter/finagle/issues/780) + private val MaxReqRespSize = 2.gigabytes - 1.byte + @JsonIgnore override val protocol: ProtocolInitializer = HttpInitializer @@ -253,10 +259,9 @@ case class HttpConfig( .maybeWith(httpAccessLogRotateCount.map(AccessLogger.param.RotateCount.apply)) .maybeWith(maxHeadersKB.map(kb => hparam.MaxHeaderSize(kb.kilobytes))) .maybeWith(streamAfterContentLengthKB.map(kb => hparam.FixedLengthStreamedAfter(kb.kilobytes))) - .maybeWith(maxInitialLineKB.map(kb => hparam.MaxInitialLineSize(kb.kilobytes))) - .maybeWith(maxRequestKB.map(kb => hparam.MaxRequestSize(kb.kilobytes))) - .maybeWith(maxResponseKB.map(kb => hparam.MaxResponseSize(kb.kilobytes))) - .maybeWith(streamingEnabled.map(hparam.Streaming(_))) + .maybeWith(Some(streaming)) + .maybeWith(Some(hparam.MaxRequestSize(MaxReqRespSize))) + .maybeWith(Some(hparam.MaxResponseSize(MaxReqRespSize))) .maybeWith(compressionLevel.map(hparam.CompressionLevel(_))) .maybeWith(combinedIdentifier(params)) .maybeWith(tracePropagator.map(tp => HttpTracePropagatorConfig.Param(tp.mk(params)))) diff --git a/linkerd/protocol/http/src/test/scala/io/buoyant/linkerd/protocol/http/HttpConfigTest.scala b/linkerd/protocol/http/src/test/scala/io/buoyant/linkerd/protocol/http/HttpConfigTest.scala index 372a47e684..06157d3c24 100644 --- a/linkerd/protocol/http/src/test/scala/io/buoyant/linkerd/protocol/http/HttpConfigTest.scala +++ b/linkerd/protocol/http/src/test/scala/io/buoyant/linkerd/protocol/http/HttpConfigTest.scala @@ -31,8 +31,7 @@ class HttpConfigTest extends FunSuite with Awaits { | kind: io.l5d.methodAndHost |maxHeadersKB: 8 |maxInitialLineKB: 4 - |maxRequestKB: 5120 - |maxResponseKB: 5120 + |streamingEnabled: true |servers: |- port: 5000 """.stripMargin @@ -43,8 +42,8 @@ class HttpConfigTest extends FunSuite with Awaits { assert(config.httpAccessLogRotateCount.get == -1) assert(config.maxHeadersKB.get == 8) assert(config.maxInitialLineKB.get == 4) - assert(config.maxRequestKB.get == 5120) - assert(config.maxResponseKB.get == 5120) + assert(config.streamingEnabled.contains(true)) + } test("default identifier") {