Skip to content

Commit

Permalink
Revert "Propagate client cert info to linkerd-fronted service linkerd…
Browse files Browse the repository at this point in the history
…#1153 (linkerd#1656)"

This reverts commit b0ea28a.
  • Loading branch information
hawkw committed Nov 14, 2017
1 parent cf42a0a commit 81f54c8
Show file tree
Hide file tree
Showing 25 changed files with 253 additions and 584 deletions.
21 changes: 0 additions & 21 deletions finagle/h2/src/test/scala/io/buoyant/test/h2/StreamTestUtils.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package io.buoyant.test.h2

import com.twitter.finagle.buoyant.h2.{Frame, Stream}
import com.twitter.io.Buf
import com.twitter.util.Future

object StreamTestUtils {
Expand All @@ -26,25 +25,6 @@ object StreamTestUtils {
}
}

def readDataStream(stream: Stream): Future[Buf] = {
stream.read().flatMap {
case frame: Frame.Data if frame.isEnd =>
val buf = frame.buf
val _ = frame.release()
Future.value(buf)
case frame: Frame.Data =>
val buf = frame.buf
val _ = frame.release()
readDataStream(stream).map(buf.concat)
case frame: Frame.Trailers =>
val _ = frame.release()
Future.value(Buf.Empty)
}
}

def readDataString(stream: Stream): Future[String] =
readDataStream(stream).map(Buf.Utf8.unapply).map(_.get)

/**
* Enhances a [[Stream]] by providing the [[readToEnd()]] function in the
* method position
Expand All @@ -53,7 +33,6 @@ object StreamTestUtils {
*/
implicit class ReadAllStream(val stream: Stream) extends AnyVal {
@inline def readToEnd: Future[Unit] = StreamTestUtils.readToEnd(stream)
@inline def readDataString: Future[String] = StreamTestUtils.readDataString(stream)
}

}
Original file line number Diff line number Diff line change
@@ -1,16 +1,85 @@
package io.buoyant.linkerd.protocol.h2

import com.twitter.concurrent.AsyncQueue
import com.twitter.finagle.buoyant.H2
import com.twitter.finagle.buoyant.h2._
import com.twitter.finagle.stats.InMemoryStatsReceiver
import com.twitter.finagle.param.Stats
import com.twitter.finagle.stats.{InMemoryStatsReceiver, NullStatsReceiver}
import com.twitter.finagle.tracing.NullTracer
import com.twitter.finagle.{param => fparam, Status => _, _}
import com.twitter.io.Buf
import com.twitter.logging.Level
import com.twitter.util.{Future, Promise, Var}
import io.buoyant.linkerd.Linker
import io.buoyant.linkerd.protocol.H2Initializer
import io.buoyant.test.FunSuite
import io.buoyant.test.h2.StreamTestUtils._
import java.net.InetSocketAddress
import scala.collection.mutable

