Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
Signed-off-by: qupeng <qupeng@pingcap.com>
  • Loading branch information
hicqu committed Sep 6, 2023
1 parent bd44ce3 commit 4fe792a
Showing 1 changed file with 5 additions and 8 deletions.
13 changes: 5 additions & 8 deletions cdc/processor/sinkmanager/table_sink_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,14 +199,11 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e
failpoint.Inject("TableSinkWorkerFetchFromCache", func() {
err = tablesink.NewSinkInternalError(errors.New("TableSinkWorkerFetchFromCacheInjected"))
})
if err != nil {
return errors.Trace(err)
}
// We have drained all events from the cache, we can return directly.
// No need to get events from the source manager again.
if drained {
performCallback(lowerBound.Prev())
return nil
performCallback(lowerBound.Prev())
if err != nil || drained {
// If drained is true it means we have drained all events from the cache,
// we can return directly instead of get events from the source manager again.
return err
}
}

Expand Down

0 comments on commit 4fe792a

Please sign in to comment.