enhance: Add metrics to monitor import throughput and imported rows (#36519)

issue: #36518

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
bigsheeper authored Sep 28, 2024
1 parent acc9b5a commit 80f25d4
Showing 8 changed files with 41 additions and 2 deletions.
8 changes: 8 additions & 0 deletions internal/datanode/importv2/util.go
@@ -36,6 +36,7 @@ import (
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@@ -77,6 +78,11 @@ func NewSyncTask(ctx context.Context,
return nil, err
}

segmentLevel := datapb.SegmentLevel_L1
if insertData == nil && deleteData != nil {
segmentLevel = datapb.SegmentLevel_L0
}

syncPack := &syncmgr.SyncPack{}
syncPack.WithInsertData([]*storage.InsertData{insertData}).
WithDeleteData(deleteData).
@@ -85,6 +91,8 @@
WithChannelName(vchannel).
WithSegmentID(segmentID).
WithTimeRange(ts, ts).
WithLevel(segmentLevel).
WithDataSource(metrics.BulkinsertDataSourceLabel).
WithBatchSize(int64(insertData.GetRowNum()))

return serializer.EncodeBuffer(ctx, syncPack)
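As an aside, the level-selection rule added above is easy to state: an import sync that carries only deletes produces an L0 segment, while anything with insert data stays L1. A minimal standalone sketch of that rule, with simplified types in place of the repo's datapb enums:

package main

import "fmt"

type segmentLevel string

const (
	levelL0 segmentLevel = "L0" // delete-only data
	levelL1 segmentLevel = "L1" // regular sealed data
)

// inferLevel mirrors the check added in NewSyncTask: only a pack with
// deletes and no inserts is tagged L0.
func inferLevel(hasInserts, hasDeletes bool) segmentLevel {
	if !hasInserts && hasDeletes {
		return levelL0
	}
	return levelL1
}

func main() {
	fmt.Println(inferLevel(false, true)) // L0: delete-only import
	fmt.Println(inferLevel(true, true))  // L1: inserts present
	fmt.Println(inferLevel(true, false)) // L1
}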
5 changes: 5 additions & 0 deletions internal/flushcommon/syncmgr/options.go
@@ -117,3 +117,8 @@ func (t *SyncTask) WithLevel(level datapb.SegmentLevel) *SyncTask {
t.level = level
return t
}

func (t *SyncTask) WithDataSource(source string) *SyncTask {
t.dataSource = source
return t
}
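WithDataSource follows the same chainable-setter idiom as the surrounding WithLevel and friends: mutate the receiver, return it, so callers can compose options fluently. A stripped-down sketch of the pattern (hypothetical task type, not repo code):

package main

import "fmt"

type task struct {
	dataSource string
	level      string
}

// Each setter returns the receiver so calls chain.
func (t *task) WithDataSource(source string) *task {
	t.dataSource = source
	return t
}

func (t *task) WithLevel(level string) *task {
	t.level = level
	return t
}

func main() {
	t := (&task{}).WithDataSource("bulkinsert").WithLevel("L1")
	fmt.Println(t.dataSource, t.level) // bulkinsert L1
}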
6 changes: 6 additions & 0 deletions internal/flushcommon/syncmgr/serializer.go
@@ -49,6 +49,7 @@ type SyncPack struct {
startPosition *msgpb.MsgPosition
checkpoint *msgpb.MsgPosition
batchSize int64 // batchSize is the row number of this sync task, not the total number of rows in the segment
dataSource string
isFlush bool
isDrop bool
// metadata
@@ -137,3 +138,8 @@ func (p *SyncPack) WithErrorHandler(handler func(err error)) *SyncPack {
p.errHandler = handler
return p
}

func (p *SyncPack) WithDataSource(source string) *SyncPack {
p.dataSource = source
return p
}
1 change: 1 addition & 0 deletions internal/flushcommon/syncmgr/storage_serializer.go
@@ -174,6 +174,7 @@ func (s *storageV1Serializer) setTaskMeta(task *SyncTask, pack *SyncPack) {
WithStartPosition(pack.startPosition).
WithCheckpoint(pack.checkpoint).
WithLevel(pack.level).
WithDataSource(pack.dataSource).
WithTimeRange(pack.tsFrom, pack.tsTo).
WithMetaCache(s.metacache).
WithMetaWriter(s.metaWriter).
4 changes: 3 additions & 1 deletion internal/flushcommon/syncmgr/task.go
@@ -55,6 +55,7 @@ type SyncTask struct {
pkField *schemapb.FieldSchema
startPosition *msgpb.MsgPosition
checkpoint *msgpb.MsgPosition
dataSource string
// batchSize is the row number of this sync task,
// not the total number of rows in the segment
batchSize int64
@@ -169,7 +170,8 @@ func (t *SyncTask) Run(ctx context.Context) (err error) {
totalSize += float64(len(t.deltaBlob.Value))
}

- metrics.DataNodeFlushedSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.AllLabel, t.level.String()).Add(totalSize)
+ metrics.DataNodeFlushedSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), t.dataSource, t.level.String()).Add(totalSize)
+ metrics.DataNodeFlushedRows.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), t.dataSource).Add(float64(t.batchSize))

metrics.DataNodeSave2StorageLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), t.level.String()).Observe(float64(t.tr.RecordSpan().Milliseconds()))

1 change: 1 addition & 0 deletions internal/flushcommon/writebuffer/write_buffer.go
@@ -603,6 +603,7 @@ func (wb *writeBufferBase) getSyncTask(ctx context.Context, segmentID int64) (sy
WithStartPosition(startPos).
WithTimeRange(tsFrom, tsTo).
WithLevel(segmentInfo.Level()).
WithDataSource(metrics.StreamingDataSourceLabel).
WithCheckpoint(wb.checkpoint).
WithBatchSize(batchSize).
WithErrorHandler(wb.errHandler)
14 changes: 13 additions & 1 deletion pkg/metrics/datanode_metrics.go
@@ -54,10 +54,21 @@ var (
Help: "byte size of data flushed to storage",
}, []string{
nodeIDLabelName,
- msgTypeLabelName,
+ dataSourceLabelName,
segmentLevelLabelName,
})

DataNodeFlushedRows = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.DataNodeRole,
Name: "flushed_data_rows",
Help: "num of rows flushed to storage",
}, []string{
nodeIDLabelName,
dataSourceLabelName,
})

DataNodeNumProducers = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: milvusNamespace,
@@ -246,6 +257,7 @@ func RegisterDataNode(registry *prometheus.Registry) {
registry.MustRegister(DataNodeFlushBufferCount)
registry.MustRegister(DataNodeFlushReqCounter)
registry.MustRegister(DataNodeFlushedSize)
registry.MustRegister(DataNodeFlushedRows)
// compaction related
registry.MustRegister(DataNodeCompactionLatency)
registry.MustRegister(DataNodeCompactionLatencyInQueue)
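To see the new counter's label scheme in action outside Milvus, here is a minimal sketch using prometheus/client_golang and its testutil helper. The metric name, help text, and data_source label mirror this diff; the node_id label name and the registry-free wiring are assumptions for illustration:

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

// flushedRows mirrors DataNodeFlushedRows: rows flushed to storage,
// split by node and by data source ("streaming" vs "bulkinsert").
var flushedRows = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Namespace: "milvus",
		Subsystem: "datanode",
		Name:      "flushed_data_rows",
		Help:      "num of rows flushed to storage",
	},
	[]string{"node_id", "data_source"},
)

func main() {
	// Simulate a bulkinsert sync task flushing 1024 rows on node 1,
	// the way SyncTask.Run now adds t.batchSize.
	flushedRows.WithLabelValues("1", "bulkinsert").Add(1024)

	// Read the child counter back, as a unit test would.
	got := testutil.ToFloat64(flushedRows.WithLabelValues("1", "bulkinsert"))
	fmt.Printf("flushed rows (bulkinsert): %.0f\n", got)
}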
4 changes: 4 additions & 0 deletions pkg/metrics/metrics.go
@@ -57,6 +57,9 @@ const (
FlushingSegmentLabel = "Flushing"
DroppedSegmentLabel = "Dropped"

StreamingDataSourceLabel = "streaming"
BulkinsertDataSourceLabel = "bulkinsert"

Leader = "OnLeader"
FromLeader = "FromLeader"

@@ -101,6 +104,7 @@ const (
cacheNameLabelName = "cache_name"
cacheStateLabelName = "cache_state"
indexCountLabelName = "indexed_field_count"
dataSourceLabelName = "data_source"
requestScope = "scope"
fullMethodLabelName = "full_method"
reduceLevelName = "reduce_level"
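With the streaming/bulkinsert split in place, import throughput falls out of a standard rate query; for example, something along the lines of sum(rate(milvus_datanode_flushed_data_rows{data_source="bulkinsert"}[5m])) should chart imported rows per second cluster-wide, assuming milvusNamespace resolves to "milvus" and typeutil.DataNodeRole to "datanode".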
