Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make announce.Receiver optional for Subscriber #89

Merged
merged 1 commit into from
Jul 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion announce/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func getOpts(opts []Option) (config, error) {
}

// WithAllowPeer sets the function that determines whether to allow or reject
// messages from a peer.
// messages from a peer. When not set or nil, allows messages from all peers.
func WithAllowPeer(allowPeer AllowPeerFunc) Option {
return func(c *config) error {
c.allowPeer = allowPeer
Expand Down
34 changes: 14 additions & 20 deletions announce/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ type Receiver struct {
hostID peer.ID

announceCache *stringLRU
// announceMutex protects announceCache, allowPeer, and topicSub
// announceMutex protects announceCache and topicSub.
announceMutex sync.Mutex

closed bool
Expand Down Expand Up @@ -204,16 +204,6 @@ func (r *Receiver) Close() error {
return err
}

// SetAllowPeer configures Subscriber with a function to evaluate whether to
// allow or reject messages from a peer. Setting nil removes any filtering and
// allows messages from all peers. Calling SetAllowPeer replaces any previously
// configured AllowPeerFunc.
func (r *Receiver) SetAllowPeer(allowPeer AllowPeerFunc) {
r.announceMutex.Lock()
r.allowPeer = allowPeer
r.announceMutex.Unlock()
}

// UncacheCid removes a CID from the announce cache.
func (r *Receiver) UncacheCid(adCid cid.Cid) {
r.announceMutex.Lock()
Expand All @@ -234,7 +224,6 @@ func (r *Receiver) watch(ctx context.Context) {
if err != nil {
if errors.Is(err, context.Canceled) || errors.Is(err, pubsub.ErrSubscriptionCancelled) {
// This is a normal result of shutting down the Subscriber.
log.Debug("Canceled watching pubsub subscription")
break
}
log.Errorw("Error reading from pubsub", "err", err)
Expand Down Expand Up @@ -299,6 +288,9 @@ func (r *Receiver) watch(ctx context.Context) {
}
err = r.handleAnnounce(ctx, amsg, false)
if err != nil {
if errors.Is(err, ErrClosed) || errors.Is(err, context.Canceled) {
break
}
log.Errorw("Cannot process message", "err", err)
continue
}
Expand All @@ -319,10 +311,10 @@ func (r *Receiver) Direct(ctx context.Context, nextCid cid.Cid, peerID peer.ID,
PeerID: peerID,
Addrs: addrs,
}
return r.handleAnnounce(ctx, amsg, true)
return r.handleAnnounce(ctx, amsg, r.resend)
}

func (r *Receiver) handleAnnounce(ctx context.Context, amsg Announce, direct bool) error {
func (r *Receiver) handleAnnounce(ctx context.Context, amsg Announce, resend bool) error {
err := r.announceCheck(amsg)
if err != nil {
if err == ErrClosed {
Expand All @@ -339,7 +331,7 @@ func (r *Receiver) handleAnnounce(ctx context.Context, amsg Announce, direct boo
// address in their peer store.
}

if direct && r.resend {
if resend {
err = r.republish(ctx, amsg)
if err != nil {
log.Errorw("Cannot republish announce message", "err", err)
Expand All @@ -350,6 +342,8 @@ func (r *Receiver) handleAnnounce(ctx context.Context, amsg Announce, direct boo

select {
case r.outChan <- amsg:
case <-r.done:
return ErrClosed
case <-ctx.Done():
return ctx.Err()
}
Expand All @@ -358,18 +352,18 @@ func (r *Receiver) handleAnnounce(ctx context.Context, amsg Announce, direct boo
}

func (r *Receiver) announceCheck(amsg Announce) error {
// Check callback to see if peer ID allowed.
if r.allowPeer != nil && !r.allowPeer(amsg.PeerID) {
return errSourceNotAllowed
}

r.announceMutex.Lock()
defer r.announceMutex.Unlock()

if r.closed {
return ErrClosed
}

// Check callback to see if peer ID allowed.
if r.allowPeer != nil && !r.allowPeer(amsg.PeerID) {
return errSourceNotAllowed
}

// Check if a previous announce for this CID was already seen.
if r.announceCache.update(amsg.Cid.String()) {
return errAlreadySeenCid
Expand Down
11 changes: 7 additions & 4 deletions dagsync/announce_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/ipld/go-ipld-prime"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
basicnode "github.com/ipld/go-ipld-prime/node/basic"
"github.com/ipni/go-libipni/announce"
"github.com/ipni/go-libipni/dagsync/dtsync"
"github.com/ipni/go-libipni/dagsync/httpsync"
"github.com/ipni/go-libipni/dagsync/test"
Expand Down Expand Up @@ -40,7 +41,7 @@ func TestAnnounceReplace(t *testing.T) {
require.NoError(t, err)
defer pub.Close()

sub, err := NewSubscriber(dstHost, dstStore, dstLnkS, testTopic, nil)
sub, err := NewSubscriber(dstHost, dstStore, dstLnkS, testTopic, RecvAnnounce())
require.NoError(t, err)
defer sub.Close()

Expand Down Expand Up @@ -163,7 +164,7 @@ func TestAnnounce_LearnsHttpPublisherAddr(t *testing.T) {
defer pubh.Close()
subds := dssync.MutexWrap(datastore.NewMapDatastore())
subls := test.MkLinkSystem(subds)
sub, err := NewSubscriber(subh, subds, subls, testTopic, nil)
sub, err := NewSubscriber(subh, subds, subls, testTopic, RecvAnnounce())
require.NoError(t, err)
defer sub.Close()

Expand Down Expand Up @@ -217,11 +218,13 @@ func TestAnnounceRepublish(t *testing.T) {

topics := test.WaitForMeshWithMessage(t, testTopic, dstHost, dstHost2)

sub2, err := NewSubscriber(dstHost2, dstStore2, dstLnkS2, testTopic, nil, Topic(topics[1]))
sub2, err := NewSubscriber(dstHost2, dstStore2, dstLnkS2, testTopic,
RecvAnnounce(announce.WithTopic(topics[1])))
require.NoError(t, err)
defer sub2.Close()

sub1, err := NewSubscriber(dstHost, dstStore, dstLnkS, testTopic, nil, Topic(topics[0]), ResendAnnounce(true))
sub1, err := NewSubscriber(dstHost, dstStore, dstLnkS, testTopic,
RecvAnnounce(announce.WithTopic(topics[0]), announce.WithResend(true)))
require.NoError(t, err)
defer sub1.Close()

Expand Down
2 changes: 1 addition & 1 deletion dagsync/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func setupPublisherSubscriber(t *testing.T, subscriberOptions []dagsync.Option)
dstLinkSys := test.MkLinkSystem(dstStore)
dstHost := test.MkTestHost()

sub, err := dagsync.NewSubscriber(dstHost, dstStore, dstLinkSys, testTopic, nil, subscriberOptions...)
sub, err := dagsync.NewSubscriber(dstHost, dstStore, dstLinkSys, testTopic, subscriberOptions...)
require.NoError(t, err)
t.Cleanup(func() {
sub.Close()
Expand Down
36 changes: 21 additions & 15 deletions dagsync/legs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func TestMain(m *testing.M) {
os.Exit(m.Run())
}

func initPubSub(t *testing.T, srcStore, dstStore datastore.Batching) (host.Host, host.Host, dagsync.Publisher, *dagsync.Subscriber, announce.Sender) {
func initPubSub(t *testing.T, srcStore, dstStore datastore.Batching, allowPeer func(peer.ID) bool) (host.Host, host.Host, dagsync.Publisher, *dagsync.Subscriber, announce.Sender) {
srcHost := test.MkTestHost()
dstHost := test.MkTestHost()

Expand All @@ -54,7 +54,8 @@ func initPubSub(t *testing.T, srcStore, dstStore datastore.Batching) (host.Host,
dstHost.Peerstore().AddAddrs(srcHost.ID(), srcHost.Addrs(), time.Hour)
dstLnkS := test.MkLinkSystem(dstStore)

sub, err := dagsync.NewSubscriber(dstHost, dstStore, dstLnkS, testTopic, nil, dagsync.Topic(topics[1]))
sub, err := dagsync.NewSubscriber(dstHost, dstStore, dstLnkS, testTopic,
dagsync.RecvAnnounce(announce.WithTopic(topics[1]), announce.WithAllowPeer(allowPeer)))
require.NoError(t, err)

err = srcHost.Connect(context.Background(), dstHost.Peerstore().PeerInfo(dstHost.ID()))
Expand All @@ -67,20 +68,24 @@ func initPubSub(t *testing.T, srcStore, dstStore datastore.Batching) (host.Host,

func TestAllowPeerReject(t *testing.T) {
t.Parallel()

// Set function to reject anything except dstHost, which is not the one
// generating the update.
var destID peer.ID
allow := func(peerID peer.ID) bool {
return peerID == destID
}

// Init dagsync publisher and subscriber
srcStore := dssync.MutexWrap(datastore.NewMapDatastore())
dstStore := dssync.MutexWrap(datastore.NewMapDatastore())
srcHost, dstHost, pub, sub, sender := initPubSub(t, srcStore, dstStore)
srcHost, dstHost, pub, sub, sender := initPubSub(t, srcStore, dstStore, allow)
defer srcHost.Close()
defer dstHost.Close()
defer pub.Close()
defer sub.Close()

// Set function to reject anything except dstHost, which is not the one
// generating the update.
sub.SetAllowPeer(func(peerID peer.ID) bool {
return peerID == dstHost.ID()
})
destID = dstHost.ID()

watcher, cncl := sub.OnSyncFinished()
defer cncl()
Expand All @@ -101,20 +106,21 @@ func TestAllowPeerReject(t *testing.T) {

func TestAllowPeerAllows(t *testing.T) {
t.Parallel()

// Set function to allow any peer.
allow := func(_ peer.ID) bool {
return true
}

// Init dagsync publisher and subscriber
srcStore := dssync.MutexWrap(datastore.NewMapDatastore())
dstStore := dssync.MutexWrap(datastore.NewMapDatastore())
srcHost, dstHost, pub, sub, sender := initPubSub(t, srcStore, dstStore)
srcHost, dstHost, pub, sub, sender := initPubSub(t, srcStore, dstStore, allow)
defer srcHost.Close()
defer dstHost.Close()
defer pub.Close()
defer sub.Close()

// Set function to allow any peer.
sub.SetAllowPeer(func(_ peer.ID) bool {
return true
})

watcher, cncl := sub.OnSyncFinished()
defer cncl()

Expand Down Expand Up @@ -167,7 +173,7 @@ func TestPublisherRejectsPeer(t *testing.T) {
dstHost.Peerstore().AddAddrs(srcHost.ID(), srcHost.Addrs(), time.Hour)
dstLnkS := test.MkLinkSystem(dstStore)

sub, err := dagsync.NewSubscriber(dstHost, dstStore, dstLnkS, testTopic, nil, dagsync.Topic(topics[1]))
sub, err := dagsync.NewSubscriber(dstHost, dstStore, dstLnkS, testTopic, dagsync.RecvAnnounce(announce.WithTopic(topics[1])))
require.NoError(t, err)
defer sub.Close()

Expand Down
44 changes: 18 additions & 26 deletions dagsync/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
dt "github.com/filecoin-project/go-data-transfer/v2"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-graphsync"
"github.com/ipld/go-ipld-prime"
"github.com/ipld/go-ipld-prime/traversal/selector"
"github.com/ipni/go-libipni/announce"
pubsub "github.com/libp2p/go-libp2p-pubsub"
Expand All @@ -33,9 +34,7 @@ type LastKnownSyncFunc func(peer.ID) (cid.Cid, bool)

// config contains all options for configuring Subscriber.
type config struct {
addrTTL time.Duration
allowPeer announce.AllowPeerFunc
filterIPs bool
addrTTL time.Duration

topic *pubsub.Topic

Expand All @@ -45,12 +44,14 @@ type config struct {
blockHook BlockHookFunc
httpClient *http.Client

dss ipld.Node
syncRecLimit selector.RecursionLimit

idleHandlerTTL time.Duration
lastKnownSync LastKnownSyncFunc

resendAnnounce bool
hasRcvr bool
rcvrOpts []announce.Option

segDepthLimit int64

Expand Down Expand Up @@ -79,15 +80,6 @@ func getOpts(opts []Option) (config, error) {
return cfg, nil
}

// AllowPeer sets the function that determines whether to allow or reject
// messages from a peer.
func AllowPeer(allowPeer announce.AllowPeerFunc) Option {
return func(c *config) error {
c.allowPeer = allowPeer
return nil
}
}

// AddrTTL sets the peerstore address time-to-live for addresses discovered
// from pubsub messages.
func AddrTTL(addrTTL time.Duration) Option {
Expand All @@ -105,6 +97,15 @@ func Topic(topic *pubsub.Topic) Option {
}
}

// DefaultSelectorSeq sets the default selector sequence passed to
// ExploreRecursiveWithStopNode.
func DefaultSelectorSeq(dss ipld.Node) Option {
return func(c *config) error {
c.dss = dss
return nil
}
}

// DtManager provides an existing datatransfer manager.
func DtManager(dtManager dt.Manager, gs graphsync.GraphExchange) Option {
return func(c *config) error {
Expand Down Expand Up @@ -133,15 +134,6 @@ func BlockHook(blockHook BlockHookFunc) Option {
}
}

// FilterIPs removes any private, loopback, or unspecified IP multiaddrs from
// addresses supplied in announce messages.
func FilterIPs(enable bool) Option {
return func(c *config) error {
c.filterIPs = enable
return nil
}
}

// IdleHandlerTTL configures the time after which idle handlers are removed.
func IdleHandlerTTL(ttl time.Duration) Option {
return func(c *config) error {
Expand Down Expand Up @@ -171,11 +163,11 @@ func SyncRecursionLimit(limit selector.RecursionLimit) Option {
}
}

// ResendAnnounce determines whether to resend the direct announce messages
// (those that are not received via pubsub) over pubsub.
func ResendAnnounce(enable bool) Option {
// RecvAnnounce enables an announcement message receiver.
func RecvAnnounce(opts ...announce.Option) Option {
return func(c *config) error {
c.resendAnnounce = enable
c.hasRcvr = true
c.rcvrOpts = opts
return nil
}
}
Expand Down
Loading