From 1b4f6f8edba5f374f1afbf10d7666136286806e7 Mon Sep 17 00:00:00 2001 From: Kevin Parsons Date: Fri, 10 May 2024 21:42:50 -0700 Subject: [PATCH] client: Fix deadlock when writing to pipe blocks Use sendLock to guard the entire stream allocation + write to wire operation, and streamLock to only guard access to the underlying stream map. This ensures the following: - We uphold the constraint that new stream IDs on the wire are always increasing, because whoever holds sendLock will be ensured to get the next stream ID and be the next to write to the wire. - Locks are always released in LIFO order. This prevents deadlocks. Taking sendLock before releasing streamLock means that if a goroutine blocks writing to the pipe, it can make another goroutine get stuck trying to take sendLock, and therefore streamLock will be kept locked as well. This can lead to the receiver goroutine no longer being able to read responses from the pipe, since it needs to take streamLock when processing a response. This ultimately leads to a complete deadlock of the client. It is reasonable for a server to block writes to the pipe if the client is not reading responses fast enough. So we can't expect writes to never block. I have repro'd the hang with a simple ttrpc client and server. The client spins up 100 goroutines that spam the server with requests constantly. After a few seconds of running I can see it hang. I have set the buffer size for the pipe to 0 to more easily repro, but it would still be possible to hit with a larger buffer size (just may take a higher volume of requests or larger payloads). I also validated that I no longer see the hang with this fix, by leaving the test client/server running for a few minutes. Obviously not 100% conclusive, but before I could get a hang within several seconds of running. Signed-off-by: Kevin Parsons --- client.go | 37 ++++++++++++++++++++++++++++--------- 1 file changed, 28 insertions(+), 9 deletions(-) diff --git a/client.go b/client.go index 482a68e3d..685a00a9b 100644 --- a/client.go +++ b/client.go @@ -386,25 +386,44 @@ func (c *Client) receiveLoop() error { // createStream creates a new stream and registers it with the client // Introduce stream types for multiple or single response func (c *Client) createStream(flags uint8, b []byte) (*stream, error) { - c.streamLock.Lock() + // sendLock must be held across both allocation of the stream ID and sending it across the wire. + // This ensures that new stream IDs sent on the wire are always increasing, which is a + // requirement of the TTRPC protocol. + // This use of sendLock could be split into another mutex that covers stream creation + first send, + // and just use sendLock to guard writing to the wire, but for now it seems simpler to have fewer mutexes. + c.sendLock.Lock() + defer c.sendLock.Unlock() // Check if closed since lock acquired to prevent adding // anything after cleanup completes select { case <-c.ctx.Done(): - c.streamLock.Unlock() return nil, ErrClosed default: } - // Stream ID should be allocated at same time - s := newStream(c.nextStreamID, c) - c.streams[s.id] = s - c.nextStreamID = c.nextStreamID + 2 + var s *stream + if err := func() error { + // In the future this could be replaced with a sync.Map instead of streamLock+map. + c.streamLock.Lock() + defer c.streamLock.Unlock() - c.sendLock.Lock() - defer c.sendLock.Unlock() - c.streamLock.Unlock() + // Check if closed since lock acquired to prevent adding + // anything after cleanup completes + select { + case <-c.ctx.Done(): + return ErrClosed + default: + } + + s = newStream(c.nextStreamID, c) + c.streams[s.id] = s + c.nextStreamID = c.nextStreamID + 2 + + return nil + }(); err != nil { + return nil, err + } if err := c.channel.send(uint32(s.id), messageTypeRequest, flags, b); err != nil { return s, filterCloseErr(err)