diff --git a/cmd/activator/main.go b/cmd/activator/main.go
index f0bc2351b890..bac11496860f 100644
--- a/cmd/activator/main.go
+++ b/cmd/activator/main.go
@@ -93,6 +93,8 @@ var (
 	masterURL  = flag.String("master", "", "The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.")
 	kubeconfig = flag.String("kubeconfig", "", "Path to a kubeconfig. Only required if out-of-cluster.")
 
+	enableNetworkProbe = flag.Bool("enable-network-probe", true, "Whether to replace request retries with network level probing.")
+
 	logger *zap.SugaredLogger
 
 	statSink *websocket.ManagedConnection
@@ -237,6 +239,14 @@ func main() {
 		Steps:    maxRetries,
 	}
 	rt := activatorutil.NewRetryRoundTripper(activatorutil.AutoTransport, logger, backoffSettings, shouldRetry)
+	getProbeCount := 0
+	// When network probing is enabled, remove the retrying transport
+	// and pass the retry count to our network probes instead.
+	if *enableNetworkProbe {
+		logger.Info("Enabling network probing for activation.")
+		getProbeCount = maxRetries
+		rt = activatorutil.AutoTransport
+	}
 
 	// Open a websocket connection to the autoscaler
 	autoscalerEndpoint := fmt.Sprintf("ws://%s.%s.svc.%s:%s", "autoscaler", system.Namespace(), utils.GetClusterDomainName(), "8080")
@@ -253,7 +263,14 @@ func main() {
 	// Create activation handler chain
 	// Note: innermost handlers are specified first, ie. the last handler in the chain will be executed first
 	var ah http.Handler
-	ah = &activatorhandler.ActivationHandler{Activator: a, Transport: rt, Logger: logger, Reporter: reporter, Throttler: throttler}
+	ah = &activatorhandler.ActivationHandler{
+		Activator:     a,
+		Transport:     rt,
+		Logger:        logger,
+		Reporter:      reporter,
+		Throttler:     throttler,
+		GetProbeCount: getProbeCount,
+	}
 	ah = &activatorhandler.EnforceMaxContentLengthHandler{MaxContentLengthBytes: maxUploadBytes, NextHandler: ah}
 	ah = activatorhandler.NewRequestEventHandler(reqChan, ah)
 	ah = &activatorhandler.FilteringHandler{NextHandler: ah}
diff --git a/pkg/activator/handler/handler.go b/pkg/activator/handler/handler.go
index 0ef5600ce3d7..83e7128aabfc 100644
--- a/pkg/activator/handler/handler.go
+++ b/pkg/activator/handler/handler.go
@@ -27,7 +27,9 @@ import (
 	"github.com/knative/serving/pkg/activator"
 	"github.com/knative/serving/pkg/activator/util"
 	pkghttp "github.com/knative/serving/pkg/http"
+	"github.com/knative/serving/pkg/network"
 	"go.uber.org/zap"
+	"k8s.io/apimachinery/pkg/util/wait"
 )
 
 // ActivationHandler will wait for an active endpoint for a revision
@@ -38,6 +40,12 @@ type ActivationHandler struct {
 	Transport http.RoundTripper
 	Reporter  activator.StatsReporter
 	Throttler *activator.Throttler
+
+	// GetProbeCount is the number of attempts we should
+	// make to network probe the queue-proxy after the revision becomes
+	// ready before forwarding the payload. If zero, a network probe
+	// is not required.
+	GetProbeCount int
 }
 
 func (a *ActivationHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
@@ -61,7 +69,53 @@ func (a *ActivationHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 	}
 
 	err := a.Throttler.Try(revID, func() {
-		attempts, httpStatus := a.proxyRequest(w, r, target)
+		var (
+			attempts   int
+			httpStatus int
+		)
+		// If a GET probe count has been configured, then probe
+		// the queue-proxy with our network probe header until it
+		// returns a 200 status code.
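+		// A probe error or a 503 from the queue-proxy is treated as
+		// "not ready yet" and retried with exponential backoff, up to
+		// GetProbeCount steps; any other status ends the wait, and only
+		// a 200 lets the real request through.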
+		success := (a.GetProbeCount == 0)
+		if !success {
+			probeReq := &http.Request{
+				Method:     http.MethodGet,
+				URL:        target,
+				Proto:      r.Proto,
+				ProtoMajor: r.ProtoMajor,
+				ProtoMinor: r.ProtoMinor,
+				Host:       r.Host,
+				Header: map[string][]string{
+					http.CanonicalHeaderKey(network.ProbeHeaderName): []string{"true"},
+				},
+			}
+			settings := wait.Backoff{
+				Duration: 100 * time.Millisecond,
+				Factor:   1.3,
+				Steps:    a.GetProbeCount,
+			}
+			err := wait.ExponentialBackoff(settings, func() (bool, error) {
+				probeResp, err := a.Transport.RoundTrip(probeReq)
+				if err != nil {
+					a.Logger.Errorw("Pod probe failed", zap.Error(err))
+					return false, nil
+				}
+				httpStatus = probeResp.StatusCode
+				if httpStatus == http.StatusServiceUnavailable {
+					a.Logger.Errorf("Pod probe sent status: %d", httpStatus)
+					return false, nil
+				}
+				return true, nil
+			})
+			success = (err == nil) && httpStatus == http.StatusOK
+		}
+		if success {
+			// Once we see a successful probe, send traffic.
+			attempts, httpStatus = a.proxyRequest(w, r, target)
+		} else {
+			httpStatus = http.StatusInternalServerError
+			w.WriteHeader(httpStatus)
+		}
 
 		// Report the metrics
 		duration := time.Since(start)
diff --git a/pkg/activator/handler/handler_test.go b/pkg/activator/handler/handler_test.go
index 3fcbeb627faa..cbf56af33aab 100644
--- a/pkg/activator/handler/handler_test.go
+++ b/pkg/activator/handler/handler_test.go
@@ -29,6 +29,7 @@ import (
 	"github.com/knative/serving/pkg/activator"
 	"github.com/knative/serving/pkg/activator/util"
 	"github.com/knative/serving/pkg/apis/serving/v1alpha1"
+	"github.com/knative/serving/pkg/network"
 	"github.com/knative/serving/pkg/queue"
 )
 
@@ -95,7 +96,10 @@ func TestActivationHandler(t *testing.T) {
 		wantBody        string
 		wantCode        int
 		wantErr         error
+		probeErr        error
+		probeCode       int
 		attempts        string
+		gpc             int
 		endpointsGetter func(activator.RevisionID) (int32, error)
 		reporterCalls   []reporterCall
 	}{{
@@ -124,6 +128,7 @@ func TestActivationHandler(t *testing.T) {
 			Config:     "config-real-name",
 			StatusCode: http.StatusOK,
 		}},
+		gpc: 1,
 	}, {
 		label:     "active endpoint with missing count header",
 		namespace: testNamespace,
@@ -158,6 +163,56 @@ func TestActivationHandler(t *testing.T) {
 		wantErr:         nil,
 		endpointsGetter: goodEndpointsGetter,
 		reporterCalls:   nil,
+	}, {
+		label:           "active endpoint (probe failure)",
+		namespace:       testNamespace,
+		name:            testRevName,
+		probeErr:        errors.New("probe error"),
+		wantCode:        http.StatusInternalServerError,
+		endpointsGetter: goodEndpointsGetter,
+		gpc:             1,
+		reporterCalls: []reporterCall{{
+			Op:         "ReportRequestCount",
+			Namespace:  testNamespace,
+			Revision:   testRevName,
+			Service:    "service-real-name",
+			Config:     "config-real-name",
+			StatusCode: http.StatusInternalServerError,
+			Attempts:   0,
+			Value:      1,
+		}, {
+			Op:         "ReportResponseTime",
+			Namespace:  testNamespace,
+			Revision:   testRevName,
+			Service:    "service-real-name",
+			Config:     "config-real-name",
+			StatusCode: http.StatusInternalServerError,
+		}},
+	}, {
+		label:           "active endpoint (probe 503)",
+		namespace:       testNamespace,
+		name:            testRevName,
+		probeCode:       http.StatusServiceUnavailable,
+		wantCode:        http.StatusInternalServerError,
+		endpointsGetter: goodEndpointsGetter,
+		gpc:             1,
+		reporterCalls: []reporterCall{{
+			Op:         "ReportRequestCount",
+			Namespace:  testNamespace,
+			Revision:   testRevName,
+			Service:    "service-real-name",
+			Config:     "config-real-name",
+			StatusCode: http.StatusInternalServerError,
+			Attempts:   0,
+			Value:      1,
+		}, {
+			Op:         "ReportResponseTime",
+			Namespace:  testNamespace,
+			Revision:   testRevName,
+			Service:    "service-real-name",
"config-real-name", + StatusCode: http.StatusInternalServerError, + }}, }, { label: "request error", namespace: testNamespace, @@ -219,12 +274,20 @@ func TestActivationHandler(t *testing.T) { attempts: "hi there", endpointsGetter: brokenEndpointGetter, reporterCalls: nil, - }, - } + }} for _, e := range examples { t.Run(e.label, func(t *testing.T) { rt := util.RoundTripperFunc(func(r *http.Request) (*http.Response, error) { + if r.Header.Get(network.ProbeHeaderName) != "" { + if e.probeErr != nil { + return nil, e.probeErr + } + fake := httptest.NewRecorder() + fake.WriteHeader(e.probeCode) + fake.WriteString("queue") + return fake.Result(), nil + } if e.wantErr != nil { return nil, e.wantErr } @@ -242,13 +305,19 @@ func TestActivationHandler(t *testing.T) { reporter := &fakeReporter{} params := queue.BreakerParams{QueueDepth: 1000, MaxConcurrency: 1000, InitialCapacity: 0} - throttlerParams := activator.ThrottlerParams{BreakerParams: params, Logger: TestLogger(t), GetRevision: stubRevisionGetter, GetEndpoints: e.endpointsGetter} + throttlerParams := activator.ThrottlerParams{ + BreakerParams: params, + Logger: TestLogger(t), + GetRevision: stubRevisionGetter, + GetEndpoints: e.endpointsGetter, + } handler := ActivationHandler{ - Activator: act, - Transport: rt, - Logger: TestLogger(t), - Reporter: reporter, - Throttler: activator.NewThrottler(throttlerParams), + Activator: act, + Transport: rt, + Logger: TestLogger(t), + Reporter: reporter, + Throttler: activator.NewThrottler(throttlerParams), + GetProbeCount: e.gpc, } resp := httptest.NewRecorder() @@ -256,6 +325,7 @@ func TestActivationHandler(t *testing.T) { req := httptest.NewRequest("POST", "http://example.com", nil) req.Header.Set(activator.RevisionHeaderNamespace, e.namespace) req.Header.Set(activator.RevisionHeaderName, e.name) + handler.ServeHTTP(resp, req) if resp.Code != e.wantCode { diff --git a/test/e2e/grpc_test.go b/test/e2e/grpc_test.go index 9f10a269753d..b71e6a5e6b5e 100644 --- a/test/e2e/grpc_test.go +++ b/test/e2e/grpc_test.go @@ -20,7 +20,6 @@ package e2e import ( "context" - "crypto/rand" "io" "testing" "time" @@ -35,7 +34,113 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) -func TestGRPC(t *testing.T) { +type grpcTest func(*testing.T, test.ResourceNames, *test.Clients, string, string) + +func unaryTest(t *testing.T, names test.ResourceNames, clients *test.Clients, host, domain string) { + t.Helper() + t.Logf("Connecting to grpc-ping using host %q and authority %q", host, domain) + conn, err := grpc.Dial( + host+":80", + grpc.WithAuthority(domain), + grpc.WithInsecure(), + ) + if err != nil { + t.Fatalf("fail to dial: %v", err) + } + defer conn.Close() + + pc := ping.NewPingServiceClient(conn) + t.Log("Testing unary Ping") + + want := &ping.Request{Msg: "Hello!"} + + got, err := pc.Ping(context.TODO(), want) + if err != nil { + t.Fatalf("Couldn't send request: %v", err) + } + + if got.Msg != want.Msg { + t.Errorf("Response = %q, want = %q", got.Msg, want.Msg) + } +} + +func streamTest(t *testing.T, names test.ResourceNames, clients *test.Clients, host, domain string) { + t.Helper() + t.Logf("Connecting to grpc-ping using host %q and authority %q", host, domain) + conn, err := grpc.Dial( + host+":80", + grpc.WithAuthority(domain), + grpc.WithInsecure(), + ) + if err != nil { + t.Fatalf("Fail to dial: %v", err) + } + defer conn.Close() + + pc := ping.NewPingServiceClient(conn) + t.Log("Testing streaming Ping") + + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel() + + 
+	stream, err := pc.PingStream(ctx)
+	if err != nil {
+		t.Fatalf("Error creating stream: %v", err)
+	}
+
+	count := 3
+	for i := 0; i < count; i++ {
+		t.Logf("Sending stream %d of %d", i+1, count)
+
+		want := "This is a short message!"
+
+		err = stream.Send(&ping.Request{Msg: want})
+		if err != nil {
+			t.Fatalf("Error sending request: %v", err)
+		}
+
+		resp, err := stream.Recv()
+		if err != nil {
+			t.Fatalf("Error receiving response: %v", err)
+		}
+
+		got := resp.Msg
+
+		if want != got {
+			t.Errorf("Stream %d: response = %q, want = %q", i, got, want)
+		}
+	}
+
+	stream.CloseSend()
+
+	_, err = stream.Recv()
+	if err != io.EOF {
+		t.Errorf("Expected EOF, got %v", err)
+	}
+}
+
+func waitForScaleToZero(t *testing.T, names test.ResourceNames, clients *test.Clients) {
+	t.Helper()
+	deploymentName := names.Revision + "-deployment"
+	t.Logf("Waiting for %q to scale to zero", deploymentName)
+	err := pkgTest.WaitForDeploymentState(
+		clients.KubeClient,
+		deploymentName,
+		func(d *v1beta1.Deployment) (bool, error) {
+			t.Logf("Deployment %q has %d replicas", deploymentName, d.Status.ReadyReplicas)
+			return d.Status.ReadyReplicas == 0, nil
+		},
+		"DeploymentIsScaledDown",
+		test.ServingNamespace,
+		3*time.Minute,
+	)
+	if err != nil {
+		t.Fatalf("Could not scale to zero: %v", err)
+	}
+}
+
+func testGRPC(t *testing.T, f grpcTest) {
+	t.Helper()
+
 	t.Parallel()
 	// Setup
 	clients := Setup(t)
@@ -43,9 +148,10 @@ func TestGRPC(t *testing.T) {
 
 	t.Log("Creating route and configuration for grpc-ping")
 	options := &test.Options{
-		ContainerPorts: []corev1.ContainerPort{
-			{Name: "h2c", ContainerPort: 8080},
-		},
+		ContainerPorts: []corev1.ContainerPort{{
+			Name:          "h2c",
+			ContainerPort: 8080,
+		}},
 	}
 	names, err := CreateRouteAndConfig(t, clients, "grpc-ping", options)
 	if err != nil {
@@ -72,7 +178,6 @@ func TestGRPC(t *testing.T) {
 		t.Fatalf("Configuration %s was not updated with the new revision: %v", names.Config, err)
 	}
 	names.Revision = config.Status.LatestCreatedRevisionName
-	deploymentName := names.Revision + "-deployment"
 
 	_, err = pkgTest.WaitForEndpointState(
 		clients.KubeClient,
@@ -93,111 +198,27 @@ func TestGRPC(t *testing.T) {
 		}
 	}
 
-	t.Logf("Connecting to grpc-ping using host %q and authority %q", *host, domain)
-
-	waitForScaleToZero := func() {
-		t.Logf("Waiting for scale to zero")
-		err = pkgTest.WaitForDeploymentState(
-			clients.KubeClient,
-			deploymentName,
-			func(d *v1beta1.Deployment) (bool, error) {
-				t.Logf("Deployment %q has %d replicas", deploymentName, d.Status.ReadyReplicas)
-				return d.Status.ReadyReplicas == 0, nil
-			},
-			"DeploymentIsScaledDown",
-			test.ServingNamespace,
-			3*time.Minute,
-		)
-		if err != nil {
-			t.Fatalf("Could not scale to zero: %v", err)
-		}
-	}
-
-	payload := func(size int) string {
-		b := make([]byte, size)
-		_, err = rand.Read(b)
-		if err != nil {
-			t.Fatalf("Error generating payload: %v", err)
-		}
-		return string(b)
-	}
-
-	conn, err := grpc.Dial(
-		*host+":80",
-		grpc.WithAuthority(domain),
-		grpc.WithInsecure(),
-	)
-	if err != nil {
-		t.Fatalf("fail to dial: %v", err)
-	}
-	defer conn.Close()
-
-	pc := ping.NewPingServiceClient(conn)
-
-	unaryTest := func(t *testing.T) {
-		t.Log("Testing unary Ping")
-
-		want := &ping.Request{Msg: "Hello!"}
-
-		got, err := pc.Ping(context.TODO(), want)
-		if err != nil {
-			t.Fatalf("Couldn't send request: %v", err)
-		}
-
-		if got.Msg != want.Msg {
Want %q, got %q", want.Msg, got.Msg) - } - } - - streamTest := func(t *testing.T) { - t.Log("Testing streaming Ping") - - ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) - defer cancel() - - stream, err := pc.PingStream(ctx) - if err != nil { - t.Fatalf("Error creating stream: %v", err) - } - - count := 3 - for i := 0; i < count; i++ { - t.Logf("Sending stream %d of %d", i+1, count) - - want := payload(10) - - err = stream.Send(&ping.Request{Msg: want}) - if err != nil { - t.Fatalf("Error sending request: %v", err) - } - - resp, err := stream.Recv() - if err != nil { - t.Fatalf("Error receiving response: %v", err) - } - - got := resp.Msg - - if want != got { - t.Errorf("Unexpected response. Want %q, got %q", want, got) - } - } - - stream.CloseSend() + f(t, names, clients, *host, domain) +} - _, err = stream.Recv() - if err != io.EOF { - t.Errorf("Expected EOF, got %v", err) - } - } +func TestGRPCUnaryPing(t *testing.T) { + testGRPC(t, unaryTest) +} - t.Run("unary ping", unaryTest) - t.Run("streaming ping", streamTest) +func TestGRPCStreamingPing(t *testing.T) { + testGRPC(t, streamTest) +} - waitForScaleToZero() - t.Run("unary ping after scale-to-zero", unaryTest) +func TestGRPCUnaryPingFromZero(t *testing.T) { + testGRPC(t, func(t *testing.T, names test.ResourceNames, clients *test.Clients, host, domain string) { + waitForScaleToZero(t, names, clients) + unaryTest(t, names, clients, host, domain) + }) +} - // TODO(#3239): Fix gRPC streaming after cold start - // waitForScaleToZero() - // t.Run("streaming ping after scale-to-zero", streamTest) +func TestGRPCStreamingPingFromZero(t *testing.T) { + testGRPC(t, func(t *testing.T, names test.ResourceNames, clients *test.Clients, host, domain string) { + waitForScaleToZero(t, names, clients) + streamTest(t, names, clients, host, domain) + }) }