From c62cb22e3e3962366f3e171b90dfbded380dace5 Mon Sep 17 00:00:00 2001 From: Matt Moore Date: Sun, 17 Feb 2019 13:06:45 -0800 Subject: [PATCH] Add Knative network probe endpoints (#3256) This adds a header `k-network-probe`, to which the Knative networking elements respond without forwarding the requests. They also identify themselves in their response, so that we know what component is handling the probe. This is related to: #2856, #2849, #3239 --- cmd/activator/main.go | 26 +++--- cmd/queue/main.go | 55 ++++++++---- .../handler/filtering_handler_test.go | 25 +++--- pkg/activator/handler/probe_handler.go | 36 ++++++++ pkg/activator/handler/probe_handler_test.go | 90 +++++++++++++++++++ pkg/network/network.go | 6 ++ 6 files changed, 193 insertions(+), 45 deletions(-) create mode 100644 pkg/activator/handler/probe_handler.go create mode 100644 pkg/activator/handler/probe_handler_test.go diff --git a/cmd/activator/main.go b/cmd/activator/main.go index 9b7e420f1990..dcdf93e1fc44 100644 --- a/cmd/activator/main.go +++ b/cmd/activator/main.go @@ -246,19 +246,21 @@ func main() { cr := activatorhandler.NewConcurrencyReporter(podName, reqChan, time.NewTicker(time.Second).C, statChan) go cr.Run(stopCh) - ah := &activatorhandler.FilteringHandler{ - NextHandler: activatorhandler.NewRequestEventHandler(reqChan, - &activatorhandler.EnforceMaxContentLengthHandler{ - MaxContentLengthBytes: maxUploadBytes, - NextHandler: &activatorhandler.ActivationHandler{ - Activator: a, - Transport: rt, - Logger: logger, - Reporter: reporter, - Throttler: throttler, + ah := &activatorhandler.ProbeHandler{ + NextHandler: &activatorhandler.FilteringHandler{ + NextHandler: activatorhandler.NewRequestEventHandler(reqChan, + &activatorhandler.EnforceMaxContentLengthHandler{ + MaxContentLengthBytes: maxUploadBytes, + NextHandler: &activatorhandler.ActivationHandler{ + Activator: a, + Transport: rt, + Logger: logger, + Reporter: reporter, + Throttler: throttler, + }, }, - }, - ), + ), + }, } // Watch the logging config map and dynamically update logging levels. diff --git a/cmd/queue/main.go b/cmd/queue/main.go index 68e10f202001..ea747db2fba0 100644 --- a/cmd/queue/main.go +++ b/cmd/queue/main.go @@ -38,6 +38,7 @@ import ( "github.com/knative/serving/pkg/autoscaler" "github.com/knative/serving/pkg/http/h2c" "github.com/knative/serving/pkg/logging" + "github.com/knative/serving/pkg/network" "github.com/knative/serving/pkg/queue" "github.com/knative/serving/pkg/queue/health" "github.com/knative/serving/pkg/utils" @@ -158,16 +159,47 @@ func proxyForRequest(req *http.Request) *httputil.ReverseProxy { return httpProxy } -func isProbe(r *http.Request) bool { +func isKnativeProbe(r *http.Request) bool { + return r.Header.Get(network.ProbeHeaderName) != "" +} + +func isKubeletProbe(r *http.Request) bool { // Since K8s 1.8, prober requests have // User-Agent = "kube-probe/{major-version}.{minor-version}". return strings.HasPrefix(r.Header.Get("User-Agent"), "kube-probe/") } +func probeUserContainer() bool { + var err error + wait.PollImmediate(50*time.Millisecond, 10*time.Second, func() (bool, error) { + logger.Debug("TCP probing the user-container.") + err = health.TCPProbe(userTargetAddress, 100*time.Millisecond) + return err == nil, nil + }) + + if err == nil { + logger.Info("User-container successfully probed.") + } else { + logger.Errorw("User-container could not be probed successfully.", zap.Error(err)) + } + + return err == nil +} + func handler(w http.ResponseWriter, r *http.Request) { proxy := proxyForRequest(r) - if isProbe(r) { + switch { + case isKnativeProbe(r): + if probeUserContainer() { + // Respond with the name of the component handling the request. + w.Write([]byte("queue")) + } else { + http.Error(w, "container not ready", http.StatusServiceUnavailable) + } + return + + case isKubeletProbe(r): // Do not count health checks for concurrency metrics proxy.ServeHTTP(w, r) return @@ -178,6 +210,7 @@ func handler(w http.ResponseWriter, r *http.Request) { defer func() { reqChan <- queue.ReqEvent{Time: time.Now(), EventType: queue.ReqOut} }() + // Enforce queuing and concurrency limits if breaker != nil { ok := breaker.Maybe(func() { @@ -195,23 +228,7 @@ func handler(w http.ResponseWriter, r *http.Request) { // Sets up /health and /quitquitquit endpoints. func createAdminHandlers() *http.ServeMux { mux := http.NewServeMux() - mux.HandleFunc(queue.RequestQueueHealthPath, healthState.HealthHandler(func() bool { - var err error - wait.PollImmediate(50*time.Millisecond, 10*time.Second, func() (bool, error) { - logger.Debug("TCP probing the user-container.") - err = health.TCPProbe(userTargetAddress, 100*time.Millisecond) - return err == nil, nil - }) - - if err == nil { - logger.Info("User-container successfully probed.") - } else { - logger.Errorw("User-container could not be probed successfully.", zap.Error(err)) - } - - return err == nil - })) - + mux.HandleFunc(queue.RequestQueueHealthPath, healthState.HealthHandler(probeUserContainer)) mux.HandleFunc(queue.RequestQueueQuitPath, healthState.QuitHandler(func() { time.Sleep(quitSleepDuration) diff --git a/pkg/activator/handler/filtering_handler_test.go b/pkg/activator/handler/filtering_handler_test.go index c1e7a495ee7b..3be3a5e94e22 100644 --- a/pkg/activator/handler/filtering_handler_test.go +++ b/pkg/activator/handler/filtering_handler_test.go @@ -31,20 +31,17 @@ func TestFilteringHandler(t *testing.T) { headers: http.Header{}, passed: true, expectedStatus: http.StatusOK, - }, - { - label: "filter a request containing retry header", - headers: mapToHeader(map[string]string{activator.RequestCountHTTPHeader: "4"}), - passed: false, - expectedStatus: http.StatusServiceUnavailable, - }, - { - label: "forward a request containing empty retry header", - headers: mapToHeader(map[string]string{activator.RequestCountHTTPHeader: ""}), - passed: true, - expectedStatus: http.StatusOK, - }, - } + }, { + label: "filter a request containing retry header", + headers: mapToHeader(map[string]string{activator.RequestCountHTTPHeader: "4"}), + passed: false, + expectedStatus: http.StatusServiceUnavailable, + }, { + label: "forward a request containing empty retry header", + headers: mapToHeader(map[string]string{activator.RequestCountHTTPHeader: ""}), + passed: true, + expectedStatus: http.StatusOK, + }} for _, e := range examples { t.Run(e.label, func(t *testing.T) { diff --git a/pkg/activator/handler/probe_handler.go b/pkg/activator/handler/probe_handler.go new file mode 100644 index 000000000000..397fdd80dcd9 --- /dev/null +++ b/pkg/activator/handler/probe_handler.go @@ -0,0 +1,36 @@ +/* +Copyright 2019 The Knative Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package handler + +import ( + "net/http" + + "github.com/knative/serving/pkg/network" +) + +// ProbeHandler handles responding to Knative internal network probes. +type ProbeHandler struct { + NextHandler http.Handler +} + +func (h *ProbeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + // If this header is set the request was sent by a Knative component + // probing the network, respond with a 200 and our component name. + if r.Header.Get(network.ProbeHeaderName) != "" { + w.Write([]byte("activator")) + return + } + + h.NextHandler.ServeHTTP(w, r) +} diff --git a/pkg/activator/handler/probe_handler_test.go b/pkg/activator/handler/probe_handler_test.go new file mode 100644 index 000000000000..f0fb142cc456 --- /dev/null +++ b/pkg/activator/handler/probe_handler_test.go @@ -0,0 +1,90 @@ +/* +Copyright 2019 The Knative Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package handler + +import ( + "net/http" + "net/http/httptest" + "testing" + + "github.com/knative/serving/pkg/network" +) + +func TestProbeHandler(t *testing.T) { + examples := []struct { + label string + headers http.Header + passed bool + expectedStatus int + method string + }{{ + label: "forward a normal POST request", + headers: http.Header{}, + passed: true, + expectedStatus: http.StatusOK, + method: http.MethodPost, + }, { + label: "filter a POST request containing probe header", + headers: mapToHeader(map[string]string{network.ProbeHeaderName: "not-empty"}), + passed: false, + expectedStatus: http.StatusOK, + method: http.MethodPost, + }, { + label: "forward a normal GET request", + headers: http.Header{}, + passed: true, + expectedStatus: http.StatusOK, + method: http.MethodGet, + }, { + label: "filter a GET request containing probe header", + headers: mapToHeader(map[string]string{network.ProbeHeaderName: "not-empty"}), + passed: false, + expectedStatus: http.StatusOK, + method: http.MethodGet, + }, { + label: "forward a request containing empty retry header", + headers: mapToHeader(map[string]string{network.ProbeHeaderName: ""}), + passed: true, + expectedStatus: http.StatusOK, + method: http.MethodPost, + }} + + for _, e := range examples { + t.Run(e.label, func(t *testing.T) { + wasPassed := false + baseHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + wasPassed = true + w.WriteHeader(http.StatusOK) + }) + handler := ProbeHandler{NextHandler: baseHandler} + + resp := httptest.NewRecorder() + req := httptest.NewRequest(e.method, "http://example.com", nil) + req.Header = e.headers + + handler.ServeHTTP(resp, req) + + if wasPassed != e.passed { + if !e.passed { + t.Error("Request got passed to the next handler unexpectedly") + } else { + t.Error("Request was not passed to the next handler as expected") + } + } + + if resp.Code != e.expectedStatus { + t.Errorf("Unexpected response status. Want %d, got %d", e.expectedStatus, resp.Code) + } + }) + } +} diff --git a/pkg/network/network.go b/pkg/network/network.go index 06cbb4def722..360421381468 100644 --- a/pkg/network/network.go +++ b/pkg/network/network.go @@ -24,6 +24,12 @@ import ( ) const ( + // ProbeHeaderName is the name of a header that can be added to + // requests to probe the knative networking layer. Requests + // with this header will not be passed to the user container or + // included in request metrics. + ProbeHeaderName = "k-network-probe" + // ConfigName is the name of the configmap containing all // customizations for networking features. ConfigName = "config-network"