Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

migration: Gracefull cancelation #1169

Merged
merged 1 commit into from
Nov 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions db/buckets.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ const (
Pending
BlockCommitments
Temporary // used temporarily for migrations
SchemaIntermediateState
)

// Key flattens a prefix and series of byte arrays into a single []byte.
Expand Down
18 changes: 10 additions & 8 deletions migration/bucket_migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import (
"bytes"
"context"

"github.com/NethermindEth/juno/db"
"github.com/NethermindEth/juno/utils"
Expand Down Expand Up @@ -66,15 +67,16 @@
return m
}

func (m *BucketMigrator) Before() {
func (m *BucketMigrator) Before(_ []byte) error {
m.before()
return nil
}

func (m *BucketMigrator) Migrate(txn db.Transaction, network utils.Network) error {
func (m *BucketMigrator) Migrate(_ context.Context, txn db.Transaction, network utils.Network) ([]byte, error) {
remainingInBatch := m.batchSize
iterator, err := txn.NewIterator()
if err != nil {
return err
return nil, err

Check warning on line 79 in migration/bucket_migrator.go

View check run for this annotation

Codecov / codecov/patch

migration/bucket_migrator.go#L79

Added line #L79 was not covered by tests
}

for iterator.Seek(m.startFrom); iterator.Valid(); iterator.Next() {
Expand All @@ -84,24 +86,24 @@
}

if pass, err := m.keyFilter(key); err != nil {
return utils.RunAndWrapOnError(iterator.Close, err)
return nil, utils.RunAndWrapOnError(iterator.Close, err)

Check warning on line 89 in migration/bucket_migrator.go

View check run for this annotation

Codecov / codecov/patch

migration/bucket_migrator.go#L89

Added line #L89 was not covered by tests
} else if pass {
if remainingInBatch == 0 {
m.startFrom = key
return utils.RunAndWrapOnError(iterator.Close, ErrCallWithNewTransaction)
return nil, utils.RunAndWrapOnError(iterator.Close, ErrCallWithNewTransaction)
}

remainingInBatch--
value, err := iterator.Value()
if err != nil {
return utils.RunAndWrapOnError(iterator.Close, err)
return nil, utils.RunAndWrapOnError(iterator.Close, err)

Check warning on line 99 in migration/bucket_migrator.go

View check run for this annotation

Codecov / codecov/patch

migration/bucket_migrator.go#L99

Added line #L99 was not covered by tests
}

if err = m.do(txn, key, value, network); err != nil {
return utils.RunAndWrapOnError(iterator.Close, err)
return nil, utils.RunAndWrapOnError(iterator.Close, err)

Check warning on line 103 in migration/bucket_migrator.go

View check run for this annotation

Codecov / codecov/patch

migration/bucket_migrator.go#L103

Added line #L103 was not covered by tests
}
}
}

return iterator.Close()
return nil, iterator.Close()
}
15 changes: 10 additions & 5 deletions migration/bucket_migrator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package migration_test

import (
"bytes"
"context"
"errors"
"testing"

Expand Down Expand Up @@ -32,17 +33,20 @@ func TestBucketMover(t *testing.T) {
return txn.Set(sourceBucket.Key(), []byte{44})
}))

mover.Before()
require.NoError(t, mover.Before(nil))
require.True(t, beforeCalled)

err := testDB.Update(func(txn db.Transaction) error {
err := mover.Migrate(txn, utils.Mainnet)
var (
intermediateState []byte
err error
)
err = testDB.Update(func(txn db.Transaction) error {
intermediateState, err = mover.Migrate(context.Background(), txn, utils.Mainnet)
require.ErrorIs(t, err, migration.ErrCallWithNewTransaction)
return nil
})
require.NoError(t, err)
err = testDB.Update(func(txn db.Transaction) error {
err = mover.Migrate(txn, utils.Mainnet)
intermediateState, err = mover.Migrate(context.Background(), txn, utils.Mainnet)
require.NoError(t, err)
return nil
})
Expand Down Expand Up @@ -76,4 +80,5 @@ func TestBucketMover(t *testing.T) {
return nil
})
require.NoError(t, err)
require.Nil(t, intermediateState)
}
115 changes: 78 additions & 37 deletions migration/migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import (
"bytes"
"context"
"encoding/binary"
"errors"
"fmt"
Expand All @@ -16,27 +17,34 @@
"github.com/NethermindEth/juno/encoder"
"github.com/NethermindEth/juno/utils"
"github.com/bits-and-blooms/bitset"
"github.com/fxamacker/cbor/v2"
"github.com/sourcegraph/conc/pool"
)

