Skip to content
This repository has been archived by the owner on Dec 8, 2021. It is now read-only.

support restore view #417

Merged
merged 19 commits into from
Nov 26, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions lightning/mydump/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type MDDatabaseMeta struct {
Name string
SchemaFile string
Tables []*MDTableMeta
Views []*MDTableMeta
charSet string
}

Expand Down Expand Up @@ -81,6 +82,7 @@ type mdLoaderSetup struct {
loader *MDLoader
dbSchemas []FileInfo
tableSchemas []FileInfo
viewSchemas []FileInfo
tableDatas []FileInfo
dbIndexMap map[string]int
tableIndexMap map[filter.Table]int
Expand Down Expand Up @@ -234,6 +236,17 @@ func (s *mdLoaderSetup) setup(ctx context.Context, store storage.ExternalStorage
return errors.Errorf("invalid table schema file, duplicated item - %s", fileInfo.FileMeta.Path)
}
}

// setup view schema
for _, fileInfo := range s.viewSchemas {
dbExists, tableExists := s.insertView(fileInfo)
if !dbExists {
return errors.Errorf("invalid table schema file, cannot find db '%s' - %s", fileInfo.TableName.Schema, fileInfo.FileMeta.Path)
} else if !tableExists {
// removing the trailing `-view.sql` from the path yields the related table schema file path
return errors.Errorf("invalid view schema file, miss host table schema for view '%s'", fileInfo.TableName.Name)
}
}
}

// Sql file for restore data
Expand Down Expand Up @@ -302,6 +315,8 @@ func (s *mdLoaderSetup) listFiles(ctx context.Context, store storage.ExternalSto
s.dbSchemas = append(s.dbSchemas, info)
case SourceTypeTableSchema:
s.tableSchemas = append(s.tableSchemas, info)
case SourceTypeViewSchema:
s.viewSchemas = append(s.viewSchemas, info)
case SourceTypeSQL, SourceTypeCSV, SourceTypeParquet:
s.tableDatas = append(s.tableDatas, info)
}
Expand Down Expand Up @@ -345,6 +360,10 @@ func (s *mdLoaderSetup) route() error {
dbInfo.count++
knownDBNames[info.TableName.Schema] = dbInfo
}
for _, info := range s.viewSchemas {
dbInfo := knownDBNames[info.TableName.Schema]
dbInfo.count++
}

