Skip to content

Commit

Permalink
hooks: PATCH V5
Browse files Browse the repository at this point in the history
  • Loading branch information
Adhityaa Chandrasekar committed Nov 26, 2019
1 parent 7436582 commit 31d9dad
Show file tree
Hide file tree
Showing 7 changed files with 57 additions and 38 deletions.
11 changes: 8 additions & 3 deletions internal/transport/controlbuf.go
Original file line number Diff line number Diff line change
Expand Up @@ -594,14 +594,15 @@ func (l *loopyWriter) incomingSettingsHandler(s *incomingSettings) error {
}

func (l *loopyWriter) registerStreamHandler(h *registerStream) error {
defer h.stat.Egress(h.stat.NewTimer("/http2/recv/header/loopyWriter/registerOutStream"))
timer := h.stat.NewTimer("/http2/recv/header/loopyWriter/registerOutStream")
str := &outStream{
id: h.streamID,
state: empty,
itl: &itemList{},
wq: h.wq,
}
l.estdStreams[h.streamID] = str
h.stat.Egress(timer)
return nil
}

Expand Down Expand Up @@ -700,9 +701,10 @@ func (l *loopyWriter) writeHeader(streamID uint32, endStream bool, hf []hpack.He
}

func (l *loopyWriter) preprocessData(df *dataFrame) error {
defer df.stat.Egress(df.stat.NewTimer("/http2/send/dataFrame/loopyWriter/preprocess"))
timer := df.stat.NewTimer("/http2/send/dataFrame/loopyWriter/preprocess")
str, ok := l.estdStreams[df.streamID]
if !ok {
df.stat.Egress(timer)
return nil
}
// If we got data for a stream it means that
Expand All @@ -712,6 +714,7 @@ func (l *loopyWriter) preprocessData(df *dataFrame) error {
str.state = active
l.activeStreams.enqueue(str)
}
df.stat.Egress(timer)
return nil
}