type schemaMetadata struct {
Version uint64
IntermediateState []byte
}

type Migration interface {
Before()
Migrate(db.Transaction, utils.Network) error
Before(intermediateState []byte) error
// Migration should return intermediate state whenever it requests new txn or detects cancelled ctx.
Migrate(context.Context, db.Transaction, utils.Network) ([]byte, error)
}

type MigrationFunc func(db.Transaction, utils.Network) error

// Migrate returns f(txn).
func (f MigrationFunc) Migrate(txn db.Transaction, network utils.Network) error {
return f(txn, network)
func (f MigrationFunc) Migrate(_ context.Context, txn db.Transaction, network utils.Network) ([]byte, error) {
return nil, f(txn, network)
}

// Before is a no-op.
func (f MigrationFunc) Before() {}
func (f MigrationFunc) Before(_ []byte) error { return nil }

// migrations contains a set of migrations that can be applied to a database.
// defaultMigrations contains a set of migrations that can be applied to a database.
// After making breaking changes to the DB layout, add new migrations to this list.
var migrations = []Migration{
var defaultMigrations = []Migration{
MigrationFunc(migration0000),
MigrationFunc(relocateContractStorageRootKeys),
MigrationFunc(recalculateBloomFilters),
Expand All @@ -56,9 +64,13 @@

var ErrCallWithNewTransaction = errors.New("call with new transaction")

func MigrateIfNeeded(targetDB db.DB, network utils.Network, log utils.SimpleLogger) error {
func MigrateIfNeeded(ctx context.Context, targetDB db.DB, network utils.Network, log utils.SimpleLogger) error {
return migrateIfNeeded(ctx, targetDB, network, log, defaultMigrations)
}

func migrateIfNeeded(ctx context.Context, targetDB db.DB, network utils.Network, log utils.SimpleLogger, migrations []Migration) error {
/*
Schema version of the targetDB determines which set of migrations need to be applied to the database.
Schema metadata of the targetDB determines which set of migrations need to be applied to the database.
After a migration is successfully executed, which may update the database, the schema version is incremented
by 1 by this loop.

Expand All @@ -73,58 +85,86 @@
new ones. It will be able to do this since the schema version it reads from the database will be
non-zero and that is what we use to initialise the i loop variable.
*/
version, err := SchemaVersion(targetDB)
metadata, err := SchemaMetadata(targetDB)
if err != nil {
return err
}

for i := version; i < uint64(len(migrations)); i++ {
for i := metadata.Version; i < uint64(len(migrations)); i++ {
if err := ctx.Err(); err != nil {
return err
}
log.Infow("Applying database migration", "stage", fmt.Sprintf("%d/%d", i+1, len(migrations)))
migration := migrations[i]
migration.Before()
if err := migration.Before(metadata.IntermediateState); err != nil {
return err
}
for {
var migrationErr error
if dbErr := targetDB.Update(func(txn db.Transaction) error {
migrationErr = migration.Migrate(txn, network)
if migrationErr != nil {
if errors.Is(migrationErr, ErrCallWithNewTransaction) {
return nil // Run the migration again with a new transaction.
metadata.IntermediateState, migrationErr = migration.Migrate(ctx, txn, network)
switch {
case migrationErr == nil || errors.Is(migrationErr, ctx.Err()):
if metadata.IntermediateState == nil {
metadata.Version++
}
return updateSchemaMetadata(txn, metadata)
case errors.Is(migrationErr, ErrCallWithNewTransaction):
return nil // Run migration again with new transaction.
default:
return migrationErr
}

// Migration successful. Bump the version.
var versionBytes [8]byte
binary.BigEndian.PutUint64(versionBytes[:], i+1)
return txn.Set(db.SchemaVersion.Key(), versionBytes[:])
}); dbErr != nil {
return dbErr
} else if migrationErr == nil {
break
} else if !errors.Is(migrationErr, ErrCallWithNewTransaction) {
return migrationErr
}
}
}

return nil
}

func SchemaVersion(targetDB db.DB) (uint64, error) {
version := uint64(0)
// SchemaMetadata retrieves metadata about a database schema from the given database.
func SchemaMetadata(targetDB db.DB) (schemaMetadata, error) {
metadata := schemaMetadata{}
txn, err := targetDB.NewTransaction(false)
if err != nil {
return 0, nil
return metadata, err

Check warning on line 133 in migration/migration.go

View check run for this annotation

Codecov / codecov/patch

migration/migration.go#L133

Added line #L133 was not covered by tests
}
err = txn.Get(db.SchemaVersion.Key(), func(bytes []byte) error {
version = binary.BigEndian.Uint64(bytes)
if err := txn.Get(db.SchemaVersion.Key(), func(b []byte) error {
metadata.Version = binary.BigEndian.Uint64(b)
return nil
})
if err != nil && !errors.Is(err, db.ErrKeyNotFound) {
return 0, utils.RunAndWrapOnError(txn.Discard, err)
}); err != nil && !errors.Is(err, db.ErrKeyNotFound) {
return metadata, utils.RunAndWrapOnError(txn.Discard, err)
}

Check warning on line 140 in migration/migration.go

View check run for this annotation

Codecov / codecov/patch

migration/migration.go#L139-L140

Added lines #L139 - L140 were not covered by tests

if err := txn.Get(db.SchemaIntermediateState.Key(), func(b []byte) error {
return cbor.Unmarshal(b, &metadata.IntermediateState)
}); err != nil && !errors.Is(err, db.ErrKeyNotFound) {
return metadata, utils.RunAndWrapOnError(txn.Discard, err)

Check warning on line 145 in migration/migration.go

View check run for this annotation

Codecov / codecov/patch

migration/migration.go#L145

Added line #L145 was not covered by tests
}

return version, txn.Discard()
return metadata, txn.Discard()
}

// updateSchemaMetadata updates the schema in given database.
func updateSchemaMetadata(txn db.Transaction, schema schemaMetadata) error {
Exca-DK marked this conversation as resolved.
Show resolved Hide resolved
var (
version [8]byte
state []byte
err error
)
binary.BigEndian.PutUint64(version[:], schema.Version)
state, err = cbor.Marshal(schema.IntermediateState)
if err != nil {
return err
}

Check warning on line 162 in migration/migration.go

View check run for this annotation

Codecov / codecov/patch

migration/migration.go#L161-L162

Added lines #L161 - L162 were not covered by tests

if err := txn.Set(db.SchemaVersion.Key(), version[:]); err != nil {
return err
}

Check warning on line 166 in migration/migration.go

View check run for this annotation

Codecov / codecov/patch

migration/migration.go#L165-L166

Added lines #L165 - L166 were not covered by tests
return txn.Set(db.SchemaIntermediateState.Key(), state)
}

// migration0000 makes sure the targetDB is empty
Expand Down Expand Up @@ -227,7 +267,7 @@
}
}

func (m *changeTrieNodeEncoding) Before() {
func (m *changeTrieNodeEncoding) Before(_ []byte) error {
m.trieNodeBuckets = map[db.Bucket]*struct {
seekTo []byte
skipLen int
Expand All @@ -245,6 +285,7 @@
skipLen: 1 + felt.Bytes,
},
}
return nil
}

type node struct {
Expand Down Expand Up @@ -314,7 +355,7 @@
return err
}

func (m *changeTrieNodeEncoding) Migrate(txn db.Transaction, _ utils.Network) error {
func (m *changeTrieNodeEncoding) Migrate(_ context.Context, txn db.Transaction, _ utils.Network) ([]byte, error) {
// If we made n a trie.Node, the encoder would fall back to the custom encoding methods.
// We instead define a cutom struct to force the encoder to use the default encoding.
var n node
Expand Down Expand Up @@ -371,15 +412,15 @@

iterator, err := txn.NewIterator()
if err != nil {
return err
return nil, err

Check warning on line 415 in migration/migration.go

View check run for this annotation

Codecov / codecov/patch

migration/migration.go#L415

Added line #L415 was not covered by tests
}

for bucket, info := range m.trieNodeBuckets {
if err := migrateF(iterator, bucket, info.seekTo, info.skipLen); err != nil {
return utils.RunAndWrapOnError(iterator.Close, err)
return nil, utils.RunAndWrapOnError(iterator.Close, err)

Check warning on line 420 in migration/migration.go

View check run for this annotation

Codecov / codecov/patch

migration/migration.go#L420

Added line #L420 was not covered by tests
}
}
return iterator.Close()
return nil, iterator.Close()
}

// calculateBlockCommitments calculates the txn and event commitments for each block and stores them separately
Expand Down
Loading
Loading