Skip to content

Commit

Permalink
hooks: PATCH V2
Browse files Browse the repository at this point in the history
  • Loading branch information
Adhityaa Chandrasekar committed Nov 20, 2019
1 parent 2766c6b commit 7436582
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 32 deletions.
6 changes: 3 additions & 3 deletions call_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ type testStreamHandler struct {
func (h *testStreamHandler) handleStream(t *testing.T, s *transport.Stream) {
p := &parser{r: s}
for {
pf, req, err := p.recvMsg(math.MaxInt32)
pf, req, err := p.recvMsg(math.MaxInt32, nil)
if err == io.EOF {
break
}
Expand Down Expand Up @@ -104,13 +104,13 @@ func (h *testStreamHandler) handleStream(t *testing.T, s *transport.Stream) {
}
}
// send a response back to end the stream.
data, err := encode(testCodec{}, &expectedResponse)
data, err := encode(testCodec{}, &expectedResponse, nil)
if err != nil {
t.Errorf("Failed to encode the response: %v", err)
return
}
hdr, payload := msgHeader(data, nil)
h.t.Write(s, hdr, payload, &transport.Options{})
h.t.Write(s, hdr, payload, nil, &transport.Options{})
h.t.WriteStatus(s, status.New(codes.OK, ""))
}

Expand Down
2 changes: 1 addition & 1 deletion internal/transport/handler_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ func TestHandlerTransport_HandleStreams_WriteStatusWrite(t *testing.T) {
st.bodyw.Close() // no body

st.ht.WriteStatus(s, status.New(codes.OK, ""))
st.ht.Write(s, []byte("hdr"), []byte("data"), &Options{})
st.ht.Write(s, []byte("hdr"), []byte("data"), nil, &Options{})
})
}

Expand Down
1 change: 1 addition & 0 deletions internal/transport/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,7 @@ func (s *Stream) write(m recvMsg) {
s.buf.put(m)
}

// Stat returns the streams's underlying *profiling.Stat.
func (s *Stream) Stat() *profiling.Stat {
return s.stat
}
Expand Down
44 changes: 22 additions & 22 deletions internal/transport/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func (h *testStreamHandler) handleStream(t *testing.T, s *Stream) {
return
}
// send a response back to the client.
h.t.Write(s, nil, resp, &Options{})
h.t.Write(s, nil, resp, nil, &Options{})
// send the trailer to end the stream.
h.t.WriteStatus(s, status.New(codes.OK, ""))
}
Expand Down Expand Up @@ -145,7 +145,7 @@ func (h *testStreamHandler) handleStreamPingPong(t *testing.T, s *Stream) {
buf[0] = byte(0)
binary.BigEndian.PutUint32(buf[1:], uint32(sz))
copy(buf[5:], msg)
h.t.Write(s, nil, buf, &Options{})
h.t.Write(s, nil, buf, nil, &Options{})
}
}

