Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
hawkingrei committed Jun 25, 2022
1 parent c48a520 commit 07ffc99
Show file tree
Hide file tree
Showing 27 changed files with 73 additions and 5 deletions.
2 changes: 2 additions & 0 deletions br/pkg/backup/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -697,6 +697,7 @@ func (bc *Client) fineGrainedBackup(
log.Warn("failed to create file for notifying, skipping notify", zap.Error(err))
}
if file != nil {
//nolint: errcheck
file.Close()
}
time.Sleep(3 * time.Second)
Expand Down Expand Up @@ -953,6 +954,7 @@ func doSendBackup(
log.Warn("failed to create file for notifying, skipping notify", zap.Error(err))
}
if file != nil {
//nolint: errcheck
file.Close()
}
}
Expand Down
2 changes: 2 additions & 0 deletions br/pkg/conn/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,7 @@ func (mgr *Mgr) getGrpcConnLocked(ctx context.Context, storeID uint64) (*grpc.Cl
log.Warn("failed to create file for notifying, skipping notify", zap.Error(err))
}
if file != nil {
//nolint: errcheck
file.Close()
}
}
Expand Down Expand Up @@ -454,6 +455,7 @@ func (mgr *Mgr) Close() {
mgr.dom.Close()
}
tikv.StoreShuttingDown(1)
//nolint: errcheck
mgr.storage.Close()
}

Expand Down
3 changes: 3 additions & 0 deletions br/pkg/glue/console_glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,17 @@ func (ops *ConsoleOperations) CreateTable() *Table {
}

func (ops ConsoleOperations) Print(args ...interface{}) {
//nolint: errcheck
fmt.Fprint(ops, args...)
}

func (ops ConsoleOperations) Println(args ...interface{}) {
//nolint: errcheck
fmt.Fprintln(ops, args...)
}

func (ops ConsoleOperations) Printf(format string, args ...interface{}) {
//nolint: errcheck
fmt.Fprintf(ops, format, args...)
}

