From 7cd237dc130076388f625e3ddbdbc5b30963b5e9 Mon Sep 17 00:00:00 2001 From: kishansagathiya Date: Mon, 9 Sep 2024 15:48:19 +0530 Subject: [PATCH 1/4] statement distribution skeleton --- .../backing/candidate_backing_test.go | 3 +- dot/parachain/backing/integration_test.go | 9 +-- .../backing/per_relay_parent_state.go | 3 +- .../backing/validated_candidate_command.go | 3 +- dot/parachain/overseer/overseer.go | 4 ++ .../messages/messages.go | 21 ++++++ .../statement_distribution.go | 70 +++++++++++++++++++ dot/parachain/types/overseer_message.go | 15 ---- dot/parachain/types/subsystems.go | 1 + 9 files changed, 107 insertions(+), 22 deletions(-) create mode 100644 dot/parachain/statement-distribution/messages/messages.go create mode 100644 dot/parachain/statement-distribution/statement_distribution.go diff --git a/dot/parachain/backing/candidate_backing_test.go b/dot/parachain/backing/candidate_backing_test.go index f2bbcb7ee7..6ec6935493 100644 --- a/dot/parachain/backing/candidate_backing_test.go +++ b/dot/parachain/backing/candidate_backing_test.go @@ -11,6 +11,7 @@ import ( availabilitystore "github.com/ChainSafe/gossamer/dot/parachain/availability-store" candidatevalidation "github.com/ChainSafe/gossamer/dot/parachain/candidate-validation" collatorprotocolmessages "github.com/ChainSafe/gossamer/dot/parachain/collator-protocol/messages" + statementedistributionmessages "github.com/ChainSafe/gossamer/dot/parachain/statement-distribution/messages" parachaintypes "github.com/ChainSafe/gossamer/dot/parachain/types" "github.com/ChainSafe/gossamer/lib/common" "github.com/ChainSafe/gossamer/lib/crypto/sr25519" @@ -83,7 +84,7 @@ func mockOverseer(t *testing.T, subsystemToOverseer chan any) { parachaintypes.ProvisionerMessageProvisionableData, parachaintypes.ProspectiveParachainsMessageCandidateBacked, collatorprotocolmessages.Backed, - parachaintypes.StatementDistributionMessageBacked: + statementedistributionmessages.Backed: continue default: t.Errorf("unknown type: %T\n", data) diff --git a/dot/parachain/backing/integration_test.go b/dot/parachain/backing/integration_test.go index 41c2f0cfc5..3e5fd688cf 100644 --- a/dot/parachain/backing/integration_test.go +++ b/dot/parachain/backing/integration_test.go @@ -13,6 +13,7 @@ import ( candidatevalidation "github.com/ChainSafe/gossamer/dot/parachain/candidate-validation" collatorprotocolmessages "github.com/ChainSafe/gossamer/dot/parachain/collator-protocol/messages" "github.com/ChainSafe/gossamer/dot/parachain/overseer" + statementedistributionmessages "github.com/ChainSafe/gossamer/dot/parachain/statement-distribution/messages" parachaintypes "github.com/ChainSafe/gossamer/dot/parachain/types" "github.com/ChainSafe/gossamer/lib/common" "github.com/ChainSafe/gossamer/lib/crypto" @@ -396,7 +397,7 @@ func TestSecondsValidCandidate(t *testing.T) { distribute := func(msg any) bool { // we have seconded a candidate and shared the statement to peers - share, ok := msg.(parachaintypes.StatementDistributionMessageShare) + share, ok := msg.(statementedistributionmessages.Share) if !ok { return false } @@ -539,7 +540,7 @@ func TestCandidateReachesQuorum(t *testing.T) { validate := validResponseForValidateFromExhaustive(headData, pvd) distribute := func(msg any) bool { - _, ok := msg.(parachaintypes.StatementDistributionMessageShare) + _, ok := msg.(statementedistributionmessages.Share) return ok } @@ -844,7 +845,7 @@ func TestCanNotSecondMultipleCandidatesPerRelayParent(t *testing.T) { distribute := func(msg any) bool { // we have seconded a candidate and shared the statement to peers - share, ok := msg.(parachaintypes.StatementDistributionMessageShare) + share, ok := msg.(statementedistributionmessages.Share) if !ok { return false } @@ -996,7 +997,7 @@ func TestNewLeafDoesNotClobberOld(t *testing.T) { distribute := func(msg any) bool { // we have seconded a candidate and shared the statement to peers - share, ok := msg.(parachaintypes.StatementDistributionMessageShare) + share, ok := msg.(statementedistributionmessages.Share) if !ok { return false } diff --git a/dot/parachain/backing/per_relay_parent_state.go b/dot/parachain/backing/per_relay_parent_state.go index d9e90fb5a2..bf620ca730 100644 --- a/dot/parachain/backing/per_relay_parent_state.go +++ b/dot/parachain/backing/per_relay_parent_state.go @@ -11,6 +11,7 @@ import ( availabilitystore "github.com/ChainSafe/gossamer/dot/parachain/availability-store" candidatevalidation "github.com/ChainSafe/gossamer/dot/parachain/candidate-validation" collatorprotocolmessages "github.com/ChainSafe/gossamer/dot/parachain/collator-protocol/messages" + statementedistributionmessages "github.com/ChainSafe/gossamer/dot/parachain/statement-distribution/messages" parachaintypes "github.com/ChainSafe/gossamer/dot/parachain/types" "github.com/ChainSafe/gossamer/lib/runtime" wazero_runtime "github.com/ChainSafe/gossamer/lib/runtime/wazero" @@ -172,7 +173,7 @@ func (rpState *perRelayParentState) postImportStatement(subSystemToOverseer chan } // Notify statement distribution of backed candidate. - subSystemToOverseer <- parachaintypes.StatementDistributionMessageBacked(candidateHash) + subSystemToOverseer <- statementedistributionmessages.Backed(candidateHash) } else { // TODO: figure out what this comment means by 'avoid cycles'. diff --git a/dot/parachain/backing/validated_candidate_command.go b/dot/parachain/backing/validated_candidate_command.go index 6f7a1b1a96..4a14e9a1c6 100644 --- a/dot/parachain/backing/validated_candidate_command.go +++ b/dot/parachain/backing/validated_candidate_command.go @@ -8,6 +8,7 @@ import ( "fmt" collatorprotocolmessages "github.com/ChainSafe/gossamer/dot/parachain/collator-protocol/messages" + statementedistributionmessages "github.com/ChainSafe/gossamer/dot/parachain/statement-distribution/messages" parachaintypes "github.com/ChainSafe/gossamer/dot/parachain/types" "github.com/ChainSafe/gossamer/lib/common" "github.com/ChainSafe/gossamer/lib/keystore" @@ -345,7 +346,7 @@ func signImportAndDistributeStatement( } // `Share` must always be sent before `Backed`. We send the latter in `postImportStatement` below. - subSystemToOverseer <- parachaintypes.StatementDistributionMessageShare{ + subSystemToOverseer <- statementedistributionmessages.Share{ RelayParent: rpState.relayParent, SignedFullStatementWithPVD: signedStatementWithPVD, } diff --git a/dot/parachain/overseer/overseer.go b/dot/parachain/overseer/overseer.go index 30431808ae..b3c33ba1fb 100644 --- a/dot/parachain/overseer/overseer.go +++ b/dot/parachain/overseer/overseer.go @@ -15,6 +15,7 @@ import ( collatorprotocolmessages "github.com/ChainSafe/gossamer/dot/parachain/collator-protocol/messages" networkbridgemessages "github.com/ChainSafe/gossamer/dot/parachain/network-bridge/messages" parachain "github.com/ChainSafe/gossamer/dot/parachain/runtime" + statementedistributionmessages "github.com/ChainSafe/gossamer/dot/parachain/statement-distribution/messages" parachaintypes "github.com/ChainSafe/gossamer/dot/parachain/types" "github.com/ChainSafe/gossamer/dot/parachain/util" "github.com/ChainSafe/gossamer/dot/types" @@ -142,6 +143,9 @@ func (o *OverseerSystem) processMessages() { subsystem = o.nameToSubsystem[parachaintypes.AvailabilityStore] + case statementedistributionmessages.Share, statementedistributionmessages.Backed: + subsystem = o.nameToSubsystem[parachaintypes.StatementDistribution] + case chainapi.ChainAPIMessage[util.Ancestors], chainapi.ChainAPIMessage[chainapi.BlockHeader]: subsystem = o.nameToSubsystem[parachaintypes.ChainAPI] diff --git a/dot/parachain/statement-distribution/messages/messages.go b/dot/parachain/statement-distribution/messages/messages.go new file mode 100644 index 0000000000..9a91317f39 --- /dev/null +++ b/dot/parachain/statement-distribution/messages/messages.go @@ -0,0 +1,21 @@ +package messages + +import ( + parachaintypes "github.com/ChainSafe/gossamer/dot/parachain/types" + "github.com/ChainSafe/gossamer/lib/common" +) + +// Backed is a statement distribution message. +// it represents a message indicating that a candidate has received sufficient +// validity votes from the backing group. If backed as a result of a local statement, +// it must be preceded by a `Share` message for that statement to ensure awareness of +// full candidates before the `Backed` notification, even in groups of size 1. +type Backed parachaintypes.CandidateHash + +// Share is a statement distribution message. +// It is a signed statement in the context of +// given relay-parent hash and it should be distributed to other validators. +type Share struct { + RelayParent common.Hash + SignedFullStatementWithPVD parachaintypes.SignedFullStatementWithPVD +} diff --git a/dot/parachain/statement-distribution/statement_distribution.go b/dot/parachain/statement-distribution/statement_distribution.go new file mode 100644 index 0000000000..6989963c0e --- /dev/null +++ b/dot/parachain/statement-distribution/statement_distribution.go @@ -0,0 +1,70 @@ +package statementdistribution + +import ( + "context" + + "github.com/ChainSafe/gossamer/internal/log" + + statementedistributionmessages "github.com/ChainSafe/gossamer/dot/parachain/statement-distribution/messages" + parachaintypes "github.com/ChainSafe/gossamer/dot/parachain/types" +) + +var logger = log.NewFromGlobal(log.AddContext("pkg", "statement-distribution")) + +type StatementDistribution struct { +} + +func (s StatementDistribution) Run(ctx context.Context, overseerToSubSystem <-chan any) { + + for { + select { + case msg, ok := <-overseerToSubSystem: + if !ok { + return + } + + err := s.processMessage(msg) + if err != nil { + logger.Errorf("processing overseer message: %w", err) + } + } + } + +} + +func (s StatementDistribution) processMessage(msg any) error { + + switch msg := msg.(type) { + case statementedistributionmessages.Backed: + // todo + case statementedistributionmessages.Share: + // todo + // case statementedistributionmessages.NetworkBridgeUpdate + // TODO this above case would need to wait + case parachaintypes.ActiveLeavesUpdateSignal: + return s.ProcessActiveLeavesUpdateSignal(msg) + case parachaintypes.BlockFinalizedSignal: + return s.ProcessBlockFinalizedSignal(msg) + + default: + return parachaintypes.ErrUnknownOverseerMessage + } + + return nil +} + +func (s StatementDistribution) Name() parachaintypes.SubSystemName { + return parachaintypes.StatementDistribution +} + +func (s StatementDistribution) ProcessActiveLeavesUpdateSignal(signal parachaintypes.ActiveLeavesUpdateSignal) error { + // todo + return nil +} + +func (s StatementDistribution) ProcessBlockFinalizedSignal(signal parachaintypes.BlockFinalizedSignal) error { + // nothing to do here + return nil +} + +func (s StatementDistribution) Stop() {} diff --git a/dot/parachain/types/overseer_message.go b/dot/parachain/types/overseer_message.go index e696ed0bb4..c7226e41f3 100644 --- a/dot/parachain/types/overseer_message.go +++ b/dot/parachain/types/overseer_message.go @@ -44,21 +44,6 @@ type ProvisionableDataMisbehaviorReport struct { func (ProvisionableDataMisbehaviorReport) IsProvisionableData() {} -// StatementDistributionMessageBacked is a statement distribution message. -// it represents a message indicating that a candidate has received sufficient -// validity votes from the backing group. If backed as a result of a local statement, -// it must be preceded by a `Share` message for that statement to ensure awareness of -// full candidates before the `Backed` notification, even in groups of size 1. -type StatementDistributionMessageBacked CandidateHash - -// StatementDistributionMessageShare is a statement distribution message. -// It is a signed statement in the context of -// given relay-parent hash and it should be distributed to other validators. -type StatementDistributionMessageShare struct { - RelayParent common.Hash - SignedFullStatementWithPVD SignedFullStatementWithPVD -} - // ProspectiveParachainsMessageGetTreeMembership is a prospective parachains message. // It is intended for retrieving the membership of a candidate in all fragment trees type ProspectiveParachainsMessageGetTreeMembership struct { diff --git a/dot/parachain/types/subsystems.go b/dot/parachain/types/subsystems.go index 0fed974f12..f56c2ae548 100644 --- a/dot/parachain/types/subsystems.go +++ b/dot/parachain/types/subsystems.go @@ -18,6 +18,7 @@ const ( NetworkBridgeReceiver SubSystemName = "NetworkBridgeReceiver" ChainAPI SubSystemName = "ChainAPI" CandidateValidation SubSystemName = "CandidateValidation" + StatementDistribution SubSystemName = "StatementDistribution" ) var SubsystemRequestTimeout = 1 * time.Second From 44e601dab91e2dbdb1c87d72f802803e10d5fd7a Mon Sep 17 00:00:00 2001 From: kishansagathiya Date: Mon, 9 Sep 2024 17:25:50 +0530 Subject: [PATCH 2/4] tiny fix --- .../statement_distribution.go | 24 +++++++------------ 1 file changed, 8 insertions(+), 16 deletions(-) diff --git a/dot/parachain/statement-distribution/statement_distribution.go b/dot/parachain/statement-distribution/statement_distribution.go index 6989963c0e..88f3670f47 100644 --- a/dot/parachain/statement-distribution/statement_distribution.go +++ b/dot/parachain/statement-distribution/statement_distribution.go @@ -16,31 +16,23 @@ type StatementDistribution struct { func (s StatementDistribution) Run(ctx context.Context, overseerToSubSystem <-chan any) { - for { - select { - case msg, ok := <-overseerToSubSystem: - if !ok { - return - } - - err := s.processMessage(msg) - if err != nil { - logger.Errorf("processing overseer message: %w", err) - } + for msg := range overseerToSubSystem { + err := s.processMessage(msg) + if err != nil { + logger.Errorf("processing overseer message: %w", err) } } - } func (s StatementDistribution) processMessage(msg any) error { switch msg := msg.(type) { case statementedistributionmessages.Backed: - // todo + // TODO #4171 case statementedistributionmessages.Share: - // todo + // TODO #4170 // case statementedistributionmessages.NetworkBridgeUpdate - // TODO this above case would need to wait + // TODO #4172 this above case would need to wait until network bridge receiver side is merged case parachaintypes.ActiveLeavesUpdateSignal: return s.ProcessActiveLeavesUpdateSignal(msg) case parachaintypes.BlockFinalizedSignal: @@ -58,7 +50,7 @@ func (s StatementDistribution) Name() parachaintypes.SubSystemName { } func (s StatementDistribution) ProcessActiveLeavesUpdateSignal(signal parachaintypes.ActiveLeavesUpdateSignal) error { - // todo + // TODO #4173 return nil } From 08800cb8c6e3115bc2eefbb3980ad352bf25f223 Mon Sep 17 00:00:00 2001 From: Kishan Mohanbhai Sagathiya Date: Wed, 2 Oct 2024 11:51:05 +0530 Subject: [PATCH 3/4] addressed some reviews --- .../backing/candidate_backing_test.go | 4 ++-- .../statement_distribution.go | 19 ++++++++++++++----- 2 files changed, 16 insertions(+), 7 deletions(-) diff --git a/dot/parachain/backing/candidate_backing_test.go b/dot/parachain/backing/candidate_backing_test.go index 6ec6935493..505f9ebd08 100644 --- a/dot/parachain/backing/candidate_backing_test.go +++ b/dot/parachain/backing/candidate_backing_test.go @@ -11,7 +11,7 @@ import ( availabilitystore "github.com/ChainSafe/gossamer/dot/parachain/availability-store" candidatevalidation "github.com/ChainSafe/gossamer/dot/parachain/candidate-validation" collatorprotocolmessages "github.com/ChainSafe/gossamer/dot/parachain/collator-protocol/messages" - statementedistributionmessages "github.com/ChainSafe/gossamer/dot/parachain/statement-distribution/messages" + statementdistributionmessages "github.com/ChainSafe/gossamer/dot/parachain/statement-distribution/messages" parachaintypes "github.com/ChainSafe/gossamer/dot/parachain/types" "github.com/ChainSafe/gossamer/lib/common" "github.com/ChainSafe/gossamer/lib/crypto/sr25519" @@ -84,7 +84,7 @@ func mockOverseer(t *testing.T, subsystemToOverseer chan any) { parachaintypes.ProvisionerMessageProvisionableData, parachaintypes.ProspectiveParachainsMessageCandidateBacked, collatorprotocolmessages.Backed, - statementedistributionmessages.Backed: + statementdistributionmessages.Backed: continue default: t.Errorf("unknown type: %T\n", data) diff --git a/dot/parachain/statement-distribution/statement_distribution.go b/dot/parachain/statement-distribution/statement_distribution.go index 88f3670f47..e9d01812db 100644 --- a/dot/parachain/statement-distribution/statement_distribution.go +++ b/dot/parachain/statement-distribution/statement_distribution.go @@ -15,11 +15,20 @@ type StatementDistribution struct { } func (s StatementDistribution) Run(ctx context.Context, overseerToSubSystem <-chan any) { - - for msg := range overseerToSubSystem { - err := s.processMessage(msg) - if err != nil { - logger.Errorf("processing overseer message: %w", err) + for { + select { + case msg, ok := <-overseerToSubSystem: + if !ok { + return + } + err := s.processMessage(msg) + if err != nil { + logger.Errorf("processing overseer message: %w", err) + } + case <-ctx.Done(): + if err := ctx.Err(); err != nil { + logger.Errorf("ctx error: %v\n", err) + } } } } From 63caae39c4e25f15fec237b3aeb4069961083b5a Mon Sep 17 00:00:00 2001 From: Kishan Mohanbhai Sagathiya Date: Wed, 2 Oct 2024 11:57:53 +0530 Subject: [PATCH 4/4] fix --- dot/parachain/backing/integration_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dot/parachain/backing/integration_test.go b/dot/parachain/backing/integration_test.go index 34b5429c6a..cf2439be0a 100644 --- a/dot/parachain/backing/integration_test.go +++ b/dot/parachain/backing/integration_test.go @@ -1138,7 +1138,7 @@ func TestConflictingStatementIsMisbehavior(t *testing.T) { validate := validResponseForValidateFromExhaustive(headData, pvd) distribute := func(msg any) bool { - _, ok := msg.(parachaintypes.StatementDistributionMessageShare) + _, ok := msg.(statementedistributionmessages.Share) return ok }