Skip to content

Commit

Permalink
pump/*: Refine some log in storage/ (pingcap#607)
Browse files Browse the repository at this point in the history
  • Loading branch information
july2993 committed May 23, 2019
1 parent 2aacbe8 commit d8ecc47
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 29 deletions.
23 changes: 9 additions & 14 deletions pump/storage/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,12 @@ func encodeRecord(writer io.Writer, payload []byte) (int, error) {

n, err := writer.Write(header)
if err != nil {
return n, errors.Trace(err)
return n, errors.Annotate(err, "write header failed")
}

n, err = writer.Write(payload)
if err != nil {
return int(headerLength) + n, errors.Trace(err)
return int(headerLength) + n, errors.Annotate(err, "write payload failed")
}

return int(headerLength) + len(payload), nil
Expand Down Expand Up @@ -122,14 +122,12 @@ func (r *Record) isValid() bool {
func newLogFile(fid uint32, name string) (lf *logFile, err error) {
fd, err := os.OpenFile(name, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666)
if err != nil {
err = errors.Trace(err)
return
return nil, errors.Trace(err)
}

info, err := fd.Stat()
if err != nil {
err = errors.Trace(err)
return
return nil, errors.Annotatef(err, "stat file %s failed", name)
}

logReporter := func(bytes int, reason error) {
Expand Down Expand Up @@ -214,8 +212,8 @@ func (lf *logFile) finalize() error {
return errors.Trace(lf.fdatasync())
}

func (lf *logFile) close() {
lf.fd.Close()
func (lf *logFile) close() error {
return lf.fd.Close()
}

// recover scan all the record get the state like maxTS which only saved when the file is finalized
Expand Down Expand Up @@ -286,8 +284,7 @@ func readRecord(reader io.Reader) (record *Record, err error) {
record = new(Record)
err = record.readHeader(reader)
if err != nil {
err = errors.Trace(err)
return
return nil, errors.Annotate(err, "read header failed")
}

if record.magic != recordMagic {
Expand All @@ -300,8 +297,7 @@ func readRecord(reader io.Reader) (record *Record, err error) {
record.payload = make([]byte, record.length)
_, err = io.ReadFull(reader, record.payload)
if err != nil {
err = errors.Trace(err)
return
return nil, errors.Annotate(err, "read payload failed")
}
} else {
buf := new(bytes.Buffer)
Expand All @@ -314,8 +310,7 @@ func readRecord(reader io.Reader) (record *Record, err error) {
}

if !record.isValid() {
err = errors.New("checksum mismatch")
return
return nil, errors.New("checksum mismatch")
}

return
Expand Down
15 changes: 13 additions & 2 deletions pump/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -546,6 +546,7 @@ func (a *Append) GCTS(ts int64) {
}

func (a *Append) doGCTS(ts int64) {
log.Info("start gc", zap.Int64("ts", ts))
irange := &util.Range{
Start: encodeTSKey(0),
Limit: encodeTSKey(ts + 1),
Expand All @@ -571,6 +572,7 @@ func (a *Append) doGCTS(ts int64) {
}

a.vlog.gcTS(ts)
log.Info("finish gc", zap.Int64("ts", ts))
}

// MaxCommitTS implement Storage.MaxCommitTS
Expand Down Expand Up @@ -841,7 +843,7 @@ func (a *Append) feedPreWriteValue(cbinlog *pb.Binlog) error {

vpData, err := a.metadata.Get(encodeTSKey(cbinlog.StartTs), nil)
if err != nil {
return errors.Trace(err)
return errors.Annotatef(err, "get pointer of P-Binlog(ts: %d) failed", cbinlog.StartTs)
}

err = vp.UnmarshalBinary(vpData)
Expand All @@ -851,7 +853,7 @@ func (a *Append) feedPreWriteValue(cbinlog *pb.Binlog) error {

pvalue, err := a.vlog.readValue(vp)
if err != nil {
return errors.Trace(err)
return errors.Annotatef(err, "read P-Binlog value failed, vp: %+v", vp)
}

pbinlog := new(pb.Binlog)
Expand Down Expand Up @@ -940,6 +942,15 @@ func (a *Append) PullCommitBinlog(ctx context.Context, last int64) <-chan []byte
} else {
err = a.feedPreWriteValue(binlog)
if err != nil {
if errors.Cause(err) == leveldb.ErrNotFound {
// In pump-client, a C-binlog should always be sent to the same pump instance as the matching P-binlog.
// But in some older versions of pump-client, writing of C-binlog would fallback to some other instances when the correct one is unavailable.
// When this error occurs, we may assume that the matching P-binlog is on a different pump instance.
// And it would query TiKV for the matching C-binlog. So it should be OK to ignore the error here.
log.Error("Matching P-binlog not found", zap.Int64("commit ts", binlog.CommitTs))
continue
}

errorCount.WithLabelValues("feed_pre_write_value").Add(1.0)
log.Error("feed pre write value failed", zap.Error(err))
iter.Release()
Expand Down
29 changes: 16 additions & 13 deletions pump/storage/vlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,16 +268,15 @@ func (vlog *valueLog) close() error {
if vlog.writableOffset() >= finalizeFileSizeAtClose {
err = curFile.finalize()
if err != nil {
return errors.Trace(err)
return errors.Annotatef(err, "finalize file %s failed", curFile.path)
}
}

for _, logFile := range vlog.filesMap {
err = logFile.fd.Close()
err = logFile.close()
if err != nil {
return err
return errors.Annotatef(err, "close %s failed", logFile.path)
}

}

return nil
Expand All @@ -286,16 +285,16 @@ func (vlog *valueLog) close() error {
func (vlog *valueLog) readValue(vp valuePointer) ([]byte, error) {
logFile, err := vlog.getFileRLocked(vp.Fid)
if err != nil {
return nil, errors.Trace(err)
return nil, errors.Annotatef(err, "get file(id: %d) failed", vp.Fid)
}

defer logFile.lock.RUnlock()

record, err := logFile.readRecord(vp.Offset)
if err != nil {
return nil, errors.Trace(err)
return nil, errors.Annotatef(err, "read record at %+v failed", vp)
}

logFile.lock.RUnlock()

return record.payload, nil
}

Expand All @@ -319,7 +318,7 @@ func (vlog *valueLog) write(reqs []*request) error {
if vlog.sync {
err = curFile.fdatasync()
if err != nil {
return errors.Trace(err)
return errors.Annotatef(err, "fdatasync file %s failed", curFile.path)
}
}

Expand All @@ -334,13 +333,13 @@ func (vlog *valueLog) write(reqs []*request) error {
if vlog.writableOffset() > vlog.opt.ValueLogFileSize {
err := curFile.finalize()
if err != nil {
return errors.Trace(err)
return errors.Annotatef(err, "finalize file %s failed", curFile.path)
}

id := atomic.AddUint32(&vlog.maxFid, 1)
curFile, err = vlog.createLogFile(id)
if err != nil {
return errors.Trace(err)
return errors.Annotatef(err, "create file id %d failed", id)
}
}
return nil
Expand All @@ -360,7 +359,7 @@ func (vlog *valueLog) write(reqs []*request) error {

if writeNow {
if err := toDisk(); err != nil {
return err
return errors.Annotate(err, "write to disk failed")
}
}
}
Expand Down Expand Up @@ -429,7 +428,11 @@ func (vlog *valueLog) gcTS(gcTS int64) {

for _, logFile := range toDeleteFiles {
logFile.lock.Lock()
err := os.Remove(logFile.path)
err := logFile.close()
if err != nil {
log.Error("close file failed", zap.String("path", logFile.path), zap.Error(err))
}
err = os.Remove(logFile.path)
if err != nil {
log.Error("remove file failed", zap.String("path", logFile.path), zap.Error(err))
}
Expand Down

0 comments on commit d8ecc47

Please sign in to comment.