Skip to content

Commit

Permalink
fix: ConsumerGroupService
Browse files Browse the repository at this point in the history
修复当消费组isSimpleConsumerGroup为true时,没有消费信息问题
  • Loading branch information
daodol authored and dushixiang committed Aug 17, 2023
1 parent a308fd5 commit cd9f626
Showing 1 changed file with 93 additions and 51 deletions.
144 changes: 93 additions & 51 deletions src/main/java/cn/typesafe/km/service/ConsumerGroupService.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@

import cn.typesafe.km.entity.Cluster;
import cn.typesafe.km.service.dto.*;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.admin.ConsumerGroupListing;
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
Expand Down Expand Up @@ -151,15 +149,28 @@ public List<ConsumerGroup> consumerGroup(String clusterId, String filterGroupId)
.map(groupId -> {
ConsumerGroup consumerGroup = new ConsumerGroup();
consumerGroup.setGroupId(groupId);

ConsumerGroupDescription consumerGroupDescription = groups.get(groupId);
Set<String> topics = consumerGroupDescription.members()
.stream()
.map(s -> s.assignment().topicPartitions())
.flatMap(Collection::stream)
.map(TopicPartition::topic)
.collect(Collectors.toSet());
consumerGroup.setTopics(topics);
if (!consumerGroupDescription.isSimpleConsumerGroup()) {
Set<String> topics = consumerGroupDescription.members()
.stream()
.map(s -> s.assignment().topicPartitions())
.flatMap(Collection::stream)
.map(TopicPartition::topic)
.collect(Collectors.toSet());
consumerGroup.setTopics(topics);
return consumerGroup;
}
Map<TopicPartition, OffsetAndMetadata> topicPartitionOffsetAndMetadataMap=null;
try {
topicPartitionOffsetAndMetadataMap = adminClient.listConsumerGroupOffsets(groupId)
.partitionsToOffsetAndMetadata()
.get();
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
consumerGroup.setTopics(topicPartitionOffsetAndMetadataMap.keySet().stream().map(TopicPartition::topic).collect(Collectors.toSet()));
return consumerGroup;
})
.collect(Collectors.toList());
Expand Down Expand Up @@ -190,53 +201,84 @@ public void delete(String clusterId, String groupId) throws ExecutionException,
public List<ConsumerGroupDescribe> describe(String clusterId, String groupId) throws ExecutionException, InterruptedException {
AdminClient adminClient = clusterService.getAdminClient(clusterId);
ConsumerGroupDescription consumerGroupDescription = adminClient.describeConsumerGroups(Collections.singletonList(groupId)).all().get().get(groupId);

Map<TopicPartition, OffsetAndMetadata> topicPartitionOffsetAndMetadataMap = adminClient.listConsumerGroupOffsets(groupId)
.partitionsToOffsetAndMetadata()
.get();
Set<TopicPartition> topicPartitions = topicPartitionOffsetAndMetadataMap.keySet();
try (KafkaConsumer<String, String> kafkaConsumer = clusterService.createConsumer(clusterId)) {
Map<TopicPartition, Long> endOffsets = kafkaConsumer.endOffsets(topicPartitions);
Map<TopicPartition, Long> beginningOffsets = kafkaConsumer.beginningOffsets(topicPartitions);

return consumerGroupDescription.members()
.stream()
.flatMap(consumer -> {
return consumer.assignment().topicPartitions()
.stream()
.map(topicPartition -> {
String topic = topicPartition.topic();
int partition = topicPartition.partition();
String consumerId = consumer.consumerId();
String host = consumer.host();
String clientId = consumer.clientId();
OffsetAndMetadata offsetAndMetadata = topicPartitionOffsetAndMetadataMap.get(topicPartition);
Long beginningOffset = beginningOffsets.get(topicPartition);
Long endOffset = endOffsets.get(topicPartition);
Long offset = null;
if (offsetAndMetadata != null) {
offset = offsetAndMetadata.offset();
}
ConsumerGroupDescribe consumerGroupDescribe = new ConsumerGroupDescribe();
consumerGroupDescribe.setGroupId(groupId);
consumerGroupDescribe.setTopic(topic);
consumerGroupDescribe.setPartition(partition);
consumerGroupDescribe.setCurrentOffset(offset);
consumerGroupDescribe.setLogBeginningOffset(beginningOffset);
consumerGroupDescribe.setLogEndOffset(endOffset);
if (endOffset != null && offset != null) {
consumerGroupDescribe.setLag(endOffset - offset);
} else {
consumerGroupDescribe.setLag(null);
}
consumerGroupDescribe.setConsumerId(consumerId);
consumerGroupDescribe.setHost(host);
consumerGroupDescribe.setClientId(clientId);

return consumerGroupDescribe;
});
})
.collect(Collectors.toList());
if (!consumerGroupDescription.isSimpleConsumerGroup()) {
return consumerGroupDescription.members()
.stream()
.flatMap(consumer -> {
return consumer.assignment().topicPartitions()
.stream()
.map(topicPartition -> {
String topic = topicPartition.topic();
int partition = topicPartition.partition();
String consumerId = consumer.consumerId();
String host = consumer.host();
String clientId = consumer.clientId();
OffsetAndMetadata offsetAndMetadata = topicPartitionOffsetAndMetadataMap.get(topicPartition);
Long beginningOffset = beginningOffsets.get(topicPartition);
Long endOffset = endOffsets.get(topicPartition);
Long offset = null;
if (offsetAndMetadata != null) {
offset = offsetAndMetadata.offset();
}
ConsumerGroupDescribe consumerGroupDescribe = new ConsumerGroupDescribe();
consumerGroupDescribe.setGroupId(groupId);
consumerGroupDescribe.setTopic(topic);
consumerGroupDescribe.setPartition(partition);
consumerGroupDescribe.setCurrentOffset(offset);
consumerGroupDescribe.setLogBeginningOffset(beginningOffset);
consumerGroupDescribe.setLogEndOffset(endOffset);
if (endOffset != null && offset != null) {
consumerGroupDescribe.setLag(endOffset - offset);
} else {
consumerGroupDescribe.setLag(null);
}
consumerGroupDescribe.setConsumerId(consumerId);
consumerGroupDescribe.setHost(host);
consumerGroupDescribe.setClientId(clientId);
return consumerGroupDescribe;
});
})
.collect(Collectors.toList());
}
List<ConsumerGroupDescribe> results=new ArrayList<>();
for (TopicPartition topicPartition : topicPartitionOffsetAndMetadataMap.keySet()) {
String topic = topicPartition.topic();
int partition = topicPartition.partition();
//String consumerId = consumer.consumerId();
//String host = consumer.host();
//String clientId = consumer.clientId();
OffsetAndMetadata offsetAndMetadata = topicPartitionOffsetAndMetadataMap.get(topicPartition);
Long beginningOffset = beginningOffsets.get(topicPartition);
Long endOffset = endOffsets.get(topicPartition);
Long offset = null;
if (offsetAndMetadata != null) {
offset = offsetAndMetadata.offset();
}
ConsumerGroupDescribe consumerGroupDescribe = new ConsumerGroupDescribe();
consumerGroupDescribe.setGroupId(groupId);
consumerGroupDescribe.setTopic(topic);
consumerGroupDescribe.setPartition(partition);
consumerGroupDescribe.setCurrentOffset(offset);
consumerGroupDescribe.setLogBeginningOffset(beginningOffset);
consumerGroupDescribe.setLogEndOffset(endOffset);
if (endOffset != null && offset != null) {
consumerGroupDescribe.setLag(endOffset - offset);
} else {
consumerGroupDescribe.setLag(null);
}
//consumerGroupDescribe.setConsumerId(consumerId);
//consumerGroupDescribe.setHost(host);
//consumerGroupDescribe.setClientId(clientId);
results.add( consumerGroupDescribe);
}
return results;
}
}
}

0 comments on commit cd9f626

Please sign in to comment.