diff --git a/p2p/sync.go b/p2p/sync.go index ddf4274f87..d609597ae5 100644 --- a/p2p/sync.go +++ b/p2p/sync.go @@ -46,7 +46,6 @@ func newSyncService(bc *blockchain.Blockchain, h host.Host, n *utils.Network, lo } } -//nolint:funlen func (s *syncService) start(ctx context.Context) { ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -60,83 +59,86 @@ func (s *syncService) start(ctx context.Context) { s.log.Debugw("Continuous iteration", "i", i) iterCtx, cancelIteration := context.WithCancel(ctx) - - var nextHeight int - if curHeight, err := s.blockchain.Height(); err == nil { - nextHeight = int(curHeight) + 1 - } else if !errors.Is(err, db.ErrKeyNotFound) { - s.log.Errorw("Failed to get current height", "err", err) + nextHeight, err := s.getNextHeight() + if err != nil { + s.logError("Failed to get current height", err) + cancelIteration() + continue } s.log.Infow("Start Pipeline", "Current height", nextHeight-1, "Start", nextHeight) // todo change iteration to fetch several objects uint64(min(blockBehind, maxBlocks)) blockNumber := uint64(nextHeight) - headersAndSigsCh, err := s.genHeadersAndSigs(iterCtx, blockNumber) - if err != nil { - s.logError("Failed to get block headers parts", err) + if err := s.processBlock(iterCtx, blockNumber); err != nil { + s.logError("Failed to process block", err) cancelIteration() continue } - txsCh, err := s.genTransactions(iterCtx, blockNumber) - if err != nil { - s.logError("Failed to get transactions", err) - cancelIteration() - continue - } + cancelIteration() + } +} - eventsCh, err := s.genEvents(iterCtx, blockNumber) - if err != nil { - s.logError("Failed to get classes", err) - cancelIteration() - continue - } +func (s *syncService) getNextHeight() (int, error) { + curHeight, err := s.blockchain.Height() + if err == nil { + return int(curHeight) + 1, nil + } else if errors.Is(err, db.ErrKeyNotFound) { + return 0, nil + } + return 0, err +} - classesCh, err := s.genClasses(iterCtx, blockNumber) - if err != nil { - s.logError("Failed to get classes", err) - cancelIteration() - continue - } +func (s *syncService) processBlock(ctx context.Context, blockNumber uint64) error { + headersAndSigsCh, err := s.genHeadersAndSigs(ctx, blockNumber) + if err != nil { + return fmt.Errorf("failed to get block headers parts: %w", err) + } - stateDiffsCh, err := s.genStateDiffs(iterCtx, blockNumber) - if err != nil { - s.logError("Failed to get state diffs", err) - cancelIteration() - continue - } + txsCh, err := s.genTransactions(ctx, blockNumber) + if err != nil { + return fmt.Errorf("failed to get transactions: %w", err) + } - blocksCh := pipeline.Bridge(iterCtx, s.processSpecBlockParts(iterCtx, uint64(nextHeight), pipeline.FanIn(iterCtx, - pipeline.Stage(iterCtx, headersAndSigsCh, specBlockPartsFunc[specBlockHeaderAndSigs]), - pipeline.Stage(iterCtx, classesCh, specBlockPartsFunc[specClasses]), - pipeline.Stage(iterCtx, stateDiffsCh, specBlockPartsFunc[specContractDiffs]), - pipeline.Stage(iterCtx, txsCh, specBlockPartsFunc[specTxWithReceipts]), - pipeline.Stage(iterCtx, eventsCh, specBlockPartsFunc[specEvents]), - ))) - - for b := range blocksCh { - if b.err != nil { - // cannot process any more blocks - s.log.Errorw("Failed to process block", "err", b.err) - cancelIteration() - break - } + eventsCh, err := s.genEvents(ctx, blockNumber) + if err != nil { + return fmt.Errorf("failed to get events: %w", err) + } - storeTimer := time.Now() - err = s.blockchain.Store(b.block, b.commitments, b.stateUpdate, b.newClasses) - if err != nil { - s.log.Errorw("Failed to Store Block", "number", b.block.Number, "err", err) - cancelIteration() - break - } + classesCh, err := s.genClasses(ctx, blockNumber) + if err != nil { + return fmt.Errorf("failed to get classes: %w", err) + } - s.log.Infow("Stored Block", "number", b.block.Number, "hash", b.block.Hash.ShortString(), - "root", b.block.GlobalStateRoot.ShortString()) - s.listener.OnSyncStepDone(junoSync.OpStore, b.block.Number, time.Since(storeTimer)) + stateDiffsCh, err := s.genStateDiffs(ctx, blockNumber) + if err != nil { + return fmt.Errorf("failed to get state diffs: %w", err) + } + + blocksCh := pipeline.Bridge(ctx, s.processSpecBlockParts(ctx, blockNumber, pipeline.FanIn(ctx, + pipeline.Stage(ctx, headersAndSigsCh, specBlockPartsFunc[specBlockHeaderAndSigs]), + pipeline.Stage(ctx, classesCh, specBlockPartsFunc[specClasses]), + pipeline.Stage(ctx, stateDiffsCh, specBlockPartsFunc[specContractDiffs]), + pipeline.Stage(ctx, txsCh, specBlockPartsFunc[specTxWithReceipts]), + pipeline.Stage(ctx, eventsCh, specBlockPartsFunc[specEvents]), + ))) + + for b := range blocksCh { + if b.err != nil { + return fmt.Errorf("failed to process block: %w", b.err) } - cancelIteration() + + storeTimer := time.Now() + if err := s.blockchain.Store(b.block, b.commitments, b.stateUpdate, b.newClasses); err != nil { + return fmt.Errorf("failed to store block: %w", err) + } + + s.log.Infow("Stored Block", "number", b.block.Number, "hash", b.block.Hash.ShortString(), + "root", b.block.GlobalStateRoot.ShortString()) + s.listener.OnSyncStepDone(junoSync.OpStore, b.block.Number, time.Since(storeTimer)) } + return nil } func specBlockPartsFunc[T specBlockHeaderAndSigs | specTxWithReceipts | specEvents | specClasses | specContractDiffs](i T) specBlockParts {