diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 98dde0b91515..a5229b0ca149 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -420,7 +420,7 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log // Nil check for performance reasons, to avoid dynamic lookup and/or no-op // function calls that cannot be inlined. if d.tee != nil { - d.tee.Duplicate(streams) + d.tee.Duplicate(tenantID, streams) } const maxExpectedReplicationSet = 5 // typical replication factor 3 plus one for inactive plus one for luck diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index 5a03fe98e94c..71830b4be4d2 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -1251,12 +1251,14 @@ func (s *fakeRateStore) RateFor(_ string, _ uint64) (int64, float64) { type mockTee struct { mu sync.Mutex duplicated [][]KeyedStream + tenant string } -func (mt *mockTee) Duplicate(streams []KeyedStream) { +func (mt *mockTee) Duplicate(tenant string, streams []KeyedStream) { mt.mu.Lock() defer mt.mu.Unlock() mt.duplicated = append(mt.duplicated, streams) + mt.tenant = tenant } func TestDistributorTee(t *testing.T) { @@ -1307,5 +1309,7 @@ func TestDistributorTee(t *testing.T) { for j, streams := range td.Streams { assert.Equal(t, tee.duplicated[i][j].Stream.Entries, streams.Entries) } + + require.Equal(t, "test", tee.tenant) } } diff --git a/pkg/distributor/tee.go b/pkg/distributor/tee.go index 9ac48083956e..460f9622b2ea 100644 --- a/pkg/distributor/tee.go +++ b/pkg/distributor/tee.go @@ -1,6 +1,6 @@ package distributor -// Tee imlpementations can duplicate the log streams to another endpoint. +// Tee implementations can duplicate the log streams to another endpoint. type Tee interface { - Duplicate([]KeyedStream) + Duplicate(tenant string, streams []KeyedStream) }