diff --git a/cmd/bootstrap/run/cluster_qc.go b/cmd/bootstrap/run/cluster_qc.go index a55b7920708..0d480cdd0b7 100644 --- a/cmd/bootstrap/run/cluster_qc.go +++ b/cmd/bootstrap/run/cluster_qc.go @@ -20,6 +20,9 @@ import ( // GenerateClusterRootQC creates votes and generates a QC based on participant data func GenerateClusterRootQC(signers []bootstrap.NodeInfo, allCommitteeMembers flow.IdentityList, clusterBlock *cluster.Block) (*flow.QuorumCertificate, error) { + if !allCommitteeMembers.Sorted(order.Canonical) { + return nil, fmt.Errorf("can't create root cluster QC: committee members are not sorted in canonical order") + } clusterRootBlock := model.GenesisBlockFromFlow(clusterBlock.Header) // STEP 1: create votes for cluster root block @@ -29,8 +32,7 @@ func GenerateClusterRootQC(signers []bootstrap.NodeInfo, allCommitteeMembers flo } // STEP 2: create VoteProcessor - ordered := allCommitteeMembers.Sort(order.Canonical) - committee, err := committees.NewStaticCommittee(ordered, flow.Identifier{}, nil, nil) + committee, err := committees.NewStaticCommittee(allCommitteeMembers, flow.Identifier{}, nil, nil) if err != nil { return nil, err } diff --git a/cmd/bootstrap/run/cluster_qc_test.go b/cmd/bootstrap/run/cluster_qc_test.go index 19a379d5b47..b422609f266 100644 --- a/cmd/bootstrap/run/cluster_qc_test.go +++ b/cmd/bootstrap/run/cluster_qc_test.go @@ -8,6 +8,7 @@ import ( model "github.com/onflow/flow-go/model/bootstrap" "github.com/onflow/flow-go/model/cluster" "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/model/flow/order" "github.com/onflow/flow-go/utils/unittest" ) @@ -32,7 +33,8 @@ func TestGenerateClusterRootQC(t *testing.T) { payload := cluster.EmptyPayload(flow.ZeroID) clusterBlock.SetPayload(payload) - _, err := GenerateClusterRootQC(participants, model.ToIdentityList(participants), &clusterBlock) + orderedParticipants := model.ToIdentityList(participants).Sort(order.Canonical) + _, err := GenerateClusterRootQC(participants, 
orderedParticipants, &clusterBlock) require.NoError(t, err) } diff --git a/cmd/scaffold.go b/cmd/scaffold.go index 2503da27230..750955c3ca0 100644 --- a/cmd/scaffold.go +++ b/cmd/scaffold.go @@ -970,6 +970,7 @@ func (fnb *FlowNodeBuilder) initStorage() error { epochCommits := bstorage.NewEpochCommits(fnb.Metrics.Cache, fnb.DB) statuses := bstorage.NewEpochStatuses(fnb.Metrics.Cache, fnb.DB) commits := bstorage.NewCommits(fnb.Metrics.Cache, fnb.DB) + protocolState := bstorage.NewProtocolState(fnb.Metrics.Cache, setups, epochCommits, fnb.DB, bstorage.DefaultCacheSize) versionBeacons := bstorage.NewVersionBeacons(fnb.DB) fnb.Storage = Storage{ @@ -987,6 +988,7 @@ func (fnb *FlowNodeBuilder) initStorage() error { Setups: setups, EpochCommits: epochCommits, VersionBeacons: versionBeacons, + ProtocolState: protocolState, Statuses: statuses, Commits: commits, } @@ -1051,7 +1053,7 @@ func (fnb *FlowNodeBuilder) initState() error { fnb.Storage.QuorumCertificates, fnb.Storage.Setups, fnb.Storage.EpochCommits, - fnb.Storage.Statuses, + fnb.Storage.ProtocolState, fnb.Storage.VersionBeacons, ) if err != nil { @@ -1103,7 +1105,7 @@ func (fnb *FlowNodeBuilder) initState() error { fnb.Storage.QuorumCertificates, fnb.Storage.Setups, fnb.Storage.EpochCommits, - fnb.Storage.Statuses, + fnb.Storage.ProtocolState, fnb.Storage.VersionBeacons, fnb.RootSnapshot, options..., diff --git a/cmd/util/cmd/common/state.go b/cmd/util/cmd/common/state.go index 16d5295a729..4d0f0024ce1 100644 --- a/cmd/util/cmd/common/state.go +++ b/cmd/util/cmd/common/state.go @@ -24,7 +24,7 @@ func InitProtocolState(db *badger.DB, storages *storage.All) (protocol.State, er storages.QuorumCertificates, storages.Setups, storages.EpochCommits, - storages.Statuses, + storages.ProtocolState, storages.VersionBeacons, ) diff --git a/consensus/integration/epoch_test.go b/consensus/integration/epoch_test.go index 535ef9b8982..876b7c6ec1a 100644 --- a/consensus/integration/epoch_test.go +++ 
b/consensus/integration/epoch_test.go @@ -13,6 +13,7 @@ import ( "github.com/onflow/flow-go/model/flow/filter" "github.com/onflow/flow-go/model/flow/mapfunc" "github.com/onflow/flow-go/model/flow/order" + "github.com/onflow/flow-go/state/protocol" "github.com/onflow/flow-go/state/protocol/inmem" "github.com/onflow/flow-go/utils/unittest" ) @@ -255,5 +256,26 @@ func withNextEpoch( Map(mapfunc.WithWeight(0))..., ).Sort(order.Canonical) + // update protocol state + protocolState := encodableSnapshot.ProtocolState + // update protocol state identities since we are in committed phase + protocolState.Identities = flow.DynamicIdentityEntryListFromIdentities(encodableSnapshot.Identities) + // setup ID has changed, need to update it + convertedEpochSetup, _ := protocol.ToEpochSetup(inmem.NewEpoch(*currEpoch)) + protocolState.CurrentEpochEventIDs.SetupID = convertedEpochSetup.ID() + // create next epoch protocol state + convertedEpochSetup, _ = protocol.ToEpochSetup(inmem.NewEpoch(*encodableSnapshot.Epochs.Next)) + convertedEpochCommit, _ := protocol.ToEpochCommit(inmem.NewEpoch(*encodableSnapshot.Epochs.Next)) + protocolState.NextEpochProtocolState = &flow.ProtocolStateEntry{ + CurrentEpochEventIDs: flow.EventIDs{ + SetupID: convertedEpochSetup.ID(), + CommitID: convertedEpochCommit.ID(), + }, + PreviousEpochEventIDs: protocolState.CurrentEpochEventIDs, + Identities: flow.DynamicIdentityEntryListFromIdentities(encodableSnapshot.Identities), + InvalidStateTransitionAttempted: false, + NextEpochProtocolState: nil, + } + return inmem.SnapshotFromEncodable(encodableSnapshot) } diff --git a/consensus/integration/nodes_test.go b/consensus/integration/nodes_test.go index ba329c77206..b069f812ca8 100644 --- a/consensus/integration/nodes_test.go +++ b/consensus/integration/nodes_test.go @@ -375,7 +375,7 @@ func createNode( qcsDB := storage.NewQuorumCertificates(metricsCollector, db, storage.DefaultCacheSize) setupsDB := storage.NewEpochSetups(metricsCollector, db) commitsDB := 
storage.NewEpochCommits(metricsCollector, db) - statusesDB := storage.NewEpochStatuses(metricsCollector, db) + protocolStateDB := storage.NewProtocolState(metricsCollector, setupsDB, commitsDB, db, storage.DefaultCacheSize) versionBeaconDB := storage.NewVersionBeacons(db) protocolStateEvents := events.NewDistributor() @@ -396,7 +396,7 @@ func createNode( qcsDB, setupsDB, commitsDB, - statusesDB, + protocolStateDB, versionBeaconDB, rootSnapshot, ) diff --git a/engine/collection/test/cluster_switchover_test.go b/engine/collection/test/cluster_switchover_test.go index a8f04173099..ec582fadfc4 100644 --- a/engine/collection/test/cluster_switchover_test.go +++ b/engine/collection/test/cluster_switchover_test.go @@ -15,6 +15,7 @@ import ( "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/model/flow/factory" "github.com/onflow/flow-go/model/flow/filter" + "github.com/onflow/flow-go/model/flow/order" "github.com/onflow/flow-go/module" "github.com/onflow/flow-go/module/util" "github.com/onflow/flow-go/network/channels" @@ -66,19 +67,18 @@ func NewClusterSwitchoverTestCase(t *testing.T, conf ClusterSwitchoverTestConf) rootClusterQCs := make([]flow.ClusterQCVoteData, len(rootClusterBlocks)) for i, cluster := range clusters { signers := make([]model.NodeInfo, 0) - signerIDs := make([]flow.Identifier, 0) for _, identity := range nodeInfos { if _, inCluster := cluster.ByNodeID(identity.NodeID); inCluster { signers = append(signers, identity) - signerIDs = append(signerIDs, identity.NodeID) } } - qc, err := run.GenerateClusterRootQC(signers, model.ToIdentityList(signers), rootClusterBlocks[i]) + signerIdentities := model.ToIdentityList(signers).Sort(order.Canonical) + qc, err := run.GenerateClusterRootQC(signers, signerIdentities, rootClusterBlocks[i]) require.NoError(t, err) rootClusterQCs[i] = flow.ClusterQCVoteDataFromQC(&flow.QuorumCertificateWithSignerIDs{ View: qc.View, BlockID: qc.BlockID, - SignerIDs: signerIDs, + SignerIDs: signerIdentities.NodeIDs(), 
SigData: qc.SigData, }) } diff --git a/engine/common/follower/integration_test.go b/engine/common/follower/integration_test.go index 663e195462e..f06f2a1002a 100644 --- a/engine/common/follower/integration_test.go +++ b/engine/common/follower/integration_test.go @@ -63,7 +63,7 @@ func TestFollowerHappyPath(t *testing.T) { all.QuorumCertificates, all.Setups, all.EpochCommits, - all.Statuses, + all.ProtocolState, all.VersionBeacons, rootSnapshot, ) diff --git a/engine/testutil/nodes.go b/engine/testutil/nodes.go index b61ae648482..420135ec8a0 100644 --- a/engine/testutil/nodes.go +++ b/engine/testutil/nodes.go @@ -250,7 +250,7 @@ func CompleteStateFixture( s.QuorumCertificates, s.Setups, s.EpochCommits, - s.Statuses, + s.ProtocolState, s.VersionBeacons, rootSnapshot, ) diff --git a/integration/testnet/container.go b/integration/testnet/container.go index 2ee74894ac1..3e50c5ce34d 100644 --- a/integration/testnet/container.go +++ b/integration/testnet/container.go @@ -390,7 +390,7 @@ func (c *Container) OpenState() (*state.State, error) { qcs := storage.NewQuorumCertificates(metrics, db, storage.DefaultCacheSize) setups := storage.NewEpochSetups(metrics, db) commits := storage.NewEpochCommits(metrics, db) - statuses := storage.NewEpochStatuses(metrics, db) + protocolState := storage.NewProtocolState(metrics, setups, commits, db, storage.DefaultCacheSize) versionBeacons := storage.NewVersionBeacons(db) return state.OpenState( @@ -403,7 +403,7 @@ func (c *Container) OpenState() (*state.State, error) { qcs, setups, commits, - statuses, + protocolState, versionBeacons, ) } diff --git a/integration/testnet/network.go b/integration/testnet/network.go index e87fce1da6f..993ee49b1dc 100644 --- a/integration/testnet/network.go +++ b/integration/testnet/network.go @@ -648,10 +648,11 @@ func (net *FlowNetwork) addConsensusFollower(t *testing.T, rootProtocolSnapshotP // create a follower-specific directory for the bootstrap files followerBootstrapDir := makeDir(t, tmpdir, 
DefaultBootstrapDir) + makeDir(t, followerBootstrapDir, bootstrap.DirnamePublicBootstrap) - // strip out the node addresses from root-protocol-state-snapshot.json and copy it to the follower-specific + // copy root protocol snapshot to the follower-specific folder // bootstrap/public-root-information directory - err := rootProtocolJsonWithoutAddresses(rootProtocolSnapshotPath, filepath.Join(followerBootstrapDir, bootstrap.PathRootProtocolStateSnapshot)) + err := io.Copy(rootProtocolSnapshotPath, filepath.Join(followerBootstrapDir, bootstrap.PathRootProtocolStateSnapshot)) require.NoError(t, err) // consensus follower diff --git a/integration/testnet/util.go b/integration/testnet/util.go index ad45be97c82..2dfd450d38f 100644 --- a/integration/testnet/util.go +++ b/integration/testnet/util.go @@ -18,8 +18,6 @@ import ( "github.com/onflow/flow-go/crypto" "github.com/onflow/flow-go/model/bootstrap" "github.com/onflow/flow-go/model/flow" - "github.com/onflow/flow-go/state/protocol/inmem" - "github.com/onflow/flow-go/utils/io" ) func makeDir(t *testing.T, base string, subdir string) string { @@ -99,26 +97,6 @@ func WriteFile(path string, data []byte) error { return err } -// rootProtocolJsonWithoutAddresses strips out all node addresses from the root protocol json file specified as srcFile -// and creates the dstFile with the modified contents -func rootProtocolJsonWithoutAddresses(srcfile string, dstFile string) error { - - data, err := io.ReadFile(filepath.Join(srcfile)) - if err != nil { - return err - } - - var rootSnapshot inmem.EncodableSnapshot - err = json.Unmarshal(data, &rootSnapshot) - if err != nil { - return err - } - - strippedSnapshot := inmem.StrippedInmemSnapshot(rootSnapshot) - - return WriteJSON(dstFile, strippedSnapshot) -} - func WriteObserverPrivateKey(observerName, bootstrapDir string) error { // make the observer private key for named observer // only used for localnet, not for use with production diff --git 
a/integration/tests/lib/testnet_state_tracker.go b/integration/tests/lib/testnet_state_tracker.go index 6ff6820baf4..ff78786487f 100644 --- a/integration/tests/lib/testnet_state_tracker.go +++ b/integration/tests/lib/testnet_state_tracker.go @@ -111,6 +111,11 @@ func (tst *TestnetStateTracker) Track(t *testing.T, ctx context.Context, ghost * finalState, m.ExecutionResult.ID(), len(m.ExecutionResult.Chunks)) + case *messages.ChunkDataResponse: + // consuming this explicitly to avoid logging full msg which is usually very large because of proof + t.Logf("%x chunk data pack received from %x\n", + m.ChunkDataPack.ChunkID, + sender) default: t.Logf("%v other msg received from %s: %#v\n", time.Now().UTC(), sender, msg) diff --git a/model/flow/protocol_state.go b/model/flow/protocol_state.go index 213699d5281..0cded5ae324 100644 --- a/model/flow/protocol_state.go +++ b/model/flow/protocol_state.go @@ -82,16 +82,41 @@ func NewRichProtocolStateEntry( NextEpochProtocolState: nil, } + // ensure data is consistent + if protocolState.PreviousEpochEventIDs.SetupID != ZeroID { + if protocolState.PreviousEpochEventIDs.SetupID != previousEpochSetup.ID() { + return nil, fmt.Errorf("supplied previous epoch setup (%x) does not match protocol state (%x)", + previousEpochSetup.ID(), + protocolState.PreviousEpochEventIDs.SetupID) + } + if protocolState.PreviousEpochEventIDs.CommitID != previousEpochCommit.ID() { + return nil, fmt.Errorf("supplied previous epoch commit (%x) does not match protocol state (%x)", + previousEpochCommit.ID(), + protocolState.PreviousEpochEventIDs.CommitID) + } + } + if protocolState.CurrentEpochEventIDs.SetupID != currentEpochSetup.ID() { + return nil, fmt.Errorf("supplied current epoch setup (%x) does not match protocol state (%x)", + currentEpochSetup.ID(), + protocolState.CurrentEpochEventIDs.SetupID) + } + if protocolState.CurrentEpochEventIDs.CommitID != currentEpochCommit.ID() { + return nil, fmt.Errorf("supplied current epoch commit (%x) does not match 
protocol state (%x)", + currentEpochCommit.ID(), + protocolState.CurrentEpochEventIDs.CommitID) + } + var err error + nextEpochProtocolState := protocolState.NextEpochProtocolState // if next epoch has been already committed, fill in data for it as well. - if protocolState.NextEpochProtocolState != nil { + if nextEpochProtocolState != nil { // sanity check consistency of input data - if protocolState.NextEpochProtocolState.CurrentEpochEventIDs.SetupID != nextEpochSetup.ID() { + if nextEpochProtocolState.CurrentEpochEventIDs.SetupID != nextEpochSetup.ID() { return nil, fmt.Errorf("inconsistent EpochSetup for constucting RichProtocolStateEntry, next protocol state states ID %v while input event has ID %v", protocolState.NextEpochProtocolState.CurrentEpochEventIDs.SetupID, nextEpochSetup.ID()) } - if protocolState.NextEpochProtocolState.CurrentEpochEventIDs.CommitID != ZeroID { - if protocolState.NextEpochProtocolState.CurrentEpochEventIDs.CommitID != nextEpochCommit.ID() { + if nextEpochProtocolState.CurrentEpochEventIDs.CommitID != ZeroID { + if nextEpochProtocolState.CurrentEpochEventIDs.CommitID != nextEpochCommit.ID() { return nil, fmt.Errorf("inconsistent EpochCommit for constucting RichProtocolStateEntry, next protocol state states ID %v while input event has ID %v", protocolState.NextEpochProtocolState.CurrentEpochEventIDs.CommitID, nextEpochCommit.ID()) } @@ -99,7 +124,6 @@ func NewRichProtocolStateEntry( // if next epoch is available, it means that we have observed epoch setup event and we are not anymore in staking phase, // so we need to build the identity table using current and next epoch setup events. - // so we need to build the identity table using current and next epoch setup events. 
result.Identities, err = buildIdentityTable( protocolState.Identities, currentEpochSetup.Participants, @@ -109,7 +133,6 @@ func NewRichProtocolStateEntry( return nil, fmt.Errorf("could not build identity table for setup/commit phase: %w", err) } - nextEpochProtocolState := protocolState.NextEpochProtocolState nextEpochIdentityTable, err := buildIdentityTable( nextEpochProtocolState.Identities, nextEpochSetup.Participants, diff --git a/module/builder/collection/builder_test.go b/module/builder/collection/builder_test.go index 9641b7c934a..6041ec2b367 100644 --- a/module/builder/collection/builder_test.go +++ b/module/builder/collection/builder_test.go @@ -108,7 +108,7 @@ func (suite *BuilderSuite) SetupTest() { all.QuorumCertificates, all.Setups, all.EpochCommits, - all.Statuses, + all.ProtocolState, all.VersionBeacons, rootSnapshot, ) diff --git a/module/state_synchronization/requester/execution_data_requester_test.go b/module/state_synchronization/requester/execution_data_requester_test.go index 5ac29329094..9a12ef9999b 100644 --- a/module/state_synchronization/requester/execution_data_requester_test.go +++ b/module/state_synchronization/requester/execution_data_requester_test.go @@ -787,11 +787,12 @@ func (m *mockSnapshot) Identity(nodeID flow.Identifier) (*flow.Identity, error) func (m *mockSnapshot) SealedResult() (*flow.ExecutionResult, *flow.Seal, error) { return nil, nil, nil } -func (m *mockSnapshot) Commit() (flow.StateCommitment, error) { return flow.DummyStateCommitment, nil } -func (m *mockSnapshot) SealingSegment() (*flow.SealingSegment, error) { return nil, nil } -func (m *mockSnapshot) Descendants() ([]flow.Identifier, error) { return nil, nil } -func (m *mockSnapshot) RandomSource() ([]byte, error) { return nil, nil } -func (m *mockSnapshot) Phase() (flow.EpochPhase, error) { return flow.EpochPhaseUndefined, nil } -func (m *mockSnapshot) Epochs() protocol.EpochQuery { return nil } -func (m *mockSnapshot) Params() protocol.GlobalParams { return nil } 
-func (m *mockSnapshot) VersionBeacon() (*flow.SealedVersionBeacon, error) { return nil, nil } +func (m *mockSnapshot) Commit() (flow.StateCommitment, error) { return flow.DummyStateCommitment, nil } +func (m *mockSnapshot) SealingSegment() (*flow.SealingSegment, error) { return nil, nil } +func (m *mockSnapshot) Descendants() ([]flow.Identifier, error) { return nil, nil } +func (m *mockSnapshot) RandomSource() ([]byte, error) { return nil, nil } +func (m *mockSnapshot) Phase() (flow.EpochPhase, error) { return flow.EpochPhaseUndefined, nil } +func (m *mockSnapshot) Epochs() protocol.EpochQuery { return nil } +func (m *mockSnapshot) Params() protocol.GlobalParams { return nil } +func (m *mockSnapshot) ProtocolState() (protocol.DynamicProtocolState, error) { return nil, nil } +func (m *mockSnapshot) VersionBeacon() (*flow.SealedVersionBeacon, error) { return nil, nil } diff --git a/state/cluster/badger/mutator_test.go b/state/cluster/badger/mutator_test.go index 1897cf6a39a..4254b448e4c 100644 --- a/state/cluster/badger/mutator_test.go +++ b/state/cluster/badger/mutator_test.go @@ -85,7 +85,7 @@ func (suite *MutatorSuite) SetupTest() { all.QuorumCertificates, all.Setups, all.EpochCommits, - all.Statuses, + all.ProtocolState, all.VersionBeacons, rootSnapshot, ) diff --git a/state/cluster/badger/snapshot_test.go b/state/cluster/badger/snapshot_test.go index 1073181712f..c3a90cc5125 100644 --- a/state/cluster/badger/snapshot_test.go +++ b/state/cluster/badger/snapshot_test.go @@ -66,7 +66,7 @@ func (suite *SnapshotSuite) SetupTest() { all.QuorumCertificates, all.Setups, all.EpochCommits, - all.Statuses, + all.ProtocolState, all.VersionBeacons, root, ) diff --git a/state/protocol/badger/mutator.go b/state/protocol/badger/mutator.go index c13ae1b2864..c3f57097528 100644 --- a/state/protocol/badger/mutator.go +++ b/state/protocol/badger/mutator.go @@ -18,6 +18,7 @@ import ( "github.com/onflow/flow-go/module/trace" "github.com/onflow/flow-go/state" 
"github.com/onflow/flow-go/state/protocol" + "github.com/onflow/flow-go/state/protocol/protocol_state" "github.com/onflow/flow-go/storage" "github.com/onflow/flow-go/storage/badger/operation" "github.com/onflow/flow-go/storage/badger/procedure" @@ -510,19 +511,48 @@ func (m *FollowerState) insert(ctx context.Context, candidate *flow.Block, certi return fmt.Errorf("could not retrieve block header for %x: %w", parentID, err) } - // apply any state changes from service events sealed by this block's parent - dbUpdates, err := m.handleEpochServiceEvents(candidate) + parentProtocolState, err := m.protocolStateSnapshotsDB.ByBlockID(candidate.Header.ParentID) + if err != nil { + return fmt.Errorf("could not retrieve protocol state for block (%v): %w", candidate.Header.ParentID, err) + } + protocolStateUpdater := protocol_state.NewUpdater(candidate.Header, parentProtocolState) + + // apply any state changes from service events sealed by this block + dbUpdates, err := m.handleEpochServiceEvents(candidate, protocolStateUpdater) if err != nil { return fmt.Errorf("could not process service events: %w", err) } - qc := candidate.Header.QuorumCertificate() + updatedState, updatedStateID, hasChanges := protocolStateUpdater.Build() + // TODO: check if updatedStateID corresponds to the root protocol state ID stored in payload + // if updatedStateID != payload.ProtocolStateID { + // return state.NewInvalidExtension("invalid protocol state transition detected expected (%x) got %x", payload.ProtocolStateID, updatedStateID) + // } + + if hasChanges { + dbUpdates = append(dbUpdates, operation.SkipDuplicatesTx( + m.protocolStateSnapshotsDB.StoreTx(updatedStateID, updatedState), + )) + } + dbUpdates = append(dbUpdates, m.protocolStateSnapshotsDB.Index(blockID, updatedStateID)) + + // events is a queue of node-internal events (aka notifications) that are emitted after the database write succeeded var events []func() + if certifyingQC != nil { + dbUpdates = append(dbUpdates, 
m.qcs.StoreTx(certifyingQC)) + + // queue a BlockProcessable event for candidate block, since it is certified + events = append(events, func() { + m.consumer.BlockProcessable(candidate.Header, certifyingQC) + }) + } + // Both the header itself and its payload are in compliance with the protocol state. // We can now store the candidate block, as well as adding its final seal // to the seal index and initializing its children index. + qc := candidate.Header.QuorumCertificate() err = operation.RetryOnConflictTx(m.db, transaction.Update, func(tx *transaction.Tx) error { // insert the block into the database AND cache err := m.blocks.StoreTx(candidate)(tx) @@ -544,18 +574,6 @@ func (m *FollowerState) insert(ctx context.Context, candidate *flow.Block, certi } } - if certifyingQC != nil { - err = m.qcs.StoreTx(certifyingQC)(tx) - if err != nil { - return fmt.Errorf("could not store certifying qc: %w", err) - } - - // trigger BlockProcessable for candidate block if it's certified - events = append(events, func() { - m.consumer.BlockProcessable(candidate.Header, certifyingQC) - }) - } - // index the latest sealed block in this fork err = transaction.WithTx(operation.IndexLatestSealAtBlock(blockID, latestSealID))(tx) if err != nil { @@ -575,7 +593,6 @@ func (m *FollowerState) insert(ctx context.Context, candidate *flow.Block, certi return fmt.Errorf("could not apply operation: %w", err) } } - return nil }) if err != nil { @@ -641,14 +658,12 @@ func (m *FollowerState) Finalize(ctx context.Context, blockID flow.Identifier) e // We update metrics and emit protocol events for epoch state changes when // the block corresponding to the state change is finalized - epochStatus, err := m.epoch.statuses.ByBlockID(blockID) + psSnapshot, err := m.protocolState.AtBlockID(blockID) if err != nil { - return fmt.Errorf("could not retrieve epoch state: %w", err) - } - currentEpochSetup, err := m.epoch.setups.ByID(epochStatus.CurrentEpoch.SetupID) - if err != nil { - return fmt.Errorf("could 
not retrieve setup event for current epoch: %w", err) + return fmt.Errorf("could not retrieve protocol state snapshot: %w", err) } + epochStatus := psSnapshot.EpochStatus() + currentEpochSetup := psSnapshot.EpochSetup() epochFallbackTriggered, err := m.isEpochEmergencyFallbackTriggered() if err != nil { return fmt.Errorf("could not check persisted epoch emergency fallback flag: %w", err) @@ -944,61 +959,6 @@ func (m *FollowerState) epochPhaseMetricsAndEventsOnBlockFinalized(block *flow.B return } -// epochStatus computes the EpochStatus for the given block *before* applying -// any service event state changes which come into effect with this block. -// -// Specifically, we must determine whether block is the first block of a new -// epoch in its respective fork. We do this by comparing the block's view to -// the Epoch data from its parent. If the block's view is _larger_ than the -// final View of the parent's epoch, the block starts a new Epoch. -// -// Possible outcomes: -// 1. Block is in same Epoch as parent (block.View < epoch.FinalView) -// -> the parent's EpochStatus.CurrentEpoch also applies for the current block -// 2. Block enters the next Epoch (block.View ≥ epoch.FinalView) -// a) HAPPY PATH: Epoch fallback is not triggered, we enter the next epoch: -// -> the parent's EpochStatus.NextEpoch is the current block's EpochStatus.CurrentEpoch -// b) FALLBACK PATH: Epoch fallback is triggered, we continue the current epoch: -// -> the parent's EpochStatus.CurrentEpoch also applies for the current block -// -// As the parent was a valid extension of the chain, by induction, the parent -// satisfies all consistency requirements of the protocol. -// -// Returns the EpochStatus for the input block. 
-// No error returns are expected under normal operations -func (m *FollowerState) epochStatus(block *flow.Header, epochFallbackTriggered bool) (*flow.EpochStatus, error) { - parentStatus, err := m.epoch.statuses.ByBlockID(block.ParentID) - if err != nil { - return nil, fmt.Errorf("could not retrieve epoch state for parent: %w", err) - } - parentSetup, err := m.epoch.setups.ByID(parentStatus.CurrentEpoch.SetupID) - if err != nil { - return nil, fmt.Errorf("could not retrieve EpochSetup event for parent: %w", err) - } - - // Case 1 or 2b (still in parent block's epoch or epoch fallback triggered): - if block.View <= parentSetup.FinalView || epochFallbackTriggered { - // IMPORTANT: copy the status to avoid modifying the parent status in the cache - return parentStatus.Copy(), nil - } - - // Case 2a (first block of new epoch): - // sanity check: parent's epoch Preparation should be completed and have EpochSetup and EpochCommit events - if parentStatus.NextEpoch.SetupID == flow.ZeroID { - return nil, fmt.Errorf("missing setup event for starting next epoch") - } - if parentStatus.NextEpoch.CommitID == flow.ZeroID { - return nil, fmt.Errorf("missing commit event for starting next epoch") - } - epochStatus, err := flow.NewEpochStatus( - parentStatus.CurrentEpoch.SetupID, parentStatus.CurrentEpoch.CommitID, - parentStatus.NextEpoch.SetupID, parentStatus.NextEpoch.CommitID, - flow.ZeroID, flow.ZeroID, - ) - return epochStatus, err - -} - // versionBeaconOnBlockFinalized extracts and returns the VersionBeacons from the // finalized block's seals. // This could return multiple VersionBeacons if the parent block contains multiple Seals. @@ -1091,31 +1051,37 @@ func (m *FollowerState) versionBeaconOnBlockFinalized( // operations to insert service events for blocks that include them. // // No errors are expected during normal operation. 
-func (m *FollowerState) handleEpochServiceEvents(candidate *flow.Block) (dbUpdates []func(*transaction.Tx) error, err error) { +func (m *FollowerState) handleEpochServiceEvents(candidate *flow.Block, updater protocol.StateUpdater) (dbUpdates []func(*transaction.Tx) error, err error) { epochFallbackTriggered, err := m.isEpochEmergencyFallbackTriggered() if err != nil { return nil, fmt.Errorf("could not retrieve epoch fallback status: %w", err) } - epochStatus, err := m.epochStatus(candidate.Header, epochFallbackTriggered) - if err != nil { - return nil, fmt.Errorf("could not determine epoch status for candidate block: %w", err) - } - activeSetup, err := m.epoch.setups.ByID(epochStatus.CurrentEpoch.SetupID) - if err != nil { - return nil, fmt.Errorf("could not retrieve current epoch setup event: %w", err) - } - // always persist the candidate's epoch status - // note: We are scheduling the operation to store the Epoch status using the _pointer_ variable `epochStatus`. - // The struct `epochStatus` points to will still be modified below. 
- blockID := candidate.ID() - dbUpdates = append(dbUpdates, m.epoch.statuses.StoreTx(blockID, epochStatus)) + parentProtocolState := updater.ParentState() + epochStatus := parentProtocolState.EpochStatus() + activeSetup := parentProtocolState.CurrentEpochSetup // never process service events after epoch fallback is triggered if epochStatus.InvalidServiceEventIncorporated || epochFallbackTriggered { return dbUpdates, nil } + // perform protocol state transition to next epoch if next epoch is committed and we are at the first block of the epoch + phase, err := epochStatus.Phase() + if err != nil { + return nil, fmt.Errorf("could not determine epoch phase: %w", err) + } + if phase == flow.EpochPhaseCommitted { + if candidate.Header.View > activeSetup.FinalView { + // TODO: this is a temporary workaround to allow for the epoch transition to be triggered + // most likely it will not be needed when we refactor protocol state entries and define strict safety rules. + err = updater.TransitionToNextEpoch() + if err != nil { + return nil, fmt.Errorf("could not transition protocol state to next epoch: %w", err) + } + } + } + // We apply service events from blocks which are sealed by this candidate block. // The block's payload might contain epoch preparation service events for the next // epoch. In this case, we need to update the tentative protocol state. 
@@ -1147,14 +1113,16 @@ func (m *FollowerState) handleEpochServiceEvents(candidate *flow.Block) (dbUpdat if err != nil { if protocol.IsInvalidServiceEventError(err) { // we have observed an invalid service event, which triggers epoch fallback mode - epochStatus.InvalidServiceEventIncorporated = true + updater.SetInvalidStateTransitionAttempted() return dbUpdates, nil } return nil, fmt.Errorf("unexpected error validating EpochSetup service event: %w", err) } - // prevents multiple setup events for same Epoch (including multiple setup events in payload of same block) - epochStatus.NextEpoch.SetupID = ev.ID() + err = updater.ProcessEpochSetup(ev) + if err != nil { + return nil, irrecoverable.NewExceptionf("could not process epoch setup event: %w", err) + } // we'll insert the setup event when we insert the block dbUpdates = append(dbUpdates, m.epoch.setups.StoreTx(ev)) @@ -1163,34 +1131,26 @@ func (m *FollowerState) handleEpochServiceEvents(candidate *flow.Block) (dbUpdat // if we receive an EpochCommit event, we must have already observed an EpochSetup event // => otherwise, we have observed an EpochCommit without corresponding EpochSetup, which triggers epoch fallback mode if epochStatus.NextEpoch.SetupID == flow.ZeroID { - epochStatus.InvalidServiceEventIncorporated = true + updater.SetInvalidStateTransitionAttempted() return dbUpdates, nil } - - // if we have observed an EpochSetup event, we must be able to retrieve it from the database - // => otherwise, this is a symptom of bug or data corruption since this component sets the SetupID field - extendingSetup, err := m.epoch.setups.ByID(epochStatus.NextEpoch.SetupID) - if err != nil { - if errors.Is(err, storage.ErrNotFound) { - return nil, irrecoverable.NewExceptionf("could not retrieve EpochSetup (id=%x) stored in EpochStatus for block %x: %w", - epochStatus.NextEpoch.SetupID, blockID, err) - } - return nil, fmt.Errorf("unexpected error retrieving next epoch setup: %w", err) - } + extendingSetup := 
parentProtocolState.NextEpochProtocolState.CurrentEpochSetup // validate the service event err = isValidExtendingEpochCommit(ev, extendingSetup, activeSetup, epochStatus) if err != nil { if protocol.IsInvalidServiceEventError(err) { // we have observed an invalid service event, which triggers epoch fallback mode - epochStatus.InvalidServiceEventIncorporated = true + updater.SetInvalidStateTransitionAttempted() return dbUpdates, nil } return nil, fmt.Errorf("unexpected error validating EpochCommit service event: %w", err) } - // prevents multiple setup events for same Epoch (including multiple setup events in payload of same block) - epochStatus.NextEpoch.CommitID = ev.ID() + err = updater.ProcessEpochCommit(ev) + if err != nil { + return nil, irrecoverable.NewExceptionf("could not process epoch commit event: %w", err) + } // we'll insert the commit event when we insert the block dbUpdates = append(dbUpdates, m.epoch.commits.StoreTx(ev)) diff --git a/state/protocol/badger/mutator_test.go b/state/protocol/badger/mutator_test.go index 6b0abcff4b0..5b69250d63b 100644 --- a/state/protocol/badger/mutator_test.go +++ b/state/protocol/badger/mutator_test.go @@ -109,7 +109,7 @@ func TestExtendValid(t *testing.T) { all.QuorumCertificates, all.Setups, all.EpochCommits, - all.Statuses, + all.ProtocolState, all.VersionBeacons, rootSnapshot, ) @@ -825,7 +825,7 @@ func TestExtendEpochTransitionValid(t *testing.T) { all.QuorumCertificates, all.Setups, all.EpochCommits, - all.Statuses, + all.ProtocolState, all.VersionBeacons, rootSnapshot, ) @@ -1931,7 +1931,7 @@ func TestExtendInvalidSealsInBlock(t *testing.T) { all.QuorumCertificates, all.Setups, all.EpochCommits, - all.Statuses, + all.ProtocolState, all.VersionBeacons, rootSnapshot, ) @@ -2461,7 +2461,7 @@ func TestHeaderInvalidTimestamp(t *testing.T) { all.QuorumCertificates, all.Setups, all.EpochCommits, - all.Statuses, + all.ProtocolState, all.VersionBeacons, rootSnapshot, ) diff --git a/state/protocol/badger/params.go 
b/state/protocol/badger/params.go index 4239b96cf8d..6576306d32a 100644 --- a/state/protocol/badger/params.go +++ b/state/protocol/badger/params.go @@ -3,21 +3,31 @@ package badger import ( "fmt" + "github.com/dgraph-io/badger/v2" + "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/state/protocol" "github.com/onflow/flow-go/state/protocol/inmem" + "github.com/onflow/flow-go/storage" "github.com/onflow/flow-go/storage/badger/operation" ) type Params struct { protocol.GlobalParams + protocol.InstanceParams +} + +var _ protocol.Params = (*Params)(nil) + +// InstanceParams implements the interface protocol.InstanceParams. All functions +// are served on demand directly from the database, _without_ any caching. +type InstanceParams struct { state *State } -var _ protocol.InstanceParams = (*Params)(nil) -var _ protocol.GlobalParams = (*Params)(nil) // TODO(yuraolex): probably this is temporary since protocol state will be serving global params +var _ protocol.InstanceParams = (*InstanceParams)(nil) -func (p Params) EpochFallbackTriggered() (bool, error) { +func (p *InstanceParams) EpochFallbackTriggered() (bool, error) { var triggered bool err := p.state.db.View(operation.CheckEpochEmergencyFallbackTriggered(&triggered)) if err != nil { @@ -26,7 +36,7 @@ func (p Params) EpochFallbackTriggered() (bool, error) { return triggered, nil } -func (p Params) FinalizedRoot() (*flow.Header, error) { +func (p *InstanceParams) FinalizedRoot() (*flow.Header, error) { // look up root block ID var rootID flow.Identifier @@ -44,7 +54,7 @@ func (p Params) FinalizedRoot() (*flow.Header, error) { return header, nil } -func (p Params) SealedRoot() (*flow.Header, error) { +func (p *InstanceParams) SealedRoot() (*flow.Header, error) { // look up root block ID var rootID flow.Identifier err := p.state.db.View(operation.LookupBlockHeight(p.state.sealedRootHeight, &rootID)) @@ -62,7 +72,7 @@ func (p Params) SealedRoot() (*flow.Header, error) { return header, nil } -func (p 
Params) Seal() (*flow.Seal, error) { +func (p *InstanceParams) Seal() (*flow.Seal, error) { // look up root header var rootID flow.Identifier @@ -82,34 +92,34 @@ func (p Params) Seal() (*flow.Seal, error) { // ReadGlobalParams reads the global parameters from the database and returns them as in-memory representation. // No errors are expected during normal operation. -func ReadGlobalParams(state *State) (*inmem.Params, error) { +func ReadGlobalParams(db *badger.DB, headers storage.Headers) (*inmem.Params, error) { var sporkID flow.Identifier - err := state.db.View(operation.RetrieveSporkID(&sporkID)) + err := db.View(operation.RetrieveSporkID(&sporkID)) if err != nil { return nil, fmt.Errorf("could not get spork id: %w", err) } var sporkRootBlockHeight uint64 - err = state.db.View(operation.RetrieveSporkRootBlockHeight(&sporkRootBlockHeight)) + err = db.View(operation.RetrieveSporkRootBlockHeight(&sporkRootBlockHeight)) if err != nil { return nil, fmt.Errorf("could not get spork root block height: %w", err) } var threshold uint64 - err = state.db.View(operation.RetrieveEpochCommitSafetyThreshold(&threshold)) + err = db.View(operation.RetrieveEpochCommitSafetyThreshold(&threshold)) if err != nil { return nil, fmt.Errorf("could not get epoch commit safety threshold") } var version uint - err = state.db.View(operation.RetrieveProtocolVersion(&version)) + err = db.View(operation.RetrieveProtocolVersion(&version)) if err != nil { return nil, fmt.Errorf("could not get protocol version: %w", err) } // retrieve root header - root, err := Params{state: state}.FinalizedRoot() + root, err := ReadFinalizedRoot(db) if err != nil { return nil, fmt.Errorf("could not get root: %w", err) } @@ -124,3 +134,30 @@ func ReadGlobalParams(state *State) (*inmem.Params, error) { }, ), nil } + +// ReadFinalizedRoot retrieves the root block's header from the database. +// This information is immutable for the runtime of the software and may be cached. 
+func ReadFinalizedRoot(db *badger.DB) (*flow.Header, error) { + var finalizedRootHeight uint64 + var rootID flow.Identifier + var rootHeader flow.Header + err := db.View(func(tx *badger.Txn) error { + err := operation.RetrieveRootHeight(&finalizedRootHeight)(tx) + if err != nil { + return fmt.Errorf("could not retrieve finalized root height: %w", err) + } + err = operation.LookupBlockHeight(finalizedRootHeight, &rootID)(tx) // look up root block ID + if err != nil { + return fmt.Errorf("could not retrieve root header's ID by height: %w", err) + } + err = operation.RetrieveHeader(rootID, &rootHeader)(tx) // retrieve root header + if err != nil { + return fmt.Errorf("could not retrieve root header: %w", err) + } + return nil + }) + if err != nil { + return nil, fmt.Errorf("failed to read root information from database: %w", err) + } + return &rootHeader, nil +} diff --git a/state/protocol/badger/snapshot.go b/state/protocol/badger/snapshot.go index 1a121e81748..80fcf99e0d1 100644 --- a/state/protocol/badger/snapshot.go +++ b/state/protocol/badger/snapshot.go @@ -85,11 +85,11 @@ func (s *Snapshot) QuorumCertificate() (*flow.QuorumCertificate, error) { } func (s *Snapshot) Phase() (flow.EpochPhase, error) { - status, err := s.state.epoch.statuses.ByBlockID(s.blockID) + psSnapshot, err := s.state.protocolState.AtBlockID(s.blockID) if err != nil { - return flow.EpochPhaseUndefined, fmt.Errorf("could not retrieve epoch status: %w", err) + return flow.EpochPhaseUndefined, fmt.Errorf("could not retrieve protocol state snapshot: %w", err) } - phase, err := status.Phase() + phase, err := psSnapshot.EpochStatus().Phase() return phase, err } @@ -100,10 +100,11 @@ func (s *Snapshot) Identities(selector flow.IdentityFilter) (flow.IdentityList, // event here -- this will need revision to support mid-epoch identity changes // once slashing is implemented - status, err := s.state.epoch.statuses.ByBlockID(s.blockID) + psSnapshot, err := s.state.protocolState.AtBlockID(s.blockID) if 
err != nil { return nil, err } + status := psSnapshot.EpochStatus() setup, err := s.state.epoch.setups.ByID(status.CurrentEpoch.SetupID) if err != nil { @@ -402,6 +403,13 @@ func (s *Snapshot) Params() protocol.GlobalParams { return s.state.Params() } +// ProtocolState returns the dynamic protocol state that the Head block commits to. The +// compliance layer guarantees that only valid blocks are appended to the protocol state. +// For each block stored there should be a protocol state stored. +func (s *Snapshot) ProtocolState() (protocol.DynamicProtocolState, error) { + return s.state.protocolState.AtBlockID(s.blockID) +} + func (s *Snapshot) VersionBeacon() (*flow.SealedVersionBeacon, error) { head, err := s.state.headers.ByBlockID(s.blockID) if err != nil { @@ -420,10 +428,11 @@ type EpochQuery struct { func (q *EpochQuery) Current() protocol.Epoch { // all errors returned from storage reads here are unexpected, because all // snapshots reside within a current epoch, which must be queryable - status, err := q.snap.state.epoch.statuses.ByBlockID(q.snap.blockID) + psSnapshot, err := q.snap.state.protocolState.AtBlockID(q.snap.blockID) if err != nil { - return invalid.NewEpochf("could not get epoch status for block %x: %w", q.snap.blockID, err) + return invalid.NewEpochf("could not get protocol state snapshot at block %x: %w", q.snap.blockID, err) } + status := psSnapshot.EpochStatus() setup, err := q.snap.state.epoch.setups.ByID(status.CurrentEpoch.SetupID) if err != nil { return invalid.NewEpochf("could not get current EpochSetup (id=%x) for block %x: %w", status.CurrentEpoch.SetupID, q.snap.blockID, err) @@ -446,10 +455,11 @@ func (q *EpochQuery) Current() protocol.Epoch { // Next returns the next epoch, if it is available. 
func (q *EpochQuery) Next() protocol.Epoch { - status, err := q.snap.state.epoch.statuses.ByBlockID(q.snap.blockID) + psSnapshot, err := q.snap.state.protocolState.AtBlockID(q.snap.blockID) if err != nil { - return invalid.NewEpochf("could not get epoch status for block %x: %w", q.snap.blockID, err) + return invalid.NewEpochf("could not get protocol state snapshot at block %x: %w", q.snap.blockID, err) } + status := psSnapshot.EpochStatus() phase, err := status.Phase() if err != nil { // critical error: malformed EpochStatus in storage @@ -484,10 +494,11 @@ func (q *EpochQuery) Next() protocol.Epoch { // For all other epochs, returns the previous epoch. func (q *EpochQuery) Previous() protocol.Epoch { - status, err := q.snap.state.epoch.statuses.ByBlockID(q.snap.blockID) + psSnapshot, err := q.snap.state.protocolState.AtBlockID(q.snap.blockID) if err != nil { - return invalid.NewEpochf("could not get epoch status for block %x: %w", q.snap.blockID, err) + return invalid.NewEpochf("could not get protocol state snapshot at block %x: %w", q.snap.blockID, err) } + status := psSnapshot.EpochStatus() // CASE 1: there is no previous epoch - this indicates we are in the first // epoch after a spork root or genesis block diff --git a/state/protocol/badger/state.go b/state/protocol/badger/state.go index 655b14c587f..98bb1bcae40 100644 --- a/state/protocol/badger/state.go +++ b/state/protocol/badger/state.go @@ -15,6 +15,7 @@ import ( statepkg "github.com/onflow/flow-go/state" "github.com/onflow/flow-go/state/protocol" "github.com/onflow/flow-go/state/protocol/invalid" + "github.com/onflow/flow-go/state/protocol/protocol_state" "github.com/onflow/flow-go/storage" "github.com/onflow/flow-go/storage/badger/operation" "github.com/onflow/flow-go/storage/badger/transaction" @@ -35,11 +36,12 @@ type State struct { results storage.ExecutionResults seals storage.Seals epoch struct { - setups storage.EpochSetups - commits storage.EpochCommits - statuses storage.EpochStatuses + setups 
storage.EpochSetups + commits storage.EpochCommits } - versionBeacons storage.VersionBeacons + protocolStateSnapshotsDB storage.ProtocolState + protocolState protocol.ProtocolState + versionBeacons storage.VersionBeacons // rootHeight marks the cutoff of the history this node knows about. We cache it in the state // because it cannot change over the lifecycle of a protocol state instance. It is frequently @@ -89,7 +91,7 @@ func Bootstrap( qcs storage.QuorumCertificates, setups storage.EpochSetups, commits storage.EpochCommits, - statuses storage.EpochStatuses, + protocolStateSnapshotsDB storage.ProtocolState, versionBeacons storage.VersionBeacons, root protocol.Snapshot, options ...BootstrapConfigOptions, @@ -108,6 +110,7 @@ func Bootstrap( return nil, fmt.Errorf("expected empty database") } + protocolState := protocol_state.NewProtocolState(protocolStateSnapshotsDB, root.Params()) state := newState( metrics, db, @@ -118,7 +121,8 @@ func Bootstrap( qcs, setups, commits, - statuses, + protocolStateSnapshotsDB, + protocolState, versionBeacons, ) @@ -168,7 +172,7 @@ func Bootstrap( } // 4) initialize values related to the epoch logic - err = state.bootstrapEpoch(root.Epochs(), segment, !config.SkipNetworkAddressValidation)(tx) + err = state.bootstrapEpoch(root.Epochs(), !config.SkipNetworkAddressValidation)(tx) if err != nil { return fmt.Errorf("could not bootstrap epoch values: %w", err) } @@ -191,7 +195,13 @@ func Bootstrap( state.metrics.BlockFinalized(block) } - // 7) initialize version beacon + // 7) bootstrap dynamic protocol state + err = state.bootstrapProtocolState(segment, root, protocolStateSnapshotsDB)(tx) + if err != nil { + return fmt.Errorf("could not bootstrap protocol state: %w", err) + } + + // 8) initialize version beacon err = transaction.WithTx(state.boostrapVersionBeacon(root))(tx) if err != nil { return fmt.Errorf("could not bootstrap version beacon: %w", err) @@ -212,6 +222,37 @@ func Bootstrap( return state, nil } +// bootstrapProtocolState 
bootstraps data structures needed for Dynamic Protocol State. +// It inserts the root protocol state and indexes all blocks in the sealing segment assuming that +// dynamic protocol state didn't change in the sealing segment. +// The root snapshot's sealing segment must not straddle any epoch transitions +// or epoch phase transitions. +func (state *State) bootstrapProtocolState(segment *flow.SealingSegment, root protocol.Snapshot, protocolState storage.ProtocolState) func(tx *transaction.Tx) error { + return func(tx *transaction.Tx) error { + rootProtocolState, err := root.ProtocolState() + if err != nil { + return fmt.Errorf("could not get root protocol state: %w", err) + } + rootProtocolStateEntry := rootProtocolState.Entry().ProtocolStateEntry + protocolStateID := rootProtocolStateEntry.ID() + err = protocolState.StoreTx(protocolStateID, rootProtocolStateEntry)(tx) + if err != nil { + return fmt.Errorf("could not insert root protocol state: %w", err) + } + + // NOTE: as specified in the godoc, this code assumes that each block + // in the sealing segment in within the same phase within the same epoch. + // the sealing segment. + for _, block := range segment.AllBlocks() { + err = protocolState.Index(block.ID(), protocolStateID)(tx) + if err != nil { + return fmt.Errorf("could not index root protocol state: %w", err) + } + } + return nil + } +} + // bootstrapSealingSegment inserts all blocks and associated metadata for the // protocol state root snapshot to disk. func (state *State) bootstrapSealingSegment(segment *flow.SealingSegment, head *flow.Block, rootSeal *flow.Seal) func(tx *transaction.Tx) error { @@ -400,10 +441,8 @@ func (state *State) bootstrapStatePointers(root protocol.Snapshot) func(*badger. // bootstrapEpoch bootstraps the protocol state database with information about // the previous, current, and next epochs as of the root snapshot. 
-// -// The root snapshot's sealing segment must not straddle any epoch transitions -// or epoch phase transitions. -func (state *State) bootstrapEpoch(epochs protocol.EpochQuery, segment *flow.SealingSegment, verifyNetworkAddress bool) func(*transaction.Tx) error { +// TODO(yuraolex): This information can be bootstrapped from dynamic protocol state. +func (state *State) bootstrapEpoch(epochs protocol.EpochQuery, verifyNetworkAddress bool) func(*transaction.Tx) error { return func(tx *transaction.Tx) error { previous := epochs.Previous() current := epochs.Current() @@ -524,16 +563,6 @@ func (state *State) bootstrapEpoch(epochs protocol.EpochQuery, segment *flow.Sea } } - // NOTE: as specified in the godoc, this code assumes that each block - // in the sealing segment in within the same phase within the same epoch. - for _, block := range segment.AllBlocks() { - blockID := block.ID() - err = state.epoch.statuses.StoreTx(blockID, status)(tx) - if err != nil { - return fmt.Errorf("could not store epoch status for block (id=%x): %w", blockID, err) - } - } - return nil } } @@ -603,7 +632,7 @@ func OpenState( qcs storage.QuorumCertificates, setups storage.EpochSetups, commits storage.EpochCommits, - statuses storage.EpochStatuses, + protocolState storage.ProtocolState, versionBeacons storage.VersionBeacons, ) (*State, error) { isBootstrapped, err := IsBootstrapped(db) @@ -613,6 +642,11 @@ func OpenState( if !isBootstrapped { return nil, fmt.Errorf("expected database to contain bootstrapped state") } + globalParams, err := ReadGlobalParams(db, headers) + if err != nil { + return nil, fmt.Errorf("could not read global params") + } + protocolStateReader := protocol_state.NewProtocolState(protocolState, globalParams) state := newState( metrics, db, @@ -623,7 +657,8 @@ func OpenState( qcs, setups, commits, - statuses, + protocolState, + protocolStateReader, versionBeacons, ) // populate the protocol state cache err = state.populateCache() @@ -655,13 +690,9 @@ func OpenState( 
} func (state *State) Params() protocol.Params { - globalParams, err := ReadGlobalParams(state) - if err != nil { - panic("failed to read global params: " + err.Error()) - } return Params{ - GlobalParams: globalParams, - state: state, + GlobalParams: state.protocolState.GlobalParams(), + InstanceParams: &InstanceParams{state: state}, } } @@ -736,7 +767,8 @@ func newState( qcs storage.QuorumCertificates, setups storage.EpochSetups, commits storage.EpochCommits, - statuses storage.EpochStatuses, + protocolState storage.ProtocolState, + protocolStateReader protocol.ProtocolState, versionBeacons storage.VersionBeacons, ) *State { return &State{ @@ -748,17 +780,17 @@ func newState( blocks: blocks, qcs: qcs, epoch: struct { - setups storage.EpochSetups - commits storage.EpochCommits - statuses storage.EpochStatuses + setups storage.EpochSetups + commits storage.EpochCommits }{ - setups: setups, - commits: commits, - statuses: statuses, + setups: setups, + commits: commits, }, - versionBeacons: versionBeacons, - cachedFinal: new(atomic.Pointer[cachedHeader]), - cachedSealed: new(atomic.Pointer[cachedHeader]), + protocolStateSnapshotsDB: protocolState, + protocolState: protocolStateReader, + versionBeacons: versionBeacons, + cachedFinal: new(atomic.Pointer[cachedHeader]), + cachedSealed: new(atomic.Pointer[cachedHeader]), } } diff --git a/state/protocol/badger/state_test.go b/state/protocol/badger/state_test.go index c6bcc59854f..bed874de06a 100644 --- a/state/protocol/badger/state_test.go +++ b/state/protocol/badger/state_test.go @@ -72,7 +72,7 @@ func TestBootstrapAndOpen(t *testing.T) { all.QuorumCertificates, all.Setups, all.EpochCommits, - all.Statuses, + all.ProtocolState, all.VersionBeacons, ) require.NoError(t, err) @@ -157,7 +157,7 @@ func TestBootstrapAndOpen_EpochCommitted(t *testing.T) { all.QuorumCertificates, all.Setups, all.EpochCommits, - all.Statuses, + all.ProtocolState, all.VersionBeacons, ) require.NoError(t, err) @@ -541,7 +541,7 @@ func bootstrap(t 
*testing.T, rootSnapshot protocol.Snapshot, f func(*bprotocol.S all.QuorumCertificates, all.Setups, all.EpochCommits, - all.Statuses, + all.ProtocolState, all.VersionBeacons, rootSnapshot, ) diff --git a/state/protocol/convert.go b/state/protocol/convert.go index 8f8630b2230..a28811c15f5 100644 --- a/state/protocol/convert.go +++ b/state/protocol/convert.go @@ -3,11 +3,10 @@ package protocol import ( "fmt" - "github.com/onflow/flow-go/module/signature" - "github.com/onflow/flow-go/crypto" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/model/flow/filter" + "github.com/onflow/flow-go/module/signature" ) // ToEpochSetup converts an Epoch interface instance to the underlying concrete diff --git a/state/protocol/inmem/convert.go b/state/protocol/inmem/convert.go index 8a615fb2b9a..6ba89b12658 100644 --- a/state/protocol/inmem/convert.go +++ b/state/protocol/inmem/convert.go @@ -82,6 +82,12 @@ func FromSnapshot(from protocol.Snapshot) (*Snapshot, error) { } snap.Params = params.enc + protocolState, err := from.ProtocolState() + if err != nil { + return nil, fmt.Errorf("could not get protocol state: %w", err) + } + snap.ProtocolState = protocolState.Entry().ProtocolStateEntry + // convert version beacon versionBeacon, err := from.VersionBeacon() if err != nil { @@ -325,6 +331,17 @@ func SnapshotFromBootstrapStateWithParams( EpochCommitSafetyThreshold: epochCommitSafetyThreshold, // see protocol.Params for details } + protocolState := &flow.ProtocolStateEntry{ + CurrentEpochEventIDs: flow.EventIDs{ + SetupID: setup.ID(), + CommitID: commit.ID(), + }, + PreviousEpochEventIDs: flow.EventIDs{}, + Identities: flow.DynamicIdentityEntryListFromIdentities(setup.Participants), + InvalidStateTransitionAttempted: false, + NextEpochProtocolState: nil, + } + snap := SnapshotFromEncodable(EncodableSnapshot{ Head: root.Header, Identities: setup.Participants, @@ -341,6 +358,7 @@ func SnapshotFromBootstrapStateWithParams( Phase: flow.EpochPhaseStaking, Epochs: epochs, 
Params: params, + ProtocolState: protocolState, SealedVersionBeacon: nil, }) return snap, nil diff --git a/state/protocol/inmem/encodable.go b/state/protocol/inmem/encodable.go index 4ab60a6aefe..08b5ec8b5b3 100644 --- a/state/protocol/inmem/encodable.go +++ b/state/protocol/inmem/encodable.go @@ -17,6 +17,7 @@ type EncodableSnapshot struct { Phase flow.EpochPhase Epochs EncodableEpochs Params EncodableParams + ProtocolState *flow.ProtocolStateEntry SealedVersionBeacon *flow.SealedVersionBeacon } diff --git a/state/protocol/inmem/encodable_test.go b/state/protocol/inmem/encodable_test.go index 22459e17b7a..bc9aba73383 100644 --- a/state/protocol/inmem/encodable_test.go +++ b/state/protocol/inmem/encodable_test.go @@ -37,26 +37,3 @@ func TestEncodeDecode(t *testing.T) { decodedResult, decodedSeal := decodedSnapshot.LatestResult, decodedSnapshot.LatestSeal assert.Equal(t, decodedResult.ID(), decodedSeal.ResultID) } - -// TestStrippedEncodeDecode tests that the protocol state snapshot can be encoded to JSON skipping the network address -// and decoded back successfully -func TestStrippedEncodeDecode(t *testing.T) { - participants := unittest.IdentityListFixture(10, unittest.WithAllRoles()) - initialSnapshot := unittest.RootSnapshotFixture(participants) - - // encode the snapshot - strippedSnapshot := inmem.StrippedInmemSnapshot(initialSnapshot.Encodable()) - snapshotJson, err := json.Marshal(strippedSnapshot) - require.NoError(t, err) - // check that the json string does not contain "Address" - require.NotContains(t, snapshotJson, "Address") - - // decode the snapshots - var decodedSnapshot inmem.EncodableSnapshot - err = json.Unmarshal(snapshotJson, &decodedSnapshot) - require.NoError(t, err) - // check that the network addresses for all the identities are still empty - assert.Len(t, decodedSnapshot.Identities.Filter(func(id *flow.Identity) bool { - return id.Address == "" - }), len(participants)) -} diff --git a/state/protocol/inmem/epoch.go 
b/state/protocol/inmem/epoch.go index 35573c8b961..361a5a30553 100644 --- a/state/protocol/inmem/epoch.go +++ b/state/protocol/inmem/epoch.go @@ -19,6 +19,10 @@ type Epoch struct { var _ protocol.Epoch = (*Epoch)(nil) +func NewEpoch(enc EncodableEpoch) Epoch { + return Epoch{enc} +} + func (e Epoch) Encodable() EncodableEpoch { return e.enc } diff --git a/state/protocol/inmem/snapshot.go b/state/protocol/inmem/snapshot.go index a30c1b0fcad..2aa5659f9ac 100644 --- a/state/protocol/inmem/snapshot.go +++ b/state/protocol/inmem/snapshot.go @@ -1,6 +1,9 @@ package inmem import ( + "errors" + "fmt" + "github.com/onflow/flow-go/consensus/hotstuff/model" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/state/protocol" @@ -72,47 +75,72 @@ func (s Snapshot) Encodable() EncodableSnapshot { return s.enc } -func (s Snapshot) VersionBeacon() (*flow.SealedVersionBeacon, error) { - return s.enc.SealedVersionBeacon, nil -} - -func SnapshotFromEncodable(enc EncodableSnapshot) *Snapshot { - return &Snapshot{ - enc: enc, +func (s Snapshot) ProtocolState() (protocol.DynamicProtocolState, error) { + epochs := s.Epochs() + previous := epochs.Previous() + current := epochs.Current() + next := epochs.Next() + var ( + err error + previousEpochSetup, currentEpochSetup, nextEpochSetup *flow.EpochSetup + previousEpochCommit, currentEpochCommit, nextEpochCommit *flow.EpochCommit + ) + + if _, err := previous.Counter(); err == nil { + // if there is a previous epoch, both setup and commit events must exist + previousEpochSetup, err = protocol.ToEpochSetup(previous) + if err != nil { + return nil, fmt.Errorf("could not get previous epoch setup event: %w", err) + } + previousEpochCommit, err = protocol.ToEpochCommit(previous) + if err != nil { + return nil, fmt.Errorf("could not get previous epoch commit event: %w", err) + } } -} -// StrippedInmemSnapshot removes all the networking address in the snapshot -func StrippedInmemSnapshot(snapshot EncodableSnapshot) EncodableSnapshot { 
- removeAddress := func(ids flow.IdentityList) { - for _, identity := range ids { - identity.Address = "" - } + // insert current epoch - both setup and commit events must exist + currentEpochSetup, err = protocol.ToEpochSetup(current) + if err != nil { + return nil, fmt.Errorf("could not get current epoch setup event: %w", err) + } + currentEpochCommit, err = protocol.ToEpochCommit(current) + if err != nil { + return nil, fmt.Errorf("could not get current epoch commit event: %w", err) } - removeAddressFromEpoch := func(epoch *EncodableEpoch) { - if epoch == nil { - return + if _, err := next.Counter(); err == nil { + // if there is a next epoch, both setup event should exist, but commit event may not + nextEpochSetup, err = protocol.ToEpochSetup(next) + if err != nil { + return nil, fmt.Errorf("could not get next epoch setup event: %w", err) } - removeAddress(epoch.InitialIdentities) - for _, cluster := range epoch.Clustering { - removeAddress(cluster) - } - for _, c := range epoch.Clusters { - removeAddress(c.Members) + nextEpochCommit, err = protocol.ToEpochCommit(next) + if err != nil && !errors.Is(err, protocol.ErrNextEpochNotCommitted) { + return nil, fmt.Errorf("could not get next epoch commit event: %w", err) } } - removeAddress(snapshot.Identities) - removeAddressFromEpoch(snapshot.Epochs.Previous) - removeAddressFromEpoch(&snapshot.Epochs.Current) - removeAddressFromEpoch(snapshot.Epochs.Next) + protocolStateEntry, err := flow.NewRichProtocolStateEntry( + s.enc.ProtocolState, + previousEpochSetup, + previousEpochCommit, + currentEpochSetup, + currentEpochCommit, + nextEpochSetup, + nextEpochCommit) + if err != nil { + return nil, fmt.Errorf("could not create protocol state entry: %w", err) + } - for _, event := range snapshot.LatestResult.ServiceEvents { - switch event.Type { - case flow.ServiceEventSetup: - removeAddress(event.Event.(*flow.EpochSetup).Participants) - } + return NewDynamicProtocolStateAdapter(protocolStateEntry, s.Params()), nil +} + 
+func (s Snapshot) VersionBeacon() (*flow.SealedVersionBeacon, error) { + return s.enc.SealedVersionBeacon, nil +} + +func SnapshotFromEncodable(enc EncodableSnapshot) *Snapshot { + return &Snapshot{ + enc: enc, } - return snapshot } diff --git a/state/protocol/invalid/snapshot.go b/state/protocol/invalid/snapshot.go index 78ee386ebcb..248efe6142d 100644 --- a/state/protocol/invalid/snapshot.go +++ b/state/protocol/invalid/snapshot.go @@ -76,6 +76,10 @@ func (u *Snapshot) Params() protocol.GlobalParams { return Params{u.err} } +func (u *Snapshot) ProtocolState() (protocol.DynamicProtocolState, error) { + return nil, u.err +} + func (u *Snapshot) VersionBeacon() (*flow.SealedVersionBeacon, error) { return nil, u.err } diff --git a/state/protocol/mock/snapshot.go b/state/protocol/mock/snapshot.go index 95c22c64fb4..434afaec89f 100644 --- a/state/protocol/mock/snapshot.go +++ b/state/protocol/mock/snapshot.go @@ -200,6 +200,32 @@ func (_m *Snapshot) Phase() (flow.EpochPhase, error) { return r0, r1 } +// ProtocolState provides a mock function with given fields: +func (_m *Snapshot) ProtocolState() (protocol.DynamicProtocolState, error) { + ret := _m.Called() + + var r0 protocol.DynamicProtocolState + var r1 error + if rf, ok := ret.Get(0).(func() (protocol.DynamicProtocolState, error)); ok { + return rf() + } + if rf, ok := ret.Get(0).(func() protocol.DynamicProtocolState); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(protocol.DynamicProtocolState) + } + } + + if rf, ok := ret.Get(1).(func() error); ok { + r1 = rf() + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // QuorumCertificate provides a mock function with given fields: func (_m *Snapshot) QuorumCertificate() (*flow.QuorumCertificate, error) { ret := _m.Called() diff --git a/state/protocol/protocol_state/mutator.go b/state/protocol/protocol_state/mutator.go index c1083d682fd..8db5bebdd44 100644 --- a/state/protocol/protocol_state/mutator.go +++ 
b/state/protocol/protocol_state/mutator.go @@ -35,7 +35,7 @@ func (m *Mutator) CreateUpdater(candidate *flow.Header) (protocol.StateUpdater, if err != nil { return nil, fmt.Errorf("could not retrieve protocol state for block (%v): %w", candidate.ParentID, err) } - return newUpdater(candidate, parentState), nil + return NewUpdater(candidate, parentState), nil } // CommitProtocolState commits the protocol state updater as part of DB transaction. diff --git a/state/protocol/protocol_state/updater.go b/state/protocol/protocol_state/updater.go index 21a9d80f811..698cf47435a 100644 --- a/state/protocol/protocol_state/updater.go +++ b/state/protocol/protocol_state/updater.go @@ -28,8 +28,8 @@ type Updater struct { var _ protocol.StateUpdater = (*Updater)(nil) -// newUpdater creates a new protocol state updater. -func newUpdater(candidate *flow.Header, parentState *flow.RichProtocolStateEntry) *Updater { +// NewUpdater creates a new protocol state updater. +func NewUpdater(candidate *flow.Header, parentState *flow.RichProtocolStateEntry) *Updater { updater := &Updater{ parentState: parentState, state: parentState.ProtocolStateEntry.Copy(), diff --git a/state/protocol/protocol_state/updater_test.go b/state/protocol/protocol_state/updater_test.go index b5b705e54b9..1a5d243cebf 100644 --- a/state/protocol/protocol_state/updater_test.go +++ b/state/protocol/protocol_state/updater_test.go @@ -32,7 +32,7 @@ func (s *UpdaterSuite) SetupTest() { s.parentBlock = unittest.BlockHeaderFixture(unittest.HeaderWithView(s.parentProtocolState.CurrentEpochSetup.FirstView + 1)) s.candidate = unittest.BlockHeaderWithParentFixture(s.parentBlock) - s.updater = newUpdater(s.candidate, s.parentProtocolState) + s.updater = NewUpdater(s.candidate, s.parentProtocolState) } // TestNewUpdater tests if the constructor correctly setups invariants for updater. 
@@ -53,7 +53,7 @@ func (s *UpdaterSuite) TestTransitionToNextEpoch() { candidate := unittest.BlockHeaderFixture( unittest.HeaderWithView(s.parentProtocolState.CurrentEpochSetup.FinalView + 1)) // since the candidate block is from next epoch, updater should transition to next epoch - s.updater = newUpdater(candidate, s.parentProtocolState) + s.updater = NewUpdater(candidate, s.parentProtocolState) err := s.updater.TransitionToNextEpoch() require.NoError(s.T(), err) updatedState, _, _ := s.updater.Build() @@ -67,7 +67,7 @@ func (s *UpdaterSuite) TestTransitionToNextEpochNotAllowed() { protocolState := unittest.ProtocolStateFixture() candidate := unittest.BlockHeaderFixture( unittest.HeaderWithView(protocolState.CurrentEpochSetup.FinalView + 1)) - updater := newUpdater(candidate, protocolState) + updater := NewUpdater(candidate, protocolState) err := updater.TransitionToNextEpoch() require.Error(s.T(), err, "should not allow transition to next epoch if there is no next epoch protocol state") }) @@ -78,7 +78,7 @@ func (s *UpdaterSuite) TestTransitionToNextEpochNotAllowed() { }) candidate := unittest.BlockHeaderFixture( unittest.HeaderWithView(protocolState.CurrentEpochSetup.FinalView + 1)) - updater := newUpdater(candidate, protocolState) + updater := NewUpdater(candidate, protocolState) err := updater.TransitionToNextEpoch() require.Error(s.T(), err, "should not allow transition to next epoch if it is not committed") }) @@ -89,7 +89,7 @@ func (s *UpdaterSuite) TestTransitionToNextEpochNotAllowed() { }) candidate := unittest.BlockHeaderFixture( unittest.HeaderWithView(protocolState.CurrentEpochSetup.FinalView + 1)) - updater := newUpdater(candidate, protocolState) + updater := NewUpdater(candidate, protocolState) err := updater.TransitionToNextEpoch() require.Error(s.T(), err, "should not allow transition to next epoch if next block is not first block from next epoch") }) @@ -97,7 +97,7 @@ func (s *UpdaterSuite) TestTransitionToNextEpochNotAllowed() { protocolState := 
unittest.ProtocolStateFixture(unittest.WithNextEpochProtocolState()) candidate := unittest.BlockHeaderFixture( unittest.HeaderWithView(protocolState.CurrentEpochSetup.FinalView)) - updater := newUpdater(candidate, protocolState) + updater := NewUpdater(candidate, protocolState) err := updater.TransitionToNextEpoch() require.Error(s.T(), err, "should not allow transition to next epoch if next block is not first block from next epoch") }) @@ -124,7 +124,7 @@ func (s *UpdaterSuite) TestSetInvalidStateTransitionAttempted() { // update protocol state with next epoch information unittest.WithNextEpochProtocolState()(s.parentProtocolState) // create new updater with next epoch information - s.updater = newUpdater(s.candidate, s.parentProtocolState) + s.updater = NewUpdater(s.candidate, s.parentProtocolState) s.updater.SetInvalidStateTransitionAttempted() updatedState, _, hasChanges := s.updater.Build() @@ -152,7 +152,7 @@ func (s *UpdaterSuite) TestProcessEpochCommit() { require.Error(s.T(), err) }) s.Run("invalid state transition attempted", func() { - updater := newUpdater(s.candidate, s.parentProtocolState) + updater := NewUpdater(s.candidate, s.parentProtocolState) setup := unittest.EpochSetupFixture(func(setup *flow.EpochSetup) { setup.Counter = s.parentProtocolState.CurrentEpochSetup.Counter + 1 }) @@ -175,7 +175,7 @@ func (s *UpdaterSuite) TestProcessEpochCommit() { "operation must be no-op") }) s.Run("happy path processing", func() { - updater := newUpdater(s.candidate, s.parentProtocolState) + updater := NewUpdater(s.candidate, s.parentProtocolState) setup := unittest.EpochSetupFixture(func(setup *flow.EpochSetup) { setup.Counter = s.parentProtocolState.CurrentEpochSetup.Counter + 1 }) @@ -216,7 +216,7 @@ func (s *UpdaterSuite) TestUpdateIdentityUnknownIdentity() { func (s *UpdaterSuite) TestUpdateIdentityHappyPath() { // update protocol state to have next epoch protocol state unittest.WithNextEpochProtocolState()(s.parentProtocolState) - s.updater = 
newUpdater(s.candidate, s.parentProtocolState) + s.updater = NewUpdater(s.candidate, s.parentProtocolState) currentEpochParticipants := s.parentProtocolState.CurrentEpochSetup.Participants.Copy() weightChanges, err := currentEpochParticipants.Sample(2) @@ -264,7 +264,7 @@ func (s *UpdaterSuite) TestProcessEpochSetupInvariants() { require.Error(s.T(), err) }) s.Run("invalid state transition attempted", func() { - updater := newUpdater(s.candidate, s.parentProtocolState) + updater := NewUpdater(s.candidate, s.parentProtocolState) setup := unittest.EpochSetupFixture(func(setup *flow.EpochSetup) { setup.Counter = s.parentProtocolState.CurrentEpochSetup.Counter + 1 }) @@ -276,7 +276,7 @@ func (s *UpdaterSuite) TestProcessEpochSetupInvariants() { require.Nil(s.T(), updatedState.NextEpochProtocolState, "should not process epoch setup if invalid state transition attempted") }) s.Run("processing second epoch setup", func() { - updater := newUpdater(s.candidate, s.parentProtocolState) + updater := NewUpdater(s.candidate, s.parentProtocolState) setup := unittest.EpochSetupFixture(func(setup *flow.EpochSetup) { setup.Counter = s.parentProtocolState.CurrentEpochSetup.Counter + 1 }) @@ -441,7 +441,7 @@ func (s *UpdaterSuite) TestEpochSetupAfterIdentityChange() { // now we can use it to construct updater for next block, which will process epoch setup event. 
nextBlock := unittest.BlockHeaderWithParentFixture(s.candidate) - s.updater = newUpdater(nextBlock, updatedRichProtocolState) + s.updater = NewUpdater(nextBlock, updatedRichProtocolState) setup := unittest.EpochSetupFixture(func(setup *flow.EpochSetup) { setup.Counter = s.parentProtocolState.CurrentEpochSetup.Counter + 1 diff --git a/state/protocol/snapshot.go b/state/protocol/snapshot.go index f557bc59fbc..3dfbd96a79f 100644 --- a/state/protocol/snapshot.go +++ b/state/protocol/snapshot.go @@ -137,6 +137,12 @@ type Snapshot interface { // Returns invalid.Params with state.ErrUnknownSnapshotReference if snapshot reference block is unknown. Params() GlobalParams + // ProtocolState returns the dynamic protocol state that the Head block commits to. The + // compliance layer guarantees that only valid blocks are appended to the protocol state. + // Returns state.ErrUnknownSnapshotReference if snapshot reference block is unknown. + // All other errors should be treated as exceptions. + ProtocolState() (DynamicProtocolState, error) + // VersionBeacon returns the latest sealed version beacon. // If no version beacon has been sealed so far during the current spork, returns nil. // The latest VersionBeacon is only updated for finalized blocks. 
This means that, when diff --git a/state/protocol/util/testing.go b/state/protocol/util/testing.go index 24eb8016f6f..16bbac99bfe 100644 --- a/state/protocol/util/testing.go +++ b/state/protocol/util/testing.go @@ -77,7 +77,7 @@ func RunWithBootstrapState(t testing.TB, rootSnapshot protocol.Snapshot, f func( all.QuorumCertificates, all.Setups, all.EpochCommits, - all.Statuses, + all.ProtocolState, all.VersionBeacons, rootSnapshot, ) @@ -103,7 +103,7 @@ func RunWithFullProtocolState(t testing.TB, rootSnapshot protocol.Snapshot, f fu all.QuorumCertificates, all.Setups, all.EpochCommits, - all.Statuses, + all.ProtocolState, all.VersionBeacons, rootSnapshot, ) @@ -111,7 +111,17 @@ func RunWithFullProtocolState(t testing.TB, rootSnapshot protocol.Snapshot, f fu receiptValidator := MockReceiptValidator() sealValidator := MockSealValidator(all.Seals) mockTimer := MockBlockTimer() - fullState, err := pbadger.NewFullConsensusState(log, tracer, consumer, state, all.Index, all.Payloads, mockTimer, receiptValidator, sealValidator) + fullState, err := pbadger.NewFullConsensusState( + log, + tracer, + consumer, + state, + all.Index, + all.Payloads, + mockTimer, + receiptValidator, + sealValidator, + ) require.NoError(t, err) f(db, fullState) }) @@ -133,7 +143,7 @@ func RunWithFullProtocolStateAndMetrics(t testing.TB, rootSnapshot protocol.Snap all.QuorumCertificates, all.Setups, all.EpochCommits, - all.Statuses, + all.ProtocolState, all.VersionBeacons, rootSnapshot, ) @@ -141,7 +151,17 @@ func RunWithFullProtocolStateAndMetrics(t testing.TB, rootSnapshot protocol.Snap receiptValidator := MockReceiptValidator() sealValidator := MockSealValidator(all.Seals) mockTimer := MockBlockTimer() - fullState, err := pbadger.NewFullConsensusState(log, tracer, consumer, state, all.Index, all.Payloads, mockTimer, receiptValidator, sealValidator) + fullState, err := pbadger.NewFullConsensusState( + log, + tracer, + consumer, + state, + all.Index, + all.Payloads, + mockTimer, + receiptValidator, 
+ sealValidator, + ) require.NoError(t, err) f(db, fullState) }) @@ -164,14 +184,24 @@ func RunWithFullProtocolStateAndValidator(t testing.TB, rootSnapshot protocol.Sn all.QuorumCertificates, all.Setups, all.EpochCommits, - all.Statuses, + all.ProtocolState, all.VersionBeacons, rootSnapshot, ) require.NoError(t, err) sealValidator := MockSealValidator(all.Seals) mockTimer := MockBlockTimer() - fullState, err := pbadger.NewFullConsensusState(log, tracer, consumer, state, all.Index, all.Payloads, mockTimer, validator, sealValidator) + fullState, err := pbadger.NewFullConsensusState( + log, + tracer, + consumer, + state, + all.Index, + all.Payloads, + mockTimer, + validator, + sealValidator, + ) require.NoError(t, err) f(db, fullState) }) @@ -194,13 +224,21 @@ func RunWithFollowerProtocolState(t testing.TB, rootSnapshot protocol.Snapshot, all.QuorumCertificates, all.Setups, all.EpochCommits, - all.Statuses, + all.ProtocolState, all.VersionBeacons, rootSnapshot, ) require.NoError(t, err) mockTimer := MockBlockTimer() - followerState, err := pbadger.NewFollowerState(log, tracer, consumer, state, all.Index, all.Payloads, mockTimer) + followerState, err := pbadger.NewFollowerState( + log, + tracer, + consumer, + state, + all.Index, + all.Payloads, + mockTimer, + ) require.NoError(t, err) f(db, followerState) }) @@ -222,7 +260,7 @@ func RunWithFullProtocolStateAndConsumer(t testing.TB, rootSnapshot protocol.Sna all.QuorumCertificates, all.Setups, all.EpochCommits, - all.Statuses, + all.ProtocolState, all.VersionBeacons, rootSnapshot, ) @@ -230,7 +268,17 @@ func RunWithFullProtocolStateAndConsumer(t testing.TB, rootSnapshot protocol.Sna receiptValidator := MockReceiptValidator() sealValidator := MockSealValidator(all.Seals) mockTimer := MockBlockTimer() - fullState, err := pbadger.NewFullConsensusState(log, tracer, consumer, state, all.Index, all.Payloads, mockTimer, receiptValidator, sealValidator) + fullState, err := pbadger.NewFullConsensusState( + log, + tracer, + 
consumer, + state, + all.Index, + all.Payloads, + mockTimer, + receiptValidator, + sealValidator, + ) require.NoError(t, err) f(db, fullState) }) @@ -251,7 +299,7 @@ func RunWithFullProtocolStateAndMetricsAndConsumer(t testing.TB, rootSnapshot pr all.QuorumCertificates, all.Setups, all.EpochCommits, - all.Statuses, + all.ProtocolState, all.VersionBeacons, rootSnapshot, ) @@ -259,7 +307,17 @@ func RunWithFullProtocolStateAndMetricsAndConsumer(t testing.TB, rootSnapshot pr receiptValidator := MockReceiptValidator() sealValidator := MockSealValidator(all.Seals) mockTimer := MockBlockTimer() - fullState, err := pbadger.NewFullConsensusState(log, tracer, consumer, state, all.Index, all.Payloads, mockTimer, receiptValidator, sealValidator) + fullState, err := pbadger.NewFullConsensusState( + log, + tracer, + consumer, + state, + all.Index, + all.Payloads, + mockTimer, + receiptValidator, + sealValidator, + ) require.NoError(t, err) f(db, fullState) }) @@ -282,13 +340,21 @@ func RunWithFollowerProtocolStateAndHeaders(t testing.TB, rootSnapshot protocol. 
all.QuorumCertificates, all.Setups, all.EpochCommits, - all.Statuses, + all.ProtocolState, all.VersionBeacons, rootSnapshot, ) require.NoError(t, err) mockTimer := MockBlockTimer() - followerState, err := pbadger.NewFollowerState(log, tracer, consumer, state, all.Index, all.Payloads, mockTimer) + followerState, err := pbadger.NewFollowerState( + log, + tracer, + consumer, + state, + all.Index, + all.Payloads, + mockTimer, + ) require.NoError(t, err) f(db, followerState, all.Headers, all.Index) }) diff --git a/storage/all.go b/storage/all.go index eb2c9eb0328..0822404309a 100644 --- a/storage/all.go +++ b/storage/all.go @@ -20,5 +20,6 @@ type All struct { TransactionResults TransactionResults Collections Collections Events Events + ProtocolState ProtocolState VersionBeacons VersionBeacons } diff --git a/storage/badger/all.go b/storage/badger/all.go index 58bc45e6848..678bbc7b073 100644 --- a/storage/badger/all.go +++ b/storage/badger/all.go @@ -20,6 +20,7 @@ func InitAll(metrics module.CacheMetrics, db *badger.DB) *storage.All { setups := NewEpochSetups(metrics, db) epochCommits := NewEpochCommits(metrics, db) statuses := NewEpochStatuses(metrics, db) + protocolState := NewProtocolState(metrics, setups, epochCommits, db, DefaultCacheSize) versionBeacons := NewVersionBeacons(db) commits := NewCommits(metrics, db) @@ -40,6 +41,7 @@ func InitAll(metrics module.CacheMetrics, db *badger.DB) *storage.All { Setups: setups, EpochCommits: epochCommits, Statuses: statuses, + ProtocolState: protocolState, VersionBeacons: versionBeacons, Results: results, Receipts: receipts, diff --git a/storage/badger/operation/modifiers.go b/storage/badger/operation/modifiers.go index 3965b5d204c..b8808e2b89b 100644 --- a/storage/badger/operation/modifiers.go +++ b/storage/badger/operation/modifiers.go @@ -21,6 +21,17 @@ func SkipDuplicates(op func(*badger.Txn) error) func(tx *badger.Txn) error { } } +func SkipDuplicatesTx(op func(*transaction.Tx) error) func(tx *transaction.Tx) error { + 
return func(tx *transaction.Tx) error { + err := op(tx) + if errors.Is(err, storage.ErrAlreadyExists) { + metrics.GetStorageCollector().SkipDuplicate() + return nil + } + return err + } +} + func SkipNonExist(op func(*badger.Txn) error) func(tx *badger.Txn) error { return func(tx *badger.Txn) error { err := op(tx) diff --git a/storage/badger/protocol_state.go b/storage/badger/protocol_state.go index c25514a46f3..ca36af4cd8a 100644 --- a/storage/badger/protocol_state.go +++ b/storage/badger/protocol_state.go @@ -62,17 +62,16 @@ func NewProtocolState(collector module.CacheMetrics, // Expected error returns during normal operations: // - storage.ErrAlreadyExists if an Identity Table with the given id is already stored func (s *ProtocolState) StoreTx(id flow.Identifier, protocolState *flow.ProtocolStateEntry) func(*transaction.Tx) error { - return func(tx *transaction.Tx) error { - if !protocolState.Identities.Sorted(order.IdentifierCanonical) { - return fmt.Errorf("sanity check failed: identities are not sorted") - } - if protocolState.NextEpochProtocolState != nil { - if !protocolState.NextEpochProtocolState.Identities.Sorted(order.IdentifierCanonical) { - return fmt.Errorf("sanity check failed: next epoch identities are not sorted") - } - } - return transaction.WithTx(operation.InsertProtocolState(id, protocolState))(tx) + // front-load sanity checks: + if !protocolState.Identities.Sorted(order.IdentifierCanonical) { + return transaction.Fail(fmt.Errorf("sanity check failed: identities are not sorted")) + } + if protocolState.NextEpochProtocolState != nil && !protocolState.NextEpochProtocolState.Identities.Sorted(order.IdentifierCanonical) { + return transaction.Fail(fmt.Errorf("sanity check failed: next epoch identities are not sorted")) } + + // happy path: return anonymous function, whose future execution (as part of a transaction) will store the protocolState + return transaction.WithTx(operation.InsertProtocolState(id, protocolState)) } // Index indexes the 
identity table by block ID. diff --git a/storage/badger/transaction/tx.go b/storage/badger/transaction/tx.go index 4235389ad6d..b6a001f7c16 100644 --- a/storage/badger/transaction/tx.go +++ b/storage/badger/transaction/tx.go @@ -6,26 +6,67 @@ import ( ioutils "github.com/onflow/flow-go/utils/io" ) +// Tx wraps a badger transaction and includes an additional slice for callbacks. +// The callbacks are executed after the badger transaction has completed _successfully_. +// DESIGN PATTERN +// - DBTxn should never be nil +// - at initialization, `callbacks` is empty +// - While business logic code operates on `DBTxn`, it can append additional callbacks +// via the `OnSucceed` method. This generally happens during the transaction execution. +// +// CAUTION: +// - Tx is stateful (calls to `OnSucceed` change its internal state). +// Therefore, Tx needs to be passed as a pointer variable. +// - Do not instantiate Tx outside of this package. Instead, use `Update` or `View` +// functions. +// - Whether a transaction is considered to have succeeded depends only on the return value +// of the outermost function. For example, consider a chain of 3 functions: f3( f2( f1(x))) +// Let's assume f1 fails with a `storage.ErrAlreadyExists` sentinel, which f2 expects and +// therefore discards. f3 could then succeed, i.e. return nil. +// Consequently, the entire list of callbacks is executed, including f1's callback if it +// added one. Callback implementations therefore need to account for this edge case. type Tx struct { DBTxn *dbbadger.Txn callbacks []func() } -// OnSucceed adds a callback to execute after the batch has -// been successfully flushed. -// useful for implementing the cache where we will only cache -// after the batch has been successfully flushed +// OnSucceed adds a callback to execute after the batch has been successfully flushed. +// Useful for implementing the cache where we will only cache after the batch of database +// operations has been successfully applied.
+// CAUTION: +// Whether a transaction is considered to have succeeded depends only on the return value +// of the outermost function. For example, consider a chain of 3 functions: f3( f2( f1(x))) +// Let's assume f1 fails with a `storage.ErrAlreadyExists` sentinel, which f2 expects and +// therefore discards. f3 could then succeed, i.e. return nil. +// Consequently, the entire list of callbacks is executed, including f1's callback if it +// added one. Callback implementations therefore need to account for this edge case. func (b *Tx) OnSucceed(callback func()) { b.callbacks = append(b.callbacks, callback) } -// Update creates a badger transaction, passing it to a chain of functions, -// if all succeed. Useful to use callback to update cache in order to ensure data -// in badgerDB and cache are consistent. +// Update creates a badger transaction, passing it to a chain of functions. +// Only if the transaction succeeds, we run `callbacks` that were appended during the +// transaction execution. The callbacks are useful to update caches in order to reduce +// cache misses. func Update(db *dbbadger.DB, f func(*Tx) error) error { dbTxn := db.NewTransaction(true) - defer dbTxn.Discard() + err := run(f, dbTxn) + dbTxn.Discard() + return err +} + +// View creates a read-only badger transaction, passing it to a chain of functions. +// Only if the transaction succeeds, we run `callbacks` that were appended during the +// transaction execution. The callbacks are useful to update caches in order to reduce +// cache misses. +func View(db *dbbadger.DB, f func(*Tx) error) error { + dbTxn := db.NewTransaction(false) + err := run(f, dbTxn) + dbTxn.Discard() + return err +} +func run(f func(*Tx) error, dbTxn *dbbadger.Txn) error { tx := &Tx{DBTxn: dbTxn} err := f(tx) if err != nil { @@ -43,6 +84,16 @@ func Update(db *dbbadger.DB, f func(*Tx) error) error { return nil } +// Fail returns an anonymous function, whose future execution returns the error e.
This +// is useful for front-loading sanity checks. On the happy path (dominant), this function +// will generally not be used. However, if one of the front-loaded sanity checks fails, +// we include `transaction.Fail(e)` in place of the business logic handling the happy path. +func Fail(e error) func(*Tx) error { + return func(tx *Tx) error { + return e + } +} + // WithTx is useful when transaction is used without adding callback. func WithTx(f func(*dbbadger.Txn) error) func(*Tx) error { return func(tx *Tx) error {