From bc1be6ec41563ee7103a046f283fcc58c71fc60f Mon Sep 17 00:00:00 2001 From: gammazero Date: Tue, 25 Jul 2023 07:49:39 -0700 Subject: [PATCH] Having announce.Receiver is optional for Subscriber --- announce/option.go | 2 +- announce/receiver.go | 34 ++++++++------------ dagsync/announce_test.go | 11 ++++--- dagsync/http_test.go | 2 +- dagsync/legs_test.go | 36 ++++++++++++--------- dagsync/option.go | 44 +++++++++++-------------- dagsync/subscriber.go | 66 ++++++++++++++++++-------------------- dagsync/subscriber_test.go | 14 ++++---- dagsync/sync_test.go | 17 +++++----- 9 files changed, 109 insertions(+), 117 deletions(-) diff --git a/announce/option.go b/announce/option.go index 52cc053..bb5d044 100644 --- a/announce/option.go +++ b/announce/option.go @@ -29,7 +29,7 @@ func getOpts(opts []Option) (config, error) { } // WithAllowPeer sets the function that determines whether to allow or reject -// messages from a peer. +// messages from a peer. When not set or nil, allows messages from all peers. func WithAllowPeer(allowPeer AllowPeerFunc) Option { return func(c *config) error { c.allowPeer = allowPeer diff --git a/announce/receiver.go b/announce/receiver.go index 145f9a7..bc49202 100644 --- a/announce/receiver.go +++ b/announce/receiver.go @@ -52,7 +52,7 @@ type Receiver struct { hostID peer.ID announceCache *stringLRU - // announceMutex protects announceCache, and allowPeer, topicSub + // announceMutex protects announceCache and topicSub. announceMutex sync.Mutex closed bool @@ -204,16 +204,6 @@ func (r *Receiver) Close() error { return err } -// SetAllowPeer configures Subscriber with a function to evaluate whether to -// allow or reject messages from a peer. Setting nil removes any filtering and -// allows messages from all peers. Calling SetAllowPeer replaces any previously -// configured AllowPeerFunc. -func (r *Receiver) SetAllowPeer(allowPeer AllowPeerFunc) { - r.announceMutex.Lock() - r.allowPeer = allowPeer - r.announceMutex.Unlock() -} - // UncacheCid removes a CID from the announce cache. func (r *Receiver) UncacheCid(adCid cid.Cid) { r.announceMutex.Lock() @@ -234,7 +224,6 @@ func (r *Receiver) watch(ctx context.Context) { if err != nil { if errors.Is(err, context.Canceled) || errors.Is(err, pubsub.ErrSubscriptionCancelled) { // This is a normal result of shutting down the Subscriber. - log.Debug("Canceled watching pubsub subscription") break } log.Errorw("Error reading from pubsub", "err", err) @@ -299,6 +288,9 @@ func (r *Receiver) watch(ctx context.Context) { } err = r.handleAnnounce(ctx, amsg, false) if err != nil { + if errors.Is(err, ErrClosed) || errors.Is(err, context.Canceled) { + break + } log.Errorw("Cannot process message", "err", err) continue } @@ -319,10 +311,10 @@ func (r *Receiver) Direct(ctx context.Context, nextCid cid.Cid, peerID peer.ID, PeerID: peerID, Addrs: addrs, } - return r.handleAnnounce(ctx, amsg, true) + return r.handleAnnounce(ctx, amsg, r.resend) } -func (r *Receiver) handleAnnounce(ctx context.Context, amsg Announce, direct bool) error { +func (r *Receiver) handleAnnounce(ctx context.Context, amsg Announce, resend bool) error { err := r.announceCheck(amsg) if err != nil { if err == ErrClosed { @@ -339,7 +331,7 @@ func (r *Receiver) handleAnnounce(ctx context.Context, amsg Announce, direct boo // address in their peer store. 
} - if direct && r.resend { + if resend { err = r.republish(ctx, amsg) if err != nil { log.Errorw("Cannot republish announce message", "err", err) @@ -350,6 +342,8 @@ func (r *Receiver) handleAnnounce(ctx context.Context, amsg Announce, direct boo select { case r.outChan <- amsg: + case <-r.done: + return ErrClosed case <-ctx.Done(): return ctx.Err() } @@ -358,6 +352,11 @@ func (r *Receiver) handleAnnounce(ctx context.Context, amsg Announce, direct boo } func (r *Receiver) announceCheck(amsg Announce) error { + // Check callback to see if peer ID allowed. + if r.allowPeer != nil && !r.allowPeer(amsg.PeerID) { + return errSourceNotAllowed + } + r.announceMutex.Lock() defer r.announceMutex.Unlock() @@ -365,11 +364,6 @@ func (r *Receiver) announceCheck(amsg Announce) error { return ErrClosed } - // Check callback to see if peer ID allowed. - if r.allowPeer != nil && !r.allowPeer(amsg.PeerID) { - return errSourceNotAllowed - } - // Check if a previous announce for this CID was already seen. if r.announceCache.update(amsg.Cid.String()) { return errAlreadySeenCid diff --git a/dagsync/announce_test.go b/dagsync/announce_test.go index 48f1563..87f506c 100644 --- a/dagsync/announce_test.go +++ b/dagsync/announce_test.go @@ -11,6 +11,7 @@ import ( "github.com/ipld/go-ipld-prime" cidlink "github.com/ipld/go-ipld-prime/linking/cid" basicnode "github.com/ipld/go-ipld-prime/node/basic" + "github.com/ipni/go-libipni/announce" "github.com/ipni/go-libipni/dagsync/dtsync" "github.com/ipni/go-libipni/dagsync/httpsync" "github.com/ipni/go-libipni/dagsync/test" @@ -40,7 +41,7 @@ func TestAnnounceReplace(t *testing.T) { require.NoError(t, err) defer pub.Close() - sub, err := NewSubscriber(dstHost, dstStore, dstLnkS, testTopic, nil) + sub, err := NewSubscriber(dstHost, dstStore, dstLnkS, testTopic, RecvAnnounce()) require.NoError(t, err) defer sub.Close() @@ -163,7 +164,7 @@ func TestAnnounce_LearnsHttpPublisherAddr(t *testing.T) { defer pubh.Close() subds := dssync.MutexWrap(datastore.NewMapDatastore()) subls := test.MkLinkSystem(subds) - sub, err := NewSubscriber(subh, subds, subls, testTopic, nil) + sub, err := NewSubscriber(subh, subds, subls, testTopic, RecvAnnounce()) require.NoError(t, err) defer sub.Close() @@ -217,11 +218,13 @@ func TestAnnounceRepublish(t *testing.T) { topics := test.WaitForMeshWithMessage(t, testTopic, dstHost, dstHost2) - sub2, err := NewSubscriber(dstHost2, dstStore2, dstLnkS2, testTopic, nil, Topic(topics[1])) + sub2, err := NewSubscriber(dstHost2, dstStore2, dstLnkS2, testTopic, + RecvAnnounce(announce.WithTopic(topics[1]))) require.NoError(t, err) defer sub2.Close() - sub1, err := NewSubscriber(dstHost, dstStore, dstLnkS, testTopic, nil, Topic(topics[0]), ResendAnnounce(true)) + sub1, err := NewSubscriber(dstHost, dstStore, dstLnkS, testTopic, + RecvAnnounce(announce.WithTopic(topics[0]), announce.WithResend(true))) require.NoError(t, err) defer sub1.Close() diff --git a/dagsync/http_test.go b/dagsync/http_test.go index c92782a..15e1d25 100644 --- a/dagsync/http_test.go +++ b/dagsync/http_test.go @@ -49,7 +49,7 @@ func setupPublisherSubscriber(t *testing.T, subscriberOptions []dagsync.Option) dstLinkSys := test.MkLinkSystem(dstStore) dstHost := test.MkTestHost() - sub, err := dagsync.NewSubscriber(dstHost, dstStore, dstLinkSys, testTopic, nil, subscriberOptions...) + sub, err := dagsync.NewSubscriber(dstHost, dstStore, dstLinkSys, testTopic, subscriberOptions...) 
require.NoError(t, err) t.Cleanup(func() { sub.Close() diff --git a/dagsync/legs_test.go b/dagsync/legs_test.go index 4ca74cc..ed2c181 100644 --- a/dagsync/legs_test.go +++ b/dagsync/legs_test.go @@ -36,7 +36,7 @@ func TestMain(m *testing.M) { os.Exit(m.Run()) } -func initPubSub(t *testing.T, srcStore, dstStore datastore.Batching) (host.Host, host.Host, dagsync.Publisher, *dagsync.Subscriber, announce.Sender) { +func initPubSub(t *testing.T, srcStore, dstStore datastore.Batching, allowPeer func(peer.ID) bool) (host.Host, host.Host, dagsync.Publisher, *dagsync.Subscriber, announce.Sender) { srcHost := test.MkTestHost() dstHost := test.MkTestHost() @@ -54,7 +54,8 @@ func initPubSub(t *testing.T, srcStore, dstStore datastore.Batching) (host.Host, dstHost.Peerstore().AddAddrs(srcHost.ID(), srcHost.Addrs(), time.Hour) dstLnkS := test.MkLinkSystem(dstStore) - sub, err := dagsync.NewSubscriber(dstHost, dstStore, dstLnkS, testTopic, nil, dagsync.Topic(topics[1])) + sub, err := dagsync.NewSubscriber(dstHost, dstStore, dstLnkS, testTopic, + dagsync.RecvAnnounce(announce.WithTopic(topics[1]), announce.WithAllowPeer(allowPeer))) require.NoError(t, err) err = srcHost.Connect(context.Background(), dstHost.Peerstore().PeerInfo(dstHost.ID())) @@ -67,20 +68,24 @@ func initPubSub(t *testing.T, srcStore, dstStore datastore.Batching) (host.Host, func TestAllowPeerReject(t *testing.T) { t.Parallel() + + // Set function to reject anything except dstHost, which is not the one + // generating the update. + var destID peer.ID + allow := func(peerID peer.ID) bool { + return peerID == destID + } + // Init dagsync publisher and subscriber srcStore := dssync.MutexWrap(datastore.NewMapDatastore()) dstStore := dssync.MutexWrap(datastore.NewMapDatastore()) - srcHost, dstHost, pub, sub, sender := initPubSub(t, srcStore, dstStore) + srcHost, dstHost, pub, sub, sender := initPubSub(t, srcStore, dstStore, allow) defer srcHost.Close() defer dstHost.Close() defer pub.Close() defer sub.Close() - // Set function to reject anything except dstHost, which is not the one - // generating the update. - sub.SetAllowPeer(func(peerID peer.ID) bool { - return peerID == dstHost.ID() - }) + destID = dstHost.ID() watcher, cncl := sub.OnSyncFinished() defer cncl() @@ -101,20 +106,21 @@ func TestAllowPeerReject(t *testing.T) { func TestAllowPeerAllows(t *testing.T) { t.Parallel() + + // Set function to allow any peer. + allow := func(_ peer.ID) bool { + return true + } + // Init dagsync publisher and subscriber srcStore := dssync.MutexWrap(datastore.NewMapDatastore()) dstStore := dssync.MutexWrap(datastore.NewMapDatastore()) - srcHost, dstHost, pub, sub, sender := initPubSub(t, srcStore, dstStore) + srcHost, dstHost, pub, sub, sender := initPubSub(t, srcStore, dstStore, allow) defer srcHost.Close() defer dstHost.Close() defer pub.Close() defer sub.Close() - // Set function to allow any peer. 
- sub.SetAllowPeer(func(_ peer.ID) bool { - return true - }) - watcher, cncl := sub.OnSyncFinished() defer cncl() @@ -167,7 +173,7 @@ func TestPublisherRejectsPeer(t *testing.T) { dstHost.Peerstore().AddAddrs(srcHost.ID(), srcHost.Addrs(), time.Hour) dstLnkS := test.MkLinkSystem(dstStore) - sub, err := dagsync.NewSubscriber(dstHost, dstStore, dstLnkS, testTopic, nil, dagsync.Topic(topics[1])) + sub, err := dagsync.NewSubscriber(dstHost, dstStore, dstLnkS, testTopic, dagsync.RecvAnnounce(announce.WithTopic(topics[1]))) require.NoError(t, err) defer sub.Close() diff --git a/dagsync/option.go b/dagsync/option.go index ee93022..b311923 100644 --- a/dagsync/option.go +++ b/dagsync/option.go @@ -8,6 +8,7 @@ import ( dt "github.com/filecoin-project/go-data-transfer/v2" "github.com/ipfs/go-cid" "github.com/ipfs/go-graphsync" + "github.com/ipld/go-ipld-prime" "github.com/ipld/go-ipld-prime/traversal/selector" "github.com/ipni/go-libipni/announce" pubsub "github.com/libp2p/go-libp2p-pubsub" @@ -33,9 +34,7 @@ type LastKnownSyncFunc func(peer.ID) (cid.Cid, bool) // config contains all options for configuring Subscriber. type config struct { - addrTTL time.Duration - allowPeer announce.AllowPeerFunc - filterIPs bool + addrTTL time.Duration topic *pubsub.Topic @@ -45,12 +44,14 @@ type config struct { blockHook BlockHookFunc httpClient *http.Client + dss ipld.Node syncRecLimit selector.RecursionLimit idleHandlerTTL time.Duration lastKnownSync LastKnownSyncFunc - resendAnnounce bool + hasRcvr bool + rcvrOpts []announce.Option segDepthLimit int64 @@ -79,15 +80,6 @@ func getOpts(opts []Option) (config, error) { return cfg, nil } -// AllowPeer sets the function that determines whether to allow or reject -// messages from a peer. -func AllowPeer(allowPeer announce.AllowPeerFunc) Option { - return func(c *config) error { - c.allowPeer = allowPeer - return nil - } -} - // AddrTTL sets the peerstore address time-to-live for addresses discovered // from pubsub messages. func AddrTTL(addrTTL time.Duration) Option { @@ -105,6 +97,15 @@ func Topic(topic *pubsub.Topic) Option { } } +// DefaultSelectorSeq sets the default selector sequence passed to +// ExploreRecursiveWithStopNode. +func DefaultSelectorSeq(dss ipld.Node) Option { + return func(c *config) error { + c.dss = dss + return nil + } +} + // DtManager provides an existing datatransfer manager. func DtManager(dtManager dt.Manager, gs graphsync.GraphExchange) Option { return func(c *config) error { @@ -133,15 +134,6 @@ func BlockHook(blockHook BlockHookFunc) Option { } } -// FilterIPs removes any private, loopback, or unspecified IP multiaddrs from -// addresses supplied in announce messages. -func FilterIPs(enable bool) Option { - return func(c *config) error { - c.filterIPs = enable - return nil - } -} - // IdleHandlerTTL configures the time after which idle handlers are removed. func IdleHandlerTTL(ttl time.Duration) Option { return func(c *config) error { @@ -171,11 +163,11 @@ func SyncRecursionLimit(limit selector.RecursionLimit) Option { } } -// ResendAnnounce determines whether to resend the direct announce mesages -// (those that are not received via pubsub) over pubsub. -func ResendAnnounce(enable bool) Option { +// RecvAnnounce enables an announcement message receiver. 
+func RecvAnnounce(opts ...announce.Option) Option { return func(c *config) error { - c.resendAnnounce = enable + c.hasRcvr = true + c.rcvrOpts = opts return nil } } diff --git a/dagsync/subscriber.go b/dagsync/subscriber.go index 624c0d5..e2dcf7e 100644 --- a/dagsync/subscriber.go +++ b/dagsync/subscriber.go @@ -98,7 +98,8 @@ type Subscriber struct { segDepthLimit int64 - receiver *announce.Receiver + receiver *announce.Receiver + topicName string // Track explicit Sync calls in progress and allow them to complete before // closing subscriber. @@ -160,7 +161,7 @@ func wrapBlockHook() (*sync.RWMutex, map[peer.ID]func(peer.ID, cid.Cid), func(pe // NewSubscriber creates a new Subscriber that processes pubsub messages and // syncs dags advertised using the specified selector. -func NewSubscriber(host host.Host, ds datastore.Batching, lsys ipld.LinkSystem, topic string, dss ipld.Node, options ...Option) (*Subscriber, error) { +func NewSubscriber(host host.Host, ds datastore.Batching, lsys ipld.LinkSystem, topic string, options ...Option) (*Subscriber, error) { opts, err := getOpts(options) if err != nil { return nil, err @@ -187,22 +188,12 @@ func NewSubscriber(host host.Host, ds datastore.Batching, lsys ipld.LinkSystem, return nil, err } - rcvr, err := announce.NewReceiver(host, topic, - announce.WithAllowPeer(opts.allowPeer), - announce.WithFilterIPs(opts.filterIPs), - announce.WithResend(opts.resendAnnounce), - announce.WithTopic(opts.topic)) - if err != nil { - return nil, err - } - s := &Subscriber{ - dss: dss, + dss: opts.dss, host: host, - addrTTL: opts.addrTTL, - closing: make(chan struct{}), - watchDone: make(chan struct{}), + addrTTL: opts.addrTTL, + closing: make(chan struct{}), handlers: make(map[peer.ID]*handler), inEvents: make(chan SyncFinished, 1), @@ -225,12 +216,18 @@ func NewSubscriber(host host.Host, ds datastore.Batching, lsys ipld.LinkSystem, lastKnownSync: opts.lastKnownSync, segDepthLimit: opts.segDepthLimit, - - receiver: rcvr, + topicName: topic, } - // Start watcher to read announce messages. - go s.watch() + if opts.hasRcvr { + s.receiver, err = announce.NewReceiver(host, topic, opts.rcvrOpts...) + if err != nil { + return nil, fmt.Errorf("failed to create announcement receiver: %w", err) + } + s.watchDone = make(chan struct{}) + // Start watcher to read announce messages. + go s.watch() + } // Start distributor to send SyncFinished messages to interested parties. go s.distributeEvents() // Start goroutine to remove idle publisher handlers. @@ -269,14 +266,6 @@ func (s *Subscriber) SetLatestSync(peerID peer.ID, latestSync cid.Cid) error { return nil } -// SetAllowPeer configures Subscriber with a function to evaluate whether to -// allow or reject messages from a peer. Setting nil removes any filtering and -// allows messages from all peers. Calling SetAllowPeer replaces any previously -// configured AllowPeerFunc. -func (s *Subscriber) SetAllowPeer(allowPeer announce.AllowPeerFunc) { - s.receiver.SetAllowPeer(allowPeer) -} - // Close shuts down the Subscriber. func (s *Subscriber) Close() error { var err error @@ -297,17 +286,19 @@ func (s *Subscriber) doClose() error { // Wait for explicit Syncs calls to finish. s.expSyncWG.Wait() - // Close receiver and wait for watch to exit. - err := s.receiver.Close() - if err != nil { - log.Errorw("error closing receiver", "err", err) + if s.receiver != nil { + // Close receiver and wait for watch to exit. 
+ err := s.receiver.Close() + if err != nil { + log.Errorw("error closing receiver", "err", err) + } + <-s.watchDone } - <-s.watchDone // Wait for any syncs to complete. s.asyncWG.Wait() - err = s.dtSync.Close() + err := s.dtSync.Close() // Stop the distribution goroutine. close(s.inEvents) @@ -629,6 +620,9 @@ func (s *Subscriber) watch() { // an announce message announces the availability of an advertisement and where // to retrieve it from. func (s *Subscriber) Announce(ctx context.Context, nextCid cid.Cid, peerID peer.ID, peerAddrs []multiaddr.Multiaddr) error { + if s.receiver == nil { + return nil + } return s.receiver.Direct(ctx, nextCid, peerID, peerAddrs) } @@ -664,7 +658,7 @@ func (s *Subscriber) makeSyncer(peerInfo peer.AddrInfo, addrTTL time.Duration) ( peerStore.AddAddrs(peerInfo.ID, peerInfo.Addrs, addrTTL) } - return s.dtSync.NewSyncer(peerInfo.ID, s.receiver.TopicName()), false, nil + return s.dtSync.NewSyncer(peerInfo.ID, s.topicName), false, nil } // handleAsync starts a goroutine to process the latest announce message @@ -704,7 +698,9 @@ func (h *handler) handleAsync(ctx context.Context, nextCid cid.Cid, syncer Synce h.syncMutex.Unlock() if err != nil { // Failed to handle the sync, so allow another announce for the same CID. - h.subscriber.receiver.UncacheCid(c) + if h.subscriber.receiver != nil { + h.subscriber.receiver.UncacheCid(c) + } log.Errorw("Cannot process message", "err", err, "publisher", h.peerID) if strings.Contains(err.Error(), "response rejected") { // A "response rejected" error happens when the indexer diff --git a/dagsync/subscriber_test.go b/dagsync/subscriber_test.go index 09e8132..5819022 100644 --- a/dagsync/subscriber_test.go +++ b/dagsync/subscriber_test.go @@ -66,7 +66,7 @@ func TestScopedBlockHook(t *testing.T) { require.NoError(t, test.WaitForP2PPublisher(pub, subHost, testTopic)) var calledGeneralBlockHookTimes int64 - sub, err := dagsync.NewSubscriber(subHost, subDS, subLsys, testTopic, nil, dagsync.BlockHook(func(i peer.ID, c cid.Cid, _ dagsync.SegmentSyncActions) { + sub, err := dagsync.NewSubscriber(subHost, subDS, subLsys, testTopic, dagsync.BlockHook(func(i peer.ID, c cid.Cid, _ dagsync.SegmentSyncActions) { atomic.AddInt64(&calledGeneralBlockHookTimes, 1) })) require.NoError(t, err) @@ -129,7 +129,7 @@ func TestSyncedCidsReturned(t *testing.T) { require.NoError(t, test.WaitForP2PPublisher(pub, subHost, testTopic)) - sub, err := dagsync.NewSubscriber(subHost, subDS, subLsys, testTopic, nil) + sub, err := dagsync.NewSubscriber(subHost, subDS, subLsys, testTopic) require.NoError(t, err) onFinished, cancel := sub.OnSyncFinished() @@ -192,7 +192,7 @@ func TestConcurrentSync(t *testing.T) { subLsys := test.MkLinkSystem(subDS) var calledTimes int64 - sub, err := dagsync.NewSubscriber(subHost, subDS, subLsys, testTopic, nil, dagsync.BlockHook(func(i peer.ID, c cid.Cid, _ dagsync.SegmentSyncActions) { + sub, err := dagsync.NewSubscriber(subHost, subDS, subLsys, testTopic, dagsync.BlockHook(func(i peer.ID, c cid.Cid, _ dagsync.SegmentSyncActions) { atomic.AddInt64(&calledTimes, 1) })) require.NoError(t, err) @@ -352,7 +352,7 @@ func TestRoundTripSimple(t *testing.T) { // Init dagsync publisher and subscriber srcStore := dssync.MutexWrap(datastore.NewMapDatastore()) dstStore := dssync.MutexWrap(datastore.NewMapDatastore()) - srcHost, dstHost, pub, sub, sender := initPubSub(t, srcStore, dstStore) + srcHost, dstHost, pub, sub, sender := initPubSub(t, srcStore, dstStore, nil) defer srcHost.Close() defer dstHost.Close() defer pub.Close() @@ -421,7 
+421,7 @@ func TestRoundTrip(t *testing.T) { t.Log("block hook got", c, "from", p) } - sub, err := dagsync.NewSubscriber(dstHost, dstStore, dstLnkS, testTopic, nil, dagsync.Topic(topics[2]), dagsync.BlockHook(blockHook)) + sub, err := dagsync.NewSubscriber(dstHost, dstStore, dstLnkS, testTopic, dagsync.RecvAnnounce(announce.WithTopic(topics[2])), dagsync.BlockHook(blockHook)) require.NoError(t, err) defer sub.Close() @@ -617,7 +617,7 @@ func TestCloseSubscriber(t *testing.T) { sh := test.MkTestHost() lsys := test.MkLinkSystem(st) - sub, err := dagsync.NewSubscriber(sh, st, lsys, testTopic, nil) + sub, err := dagsync.NewSubscriber(sh, st, lsys, testTopic) require.NoError(t, err) watcher, cncl := sub.OnSyncFinished() @@ -696,7 +696,7 @@ func (b dagsyncPubSubBuilder) Build(t *testing.T, topicName string, pubSys hostS require.NoError(t, test.WaitForP2PPublisher(pub, subSys.host, topicName)) } - sub, err := dagsync.NewSubscriber(subSys.host, subSys.ds, subSys.lsys, topicName, nil, subOpts...) + sub, err := dagsync.NewSubscriber(subSys.host, subSys.ds, subSys.lsys, topicName, subOpts...) require.NoError(t, err) return pub, sub, senders diff --git a/dagsync/sync_test.go b/dagsync/sync_test.go index f3129ad..b72ea75 100644 --- a/dagsync/sync_test.go +++ b/dagsync/sync_test.go @@ -44,7 +44,7 @@ func TestLatestSyncSuccess(t *testing.T) { require.NoError(t, err) defer pub.Close() - sub, err := dagsync.NewSubscriber(dstHost, dstStore, dstLnkS, testTopic, nil, dagsync.Topic(topics[1])) + sub, err := dagsync.NewSubscriber(dstHost, dstStore, dstLnkS, testTopic, dagsync.RecvAnnounce(announce.WithTopic(topics[1]))) require.NoError(t, err) defer sub.Close() @@ -94,7 +94,8 @@ func TestSyncFn(t *testing.T) { blocksSeenByHook[c] = struct{}{} } - sub, err := dagsync.NewSubscriber(dstHost, dstStore, dstLnkS, testTopic, nil, dagsync.Topic(topics[1]), dagsync.BlockHook(blockHook)) + sub, err := dagsync.NewSubscriber(dstHost, dstStore, dstLnkS, testTopic, dagsync.BlockHook(blockHook), + dagsync.RecvAnnounce(announce.WithTopic(topics[1]))) require.NoError(t, err) defer sub.Close() @@ -208,7 +209,7 @@ func TestPartialSync(t *testing.T) { defer pub.Close() test.MkChain(srcLnkS, true) - sub, err := dagsync.NewSubscriber(dstHost, dstStore, dstLnkS, testTopic, nil, dagsync.Topic(topics[1])) + sub, err := dagsync.NewSubscriber(dstHost, dstStore, dstLnkS, testTopic, dagsync.RecvAnnounce(announce.WithTopic(topics[1]))) require.NoError(t, err) defer sub.Close() @@ -267,7 +268,7 @@ func TestStepByStepSync(t *testing.T) { require.NoError(t, err) defer pub.Close() - sub, err := dagsync.NewSubscriber(dstHost, dstStore, dstLnkS, testTopic, nil, dagsync.Topic(topics[1])) + sub, err := dagsync.NewSubscriber(dstHost, dstStore, dstLnkS, testTopic, dagsync.RecvAnnounce(announce.WithTopic(topics[1]))) require.NoError(t, err) defer sub.Close() @@ -312,7 +313,7 @@ func TestLatestSyncFailure(t *testing.T) { t.Log("source host:", srcHost.ID()) t.Log("targer host:", dstHost.ID()) - sub, err := dagsync.NewSubscriber(dstHost, dstStore, dstLnkS, testTopic, nil) + sub, err := dagsync.NewSubscriber(dstHost, dstStore, dstLnkS, testTopic, dagsync.RecvAnnounce()) require.NoError(t, err) defer sub.Close() @@ -333,7 +334,7 @@ func TestLatestSyncFailure(t *testing.T) { sub.Close() dstStore = dssync.MutexWrap(datastore.NewMapDatastore()) - sub2, err := dagsync.NewSubscriber(dstHost, dstStore, dstLnkS, testTopic, nil) + sub2, err := dagsync.NewSubscriber(dstHost, dstStore, dstLnkS, testTopic) require.NoError(t, err) defer sub2.Close() @@ -364,7 +365,7 
@@ func TestAnnounce(t *testing.T) { require.NoError(t, err) defer pub.Close() - sub, err := dagsync.NewSubscriber(dstHost, dstStore, dstLnkS, testTopic, nil) + sub, err := dagsync.NewSubscriber(dstHost, dstStore, dstLnkS, testTopic, dagsync.RecvAnnounce()) require.NoError(t, err) defer sub.Close() @@ -401,7 +402,7 @@ func TestCancelDeadlock(t *testing.T) { require.NoError(t, err) defer pub.Close() - sub, err := dagsync.NewSubscriber(dstHost, dstStore, dstLnkS, testTopic, nil) + sub, err := dagsync.NewSubscriber(dstHost, dstStore, dstLnkS, testTopic) require.NoError(t, err) defer sub.Close()
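
A minimal sketch of constructing a Subscriber against the post-patch API, with option and function names taken from the diff above. The helper name newSub and the surrounding host, datastore, and link-system setup are assumed for illustration only; import paths are the usual ones for these packages and may differ in the target module.

package example

import (
	"github.com/ipfs/go-datastore"
	"github.com/ipld/go-ipld-prime"
	"github.com/ipni/go-libipni/announce"
	"github.com/ipni/go-libipni/dagsync"
	"github.com/libp2p/go-libp2p/core/host"
	"github.com/libp2p/go-libp2p/core/peer"
)

// newSub is a hypothetical helper showing the construction path after this
// patch. Announce handling is now opt-in via dagsync.RecvAnnounce, which takes
// announce.Option values in place of the removed dagsync.AllowPeer,
// dagsync.FilterIPs, and dagsync.ResendAnnounce options, and the default
// selector sequence moves from a positional argument to DefaultSelectorSeq.
func newSub(h host.Host, ds datastore.Batching, lsys ipld.LinkSystem, topic string, dss ipld.Node, allow func(peer.ID) bool) (*dagsync.Subscriber, error) {
	return dagsync.NewSubscriber(h, ds, lsys, topic,
		// Pass the default selector sequence explicitly (was the 5th argument).
		dagsync.DefaultSelectorSeq(dss),
		// Enable the announce.Receiver; omit RecvAnnounce entirely to build a
		// Subscriber that performs explicit syncs only.
		dagsync.RecvAnnounce(
			announce.WithAllowPeer(allow),
			announce.WithResend(true),
		),
	)
}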