Expand Down Expand Up @@ -841,7 +844,9 @@ func (l *loopyWriter) processData() (bool, error) {
// As an optimization to keep wire traffic low, data from d is copied to h to make as big as the
// maximum possilbe HTTP2 frame size.

defer dataItem.stat.Egress(dataItem.stat.NewTimer("/http2/send/dataFrame/loopyWriter"))
if dataItem.stat != nil {
defer dataItem.stat.Egress(dataItem.stat.NewTimer("/http2/send/dataFrame/loopyWriter"))
}

if len(dataItem.h) == 0 && len(dataItem.d) == 0 { // Empty data frame
// Client sends out empty data frame with endStream = true
Expand Down
23 changes: 15 additions & 8 deletions internal/transport/http2_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -572,10 +572,12 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
// Stream ID will be set when loopy writer actually establishes the stream
// and obtains a stream ID
profiling.StreamStats.Push(s.stat)
// Usually, performance suffers when we used defers to record egress out of
// functions when profiling is disabled, but it's fine here because this is
// executed only when profliing is enabled.
defer s.stat.Egress(s.stat.AppendTimer(timer))
}

defer s.stat.Egress(s.stat.AppendTimer(timer))

cleanup := func(err error) {
if s.swapState(streamDone) == streamDone {
// If it was already done, return.
Expand Down Expand Up @@ -607,7 +609,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
return err
}
t.activeStreams[id] = s
if profiling.IsEnabled() {
if s.stat != nil {
binary.BigEndian.PutUint32(s.stat.Metadata[8:12], id)
}
if channelz.IsOn() {
Expand Down Expand Up @@ -935,13 +937,14 @@ func (t *http2Client) updateFlowControl(n uint32) {
func (t *http2Client) handleData(f *http2.DataFrame) {
var timer *profiling.Timer
if profiling.IsEnabled() {
// We don't have a reference to the corresponding stream's stat object
// right now, so instead we measure time and append to the stat object
// later.
// We don't have a reference to the stream's stat object right now, so
// instead we measure time and append to the stat object later.
timer = profiling.NewTimer("/http2/recv/dataFrame/loopyReader")
// Do not defer a call to timer egress here because this object is
// short-lived; once this is appended to the appropriate stat, all
// references to this object must be lost so as to be garbage collected.
// AppendTimer makes a copy of the timer, so operations on this object will
// not be reflected in the stat's copy.
}

size := f.Header().Length
Expand Down Expand Up @@ -982,10 +985,11 @@ func (t *http2Client) handleData(f *http2.DataFrame) {
if s == nil {
return
}
defer s.stat.Egress(s.stat.AppendTimer(timer))
timerIdx := s.stat.AppendTimer(timer)
if size > 0 {
if err := s.fc.onData(size); err != nil {
t.closeStream(s, io.EOF, true, http2.ErrCodeFlowControl, status.New(codes.Internal, err.Error()), nil, false)
s.stat.Egress(timerIdx)
return
}
if f.Header().Flags.Has(http2.FlagDataPadded) {
Expand All @@ -1008,6 +1012,7 @@ func (t *http2Client) handleData(f *http2.DataFrame) {
if f.FrameHeader.Flags.Has(http2.FlagDataEndStream) {
t.closeStream(s, io.EOF, false, http2.ErrCodeNo, status.New(codes.Internal, "server closed the stream without sending trailers"), nil, true)
}
s.stat.Egress(timerIdx)
}

func (t *http2Client) handleRSTStream(f *http2.RSTStreamFrame) {
Expand Down Expand Up @@ -1194,7 +1199,9 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
return
}

defer s.stat.Egress(s.stat.NewTimer("/http2/recv/header/loopyReader"))
if s.stat != nil {
defer s.stat.Egress(s.stat.NewTimer("/http2/recv/header/loopyReader"))
}

endStream := frame.StreamEnded()
atomic.StoreUint32(&s.bytesReceived, 1)
Expand Down
7 changes: 6 additions & 1 deletion internal/transport/http2_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -629,10 +629,11 @@ func (t *http2Server) handleData(f *http2.DataFrame) {
if !ok {
return
}
defer s.stat.Egress(s.stat.AppendTimer(timer))
timerIdx := s.stat.AppendTimer(timer)
if size > 0 {
if err := s.fc.onData(size); err != nil {
t.closeStream(s, true, http2.ErrCodeFlowControl, false)
s.stat.Egress(timerIdx)
return
}
if f.Header().Flags.Has(http2.FlagDataPadded) {
Expand All @@ -655,6 +656,7 @@ func (t *http2Server) handleData(f *http2.DataFrame) {
s.compareAndSwapState(streamActive, streamReadDone)
s.write(recvMsg{err: io.EOF})
}
s.stat.Egress(timerIdx)
}

func (t *http2Server) handleRSTStream(f *http2.RSTStreamFrame) {
Expand Down Expand Up @@ -852,6 +854,9 @@ func (t *http2Server) writeHeaderLocked(s *Stream) error {
// TODO(zhaoq): Now it indicates the end of entire stream. Revisit if early
// OK is adopted.
func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
if s.stat != nil {
defer s.stat.Egress(s.stat.NewTimer("/WriteStatus"))
}
if s.getState() == streamDone {
return nil
}
Expand Down
16 changes: 0 additions & 16 deletions internal/transport/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,6 @@ type recvBufferReader struct {
// read additional data from recv. It blocks if there no additional data available
// in recv. If Read returns any non-nil error, it will continue to return that error.
func (r *recvBufferReader) Read(p []byte) (n int, err error) {
defer r.stat.Egress(r.stat.NewTimer("/recvBufferReader"))
if r.err != nil {
return 0, r.err
}
Expand All @@ -173,7 +172,6 @@ func (r *recvBufferReader) Read(p []byte) (n int, err error) {
}

func (r *recvBufferReader) read(p []byte) (n int, err error) {
defer r.stat.Egress(r.stat.NewTimer("/read"))
select {
case <-r.ctxDone:
return 0, ContextErr(r.ctx.Err())
Expand Down Expand Up @@ -316,7 +314,6 @@ func (s *Stream) getState() streamState {
}

func (s *Stream) waitOnHeader() {
defer s.stat.Egress(s.stat.NewTimer("/waitOnHeader"))
if s.headerChan == nil {
// On the server headerChan is always nil since a stream originates
// only after having received headers.
Expand Down Expand Up @@ -471,15 +468,8 @@ func (s *Stream) Read(p []byte) (n int, err error) {
if er := s.trReader.(*transportReader).er; er != nil {
return 0, er
}

timer := s.stat.NewTimer("/requestRead")
s.requestRead(len(p))
s.stat.Egress(timer)

timer = s.stat.NewTimer("/io")
n, err = io.ReadFull(s.trReader, p)
s.stat.Egress(timer)

return
}

Expand All @@ -497,18 +487,12 @@ type transportReader struct {
}

func (t *transportReader) Read(p []byte) (n int, err error) {
timer := t.stat.NewTimer("/transportReader/reader")
n, err = t.reader.Read(p)
t.stat.Egress(timer)
if err != nil {
t.er = err
return
}

timer = t.stat.NewTimer("/transportReader/windowHandler")
t.windowHandler(n)
t.stat.Egress(timer)

return
}

Expand Down
4 changes: 3 additions & 1 deletion rpc_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -658,7 +658,9 @@ type payloadInfo struct {
}

func recvAndDecompress(p *parser, s *transport.Stream, dc Decompressor, maxReceiveMessageSize int, payInfo *payloadInfo, compressor encoding.Compressor, stat *profiling.Stat) ([]byte, error) {
defer stat.Egress(stat.NewTimer("/recvAndDecompress"))
if stat != nil {
defer stat.Egress(stat.NewTimer("/recvAndDecompress"))
}

timer := stat.NewTimer("/recvMsg")
pf, d, err := p.recvMsg(maxReceiveMessageSize, stat)
Expand Down
18 changes: 13 additions & 5 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -845,7 +845,9 @@ func (s *Server) incrCallsFailed() {

func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg interface{}, cp Compressor, opts *transport.Options, comp encoding.Compressor) error {
stat := stream.Stat()
defer stat.Egress(stat.NewTimer("/Server/sendResponse"))
if stat != nil {
defer stat.Egress(stat.NewTimer("/Server/sendResponse"))
}

timer := stat.NewTimer("/encoding")
data, err := encode(s.getCodec(stream.ContentSubtype()), msg, stat)
Expand Down Expand Up @@ -882,7 +884,9 @@ func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Str

func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, md *MethodDesc, trInfo *traceInfo) (err error) {
stat := stream.Stat()
defer stat.Egress(stat.NewTimer("/processUnaryRPC"))
if stat != nil {
defer stat.Egress(stat.NewTimer("/processUnaryRPC"))
}

if channelz.IsOn() {
s.incrCallsStarted()
Expand Down Expand Up @@ -1007,7 +1011,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
t.IncrMsgRecv()
}
df := func(v interface{}) error {
defer stat.Egress(stat.NewTimer("/encoding"))
overallTimer := stat.NewTimer("/encoding")

// Do not create a closure on timer.
timer := stat.NewTimer("/getCodec")
Expand All @@ -1018,6 +1022,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
err := codec.Unmarshal(d, v)
stat.Egress(timer)
if err != nil {
stat.Egress(overallTimer)
return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err)
}

Expand All @@ -1038,6 +1043,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
if trInfo != nil {
trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true)
}
stat.Egress(overallTimer)
return nil
}
ctx := NewContextWithServerTransportStream(stream.Context(), stream)
Expand Down Expand Up @@ -1139,7 +1145,9 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.

func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, sd *StreamDesc, trInfo *traceInfo) (err error) {
stat := stream.Stat()
defer stat.Egress(stat.NewTimer("/processStreamingRPC"))
if stat != nil {
defer stat.Egress(stat.NewTimer("/processStreamingRPC"))
}

if channelz.IsOn() {
s.incrCallsStarted()
Expand Down Expand Up @@ -1306,14 +1314,14 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
}

func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) {
stat := stream.Stat()
// If the request is well-formed, we must stop the /stream/recv/grpc/header
// timer before either of processUnaryRPC or processStreamingRPC is called.
// If the request is malformed, the handler may exit early or use the unknown
// stream description handler. Since the latter's processing isn't header
// processing, we cannot stop the timer *after* the completion of the unknown
// stream description handler. For these reasons, we cannot defer the
// timer.Egress() call.
stat := stream.Stat()
timer := stat.NewTimer("/grpc/stream/recv/header")

sm := stream.Method()
Expand Down
16 changes: 12 additions & 4 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -680,7 +680,9 @@ func (cs *clientStream) bufferForRetryLocked(sz int, op func(a *csAttempt) error

func (cs *clientStream) SendMsg(m interface{}) (err error) {
stat := cs.attempt.s.Stat()
defer stat.Egress(stat.NewTimer("/send"))
if stat != nil {
defer stat.Egress(stat.NewTimer("/send"))
}

defer func() {
if err != nil && err != io.EOF {
Expand Down Expand Up @@ -729,7 +731,9 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) {

func (cs *clientStream) RecvMsg(m interface{}) error {
stat := cs.attempt.s.Stat()
defer stat.Egress(stat.NewTimer("/clientStream/RecvMsg"))
if stat != nil {
defer stat.Egress(stat.NewTimer("/clientStream/RecvMsg"))
}

if cs.binlog != nil && !cs.serverHeaderBinlogged {
// Call Header() to binary log header if it's not already logged.
Expand Down Expand Up @@ -1401,7 +1405,9 @@ func (ss *serverStream) SetTrailer(md metadata.MD) {

func (ss *serverStream) SendMsg(m interface{}) (err error) {
stat := ss.s.Stat()
defer stat.Egress(stat.NewTimer("/serverStream/SendMsg"))
if stat != nil {
defer stat.Egress(stat.NewTimer("/serverStream/SendMsg"))
}

defer func() {
if ss.trInfo != nil {
Expand Down Expand Up @@ -1469,7 +1475,9 @@ func (ss *serverStream) SendMsg(m interface{}) (err error) {

func (ss *serverStream) RecvMsg(m interface{}) (err error) {
stat := ss.s.Stat()
defer stat.Egress(stat.NewTimer("/serverStream/RecvMsg"))
if stat != nil {
defer stat.Egress(stat.NewTimer("/serverStream/RecvMsg"))
}

defer func() {
if ss.trInfo != nil {
Expand Down

0 comments on commit 31d9dad

Please sign in to comment.