diff --git a/CHANGELOG.md b/CHANGELOG.md index 4a52d488a5..69bd2fe509 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re ### Fixed - [#5339](https://github.com/thanos-io/thanos/pull/5339) Receive: Fix deadlock on interrupt in routerOnly mode - [#5357](https://github.com/thanos-io/thanos/pull/5357) Store: fix groupcache handling of slashes +- [#5427](https://github.com/thanos-io/thanos/pull/5427) Receive: Fix Ketama hashring replication consistency ### Added diff --git a/pkg/receive/hashring.go b/pkg/receive/hashring.go index cb4c3e78e2..3d6cf36555 100644 --- a/pkg/receive/hashring.go +++ b/pkg/receive/hashring.go @@ -106,11 +106,6 @@ type ketamaHashring struct { } func newKetamaHashring(endpoints []string, sectionsPerNode int) *ketamaHashring { - // Replication works by choosing subsequent nodes in the ring. - // In order to improve consistency, we avoid relying on the ordering of the endpoints - // and sort them lexicographically. - sort.Strings(endpoints) - numSections := len(endpoints) * sectionsPerNode ring := ketamaHashring{ endpoints: endpoints, @@ -156,8 +151,8 @@ func (c ketamaHashring) GetN(tenant string, ts *prompb.TimeSeries, n uint64) (st i = 0 } - nodeIndex := (c.sections[i].endpointIndex + n) % c.numEndpoints - + i = (i + n) % numSections + nodeIndex := c.sections[i].endpointIndex return c.endpoints[nodeIndex], nil } diff --git a/pkg/receive/hashring_test.go b/pkg/receive/hashring_test.go index 676aaf8c07..d2b4171d96 100644 --- a/pkg/receive/hashring_test.go +++ b/pkg/receive/hashring_test.go @@ -154,7 +154,7 @@ func TestHashringGet(t *testing.T) { } } -func TestConsistentHashringGet(t *testing.T) { +func TestKetamaHashringGet(t *testing.T) { baseTS := &prompb.TimeSeries{ Labels: []labelpb.ZLabel{ { @@ -181,21 +181,21 @@ func TestConsistentHashringGet(t *testing.T) { nodes: []string{"node-1", "node-2", "node-3"}, ts: baseTS, n: 1, - expectedNode: "node-3", + expectedNode: "node-1", }, { name: "base case with replication", nodes: []string{"node-1", "node-2", "node-3"}, ts: baseTS, n: 2, - expectedNode: "node-1", + expectedNode: "node-2", }, { name: "base case with replication and reordered nodes", nodes: []string{"node-1", "node-3", "node-2"}, ts: baseTS, n: 2, - expectedNode: "node-1", + expectedNode: "node-2", }, { name: "base case with new node at beginning of ring", @@ -234,8 +234,8 @@ func TestConsistentHashringGet(t *testing.T) { } } -func TestConsistentHashringConsistency(t *testing.T) { - series := makeSeries(10000) +func TestKetamaHashringConsistency(t *testing.T) { + series := makeSeries() ringA := []string{"node-1", "node-2", "node-3"} a1, err := assignSeries(series, ringA) @@ -254,8 +254,8 @@ func TestConsistentHashringConsistency(t *testing.T) { } } -func TestConsistentHashringIncreaseAtEnd(t *testing.T) { - series := makeSeries(10000) +func TestKetamaHashringIncreaseAtEnd(t *testing.T) { + series := makeSeries() initialRing := []string{"node-1", "node-2", "node-3"} initialAssignments, err := assignSeries(series, initialRing) @@ -274,8 +274,8 @@ func TestConsistentHashringIncreaseAtEnd(t *testing.T) { } } -func TestConsistentHashringIncreaseInMiddle(t *testing.T) { - series := makeSeries(10000) +func TestKetamaHashringIncreaseInMiddle(t *testing.T) { + series := makeSeries() initialRing := []string{"node-1", "node-3"} initialAssignments, err := assignSeries(series, initialRing) @@ -294,7 +294,28 @@ func TestConsistentHashringIncreaseInMiddle(t *testing.T) { } } -func makeSeries(numSeries int) []*prompb.TimeSeries { +func TestKetamaHashringReplicationConsistency(t *testing.T) { + series := makeSeries() + + initialRing := []string{"node-1", "node-4", "node-5"} + initialAssignments, err := assignReplicatedSeries(series, initialRing, 2) + require.NoError(t, err) + + resizedRing := []string{"node-4", "node-3", "node-1", "node-2", "node-5"} + reassignments, err := assignReplicatedSeries(series, resizedRing, 2) + require.NoError(t, err) + + // Assert that the initial nodes have no new keys after increasing the ring size + for _, node := range initialRing { + for _, ts := range reassignments[node] { + foundInInitialAssignment := findSeries(initialAssignments, node, ts) + require.True(t, foundInInitialAssignment, "node %s contains new series after resizing", node) + } + } +} + +func makeSeries() []*prompb.TimeSeries { + numSeries := 10000 series := make([]*prompb.TimeSeries, numSeries) for i := 0; i < numSeries; i++ { series[i] = &prompb.TimeSeries{ @@ -322,15 +343,21 @@ func findSeries(initialAssignments map[string][]*prompb.TimeSeries, node string, } func assignSeries(series []*prompb.TimeSeries, nodes []string) (map[string][]*prompb.TimeSeries, error) { + return assignReplicatedSeries(series, nodes, 0) +} + +func assignReplicatedSeries(series []*prompb.TimeSeries, nodes []string, replicas uint64) (map[string][]*prompb.TimeSeries, error) { hashRing := newKetamaHashring(nodes, SectionsPerNode) assignments := make(map[string][]*prompb.TimeSeries) - for _, ts := range series { - result, err := hashRing.Get("tenant", ts) - if err != nil { - return nil, err - } - assignments[result] = append(assignments[result], ts) + for i := uint64(0); i < replicas; i++ { + for _, ts := range series { + result, err := hashRing.GetN("tenant", ts, i) + if err != nil { + return nil, err + } + assignments[result] = append(assignments[result], ts) + } } return assignments, nil