feat: Introduce stats task for import (#35868)
This PR introduces a stats task for import:
1. Define new `Stats` and `IndexBuilding` states for importJob
2. Add a new stats step to the import process: trigger the stats task and
wait for its completion
3. Abort the stats task if the import job fails

issue: #33744

---------

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
bigsheeper authored Sep 15, 2024
1 parent 5ce99e2 commit a61668c
Showing 21 changed files with 591 additions and 220 deletions.
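Before the file-by-file diff, here is a minimal, self-contained Go sketch of the flow described in the three points above. It is illustrative only and is not the Milvus implementation: the `jobState`, `statsResult`, and `job` names are invented for this sketch, while the real logic lives in `importChecker.checkStatsJob` and `importChecker.checkFailedJob` in `internal/datacoord/import_checker.go` below.

```go
package main

import "fmt"

// jobState mirrors the import job states referenced in this PR, including the
// new Stats and IndexBuilding states (names shortened for illustration).
type jobState int

const (
	statePending jobState = iota
	statePreImporting
	stateImporting
	stateStats         // new: waiting for stats (sort) tasks on imported segments
	stateIndexBuilding // new: waiting for index building on the stats-ed segments
	stateCompleted
	stateFailed
)

// statsResult is a stand-in for the per-segment stats task state.
type statsResult int

const (
	statsNone statsResult = iota
	statsInProgress
	statsFinished
	statsFailed
)

// job is a toy import job with one stats task per imported segment.
type job struct {
	state jobState
	stats []statsResult
}

// checkStats shows the shape of the new stats step: submit missing tasks,
// wait for in-progress ones, fail the job if any task failed, and only move
// on to IndexBuilding once every segment is done.
func (j *job) checkStats() {
	done := 0
	for i, s := range j.stats {
		switch s {
		case statsNone:
			j.stats[i] = statsInProgress // submit the stats task
		case statsInProgress:
			// still waiting; check again on the next tick
		case statsFailed:
			j.state = stateFailed
			return
		case statsFinished:
			done++
		}
	}
	if done == len(j.stats) {
		j.state = stateIndexBuilding
	}
}

// checkFailed mirrors point 3 of the PR description: a failed job aborts any
// outstanding stats tasks before its import tasks are marked failed.
func (j *job) checkFailed() {
	for i := range j.stats {
		j.stats[i] = statsNone // drop/abort the stats task
	}
}

func main() {
	j := &job{state: stateStats, stats: []statsResult{statsFinished, statsNone}}
	j.checkStats() // submits the missing task, job stays in Stats
	j.stats[1] = statsFinished
	j.checkStats() // all stats tasks done, job moves to IndexBuilding
	fmt.Println(j.state == stateIndexBuilding)
}
```

The real checker runs these checks on a ticker and drives a job through Pending → PreImporting → Importing → Stats → IndexBuilding → Completed, falling back to Failed (and dropping any stats tasks) on error, as the diff of `import_checker.go` shows.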
1 change: 1 addition & 0 deletions Makefile
@@ -498,6 +498,7 @@ generate-mockery-datacoord: getdeps
$(INSTALL_PATH)/mockery --name=Broker --dir=internal/datacoord/broker --filename=mock_coordinator_broker.go --output=internal/datacoord/broker --structname=MockBroker --with-expecter --inpackage
$(INSTALL_PATH)/mockery --name=WorkerManager --dir=internal/datacoord/session --filename=mock_worker_manager.go --output=internal/datacoord/session --structname=MockWorkerManager --with-expecter --inpackage
$(INSTALL_PATH)/mockery --name=Manager --dir=internal/datacoord --filename=mock_segment_manager.go --output=internal/datacoord --structname=MockManager --with-expecter --inpackage
$(INSTALL_PATH)/mockery --name=StatsJobManager --dir=internal/datacoord --filename=mock_job_manager.go --output=internal/datacoord --structname=MockStatsJobManager --with-expecter --inpackage

generate-mockery-datanode: getdeps
$(INSTALL_PATH)/mockery --name=Allocator --dir=$(PWD)/internal/datanode/allocator --output=$(PWD)/internal/datanode/allocator --filename=mock_allocator.go --with-expecter --structname=MockAllocator --outpkg=allocator --inpackage
160 changes: 131 additions & 29 deletions internal/datacoord/import_checker.go
@@ -28,6 +28,7 @@ import (
"github.com/milvus-io/milvus/internal/datacoord/allocator"
"github.com/milvus-io/milvus/internal/datacoord/broker"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/util/importutilv2"
"github.com/milvus-io/milvus/pkg/log"
@@ -47,6 +48,7 @@ type importChecker struct {
alloc allocator.Allocator
sm Manager
imeta ImportMeta
sjm StatsJobManager

closeOnce sync.Once
closeChan chan struct{}
@@ -58,6 +60,7 @@ func NewImportChecker(meta *meta,
alloc allocator.Allocator,
sm Manager,
imeta ImportMeta,
sjm StatsJobManager,
) ImportChecker {
return &importChecker{
meta: meta,
@@ -66,6 +69,7 @@ func NewImportChecker(meta *meta,
alloc: alloc,
sm: sm,
imeta: imeta,
sjm: sjm,
closeChan: make(chan struct{}),
}
}
@@ -93,8 +97,12 @@ func (c *importChecker) Start() {
c.checkPreImportingJob(job)
case internalpb.ImportJobState_Importing:
c.checkImportingJob(job)
case internalpb.ImportJobState_Stats:
c.checkStatsJob(job)
case internalpb.ImportJobState_IndexBuilding:
c.checkIndexBuildingJob(job)
case internalpb.ImportJobState_Failed:
c.tryFailingTasks(job)
c.checkFailedJob(job)
}
}
case <-ticker2.C:
@@ -178,6 +186,7 @@ func (c *importChecker) getLackFilesForImports(job ImportJob) []*datapb.ImportFi
}

func (c *importChecker) checkPendingJob(job ImportJob) {
logger := log.With(zap.Int64("jobID", job.GetJobID()))
lacks := c.getLackFilesForPreImports(job)
if len(lacks) == 0 {
return
@@ -186,7 +195,7 @@ func (c *importChecker) checkPendingJob(job ImportJob) {

newTasks, err := NewPreImportTasks(fileGroups, job, c.alloc)
if err != nil {
log.Warn("new preimport tasks failed", zap.Error(err))
logger.Warn("new preimport tasks failed", zap.Error(err))
return
}
for _, t := range newTasks {
@@ -199,22 +208,23 @@ func (c *importChecker) checkPendingJob(job ImportJob) {
}
err = c.imeta.UpdateJob(job.GetJobID(), UpdateJobState(internalpb.ImportJobState_PreImporting))
if err != nil {
log.Warn("failed to update job state to PreImporting", zap.Int64("jobID", job.GetJobID()), zap.Error(err))
logger.Warn("failed to update job state to PreImporting", zap.Error(err))
}
}

func (c *importChecker) checkPreImportingJob(job ImportJob) {
logger := log.With(zap.Int64("jobID", job.GetJobID()))
lacks := c.getLackFilesForImports(job)
if len(lacks) == 0 {
return
}

requestSize, err := CheckDiskQuota(job, c.meta, c.imeta)
if err != nil {
log.Warn("import failed, disk quota exceeded", zap.Int64("jobID", job.GetJobID()), zap.Error(err))
logger.Warn("import failed, disk quota exceeded", zap.Error(err))
err = c.imeta.UpdateJob(job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Failed), UpdateJobReason(err.Error()))
if err != nil {
log.Warn("failed to update job state to Failed", zap.Int64("jobID", job.GetJobID()), zap.Error(err))
logger.Warn("failed to update job state to Failed", zap.Error(err))
}
return
}
@@ -223,7 +233,7 @@ func (c *importChecker) checkPreImportingJob(job ImportJob) {
groups := RegroupImportFiles(job, lacks, allDiskIndex)
newTasks, err := NewImportTasks(groups, job, c.sm, c.alloc)
if err != nil {
log.Warn("new import tasks failed", zap.Error(err))
logger.Warn("new import tasks failed", zap.Error(err))
return
}
for _, t := range newTasks {
@@ -236,68 +246,159 @@ func (c *importChecker) checkPreImportingJob(job ImportJob) {
}
err = c.imeta.UpdateJob(job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Importing), UpdateRequestedDiskSize(requestSize))
if err != nil {
log.Warn("failed to update job state to Importing", zap.Int64("jobID", job.GetJobID()), zap.Error(err))
logger.Warn("failed to update job state to Importing", zap.Error(err))
}
}

func (c *importChecker) checkImportingJob(job ImportJob) {
log := log.With(zap.Int64("jobID", job.GetJobID()),
zap.Int64("collectionID", job.GetCollectionID()))
tasks := c.imeta.GetTaskBy(WithType(ImportTaskType), WithJob(job.GetJobID()))
for _, t := range tasks {
if t.GetState() != datapb.ImportTaskStateV2_Completed {
return
}
}
err := c.imeta.UpdateJob(job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Stats))
if err != nil {
log.Warn("failed to update job state to Stats", zap.Error(err))
return
}
log.Info("update import job state to Stats", zap.Int64("jobID", job.GetJobID()))
}

func (c *importChecker) checkStatsJob(job ImportJob) {
logger := log.With(zap.Int64("jobID", job.GetJobID()))
updateJobState := func(state internalpb.ImportJobState) {
err := c.imeta.UpdateJob(job.GetJobID(), UpdateJobState(state))
if err != nil {
logger.Warn("failed to update job state", zap.Error(err))
return
}
logger.Info("update import job state", zap.String("state", state.String()))
}

// Skip the stats stage if stats tasks are disabled or this is an L0 import.
if !Params.DataCoordCfg.EnableStatsTask.GetAsBool() || importutilv2.IsL0Import(job.GetOptions()) {
updateJobState(internalpb.ImportJobState_IndexBuilding)
return
}

// Check and trigger stats tasks.
var (
taskCnt = 0
doneCnt = 0
)
tasks := c.imeta.GetTaskBy(WithType(ImportTaskType), WithJob(job.GetJobID()))
for _, task := range tasks {
originSegmentIDs := task.(*importTask).GetSegmentIDs()
statsSegmentIDs := task.(*importTask).GetStatsSegmentIDs()
taskCnt += len(originSegmentIDs)
for i, originSegmentID := range originSegmentIDs {
state := c.sjm.GetStatsTaskState(originSegmentID, indexpb.StatsSubJob_Sort)
switch state {
case indexpb.JobState_JobStateNone:
err := c.sjm.SubmitStatsTask(originSegmentID, statsSegmentIDs[i], indexpb.StatsSubJob_Sort, false)
if err != nil {
logger.Warn("submit stats task failed", zap.Error(err))
continue
}
log.Info("submit stats task done", WrapTaskLog(task, zap.Int64("origin", originSegmentID), zap.Int64("stats", statsSegmentIDs[i]))...)
case indexpb.JobState_JobStateInit, indexpb.JobState_JobStateRetry, indexpb.JobState_JobStateInProgress:
logger.Debug("waiting for stats task...", WrapTaskLog(task, zap.Int64("origin", originSegmentID), zap.Int64("stats", statsSegmentIDs[i]))...)
case indexpb.JobState_JobStateFailed:
updateJobState(internalpb.ImportJobState_Failed)
return
case indexpb.JobState_JobStateFinished:
doneCnt++
}
}
}

// Stats tasks for all segments are finished; update the job state to `IndexBuilding`.
if taskCnt == doneCnt {
updateJobState(internalpb.ImportJobState_IndexBuilding)
}
}

segmentIDs := lo.FlatMap(tasks, func(t ImportTask, _ int) []int64 {
func (c *importChecker) checkIndexBuildingJob(job ImportJob) {
logger := log.With(zap.Int64("jobID", job.GetJobID()))
tasks := c.imeta.GetTaskBy(WithType(ImportTaskType), WithJob(job.GetJobID()))
originSegmentIDs := lo.FlatMap(tasks, func(t ImportTask, _ int) []int64 {
return t.(*importTask).GetSegmentIDs()
})
statsSegmentIDs := lo.FlatMap(tasks, func(t ImportTask, _ int) []int64 {
return t.(*importTask).GetStatsSegmentIDs()
})

targetSegmentIDs := statsSegmentIDs
if !Params.DataCoordCfg.EnableStatsTask.GetAsBool() {
targetSegmentIDs = originSegmentIDs
}

// Verify completion of index building for imported segments.
unindexed := c.meta.indexMeta.GetUnindexedSegments(job.GetCollectionID(), segmentIDs)
unindexed := c.meta.indexMeta.GetUnindexedSegments(job.GetCollectionID(), targetSegmentIDs)
if Params.DataCoordCfg.WaitForIndex.GetAsBool() && len(unindexed) > 0 && !importutilv2.IsL0Import(job.GetOptions()) {
log.Debug("waiting for import segments building index...", zap.Int64s("unindexed", unindexed))
for _, segmentID := range unindexed {
select {
case getBuildIndexChSingleton() <- segmentID: // accelerate index building:
default:
}
}
logger.Debug("waiting for import segments building index...", zap.Int64s("unindexed", unindexed))
return
}

unfinished := lo.Filter(segmentIDs, func(segmentID int64, _ int) bool {
// All segment indexes have been built; try to unset the isImporting flag for all segments.
isImportingSegments := lo.Filter(append(originSegmentIDs, statsSegmentIDs...), func(segmentID int64, _ int) bool {
segment := c.meta.GetSegment(segmentID)
if segment == nil {
log.Warn("cannot find segment, may be compacted", zap.Int64("segmentID", segmentID))
logger.Warn("cannot find segment", zap.Int64("segmentID", segmentID))
return false
}
return segment.GetIsImporting()
})

channels, err := c.meta.GetSegmentsChannels(unfinished)
channels, err := c.meta.GetSegmentsChannels(isImportingSegments)
if err != nil {
log.Warn("get segments channels failed", zap.Error(err))
logger.Warn("get segments channels failed", zap.Error(err))
return
}
for _, segmentID := range unfinished {
for _, segmentID := range isImportingSegments {
channelCP := c.meta.GetChannelCheckpoint(channels[segmentID])
if channelCP == nil {
log.Warn("nil channel checkpoint")
logger.Warn("nil channel checkpoint")
return
}
op1 := UpdateStartPosition([]*datapb.SegmentStartPosition{{StartPosition: channelCP, SegmentID: segmentID}})
op2 := UpdateDmlPosition(segmentID, channelCP)
op3 := UpdateIsImporting(segmentID, false)
err = c.meta.UpdateSegmentsInfo(op1, op2, op3)
if err != nil {
log.Warn("update import segment failed", zap.Error(err))
logger.Warn("update import segment failed", zap.Error(err))
return
}
}

// All finished; update the import job state to `Completed`.
completeTime := time.Now().Format("2006-01-02T15:04:05Z07:00")
err = c.imeta.UpdateJob(job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Completed), UpdateJobCompleteTime(completeTime))
if err != nil {
log.Warn("failed to update job state to Completed", zap.Error(err))
logger.Warn("failed to update job state to Completed", zap.Error(err))
return
}
log.Info("import job completed")
logger.Info("import job completed")
}

func (c *importChecker) checkFailedJob(job ImportJob) {
tasks := c.imeta.GetTaskBy(WithType(ImportTaskType), WithJob(job.GetJobID()))
originSegmentIDs := lo.FlatMap(tasks, func(t ImportTask, _ int) []int64 {
return t.(*importTask).GetSegmentIDs()
})
for _, originSegmentID := range originSegmentIDs {
err := c.sjm.DropStatsTask(originSegmentID, indexpb.StatsSubJob_Sort)
if err != nil {
log.Warn("Drop stats task failed", zap.Int64("jobID", job.GetJobID()))
return
}
}
c.tryFailingTasks(job)
}

func (c *importChecker) tryFailingTasks(job ImportJob) {
Expand All @@ -306,8 +407,8 @@ func (c *importChecker) tryFailingTasks(job ImportJob) {
if len(tasks) == 0 {
return
}
log.Warn("Import job has failed, all tasks with the same jobID"+
" will be marked as failed", zap.Int64("jobID", job.GetJobID()))
log.Warn("Import job has failed, all tasks with the same jobID will be marked as failed",
zap.Int64("jobID", job.GetJobID()), zap.String("reason", job.GetReason()))
for _, task := range tasks {
err := c.imeta.UpdateTask(task.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Failed),
UpdateReason(job.GetReason()))
@@ -364,14 +465,15 @@ func (c *importChecker) checkGC(job ImportJob) {
}
cleanupTime := tsoutil.PhysicalTime(job.GetCleanupTs())
if time.Now().After(cleanupTime) {
logger := log.With(zap.Int64("jobID", job.GetJobID()))
GCRetention := Params.DataCoordCfg.ImportTaskRetention.GetAsDuration(time.Second)
log.Info("job has reached the GC retention", zap.Int64("jobID", job.GetJobID()),
logger.Info("job has reached the GC retention",
zap.Time("cleanupTime", cleanupTime), zap.Duration("GCRetention", GCRetention))
tasks := c.imeta.GetTaskBy(WithJob(job.GetJobID()))
shouldRemoveJob := true
for _, task := range tasks {
if job.GetState() == internalpb.ImportJobState_Failed && task.GetType() == ImportTaskType {
if len(task.(*importTask).GetSegmentIDs()) != 0 {
if len(task.(*importTask).GetSegmentIDs()) != 0 || len(task.(*importTask).GetStatsSegmentIDs()) != 0 {
shouldRemoveJob = false
continue
}
@@ -393,9 +495,9 @@ func (c *importChecker) checkGC(job ImportJob) {
}
err := c.imeta.RemoveJob(job.GetJobID())
if err != nil {
log.Warn("remove import job failed", zap.Int64("jobID", job.GetJobID()), zap.Error(err))
logger.Warn("remove import job failed", zap.Error(err))
return
}
log.Info("import job removed", zap.Int64("jobID", job.GetJobID()))
logger.Info("import job removed")
}
}
