Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

limit the number of incoming streams to 1000 #94

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ require (
github.com/libp2p/go-buffer-pool v0.0.2
github.com/libp2p/go-libp2p-testing v0.1.2-0.20200422005655-8775583591d8
github.com/multiformats/go-varint v0.0.6
github.com/stretchr/testify v1.4.0
go.uber.org/multierr v1.6.0
golang.org/x/crypto v0.0.0-20190618222545-ea8f1a30c443 // indirect
google.golang.org/grpc v1.28.1
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,10 @@ github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQL
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23/go.mod h1:J+Gs4SYgM6CZQHDETBtE9HaSEkGmuNXF86RwHhHUvq4=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/libp2p/go-buffer-pool v0.0.2 h1:QNK2iAFa8gjAe1SPz6mHSMuCcjs+X1wlHzeOSqcmlfs=
github.com/libp2p/go-buffer-pool v0.0.2/go.mod h1:MvaB6xw5vOrDl8rYZGLFdKAuk/hRoRZd1Vi32+RXyFM=
Expand Down Expand Up @@ -189,6 +191,7 @@ google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQ
google.golang.org/grpc v1.28.1 h1:C1QC6KzgSiLyBabDi87BbjaGreoRgGUF5nOyvfrAZ1k=
google.golang.org/grpc v1.28.1/go.mod h1:rpkK4SK4GF4Ach/+MFLZUBavHOvF2JJB5uozKKal+60=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
Expand Down
40 changes: 28 additions & 12 deletions multiplex.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ var errTimeout = timeout{}

var (
ResetStreamTimeout = 2 * time.Minute

MaxIncomingStreams = 1000
WriteCoalesceDelay = 100 * time.Microsecond
)

Expand Down Expand Up @@ -83,22 +83,26 @@ type Multiplex struct {

nstreams chan *Stream

channels map[streamID]*Stream
chLock sync.Mutex
maxIncoming int

channels map[streamID]*Stream
numIncoming int
chLock sync.Mutex
}

// NewMultiplex creates a new multiplexer session.
func NewMultiplex(con net.Conn, initiator bool) *Multiplex {
mp := &Multiplex{
con: con,
initiator: initiator,
buf: bufio.NewReader(con),
channels: make(map[streamID]*Stream),
closed: make(chan struct{}),
shutdown: make(chan struct{}),
writeCh: make(chan []byte, 16),
writeTimer: time.NewTimer(0),
nstreams: make(chan *Stream, 16),
con: con,
initiator: initiator,
buf: bufio.NewReader(con),
channels: make(map[streamID]*Stream),
closed: make(chan struct{}),
shutdown: make(chan struct{}),
writeCh: make(chan []byte, 16),
writeTimer: time.NewTimer(0),
maxIncoming: MaxIncomingStreams,
nstreams: make(chan *Stream, 16),
}

go mp.handleIncoming()
Expand Down Expand Up @@ -410,6 +414,15 @@ func (mp *Multiplex) handleIncoming() {

msch = mp.newStream(ch, name)
mp.chLock.Lock()
if remoteIsInitiator {
if mp.numIncoming >= mp.maxIncoming {
msch.mp.sendResetMsg(msch.id.header(resetTag), true)
mp.chLock.Unlock()
continue
} else {
mp.numIncoming++
}
}
mp.channels[ch] = msch
mp.chLock.Unlock()
select {
Expand All @@ -436,6 +449,9 @@ func (mp *Multiplex) handleIncoming() {
// unregister and throw away future data.
mp.chLock.Lock()
delete(mp.channels, ch)
if remoteIsInitiator {
mp.numIncoming--
}
mp.chLock.Unlock()

// close data channel, there will be no more data.
Expand Down
47 changes: 47 additions & 0 deletions multiplex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"sync"
"testing"
"time"

"github.com/stretchr/testify/require"
)

func init() {
Expand Down Expand Up @@ -880,3 +882,48 @@ func arrComp(a, b []byte) error {
}
return nil
}

func TestMaxIncomingStreams(t *testing.T) {
a, b := net.Pipe()
client := NewMultiplex(a, true)
defer client.Close()

server := NewMultiplex(b, false)
defer server.Close()

go func() {
for {
str, err := server.Accept()
if err != nil {
return
}
_, err = str.Write([]byte("foobar"))
require.NoError(t, err)
}
}()

var streams []*Stream
for i := 0; i < MaxIncomingStreams; i++ {
str, err := client.NewStream(context.Background())
require.NoError(t, err)
_, err = str.Read(make([]byte, 6))
require.NoError(t, err)
streams = append(streams, str)
}
// The server now has maxIncomingStreams incoming streams.
// It will now reset the next stream that is opened.
str, err := client.NewStream(context.Background())
require.NoError(t, err)
str.SetDeadline(time.Now().Add(time.Second))
_, err = str.Read([]byte{0})
require.EqualError(t, err, "stream reset")

// Now close one of the streams.
// This should then allow the client to open a new stream.
streams[0].Close()
str, err = client.NewStream(context.Background())
require.NoError(t, err)
str.SetDeadline(time.Now().Add(time.Second))
_, err = str.Read([]byte{0})
require.NoError(t, err)
}