diff --git a/CHANGELOG.md b/CHANGELOG.md index 33dc488b3c..6bebe03509 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -37,6 +37,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#4908](https://github.com/thanos-io/thanos/pull/4908) UI: Show 'minus' icon and add tooltip when store min / max time is not available. - [#4883](https://github.com/thanos-io/thanos/pull/4883) Mixin: adhere to RFC 1123 compatible component naming. - [#5114](https://github.com/thanos-io/thanos/pull/5114) Tools `thanos bucket inspect` fix time formatting. +- [#5139](https://github.com/thanos-io/thanos/pull/5139) COS: Support multi-part upload, fix upload issue when index size more than 5GB. ## [v0.24.0](https://github.com/thanos-io/thanos/tree/release-0.24) - 2021.12.22 diff --git a/pkg/objstore/cos/cos.go b/pkg/objstore/cos/cos.go index 165f035476..37bfef6b03 100644 --- a/pkg/objstore/cos/cos.go +++ b/pkg/objstore/cos/cos.go @@ -7,6 +7,7 @@ import ( "context" "fmt" "io" + "math" "math/rand" "net/http" "net/url" @@ -194,10 +195,86 @@ func (b *Bucket) Attributes(ctx context.Context, name string) (objstore.ObjectAt }, nil } +var ( + _ cos.FixedLengthReader = (*fixedLengthReader)(nil) +) + +type fixedLengthReader struct { + io.Reader + size int64 +} + +func newFixedLengthReader(r io.Reader, size int64) io.Reader { + return fixedLengthReader{ + Reader: io.LimitReader(r, size), + size: size, + } +} + +// Size implement cos.FixedLengthReader interface. +func (r fixedLengthReader) Size() int64 { + return r.size +} + // Upload the contents of the reader as an object into the bucket. func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) error { - if _, err := b.client.Object.Put(ctx, name, r, nil); err != nil { - return errors.Wrap(err, "upload cos object") + size, err := objstore.TryToGetSize(r) + if err != nil { + return errors.Wrapf(err, "getting size of %s", name) + } + // partSize 128MB. + const partSize = 1024 * 1024 * 128 + partNums, lastSlice := int(math.Floor(float64(size)/partSize)), size%partSize + if partNums == 0 { + if _, err := b.client.Object.Put(ctx, name, r, nil); err != nil { + return errors.Wrapf(err, "Put object: %s", name) + } + return nil + } + // 1. init. + result, _, err := b.client.Object.InitiateMultipartUpload(ctx, name, nil) + if err != nil { + return errors.Wrapf(err, "InitiateMultipartUpload %s", name) + } + uploadEveryPart := func(partSize int64, part int, uploadID string) (string, error) { + r := newFixedLengthReader(r, partSize) + resp, err := b.client.Object.UploadPart(ctx, name, uploadID, part, r, &cos.ObjectUploadPartOptions{ + ContentLength: partSize, + }) + if err != nil { + if _, err := b.client.Object.AbortMultipartUpload(ctx, name, uploadID); err != nil { + return "", err + } + return "", err + } + etag := resp.Header.Get("ETag") + return etag, nil + } + optcom := &cos.CompleteMultipartUploadOptions{} + // 2. upload parts. + for part := 1; part <= partNums; part++ { + etag, err := uploadEveryPart(partSize, part, result.UploadID) + if err != nil { + return errors.Wrapf(err, "uploadPart %d, %s", part, name) + } + optcom.Parts = append(optcom.Parts, cos.Object{ + PartNumber: part, ETag: etag}, + ) + } + // 3. upload last part. + if lastSlice != 0 { + part := partNums + 1 + etag, err := uploadEveryPart(lastSlice, part, result.UploadID) + if err != nil { + return errors.Wrapf(err, "uploadPart %d, %s", part, name) + } + optcom.Parts = append(optcom.Parts, cos.Object{ + PartNumber: part, ETag: etag}, + ) + } + // 4. complete. + if _, _, err := b.client.Object.CompleteMultipartUpload(ctx, name, result.UploadID, optcom); err != nil { + return errors.Wrapf(err, "CompleteMultipartUpload %s", name) } return nil }