diff --git a/internal/transport/controlbuf.go b/internal/transport/controlbuf.go index 194236a43a24..1e13308e8bde 100644 --- a/internal/transport/controlbuf.go +++ b/internal/transport/controlbuf.go @@ -594,7 +594,7 @@ func (l *loopyWriter) incomingSettingsHandler(s *incomingSettings) error { } func (l *loopyWriter) registerStreamHandler(h *registerStream) error { - defer h.stat.Egress(h.stat.NewTimer("/http2/recv/header/loopyWriter/registerOutStream")) + timer := h.stat.NewTimer("/http2/recv/header/loopyWriter/registerOutStream") str := &outStream{ id: h.streamID, state: empty, @@ -602,6 +602,7 @@ func (l *loopyWriter) registerStreamHandler(h *registerStream) error { wq: h.wq, } l.estdStreams[h.streamID] = str + h.stat.Egress(timer) return nil } @@ -700,9 +701,10 @@ func (l *loopyWriter) writeHeader(streamID uint32, endStream bool, hf []hpack.He } func (l *loopyWriter) preprocessData(df *dataFrame) error { - defer df.stat.Egress(df.stat.NewTimer("/http2/send/dataFrame/loopyWriter/preprocess")) + timer := df.stat.NewTimer("/http2/send/dataFrame/loopyWriter/preprocess") str, ok := l.estdStreams[df.streamID] if !ok { + df.stat.Egress(timer) return nil } // If we got data for a stream it means that @@ -712,6 +714,7 @@ func (l *loopyWriter) preprocessData(df *dataFrame) error { str.state = active l.activeStreams.enqueue(str) } + df.stat.Egress(timer) return nil } @@ -841,7 +844,9 @@ func (l *loopyWriter) processData() (bool, error) { // As an optimization to keep wire traffic low, data from d is copied to h to make as big as the // maximum possilbe HTTP2 frame size. - defer dataItem.stat.Egress(dataItem.stat.NewTimer("/http2/send/dataFrame/loopyWriter")) + if dataItem.stat != nil { + defer dataItem.stat.Egress(dataItem.stat.NewTimer("/http2/send/dataFrame/loopyWriter")) + } if len(dataItem.h) == 0 && len(dataItem.d) == 0 { // Empty data frame // Client sends out empty data frame with endStream = true diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index e1b5ee35dfa0..b34bb5d05ad6 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -572,10 +572,12 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea // Stream ID will be set when loopy writer actually establishes the stream // and obtains a stream ID profiling.StreamStats.Push(s.stat) + // Usually, performance suffers when we used defers to record egress out of + // functions when profiling is disabled, but it's fine here because this is + // executed only when profliing is enabled. + defer s.stat.Egress(s.stat.AppendTimer(timer)) } - defer s.stat.Egress(s.stat.AppendTimer(timer)) - cleanup := func(err error) { if s.swapState(streamDone) == streamDone { // If it was already done, return. @@ -607,7 +609,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea return err } t.activeStreams[id] = s - if profiling.IsEnabled() { + if s.stat != nil { binary.BigEndian.PutUint32(s.stat.Metadata[8:12], id) } if channelz.IsOn() { @@ -935,13 +937,14 @@ func (t *http2Client) updateFlowControl(n uint32) { func (t *http2Client) handleData(f *http2.DataFrame) { var timer *profiling.Timer if profiling.IsEnabled() { - // We don't have a reference to the corresponding stream's stat object - // right now, so instead we measure time and append to the stat object - // later. + // We don't have a reference to the stream's stat object right now, so + // instead we measure time and append to the stat object later. timer = profiling.NewTimer("/http2/recv/dataFrame/loopyReader") // Do not defer a call to timer egress here because this object is // short-lived; once this is appended to the appropriate stat, all // references to this object must be lost so as to be garbage collected. + // AppendTimer makes a copy of the timer, so operations on this object will + // not be reflected in the stat's copy. } size := f.Header().Length @@ -982,10 +985,11 @@ func (t *http2Client) handleData(f *http2.DataFrame) { if s == nil { return } - defer s.stat.Egress(s.stat.AppendTimer(timer)) + timerIdx := s.stat.AppendTimer(timer) if size > 0 { if err := s.fc.onData(size); err != nil { t.closeStream(s, io.EOF, true, http2.ErrCodeFlowControl, status.New(codes.Internal, err.Error()), nil, false) + s.stat.Egress(timerIdx) return } if f.Header().Flags.Has(http2.FlagDataPadded) { @@ -1008,6 +1012,7 @@ func (t *http2Client) handleData(f *http2.DataFrame) { if f.FrameHeader.Flags.Has(http2.FlagDataEndStream) { t.closeStream(s, io.EOF, false, http2.ErrCodeNo, status.New(codes.Internal, "server closed the stream without sending trailers"), nil, true) } + s.stat.Egress(timerIdx) } func (t *http2Client) handleRSTStream(f *http2.RSTStreamFrame) { @@ -1194,7 +1199,9 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) { return } - defer s.stat.Egress(s.stat.NewTimer("/http2/recv/header/loopyReader")) + if s.stat != nil { + defer s.stat.Egress(s.stat.NewTimer("/http2/recv/header/loopyReader")) + } endStream := frame.StreamEnded() atomic.StoreUint32(&s.bytesReceived, 1) diff --git a/internal/transport/http2_server.go b/internal/transport/http2_server.go index 7fd5fa9d257e..548e59ff207f 100644 --- a/internal/transport/http2_server.go +++ b/internal/transport/http2_server.go @@ -629,10 +629,11 @@ func (t *http2Server) handleData(f *http2.DataFrame) { if !ok { return } - defer s.stat.Egress(s.stat.AppendTimer(timer)) + timerIdx := s.stat.AppendTimer(timer) if size > 0 { if err := s.fc.onData(size); err != nil { t.closeStream(s, true, http2.ErrCodeFlowControl, false) + s.stat.Egress(timerIdx) return } if f.Header().Flags.Has(http2.FlagDataPadded) { @@ -655,6 +656,7 @@ func (t *http2Server) handleData(f *http2.DataFrame) { s.compareAndSwapState(streamActive, streamReadDone) s.write(recvMsg{err: io.EOF}) } + s.stat.Egress(timerIdx) } func (t *http2Server) handleRSTStream(f *http2.RSTStreamFrame) { @@ -852,6 +854,9 @@ func (t *http2Server) writeHeaderLocked(s *Stream) error { // TODO(zhaoq): Now it indicates the end of entire stream. Revisit if early // OK is adopted. func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error { + if s.stat != nil { + defer s.stat.Egress(s.stat.NewTimer("/WriteStatus")) + } if s.getState() == streamDone { return nil } diff --git a/internal/transport/transport.go b/internal/transport/transport.go index 9c59f4454d10..bb82d4fe7397 100644 --- a/internal/transport/transport.go +++ b/internal/transport/transport.go @@ -151,7 +151,6 @@ type recvBufferReader struct { // read additional data from recv. It blocks if there no additional data available // in recv. If Read returns any non-nil error, it will continue to return that error. func (r *recvBufferReader) Read(p []byte) (n int, err error) { - defer r.stat.Egress(r.stat.NewTimer("/recvBufferReader")) if r.err != nil { return 0, r.err } @@ -173,7 +172,6 @@ func (r *recvBufferReader) Read(p []byte) (n int, err error) { } func (r *recvBufferReader) read(p []byte) (n int, err error) { - defer r.stat.Egress(r.stat.NewTimer("/read")) select { case <-r.ctxDone: return 0, ContextErr(r.ctx.Err()) @@ -316,7 +314,6 @@ func (s *Stream) getState() streamState { } func (s *Stream) waitOnHeader() { - defer s.stat.Egress(s.stat.NewTimer("/waitOnHeader")) if s.headerChan == nil { // On the server headerChan is always nil since a stream originates // only after having received headers. @@ -471,15 +468,8 @@ func (s *Stream) Read(p []byte) (n int, err error) { if er := s.trReader.(*transportReader).er; er != nil { return 0, er } - - timer := s.stat.NewTimer("/requestRead") s.requestRead(len(p)) - s.stat.Egress(timer) - - timer = s.stat.NewTimer("/io") n, err = io.ReadFull(s.trReader, p) - s.stat.Egress(timer) - return } @@ -497,18 +487,12 @@ type transportReader struct { } func (t *transportReader) Read(p []byte) (n int, err error) { - timer := t.stat.NewTimer("/transportReader/reader") n, err = t.reader.Read(p) - t.stat.Egress(timer) if err != nil { t.er = err return } - - timer = t.stat.NewTimer("/transportReader/windowHandler") t.windowHandler(n) - t.stat.Egress(timer) - return } diff --git a/rpc_util.go b/rpc_util.go index 21a2c880e585..c6ab0b57418c 100644 --- a/rpc_util.go +++ b/rpc_util.go @@ -658,7 +658,9 @@ type payloadInfo struct { } func recvAndDecompress(p *parser, s *transport.Stream, dc Decompressor, maxReceiveMessageSize int, payInfo *payloadInfo, compressor encoding.Compressor, stat *profiling.Stat) ([]byte, error) { - defer stat.Egress(stat.NewTimer("/recvAndDecompress")) + if stat != nil { + defer stat.Egress(stat.NewTimer("/recvAndDecompress")) + } timer := stat.NewTimer("/recvMsg") pf, d, err := p.recvMsg(maxReceiveMessageSize, stat) diff --git a/server.go b/server.go index f3ae15eeb9c9..41a6f77f45f6 100644 --- a/server.go +++ b/server.go @@ -845,7 +845,9 @@ func (s *Server) incrCallsFailed() { func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg interface{}, cp Compressor, opts *transport.Options, comp encoding.Compressor) error { stat := stream.Stat() - defer stat.Egress(stat.NewTimer("/Server/sendResponse")) + if stat != nil { + defer stat.Egress(stat.NewTimer("/Server/sendResponse")) + } timer := stat.NewTimer("/encoding") data, err := encode(s.getCodec(stream.ContentSubtype()), msg, stat) @@ -882,7 +884,9 @@ func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Str func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, md *MethodDesc, trInfo *traceInfo) (err error) { stat := stream.Stat() - defer stat.Egress(stat.NewTimer("/processUnaryRPC")) + if stat != nil { + defer stat.Egress(stat.NewTimer("/processUnaryRPC")) + } if channelz.IsOn() { s.incrCallsStarted() @@ -1007,7 +1011,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport. t.IncrMsgRecv() } df := func(v interface{}) error { - defer stat.Egress(stat.NewTimer("/encoding")) + overallTimer := stat.NewTimer("/encoding") // Do not create a closure on timer. timer := stat.NewTimer("/getCodec") @@ -1018,6 +1022,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport. err := codec.Unmarshal(d, v) stat.Egress(timer) if err != nil { + stat.Egress(overallTimer) return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err) } @@ -1038,6 +1043,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport. if trInfo != nil { trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true) } + stat.Egress(overallTimer) return nil } ctx := NewContextWithServerTransportStream(stream.Context(), stream) @@ -1139,7 +1145,9 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport. func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, sd *StreamDesc, trInfo *traceInfo) (err error) { stat := stream.Stat() - defer stat.Egress(stat.NewTimer("/processStreamingRPC")) + if stat != nil { + defer stat.Egress(stat.NewTimer("/processStreamingRPC")) + } if channelz.IsOn() { s.incrCallsStarted() @@ -1306,6 +1314,7 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp } func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) { + stat := stream.Stat() // If the request is well-formed, we must stop the /stream/recv/grpc/header // timer before either of processUnaryRPC or processStreamingRPC is called. // If the request is malformed, the handler may exit early or use the unknown @@ -1313,7 +1322,6 @@ func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Str // processing, we cannot stop the timer *after* the completion of the unknown // stream description handler. For these reasons, we cannot defer the // timer.Egress() call. - stat := stream.Stat() timer := stat.NewTimer("/grpc/stream/recv/header") sm := stream.Method() diff --git a/stream.go b/stream.go index c512db7286d4..8a9d75706cdc 100644 --- a/stream.go +++ b/stream.go @@ -680,7 +680,9 @@ func (cs *clientStream) bufferForRetryLocked(sz int, op func(a *csAttempt) error func (cs *clientStream) SendMsg(m interface{}) (err error) { stat := cs.attempt.s.Stat() - defer stat.Egress(stat.NewTimer("/send")) + if stat != nil { + defer stat.Egress(stat.NewTimer("/send")) + } defer func() { if err != nil && err != io.EOF { @@ -729,7 +731,9 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) { func (cs *clientStream) RecvMsg(m interface{}) error { stat := cs.attempt.s.Stat() - defer stat.Egress(stat.NewTimer("/clientStream/RecvMsg")) + if stat != nil { + defer stat.Egress(stat.NewTimer("/clientStream/RecvMsg")) + } if cs.binlog != nil && !cs.serverHeaderBinlogged { // Call Header() to binary log header if it's not already logged. @@ -1401,7 +1405,9 @@ func (ss *serverStream) SetTrailer(md metadata.MD) { func (ss *serverStream) SendMsg(m interface{}) (err error) { stat := ss.s.Stat() - defer stat.Egress(stat.NewTimer("/serverStream/SendMsg")) + if stat != nil { + defer stat.Egress(stat.NewTimer("/serverStream/SendMsg")) + } defer func() { if ss.trInfo != nil { @@ -1469,7 +1475,9 @@ func (ss *serverStream) SendMsg(m interface{}) (err error) { func (ss *serverStream) RecvMsg(m interface{}) (err error) { stat := ss.s.Stat() - defer stat.Egress(stat.NewTimer("/serverStream/RecvMsg")) + if stat != nil { + defer stat.Egress(stat.NewTimer("/serverStream/RecvMsg")) + } defer func() { if ss.trInfo != nil {