Skip to content
This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

Fix summary log #191

Merged
merged 5 commits into from
Mar 16, 2020
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions pkg/backup/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ func (bc *Client) BackupRange(
if err != nil {
summary.CollectFailureUnit(key, err)
} else {
summary.CollectSuccessUnit(key, elapsed)
summary.CollectSuccessUnit(key, 1, elapsed)
}
}()
log.Info("backup started",
Expand Down Expand Up @@ -770,8 +770,8 @@ func (bc *Client) FastChecksum() (bool, error) {
totalBytes += file.TotalBytes
}

summary.CollectSuccessUnit(summary.TotalKV, totalKvs)
summary.CollectSuccessUnit(summary.TotalBytes, totalBytes)
summary.CollectSuccessUnit(summary.TotalKV, 1, totalKvs)
summary.CollectSuccessUnit(summary.TotalBytes, 1, totalBytes)

if schema.Crc64Xor == checksum && schema.TotalKvs == totalKvs && schema.TotalBytes == totalBytes {
log.Info("fast checksum success", zap.Stringer("db", dbInfo.Name), zap.Stringer("table", tblInfo.Name))
Expand Down
7 changes: 3 additions & 4 deletions pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,9 +288,7 @@ func (rc *Client) RestoreFiles(
if err == nil {
log.Info("Restore Files",
zap.Int("files", len(files)), zap.Duration("take", elapsed))
summary.CollectSuccessUnit("files", elapsed)
} else {
summary.CollectFailureUnit("files", err)
summary.CollectSuccessUnit("files", len(files), elapsed)
}
}()

Expand Down Expand Up @@ -319,9 +317,10 @@ func (rc *Client) RestoreFiles(
}
})
}
for range files {
for i := range files {
err := <-errCh
if err != nil {
summary.CollectFailureUnit(fmt.Sprintf("file:%d", i), err)
rc.cancel()
wg.Wait()
log.Error(
Expand Down
4 changes: 2 additions & 2 deletions pkg/restore/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,8 +264,8 @@ func (importer *FileImporter) Import(file *backup.File, rewriteRules *RewriteRul
zap.Error(errIngest))
return errIngest
}
summary.CollectSuccessUnit(summary.TotalKV, file.TotalKvs)
summary.CollectSuccessUnit(summary.TotalBytes, file.TotalBytes)
summary.CollectSuccessUnit(summary.TotalKV, 1, file.TotalKvs)
summary.CollectSuccessUnit(summary.TotalBytes, 1, file.TotalBytes)
}
return nil
}, newImportSSTBackoffer())
Expand Down
62 changes: 28 additions & 34 deletions pkg/summary/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ const (
type LogCollector interface {
SetUnit(unit string)

CollectSuccessUnit(name string, arg interface{})
CollectSuccessUnit(name string, unitCount int, arg interface{})

CollectFailureUnit(name string, reason error)

Expand All @@ -43,27 +43,29 @@ type logFunc func(msg string, fields ...zap.Field)
var collector = newLogCollector(log.Info)

type logCollector struct {
mu sync.Mutex
unit string
unitCount int
successCosts map[string]time.Duration
successData map[string]uint64
failureReasons map[string]error
durations map[string]time.Duration
ints map[string]int
mu sync.Mutex
unit string
successUnitCount int
failureUnitCount int
successCosts map[string]time.Duration
successData map[string]uint64
failureReasons map[string]error
durations map[string]time.Duration
ints map[string]int

log logFunc
}

func newLogCollector(log logFunc) LogCollector {
return &logCollector{
unitCount: 0,
successCosts: make(map[string]time.Duration),
successData: make(map[string]uint64),
failureReasons: make(map[string]error),
durations: make(map[string]time.Duration),
ints: make(map[string]int),
log: log,
successUnitCount: 0,
failureUnitCount: 0,
successCosts: make(map[string]time.Duration),
successData: make(map[string]uint64),
failureReasons: make(map[string]error),
durations: make(map[string]time.Duration),
ints: make(map[string]int),
log: log,
}
}

Expand All @@ -73,15 +75,15 @@ func (tc *logCollector) SetUnit(unit string) {
tc.unit = unit
}

func (tc *logCollector) CollectSuccessUnit(name string, arg interface{}) {
func (tc *logCollector) CollectSuccessUnit(name string, unitCount int, arg interface{}) {
tc.mu.Lock()
defer tc.mu.Unlock()

switch v := arg.(type) {
case time.Duration:
if _, ok := tc.successCosts[name]; !ok {
tc.successCosts[name] = v
tc.unitCount++
tc.successUnitCount += unitCount
} else {
tc.successCosts[name] += v
}
Expand All @@ -99,7 +101,7 @@ func (tc *logCollector) CollectFailureUnit(name string, reason error) {
defer tc.mu.Unlock()
if _, ok := tc.failureReasons[name]; !ok {
tc.failureReasons[name] = reason
tc.unitCount++
tc.failureUnitCount++
}
}

Expand Down Expand Up @@ -129,16 +131,10 @@ func (tc *logCollector) Summary(name string) {
switch tc.unit {
case BackupUnit:
msg = fmt.Sprintf("total backup ranges: %d, total success: %d, total failed: %d",
tc.unitCount, len(tc.successCosts), len(tc.failureReasons))
if len(tc.failureReasons) != 0 {
msg += ", failed ranges"
}
tc.failureUnitCount+tc.successUnitCount, tc.successUnitCount, tc.failureUnitCount)
case RestoreUnit:
msg = fmt.Sprintf("total restore tables: %d, total success: %d, total failed: %d",
tc.unitCount, len(tc.successCosts), len(tc.failureReasons))
if len(tc.failureReasons) != 0 {
msg += ", failed tables"
}
msg = fmt.Sprintf("total restore files: %d, total success: %d, total failed: %d",
tc.failureUnitCount+tc.successUnitCount, tc.successUnitCount, tc.failureUnitCount)
}

logFields := make([]zap.Field, 0, len(tc.durations)+len(tc.ints))
Expand All @@ -150,12 +146,10 @@ func (tc *logCollector) Summary(name string) {
}

if len(tc.failureReasons) != 0 {
names := make([]string, 0, len(tc.failureReasons))
for name := range tc.failureReasons {
names = append(names, name)
for unitName, reason := range tc.failureReasons {
logFields = append(logFields, zap.String("unitName", unitName), zap.Error(reason))
}
logFields = append(logFields, zap.Strings(msg, names))
log.Info(name+" summary", logFields...)
log.Info(name+" Failed summary : "+msg, logFields...)
return
}
totalCost := time.Duration(0)
Expand All @@ -178,7 +172,7 @@ func (tc *logCollector) Summary(name string) {
msg += fmt.Sprintf(", %s: %d", name, data)
}

tc.log(name+" summary: "+msg, logFields...)
tc.log(name+" Success summary: "+msg, logFields...)
}

// SetLogCollector allow pass LogCollector outside
Expand Down
4 changes: 2 additions & 2 deletions pkg/summary/summary.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ func SetUnit(unit string) {
}

// CollectSuccessUnit collects the count of succeeded units together with their time cost or data size
func CollectSuccessUnit(name string, arg interface{}) {
collector.CollectSuccessUnit(name, arg)
func CollectSuccessUnit(name string, unitCount int, arg interface{}) {
collector.CollectSuccessUnit(name, unitCount, arg)
}

// CollectFailureUnit collects fail reason
Expand Down
1 change: 0 additions & 1 deletion pkg/task/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,6 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
if len(files) == 0 {
return errors.New("all files are filtered out from the backup archive, nothing to restore")
}
summary.CollectInt("restore files", len(files))

var newTS uint64
if client.IsIncremental() {
Expand Down