Expand Down Expand Up @@ -266,7 +266,7 @@ func (h *testStreamHandler) handleStreamDelayRead(t *testing.T, s *Stream) {
// This write will cause server to run out of stream level,
// flow control and the other side won't send a window update
// until that happens.
if err := h.t.Write(s, nil, resp, &Options{}); err != nil {
if err := h.t.Write(s, nil, resp, nil, &Options{}); err != nil {
t.Errorf("server Write got %v, want <nil>", err)
return
}
Expand Down Expand Up @@ -524,7 +524,7 @@ func TestClientSendAndReceive(t *testing.T) {
t.Fatalf("wrong stream id: %d", s2.id)
}
opts := Options{Last: true}
if err := ct.Write(s1, nil, expectedRequest, &opts); err != nil && err != io.EOF {
if err := ct.Write(s1, nil, expectedRequest, nil, &opts); err != nil && err != io.EOF {
t.Fatalf("failed to send data: %v", err)
}
p := make([]byte, len(expectedResponse))
Expand Down Expand Up @@ -559,7 +559,7 @@ func performOneRPC(ct ClientTransport) {
return
}
opts := Options{Last: true}
if err := ct.Write(s, []byte{}, expectedRequest, &opts); err == nil || err == io.EOF {
if err := ct.Write(s, []byte{}, expectedRequest, nil, &opts); err == nil || err == io.EOF {
time.Sleep(5 * time.Millisecond)
// The following s.Recv()'s could error out because the
// underlying transport is gone.
Expand Down Expand Up @@ -605,7 +605,7 @@ func TestLargeMessage(t *testing.T) {
if err != nil {
t.Errorf("%v.NewStream(_, _) = _, %v, want _, <nil>", ct, err)
}
if err := ct.Write(s, []byte{}, expectedRequestLarge, &Options{Last: true}); err != nil && err != io.EOF {
if err := ct.Write(s, []byte{}, expectedRequestLarge, nil, &Options{Last: true}); err != nil && err != io.EOF {
t.Errorf("%v.Write(_, _, _) = %v, want <nil>", ct, err)
}
p := make([]byte, len(expectedResponseLarge))
Expand Down Expand Up @@ -696,7 +696,7 @@ func TestLargeMessageWithDelayRead(t *testing.T) {
// This write will cause client to run out of stream level,
// flow control and the other side won't send a window update
// until that happens.
if err := ct.Write(s, []byte{}, expectedRequestLarge, &Options{}); err != nil {
if err := ct.Write(s, []byte{}, expectedRequestLarge, nil, &Options{}); err != nil {
t.Fatalf("write(_, _, _) = %v, want <nil>", err)
}
p := make([]byte, len(expectedResponseLarge))
Expand All @@ -711,7 +711,7 @@ func TestLargeMessageWithDelayRead(t *testing.T) {
if _, err := s.Read(p); err != nil || !bytes.Equal(p, expectedResponseLarge) {
t.Fatalf("s.Read(_) = _, %v, want _, <nil>", err)
}
if err := ct.Write(s, []byte{}, expectedRequestLarge, &Options{Last: true}); err != nil {
if err := ct.Write(s, []byte{}, expectedRequestLarge, nil, &Options{Last: true}); err != nil {
t.Fatalf("Write(_, _, _) = %v, want <nil>", err)
}
if _, err = s.Read(p); err != io.EOF {
Expand Down Expand Up @@ -743,7 +743,7 @@ func TestGracefulClose(t *testing.T) {
outgoingHeader[0] = byte(0)
binary.BigEndian.PutUint32(outgoingHeader[1:], uint32(len(msg)))
incomingHeader := make([]byte, 5)
if err := ct.Write(s, outgoingHeader, msg, &Options{}); err != nil {
if err := ct.Write(s, outgoingHeader, msg, nil, &Options{}); err != nil {
t.Fatalf("Error while writing: %v", err)
}
if _, err := s.Read(incomingHeader); err != nil {
Expand All @@ -768,13 +768,13 @@ func TestGracefulClose(t *testing.T) {
t.Errorf("_.NewStream(_, _) = _, %v, want _, %v", err, ErrConnClosing)
return
}
ct.Write(str, nil, nil, &Options{Last: true})
ct.Write(str, nil, nil, nil, &Options{Last: true})
if _, err := str.Read(make([]byte, 8)); err != errStreamDrain && err != ErrConnClosing {
t.Errorf("_.Read(_) = _, %v, want _, %v or %v", err, errStreamDrain, ErrConnClosing)
}
}()
}
ct.Write(s, nil, nil, &Options{Last: true})
ct.Write(s, nil, nil, nil, &Options{Last: true})
if _, err := s.Read(incomingHeader); err != io.EOF {
t.Fatalf("Client expected EOF from the server. Got: %v", err)
}
Expand Down Expand Up @@ -804,8 +804,8 @@ func TestLargeMessageSuspension(t *testing.T) {
}()
// Write should not be done successfully due to flow control.
msg := make([]byte, initialWindowSize*8)
ct.Write(s, nil, msg, &Options{})
err = ct.Write(s, nil, msg, &Options{Last: true})
ct.Write(s, nil, msg, nil, &Options{})
err = ct.Write(s, nil, msg, nil, &Options{Last: true})
if err != errStreamDone {
t.Fatalf("Write got %v, want io.EOF", err)
}
Expand Down Expand Up @@ -997,7 +997,7 @@ func TestClientConnDecoupledFromApplicationRead(t *testing.T) {
t.Fatalf("Didn't find stream corresponding to client cstream.id: %v on the server", cstream1.id)
}
// Exhaust client's connection window.
if err := st.Write(sstream1, []byte{}, make([]byte, defaultWindowSize), &Options{}); err != nil {
if err := st.Write(sstream1, []byte{}, make([]byte, defaultWindowSize), nil, &Options{}); err != nil {
t.Fatalf("Server failed to write data. Err: %v", err)
}
notifyChan = make(chan struct{})
Expand All @@ -1022,7 +1022,7 @@ func TestClientConnDecoupledFromApplicationRead(t *testing.T) {
t.Fatalf("Didn't find stream corresponding to client cstream.id: %v on the server", cstream2.id)
}
// Server should be able to send data on the new stream, even though the client hasn't read anything on the first stream.
if err := st.Write(sstream2, []byte{}, make([]byte, defaultWindowSize), &Options{}); err != nil {
if err := st.Write(sstream2, []byte{}, make([]byte, defaultWindowSize), nil, &Options{}); err != nil {
t.Fatalf("Server failed to write data. Err: %v", err)
}

Expand Down Expand Up @@ -1066,15 +1066,15 @@ func TestServerConnDecoupledFromApplicationRead(t *testing.T) {
t.Fatalf("Failed to create 1st stream. Err: %v", err)
}
// Exhaust server's connection window.
if err := client.Write(cstream1, nil, make([]byte, defaultWindowSize), &Options{Last: true}); err != nil {
if err := client.Write(cstream1, nil, make([]byte, defaultWindowSize), nil, &Options{Last: true}); err != nil {
t.Fatalf("Client failed to write data. Err: %v", err)
}
//Client should be able to create another stream and send data on it.
cstream2, err := client.NewStream(context.Background(), &CallHdr{})
if err != nil {
t.Fatalf("Failed to create 2nd stream. Err: %v", err)
}
if err := client.Write(cstream2, nil, make([]byte, defaultWindowSize), &Options{}); err != nil {
if err := client.Write(cstream2, nil, make([]byte, defaultWindowSize), nil, &Options{}); err != nil {
t.Fatalf("Client failed to write data. Err: %v", err)
}
// Get the streams on server.
Expand Down Expand Up @@ -1307,7 +1307,7 @@ func TestEncodingRequiredStatus(t *testing.T) {
return
}
opts := Options{Last: true}
if err := ct.Write(s, nil, expectedRequest, &opts); err != nil && err != errStreamDone {
if err := ct.Write(s, nil, expectedRequest, nil, &opts); err != nil && err != errStreamDone {
t.Fatalf("Failed to write the request: %v", err)
}
p := make([]byte, http2MaxFrameLen)
Expand Down Expand Up @@ -1485,7 +1485,7 @@ func testFlowControlAccountCheck(t *testing.T, msgSize int, wc windowSizeConfig)
opts := Options{}
header := make([]byte, 5)
for i := 1; i <= 10; i++ {
if err := client.Write(stream, nil, buf, &opts); err != nil {
if err := client.Write(stream, nil, buf, nil, &opts); err != nil {
t.Errorf("Error on client while writing message: %v", err)
return
}
Expand Down Expand Up @@ -1522,7 +1522,7 @@ func testFlowControlAccountCheck(t *testing.T, msgSize int, wc windowSizeConfig)
st.mu.Unlock()
// Close all streams
for _, stream := range clientStreams {
client.Write(stream, nil, nil, &Options{Last: true})
client.Write(stream, nil, nil, nil, &Options{Last: true})
if _, err := stream.Read(make([]byte, 5)); err != io.EOF {
t.Fatalf("Client expected an EOF from the server. Got: %v", err)
}
Expand Down Expand Up @@ -1678,13 +1678,13 @@ func runPingPongTest(t *testing.T, msgSize int) {
for {
select {
case <-done:
client.Write(stream, nil, nil, &Options{Last: true})
client.Write(stream, nil, nil, nil, &Options{Last: true})
if _, err := stream.Read(incomingHeader); err != io.EOF {
t.Fatalf("Client expected EOF from the server. Got: %v", err)
}
return
default:
if err := client.Write(stream, outgoingHeader, msg, opts); err != nil {
if err := client.Write(stream, outgoingHeader, msg, nil, opts); err != nil {
t.Fatalf("Error on client while writing message. Err: %v", err)
}
if _, err := stream.Read(incomingHeader); err != nil {
Expand Down
12 changes: 6 additions & 6 deletions rpc_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (s) TestSimpleParsing(t *testing.T) {
} {
buf := fullReader{bytes.NewReader(test.p)}
parser := &parser{r: buf}
pt, b, err := parser.recvMsg(math.MaxInt32)
pt, b, err := parser.recvMsg(math.MaxInt32, nil)
if err != test.err || !bytes.Equal(b, test.b) || pt != test.pt {
t.Fatalf("parser{%v}.recvMsg(_) = %v, %v, %v\nwant %v, %v, %v", test.p, pt, b, err, test.pt, test.b, test.err)
}
Expand All @@ -88,14 +88,14 @@ func (s) TestMultipleParsing(t *testing.T) {
{compressionNone, []byte("d")},
}
for i, want := range wantRecvs {
pt, data, err := parser.recvMsg(math.MaxInt32)
pt, data, err := parser.recvMsg(math.MaxInt32, nil)
if err != nil || pt != want.pt || !reflect.DeepEqual(data, want.data) {
t.Fatalf("after %d calls, parser{%v}.recvMsg(_) = %v, %v, %v\nwant %v, %v, <nil>",
i, p, pt, data, err, want.pt, want.data)
}
}

pt, data, err := parser.recvMsg(math.MaxInt32)
pt, data, err := parser.recvMsg(math.MaxInt32, nil)
if err != io.EOF {
t.Fatalf("after %d recvMsgs calls, parser{%v}.recvMsg(_) = %v, %v, %v\nwant _, _, %v",
len(wantRecvs), p, pt, data, err, io.EOF)
Expand All @@ -113,7 +113,7 @@ func (s) TestEncode(t *testing.T) {
}{
{nil, []byte{0, 0, 0, 0, 0}, []byte{}, nil},
} {
data, err := encode(encoding.GetCodec(protoenc.Name), test.msg)
data, err := encode(encoding.GetCodec(protoenc.Name), test.msg, nil)
if err != test.err || !bytes.Equal(data, test.data) {
t.Errorf("encode(_, %v) = %v, %v; want %v, %v", test.msg, data, err, test.data, test.err)
continue
Expand Down Expand Up @@ -217,12 +217,12 @@ func (s) TestParseDialTarget(t *testing.T) {
func bmEncode(b *testing.B, mSize int) {
cdc := encoding.GetCodec(protoenc.Name)
msg := &perfpb.Buffer{Body: make([]byte, mSize)}
encodeData, _ := encode(cdc, msg)
encodeData, _ := encode(cdc, msg, nil)
encodedSz := int64(len(encodeData))
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
encode(cdc, msg)
encode(cdc, msg, nil)
}
b.SetBytes(encodedSz)
}
Expand Down

0 comments on commit 7436582

Please sign in to comment.