Skip to content

Commit

Permalink
Make our ReverseProxies unbuffered and flushable. (#3253)
Browse files Browse the repository at this point in the history
In Go 1.12 setting `FlushInterval` to -1 makes it `Flush()` the ResponseWriter on each `Write()` (thanks @bradfitz).

We also need the `ResponseWriter` wrappers we have to implement `http.Flusher` for this to work.

Fixes: #3188
  • Loading branch information
mattmoor authored and knative-prow-robot committed Feb 17, 2019
1 parent 900acde commit 99eb090
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 6 deletions.
2 changes: 2 additions & 0 deletions cmd/queue/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,8 +245,10 @@ func main() {
}

httpProxy = httputil.NewSingleHostReverseProxy(target)
httpProxy.FlushInterval = -1
h2cProxy = httputil.NewSingleHostReverseProxy(target)
h2cProxy.Transport = h2c.DefaultTransport
h2cProxy.FlushInterval = -1

activatorutil.SetupHeaderPruning(httpProxy)
activatorutil.SetupHeaderPruning(h2cProxy)
Expand Down
7 changes: 7 additions & 0 deletions pkg/activator/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ func (a *ActivationHandler) proxyRequest(w http.ResponseWriter, r *http.Request,
}
proxy := httputil.NewSingleHostReverseProxy(target)
proxy.Transport = a.Transport
proxy.FlushInterval = -1

attempts := int(1) // one attempt is always needed
proxy.ModifyResponse = func(r *http.Response) error {
Expand Down Expand Up @@ -115,6 +116,8 @@ type statusCapture struct {
statusCode int
}

var _ http.Flusher = (*statusCapture)(nil)

func (s *statusCapture) WriteHeader(statusCode int) {
s.statusCode = statusCode
s.ResponseWriter.WriteHeader(statusCode)
Expand All @@ -126,3 +129,7 @@ func (s *statusCapture) WriteHeader(statusCode int) {
func (s *statusCapture) Hijack() (net.Conn, *bufio.ReadWriter, error) {
return websocket.HijackIfPossible(s.ResponseWriter)
}

func (s *statusCapture) Flush() {
s.ResponseWriter.(http.Flusher).Flush()
}
6 changes: 6 additions & 0 deletions pkg/queue/timeout.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,14 @@ type timeoutWriter struct {
wroteOnce bool
}

var _ http.Flusher = (*timeoutWriter)(nil)

var _ http.ResponseWriter = (*timeoutWriter)(nil)

func (tw *timeoutWriter) Flush() {
tw.w.(http.Flusher).Flush()
}

// Hijack calls Hijack() on the wrapped http.ResponseWriter if it implements
// http.Hijacker interface, which is required for net/http/httputil/reverseproxy
// to handle connection upgrade/switching protocol. Otherwise returns an error.
Expand Down
10 changes: 4 additions & 6 deletions test/e2e/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func TestGRPC(t *testing.T) {
},
"DeploymentIsScaledDown",
test.ServingNamespace,
2*time.Minute,
3*time.Minute,
)
if err != nil {
t.Fatalf("Could not scale to zero: %v", err)
Expand Down Expand Up @@ -166,8 +166,7 @@ func TestGRPC(t *testing.T) {
for i := 0; i < count; i++ {
logger.Infof("Sending stream %d of %d", i+1, count)

// TODO(#3188): Responses less than 4KB are buffered indefinitely
want := payload(4096)
want := payload(10)

err = stream.Send(&ping.Request{Msg: want})
if err != nil {
Expand Down Expand Up @@ -198,10 +197,9 @@ func TestGRPC(t *testing.T) {
t.Run("streaming ping", streamTest)

waitForScaleToZero()

t.Run("unary ping after scale-to-zero", unaryTest)

// TODO(#3239): Fix gRPC streaming after cold start
//waitForScaleToZero()
//t.Run("streaming ping after scale-to-zero", streamTest)
// waitForScaleToZero()
// t.Run("streaming ping after scale-to-zero", streamTest)
}

0 comments on commit 99eb090

Please sign in to comment.