
Unified fetching/syncing block metadata everywhere #1394

Closed
lx223 wants to merge 1 commit into master from iss-1335-partial-block

Conversation

Contributor

@lx223 lx223 commented Aug 9, 2019

  • [ ] CHANGELOG entry if change is relevant to the end user.

Changes

  • add a new block metadata fetcher that fetches block metadata stored in a bucket
  • replace some of the existing block metadata fetching code with the new fetcher
  • change the signature of DownloadMeta to use the more restrictive BucketReader

Verification

Unit test

@lx223 lx223 changed the title Use a shared metadata fetcher to fetch remote block metadata [Compact] Use a shared metadata fetcher to fetch remote block metadata Aug 9, 2019
@lx223 lx223 force-pushed the iss-1335-partial-block branch 5 times, most recently from ca22f84 to 6bd7cd4 Compare August 11, 2019 20:37
@lx223 lx223 marked this pull request as ready for review August 11, 2019 20:46
Contributor Author

lx223 commented Aug 11, 2019

The CI test fails because of a go mod error. It doesn't seem related to my change. Is it a known issue?

@bwplotka
Member

Fixes #1335

@lx223 can you rebase? We fixed this go.mod error on master (:

@bwplotka bwplotka left a comment

Looking good! Thanks!

Can we use exactly the same thing in bucket.go (essentially in store gateway):

func (s *BucketStore) SyncBlocks(ctx context.Context) error {


// Fetch wil return the mapping from block ID to its metadata that are currently stored in the bucket passed in.
// If the corresponding value for a block ID key is nil, no valid metadta is retrieved successfully.
func (f *MetadataFetcher) Fetch(ctx context.Context) (map[ulid.ULID]*metadata.Meta, error) {
Member

Why fetcher and not e.g. Syncer? Especially if we maintain cachedBlockMDs for efficiency?

Member

Let's mention if it's goroutine safe (: It looks like it is, the question is ... should it be?

I think it would be better if you did not make Fetch/Sync goroutine safe. First of all, I don't think it is needed. Secondly, with this, we can be more efficient in terms of memory. We can allow reusing exactly the same map[ulid.ULID]*metadata.Meta (pass it as an argument and return the updated one). This might save some allocations, as there can potentially be millions of objects in the bucket.

What do you think? With this I am not sure if the struct is needed, maybe just Function is enough? Don't have a strong opinion.

Member

Be aware of this #1408 that changes this code as well (somehow we need to sync both PRs) cc @povilasv

Contributor Author

I am happy to rename to syncer if you feel strongly about it. I am alright with both. I chose fetch as I felt it is more about fetching the metadata from remote source. Keeping a cache is just an internal optimisation and might become an issue and get removed if the bucket contains too many objects.

Good point about the memory allocation. I have a different perspective. I don't think it would add that much to memory, as it is using metadata pointers right now. And if we were to reuse the same map, there would still have to be a map tracking which metadata are in the bucket, in order to remove those cached but somehow-removed-from-remote-source ones. So the saving won't be that much. Besides, I think streaming metadata might be a better solution to scale with a bigger number of objects in the bucket. What do you think?

Member

No strong feelings about the naming. It really depends on what interface we are going to apply, as you are exploring the streaming suggestion now (: So then ForeachMeta might make more sense?

I think streaming metadata might be a better solution to scale with bigger number of objects in the bucket. What do you think?

Definitely; however, this means we would need to revert to one of the Iter concepts, like

  • passing a function to invoke for each meta
  • Iter interface:
// Iterator is a simple iterator that can only get the next value.
type Iterator interface {
	At() metadata.Meta
	Err() error
	Next() bool
}

Contributor Author

I think we don't have to use streaming for now unless the OOM issue is already big and urgent. We can keep this signature, which meshes pretty well with the current usage, and use it as the foundation to identify and make changes when we are ready to explore other options like streaming.

Member

Yea, I think I am happy with this. We already keep cached block MD anyway.

Member

fetcher might be fine then

}

// Fetch wil return the mapping from block ID to its metadata that are currently stored in the bucket passed in.
// If the corresponding value for a block ID key is nil, no valid metadta is retrieved successfully.
Member

If the corresponding value for a block ID key is nil, no valid metadta is retrieved successfully.

  1. s/metadta/metadata
  2. Nice idea to delegate the no-meta-json blocks on caller side. I think it makes sense.

However, maybe two maps make sense here then?

(metas map[ulid.ULID]*metadata.Meta, noMeta []ulid.ULID, err  error)

This will be easier to use on caller side I think.

Contributor Author

sorry, not following about point 1. what is s/metadta/metadata?

I thought about whether to distinguish those two cases. For the places I have replaced, I haven't found a case that would require these two being distinguished. Did I miss a case that would require this?

Member

sorry, not following about point 1. what is s/metadta/metadata?

https://www.tutorialspoint.com/unix/unix-regular-expressions.htm

(:

Member

Hm, what about the case where we remove partially uploaded blocks? (noMeta + delay):

if removedOrIgnored := c.removeIfMetaMalformed(workCtx, id); removedOrIgnored {

I think that's when noMeta is useful.

Contributor Author

I might be wrong here, but I think this is the only case where this is required, and it is covered in the caller's logic - thanos/pkg/compact/compact.go. But it does sound like something useful in the future. Tempted to add it when we see more such usages.

remoteBlockMDs := make(map[ulid.ULID]*metadata.Meta, len(f.cachedBlockMDs))
wg := &sync.WaitGroup{}

for i := 0; i < f.concurrencyLimit; i++ {
Member

Will review body of this function when we resolve other comments first (:

Looks good at first glance; looks like you covered all the edge cases, nice! Thanks for the tests as well!

@bwplotka
Member

Also, we need a CHANGELOG entry for this I think, as we do not handle partial uploads carefully on downsampling/retention apply.

@bwplotka
Member

Hi 👋

Can we finish this? (: If you don't have time, can I grab it and finish?

@lx223 lx223 force-pushed the iss-1335-partial-block branch 2 times, most recently from d8079e9 to 3680a61 Compare September 14, 2019 23:58
@lx223
Contributor Author

lx223 commented Sep 15, 2019

@bwplotka sorry about that!

I'll change pkg/store/bucket.go and add a changelog tomorrow.

@lx223 lx223 force-pushed the iss-1335-partial-block branch 2 times, most recently from f67e795 to 3d0b139 Compare September 15, 2019 10:49
@bwplotka
Member

@lx223 Are you on CNCF slack? Join us so we can discuss quicker offline!

I would like to see this change ASAP as it will help with: #1331

@bwplotka bwplotka left a comment

Nice. However, let's make sure to use the same syncer everywhere. I located a few places below; I literally searched for usages of Iter that relate to fetching meta.json. We desperately need to use exactly the same logic everywhere (:

I check-boxed the things I can see we changed in this PR; we might want to add the missing bits. (:

  • Thanos bucket
  • Store GW
  • Compactor
  • Verifier
  • Sidecar


@bwplotka
Member

Would love to see this work done, as it would enormously help with dynamic systems where components are restarted at different moments, causing partial uploads to be very common (: Let me know if I can help with this; I would love to prioritize it. @lx223

I listed all the places where we can unify our sync/fetch logic.

@bwplotka bwplotka changed the title [Compact] Use a shared metadata fetcher to fetch remote block metadata Unified fetching/syncing block metadata everywhere Sep 16, 2019
@bwplotka
Member

bwplotka commented Sep 16, 2019

Related to #1525

@lx223 lx223 force-pushed the iss-1335-partial-block branch 7 times, most recently from 9f6c3d6 to 1160e83 Compare September 21, 2019 21:31
@lx223
Contributor Author

lx223 commented Sep 21, 2019

@bwplotka just joined CNCF slack and have made changes to all those places you listed. PTAL

Signed-off-by: Lan Xiao <lx223@users.noreply.github.com>
id, ok := block.IsBlockDir(name)
if !ok {
return nil
mdFetcher := block.NewMetadataFetcher(logger, defaultBlockMDFetchConcurrency, bkt)
Contributor Author

didn't switch to the download function because the original logic suggests it requires the block ID even for partially uploaded blocks.

Member

👍

@bwplotka
Member

Should we get back to this @lx223 ? (:

@lx223
Contributor Author

lx223 commented Nov 3, 2019

@bwplotka sure :D WDYT about the current code? There is some rebasing and test fixing to do, but it probably won't change the main logic.

@bwplotka bwplotka left a comment

Yes! I think we covered all of the usages.
I left some comments, but to me it's looking good.

Let's mention that we are fixing issue #1335.

Wonder if we can instrument this fetcher. Maybe count fetch runs? Or something (:


@@ -163,7 +163,7 @@ func deleteDir(ctx context.Context, logger log.Logger, bkt objstore.Bucket, dir

// DownloadMeta downloads only meta file from bucket by block ID.
// TODO(bwplotka): Differentiate between network error & partial upload.
func DownloadMeta(ctx context.Context, logger log.Logger, bkt objstore.Bucket, id ulid.ULID) (metadata.Meta, error) {
func DownloadMeta(ctx context.Context, logger log.Logger, bkt objstore.BucketReader, id ulid.ULID) (metadata.Meta, error) {
Member

👍

return existingMD, nil
}

md, err := DownloadMeta(ctx, f.logger, f.bkt, blockID)
Member

Wonder if we can leverage the file system as a cache, instead of downloading and keeping it in memory? Let's maybe at least add a TODO (: It saves some bandwidth (?) on startup.



mdFetcher := block.NewMetadataFetcher(logger, defaultBlockMDFetchConcurrency, bkt)
mds, err := mdFetcher.Fetch(ctx)
if err != nil {
level.Error(logger).Log("err", err, "msg", "Failed to downloaded block metadata")
Member

double error handling, please choose one

for blockID := range blockIDs {
md, err := f.fetchBlockMDIfNotCached(ctx, wg, blockID)
if err != nil {
level.Debug(f.logger).Log("msg", "failed to fetch block metadata", "block", blockID, "error", err)
Member

warning?

Member

@squat squat Nov 5, 2019

Do we really want to keep going here, or continue the loop? If nil, err is returned, then we'll end up locking and saving nil in the map unnecessarily.

Member

I think we should collect and/or exit on errors. Currently, if there was a transient network error we'll cache nil (negative result) and then never re-try downloading that metadata.


if err != nil {
return errors.Wrap(err, "iter")
if err := s.addBlock(ctx, blockID); err != nil {
Member

This is important, as we download meta.json twice: once to memory, once to disk... Can we improve this?

}
wg.Wait()

f.mu.Lock()
Member

Why this lock? To make the fetcher thread-safe? I think we need single flight here TBH, even with some expiry (: Can you add TODO(bwplotka): Add single flight with expiry if needed?

Also, I am not sure we can really reuse this lock IMO... I would rather have either:

  • two locks, one for the concurrency here and a second for potential concurrent use of Fetch, or
  • remove this lock: I don't think we currently Fetch concurrently, so we could remove it and document that it's not thread-safe.

I would say let's add the TODO.

@bwplotka bwplotka mentioned this pull request Nov 5, 2019

for i := 0; i < f.concurrencyLimit; i++ {
go func() {
for blockID := range blockIDs {
md, err := f.fetchBlockMDIfNotCached(ctx, wg, blockID)
Member

@squat squat Nov 5, 2019

Nit: I think it's a bit funny and hard to reason about when the concurrency crosses funcs like this. Maybe just call wg.Done() at the end of the loop body?

m, err := block.DownloadMeta(ctx, logger, bkt, id)
if err != nil {
return errors.Wrap(err, "download metadata")
mdFetcher := block.NewMetadataFetcher(logger, 4, bkt)
Member

Where does this concurrency level come from and why is it different from the others? If 4 is the "right" number, then maybe it should be hardcoded in the block package? I think we need to handle these arguments consistently.

@@ -193,31 +194,27 @@ func newLazyOverlapChecker(logger log.Logger, bucket objstore.Bucket, labels fun
labels: labels,

lookupMetas: map[ulid.ULID]struct{}{},
mdFetcher: block.NewMetadataFetcher(logger, 3, bucket),
Member

same here, why 3?

if err != nil {
return err
for _, md := range mds {
if md == nil {
Member

It looks like every time we want to iterate over the fetched metas we first have to check if the md is nil; why not handle this in Fetch and avoid adding nil metas to the returned map?

@@ -36,6 +36,10 @@ import (
kingpin "gopkg.in/alecthomas/kingpin.v2"
)

const (
defaultBlockMDFetchConcurrency = 3
Member

default... makes it sound like it can be overridden, but this seems to be a hardcoded value that many different packages define. Should it instead be defined in the block package one time?

}

f.mu.Lock()
remoteBlockMDs[blockID] = md
Member

I think there is a race currently: we call wg.Done before inserting the returned values into the map, meaning wg.Wait() may return early and we may return an incomplete map to the caller.

@bwplotka
Member

@lx223 let us know if you have time to finish this. This fixes quite an annoying issue, plus @squat is already touching this path for the compactor syncer.

If not we are happy to continue this for you (:

@lx223
Contributor Author

lx223 commented Nov 10, 2019

@bwplotka sry, might not have enough time to finish it fast. Feel free to take it. :)

@bwplotka
Member

Cool, closing this for now then, updating the issue.

@bwplotka bwplotka closed this Nov 28, 2019