Skip to content

Commit

Permalink
Add Knative network probe endpoints (#3256)
Browse files Browse the repository at this point in the history
This adds a header `k-network-probe`, to which the Knative networking elements
respond without forwarding the requests.  They also identify themselves in their
response, so that we know what component is handling the probe.

This is related to: #2856, #2849, #3239
  • Loading branch information
mattmoor authored and knative-prow-robot committed Feb 17, 2019
1 parent 99eb090 commit c62cb22
Show file tree
Hide file tree
Showing 6 changed files with 193 additions and 45 deletions.
26 changes: 14 additions & 12 deletions cmd/activator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,19 +246,21 @@ func main() {
cr := activatorhandler.NewConcurrencyReporter(podName, reqChan, time.NewTicker(time.Second).C, statChan)
go cr.Run(stopCh)

ah := &activatorhandler.FilteringHandler{
NextHandler: activatorhandler.NewRequestEventHandler(reqChan,
&activatorhandler.EnforceMaxContentLengthHandler{
MaxContentLengthBytes: maxUploadBytes,
NextHandler: &activatorhandler.ActivationHandler{
Activator: a,
Transport: rt,
Logger: logger,
Reporter: reporter,
Throttler: throttler,
ah := &activatorhandler.ProbeHandler{
NextHandler: &activatorhandler.FilteringHandler{
NextHandler: activatorhandler.NewRequestEventHandler(reqChan,
&activatorhandler.EnforceMaxContentLengthHandler{
MaxContentLengthBytes: maxUploadBytes,
NextHandler: &activatorhandler.ActivationHandler{
Activator: a,
Transport: rt,
Logger: logger,
Reporter: reporter,
Throttler: throttler,
},
},
},
),
),
},
}

// Watch the logging config map and dynamically update logging levels.
Expand Down
55 changes: 36 additions & 19 deletions cmd/queue/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/knative/serving/pkg/autoscaler"
"github.com/knative/serving/pkg/http/h2c"
"github.com/knative/serving/pkg/logging"
"github.com/knative/serving/pkg/network"
"github.com/knative/serving/pkg/queue"
"github.com/knative/serving/pkg/queue/health"
"github.com/knative/serving/pkg/utils"
Expand Down Expand Up @@ -158,16 +159,47 @@ func proxyForRequest(req *http.Request) *httputil.ReverseProxy {
return httpProxy
}

func isProbe(r *http.Request) bool {
func isKnativeProbe(r *http.Request) bool {
return r.Header.Get(network.ProbeHeaderName) != ""
}

func isKubeletProbe(r *http.Request) bool {
// Since K8s 1.8, prober requests have
// User-Agent = "kube-probe/{major-version}.{minor-version}".
return strings.HasPrefix(r.Header.Get("User-Agent"), "kube-probe/")
}

func probeUserContainer() bool {
var err error
wait.PollImmediate(50*time.Millisecond, 10*time.Second, func() (bool, error) {
logger.Debug("TCP probing the user-container.")
err = health.TCPProbe(userTargetAddress, 100*time.Millisecond)
return err == nil, nil
})

if err == nil {
logger.Info("User-container successfully probed.")
} else {
logger.Errorw("User-container could not be probed successfully.", zap.Error(err))
}

return err == nil
}

func handler(w http.ResponseWriter, r *http.Request) {
proxy := proxyForRequest(r)

if isProbe(r) {
switch {
case isKnativeProbe(r):
if probeUserContainer() {
// Respond with the name of the component handling the request.
w.Write([]byte("queue"))
} else {
http.Error(w, "container not ready", http.StatusServiceUnavailable)
}
return

case isKubeletProbe(r):
// Do not count health checks for concurrency metrics
proxy.ServeHTTP(w, r)
return
Expand All @@ -178,6 +210,7 @@ func handler(w http.ResponseWriter, r *http.Request) {
defer func() {
reqChan <- queue.ReqEvent{Time: time.Now(), EventType: queue.ReqOut}
}()

// Enforce queuing and concurrency limits
if breaker != nil {
ok := breaker.Maybe(func() {
Expand All @@ -195,23 +228,7 @@ func handler(w http.ResponseWriter, r *http.Request) {
// Sets up /health and /quitquitquit endpoints.
func createAdminHandlers() *http.ServeMux {
mux := http.NewServeMux()
mux.HandleFunc(queue.RequestQueueHealthPath, healthState.HealthHandler(func() bool {
var err error
wait.PollImmediate(50*time.Millisecond, 10*time.Second, func() (bool, error) {
logger.Debug("TCP probing the user-container.")
err = health.TCPProbe(userTargetAddress, 100*time.Millisecond)
return err == nil, nil
})

if err == nil {
logger.Info("User-container successfully probed.")
} else {
logger.Errorw("User-container could not be probed successfully.", zap.Error(err))
}

return err == nil
}))

mux.HandleFunc(queue.RequestQueueHealthPath, healthState.HealthHandler(probeUserContainer))
mux.HandleFunc(queue.RequestQueueQuitPath, healthState.QuitHandler(func() {
time.Sleep(quitSleepDuration)

Expand Down
25 changes: 11 additions & 14 deletions pkg/activator/handler/filtering_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,20 +31,17 @@ func TestFilteringHandler(t *testing.T) {
headers: http.Header{},
passed: true,
expectedStatus: http.StatusOK,
},
{
label: "filter a request containing retry header",
headers: mapToHeader(map[string]string{activator.RequestCountHTTPHeader: "4"}),
passed: false,
expectedStatus: http.StatusServiceUnavailable,
},
{
label: "forward a request containing empty retry header",
headers: mapToHeader(map[string]string{activator.RequestCountHTTPHeader: ""}),
passed: true,
expectedStatus: http.StatusOK,
},
}
}, {
label: "filter a request containing retry header",
headers: mapToHeader(map[string]string{activator.RequestCountHTTPHeader: "4"}),
passed: false,
expectedStatus: http.StatusServiceUnavailable,
}, {
label: "forward a request containing empty retry header",
headers: mapToHeader(map[string]string{activator.RequestCountHTTPHeader: ""}),
passed: true,
expectedStatus: http.StatusOK,
}}

for _, e := range examples {
t.Run(e.label, func(t *testing.T) {
Expand Down
36 changes: 36 additions & 0 deletions pkg/activator/handler/probe_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
Copyright 2019 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package handler

import (
"net/http"

"github.com/knative/serving/pkg/network"
)

// ProbeHandler handles responding to Knative internal network probes.
type ProbeHandler struct {
NextHandler http.Handler
}

func (h *ProbeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// If this header is set the request was sent by a Knative component
// probing the network, respond with a 200 and our component name.
if r.Header.Get(network.ProbeHeaderName) != "" {
w.Write([]byte("activator"))
return
}

h.NextHandler.ServeHTTP(w, r)
}
90 changes: 90 additions & 0 deletions pkg/activator/handler/probe_handler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
Copyright 2019 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package handler

import (
"net/http"
"net/http/httptest"
"testing"

"github.com/knative/serving/pkg/network"
)

func TestProbeHandler(t *testing.T) {
examples := []struct {
label string
headers http.Header
passed bool
expectedStatus int
method string
}{{
label: "forward a normal POST request",
headers: http.Header{},
passed: true,
expectedStatus: http.StatusOK,
method: http.MethodPost,
}, {
label: "filter a POST request containing probe header",
headers: mapToHeader(map[string]string{network.ProbeHeaderName: "not-empty"}),
passed: false,
expectedStatus: http.StatusOK,
method: http.MethodPost,
}, {
label: "forward a normal GET request",
headers: http.Header{},
passed: true,
expectedStatus: http.StatusOK,
method: http.MethodGet,
}, {
label: "filter a GET request containing probe header",
headers: mapToHeader(map[string]string{network.ProbeHeaderName: "not-empty"}),
passed: false,
expectedStatus: http.StatusOK,
method: http.MethodGet,
}, {
label: "forward a request containing empty retry header",
headers: mapToHeader(map[string]string{network.ProbeHeaderName: ""}),
passed: true,
expectedStatus: http.StatusOK,
method: http.MethodPost,
}}

for _, e := range examples {
t.Run(e.label, func(t *testing.T) {
wasPassed := false
baseHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
wasPassed = true
w.WriteHeader(http.StatusOK)
})
handler := ProbeHandler{NextHandler: baseHandler}

resp := httptest.NewRecorder()
req := httptest.NewRequest(e.method, "http://example.com", nil)
req.Header = e.headers

handler.ServeHTTP(resp, req)

if wasPassed != e.passed {
if !e.passed {
t.Error("Request got passed to the next handler unexpectedly")
} else {
t.Error("Request was not passed to the next handler as expected")
}
}

if resp.Code != e.expectedStatus {
t.Errorf("Unexpected response status. Want %d, got %d", e.expectedStatus, resp.Code)
}
})
}
}
6 changes: 6 additions & 0 deletions pkg/network/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@ import (
)

const (
// ProbeHeaderName is the name of a header that can be added to
// requests to probe the knative networking layer. Requests
// with this header will not be passed to the user container or
// included in request metrics.
ProbeHeaderName = "k-network-probe"

// ConfigName is the name of the configmap containing all
// customizations for networking features.
ConfigName = "config-network"
Expand Down

0 comments on commit c62cb22

Please sign in to comment.