class H2EndToEndTest extends FunSuite {

case class Downstream(name: String, server: ListeningServer) {
val address = server.boundAddress.asInstanceOf[InetSocketAddress]
val port = address.getPort
val dentry = Dentry(
Path.read(s"/svs/$name"),
NameTree.read(s"/$$/inet/127.1/$port")
)
}

object Downstream {
def mk(name: String)(f: Request=>Future[Response]): Downstream = {
val service = Service.mk { req: Request => f(req) }
val server = H2.server
.configured(fparam.Label(name))
.configured(fparam.Tracer(NullTracer))
.serve(":*", service)
Downstream(name, server)
}

def const(name: String, value: String, status: Status = Status.Ok): Downstream =
mk(name) { _ =>
Future.value(Response(status, Stream.const(value)))
}

def promise(name: String): (Downstream, mutable.Seq[Promise[Response]]) = {
val ps = mutable.MutableList[Promise[Response]]()
val svc = mk(name) { _ =>
val p = new Promise[Response]()
ps += p
p
}
(svc, ps)
}
}

def upstream(server: ListeningServer) = {
val address = Address(server.boundAddress.asInstanceOf[InetSocketAddress])
val name = Name.Bound(Var.value(Addr.Bound(address)), address)
H2.client
.configured(fparam.Stats(NullStatsReceiver))
.configured(fparam.Tracer(NullTracer))
.newClient(name, "upstream").toService
}

def readDataStream(stream: Stream): Future[Buf] = {
stream.read().flatMap {
case frame: Frame.Data if frame.isEnd =>
val buf = frame.buf
val _ = frame.release()
Future.value(buf)
case frame: Frame.Data =>
val buf = frame.buf
val _ = frame.release()
readDataStream(stream).map(buf.concat)
case frame: Frame.Trailers =>
val _ = frame.release()
Future.value(Buf.Empty)
}
}

def readDataString(stream: Stream): Future[String] =
readDataStream(stream).map(Buf.Utf8.unapply).map(_.get)

test("single request") {
val stats = new InMemoryStatsReceiver

Expand All @@ -28,11 +97,11 @@ class H2EndToEndTest extends FunSuite {
|""".stripMargin

val linker = Linker.Initializers(Seq(H2Initializer)).load(config)
.configured(Stats(stats))
.configured(fparam.Stats(stats))
val router = linker.routers.head.initialize()
val server = router.servers.head.serve()

val client = Upstream.mk(server)
val client = upstream(server)
def get(host: String, path: String = "/")(f: Response => Unit) = {
val req = Request("http", Method.Get, host, path, Stream.empty())
val rsp = await(client(req))
Expand All @@ -41,7 +110,7 @@ class H2EndToEndTest extends FunSuite {

get("dog") { rsp =>
assert(rsp.status == Status.Ok)
assert(await(rsp.stream.readDataString) == "woof")
assert(await(readDataString(rsp.stream)) == "woof")
()
}
assert(stats.counters(Seq("rt", "h2", "client", s"$$/inet/127.1/${dog.port}", "connects")) == 1)
Expand All @@ -67,11 +136,11 @@ class H2EndToEndTest extends FunSuite {
|""".stripMargin

val linker = Linker.Initializers(Seq(H2Initializer)).load(config)
.configured(Stats(stats))
.configured(fparam.Stats(stats))
val router = linker.routers.head.initialize()
val server = router.servers.head.serve()

val client = Upstream.mk(server)
val client = upstream(server)


val req0 = Request("http", Method.Get, "dog", "/", Stream.empty())
Expand All @@ -90,11 +159,11 @@ class H2EndToEndTest extends FunSuite {

rsps(1).setValue(Response(Status.Ok, Stream.const("bow")))
val rsp1 = await(fRsp1)
assert(await(rsp1.stream.readDataString) == "bow")
assert(await(readDataString(rsp1.stream)) == "bow")

rsps(0).setValue(Response(Status.Ok, Stream.const("wow")))
val rsp0 = await(fRsp0)
assert(await(rsp0.stream.readDataString) == "wow")
assert(await(readDataString(rsp0.stream)) == "wow")

// should multiplex over a single connection
assert(stats.counters(Seq("rt", "h2", "client", s"$$/inet/127.1/${dog.port}", "connects")) == 1)
Expand All @@ -120,11 +189,11 @@ class H2EndToEndTest extends FunSuite {
|""".stripMargin

val linker = Linker.Initializers(Seq(H2Initializer)).load(config)
.configured(Stats(stats))
.configured(fparam.Stats(stats))
val router = linker.routers.head.initialize()
val server = router.servers.head.serve()

val client = Upstream.mk(server)
val client = upstream(server)

val req = Request("http", Method.Get, "dog", "/", Stream.empty())
val fRsp = client(req)
Expand All @@ -143,7 +212,7 @@ class H2EndToEndTest extends FunSuite {

val rsp = await(fRsp)

assert(await(rsp.stream.readDataString) == "bowwow")
assert(await(readDataString(rsp.stream)) == "bowwow")

await(client.close())
await(server.close())
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ import com.fasterxml.jackson.databind.annotation.JsonDeserialize
import com.fasterxml.jackson.databind.{DeserializationContext, JsonDeserializer, JsonNode}
import com.twitter.conversions.storage._
import com.twitter.conversions.time._
import com.twitter.finagle.buoyant.h2.{param => h2Param, _}
import com.twitter.finagle.buoyant.h2.param._
import com.twitter.finagle.buoyant.h2.service.H2Classifier
import com.twitter.finagle.buoyant.h2.{param => h2Param, _}
import com.twitter.finagle.buoyant.{ParamsMaybeWith, PathMatcher}
import com.twitter.finagle.client.StackClient
import com.twitter.finagle.filter.DtabStatsFilter
Expand All @@ -21,7 +21,6 @@ import io.buoyant.config.PolymorphicConfig
import io.buoyant.linkerd.protocol.h2.{H2ClassifierConfig, H2RequestAuthorizerConfig}
import io.buoyant.router.h2.ClassifiedRetries.{BufferSize, ClassificationTimeout}
import io.buoyant.router.h2.{ClassifiedRetryFilter, DupRequest}
import io.buoyant.router.http.ForwardClientCertFilter
import io.buoyant.router.{ClassifiedRetries, H2, RoutingFactory}
import io.netty.handler.ssl.ApplicationProtocolNames
import scala.collection.JavaConverters._
Expand Down Expand Up @@ -154,12 +153,10 @@ class H2StaticClient(val configs: Seq[H2PrefixConfig]) extends H2Client with Sta
class H2PrefixConfig(prefix: PathMatcher) extends PrefixConfig(prefix) with H2ClientConfig

trait H2ClientConfig extends ClientConfig with H2EndpointConfig {
var forwardClientCert: Option[Boolean] = None

@JsonIgnore
override def params(vars: Map[String, String]): Stack.Params =
withEndpointParams(super.params(vars))
.maybeWith(forwardClientCert.map(ForwardClientCertFilter.Enabled))
}

@JsonTypeInfo(
Expand Down Expand Up @@ -234,7 +231,6 @@ case class RetryBufferSize(
class H2ServerConfig extends ServerConfig with H2EndpointConfig {

var maxConcurrentStreamsPerConnection: Option[Int] = None
val forwardClientCert: Option[Boolean] = None

@JsonIgnore
override val alpnProtocols: Option[Seq[String]] =
Expand All @@ -245,7 +241,6 @@ class H2ServerConfig extends ServerConfig with H2EndpointConfig {

override def withEndpointParams(params: Stack.Params): Stack.Params = super.withEndpointParams(params)
.maybeWith(maxConcurrentStreamsPerConnection.map(c => Settings.MaxConcurrentStreams(Some(c.toLong))))
.maybeWith(forwardClientCert.map(ForwardClientCertFilter.Enabled))

@JsonIgnore
override def serverParams = withEndpointParams(super.serverParams)
Expand Down

This file was deleted.

Loading

0 comments on commit 81f54c8

Please sign in to comment.