diff --git a/tsdb/chunks/chunk_write_queue.go b/tsdb/chunks/chunk_write_queue.go index 628880b4c4..b635da5bc8 100644 --- a/tsdb/chunks/chunk_write_queue.go +++ b/tsdb/chunks/chunk_write_queue.go @@ -16,12 +16,21 @@ package chunks import ( "errors" "sync" + "time" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/tsdb/chunkenc" ) +const ( + // Minimum recorded peak since since the last shrinking of chunkWriteQueue.chunkrefMap to shrink it again. + chunkRefMapShrinkThreshold = 1000 + + // Minimum interval between shrinking of chunkWriteQueue.chunkRefMap. + chunkRefMapMinShrinkInterval = 10 * time.Minute +) + type chunkWriteJob struct { cutFile bool seriesRef HeadSeriesRef @@ -38,21 +47,28 @@ type chunkWriteJob struct { type chunkWriteQueue struct { jobs chan chunkWriteJob - chunkRefMapMtx sync.RWMutex - chunkRefMap map[ChunkDiskMapperRef]chunkenc.Chunk + chunkRefMapMtx sync.RWMutex + chunkRefMap map[ChunkDiskMapperRef]chunkenc.Chunk + chunkRefMapPeakSize int // Largest size that chunkRefMap has grown to since the last time we shrank it. + chunkRefMapLastShrink time.Time // When the chunkRefMap has been shrunk the last time. - isRunningMtx sync.Mutex // Protects the isRunning property. - isRunning bool // Used to prevent that new jobs get added to the queue when the chan is already closed. + // isRunningMtx serves two purposes: + // 1. It protects isRunning field. + // 2. It serializes adding of jobs to the chunkRefMap in addJob() method. If jobs channel is full then addJob() will block + // while holding this mutex, which guarantees that chunkRefMap won't ever grow beyond the queue size + 1. + isRunningMtx sync.Mutex + isRunning bool // Used to prevent that new jobs get added to the queue when the chan is already closed. workerWg sync.WaitGroup writeChunk writeChunkF - // Keeping three separate counters instead of only a single CounterVec to improve the performance of the critical + // Keeping separate counters instead of only a single CounterVec to improve the performance of the critical // addJob() method which otherwise would need to perform a WithLabelValues call on the CounterVec. adds prometheus.Counter gets prometheus.Counter completed prometheus.Counter + shrink prometheus.Counter } // writeChunkF is a function which writes chunks, it is dynamic to allow mocking in tests. @@ -68,13 +84,15 @@ func newChunkWriteQueue(reg prometheus.Registerer, size int, writeChunk writeChu ) q := &chunkWriteQueue{ - jobs: make(chan chunkWriteJob, size), - chunkRefMap: make(map[ChunkDiskMapperRef]chunkenc.Chunk, size), - writeChunk: writeChunk, + jobs: make(chan chunkWriteJob, size), + chunkRefMap: make(map[ChunkDiskMapperRef]chunkenc.Chunk), + chunkRefMapLastShrink: time.Now(), + writeChunk: writeChunk, adds: counters.WithLabelValues("add"), gets: counters.WithLabelValues("get"), completed: counters.WithLabelValues("complete"), + shrink: counters.WithLabelValues("shrink"), } if reg != nil { @@ -112,6 +130,42 @@ func (c *chunkWriteQueue) processJob(job chunkWriteJob) { delete(c.chunkRefMap, job.ref) c.completed.Inc() + + c.shrinkChunkRefMap() +} + +// shrinkChunkRefMap checks whether the conditions to shrink the chunkRefMap are met, +// if so chunkRefMap is reinitialized. The chunkRefMapMtx must be held when calling this method. +// +// We do this because Go runtime doesn't release internal memory used by map after map has been emptied. +// To achieve that we create new map instead and throw the old one away. +func (c *chunkWriteQueue) shrinkChunkRefMap() { + if len(c.chunkRefMap) > 0 { + // Can't shrink it while there is data in it. + return + } + + if c.chunkRefMapPeakSize < chunkRefMapShrinkThreshold { + // Not shrinking it because it has not grown to the minimum threshold yet. + return + } + + now := time.Now() + + if now.Sub(c.chunkRefMapLastShrink) < chunkRefMapMinShrinkInterval { + // Not shrinking it because the minimum duration between shrink-events has not passed yet. + return + } + + // Re-initialize the chunk ref map to half of the peak size that it has grown to since the last re-init event. + // We are trying to hit the sweet spot in the trade-off between initializing it to a very small size + // potentially resulting in many allocations to re-grow it, and initializing it to a large size potentially + // resulting in unused allocated memory. + c.chunkRefMap = make(map[ChunkDiskMapperRef]chunkenc.Chunk, c.chunkRefMapPeakSize/2) + + c.chunkRefMapPeakSize = 0 + c.chunkRefMapLastShrink = now + c.shrink.Inc() } func (c *chunkWriteQueue) addJob(job chunkWriteJob) (err error) { @@ -125,11 +179,16 @@ func (c *chunkWriteQueue) addJob(job chunkWriteJob) (err error) { defer c.isRunningMtx.Unlock() if !c.isRunning { - return errors.New("queue is not started") + return errors.New("queue is not running") } c.chunkRefMapMtx.Lock() c.chunkRefMap[job.ref] = job.chk + + // Keep track of the peak usage of c.chunkRefMap. + if len(c.chunkRefMap) > c.chunkRefMapPeakSize { + c.chunkRefMapPeakSize = len(c.chunkRefMap) + } c.chunkRefMapMtx.Unlock() c.jobs <- job