[WIP] Add a simple network prober to the activator.
When the flag `-enable-network-probing` is passed, the activator will replace its retrying transport logic with a simple network probe based on #3256, with a similar number of retries to what the retrying transport was previously configured to use.  Enabling this allows the GRPC test with streaming and cold-start to pass fairly reliably on my cluster (and also with the GRPC ping sample in knative/docs, with my fixes).
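As a rough sketch of the probe-then-proxy flow described above (illustrative only; `probeUntilReady`, `probeHeaderName`, and the header value are assumed names for this sketch — the actual implementation is the `handler.go` diff below):

```go
package main

import (
	"net/http"
	"net/url"
	"time"
)

// probeHeaderName stands in for network.ProbeHeaderName used in the diff below;
// the literal value here is an assumption for the sketch.
const probeHeaderName = "K-Network-Probe"

// probeUntilReady sends GET probes to the revision's queue-proxy until one
// returns 200 OK or the probe budget is exhausted, mirroring how the activator
// decides when it is safe to forward the buffered request.
func probeUntilReady(rt http.RoundTripper, target *url.URL, maxProbes int, interval time.Duration) bool {
	for i := 0; i < maxProbes; i++ {
		probe := &http.Request{
			Method: http.MethodGet,
			URL:    target,
			Header: http.Header{probeHeaderName: []string{"true"}},
		}
		if resp, err := rt.RoundTrip(probe); err == nil && resp.StatusCode == http.StatusOK {
			return true // queue-proxy answered; safe to send the real payload
		}
		time.Sleep(interval)
	}
	return false
}
```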

This change also refactors the GRPC test into 4 tests, one for each of the logical things tested, which should dramatically reduce the time this adds to e2e once we switch to `t.Parallel()`, since the two waits for a scale-to-zero can then overlap.
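For reference, that follow-up parallelization could look roughly like this (a sketch, not part of this change; it reuses the helpers introduced in the test diff below):

```go
// Sketch only: once the split tests are stable, each top-level test can opt in
// to parallel execution so the two scale-to-zero waits overlap.
func TestGRPCUnaryPingFromZero(t *testing.T) {
	t.Parallel() // lets this cold-start wait overlap with the streaming variant
	testGRPC(t, func(t *testing.T, names test.ResourceNames, clients *test.Clients, host, domain string) {
		waitForScaleToZero(t, names, clients)
		unaryTest(t, names, clients, host, domain)
	})
}
```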

Still TODO:
 - Unit testing for the get probes
 - Disable by default, and `t.Skip()` the streaming GRPC test

These will be `Fixes:` when this is enabled by default.
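A sketch of how the second TODO item might look once probing is disabled by default (hypothetical; not part of this commit):

```go
func TestGRPCStreamingPingFromZero(t *testing.T) {
	t.Skip("Skipping until gRPC streaming after cold start works by default (#3239)")
	testGRPC(t, func(t *testing.T, names test.ResourceNames, clients *test.Clients, host, domain string) {
		waitForScaleToZero(t, names, clients)
		streamTest(t, names, clients, host, domain)
	})
}
```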
Related: #3239
Related: #2856
mattmoor committed Feb 17, 2019
1 parent c62cb22 commit 334506c
Showing 4 changed files with 189 additions and 112 deletions.
21 changes: 16 additions & 5 deletions cmd/activator/main.go
@@ -89,6 +89,8 @@ var (
	masterURL  = flag.String("master", "", "The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.")
	kubeconfig = flag.String("kubeconfig", "", "Path to a kubeconfig. Only required if out-of-cluster.")

	enableNetworkProbe = flag.Bool("enable-network-probing", false, "Whether to replace request retries with network level probing.")

	logger *zap.SugaredLogger

	statSink *websocket.ManagedConnection
@@ -233,6 +235,14 @@ func main() {
		Steps: maxRetries,
	}
	rt := activatorutil.NewRetryRoundTripper(activatorutil.AutoTransport, logger, backoffSettings, shouldRetry)
	getProbeCount := 0
	// When network probing is enabled remove the retrying transport
	// and pass in the retry count for our network probes instead.
	if *enableNetworkProbe {
		logger.Info("Enabling network probing for activation.")
		getProbeCount = maxRetries
		rt = activatorutil.AutoTransport
	}

	// Open a websocket connection to the autoscaler
	autoscalerEndpoint := fmt.Sprintf("ws://%s.%s.svc.%s:%s", "autoscaler", system.Namespace(), utils.GetClusterDomainName(), "8080")
@@ -252,11 +262,12 @@ func main() {
			&activatorhandler.EnforceMaxContentLengthHandler{
				MaxContentLengthBytes: maxUploadBytes,
				NextHandler: &activatorhandler.ActivationHandler{
					Activator: a,
					Transport: rt,
					Logger:    logger,
					Reporter:  reporter,
					Throttler: throttler,
					Activator:     a,
					Transport:     rt,
					Logger:        logger,
					Reporter:      reporter,
					Throttler:     throttler,
					GetProbeCount: getProbeCount,
				},
			},
		),
1 change: 1 addition & 0 deletions config/activator.yaml
@@ -51,6 +51,7 @@ spec:
        # and seeing k8s logs in addition to ours is not useful.
        - "-logtostderr=false"
        - "-stderrthreshold=FATAL"
        - "-enable-network-probing"
        resources:
          # Request 2x what we saw running e2e
          requests:
50 changes: 49 additions & 1 deletion pkg/activator/handler/handler.go
@@ -27,9 +27,14 @@ import (
	"github.com/knative/serving/pkg/activator"
	"github.com/knative/serving/pkg/activator/util"
	pkghttp "github.com/knative/serving/pkg/http"
	"github.com/knative/serving/pkg/network"
	"go.uber.org/zap"
)

const (
	getProbeInterval = 1 * time.Second
)

// ActivationHandler will wait for an active endpoint for a revision
// to be available before proxying the request
type ActivationHandler struct {
@@ -38,6 +43,12 @@ type ActivationHandler struct {
	Transport http.RoundTripper
	Reporter  activator.StatsReporter
	Throttler *activator.Throttler

	// GetProbeCount is the number of attempts we should
	// make to network probe the queue-proxy after the revision becomes
	// ready before forwarding the payload. If zero, a network probe
	// is not required.
	GetProbeCount int
}

func (a *ActivationHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
@@ -61,7 +72,44 @@ func (a *ActivationHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	}

	err := a.Throttler.Try(revID, func() {
		attempts, httpStatus := a.proxyRequest(w, r, target)
		var attempts int
		var httpStatus int
		// If a GET probe interval has been configured, then probe
		// the queue-proxy with our network probe header until it
		// returns a 200 status code.
		success := (a.GetProbeCount == 0)
		for attempts = 0; attempts < a.GetProbeCount; attempts++ {
			probeReq := &http.Request{
				Method:     http.MethodGet,
				URL:        target,
				Proto:      r.Proto,
				ProtoMajor: r.ProtoMajor,
				ProtoMinor: r.ProtoMinor,
				Host:       r.Host,
				Header: map[string][]string{
					network.ProbeHeaderName: []string{"true"},
				},
			}

			probeResp, err := a.Transport.RoundTrip(probeReq)
			if err != nil {
				a.Logger.Errorf("Pod probe failed with: %v", err)
				time.Sleep(getProbeInterval)
				continue
			}
			if probeResp.StatusCode != http.StatusOK {
				httpStatus = probeResp.StatusCode
				a.Logger.Errorf("Pod probe sent status: %d", probeResp.StatusCode)
				time.Sleep(getProbeInterval)
				continue
			}
			success = true
			break
		}
		if success {
			// Once we see a successful probe, send traffic.
			attempts, httpStatus = a.proxyRequest(w, r, target)
		}

		// Report the metrics
		duration := time.Since(start)
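Regarding the "unit testing for the get probes" TODO above, one possible sketch of a test double (hypothetical code, not part of this commit): a fake `RoundTripper` that fails the first few probes and then succeeds would let a test drive `ActivationHandler` with `GetProbeCount` set and assert how many probes were sent before the request is proxied.

```go
package handler

import "net/http"

// failThenSucceedTransport is a hypothetical test double: it answers the first
// `failures` round trips with 503 and everything after that with 200, counting
// every call so a test can assert how many probes the handler issued.
type failThenSucceedTransport struct {
	failures int
	calls    int
}

func (f *failThenSucceedTransport) RoundTrip(*http.Request) (*http.Response, error) {
	f.calls++
	if f.calls <= f.failures {
		return &http.Response{StatusCode: http.StatusServiceUnavailable, Body: http.NoBody}, nil
	}
	return &http.Response{StatusCode: http.StatusOK, Body: http.NoBody}, nil
}
```

A test would set this as `Transport`, pick a `GetProbeCount` larger than `failures`, and check `calls` afterwards.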
229 changes: 123 additions & 106 deletions test/e2e/grpc_test.go
@@ -20,7 +20,6 @@ package e2e

import (
	"context"
	"crypto/rand"
	"io"
	"net/http"
	"testing"
@@ -37,7 +36,109 @@ import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func TestGRPC(t *testing.T) {
type grpcTest func(*testing.T, test.ResourceNames, *test.Clients, string, string)

func unaryTest(t *testing.T, names test.ResourceNames, clients *test.Clients, host, domain string) {
	t.Logf("Connecting to grpc-ping using host %q and authority %q", host, domain)
	conn, err := grpc.Dial(
		host+":80",
		grpc.WithAuthority(domain),
		grpc.WithInsecure(),
	)
	if err != nil {
		t.Fatalf("fail to dial: %v", err)
	}
	defer conn.Close()

	pc := ping.NewPingServiceClient(conn)
	t.Log("Testing unary Ping")

	want := &ping.Request{Msg: "Hello!"}

	got, err := pc.Ping(context.TODO(), want)
	if err != nil {
		t.Fatalf("Couldn't send request: %v", err)
	}

	if got.Msg != want.Msg {
		t.Errorf("Unexpected response. Want %q, got %q", want.Msg, got.Msg)
	}
}

func streamTest(t *testing.T, names test.ResourceNames, clients *test.Clients, host, domain string) {
	t.Logf("Connecting to grpc-ping using host %q and authority %q", host, domain)
	conn, err := grpc.Dial(
		host+":80",
		grpc.WithAuthority(domain),
		grpc.WithInsecure(),
	)
	if err != nil {
		t.Fatalf("fail to dial: %v", err)
	}
	defer conn.Close()

	pc := ping.NewPingServiceClient(conn)
	t.Log("Testing streaming Ping")

	ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
	defer cancel()

	stream, err := pc.PingStream(ctx)
	if err != nil {
		t.Fatalf("Error creating stream: %v", err)
	}

	count := 3
	for i := 0; i < count; i++ {
		t.Logf("Sending stream %d of %d", i+1, count)

		want := "This is a short message!"

		err = stream.Send(&ping.Request{Msg: want})
		if err != nil {
			t.Fatalf("Error sending request: %v", err)
		}

		resp, err := stream.Recv()
		if err != nil {
			t.Fatalf("Error receiving response: %v", err)
		}

		got := resp.Msg

		if want != got {
			t.Errorf("Unexpected response. Want %q, got %q", want, got)
		}
	}

	stream.CloseSend()

	_, err = stream.Recv()
	if err != io.EOF {
		t.Errorf("Expected EOF, got %v", err)
	}
}

func waitForScaleToZero(t *testing.T, names test.ResourceNames, clients *test.Clients) {
	deploymentName := names.Revision + "-deployment"
	t.Logf("Waiting for %q to scale to zero", deploymentName)
	err := pkgTest.WaitForDeploymentState(
		clients.KubeClient,
		deploymentName,
		func(d *v1beta1.Deployment) (bool, error) {
			t.Logf("Deployment %q has %d replicas", deploymentName, d.Status.ReadyReplicas)
			return d.Status.ReadyReplicas == 0, nil
		},
		"DeploymentIsScaledDown",
		test.ServingNamespace,
		3*time.Minute,
	)
	if err != nil {
		t.Fatalf("Could not scale to zero: %v", err)
	}
}

func testGRPC(t *testing.T, f grpcTest) {
	// Setup
	clients := Setup(t)
	logger := logging.GetContextLogger(t.Name())
@@ -49,6 +150,7 @@ func TestGRPC(t *testing.T) {
			{Name: "h2c", ContainerPort: 8080},
		},
	}

	names, err := CreateRouteAndConfig(clients, logger, "grpc-ping", options)
	if err != nil {
		t.Fatalf("Failed to create Route and Configuration: %v", err)
@@ -74,7 +176,6 @@ func TestGRPC(t *testing.T) {
		t.Fatalf("Configuration %s was not updated with the new revision: %v", names.Config, err)
	}
	names.Revision = config.Status.LatestCreatedRevisionName
	deploymentName := names.Revision + "-deployment"

	_, err = pkgTest.WaitForEndpointState(
		clients.KubeClient,
@@ -95,111 +196,27 @@
		}
	}

	logger.Infof("Connecting to grpc-ping using host %q and authority %q", *host, domain)

	waitForScaleToZero := func() {
		logger.Infof("Waiting for scale to zero")
		err = pkgTest.WaitForDeploymentState(
			clients.KubeClient,
			deploymentName,
			func(d *v1beta1.Deployment) (bool, error) {
				logger.Infof("Deployment %q has %d replicas", deploymentName, d.Status.ReadyReplicas)
				return d.Status.ReadyReplicas == 0, nil
			},
			"DeploymentIsScaledDown",
			test.ServingNamespace,
			3*time.Minute,
		)
		if err != nil {
			t.Fatalf("Could not scale to zero: %v", err)
		}
	}

	payload := func(size int) string {
		b := make([]byte, size)
		_, err = rand.Read(b)
		if err != nil {
			t.Fatalf("Error generating payload: %v", err)
		}
		return string(b)
	}

	conn, err := grpc.Dial(
		*host+":80",
		grpc.WithAuthority(domain),
		grpc.WithInsecure(),
	)
	if err != nil {
		t.Fatalf("fail to dial: %v", err)
	}
	defer conn.Close()

	pc := ping.NewPingServiceClient(conn)

	unaryTest := func(t *testing.T) {
		logger.Info("Testing unary Ping")

		want := &ping.Request{Msg: "Hello!"}

		got, err := pc.Ping(context.TODO(), want)
		if err != nil {
			t.Fatalf("Couldn't send request: %v", err)
		}

		if got.Msg != want.Msg {
			t.Errorf("Unexpected response. Want %q, got %q", want.Msg, got.Msg)
		}
	}

	streamTest := func(t *testing.T) {
		logger.Info("Testing streaming Ping")

		ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
		defer cancel()

		stream, err := pc.PingStream(ctx)
		if err != nil {
			t.Fatalf("Error creating stream: %v", err)
		}

		count := 3
		for i := 0; i < count; i++ {
			logger.Infof("Sending stream %d of %d", i+1, count)

			want := payload(10)

			err = stream.Send(&ping.Request{Msg: want})
			if err != nil {
				t.Fatalf("Error sending request: %v", err)
			}

			resp, err := stream.Recv()
			if err != nil {
				t.Fatalf("Error receiving response: %v", err)
			}

			got := resp.Msg

			if want != got {
				t.Errorf("Unexpected response. Want %q, got %q", want, got)
			}
		}

		stream.CloseSend()
	f(t, names, clients, *host, domain)
}

		_, err = stream.Recv()
		if err != io.EOF {
			t.Errorf("Expected EOF, got %v", err)
		}
	}
func TestGRPCUnaryPing(t *testing.T) {
	testGRPC(t, unaryTest)
}

	t.Run("unary ping", unaryTest)
	t.Run("streaming ping", streamTest)
func TestGRPCStreamingPing(t *testing.T) {
	testGRPC(t, streamTest)
}

	waitForScaleToZero()
	t.Run("unary ping after scale-to-zero", unaryTest)
func TestGRPCUnaryPingFromZero(t *testing.T) {
	testGRPC(t, func(t *testing.T, names test.ResourceNames, clients *test.Clients, host, domain string) {
		waitForScaleToZero(t, names, clients)
		unaryTest(t, names, clients, host, domain)
	})
}

	// TODO(#3239): Fix gRPC streaming after cold start
	// waitForScaleToZero()
	// t.Run("streaming ping after scale-to-zero", streamTest)
func TestGRPCStreamingPingFromZero(t *testing.T) {
	testGRPC(t, func(t *testing.T, names test.ResourceNames, clients *test.Clients, host, domain string) {
		waitForScaleToZero(t, names, clients)
		streamTest(t, names, clients, host, domain)
	})
}
