From c424fed1e57566edbbc6c96a4e520b34dd7b0420 Mon Sep 17 00:00:00 2001 From: Adhityaa Chandrasekar Date: Thu, 7 Nov 2019 15:22:08 -0800 Subject: [PATCH 1/5] profiling: add hooks within grpc --- benchmark/benchmain/main.go | 15 ++++ benchmark/stats/stats.go | 22 +++++- call_test.go | 6 +- internal/transport/controlbuf.go | 12 +++ internal/transport/handler_server.go | 4 +- internal/transport/handler_server_test.go | 2 +- internal/transport/http2_client.go | 48 +++++++++++- internal/transport/http2_server.go | 35 ++++++++- internal/transport/transport.go | 25 ++++-- internal/transport/transport_test.go | 44 +++++------ preloader.go | 6 +- rpc_util.go | 57 ++++++++++++-- rpc_util_test.go | 12 +-- server.go | 95 +++++++++++++++++++++-- stream.go | 89 +++++++++++++++++---- 15 files changed, 398 insertions(+), 74 deletions(-) diff --git a/benchmark/benchmain/main.go b/benchmark/benchmain/main.go index a8bcb9deeec2..19f1b4d39810 100644 --- a/benchmark/benchmain/main.go +++ b/benchmark/benchmain/main.go @@ -65,6 +65,7 @@ import ( "google.golang.org/grpc/benchmark/stats" "google.golang.org/grpc/grpclog" "google.golang.org/grpc/internal/channelz" + "google.golang.org/grpc/internal/profiling" "google.golang.org/grpc/keepalive" "google.golang.org/grpc/test/bufconn" ) @@ -78,6 +79,8 @@ var ( fmt.Sprintf("Preloader mode - One of: %v", strings.Join(allToggleModes, ", ")), allToggleModes) channelzOn = flags.StringWithAllowedValues("channelz", toggleModeOff, fmt.Sprintf("Channelz mode - One of: %v", strings.Join(allToggleModes, ", ")), allToggleModes) + profilingFlag = flags.StringWithAllowedValues("profiling", toggleModeOff, + fmt.Sprintf("Profiling mode - One of: %v", strings.Join(allToggleModes, ", ")), allToggleModes) compressorMode = flags.StringWithAllowedValues("compression", compModeOff, fmt.Sprintf("Compression mode - One of: %v", strings.Join(allCompModes, ", ")), allCompModes) networkMode = flags.StringWithAllowedValues("networkMode", networkModeNone, @@ -497,6 +500,7 @@ 
type featureOpts struct { respPayloadCurves []*stats.PayloadCurve compModes []string enableChannelz []bool + enableProfiling []bool enablePreloader []bool } @@ -532,6 +536,8 @@ func makeFeaturesNum(b *benchOpts) []int { featuresNum[i] = len(b.features.compModes) case stats.EnableChannelzIndex: featuresNum[i] = len(b.features.enableChannelz) + case stats.EnableProfilingIndex: + featuresNum[i] = len(b.features.enableProfiling) case stats.EnablePreloaderIndex: featuresNum[i] = len(b.features.enablePreloader) default: @@ -595,6 +601,7 @@ func (b *benchOpts) generateFeatures(featuresNum []int) []stats.Features { MaxConcurrentCalls: b.features.maxConcurrentCalls[curPos[stats.MaxConcurrentCallsIndex]], ModeCompressor: b.features.compModes[curPos[stats.CompModesIndex]], EnableChannelz: b.features.enableChannelz[curPos[stats.EnableChannelzIndex]], + EnableProfiling: b.features.enableProfiling[curPos[stats.EnableProfilingIndex]], EnablePreloader: b.features.enablePreloader[curPos[stats.EnablePreloaderIndex]], } if len(b.features.reqPayloadCurves) == 0 { @@ -661,6 +668,7 @@ func processFlags() *benchOpts { respSizeBytes: append([]int(nil), *readRespSizeBytes...), compModes: setCompressorMode(*compressorMode), enableChannelz: setToggleMode(*channelzOn), + enableProfiling: setToggleMode(*profilingFlag), enablePreloader: setToggleMode(*preloaderMode), }, } @@ -756,6 +764,13 @@ func main() { if bf.EnableChannelz { channelz.TurnOn() } + profiling.Enable(bf.EnableProfiling) + if bf.EnableProfiling { + if err := profiling.InitStats(1 << 12); err != nil { + fmt.Fprintf(os.Stderr, "error in InitStats: %v\n", err) + return + } + } if opts.rModes.unary { unaryBenchmark(start, stop, bf, s) } diff --git a/benchmark/stats/stats.go b/benchmark/stats/stats.go index f2f6a08878cb..c3599a82885e 100644 --- a/benchmark/stats/stats.go +++ b/benchmark/stats/stats.go @@ -31,6 +31,7 @@ import ( "time" "google.golang.org/grpc" + "google.golang.org/grpc/internal/profiling" ) // FeatureIndex is an enum 
for features that usually differ across individual @@ -52,6 +53,7 @@ const ( CompModesIndex EnableChannelzIndex EnablePreloaderIndex + EnableProfilingIndex // MaxFeatureIndex is a place holder to indicate the total number of feature // indices we have. Any new feature indices should be added above this. @@ -107,6 +109,8 @@ type Features struct { ModeCompressor string // EnableChannelz indicates if channelz was turned on. EnableChannelz bool + // EnableProfiling indicates if profiling was turned on. + EnableProfiling bool // EnablePreloader indicates if preloading was turned on. EnablePreloader bool } @@ -126,10 +130,11 @@ func (f Features) String() string { } return fmt.Sprintf("networkMode_%v-bufConn_%v-keepalive_%v-benchTime_%v-"+ "trace_%v-latency_%v-kbps_%v-MTU_%v-maxConcurrentCalls_%v-%s-%s-"+ - "compressor_%v-channelz_%v-preloader_%v", + "compressor_%v-channelz_%v-profiling_%v-preloader_%v", f.NetworkMode, f.UseBufConn, f.EnableKeepalive, f.BenchTime, f.EnableTrace, f.Latency, f.Kbps, f.MTU, f.MaxConcurrentCalls, reqPayloadString, - respPayloadString, f.ModeCompressor, f.EnableChannelz, f.EnablePreloader) + respPayloadString, f.ModeCompressor, f.EnableChannelz, f.EnableProfiling, + f.EnablePreloader) } // SharedFeatures returns the shared features as a pretty printable string. @@ -187,6 +192,8 @@ func (f Features) partialString(b *bytes.Buffer, wantFeatures []bool, sep, delim b.WriteString(fmt.Sprintf("Compressor%v%v%v", sep, f.ModeCompressor, delim)) case EnableChannelzIndex: b.WriteString(fmt.Sprintf("Channelz%v%v%v", sep, f.EnableChannelz, delim)) + case EnableProfilingIndex: + b.WriteString(fmt.Sprintf("Profiling%v%v%v", sep, f.EnableProfiling, delim)) case EnablePreloaderIndex: b.WriteString(fmt.Sprintf("Preloader%v%v%v", sep, f.EnablePreloader, delim)) default: @@ -251,6 +258,8 @@ type RunData struct { NinetyNinth time.Duration // Average is the average latency. Average time.Duration + // Stream-level profiling data. 
+ StreamStats []*profiling.Stat } type durationSlice []time.Duration @@ -318,10 +327,19 @@ func (s *Stats) EndRun(count uint64) { RespT: float64(count) * float64(r.Features.RespSizeBytes) * 8 / r.Features.BenchTime.Seconds(), } s.computeLatencies(r) + s.drainProfiling(r) s.dump(r) s.hw = &histWrapper{} } +// drainProfiling drains stats from internal/profiling. +func (s *Stats) drainProfiling(r *BenchResults) { + results := profiling.StreamStats.Drain() + for _, stat := range results { + r.Data.StreamStats = append(r.Data.StreamStats, stat.(*profiling.Stat)) + } +} + // EndUnconstrainedRun is similar to EndRun, but is to be used for // unconstrained workloads. func (s *Stats) EndUnconstrainedRun(req uint64, resp uint64) { diff --git a/call_test.go b/call_test.go index 78760ba5297a..36adaf412d88 100644 --- a/call_test.go +++ b/call_test.go @@ -67,7 +67,7 @@ type testStreamHandler struct { func (h *testStreamHandler) handleStream(t *testing.T, s *transport.Stream) { p := &parser{r: s} for { - pf, req, err := p.recvMsg(math.MaxInt32) + pf, req, err := p.recvMsg(math.MaxInt32, nil) if err == io.EOF { break } @@ -104,13 +104,13 @@ func (h *testStreamHandler) handleStream(t *testing.T, s *transport.Stream) { } } // send a response back to end the stream. 
- data, err := encode(testCodec{}, &expectedResponse) + data, err := encode(testCodec{}, &expectedResponse, nil) if err != nil { t.Errorf("Failed to encode the response: %v", err) return } hdr, payload := msgHeader(data, nil) - h.t.Write(s, hdr, payload, &transport.Options{}) + h.t.Write(s, hdr, payload, nil, &transport.Options{}) h.t.WriteStatus(s, status.New(codes.OK, "")) } diff --git a/internal/transport/controlbuf.go b/internal/transport/controlbuf.go index ddee20b6bef2..195df1ed9297 100644 --- a/internal/transport/controlbuf.go +++ b/internal/transport/controlbuf.go @@ -27,6 +27,7 @@ import ( "golang.org/x/net/http2" "golang.org/x/net/http2/hpack" + "google.golang.org/grpc/internal/profiling" ) var updateHeaderTblSize = func(e *hpack.Encoder, v uint32) { @@ -99,6 +100,7 @@ type cbItem interface { type registerStream struct { streamID uint32 wq *writeQuota + stat *profiling.Stat } func (*registerStream) isTransportResponseFrame() bool { return false } @@ -136,6 +138,7 @@ type dataFrame struct { // onEachWrite is called every time // a part of d is written out. 
onEachWrite func() + stat *profiling.Stat } func (*dataFrame) isTransportResponseFrame() bool { return false } @@ -591,6 +594,7 @@ func (l *loopyWriter) incomingSettingsHandler(s *incomingSettings) error { } func (l *loopyWriter) registerStreamHandler(h *registerStream) error { + timer := h.stat.NewTimer("/http2/recv/header/loopyWriter/registerOutStream") str := &outStream{ id: h.streamID, state: empty, @@ -598,6 +602,7 @@ func (l *loopyWriter) registerStreamHandler(h *registerStream) error { wq: h.wq, } l.estdStreams[h.streamID] = str + timer.Egress() return nil } @@ -696,8 +701,10 @@ func (l *loopyWriter) writeHeader(streamID uint32, endStream bool, hf []hpack.He } func (l *loopyWriter) preprocessData(df *dataFrame) error { + timer := df.stat.NewTimer("/http2/send/dataFrame/loopyWriter/preprocess") str, ok := l.estdStreams[df.streamID] if !ok { + timer.Egress() return nil } // If we got data for a stream it means that @@ -707,6 +714,7 @@ func (l *loopyWriter) preprocessData(df *dataFrame) error { str.state = active l.activeStreams.enqueue(str) } + timer.Egress() return nil } @@ -836,6 +844,10 @@ func (l *loopyWriter) processData() (bool, error) { // As an optimization to keep wire traffic low, data from d is copied to h to make as big as the // maximum possilbe HTTP2 frame size. 
+ if dataItem.stat != nil { + defer dataItem.stat.NewTimer("/http2/send/dataFrame/loopyWriter").Egress() + } + if len(dataItem.h) == 0 && len(dataItem.d) == 0 { // Empty data frame // Client sends out empty data frame with endStream = true if err := l.framer.fr.WriteData(dataItem.streamID, dataItem.endStream, nil); err != nil { diff --git a/internal/transport/handler_server.go b/internal/transport/handler_server.go index c3c32dafe9e5..305fc17e2d78 100644 --- a/internal/transport/handler_server.go +++ b/internal/transport/handler_server.go @@ -39,6 +39,7 @@ import ( "golang.org/x/net/http2" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" + "google.golang.org/grpc/internal/profiling" "google.golang.org/grpc/metadata" "google.golang.org/grpc/peer" "google.golang.org/grpc/stats" @@ -262,7 +263,7 @@ func (ht *serverHandlerTransport) writeCommonHeaders(s *Stream) { } } -func (ht *serverHandlerTransport) Write(s *Stream, hdr []byte, data []byte, opts *Options) error { +func (ht *serverHandlerTransport) Write(s *Stream, hdr []byte, data []byte, stat *profiling.Stat, opts *Options) error { return ht.do(func() { ht.writeCommonHeaders(s) ht.rw.Write(hdr) @@ -324,6 +325,7 @@ func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream), trace req := ht.req + // TODO(adtac): set stat? 
s := &Stream{ id: 0, // irrelevant requestRead: func(int) {}, diff --git a/internal/transport/handler_server_test.go b/internal/transport/handler_server_test.go index 55bc0b88248e..f1dcfec7331b 100644 --- a/internal/transport/handler_server_test.go +++ b/internal/transport/handler_server_test.go @@ -402,7 +402,7 @@ func (s) TestHandlerTransport_HandleStreams_WriteStatusWrite(t *testing.T) { st.bodyw.Close() // no body st.ht.WriteStatus(s, status.New(codes.OK, "")) - st.ht.Write(s, []byte("hdr"), []byte("data"), &Options{}) + st.ht.Write(s, []byte("hdr"), []byte("data"), nil, &Options{}) }) } diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index b1b82ec956eb..2b27655fe166 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -20,6 +20,7 @@ package transport import ( "context" + "encoding/binary" "fmt" "io" "math" @@ -37,6 +38,7 @@ import ( "google.golang.org/grpc/credentials" "google.golang.org/grpc/internal" "google.golang.org/grpc/internal/channelz" + "google.golang.org/grpc/internal/profiling" "google.golang.org/grpc/internal/syscall" "google.golang.org/grpc/keepalive" "google.golang.org/grpc/metadata" @@ -368,6 +370,7 @@ func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream { headerChan: make(chan struct{}), contentSubtype: callHdr.ContentSubtype, } + s.wq = newWriteQuota(defaultWriteQuota, s.done) s.requestRead = func(n int) { t.adjustWindow(s, uint32(n)) @@ -385,10 +388,12 @@ func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream { t.CloseStream(s, err) }, freeBuffer: t.bufferPool.put, + stat: s.stat, }, windowHandler: func(n int) { t.updateWindow(s, uint32(n)) }, + stat: s.stat, } return s } @@ -557,12 +562,29 @@ func (t *http2Client) getCallAuthData(ctx context.Context, audience string, call // NewStream creates a stream and registers it into the transport as "active" // streams. 
func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Stream, err error) { + timer := profiling.NewTimer("/http2") ctx = peer.NewContext(ctx, t.getPeer()) headerFields, err := t.createHeaderFields(ctx, callHdr) if err != nil { return nil, err } + s := t.newStream(ctx, callHdr) + + if profiling.IsEnabled() { + s.stat = profiling.NewStat("client") + s.stat.Metadata = make([]byte, 12) + binary.BigEndian.PutUint64(s.stat.Metadata[0:8], t.connectionID) + // Stream ID will be set when loopy writer actually establishes the stream + // and obtains a stream ID + profiling.StreamStats.Push(s.stat) + // Usually, performance suffers when we used defers to record egress out of + // functions when profiling is disabled, but it's fine here because this is + // executed only when profliing is enabled. + s.stat.AppendTimer(timer) + defer timer.Egress() + } + cleanup := func(err error) { if s.swapState(streamDone) == streamDone { // If it was already done, return. @@ -593,6 +615,9 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea return err } t.activeStreams[id] = s + if s.stat != nil { + binary.BigEndian.PutUint32(s.stat.Metadata[8:12], id) + } if channelz.IsOn() { atomic.AddInt64(&t.czData.streamsStarted, 1) atomic.StoreInt64(&t.czData.lastStreamCreatedTime, time.Now().UnixNano()) @@ -840,7 +865,7 @@ func (t *http2Client) GracefulClose() { // Write formats the data into HTTP2 data frame(s) and sends it out. The caller // should proceed only if Write returns nil. -func (t *http2Client) Write(s *Stream, hdr []byte, data []byte, opts *Options) error { +func (t *http2Client) Write(s *Stream, hdr []byte, data []byte, stat *profiling.Stat, opts *Options) error { if opts.Last { // If it's the last message, update stream state. 
if !s.compareAndSwapState(streamActive, streamWriteDone) { @@ -852,6 +877,7 @@ func (t *http2Client) Write(s *Stream, hdr []byte, data []byte, opts *Options) e df := &dataFrame{ streamID: s.id, endStream: opts.Last, + stat: stat, } if hdr != nil || data != nil { // If it's not an empty data frame. // Add some data to grpc message header so that we can equally @@ -921,6 +947,18 @@ func (t *http2Client) updateFlowControl(n uint32) { } func (t *http2Client) handleData(f *http2.DataFrame) { + var timer *profiling.Timer + if profiling.IsEnabled() { + // We don't have a reference to the stream's stat object right now, so + // instead we measure time and append to the stat object later. + timer = profiling.NewTimer("/http2/recv/dataFrame/loopyReader") + // Do not defer a call to timer egress here because this object is + // short-lived; once this is appended to the appropriate stat, all + // references to this object must be lost so as to be garbage collected. + // AppendTimer makes a copy of the timer, so operations on this object will + // not be reflected in the stat's copy. 
+ } + size := f.Header().Length var sendBDPPing bool if t.bdpEst != nil { @@ -959,9 +997,11 @@ func (t *http2Client) handleData(f *http2.DataFrame) { if s == nil { return } + s.stat.AppendTimer(timer) if size > 0 { if err := s.fc.onData(size); err != nil { t.closeStream(s, io.EOF, true, http2.ErrCodeFlowControl, status.New(codes.Internal, err.Error()), nil, false) + timer.Egress() return } if f.Header().Flags.Has(http2.FlagDataPadded) { @@ -984,6 +1024,7 @@ func (t *http2Client) handleData(f *http2.DataFrame) { if f.FrameHeader.Flags.Has(http2.FlagDataEndStream) { t.closeStream(s, io.EOF, false, http2.ErrCodeNo, status.New(codes.Internal, "server closed the stream without sending trailers"), nil, true) } + timer.Egress() } func (t *http2Client) handleRSTStream(f *http2.RSTStreamFrame) { @@ -1169,6 +1210,11 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) { if s == nil { return } + + if s.stat != nil { + defer s.stat.NewTimer("/http2/recv/header/loopyReader").Egress() + } + endStream := frame.StreamEnded() atomic.StoreUint32(&s.bytesReceived, 1) initialHeader := atomic.LoadUint32(&s.headerChanClosed) == 0 diff --git a/internal/transport/http2_server.go b/internal/transport/http2_server.go index 8b04b0392a0a..6531dc5e0400 100644 --- a/internal/transport/http2_server.go +++ b/internal/transport/http2_server.go @@ -21,6 +21,7 @@ package transport import ( "bytes" "context" + "encoding/binary" "errors" "fmt" "io" @@ -42,6 +43,7 @@ import ( "google.golang.org/grpc/internal" "google.golang.org/grpc/internal/channelz" "google.golang.org/grpc/internal/grpcrand" + "google.golang.org/grpc/internal/profiling" "google.golang.org/grpc/keepalive" "google.golang.org/grpc/metadata" "google.golang.org/grpc/peer" @@ -331,6 +333,18 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func( method: state.data.method, contentSubtype: state.data.contentSubtype, } + + if profiling.IsEnabled() { + // This is where the concept of a stream is 
first established within a gRPC + // server, so let's create an associated Stat object. + s.stat = profiling.NewStat("server") + s.stat.Metadata = make([]byte, 12) + binary.BigEndian.PutUint64(s.stat.Metadata[0:8], t.connectionID) + binary.BigEndian.PutUint32(s.stat.Metadata[8:12], streamID) + profiling.StreamStats.Push(s.stat) + defer s.stat.NewTimer("/http2/recv/header").Egress() + } + if frame.StreamEnded() { // s is just created by the caller. No lock needed. s.state = streamReadDone @@ -434,15 +448,18 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func( ctxDone: s.ctxDone, recv: s.buf, freeBuffer: t.bufferPool.put, + stat: s.stat, }, windowHandler: func(n int) { t.updateWindow(s, uint32(n)) }, + stat: s.stat, } // Register the stream with loopy. t.controlBuf.put(®isterStream{ streamID: s.id, wq: s.wq, + stat: s.Stat(), }) handle(s) return false @@ -569,6 +586,15 @@ func (t *http2Server) updateFlowControl(n uint32) { } func (t *http2Server) handleData(f *http2.DataFrame) { + var timer *profiling.Timer + if profiling.IsEnabled() { + timer = profiling.NewTimer("/http2/recv/dataFrame/loopyReader") + } + + // We don't defer timer.Egress() here because it should be called by + // google.golang.org/grpc/internal/transport.(*recvBuffer).put when the + // recvMsg is actually put into the buffer. 
+ size := f.Header().Length var sendBDPPing bool if t.bdpEst != nil { @@ -604,9 +630,11 @@ func (t *http2Server) handleData(f *http2.DataFrame) { if !ok { return } + s.stat.AppendTimer(timer) if size > 0 { if err := s.fc.onData(size); err != nil { t.closeStream(s, true, http2.ErrCodeFlowControl, false) + timer.Egress() return } if f.Header().Flags.Has(http2.FlagDataPadded) { @@ -629,6 +657,7 @@ func (t *http2Server) handleData(f *http2.DataFrame) { s.compareAndSwapState(streamActive, streamReadDone) s.write(recvMsg{err: io.EOF}) } + timer.Egress() } func (t *http2Server) handleRSTStream(f *http2.RSTStreamFrame) { @@ -828,6 +857,9 @@ func (t *http2Server) writeHeaderLocked(s *Stream) error { // TODO(zhaoq): Now it indicates the end of entire stream. Revisit if early // OK is adopted. func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error { + if s.stat != nil { + defer s.stat.NewTimer("/WriteStatus").Egress() + } if s.getState() == streamDone { return nil } @@ -889,7 +921,7 @@ func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error { // Write converts the data into HTTP2 data frame and sends it out. Non-nil error // is returns if it fails (e.g., framing error, transport error). -func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) error { +func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, stat *profiling.Stat, opts *Options) error { if !s.isHeaderSent() { // Headers haven't been written yet. 
if err := t.WriteHeader(s, nil); err != nil { if _, ok := err.(ConnectionError); ok { @@ -923,6 +955,7 @@ func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) e h: hdr, d: data, onEachWrite: t.setResetPingStrikes, + stat: stat, } if err := s.wq.get(int32(len(hdr) + len(data))); err != nil { select { diff --git a/internal/transport/transport.go b/internal/transport/transport.go index a30da9eb324f..1df292f1d96d 100644 --- a/internal/transport/transport.go +++ b/internal/transport/transport.go @@ -33,6 +33,7 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" + "google.golang.org/grpc/internal/profiling" "google.golang.org/grpc/keepalive" "google.golang.org/grpc/metadata" "google.golang.org/grpc/stats" @@ -144,6 +145,7 @@ type recvBufferReader struct { last *bytes.Buffer // Stores the remaining data in the previous calls. err error freeBuffer func(*bytes.Buffer) + stat *profiling.Stat } // Read reads the next len(p) bytes from last. If last is drained, it tries to @@ -175,7 +177,8 @@ func (r *recvBufferReader) read(p []byte) (n int, err error) { case <-r.ctxDone: return 0, ContextErr(r.ctx.Err()) case m := <-r.recv.get(): - return r.readAdditional(m, p) + n, err = r.readAdditional(m, p) + return } } @@ -284,6 +287,8 @@ type Stream struct { // contentSubtype is the content-subtype for requests. // this must be lowercase or the behavior is undefined. contentSubtype string + + stat *profiling.Stat } // isHeaderSent is only valid on the server-side. @@ -453,6 +458,14 @@ func (s *Stream) write(m recvMsg) { s.buf.put(m) } +// Stat returns the streams's underlying *profiling.Stat. +func (s *Stream) Stat() *profiling.Stat { + if s == nil { + return nil + } + return s.stat +} + // Read reads all p bytes from the wire for this stream. 
func (s *Stream) Read(p []byte) (n int, err error) { // Don't request a read if there was an error earlier @@ -460,7 +473,8 @@ func (s *Stream) Read(p []byte) (n int, err error) { return 0, er } s.requestRead(len(p)) - return io.ReadFull(s.trReader, p) + n, err = io.ReadFull(s.trReader, p) + return } // tranportReader reads all the data available for this Stream from the transport and @@ -468,11 +482,12 @@ func (s *Stream) Read(p []byte) (n int, err error) { // The error is io.EOF when the stream is done or another non-nil error if // the stream broke. type transportReader struct { - reader io.Reader + reader *recvBufferReader // The handler to control the window update procedure for both this // particular stream and the associated transport. windowHandler func(int) er error + stat *profiling.Stat } func (t *transportReader) Read(p []byte) (n int, err error) { @@ -632,7 +647,7 @@ type ClientTransport interface { // Write sends the data for the given stream. A nil stream indicates // the write is to be performed on the transport as a whole. - Write(s *Stream, hdr []byte, data []byte, opts *Options) error + Write(s *Stream, hdr []byte, data []byte, stat *profiling.Stat, opts *Options) error // NewStream creates a Stream for an RPC. NewStream(ctx context.Context, callHdr *CallHdr) (*Stream, error) @@ -683,7 +698,7 @@ type ServerTransport interface { // Write sends the data for the given stream. // Write may not be called on all streams. - Write(s *Stream, hdr []byte, data []byte, opts *Options) error + Write(s *Stream, hdr []byte, data []byte, stat *profiling.Stat, opts *Options) error // WriteStatus sends the status of a stream to the client. WriteStatus is // the final call made on a stream and always occurs. 
diff --git a/internal/transport/transport_test.go b/internal/transport/transport_test.go index 4bd312fdbe85..35f44ebeb48f 100644 --- a/internal/transport/transport_test.go +++ b/internal/transport/transport_test.go @@ -126,7 +126,7 @@ func (h *testStreamHandler) handleStream(t *testing.T, s *Stream) { return } // send a response back to the client. - h.t.Write(s, nil, resp, &Options{}) + h.t.Write(s, nil, resp, nil, &Options{}) // send the trailer to end the stream. h.t.WriteStatus(s, status.New(codes.OK, "")) } @@ -154,7 +154,7 @@ func (h *testStreamHandler) handleStreamPingPong(t *testing.T, s *Stream) { buf[0] = byte(0) binary.BigEndian.PutUint32(buf[1:], uint32(sz)) copy(buf[5:], msg) - h.t.Write(s, nil, buf, &Options{}) + h.t.Write(s, nil, buf, nil, &Options{}) } } @@ -275,7 +275,7 @@ func (h *testStreamHandler) handleStreamDelayRead(t *testing.T, s *Stream) { // This write will cause server to run out of stream level, // flow control and the other side won't send a window update // until that happens. 
- if err := h.t.Write(s, nil, resp, &Options{}); err != nil { + if err := h.t.Write(s, nil, resp, nil, &Options{}); err != nil { t.Errorf("server Write got %v, want ", err) return } @@ -533,7 +533,7 @@ func (s) TestClientSendAndReceive(t *testing.T) { t.Fatalf("wrong stream id: %d", s2.id) } opts := Options{Last: true} - if err := ct.Write(s1, nil, expectedRequest, &opts); err != nil && err != io.EOF { + if err := ct.Write(s1, nil, expectedRequest, nil, &opts); err != nil && err != io.EOF { t.Fatalf("failed to send data: %v", err) } p := make([]byte, len(expectedResponse)) @@ -568,7 +568,7 @@ func performOneRPC(ct ClientTransport) { return } opts := Options{Last: true} - if err := ct.Write(s, []byte{}, expectedRequest, &opts); err == nil || err == io.EOF { + if err := ct.Write(s, []byte{}, expectedRequest, nil, &opts); err == nil || err == io.EOF { time.Sleep(5 * time.Millisecond) // The following s.Recv()'s could error out because the // underlying transport is gone. @@ -614,7 +614,7 @@ func (s) TestLargeMessage(t *testing.T) { if err != nil { t.Errorf("%v.NewStream(_, _) = _, %v, want _, ", ct, err) } - if err := ct.Write(s, []byte{}, expectedRequestLarge, &Options{Last: true}); err != nil && err != io.EOF { + if err := ct.Write(s, []byte{}, expectedRequestLarge, nil, &Options{Last: true}); err != nil && err != io.EOF { t.Errorf("%v.Write(_, _, _) = %v, want ", ct, err) } p := make([]byte, len(expectedResponseLarge)) @@ -705,7 +705,7 @@ func (s) TestLargeMessageWithDelayRead(t *testing.T) { // This write will cause client to run out of stream level, // flow control and the other side won't send a window update // until that happens. 
- if err := ct.Write(s, []byte{}, expectedRequestLarge, &Options{}); err != nil { + if err := ct.Write(s, []byte{}, expectedRequestLarge, nil, &Options{}); err != nil { t.Fatalf("write(_, _, _) = %v, want ", err) } p := make([]byte, len(expectedResponseLarge)) @@ -720,7 +720,7 @@ func (s) TestLargeMessageWithDelayRead(t *testing.T) { if _, err := s.Read(p); err != nil || !bytes.Equal(p, expectedResponseLarge) { t.Fatalf("s.Read(_) = _, %v, want _, ", err) } - if err := ct.Write(s, []byte{}, expectedRequestLarge, &Options{Last: true}); err != nil { + if err := ct.Write(s, []byte{}, expectedRequestLarge, nil, &Options{Last: true}); err != nil { t.Fatalf("Write(_, _, _) = %v, want ", err) } if _, err = s.Read(p); err != io.EOF { @@ -752,7 +752,7 @@ func (s) TestGracefulClose(t *testing.T) { outgoingHeader[0] = byte(0) binary.BigEndian.PutUint32(outgoingHeader[1:], uint32(len(msg))) incomingHeader := make([]byte, 5) - if err := ct.Write(s, outgoingHeader, msg, &Options{}); err != nil { + if err := ct.Write(s, outgoingHeader, msg, nil, &Options{}); err != nil { t.Fatalf("Error while writing: %v", err) } if _, err := s.Read(incomingHeader); err != nil { @@ -777,13 +777,13 @@ func (s) TestGracefulClose(t *testing.T) { t.Errorf("_.NewStream(_, _) = _, %v, want _, %v", err, ErrConnClosing) return } - ct.Write(str, nil, nil, &Options{Last: true}) + ct.Write(str, nil, nil, nil, &Options{Last: true}) if _, err := str.Read(make([]byte, 8)); err != errStreamDrain && err != ErrConnClosing { t.Errorf("_.Read(_) = _, %v, want _, %v or %v", err, errStreamDrain, ErrConnClosing) } }() } - ct.Write(s, nil, nil, &Options{Last: true}) + ct.Write(s, nil, nil, nil, &Options{Last: true}) if _, err := s.Read(incomingHeader); err != io.EOF { t.Fatalf("Client expected EOF from the server. Got: %v", err) } @@ -813,8 +813,8 @@ func (s) TestLargeMessageSuspension(t *testing.T) { }() // Write should not be done successfully due to flow control. 
msg := make([]byte, initialWindowSize*8) - ct.Write(s, nil, msg, &Options{}) - err = ct.Write(s, nil, msg, &Options{Last: true}) + ct.Write(s, nil, msg, nil, &Options{}) + err = ct.Write(s, nil, msg, nil, &Options{Last: true}) if err != errStreamDone { t.Fatalf("Write got %v, want io.EOF", err) } @@ -1006,7 +1006,7 @@ func (s) TestClientConnDecoupledFromApplicationRead(t *testing.T) { t.Fatalf("Didn't find stream corresponding to client cstream.id: %v on the server", cstream1.id) } // Exhaust client's connection window. - if err := st.Write(sstream1, []byte{}, make([]byte, defaultWindowSize), &Options{}); err != nil { + if err := st.Write(sstream1, []byte{}, make([]byte, defaultWindowSize), nil, &Options{}); err != nil { t.Fatalf("Server failed to write data. Err: %v", err) } notifyChan = make(chan struct{}) @@ -1031,7 +1031,7 @@ func (s) TestClientConnDecoupledFromApplicationRead(t *testing.T) { t.Fatalf("Didn't find stream corresponding to client cstream.id: %v on the server", cstream2.id) } // Server should be able to send data on the new stream, even though the client hasn't read anything on the first stream. - if err := st.Write(sstream2, []byte{}, make([]byte, defaultWindowSize), &Options{}); err != nil { + if err := st.Write(sstream2, []byte{}, make([]byte, defaultWindowSize), nil, &Options{}); err != nil { t.Fatalf("Server failed to write data. Err: %v", err) } @@ -1075,7 +1075,7 @@ func (s) TestServerConnDecoupledFromApplicationRead(t *testing.T) { t.Fatalf("Failed to create 1st stream. Err: %v", err) } // Exhaust server's connection window. - if err := client.Write(cstream1, nil, make([]byte, defaultWindowSize), &Options{Last: true}); err != nil { + if err := client.Write(cstream1, nil, make([]byte, defaultWindowSize), nil, &Options{Last: true}); err != nil { t.Fatalf("Client failed to write data. Err: %v", err) } //Client should be able to create another stream and send data on it. 
@@ -1083,7 +1083,7 @@ func (s) TestServerConnDecoupledFromApplicationRead(t *testing.T) { if err != nil { t.Fatalf("Failed to create 2nd stream. Err: %v", err) } - if err := client.Write(cstream2, nil, make([]byte, defaultWindowSize), &Options{}); err != nil { + if err := client.Write(cstream2, nil, make([]byte, defaultWindowSize), nil, &Options{}); err != nil { t.Fatalf("Client failed to write data. Err: %v", err) } // Get the streams on server. @@ -1316,7 +1316,7 @@ func (s) TestEncodingRequiredStatus(t *testing.T) { return } opts := Options{Last: true} - if err := ct.Write(s, nil, expectedRequest, &opts); err != nil && err != errStreamDone { + if err := ct.Write(s, nil, expectedRequest, nil, &opts); err != nil && err != errStreamDone { t.Fatalf("Failed to write the request: %v", err) } p := make([]byte, http2MaxFrameLen) @@ -1494,7 +1494,7 @@ func testFlowControlAccountCheck(t *testing.T, msgSize int, wc windowSizeConfig) opts := Options{} header := make([]byte, 5) for i := 1; i <= 10; i++ { - if err := client.Write(stream, nil, buf, &opts); err != nil { + if err := client.Write(stream, nil, buf, nil, &opts); err != nil { t.Errorf("Error on client while writing message: %v", err) return } @@ -1531,7 +1531,7 @@ func testFlowControlAccountCheck(t *testing.T, msgSize int, wc windowSizeConfig) st.mu.Unlock() // Close all streams for _, stream := range clientStreams { - client.Write(stream, nil, nil, &Options{Last: true}) + client.Write(stream, nil, nil, nil, &Options{Last: true}) if _, err := stream.Read(make([]byte, 5)); err != io.EOF { t.Fatalf("Client expected an EOF from the server. Got: %v", err) } @@ -1687,13 +1687,13 @@ func runPingPongTest(t *testing.T, msgSize int) { for { select { case <-done: - client.Write(stream, nil, nil, &Options{Last: true}) + client.Write(stream, nil, nil, nil, &Options{Last: true}) if _, err := stream.Read(incomingHeader); err != io.EOF { t.Fatalf("Client expected EOF from the server. 
Got: %v", err) } return default: - if err := client.Write(stream, outgoingHeader, msg, opts); err != nil { + if err := client.Write(stream, outgoingHeader, msg, nil, opts); err != nil { t.Fatalf("Error on client while writing message. Err: %v", err) } if _, err := stream.Read(incomingHeader); err != nil { diff --git a/preloader.go b/preloader.go index 76acbbcc93b9..d2e24f7577cd 100644 --- a/preloader.go +++ b/preloader.go @@ -50,12 +50,14 @@ func (p *PreparedMsg) Encode(s Stream, msg interface{}) error { } // prepare the msg - data, err := encode(rpcInfo.preloaderInfo.codec, msg) + // Stream is deprecated and does not provide a way to retrieve the internal + // ServerStream/ClientStream's stat, so we'll pass nil here and in compress. + data, err := encode(rpcInfo.preloaderInfo.codec, msg, nil) if err != nil { return err } p.encodedData = data - compData, err := compress(data, rpcInfo.preloaderInfo.cp, rpcInfo.preloaderInfo.comp) + compData, err := compress(data, rpcInfo.preloaderInfo.cp, rpcInfo.preloaderInfo.comp, nil) if err != nil { return err } diff --git a/rpc_util.go b/rpc_util.go index cf9dbe7fd97d..fe3d220b417d 100644 --- a/rpc_util.go +++ b/rpc_util.go @@ -36,6 +36,7 @@ import ( "google.golang.org/grpc/credentials" "google.golang.org/grpc/encoding" "google.golang.org/grpc/encoding/proto" + "google.golang.org/grpc/internal/profiling" "google.golang.org/grpc/internal/transport" "google.golang.org/grpc/metadata" "google.golang.org/grpc/peer" @@ -506,10 +507,13 @@ type parser struct { // No other error values or types must be returned, which also means // that the underlying io.Reader must not return an incompatible // error. 
-func (p *parser) recvMsg(maxReceiveMessageSize int) (pf payloadFormat, msg []byte, err error) { +func (p *parser) recvMsg(maxReceiveMessageSize int, stat *profiling.Stat) (pf payloadFormat, msg []byte, err error) { + timer := stat.NewTimer("/header") if _, err := p.r.Read(p.header[:]); err != nil { + timer.Egress() return 0, nil, err } + timer.Egress() pf = payloadFormat(p.header[0]) length := binary.BigEndian.Uint32(p.header[1:]) @@ -523,22 +527,26 @@ func (p *parser) recvMsg(maxReceiveMessageSize int) (pf payloadFormat, msg []byt if int(length) > maxReceiveMessageSize { return 0, nil, status.Errorf(codes.ResourceExhausted, "grpc: received message larger than max (%d vs. %d)", length, maxReceiveMessageSize) } + // TODO(bradfitz,zhaoq): garbage. reuse buffer after proto decoding instead // of making it for each message: + timer = stat.NewTimer("/message") msg = make([]byte, int(length)) if _, err := p.r.Read(msg); err != nil { if err == io.EOF { err = io.ErrUnexpectedEOF } + timer.Egress() return 0, nil, err } + timer.Egress() return pf, msg, nil } // encode serializes msg and returns a buffer containing the message, or an // error if it is too large to be transmitted by grpc. If msg is nil, it // generates an empty message. -func encode(c baseCodec, msg interface{}) ([]byte, error) { +func encode(c baseCodec, msg interface{}, stat *profiling.Stat) ([]byte, error) { if msg == nil { // NOTE: typed nils will not be caught by this check return nil, nil } @@ -556,7 +564,7 @@ func encode(c baseCodec, msg interface{}) ([]byte, error) { // compressors are nil, returns nil. // // TODO(dfawley): eliminate cp parameter by wrapping Compressor in an encoding.Compressor. 
-func compress(in []byte, cp Compressor, compressor encoding.Compressor) ([]byte, error) { +func compress(in []byte, cp Compressor, compressor encoding.Compressor, stat *profiling.Stat) ([]byte, error) { if compressor == nil && cp == nil { return nil, nil } @@ -565,20 +573,34 @@ func compress(in []byte, cp Compressor, compressor encoding.Compressor) ([]byte, } cbuf := &bytes.Buffer{} if compressor != nil { + timer := stat.NewTimer("/compresslib/init") z, err := compressor.Compress(cbuf) if err != nil { + timer.Egress() return nil, wrapErr(err) } + timer.Egress() + + timer = stat.NewTimer("/compresslib/write") if _, err := z.Write(in); err != nil { + timer.Egress() return nil, wrapErr(err) } + timer.Egress() + + timer = stat.NewTimer("/compresslib/close") if err := z.Close(); err != nil { + timer.Egress() return nil, wrapErr(err) } + timer.Egress() } else { + timer := stat.NewTimer("/compressor") if err := cp.Do(cbuf, in); err != nil { + timer.Egress() return nil, wrapErr(err) } + timer.Egress() } return cbuf.Bytes(), nil } @@ -637,19 +659,29 @@ type payloadInfo struct { uncompressedBytes []byte } -func recvAndDecompress(p *parser, s *transport.Stream, dc Decompressor, maxReceiveMessageSize int, payInfo *payloadInfo, compressor encoding.Compressor) ([]byte, error) { - pf, d, err := p.recvMsg(maxReceiveMessageSize) +func recvAndDecompress(p *parser, s *transport.Stream, dc Decompressor, maxReceiveMessageSize int, payInfo *payloadInfo, compressor encoding.Compressor, stat *profiling.Stat) ([]byte, error) { + overallTimer := stat.NewTimer("/recvAndDecompress") + timer := stat.NewTimer("/recvMsg") + pf, d, err := p.recvMsg(maxReceiveMessageSize, stat) + timer.Egress() if err != nil { + overallTimer.Egress() return nil, err } + if payInfo != nil { payInfo.wireLength = len(d) } + timer = stat.NewTimer("/checkRecvPayload") if st := checkRecvPayload(pf, s.RecvCompress(), compressor != nil || dc != nil); st != nil { + timer.Egress() + overallTimer.Egress() return nil, 
st.Err() } + timer.Egress() + timer = stat.NewTimer("/compression") var size int if pf == compressionMade { // To match legacy behavior, if the decompressor is set by WithDecompressor or RPCDecompressor, @@ -661,16 +693,22 @@ func recvAndDecompress(p *parser, s *transport.Stream, dc Decompressor, maxRecei d, size, err = decompress(compressor, d, maxReceiveMessageSize) } if err != nil { + timer.Egress() + overallTimer.Egress() return nil, status.Errorf(codes.Internal, "grpc: failed to decompress the received message %v", err) } } else { size = len(d) } + timer.Egress() + if size > maxReceiveMessageSize { // TODO: Revisit the error code. Currently keep it consistent with java // implementation. + overallTimer.Egress() return nil, status.Errorf(codes.ResourceExhausted, "grpc: received message larger than max (%d vs. %d)", size, maxReceiveMessageSize) } + overallTimer.Egress() return d, nil } @@ -705,14 +743,19 @@ func decompress(compressor encoding.Compressor, d []byte, maxReceiveMessageSize // For the two compressor parameters, both should not be set, but if they are, // dc takes precedence over compressor. // TODO(dfawley): wrap the old compressor/decompressor using the new API? 
-func recv(p *parser, c baseCodec, s *transport.Stream, dc Decompressor, m interface{}, maxReceiveMessageSize int, payInfo *payloadInfo, compressor encoding.Compressor) error { - d, err := recvAndDecompress(p, s, dc, maxReceiveMessageSize, payInfo, compressor) +func recv(p *parser, c baseCodec, s *transport.Stream, dc Decompressor, m interface{}, maxReceiveMessageSize int, payInfo *payloadInfo, compressor encoding.Compressor, stat *profiling.Stat) error { + d, err := recvAndDecompress(p, s, dc, maxReceiveMessageSize, payInfo, compressor, stat) if err != nil { return err } + + timer := stat.NewTimer("/encoding") if err := c.Unmarshal(d, m); err != nil { + timer.Egress() return status.Errorf(codes.Internal, "grpc: failed to unmarshal the received message %v", err) } + timer.Egress() + if payInfo != nil { payInfo.uncompressedBytes = d } diff --git a/rpc_util_test.go b/rpc_util_test.go index 2449c23815ec..19cbbbae0abc 100644 --- a/rpc_util_test.go +++ b/rpc_util_test.go @@ -66,7 +66,7 @@ func (s) TestSimpleParsing(t *testing.T) { } { buf := fullReader{bytes.NewReader(test.p)} parser := &parser{r: buf} - pt, b, err := parser.recvMsg(math.MaxInt32) + pt, b, err := parser.recvMsg(math.MaxInt32, nil) if err != test.err || !bytes.Equal(b, test.b) || pt != test.pt { t.Fatalf("parser{%v}.recvMsg(_) = %v, %v, %v\nwant %v, %v, %v", test.p, pt, b, err, test.pt, test.b, test.err) } @@ -88,14 +88,14 @@ func (s) TestMultipleParsing(t *testing.T) { {compressionNone, []byte("d")}, } for i, want := range wantRecvs { - pt, data, err := parser.recvMsg(math.MaxInt32) + pt, data, err := parser.recvMsg(math.MaxInt32, nil) if err != nil || pt != want.pt || !reflect.DeepEqual(data, want.data) { t.Fatalf("after %d calls, parser{%v}.recvMsg(_) = %v, %v, %v\nwant %v, %v, ", i, p, pt, data, err, want.pt, want.data) } } - pt, data, err := parser.recvMsg(math.MaxInt32) + pt, data, err := parser.recvMsg(math.MaxInt32, nil) if err != io.EOF { t.Fatalf("after %d recvMsgs calls, parser{%v}.recvMsg(_) 
= %v, %v, %v\nwant _, _, %v", len(wantRecvs), p, pt, data, err, io.EOF) @@ -113,7 +113,7 @@ func (s) TestEncode(t *testing.T) { }{ {nil, []byte{0, 0, 0, 0, 0}, []byte{}, nil}, } { - data, err := encode(encoding.GetCodec(protoenc.Name), test.msg) + data, err := encode(encoding.GetCodec(protoenc.Name), test.msg, nil) if err != test.err || !bytes.Equal(data, test.data) { t.Errorf("encode(_, %v) = %v, %v; want %v, %v", test.msg, data, err, test.data, test.err) continue @@ -217,12 +217,12 @@ func (s) TestParseDialTarget(t *testing.T) { func bmEncode(b *testing.B, mSize int) { cdc := encoding.GetCodec(protoenc.Name) msg := &perfpb.Buffer{Body: make([]byte, mSize)} - encodeData, _ := encode(cdc, msg) + encodeData, _ := encode(cdc, msg, nil) encodedSz := int64(len(encodeData)) b.ReportAllocs() b.ResetTimer() for i := 0; i < b.N; i++ { - encode(cdc, msg) + encode(cdc, msg, nil) } b.SetBytes(encodedSz) } diff --git a/server.go b/server.go index 0d75cb109a09..ef46a53f60f9 100644 --- a/server.go +++ b/server.go @@ -719,7 +719,9 @@ func (s *Server) serveStreams(st transport.ServerTransport) { wg.Add(1) go func() { defer wg.Done() + timer := stream.Stat().NewTimer("/grpc") s.handleStream(st, stream, s.traceInfo(st, stream)) + timer.Egress() }() }, func(ctx context.Context, method string) context.Context { if !EnableTracing { @@ -842,31 +844,53 @@ func (s *Server) incrCallsFailed() { } func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg interface{}, cp Compressor, opts *transport.Options, comp encoding.Compressor) error { - data, err := encode(s.getCodec(stream.ContentSubtype()), msg) + stat := stream.Stat() + overallTimer := stat.NewTimer("/Server/sendResponse") + + timer := stat.NewTimer("/encoding") + codec := s.getCodec(stream.ContentSubtype()) + data, err := encode(codec, msg, stat) + timer.Egress() if err != nil { grpclog.Errorln("grpc: server failed to encode response: ", err) + overallTimer.Egress() return err } - compData, err := 
compress(data, cp, comp) + + timer = stat.NewTimer("/compression") + compData, err := compress(data, cp, comp, stat) + timer.Egress() + if err != nil { grpclog.Errorln("grpc: server failed to compress response: ", err) + overallTimer.Egress() return err } + + timer = stat.NewTimer("/header") hdr, payload := msgHeader(data, compData) + timer.Egress() // TODO(dfawley): should we be checking len(data) instead? if len(payload) > s.opts.maxSendMessageSize { + overallTimer.Egress() return status.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", len(payload), s.opts.maxSendMessageSize) } - err = t.Write(stream, hdr, payload, opts) + + timer = stat.NewTimer("/transport/enqueue") + err = t.Write(stream, hdr, payload, stat, opts) + timer.Egress() if err == nil && s.opts.statsHandler != nil { s.opts.statsHandler.HandleRPC(stream.Context(), outPayload(false, msg, data, payload, time.Now())) } + overallTimer.Egress() return err } func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, md *MethodDesc, trInfo *traceInfo) (err error) { + stat := stream.Stat() + overallTimer := stat.NewTimer("/processUnaryRPC") sh := s.opts.statsHandler - if sh != nil || trInfo != nil || channelz.IsOn() { + if overallTimer != nil || sh != nil || trInfo != nil || channelz.IsOn() { if channelz.IsOn() { s.incrCallsStarted() } @@ -918,6 +942,10 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport. s.incrCallsSucceeded() } } + + if overallTimer != nil { + overallTimer.Egress() + } }() } @@ -955,6 +983,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport. // If dc is set and matches the stream's compression, use it. Otherwise, try // to find a matching registered compressor for decomp. 
+ timer := stat.NewTimer("/recv/compression/compressor") if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc { dc = s.opts.dc } else if rc != "" && rc != encoding.Identity { @@ -962,6 +991,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport. if decomp == nil { st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc) t.WriteStatus(stream, st) + timer.Egress() return st.Err() } } @@ -980,12 +1010,15 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport. stream.SetSendCompress(rc) } } + timer.Egress() var payInfo *payloadInfo if sh != nil || binlog != nil { payInfo = &payloadInfo{} } - d, err := recvAndDecompress(&parser{r: stream}, stream, dc, s.opts.maxReceiveMessageSize, payInfo, decomp) + + timer = stat.NewTimer("/recv") + d, err := recvAndDecompress(&parser{r: stream}, stream, dc, s.opts.maxReceiveMessageSize, payInfo, decomp, stat) if err != nil { if st, ok := status.FromError(err); ok { if e := t.WriteStatus(stream, st); e != nil { @@ -994,13 +1027,27 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport. } return err } + timer.Egress() + if channelz.IsOn() { t.IncrMsgRecv() } df := func(v interface{}) error { - if err := s.getCodec(stream.ContentSubtype()).Unmarshal(d, v); err != nil { + overallTimer := stat.NewTimer("/encoding") + + // Do not create a closure on timer. + timer := stat.NewTimer("/getCodec") + codec := s.getCodec(stream.ContentSubtype()) + timer.Egress() + + timer = stat.NewTimer("/unmarshal") + err := codec.Unmarshal(d, v) + timer.Egress() + if err != nil { + overallTimer.Egress() return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err) } + if sh != nil { sh.HandleRPC(stream.Context(), &stats.InPayload{ RecvTime: time.Now(), @@ -1018,10 +1065,15 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport. 
if trInfo != nil { trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true) } + overallTimer.Egress() return nil } ctx := NewContextWithServerTransportStream(stream.Context(), stream) + + // TODO(adtac): measure interceptor code separately? + timer = stat.NewTimer("/applicationHandler") reply, appErr := md.Handler(srv.server, ctx, df, s.opts.unaryInt) + timer.Egress() if appErr != nil { appStatus, ok := status.FromError(appErr) if !ok { @@ -1114,6 +1166,8 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport. } func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, sd *StreamDesc, trInfo *traceInfo) (err error) { + stat := stream.Stat() + overallTimer := stat.NewTimer("/processStreamingRPC") if channelz.IsOn() { s.incrCallsStarted() } @@ -1139,7 +1193,7 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp statsHandler: sh, } - if sh != nil || trInfo != nil || channelz.IsOn() { + if overallTimer != nil || sh != nil || trInfo != nil || channelz.IsOn() { // See comment in processUnaryRPC on defers. defer func() { if trInfo != nil { @@ -1171,6 +1225,10 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp s.incrCallsSucceeded() } } + + if overallTimer != nil { + overallTimer.Egress() + } }() } @@ -1199,6 +1257,7 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp // If dc is set and matches the stream's compression, use it. Otherwise, try // to find a matching registered compressor for decomp. 
+ timer := stat.NewTimer("/recv/compression/compressor") if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc { ss.dc = s.opts.dc } else if rc != "" && rc != encoding.Identity { @@ -1206,6 +1265,7 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp if ss.decomp == nil { st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc) t.WriteStatus(ss.s, st) + timer.Egress() return st.Err() } } @@ -1224,6 +1284,7 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp stream.SetSendCompress(rc) } } + timer.Egress() if trInfo != nil { trInfo.tr.LazyLog(&trInfo.firstLine, false) @@ -1234,14 +1295,19 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp server = srv.server } if s.opts.streamInt == nil { + timer = stat.NewTimer("/applicationHandler") appErr = sd.Handler(server, ss) + timer.Egress() } else { info := &StreamServerInfo{ FullMethod: stream.Method(), IsClientStream: sd.ClientStreams, IsServerStream: sd.ServerStreams, } + // TODO(adtac): measure intercept code separately? + timer = stat.NewTimer("/applicationHandler") appErr = s.opts.streamInt(server, ss, info, sd.Handler) + timer.Egress() } if appErr != nil { appStatus, ok := status.FromError(appErr) @@ -1281,6 +1347,16 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp } func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) { + stat := stream.Stat() + // If the request is well-formed, we must stop the /stream/recv/grpc/header + // timer before either of processUnaryRPC or processStreamingRPC is called. + // If the request is malformed, the handler may exit early or use the unknown + // stream description handler. Since the latter's processing isn't header + // processing, we cannot stop the timer *after* the completion of the unknown + // stream description handler. 
For these reasons, we cannot defer the + // timer.Egress() call. + timer := stat.NewTimer("/grpc/stream/recv/header") + sm := stream.Method() if sm != "" && sm[0] == '/' { sm = sm[1:] @@ -1302,6 +1378,7 @@ func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Str if trInfo != nil { trInfo.tr.Finish() } + timer.Egress() return } service := sm[:pos] @@ -1310,16 +1387,19 @@ func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Str srv, knownService := s.m[service] if knownService { if md, ok := srv.md[method]; ok { + timer.Egress() s.processUnaryRPC(t, stream, srv, md, trInfo) return } if sd, ok := srv.sd[method]; ok { + timer.Egress() s.processStreamingRPC(t, stream, srv, sd, trInfo) return } } // Unknown service, or known server unknown method. if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil { + timer.Egress() s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo) return } @@ -1343,6 +1423,7 @@ func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Str if trInfo != nil { trInfo.tr.Finish() } + timer.Egress() } // The key to save ServerTransportStream in the context. 
diff --git a/stream.go b/stream.go index bb99940e36fe..e2e56ca76ffd 100644 --- a/stream.go +++ b/stream.go @@ -36,6 +36,7 @@ import ( "google.golang.org/grpc/internal/binarylog" "google.golang.org/grpc/internal/channelz" "google.golang.org/grpc/internal/grpcrand" + "google.golang.org/grpc/internal/profiling" "google.golang.org/grpc/internal/transport" "google.golang.org/grpc/metadata" "google.golang.org/grpc/peer" @@ -678,6 +679,9 @@ func (cs *clientStream) bufferForRetryLocked(sz int, op func(a *csAttempt) error } func (cs *clientStream) SendMsg(m interface{}) (err error) { + stat := cs.attempt.s.Stat() + overallTimer := stat.NewTimer("/send") + defer func() { if err != nil && err != io.EOF { // Call finish on the client stream for errors generated by this SendMsg @@ -689,6 +693,7 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) { } }() if cs.sentLast { + overallTimer.Egress() return status.Errorf(codes.Internal, "SendMsg called after CloseSend") } if !cs.desc.ClientStreams { @@ -696,13 +701,15 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) { } // load hdr, payload, data - hdr, payload, data, err := prepareMsg(m, cs.codec, cs.cp, cs.comp) + hdr, payload, data, err := prepareMsg(m, cs.codec, cs.cp, cs.comp, cs.attempt.s.Stat()) if err != nil { + overallTimer.Egress() return err } // TODO(dfawley): should we be checking len(data) instead? if len(payload) > *cs.callInfo.maxSendMessageSize { + overallTimer.Egress() return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payload), *cs.callInfo.maxSendMessageSize) } msgBytes := data // Store the pointer before setting to nil. For binary logging. 
@@ -720,10 +727,14 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) { Message: msgBytes, }) } + overallTimer.Egress() return } func (cs *clientStream) RecvMsg(m interface{}) error { + stat := cs.attempt.s.Stat() + overallTimer := stat.NewTimer("/clientStream/RecvMsg") + if cs.binlog != nil && !cs.serverHeaderBinlogged { // Call Header() to binary log header if it's not already logged. cs.Header() @@ -761,6 +772,7 @@ func (cs *clientStream) RecvMsg(m interface{}) error { cs.binlog.Log(logEntry) } } + overallTimer.Egress() return err } @@ -771,7 +783,7 @@ func (cs *clientStream) CloseSend() error { } cs.sentLast = true op := func(a *csAttempt) error { - a.t.Write(a.s, nil, nil, &transport.Options{Last: true}) + a.t.Write(a.s, nil, nil, a.s.Stat(), &transport.Options{Last: true}) // Always return nil; io.EOF is the only error that might make sense // instead, but there is no need to signal the client to call RecvMsg // as the only use left for the stream after CloseSend is to call @@ -793,9 +805,15 @@ func (cs *clientStream) finish(err error) { // Ending a stream with EOF indicates a success. 
err = nil } + var stat *profiling.Stat + if cs.attempt != nil { + stat = cs.attempt.s.Stat() + } + overallTimer := stat.NewTimer("/finish") cs.mu.Lock() if cs.finished { cs.mu.Unlock() + overallTimer.Egress() return } cs.finished = true @@ -831,6 +849,7 @@ func (cs *clientStream) finish(err error) { } } cs.cancel() + overallTimer.Egress() } func (a *csAttempt) sendMsg(m interface{}, hdr, payld, data []byte) error { @@ -842,15 +861,21 @@ func (a *csAttempt) sendMsg(m interface{}, hdr, payld, data []byte) error { } a.mu.Unlock() } - if err := a.t.Write(a.s, hdr, payld, &transport.Options{Last: !cs.desc.ClientStreams}); err != nil { + stat := a.s.Stat() + timer := stat.NewTimer("/transport/enqueue") + if err := a.t.Write(a.s, hdr, payld, a.s.Stat(), &transport.Options{Last: !cs.desc.ClientStreams}); err != nil { if !cs.desc.ClientStreams { // For non-client-streaming RPCs, we return nil instead of EOF on error // because the generated code requires it. finish is not called; RecvMsg() // will call it with the stream's status independently. + timer.Egress() return nil } + timer.Egress() return io.EOF } + timer.Egress() + if a.statsHandler != nil { a.statsHandler.HandleRPC(cs.ctx, outPayload(true, m, data, payld, time.Now())) } @@ -866,8 +891,10 @@ func (a *csAttempt) recvMsg(m interface{}, payInfo *payloadInfo) (err error) { payInfo = &payloadInfo{} } + stat := a.s.Stat() if !a.decompSet { // Block until we receive headers containing received message encoding. + timer := stat.NewTimer("/recv/header") if ct := a.s.RecvCompress(); ct != "" && ct != encoding.Identity { if a.dc == nil || a.dc.Type() != ct { // No configured decompressor, or it does not match the incoming @@ -881,8 +908,11 @@ func (a *csAttempt) recvMsg(m interface{}, payInfo *payloadInfo) (err error) { } // Only initialize this state once per stream. 
a.decompSet = true + timer.Egress() } - err = recv(a.p, cs.codec, a.s, a.dc, m, *cs.callInfo.maxReceiveMessageSize, payInfo, a.decomp) + timer := stat.NewTimer("/message") + err = recv(a.p, cs.codec, a.s, a.dc, m, *cs.callInfo.maxReceiveMessageSize, payInfo, a.decomp, stat) + timer.Egress() if err != nil { if err == io.EOF { if statusErr := a.s.Status().Err(); statusErr != nil { @@ -919,7 +949,9 @@ func (a *csAttempt) recvMsg(m interface{}, payInfo *payloadInfo) (err error) { } // Special handling for non-server-stream rpcs. // This recv expects EOF or errors, so we don't collect inPayload. - err = recv(a.p, cs.codec, a.s, a.dc, m, *cs.callInfo.maxReceiveMessageSize, nil, a.decomp) + timer = stat.NewTimer("/trailer") + err = recv(a.p, cs.codec, a.s, a.dc, m, *cs.callInfo.maxReceiveMessageSize, nil, a.decomp, stat) + timer.Egress() if err == nil { return toRPCErr(errors.New("grpc: client streaming protocol violation: get , want ")) } @@ -1135,7 +1167,7 @@ func (as *addrConnStream) CloseSend() error { } as.sentLast = true - as.t.Write(as.s, nil, nil, &transport.Options{Last: true}) + as.t.Write(as.s, nil, nil, as.s.Stat(), &transport.Options{Last: true}) // Always return nil; io.EOF is the only error that might make sense // instead, but there is no need to signal the client to call RecvMsg // as the only use left for the stream after CloseSend is to call @@ -1166,7 +1198,7 @@ func (as *addrConnStream) SendMsg(m interface{}) (err error) { } // load hdr, payload, data - hdr, payld, _, err := prepareMsg(m, as.codec, as.cp, as.comp) + hdr, payld, _, err := prepareMsg(m, as.codec, as.cp, as.comp, nil) if err != nil { return err } @@ -1176,7 +1208,7 @@ func (as *addrConnStream) SendMsg(m interface{}) (err error) { return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. 
%d)", len(payld), *as.callInfo.maxSendMessageSize) } - if err := as.t.Write(as.s, hdr, payld, &transport.Options{Last: !as.desc.ClientStreams}); err != nil { + if err := as.t.Write(as.s, hdr, payld, as.s.Stat(), &transport.Options{Last: !as.desc.ClientStreams}); err != nil { if !as.desc.ClientStreams { // For non-client-streaming RPCs, we return nil instead of EOF on error // because the generated code requires it. finish is not called; RecvMsg() @@ -1216,7 +1248,7 @@ func (as *addrConnStream) RecvMsg(m interface{}) (err error) { // Only initialize this state once per stream. as.decompSet = true } - err = recv(as.p, as.codec, as.s, as.dc, m, *as.callInfo.maxReceiveMessageSize, nil, as.decomp) + err = recv(as.p, as.codec, as.s, as.dc, m, *as.callInfo.maxReceiveMessageSize, nil, as.decomp, as.s.Stat()) if err != nil { if err == io.EOF { if statusErr := as.s.Status().Err(); statusErr != nil { @@ -1237,7 +1269,7 @@ func (as *addrConnStream) RecvMsg(m interface{}) (err error) { // Special handling for non-server-stream rpcs. // This recv expects EOF or errors, so we don't collect inPayload. 
- err = recv(as.p, as.codec, as.s, as.dc, m, *as.callInfo.maxReceiveMessageSize, nil, as.decomp) + err = recv(as.p, as.codec, as.s, as.dc, m, *as.callInfo.maxReceiveMessageSize, nil, as.decomp, as.s.Stat()) if err == nil { return toRPCErr(errors.New("grpc: client streaming protocol violation: get , want ")) } @@ -1380,6 +1412,9 @@ func (ss *serverStream) SetTrailer(md metadata.MD) { } func (ss *serverStream) SendMsg(m interface{}) (err error) { + stat := ss.s.Stat() + overallTimer := stat.NewTimer("/serverStream/SendMsg") + defer func() { if ss.trInfo != nil { ss.mu.Lock() @@ -1409,18 +1444,24 @@ func (ss *serverStream) SendMsg(m interface{}) (err error) { }() // load hdr, payload, data - hdr, payload, data, err := prepareMsg(m, ss.codec, ss.cp, ss.comp) + hdr, payload, data, err := prepareMsg(m, ss.codec, ss.cp, ss.comp, ss.s.Stat()) if err != nil { + overallTimer.Egress() return err } // TODO(dfawley): should we be checking len(data) instead? if len(payload) > ss.maxSendMessageSize { + overallTimer.Egress() return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. 
%d)", len(payload), ss.maxSendMessageSize) } - if err := ss.t.Write(ss.s, hdr, payload, &transport.Options{Last: false}); err != nil { + timer := stat.NewTimer("/transport/enqueue") + if err := ss.t.Write(ss.s, hdr, payload, ss.s.Stat(), &transport.Options{Last: false}); err != nil { + timer.Egress() return toRPCErr(err) } + timer.Egress() + if ss.binlog != nil { if !ss.serverHeaderBinlogged { h, _ := ss.s.Header() @@ -1436,10 +1477,14 @@ func (ss *serverStream) SendMsg(m interface{}) (err error) { if ss.statsHandler != nil { ss.statsHandler.HandleRPC(ss.s.Context(), outPayload(false, m, data, payload, time.Now())) } + overallTimer.Egress() return nil } func (ss *serverStream) RecvMsg(m interface{}) (err error) { + stat := ss.s.Stat() + overallTimer := stat.NewTimer("/serverStream/RecvMsg") + defer func() { if ss.trInfo != nil { ss.mu.Lock() @@ -1471,16 +1516,18 @@ func (ss *serverStream) RecvMsg(m interface{}) (err error) { if ss.statsHandler != nil || ss.binlog != nil { payInfo = &payloadInfo{} } - if err := recv(ss.p, ss.codec, ss.s, ss.dc, m, ss.maxReceiveMessageSize, payInfo, ss.decomp); err != nil { + if err := recv(ss.p, ss.codec, ss.s, ss.dc, m, ss.maxReceiveMessageSize, payInfo, ss.decomp, stat); err != nil { if err == io.EOF { if ss.binlog != nil { ss.binlog.Log(&binarylog.ClientHalfClose{}) } + overallTimer.Egress() return err } if err == io.ErrUnexpectedEOF { err = status.Errorf(codes.Internal, io.ErrUnexpectedEOF.Error()) } + overallTimer.Egress() return toRPCErr(err) } if ss.statsHandler != nil { @@ -1498,6 +1545,7 @@ func (ss *serverStream) RecvMsg(m interface{}) (err error) { Message: payInfo.uncompressedBytes, }) } + overallTimer.Egress() return nil } @@ -1510,20 +1558,29 @@ func MethodFromServerStream(stream ServerStream) (string, bool) { // prepareMsg returns the hdr, payload and data // using the compressors passed or using the // passed preparedmsg -func prepareMsg(m interface{}, codec baseCodec, cp Compressor, comp encoding.Compressor) (hdr, 
payload, data []byte, err error) { +func prepareMsg(m interface{}, codec baseCodec, cp Compressor, comp encoding.Compressor, stat *profiling.Stat) (hdr, payload, data []byte, err error) { if preparedMsg, ok := m.(*PreparedMsg); ok { return preparedMsg.hdr, preparedMsg.payload, preparedMsg.encodedData, nil } // The input interface is not a prepared msg. // Marshal and Compress the data at this point - data, err = encode(codec, m) + timer := stat.NewTimer("/encoding") + data, err = encode(codec, m, stat) + timer.Egress() if err != nil { return nil, nil, nil, err } - compData, err := compress(data, cp, comp) + + timer = stat.NewTimer("/compression") + compData, err := compress(data, cp, comp, stat) + timer.Egress() if err != nil { return nil, nil, nil, err } + + timer = stat.NewTimer("/header") hdr, payload = msgHeader(data, compData) + timer.Egress() + return hdr, payload, data, nil } From 8de9175c7a3806cf6742211faac1f70508177ff3 Mon Sep 17 00:00:00 2001 From: Adhityaa Chandrasekar Date: Fri, 20 Dec 2019 12:59:44 -0800 Subject: [PATCH 2/5] V2: review comment --- internal/profiling/profiling.go | 7 +++++++ internal/transport/http2_client.go | 4 +++- internal/transport/http2_server.go | 6 +++--- profiling/cmd/catapult.go | 2 ++ 4 files changed, 15 insertions(+), 4 deletions(-) diff --git a/internal/profiling/profiling.go b/internal/profiling/profiling.go index 4d8e68aab433..98bc27c696f9 100644 --- a/internal/profiling/profiling.go +++ b/internal/profiling/profiling.go @@ -221,3 +221,10 @@ func InitStats(streamStatsSize uint32) error { return nil } + +const ( + // StreamStatMetadataSize is the size of the metadata field StreamStats have + // in bytes. A 12-byte slice that is a big-endian concatenation of a stream's + // connection ID (first eight bytes) and stream ID (next four bytes) is used.
+ StreamStatMetadataSize = 12 +) diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index 2b27655fe166..df043aacb9ae 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -573,7 +573,9 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea if profiling.IsEnabled() { s.stat = profiling.NewStat("client") - s.stat.Metadata = make([]byte, 12) + s.stat.Metadata = make([]byte, profiling.StreamStatMetadataSize) + // See comment above StreamStatMetadataSize definition for more information + // on this encoding. binary.BigEndian.PutUint64(s.stat.Metadata[0:8], t.connectionID) // Stream ID will be set when loopy writer actually establishes the stream // and obtains a stream ID diff --git a/internal/transport/http2_server.go b/internal/transport/http2_server.go index 6531dc5e0400..6125de037578 100644 --- a/internal/transport/http2_server.go +++ b/internal/transport/http2_server.go @@ -335,10 +335,10 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func( } if profiling.IsEnabled() { - // This is where the concept of a stream is first established within a gRPC - // server, so let's create an associated Stat object. s.stat = profiling.NewStat("server") - s.stat.Metadata = make([]byte, 12) + // See comment above StreamStatMetadataSize definition for more information + // on this encoding. 
+ s.stat.Metadata = make([]byte, profiling.StreamStatMetadataSize) binary.BigEndian.PutUint64(s.stat.Metadata[0:8], t.connectionID) binary.BigEndian.PutUint32(s.stat.Metadata[8:12], streamID) profiling.StreamStats.Push(s.stat) diff --git a/profiling/cmd/catapult.go b/profiling/cmd/catapult.go index 953304b3ed59..88e4362ed182 100644 --- a/profiling/cmd/catapult.go +++ b/profiling/cmd/catapult.go @@ -124,6 +124,8 @@ func streamStatsCatapultJSONSingle(stat *ppb.Stat, baseSec int64, baseNsec int32 return nil } + // See comment above StreamStatMetadataSize definition for more information + // on this encoding. connectionCounter := binary.BigEndian.Uint64(stat.Metadata[0:8]) streamID := binary.BigEndian.Uint32(stat.Metadata[8:12]) opid := fmt.Sprintf("/%s/%d/%d", stat.Tags, connectionCounter, streamID) From 4c57f4f16e3e0e9192b8dde2e27f82015ebe66f4 Mon Sep 17 00:00:00 2001 From: Adhityaa Chandrasekar Date: Fri, 20 Dec 2019 15:53:48 -0800 Subject: [PATCH 3/5] no magic --- internal/profiling/profiling.go | 10 ++++++---- internal/transport/http2_client.go | 10 +++++----- internal/transport/http2_server.go | 8 +++++--- profiling/cmd/catapult.go | 7 +++++-- 4 files changed, 21 insertions(+), 14 deletions(-) diff --git a/internal/profiling/profiling.go b/internal/profiling/profiling.go index 98bc27c696f9..1c92051d98ef 100644 --- a/internal/profiling/profiling.go +++ b/internal/profiling/profiling.go @@ -223,8 +223,10 @@ func InitStats(streamStatsSize uint32) error { } const ( - // StreamStatMetadataSize is the size of the metadata field StreamStats have - // in bytes. A 12-byte slice that is a big-endian concatenation of a stream's - // connection ID (first eight bytes) and stream ID (next four bytes) is used. - StreamStatMetadataSize = 12 + // StreamStatMetadataConnectionIDSize is the number of bytes reserved for the + // connection ID in a stream stat's metadata field. 
+ StreamStatMetadataConnectionIDSize = 8 + // StreamStatMetadataStreamIDSize is the number of bytes reserved for the + // stream ID in a stream stat's metadata field. + StreamStatMetadataStreamIDSize = 4 ) diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index df043aacb9ae..f18e93d8a9d3 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -573,10 +573,8 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea if profiling.IsEnabled() { s.stat = profiling.NewStat("client") - s.stat.Metadata = make([]byte, profiling.StreamStatMetadataSize) - // See comment above StreamStatMetadataSize definition for more information - // on this encoding. - binary.BigEndian.PutUint64(s.stat.Metadata[0:8], t.connectionID) + s.stat.Metadata = make([]byte, profiling.StreamStatMetadataConnectionIDSize+profiling.StreamStatMetadataStreamIDSize) + binary.BigEndian.PutUint64(s.stat.Metadata[0:profiling.StreamStatMetadataConnectionIDSize], t.connectionID) // Stream ID will be set when loopy writer actually establishes the stream // and obtains a stream ID profiling.StreamStats.Push(s.stat) @@ -618,7 +616,9 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea } t.activeStreams[id] = s if s.stat != nil { - binary.BigEndian.PutUint32(s.stat.Metadata[8:12], id) + from := profiling.StreamStatMetadataConnectionIDSize + to := from + profiling.StreamStatMetadataStreamIDSize + binary.BigEndian.PutUint32(s.stat.Metadata[from:to], id) } if channelz.IsOn() { atomic.AddInt64(&t.czData.streamsStarted, 1) diff --git a/internal/transport/http2_server.go b/internal/transport/http2_server.go index 6125de037578..c92139c41857 100644 --- a/internal/transport/http2_server.go +++ b/internal/transport/http2_server.go @@ -338,9 +338,11 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func( s.stat = profiling.NewStat("server") // See comment above 
StreamStatMetadataSize definition for more information // on this encoding. - s.stat.Metadata = make([]byte, profiling.StreamStatMetadataSize) - binary.BigEndian.PutUint64(s.stat.Metadata[0:8], t.connectionID) - binary.BigEndian.PutUint32(s.stat.Metadata[8:12], streamID) + s.stat.Metadata = make([]byte, profiling.StreamStatMetadataConnectionIDSize+profiling.StreamStatMetadataStreamIDSize) + written := 0 + binary.BigEndian.PutUint64(s.stat.Metadata[written:written+profiling.StreamStatMetadataConnectionIDSize], t.connectionID) + written += profiling.StreamStatMetadataConnectionIDSize + binary.BigEndian.PutUint32(s.stat.Metadata[written:written+profiling.StreamStatMetadataStreamIDSize], streamID) profiling.StreamStats.Push(s.stat) defer s.stat.NewTimer("/http2/recv/header").Egress() } diff --git a/profiling/cmd/catapult.go b/profiling/cmd/catapult.go index 88e4362ed182..09c35c344aa0 100644 --- a/profiling/cmd/catapult.go +++ b/profiling/cmd/catapult.go @@ -27,6 +27,7 @@ import ( "strings" "google.golang.org/grpc/grpclog" + "google.golang.org/grpc/internal/profiling" ppb "google.golang.org/grpc/profiling/proto" ) @@ -126,8 +127,10 @@ func streamStatsCatapultJSONSingle(stat *ppb.Stat, baseSec int64, baseNsec int32 // See comment above StreamStatMetadataSize definition for more information // on this encoding. 
- connectionCounter := binary.BigEndian.Uint64(stat.Metadata[0:8]) - streamID := binary.BigEndian.Uint32(stat.Metadata[8:12]) + read := 0 + connectionCounter := binary.BigEndian.Uint64(stat.Metadata[read : read+profiling.StreamStatMetadataConnectionIDSize]) + read += profiling.StreamStatMetadataConnectionIDSize + streamID := binary.BigEndian.Uint32(stat.Metadata[read : read+profiling.StreamStatMetadataStreamIDSize]) opid := fmt.Sprintf("/%s/%d/%d", stat.Tags, connectionCounter, streamID) var loopyReaderGoID, loopyWriterGoID int64 From c401b38b2199de56ec9948320d48300802b68b02 Mon Sep 17 00:00:00 2001 From: Adhityaa Chandrasekar Date: Sat, 8 Feb 2020 16:00:31 -0500 Subject: [PATCH 4/5] upstream rebase, go fmt --- benchmark/stats/stats.go | 2 +- stream.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/benchmark/stats/stats.go b/benchmark/stats/stats.go index c3599a82885e..0ce1f2b462f5 100644 --- a/benchmark/stats/stats.go +++ b/benchmark/stats/stats.go @@ -134,7 +134,7 @@ func (f Features) String() string { f.NetworkMode, f.UseBufConn, f.EnableKeepalive, f.BenchTime, f.EnableTrace, f.Latency, f.Kbps, f.MTU, f.MaxConcurrentCalls, reqPayloadString, respPayloadString, f.ModeCompressor, f.EnableChannelz, f.EnableProfiling, - f.EnablePreloader) + f.EnablePreloader) } // SharedFeatures returns the shared features as a pretty printable string. 
diff --git a/stream.go b/stream.go index e2e56ca76ffd..1c5250e7a8aa 100644 --- a/stream.go +++ b/stream.go @@ -1558,7 +1558,7 @@ func MethodFromServerStream(stream ServerStream) (string, bool) { // prepareMsg returns the hdr, payload and data // using the compressors passed or using the // passed preparedmsg -func prepareMsg(m interface{}, codec baseCodec, cp Compressor, comp encoding.Compressor stat *profiling.Stat) (hdr, payload, data []byte, err error) { +func prepareMsg(m interface{}, codec baseCodec, cp Compressor, comp encoding.Compressor, stat *profiling.Stat) (hdr, payload, data []byte, err error) { if preparedMsg, ok := m.(*PreparedMsg); ok { return preparedMsg.hdr, preparedMsg.payload, preparedMsg.encodedData, nil } From 2dfc77ad44a815aa2e0ec124bb2ef60e28633da3 Mon Sep 17 00:00:00 2001 From: Adhityaa Chandrasekar Date: Sat, 8 Feb 2020 16:05:30 -0500 Subject: [PATCH 5/5] review comments --- internal/transport/http2_server.go | 2 +- internal/transport/transport.go | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/internal/transport/http2_server.go b/internal/transport/http2_server.go index c92139c41857..47ed622e7cb4 100644 --- a/internal/transport/http2_server.go +++ b/internal/transport/http2_server.go @@ -461,7 +461,7 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func( t.controlBuf.put(®isterStream{ streamID: s.id, wq: s.wq, - stat: s.Stat(), + stat: s.stat, }) handle(s) return false diff --git a/internal/transport/transport.go b/internal/transport/transport.go index 1df292f1d96d..1768f551f28c 100644 --- a/internal/transport/transport.go +++ b/internal/transport/transport.go @@ -460,6 +460,7 @@ func (s *Stream) write(m recvMsg) { // Stat returns the streams's underlying *profiling.Stat. func (s *Stream) Stat() *profiling.Stat { + // TODO(adtac): is this nil check really needed? if s == nil { return nil }