From 24db5478c8285492ac10c64f5510e85a8c51e19c Mon Sep 17 00:00:00 2001 From: lance6716 Date: Wed, 1 Nov 2023 20:33:37 +0800 Subject: [PATCH] s3: fix s3 concurrent uploader will overwrite error (#48163) close pingcap/tidb#48164 --- br/pkg/storage/s3.go | 5 +++-- br/pkg/storage/s3_test.go | 18 ++++++++++++++++++ 2 files changed, 21 insertions(+), 2 deletions(-) diff --git a/br/pkg/storage/s3.go b/br/pkg/storage/s3.go index 276bd50ec3bec..48d851838587a 100644 --- a/br/pkg/storage/s3.go +++ b/br/pkg/storage/s3.go @@ -958,9 +958,10 @@ func (rs *S3Storage) Create(ctx context.Context, name string, option *WriterOpti s3Writer.wg.Add(1) go func() { _, err := up.UploadWithContext(ctx, upParams) - err1 := rd.Close() + // like a channel we only let sender close the pipe in happy path if err != nil { - log.Warn("upload to s3 failed", zap.String("filename", name), zap.Error(err), zap.Error(err1)) + log.Warn("upload to s3 failed", zap.String("filename", name), zap.Error(err)) + _ = rd.CloseWithError(err) } s3Writer.err = err s3Writer.wg.Done() diff --git a/br/pkg/storage/s3_test.go b/br/pkg/storage/s3_test.go index a1f185824417b..50a6598c64028 100644 --- a/br/pkg/storage/s3_test.go +++ b/br/pkg/storage/s3_test.go @@ -484,6 +484,24 @@ func TestWriteNoError(t *testing.T) { require.NoError(t, err) } +func TestMultiUploadErrorNotOverwritten(t *testing.T) { + s := createS3Suite(t) + ctx := aws.BackgroundContext() + + s.s3.EXPECT(). + CreateMultipartUploadWithContext(ctx, gomock.Any(), gomock.Any()). + Return(nil, errors.New("mock error")) + + w, err := s.storage.Create(ctx, "file", &WriterOption{Concurrency: 2}) + require.NoError(t, err) + // data should be larger than 5MB to trigger CreateMultipartUploadWithContext path + data := make([]byte, 5*1024*1024+6716) + n, err := w.Write(ctx, data) + require.NoError(t, err) + require.Equal(t, 5*1024*1024+6716, n) + require.ErrorContains(t, w.Close(ctx), "mock error") +} + // TestReadNoError ensures the ReadFile API issues a GetObject request and correctly // read the entire body. func TestReadNoError(t *testing.T) {