Skip to content

Commit

Permalink
[WIP] Add a simple network prober to the activator.
Browse files Browse the repository at this point in the history
When the flag `-enable-network-probing` is passed the activator will replace its retring transport logic with a simple network probe based on #3256 with a similar number of retries to what the retrying transport was previously configured to use.  Enabling this allows the GRPC test with streaming and cold-start fairly reliably on my cluster (and also with the GRPC ping sample in knative/docs, with my fixes).

This change also refactors the GRPC test into 4 tests, for each of the logical things tested, which will hopefully reduce the amount of time this adds to e2e dramatically when we switch to use `t.Parallel()` since it will parallelize the two times this waits for a scale-to-zero.

Still TODO:
 - Disable by default, and `t.Skip()` the streaming GRPC test

These will be `Fixes:` when this is enabled by default.
Related: #3239
Related: #2856
  • Loading branch information
mattmoor committed Feb 19, 2019
1 parent 3bf44aa commit d071d17
Show file tree
Hide file tree
Showing 5 changed files with 275 additions and 123 deletions.
21 changes: 16 additions & 5 deletions cmd/activator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ var (
masterURL = flag.String("master", "", "The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.")
kubeconfig = flag.String("kubeconfig", "", "Path to a kubeconfig. Only required if out-of-cluster.")

enableNetworkProbe = flag.Bool("enable-network-probe", false, "Whether to replace request retries with network level probing.")

logger *zap.SugaredLogger

statSink *websocket.ManagedConnection
Expand Down Expand Up @@ -233,6 +235,14 @@ func main() {
Steps: maxRetries,
}
rt := activatorutil.NewRetryRoundTripper(activatorutil.AutoTransport, logger, backoffSettings, shouldRetry)
getProbeCount := 0
// When network probing is enabled remove the retrying transport
// and pass in the retry count for our network probes instead.
if *enableNetworkProbe {
logger.Info("Enabling network probing for activation.")
getProbeCount = maxRetries
rt = activatorutil.AutoTransport
}

// Open a websocket connection to the autoscaler
autoscalerEndpoint := fmt.Sprintf("ws://%s.%s.svc.%s:%s", "autoscaler", system.Namespace(), utils.GetClusterDomainName(), "8080")
Expand All @@ -252,11 +262,12 @@ func main() {
&activatorhandler.EnforceMaxContentLengthHandler{
MaxContentLengthBytes: maxUploadBytes,
NextHandler: &activatorhandler.ActivationHandler{
Activator: a,
Transport: rt,
Logger: logger,
Reporter: reporter,
Throttler: throttler,
Activator: a,
Transport: rt,
Logger: logger,
Reporter: reporter,
Throttler: throttler,
GetProbeCount: getProbeCount,
},
},
),
Expand Down
1 change: 1 addition & 0 deletions config/activator.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ spec:
# and seeing k8s logs in addition to ours is not useful.
- "-logtostderr=false"
- "-stderrthreshold=FATAL"
- "-enable-network-probe"
resources:
# Request 2x what we saw running e2e
requests:
Expand Down
51 changes: 50 additions & 1 deletion pkg/activator/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,14 @@ import (
"github.com/knative/serving/pkg/activator"
"github.com/knative/serving/pkg/activator/util"
pkghttp "github.com/knative/serving/pkg/http"
"github.com/knative/serving/pkg/network"
"go.uber.org/zap"
)

const (
getProbeInterval = 1 * time.Second
)

// ActivationHandler will wait for an active endpoint for a revision
// to be available before proxing the request
type ActivationHandler struct {
Expand All @@ -38,6 +43,12 @@ type ActivationHandler struct {
Transport http.RoundTripper
Reporter activator.StatsReporter
Throttler *activator.Throttler

// GetProbeCount is the number is the number of attempts we should
// make to network probe the queue-proxy after the revision becomes
// ready before forwarding the payload. If zero, a network probe
// is not required.
GetProbeCount int
}

func (a *ActivationHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
Expand All @@ -61,7 +72,45 @@ func (a *ActivationHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}

err := a.Throttler.Try(revID, func() {
attempts, httpStatus := a.proxyRequest(w, r, target)
var attempts int
var httpStatus int
// If a GET probe interval has been configured, then probe
// the queue-proxy with our network probe header until it
// returns a 200 status code.
success := (a.GetProbeCount == 0)
probeReq := &http.Request{
Method: http.MethodGet,
URL: target,
Proto: r.Proto,
ProtoMajor: r.ProtoMajor,
ProtoMinor: r.ProtoMinor,
Host: r.Host,
Header: map[string][]string{
http.CanonicalHeaderKey(network.ProbeHeaderName): []string{"true"},
},
}
for attempts = 0; attempts < a.GetProbeCount && !success; attempts++ {
probeResp, err := a.Transport.RoundTrip(probeReq)
if err != nil {
a.Logger.Errorw("Pod probe failed", zap.Error(err))
time.Sleep(getProbeInterval)
continue
}
if probeResp.StatusCode != http.StatusOK {
httpStatus = probeResp.StatusCode
a.Logger.Errorf("Pod probe sent status: %d", probeResp.StatusCode)
time.Sleep(getProbeInterval)
continue
}
success = true
}
if success {
// Once we see a successful probe, send traffic.
attempts, httpStatus = a.proxyRequest(w, r, target)
} else {
httpStatus = http.StatusInternalServerError
w.WriteHeader(httpStatus)
}

// Report the metrics
duration := time.Since(start)
Expand Down
86 changes: 78 additions & 8 deletions pkg/activator/handler/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/knative/serving/pkg/activator"
"github.com/knative/serving/pkg/activator/util"
"github.com/knative/serving/pkg/apis/serving/v1alpha1"
"github.com/knative/serving/pkg/network"
"github.com/knative/serving/pkg/queue"
)

Expand Down Expand Up @@ -95,7 +96,10 @@ func TestActivationHandler(t *testing.T) {
wantBody string
wantCode int
wantErr error
probeErr error
probeCode int
attempts string
gpc int
endpointsGetter func(activator.RevisionID) (int32, error)
reporterCalls []reporterCall
}{{
Expand Down Expand Up @@ -124,6 +128,7 @@ func TestActivationHandler(t *testing.T) {
Config: "config-real-name",
StatusCode: http.StatusOK,
}},
gpc: 1,
}, {
label: "active endpoint with missing count header",
namespace: testNamespace,
Expand Down Expand Up @@ -158,6 +163,56 @@ func TestActivationHandler(t *testing.T) {
wantErr: nil,
endpointsGetter: goodEndpointsGetter,
reporterCalls: nil,
}, {
label: "active endpoint (probe failure)",
namespace: testNamespace,
name: testRevName,
probeErr: errors.New("probe error"),
wantCode: http.StatusInternalServerError,
endpointsGetter: goodEndpointsGetter,
gpc: 1,
reporterCalls: []reporterCall{{
Op: "ReportRequestCount",
Namespace: testNamespace,
Revision: testRevName,
Service: "service-real-name",
Config: "config-real-name",
StatusCode: http.StatusInternalServerError,
Attempts: 1,
Value: 1,
}, {
Op: "ReportResponseTime",
Namespace: testNamespace,
Revision: testRevName,
Service: "service-real-name",
Config: "config-real-name",
StatusCode: http.StatusInternalServerError,
}},
}, {
label: "active endpoint (probe 500)",
namespace: testNamespace,
name: testRevName,
probeCode: http.StatusInternalServerError,
wantCode: http.StatusInternalServerError,
endpointsGetter: goodEndpointsGetter,
gpc: 1,
reporterCalls: []reporterCall{{
Op: "ReportRequestCount",
Namespace: testNamespace,
Revision: testRevName,
Service: "service-real-name",
Config: "config-real-name",
StatusCode: http.StatusInternalServerError,
Attempts: 1,
Value: 1,
}, {
Op: "ReportResponseTime",
Namespace: testNamespace,
Revision: testRevName,
Service: "service-real-name",
Config: "config-real-name",
StatusCode: http.StatusInternalServerError,
}},
}, {
label: "request error",
namespace: testNamespace,
Expand Down Expand Up @@ -219,12 +274,20 @@ func TestActivationHandler(t *testing.T) {
attempts: "hi there",
endpointsGetter: brokenEndpointGetter,
reporterCalls: nil,
},
}
}}

for _, e := range examples {
t.Run(e.label, func(t *testing.T) {
rt := util.RoundTripperFunc(func(r *http.Request) (*http.Response, error) {
if r.Header.Get(network.ProbeHeaderName) != "" {
if e.probeErr != nil {
return nil, e.probeErr
}
fake := httptest.NewRecorder()
fake.WriteHeader(e.probeCode)
fake.WriteString("queue")
return fake.Result(), nil
}
if e.wantErr != nil {
return nil, e.wantErr
}
Expand All @@ -242,20 +305,27 @@ func TestActivationHandler(t *testing.T) {

reporter := &fakeReporter{}
params := queue.BreakerParams{QueueDepth: 1000, MaxConcurrency: 1000, InitialCapacity: 0}
throttlerParams := activator.ThrottlerParams{BreakerParams: params, Logger: TestLogger(t), GetRevision: stubRevisionGetter, GetEndpoints: e.endpointsGetter}
throttlerParams := activator.ThrottlerParams{
BreakerParams: params,
Logger: TestLogger(t),
GetRevision: stubRevisionGetter,
GetEndpoints: e.endpointsGetter,
}
handler := ActivationHandler{
Activator: act,
Transport: rt,
Logger: TestLogger(t),
Reporter: reporter,
Throttler: activator.NewThrottler(throttlerParams),
Activator: act,
Transport: rt,
Logger: TestLogger(t),
Reporter: reporter,
Throttler: activator.NewThrottler(throttlerParams),
GetProbeCount: e.gpc,
}

resp := httptest.NewRecorder()

req := httptest.NewRequest("POST", "http://example.com", nil)
req.Header.Set(activator.RevisionHeaderNamespace, e.namespace)
req.Header.Set(activator.RevisionHeaderName, e.name)

handler.ServeHTTP(resp, req)

if resp.Code != e.wantCode {
Expand Down
Loading

0 comments on commit d071d17

Please sign in to comment.