diff --git a/dot/mock_node_builder_test.go b/dot/mock_node_builder_test.go index 569273fc47..f710592848 100644 --- a/dot/mock_node_builder_test.go +++ b/dot/mock_node_builder_test.go @@ -142,18 +142,18 @@ func (mr *MocknodeBuilderIfaceMockRecorder) createNetworkService(config, stateSr } // createParachainHostService mocks base method. -func (m *MocknodeBuilderIface) createParachainHostService(net *network.Service, forkID string, st *state.Service) (*parachain.Service, error) { +func (m *MocknodeBuilderIface) createParachainHostService(net *network.Service, forkID string, st *state.Service, ks keystore.Keystore) (*parachain.Service, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "createParachainHostService", net, forkID, st) + ret := m.ctrl.Call(m, "createParachainHostService", net, forkID, st, ks) ret0, _ := ret[0].(*parachain.Service) ret1, _ := ret[1].(error) return ret0, ret1 } // createParachainHostService indicates an expected call of createParachainHostService. -func (mr *MocknodeBuilderIfaceMockRecorder) createParachainHostService(net, forkID, st any) *gomock.Call { +func (mr *MocknodeBuilderIfaceMockRecorder) createParachainHostService(net, forkID, st, ks any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "createParachainHostService", reflect.TypeOf((*MocknodeBuilderIface)(nil).createParachainHostService), net, forkID, st) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "createParachainHostService", reflect.TypeOf((*MocknodeBuilderIface)(nil).createParachainHostService), net, forkID, st, ks) } // createRPCService mocks base method. diff --git a/dot/network/notifications.go b/dot/network/notifications.go index dac2970be6..849a4349e5 100644 --- a/dot/network/notifications.go +++ b/dot/network/notifications.go @@ -270,6 +270,7 @@ func (s *Service) sendData(peer peer.ID, hs Handshake, info *notificationsProtoc return } + info.peersData.setMutex(peer) stream, err := s.sendHandshake(peer, hs, info) if err != nil { logger.Debugf("failed to send handshake to peer %s on protocol %s: %s", peer, info.protocolID, err) diff --git a/dot/node.go b/dot/node.go index 525d63930e..653f368f6c 100644 --- a/dot/node.go +++ b/dot/node.go @@ -64,7 +64,8 @@ type nodeBuilderIface interface { ) (*core.Service, error) createGRANDPAService(config *cfg.Config, st *state.Service, ks KeyStore, net *network.Service, telemetryMailer Telemetry) (*grandpa.Service, error) - createParachainHostService(net *network.Service, forkID string, st *state.Service) (*parachain.Service, error) + createParachainHostService(net *network.Service, forkID string, st *state.Service, ks keystore.Keystore, + ) (*parachain.Service, error) newSyncService(config *cfg.Config, st *state.Service, finalityGadget BlockJustificationVerifier, verifier *babe.VerificationManager, cs *core.Service, net *network.Service, telemetryMailer Telemetry) (*dotsync.Service, error) @@ -381,7 +382,7 @@ func newNode(config *cfg.Config, } nodeSrvcs = append(nodeSrvcs, fg) - phs, err := builder.createParachainHostService(networkSrvc, gd.ForkID, stateSrvc) + phs, err := builder.createParachainHostService(networkSrvc, gd.ForkID, stateSrvc, ks.Para) if err != nil { return nil, err } diff --git a/dot/node_integration_test.go b/dot/node_integration_test.go index 5537eb3559..c851784ece 100644 --- a/dot/node_integration_test.go +++ b/dot/node_integration_test.go @@ -143,7 +143,7 @@ func TestNewNode(t *testing.T) { mockStateService := &state.Service{ Block: mockBlockState, } - phs, err := parachain.NewService(testNetworkService, "random_fork_id", mockStateService) + phs, err := parachain.NewService(testNetworkService, "random_fork_id", mockStateService, ks.Para) require.NoError(t, err) m.EXPECT().createRuntimeStorage(gomock.AssignableToTypeOf(&state.Service{})).Return(&runtime. @@ -180,6 +180,7 @@ func TestNewNode(t *testing.T) { gomock.AssignableToTypeOf(&network.Service{}), gomock.AssignableToTypeOf("random_fork_id"), gomock.AssignableToTypeOf(mockStateService), + gomock.AssignableToTypeOf(ks.Para), ).Return(phs, nil) got, err := newNode(initConfig, ks, m, mockServiceRegistry) diff --git a/dot/parachain/availability-store/availability_store.go b/dot/parachain/availability-store/availability_store.go index 786b64a34a..a4e6b70212 100644 --- a/dot/parachain/availability-store/availability_store.go +++ b/dot/parachain/availability-store/availability_store.go @@ -456,10 +456,10 @@ func (av *AvailabilityStoreSubsystem) processMessages() { } case parachaintypes.ActiveLeavesUpdateSignal: - av.ProcessActiveLeavesUpdateSignal() + av.ProcessActiveLeavesUpdateSignal(msg) case parachaintypes.BlockFinalizedSignal: - av.ProcessBlockFinalizedSignal() + av.ProcessBlockFinalizedSignal(msg) default: logger.Error(parachaintypes.ErrUnknownOverseerMessage.Error()) @@ -476,11 +476,11 @@ func (av *AvailabilityStoreSubsystem) processMessages() { } } -func (av *AvailabilityStoreSubsystem) ProcessActiveLeavesUpdateSignal() { +func (av *AvailabilityStoreSubsystem) ProcessActiveLeavesUpdateSignal(signal parachaintypes.ActiveLeavesUpdateSignal) { // TODO: #3630 } -func (av *AvailabilityStoreSubsystem) ProcessBlockFinalizedSignal() { +func (av *AvailabilityStoreSubsystem) ProcessBlockFinalizedSignal(signal parachaintypes.BlockFinalizedSignal) { // TODO: #3630 } diff --git a/dot/parachain/backing/candidate_backing.go b/dot/parachain/backing/candidate_backing.go index 85348a75ae..e2371b1407 100644 --- a/dot/parachain/backing/candidate_backing.go +++ b/dot/parachain/backing/candidate_backing.go @@ -247,20 +247,20 @@ func (cb *CandidateBacking) processMessage(msg any, chRelayParentAndCommand chan case StatementMessage: return cb.handleStatementMessage(msg.RelayParent, msg.SignedFullStatement, chRelayParentAndCommand) case parachaintypes.ActiveLeavesUpdateSignal: - cb.ProcessActiveLeavesUpdateSignal() + cb.ProcessActiveLeavesUpdateSignal(msg) case parachaintypes.BlockFinalizedSignal: - cb.ProcessBlockFinalizedSignal() + cb.ProcessBlockFinalizedSignal(msg) default: return fmt.Errorf("%w: %T", parachaintypes.ErrUnknownOverseerMessage, msg) } return nil } -func (cb *CandidateBacking) ProcessActiveLeavesUpdateSignal() { +func (cb *CandidateBacking) ProcessActiveLeavesUpdateSignal(signal parachaintypes.ActiveLeavesUpdateSignal) { // TODO #3503 } -func (cb *CandidateBacking) ProcessBlockFinalizedSignal() { +func (cb *CandidateBacking) ProcessBlockFinalizedSignal(signal parachaintypes.BlockFinalizedSignal) { // TODO #3644 } diff --git a/dot/parachain/collator-protocol/collator_side.go b/dot/parachain/collator-protocol/collator_side.go index 44be2beba9..5d90110f65 100644 --- a/dot/parachain/collator-protocol/collator_side.go +++ b/dot/parachain/collator-protocol/collator_side.go @@ -22,10 +22,10 @@ var ErrNotExpectedOnCollatorSide = errors.New("message is not expected on the co type CollatorProtocolCollatorSide struct { net Network - collatingOn parachaintypes.ParaID //nolint:unused + collatingOn parachaintypes.ParaID //nolint } -func (cpcs CollatorProtocolCollatorSide) processMessage(msg any) error { //nolint:unused +func (cpcs CollatorProtocolCollatorSide) processMessage(msg any) error { //nolint // run this function as a goroutine, ideally switch msg := msg.(type) { diff --git a/dot/parachain/collator-protocol/mock_subsystem_test.go b/dot/parachain/collator-protocol/mock_subsystem_test.go index 33f13b48c7..0908220055 100644 --- a/dot/parachain/collator-protocol/mock_subsystem_test.go +++ b/dot/parachain/collator-protocol/mock_subsystem_test.go @@ -55,27 +55,27 @@ func (mr *MockSubsystemMockRecorder) Name() *gomock.Call { } // ProcessActiveLeavesUpdateSignal mocks base method. -func (m *MockSubsystem) ProcessActiveLeavesUpdateSignal() { +func (m *MockSubsystem) ProcessActiveLeavesUpdateSignal(arg0 parachaintypes.ActiveLeavesUpdateSignal) { m.ctrl.T.Helper() - m.ctrl.Call(m, "ProcessActiveLeavesUpdateSignal") + m.ctrl.Call(m, "ProcessActiveLeavesUpdateSignal", arg0) } // ProcessActiveLeavesUpdateSignal indicates an expected call of ProcessActiveLeavesUpdateSignal. -func (mr *MockSubsystemMockRecorder) ProcessActiveLeavesUpdateSignal() *gomock.Call { +func (mr *MockSubsystemMockRecorder) ProcessActiveLeavesUpdateSignal(arg0 any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ProcessActiveLeavesUpdateSignal", reflect.TypeOf((*MockSubsystem)(nil).ProcessActiveLeavesUpdateSignal)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ProcessActiveLeavesUpdateSignal", reflect.TypeOf((*MockSubsystem)(nil).ProcessActiveLeavesUpdateSignal), arg0) } // ProcessBlockFinalizedSignal mocks base method. -func (m *MockSubsystem) ProcessBlockFinalizedSignal() { +func (m *MockSubsystem) ProcessBlockFinalizedSignal(arg0 parachaintypes.BlockFinalizedSignal) { m.ctrl.T.Helper() - m.ctrl.Call(m, "ProcessBlockFinalizedSignal") + m.ctrl.Call(m, "ProcessBlockFinalizedSignal", arg0) } // ProcessBlockFinalizedSignal indicates an expected call of ProcessBlockFinalizedSignal. -func (mr *MockSubsystemMockRecorder) ProcessBlockFinalizedSignal() *gomock.Call { +func (mr *MockSubsystemMockRecorder) ProcessBlockFinalizedSignal(arg0 any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ProcessBlockFinalizedSignal", reflect.TypeOf((*MockSubsystem)(nil).ProcessBlockFinalizedSignal)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ProcessBlockFinalizedSignal", reflect.TypeOf((*MockSubsystem)(nil).ProcessBlockFinalizedSignal), arg0) } // Run mocks base method. diff --git a/dot/parachain/collator-protocol/validator_side.go b/dot/parachain/collator-protocol/validator_side.go index 37fa5f8e85..8887474e1f 100644 --- a/dot/parachain/collator-protocol/validator_side.go +++ b/dot/parachain/collator-protocol/validator_side.go @@ -7,14 +7,21 @@ import ( "context" "errors" "fmt" + "reflect" + "sort" + "strconv" + "strings" "time" "github.com/ChainSafe/gossamer/dot/network" collatorprotocolmessages "github.com/ChainSafe/gossamer/dot/parachain/collator-protocol/messages" parachaintypes "github.com/ChainSafe/gossamer/dot/parachain/types" "github.com/ChainSafe/gossamer/dot/peerset" + "github.com/ChainSafe/gossamer/dot/state" "github.com/ChainSafe/gossamer/internal/log" "github.com/ChainSafe/gossamer/lib/common" + "github.com/ChainSafe/gossamer/lib/crypto/sr25519" + "github.com/ChainSafe/gossamer/lib/keystore" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/protocol" "golang.org/x/exp/slices" @@ -45,6 +52,8 @@ var ( ErrDuplicateAdvertisement = errors.New("advertisement is already known") ErrPeerLimitReached = errors.New("limit for announcements per peer is reached") ErrNotAdvertised = errors.New("collation was not previously advertised") + + ErrInvalidStringFormat = errors.New("invalid string format for fetched collation info") ) func (cpvs CollatorProtocolValidatorSide) Run( @@ -115,12 +124,309 @@ func (cpvs CollatorProtocolValidatorSide) handleNetworkEvents(event network.Netw } } -func (cpvs CollatorProtocolValidatorSide) ProcessActiveLeavesUpdateSignal() { - // NOTE: nothing to do here +func (cpvs *CollatorProtocolValidatorSide) ProcessActiveLeavesUpdateSignal( + signal parachaintypes.ActiveLeavesUpdateSignal) { + // I might need to separate the collator protocol into two parts, one that deals with the + // network and other that deals with other subsystems. + // Make everythin less messing. + // https://github.com/paritytech/polkadot-sdk/blob/1b5f4243d159fbb7cf7067241aca8a37f3dbf7ed/polkadot/node/network/bridge/src/rx/mod.rs#L798 + + // this active leaves are handled in bridge in rust, read the code of bridge properly + + // TODO update cpvs.activeLeaves by adding new active leaves and removing deactivated ones + + // TODO: get the value for majorSyncing for syncing package + // majorSyncing means you are 5 blocks behind the tip of the chain and thus more aggressively + // download blocks etc to reach the tip of the chain faster. + var majorSyncing bool + + cpvs.liveHeads = append(cpvs.liveHeads, parachaintypes.ActivatedLeaf{ + Hash: signal.Activated.Hash, + Number: signal.Activated.Number, + }) + + newLiveHeads := []parachaintypes.ActivatedLeaf{} + + for _, head := range cpvs.liveHeads { + if slices.Contains(signal.Deactivated, head.Hash) { + newLiveHeads = append(newLiveHeads, head) + } + } + + sort.Sort(SortableActivatedLeaves(newLiveHeads)) + // TODO: do I need to store these live heads or just pass them to update view? + cpvs.liveHeads = newLiveHeads + + if !majorSyncing { + // update our view + err := cpvs.updateOurView() + if err != nil { + logger.Errorf("updating our view: %w", err) + } + } } -func (cpvs CollatorProtocolValidatorSide) ProcessBlockFinalizedSignal() { - // NOTE: nothing to do here +func (cpvs *CollatorProtocolValidatorSide) updateOurView() error { + headHashes := []common.Hash{} + for _, head := range cpvs.liveHeads { + headHashes = append(headHashes, head.Hash) + } + newView := View{ + heads: headHashes, + finalizedNumber: cpvs.finalizedNumber, + } + + if cpvs.localView == nil { + *cpvs.localView = newView + return nil + } + + if cpvs.localView.checkHeadsEqual(newView) { + // nothing to update + return nil + } + + *cpvs.localView = newView + + // TODO: send ViewUpdate to all the collation peers and validation peers (v1, v2, v3) + // https://github.com/paritytech/polkadot-sdk/blob/aa68ea58f389c2aa4eefab4bf7bc7b787dd56580/polkadot/node/network/bridge/src/rx/mod.rs#L969-L1013 + + // TODO: Create our view and send collation events to all subsystems about our view change + // Just create the network bridge and do both of these tasks as part of those. That's the only way it makes sense. + + err := cpvs.handleOurViewChange(newView) + if err != nil { + return fmt.Errorf("handling our view change: %w", err) + } + return nil +} + +func (cpvs *CollatorProtocolValidatorSide) handleOurViewChange(view View) error { + // 1. Find out removed leaves (hashes) and newly added leaves + // 2. Go over each new leaves, + // - check if perspective parachain mode is enabled + // - assign incoming + // - insert active leaves and per relay parent + activeLeaves := cpvs.activeLeaves + + removed := []common.Hash{} + for activeLeaf := range activeLeaves { + if !slices.Contains(view.heads, activeLeaf) { + removed = append(removed, activeLeaf) + } + } + + newlyAdded := []common.Hash{} + for _, head := range view.heads { + if _, ok := activeLeaves[head]; !ok { + newlyAdded = append(newlyAdded, head) + } + } + + // handled newly added leaves + for _, leaf := range newlyAdded { + mode := prospectiveParachainMode() + + perRelayParent := &PerRelayParent{ + prospectiveParachainMode: mode, + } + + err := cpvs.assignIncoming(leaf, perRelayParent) + if err != nil { + return fmt.Errorf("assigning incoming: %w", err) + } + cpvs.activeLeaves[leaf] = mode + cpvs.perRelayParent[leaf] = *perRelayParent + + //nolint:staticcheck + if mode.IsEnabled { + // TODO: Add it when we have async backing + // https://github.com/paritytech/polkadot-sdk/blob/aa68ea58f389c2aa4eefab4bf7bc7b787dd56580/polkadot/node/network/collator-protocol/src/validator_side/mod.rs#L1303 //nolint + } + } + + // handle removed leaves + for _, leaf := range removed { + delete(cpvs.activeLeaves, leaf) + + mode := prospectiveParachainMode() + pruned := []common.Hash{} + if mode.IsEnabled { + // TODO: Do this when we have async backing + // https://github.com/paritytech/polkadot-sdk/blob/aa68ea58f389c2aa4eefab4bf7bc7b787dd56580/polkadot/node/network/collator-protocol/src/validator_side/mod.rs#L1340 //nolint + } else { + pruned = append(pruned, leaf) + } + + for _, prunedLeaf := range pruned { + perRelayParent, ok := cpvs.perRelayParent[prunedLeaf] + if ok { + cpvs.removeOutgoing(perRelayParent) + delete(cpvs.perRelayParent, prunedLeaf) + } + + for fetchedCandidateStr := range cpvs.fetchedCandidates { + fetchedCollation, err := fetchedCandidateFromString(fetchedCandidateStr) + if err != nil { + // this should never really happen + return fmt.Errorf("getting fetched collation from string: %w", err) + } + + if fetchedCollation.relayParent == prunedLeaf { + delete(cpvs.fetchedCandidates, fetchedCandidateStr) + } + } + } + + // TODO + // Remove blocked advertisements that left the view. cpvs.BlockedAdvertisements + // Re-trigger previously failed requests again. requestUnBlockedCollations + // prune old advertisements + // https://github.com/paritytech/polkadot-sdk/blob/aa68ea58f389c2aa4eefab4bf7bc7b787dd56580/polkadot/node/network/collator-protocol/src/validator_side/mod.rs#L1361-L1396 + + } + + return nil +} + +func (cpvs *CollatorProtocolValidatorSide) removeOutgoing(perRelayParent PerRelayParent) { + if perRelayParent.assignment != nil { + entry := cpvs.currentAssignments[*perRelayParent.assignment] + entry-- + if entry == 0 { + logger.Infof("unassigned from parachain with ID %d", *perRelayParent.assignment) + delete(cpvs.currentAssignments, *perRelayParent.assignment) + return + } + + cpvs.currentAssignments[*perRelayParent.assignment] = entry + } +} + +func (cpvs *CollatorProtocolValidatorSide) assignIncoming(relayParent common.Hash, perRelayParent *PerRelayParent, +) error { + // TODO: get this instance using relay parent + instance, err := cpvs.BlockState.GetRuntime(relayParent) + if err != nil { + return fmt.Errorf("getting runtime instance: %w", err) + } + + validators, err := instance.ParachainHostValidators() + if err != nil { + return fmt.Errorf("getting validators: %w", err) + } + + validatorGroups, err := instance.ParachainHostValidatorGroups() + if err != nil { + return fmt.Errorf("getting validator groups: %w", err) + } + + availabilityCores, err := instance.ParachainHostAvailabilityCores() + if err != nil { + return fmt.Errorf("getting availability cores: %w", err) + } + + validator, validatorIndex := signingKeyAndIndex(validators, cpvs.Keystore) + if validator == nil { + // return with an error? + return nil + } + + groupIndex, ok := findValidatorGroup(validatorIndex, *validatorGroups) + if !ok { + logger.Trace("not a validator") + return nil + } + + coreIndexNow := validatorGroups.GroupRotationInfo.CoreForGroup(groupIndex, uint8(len(availabilityCores.Types))) + coreNow, err := availabilityCores.Types[coreIndexNow.Index].Value() + if err != nil { + return fmt.Errorf("getting core now: %w", err) + } + + var paraNow *parachaintypes.ParaID + + switch c := coreNow.(type) /*coreNow.Index()*/ { + case parachaintypes.OccupiedCore: + *paraNow = parachaintypes.ParaID(c.CandidateDescriptor.ParaID) + case parachaintypes.ScheduledCore: + *paraNow = c.ParaID + case parachaintypes.Free: + // Nothing to do in case of free + + } + + if paraNow != nil { + entry := cpvs.currentAssignments[*paraNow] + entry++ + cpvs.currentAssignments[*paraNow] = entry + if entry == 1 { + logger.Infof("got assigned to parachain with ID %d", *paraNow) + } + } + + perRelayParent.assignment = paraNow + return nil +} + +func findValidatorGroup(validatorIndex parachaintypes.ValidatorIndex, validatorGroups parachaintypes.ValidatorGroups, +) (parachaintypes.GroupIndex, bool) { + for groupIndex, validatorGroup := range validatorGroups.Validators { + for _, i := range validatorGroup { + if i == validatorIndex { + return parachaintypes.GroupIndex(groupIndex), true + } + } + } + + return 0, false +} + +// signingKeyAndIndex finds the first key we can sign with from the given set of validators, +// if any, and returns it along with the validator index. +func signingKeyAndIndex(validators []parachaintypes.ValidatorID, ks keystore.Keystore, +) (*parachaintypes.ValidatorID, parachaintypes.ValidatorIndex) { + for i, validator := range validators { + publicKey, _ := sr25519.NewPublicKey(validator[:]) + keypair := ks.GetKeypair(publicKey) + + if keypair != nil { + return &validator, parachaintypes.ValidatorIndex(i) + } + } + + return nil, 0 +} + +func prospectiveParachainMode() parachaintypes.ProspectiveParachainsMode { + // TODO: complete this method by calling the runtime function + // https://github.com/paritytech/polkadot-sdk/blob/aa68ea58f389c2aa4eefab4bf7bc7b787dd56580/polkadot/node/subsystem-util/src/runtime/mod.rs#L496 //nolint + // NOTE: We will return false until we have support for async backing + return parachaintypes.ProspectiveParachainsMode{ + IsEnabled: false, + } +} + +type SortableActivatedLeaves []parachaintypes.ActivatedLeaf + +func (s SortableActivatedLeaves) Len() int { + return len(s) +} + +func (s SortableActivatedLeaves) Less(i, j int) bool { + return s[i].Number > s[j].Number +} + +func (s SortableActivatedLeaves) Swap(i, j int) { + s[i], s[j] = s[j], s[i] +} + +func (cpvs *CollatorProtocolValidatorSide) ProcessBlockFinalizedSignal(signal parachaintypes.BlockFinalizedSignal) { + if cpvs.finalizedNumber >= signal.BlockNumber { + // error + return + } + cpvs.finalizedNumber = signal.BlockNumber } func (cpvs CollatorProtocolValidatorSide) Stop() { @@ -136,8 +442,9 @@ func (cpvs CollatorProtocolValidatorSide) requestCollation(relayParent common.Ha paraID parachaintypes.ParaID, peerID peer.ID) (*parachaintypes.Collation, error) { // TODO: Make sure that the request can be done in MAX_UNSHARED_DOWNLOAD_TIME timeout - if !slices.Contains[[]common.Hash](cpvs.ourView.heads, relayParent) { - return nil, ErrCollationNotInView + _, ok := cpvs.perRelayParent[relayParent] + if !ok { + return nil, ErrOutOfView } // make collation fetching request @@ -296,11 +603,63 @@ const ( Collating ) +// The maximum amount of heads a peer is allowed to have in their view at any time. +// We use the same limit to compute the view sent to peers locally. +const MaxViewHeads uint8 = 5 + +// A succinct representation of a peer's view. This consists of a bounded amount of chain heads +// and the highest known finalized block number. +// +// Up to `N` (5?) chain heads. type View struct { // a bounded amount of chain heads heads []common.Hash // the highest known finalized number - finalizedNumber uint32 //nolint + finalizedNumber uint32 +} + +type SortableHeads []common.Hash + +func (s SortableHeads) Len() int { + return len(s) +} + +func (s SortableHeads) Less(i, j int) bool { + return s[i].String() > s[j].String() +} + +func (s SortableHeads) Swap(i, j int) { + s[i], s[j] = s[j], s[i] +} + +// checkHeadsEqual checks if the heads of the view are equal to the heads of the other view. +func (v View) checkHeadsEqual(other View) bool { + if len(v.heads) != len(other.heads) { + return false + } + + localHeads := v.heads + sort.Sort(SortableHeads(localHeads)) + otherHeads := other.heads + sort.Sort(SortableHeads(otherHeads)) + + return reflect.DeepEqual(localHeads, otherHeads) +} + +func ConstructView(liveHeads map[common.Hash]struct{}, finalizedNumber uint32) View { + heads := make([]common.Hash, 0, len(liveHeads)) + for head := range liveHeads { + heads = append(heads, head) + } + + if len(heads) >= 5 { + heads = heads[:5] + } + + return View{ + heads: heads, + finalizedNumber: finalizedNumber, + } } // Network is the interface required by parachain service for the network @@ -333,7 +692,9 @@ type CollatorProtocolValidatorSide struct { ctx context.Context cancel context.CancelFunc - net Network + BlockState *state.BlockState + net Network + Keystore keystore.Keystore SubSystemToOverseer chan<- any OverseerToSubSystem <-chan any @@ -347,7 +708,15 @@ type CollatorProtocolValidatorSide struct { // track all active collators and their data peerData map[peer.ID]PeerData - ourView View + // TODO: Tech Debt + // In polkadot-sdk (rust) code, following fields are common between validation protocol and collator protocol. + // They are kept in network bridge. Network bridge has common logic for both validation and collator protocol. + // I have kept it here for ease, since we don't have network bridge. Make a decision on this. Create a network + // bridge if that seems appropriate. + // And move these fields and some common logic there. + localView *View + // validationPeers []peer.ID + // collationPeers []peer.ID // Parachains we're currently assigned to. With async backing enabled // this includes assignments from the implicit view. @@ -383,6 +752,11 @@ type CollatorProtocolValidatorSide struct { // Collations that we have successfully requested from peers and waiting // on validation. fetchedCandidates map[string]CollationEvent + + // heads are sorted in descending order by block number + liveHeads []parachaintypes.ActivatedLeaf + + finalizedNumber uint32 } // Identifier of a fetched collation @@ -400,6 +774,46 @@ func (f fetchedCollationInfo) String() string { f.relayParent.String(), f.paraID, f.candidateHash.Value.String(), f.collatorID) } +func fetchedCandidateFromString(str string) (fetchedCollationInfo, error) { + splits := strings.Split(str, ",") + if len(splits) != 4 { + return fetchedCollationInfo{}, fmt.Errorf("%w: %s", ErrInvalidStringFormat, str) + } + + relayParent, err := common.HexToHash(strings.TrimSpace(splits[0])) + if err != nil { + return fetchedCollationInfo{}, fmt.Errorf("getting relay parent: %w", err) + } + + paraID, err := strconv.ParseUint(strings.TrimSpace(splits[1]), 10, 64) + if err != nil { + return fetchedCollationInfo{}, fmt.Errorf("getting para id: %w", err) + } + + candidateHashBytes, err := common.HexToBytes(strings.TrimSpace(splits[2])) + if err != nil { + return fetchedCollationInfo{}, fmt.Errorf("getting candidate hash bytes: %w", err) + } + + candidateHash := parachaintypes.CandidateHash{ + Value: common.NewHash(candidateHashBytes), + } + + var collatorID parachaintypes.CollatorID + collatorIDBytes, err := common.HexToBytes(strings.TrimSpace(splits[3])) + if err != nil { + return fetchedCollationInfo{}, fmt.Errorf("getting collator id bytes: %w", err) + } + copy(collatorID[:], collatorIDBytes) + + return fetchedCollationInfo{ + relayParent: relayParent, + paraID: parachaintypes.ParaID(paraID), + candidateHash: candidateHash, + collatorID: collatorID, + }, nil +} + type PerRelayParent struct { prospectiveParachainMode parachaintypes.ProspectiveParachainsMode assignment *parachaintypes.ParaID @@ -572,9 +986,9 @@ func (cpvs CollatorProtocolValidatorSide) processMessage(msg any) error { }, peerID) case parachaintypes.ActiveLeavesUpdateSignal: - cpvs.ProcessActiveLeavesUpdateSignal() + cpvs.ProcessActiveLeavesUpdateSignal(msg) case parachaintypes.BlockFinalizedSignal: - cpvs.ProcessBlockFinalizedSignal() + cpvs.ProcessBlockFinalizedSignal(msg) default: return parachaintypes.ErrUnknownOverseerMessage diff --git a/dot/parachain/network-bridge/receiver.go b/dot/parachain/network-bridge/receiver.go index a27bec736d..6737fa9a15 100644 --- a/dot/parachain/network-bridge/receiver.go +++ b/dot/parachain/network-bridge/receiver.go @@ -1,3 +1,6 @@ +// Copyright 2024 ChainSafe Systems (ON) +// SPDX-License-Identifier: LGPL-3.0-only + package networkbridge import ( diff --git a/dot/parachain/network-bridge/sender.go b/dot/parachain/network-bridge/sender.go index 33c2cfc85a..3394d3ab22 100644 --- a/dot/parachain/network-bridge/sender.go +++ b/dot/parachain/network-bridge/sender.go @@ -1,3 +1,6 @@ +// Copyright 2024 ChainSafe Systems (ON) +// SPDX-License-Identifier: LGPL-3.0-only + package networkbridge import ( diff --git a/dot/parachain/overseer/overseer_test.go b/dot/parachain/overseer/overseer_test.go index 0f7762b60f..88a7381d4e 100644 --- a/dot/parachain/overseer/overseer_test.go +++ b/dot/parachain/overseer/overseer_test.go @@ -49,11 +49,11 @@ func (s *TestSubsystem) Run(ctx context.Context, OverseerToSubSystem chan any, S } } -func (s *TestSubsystem) ProcessActiveLeavesUpdateSignal() { +func (s *TestSubsystem) ProcessActiveLeavesUpdateSignal(signal parachaintypes.ActiveLeavesUpdateSignal) { fmt.Printf("%s ProcessActiveLeavesUpdateSignal\n", s.name) } -func (s *TestSubsystem) ProcessBlockFinalizedSignal() { +func (s *TestSubsystem) ProcessBlockFinalizedSignal(signal parachaintypes.BlockFinalizedSignal) { fmt.Printf("%s ProcessActiveLeavesUpdateSignal\n", s.name) } diff --git a/dot/parachain/overseer/types.go b/dot/parachain/overseer/types.go index 9a5b49a961..3394732e3c 100644 --- a/dot/parachain/overseer/types.go +++ b/dot/parachain/overseer/types.go @@ -14,7 +14,7 @@ type Subsystem interface { // Run runs the subsystem. Run(ctx context.Context, OverseerToSubSystem chan any, SubSystemToOverseer chan any) Name() parachaintypes.SubSystemName - ProcessActiveLeavesUpdateSignal() - ProcessBlockFinalizedSignal() + ProcessActiveLeavesUpdateSignal(signal parachaintypes.ActiveLeavesUpdateSignal) + ProcessBlockFinalizedSignal(signal parachaintypes.BlockFinalizedSignal) Stop() } diff --git a/dot/parachain/service.go b/dot/parachain/service.go index 0f020962e3..becc72f014 100644 --- a/dot/parachain/service.go +++ b/dot/parachain/service.go @@ -17,6 +17,7 @@ import ( "github.com/ChainSafe/gossamer/dot/state" "github.com/ChainSafe/gossamer/internal/log" "github.com/ChainSafe/gossamer/lib/common" + "github.com/ChainSafe/gossamer/lib/keystore" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/protocol" ) @@ -33,7 +34,7 @@ type Service struct { var logger = log.NewFromGlobal(log.AddContext("pkg", "parachain")) -func NewService(net Network, forkID string, st *state.Service) (*Service, error) { +func NewService(net Network, forkID string, st *state.Service, ks keystore.Keystore) (*Service, error) { overseer := overseer.NewOverseer(st.Block) err := overseer.Start() if err != nil { @@ -89,6 +90,8 @@ func NewService(net Network, forkID string, st *state.Service) (*Service, error) if err != nil { return nil, err } + cpvs.BlockState = st.Block + cpvs.Keystore = ks cpvs.OverseerToSubSystem = overseer.RegisterSubsystem(cpvs) parachainService := &Service{ diff --git a/dot/parachain/types/types.go b/dot/parachain/types/types.go index 3d417c5622..391f237344 100644 --- a/dot/parachain/types/types.go +++ b/dot/parachain/types/types.go @@ -36,6 +36,12 @@ type GroupRotationInfo struct { Now BlockNumber `scale:"3"` } +func (gri GroupRotationInfo) CoreForGroup(groupIndex GroupIndex, cores uint8) CoreIndex { + //nolint + // TODO: https://github.com/paritytech/polkadot-sdk/blob/aa68ea58f389c2aa4eefab4bf7bc7b787dd56580/polkadot/primitives/src/v6/mod.rs#L877 + return CoreIndex{} +} + // ValidatorGroups represents the validator groups type ValidatorGroups struct { // Validators is an array of validator set Ids diff --git a/dot/services.go b/dot/services.go index a73abdb4b4..a0fae0d879 100644 --- a/dot/services.go +++ b/dot/services.go @@ -457,9 +457,9 @@ func (nodeBuilder) createGRANDPAService(config *cfg.Config, st *state.Service, k return grandpa.NewService(gsCfg) } -func (nodeBuilder) createParachainHostService(net *network.Service, forkID string, st *state.Service) ( - *parachain.Service, error) { - return parachain.NewService(net, forkID, st) +func (nodeBuilder) createParachainHostService(net *network.Service, forkID string, st *state.Service, + ks keystore.Keystore) (*parachain.Service, error) { + return parachain.NewService(net, forkID, st, ks) } func (nodeBuilder) createBlockVerifier(st *state.Service) *babe.VerificationManager { diff --git a/lib/runtime/interface.go b/lib/runtime/interface.go index 74b3005283..0e93336398 100644 --- a/lib/runtime/interface.go +++ b/lib/runtime/interface.go @@ -59,6 +59,8 @@ type Instance interface { ParachainHostValidationCodeByHash(validationCodeHash common.Hash) (*parachaintypes.ValidationCode, error) ParachainHostValidators() ([]parachaintypes.ValidatorID, error) ParachainHostValidatorGroups() (*parachaintypes.ValidatorGroups, error) + // TODO: There might be a scope to have more go friendly return values here + // VaryingDataTypeSlice is not very nice to use. ParachainHostAvailabilityCores() (*scale.VaryingDataTypeSlice, error) ParachainHostCheckValidationOutputs( parachainID parachaintypes.ParaID, @@ -68,6 +70,8 @@ type Instance interface { ParachainHostCandidatePendingAvailability( parachainID parachaintypes.ParaID, ) (*parachaintypes.CommittedCandidateReceipt, error) + // TODO: There might be a scope to have more go friendly return values here + // VaryingDataTypeSlice is not very nice to use. ParachainHostCandidateEvents() (*scale.VaryingDataTypeSlice, error) ParachainHostSessionInfo(sessionIndex parachaintypes.SessionIndex) (*parachaintypes.SessionInfo, error) ParachainHostAsyncBackingParams() (*parachaintypes.AsyncBackingParams, error)