Skip to content

Commit

Permalink
Make graphsync max requests configurable (#32)
Browse files Browse the repository at this point in the history
* Make graphsync max requests configurable
* bump version
  • Loading branch information
gammazero authored May 12, 2023
1 parent a15b6d1 commit b22696e
Show file tree
Hide file tree
Showing 9 changed files with 63 additions and 14 deletions.
22 changes: 21 additions & 1 deletion dagsync/dtsync/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,31 @@ import (
"github.com/libp2p/go-libp2p/core/peer"
)

const (
// Maximum number of in-prgress graphsync requests.
defaultGsMaxInRequests = 1024
defaultGsMaxOutRequests = 1024
)

// config contains all options for configuring dtsync.publisher.
type config struct {
extraData []byte
allowPeer func(peer.ID) bool
senders []announce.Sender

gsMaxInRequests uint64
gsMaxOutRequests uint64
}

// Option is a function that sets a value in a config.
type Option func(*config) error

// getOpts creates a config and applies Options to it.
func getOpts(opts []Option) (config, error) {
var cfg config
cfg := config{
gsMaxInRequests: defaultGsMaxInRequests,
gsMaxOutRequests: defaultGsMaxOutRequests,
}
for i, opt := range opts {
if err := opt(&cfg); err != nil {
return config{}, fmt.Errorf("option %d failed: %s", i, err)
Expand Down Expand Up @@ -56,3 +68,11 @@ func WithAnnounceSenders(senders ...announce.Sender) Option {
return nil
}
}

func WithMaxGraphsyncRequests(maxIn, maxOut uint64) Option {
return func(c *config) error {
c.gsMaxInRequests = maxIn
c.gsMaxOutRequests = maxOut
return nil
}
}
2 changes: 1 addition & 1 deletion dagsync/dtsync/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func NewPublisher(host host.Host, ds datastore.Batching, lsys ipld.LinkSystem, t
return nil, err
}

dtManager, _, dtClose, err := makeDataTransfer(host, ds, lsys, opts.allowPeer)
dtManager, _, dtClose, err := makeDataTransfer(host, ds, lsys, opts.allowPeer, opts.gsMaxInRequests, opts.gsMaxOutRequests)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions dagsync/dtsync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ func NewSyncWithDT(host host.Host, dtManager dt.Manager, gs graphsync.GraphExcha
}

// NewSync creates a new Sync with its own datatransfer.Manager.
func NewSync(host host.Host, ds datastore.Batching, lsys ipld.LinkSystem, blockHook func(peer.ID, cid.Cid)) (*Sync, error) {
dtManager, gs, dtClose, err := makeDataTransfer(host, ds, lsys, nil)
func NewSync(host host.Host, ds datastore.Batching, lsys ipld.LinkSystem, blockHook func(peer.ID, cid.Cid), gsMaxInReq, gsMaxOutReq uint64) (*Sync, error) {
dtManager, gs, dtClose, err := makeDataTransfer(host, ds, lsys, nil, gsMaxInReq, gsMaxOutReq)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions dagsync/dtsync/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func TestDTSync_CallsBlockHookWhenCIDsAreFullyFoundLocally(t *testing.T) {

// Use the same linksystem as the publisher for the syncer; this is to assure all the blocks being
// synced are already present on the syncer side.
subject, err := dtsync.NewSync(subh, dssync.MutexWrap(datastore.NewMapDatastore()), ls, testHook)
subject, err := dtsync.NewSync(subh, dssync.MutexWrap(datastore.NewMapDatastore()), ls, testHook, 0, 0)
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, subject.Close()) })

Expand Down Expand Up @@ -169,7 +169,7 @@ func TestDTSync_CallsBlockHookWhenCIDsArePartiallyFoundLocally(t *testing.T) {
_, err = subls.Load(ipld.LinkContext{Ctx: ctx}, l2, basicnode.Prototype.Any)
require.Error(t, err)

subject, err := dtsync.NewSync(subh, dssync.MutexWrap(datastore.NewMapDatastore()), subls, testHook)
subject, err := dtsync.NewSync(subh, dssync.MutexWrap(datastore.NewMapDatastore()), subls, testHook, 0, 0)
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, subject.Close()) })

Expand Down
15 changes: 13 additions & 2 deletions dagsync/dtsync/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ import (
// Time to wait for datatransfer to gracefully stop before canceling.
const datatransferStopTimeout = time.Minute

const (
defaultGsMaxInReq = 1024
defaultGsMaxOutReq = 1024
)

type dtCloseFunc func() error

// configureDataTransferForDagsync configures an existing data transfer
Expand Down Expand Up @@ -65,10 +70,16 @@ func registerVoucher(dtManager dt.Manager, allowPeer func(peer.ID) bool) error {
return nil
}

func makeDataTransfer(host host.Host, ds datastore.Batching, lsys ipld.LinkSystem, allowPeer func(peer.ID) bool) (dt.Manager, graphsync.GraphExchange, dtCloseFunc, error) {
func makeDataTransfer(host host.Host, ds datastore.Batching, lsys ipld.LinkSystem, allowPeer func(peer.ID) bool, gsMaxInReq, gsMaxOutReq uint64) (dt.Manager, graphsync.GraphExchange, dtCloseFunc, error) {
gsNet := gsnet.NewFromLibp2pHost(host)
ctx, cancel := context.WithCancel(context.Background())
gs := gsimpl.New(ctx, gsNet, lsys, gsimpl.MaxInProgressOutgoingRequests(1000), gsimpl.MaxInProgressIncomingRequests(1000))
if gsMaxInReq == 0 {
gsMaxInReq = defaultGsMaxInReq
}
if gsMaxOutReq == 0 {
gsMaxOutReq = defaultGsMaxOutReq
}
gs := gsimpl.New(ctx, gsNet, lsys, gsimpl.MaxInProgressIncomingRequests(gsMaxInReq), gsimpl.MaxInProgressOutgoingRequests(gsMaxOutReq))

dtNet := dtnetwork.NewFromLibp2pHost(host)
tp := gstransport.NewTransport(host.ID(), gs)
Expand Down
2 changes: 1 addition & 1 deletion dagsync/dtsync/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
func Test_registerVoucherHandlesAlreadyRegisteredGracefully(t *testing.T) {
h := test.MkTestHost()

dt, _, close, err := makeDataTransfer(h, datastore.NewMapDatastore(), cidlink.DefaultLinkSystem(), nil)
dt, _, close, err := makeDataTransfer(h, datastore.NewMapDatastore(), cidlink.DefaultLinkSystem(), nil, 0, 0)
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, close()) })

Expand Down
24 changes: 21 additions & 3 deletions dagsync/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ const (
defaultIdleHandlerTTL = time.Hour
// defaultSegDepthLimit disables (-1) segmented sync by default.
defaultSegDepthLimit = -1
// Maximum number of in-prgress graphsync requests.
defaultGsMaxInRequests = 1024
defaultGsMaxOutRequests = 1024
)

// config contains all options for configuring Subscriber.
Expand All @@ -51,6 +54,9 @@ type config struct {
resendAnnounce bool

segDepthLimit int64

gsMaxInRequests uint64
gsMaxOutRequests uint64
}

// Option is a function that sets a value in a config.
Expand All @@ -59,9 +65,11 @@ type Option func(*config) error
// getOpts creates a config and applies Options to it.
func getOpts(opts []Option) (config, error) {
cfg := config{
addrTTL: defaultAddrTTL,
idleHandlerTTL: defaultIdleHandlerTTL,
segDepthLimit: defaultSegDepthLimit,
addrTTL: defaultAddrTTL,
idleHandlerTTL: defaultIdleHandlerTTL,
segDepthLimit: defaultSegDepthLimit,
gsMaxInRequests: defaultGsMaxInRequests,
gsMaxOutRequests: defaultGsMaxOutRequests,
}

for i, opt := range opts {
Expand Down Expand Up @@ -173,6 +181,16 @@ func ResendAnnounce(enable bool) Option {
}
}

// WithMaxGraphsyncRequests sets the maximum number of in-progress inbound and
// outbound graphsync requests.
func WithMaxGraphsyncRequests(maxIn, maxOut uint64) Option {
return func(c *config) error {
c.gsMaxInRequests = maxIn
c.gsMaxOutRequests = maxOut
return nil
}
}

type RateLimiterFor func(publisher peer.ID) *rate.Limiter

// RateLimiter configures a function that is called for each sync to get the
Expand Down
2 changes: 1 addition & 1 deletion dagsync/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func NewSubscriber(host host.Host, ds datastore.Batching, lsys ipld.LinkSystem,
dtSync, err = dtsync.NewSyncWithDT(host, opts.dtManager, opts.graphExchange, &lsys, blockHook)
} else {
ds := namespace.Wrap(ds, datastore.NewKey("data-transfer-v2"))
dtSync, err = dtsync.NewSync(host, ds, lsys, blockHook)
dtSync, err = dtsync.NewSync(host, ds, lsys, blockHook, opts.gsMaxInRequests, opts.gsMaxOutRequests)
}
if err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion version.json
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
{
"version": "v0.0.8"
"version": "v0.0.9"
}

0 comments on commit b22696e

Please sign in to comment.