diff --git a/call_test.go b/call_test.go index 78760ba5297a..36adaf412d88 100644 --- a/call_test.go +++ b/call_test.go @@ -67,7 +67,7 @@ type testStreamHandler struct { func (h *testStreamHandler) handleStream(t *testing.T, s *transport.Stream) { p := &parser{r: s} for { - pf, req, err := p.recvMsg(math.MaxInt32) + pf, req, err := p.recvMsg(math.MaxInt32, nil) if err == io.EOF { break } @@ -104,13 +104,13 @@ func (h *testStreamHandler) handleStream(t *testing.T, s *transport.Stream) { } } // send a response back to end the stream. - data, err := encode(testCodec{}, &expectedResponse) + data, err := encode(testCodec{}, &expectedResponse, nil) if err != nil { t.Errorf("Failed to encode the response: %v", err) return } hdr, payload := msgHeader(data, nil) - h.t.Write(s, hdr, payload, &transport.Options{}) + h.t.Write(s, hdr, payload, nil, &transport.Options{}) h.t.WriteStatus(s, status.New(codes.OK, "")) } diff --git a/internal/transport/handler_server_test.go b/internal/transport/handler_server_test.go index 737be7c7f5a3..fef3a3d4998f 100644 --- a/internal/transport/handler_server_test.go +++ b/internal/transport/handler_server_test.go @@ -402,7 +402,7 @@ func TestHandlerTransport_HandleStreams_WriteStatusWrite(t *testing.T) { st.bodyw.Close() // no body st.ht.WriteStatus(s, status.New(codes.OK, "")) - st.ht.Write(s, []byte("hdr"), []byte("data"), &Options{}) + st.ht.Write(s, []byte("hdr"), []byte("data"), nil, &Options{}) }) } diff --git a/internal/transport/transport.go b/internal/transport/transport.go index ebccf699ad84..9c59f4454d10 100644 --- a/internal/transport/transport.go +++ b/internal/transport/transport.go @@ -460,6 +460,7 @@ func (s *Stream) write(m recvMsg) { s.buf.put(m) } +// Stat returns the streams's underlying *profiling.Stat. func (s *Stream) Stat() *profiling.Stat { return s.stat } diff --git a/internal/transport/transport_test.go b/internal/transport/transport_test.go index c5ee74848ccc..414f772b8090 100644 --- a/internal/transport/transport_test.go +++ b/internal/transport/transport_test.go @@ -117,7 +117,7 @@ func (h *testStreamHandler) handleStream(t *testing.T, s *Stream) { return } // send a response back to the client. - h.t.Write(s, nil, resp, &Options{}) + h.t.Write(s, nil, resp, nil, &Options{}) // send the trailer to end the stream. h.t.WriteStatus(s, status.New(codes.OK, "")) } @@ -145,7 +145,7 @@ func (h *testStreamHandler) handleStreamPingPong(t *testing.T, s *Stream) { buf[0] = byte(0) binary.BigEndian.PutUint32(buf[1:], uint32(sz)) copy(buf[5:], msg) - h.t.Write(s, nil, buf, &Options{}) + h.t.Write(s, nil, buf, nil, &Options{}) } } @@ -266,7 +266,7 @@ func (h *testStreamHandler) handleStreamDelayRead(t *testing.T, s *Stream) { // This write will cause server to run out of stream level, // flow control and the other side won't send a window update // until that happens. - if err := h.t.Write(s, nil, resp, &Options{}); err != nil { + if err := h.t.Write(s, nil, resp, nil, &Options{}); err != nil { t.Errorf("server Write got %v, want ", err) return } @@ -524,7 +524,7 @@ func TestClientSendAndReceive(t *testing.T) { t.Fatalf("wrong stream id: %d", s2.id) } opts := Options{Last: true} - if err := ct.Write(s1, nil, expectedRequest, &opts); err != nil && err != io.EOF { + if err := ct.Write(s1, nil, expectedRequest, nil, &opts); err != nil && err != io.EOF { t.Fatalf("failed to send data: %v", err) } p := make([]byte, len(expectedResponse)) @@ -559,7 +559,7 @@ func performOneRPC(ct ClientTransport) { return } opts := Options{Last: true} - if err := ct.Write(s, []byte{}, expectedRequest, &opts); err == nil || err == io.EOF { + if err := ct.Write(s, []byte{}, expectedRequest, nil, &opts); err == nil || err == io.EOF { time.Sleep(5 * time.Millisecond) // The following s.Recv()'s could error out because the // underlying transport is gone. @@ -605,7 +605,7 @@ func TestLargeMessage(t *testing.T) { if err != nil { t.Errorf("%v.NewStream(_, _) = _, %v, want _, ", ct, err) } - if err := ct.Write(s, []byte{}, expectedRequestLarge, &Options{Last: true}); err != nil && err != io.EOF { + if err := ct.Write(s, []byte{}, expectedRequestLarge, nil, &Options{Last: true}); err != nil && err != io.EOF { t.Errorf("%v.Write(_, _, _) = %v, want ", ct, err) } p := make([]byte, len(expectedResponseLarge)) @@ -696,7 +696,7 @@ func TestLargeMessageWithDelayRead(t *testing.T) { // This write will cause client to run out of stream level, // flow control and the other side won't send a window update // until that happens. - if err := ct.Write(s, []byte{}, expectedRequestLarge, &Options{}); err != nil { + if err := ct.Write(s, []byte{}, expectedRequestLarge, nil, &Options{}); err != nil { t.Fatalf("write(_, _, _) = %v, want ", err) } p := make([]byte, len(expectedResponseLarge)) @@ -711,7 +711,7 @@ func TestLargeMessageWithDelayRead(t *testing.T) { if _, err := s.Read(p); err != nil || !bytes.Equal(p, expectedResponseLarge) { t.Fatalf("s.Read(_) = _, %v, want _, ", err) } - if err := ct.Write(s, []byte{}, expectedRequestLarge, &Options{Last: true}); err != nil { + if err := ct.Write(s, []byte{}, expectedRequestLarge, nil, &Options{Last: true}); err != nil { t.Fatalf("Write(_, _, _) = %v, want ", err) } if _, err = s.Read(p); err != io.EOF { @@ -743,7 +743,7 @@ func TestGracefulClose(t *testing.T) { outgoingHeader[0] = byte(0) binary.BigEndian.PutUint32(outgoingHeader[1:], uint32(len(msg))) incomingHeader := make([]byte, 5) - if err := ct.Write(s, outgoingHeader, msg, &Options{}); err != nil { + if err := ct.Write(s, outgoingHeader, msg, nil, &Options{}); err != nil { t.Fatalf("Error while writing: %v", err) } if _, err := s.Read(incomingHeader); err != nil { @@ -768,13 +768,13 @@ func TestGracefulClose(t *testing.T) { t.Errorf("_.NewStream(_, _) = _, %v, want _, %v", err, ErrConnClosing) return } - ct.Write(str, nil, nil, &Options{Last: true}) + ct.Write(str, nil, nil, nil, &Options{Last: true}) if _, err := str.Read(make([]byte, 8)); err != errStreamDrain && err != ErrConnClosing { t.Errorf("_.Read(_) = _, %v, want _, %v or %v", err, errStreamDrain, ErrConnClosing) } }() } - ct.Write(s, nil, nil, &Options{Last: true}) + ct.Write(s, nil, nil, nil, &Options{Last: true}) if _, err := s.Read(incomingHeader); err != io.EOF { t.Fatalf("Client expected EOF from the server. Got: %v", err) } @@ -804,8 +804,8 @@ func TestLargeMessageSuspension(t *testing.T) { }() // Write should not be done successfully due to flow control. msg := make([]byte, initialWindowSize*8) - ct.Write(s, nil, msg, &Options{}) - err = ct.Write(s, nil, msg, &Options{Last: true}) + ct.Write(s, nil, msg, nil, &Options{}) + err = ct.Write(s, nil, msg, nil, &Options{Last: true}) if err != errStreamDone { t.Fatalf("Write got %v, want io.EOF", err) } @@ -997,7 +997,7 @@ func TestClientConnDecoupledFromApplicationRead(t *testing.T) { t.Fatalf("Didn't find stream corresponding to client cstream.id: %v on the server", cstream1.id) } // Exhaust client's connection window. - if err := st.Write(sstream1, []byte{}, make([]byte, defaultWindowSize), &Options{}); err != nil { + if err := st.Write(sstream1, []byte{}, make([]byte, defaultWindowSize), nil, &Options{}); err != nil { t.Fatalf("Server failed to write data. Err: %v", err) } notifyChan = make(chan struct{}) @@ -1022,7 +1022,7 @@ func TestClientConnDecoupledFromApplicationRead(t *testing.T) { t.Fatalf("Didn't find stream corresponding to client cstream.id: %v on the server", cstream2.id) } // Server should be able to send data on the new stream, even though the client hasn't read anything on the first stream. - if err := st.Write(sstream2, []byte{}, make([]byte, defaultWindowSize), &Options{}); err != nil { + if err := st.Write(sstream2, []byte{}, make([]byte, defaultWindowSize), nil, &Options{}); err != nil { t.Fatalf("Server failed to write data. Err: %v", err) } @@ -1066,7 +1066,7 @@ func TestServerConnDecoupledFromApplicationRead(t *testing.T) { t.Fatalf("Failed to create 1st stream. Err: %v", err) } // Exhaust server's connection window. - if err := client.Write(cstream1, nil, make([]byte, defaultWindowSize), &Options{Last: true}); err != nil { + if err := client.Write(cstream1, nil, make([]byte, defaultWindowSize), nil, &Options{Last: true}); err != nil { t.Fatalf("Client failed to write data. Err: %v", err) } //Client should be able to create another stream and send data on it. @@ -1074,7 +1074,7 @@ func TestServerConnDecoupledFromApplicationRead(t *testing.T) { if err != nil { t.Fatalf("Failed to create 2nd stream. Err: %v", err) } - if err := client.Write(cstream2, nil, make([]byte, defaultWindowSize), &Options{}); err != nil { + if err := client.Write(cstream2, nil, make([]byte, defaultWindowSize), nil, &Options{}); err != nil { t.Fatalf("Client failed to write data. Err: %v", err) } // Get the streams on server. @@ -1307,7 +1307,7 @@ func TestEncodingRequiredStatus(t *testing.T) { return } opts := Options{Last: true} - if err := ct.Write(s, nil, expectedRequest, &opts); err != nil && err != errStreamDone { + if err := ct.Write(s, nil, expectedRequest, nil, &opts); err != nil && err != errStreamDone { t.Fatalf("Failed to write the request: %v", err) } p := make([]byte, http2MaxFrameLen) @@ -1485,7 +1485,7 @@ func testFlowControlAccountCheck(t *testing.T, msgSize int, wc windowSizeConfig) opts := Options{} header := make([]byte, 5) for i := 1; i <= 10; i++ { - if err := client.Write(stream, nil, buf, &opts); err != nil { + if err := client.Write(stream, nil, buf, nil, &opts); err != nil { t.Errorf("Error on client while writing message: %v", err) return } @@ -1522,7 +1522,7 @@ func testFlowControlAccountCheck(t *testing.T, msgSize int, wc windowSizeConfig) st.mu.Unlock() // Close all streams for _, stream := range clientStreams { - client.Write(stream, nil, nil, &Options{Last: true}) + client.Write(stream, nil, nil, nil, &Options{Last: true}) if _, err := stream.Read(make([]byte, 5)); err != io.EOF { t.Fatalf("Client expected an EOF from the server. Got: %v", err) } @@ -1678,13 +1678,13 @@ func runPingPongTest(t *testing.T, msgSize int) { for { select { case <-done: - client.Write(stream, nil, nil, &Options{Last: true}) + client.Write(stream, nil, nil, nil, &Options{Last: true}) if _, err := stream.Read(incomingHeader); err != io.EOF { t.Fatalf("Client expected EOF from the server. Got: %v", err) } return default: - if err := client.Write(stream, outgoingHeader, msg, opts); err != nil { + if err := client.Write(stream, outgoingHeader, msg, nil, opts); err != nil { t.Fatalf("Error on client while writing message. Err: %v", err) } if _, err := stream.Read(incomingHeader); err != nil { diff --git a/rpc_util_test.go b/rpc_util_test.go index 2449c23815ec..19cbbbae0abc 100644 --- a/rpc_util_test.go +++ b/rpc_util_test.go @@ -66,7 +66,7 @@ func (s) TestSimpleParsing(t *testing.T) { } { buf := fullReader{bytes.NewReader(test.p)} parser := &parser{r: buf} - pt, b, err := parser.recvMsg(math.MaxInt32) + pt, b, err := parser.recvMsg(math.MaxInt32, nil) if err != test.err || !bytes.Equal(b, test.b) || pt != test.pt { t.Fatalf("parser{%v}.recvMsg(_) = %v, %v, %v\nwant %v, %v, %v", test.p, pt, b, err, test.pt, test.b, test.err) } @@ -88,14 +88,14 @@ func (s) TestMultipleParsing(t *testing.T) { {compressionNone, []byte("d")}, } for i, want := range wantRecvs { - pt, data, err := parser.recvMsg(math.MaxInt32) + pt, data, err := parser.recvMsg(math.MaxInt32, nil) if err != nil || pt != want.pt || !reflect.DeepEqual(data, want.data) { t.Fatalf("after %d calls, parser{%v}.recvMsg(_) = %v, %v, %v\nwant %v, %v, ", i, p, pt, data, err, want.pt, want.data) } } - pt, data, err := parser.recvMsg(math.MaxInt32) + pt, data, err := parser.recvMsg(math.MaxInt32, nil) if err != io.EOF { t.Fatalf("after %d recvMsgs calls, parser{%v}.recvMsg(_) = %v, %v, %v\nwant _, _, %v", len(wantRecvs), p, pt, data, err, io.EOF) @@ -113,7 +113,7 @@ func (s) TestEncode(t *testing.T) { }{ {nil, []byte{0, 0, 0, 0, 0}, []byte{}, nil}, } { - data, err := encode(encoding.GetCodec(protoenc.Name), test.msg) + data, err := encode(encoding.GetCodec(protoenc.Name), test.msg, nil) if err != test.err || !bytes.Equal(data, test.data) { t.Errorf("encode(_, %v) = %v, %v; want %v, %v", test.msg, data, err, test.data, test.err) continue @@ -217,12 +217,12 @@ func (s) TestParseDialTarget(t *testing.T) { func bmEncode(b *testing.B, mSize int) { cdc := encoding.GetCodec(protoenc.Name) msg := &perfpb.Buffer{Body: make([]byte, mSize)} - encodeData, _ := encode(cdc, msg) + encodeData, _ := encode(cdc, msg, nil) encodedSz := int64(len(encodeData)) b.ReportAllocs() b.ResetTimer() for i := 0; i < b.N; i++ { - encode(cdc, msg) + encode(cdc, msg, nil) } b.SetBytes(encodedSz) }