-
Notifications
You must be signed in to change notification settings - Fork 0
/
migrator.go
148 lines (134 loc) · 4.17 KB
/
migrator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
package main
import (
"database/sql"
"fmt"
"time"
"github.com/golang-migrate/migrate"
"github.com/golang-migrate/migrate/database"
"github.com/golang-migrate/migrate/database/mysql"
_ "github.com/golang-migrate/migrate/source/file"
)
// Migrator applies the schema migrations to a MySQL database.
//
// All fields are populated by run, in the order they are declared below the
// options; none of them is usable before run has been called.
type Migrator struct {
// db is the database handle returned by getConnection.
db *sql.DB
// driver is the golang-migrate database driver built on top of db by getDriver.
driver database.Driver
// instance is the migrate handle returned by getDatabaseInstance; it is
// the object every migration step is executed through.
instance *migrate.Migrate
// options holds the connection settings that were passed to run.
options *MigratorConnectionOptions
}
// MigratorConnectionOptions carries the settings the Migrator needs to reach
// the database and to decide how persistently to retry.
//
// Host, Port, Database, User and Password are interpolated verbatim into a
// connection string of the form "user:password@tcp(host:port)/database".
type MigratorConnectionOptions struct {
// Host is the database server host placed inside tcp(host:port).
Host string
// Port is the database server port placed inside tcp(host:port).
Port string
// Database is the schema name appended after the slash.
Database string
// User is the account name used to authenticate.
User string
// Password is the account password; it is part of the connection string
// but is not included in the connection log message.
Password string
// ConnectionRetryIntervalMs is the pause between driver creation attempts.
// getDriver multiplies this value by time.Millisecond before sleeping, so
// despite the time.Duration type it must hold a plain count of
// milliseconds (e.g. 500), not an already scaled duration.
ConnectionRetryIntervalMs time.Duration
// ConnectionRetryAttempts bounds how many times getDriver tries to create
// the database driver before giving up.
ConnectionRetryAttempts uint
}
// run executes a complete migration pass using opts.
//
// It stores opts on the migrator, opens the database handle, builds the
// migration driver and migrate instance, and then migrates the schema to the
// latest available version. Any unrecoverable failure in one of those steps
// panics inside the corresponding helper.
//
// The migrate instance is closed through a deferred call so that its source
// and database handles are released even when a migration step panics, and
// both errors returned by Close are logged instead of being discarded.
func (migrator *Migrator) run(opts *MigratorConnectionOptions) {
	migrator.options = opts
	connectionString := migrator.getConnectionString()
	migrator.db = migrator.getConnection(connectionString)
	migrator.driver = migrator.getDriver()
	migrator.instance = migrator.getDatabaseInstance()

	defer func() {
		sourceErr, databaseErr := migrator.instance.Close()
		if sourceErr != nil {
			logger.Errorf("[migrator] error while closing migration source: %s", sourceErr)
		}
		if databaseErr != nil {
			logger.Errorf("[migrator] error while closing migration database: %s", databaseErr)
		}
	}()

	migrator.migrateToLatest()
	logger.Info("[migrator] migration run completed")
}
// getConnectionString builds the connection string for the configured
// database in the form "user:password@tcp(host:port)/database".
//
// It also logs the connection target; the password is deliberately left out
// of that log line.
func (migrator *Migrator) getConnectionString() string {
	opts := migrator.options

	logger.Infof(
		"[migrator] connecting to database '%v' at <%v:%v> with user '%v'",
		opts.Database, opts.Host, opts.Port, opts.User,
	)

	dsn := fmt.Sprintf("%v:%v@tcp(%v:%v)/%v", opts.User, opts.Password, opts.Host, opts.Port, opts.Database)
	return dsn
}
// migrateToLatest brings the schema up to the newest available migration.
//
// It first reports the current version, rolls back a dirty migration if one
// is recorded, and then applies migrations one step at a time until
// migrateUpwards reports that nothing is left to apply.
func (migrator *Migrator) migrateToLatest() {
	version, dirty, err := migrator.instance.Version()
	switch {
	case err == nil:
		logger.Infof("[migrator] migration version: %v (dirty: %v)", version, dirty)
	case err.Error() == "no migration":
		// An empty schema is reported as an error with this exact text; it
		// is an expected state, not a failure.
		logger.Info("[migrator] no migrations applied yet")
	default:
		// Any other error means the version could not be read. Log it and
		// carry on: the first upward step will surface a persistent problem.
		logger.Errorf("[migrator] unable to read migration version: %s", err)
	}

	if dirty {
		migrator.rollbackDirtyMigration(version)
	}

	for !migrator.migrateUpwards() {
		// Each call applies one migration; loop until it signals completion.
	}
}
// rollbackDirtyMigration clears the dirty flag on version and then steps the
// schema back by one migration.
//
// A failure to force the version is only logged and ends the rollback early;
// a failure of the backward step is logged and then panics.
func (migrator *Migrator) rollbackDirtyMigration(version uint) {
	logger.Warnf("[migrator] removing dirty migration from version %v", version)

	forceErr := migrator.instance.Force(int(version))
	if forceErr != nil {
		logger.Error(forceErr)
		return
	}

	stepErr := migrator.instance.Steps(-1)
	if stepErr == nil {
		return
	}
	logger.Error(stepErr)
	panic(stepErr)
}
// migrateUpwards applies exactly one upward migration step.
//
// It returns true when there is no further migration to apply (the step
// fails with the text "file does not exist") and false after a step has been
// applied. Any other step failure is logged and panics. Errors from reading
// the version are logged but never abort the step.
func (migrator *Migrator) migrateUpwards() bool {
	currentVersion, isDirty, versionErr := migrator.instance.Version()
	if versionErr != nil {
		logger.Error(versionErr)
	}

	stepErr := migrator.instance.Steps(1)
	if stepErr != nil {
		if stepErr.Error() != "file does not exist" {
			logger.Errorf("[migrator] migration upward failed with error: %s", stepErr)
			panic(stepErr)
		}
		logger.Infof("[migrator] migration is up-to-date at version: %v (dirty: %v)", currentVersion, isDirty)
		return true
	}

	// Re-read the version so the log reflects the step that was just applied.
	currentVersion, isDirty, versionErr = migrator.instance.Version()
	if versionErr != nil {
		logger.Error(versionErr)
		return false
	}
	logger.Infof("[migrator] migration version now at %v (dirty: %v)", currentVersion, isDirty)
	return false
}
// getConnection opens a database handle for connection using the "mysql"
// driver name. If the handle cannot be created the error is logged and the
// function panics.
func (*Migrator) getConnection(connection string) *sql.DB {
	handle, openErr := sql.Open("mysql", connection)
	if openErr != nil {
		logger.Errorf("[migrator] error while creating database connection: %s", openErr)
		panic(openErr)
	}
	return handle
}
// getDriver creates the golang-migrate MySQL driver on top of migrator.db,
// retrying when creation fails.
//
// Up to ConnectionRetryAttempts attempts are made, pausing
// ConnectionRetryIntervalMs milliseconds between consecutive attempts. A
// configured value of zero attempts is treated as one attempt, because
// skipping the loop entirely would end in panic(nil) with no error to report.
// When every attempt fails, the last error is logged and the function panics
// with it.
func (migrator *Migrator) getDriver() database.Driver {
	attempts := migrator.options.ConnectionRetryAttempts
	if attempts == 0 {
		attempts = 1
	}
	// The option stores a bare millisecond count, hence the scaling here.
	retryInterval := migrator.options.ConnectionRetryIntervalMs * time.Millisecond

	var err error
	for try := uint(0); try < attempts; try++ {
		var driver database.Driver
		if driver, err = mysql.WithInstance(migrator.db, &mysql.Config{}); err == nil {
			return driver
		}
		// Report attempts 1-based so the final line reads "n/n" rather than "n-1/n".
		logger.Errorf("[migrator] failed to get driver (current try: %v/%v), error: %s",
			try+1,
			attempts,
			err,
		)
		// No point in waiting after the final attempt; fail straight away.
		if try+1 < attempts {
			time.Sleep(retryInterval)
		}
	}

	logger.Errorf("[migrator] error in getting driver: %s", err)
	panic(err)
}
// getDatabaseInstance builds the migrate instance that reads migration files
// from "file://migrations" and applies them through migrator.driver under the
// database name "mysql". If the instance cannot be created the error is
// logged and the function panics.
func (migrator *Migrator) getDatabaseInstance() *migrate.Migrate {
	migrateInstance, createErr := migrate.NewWithDatabaseInstance("file://migrations", "mysql", migrator.driver)
	if createErr != nil {
		logger.Errorf("[migrator] error while creating migrator: %s", createErr)
		panic(createErr)
	}
	return migrateInstance
}