diff --git a/dagsync/announce_test.go b/dagsync/announce_test.go index 28cd655..9ca28eb 100644 --- a/dagsync/announce_test.go +++ b/dagsync/announce_test.go @@ -35,8 +35,7 @@ func TestAnnounceReplace(t *testing.T) { blocksSeenByHook[c] = struct{}{} } - sub, err := dagsync.NewSubscriber(dstHost, dstStore, dstLnkS, testTopic, dagsync.RecvAnnounce(), - dagsync.BlockHook(blockHook), dagsync.WithCidSchemaHint(false)) + sub, err := dagsync.NewSubscriber(dstHost, dstStore, dstLnkS, testTopic, dagsync.RecvAnnounce(), dagsync.BlockHook(blockHook)) require.NoError(t, err) defer sub.Close() @@ -458,7 +457,7 @@ func initPubSub(t *testing.T, srcStore, dstStore datastore.Batching, allowPeer f dstHost.Peerstore().AddAddrs(srcHost.ID(), srcHost.Addrs(), time.Hour) dstLnkS := test.MkLinkSystem(dstStore) - sub, err := dagsync.NewSubscriber(dstHost, dstStore, dstLnkS, testTopic, dagsync.WithCidSchemaHint(false), + sub, err := dagsync.NewSubscriber(dstHost, dstStore, dstLnkS, testTopic, dagsync.RecvAnnounce(announce.WithTopic(topics[1]), announce.WithAllowPeer(allowPeer))) require.NoError(t, err) diff --git a/dagsync/ipnisync/publisher.go b/dagsync/ipnisync/publisher.go index f52f9c5..156323c 100644 --- a/dagsync/ipnisync/publisher.go +++ b/dagsync/ipnisync/publisher.go @@ -13,11 +13,9 @@ import ( "github.com/ipfs/go-cid" "github.com/ipld/go-ipld-prime" "github.com/ipld/go-ipld-prime/codec/dagjson" - "github.com/ipld/go-ipld-prime/datamodel" cidlink "github.com/ipld/go-ipld-prime/linking/cid" basicnode "github.com/ipld/go-ipld-prime/node/basic" headschema "github.com/ipni/go-libipni/dagsync/ipnisync/head" - "github.com/ipni/go-libipni/ingest/schema" "github.com/ipni/go-libipni/maurl" ic "github.com/libp2p/go-libp2p/core/crypto" "github.com/libp2p/go-libp2p/core/peer" @@ -229,7 +227,6 @@ func (p *Publisher) ServeHTTP(w http.ResponseWriter, r *http.Request) { ipldCtx := ipld.LinkContext{} reqType := r.Header.Get(CidSchemaHeader) if reqType != "" { - log.Debug("sync request has cid schema type hint", "hint", reqType) ipldCtx.Ctx, err = CtxWithCidSchema(context.Background(), reqType) if err != nil { // Log warning about unknown cid schema type, but continue on since @@ -238,17 +235,7 @@ func (p *Publisher) ServeHTTP(w http.ResponseWriter, r *http.Request) { } } - var ipldProto datamodel.NodePrototype - switch reqType { - case CidSchemaAdvertisement: - ipldProto = schema.AdvertisementPrototype - case CidSchemaEntryChunk: - ipldProto = schema.EntryChunkPrototype - default: - ipldProto = basicnode.Prototype.Any - } - - item, err := p.lsys.Load(ipldCtx, cidlink.Link{Cid: c}, ipldProto) + item, err := p.lsys.Load(ipldCtx, cidlink.Link{Cid: c}, basicnode.Prototype.Any) if err != nil { if errors.Is(err, ipld.ErrNotExists{}) { http.Error(w, "cid not found", http.StatusNotFound) diff --git a/dagsync/ipnisync/sync.go b/dagsync/ipnisync/sync.go index 9ec3363..1ac09a1 100644 --- a/dagsync/ipnisync/sync.go +++ b/dagsync/ipnisync/sync.go @@ -22,7 +22,6 @@ import ( "github.com/ipld/go-ipld-prime/traversal" "github.com/ipld/go-ipld-prime/traversal/selector" headschema "github.com/ipni/go-libipni/dagsync/ipnisync/head" - "github.com/ipni/go-libipni/ingest/schema" "github.com/ipni/go-libipni/maurl" "github.com/ipni/go-libipni/mautil" "github.com/libp2p/go-libp2p/core/network" @@ -244,22 +243,12 @@ func (s *Syncer) Sync(ctx context.Context, nextCid cid.Cid, sel ipld.Node) error } // Check for valid cid schema type if set. - reqType, err := CidSchemaFromCtx(ctx) + _, err = CidSchemaFromCtx(ctx) if err != nil { return err } - var ipldProto datamodel.NodePrototype - switch reqType { - case CidSchemaAdvertisement: - ipldProto = schema.AdvertisementPrototype - case CidSchemaEntryChunk: - ipldProto = schema.EntryChunkPrototype - default: - ipldProto = basicnode.Prototype.Any - } - - cids, err := s.walkFetch(ctx, nextCid, xsel, ipldProto) + cids, err := s.walkFetch(ctx, nextCid, xsel) if err != nil { return fmt.Errorf("failed to traverse requested dag: %w", err) } @@ -285,7 +274,7 @@ func (s *Syncer) Sync(ctx context.Context, nextCid cid.Cid, sel ipld.Node) error // walkFetch is run by a traversal of the selector. For each block that the // selector walks over, walkFetch will look to see if it can find it in the // local data store. If it cannot, it will then go and get it over HTTP. -func (s *Syncer) walkFetch(ctx context.Context, rootCid cid.Cid, sel selector.Selector, ipldProto datamodel.NodePrototype) ([]cid.Cid, error) { +func (s *Syncer) walkFetch(ctx context.Context, rootCid cid.Cid, sel selector.Selector) ([]cid.Cid, error) { // Track the order of cids seen during traversal so that the block hook // function gets called in the same order. var traversalOrder []cid.Cid @@ -296,7 +285,7 @@ func (s *Syncer) walkFetch(ctx context.Context, rootCid cid.Cid, sel selector.Se getMissingLs.StorageReadOpener = func(lc ipld.LinkContext, l ipld.Link) (io.Reader, error) { c := l.(cidlink.Link).Cid // fetchBlock checks if the node is already present in storage. - err := s.fetchBlock(ctx, c, ipldProto) + err := s.fetchBlock(ctx, c) if err != nil { return nil, fmt.Errorf("failed to fetch block for cid %s: %w", c, err) } @@ -318,7 +307,7 @@ func (s *Syncer) walkFetch(ctx context.Context, rootCid cid.Cid, sel selector.Se } // get the direct node. - rootNode, err := getMissingLs.Load(ipld.LinkContext{Ctx: ctx}, cidlink.Link{Cid: rootCid}, ipldProto) + rootNode, err := getMissingLs.Load(ipld.LinkContext{Ctx: ctx}, cidlink.Link{Cid: rootCid}, basicnode.Prototype.Any) if err != nil { return nil, fmt.Errorf("failed to load node for root cid %s: %w", rootCid, err) } @@ -401,8 +390,8 @@ retry: } // fetchBlock fetches an item into the datastore at c if not locally available. -func (s *Syncer) fetchBlock(ctx context.Context, c cid.Cid, ipldProto datamodel.NodePrototype) error { - n, err := s.sync.lsys.Load(ipld.LinkContext{Ctx: ctx}, cidlink.Link{Cid: c}, ipldProto) +func (s *Syncer) fetchBlock(ctx context.Context, c cid.Cid) error { + n, err := s.sync.lsys.Load(ipld.LinkContext{Ctx: ctx}, cidlink.Link{Cid: c}, basicnode.Prototype.Any) // node is already present. if n != nil && err == nil { return nil diff --git a/dagsync/option.go b/dagsync/option.go index 8370d37..45096e2 100644 --- a/dagsync/option.go +++ b/dagsync/option.go @@ -53,7 +53,6 @@ type config struct { gsMaxInRequests uint64 gsMaxOutRequests uint64 - cidSchemaHint bool strictAdsSelSeq bool httpTimeout time.Duration @@ -74,7 +73,6 @@ func getOpts(opts []Option) (config, error) { segDepthLimit: defaultSegDepthLimit, gsMaxInRequests: defaultGsMaxInRequests, gsMaxOutRequests: defaultGsMaxOutRequests, - cidSchemaHint: true, strictAdsSelSeq: true, } @@ -357,10 +355,3 @@ func MakeGeneralBlockHook(prevAdCid func(adCid cid.Cid) (cid.Cid, error)) BlockH } } } - -func WithCidSchemaHint(enable bool) Option { - return func(c *config) error { - c.cidSchemaHint = enable - return nil - } -} diff --git a/dagsync/subscriber.go b/dagsync/subscriber.go index 8f557dd..0f0ead0 100644 --- a/dagsync/subscriber.go +++ b/dagsync/subscriber.go @@ -105,10 +105,6 @@ type Subscriber struct { receiver *announce.Receiver topicName string - // cidSchemaHint enables sending the cid schema type hint as - // an HTTP header in sync requests. - cidSchemaHint bool - // Track explicit Sync calls in progress and allow them to complete before // closing subscriber. expSyncClosed bool @@ -248,8 +244,6 @@ func NewSubscriber(host host.Host, ds datastore.Batching, lsys ipld.LinkSystem, ssb.ExploreFields(func(efsb builder.ExploreFieldsSpecBuilder) { efsb.Insert("Next", ssb.ExploreRecursiveEdge()) // Next field in EntryChunk })).Node(), - - cidSchemaHint: opts.cidSchemaHint, } if opts.strictAdsSelSeq { @@ -258,7 +252,6 @@ func NewSubscriber(host host.Host, ds datastore.Batching, lsys ipld.LinkSystem, }).Node() } else { s.adsSelectorSeq = ssb.ExploreAll(ssb.ExploreRecursiveEdge()).Node() - s.cidSchemaHint = false } if opts.hasRcvr { @@ -504,11 +497,9 @@ func (s *Subscriber) SyncAdChain(ctx context.Context, peerInfo peer.AddrInfo, op sel := ExploreRecursiveWithStopNode(depthLimit, s.adsSelectorSeq, stopLnk) - if s.cidSchemaHint { - ctx, err = ipnisync.CtxWithCidSchema(ctx, ipnisync.CidSchemaAdvertisement) - if err != nil { - panic(err.Error()) - } + ctx, err = ipnisync.CtxWithCidSchema(ctx, ipnisync.CidSchemaAdvertisement) + if err != nil { + panic(err.Error()) } syncCount, err := hnd.handle(ctx, nextCid, sel, syncer, opts.blockHook, segdl, stopAtCid) if err != nil { @@ -593,11 +584,9 @@ func (s *Subscriber) syncEntries(ctx context.Context, peerInfo peer.AddrInfo, en log.Debugw("Start entries sync", "peer", peerInfo.ID, "cid", entCid) - if s.cidSchemaHint { - ctx, err = ipnisync.CtxWithCidSchema(ctx, ipnisync.CidSchemaAdvertisement) - if err != nil { - panic(err.Error()) - } + ctx, err = ipnisync.CtxWithCidSchema(ctx, ipnisync.CidSchemaEntryChunk) + if err != nil { + panic(err.Error()) } _, err = hnd.handle(ctx, entCid, sel, syncer, bh, segdl, cid.Undef) if err != nil { @@ -901,11 +890,10 @@ func (h *handler) asyncSyncAdChain(ctx context.Context) { log.Errorw("Cannot make syncer for announce", "err", err, "peer", h.peerID) return } - if h.subscriber.cidSchemaHint { - ctx, err = ipnisync.CtxWithCidSchema(ctx, ipnisync.CidSchemaAdvertisement) - if err != nil { - panic(err.Error()) - } + + ctx, err = ipnisync.CtxWithCidSchema(ctx, ipnisync.CidSchemaAdvertisement) + if err != nil { + panic(err.Error()) } sel := ExploreRecursiveWithStopNode(adsDepthLimit, h.subscriber.adsSelectorSeq, latestSyncLink) syncCount, err := h.handle(ctx, nextCid, sel, syncer, h.subscriber.generalBlockHook, h.subscriber.segDepthLimit, stopAtCid)