From 83d2304631a8712b6177ebb2ff5053f176b077bf Mon Sep 17 00:00:00 2001 From: Andrew Gillis Date: Thu, 13 Jan 2022 04:15:52 -0800 Subject: [PATCH] Fix incorrect log messages (#151) * Fix incorrect log messages * Cache advertisement while processing entry chunks for advertisement to prevent reloading re-validating the same data. * Sync datastore after saving ad mapping * Log completion of entry block indexing at info level * Index current chunk even if unable to set mapping for next chunk * Update config defaults * Populate defaults if config file missing values * bump version --- config/addresses.go | 18 +++ config/config.go | 10 ++ config/datastore.go | 12 ++ config/discovery.go | 15 +- config/indexer.go | 15 ++ config/ingest.go | 19 ++- go.mod | 2 +- go.sum | 3 +- internal/ingest/ingest.go | 4 + internal/ingest/ingest_test.go | 2 +- internal/ingest/linksystem.go | 262 ++++++++++++++++++++------------- version.json | 2 +- 12 files changed, 254 insertions(+), 110 deletions(-) diff --git a/config/addresses.go b/config/addresses.go index a7446b106..5a2ff31f0 100644 --- a/config/addresses.go +++ b/config/addresses.go @@ -23,3 +23,21 @@ func NewAddresses() Addresses { P2PAddr: "/ip4/0.0.0.0/tcp/3003", } } + +// populateUnset replaces zero-values in the config with default values. 
+func (c *Addresses) populateUnset() { + def := NewAddresses() + + if c.Admin == "" { + c.Admin = def.Admin + } + if c.Finder == "" { + c.Finder = def.Finder + } + if c.Ingest == "" { + c.Ingest = def.Ingest + } + if c.P2PAddr == "" { + c.P2PAddr = def.P2PAddr + } +} diff --git a/config/config.go b/config/config.go index 2e68d0c92..82c812f69 100644 --- a/config/config.go +++ b/config/config.go @@ -98,6 +98,8 @@ func Load(filePath string) (*Config, error) { return nil, err } + cfg.populateUnset() + return &cfg, nil } @@ -138,3 +140,11 @@ func (c *Config) String() string { } return string(b) } + +func (c *Config) populateUnset() { + c.Addresses.populateUnset() + c.Datastore.populateUnset() + c.Discovery.populateUnset() + c.Indexer.populateUnset() + c.Ingest.populateUnset() +} diff --git a/config/datastore.go b/config/datastore.go index 4ae189a54..308eec668 100644 --- a/config/datastore.go +++ b/config/datastore.go @@ -17,3 +17,15 @@ func NewDatastore() Datastore { Dir: "datastore", } } + +// populateUnset replaces zero-values in the config with default values. +func (c *Datastore) populateUnset() { + def := NewDatastore() + + if c.Type == "" { + c.Type = def.Type + } + if c.Dir == "" { + c.Dir = def.Dir + } +} diff --git a/config/discovery.go b/config/discovery.go index 853212074..3f5bc0621 100644 --- a/config/discovery.go +++ b/config/discovery.go @@ -24,7 +24,8 @@ type Discovery struct { // Values are a number ending in "s", "m", "h" for seconds. minutes, hours. PollInterval Duration // RediscoverWait is the amount of time that must pass before a provider - // can be discovered following a previous discovery attempt + // can be discovered following a previous discovery attempt. A value of 0 + // means there is no wait time. RediscoverWait Duration // Timeout is the maximum amount of time that the indexer will spend trying // to discover and verify a new provider. 
@@ -41,3 +42,15 @@ func NewDiscovery() Discovery { Timeout: Duration(2 * time.Minute), } } + +// populateUnset replaces zero-values in the config with default values. +func (c *Discovery) populateUnset() { + def := NewDiscovery() + + if c.PollInterval == 0 { + c.PollInterval = def.PollInterval + } + if c.Timeout == 0 { + c.Timeout = def.Timeout + } +} diff --git a/config/indexer.go b/config/indexer.go index 5a27ebf25..574059534 100644 --- a/config/indexer.go +++ b/config/indexer.go @@ -19,3 +19,18 @@ func NewIndexer() Indexer { ValueStoreType: "sth", } } + +// populateUnset replaces zero-values in the config with default values. +func (c *Indexer) populateUnset() { + def := NewIndexer() + + if c.CacheSize == 0 { + c.CacheSize = def.CacheSize + } + if c.ValueStoreDir == "" { + c.ValueStoreDir = def.ValueStoreDir + } + if c.ValueStoreType == "" { + c.ValueStoreType = def.ValueStoreType + } +} diff --git a/config/ingest.go b/config/ingest.go index 54d88e120..e1c039e3d 100644 --- a/config/ingest.go +++ b/config/ingest.go @@ -7,7 +7,9 @@ type Ingest struct { // PubSubTopic used to advertise ingestion announcements. PubSubTopic string // StoreBatchSize is the number of entries in each write to the value - // store. Specifying a value less than 2 disables batching. + // store. Specifying a value less than 2 disables batching. This should + // be smaller than the maximum number of multihashes in an entry block to + // write concurrently to the value store. StoreBatchSize int // SyncTimeout is the maximum amount of time allowed for a sync to complete // before it is canceled. This can be a sync of a chain of advertisements @@ -24,3 +26,18 @@ func NewIngest() Ingest { SyncTimeout: Duration(2 * time.Hour), } } + +// populateUnset replaces zero-values in the config with default values. 
+func (c *Ingest) populateUnset() { + def := NewIngest() + + if c.PubSubTopic == "" { + c.PubSubTopic = def.PubSubTopic + } + if c.StoreBatchSize == 0 { + c.StoreBatchSize = def.StoreBatchSize + } + if c.SyncTimeout == 0 { + c.SyncTimeout = def.SyncTimeout + } +} diff --git a/go.mod b/go.mod index c41f37904..65ad058b2 100644 --- a/go.mod +++ b/go.mod @@ -17,7 +17,7 @@ require ( github.com/ipfs/go-datastore v0.5.1 github.com/ipfs/go-ds-leveldb v0.5.0 github.com/ipfs/go-ipfs v0.11.0 - github.com/ipfs/go-log/v2 v2.4.0 + github.com/ipfs/go-log/v2 v2.5.0 github.com/ipld/go-ipld-prime v0.14.3 github.com/libp2p/go-libp2p v0.17.0 github.com/libp2p/go-libp2p-core v0.13.0 diff --git a/go.sum b/go.sum index b8b5616d3..03a5cd827 100644 --- a/go.sum +++ b/go.sum @@ -565,8 +565,9 @@ github.com/ipfs/go-log/v2 v2.0.5/go.mod h1:eZs4Xt4ZUJQFM3DlanGhy7TkwwawCZcSByscw github.com/ipfs/go-log/v2 v2.1.1/go.mod h1:2v2nsGfZsvvAJz13SyFzf9ObaqwHiHxsPLEHntrv9KM= github.com/ipfs/go-log/v2 v2.1.3/go.mod h1:/8d0SH3Su5Ooc31QlL1WysJhvyOTDCjcCZ9Axpmri6g= github.com/ipfs/go-log/v2 v2.3.0/go.mod h1:QqGoj30OTpnKaG/LKTGTxoP2mmQtjVMEnK72gynbe/g= -github.com/ipfs/go-log/v2 v2.4.0 h1:iR/2o9PGWanVJrBgIH5Ff8mPGOwpqLaPIAFqSnsdlzk= github.com/ipfs/go-log/v2 v2.4.0/go.mod h1:nPZnh7Cj7lwS3LpRU5Mwr2ol1c2gXIEXuF6aywqrtmo= +github.com/ipfs/go-log/v2 v2.5.0 h1:+MhAooFd9XZNvR0i9FriKW6HB0ql7HNXUuflWtc0dd4= +github.com/ipfs/go-log/v2 v2.5.0/go.mod h1:prSpmC1Gpllc9UYWxDiZDreBYw7zp4Iqp1kOLU9U5UI= github.com/ipfs/go-merkledag v0.0.6/go.mod h1:QYPdnlvkOg7GnQRofu9XZimC5ZW5Wi3bKys/4GQQfto= github.com/ipfs/go-merkledag v0.2.3/go.mod h1:SQiXrtSts3KGNmgOzMICy5c0POOpUNQLvB3ClKnBAlk= github.com/ipfs/go-merkledag v0.2.4/go.mod h1:SQiXrtSts3KGNmgOzMICy5c0POOpUNQLvB3ClKnBAlk= diff --git a/internal/ingest/ingest.go b/internal/ingest/ingest.go index 0ba145b7a..1b9568d6b 100644 --- a/internal/ingest/ingest.go +++ b/internal/ingest/ingest.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "strings" + "sync" "time" indexer 
"github.com/filecoin-project/go-indexer-core/engine" @@ -49,6 +50,9 @@ type Ingester struct { syncTimeout time.Duration adLocks *lockChain watchDone chan struct{} + + adCache map[cid.Cid]adCacheItem + adCacheMutex sync.Mutex } // NewIngester creates a new Ingester that uses a go-legs Subscriber to handle diff --git a/internal/ingest/ingest_test.go b/internal/ingest/ingest_test.go index 6326423f4..9ea89891a 100644 --- a/internal/ingest/ingest_test.go +++ b/internal/ingest/ingest_test.go @@ -319,7 +319,7 @@ func publishRandomAdv(t *testing.T, i *Ingester, pubHost host.Host, pub legs.Pub requireTrueEventually(t, func() bool { has, err := i.ds.Has(context.Background(), datastore.NewKey(c.String())) return err == nil && has - }, 2*time.Second, 15*time.Second, "expected advertisement with ID %s was not received", c) + }, 3*time.Second, 21*time.Second, "expected advertisement with ID %s was not received", c) } // Check if advertisement in datastore. diff --git a/internal/ingest/linksystem.go b/internal/ingest/linksystem.go index 3eb5ee860..38588823b 100644 --- a/internal/ingest/linksystem.go +++ b/internal/ingest/linksystem.go @@ -26,6 +26,11 @@ import ( "go.opencensus.io/stats" ) +type adCacheItem struct { + value indexer.Value + isRm bool +} + var ( errBadAdvert = errors.New("bad advertisement") errInvalidAdvertSignature = errors.New("invalid advertisement signature") @@ -54,15 +59,18 @@ func mkLinkSystem(ds datastore.Batching, reg *registry.Registry) ipld.LinkSystem return buf, func(lnk ipld.Link) error { c := lnk.(cidlink.Link).Cid origBuf := buf.Bytes() + + log := log.With("cid", c) + // Decode the node to check its type. n, err := decodeIPLDNode(buf) if err != nil { - log.Errorf("Error decoding IPLD node in linksystem: %s", err) + log.Errorw("Error decoding IPLD node in linksystem", "err", err) return errors.New("bad ipld data") } // If it is an advertisement. 
if isAdvertisement(n) { - log.Infow("Received advertisement", "cid", c) + log.Infow("Received advertisement") // Verify that the signature is correct and the advertisement // is valid. @@ -87,25 +95,24 @@ func mkLinkSystem(ds datastore.Batching, reg *registry.Registry) ipld.LinkSystem // Store entries link into the reverse map so there is a way of // identifying what advertisementID announced these entries // when we come across the link. - log.Debug("Setting reverse map for entries after receiving advertisement") + log.Debug("Saving map of entries to advertisement and advertisement data") elnk, err := ad.FieldEntries().AsLink() if err != nil { log.Errorw("Error getting link for entries from advertisement", "err", err) return errBadAdvert } - err = putCidToAdMapping(ds, elnk, c) + err = putCidToAdMapping(lctx.Ctx, ds, elnk, c) if err != nil { log.Errorw("Error storing reverse map for entries in datastore", "err", err) return errors.New("cannot process advertisement") } - log.Debug("Persisting new advertisement") // Persist the advertisement. This is read later when // processing each chunk of entries, to get info common to all // entries in a chunk. return ds.Put(lctx.Ctx, dsKey(c.String()), origBuf) } - log.Debug("Persisting IPLD node") + log.Debug("Received IPLD node") // Any other type of node (like entries) are stored right away. return ds.Put(lctx.Ctx, dsKey(c.String()), origBuf) }, nil @@ -160,8 +167,6 @@ func verifyAdvertisement(n ipld.Node) (schema.Advertisement, peer.ID, error) { return nil, peer.ID(""), errInvalidAdvertSignature } - log.Infow("Advertisement signature is valid", "provider", provID) - return ad, provID, nil } @@ -178,8 +183,7 @@ func verifyAdvertisement(n ipld.Node) (schema.Advertisement, peer.ID, error) { // called for each entry in an advertisement's chain of entries. Process the // entry and save all the multihashes in it as indexes in the indexer-core. 
func (ing *Ingester) storageHook(pubID peer.ID, c cid.Cid) { - log := log.With("publisher", pubID, "adCid", c) - log.Debug("Incoming block hook triggered") + log := log.With("publisher", pubID, "cid", c) // Get data corresponding to the block. val, err := ing.ds.Get(context.Background(), dsKey(c.String())) @@ -197,8 +201,6 @@ func (ing *Ingester) storageHook(pubID peer.ID, c cid.Cid) { // If this is an advertisement, sync entries within it. if isAdvertisement(node) { - log.Debug("Incoming block is an advertisement") - ad, err := decodeAd(node) if err != nil { log.Errorw("Error decoding advertisement", "err", err) @@ -225,6 +227,9 @@ func (ing *Ingester) storageHook(pubID peer.ID, c cid.Cid) { prevCid = lnk.(cidlink.Link).Cid } } + + log.Infow("Incoming block is an advertisement", "prevAd", prevCid) + // Signal this ad is busy and wait for any sync on the previous ad to // finish. Signal this ad is done at function return. prevWait, unlock, err := ing.adLocks.lockWait(prevCid, c) @@ -242,27 +247,27 @@ func (ing *Ingester) storageHook(pubID peer.ID, c cid.Cid) { // corresponding to the link CID, from the reverse map. Then load the // advertisement to get the metadata for indexing all the content in // the incoming block. 
- log.Debug("Incoming block is not an advertisement; processing entries") - adCid, err := getCidToAdMapping(ing.ds, c) + adCid, err := getCidToAdMapping(context.Background(), ing.ds, c) if err != nil { - log.Errorw("Error getting advertisementID for entries map", "err", err) + log.Errorw("Error getting advertisement CID for entry CID", "err", err) return } log = log.With("adCid", adCid) - log.Infow("Indexing content in block") + log.Info("Indexing content in incoming entry block") err = ing.indexContentBlock(adCid, pubID, node) if err != nil { log.Errorw("Error processing entries for advertisement", "err", err) - return + } else { + log.Info("Done indexing content in entry block") + ing.signalMetricsUpdate() } - ing.signalMetricsUpdate() + log.Debug("removing entry-to-ad mapping and entry block") - // Remove the datastore entry that maps a chunk to an advertisement - // now that the chunk is processed. - log.Debug("Removing mapping to advertisement for processed entries") - err = deleteCidToAdMapping(ing.ds, c) + // Remove the mapping of an entry chunk CID to an advertisement CID now + // that the chunk is processed. + err = deleteCidToAdMapping(context.Background(), ing.ds, c) if err != nil { log.Errorw("Error deleting cid-advertisement mapping for entries", "err", err) } @@ -270,7 +275,6 @@ func (ing *Ingester) storageHook(pubID peer.ID, c cid.Cid) { // Remove the content block from the data store now that processing it // has finished. This prevents storing redundant information in // several datastores. 
- log.Debug("Removing processed entries from datastore") err = ing.ds.Delete(context.Background(), dsKey(c.String())) if err != nil { log.Errorw("Error deleting index from datastore", "err", err) @@ -281,11 +285,8 @@ func (ing *Ingester) storageHook(pubID peer.ID, c cid.Cid) { func (ing *Ingester) syncAdEntries(from peer.ID, ad schema.Advertisement, adCid cid.Cid, prevWait <-chan struct{}, unlock context.CancelFunc) { <-prevWait defer unlock() - log := log.With("publisher", from, "adCid", adCid) - log.Infow("Syncing content for advertisement") - isRm, err := ad.FieldIsRm().AsBool() if err != nil { log.Errorw("Cannot read IsRm field", "err", err) @@ -302,6 +303,7 @@ func (ing *Ingester) syncAdEntries(from peer.ID, ad schema.Advertisement, adCid // everything from the indexer-core with the contextID and provider from // this advertisement. if isRm && len(contextID) != 0 { + log.Infow("Removing content by context ID") provider, err := ad.FieldProvider().AsString() if err != nil { log.Errorw("cannot read provider from advertisement", "err", err) @@ -318,16 +320,9 @@ func (ing *Ingester) syncAdEntries(from peer.ID, ad schema.Advertisement, adCid log.Error("Failed to removed content by context ID") return } - log.Infow("Removed content by context ID") return } - if isRm { - log.Warnw("Syncing content blocks to remove for removal advertisement with no context ID") - } else { - log.Infow("Syncing content blocks for advertisement") - } - elink, err := ad.FieldEntries().AsLink() if err != nil { log.Errorw("Error decoding advertisement entries link", "err", err) @@ -341,6 +336,13 @@ func (ing *Ingester) syncAdEntries(from peer.ID, ad schema.Advertisement, adCid } log = log.With("entriesCid", entriesCid) + + if isRm { + log.Warnw("Syncing content entries for removal advertisement with no context ID") + } else { + log.Infow("Syncing content entries for advertisement") + } + exists, err := ing.ds.Has(context.Background(), dsKey(entriesCid.String())) if err != nil && err != 
datastore.ErrNotFound { log.Errorw("Failed checking if entries exist", "err", err) @@ -350,7 +352,6 @@ func (ing *Ingester) syncAdEntries(from peer.ID, ad schema.Advertisement, adCid return } - log.Info("Starting sync for entries") ctx := context.Background() if ing.syncTimeout != 0 { var cancel context.CancelFunc @@ -382,33 +383,126 @@ func (ing *Ingester) syncAdEntries(from peer.ID, ad schema.Advertisement, adCid // retrieved from. It is the provider ID that needs to be stored by the // indexer. func (ing *Ingester) indexContentBlock(adCid cid.Cid, pubID peer.ID, nentries ipld.Node) error { + // Decode the list of cids into a List_String + nb := schema.Type.EntryChunk.NewBuilder() + err := nb.AssignNode(nentries) + if err != nil { + return fmt.Errorf("cannot decode entries: %s", err) + } + + nchunk := nb.Build().(schema.EntryChunk) + + // If this entry chunk has a next link, add a mapping from the next + // entries CID to the ad CID so that the ad can be loaded when that chunk + // is received. + // + // Do not return here if error; try to index content in this chunk. + hasNextLink, linkErr := ing.setNextCidToAd(nchunk, adCid) + + // Load the advertisement data for this chunk. If there are more chunks to + // follow, then cache the ad data. + value, isRm, err := ing.loadAdData(adCid, hasNextLink) + if err != nil { + return err + } + + mhChan := make(chan multihash.Multihash, ing.batchSize) + // The isRm parameter is passed in for an advertisement that contains + // entries, to allow for removal of individual entries. + errChan := ing.batchIndexerEntries(mhChan, value, isRm) + + // Iterate over all entries and ingest (or remove) them. 
+ entries := nchunk.FieldEntries() + cit := entries.ListIterator() + var count int + for !cit.Done() { + _, cnode, _ := cit.Next() + h, err := cnode.AsBytes() + if err != nil { + close(mhChan) + return fmt.Errorf("cannot decode an entry from the ingestion list: %s", err) + } + + select { + case mhChan <- h: + case err = <-errChan: + return err + } + + count++ + } + close(mhChan) + err = <-errChan + if err != nil { + if isRm { + return fmt.Errorf("cannot remove multihashes from indexer: %s", err) + } + return fmt.Errorf("cannot put multihashes into indexer: %s", err) + } + + return linkErr +} + +func (ing *Ingester) setNextCidToAd(nchunk schema.EntryChunk, adCid cid.Cid) (bool, error) { + if nchunk.Next.IsAbsent() || nchunk.Next.IsNull() { + return false, nil + } + + lnk, err := nchunk.Next.AsNode().AsLink() + if err != nil { + return false, err + } + err = putCidToAdMapping(context.Background(), ing.ds, lnk, adCid) + if err != nil { + return false, err + } + + return true, nil +} + +func (ing *Ingester) loadAdData(adCid cid.Cid, keepCache bool) (indexer.Value, bool, error) { + ing.adCacheMutex.Lock() + adData, ok := ing.adCache[adCid] + if !keepCache && ok { + if len(ing.adCache) == 1 { + ing.adCache = nil + } else { + delete(ing.adCache, adCid) + } + } + ing.adCacheMutex.Unlock() + + if ok { + return adData.value, adData.isRm, nil + } + // Getting the advertisement for the entries so we know // what metadata and related information we need to use for ingestion. adb, err := ing.ds.Get(context.Background(), dsKey(adCid.String())) if err != nil { - return fmt.Errorf("cannot read advertisement for entry from datastore: %s", err) + return indexer.Value{}, false, fmt.Errorf("cannot read advertisement for entry from datastore: %s", err) } // Decode the advertisement. 
adn, err := decodeIPLDNode(bytes.NewBuffer(adb)) if err != nil { - return fmt.Errorf("cannot decode ipld node: %s", err) + return indexer.Value{}, false, fmt.Errorf("cannot decode ipld node: %s", err) } ad, err := decodeAd(adn) if err != nil { - return fmt.Errorf("cannot decode advertisement: %s", err) + return indexer.Value{}, false, fmt.Errorf("cannot decode advertisement: %s", err) } // Fetch data of interest. contextID, err := ad.FieldContextID().AsBytes() if err != nil { - return err + return indexer.Value{}, false, err } metadataBytes, err := ad.FieldMetadata().AsBytes() if err != nil { - return err + return indexer.Value{}, false, err } isRm, err := ad.FieldIsRm().AsBool() if err != nil { - return err + return indexer.Value{}, false, err } // The peerID passed into the storage hook is the source of the @@ -417,24 +511,17 @@ func (ing *Ingester) indexContentBlock(adCid cid.Cid, pubID peer.ID, nentries ip // to create the indexed value. provider, err := ad.FieldProvider().AsString() if err != nil { - return fmt.Errorf("cannot read provider from advertisement: %s", err) + return indexer.Value{}, false, fmt.Errorf("cannot read provider from advertisement: %s", err) } providerID, err := peer.Decode(provider) if err != nil { - return fmt.Errorf("cannot decode provider peer id: %s", err) - } - - // Decode the list of cids into a List_String - nb := schema.Type.EntryChunk.NewBuilder() - err = nb.AssignNode(nentries) - if err != nil { - return fmt.Errorf("cannot decode entries: %s", err) + return indexer.Value{}, false, fmt.Errorf("cannot decode provider peer id: %s", err) } // Check for valid metadata err = new(v0.Metadata).UnmarshalBinary(metadataBytes) if err != nil { - return fmt.Errorf("cannot decoding metadata: %s", err) + return indexer.Value{}, false, fmt.Errorf("cannot decoding metadata: %s", err) } value := indexer.Value{ @@ -443,52 +530,19 @@ func (ing *Ingester) indexContentBlock(adCid cid.Cid, pubID peer.ID, nentries ip MetadataBytes: metadataBytes, } - 
mhChan := make(chan multihash.Multihash, ing.batchSize) - // The isRm parameter is passed in for an advertisement that contains - // entries, to allow for removal of individual entries. - errChan := ing.batchIndexerEntries(mhChan, value, isRm) - - var count int - nchunk := nb.Build().(schema.EntryChunk) - entries := nchunk.FieldEntries() - // Iterate over all entries and ingest (or remove) them. - cit := entries.ListIterator() - for !cit.Done() { - _, cnode, _ := cit.Next() - h, err := cnode.AsBytes() - if err != nil { - close(mhChan) - return fmt.Errorf("cannot decode an entry from the ingestion list: %s", err) - } - - select { - case mhChan <- h: - case err = <-errChan: - return err - } - - count++ - } - close(mhChan) - err = <-errChan - if err != nil { - return err - } - - // If there is a next link, update the mapping so we know the AdID - // it is related to. - if !(nchunk.Next.IsAbsent() || nchunk.Next.IsNull()) { - lnk, err := nchunk.Next.AsNode().AsLink() - if err != nil { - return err + if keepCache { + ing.adCacheMutex.Lock() + if ing.adCache == nil { + ing.adCache = make(map[cid.Cid]adCacheItem) } - err = putCidToAdMapping(ing.ds, lnk, adCid) - if err != nil { - return err + ing.adCache[adCid] = adCacheItem{ + value: value, + isRm: isRm, } + ing.adCacheMutex.Unlock() } - return nil + return value, isRm, nil } // batchIndexerEntries starts a goroutine that processes batches of multihashes @@ -524,12 +578,10 @@ func (ing *Ingester) batchIndexerEntries(mhChan <-chan multihash.Multihash, valu // Process full batch of multihashes if err := indexFunc(value, batch...); err != nil { errChan <- err - log.Errorf("Cannot %s entries in indexer: %s", opName, err) return } batch = batch[:0] count += batchSize - log.Debugf("%s %d entries in value store", opName, batchSize) } } @@ -537,27 +589,29 @@ func (ing *Ingester) batchIndexerEntries(mhChan <-chan multihash.Multihash, valu // Process any remaining puts if err := indexFunc(value, batch...); err != nil { errChan <- 
err - log.Errorf("Cannot %s entries in indexer: %s", opName, err) return } count += len(batch) - log.Debugf("%s %d entries in value store", opName, len(batch)) } - log.Debugw("Processed entries", "count", count, "operation", opName) + log.Infow("Processed multihashes in entry chunk", "count", count, "operation", opName) }(ing.batchSize) return errChan } -func putCidToAdMapping(ds datastore.Batching, lnk ipld.Link, adCid cid.Cid) error { - return ds.Put(context.Background(), dsKey(admapPrefix+lnk.(cidlink.Link).Cid.String()), adCid.Bytes()) +func putCidToAdMapping(ctx context.Context, ds datastore.Batching, lnk ipld.Link, adCid cid.Cid) error { + err := ds.Put(ctx, dsKey(admapPrefix+lnk.(cidlink.Link).Cid.String()), adCid.Bytes()) + if err != nil { + return err + } + return ds.Sync(ctx, dsKey(admapPrefix)) } -func getCidToAdMapping(ds datastore.Batching, linkCid cid.Cid) (cid.Cid, error) { - val, err := ds.Get(context.Background(), dsKey(admapPrefix+linkCid.String())) +func getCidToAdMapping(ctx context.Context, ds datastore.Batching, linkCid cid.Cid) (cid.Cid, error) { + val, err := ds.Get(ctx, dsKey(admapPrefix+linkCid.String())) if err != nil { - return cid.Undef, fmt.Errorf("cannot read advertisement CID for entries CID from datastore: %s", err) + return cid.Undef, fmt.Errorf("cannot load advertisement CID for entries CID from datastore: %s", err) } adCid, err := cid.Cast(val) if err != nil { @@ -566,8 +620,8 @@ func getCidToAdMapping(ds datastore.Batching, linkCid cid.Cid) (cid.Cid, error) return adCid, nil } -func deleteCidToAdMapping(ds datastore.Batching, entries cid.Cid) error { - return ds.Delete(context.Background(), dsKey(admapPrefix+entries.String())) +func deleteCidToAdMapping(ctx context.Context, ds datastore.Batching, entries cid.Cid) error { + return ds.Delete(ctx, dsKey(admapPrefix+entries.String())) } // decodeIPLDNode decodes an ipld.Node from bytes read from an io.Reader. 
diff --git a/version.json b/version.json index 002fae3b8..b46e6d1b8 100644 --- a/version.json +++ b/version.json @@ -1,3 +1,3 @@ { - "version": "v0.2.1" + "version": "v0.2.2" }