diff --git a/dagsync/subscriber.go b/dagsync/subscriber.go index e742859..a98638f 100644 --- a/dagsync/subscriber.go +++ b/dagsync/subscriber.go @@ -98,6 +98,12 @@ type Subscriber struct { segDepthLimit int64 receiver *announce.Receiver + + // Track explicit Sync calls in progress and allow them to complete before + // closing subscriber. + expSyncClosed bool + expSyncMutex sync.Mutex + expSyncWG sync.WaitGroup } // SyncFinished notifies an OnSyncFinished reader that a specified peer @@ -290,6 +296,13 @@ func (s *Subscriber) doClose() error { // Cancel idle handler cleaner. close(s.closing) + // Block any additional explicit Sync calls. + s.expSyncMutex.Lock() + s.expSyncClosed = true + s.expSyncMutex.Unlock() + // Wait for explicit Syncs calls to finish. + s.expSyncWG.Wait() + // Close receiver and wait for watch to exit. err := s.receiver.Close() if err != nil { @@ -403,6 +416,15 @@ func (s *Subscriber) RemoveHandler(peerID peer.ID) bool { // // See: ExploreRecursiveWithStopNode. func (s *Subscriber) Sync(ctx context.Context, peerInfo peer.AddrInfo, nextCid cid.Cid, sel ipld.Node, options ...SyncOption) (cid.Cid, error) { + s.expSyncMutex.Lock() + if s.expSyncClosed { + s.expSyncMutex.Unlock() + return cid.Undef, errors.New("shutdown") + } + s.expSyncWG.Add(1) + s.expSyncMutex.Unlock() + defer s.expSyncWG.Done() + defaultOptions := []SyncOption{ ScopedBlockHook(s.generalBlockHook), ScopedSegmentDepthLimit(s.segDepthLimit)} diff --git a/go.mod b/go.mod index 9d7d8b6..c31f656 100644 --- a/go.mod +++ b/go.mod @@ -11,7 +11,7 @@ require ( github.com/ipfs/go-ipld-format v0.3.0 github.com/ipfs/go-log/v2 v2.5.1 github.com/ipld/go-ipld-prime v0.20.0 - github.com/libp2p/go-libp2p v0.27.4 + github.com/libp2p/go-libp2p v0.27.5 github.com/libp2p/go-libp2p-gostream v0.6.0 github.com/libp2p/go-libp2p-pubsub v0.9.3 github.com/libp2p/go-msgio v0.3.0 @@ -22,7 +22,7 @@ require ( github.com/multiformats/go-multistream v0.4.1 github.com/multiformats/go-varint v0.0.7 github.com/stretchr/testify v1.8.2 - github.com/whyrusleeping/cbor-gen v0.0.0-20230126041949-52956bd4c9aa + github.com/whyrusleeping/cbor-gen v0.0.0-20230418232409-daab9ece03a0 golang.org/x/crypto v0.7.0 golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 google.golang.org/protobuf v1.30.0 diff --git a/go.sum b/go.sum index c5bb064..e61e018 100644 --- a/go.sum +++ b/go.sum @@ -324,8 +324,8 @@ github.com/libp2p/go-cidranger v1.1.0 h1:ewPN8EZ0dd1LSnrtuwd4709PXVcITVeuwbag38y github.com/libp2p/go-cidranger v1.1.0/go.mod h1:KWZTfSr+r9qEo9OkI9/SIEeAtw+NNoU0dXIXt15Okic= github.com/libp2p/go-flow-metrics v0.1.0 h1:0iPhMI8PskQwzh57jB9WxIuIOQ0r+15PChFGkx3Q3WM= github.com/libp2p/go-flow-metrics v0.1.0/go.mod h1:4Xi8MX8wj5aWNDAZttg6UPmc0ZrnFNsMtpsYUClFtro= -github.com/libp2p/go-libp2p v0.27.4 h1:zliwN9xuzCBqCtWe0XjLKJGK6EIZTkp9L1e15wBpiOU= -github.com/libp2p/go-libp2p v0.27.4/go.mod h1:oMfQGTb9CHnrOuSM6yMmyK2lXz3qIhnkn2+oK3B1Y2g= +github.com/libp2p/go-libp2p v0.27.5 h1:KwA7pXKXpz8hG6Cr1fMA7UkgleogcwQj0sxl5qquWRg= +github.com/libp2p/go-libp2p v0.27.5/go.mod h1:oMfQGTb9CHnrOuSM6yMmyK2lXz3qIhnkn2+oK3B1Y2g= github.com/libp2p/go-libp2p-asn-util v0.3.0 h1:gMDcMyYiZKkocGXDQ5nsUQyquC9+H+iLEQHwOCZ7s8s= github.com/libp2p/go-libp2p-asn-util v0.3.0/go.mod h1:B1mcOrKUE35Xq/ASTmQ4tN3LNzVVaMNmq2NACuqyB9w= github.com/libp2p/go-libp2p-gostream v0.6.0 h1:QfAiWeQRce6pqnYfmIVWJFXNdDyfiR/qkCnjyaZUPYU= @@ -542,8 +542,8 @@ github.com/whyrusleeping/cbor-gen v0.0.0-20191216205031-b047b6acb3c0/go.mod h1:x github.com/whyrusleeping/cbor-gen v0.0.0-20200123233031-1cdf64d27158/go.mod h1:Xj/M2wWU+QdTdRbu/L/1dIZY8/Wb2K9pAhtroQuxJJI= github.com/whyrusleeping/cbor-gen v0.0.0-20200710004633-5379fc63235d/go.mod h1:fgkXqYy7bV2cFeIEOkVTZS/WjXARfBqSH6Q2qHL33hQ= github.com/whyrusleeping/cbor-gen v0.0.0-20200826160007-0b9f6c5fb163/go.mod h1:fgkXqYy7bV2cFeIEOkVTZS/WjXARfBqSH6Q2qHL33hQ= -github.com/whyrusleeping/cbor-gen v0.0.0-20230126041949-52956bd4c9aa h1:EyA027ZAkuaCLoxVX4r1TZMPy1d31fM6hbfQ4OU4I5o= -github.com/whyrusleeping/cbor-gen v0.0.0-20230126041949-52956bd4c9aa/go.mod h1:fgkXqYy7bV2cFeIEOkVTZS/WjXARfBqSH6Q2qHL33hQ= +github.com/whyrusleeping/cbor-gen v0.0.0-20230418232409-daab9ece03a0 h1:XYEgH2nJgsrcrj32p+SAbx6T3s/6QknOXezXtz7kzbg= +github.com/whyrusleeping/cbor-gen v0.0.0-20230418232409-daab9ece03a0/go.mod h1:fgkXqYy7bV2cFeIEOkVTZS/WjXARfBqSH6Q2qHL33hQ= github.com/whyrusleeping/chunker v0.0.0-20181014151217-fe64bd25879f h1:jQa4QT2UP9WYv2nzyawpKMOCl+Z/jW7djv2/J50lj9E= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=