
[aggregator] Raw TCP Client write queueing/buffering refactor #3342

Merged 13 commits into master from v/tcpclient on Mar 9, 2021

Conversation

@vdarulis (Collaborator) commented Mar 9, 2021

What this PR does / why we need it:

  • Fixes a long-standing bug where the logic in prepareEnqueueBufferWithLock made it possible to not flush metrics until a shard received enough traffic to reach flushSize. This caused odd side effects where metrics were emitted at very inconsistent intervals when the volume was low enough.
  • Distinguishes between the protobuf payload size limit and the data written to the network connection: we want to limit the former, but should write as much of the latter as possible. As a result, batching logic now lives in the Writer, not the Queue.
  • No more fixed-size buffers or idle goroutines wasting memory between writes.
  • Flushes to the network only when the client requests a flush - this prevents writing small payloads in the middle of a write cycle/between flushes (a minimal sketch of this behavior follows below).
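A minimal sketch of the flush-on-request idea described above; the type and method names here are hypothetical, not the actual m3aggregator code:

package tcpwrite

import (
	"bytes"
	"net"
)

// bufferedConnWriter is an illustrative sketch: Write only appends encoded
// payloads to an in-memory buffer, and the network connection is touched
// only when the caller explicitly asks for a flush.
type bufferedConnWriter struct {
	conn net.Conn
	buf  bytes.Buffer
}

// Write buffers the payload; it never writes to the connection directly.
func (w *bufferedConnWriter) Write(p []byte) (int, error) {
	return w.buf.Write(p)
}

// Flush writes everything buffered so far in a single network write and
// resets the buffer, so small payloads are not sent mid write cycle.
func (w *bufferedConnWriter) Flush() error {
	if w.buf.Len() == 0 {
		return nil
	}
	_, err := w.conn.Write(w.buf.Bytes())
	w.buf.Reset()
	return err
}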

Special notes for your reviewer:

Does this PR introduce a user-facing and/or backwards incompatible change?:


Does this PR require updating code package or user-facing documentation?:


@vdarulis vdarulis requested a review from mway March 9, 2021 06:33

codecov bot commented Mar 9, 2021

Codecov Report

Merging #3342 (b75e29c) into master (ffdce8e) will decrease coverage by 0.0%.
The diff coverage is 82.1%.


@@            Coverage Diff            @@
##           master    #3342     +/-   ##
=========================================
- Coverage    72.4%    72.4%   -0.1%     
=========================================
  Files        1098     1098             
  Lines      101927   101864     -63     
=========================================
- Hits        73897    73825     -72     
+ Misses      22955    22952      -3     
- Partials     5075     5087     +12     
Flag Coverage Δ
aggregator 76.7% <82.1%> (+<0.1%) ⬆️
cluster 84.9% <ø> (-0.2%) ⬇️
collector 84.3% <ø> (ø)
dbnode 78.9% <ø> (-0.1%) ⬇️
m3em 74.4% <ø> (ø)
m3ninx 73.6% <ø> (+<0.1%) ⬆️
metrics 19.8% <ø> (ø)
msg 74.5% <ø> (-0.1%) ⬇️
query 67.3% <ø> (ø)
x 80.4% <ø> (+<0.1%) ⬆️

Flags with carried forward coverage won't be shown.



Legend
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Last update ffdce8e...2788fbb.

@@ -52,10 +52,10 @@ type Configuration struct {
ShardCutoverWarmupDuration *time.Duration `yaml:"shardCutoverWarmupDuration"`
ShardCutoffLingerDuration *time.Duration `yaml:"shardCutoffLingerDuration"`
Encoder EncoderConfiguration `yaml:"encoder"`
FlushSize int `yaml:"flushSize"`
Collaborator

Wouldn't that break parsing of existing yaml configs where this field is present? Maybe leave it with omitempty annotation?

Collaborator Author

kept the old one w/ deprecated comment
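For illustration, a hedged sketch of what keeping the deprecated field can look like (the field name and tag come from the diff above; the comment wording is illustrative):

// Configuration keeps the old field so existing YAML configs that set
// flushSize still parse; the value itself is no longer used.
type Configuration struct {
	// Deprecated: batching is now handled by the writer; this value is ignored.
	FlushSize int `yaml:"flushSize"`
}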

)

// Round up queue size to power of 2.
Collaborator

The code seems to be rounding down the number rather than rounding up :)

Collaborator

Given 3 ops in a row, it might be worth extracting this into a single-line function with a single "sanity" unit test.

Collaborator

This will round up as expected.

Collaborator Author

done, it def rounds up :)
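For reference, a sketch of the kind of single-line helper and sanity test suggested above (hypothetical names; not necessarily the exact shape it took in the PR):

package tcpwrite

import (
	"math/bits"
	"testing"
)

// nextPowerOfTwo rounds n up to the nearest power of two.
func nextPowerOfTwo(n int) int {
	if n <= 1 {
		return 1
	}
	return 1 << bits.Len(uint(n-1))
}

// The sanity test would live in a _test.go file.
func TestNextPowerOfTwo(t *testing.T) {
	for in, want := range map[int]int{0: 1, 1: 1, 3: 4, 4: 4, 5: 8, 1000: 1024} {
		if got := nextPowerOfTwo(in); got != want {
			t.Errorf("nextPowerOfTwo(%d) = %d, want %d", in, got, want)
		}
	}
}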

case <-q.doneCh:
return
}
if cap(*buf) < _queueMaxWriteBufSize {
Collaborator

Ultra-nit: use len instead of cap for consistency?

case <-q.doneCh:
return
}
if cap(*buf) < _queueMaxWriteBufSize {
Collaborator

Don't we want to reset the buffer upon successful write regardless of any checks?

Collaborator

Not immediately clear what the goal is here - we don't want to pool slices we've expanded past the max buffer size? Is this just to minimize unexpected memory growth and keep things bounded? Can you add a quick comment?

b := q.buf.shift()

bytes := b.Bytes()
if bytes == nil {
Collaborator

nit: can you do a len check here instead? not sure if it's possible to have a zero-length, non-nil slice returned.

Collaborator Author

yes, good idea, this relies too much on the protobuf Buffer's implementation - it doesn't have an easier way to test whether it's a zero value.
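A toy example (not the PR's code) showing why the length check is the safer test: len(nil) == 0 in Go, so a single check covers both a nil slice and a zero-length, non-nil one.

package main

import "fmt"

func main() {
	var nilSlice []byte
	emptySlice := make([]byte, 0, 16)

	// A nil check would miss the second case; a len check treats both as
	// "nothing to write".
	fmt.Println(nilSlice == nil, len(nilSlice) == 0)     // true true
	fmt.Println(emptySlice == nil, len(emptySlice) == 0) // false true
}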

q.writeAndReset()
lastDrain = time.Now()
// Check buffer capacity, not length, to make sure we're not pooling slices that are too large.
// Otherwise, it could in multi-megabyte slices hanging around, in case we get a spike in writes.
Collaborator

ultra-nit: "... could result in ...". Thanks for the comment btw
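A minimal sketch of the capacity-bounded pooling pattern discussed in this thread (the constant value and names are illustrative, not the actual m3aggregator code):

package tcpwrite

import "sync"

// _queueMaxWriteBufSize mirrors the constant referenced in the diff; the
// value here is illustrative.
const _queueMaxWriteBufSize = 1 << 16

var bufPool = sync.Pool{
	New: func() interface{} {
		b := make([]byte, 0, 4096)
		return &b
	},
}

// releaseBuf returns a buffer to the pool only if its capacity (not length)
// stayed within the bound, so a spike in writes can't leave multi-megabyte
// slices pinned in the pool; oversized buffers are left to the GC.
func releaseBuf(buf *[]byte) {
	if cap(*buf) < _queueMaxWriteBufSize {
		*buf = (*buf)[:0] // reset length, keep capacity
		bufPool.Put(buf)
	}
}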

@vdarulis vdarulis merged commit 079ac7d into master Mar 9, 2021
@vdarulis vdarulis deleted the v/tcpclient branch March 9, 2021 18:40
soundvibe added a commit that referenced this pull request Mar 10, 2021
* master:
  [dbnode] Remove unused shardBlockVolume (#3347)
  Fix new Go 1.15+ vet check failures (#3345)
  [coordinator] Add config option to make rollup rules untimed (#3343)
  [aggregator] Raw TCP Client write queueing/buffering refactor (#3342)
  [dbnode] Fail M3TSZ encoding on DeltaOfDelta overflow (#3329)