Skip to content

Commit

Permalink
hooks: V15: review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Adhityaa Chandrasekar committed Dec 19, 2019
1 parent ff93118 commit 1453c8c
Show file tree
Hide file tree
Showing 6 changed files with 6 additions and 13 deletions.
2 changes: 1 addition & 1 deletion benchmark/benchmain/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ import (
"google.golang.org/grpc/benchmark/stats"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal/channelz"
profiling "google.golang.org/grpc/internal/profiling"
"google.golang.org/grpc/internal/profiling"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/test/bufconn"
)
Expand Down
2 changes: 0 additions & 2 deletions internal/transport/handler_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,6 @@ func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream), trace
}
ht.stats.HandleRPC(s.ctx, inHeader)
}
// TODO(adtac): set stat in transportReader and recvBufferReader?
s.trReader = &transportReader{
reader: &recvBufferReader{ctx: s.ctx, ctxDone: s.ctx.Done(), recv: s.buf, freeBuffer: func(*bytes.Buffer) {}},
windowHandler: func(int) {},
Expand All @@ -369,7 +368,6 @@ func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream), trace
for buf := make([]byte, readSize); ; {
n, err := req.Body.Read(buf)
if n > 0 {
// TODO(adtac): what should recvMsg.timer be here?
s.buf.put(recvMsg{buffer: bytes.NewBuffer(buf[:n:n])})
buf = buf[n:]
}
Expand Down
3 changes: 0 additions & 3 deletions internal/transport/http2_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,6 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr TargetInfo, opts Conne

func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream {
// TODO(zhaoq): Handle uint32 overflow of Stream.id.
// TODO(adtac): setup stat?
s := &Stream{
ct: t,
done: make(chan struct{}),
Expand Down Expand Up @@ -592,7 +591,6 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
}
// The stream was unprocessed by the server.
atomic.StoreUint32(&s.unprocessed, 1)
// TODO(adtac): set timer?
s.write(recvMsg{err: err})
close(s.done)
// If headerChan isn't closed, then close it.
Expand Down Expand Up @@ -751,7 +749,6 @@ func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2.
}
if err != nil {
// This will unblock reads eventually.
// TODO(adtac): set timer?
s.write(recvMsg{err: err})
}
// If headerChan isn't closed, then close it.
Expand Down
4 changes: 2 additions & 2 deletions preloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,13 @@ func (p *PreparedMsg) Encode(s Stream, msg interface{}) error {
}

// prepare the msg
// TODO(adtac): pass an actual stat to encode?
// Stream is deprecated and does not provide a way to retrieve the internal
// ServerStream/ClientStream's stat, so we'll pass nil here and in compress.
data, err := encode(rpcInfo.preloaderInfo.codec, msg, nil)
if err != nil {
return err
}
p.encodedData = data
// TODO(adtac): pass an actual stat to encode?
compData, err := compress(data, rpcInfo.preloaderInfo.cp, rpcInfo.preloaderInfo.comp, nil)
if err != nil {
return err
Expand Down
6 changes: 2 additions & 4 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -1249,8 +1249,7 @@ func (as *addrConnStream) RecvMsg(m interface{}) (err error) {
// Only initialize this state once per stream.
as.decompSet = true
}
// TODO(adtac): use a real stat instead of nil
err = recv(as.p, as.codec, as.s, as.dc, m, *as.callInfo.maxReceiveMessageSize, nil, as.decomp, nil)
err = recv(as.p, as.codec, as.s, as.dc, m, *as.callInfo.maxReceiveMessageSize, nil, as.decomp, as.s.Stat())
if err != nil {
if err == io.EOF {
if statusErr := as.s.Status().Err(); statusErr != nil {
Expand All @@ -1271,8 +1270,7 @@ func (as *addrConnStream) RecvMsg(m interface{}) (err error) {

// Special handling for non-server-stream rpcs.
// This recv expects EOF or errors, so we don't collect inPayload.
// TODO(adtac): use a real stat instead of nil
err = recv(as.p, as.codec, as.s, as.dc, m, *as.callInfo.maxReceiveMessageSize, nil, as.decomp, nil)
err = recv(as.p, as.codec, as.s, as.dc, m, *as.callInfo.maxReceiveMessageSize, nil, as.decomp, as.s.Stat())
if err == nil {
return toRPCErr(errors.New("grpc: client streaming protocol violation: get <nil>, want <EOF>"))
}
Expand Down
2 changes: 1 addition & 1 deletion test/end2end_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3759,7 +3759,7 @@ func testPingPong(t *testing.T, e env) {
}
}

func (s) TestMetadataStreamingRPC(t *testing.T) {
func TestMetadataStreamingRPC(t *testing.T) {
for _, e := range listTestEnv() {
testMetadataStreamingRPC(t, e)
}
Expand Down

0 comments on commit 1453c8c

Please sign in to comment.