Skip to content

Commit

Permalink
Distributor: Prevent data race on worker pool
Browse files Browse the repository at this point in the history
Without the mutex:
```
==================
WARNING: DATA RACE
Write at 0x00c00011dc10 by goroutine 243:
  runtime.recvDirect()
      /opt/homebrew/opt/go/libexec/src/runtime/chan.go:388 +0x7c
  github.com/grafana/dskit/concurrency.(*ReusableGoroutinesPool).Close()
      /Users/julienduchesne/Repos/mimir/vendor/github.com/grafana/dskit/concurrency/worker.go:38 +0x3c
  github.com/grafana/mimir/pkg/distributor.New.func9()
      /Users/julienduchesne/Repos/mimir/pkg/distributor/distributor.go:531 +0x18
  github.com/grafana/dskit/services.(*BasicService).main()
      /Users/julienduchesne/Repos/mimir/vendor/github.com/grafana/dskit/services/basic_service.go:210 +0x3d0
  github.com/grafana/dskit/services.(*BasicService).StartAsync.func1.gowrap1()
      /Users/julienduchesne/Repos/mimir/vendor/github.com/grafana/dskit/services/basic_service.go:122 +0x34

Previous read at 0x00c00011dc10 by goroutine 245:
  runtime.chansend1()
      /opt/homebrew/opt/go/libexec/src/runtime/chan.go:157 +0x2c
  github.com/grafana/dskit/concurrency.(*ReusableGoroutinesPool).Go()
      /Users/julienduchesne/Repos/mimir/vendor/github.com/grafana/dskit/concurrency/worker.go:28 +0x48
  github.com/grafana/dskit/concurrency.(*ReusableGoroutinesPool).Go-fm()
      <autogenerated>:1 +0x3c
  github.com/grafana/dskit/ring.DoBatchWithOptions()
      /Users/julienduchesne/Repos/mimir/vendor/github.com/grafana/dskit/ring/batch.go:188 +0xa64
  github.com/grafana/mimir/pkg/distributor.(*Distributor).sendWriteRequestToIngesters()
      /Users/julienduchesne/Repos/mimir/pkg/distributor/distributor.go:1572 +0x194
  github.com/grafana/mimir/pkg/distributor.(*Distributor).sendWriteRequestToBackends()
      /Users/julienduchesne/Repos/mimir/pkg/distributor/distributor.go:1529 +0xa48
  github.com/grafana/mimir/pkg/distributor.(*Distributor).push()
      /Users/julienduchesne/Repos/mimir/pkg/distributor/distributor.go:1478 +0x638
  github.com/grafana/mimir/pkg/distributor.(*Distributor).push-fm()
      <autogenerated>:1 +0x4c
  github.com/grafana/mimir/pkg/distributor.NextOrCleanup.func1()
      /Users/julienduchesne/Repos/mimir/pkg/distributor/distributor.go:1372 +0x64
  github.com/grafana/mimir/pkg/distributor.(*Distributor).prePushValidationMiddleware-fm.(*Distributor).prePushValidationMiddleware.func1()
      /Users/julienduchesne/Repos/mimir/pkg/distributor/distributor.go:1124 +0xde0
  github.com/grafana/mimir/pkg/distributor.NextOrCleanup.func1()
      /Users/julienduchesne/Repos/mimir/pkg/distributor/distributor.go:1372 +0x64
  github.com/grafana/mimir/pkg/distributor.(*Distributor).prePushSortAndFilterMiddleware-fm.(*Distributor).prePushSortAndFilterMiddleware.func1()
      /Users/julienduchesne/Repos/mimir/pkg/distributor/distributor.go:976 +0x294
  github.com/grafana/mimir/pkg/distributor.NextOrCleanup.func1()
      /Users/julienduchesne/Repos/mimir/pkg/distributor/distributor.go:1372 +0x64
  github.com/grafana/mimir/pkg/distributor.(*Distributor).prePushRelabelMiddleware-fm.(*Distributor).prePushRelabelMiddleware.func1()
      /Users/julienduchesne/Repos/mimir/pkg/distributor/distributor.go:931 +0x4e0
  github.com/grafana/mimir/pkg/distributor.NextOrCleanup.func1()
      /Users/julienduchesne/Repos/mimir/pkg/distributor/distributor.go:1372 +0x64
  github.com/grafana/mimir/pkg/distributor.(*Distributor).prePushHaDedupeMiddleware-fm.(*Distributor).prePushHaDedupeMiddleware.func1()
      /Users/julienduchesne/Repos/mimir/pkg/distributor/distributor.go:828 +0x2f0
  github.com/grafana/mimir/pkg/distributor.NextOrCleanup.func1()
      /Users/julienduchesne/Repos/mimir/pkg/distributor/distributor.go:1372 +0x64
  github.com/grafana/mimir/pkg/distributor.(*Distributor).metricsMiddleware-fm.(*Distributor).metricsMiddleware.func1()
      /Users/julienduchesne/Repos/mimir/pkg/distributor/distributor.go:1170 +0x48c
  github.com/grafana/mimir/pkg/distributor.NextOrCleanup.func1()
      /Users/julienduchesne/Repos/mimir/pkg/distributor/distributor.go:1372 +0x64
  github.com/grafana/mimir/pkg/distributor.(*Distributor).limitsMiddleware-fm.(*Distributor).limitsMiddleware.func1()
      /Users/julienduchesne/Repos/mimir/pkg/distributor/distributor.go:1360 +0x258
  github.com/grafana/mimir/pkg/distributor.(*Distributor).Push()
      /Users/julienduchesne/Repos/mimir/pkg/distributor/distributor.go:1388 +0x2d8
  github.com/grafana/mimir/pkg/distributor.TestDistributor_PushWithDoBatchWorkers_DataRace.func2()
      /Users/julienduchesne/Repos/mimir/pkg/distributor/distributor_test.go:344 +0xd4

Goroutine 243 (running) created at:
  github.com/grafana/dskit/services.(*BasicService).StartAsync.func1()
      /Users/julienduchesne/Repos/mimir/vendor/github.com/grafana/dskit/services/basic_service.go:122 +0x1ac
  github.com/grafana/dskit/services.(*BasicService).switchState()
      /Users/julienduchesne/Repos/mimir/vendor/github.com/grafana/dskit/services/basic_service.go:142 +0xd8
  github.com/grafana/dskit/services.(*BasicService).StartAsync()
      /Users/julienduchesne/Repos/mimir/vendor/github.com/grafana/dskit/services/basic_service.go:119 +0x84
  github.com/grafana/dskit/services.(*Manager).StartAsync()
      /Users/julienduchesne/Repos/mimir/vendor/github.com/grafana/dskit/services/manager.go:87 +0xa0
  github.com/grafana/dskit/services.StartManagerAndAwaitHealthy()
      /Users/julienduchesne/Repos/mimir/vendor/github.com/grafana/dskit/services/manager.go:375 +0xa8
  github.com/grafana/mimir/pkg/distributor.(*Distributor).starting()
      /Users/julienduchesne/Repos/mimir/pkg/distributor/distributor.go:606 +0x50
  github.com/grafana/mimir/pkg/distributor.(*Distributor).starting-fm()
      <autogenerated>:1 +0x44
  github.com/grafana/dskit/services.(*BasicService).main()
      /Users/julienduchesne/Repos/mimir/vendor/github.com/grafana/dskit/services/basic_service.go:160 +0xac
  github.com/grafana/dskit/services.(*BasicService).StartAsync.func1.gowrap1()
      /Users/julienduchesne/Repos/mimir/vendor/github.com/grafana/dskit/services/basic_service.go:122 +0x34

Goroutine 245 (finished) created at:
  github.com/grafana/mimir/pkg/distributor.TestDistributor_PushWithDoBatchWorkers_DataRace()
      /Users/julienduchesne/Repos/mimir/pkg/distributor/distributor_test.go:341 +0x204
  testing.tRunner()
      /opt/homebrew/opt/go/libexec/src/testing/testing.go:1690 +0x184
  testing.(*T).Run.gowrap1()
      /opt/homebrew/opt/go/libexec/src/testing/testing.go:1743 +0x40
==================
--- FAIL: TestDistributor_PushWithDoBatchWorkers_DataRace (0.19s)
    testing.go:1399: race detected during execution of test
FAIL
FAIL    github.com/grafana/mimir/pkg/distributor        0.975s
FAIL
```
  • Loading branch information
