Skip to content

Commit

Permalink
transport: fix racey send to writes channel in WriteStatus (#1546)
Browse files Browse the repository at this point in the history
Concurrent 'SendMsg' calls to stream lead to
multiple 'WriteStatus' calls, while closing
'writes' channel is not synchronized.

This patch marks 'streamDone' first before 'ht.do',
so that following 'WriteStatus' does not trigger panic
on 'writes' channel.

Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
  • Loading branch information
gyuho authored and menghanl committed Oct 4, 2017
1 parent cf79c84 commit 22c3f92
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 4 deletions.
5 changes: 1 addition & 4 deletions transport/handler_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,6 @@ func (ht *serverHandlerTransport) do(fn func()) error {
case <-ht.closedCh:
return ErrConnClosing
}

}
}

Expand All @@ -183,6 +182,7 @@ func (ht *serverHandlerTransport) WriteStatus(s *Stream, st *status.Status) erro
ht.mu.Unlock()
return nil
}
ht.streamDone = true
ht.mu.Unlock()
err := ht.do(func() {
ht.writeCommonHeaders(s)
Expand Down Expand Up @@ -223,9 +223,6 @@ func (ht *serverHandlerTransport) WriteStatus(s *Stream, st *status.Status) erro
}
})
close(ht.writes)
ht.mu.Lock()
ht.streamDone = true
ht.mu.Unlock()
return err
}

Expand Down
25 changes: 25 additions & 0 deletions transport/handler_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"net/http/httptest"
"net/url"
"reflect"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -390,6 +391,30 @@ func TestHandlerTransport_HandleStreams_Timeout(t *testing.T) {
}
}

func TestHandlerTransport_HandleStreams_MultiWriteStatus(t *testing.T) {
st := newHandleStreamTest(t)
handleStream := func(s *Stream) {
if want := "/service/foo.bar"; s.method != want {
t.Errorf("stream method = %q; want %q", s.method, want)
}
st.bodyw.Close() // no body

var wg sync.WaitGroup
wg.Add(5)
for i := 0; i < 5; i++ {
go func() {
defer wg.Done()
st.ht.WriteStatus(s, status.New(codes.OK, ""))
}()
}
wg.Wait()
}
st.ht.HandleStreams(
func(s *Stream) { go handleStream(s) },
func(ctx context.Context, method string) context.Context { return ctx },
)
}

func TestHandlerTransport_HandleStreams_ErrDetails(t *testing.T) {
errDetails := []proto.Message{
&epb.RetryInfo{
Expand Down

0 comments on commit 22c3f92

Please sign in to comment.