From abc8b2e063698ef5c01ee4f7c94d6284721c0fc9 Mon Sep 17 00:00:00 2001 From: Mauro Stettler Date: Tue, 8 Feb 2022 15:46:21 +0000 Subject: [PATCH 1/9] dont waste space on the chunkRefMap Signed-off-by: Mauro Stettler --- tsdb/chunks/chunk_write_queue.go | 24 +++++++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) diff --git a/tsdb/chunks/chunk_write_queue.go b/tsdb/chunks/chunk_write_queue.go index 628880b4c4..1844102a5b 100644 --- a/tsdb/chunks/chunk_write_queue.go +++ b/tsdb/chunks/chunk_write_queue.go @@ -22,6 +22,8 @@ import ( "github.com/prometheus/prometheus/tsdb/chunkenc" ) +const chunkRefMapFreeThreshold = 10 + type chunkWriteJob struct { cutFile bool seriesRef HeadSeriesRef @@ -38,8 +40,9 @@ type chunkWriteJob struct { type chunkWriteQueue struct { jobs chan chunkWriteJob - chunkRefMapMtx sync.RWMutex - chunkRefMap map[ChunkDiskMapperRef]chunkenc.Chunk + chunkRefMapMtx sync.RWMutex + chunkRefMap map[ChunkDiskMapperRef]chunkenc.Chunk + chunkRefMapPeak int isRunningMtx sync.Mutex // Protects the isRunning property. isRunning bool // Used to prevent that new jobs get added to the queue when the chan is already closed. @@ -69,7 +72,7 @@ func newChunkWriteQueue(reg prometheus.Registerer, size int, writeChunk writeChu q := &chunkWriteQueue{ jobs: make(chan chunkWriteJob, size), - chunkRefMap: make(map[ChunkDiskMapperRef]chunkenc.Chunk, size), + chunkRefMap: make(map[ChunkDiskMapperRef]chunkenc.Chunk), writeChunk: writeChunk, adds: counters.WithLabelValues("add"), @@ -112,6 +115,14 @@ func (c *chunkWriteQueue) processJob(job chunkWriteJob) { delete(c.chunkRefMap, job.ref) c.completed.Inc() + + if len(c.chunkRefMap) == 0 && c.chunkRefMapPeak > chunkRefMapFreeThreshold { + // Re-initialize the chunk ref map to half of the peak size that was in use since the last re-init event. + // By setting it to half of the peak we try to minimize the number of allocations required for a "normal" usage + // while ensuring that if its usage has decreased we shrink it over time. + c.chunkRefMap = make(map[ChunkDiskMapperRef]chunkenc.Chunk, c.chunkRefMapPeak/2) + c.chunkRefMapPeak = 0 + } } func (c *chunkWriteQueue) addJob(job chunkWriteJob) (err error) { @@ -121,6 +132,8 @@ func (c *chunkWriteQueue) addJob(job chunkWriteJob) (err error) { } }() + // c.isRunningMtx serializes the adding of jobs to the c.chunkRefMap, if c.jobs is full then c.addJob() will block + // while holding c.isRunningMtx, this guarantees that c.chunkRefMap won't ever grow beyond the queue size + 1. c.isRunningMtx.Lock() defer c.isRunningMtx.Unlock() @@ -130,6 +143,11 @@ func (c *chunkWriteQueue) addJob(job chunkWriteJob) (err error) { c.chunkRefMapMtx.Lock() c.chunkRefMap[job.ref] = job.chk + + // Keep track of the peak usage of c.chunkRefMap. + if len(c.chunkRefMap) > c.chunkRefMapPeak { + c.chunkRefMapPeak = len(c.chunkRefMap) + } c.chunkRefMapMtx.Unlock() c.jobs <- job From e1089e435c78b0b86f843132b2f88383b0de6803 Mon Sep 17 00:00:00 2001 From: Mauro Stettler Date: Tue, 8 Feb 2022 16:00:31 +0000 Subject: [PATCH 2/9] add time factor Signed-off-by: Mauro Stettler --- tsdb/chunks/chunk_write_queue.go | 23 +++++++++++++++-------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/tsdb/chunks/chunk_write_queue.go b/tsdb/chunks/chunk_write_queue.go index 1844102a5b..3cafa4e087 100644 --- a/tsdb/chunks/chunk_write_queue.go +++ b/tsdb/chunks/chunk_write_queue.go @@ -16,13 +16,17 @@ package chunks import ( "errors" "sync" + "time" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/tsdb/chunkenc" ) -const chunkRefMapFreeThreshold = 10 +const ( + chunkRefMapFreeThreshold = 10 + chunkRefMapMinFreeInterval = 10 * time.Minute +) type chunkWriteJob struct { cutFile bool @@ -40,9 +44,10 @@ type chunkWriteJob struct { type chunkWriteQueue struct { jobs chan chunkWriteJob - chunkRefMapMtx sync.RWMutex - chunkRefMap map[ChunkDiskMapperRef]chunkenc.Chunk - chunkRefMapPeak int + chunkRefMapMtx sync.RWMutex + chunkRefMap map[ChunkDiskMapperRef]chunkenc.Chunk + chunkRefMapPeak int + chunkRefMapLastFree time.Time isRunningMtx sync.Mutex // Protects the isRunning property. isRunning bool // Used to prevent that new jobs get added to the queue when the chan is already closed. @@ -71,9 +76,10 @@ func newChunkWriteQueue(reg prometheus.Registerer, size int, writeChunk writeChu ) q := &chunkWriteQueue{ - jobs: make(chan chunkWriteJob, size), - chunkRefMap: make(map[ChunkDiskMapperRef]chunkenc.Chunk), - writeChunk: writeChunk, + jobs: make(chan chunkWriteJob, size), + chunkRefMap: make(map[ChunkDiskMapperRef]chunkenc.Chunk), + chunkRefMapLastFree: time.Now(), + writeChunk: writeChunk, adds: counters.WithLabelValues("add"), gets: counters.WithLabelValues("get"), @@ -116,12 +122,13 @@ func (c *chunkWriteQueue) processJob(job chunkWriteJob) { c.completed.Inc() - if len(c.chunkRefMap) == 0 && c.chunkRefMapPeak > chunkRefMapFreeThreshold { + if len(c.chunkRefMap) == 0 && c.chunkRefMapPeak > chunkRefMapFreeThreshold && time.Since(c.chunkRefMapLastFree) > chunkRefMapMinFreeInterval { // Re-initialize the chunk ref map to half of the peak size that was in use since the last re-init event. // By setting it to half of the peak we try to minimize the number of allocations required for a "normal" usage // while ensuring that if its usage has decreased we shrink it over time. c.chunkRefMap = make(map[ChunkDiskMapperRef]chunkenc.Chunk, c.chunkRefMapPeak/2) c.chunkRefMapPeak = 0 + c.chunkRefMapLastFree = time.Now() } } From 98ec7a259c88b5475760c87fb7bc82fd11d92616 Mon Sep 17 00:00:00 2001 From: Mauro Stettler Date: Wed, 9 Feb 2022 22:26:44 +0000 Subject: [PATCH 3/9] add comments Signed-off-by: Mauro Stettler --- tsdb/chunks/chunk_write_queue.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/tsdb/chunks/chunk_write_queue.go b/tsdb/chunks/chunk_write_queue.go index 3cafa4e087..b377574a11 100644 --- a/tsdb/chunks/chunk_write_queue.go +++ b/tsdb/chunks/chunk_write_queue.go @@ -24,7 +24,11 @@ import ( ) const ( - chunkRefMapFreeThreshold = 10 + // Minimum recorded peak since since the last freeing + // of chunkWriteQueue.chunkrefMap to free it again. + chunkRefMapFreeThreshold = 10 + + // Minimum interval between freeing of chunkWriteQueue.chunkRefMap. chunkRefMapMinFreeInterval = 10 * time.Minute ) From e2184ab78b9db2f1907fe23dd417706d6c3e72a1 Mon Sep 17 00:00:00 2001 From: Mauro Stettler Date: Wed, 9 Feb 2022 22:39:38 +0000 Subject: [PATCH 4/9] better readability Signed-off-by: Mauro Stettler --- tsdb/chunks/chunk_write_queue.go | 38 +++++++++++++++++++++++--------- 1 file changed, 27 insertions(+), 11 deletions(-) diff --git a/tsdb/chunks/chunk_write_queue.go b/tsdb/chunks/chunk_write_queue.go index b377574a11..4b83c24539 100644 --- a/tsdb/chunks/chunk_write_queue.go +++ b/tsdb/chunks/chunk_write_queue.go @@ -50,8 +50,8 @@ type chunkWriteQueue struct { chunkRefMapMtx sync.RWMutex chunkRefMap map[ChunkDiskMapperRef]chunkenc.Chunk - chunkRefMapPeak int - chunkRefMapLastFree time.Time + chunkRefMapPeakSize int // largest size that chunkRefMap has grown to since the last time it got freed. + chunkRefMapLastFree time.Time // when the chunkRefMap has been freed the last time. isRunningMtx sync.Mutex // Protects the isRunning property. isRunning bool // Used to prevent that new jobs get added to the queue when the chan is already closed. @@ -126,14 +126,30 @@ func (c *chunkWriteQueue) processJob(job chunkWriteJob) { c.completed.Inc() - if len(c.chunkRefMap) == 0 && c.chunkRefMapPeak > chunkRefMapFreeThreshold && time.Since(c.chunkRefMapLastFree) > chunkRefMapMinFreeInterval { - // Re-initialize the chunk ref map to half of the peak size that was in use since the last re-init event. - // By setting it to half of the peak we try to minimize the number of allocations required for a "normal" usage - // while ensuring that if its usage has decreased we shrink it over time. - c.chunkRefMap = make(map[ChunkDiskMapperRef]chunkenc.Chunk, c.chunkRefMapPeak/2) - c.chunkRefMapPeak = 0 - c.chunkRefMapLastFree = time.Now() + c.freeChunkRefMap() +} + +func (c *chunkWriteQueue) freeChunkRefMap() { + if len(c.chunkRefMap) > 0 { + // Can't free it while there is data in it. + return + } + if c.chunkRefMapPeakSize < chunkRefMapFreeThreshold { + // Not freeing it because it has not grown to the minimum threshold yet. + return + } + if time.Since(c.chunkRefMapLastFree) < chunkRefMapMinFreeInterval { + // Not freeing it because the minimum interval between free-events has not passed yet. + return } + + // Re-initialize the chunk ref map to half of the peak size that it has grown to since the last re-init event. + // By initializing it to half of the peak size since the last re-init event we try to hit the sweet spot in the + // trade-off between initializing it to a very small size potentially resulting in many allocations to re-grow it, + // and initializing it to a large size potentially resulting in unused allocated memory. + c.chunkRefMap = make(map[ChunkDiskMapperRef]chunkenc.Chunk, c.chunkRefMapPeakSize/2) + c.chunkRefMapPeakSize = 0 + c.chunkRefMapLastFree = time.Now() } func (c *chunkWriteQueue) addJob(job chunkWriteJob) (err error) { @@ -156,8 +172,8 @@ func (c *chunkWriteQueue) addJob(job chunkWriteJob) (err error) { c.chunkRefMap[job.ref] = job.chk // Keep track of the peak usage of c.chunkRefMap. - if len(c.chunkRefMap) > c.chunkRefMapPeak { - c.chunkRefMapPeak = len(c.chunkRefMap) + if len(c.chunkRefMap) > c.chunkRefMapPeakSize { + c.chunkRefMapPeakSize = len(c.chunkRefMap) } c.chunkRefMapMtx.Unlock() From fde461e42996284f90812477f2e7e282c2c533a1 Mon Sep 17 00:00:00 2001 From: Mauro Stettler Date: Wed, 9 Feb 2022 22:48:29 +0000 Subject: [PATCH 5/9] add instrumentation and more comments Signed-off-by: Mauro Stettler --- tsdb/chunks/chunk_write_queue.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/tsdb/chunks/chunk_write_queue.go b/tsdb/chunks/chunk_write_queue.go index 4b83c24539..4b3960f820 100644 --- a/tsdb/chunks/chunk_write_queue.go +++ b/tsdb/chunks/chunk_write_queue.go @@ -65,6 +65,7 @@ type chunkWriteQueue struct { adds prometheus.Counter gets prometheus.Counter completed prometheus.Counter + free prometheus.Counter } // writeChunkF is a function which writes chunks, it is dynamic to allow mocking in tests. @@ -88,6 +89,7 @@ func newChunkWriteQueue(reg prometheus.Registerer, size int, writeChunk writeChu adds: counters.WithLabelValues("add"), gets: counters.WithLabelValues("get"), completed: counters.WithLabelValues("complete"), + free: counters.WithLabelValues("free"), } if reg != nil { @@ -129,17 +131,22 @@ func (c *chunkWriteQueue) processJob(job chunkWriteJob) { c.freeChunkRefMap() } +// freeChunkRefMap checks whether the conditions to free the chunkRefMap are met, +// if so it re-initializes it and frees the memory which it currently uses. +// The chunkRefMapMtx must be held when calling this method. func (c *chunkWriteQueue) freeChunkRefMap() { if len(c.chunkRefMap) > 0 { // Can't free it while there is data in it. return } + if c.chunkRefMapPeakSize < chunkRefMapFreeThreshold { // Not freeing it because it has not grown to the minimum threshold yet. return } + if time.Since(c.chunkRefMapLastFree) < chunkRefMapMinFreeInterval { - // Not freeing it because the minimum interval between free-events has not passed yet. + // Not freeing it because the minimum duration between free-events has not passed yet. return } @@ -150,6 +157,7 @@ func (c *chunkWriteQueue) freeChunkRefMap() { c.chunkRefMap = make(map[ChunkDiskMapperRef]chunkenc.Chunk, c.chunkRefMapPeakSize/2) c.chunkRefMapPeakSize = 0 c.chunkRefMapLastFree = time.Now() + c.free.Inc() } func (c *chunkWriteQueue) addJob(job chunkWriteJob) (err error) { From eb1c577036905c2ee5bd4191638b2d5deb71f57f Mon Sep 17 00:00:00 2001 From: Mauro Stettler Date: Wed, 9 Feb 2022 22:49:25 +0000 Subject: [PATCH 6/9] formatting Signed-off-by: Mauro Stettler --- tsdb/chunks/chunk_write_queue.go | 1 + 1 file changed, 1 insertion(+) diff --git a/tsdb/chunks/chunk_write_queue.go b/tsdb/chunks/chunk_write_queue.go index 4b3960f820..e892589327 100644 --- a/tsdb/chunks/chunk_write_queue.go +++ b/tsdb/chunks/chunk_write_queue.go @@ -155,6 +155,7 @@ func (c *chunkWriteQueue) freeChunkRefMap() { // trade-off between initializing it to a very small size potentially resulting in many allocations to re-grow it, // and initializing it to a large size potentially resulting in unused allocated memory. c.chunkRefMap = make(map[ChunkDiskMapperRef]chunkenc.Chunk, c.chunkRefMapPeakSize/2) + c.chunkRefMapPeakSize = 0 c.chunkRefMapLastFree = time.Now() c.free.Inc() From 3dbab4e8115008c127061c30cd76624658e51133 Mon Sep 17 00:00:00 2001 From: Mauro Stettler Date: Wed, 9 Feb 2022 23:02:01 +0000 Subject: [PATCH 7/9] uppercase comments Signed-off-by: Mauro Stettler --- tsdb/chunks/chunk_write_queue.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tsdb/chunks/chunk_write_queue.go b/tsdb/chunks/chunk_write_queue.go index e892589327..d1c813d68b 100644 --- a/tsdb/chunks/chunk_write_queue.go +++ b/tsdb/chunks/chunk_write_queue.go @@ -50,8 +50,8 @@ type chunkWriteQueue struct { chunkRefMapMtx sync.RWMutex chunkRefMap map[ChunkDiskMapperRef]chunkenc.Chunk - chunkRefMapPeakSize int // largest size that chunkRefMap has grown to since the last time it got freed. - chunkRefMapLastFree time.Time // when the chunkRefMap has been freed the last time. + chunkRefMapPeakSize int // Largest size that chunkRefMap has grown to since the last time it got freed. + chunkRefMapLastFree time.Time // When the chunkRefMap has been freed the last time. isRunningMtx sync.Mutex // Protects the isRunning property. isRunning bool // Used to prevent that new jobs get added to the queue when the chan is already closed. From 56e3a4aa922b1934a419a5d1bbdfefee82bfaad7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20=C5=A0tibran=C3=BD?= Date: Wed, 1 Jun 2022 12:03:41 +0200 Subject: [PATCH 8/9] Address review feedback. Renamed "free" to "shrink" everywhere, updated comments and threshold to 1000. --- tsdb/chunks/chunk_write_queue.go | 77 +++++++++++++++++--------------- 1 file changed, 41 insertions(+), 36 deletions(-) diff --git a/tsdb/chunks/chunk_write_queue.go b/tsdb/chunks/chunk_write_queue.go index d1c813d68b..b7d7cf48ec 100644 --- a/tsdb/chunks/chunk_write_queue.go +++ b/tsdb/chunks/chunk_write_queue.go @@ -24,12 +24,11 @@ import ( ) const ( - // Minimum recorded peak since since the last freeing - // of chunkWriteQueue.chunkrefMap to free it again. - chunkRefMapFreeThreshold = 10 + // Minimum recorded peak since since the last shrinking of chunkWriteQueue.chunkrefMap to shrink it again. + chunkRefMapShrinkThreshold = 1000 - // Minimum interval between freeing of chunkWriteQueue.chunkRefMap. - chunkRefMapMinFreeInterval = 10 * time.Minute + // Minimum interval between shrinking of chunkWriteQueue.chunkRefMap. + chunkRefMapMinShrinkInterval = 10 * time.Minute ) type chunkWriteJob struct { @@ -48,24 +47,28 @@ type chunkWriteJob struct { type chunkWriteQueue struct { jobs chan chunkWriteJob - chunkRefMapMtx sync.RWMutex - chunkRefMap map[ChunkDiskMapperRef]chunkenc.Chunk - chunkRefMapPeakSize int // Largest size that chunkRefMap has grown to since the last time it got freed. - chunkRefMapLastFree time.Time // When the chunkRefMap has been freed the last time. + chunkRefMapMtx sync.RWMutex + chunkRefMap map[ChunkDiskMapperRef]chunkenc.Chunk + chunkRefMapPeakSize int // Largest size that chunkRefMap has grown to since the last time we shrank it. + chunkRefMapLastShrink time.Time // When the chunkRefMap has been shrunk the last time. - isRunningMtx sync.Mutex // Protects the isRunning property. - isRunning bool // Used to prevent that new jobs get added to the queue when the chan is already closed. + // isRunningMtx serves two purposes: + // 1. It protects isRunning field. + // 2. It serializes adding of jobs to the chunkRefMap in addJob() method. If jobs channel is full then addJob() will block + // while holding this mutex, which guarantees that chunkRefMap won't ever grow beyond the queue size + 1. + isRunningMtx sync.Mutex + isRunning bool // Used to prevent that new jobs get added to the queue when the chan is already closed. workerWg sync.WaitGroup writeChunk writeChunkF - // Keeping three separate counters instead of only a single CounterVec to improve the performance of the critical + // Keeping separate counters instead of only a single CounterVec to improve the performance of the critical // addJob() method which otherwise would need to perform a WithLabelValues call on the CounterVec. adds prometheus.Counter gets prometheus.Counter completed prometheus.Counter - free prometheus.Counter + shrink prometheus.Counter } // writeChunkF is a function which writes chunks, it is dynamic to allow mocking in tests. @@ -81,15 +84,15 @@ func newChunkWriteQueue(reg prometheus.Registerer, size int, writeChunk writeChu ) q := &chunkWriteQueue{ - jobs: make(chan chunkWriteJob, size), - chunkRefMap: make(map[ChunkDiskMapperRef]chunkenc.Chunk), - chunkRefMapLastFree: time.Now(), - writeChunk: writeChunk, + jobs: make(chan chunkWriteJob, size), + chunkRefMap: make(map[ChunkDiskMapperRef]chunkenc.Chunk), + chunkRefMapLastShrink: time.Now(), + writeChunk: writeChunk, adds: counters.WithLabelValues("add"), gets: counters.WithLabelValues("get"), completed: counters.WithLabelValues("complete"), - free: counters.WithLabelValues("free"), + shrink: counters.WithLabelValues("shrink"), } if reg != nil { @@ -128,37 +131,41 @@ func (c *chunkWriteQueue) processJob(job chunkWriteJob) { c.completed.Inc() - c.freeChunkRefMap() + c.shrinkChunkRefMap() } -// freeChunkRefMap checks whether the conditions to free the chunkRefMap are met, -// if so it re-initializes it and frees the memory which it currently uses. -// The chunkRefMapMtx must be held when calling this method. -func (c *chunkWriteQueue) freeChunkRefMap() { +// shrinkChunkRefMap checks whether the conditions to shrink the chunkRefMap are met, +// if so chunkRefMap is reinitialized. The chunkRefMapMtx must be held when calling this method. +// +// We do this because Go runtime doesn't release internal memory used by map after map has been emptied. +// To achieve that we create new map instead and throw the old one away. +func (c *chunkWriteQueue) shrinkChunkRefMap() { if len(c.chunkRefMap) > 0 { - // Can't free it while there is data in it. + // Can't shrink it while there is data in it. return } - if c.chunkRefMapPeakSize < chunkRefMapFreeThreshold { - // Not freeing it because it has not grown to the minimum threshold yet. + if c.chunkRefMapPeakSize < chunkRefMapShrinkThreshold { + // Not shrinking it because it has not grown to the minimum threshold yet. return } - if time.Since(c.chunkRefMapLastFree) < chunkRefMapMinFreeInterval { - // Not freeing it because the minimum duration between free-events has not passed yet. + now := time.Now() + + if now.Sub(c.chunkRefMapLastShrink) < chunkRefMapMinShrinkInterval { + // Not shrinking it because the minimum duration between shrink-events has not passed yet. return } // Re-initialize the chunk ref map to half of the peak size that it has grown to since the last re-init event. - // By initializing it to half of the peak size since the last re-init event we try to hit the sweet spot in the - // trade-off between initializing it to a very small size potentially resulting in many allocations to re-grow it, - // and initializing it to a large size potentially resulting in unused allocated memory. + // We are trying to hit the sweet spot in the trade-off between initializing it to a very small size + // potentially resulting in many allocations to re-grow it, and initializing it to a large size potentially + // resulting in unused allocated memory. c.chunkRefMap = make(map[ChunkDiskMapperRef]chunkenc.Chunk, c.chunkRefMapPeakSize/2) c.chunkRefMapPeakSize = 0 - c.chunkRefMapLastFree = time.Now() - c.free.Inc() + c.chunkRefMapLastShrink = now + c.shrink.Inc() } func (c *chunkWriteQueue) addJob(job chunkWriteJob) (err error) { @@ -168,13 +175,11 @@ func (c *chunkWriteQueue) addJob(job chunkWriteJob) (err error) { } }() - // c.isRunningMtx serializes the adding of jobs to the c.chunkRefMap, if c.jobs is full then c.addJob() will block - // while holding c.isRunningMtx, this guarantees that c.chunkRefMap won't ever grow beyond the queue size + 1. c.isRunningMtx.Lock() defer c.isRunningMtx.Unlock() if !c.isRunning { - return errors.New("queue is not started") + return errors.New("queue is not running") } c.chunkRefMapMtx.Lock() From f1d55064d9815c7c46a065391b4308ed4c0cef7f Mon Sep 17 00:00:00 2001 From: Mauro Stettler Date: Wed, 1 Jun 2022 14:41:55 +0000 Subject: [PATCH 9/9] double space Signed-off-by: Mauro Stettler --- tsdb/chunks/chunk_write_queue.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tsdb/chunks/chunk_write_queue.go b/tsdb/chunks/chunk_write_queue.go index b7d7cf48ec..b635da5bc8 100644 --- a/tsdb/chunks/chunk_write_queue.go +++ b/tsdb/chunks/chunk_write_queue.go @@ -24,7 +24,7 @@ import ( ) const ( - // Minimum recorded peak since since the last shrinking of chunkWriteQueue.chunkrefMap to shrink it again. + // Minimum recorded peak since since the last shrinking of chunkWriteQueue.chunkrefMap to shrink it again. chunkRefMapShrinkThreshold = 1000 // Minimum interval between shrinking of chunkWriteQueue.chunkRefMap.