Expand Down
19 changes: 19 additions & 0 deletions br/pkg/lightning/checkpoints/checkpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -522,6 +522,7 @@ func OpenCheckpointsDB(ctx context.Context, cfg *config.Config) (DB, error) {
}
cpdb, err := NewMySQLCheckpointsDB(ctx, db, cfg.Checkpoint.Schema)
if err != nil {
//nolint: errcheck
db.Close()
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -549,12 +550,14 @@ func IsCheckpointsDBExists(ctx context.Context, cfg *config.Config) (bool, error
if err != nil {
return false, errors.Trace(err)
}
//nolint: errcheck
defer db.Close()
checkSQL := "SHOW DATABASES WHERE `DATABASE` = ?"
rows, err := db.QueryContext(ctx, checkSQL, cfg.Checkpoint.Schema)
if err != nil {
return false, errors.Trace(err)
}
//nolint: errcheck
defer rows.Close()
result := rows.Next()
if err := rows.Err(); err != nil {
Expand Down Expand Up @@ -664,6 +667,7 @@ func (cpdb *MySQLCheckpointsDB) Initialize(ctx context.Context, cfg *config.Conf
if err != nil {
return errors.Trace(err)
}
//nolint: errcheck
defer taskStmt.Close()
_, err = taskStmt.ExecContext(ctx, cfg.TaskID, cfg.Mydumper.SourceDir, cfg.TikvImporter.Backend,
cfg.TikvImporter.Addr, cfg.TiDB.Host, cfg.TiDB.Port, cfg.TiDB.PdAddr, cfg.TikvImporter.SortedKVDir,
Expand All @@ -682,6 +686,7 @@ func (cpdb *MySQLCheckpointsDB) Initialize(ctx context.Context, cfg *config.Conf
if err != nil {
return errors.Trace(err)
}
//nolint: errcheck
defer stmt.Close()

for _, db := range dbInfo {
Expand Down Expand Up @@ -745,6 +750,7 @@ func (cpdb *MySQLCheckpointsDB) Get(ctx context.Context, tableName string) (*Tab
if err != nil {
return errors.Trace(err)
}
//nolint: errcheck
defer engineRows.Close()
for engineRows.Next() {
var (
Expand All @@ -769,6 +775,7 @@ func (cpdb *MySQLCheckpointsDB) Get(ctx context.Context, tableName string) (*Tab
if err != nil {
return errors.Trace(err)
}
//nolint: errcheck
defer chunkRows.Close()
for chunkRows.Next() {
var (
Expand Down Expand Up @@ -831,12 +838,14 @@ func (cpdb *MySQLCheckpointsDB) InsertEngineCheckpoints(ctx context.Context, tab
if err != nil {
return errors.Trace(err)
}
//nolint: errcheck
defer engineStmt.Close()

chunkStmt, err := tx.PrepareContext(c, fmt.Sprintf(ReplaceChunkTemplate, cpdb.schema, CheckpointTableNameChunk))
if err != nil {
return errors.Trace(err)
}
//nolint: errcheck
defer chunkStmt.Close()

for engineID, engine := range checkpoints {
Expand Down Expand Up @@ -883,26 +892,31 @@ func (cpdb *MySQLCheckpointsDB) Update(taskCtx context.Context, checkpointDiffs
if e != nil {
return errors.Trace(e)
}
//nolint: errcheck
defer chunkStmt.Close()
rebaseStmt, e := tx.PrepareContext(c, rebaseQuery)
if e != nil {
return errors.Trace(e)
}
//nolint: errcheck
defer rebaseStmt.Close()
tableStatusStmt, e := tx.PrepareContext(c, tableStatusQuery)
if e != nil {
return errors.Trace(e)
}
//nolint: errcheck
defer tableStatusStmt.Close()
tableChecksumStmt, e := tx.PrepareContext(c, tableChecksumQuery)
if e != nil {
return errors.Trace(e)
}
//nolint: errcheck
defer tableChecksumStmt.Close()
engineStatusStmt, e := tx.PrepareContext(c, engineStatusQuery)
if e != nil {
return errors.Trace(e)
}
//nolint: errcheck
defer engineStatusStmt.Close()
for tableName, cpd := range checkpointDiffs {
if cpd.hasStatus {
Expand Down Expand Up @@ -1408,6 +1422,7 @@ func (cpdb *MySQLCheckpointsDB) GetLocalStoringTables(ctx context.Context) (map[
if err != nil {
return errors.Trace(err)
}
//nolint: errcheck
defer rows.Close()
for rows.Next() {
var (
Expand Down Expand Up @@ -1519,6 +1534,7 @@ func (cpdb *MySQLCheckpointsDB) DestroyErrorCheckpoint(ctx context.Context, tabl
if e != nil {
return errors.Trace(e)
}
//nolint: errcheck
defer rows.Close()
for rows.Next() {
var dtc DestroyedTableCheckpoint
Expand Down Expand Up @@ -1566,6 +1582,7 @@ func (cpdb *MySQLCheckpointsDB) DumpTables(ctx context.Context, writer io.Writer
if err != nil {
return errors.Trace(err)
}
//nolint: errcheck
defer rows.Close()

return errors.Trace(sqltocsv.Write(writer, rows))
Expand All @@ -1585,6 +1602,7 @@ func (cpdb *MySQLCheckpointsDB) DumpEngines(ctx context.Context, writer io.Write
if err != nil {
return errors.Trace(err)
}
//nolint: errcheck
defer rows.Close()

return errors.Trace(sqltocsv.Write(writer, rows))
Expand Down Expand Up @@ -1616,6 +1634,7 @@ func (cpdb *MySQLCheckpointsDB) DumpChunks(ctx context.Context, writer io.Writer
if err != nil {
return errors.Trace(err)
}
//nolint: errcheck
defer rows.Close()

return errors.Trace(sqltocsv.Write(writer, rows))
Expand Down
10 changes: 10 additions & 0 deletions br/pkg/lightning/checkpoints/glue_checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ func (g GlueCheckpointsDB) TaskCheckpoint(ctx context.Context) (*TaskCheckpoint,
return errors.Trace(err)
}
r := rs[0]
//nolint: errcheck
defer r.Close()
req := r.NewChunk(nil)
err = r.Next(ctx, req)
Expand Down Expand Up @@ -247,6 +248,7 @@ func (g GlueCheckpointsDB) Get(ctx context.Context, tableName string) (*TableChe
for {
err = r.Next(ctx, req)
if err != nil {
//nolint: errcheck
r.Close()
return err
}
Expand All @@ -262,6 +264,7 @@ func (g GlueCheckpointsDB) Get(ctx context.Context, tableName string) (*TableChe
}
}
}
//nolint: errcheck
r.Close()

// 2. Populate the chunks.
Expand All @@ -277,6 +280,7 @@ func (g GlueCheckpointsDB) Get(ctx context.Context, tableName string) (*TableChe
for {
err = r.Next(ctx, req)
if err != nil {
//nolint: errcheck
r.Close()
return err
}
Expand Down Expand Up @@ -306,12 +310,14 @@ func (g GlueCheckpointsDB) Get(ctx context.Context, tableName string) (*TableChe
value.FileMeta.Path = value.Key.Path
value.Checksum = verify.MakeKVChecksum(kvcBytes, kvcKVs, kvcChecksum)
if err := json.Unmarshal(colPerm, &value.ColumnPermutation); err != nil {
//nolint: errcheck
r.Close()
return errors.Trace(err)
}
cp.Engines[engineID].Chunks = append(cp.Engines[engineID].Chunks, value)
}
}
//nolint: errcheck
r.Close()

// 3. Fill in the remaining table info
Expand All @@ -322,6 +328,7 @@ func (g GlueCheckpointsDB) Get(ctx context.Context, tableName string) (*TableChe
return errors.Trace(err)
}
r = rs[0]
//nolint: errcheck
defer r.Close()
req = r.NewChunk(nil)
err = r.Next(ctx, req)
Expand Down Expand Up @@ -713,6 +720,7 @@ func (g GlueCheckpointsDB) DestroyErrorCheckpoint(ctx context.Context, tableName
for {
err = r.Next(ctx, req)
if err != nil {
//nolint: errcheck
r.Close()
return err
}
Expand All @@ -728,6 +736,7 @@ func (g GlueCheckpointsDB) DestroyErrorCheckpoint(ctx context.Context, tableName
targetTables = append(targetTables, dtc)
}
}
//nolint: errcheck
r.Close()

if _, e := s.Execute(c, deleteChunkQuery); e != nil {
Expand Down Expand Up @@ -791,6 +800,7 @@ func drainFirstRecordSet(ctx context.Context, rss []sqlexec.RecordSet) ([]chunk.
for {
err := rs.Next(ctx, req)
if err != nil || req.NumRows() == 0 {
//nolint: errcheck
rs.Close()
return rows, err
}
Expand Down
2 changes: 2 additions & 0 deletions br/pkg/lightning/common/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ func (w *withStack) Format(s fmt.State, verb rune) {
switch verb {
case 'v':
if s.Flag('+') {
//nolint: errcheck
fmt.Fprintf(s, "%+v", w.Cause())
w.StackTrace().Format(s, verb)
return
Expand All @@ -118,6 +119,7 @@ func (w *withStack) Format(s fmt.State, verb rune) {
case 's':
_, _ = io.WriteString(s, w.Error())
case 'q':
//nolint: errcheck
fmt.Fprintf(s, "%q", w.Error())
}
}
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ func GetJSON(ctx context.Context, client *http.Client, url string, v interface{}
if err != nil {
return errors.Trace(err)
}

//nolint: errcheck
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
Expand Down
3 changes: 3 additions & 0 deletions br/pkg/lightning/glue/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ type sqlConnSession struct {
}

func (session *sqlConnSession) Close() {
//nolint: errcheck
session.conn.Close()
}

Expand Down Expand Up @@ -127,6 +128,7 @@ func (e *ExternalTiDBGlue) QueryStringsWithLog(ctx context.Context, query string
if err != nil {
return err
}
//nolint: errcheck
defer rows.Close()

colNames, err := rows.Columns()
Expand Down Expand Up @@ -179,6 +181,7 @@ func (e *ExternalTiDBGlue) OwnsSQLExecutor() bool {
}

func (e *ExternalTiDBGlue) Close() {
//nolint: errcheck
e.db.Close()
}

Expand Down
1 change: 1 addition & 0 deletions br/pkg/lightning/mydump/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ func ExportStatement(ctx context.Context, store storage.ExternalStorage, sqlFile
if err != nil {
return nil, errors.Trace(err)
}
//nolint: errcheck
defer fd.Close()

br := bufio.NewReader(fd)
Expand Down
1 change: 1 addition & 0 deletions br/pkg/lightning/mydump/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,7 @@ func SplitLargeFile(
pos = dataFile.FileMeta.FileSize
}
endOffset = pos
//nolint: errcheck
parser.Close()
}
regions = append(regions,
Expand Down
2 changes: 2 additions & 0 deletions br/pkg/lightning/tikv/tikv.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ func withTiKVConnection(ctx context.Context, tls *common.TLS, tikvAddr string, a
if err != nil {
return errors.Trace(err)
}
//nolint: errcheck
defer conn.Close()

client := import_sstpb.NewImportSSTClient(conn)
Expand Down Expand Up @@ -176,6 +177,7 @@ func FetchMode(ctx context.Context, tls *common.TLS, tikvAddr string) (import_ss
if err != nil {
return 0, err
}
//nolint: errcheck
defer conn.Close()

client := debugpb.NewDebugClient(conn)
Expand Down
2 changes: 2 additions & 0 deletions br/pkg/pdutil/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ func ResetTS(ctx context.Context, pdAddr string, ts uint64, tlsConf *tls.Config)
if err != nil {
return errors.Trace(err)
}
//nolint: errcheck
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusForbidden {
buf := new(bytes.Buffer)
Expand All @@ -80,6 +81,7 @@ func GetPlacementRules(ctx context.Context, pdAddr string, tlsConf *tls.Config)
if err != nil {
return nil, errors.Trace(err)
}
//nolint: errcheck
defer resp.Body.Close()
buf := new(bytes.Buffer)
_, err = buf.ReadFrom(resp.Body)
Expand Down
1 change: 1 addition & 0 deletions br/pkg/storage/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,7 @@ func (rs *S3Storage) ReadFile(ctx context.Context, file string) ([]byte, error)
"failed to read s3 file, file info: input.bucket='%s', input.key='%s'",
*input.Bucket, *input.Key)
}
//nolint: errcheck
defer result.Body.Close()
data, err := io.ReadAll(result.Body)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions br/pkg/storage/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ func (u *bufferedWriter) uploadChunk(ctx context.Context) error {
}

func (u *bufferedWriter) Close(ctx context.Context) error {
//nolint: errcheck
u.buf.Close()
err := u.uploadChunk(ctx)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions br/pkg/utils/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ func unescapedKey(text string) ([]byte, error) {

switch n[0] {
case 'x':
//nolint: errcheck
fmt.Sscanf(string(r.Next(2)), "%02x", &c)
buf = append(buf, c)
default:
Expand Down
1 change: 1 addition & 0 deletions br/tests/lightning_checkpoint_parquet/parquet.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func genParquetFile(dir, name string, count int) error {
if err != nil {
return err
}
//nolint: errcheck
w.Close()

return nil
Expand Down
Loading

0 comments on commit 07ffc99

Please sign in to comment.