Skip to content

Commit

Permalink
pump/*: Refine some log in storage/ (#607)
Browse files Browse the repository at this point in the history
  • Loading branch information
july2993 authored May 20, 2019
1 parent 54c4924 commit 30ad792
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 34 deletions.
2 changes: 1 addition & 1 deletion pump/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ func (s *Server) writeBinlog(ctx context.Context, in *binlog.WriteBinlogReq, isF

errHandle:
lossBinlogCacheCounter.Add(1)
log.Errorf("write binlog error %v", err)
log.Errorf("write binlog error %+v", err)
ret.Errmsg = err.Error()
return ret, err
}
Expand Down
23 changes: 9 additions & 14 deletions pump/storage/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,12 @@ func encodeRecord(writer io.Writer, payload []byte) (int, error) {

n, err := writer.Write(header)
if err != nil {
return n, errors.Trace(err)
return n, errors.Annotate(err, "write header failed")
}

n, err = writer.Write(payload)
if err != nil {
return int(headerLength) + n, errors.Trace(err)
return int(headerLength) + n, errors.Annotate(err, "write payload failed")
}

return int(headerLength) + len(payload), nil
Expand Down Expand Up @@ -108,14 +108,12 @@ func (r *Record) isValid() bool {
func newLogFile(fid uint32, name string) (lf *logFile, err error) {
fd, err := os.OpenFile(name, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666)
if err != nil {
err = errors.Trace(err)
return
return nil, errors.Trace(err)
}

info, err := fd.Stat()
if err != nil {
err = errors.Trace(err)
return
return nil, errors.Annotatef(err, "stat file %s failed", name)
}

logReporter := func(bytes int, reason error) {
Expand Down Expand Up @@ -200,8 +198,8 @@ func (lf *logFile) finalize() error {
return errors.Trace(lf.fdatasync())
}

func (lf *logFile) close() {
lf.fd.Close()
func (lf *logFile) close() error {
return lf.fd.Close()
}

// recover scan all the record get the state like maxTS which only saved when the file is finalized
Expand Down Expand Up @@ -272,8 +270,7 @@ func readRecord(reader io.Reader) (record *Record, err error) {
record = new(Record)
err = record.readHeader(reader)
if err != nil {
err = errors.Trace(err)
return
return nil, errors.Annotate(err, "read header failed")
}

if record.magic != recordMagic {
Expand All @@ -286,8 +283,7 @@ func readRecord(reader io.Reader) (record *Record, err error) {
record.payload = make([]byte, record.length)
_, err = io.ReadFull(reader, record.payload)
if err != nil {
err = errors.Trace(err)
return
return nil, errors.Annotate(err, "read payload failed")
}
} else {
buf := new(bytes.Buffer)
Expand All @@ -300,8 +296,7 @@ func readRecord(reader io.Reader) (record *Record, err error) {
}

if !record.isValid() {
err = errors.New("checksum mismatch")
return
return nil, errors.New("checksum mismatch")
}

return
Expand Down
23 changes: 17 additions & 6 deletions pump/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,7 @@ func (a *Append) GCTS(ts int64) {
}

func (a *Append) doGCTS(ts int64) {
log.Info("start gc ts: ", ts)
irange := &util.Range{
Start: encodeTSKey(0),
Limit: encodeTSKey(ts + 1),
Expand All @@ -550,6 +551,7 @@ func (a *Append) doGCTS(ts int64) {
}

a.vlog.gcTS(ts)
log.Info("finish gc ts: ", ts)
}

// MaxCommitTS implement Storage.MaxCommitTS
Expand Down Expand Up @@ -820,23 +822,23 @@ func (a *Append) feedPreWriteValue(cbinlog *pb.Binlog) error {

vpData, err := a.metadata.Get(encodeTSKey(cbinlog.StartTs), nil)
if err != nil {
errors.Trace(err)
return errors.Annotatef(err, "get pointer of P-Binlog(ts: %d) failed", cbinlog.StartTs)
}

err = vp.UnmarshalBinary(vpData)
if err != nil {
errors.Trace(err)
return errors.Trace(err)
}

pvalue, err := a.vlog.readValue(vp)
if err != nil {
errors.Trace(err)
return errors.Annotatef(err, "read P-Binlog value failed, vp: %+v", vp)
}

pbinlog := new(pb.Binlog)
err = pbinlog.Unmarshal(pvalue)
if err != nil {
errors.Trace(err)
return errors.Trace(err)
}

cbinlog.StartTs = pbinlog.StartTs
Expand Down Expand Up @@ -895,7 +897,7 @@ func (a *Append) PullCommitBinlog(ctx context.Context, last int64) <-chan []byte

value, err := a.vlog.readValue(vp)
if err != nil {
log.Error(err)
log.Error(errors.ErrorStack(err))
iter.Release()
errorCount.WithLabelValues("read_value").Add(1.0)
return
Expand All @@ -904,7 +906,7 @@ func (a *Append) PullCommitBinlog(ctx context.Context, last int64) <-chan []byte
binlog := new(pb.Binlog)
err = binlog.Unmarshal(value)
if err != nil {
log.Error(err)
log.Error(errors.ErrorStack(err))
iter.Release()
return
}
Expand All @@ -919,6 +921,15 @@ func (a *Append) PullCommitBinlog(ctx context.Context, last int64) <-chan []byte
} else {
err = a.feedPreWriteValue(binlog)
if err != nil {
if errors.Cause(err) == leveldb.ErrNotFound {
// In pump-client, a C-binlog should always be sent to the same pump instance as the matching P-binlog.
// But in some older versions of pump-client, writing of C-binlog would fallback to some other instances when the correct one is unavailable.
// When this error occurs, we may assume that the matching P-binlog is on a different pump instance.
// And it would query TiKV for the matching C-binlog. So it should be OK to ignore the error here.
log.Error("Matching P-binlog not found", binlog)
continue
}

errorCount.WithLabelValues("feed_pre_write_value").Add(1.0)
log.Error(err)
iter.Release()
Expand Down
29 changes: 16 additions & 13 deletions pump/storage/vlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,16 +254,15 @@ func (vlog *valueLog) close() error {
if vlog.writableOffset() >= finalizeFileSizeAtClose {
err = curFile.finalize()
if err != nil {
return errors.Trace(err)
return errors.Annotatef(err, "finalize file %s failed", curFile.path)
}
}

for _, logFile := range vlog.filesMap {
err = logFile.fd.Close()
err = logFile.close()
if err != nil {
return err
return errors.Annotatef(err, "close %s failed", logFile.path)
}

}

return nil
Expand All @@ -272,16 +271,16 @@ func (vlog *valueLog) close() error {
func (vlog *valueLog) readValue(vp valuePointer) ([]byte, error) {
logFile, err := vlog.getFileRLocked(vp.Fid)
if err != nil {
return nil, errors.Trace(err)
return nil, errors.Annotatef(err, "get file(id: %d) failed", vp.Fid)
}

defer logFile.lock.RUnlock()

record, err := logFile.readRecord(vp.Offset)
if err != nil {
return nil, errors.Trace(err)
return nil, errors.Annotatef(err, "read record at %+v failed", vp)
}

logFile.lock.RUnlock()

return record.payload, nil
}

Expand All @@ -305,7 +304,7 @@ func (vlog *valueLog) write(reqs []*request) error {
if vlog.sync {
err = curFile.fdatasync()
if err != nil {
return errors.Trace(err)
return errors.Annotatef(err, "fdatasync file %s failed", curFile.path)
}
}

Expand All @@ -320,13 +319,13 @@ func (vlog *valueLog) write(reqs []*request) error {
if vlog.writableOffset() > vlog.opt.ValueLogFileSize {
err := curFile.finalize()
if err != nil {
return errors.Trace(err)
return errors.Annotatef(err, "finalize file %s failed", curFile.path)
}

id := atomic.AddUint32(&vlog.maxFid, 1)
curFile, err = vlog.createLogFile(id)
if err != nil {
return errors.Trace(err)
return errors.Annotatef(err, "create file id %d failed", id)
}
}
return nil
Expand All @@ -346,7 +345,7 @@ func (vlog *valueLog) write(reqs []*request) error {

if writeNow {
if err := toDisk(); err != nil {
return err
return errors.Annotate(err, "write to disk failed")
}
}
}
Expand Down Expand Up @@ -415,7 +414,11 @@ func (vlog *valueLog) gcTS(gcTS int64) {

for _, logFile := range toDeleteFiles {
logFile.lock.Lock()
err := os.Remove(logFile.path)
err := logFile.close()
if err != nil {
log.Errorf("close file %s err: %+v", logFile.path, err)
}
err = os.Remove(logFile.path)
if err != nil {
log.Errorf("remove file %s err: %v", logFile.path, err)
}
Expand Down

0 comments on commit 30ad792

Please sign in to comment.