From c5193585c3f8060eaebe749529493d25bfaf0958 Mon Sep 17 00:00:00 2001 From: Gabriele Gerbino Date: Tue, 11 Jun 2024 17:50:05 +0200 Subject: [PATCH] fix: improve lookup of consumer-groups' consumers This commit makes lookups for consumer-group's consumers more performant by adding indexes in the in-memory db, instead of relying on "manual" looping which is very expensive when several thousands of consumers exist. --- pkg/state/consumer_group_consumers.go | 75 ++++++++++++++++----------- 1 file changed, 46 insertions(+), 29 deletions(-) diff --git a/pkg/state/consumer_group_consumers.go b/pkg/state/consumer_group_consumers.go index f74e1d9..12b6108 100644 --- a/pkg/state/consumer_group_consumers.go +++ b/pkg/state/consumer_group_consumers.go @@ -12,6 +12,9 @@ import ( const ( consumerGroupConsumerTableName = "consumerGroupConsumer" consumerByGroupID = "consumerByGroupID" + consumerByConsumerID = "consumerByConsumerID" + consumerByUsername = "consumerByUsername" + consumerByCustomID = "consumerByCustomID" ) var errInvalidConsumerGroup = fmt.Errorf("consumer_group.ID is required in consumer group consumers") @@ -60,6 +63,39 @@ var consumerGroupConsumerTableSchema = &memdb.TableSchema{ }, }, }, + consumerByConsumerID: { + Name: consumerByConsumerID, + Indexer: &indexers.SubFieldIndexer{ + Fields: []indexers.Field{ + { + Struct: "Consumer", + Sub: "ID", + }, + }, + }, + }, + consumerByUsername: { + Name: consumerByUsername, + Indexer: &indexers.SubFieldIndexer{ + Fields: []indexers.Field{ + { + Struct: "Consumer", + Sub: "Username", + }, + }, + }, + }, + consumerByCustomID: { + Name: consumerByCustomID, + Indexer: &indexers.SubFieldIndexer{ + Fields: []indexers.Field{ + { + Struct: "Consumer", + Sub: "CustomID", + }, + }, + }, + }, }, } @@ -110,39 +146,20 @@ func (k *ConsumerGroupConsumersCollection) Add(consumer ConsumerGroupConsumer) e return nil } -func getAllByConsumerGroupID(txn *memdb.Txn, consumerGroupID string) ([]*ConsumerGroupConsumer, error) { - iter, err := txn.Get(consumerGroupConsumerTableName, consumerByGroupID, consumerGroupID) - if err != nil { - return nil, err - } - - var consumers []*ConsumerGroupConsumer - for el := iter.Next(); el != nil; el = iter.Next() { - t, ok := el.(*ConsumerGroupConsumer) - if !ok { - panic(unexpectedType) - } - consumers = append(consumers, &ConsumerGroupConsumer{ConsumerGroupConsumer: *t.DeepCopy()}) - } - return consumers, nil -} - func getConsumerGroupConsumer(txn *memdb.Txn, consumerGroupID string, IDs ...string) (*ConsumerGroupConsumer, error) { - consumers, err := getAllByConsumerGroupID(txn, consumerGroupID) - if err != nil { - return nil, err - } + indexes := []string{consumerByConsumerID, consumerByUsername, consumerByCustomID} for _, id := range IDs { - for _, consumer := range consumers { - var username string - if consumer.Consumer.Username != nil { - username = *consumer.Consumer.Username - } else { - username = *consumer.Consumer.CustomID + for _, index := range indexes { + res, err := txn.First(consumerGroupConsumerTableName, index, id) + if err != nil { + return nil, err } - if id == *consumer.Consumer.ID || id == username { - return &ConsumerGroupConsumer{ConsumerGroupConsumer: *consumer.DeepCopy()}, nil + if res != nil { + consumer := res.(*ConsumerGroupConsumer) + if *consumer.ConsumerGroup.ID == consumerGroupID { + return consumer, nil + } } } }