Skip to content

Commit

Permalink
Handle context cancellation during ingestion
Browse files Browse the repository at this point in the history
In some places, context cancellation was not handled during ingestion, which left the indexer unresponsive to shutdown requests. This change adds a context-cancellation case to the selects that read from or write to channels.

Adds additional debug logging.
  • Loading branch information
gammazero committed Apr 2, 2024
1 parent 021ab38 commit e35cb13
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 14 deletions.
25 changes: 17 additions & 8 deletions internal/ingest/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -835,7 +835,6 @@ func (ing *Ingester) RunWorkers(n int) {
// assignments are processed preferentially over new advertisement chains.
func (ing *Ingester) ingestWorker(ctx context.Context, syncFinishedEvents <-chan dagsync.SyncFinished, wkrNum int) {
log := log.With("worker", wkrNum)

log.Info("started ingest worker")
defer ing.waitForWorkers.Done()

Expand Down Expand Up @@ -921,7 +920,7 @@ func (ing *Ingester) processRawAdChain(ctx context.Context, syncFinished dagsync
}

publisher := syncFinished.PeerID
log := log.With("publisher", publisher)
log := log.With("publisher", publisher, "worker", wkrNum)
log.Infow("Advertisement chain synced", "length", syncFinished.Count)

var rmCount int64
Expand Down Expand Up @@ -1066,13 +1065,14 @@ func (ing *Ingester) ingestWorkerLogic(ctx context.Context, provider, publisher
count++

if ctx.Err() != nil {
log.Infow("Ingest worker canceled while processing ads", "err", ctx.Err())
log.Infow("Ingest canceled while processing ads", "err", ctx.Err())
ing.inEvents <- adProcessedEvent{
publisher: publisher,
headAdCid: headAdCid,
adCid: ai.cid,
err: ctx.Err(),
}
log.Debug("Sent ad processed event for canceled processing")
return
}

Expand Down Expand Up @@ -1103,6 +1103,7 @@ func (ing *Ingester) ingestWorkerLogic(ctx context.Context, provider, publisher
headAdCid: headAdCid,
adCid: ai.cid,
}
log.Debug("Sent ad processed event for skipped ad")
continue
}

Expand All @@ -1112,7 +1113,7 @@ func (ing *Ingester) ingestWorkerLogic(ctx context.Context, provider, publisher
"progress", fmt.Sprintf("%d of %d", count, total),
"lag", lag)

hasEnts, fromMirror, err := ing.ingestAd(ctx, publisher, ai.cid, ai.resync, frozen, lag, headProvider)
hasEnts, fromMirror, err := ing.ingestAd(ctx, publisher, ai.cid, ai.resync, frozen, lag, headProvider, wkrNum)
if err != nil {
var adIngestErr adIngestError
if errors.As(err, &adIngestErr) {
Expand Down Expand Up @@ -1141,11 +1142,13 @@ func (ing *Ingester) ingestWorkerLogic(ctx context.Context, provider, publisher

// If err still not nil, then this is a non-permanent type of error.
if err != nil {
errText := err.Error()
if errors.Is(err, errInternal) {
errText = errInternal.Error()
if !errors.Is(err, context.Canceled) {
errText := err.Error()
if errors.Is(err, errInternal) {
errText = errInternal.Error()
}
ing.reg.SetLastError(provider, fmt.Errorf("error while ingesting ad %s: %s", ai.cid, errText))
}
ing.reg.SetLastError(provider, fmt.Errorf("error while ingesting ad %s: %s", ai.cid, errText))
log.Errorw("Error while ingesting ad. Bailing early, not ingesting later ads.", "adCid", ai.cid, "err", err, "adsLeftToProcess", i+1)
// Tell anyone waiting that the sync finished for this head because
// of error. TODO(mm) would be better to propagate the error.
Expand All @@ -1155,6 +1158,7 @@ func (ing *Ingester) ingestWorkerLogic(ctx context.Context, provider, publisher
adCid: ai.cid,
err: err,
}
log.Debug("Sent ad processed event with error")
return
}
} else {
Expand All @@ -1171,6 +1175,7 @@ func (ing *Ingester) ingestWorkerLogic(ctx context.Context, provider, publisher

if putMirror {
if fromMirror && ing.mirror.readWriteSame() {
log.Debug("Removing temporary ad data")
// If ad data retrieved from same mirror that is being written
// to, then only clean up the data from local datastore, but do
// not rewrite it to the mirror.
Expand All @@ -1179,6 +1184,7 @@ func (ing *Ingester) ingestWorkerLogic(ctx context.Context, provider, publisher
log.Errorw("Cannot remove advertisement data from datastore", "err", err)
}
} else {
log.Debug("Writing ad to CAR mirror")
// If resyncing and not overwriting, then do not overwrite the
// destination file if it already exists.
preventOverwrite := ai.resync && !ing.overwriteMirrorOnResync
Expand All @@ -1195,11 +1201,14 @@ func (ing *Ingester) ingestWorkerLogic(ctx context.Context, provider, publisher
}
}

log.Debug("Done processing ad")

// Distribute the adProcessedEvent notices to waiting Sync calls.
ing.inEvents <- adProcessedEvent{
publisher: publisher,
headAdCid: headAdCid,
adCid: ai.cid,
}
log.Debug("Sent ad processed event")
}
}
25 changes: 19 additions & 6 deletions internal/ingest/linksystem.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,8 @@ func verifyAdvertisement(n ipld.Node, reg *registry.Registry) (peer.ID, error) {
// is the source of the indexed content, the provider is where content can be
// retrieved from. It is the provider ID that needs to be stored by the
// indexer.
func (ing *Ingester) ingestAd(ctx context.Context, publisherID peer.ID, adCid cid.Cid, resync, frozen bool, lag int, headProvider peer.AddrInfo) (bool, bool, error) {
log := log.With("publisher", publisherID, "adCid", adCid)
func (ing *Ingester) ingestAd(ctx context.Context, publisherID peer.ID, adCid cid.Cid, resync, frozen bool, lag int, headProvider peer.AddrInfo, wkrNum int) (bool, bool, error) {
log := log.With("publisher", publisherID, "adCid", adCid, "worker", wkrNum)

ad, err := ing.loadAd(adCid)
if err != nil {
Expand Down Expand Up @@ -202,6 +202,7 @@ func (ing *Ingester) ingestAd(ctx context.Context, publisherID peer.ID, adCid ci

var extendedProviders *registry.ExtendedProviders
if ad.ExtendedProvider != nil {
log.Debug("Advertisement has extended providers")
if ad.IsRm {
return false, false, adIngestError{adIngestIndexerErr, fmt.Errorf("rm ads can not have extended providers")}
}
Expand Down Expand Up @@ -263,7 +264,7 @@ func (ing *Ingester) ingestAd(ctx context.Context, publisherID peer.ID, adCid ci
log = log.With("contextID", base64.StdEncoding.EncodeToString(ad.ContextID))

if ad.IsRm {
log.Infow("Advertisement is for removal by context id")
log.Info("Advertisement is for removal by context id")
err = ing.indexer.RemoveProviderContext(providerID, ad.ContextID)
if err != nil {
return false, false, adIngestError{adIngestIndexerErr, fmt.Errorf("%w: failed to remove provider context: %w", errInternal, err)}
Expand All @@ -277,6 +278,7 @@ func (ing *Ingester) ingestAd(ctx context.Context, publisherID peer.ID, adCid ci
if ad.Entries != schema.NoEntries {
return false, false, adIngestError{adIngestMalformedErr, fmt.Errorf("advertisement missing metadata")}
}
log.Info("Advertisement is for removal by context id")
return false, false, nil
}

Expand All @@ -290,9 +292,9 @@ func (ing *Ingester) ingestAd(ctx context.Context, publisherID peer.ID, adCid ci
}

if frozen {
log.Infow("Indexer frozen, advertisement only updates metadata")
log.Info("Indexer frozen, advertisement only updates metadata")
} else {
log.Infow("Advertisement is metadata update only")
log.Info("Advertisement is for metadata update only")
}
err = ing.indexer.Put(value)
if err != nil {
Expand All @@ -306,6 +308,8 @@ func (ing *Ingester) ingestAd(ctx context.Context, publisherID peer.ID, adCid ci
return false, false, adIngestError{adIngestMalformedErr, errors.New("advertisement entries link is undefined")}
}

log.Debug("Advertisement has entries to sync")

if ing.syncTimeout != 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, ing.syncTimeout)
Expand All @@ -316,6 +320,7 @@ func (ing *Ingester) ingestAd(ctx context.Context, publisherID peer.ID, adCid ci

// If using a CAR reader, then try to get the advertisement CAR file first.
if ing.mirror.canRead() {
log.Debug("Attempting to fetch entries from CAR mirror")
mhCount, err = ing.ingestEntriesFromCar(ctx, ad, providerID, adCid, entriesCid, log)
hasEnts := mhCount != 0
// If entries data successfully read from CAR file.
Expand All @@ -324,6 +329,7 @@ func (ing *Ingester) ingestAd(ctx context.Context, publisherID peer.ID, adCid ci
return hasEnts, true, nil
}
if !errors.Is(err, fs.ErrNotExist) {
log.Errorw("Cannot get advertisement from CAR mirror", "err", err)
var adIngestErr adIngestError
if errors.As(err, &adIngestErr) {
switch adIngestErr.state {
Expand All @@ -336,10 +342,15 @@ func (ing *Ingester) ingestAd(ctx context.Context, publisherID peer.ID, adCid ci
// serving entries data.
return hasEnts, false, err
}
} else if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
return hasEnts, false, err
}
log.Errorw("Cannot get advertisement from car store", "err", err)
// If any other error, proceed and try to fetch from publisher.
} else {
log.Debug("Advertisement not found in CAR mirror")
}
}
log.Debug("Fetching entries from publisher")

// The ad.Entries link can point to either a chain of EntryChunks or a
// HAMT. Sync the very first entry so that we can check which type it is.
Expand Down Expand Up @@ -373,8 +384,10 @@ func (ing *Ingester) ingestAd(ctx context.Context, publisherID peer.ID, adCid ci
}

if isHAMT(node) {
log.Info("syncing hamt entries")
mhCount, err = ing.ingestHamtFromPublisher(ctx, ad, publisherID, providerID, entriesCid, log)
} else {
log.Info("syncing entries")
mhCount, err = ing.ingestEntriesFromPublisher(ctx, ad, publisherID, providerID, entriesCid, log)
}
return mhCount != 0, false, err
Expand Down

0 comments on commit e35cb13

Please sign in to comment.