julienduchesne committed Oct 10, 2024
1 parent ff5dc78 commit 35c9d52
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 2 deletions.
14 changes: 12 additions & 2 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -522,13 +522,23 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
subservices = append(subservices, d.ingesterPool, d.activeUsers)

if cfg.ReusableIngesterPushWorkers > 0 {
var wpMu sync.Mutex // Prevents data races on the pool's jobs channel.
wp := concurrency.NewReusableGoroutinesPool(cfg.ReusableIngesterPushWorkers)
d.doBatchPushWorkers = wp.Go
d.doBatchPushWorkers = func(f func()) {
wpMu.Lock()
defer wpMu.Unlock()
wp.Go(f)
}
// Closing the pool doesn't stop the workload it's running, we're doing this just to avoid leaking goroutines in tests.
subservices = append(subservices, services.NewBasicService(
nil,
func(ctx context.Context) error { <-ctx.Done(); return nil },
func(_ error) error { wp.Close(); return nil },
func(_ error) error {
wpMu.Lock()
defer wpMu.Unlock()
wp.Close()
return nil
},
))
}

Expand Down
42 changes: 42 additions & 0 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,48 @@ func TestDistributor_PushWithDoBatchWorkers(t *testing.T) {
require.GreaterOrEqual(t, counter.Load(), int64(3))
}

func TestDistributor_PushWithDoBatchWorkers_DataRace(t *testing.T) {
limits := prepareDefaultLimits()
limits.IngestionRate = 20
limits.IngestionBurstSize = 20

ds, _, _, _ := prepare(t, prepConfig{
numIngesters: 3,
happyIngesters: 3,
numDistributors: 1,
limits: limits,
configure: func(cfg *Config) {
// 2 workers, so 1 push would need to spawn a new goroutine.
cfg.ReusableIngesterPushWorkers = 2
},
})
require.Len(t, ds, 1)
distributor := ds[0]

require.NotNil(t, distributor.doBatchPushWorkers)

reqCt := 10
errs := make(chan error, reqCt)
for i := 0; i < reqCt; i++ {
go func() {
request := makeWriteRequest(123456789000, 3, 5, false, false, "foo")
ctx := user.InjectOrgID(context.Background(), "user")
_, err := distributor.Push(ctx, request)
errs <- err
}()
}
time.Sleep(10 * time.Millisecond)
distributor.StopAsync()
errCt := 0
for i := 0; i < reqCt; i++ {
if <-errs != nil {
errCt++
}
}
require.NotZero(t, errCt, "expected at least one error")
require.Less(t, errCt, reqCt, "expected not all requests to fail")
}

func TestDistributor_ContextCanceledRequest(t *testing.T) {
now := time.Now()
mtime.NowForce(now)
Expand Down

0 comments on commit 35c9d52

Please sign in to comment.