diff --git a/internal/ingest/ingest.go b/internal/ingest/ingest.go index de83f5bac..17344bdcc 100644 --- a/internal/ingest/ingest.go +++ b/internal/ingest/ingest.go @@ -835,7 +835,6 @@ func (ing *Ingester) RunWorkers(n int) { // assignments are processed preferentially over new advertisement chains. func (ing *Ingester) ingestWorker(ctx context.Context, syncFinishedEvents <-chan dagsync.SyncFinished, wkrNum int) { log := log.With("worker", wkrNum) - log.Info("started ingest worker") defer ing.waitForWorkers.Done() @@ -921,7 +920,7 @@ func (ing *Ingester) processRawAdChain(ctx context.Context, syncFinished dagsync } publisher := syncFinished.PeerID - log := log.With("publisher", publisher) + log := log.With("publisher", publisher, "worker", wkrNum) log.Infow("Advertisement chain synced", "length", syncFinished.Count) var rmCount int64 @@ -1066,13 +1065,14 @@ func (ing *Ingester) ingestWorkerLogic(ctx context.Context, provider, publisher count++ if ctx.Err() != nil { - log.Infow("Ingest worker canceled while processing ads", "err", ctx.Err()) + log.Infow("Ingest canceled while processing ads", "err", ctx.Err()) ing.inEvents <- adProcessedEvent{ publisher: publisher, headAdCid: headAdCid, adCid: ai.cid, err: ctx.Err(), } + log.Debug("Sent ad processed event for canceled processing") return } @@ -1103,6 +1103,7 @@ func (ing *Ingester) ingestWorkerLogic(ctx context.Context, provider, publisher headAdCid: headAdCid, adCid: ai.cid, } + log.Debug("Sent ad processed event for skipped ad") continue } @@ -1112,7 +1113,7 @@ func (ing *Ingester) ingestWorkerLogic(ctx context.Context, provider, publisher "progress", fmt.Sprintf("%d of %d", count, total), "lag", lag) - hasEnts, fromMirror, err := ing.ingestAd(ctx, publisher, ai.cid, ai.resync, frozen, lag, headProvider) + hasEnts, fromMirror, err := ing.ingestAd(ctx, publisher, ai.cid, ai.resync, frozen, lag, headProvider, wkrNum) if err != nil { var adIngestErr adIngestError if errors.As(err, &adIngestErr) { @@ -1141,11 +1142,13 @@ func (ing *Ingester) ingestWorkerLogic(ctx context.Context, provider, publisher // If err still not nil, then this is a non-permanent type of error. if err != nil { - errText := err.Error() - if errors.Is(err, errInternal) { - errText = errInternal.Error() + if !errors.Is(err, context.Canceled) { + errText := err.Error() + if errors.Is(err, errInternal) { + errText = errInternal.Error() + } + ing.reg.SetLastError(provider, fmt.Errorf("error while ingesting ad %s: %s", ai.cid, errText)) } - ing.reg.SetLastError(provider, fmt.Errorf("error while ingesting ad %s: %s", ai.cid, errText)) log.Errorw("Error while ingesting ad. Bailing early, not ingesting later ads.", "adCid", ai.cid, "err", err, "adsLeftToProcess", i+1) // Tell anyone waiting that the sync finished for this head because // of error. TODO(mm) would be better to propagate the error. @@ -1155,6 +1158,7 @@ func (ing *Ingester) ingestWorkerLogic(ctx context.Context, provider, publisher adCid: ai.cid, err: err, } + log.Debug("Sent ad processed event with error") return } } else { @@ -1171,6 +1175,7 @@ func (ing *Ingester) ingestWorkerLogic(ctx context.Context, provider, publisher if putMirror { if fromMirror && ing.mirror.readWriteSame() { + log.Debug("Removing temporary ad data") // If ad data retrieved from same mirror that is being written // to, then only clean up the data from local datastore, but do // not rewrite it to the mirror. @@ -1179,6 +1184,7 @@ func (ing *Ingester) ingestWorkerLogic(ctx context.Context, provider, publisher log.Errorw("Cannot remove advertisement data from datastore", "err", err) } } else { + log.Debug("Writing ad to CAR mirror") // If resyncing and not overwriting, then do not overwrite the // destination file if it already exists. preventOverwrite := ai.resync && !ing.overwriteMirrorOnResync @@ -1195,11 +1201,14 @@ func (ing *Ingester) ingestWorkerLogic(ctx context.Context, provider, publisher } } + log.Debug("Done processing ad") + // Distribute the atProcessedEvent notices to waiting Sync calls. ing.inEvents <- adProcessedEvent{ publisher: publisher, headAdCid: headAdCid, adCid: ai.cid, } + log.Debug("Sent ad processed event") } } diff --git a/internal/ingest/linksystem.go b/internal/ingest/linksystem.go index f33353b2b..b3ed71040 100644 --- a/internal/ingest/linksystem.go +++ b/internal/ingest/linksystem.go @@ -138,8 +138,8 @@ func verifyAdvertisement(n ipld.Node, reg *registry.Registry) (peer.ID, error) { // is the source of the indexed content, the provider is where content can be // retrieved from. It is the provider ID that needs to be stored by the // indexer. -func (ing *Ingester) ingestAd(ctx context.Context, publisherID peer.ID, adCid cid.Cid, resync, frozen bool, lag int, headProvider peer.AddrInfo) (bool, bool, error) { - log := log.With("publisher", publisherID, "adCid", adCid) +func (ing *Ingester) ingestAd(ctx context.Context, publisherID peer.ID, adCid cid.Cid, resync, frozen bool, lag int, headProvider peer.AddrInfo, wkrNum int) (bool, bool, error) { + log := log.With("publisher", publisherID, "adCid", adCid, "worker", wkrNum) ad, err := ing.loadAd(adCid) if err != nil { @@ -202,6 +202,7 @@ func (ing *Ingester) ingestAd(ctx context.Context, publisherID peer.ID, adCid ci var extendedProviders *registry.ExtendedProviders if ad.ExtendedProvider != nil { + log.Debug("Advertisement has extended providers") if ad.IsRm { return false, false, adIngestError{adIngestIndexerErr, fmt.Errorf("rm ads can not have extended providers")} } @@ -263,7 +264,7 @@ func (ing *Ingester) ingestAd(ctx context.Context, publisherID peer.ID, adCid ci log = log.With("contextID", base64.StdEncoding.EncodeToString(ad.ContextID)) if ad.IsRm { - log.Infow("Advertisement is for removal by context id") + log.Info("Advertisement is for removal by context id") err = ing.indexer.RemoveProviderContext(providerID, ad.ContextID) if err != nil { return false, false, adIngestError{adIngestIndexerErr, fmt.Errorf("%w: failed to remove provider context: %w", errInternal, err)} @@ -277,6 +278,7 @@ func (ing *Ingester) ingestAd(ctx context.Context, publisherID peer.ID, adCid ci if ad.Entries != schema.NoEntries { return false, false, adIngestError{adIngestMalformedErr, fmt.Errorf("advertisement missing metadata")} } + log.Info("Advertisement is for removal by context id") return false, false, nil } @@ -290,9 +292,9 @@ func (ing *Ingester) ingestAd(ctx context.Context, publisherID peer.ID, adCid ci } if frozen { - log.Infow("Indexer frozen, advertisement only updates metadata") + log.Info("Indexer frozen, advertisement only updates metadata") } else { - log.Infow("Advertisement is metadata update only") + log.Info("Advertisement is for metadata update only") } err = ing.indexer.Put(value) if err != nil { @@ -306,6 +308,8 @@ func (ing *Ingester) ingestAd(ctx context.Context, publisherID peer.ID, adCid ci return false, false, adIngestError{adIngestMalformedErr, errors.New("advertisement entries link is undefined")} } + log.Debug("Advertisement has entries to sync") + if ing.syncTimeout != 0 { var cancel context.CancelFunc ctx, cancel = context.WithTimeout(ctx, ing.syncTimeout) @@ -316,6 +320,7 @@ func (ing *Ingester) ingestAd(ctx context.Context, publisherID peer.ID, adCid ci // If using a CAR reader, then try to get the advertisement CAR file first. if ing.mirror.canRead() { + log.Debug("Attempting to fetch entries from CAR mirror") mhCount, err = ing.ingestEntriesFromCar(ctx, ad, providerID, adCid, entriesCid, log) hasEnts := mhCount != 0 // If entries data successfully read from CAR file. @@ -324,6 +329,7 @@ func (ing *Ingester) ingestAd(ctx context.Context, publisherID peer.ID, adCid ci return hasEnts, true, nil } if !errors.Is(err, fs.ErrNotExist) { + log.Errorw("Cannot get advertisement from CAR mirror", "err", err) var adIngestErr adIngestError if errors.As(err, &adIngestErr) { switch adIngestErr.state { @@ -336,10 +342,15 @@ func (ing *Ingester) ingestAd(ctx context.Context, publisherID peer.ID, adCid ci // serving entries data. return hasEnts, false, err } + } else if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { + return hasEnts, false, err } - log.Errorw("Cannot get advertisement from car store", "err", err) + // If any other error, proceed and try to fetch from publisher. + } else { + log.Debug("Advertisement not found in CAR mirror") } } + log.Debug("Fetching entries from publisher") // The ad.Entries link can point to either a chain of EntryChunks or a // HAMT. Sync the very first entry so that we can check which type it is. @@ -373,8 +384,10 @@ func (ing *Ingester) ingestAd(ctx context.Context, publisherID peer.ID, adCid ci } if isHAMT(node) { + log.Info("syncing hamt entries") mhCount, err = ing.ingestHamtFromPublisher(ctx, ad, publisherID, providerID, entriesCid, log) } else { + log.Info("syncing entries") mhCount, err = ing.ingestEntriesFromPublisher(ctx, ad, publisherID, providerID, entriesCid, log) } return mhCount != 0, false, err