run := func(arr []FileInfo) error {
for i, info := range arr {
Expand Down Expand Up @@ -376,6 +395,9 @@ func (s *mdLoaderSetup) route() error {
if err := run(s.tableSchemas); err != nil {
return errors.Trace(err)
}
if err := run(s.viewSchemas); err != nil {
return errors.Trace(err)
}
if err := run(s.tableDatas); err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -428,6 +450,21 @@ func (s *mdLoaderSetup) insertTable(fileInfo FileInfo) (*MDTableMeta, bool, bool
}
}

// insertView records the view schema file under its database's meta entry.
// It returns two flags: whether the database was already known, and whether
// the view's host table is present in tableIndexMap (i.e. its companion
// table schema file was registered earlier). The view meta is appended only
// when the host table exists; otherwise nothing is recorded.
func (s *mdLoaderSetup) insertView(fileInfo FileInfo) (bool, bool) {
	dbMeta, dbExists := s.insertDB(fileInfo.TableName.Schema, "")
	if _, tableExists := s.tableIndexMap[fileInfo.TableName]; tableExists {
		dbMeta.Views = append(dbMeta.Views, &MDTableMeta{
			DB:         fileInfo.TableName.Schema,
			Name:       fileInfo.TableName.Name,
			SchemaFile: fileInfo,
			charSet:    s.loader.charSet,
		})
		return dbExists, true
	}
	return dbExists, false
}

func (l *MDLoader) GetDatabases() []*MDDatabaseMeta {
return l.dbs
}
Expand Down
98 changes: 97 additions & 1 deletion lightning/mydump/loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,33 @@ func (s *testMydumpLoaderSuite) TestDataNoHostTable(c *C) {
c.Assert(err, ErrorMatches, `invalid data file, miss host table 'tbl' - .*[/\\]?db\.tbl\.sql`)
}

// TestViewNoHostDB checks that loader setup rejects a view schema file whose
// database has no matching `-schema-create.sql` file.
func (s *testMydumpLoaderSuite) TestViewNoHostDB(c *C) {
	// Source dir layout:
	//   notdb-schema-create.sql  (creates a different db)
	//   db.tbl-schema-view.sql   (view for the absent db)
	for _, name := range []string{"notdb-schema-create.sql", "db.tbl-schema-view.sql"} {
		s.touch(c, name)
	}

	_, err := md.NewMyDumpLoader(context.Background(), s.cfg)
	c.Assert(err, ErrorMatches, `invalid table schema file, cannot find db 'db' - .*[/\\]?db\.tbl-schema-view\.sql`)
}

// TestViewNoHostTable checks that loader setup rejects a view schema file
// when the companion table schema file for its host table is absent.
func (s *testMydumpLoaderSuite) TestViewNoHostTable(c *C) {
	// Source dir layout:
	//   db-schema-create.sql     (db exists)
	//   db.tbl-schema-view.sql   (view, but no db.tbl-schema.sql)
	for _, name := range []string{"db-schema-create.sql", "db.tbl-schema-view.sql"} {
		s.touch(c, name)
	}

	_, err := md.NewMyDumpLoader(context.Background(), s.cfg)
	c.Assert(err, ErrorMatches, `invalid view schema file, miss host table schema for view 'tbl'`)
}

func (s *testMydumpLoaderSuite) TestDataWithoutSchema(c *C) {
dir := s.sourceDir
p := filepath.Join(dir, "db.tbl.sql")
Expand Down Expand Up @@ -223,7 +250,6 @@ func (s *testMydumpLoaderSuite) TestTablesWithDots(c *C) {
s.touch(c, "db.0002.sql")

// insert some tables with file name structures which we're going to ignore.
s.touch(c, "db.v-schema-view.sql")
s.touch(c, "db.v-schema-trigger.sql")
s.touch(c, "db.v-schema-post.sql")
s.touch(c, "db.sql")
Expand Down Expand Up @@ -263,6 +289,12 @@ func (s *testMydumpLoaderSuite) TestRouter(c *C) {
SchemaPattern: "c*",
TargetSchema: "c",
},
{
SchemaPattern: "e*",
TablePattern: "f*",
TargetSchema: "v",
TargetTable: "vv",
},
}

/*
Expand All @@ -277,10 +309,15 @@ func (s *testMydumpLoaderSuite) TestRouter(c *C) {
a1.s1.1.schema.sql
a1.t2-schema.sql
a1.t2.1.sql
a1.v1-schema.sql
a1.v1-schema-view.sql
c0-schema-create.sql
c0.t3-schema.sql
c0.t3.1.sql
d0-schema-create.sql
e0-schema-create.sql
e0.f0-schema.sql
e0.f0-schema-view.sql
*/

s.touch(c, "a0-schema-create.sql")
Expand All @@ -294,13 +331,19 @@ func (s *testMydumpLoaderSuite) TestRouter(c *C) {
s.touch(c, "a1.s1.1.sql")
s.touch(c, "a1.t2-schema.sql")
s.touch(c, "a1.t2.1.sql")
s.touch(c, "a1.v1-schema.sql")
s.touch(c, "a1.v1-schema-view.sql")

s.touch(c, "c0-schema-create.sql")
s.touch(c, "c0.t3-schema.sql")
s.touch(c, "c0.t3.1.sql")

s.touch(c, "d0-schema-create.sql")

s.touch(c, "e0-schema-create.sql")
s.touch(c, "e0.f0-schema.sql")
s.touch(c, "e0.f0-schema-view.sql")

mdl, err := md.NewMyDumpLoader(context.Background(), s.cfg)
c.Assert(err, IsNil)
c.Assert(mdl.GetDatabases(), DeepEquals, []*md.MDDatabaseMeta{
Expand All @@ -314,6 +357,19 @@ func (s *testMydumpLoaderSuite) TestRouter(c *C) {
SchemaFile: md.FileInfo{TableName: filter.Table{Schema: "a1", Name: "s1"}, FileMeta: md.SourceFileMeta{Path: "a1.s1-schema.sql", Type: md.SourceTypeTableSchema}},
DataFiles: []md.FileInfo{{TableName: filter.Table{Schema: "a1", Name: "s1"}, FileMeta: md.SourceFileMeta{Path: "a1.s1.1.sql", Type: md.SourceTypeSQL, SortKey: "1"}}},
},
{
DB: "a1",
Name: "v1",
SchemaFile: md.FileInfo{TableName: filter.Table{Schema: "a1", Name: "v1"}, FileMeta: md.SourceFileMeta{Path: "a1.v1-schema.sql", Type: md.SourceTypeTableSchema}},
DataFiles: []md.FileInfo{},
},
},
Views: []*md.MDTableMeta{
{
DB: "a1",
Name: "v1",
SchemaFile: md.FileInfo{TableName: filter.Table{Schema: "a1", Name: "v1"}, FileMeta: md.SourceFileMeta{Path: "a1.v1-schema-view.sql", Type: md.SourceTypeViewSchema}},
},
},
},
{
Expand Down Expand Up @@ -348,6 +404,25 @@ func (s *testMydumpLoaderSuite) TestRouter(c *C) {
},
},
},
{
Name: "v",
SchemaFile: "e0-schema-create.sql",
Tables: []*md.MDTableMeta{
{
DB: "v",
Name: "vv",
SchemaFile: md.FileInfo{TableName: filter.Table{Schema: "v", Name: "vv"}, FileMeta: md.SourceFileMeta{Path: "e0.f0-schema.sql", Type: md.SourceTypeTableSchema}},
DataFiles: []md.FileInfo{},
},
},
Views: []*md.MDTableMeta{
{
DB: "v",
Name: "vv",
SchemaFile: md.FileInfo{TableName: filter.Table{Schema: "v", Name: "vv"}, FileMeta: md.SourceFileMeta{Path: "e0.f0-schema-view.sql", Type: md.SourceTypeViewSchema}},
},
},
},
})
}

