Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

profiling: add hooks within grpc #3159

Merged
merged 5 commits into from
Feb 12, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions benchmark/benchmain/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ import (
"google.golang.org/grpc/benchmark/stats"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/profiling"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/test/bufconn"
)
Expand All @@ -78,6 +79,8 @@ var (
fmt.Sprintf("Preloader mode - One of: %v", strings.Join(allToggleModes, ", ")), allToggleModes)
channelzOn = flags.StringWithAllowedValues("channelz", toggleModeOff,
fmt.Sprintf("Channelz mode - One of: %v", strings.Join(allToggleModes, ", ")), allToggleModes)
profilingFlag = flags.StringWithAllowedValues("profiling", toggleModeOff,
fmt.Sprintf("Profiling mode - One of: %v", strings.Join(allToggleModes, ", ")), allToggleModes)
compressorMode = flags.StringWithAllowedValues("compression", compModeOff,
fmt.Sprintf("Compression mode - One of: %v", strings.Join(allCompModes, ", ")), allCompModes)
networkMode = flags.StringWithAllowedValues("networkMode", networkModeNone,
Expand Down Expand Up @@ -497,6 +500,7 @@ type featureOpts struct {
respPayloadCurves []*stats.PayloadCurve
compModes []string
enableChannelz []bool
enableProfiling []bool
enablePreloader []bool
}

Expand Down Expand Up @@ -532,6 +536,8 @@ func makeFeaturesNum(b *benchOpts) []int {
featuresNum[i] = len(b.features.compModes)
case stats.EnableChannelzIndex:
featuresNum[i] = len(b.features.enableChannelz)
case stats.EnableProfilingIndex:
featuresNum[i] = len(b.features.enableProfiling)
case stats.EnablePreloaderIndex:
featuresNum[i] = len(b.features.enablePreloader)
default:
Expand Down Expand Up @@ -595,6 +601,7 @@ func (b *benchOpts) generateFeatures(featuresNum []int) []stats.Features {
MaxConcurrentCalls: b.features.maxConcurrentCalls[curPos[stats.MaxConcurrentCallsIndex]],
ModeCompressor: b.features.compModes[curPos[stats.CompModesIndex]],
EnableChannelz: b.features.enableChannelz[curPos[stats.EnableChannelzIndex]],
EnableProfiling: b.features.enableProfiling[curPos[stats.EnableProfilingIndex]],
EnablePreloader: b.features.enablePreloader[curPos[stats.EnablePreloaderIndex]],
}
if len(b.features.reqPayloadCurves) == 0 {
Expand Down Expand Up @@ -661,6 +668,7 @@ func processFlags() *benchOpts {
respSizeBytes: append([]int(nil), *readRespSizeBytes...),
compModes: setCompressorMode(*compressorMode),
enableChannelz: setToggleMode(*channelzOn),
enableProfiling: setToggleMode(*profilingFlag),
enablePreloader: setToggleMode(*preloaderMode),
},
}
Expand Down Expand Up @@ -756,6 +764,13 @@ func main() {
if bf.EnableChannelz {
channelz.TurnOn()
}
profiling.Enable(bf.EnableProfiling)
if bf.EnableProfiling {
if err := profiling.InitStats(1 << 12); err != nil {
fmt.Fprintf(os.Stderr, "error in InitStats: %v\n", err)
return
}
}
if opts.rModes.unary {
unaryBenchmark(start, stop, bf, s)
}
Expand Down
22 changes: 20 additions & 2 deletions benchmark/stats/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/internal/profiling"
)

// FeatureIndex is an enum for features that usually differ across individual
Expand All @@ -52,6 +53,7 @@ const (
CompModesIndex
EnableChannelzIndex
EnablePreloaderIndex
EnableProfilingIndex

// MaxFeatureIndex is a place holder to indicate the total number of feature
// indices we have. Any new feature indices should be added above this.
Expand Down Expand Up @@ -107,6 +109,8 @@ type Features struct {
ModeCompressor string
// EnableChannelz indicates if channelz was turned on.
EnableChannelz bool
// EnableProfiling indicates if profiling was turned on.
EnableProfiling bool
// EnablePreloader indicates if preloading was turned on.
EnablePreloader bool
}
Expand All @@ -126,10 +130,11 @@ func (f Features) String() string {
}
return fmt.Sprintf("networkMode_%v-bufConn_%v-keepalive_%v-benchTime_%v-"+
"trace_%v-latency_%v-kbps_%v-MTU_%v-maxConcurrentCalls_%v-%s-%s-"+
"compressor_%v-channelz_%v-preloader_%v",
"compressor_%v-channelz_%v-profiling_%v-preloader_%v",
f.NetworkMode, f.UseBufConn, f.EnableKeepalive, f.BenchTime, f.EnableTrace,
f.Latency, f.Kbps, f.MTU, f.MaxConcurrentCalls, reqPayloadString,
respPayloadString, f.ModeCompressor, f.EnableChannelz, f.EnablePreloader)
respPayloadString, f.ModeCompressor, f.EnableChannelz, f.EnableProfiling,
f.EnablePreloader)
}

// SharedFeatures returns the shared features as a pretty printable string.
Expand Down Expand Up @@ -187,6 +192,8 @@ func (f Features) partialString(b *bytes.Buffer, wantFeatures []bool, sep, delim
b.WriteString(fmt.Sprintf("Compressor%v%v%v", sep, f.ModeCompressor, delim))
case EnableChannelzIndex:
b.WriteString(fmt.Sprintf("Channelz%v%v%v", sep, f.EnableChannelz, delim))
case EnableProfilingIndex:
b.WriteString(fmt.Sprintf("Profiling%v%v%v", sep, f.EnableProfiling, delim))
case EnablePreloaderIndex:
b.WriteString(fmt.Sprintf("Preloader%v%v%v", sep, f.EnablePreloader, delim))
default:
Expand Down Expand Up @@ -251,6 +258,8 @@ type RunData struct {
NinetyNinth time.Duration
// Average is the average latency.
Average time.Duration
// Stream-level profiling data.
StreamStats []*profiling.Stat
}

type durationSlice []time.Duration
Expand Down Expand Up @@ -318,10 +327,19 @@ func (s *Stats) EndRun(count uint64) {
RespT: float64(count) * float64(r.Features.RespSizeBytes) * 8 / r.Features.BenchTime.Seconds(),
}
s.computeLatencies(r)
s.drainProfiling(r)
s.dump(r)
s.hw = &histWrapper{}
}

// drainProfiling drains stats from internal/profiling.
func (s *Stats) drainProfiling(r *BenchResults) {
results := profiling.StreamStats.Drain()
for _, stat := range results {
r.Data.StreamStats = append(r.Data.StreamStats, stat.(*profiling.Stat))
}
}

// EndUnconstrainedRun is similar to EndRun, but is to be used for
// unconstrained workloads.
func (s *Stats) EndUnconstrainedRun(req uint64, resp uint64) {
Expand Down
6 changes: 3 additions & 3 deletions call_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ type testStreamHandler struct {
func (h *testStreamHandler) handleStream(t *testing.T, s *transport.Stream) {
p := &parser{r: s}
for {
pf, req, err := p.recvMsg(math.MaxInt32)
pf, req, err := p.recvMsg(math.MaxInt32, nil)
if err == io.EOF {
break
}
Expand Down Expand Up @@ -104,13 +104,13 @@ func (h *testStreamHandler) handleStream(t *testing.T, s *transport.Stream) {
}
}
// send a response back to end the stream.
data, err := encode(testCodec{}, &expectedResponse)
data, err := encode(testCodec{}, &expectedResponse, nil)
if err != nil {
t.Errorf("Failed to encode the response: %v", err)
return
}
hdr, payload := msgHeader(data, nil)
h.t.Write(s, hdr, payload, &transport.Options{})
h.t.Write(s, hdr, payload, nil, &transport.Options{})
h.t.WriteStatus(s, status.New(codes.OK, ""))
}

Expand Down
9 changes: 9 additions & 0 deletions internal/profiling/profiling.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,3 +221,12 @@ func InitStats(streamStatsSize uint32) error {

return nil
}

const (
// StreamStatMetadataConnectionIDSize is the number of bytes reserved for the
// connection ID in a stream stat's metadata field.
StreamStatMetadataConnectionIDSize = 8
// StreamStatMetadataStreamIDSize is the number of bytes reserved for the
// stream ID in a stream stat's metadata field.
StreamStatMetadataStreamIDSize = 4
)
12 changes: 12 additions & 0 deletions internal/transport/controlbuf.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

"golang.org/x/net/http2"
"golang.org/x/net/http2/hpack"
"google.golang.org/grpc/internal/profiling"
)

var updateHeaderTblSize = func(e *hpack.Encoder, v uint32) {
Expand Down Expand Up @@ -99,6 +100,7 @@ type cbItem interface {
type registerStream struct {
streamID uint32
wq *writeQuota
stat *profiling.Stat
}

func (*registerStream) isTransportResponseFrame() bool { return false }
Expand Down Expand Up @@ -136,6 +138,7 @@ type dataFrame struct {
// onEachWrite is called every time
// a part of d is written out.
onEachWrite func()
stat *profiling.Stat
}

func (*dataFrame) isTransportResponseFrame() bool { return false }
Expand Down Expand Up @@ -591,13 +594,15 @@ func (l *loopyWriter) incomingSettingsHandler(s *incomingSettings) error {
}

func (l *loopyWriter) registerStreamHandler(h *registerStream) error {
timer := h.stat.NewTimer("/http2/recv/header/loopyWriter/registerOutStream")
str := &outStream{
id: h.streamID,
state: empty,
itl: &itemList{},
wq: h.wq,
}
l.estdStreams[h.streamID] = str
timer.Egress()
return nil
}

Expand Down Expand Up @@ -696,8 +701,10 @@ func (l *loopyWriter) writeHeader(streamID uint32, endStream bool, hf []hpack.He
}

func (l *loopyWriter) preprocessData(df *dataFrame) error {
timer := df.stat.NewTimer("/http2/send/dataFrame/loopyWriter/preprocess")
str, ok := l.estdStreams[df.streamID]
if !ok {
timer.Egress()
return nil
}
// If we got data for a stream it means that
Expand All @@ -707,6 +714,7 @@ func (l *loopyWriter) preprocessData(df *dataFrame) error {
str.state = active
l.activeStreams.enqueue(str)
}
timer.Egress()
return nil
}

Expand Down Expand Up @@ -836,6 +844,10 @@ func (l *loopyWriter) processData() (bool, error) {
// As an optimization to keep wire traffic low, data from d is copied to h to make as big as the
// maximum possilbe HTTP2 frame size.

if dataItem.stat != nil {
defer dataItem.stat.NewTimer("/http2/send/dataFrame/loopyWriter").Egress()
}

if len(dataItem.h) == 0 && len(dataItem.d) == 0 { // Empty data frame
// Client sends out empty data frame with endStream = true
if err := l.framer.fr.WriteData(dataItem.streamID, dataItem.endStream, nil); err != nil {
Expand Down
4 changes: 3 additions & 1 deletion internal/transport/handler_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"golang.org/x/net/http2"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/internal/profiling"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/stats"
Expand Down Expand Up @@ -262,7 +263,7 @@ func (ht *serverHandlerTransport) writeCommonHeaders(s *Stream) {
}
}

func (ht *serverHandlerTransport) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
func (ht *serverHandlerTransport) Write(s *Stream, hdr []byte, data []byte, stat *profiling.Stat, opts *Options) error {
return ht.do(func() {
ht.writeCommonHeaders(s)
ht.rw.Write(hdr)
Expand Down Expand Up @@ -324,6 +325,7 @@ func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream), trace

req := ht.req

// TODO(adtac): set stat?
dfawley marked this conversation as resolved.
Show resolved Hide resolved
s := &Stream{
id: 0, // irrelevant
requestRead: func(int) {},
Expand Down
2 changes: 1 addition & 1 deletion internal/transport/handler_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ func (s) TestHandlerTransport_HandleStreams_WriteStatusWrite(t *testing.T) {
st.bodyw.Close() // no body

st.ht.WriteStatus(s, status.New(codes.OK, ""))
st.ht.Write(s, []byte("hdr"), []byte("data"), &Options{})
st.ht.Write(s, []byte("hdr"), []byte("data"), nil, &Options{})
})
}

Expand Down
Loading