enhance: Avoid unnecessary compaction (#35148)

Estimate the import segment size based on DiskSegmentMaxSize (2G) to
avoid unnecessary compaction after the import completes.

issue: #35147

---------

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
bigsheeper authored Aug 6, 2024
1 parent a9352a0 commit 678018d
Showing 5 changed files with 34 additions and 22 deletions.
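Why this avoids compaction, in back-of-the-envelope terms: the compaction trigger already treats DiskSegmentMaxSize as the expected segment size for DiskANN-only collections, so import segments grouped against the smaller default SegmentMaxSize come out undersized and are merged right after import. A hypothetical Go sketch, not part of this commit — it assumes the default SegmentMaxSize of 1024 MB, takes the 2G DiskSegmentMaxSize from the commit message, and estimateSegmentCount is illustrative, not a Milvus function:

// Hypothetical sketch: why grouping import files against DiskSegmentMaxSize
// avoids a post-import merge compaction. Assumes SegmentMaxSize = 1024 MB
// (default) and DiskSegmentMaxSize = 2048 MB (per the commit message).
package main

import "fmt"

const mb int64 = 1024 * 1024

// estimateSegmentCount is ceil(totalBytes / segmentMaxSize).
func estimateSegmentCount(totalBytes, segmentMaxSize int64) int64 {
	return (totalBytes + segmentMaxSize - 1) / segmentMaxSize
}

func main() {
	totalBytes := 4096 * mb // a 4 GiB bulk import into a DiskANN-only collection

	// Before: files regrouped against SegmentMaxSize (1024 MB) -> 4 segments.
	// Since getExpectedSegmentSize returns DiskSegmentMaxSize for DiskANN,
	// the compaction trigger later merges these undersized segments.
	fmt.Println(estimateSegmentCount(totalBytes, 1024*mb)) // 4

	// After: regrouped against DiskSegmentMaxSize (2048 MB) -> 2 segments
	// that already match the expected size, so no compaction is triggered.
	fmt.Println(estimateSegmentCount(totalBytes, 2048*mb)) // 2
}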
19 changes: 1 addition & 18 deletions internal/datacoord/compaction_trigger.go
@@ -28,11 +28,8 @@ import (
 
 	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
 	"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
-	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
-	"github.com/milvus-io/milvus/internal/metastore/model"
 	"github.com/milvus-io/milvus/internal/proto/datapb"
 	"github.com/milvus-io/milvus/pkg/log"
-	"github.com/milvus-io/milvus/pkg/util/indexparamcheck"
 	"github.com/milvus-io/milvus/pkg/util/lifetime"
 	"github.com/milvus-io/milvus/pkg/util/lock"
 	"github.com/milvus-io/milvus/pkg/util/logutil"
@@ -302,28 +299,14 @@ func (t *compactionTrigger) allocSignalID() (UniqueID, error) {
 }
 
 func (t *compactionTrigger) getExpectedSegmentSize(collectionID int64) int64 {
-	indexInfos := t.meta.indexMeta.GetIndexesForCollection(collectionID, "")
-
 	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
 	defer cancel()
 	collMeta, err := t.handler.GetCollection(ctx, collectionID)
 	if err != nil {
 		log.Warn("failed to get collection", zap.Int64("collectionID", collectionID), zap.Error(err))
 		return Params.DataCoordCfg.SegmentMaxSize.GetAsInt64() * 1024 * 1024
 	}
-
-	vectorFields := typeutil.GetVectorFieldSchemas(collMeta.Schema)
-	fieldIndexTypes := lo.SliceToMap(indexInfos, func(t *model.Index) (int64, indexparamcheck.IndexType) {
-		return t.FieldID, GetIndexType(t.IndexParams)
-	})
-	vectorFieldsWithDiskIndex := lo.Filter(vectorFields, func(field *schemapb.FieldSchema, _ int) bool {
-		if indexType, ok := fieldIndexTypes[field.FieldID]; ok {
-			return indexparamcheck.IsDiskIndex(indexType)
-		}
-		return false
-	})
-
-	allDiskIndex := len(vectorFields) == len(vectorFieldsWithDiskIndex)
+	allDiskIndex := t.meta.indexMeta.AreAllDiskIndex(collectionID, collMeta.Schema)
 	if allDiskIndex {
 		// Only if all vector fields index type are DiskANN, recalc segment max size here.
 		return Params.DataCoordCfg.DiskSegmentMaxSize.GetAsInt64() * 1024 * 1024
3 changes: 2 additions & 1 deletion internal/datacoord/import_checker.go
@@ -218,7 +218,8 @@ func (c *importChecker) checkPreImportingJob(job ImportJob) {
 		return
 	}
 
-	groups := RegroupImportFiles(job, lacks)
+	allDiskIndex := c.meta.indexMeta.AreAllDiskIndex(job.GetCollectionID(), job.GetSchema())
+	groups := RegroupImportFiles(job, lacks, allDiskIndex)
 	newTasks, err := NewImportTasks(groups, job, c.sm, c.alloc)
 	if err != nil {
 		log.Warn("new import tasks failed", zap.Error(err))
11 changes: 9 additions & 2 deletions internal/datacoord/import_util.go
@@ -226,13 +226,20 @@ func AssembleImportRequest(task ImportTask, job ImportJob, meta *meta, alloc all
 	}, nil
 }
 
-func RegroupImportFiles(job ImportJob, files []*datapb.ImportFileStats) [][]*datapb.ImportFileStats {
+func RegroupImportFiles(job ImportJob, files []*datapb.ImportFileStats, allDiskIndex bool) [][]*datapb.ImportFileStats {
 	if len(files) == 0 {
 		return nil
 	}
 
+	var segmentMaxSize int
+	if allDiskIndex {
+		// Only if all vector fields index type are DiskANN, recalc segment max size here.
+		segmentMaxSize = Params.DataCoordCfg.DiskSegmentMaxSize.GetAsInt() * 1024 * 1024
+	} else {
+		// If some vector fields index type are not DiskANN, recalc segment max size using default policy.
+		segmentMaxSize = Params.DataCoordCfg.SegmentMaxSize.GetAsInt() * 1024 * 1024
+	}
 	isL0Import := importutilv2.IsL0Import(job.GetOptions())
-	segmentMaxSize := paramtable.Get().DataCoordCfg.SegmentMaxSize.GetAsInt() * 1024 * 1024
 	if isL0Import {
 		segmentMaxSize = paramtable.Get().DataNodeCfg.FlushDeleteBufferBytes.GetAsInt()
 	}
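The net selection order in the new RegroupImportFiles: the disk-index estimate is chosen first, then the L0-import override is applied last and therefore still wins. A hedged standalone sketch of that control flow — the byte values are illustrative stand-ins for the config, and the 64 MB FlushDeleteBufferBytes figure is an assumption, not taken from this commit:

// Hedged sketch of segment-size selection in the new RegroupImportFiles.
// Constants are stand-ins for the Milvus config, not actual lookups.
package main

import "fmt"

const mb = 1024 * 1024

func segmentMaxSize(allDiskIndex, isL0Import bool) int {
	size := 1024 * mb // assumed DataCoordCfg.SegmentMaxSize default
	if allDiskIndex {
		size = 2048 * mb // DataCoordCfg.DiskSegmentMaxSize (2G)
	}
	if isL0Import {
		size = 64 * mb // stand-in for DataNodeCfg.FlushDeleteBufferBytes
	}
	return size
}

func main() {
	fmt.Println(segmentMaxSize(true, false) / mb)  // 2048: DiskANN-only collection
	fmt.Println(segmentMaxSize(false, false) / mb) // 1024: default policy
	fmt.Println(segmentMaxSize(true, true) / mb)   // 64: L0 import overrides
}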
3 changes: 2 additions & 1 deletion internal/datacoord/import_util_test.go
@@ -208,7 +208,8 @@ func TestImportUtil_RegroupImportFiles(t *testing.T) {
 			Vchannels: []string{"v0", "v1", "v2", "v3"},
 		},
 	}
-	groups := RegroupImportFiles(job, files)
+
+	groups := RegroupImportFiles(job, files, false)
 	total := 0
 	for i, fs := range groups {
 		sum := lo.SumBy(fs, func(f *datapb.ImportFileStats) int64 {
20 changes: 20 additions & 0 deletions internal/datacoord/index_meta.go
@@ -29,12 +29,14 @@ import (
 	"google.golang.org/protobuf/proto"
 
 	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
+	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
 	"github.com/milvus-io/milvus/internal/metastore"
 	"github.com/milvus-io/milvus/internal/metastore/model"
 	"github.com/milvus-io/milvus/internal/proto/indexpb"
 	"github.com/milvus-io/milvus/pkg/common"
 	"github.com/milvus-io/milvus/pkg/log"
 	"github.com/milvus-io/milvus/pkg/metrics"
+	"github.com/milvus-io/milvus/pkg/util/indexparamcheck"
 	"github.com/milvus-io/milvus/pkg/util/timerecord"
 	"github.com/milvus-io/milvus/pkg/util/typeutil"
 )
@@ -926,3 +928,21 @@ func (m *indexMeta) GetUnindexedSegments(collectionID int64, segmentIDs []int64)
 	}
 	return lo.Without(segmentIDs, indexed...)
 }
+
+func (m *indexMeta) AreAllDiskIndex(collectionID int64, schema *schemapb.CollectionSchema) bool {
+	indexInfos := m.GetIndexesForCollection(collectionID, "")
+
+	vectorFields := typeutil.GetVectorFieldSchemas(schema)
+	fieldIndexTypes := lo.SliceToMap(indexInfos, func(t *model.Index) (int64, indexparamcheck.IndexType) {
+		return t.FieldID, GetIndexType(t.IndexParams)
+	})
+	vectorFieldsWithDiskIndex := lo.Filter(vectorFields, func(field *schemapb.FieldSchema, _ int) bool {
+		if indexType, ok := fieldIndexTypes[field.FieldID]; ok {
+			return indexparamcheck.IsDiskIndex(indexType)
+		}
+		return false
+	})
+
+	allDiskIndex := len(vectorFields) == len(vectorFieldsWithDiskIndex)
+	return allDiskIndex
+}
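A note on AreAllDiskIndex semantics: a vector field with no index entry fails the lo.Filter predicate (the fieldIndexTypes lookup misses), so the method returns true only when every vector field is both indexed and disk-indexed. A tiny standalone sketch of that set comparison, with hypothetical plain types in place of the Milvus schema and index models:

// Standalone sketch of the AreAllDiskIndex set comparison. The types here
// are hypothetical simplifications, not the Milvus models.
package main

import "fmt"

func areAllDiskIndex(vectorFieldIDs []int64, diskIndexedFields map[int64]bool) bool {
	matched := 0
	for _, id := range vectorFieldIDs {
		if diskIndexedFields[id] { // unindexed fields don't count as disk-indexed
			matched++
		}
	}
	return matched == len(vectorFieldIDs)
}

func main() {
	// Field 101 is unindexed (or not DiskANN) -> false.
	fmt.Println(areAllDiskIndex([]int64{100, 101}, map[int64]bool{100: true})) // false
	// Every vector field has a disk index -> true.
	fmt.Println(areAllDiskIndex([]int64{100, 101}, map[int64]bool{100: true, 101: true})) // true
}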
