Skip to content

Commit

Permalink
feat(dot/parachain): create worker pool for PVF host (#4101)
Browse files Browse the repository at this point in the history
  • Loading branch information
edwardmack authored Sep 19, 2024
1 parent a37a18a commit 768b6d5
Show file tree
Hide file tree
Showing 19 changed files with 1,565 additions and 844 deletions.
4 changes: 2 additions & 2 deletions dot/parachain/backing/candidate_backing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -731,7 +731,7 @@ func TestValidateAndMakeAvailable(t *testing.T) {
ci := candidatevalidation.ExecutionError
data.Ch <- parachaintypes.OverseerFuncRes[candidatevalidation.ValidationResult]{
Data: candidatevalidation.ValidationResult{
InvalidResult: &ci,
Invalid: &ci,
},
}
default:
Expand Down Expand Up @@ -768,7 +768,7 @@ func TestValidateAndMakeAvailable(t *testing.T) {
case candidatevalidation.ValidateFromExhaustive:
data.Ch <- parachaintypes.OverseerFuncRes[candidatevalidation.ValidationResult]{
Data: candidatevalidation.ValidationResult{
ValidResult: &candidatevalidation.ValidValidationResult{},
Valid: &candidatevalidation.Valid{},
},
}
case availabilitystore.StoreAvailableData:
Expand Down
4 changes: 2 additions & 2 deletions dot/parachain/backing/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ func validResponseForValidateFromExhaustive(

msgValidate.Ch <- parachaintypes.OverseerFuncRes[candidatevalidation.ValidationResult]{
Data: candidatevalidation.ValidationResult{
ValidResult: &candidatevalidation.ValidValidationResult{
Valid: &candidatevalidation.Valid{
CandidateCommitments: parachaintypes.CandidateCommitments{
HeadData: headData,
UpwardMessages: []parachaintypes.UpwardMessage{},
Expand Down Expand Up @@ -337,7 +337,7 @@ func TestSecondsValidCandidate(t *testing.T) {
badReturn := candidatevalidation.BadReturn
validateFromExhaustive.Ch <- parachaintypes.OverseerFuncRes[candidatevalidation.ValidationResult]{
Data: candidatevalidation.ValidationResult{
InvalidResult: &badReturn,
Invalid: &badReturn,
},
}
return true
Expand Down
8 changes: 4 additions & 4 deletions dot/parachain/backing/per_relay_parent_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,8 +340,8 @@ func (rpState *perRelayParentState) validateAndMakeAvailable(
bgValidationResult = backgroundValidationResult{
outputs: &backgroundValidationOutputs{
candidateReceipt: candidateReceipt,
candidateCommitments: validationResultRes.Data.ValidResult.CandidateCommitments,
persistedValidationData: validationResultRes.Data.ValidResult.PersistedValidationData,
candidateCommitments: validationResultRes.Data.Valid.CandidateCommitments,
persistedValidationData: validationResultRes.Data.Valid.PersistedValidationData,
},
candidate: nil,
err: nil,
Expand All @@ -358,11 +358,11 @@ func (rpState *perRelayParentState) validateAndMakeAvailable(
}

} else { // Invalid
logger.Error(validationResultRes.Data.InvalidResult.Error())
logger.Error(validationResultRes.Data.Invalid.Error())
bgValidationResult = backgroundValidationResult{
outputs: nil,
candidate: &candidateReceipt,
err: fmt.Errorf(validationResultRes.Data.InvalidResult.Error()),
err: fmt.Errorf(validationResultRes.Data.Invalid.Error()),
}
}

Expand Down
195 changes: 43 additions & 152 deletions dot/parachain/candidate-validation/candidate_validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,24 +10,15 @@ import (

parachainruntime "github.com/ChainSafe/gossamer/dot/parachain/runtime"
parachaintypes "github.com/ChainSafe/gossamer/dot/parachain/types"
"github.com/ChainSafe/gossamer/internal/log"
"github.com/ChainSafe/gossamer/lib/common"
"github.com/ChainSafe/gossamer/lib/runtime"
"github.com/ChainSafe/gossamer/pkg/scale"
)

var logger = log.NewFromGlobal(log.AddContext("pkg", "parachain-candidate-validation"))

var (
ErrValidationCodeMismatch = errors.New("validation code hash does not match")
ErrValidationInputOverLimit = errors.New("validation input is over the limit")
)

// CandidateValidation is a parachain subsystem that validates candidate parachain blocks
type CandidateValidation struct {
SubsystemToOverseer chan<- any
ValidationHost parachainruntime.ValidationHost
BlockState BlockState
pvfHost *host // pvfHost is the host for the parachain validation function
}

type BlockState interface {
Expand All @@ -38,6 +29,7 @@ type BlockState interface {
func NewCandidateValidation(overseerChan chan<- any, blockState BlockState) *CandidateValidation {
candidateValidation := CandidateValidation{
SubsystemToOverseer: overseerChan,
pvfHost: newValidationHost(),
BlockState: blockState,
}
return &candidateValidation
Expand All @@ -48,7 +40,6 @@ func (cv *CandidateValidation) Run(ctx context.Context, overseerToSubsystem <-ch
for {
select {
case msg := <-overseerToSubsystem:
logger.Debugf("received message %v", msg)
cv.processMessage(msg)
case <-ctx.Done():
if err := ctx.Err(); err != nil {
Expand Down Expand Up @@ -77,36 +68,25 @@ func (*CandidateValidation) ProcessBlockFinalizedSignal(parachaintypes.BlockFina
}

// Stop stops the CandidateValidation subsystem
func (cv *CandidateValidation) Stop() {
func (*CandidateValidation) Stop() {
}

// processMessage processes messages sent to the CandidateValidation subsystem
func (cv *CandidateValidation) processMessage(msg any) {
switch msg := msg.(type) {
case ValidateFromChainState:
runtimeInstance, err := cv.BlockState.GetRuntime(msg.CandidateReceipt.Descriptor.RelayParent)
if err != nil {
logger.Errorf("failed to get runtime: %w", err)
msg.Ch <- parachaintypes.OverseerFuncRes[ValidationResult]{
Err: err,
}
return
}
result, err := validateFromChainState(runtimeInstance, msg.Pov, msg.CandidateReceipt)
if err != nil {
logger.Errorf("failed to validate from chain state: %w", err)
msg.Ch <- parachaintypes.OverseerFuncRes[ValidationResult]{
Err: err,
}
} else {
msg.Ch <- parachaintypes.OverseerFuncRes[ValidationResult]{
Data: *result,
}
cv.validateFromChainState(msg)
case ValidateFromExhaustive:
validationTask := &ValidationTask{
PersistedValidationData: msg.PersistedValidationData,
ValidationCode: &msg.ValidationCode,
CandidateReceipt: &msg.CandidateReceipt,
PoV: msg.PoV,
ExecutorParams: msg.ExecutorParams,
PvfExecTimeoutKind: msg.PvfExecTimeoutKind,
}

case ValidateFromExhaustive:
result, err := validateFromExhaustive(cv.ValidationHost, msg.PersistedValidationData,
msg.ValidationCode, msg.CandidateReceipt, msg.PoV)
result, err := cv.pvfHost.validate(validationTask)
if err != nil {
logger.Errorf("failed to validate from exhaustive: %w", err)
msg.Ch <- parachaintypes.OverseerFuncRes[ValidationResult]{
Expand Down Expand Up @@ -171,134 +151,45 @@ func getValidationData(runtimeInstance parachainruntime.RuntimeInstance, paraID
return nil, nil, fmt.Errorf("getting persisted validation data: %w", mergedError)
}

// validateFromChainState validates a candidate parachain block with provided parameters using relay-chain
// state and using the parachain runtime.
func validateFromChainState(runtimeInstance parachainruntime.RuntimeInstance, pov parachaintypes.PoV,
candidateReceipt parachaintypes.CandidateReceipt) (
*ValidationResult, error) {

persistedValidationData, validationCode, err := getValidationData(runtimeInstance,
candidateReceipt.Descriptor.ParaID)
// validateFromChainState validates a parachain block from chain state message
func (cv *CandidateValidation) validateFromChainState(msg ValidateFromChainState) {
runtimeInstance, err := cv.BlockState.GetRuntime(msg.CandidateReceipt.Descriptor.RelayParent)
if err != nil {
return nil, fmt.Errorf("getting validation data: %w", err)
}

parachainRuntimeInstance, err := parachainruntime.SetupVM(*validationCode)
if err != nil {
return nil, fmt.Errorf("setting up VM: %w", err)
}

validationResults, err := validateFromExhaustive(parachainRuntimeInstance, *persistedValidationData,
*validationCode, candidateReceipt, pov)
if err != nil {
return nil, fmt.Errorf("validating from exhaustive: %w", err)
}
return validationResults, nil
}

// validateFromExhaustive validates a candidate parachain block with provided parameters
func validateFromExhaustive(validationHost parachainruntime.ValidationHost,
persistedValidationData parachaintypes.PersistedValidationData,
validationCode parachaintypes.ValidationCode,
candidateReceipt parachaintypes.CandidateReceipt, pov parachaintypes.PoV) (
*ValidationResult, error) {

validationCodeHash := validationCode.Hash()
// basic checks
validationErr, internalErr := performBasicChecks(&candidateReceipt.Descriptor, persistedValidationData.MaxPovSize,
pov,
validationCodeHash)
if internalErr != nil {
return nil, fmt.Errorf("performing basic checks: %w", internalErr)
}

if validationErr != nil {
validationResult := &ValidationResult{
InvalidResult: validationErr,
logger.Errorf("getting runtime instance: %w", err)
msg.Ch <- parachaintypes.OverseerFuncRes[ValidationResult]{
Err: fmt.Errorf("getting runtime instance: %w", err),
}
return validationResult, nil //nolint: nilerr
}

validationParams := parachainruntime.ValidationParameters{
ParentHeadData: persistedValidationData.ParentHead,
BlockData: pov.BlockData,
RelayParentNumber: persistedValidationData.RelayParentNumber,
RelayParentStorageRoot: persistedValidationData.RelayParentStorageRoot,
}

validationResult, err := validationHost.ValidateBlock(validationParams)
// TODO: implement functionality to parse errors generated by the runtime when PVF host is implemented, issue #3934
if err != nil {
return nil, fmt.Errorf("executing validate_block: %w", err)
return
}

headDataHash, err := validationResult.HeadData.Hash()
if err != nil {
return nil, fmt.Errorf("hashing head data: %w", err)
}

if headDataHash != candidateReceipt.Descriptor.ParaHead {
ci := ParaHeadHashMismatch
return &ValidationResult{InvalidResult: &ci}, nil
}
candidateCommitments := parachaintypes.CandidateCommitments{
UpwardMessages: validationResult.UpwardMessages,
HorizontalMessages: validationResult.HorizontalMessages,
NewValidationCode: validationResult.NewValidationCode,
HeadData: validationResult.HeadData,
ProcessedDownwardMessages: validationResult.ProcessedDownwardMessages,
HrmpWatermark: validationResult.HrmpWatermark,
}

// if validation produced a new set of commitments, we treat the candidate as invalid
if candidateReceipt.CommitmentsHash != candidateCommitments.Hash() {
ci := CommitmentsHashMismatch
return &ValidationResult{InvalidResult: &ci}, nil
}
return &ValidationResult{
ValidResult: &ValidValidationResult{
CandidateCommitments: candidateCommitments,
PersistedValidationData: persistedValidationData,
},
}, nil

}

// performBasicChecks Does basic checks of a candidate. Provide the encoded PoV-block.
// Returns ReasonForInvalidity and internal error if any.
func performBasicChecks(candidate *parachaintypes.CandidateDescriptor, maxPoVSize uint32,
pov parachaintypes.PoV, validationCodeHash parachaintypes.ValidationCodeHash) (validationError *ReasonForInvalidity,
internalError error) {
povHash, err := pov.Hash()
if err != nil {
return nil, fmt.Errorf("hashing PoV: %w", err)
}

encodedPoV, err := scale.Marshal(pov)
persistedValidationData, validationCode, err := getValidationData(runtimeInstance,
msg.CandidateReceipt.Descriptor.ParaID)
if err != nil {
return nil, fmt.Errorf("encoding PoV: %w", err)
}
encodedPoVSize := uint32(len(encodedPoV))

if encodedPoVSize > maxPoVSize {
ci := ParamsTooLarge
return &ci, nil
}

if povHash != candidate.PovHash {
ci := PoVHashMismatch
return &ci, nil
logger.Errorf("getting validation data: %w", err)
msg.Ch <- parachaintypes.OverseerFuncRes[ValidationResult]{
Err: fmt.Errorf("getting validation data: %w", err),
}
return
}

if validationCodeHash != candidate.ValidationCodeHash {
ci := CodeHashMismatch
return &ci, nil
validationTask := &ValidationTask{
PersistedValidationData: *persistedValidationData,
ValidationCode: validationCode,
CandidateReceipt: &msg.CandidateReceipt,
PoV: msg.Pov,
ExecutorParams: msg.ExecutorParams,
// todo: implement PvfExecTimeoutKind, so that validate can be called with a timeout see issue: #3429
PvfExecTimeoutKind: parachaintypes.PvfExecTimeoutKind{},
}

err = candidate.CheckCollatorSignature()
result, err := cv.pvfHost.validate(validationTask)
if err != nil {
ci := BadSignature
return &ci, nil
msg.Ch <- parachaintypes.OverseerFuncRes[ValidationResult]{
Err: err,
}
} else {
msg.Ch <- parachaintypes.OverseerFuncRes[ValidationResult]{
Data: *result,
}
}
return nil, nil
}
Loading

0 comments on commit 768b6d5

Please sign in to comment.