From 3f61fb4007c4f0b57f875a920fb7c3f6ee5e4872 Mon Sep 17 00:00:00 2001 From: Oleg Zaytsev Date: Tue, 11 Jan 2022 10:39:11 +0100 Subject: [PATCH 1/9] concurrency.ForEachJob() replaces .ForEach() Previous `ForEach()` required transforming the input into a slice of interfaces (causing an allocation on heap for each job, although that's minor here), and type asserting each job in the function later (generics will save us, but they're still not here). Plus since job index is not known, most use cases require a mutex to concatenate the results. This replaces it with `ForEachJob` instead, which provides the index of the job to `jobFunc` instead, letting it decide how to handle it. In most cases that usage will be accessing the original jobs slice, with proper type checking. Given the index of the job is known, no mutex is needed to store the result in a slice afterwards (see test change), plus since indexes are just ints, we don't even need a channel. This fits all the `concurrency.ForEach` uses I could find, and since there's no released version of dskit yet, I just removed that function. Signed-off-by: Oleg Zaytsev --- concurrency/runner.go | 37 ++++++++++++++----------------------- concurrency/runner_test.go | 28 +++++++++++++--------------- 2 files changed, 27 insertions(+), 38 deletions(-) diff --git a/concurrency/runner.go b/concurrency/runner.go index a6740f3ac..0386c36d2 100644 --- a/concurrency/runner.go +++ b/concurrency/runner.go @@ -4,6 +4,7 @@ import ( "context" "sync" + "go.uber.org/atomic" "golang.org/x/sync/errgroup" "github.com/grafana/dskit/internal/math" @@ -60,47 +61,37 @@ func ForEachUser(ctx context.Context, userIDs []string, concurrency int, userFun return errs.Err() } -// ForEach runs the provided jobFunc for each job up to concurrency concurrent workers. +// ForEachJob runs the provided jobFunc for each job index in [0, jobs) up to concurrency concurrent workers. // The execution breaks on first error encountered. -func ForEach(ctx context.Context, jobs []interface{}, concurrency int, jobFunc func(ctx context.Context, job interface{}) error) error { - if len(jobs) == 0 { +func ForEachJob(ctx context.Context, jobs int, concurrency int, jobFunc func(ctx context.Context, idx int) error) error { + if jobs == 0 { return nil } - // Push all jobs to a channel. - ch := make(chan interface{}, len(jobs)) - for _, job := range jobs { - ch <- job - } - close(ch) + indexes := atomic.Int64{} + indexes.Add(int64(jobs)) // Start workers to process jobs. g, ctx := errgroup.WithContext(ctx) - for ix := 0; ix < math.Min(concurrency, len(jobs)); ix++ { + for ix := 0; ix < math.Min(concurrency, jobs); ix++ { g.Go(func() error { - for job := range ch { + for { + idx := int(indexes.Dec()) + if idx < 0 { + return nil + } + if err := ctx.Err(); err != nil { return err } - if err := jobFunc(ctx, job); err != nil { + if err := jobFunc(ctx, idx); err != nil { return err } } - - return nil }) } // Wait until done (or context has canceled). return g.Wait() } - -// CreateJobsFromStrings is an utility to create jobs from an slice of strings. -func CreateJobsFromStrings(values []string) []interface{} { - jobs := make([]interface{}, len(values)) - for i := 0; i < len(values); i++ { - jobs[i] = values[i] - } - return jobs -} diff --git a/concurrency/runner_test.go b/concurrency/runner_test.go index 1dec972c4..ab4eed7d3 100644 --- a/concurrency/runner_test.go +++ b/concurrency/runner_test.go @@ -72,21 +72,16 @@ func TestForEachUser_ShouldReturnImmediatelyOnNoUsersProvided(t *testing.T) { })) } -func TestForEach(t *testing.T) { +func TestForEachJob(t *testing.T) { var ( ctx = context.Background() - - // Keep track of processed jobs. - processedMx sync.Mutex - processed []string ) jobs := []string{"a", "b", "c"} + processed := make([]string, len(jobs)) - err := ForEach(ctx, CreateJobsFromStrings(jobs), 2, func(ctx context.Context, job interface{}) error { - processedMx.Lock() - defer processedMx.Unlock() - processed = append(processed, job.(string)) + err := ForEachJob(ctx, len(jobs), 2, func(ctx context.Context, idx int) error { + processed[idx] = jobs[idx] return nil }) @@ -94,7 +89,7 @@ func TestForEach(t *testing.T) { assert.ElementsMatch(t, jobs, processed) } -func TestForEach_ShouldBreakOnFirstError_ContextCancellationHandled(t *testing.T) { +func TestForEachJob_ShouldBreakOnFirstError_ContextCancellationHandled(t *testing.T) { var ( ctx = context.Background() @@ -102,7 +97,7 @@ func TestForEach_ShouldBreakOnFirstError_ContextCancellationHandled(t *testing.T processed atomic.Int32 ) - err := ForEach(ctx, []interface{}{"a", "b", "c"}, 2, func(ctx context.Context, job interface{}) error { + err := ForEachJob(ctx, 3, 2, func(ctx context.Context, idx int) error { if processed.CAS(0, 1) { return errors.New("the first request is failing") } @@ -125,7 +120,7 @@ func TestForEach_ShouldBreakOnFirstError_ContextCancellationHandled(t *testing.T assert.Equal(t, int32(1), processed.Load()) } -func TestForEach_ShouldBreakOnFirstError_ContextCancellationUnhandled(t *testing.T) { +func TestForEachJob_ShouldBreakOnFirstError_ContextCancellationUnhandled(t *testing.T) { var ( ctx = context.Background() @@ -137,7 +132,7 @@ func TestForEach_ShouldBreakOnFirstError_ContextCancellationUnhandled(t *testing var wg sync.WaitGroup wg.Add(2) - err := ForEach(ctx, []interface{}{"a", "b", "c"}, 2, func(ctx context.Context, job interface{}) error { + err := ForEachJob(ctx, 3, 2, func(ctx context.Context, idx int) error { wg.Done() if processed.CAS(0, 1) { @@ -161,8 +156,11 @@ func TestForEach_ShouldBreakOnFirstError_ContextCancellationUnhandled(t *testing assert.Equal(t, int32(2), processed.Load()) } -func TestForEach_ShouldReturnImmediatelyOnNoJobsProvided(t *testing.T) { - require.NoError(t, ForEach(context.Background(), nil, 2, func(ctx context.Context, job interface{}) error { +func TestForEachJob_ShouldReturnImmediatelyOnNoJobsProvided(t *testing.T) { + var processed atomic.Int32 + require.NoError(t, ForEachJob(context.Background(), 0, 2, func(ctx context.Context, idx int) error { + processed.Inc() return nil })) + require.Zero(t, processed.Load()) } From 0f6aeb719ff7874887132d2bd038fd2e5785c87e Mon Sep 17 00:00:00 2001 From: Oleg Zaytsev Date: Tue, 11 Jan 2022 11:04:25 +0100 Subject: [PATCH 2/9] Add BenchmarkForEachJob Signed-off-by: Oleg Zaytsev --- concurrency/runner_test.go | 35 +++++++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/concurrency/runner_test.go b/concurrency/runner_test.go index ab4eed7d3..cbc4f7d5f 100644 --- a/concurrency/runner_test.go +++ b/concurrency/runner_test.go @@ -3,6 +3,7 @@ package concurrency import ( "context" "errors" + "fmt" "sync" "testing" "time" @@ -164,3 +165,37 @@ func TestForEachJob_ShouldReturnImmediatelyOnNoJobsProvided(t *testing.T) { })) require.Zero(t, processed.Load()) } + +func BenchmarkForEachJob(b *testing.B) { + var ctx = context.Background() + + for _, jobsCount := range []int{2, 16, 256, 1024} { + jobs := make([]string, jobsCount) + for j := 0; j < jobsCount; j++ { + jobs[j] = string(byte('a' + j%26)) + } + concurrencies := []int{1, 2, 16} + if jobsCount/2 > concurrencies[len(concurrencies)-1] { + concurrencies = append(concurrencies, jobsCount/2) + } + if jobsCount > concurrencies[len(concurrencies)-1] { + concurrencies = append(concurrencies, jobsCount) + } + + for _, concurrency := range concurrencies { + name := fmt.Sprintf("%d jobs / concurrency %d", jobsCount, concurrency) + { + b.Run(name, func(b *testing.B) { + for i := 0; i < b.N; i++ { + processed := make([]string, len(jobs)) + err := ForEachJob(ctx, len(jobs), concurrency, func(ctx context.Context, idx int) error { + processed[idx] = jobs[idx] + return nil + }) + require.NoError(b, err) + } + }) + } + } + } +} From 1d3748e9854b4e8c26dec501d23621f7b9ca3bd2 Mon Sep 17 00:00:00 2001 From: Oleg Zaytsev Date: Tue, 11 Jan 2022 11:24:12 +0100 Subject: [PATCH 3/9] Update CHANGELOG.md Signed-off-by: Oleg Zaytsev --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9a1a8b54a..8788767fe 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,7 @@ * [CHANGE] grpcutil: Convert Resolver into concrete type. #105 * [CHANGE] grpcutil.Resolver.Resolve: Take a service parameter. #102 * [CHANGE] grpcutil.Update: Remove gRPC LB related metadata. #102 +* [CHANGE] concurrency.ForEach: replaced by `concurrency.ForEachJob`. #113 * [ENHANCEMENT] Add middleware package. #38 * [ENHANCEMENT] Add the ring package #45 * [ENHANCEMENT] Add limiter package. #41 From 5b0be335c1cdccbb92e460a538ae7db17f203a75 Mon Sep 17 00:00:00 2001 From: Oleg Zaytsev Date: Tue, 11 Jan 2022 11:53:02 +0100 Subject: [PATCH 4/9] Remove the low-value benchmark Signed-off-by: Oleg Zaytsev --- concurrency/runner_test.go | 35 ----------------------------------- 1 file changed, 35 deletions(-) diff --git a/concurrency/runner_test.go b/concurrency/runner_test.go index cbc4f7d5f..ab4eed7d3 100644 --- a/concurrency/runner_test.go +++ b/concurrency/runner_test.go @@ -3,7 +3,6 @@ package concurrency import ( "context" "errors" - "fmt" "sync" "testing" "time" @@ -165,37 +164,3 @@ func TestForEachJob_ShouldReturnImmediatelyOnNoJobsProvided(t *testing.T) { })) require.Zero(t, processed.Load()) } - -func BenchmarkForEachJob(b *testing.B) { - var ctx = context.Background() - - for _, jobsCount := range []int{2, 16, 256, 1024} { - jobs := make([]string, jobsCount) - for j := 0; j < jobsCount; j++ { - jobs[j] = string(byte('a' + j%26)) - } - concurrencies := []int{1, 2, 16} - if jobsCount/2 > concurrencies[len(concurrencies)-1] { - concurrencies = append(concurrencies, jobsCount/2) - } - if jobsCount > concurrencies[len(concurrencies)-1] { - concurrencies = append(concurrencies, jobsCount) - } - - for _, concurrency := range concurrencies { - name := fmt.Sprintf("%d jobs / concurrency %d", jobsCount, concurrency) - { - b.Run(name, func(b *testing.B) { - for i := 0; i < b.N; i++ { - processed := make([]string, len(jobs)) - err := ForEachJob(ctx, len(jobs), concurrency, func(ctx context.Context, idx int) error { - processed[idx] = jobs[idx] - return nil - }) - require.NoError(b, err) - } - }) - } - } - } -} From 6faae529a2161e30c589b64dcd54f1d1ad6e8575 Mon Sep 17 00:00:00 2001 From: Oleg Zaytsev Date: Tue, 11 Jan 2022 11:57:41 +0100 Subject: [PATCH 5/9] Bring ForEach back, with proper deprecation notice Signed-off-by: Oleg Zaytsev --- CHANGELOG.md | 2 +- concurrency/runner.go | 21 +++++++++ concurrency/runner_test.go | 95 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 117 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8788767fe..230ee1d6f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,7 +17,7 @@ * [CHANGE] grpcutil: Convert Resolver into concrete type. #105 * [CHANGE] grpcutil.Resolver.Resolve: Take a service parameter. #102 * [CHANGE] grpcutil.Update: Remove gRPC LB related metadata. #102 -* [CHANGE] concurrency.ForEach: replaced by `concurrency.ForEachJob`. #113 +* [CHANGE] concurrency.ForEach: deprecated and reimplemented by new `concurrency.ForEachJob`. #113 * [ENHANCEMENT] Add middleware package. #38 * [ENHANCEMENT] Add the ring package #45 * [ENHANCEMENT] Add limiter package. #41 diff --git a/concurrency/runner.go b/concurrency/runner.go index 0386c36d2..7019e1aa9 100644 --- a/concurrency/runner.go +++ b/concurrency/runner.go @@ -61,6 +61,27 @@ func ForEachUser(ctx context.Context, userIDs []string, concurrency int, userFun return errs.Err() } +// ForEach runs the provided jobFunc for each job up to concurrency concurrent workers. +// The execution breaks on first error encountered. +// Deprecated. +// Use ForEachJob instead. +func ForEach(ctx context.Context, jobs []interface{}, concurrency int, jobFunc func(ctx context.Context, job interface{}) error) error { + return ForEachJob(ctx, len(jobs), concurrency, func(ctx context.Context, idx int) error { + return jobFunc(ctx, jobs[idx]) + }) +} + +// CreateJobsFromStrings is an utility to create jobs from an slice of strings. +// Deprecated. +// Will be removed. Not needed when using ForEachJob. +func CreateJobsFromStrings(values []string) []interface{} { + jobs := make([]interface{}, len(values)) + for i := 0; i < len(values); i++ { + jobs[i] = values[i] + } + return jobs +} + // ForEachJob runs the provided jobFunc for each job index in [0, jobs) up to concurrency concurrent workers. // The execution breaks on first error encountered. func ForEachJob(ctx context.Context, jobs int, concurrency int, jobFunc func(ctx context.Context, idx int) error) error { diff --git a/concurrency/runner_test.go b/concurrency/runner_test.go index ab4eed7d3..30b7477f5 100644 --- a/concurrency/runner_test.go +++ b/concurrency/runner_test.go @@ -164,3 +164,98 @@ func TestForEachJob_ShouldReturnImmediatelyOnNoJobsProvided(t *testing.T) { })) require.Zero(t, processed.Load()) } + +func TestForEach(t *testing.T) { + var ( + ctx = context.Background() + + // Keep track of processed jobs. + processedMx sync.Mutex + processed []string + ) + + jobs := []string{"a", "b", "c"} + + err := ForEach(ctx, CreateJobsFromStrings(jobs), 2, func(ctx context.Context, job interface{}) error { + processedMx.Lock() + defer processedMx.Unlock() + processed = append(processed, job.(string)) + return nil + }) + + require.NoError(t, err) + assert.ElementsMatch(t, jobs, processed) +} + +func TestForEach_ShouldBreakOnFirstError_ContextCancellationHandled(t *testing.T) { + var ( + ctx = context.Background() + + // Keep the processed jobs count. + processed atomic.Int32 + ) + + err := ForEach(ctx, []interface{}{"a", "b", "c"}, 2, func(ctx context.Context, job interface{}) error { + if processed.CAS(0, 1) { + return errors.New("the first request is failing") + } + + // Wait 1s and increase the number of processed jobs, unless the context get canceled earlier. + select { + case <-time.After(time.Second): + processed.Add(1) + case <-ctx.Done(): + return ctx.Err() + } + + return nil + }) + + require.EqualError(t, err, "the first request is failing") + + // Since we expect the first error interrupts the workers, we should only see + // 1 job processed (the one which immediately returned error). + assert.Equal(t, int32(1), processed.Load()) +} + +func TestForEach_ShouldBreakOnFirstError_ContextCancellationUnhandled(t *testing.T) { + var ( + ctx = context.Background() + + // Keep the processed jobs count. + processed atomic.Int32 + ) + + // waitGroup to await the start of the first two jobs + var wg sync.WaitGroup + wg.Add(2) + + err := ForEach(ctx, []interface{}{"a", "b", "c"}, 2, func(ctx context.Context, job interface{}) error { + wg.Done() + + if processed.CAS(0, 1) { + // wait till two jobs have been started + wg.Wait() + return errors.New("the first request is failing") + } + + // Wait till context is cancelled to add processed jobs. + <-ctx.Done() + processed.Add(1) + + return nil + }) + + require.EqualError(t, err, "the first request is failing") + + // Since we expect the first error interrupts the workers, we should only + // see 2 job processed (the one which immediately returned error and the + // job with "b"). + assert.Equal(t, int32(2), processed.Load()) +} + +func TestForEach_ShouldReturnImmediatelyOnNoJobsProvided(t *testing.T) { + require.NoError(t, ForEach(context.Background(), nil, 2, func(ctx context.Context, job interface{}) error { + return nil + })) +} From a491840d1a883f1935bef59befd6d5798a0796e6 Mon Sep 17 00:00:00 2001 From: Oleg Zaytsev Date: Tue, 11 Jan 2022 12:00:03 +0100 Subject: [PATCH 6/9] PR feedback on job order and ctx.Err check Signed-off-by: Oleg Zaytsev --- concurrency/runner.go | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/concurrency/runner.go b/concurrency/runner.go index 7019e1aa9..bd1db9846 100644 --- a/concurrency/runner.go +++ b/concurrency/runner.go @@ -89,27 +89,24 @@ func ForEachJob(ctx context.Context, jobs int, concurrency int, jobFunc func(ctx return nil } - indexes := atomic.Int64{} - indexes.Add(int64(jobs)) + // Initialise indexes with -1 so first Inc() returns index 0. + indexes := atomic.NewInt64(-1) // Start workers to process jobs. g, ctx := errgroup.WithContext(ctx) for ix := 0; ix < math.Min(concurrency, jobs); ix++ { g.Go(func() error { - for { - idx := int(indexes.Dec()) - if idx < 0 { + for ctx.Err() == nil { + idx := int(indexes.Inc()) + if idx >= jobs { return nil } - if err := ctx.Err(); err != nil { - return err - } - if err := jobFunc(ctx, idx); err != nil { return err } } + return ctx.Err() }) } From 29343494226024d3db38ee8b3734488cc89c260f Mon Sep 17 00:00:00 2001 From: Oleg Zaytsev Date: Wed, 12 Jan 2022 10:00:35 +0100 Subject: [PATCH 7/9] Update deprecation comments MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Oleg Zaytsev Co-authored-by: Peter Štibraný --- concurrency/runner.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/concurrency/runner.go b/concurrency/runner.go index bd1db9846..70914a9ce 100644 --- a/concurrency/runner.go +++ b/concurrency/runner.go @@ -63,8 +63,8 @@ func ForEachUser(ctx context.Context, userIDs []string, concurrency int, userFun // ForEach runs the provided jobFunc for each job up to concurrency concurrent workers. // The execution breaks on first error encountered. -// Deprecated. -// Use ForEachJob instead. +// +// Deprecated: use ForEachJob instead. func ForEach(ctx context.Context, jobs []interface{}, concurrency int, jobFunc func(ctx context.Context, job interface{}) error) error { return ForEachJob(ctx, len(jobs), concurrency, func(ctx context.Context, idx int) error { return jobFunc(ctx, jobs[idx]) @@ -72,8 +72,8 @@ func ForEach(ctx context.Context, jobs []interface{}, concurrency int, jobFunc f } // CreateJobsFromStrings is an utility to create jobs from an slice of strings. -// Deprecated. -// Will be removed. Not needed when using ForEachJob. +// +// Deprecated: will be removed as it's not needed when using ForEachJob. func CreateJobsFromStrings(values []string) []interface{} { jobs := make([]interface{}, len(values)) for i := 0; i < len(values); i++ { From a19cdadbd6970127e21bc6303e7f3b44d8a50fb9 Mon Sep 17 00:00:00 2001 From: Oleg Zaytsev Date: Wed, 12 Jan 2022 10:05:42 +0100 Subject: [PATCH 8/9] Don't define a variable for context.Background() Signed-off-by: Oleg Zaytsev --- concurrency/runner_test.go | 55 +++++++++++--------------------------- 1 file changed, 16 insertions(+), 39 deletions(-) diff --git a/concurrency/runner_test.go b/concurrency/runner_test.go index 30b7477f5..142705313 100644 --- a/concurrency/runner_test.go +++ b/concurrency/runner_test.go @@ -14,8 +14,6 @@ import ( func TestForEachUser(t *testing.T) { var ( - ctx = context.Background() - // Keep track of processed users. processedMx sync.Mutex processed []string @@ -23,7 +21,7 @@ func TestForEachUser(t *testing.T) { input := []string{"a", "b", "c"} - err := ForEachUser(ctx, input, 2, func(ctx context.Context, user string) error { + err := ForEachUser(context.Background(), input, 2, func(ctx context.Context, user string) error { processedMx.Lock() defer processedMx.Unlock() processed = append(processed, user) @@ -35,16 +33,12 @@ func TestForEachUser(t *testing.T) { } func TestForEachUser_ShouldContinueOnErrorButReturnIt(t *testing.T) { - var ( - ctx = context.Background() - - // Keep the processed users count. - processed atomic.Int32 - ) + // Keep the processed users count. + var processed atomic.Int32 input := []string{"a", "b", "c"} - err := ForEachUser(ctx, input, 2, func(ctx context.Context, user string) error { + err := ForEachUser(context.Background(), input, 2, func(ctx context.Context, user string) error { if processed.CAS(0, 1) { return errors.New("the first request is failing") } @@ -73,14 +67,10 @@ func TestForEachUser_ShouldReturnImmediatelyOnNoUsersProvided(t *testing.T) { } func TestForEachJob(t *testing.T) { - var ( - ctx = context.Background() - ) - jobs := []string{"a", "b", "c"} processed := make([]string, len(jobs)) - err := ForEachJob(ctx, len(jobs), 2, func(ctx context.Context, idx int) error { + err := ForEachJob(context.Background(), len(jobs), 2, func(ctx context.Context, idx int) error { processed[idx] = jobs[idx] return nil }) @@ -90,14 +80,10 @@ func TestForEachJob(t *testing.T) { } func TestForEachJob_ShouldBreakOnFirstError_ContextCancellationHandled(t *testing.T) { - var ( - ctx = context.Background() - - // Keep the processed jobs count. - processed atomic.Int32 - ) + // Keep the processed jobs count. + var processed atomic.Int32 - err := ForEachJob(ctx, 3, 2, func(ctx context.Context, idx int) error { + err := ForEachJob(context.Background(), 3, 2, func(ctx context.Context, idx int) error { if processed.CAS(0, 1) { return errors.New("the first request is failing") } @@ -121,18 +107,14 @@ func TestForEachJob_ShouldBreakOnFirstError_ContextCancellationHandled(t *testin } func TestForEachJob_ShouldBreakOnFirstError_ContextCancellationUnhandled(t *testing.T) { - var ( - ctx = context.Background() - - // Keep the processed jobs count. - processed atomic.Int32 - ) + // Keep the processed jobs count. + var processed atomic.Int32 // waitGroup to await the start of the first two jobs var wg sync.WaitGroup wg.Add(2) - err := ForEachJob(ctx, 3, 2, func(ctx context.Context, idx int) error { + err := ForEachJob(context.Background(), 3, 2, func(ctx context.Context, idx int) error { wg.Done() if processed.CAS(0, 1) { @@ -157,6 +139,7 @@ func TestForEachJob_ShouldBreakOnFirstError_ContextCancellationUnhandled(t *test } func TestForEachJob_ShouldReturnImmediatelyOnNoJobsProvided(t *testing.T) { + // Keep the processed jobs count. var processed atomic.Int32 require.NoError(t, ForEachJob(context.Background(), 0, 2, func(ctx context.Context, idx int) error { processed.Inc() @@ -167,8 +150,6 @@ func TestForEachJob_ShouldReturnImmediatelyOnNoJobsProvided(t *testing.T) { func TestForEach(t *testing.T) { var ( - ctx = context.Background() - // Keep track of processed jobs. processedMx sync.Mutex processed []string @@ -176,7 +157,7 @@ func TestForEach(t *testing.T) { jobs := []string{"a", "b", "c"} - err := ForEach(ctx, CreateJobsFromStrings(jobs), 2, func(ctx context.Context, job interface{}) error { + err := ForEach(context.Background(), CreateJobsFromStrings(jobs), 2, func(ctx context.Context, job interface{}) error { processedMx.Lock() defer processedMx.Unlock() processed = append(processed, job.(string)) @@ -219,18 +200,14 @@ func TestForEach_ShouldBreakOnFirstError_ContextCancellationHandled(t *testing.T } func TestForEach_ShouldBreakOnFirstError_ContextCancellationUnhandled(t *testing.T) { - var ( - ctx = context.Background() - - // Keep the processed jobs count. - processed atomic.Int32 - ) + // Keep the processed jobs count. + var processed atomic.Int32 // waitGroup to await the start of the first two jobs var wg sync.WaitGroup wg.Add(2) - err := ForEach(ctx, []interface{}{"a", "b", "c"}, 2, func(ctx context.Context, job interface{}) error { + err := ForEach(context.Background(), []interface{}{"a", "b", "c"}, 2, func(ctx context.Context, job interface{}) error { wg.Done() if processed.CAS(0, 1) { From 133a232e811351f341bb19c10919f20910a99ec3 Mon Sep 17 00:00:00 2001 From: Oleg Zaytsev Date: Wed, 12 Jan 2022 10:18:26 +0100 Subject: [PATCH 9/9] go fmt Signed-off-by: Oleg Zaytsev --- concurrency/runner.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/concurrency/runner.go b/concurrency/runner.go index 70914a9ce..023be10d7 100644 --- a/concurrency/runner.go +++ b/concurrency/runner.go @@ -63,7 +63,7 @@ func ForEachUser(ctx context.Context, userIDs []string, concurrency int, userFun // ForEach runs the provided jobFunc for each job up to concurrency concurrent workers. // The execution breaks on first error encountered. -// +// // Deprecated: use ForEachJob instead. func ForEach(ctx context.Context, jobs []interface{}, concurrency int, jobFunc func(ctx context.Context, job interface{}) error) error { return ForEachJob(ctx, len(jobs), concurrency, func(ctx context.Context, idx int) error {