Skip to content

Commit

Permalink
transport: stop always closing connections when loopy returns (#6110)
Browse files Browse the repository at this point in the history
  • Loading branch information
dfawley authored Mar 14, 2023
1 parent 11e2506 commit b458a4f
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 38 deletions.
48 changes: 30 additions & 18 deletions internal/transport/controlbuf.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"bytes"
"errors"
"fmt"
"net"
"runtime"
"strconv"
"sync"
Expand Down Expand Up @@ -486,12 +487,13 @@ type loopyWriter struct {
hEnc *hpack.Encoder // HPACK encoder.
bdpEst *bdpEstimator
draining bool
conn net.Conn

// Side-specific handlers
ssGoAwayHandler func(*goAway) (bool, error)
}

func newLoopyWriter(s side, fr *framer, cbuf *controlBuffer, bdpEst *bdpEstimator) *loopyWriter {
func newLoopyWriter(s side, fr *framer, cbuf *controlBuffer, bdpEst *bdpEstimator, conn net.Conn) *loopyWriter {
var buf bytes.Buffer
l := &loopyWriter{
side: s,
Expand All @@ -504,6 +506,7 @@ func newLoopyWriter(s side, fr *framer, cbuf *controlBuffer, bdpEst *bdpEstimato
hBuf: &buf,
hEnc: hpack.NewEncoder(&buf),
bdpEst: bdpEst,
conn: conn,
}
return l
}
Expand All @@ -521,15 +524,27 @@ const minBatchSize = 1000
// 2. Stream level flow control quota available.
//
// In each iteration of run loop, other than processing the incoming control
// frame, loopy calls processData, which processes one node from the activeStreams linked-list.
// This results in writing of HTTP2 frames into an underlying write buffer.
// When there's no more control frames to read from controlBuf, loopy flushes the write buffer.
// As an optimization, to increase the batch size for each flush, loopy yields the processor, once
// if the batch size is too low to give stream goroutines a chance to fill it up.
// frame, loopy calls processData, which processes one node from the
// activeStreams linked-list. This results in writing of HTTP2 frames into an
// underlying write buffer. When there's no more control frames to read from
// controlBuf, loopy flushes the write buffer. As an optimization, to increase
// the batch size for each flush, loopy yields the processor, once if the batch
// size is too low to give stream goroutines a chance to fill it up.
//
// Upon exiting, if the error causing the exit is not an I/O error, run()
// flushes and closes the underlying connection. Otherwise, the connection is
// left open to allow the I/O error to be encountered by the reader instead.
func (l *loopyWriter) run() (err error) {
// Always flush the writer before exiting in case there are pending frames
// to be sent.
defer l.framer.writer.Flush()
defer func() {
if logger.V(logLevel) {
logger.Infof("transport: loopyWriter exiting with error: %v", err)
}
if !isIOError(err) {
l.framer.writer.Flush()
l.conn.Close()
}
l.cbuf.finish()
}()
for {
it, err := l.cbuf.get(true)
if err != nil {
Expand Down Expand Up @@ -757,6 +772,7 @@ func (l *loopyWriter) cleanupStreamHandler(c *cleanupStream) error {
}
}
if l.draining && len(l.estdStreams) == 0 {
// Flush and close the connection; we are done with it.
return errors.New("finished processing active streams while in draining mode")
}
return nil
Expand Down Expand Up @@ -792,6 +808,7 @@ func (l *loopyWriter) incomingGoAwayHandler(*incomingGoAway) error {
if l.side == clientSide {
l.draining = true
if len(l.estdStreams) == 0 {
// Flush and close the connection; we are done with it.
return errors.New("received GOAWAY with no active streams")
}
}
Expand All @@ -810,13 +827,6 @@ func (l *loopyWriter) goAwayHandler(g *goAway) error {
return nil
}

func (l *loopyWriter) closeConnectionHandler() error {
// Exit loopyWriter entirely by returning an error here. This will lead to
// the transport closing the connection, and, ultimately, transport
// closure.
return ErrConnClosing
}

func (l *loopyWriter) handle(i interface{}) error {
switch i := i.(type) {
case *incomingWindowUpdate:
Expand Down Expand Up @@ -846,7 +856,9 @@ func (l *loopyWriter) handle(i interface{}) error {
case *outFlowControlSizeRequest:
l.outFlowControlSizeRequestHandler(i)
case closeConnection:
return l.closeConnectionHandler()
// Just return a non-I/O error and run() will flush and close the
// connection.
return ErrConnClosing
default:
return fmt.Errorf("transport: unknown control message type %T", i)
}
Expand Down Expand Up @@ -905,7 +917,7 @@ func (l *loopyWriter) processData() (bool, error) {
return false, err
}
if err := l.cleanupStreamHandler(trailer.cleanup); err != nil {
return false, nil
return false, err
}
} else {
l.activeStreams.enqueue(str)
Expand Down
11 changes: 2 additions & 9 deletions internal/transport/http2_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -444,15 +444,8 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
return nil, err
}
go func() {
t.loopy = newLoopyWriter(clientSide, t.framer, t.controlBuf, t.bdpEst)
err := t.loopy.run()
if logger.V(logLevel) {
logger.Infof("transport: loopyWriter exited. Closing connection. Err: %v", err)
}
// Do not close the transport. Let reader goroutine handle it since
// there might be data in the buffers.
t.conn.Close()
t.controlBuf.finish()
t.loopy = newLoopyWriter(clientSide, t.framer, t.controlBuf, t.bdpEst, t.conn)
t.loopy.run()
close(t.writerDone)
}()
return t, nil
Expand Down
12 changes: 2 additions & 10 deletions internal/transport/http2_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,14 +331,9 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
t.handleSettings(sf)

go func() {
t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst)
t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst, t.conn)
t.loopy.ssGoAwayHandler = t.outgoingGoAwayHandler
err := t.loopy.run()
if logger.V(logLevel) {
logger.Infof("transport: loopyWriter exited. Closing connection. Err: %v", err)
}
t.conn.Close()
t.controlBuf.finish()
t.loopy.run()
close(t.writerDone)
}()
go t.keepalive()
Expand Down Expand Up @@ -1355,9 +1350,6 @@ func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) {
return false, err
}
if retErr != nil {
// Abruptly close the connection following the GoAway (via
// loopywriter). But flush out what's inside the buffer first.
t.framer.writer.Flush()
return false, retErr
}
return true, nil
Expand Down
24 changes: 23 additions & 1 deletion internal/transport/http_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package transport
import (
"bufio"
"encoding/base64"
"errors"
"fmt"
"io"
"math"
Expand Down Expand Up @@ -330,7 +331,8 @@ func (w *bufWriter) Write(b []byte) (n int, err error) {
return 0, w.err
}
if w.batchSize == 0 { // Buffer has been disabled.
return w.conn.Write(b)
n, err = w.conn.Write(b)
return n, toIOError(err)
}
for len(b) > 0 {
nn := copy(w.buf[w.offset:], b)
Expand All @@ -352,10 +354,30 @@ func (w *bufWriter) Flush() error {
return nil
}
_, w.err = w.conn.Write(w.buf[:w.offset])
w.err = toIOError(w.err)
w.offset = 0
return w.err
}

type ioError struct {
error
}

func (i ioError) Unwrap() error {
return i.error
}

func isIOError(err error) bool {
return errors.As(err, &ioError{})
}

func toIOError(err error) error {
if err == nil {
return nil
}
return ioError{error: err}
}

type framer struct {
writer *bufWriter
fr *http2.Framer
Expand Down

0 comments on commit b458a4f

Please sign in to comment.