Skip to content
This repository has been archived by the owner on Dec 8, 2021. It is now read-only.

restore: retry if deliver KVs to importer #176

Merged
merged 8 commits into from
May 8, 2019
17 changes: 16 additions & 1 deletion lightning/restore/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -1400,7 +1400,13 @@ func writeToEngine(ctx context.Context, engine *kv.OpenedEngine, totalKVs []kven

var putError error
for _, kvs := range splitIntoDeliveryStreams(totalKVs, maxDeliverBytes) {
putError = stream.Put(kvs)
for i := 0; i < 5; i++ {
putError = stream.Put(kvs)
if putError == nil {
break
}
}
// retry still failed
if putError != nil {
break
}
Expand Down Expand Up @@ -1434,6 +1440,13 @@ func (cr *chunkRestore) deliverLoop(
var channelClosed bool
var dataKVs, indexKVs []kvenc.KvPair

deliverLogger := t.logger.With(
zap.Int32("engineNumber", engineID),
zap.Int("fileIndex", cr.index),
zap.Stringer("path", &cr.chunk.Key),
zap.String("task", "deliver"),
)

for !channelClosed {
var dataChecksum, indexChecksum verify.KVChecksum
var offset, rowID int64
Expand Down Expand Up @@ -1469,9 +1482,11 @@ func (cr *chunkRestore) deliverLoop(
start := time.Now()

if err = writeToEngine(ctx, dataEngine, dataKVs); err != nil {
deliverLogger.Error("write to data engine failed", log.ShortError(err))
return
}
if err = writeToEngine(ctx, indexEngine, indexKVs); err != nil {
deliverLogger.Error("write to index engine failed", log.ShortError(err))
return
}

Expand Down