Skip to content
This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

Fix summary log #191

Merged
merged 5 commits into from
Mar 16, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions pkg/backup/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ func (bc *Client) BackupRange(
if err != nil {
summary.CollectFailureUnit(key, err)
} else {
summary.CollectSuccessUnit(key, elapsed)
summary.CollectSuccessUnit(key, 1, elapsed)
}
}()
log.Info("backup started",
Expand Down Expand Up @@ -771,8 +771,8 @@ func (bc *Client) FastChecksum() (bool, error) {
totalBytes += file.TotalBytes
}

summary.CollectSuccessUnit(summary.TotalKV, totalKvs)
summary.CollectSuccessUnit(summary.TotalBytes, totalBytes)
summary.CollectSuccessUnit(summary.TotalKV, 1, totalKvs)
summary.CollectSuccessUnit(summary.TotalBytes, 1, totalBytes)

if schema.Crc64Xor == checksum && schema.TotalKvs == totalKvs && schema.TotalBytes == totalBytes {
log.Info("fast checksum success", zap.Stringer("db", dbInfo.Name), zap.Stringer("table", tblInfo.Name))
Expand Down
7 changes: 3 additions & 4 deletions pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,9 +289,7 @@ func (rc *Client) RestoreFiles(
if err == nil {
log.Info("Restore Files",
zap.Int("files", len(files)), zap.Duration("take", elapsed))
summary.CollectSuccessUnit("files", elapsed)
} else {
summary.CollectFailureUnit("files", err)
summary.CollectSuccessUnit("files", len(files), elapsed)
}
}()

Expand Down Expand Up @@ -320,9 +318,10 @@ func (rc *Client) RestoreFiles(
}
})
}
for range files {
for i := range files {
err := <-errCh
if err != nil {
summary.CollectFailureUnit(fmt.Sprintf("file:%d", i), err)
rc.cancel()
wg.Wait()
log.Error(
Expand Down
4 changes: 2 additions & 2 deletions pkg/restore/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,8 +264,8 @@ func (importer *FileImporter) Import(file *backup.File, rewriteRules *RewriteRul
zap.Error(errIngest))
return errIngest
}
summary.CollectSuccessUnit(summary.TotalKV, file.TotalKvs)
summary.CollectSuccessUnit(summary.TotalBytes, file.TotalBytes)
summary.CollectSuccessUnit(summary.TotalKV, 1, file.TotalKvs)
summary.CollectSuccessUnit(summary.TotalBytes, 1, file.TotalBytes)
}
return nil
}, newImportSSTBackoffer())
Expand Down
62 changes: 28 additions & 34 deletions pkg/summary/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ const (
type LogCollector interface {
SetUnit(unit string)

CollectSuccessUnit(name string, arg interface{})
CollectSuccessUnit(name string, unitCount int, arg interface{})

CollectFailureUnit(name string, reason error)

Expand All @@ -43,27 +43,29 @@ type logFunc func(msg string, fields ...zap.Field)
var collector = newLogCollector(log.Info)

type logCollector struct {
mu sync.Mutex
unit string
unitCount int
successCosts map[string]time.Duration
successData map[string]uint64
failureReasons map[string]error
durations map[string]time.Duration
ints map[string]int
mu sync.Mutex
unit string
successUnitCount int
failureUnitCount int
successCosts map[string]time.Duration
successData map[string]uint64
failureReasons map[string]error
durations map[string]time.Duration
ints map[string]int

log logFunc
}

func newLogCollector(log logFunc) LogCollector {
return &logCollector{
unitCount: 0,
successCosts: make(map[string]time.Duration),
successData: make(map[string]uint64),
failureReasons: make(map[string]error),
durations: make(map[string]time.Duration),
ints: make(map[string]int),
log: log,
successUnitCount: 0,
failureUnitCount: 0,
successCosts: make(map[string]time.Duration),
successData: make(map[string]uint64),
failureReasons: make(map[string]error),
durations: make(map[string]time.Duration),
ints: make(map[string]int),
log: log,
}
}

Expand All @@ -73,15 +75,15 @@ func (tc *logCollector) SetUnit(unit string) {
tc.unit = unit
}

func (tc *logCollector) CollectSuccessUnit(name string, arg interface{}) {
func (tc *logCollector) CollectSuccessUnit(name string, unitCount int, arg interface{}) {
tc.mu.Lock()
defer tc.mu.Unlock()

switch v := arg.(type) {
case time.Duration:
if _, ok := tc.successCosts[name]; !ok {
tc.successCosts[name] = v
tc.unitCount++
tc.successUnitCount += unitCount
} else {
tc.successCosts[name] += v
}
Expand All @@ -99,7 +101,7 @@ func (tc *logCollector) CollectFailureUnit(name string, reason error) {
defer tc.mu.Unlock()
if _, ok := tc.failureReasons[name]; !ok {
tc.failureReasons[name] = reason
tc.unitCount++
tc.failureUnitCount++
}
}

Expand Down Expand Up @@ -129,16 +131,10 @@ func (tc *logCollector) Summary(name string) {
switch tc.unit {
case BackupUnit:
msg = fmt.Sprintf("total backup ranges: %d, total success: %d, total failed: %d",
tc.unitCount, len(tc.successCosts), len(tc.failureReasons))
if len(tc.failureReasons) != 0 {
msg += ", failed ranges"
}
tc.failureUnitCount+tc.successUnitCount, tc.successUnitCount, tc.failureUnitCount)
case RestoreUnit:
msg = fmt.Sprintf("total restore tables: %d, total success: %d, total failed: %d",
tc.unitCount, len(tc.successCosts), len(tc.failureReasons))
if len(tc.failureReasons) != 0 {
msg += ", failed tables"
}
msg = fmt.Sprintf("total restore files: %d, total success: %d, total failed: %d",
tc.failureUnitCount+tc.successUnitCount, tc.successUnitCount, tc.failureUnitCount)
}

logFields := make([]zap.Field, 0, len(tc.durations)+len(tc.ints))
Expand All @@ -150,12 +146,10 @@ func (tc *logCollector) Summary(name string) {
}

if len(tc.failureReasons) != 0 {
names := make([]string, 0, len(tc.failureReasons))
for name := range tc.failureReasons {
names = append(names, name)
for unitName, reason := range tc.failureReasons {
logFields = append(logFields, zap.String("unitName", unitName), zap.Error(reason))
}
logFields = append(logFields, zap.Strings(msg, names))
log.Info(name+" summary", logFields...)
log.Info(name+" Failed summary : "+msg, logFields...)
return
}
totalCost := time.Duration(0)
Expand All @@ -178,7 +172,7 @@ func (tc *logCollector) Summary(name string) {
msg += fmt.Sprintf(", %s: %d", name, data)
}

tc.log(name+" summary: "+msg, logFields...)
tc.log(name+" Success summary: "+msg, logFields...)
}

// SetLogCollector allow pass LogCollector outside
Expand Down
4 changes: 2 additions & 2 deletions pkg/summary/summary.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ func SetUnit(unit string) {
}

// CollectSuccessUnit collects success time costs
func CollectSuccessUnit(name string, arg interface{}) {
collector.CollectSuccessUnit(name, arg)
func CollectSuccessUnit(name string, unitCount int, arg interface{}) {
collector.CollectSuccessUnit(name, unitCount, arg)
}

// CollectFailureUnit collects fail reason
Expand Down
1 change: 0 additions & 1 deletion pkg/task/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,6 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
if len(files) == 0 {
return errors.New("all files are filtered out from the backup archive, nothing to restore")
}
summary.CollectInt("restore files", len(files))

var newTS uint64
if client.IsIncremental() {
Expand Down