Skip to content

Commit

Permalink
Remove maxRequestKB maxResponseKB and set finagle's maxRequestSize an…
Browse files Browse the repository at this point in the history
…d maxResponseSize to max allowed(2GB) (linkerd#2298)

This PR's purpose is to remove `maxRequestKB` `maxResponseKB` and rely on finagle's maximum allowed size for fixed size messages which is 2GB. The semantics now are: 

- **Fixed size messages** (`Content-Length` set) are allowed up to 2GB and will be streamed if they are above the threshold specified in `streamAfterContentLengthKB`. Note that id these messages are larger than 2GB or have their `Content-Length` set to a larger value they will be rejected.
- **Chunk encoded messages** (Transfer-Encoding: chunked`) will always be streamed.
- **Streaming messages** (`Content-Length` not set) will be streamed.

Fixes: linkerd#2196

Signed-off-by: Zahari Dichev <zaharidichev@gmail.com>
  • Loading branch information
zaharidichev authored and adleong committed Jul 24, 2019
1 parent 8dbab84 commit 9026195
Show file tree
Hide file tree
Showing 6 changed files with 167 additions and 21 deletions.
6 changes: 1 addition & 5 deletions linkerd/docs/protocol-http.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ routers:
kind: io.l5d.methodAndHost
maxHeadersKB: 8
maxInitialLineKB: 4
maxRequestKB: 5120
maxResponseKB: 5120
servers:
- port: 5000
addForwardedHeader:
Expand All @@ -36,11 +34,9 @@ httpAccessLogRollPolicy | never | When to roll the logfile. Possible values: Nev
httpAccessLogAppend | true | Append to an existing logfile, or truncate it?
httpAccessLogRotateCount | -1 | How many rotated logfiles to keep around, maximum. -1 means to keep them all.
identifier | The `io.l5d.header.token` identifier | An identifier or list of identifiers. See [Http-specific identifiers](#http-1-1-identifiers).
streamAfterContentLengthKB | 5 | The threshold at which HTTP messages will be streamed if exceeded.
streamAfterContentLengthKB | 5 | The threshold at which HTTP messages will be streamed if exceeded. You can use this to allow for sufficiently small messages to be buffered, instead of always streamed
maxHeadersKB | 8 | The maximum size of all headers in an HTTP message.
maxInitialLineKB | 4 | The maximum size of an initial HTTP message line.
maxRequestKB | 5120 | The maximum size of a non-chunked HTTP request payload.
maxResponseKB | 5120 | The maximum size of a non-chunked HTTP response payload.
compressionLevel | `-1`, automatically compresses textual content types with compression level 6 | The compression level to use (on 0-9).
streamingEnabled | `true` | Streaming allows Linkerd to work with HTTP messages that have large (or infinite) content bodies using chunked encoding. Disabling this is highly discouraged.
tracePropagator | `io.l5d.default` | A trace propagator. See [Http-specific trace propagator](#http-1-1-trace-propagators).
Expand Down
3 changes: 1 addition & 2 deletions linkerd/examples/http.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@ routers:
httpAccessLogRotateCount: -1
maxHeadersKB: 16
maxInitialLineKB: 16
maxRequestKB: 102400 # 100MB
maxResponseKB: 102400 # 100MB
streamAfterContentLengthKB: 102400 # 100MB
compressionLevel: 9
identifier: {kind: io.l5d.header}
dtab: |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,10 @@ class HttpEndToEndTest
.newClient(name, "upstream").toService
}

def basicConfig(dtab: Dtab) =
def basicConfig(dtab: Dtab, streaming: Boolean = false) =
s"""|routers:
|- protocol: http
| streamingEnabled: $streaming
| dtab: ${dtab.show}
| servers:
| - port: 0
Expand Down Expand Up @@ -922,7 +923,7 @@ class HttpEndToEndTest
/svc => /srv;
""")

val linker = Linker.Initializers(Seq(HttpInitializer)).load(basicConfig(dtab))
val linker = Linker.Initializers(Seq(HttpInitializer)).load(basicConfig(dtab, streaming = true))
val router = linker.routers.head.initialize()
val server = router.servers.head.serve()
val client = upstream(server)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
package io.buoyant.linkerd.protocol

import com.twitter.conversions.StorageUnitOps._
import com.twitter.finagle.buoyant._
import com.twitter.finagle.http.param.{MaxRequestSize, Streaming}
import com.twitter.finagle.http.{Method, Request, Response, Version}
import com.twitter.finagle.{Http, Service, Stack}
import com.twitter.io.{Buf, Reader}
import com.twitter.util.{Future, Promise, StorageUnit}
import io.buoyant.test.{Awaits, BudgetedRetries}
import java.net.InetSocketAddress
import org.scalatest.{FunSuite, MustMatchers, OptionValues}

/**
*
* This test is to illustrate the behavior of `fixedLengthStreamedAfter` parameter. The following
* cases are covered:
*
* chunked message - a message with `Transfer-Encoding: chunked`. These are never aggregated and are
* always streamed
*
* non-chunked message - a message that has a known fixed size set via the "Content-Length" header.
* There are two options here. If the size of the message is less than the
* value set for streamAfterContentLengthKB, then this message will be
* buffered. If the size is more, then the message will be streamed.
*
* non-chunked streaming message - a message that is not chunk-encoded and does not have
* "Content-Length" would fall under this category. If that is the
* case, the message will be streamed.
*
*
* NB: A very important detail is that a non chunked message(i.e. "Content-Length" is set) that
* is larger than maxRequestKb cannot be processed. (even when the "Content-Length" value is
* larger than fixedLengthStreamedAfter)
*
*/



class HttpStreamingTest extends FunSuite
with Awaits
with MustMatchers
with OptionValues
with BudgetedRetries {


def withServerAndClient(
fixedLengthStreamedAfter: Option[StorageUnit],
messageBody: String,
maxRequestSize: Option[StorageUnit] = None,
withContentLength: Boolean = false,
withChunkedEncoding: Boolean = false
)(f: Request => Any) = {

val Address = "127.0.0.1:8080"
val params = Stack.Params.empty.maybeWith(fixedLengthStreamedAfter.map(v => Streaming(v)))
.maybeWith(maxRequestSize.map(MaxRequestSize(_)))

val receivedFut: Promise[Request] = Promise()
val svc = Service.mk[Request, Response] { req =>
receivedFut.setValue(req)
Future.value(Response(req))
}

val server = Http.server.withParams(params).serve(Address, svc)
val addr = server.boundAddress.asInstanceOf[InetSocketAddress]
val client = Http.client.newService(s"${addr.getHostName}:${addr.getPort}", "client")


val req = Request(
Version.Http11,
Method.Get,
Address,
Reader.fromBuf(Buf.Utf8(messageBody), 1)
)


if (withContentLength) {
req.headerMap.put("Content-Length", messageBody.length.toString)
}
if (withChunkedEncoding) {
req.headerMap.put("Transfer-Encoding", "chunked")
}

client(req)
f(await(receivedFut))
await(server.close())
await(client.close())

}


test("server will stream fixed size messages exceeding streamAfterContentLengthKB") {
val messageBody = "must-stream"
withServerAndClient(Some((messageBody.length - 1).bytes), messageBody, withContentLength = true)
{ receivedRequest =>
assert(receivedRequest.isChunked)
val receivedBody = await(Reader.readAll(receivedRequest.reader))
assert(receivedBody == Buf.Utf8(messageBody))
assert(receivedRequest.contentString.isEmpty)
}

}

test("server will buffer fixed size messages less than streamAfterContentLengthKB in size") {
val messageBody = "must-not-stream"
withServerAndClient(Some((messageBody.length + 1).bytes), messageBody, withContentLength = true)
{ receivedRequest =>
assert(!receivedRequest.isChunked)
assert(receivedRequest.contentString == messageBody)
}

}



test(
"server will stream messages missing Content-Length (even if below streamAfterContentLengthKB in size)"
) {
val messageBody = "must-stream"
withServerAndClient(Some(1.gigabyte), messageBody) { receivedRequest =>
val receivedBody = await(Reader.readAll(receivedRequest.reader))
assert(receivedBody == Buf.Utf8(messageBody))
assert(receivedRequest.isChunked)
assert(receivedRequest.contentString.isEmpty)
}
}


test("server will always stream messages that have Content-Length and Transfer-Encoding:chunked")
{
val messageBody = "must-stream"
withServerAndClient(
Some(0.bytes),
messageBody,
withChunkedEncoding = true,
withContentLength = true
) { receivedRequest =>
val receivedBody = await(Reader.readAll(receivedRequest.reader))
assert(receivedBody == Buf.Utf8(messageBody))
assert(receivedRequest.isChunked)
assert(receivedRequest.contentString.isEmpty)
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,12 @@ import com.twitter.finagle.buoyant.{ParamsMaybeWith, PathMatcher}
import com.twitter.finagle.client.{AddrMetadataExtraction, StackClient}
import com.twitter.finagle.filter.DtabStatsFilter
import com.twitter.finagle.http.filter.{ClientDtabContextFilter, ServerDtabContextFilter, StatsFilter}
import com.twitter.finagle.http.param.FixedLengthStreamedAfter
import com.twitter.finagle.http.{Request, Response, param => hparam}
import com.twitter.finagle.liveness.FailureAccrualFactory
import com.twitter.finagle.service.Retries
import com.twitter.finagle.stack.nilStack
import com.twitter.finagle.tracing.TraceInitializerFilter
import com.twitter.finagle.{ServiceFactory, Stack, param => fparam}
import com.twitter.finagle.{ServiceFactory, Stack}
import com.twitter.logging.Policy
import io.buoyant.linkerd.protocol.HttpRequestAuthorizerConfig.param
import io.buoyant.linkerd.protocol.http._
Expand Down Expand Up @@ -214,8 +213,6 @@ case class HttpConfig(
streamAfterContentLengthKB: Option[Int],
maxHeadersKB: Option[Int],
maxInitialLineKB: Option[Int],
maxRequestKB: Option[Int],
maxResponseKB: Option[Int],
streamingEnabled: Option[Boolean],
compressionLevel: Option[Int],
tracePropagator: Option[HttpTracePropagatorConfig]
Expand All @@ -225,6 +222,15 @@ case class HttpConfig(
var servers: Seq[HttpServerConfig] = Nil
var service: Option[HttpSvc] = None

private val streaming = streamingEnabled -> streamAfterContentLengthKB match {
case (Some(true), None) => hparam.Streaming(true)
case (_, Some(streamAfter)) => hparam.Streaming(streamAfter.kilobytes)
case _ => hparam.Streaming(false)
}

// imposed by finagle (https://github.com/twitter/finagle/issues/780)
private val MaxReqRespSize = 2.gigabytes - 1.byte

@JsonIgnore
override val protocol: ProtocolInitializer = HttpInitializer

Expand Down Expand Up @@ -253,10 +259,9 @@ case class HttpConfig(
.maybeWith(httpAccessLogRotateCount.map(AccessLogger.param.RotateCount.apply))
.maybeWith(maxHeadersKB.map(kb => hparam.MaxHeaderSize(kb.kilobytes)))
.maybeWith(streamAfterContentLengthKB.map(kb => hparam.FixedLengthStreamedAfter(kb.kilobytes)))
.maybeWith(maxInitialLineKB.map(kb => hparam.MaxInitialLineSize(kb.kilobytes)))
.maybeWith(maxRequestKB.map(kb => hparam.MaxRequestSize(kb.kilobytes)))
.maybeWith(maxResponseKB.map(kb => hparam.MaxResponseSize(kb.kilobytes)))
.maybeWith(streamingEnabled.map(hparam.Streaming(_)))
.maybeWith(Some(streaming))
.maybeWith(Some(hparam.MaxRequestSize(MaxReqRespSize)))
.maybeWith(Some(hparam.MaxResponseSize(MaxReqRespSize)))
.maybeWith(compressionLevel.map(hparam.CompressionLevel(_)))
.maybeWith(combinedIdentifier(params))
.maybeWith(tracePropagator.map(tp => HttpTracePropagatorConfig.Param(tp.mk(params))))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@ class HttpConfigTest extends FunSuite with Awaits {
| kind: io.l5d.methodAndHost
|maxHeadersKB: 8
|maxInitialLineKB: 4
|maxRequestKB: 5120
|maxResponseKB: 5120
|streamingEnabled: true
|servers:
|- port: 5000
""".stripMargin
Expand All @@ -43,8 +42,8 @@ class HttpConfigTest extends FunSuite with Awaits {
assert(config.httpAccessLogRotateCount.get == -1)
assert(config.maxHeadersKB.get == 8)
assert(config.maxInitialLineKB.get == 4)
assert(config.maxRequestKB.get == 5120)
assert(config.maxResponseKB.get == 5120)
assert(config.streamingEnabled.contains(true))

}

test("default identifier") {
Expand Down

0 comments on commit 9026195

Please sign in to comment.