Skip to content

Commit

Permalink
add validation logic to workers
Browse files Browse the repository at this point in the history
  • Loading branch information
edwardmack committed Aug 6, 2024
1 parent 6537104 commit 7e314f3
Show file tree
Hide file tree
Showing 8 changed files with 235 additions and 106 deletions.
13 changes: 7 additions & 6 deletions dot/parachain/backing/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package backing_test

import (
"github.com/ChainSafe/gossamer/dot/parachain/pvf"
"testing"
"time"

Expand Down Expand Up @@ -275,9 +276,9 @@ func TestSecondsValidCandidate(t *testing.T) {
return false
}

badReturn := candidatevalidation.BadReturn
validateFromExhaustive.Ch <- parachaintypes.OverseerFuncRes[candidatevalidation.ValidationResult]{
Data: candidatevalidation.ValidationResult{
badReturn := pvf.BadReturn
validateFromExhaustive.Ch <- parachaintypes.OverseerFuncRes[pvf.ValidationResult]{
Data: pvf.ValidationResult{
InvalidResult: &badReturn,
},
}
Expand Down Expand Up @@ -339,9 +340,9 @@ func TestSecondsValidCandidate(t *testing.T) {
return false
}

validateFromExhaustive.Ch <- parachaintypes.OverseerFuncRes[candidatevalidation.ValidationResult]{
Data: candidatevalidation.ValidationResult{
ValidResult: &candidatevalidation.ValidValidationResult{
validateFromExhaustive.Ch <- parachaintypes.OverseerFuncRes[pvf.ValidationResult]{
Data: pvf.ValidationResult{
ValidResult: &pvf.ValidValidationResult{
CandidateCommitments: parachaintypes.CandidateCommitments{
UpwardMessages: []parachaintypes.UpwardMessage{},
HorizontalMessages: []parachaintypes.OutboundHrmpMessage{},
Expand Down
45 changes: 24 additions & 21 deletions dot/parachain/candidate-validation/candidate_validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,33 +89,36 @@ func (cv *CandidateValidation) processMessages(wg *sync.WaitGroup) {
case ValidateFromExhaustive:
// This is the skeleton to hook up the PVF host to the candidate validation subsystem
// This is currently WIP, pending moving the validation logic to the PVF host
validationCodeHash := msg.ValidationCode.Hash()
taskResult := make(chan *pvf.ValidationTaskResult)
validationTask := &pvf.ValidationTask{
PersistedValidationData: parachaintypes.PersistedValidationData{},
WorkerID: &validationCodeHash,
CandidateReceipt: &msg.CandidateReceipt,
PoV: msg.PoV,
ExecutorParams: nil,
PvfExecTimeoutKind: parachaintypes.PvfExecTimeoutKind{},
ResultCh: taskResult,
PersistedValidationData: msg.PersistedValidationData,
//WorkerID: &validationCodeHash,
ValidationCode: &msg.ValidationCode,
CandidateReceipt: &msg.CandidateReceipt,
PoV: msg.PoV,
ExecutorParams: nil,
PvfExecTimeoutKind: parachaintypes.PvfExecTimeoutKind{},
ResultCh: taskResult,
}
cv.pvfHost.Validate(validationTask)
fmt.Printf("Validation result: %v", <-taskResult)
go cv.pvfHost.Validate(validationTask)

result := <-taskResult
fmt.Printf("Validation result: %v", result)
// TODO(ed): determine how to handle this error and result

// WIP: This is the current implementation of the validation logic, it will be replaced by the PVF host
// when the validation logic is moved to the PVF host
result, err := validateFromExhaustive(cv.ValidationHost, msg.PersistedValidationData,
msg.ValidationCode, msg.CandidateReceipt, msg.PoV)
if err != nil {
logger.Errorf("failed to validate from exhaustive: %w", err)
msg.Ch <- parachaintypes.OverseerFuncRes[pvf.ValidationResult]{
Err: err,
}
} else {
msg.Ch <- parachaintypes.OverseerFuncRes[pvf.ValidationResult]{
Data: *result,
}
//result, err := validateFromExhaustive(cv.ValidationHost, msg.PersistedValidationData,
// msg.ValidationCode, msg.CandidateReceipt, msg.PoV)
//if err != nil {
// logger.Errorf("failed to validate from exhaustive: %w", err)
// msg.Ch <- parachaintypes.OverseerFuncRes[pvf.ValidationResult]{
// Err: err,
// }
//} else {
msg.Ch <- parachaintypes.OverseerFuncRes[pvf.ValidationResult]{
Data: *result.Result,
//}
}

case PreCheck:
Expand Down
93 changes: 88 additions & 5 deletions dot/parachain/pvf/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ package pvf

import (
"fmt"
parachainruntime "github.com/ChainSafe/gossamer/dot/parachain/runtime"
parachaintypes "github.com/ChainSafe/gossamer/dot/parachain/types"
"github.com/ChainSafe/gossamer/pkg/scale"
"sync"

"github.com/ChainSafe/gossamer/internal/log"
Expand Down Expand Up @@ -40,10 +43,90 @@ func NewValidationHost() *ValidationHost {
func (v *ValidationHost) Validate(msg *ValidationTask) {
logger.Debugf("Validating worker", "workerID", msg.WorkerID)

logger.Debugf("submitting request for worker", "workerID", msg.WorkerID)
hasWorker := v.workerPool.containsWorker(*msg.WorkerID)
if !hasWorker {
v.workerPool.newValidationWorker(*msg.WorkerID)
validationCodeHash := msg.ValidationCode.Hash()
// basic checks
validationErr, internalErr := performBasicChecks(&msg.CandidateReceipt.Descriptor,
msg.PersistedValidationData.MaxPovSize,
msg.PoV,
validationCodeHash)
// TODO(ed): confirm how to handle internal errors
if internalErr != nil {
logger.Errorf("performing basic checks: %w", internalErr)
}
v.workerPool.submitRequest(msg)

if validationErr != nil {
valErr := &ValidationTaskResult{
who: validationCodeHash,
Result: &ValidationResult{
InvalidResult: validationErr,
},
}
msg.ResultCh <- valErr
return
}

workerID := v.poolContainsWorker(msg)
validationParams := parachainruntime.ValidationParameters{
ParentHeadData: msg.PersistedValidationData.ParentHead,
BlockData: msg.PoV.BlockData,
RelayParentNumber: msg.PersistedValidationData.RelayParentNumber,
RelayParentStorageRoot: msg.PersistedValidationData.RelayParentStorageRoot,
}
workTask := &workerTask{
work: validationParams,
maxPoVSize: msg.PersistedValidationData.MaxPovSize,
ResultCh: msg.ResultCh,
}
v.workerPool.submitRequest(workerID, workTask)
}

func (v *ValidationHost) poolContainsWorker(msg *ValidationTask) parachaintypes.ValidationCodeHash {
if msg.WorkerID != nil {
return *msg.WorkerID
}
if v.workerPool.containsWorker(msg.ValidationCode.Hash()) {
return msg.ValidationCode.Hash()
} else {
v.workerPool.newValidationWorker(*msg.ValidationCode)
return msg.ValidationCode.Hash()
}
}

// performBasicChecks Does basic checks of a candidate. Provide the encoded PoV-block.
// Returns ReasonForInvalidity and internal error if any.
func performBasicChecks(candidate *parachaintypes.CandidateDescriptor, maxPoVSize uint32,
pov parachaintypes.PoV, validationCodeHash parachaintypes.ValidationCodeHash) (
validationError *ReasonForInvalidity, internalError error) {
povHash, err := pov.Hash()
if err != nil {
return nil, fmt.Errorf("hashing PoV: %w", err)
}

encodedPoV, err := scale.Marshal(pov)
if err != nil {
return nil, fmt.Errorf("encoding PoV: %w", err)
}
encodedPoVSize := uint32(len(encodedPoV))

if encodedPoVSize > maxPoVSize {
ci := ParamsTooLarge
return &ci, nil
}

if povHash != candidate.PovHash {
ci := PoVHashMismatch
return &ci, nil
}

if validationCodeHash != candidate.ValidationCodeHash {
ci := CodeHashMismatch
return &ci, nil
}

err = candidate.CheckCollatorSignature()
if err != nil {
ci := BadSignature
return &ci, nil
}
return nil, nil
}
2 changes: 1 addition & 1 deletion dot/parachain/pvf/host_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func Test_validationHost_start(t *testing.T) {
func TestValidationHost(t *testing.T) {
v := NewValidationHost()
v.Start()
v.workerPool.newValidationWorker(parachaintypes.ValidationCodeHash{1, 2, 3, 4})
v.workerPool.newValidationWorker(parachaintypes.ValidationCode{1, 2, 3, 4})

resCh := make(chan *ValidationTaskResult)

Expand Down
92 changes: 76 additions & 16 deletions dot/parachain/pvf/worker.go
Original file line number Diff line number Diff line change
@@ -1,44 +1,104 @@
package pvf

import (
"sync"
"time"

parachainruntime "github.com/ChainSafe/gossamer/dot/parachain/runtime"
parachaintypes "github.com/ChainSafe/gossamer/dot/parachain/types"
"sync"
)

type worker struct {
workerID parachaintypes.ValidationCodeHash
instance *parachainruntime.Instance
queue chan *workerTask
}

func newWorker(pID parachaintypes.ValidationCodeHash) *worker {
return &worker{
workerID: pID,
type workerTask struct {
work parachainruntime.ValidationParameters
maxPoVSize uint32
ResultCh chan<- *ValidationTaskResult
}

func newWorker(validationCode parachaintypes.ValidationCode, queue chan *workerTask) (*worker, error) {
validationRuntime, err := parachainruntime.SetupVM(validationCode)

if err != nil {
return nil, err
}
return &worker{
workerID: validationCode.Hash(),
instance: validationRuntime,
queue: queue,
}, nil
}

func (w *worker) run(queue chan *ValidationTask, wg *sync.WaitGroup) {
func (w *worker) run(queue chan *workerTask, wg *sync.WaitGroup) {
defer func() {
logger.Debugf("[STOPPED] worker %x", w.workerID)
wg.Done()
}()

for task := range queue {
executeRequest(task)
w.executeRequest(task)
}
}

func executeRequest(task *ValidationTask) {
func (w *worker) executeRequest(task *workerTask) {
// WIP: This is a dummy implementation of the worker execution for the validation task. The logic for
// validating the parachain block request should be implemented here.
request := task.PoV
logger.Debugf("[EXECUTING] worker %x, block request: %s", task.WorkerID, request)
time.Sleep(500 * time.Millisecond)
dummyResult := &ValidationResult{}
logger.Debugf("[EXECUTING] worker %x task %v", w.workerID, task.work)

// todo do basic checks

validationResult, err := w.instance.ValidateBlock(task.work)

///////////////////////////////
//if err != nil {
// return nil, fmt.Errorf("executing validate_block: %w", err)
//}

//headDataHash, err := validationResult.HeadData.Hash()
//if err != nil {
// return nil, fmt.Errorf("hashing head data: %w", err)
//}
//
//if headDataHash != candidateReceipt.Descriptor.ParaHead {
// ci := pvf.ParaHeadHashMismatch
// return &pvf.ValidationResult{InvalidResult: &ci}, nil
//}
candidateCommitments := parachaintypes.CandidateCommitments{
UpwardMessages: validationResult.UpwardMessages,
HorizontalMessages: validationResult.HorizontalMessages,
NewValidationCode: validationResult.NewValidationCode,
HeadData: validationResult.HeadData,
ProcessedDownwardMessages: validationResult.ProcessedDownwardMessages,
HrmpWatermark: validationResult.HrmpWatermark,
}

// if validation produced a new set of commitments, we treat the candidate as invalid
//if candidateReceipt.CommitmentsHash != candidateCommitments.Hash() {
// ci := CommitmentsHashMismatch
// return &ValidationResult{InvalidResult: &ci}, nil
//}
pvd := parachaintypes.PersistedValidationData{
ParentHead: task.work.ParentHeadData,
RelayParentNumber: task.work.RelayParentNumber,
RelayParentStorageRoot: task.work.RelayParentStorageRoot,
MaxPovSize: task.maxPoVSize,
}
dummyResilt := &ValidationResult{
ValidResult: &ValidValidationResult{
CandidateCommitments: candidateCommitments,
PersistedValidationData: pvd,
},
}
//////////////////////////

logger.Debugf("[RESULT] worker %x, result: %v, error: %s", w.workerID, dummyResilt, err)

task.ResultCh <- &ValidationTaskResult{
who: *task.WorkerID,
result: dummyResult,
who: w.workerID,
Result: dummyResilt,
}

logger.Debugf("[FINISHED] worker %x", task.WorkerID)
//logger.Debugf("[FINISHED] worker %v, error: %s", validationResult, err)
}
Loading

0 comments on commit 7e314f3

Please sign in to comment.