From e6e5dd3d316f5f2001978bd0111ce4be2f616cdb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20=C5=A0tibran=C3=BD?= Date: Wed, 8 Jun 2022 11:33:57 +0200 Subject: [PATCH] Update Prometheus with async chunk mapper changes. (#2043) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Update Prometheus with async chunk mapper changes. Included changes: https://github.com/grafana/mimir-prometheus/pull/131 https://github.com/grafana/mimir-prometheus/pull/247 These result is lower memory usage by chunk mapper. Signed-off-by: Peter Štibraný --- CHANGELOG.md | 1 + go.mod | 2 +- go.sum | 4 +- .../tsdb/chunks/chunk_write_queue.go | 108 +++++++++++++-- .../prometheus/tsdb/chunks/queue.go | 127 ++++++++++++++++++ vendor/modules.txt | 4 +- 6 files changed, 227 insertions(+), 19 deletions(-) create mode 100644 vendor/github.com/prometheus/prometheus/tsdb/chunks/queue.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 79a51159dfb..94b86133191 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -27,6 +27,7 @@ * [ENHANCEMENT] Blocks Storage, Alertmanager, Ruler: add support a prefix to the bucket store (`*_storage.storage_prefix`). This enables using the same bucket for the three components. #1686 #1951 * [ENHANCEMENT] Upgrade Docker base images to `alpine:3.16.0`. #2028 * [ENHANCEMENT] Store-gateway: Add experimental configuration option for the store-gateway to attempt to pre-populate the file system cache when memory-mapping index-header files. Enabled with `-blocks-storage.bucket-store.index-header.map-populate-enabled=true`. #2019 +* [ENHANCEMENT] Chunk Mapper: reduce memory usage of async chunk mapper. #2043 * [BUGFIX] Fix regexp parsing panic for regexp label matchers with start/end quantifiers. #1883 * [BUGFIX] Ingester: fixed deceiving error log "failed to update cached shipped blocks after shipper initialisation", occurring for each new tenant in the ingester. #1893 * [BUGFIX] Ring: fix bug where instances may appear unhealthy in the hash ring web UI even though they are not. #1933 diff --git a/go.mod b/go.mod index 889656d20f8..4e3338d493e 100644 --- a/go.mod +++ b/go.mod @@ -226,7 +226,7 @@ replace git.apache.org/thrift.git => github.com/apache/thrift v0.0.0-20180902110 replace github.com/bradfitz/gomemcache => github.com/themihai/gomemcache v0.0.0-20180902122335-24332e2d58ab // Using a fork of Prometheus while we work on querysharding to avoid a dependency on the upstream. -replace github.com/prometheus/prometheus => github.com/grafana/mimir-prometheus v0.0.0-20220518151708-ceaa77f14d6a +replace github.com/prometheus/prometheus => github.com/grafana/mimir-prometheus v0.0.0-20220607154228-1e2d2fb2d8ce // Pin hashicorp depencencies since the Prometheus fork, go mod tries to update them. replace github.com/hashicorp/go-immutable-radix => github.com/hashicorp/go-immutable-radix v1.2.0 diff --git a/go.sum b/go.sum index 5defb3660c8..1fc0f08d963 100644 --- a/go.sum +++ b/go.sum @@ -1053,8 +1053,8 @@ github.com/grafana/e2e v0.1.1-0.20220519104354-1db01e4751fe h1:mxrRWDjKtob43xF9n github.com/grafana/e2e v0.1.1-0.20220519104354-1db01e4751fe/go.mod h1:+26VJWpczg2OU3D0537acnHSHzhJORpxOs6F+M27tZo= github.com/grafana/memberlist v0.3.1-0.20220425183535-6b97a09b7167 h1:PgEQkGHR4YimSCEGT5IoswN9gJKZDVskf+he6UClCLw= github.com/grafana/memberlist v0.3.1-0.20220425183535-6b97a09b7167/go.mod h1:MS2lj3INKhZjWNqd3N0m3J+Jxf3DAOnAH9VT3Sh9MUE= -github.com/grafana/mimir-prometheus v0.0.0-20220518151708-ceaa77f14d6a h1:Pkqac/osviA8l3NuNLORELHtRueAtXKZPenfMcdSjKk= -github.com/grafana/mimir-prometheus v0.0.0-20220518151708-ceaa77f14d6a/go.mod h1:W59JUgfj423JtdkiZLvblAJD4IQeE04y26z0CL7DVKc= +github.com/grafana/mimir-prometheus v0.0.0-20220607154228-1e2d2fb2d8ce h1:4C+cNC/u97P+ugUpQfpg/PXdPWRG85/u2VYSaqOv2L8= +github.com/grafana/mimir-prometheus v0.0.0-20220607154228-1e2d2fb2d8ce/go.mod h1:W59JUgfj423JtdkiZLvblAJD4IQeE04y26z0CL7DVKc= github.com/grafana/regexp v0.0.0-20220304095617-2e8d9baf4ac2 h1:uirlL/j72L93RhV4+mkWhjv0cov2I0MIgPOG9rMDr1k= github.com/grafana/regexp v0.0.0-20220304095617-2e8d9baf4ac2/go.mod h1:M5qHK+eWfAv8VR/265dIuEpL3fNfeC21tXXp9itM24A= github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA= diff --git a/vendor/github.com/prometheus/prometheus/tsdb/chunks/chunk_write_queue.go b/vendor/github.com/prometheus/prometheus/tsdb/chunks/chunk_write_queue.go index 628880b4c4a..071c92c85d0 100644 --- a/vendor/github.com/prometheus/prometheus/tsdb/chunks/chunk_write_queue.go +++ b/vendor/github.com/prometheus/prometheus/tsdb/chunks/chunk_write_queue.go @@ -16,12 +16,25 @@ package chunks import ( "errors" "sync" + "time" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/tsdb/chunkenc" ) +const ( + // Minimum recorded peak since since the last shrinking of chunkWriteQueue.chunkrefMap to shrink it again. + chunkRefMapShrinkThreshold = 1000 + + // Minimum interval between shrinking of chunkWriteQueue.chunkRefMap. + chunkRefMapMinShrinkInterval = 10 * time.Minute + + // Maximum size of segment used by job queue (number of elements). With chunkWriteJob being 64 bytes, + // this will use ~512 KiB for empty queue. + maxChunkQueueSegmentSize = 8192 +) + type chunkWriteJob struct { cutFile bool seriesRef HeadSeriesRef @@ -36,23 +49,30 @@ type chunkWriteJob struct { // Chunks that shall be written get added to the queue, which is consumed asynchronously. // Adding jobs to the queue is non-blocking as long as the queue isn't full. type chunkWriteQueue struct { - jobs chan chunkWriteJob + jobs *writeJobQueue - chunkRefMapMtx sync.RWMutex - chunkRefMap map[ChunkDiskMapperRef]chunkenc.Chunk + chunkRefMapMtx sync.RWMutex + chunkRefMap map[ChunkDiskMapperRef]chunkenc.Chunk + chunkRefMapPeakSize int // Largest size that chunkRefMap has grown to since the last time we shrank it. + chunkRefMapLastShrink time.Time // When the chunkRefMap has been shrunk the last time. - isRunningMtx sync.Mutex // Protects the isRunning property. - isRunning bool // Used to prevent that new jobs get added to the queue when the chan is already closed. + // isRunningMtx serves two purposes: + // 1. It protects isRunning field. + // 2. It serializes adding of jobs to the chunkRefMap in addJob() method. If jobs channel is full then addJob() will block + // while holding this mutex, which guarantees that chunkRefMap won't ever grow beyond the queue size + 1. + isRunningMtx sync.Mutex + isRunning bool // Used to prevent that new jobs get added to the queue when the chan is already closed. workerWg sync.WaitGroup writeChunk writeChunkF - // Keeping three separate counters instead of only a single CounterVec to improve the performance of the critical + // Keeping separate counters instead of only a single CounterVec to improve the performance of the critical // addJob() method which otherwise would need to perform a WithLabelValues call on the CounterVec. adds prometheus.Counter gets prometheus.Counter completed prometheus.Counter + shrink prometheus.Counter } // writeChunkF is a function which writes chunks, it is dynamic to allow mocking in tests. @@ -67,14 +87,21 @@ func newChunkWriteQueue(reg prometheus.Registerer, size int, writeChunk writeChu []string{"operation"}, ) + segmentSize := size + if segmentSize > maxChunkQueueSegmentSize { + segmentSize = maxChunkQueueSegmentSize + } + q := &chunkWriteQueue{ - jobs: make(chan chunkWriteJob, size), - chunkRefMap: make(map[ChunkDiskMapperRef]chunkenc.Chunk, size), - writeChunk: writeChunk, + jobs: newWriteJobQueue(size, segmentSize), + chunkRefMap: make(map[ChunkDiskMapperRef]chunkenc.Chunk), + chunkRefMapLastShrink: time.Now(), + writeChunk: writeChunk, adds: counters.WithLabelValues("add"), gets: counters.WithLabelValues("get"), completed: counters.WithLabelValues("complete"), + shrink: counters.WithLabelValues("shrink"), } if reg != nil { @@ -90,7 +117,12 @@ func (c *chunkWriteQueue) start() { go func() { defer c.workerWg.Done() - for job := range c.jobs { + for { + job, ok := c.jobs.pop() + if !ok { + return + } + c.processJob(job) } }() @@ -112,6 +144,42 @@ func (c *chunkWriteQueue) processJob(job chunkWriteJob) { delete(c.chunkRefMap, job.ref) c.completed.Inc() + + c.shrinkChunkRefMap() +} + +// shrinkChunkRefMap checks whether the conditions to shrink the chunkRefMap are met, +// if so chunkRefMap is reinitialized. The chunkRefMapMtx must be held when calling this method. +// +// We do this because Go runtime doesn't release internal memory used by map after map has been emptied. +// To achieve that we create new map instead and throw the old one away. +func (c *chunkWriteQueue) shrinkChunkRefMap() { + if len(c.chunkRefMap) > 0 { + // Can't shrink it while there is data in it. + return + } + + if c.chunkRefMapPeakSize < chunkRefMapShrinkThreshold { + // Not shrinking it because it has not grown to the minimum threshold yet. + return + } + + now := time.Now() + + if now.Sub(c.chunkRefMapLastShrink) < chunkRefMapMinShrinkInterval { + // Not shrinking it because the minimum duration between shrink-events has not passed yet. + return + } + + // Re-initialize the chunk ref map to half of the peak size that it has grown to since the last re-init event. + // We are trying to hit the sweet spot in the trade-off between initializing it to a very small size + // potentially resulting in many allocations to re-grow it, and initializing it to a large size potentially + // resulting in unused allocated memory. + c.chunkRefMap = make(map[ChunkDiskMapperRef]chunkenc.Chunk, c.chunkRefMapPeakSize/2) + + c.chunkRefMapPeakSize = 0 + c.chunkRefMapLastShrink = now + c.shrink.Inc() } func (c *chunkWriteQueue) addJob(job chunkWriteJob) (err error) { @@ -125,14 +193,26 @@ func (c *chunkWriteQueue) addJob(job chunkWriteJob) (err error) { defer c.isRunningMtx.Unlock() if !c.isRunning { - return errors.New("queue is not started") + return errors.New("queue is not running") } c.chunkRefMapMtx.Lock() c.chunkRefMap[job.ref] = job.chk + + // Keep track of the peak usage of c.chunkRefMap. + if len(c.chunkRefMap) > c.chunkRefMapPeakSize { + c.chunkRefMapPeakSize = len(c.chunkRefMap) + } c.chunkRefMapMtx.Unlock() - c.jobs <- job + ok := c.jobs.push(job) + if !ok { + c.chunkRefMapMtx.Lock() + delete(c.chunkRefMap, job.ref) + c.chunkRefMapMtx.Unlock() + + return errors.New("queue is closed") + } return nil } @@ -159,7 +239,7 @@ func (c *chunkWriteQueue) stop() { c.isRunning = false - close(c.jobs) + c.jobs.close() c.workerWg.Wait() } @@ -171,7 +251,7 @@ func (c *chunkWriteQueue) queueIsEmpty() bool { func (c *chunkWriteQueue) queueIsFull() bool { // When the queue is full and blocked on the writer the chunkRefMap has one more job than the cap of the jobCh // because one job is currently being processed and blocked in the writer. - return c.queueSize() == cap(c.jobs)+1 + return c.queueSize() == c.jobs.maxSize+1 } func (c *chunkWriteQueue) queueSize() int { diff --git a/vendor/github.com/prometheus/prometheus/tsdb/chunks/queue.go b/vendor/github.com/prometheus/prometheus/tsdb/chunks/queue.go new file mode 100644 index 00000000000..23b38e7f27b --- /dev/null +++ b/vendor/github.com/prometheus/prometheus/tsdb/chunks/queue.go @@ -0,0 +1,127 @@ +package chunks + +import "sync" + +// writeJobQueue is similar to buffered channel of chunkWriteJob, but manages its own buffers +// to avoid using a lot of memory when it's empty. It does that by storing elements into segments +// of equal size (segmentSize). When segment is not used anymore, reference to it are removed, +// so it can be treated as a garbage. +type writeJobQueue struct { + maxSize int + segmentSize int + + mtx sync.Mutex // protects all following variables + pushed, popped *sync.Cond // signalled when something is pushed into the queue or popped from it + first, last *writeJobQueueSegment // pointer to first and last segment, if any + size int // total size of the queue + closed bool // after closing the queue, nothing can be pushed to it +} + +type writeJobQueueSegment struct { + segment []chunkWriteJob + nextRead, nextWrite int // index of next read and next write in this segment. + nextSegment *writeJobQueueSegment // next segment, if any +} + +func newWriteJobQueue(maxSize, segmentSize int) *writeJobQueue { + if maxSize <= 0 || segmentSize <= 0 { + panic("invalid queue") + } + + q := &writeJobQueue{ + maxSize: maxSize, + segmentSize: segmentSize, + } + + q.pushed = sync.NewCond(&q.mtx) + q.popped = sync.NewCond(&q.mtx) + return q +} + +func (q *writeJobQueue) close() { + q.mtx.Lock() + defer q.mtx.Unlock() + + q.closed = true + + // unblock all blocked goroutines + q.pushed.Broadcast() + q.popped.Broadcast() +} + +// push blocks until there is space available in the queue, and then adds job to the queue. +// If queue is closed or gets closed while waiting for space, push returns false. +func (q *writeJobQueue) push(job chunkWriteJob) bool { + q.mtx.Lock() + defer q.mtx.Unlock() + + // wait until queue has more space or is closed + for !q.closed && q.size >= q.maxSize { + q.popped.Wait() + } + + if q.closed { + return false + } + + // Check if this segment has more space for writing, and create new one if not. + if q.last == nil || q.last.nextWrite >= q.segmentSize { + prevLast := q.last + q.last = &writeJobQueueSegment{ + segment: make([]chunkWriteJob, q.segmentSize), + } + + if prevLast != nil { + prevLast.nextSegment = q.last + } + if q.first == nil { + q.first = q.last + } + } + + q.last.segment[q.last.nextWrite] = job + q.last.nextWrite++ + q.size++ + q.pushed.Signal() + return true +} + +// pop returns first job from the queue, and true. +// if queue is empty, pop blocks until there is a job (returns true), or until queue is closed (returns false). +// If queue was already closed, pop first returns all remaining elements from the queue (with true value), and only then returns false. +func (q *writeJobQueue) pop() (chunkWriteJob, bool) { + q.mtx.Lock() + defer q.mtx.Unlock() + + // wait until something is pushed to the queue, or queue is closed. + for q.size == 0 { + if q.closed { + return chunkWriteJob{}, false + } + + q.pushed.Wait() + } + + res := q.first.segment[q.first.nextRead] + q.first.segment[q.first.nextRead] = chunkWriteJob{} // clear just-read element + q.first.nextRead++ + q.size-- + + // If we have read all possible elements from first segment, we can drop it. + if q.first.nextRead >= q.segmentSize { + q.first = q.first.nextSegment + if q.first == nil { + q.last = nil + } + } + + q.popped.Signal() + return res, true +} + +func (q *writeJobQueue) length() int { + q.mtx.Lock() + defer q.mtx.Unlock() + + return q.size +} diff --git a/vendor/modules.txt b/vendor/modules.txt index 01f5d561375..d26d6ec202e 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -714,7 +714,7 @@ github.com/prometheus/node_exporter/https github.com/prometheus/procfs github.com/prometheus/procfs/internal/fs github.com/prometheus/procfs/internal/util -# github.com/prometheus/prometheus v1.8.2-0.20220308163432-03831554a519 => github.com/grafana/mimir-prometheus v0.0.0-20220518151708-ceaa77f14d6a +# github.com/prometheus/prometheus v1.8.2-0.20220308163432-03831554a519 => github.com/grafana/mimir-prometheus v0.0.0-20220607154228-1e2d2fb2d8ce ## explicit; go 1.16 github.com/prometheus/prometheus/config github.com/prometheus/prometheus/discovery @@ -1222,7 +1222,7 @@ gopkg.in/yaml.v2 gopkg.in/yaml.v3 # git.apache.org/thrift.git => github.com/apache/thrift v0.0.0-20180902110319-2566ecd5d999 # github.com/bradfitz/gomemcache => github.com/themihai/gomemcache v0.0.0-20180902122335-24332e2d58ab -# github.com/prometheus/prometheus => github.com/grafana/mimir-prometheus v0.0.0-20220518151708-ceaa77f14d6a +# github.com/prometheus/prometheus => github.com/grafana/mimir-prometheus v0.0.0-20220607154228-1e2d2fb2d8ce # github.com/hashicorp/go-immutable-radix => github.com/hashicorp/go-immutable-radix v1.2.0 # github.com/hashicorp/go-hclog => github.com/hashicorp/go-hclog v0.12.2 # github.com/hashicorp/memberlist => github.com/grafana/memberlist v0.3.1-0.20220425183535-6b97a09b7167