Expand Down Expand Up @@ -377,6 +452,12 @@ func (s *testMydumpLoaderSuite) TestFileRouting(c *C) {
Table: "$2",
Type: "table-schema",
},
{
Pattern: `(?i)^(?:[^./]*/)*([a-z0-9]+)/([a-z0-9_]+)-view\.sql$`,
Schema: "$1",
Table: "$2",
Type: "view-schema",
},
{
Pattern: `(?i)^(?:[^./]*/)*([a-z][a-z0-9_]*)/([a-z]+)[0-9]*(?:\.([0-9]+))?\.(sql|csv)$`,
Schema: "$1",
Expand All @@ -398,6 +479,8 @@ func (s *testMydumpLoaderSuite) TestFileRouting(c *C) {
s.touch(c, "d1/test0.sql")
s.touch(c, "d1/test1.sql")
s.touch(c, "d1/test2.001.sql")
s.touch(c, "d1/v1-table.sql")
s.touch(c, "d1/v1-view.sql")
_ = s.touch(c, "d1/t1-schema-create.sql")
s.touch(c, "d2/schema.sql")
s.touch(c, "d2/abc-table.sql")
Expand Down Expand Up @@ -432,6 +515,19 @@ func (s *testMydumpLoaderSuite) TestFileRouting(c *C) {
},
},
},
{
DB: "d1",
Name: "v1",
SchemaFile: md.FileInfo{TableName: filter.Table{Schema: "d1", Name: "v1"}, FileMeta: md.SourceFileMeta{Path: "d1/v1-table.sql", Type: md.SourceTypeTableSchema}},
DataFiles: []md.FileInfo{},
},
},
Views: []*md.MDTableMeta{
{
DB: "d1",
Name: "v1",
SchemaFile: md.FileInfo{TableName: filter.Table{Schema: "d1", Name: "v1"}, FileMeta: md.SourceFileMeta{Path: "d1/v1-view.sql", Type: md.SourceTypeViewSchema}},
},
},
},
{
Expand Down
16 changes: 12 additions & 4 deletions lightning/mydump/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@ const (
SourceTypeSQL
SourceTypeCSV
SourceTypeParquet
SourceTypeViewSchema
)

const (
SchemaSchema = "schema-schema"
TableSchema = "table-schema"
ViewSchema = "view-schema"
TypeSQL = "sql"
TypeCSV = "csv"
TypeParquet = "parquet"
Expand Down Expand Up @@ -57,6 +59,8 @@ func parseSourceType(t string) (SourceType, error) {
return SourceTypeParquet, nil
case TypeIgnore:
return SourceTypeIgnore, nil
case ViewSchema:
return SourceTypeViewSchema, nil
default:
return SourceTypeIgnore, errors.Errorf("unknown source type '%s'", t)
}
Expand All @@ -74,6 +78,8 @@ func (s SourceType) String() string {
return TypeSQL
case SourceTypeParquet:
return TypeParquet
case SourceTypeViewSchema:
return ViewSchema
default:
return TypeIgnore
}
Expand Down Expand Up @@ -102,12 +108,14 @@ var (

var (
defaultFileRouteRules = []*config.FileRouteRule{
// ignore *-schema-view.sql,-schema-trigger.sql,-schema-post.sql files
{Pattern: `(?i).*(-schema-view|-schema-trigger|-schema-post)\.sql`, Type: "ignore"},
// ignore *-schema-trigger.sql, *-schema-post.sql files
{Pattern: `(?i).*(-schema-trigger|-schema-post)\.sql$`, Type: "ignore"},
// db schema create file pattern, matches files like '{schema}-schema-create.sql'
{Pattern: `(?i)^(?:[^/]*/)*([^/.]+)-schema-create\.sql`, Schema: "$1", Table: "", Type: SchemaSchema},
{Pattern: `(?i)^(?:[^/]*/)*([^/.]+)-schema-create\.sql$`, Schema: "$1", Table: "", Type: SchemaSchema},
// table schema create file pattern, matches files like '{schema}.{table}-schema.sql'
{Pattern: `(?i)^(?:[^/]*/)*([^/.]+)\.(.*?)-schema\.sql`, Schema: "$1", Table: "$2", Type: TableSchema},
{Pattern: `(?i)^(?:[^/]*/)*([^/.]+)\.(.*?)-schema\.sql$`, Schema: "$1", Table: "$2", Type: TableSchema},
// view schema create file pattern, matches files like '{schema}.{table}-schema-view.sql'
{Pattern: `(?i)^(?:[^/]*/)*([^/.]+)\.(.*?)-schema-view\.sql$`, Schema: "$1", Table: "$2", Type: ViewSchema},
// source file pattern, matches files like '{schema}.{table}.0001.{sql|csv}'
{Pattern: `(?i)^(?:[^/]*/)*([^/.]+)\.(.*?)(?:\.([0-9]+))?\.(sql|csv|parquet)$`, Schema: "$1", Table: "$2", Type: "$4", Key: "$3"},
}
Expand Down
4 changes: 3 additions & 1 deletion lightning/mydump/router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,8 @@ func (t *testFileRouterSuite) TestMultiRouteRule(c *C) {
// multi rule don't intersect with each other
rules := []*config.FileRouteRule{
{Pattern: `(?:[^/]*/)*([^/.]+)-schema-create\.sql`, Schema: "$1", Type: SchemaSchema},
{Pattern: `(?:[^/]*/)*([^/.]+)\.([^/.]+)-schema\.sql`, Schema: "$1", Table: "$2", Type: TableSchema},
{Pattern: `(?:[^/]*/)*([^/.]+)\.([^/.]+)-schema\.sql$`, Schema: "$1", Table: "$2", Type: TableSchema},
{Pattern: `(?:[^/]*/)*([^/.]+)\.([^/.]+)-schema-view\.sql$`, Schema: "$1", Table: "$2", Type: ViewSchema},
{Pattern: `^(?:[^/]*/)*(?P<schema>[^/.]+)\.(?P<table>[^./]+)(?:\.(?P<key>[0-9]+))?\.(?P<type>csv|sql)(?:\.(?P<cp>[A-Za-z0-9]+))?$`, Schema: "$schema", Table: "$table", Type: "$type", Key: "$key", Compression: "$cp"},
}

Expand All @@ -139,6 +140,7 @@ func (t *testFileRouterSuite) TestMultiRouteRule(c *C) {
inputOutputMap := map[string][]string{
"test-schema-create.sql": {"test", "", "", "", SchemaSchema},
"test.t-schema.sql": {"test", "t", "", "", TableSchema},
"test.v1-schema-view.sql": {"test", "v1", "", "", ViewSchema},
"my_schema.my_table.sql": {"my_schema", "my_table", "", "", "sql"},
"/test/123/my_schema.my_table.sql": {"my_schema", "my_table", "", "", "sql"},
"my_dir/my_schema.my_table.csv": {"my_schema", "my_table", "", "", "csv"},
Expand Down
25 changes: 25 additions & 0 deletions lightning/restore/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,24 @@ func (rc *RestoreController) restoreSchema(ctx context.Context) error {
return errors.Annotatef(err, "restore table schema %s failed", dbMeta.Name)
}
}

// restore views. Since views can cross database we must restore views after all table schemas are restored.
for _, dbMeta := range rc.dbMetas {
if len(dbMeta.Views) > 0 {
task := log.With(zap.String("db", dbMeta.Name)).Begin(zap.InfoLevel, "restore view schema")
viewsSchema := make(map[string]string)
for _, viewMeta := range dbMeta.Views {
viewsSchema[viewMeta.Name] = viewMeta.GetSchema(ctx, rc.store)
}
err := InitSchema(ctx, rc.tidbGlue, dbMeta.Name, viewsSchema)

task.End(zap.ErrorLevel, err)
if err != nil {
return errors.Annotatef(err, "restore view schema %s failed", dbMeta.Name)
}
}

}
}
getTableFunc := rc.backend.FetchRemoteTableModels
if !rc.tidbGlue.OwnsSQLExecutor() {
Expand Down Expand Up @@ -1153,6 +1171,13 @@ func (t *TableRestore) importEngine(
}

func (t *TableRestore) postProcess(ctx context.Context, rc *RestoreController, cp *TableCheckpoint) error {
// there are no data in this table, no need to do post process
// this is important for tables that are just the dump table of views
// because at this stage, the table was already deleted and replaced by the related view
if len(cp.Engines) == 1 {
return nil
}

// 3. alter table set auto_increment
if cp.Status < CheckpointStatusAlteredAutoInc {
rc.alterTableLock.Lock()
Expand Down
Loading