
Consumer.consume() stops returning messages #970

Closed
6 of 7 tasks
jeffjnh opened this issue Oct 22, 2020 · 3 comments
Comments


jeffjnh commented Oct 22, 2020

Description

Observed: One consumer (occasionally two) within a group of 50 stops receiving messages on consume() from its assigned partitions.

Temporary fix: Terminating the affected consumer (which restarts and triggers a rebalance) reassigns its partitions to another consumer, and the partition lag is resolved because the new consumer has no trouble receiving messages from the newly assigned partitions. The time between occurrences is not deterministic; it happens most often on an event that rolls all consumers, and occasionally on a random rebalance.

Notes:

  • This problem has arisen seemingly non-deterministically and is plaguing this consumer group.
  • Java-based consumers using the same brokers/topic/partitions do not have this issue.
  • It appears to occur more often around a rebalance (though this is not a solid guarantee that a rebalance sets it off).
  • This consumer polls for a batch of 5 messages and spins off a thread to process each of those messages.
  • Processing time can be relatively high (around 20 seconds) for a batch of messages.
  • When this behavior occurs, it affects only one or two consumers, not every one in the group.
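For context, the poll-and-dispatch pattern described in the notes above looks roughly like this. This is a sketch, not the actual application code: `handle_record` is a placeholder for the per-record processing, and the consumer object is passed in (assumed to expose the confluent-kafka `Consumer.consume()`/`commit()` API) so the structure is broker-independent.

```python
from concurrent.futures import ThreadPoolExecutor

BATCH_SIZE = 5          # polling for a batch of 5 messages
POLL_TIMEOUT_S = 30.0   # matches the 30 s poll timeout seen in the logs

def process_batch(consumer, handle_record, executor):
    """Poll one batch and fan each record out to a worker thread.

    `consumer` is assumed to expose the confluent-kafka Consumer API;
    `handle_record` is the per-record callback, which can take on the
    order of 20 s for a whole batch.
    """
    msgs = consumer.consume(num_messages=BATCH_SIZE, timeout=POLL_TIMEOUT_S)
    good = [m for m in msgs if m.error() is None]
    futures = [executor.submit(handle_record, m) for m in good]
    for future in futures:
        future.result()  # wait for processing to finish before committing
    for m in good:
        # enable.auto.commit is False, so offsets are committed manually
        consumer.commit(message=m, asynchronous=False)
    return len(good)
```

The long gap between the poll and the manual commit is what makes this loop sensitive to a rebalance happening mid-batch.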

How to reproduce

I am unsure how to reproduce this. I am currently working to pin down the concrete case that triggers this behavior; it seems to occur when multiple rebalances are triggered in quick succession, and occasionally on a single rebalance.

Upon further investigation, this can be reproduced easily by triggering a rebalance while a consumer is processing a record. The record is processed, and the consumer then attempts to commit its offset for a partition it is no longer assigned. This prevents the fetchers for the newly assigned partitions from being started, so the consumer sits idle while partition lag builds even though the partitions have an assigned consumer.
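The sequence above can be illustrated with a minimal, broker-free model. All names here are illustrative stand-ins, not librdkafka internals: the point is that a commit staged before a rebalance carries a stale generation id, the broker rejects it, and the pending-commit count that gates fetcher startup never drains.

```python
ILLEGAL_GENERATION = "Broker: Specified group generation id is not valid"

class GroupStateModel:
    """Toy model of the suspected stuck state (illustrative only)."""

    def __init__(self, generation):
        self.generation = generation
        self.pending_commits = 0
        self.fetchers_started = False

    def stage_commit(self):
        self.pending_commits += 1   # a record is being processed/committed

    def rebalance(self):
        self.generation += 1        # partitions revoked and reassigned

    def commit(self, generation):
        if generation != self.generation:
            return ILLEGAL_GENERATION  # broker rejects the stale commit;
        self.pending_commits -= 1      # the pending count only drops on
        return None                    # success

    def try_start_fetchers(self):
        # mirrors "not starting fetchers ... waiting for 1 commit(s)"
        if self.pending_commits == 0:
            self.fetchers_started = True
        return self.fetchers_started

g = GroupStateModel(generation=2012)
g.stage_commit()                   # commit queued while processing a record
g.rebalance()                      # generation 2013: old partitions revoked
err = g.commit(generation=2012)    # stale commit is rejected...
assert err == ILLEGAL_GENERATION
assert not g.try_start_fetchers()  # ...and fetchers never start: lag builds
```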

Checklist

Please provide the following information:

  • confluent-kafka-python and librdkafka version (confluent_kafka.version() and confluent_kafka.libversion()):
  • Apache Kafka broker version:
  • Client configuration: {...}
  • Operating system: Linux (Python 3.6.2-stretch)
  • Provide client logs (with 'debug': '..' as necessary)
  • Provide broker log excerpts
  • Critical issue

confluent-kafka-python version: 1.5.0
librdkafka version: 1.5.0

broker-version: 2.2.1

Operating System: Linux, running Python 3.6.2-stretch

Consumer Config: {
    'bootstrap.servers': 'my_bootstrap_servers',
    'group.id': 'my_unique_group',
    'auto.offset.reset': 'earliest',
    'max.poll.interval.ms': 300000,
    'enable.auto.commit': False,
    'client.id': str(uuid.uuid4()),
    'max.partition.fetch.bytes': 104857600,
    'debug': 'cgrp'
}

cgrp debug logs are in the following comment.


jeffjnh commented Oct 22, 2020

15:56:35 MainThread:abstract_kafka_consumer.py:49 INFO     ******Initializing Kafka Consumer******
%7|1603382195.161|MEMBERID|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:app]: Group "my_unique_group": updating member id "(not-set)" -> ""
%7|1603382195.162|INIT|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:app]: librdkafka v1.5.0 (0x10500ff) CONSUMER_UUID_CLIENT_ID#consumer-1 initialized (builtin.features gzip,snappy,ssl,sasl,regex,lz4,sasl_plain,sasl_scram,plugins,zstd,sasl_oauthbearer, STATIC_LINKING GCC GXX PKGCONFIG INSTALL GNULD LDS LIBDL PLUGINS STATIC_LIB_zlib ZLIB STATIC_LIB_libcrypto STATIC_LIB_libssl SSL STATIC_LIB_libzstd ZSTD HDRHISTOGRAM SYSLOG SNAPPY SOCKEM SASL_SCRAM SASL_OAUTHBEARER CRC32C_HW, debug 0x100)
%7|1603382195.162|CGRPOP|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Group "my_unique_group" received op SUBSCRIBE (v0) in state init (join state init, v1 vs 0)
%7|1603382195.162|SUBSCRIBE|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Group "my_unique_group": subscribe to new subscription of 1 topics (join state init)
%7|1603382195.162|UNSUBSCRIBE|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Group "my_unique_group": unsubscribe from current unset subscription of 0 topics (leave group=no, join state init, v1)
%7|1603382195.162|GRPLEADER|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Group "my_unique_group": resetting group leader info: unsubscribe
%7|1603382195.162|REBALANCE|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Group "my_unique_group" is rebalancing in state init (join-state init) without assignment: unsubscribe
%7|1603382195.162|CGRPJOINSTATE|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Group "my_unique_group" changed join state init -> wait-unassign (v1, state init)
%7|1603382195.162|UNASSIGN|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Group "my_unique_group": unassign done in state init (join state wait-unassign): without new assignment: unassign (no previous assignment)
%7|1603382195.162|CGRPJOINSTATE|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Group "my_unique_group" changed join state wait-unassign -> init (v1, state init)
%7|1603382195.162|CGRPSTATE|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Group "my_unique_group" changed state init -> query-coord (v1, join-state init)
%7|1603382195.162|CGRPQUERY|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Group "my_unique_group": no broker available for coordinator query: intervaled in state query-coord
%7|1603382195.164|PROTOERR|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:broker:909]: broker: Protocol parse failure for ApiVersion v3 at 3/6 (rd_kafka_handle_ApiVersion:1945) (incorrect broker.version.fallback?)
%7|1603382195.164|PROTOERR|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:broker:909]: broker: ApiArrayCnt -1 out of range
%7|1603382195.166|CGRPQUERY|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: broker: Group "my_unique_group": querying for coordinator: intervaled in state query-coord
%7|1603382195.167|CGRPSTATE|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Group "my_unique_group" changed state query-coord -> wait-coord (v1, join-state init)
%7|1603382195.168|CGRPCOORD|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: broker: Group "my_unique_group" coordinator is broker id 5
%7|1603382195.168|CGRPCOORD|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Group "my_unique_group" changing coordinator -1 -> 5
%7|1603382195.168|COORDSET|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Group "my_unique_group" coordinator set to broker broker/5
%7|1603382195.168|CGRPSTATE|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Group "my_unique_group" changed state wait-coord -> wait-broker-transport (v1, join-state init)
%7|1603382195.168|CGRPQUERY|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: broker: Group "my_unique_group": querying for coordinator: intervaled in state wait-broker-transport
%7|1603382195.169|PROTOERR|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/5: Protocol parse failure for ApiVersion v3 at 3/6 (rd_kafka_handle_ApiVersion:1945) (incorrect broker.version.fallback?)
%7|1603382195.169|PROTOERR|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/5: ApiArrayCnt -1 out of range
%7|1603382195.169|CGRPCOORD|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: broker: Group "my_unique_group" coordinator is broker id 5
%7|1603382195.169|CGRPSTATE|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Group "my_unique_group" changed state wait-broker-transport -> up (v1, join-state init)
%7|1603382195.169|JOIN|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Group "my_unique_group": join with 0 (1) subscribed topic(s)
%7|1603382195.169|CGRPMETADATA|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: consumer join: metadata for subscription only available for 0/1 topics (-1ms old)
%7|1603382195.169|JOIN|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Group "my_unique_group": postponing join until up-to-date metadata is available
%7|1603382195.171|SUBSCRIPTION|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Group "my_unique_group": effective subscription list changed from 0 to 1 topic(s):
%7|1603382195.171|SUBSCRIPTION|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]:  Topic my-topic with 100 partition(s)
%7|1603382195.171|REJOIN|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Group "my_unique_group": subscription updated from metadata change: rejoining group
%7|1603382195.171|GRPLEADER|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Group "my_unique_group": resetting group leader info: Group rejoin
%7|1603382195.171|REJOIN|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Group "my_unique_group" rejoining in join-state init without an assignment
%7|1603382195.171|REBALANCE|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Group "my_unique_group" is rebalancing in state up (join-state init) without assignment: group rejoin
%7|1603382195.171|CGRPJOINSTATE|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Group "my_unique_group" changed join state init -> wait-unassign (v1, state up)
%7|1603382195.171|UNASSIGN|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Group "my_unique_group": unassign done in state up (join state wait-unassign): without new assignment: unassign (no previous assignment)
%7|1603382195.171|CGRPJOINSTATE|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Group "my_unique_group" changed join state wait-unassign -> init (v1, state up)
%7|1603382195.171|JOIN|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Group "my_unique_group": join with 1 (1) subscribed topic(s)
%7|1603382195.171|CGRPMETADATA|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: consumer join: metadata for subscription is up to date (0ms old)
%7|1603382195.171|CGRPJOINSTATE|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Group "my_unique_group" changed join state init -> wait-join (v1, state up)
%7|1603382195.171|JOINGROUP|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: JoinGroup response: GenerationId -1, Protocol , LeaderId , my MemberId UUID_MEMBER_ID, 0 members in group: Broker: Group member needs a valid member ID
%7|1603382195.171|MEMBERID|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Group "my_unique_group": updating member id "" -> "UUID_MEMBER_ID"
%7|1603382195.171|CGRPJOINSTATE|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Group "my_unique_group" changed join state wait-join -> init (v1, state up)
%7|1603382195.171|JOIN|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Group "my_unique_group": join with 1 (1) subscribed topic(s)
%7|1603382195.171|CGRPMETADATA|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: consumer join: metadata for subscription is up to date (0ms old)
%7|1603382195.171|CGRPJOINSTATE|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Group "my_unique_group" changed join state init -> wait-join (v1, state up)
%7|1603382196.358|JOINGROUP|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: JoinGroup response: GenerationId 2012, Protocol range, LeaderId leaderID, my MemberId UUID_MEMBER_ID, 0 members in group: (no error)
%7|1603382196.358|CGRPJOINSTATE|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Group "my_unique_group" changed join state wait-join -> wait-sync (v1, state up)
%7|1603382196.371|SYNCGROUP|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: SyncGroup response: Success (39 bytes of MemberState data)
%7|1603382196.371|ASSIGN|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Group "my_unique_group": delegating assign of 2 partition(s) to application rebalance callback on queue rd_kafka_cgrp_new: new assignment
%7|1603382196.371|CGRPJOINSTATE|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Group "my_unique_group" changed join state wait-sync -> wait-assign-rebalance_cb (v1, state up)
%7|1603382196.371|HEARTBEAT|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: GroupCoordinator/5: Heartbeat for group "my_unique_group" generation id 2012
%7|1603382196.404|CGRPOP|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Group "my_unique_group" received op ASSIGN (v0) in state up (join state wait-assign-rebalance_cb, v1 vs 0)
%7|1603382196.404|ASSIGN|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Group "my_unique_group": new assignment of 2 partition(s) in join state wait-assign-rebalance_cb
%7|1603382196.404|BARRIER|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Group "my_unique_group": rd_kafka_cgrp_assign:2600: new version barrier v2
%7|1603382196.404|ASSIGN|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Group "my_unique_group": assigning 2 partition(s) in join state wait-assign-rebalance_cb
%7|1603382196.404|CGRPJOINSTATE|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Group "my_unique_group" changed join state wait-assign-rebalance_cb -> assigned (v2, state up)
%7|1603382196.404|BARRIER|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Group "my_unique_group": rd_kafka_cgrp_partitions_fetch_start0:1878: new version barrier v3
%7|1603382196.404|FETCHSTART|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Group "my_unique_group": starting fetchers for 2 assigned partition(s) in join-state assigned (usable_offsets=no, v3, line 2651)
%7|1603382196.404|FETCHSTART|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: List with 2 partition(s):
%7|1603382196.404|FETCHSTART|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]:  my-topic [18] offset INVALID
%7|1603382196.404|FETCHSTART|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]:  my-topic [19] offset INVALID
%7|1603382196.404|OFFSET|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: GroupCoordinator/5: Fetch committed offsets for 2/2 partition(s)
%7|1603382196.404|OFFSETFETCH|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: List with 2 partition(s):
%7|1603382196.405|OFFSETFETCH|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]:  my-topic [18] offset INVALID
%7|1603382196.405|OFFSETFETCH|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]:  my-topic [19] offset INVALID
%7|1603382196.405|OFFSET|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Topic my-topic [18]: setting default offset INVALID
%7|1603382196.405|OFFSET|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Topic my-topic [19]: setting default offset INVALID
%7|1603382196.405|BARRIER|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Group "my_unique_group": rd_kafka_cgrp_partitions_fetch_start0:1878: new version barrier v4
%7|1603382196.405|FETCHSTART|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Group "my_unique_group": starting fetchers for 2 assigned partition(s) in join-state assigned (usable_offsets=yes, v4, line 1810)
%7|1603382196.405|FETCHSTART|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: List with 2 partition(s):
%7|1603382196.405|FETCHSTART|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]:  my-topic [18] offset 453978
%7|1603382196.405|FETCHSTART|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]:  my-topic [19] offset 449401
%7|1603382196.405|CGRPJOINSTATE|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Group "my_unique_group" changed join state assigned -> started (v4, state up)
%7|1603382196.405|CGRPOP|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Group "my_unique_group" received op PARTITION_JOIN in state up (join state started, v4) for my-topic [18]
%7|1603382196.405|PARTADD|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Group "my_unique_group": add my-topic [18]
%7|1603382196.405|CGRPOP|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Group "my_unique_group" received op PARTITION_JOIN in state up (join state started, v4) for my-topic [19]
%7|1603382196.405|PARTADD|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Group "my_unique_group": add my-topic [19]
%7|1603382196.406|PROTOERR|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:broker:909]: broker:9092/3: Protocol parse failure for ApiVersion v3 at 3/6 (rd_kafka_handle_ApiVersion:1945) (incorrect broker.version.fallback?)
%7|1603382196.406|PROTOERR|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:broker:909]: broker:9092/3: ApiArrayCnt -1 out of range
%7|1603382199.404|HEARTBEAT|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: GroupCoordinator/5: Heartbeat for group "my_unique_group" generation id 2012
%7|1603382199.405|HEARTBEAT|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Group "my_unique_group" heartbeat error response in state up (join state started, 2 partition(s) assigned): Broker: Group rebalance in progress
%7|1603382199.406|REBALANCE|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Group "my_unique_group" is rebalancing in state up (join-state started) with assignment: group is rebalancing
%7|1603382199.406|ASSIGN|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Group "my_unique_group": delegating revoke of 2 partition(s) to application rebalance callback on queue rd_kafka_cgrp_new: group is rebalancing
%7|1603382199.406|CGRPJOINSTATE|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Group "my_unique_group" changed join state started -> wait-revoke-rebalance_cb (v4, state up)
%7|1603382199.406|CGRPOP|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Group "my_unique_group" received op ASSIGN (v0) in state up (join state wait-revoke-rebalance_cb, v4 vs 0)
%7|1603382199.406|ASSIGN|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Group "my_unique_group": new assignment of 0 partition(s) in join state wait-revoke-rebalance_cb
%7|1603382199.406|BARRIER|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Group "my_unique_group": rd_kafka_cgrp_assign:2600: new version barrier v5
%7|1603382199.406|CGRPJOINSTATE|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Group "my_unique_group" changed join state wait-revoke-rebalance_cb -> wait-unassign (v5, state up)
%7|1603382199.406|BARRIER|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Group "my_unique_group": rd_kafka_cgrp_unassign:2506: new version barrier v6
%7|1603382199.406|UNASSIGN|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Group "my_unique_group": unassigning 2 partition(s) (v6)
%7|1603382199.406|UNASSIGN|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Unassign not done yet (2 wait_unassign, 2 assigned, 0 wait commit, join state wait-unassign): unassign
%7|1603382199.406|ASSIGN|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Group "my_unique_group": assigning 0 partition(s) in join state wait-unassign
%7|1603382199.406|CGRPOP|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Group "my_unique_group" received op PARTITION_LEAVE in state up (join state wait-unassign, v6) for my-topic [18]
%7|1603382199.406|PARTDEL|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Group "my_unique_group": delete my-topic [18]
%7|1603382199.406|CGRPOP|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Group "my_unique_group" received op REPLY:FETCH_STOP in state up (join state wait-unassign, v6) for my-topic [18]
%7|1603382199.406|UNASSIGN|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Unassign not done yet (1 wait_unassign, 1 assigned, 0 wait commit, join state wait-unassign): FETCH_STOP done
%7|1603382199.406|CGRPOP|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Group "my_unique_group" received op PARTITION_LEAVE in state up (join state wait-unassign, v6) for my-topic [19]
%7|1603382199.406|PARTDEL|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Group "my_unique_group": delete my-topic [19]
%7|1603382199.406|CGRPOP|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Group "my_unique_group" received op REPLY:FETCH_STOP in state up (join state wait-unassign, v6) for my-topic [19]
%7|1603382199.406|UNASSIGN|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Group "my_unique_group": unassign done in state up (join state wait-unassign): without new assignment: FETCH_STOP done
%7|1603382199.406|CGRPJOINSTATE|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Group "my_unique_group" changed join state wait-unassign -> init (v6, state up)
%7|1603382199.406|JOIN|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Group "my_unique_group": join with 1 (1) subscribed topic(s)
%7|1603382199.406|CGRPMETADATA|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: consumer join: metadata for subscription is up to date (4234ms old)
%7|1603382199.406|CGRPJOINSTATE|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Group "my_unique_group" changed join state init -> wait-join (v6, state up)
%7|1603382206.501|JOINGROUP|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: JoinGroup response: GenerationId 2013, Protocol range, LeaderId leaderID, my MemberId UUID_MEMBER_ID, 0 members in group: (no error)
%7|1603382206.501|CGRPJOINSTATE|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Group "my_unique_group" changed join state wait-join -> wait-sync (v6, state up)
%7|1603382216.500|SYNCGROUP|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: SyncGroup response: Broker: Group rebalance in progress (0 bytes of MemberState data)
%7|1603382216.500|GRPSYNC|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Group "my_unique_group": synchronization failed: Broker: Group rebalance in progress: rejoining
%7|1603382216.500|CGRPJOINSTATE|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Group "my_unique_group" changed join state wait-sync -> init (v6, state up)
%7|1603382216.500|JOIN|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Group "my_unique_group": join with 1 (1) subscribed topic(s)
%7|1603382216.500|CGRPMETADATA|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: consumer join: metadata for subscription is up to date (21329ms old)
%7|1603382216.500|CGRPJOINSTATE|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Group "my_unique_group" changed join state init -> wait-join (v6, state up)
%7|1603382226.404|CGRPOP|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Group "my_unique_group" received op GET_ASSIGNMENT (v0) in state up (join state wait-join, v6 vs 0)

// RECEIVED A RECORD HERE: a record arrived, but there appear to be no currently assigned partitions. The poll timeout is 30 seconds, so this record may have been received before the unassign started?

15:57:06 MainThread:abstract_kafka_consumer.py:78 INFO     Took 30000 millis to poll. 1 Records. {Assigned Partitions: storedOffsets} {}
15:57:06 MainThread:abstract_kafka_consumer.py:147 INFO     Received 1 record(s)
15:57:06 <concurrent.futures.thread.ThreadPoolExecutor object at 0x7f2ddd26def0>_0:abstract_kafka_consumer.py:201 INFO     Processing record for user.


%7|1603382226.428|CGRPOP|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Group "my_unique_group" received op OFFSET_COMMIT (v0) in state up (join state wait-join, v6 vs 0)
%7|1603382226.501|JOINGROUP|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: JoinGroup response: GenerationId 2014, Protocol range, LeaderId leaderID, my MemberId UUID_MEMBER_ID, 0 members in group: (no error)
%7|1603382226.501|CGRPJOINSTATE|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Group "my_unique_group" changed join state wait-join -> wait-sync (v6, state up)
%7|1603382226.502|COMMIT|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: GroupCoordinator/5: OffsetCommit for 1 partition(s): manual: returned: Local: Operation in progress
%7|1603382226.511|SYNCGROUP|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: SyncGroup response: Success (39 bytes of MemberState data)
%7|1603382226.511|ASSIGN|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Group "my_unique_group": delegating assign of 2 partition(s) to application rebalance callback on queue rd_kafka_cgrp_new: new assignment
%7|1603382226.511|CGRPJOINSTATE|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Group "my_unique_group" changed join state wait-sync -> wait-assign-rebalance_cb (v6, state up)
%7|1603382226.511|HEARTBEAT|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: GroupCoordinator/5: Heartbeat for group "my_unique_group" generation id 2014
%7|1603382226.511|CGRPOP|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Group "my_unique_group" received op ASSIGN (v0) in state up (join state wait-assign-rebalance_cb, v6 vs 0)
%7|1603382226.511|ASSIGN|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Group "my_unique_group": new assignment of 2 partition(s) in join state wait-assign-rebalance_cb
%7|1603382226.511|BARRIER|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Group "my_unique_group": rd_kafka_cgrp_assign:2600: new version barrier v7
%7|1603382226.511|ASSIGN|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Group "my_unique_group": assigning 2 partition(s) in join state wait-assign-rebalance_cb
%7|1603382226.511|CGRPJOINSTATE|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Group "my_unique_group" changed join state wait-assign-rebalance_cb -> assigned (v7, state up)
%7|1603382226.511|FETCHSTART|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Group "my_unique_group": not starting fetchers for 2 assigned partition(s) in join-state assigned (usable_offsets=no, v7, line 2651): waiting for 1 commit(s)
%7|1603382226.603|CGRPQUERY|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: broker: Group "my_unique_group": querying for coordinator: OffsetCommitRequest failed
%7|1603382226.603|COMMIT|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: GroupCoordinator/5: OffsetCommit for 1 partition(s): manual: returned: Local: Operation in progress
%7|1603382226.604|CGRPCOORD|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: broker: Group "my_unique_group" coordinator is broker id 5
%7|1603382226.704|CGRPQUERY|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: broker: Group "my_unique_group": querying for coordinator: OffsetCommitRequest failed
%7|1603382226.704|COMMIT|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: GroupCoordinator/5: OffsetCommit for 1 partition(s): manual: returned: Broker: Specified group generation id is not valid


// This commit failed, which makes sense. After this, however, no messages are returned from any consume() call. I have verified that messages do exist on these partitions
// and that their offsets are ahead of the current committed offset. Even so, no messages are being returned.
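As a stopgap on the application side, the commit could be wrapped so that a rejection caused by a rebalance is treated as "partition lost" rather than left to wedge the loop. This is a hedged workaround sketch, not a fix for the client bug itself: `commit_fn` stands in for `Consumer.commit`, and matching on error-message substrings (taken from the log above) is a heuristic, not an official API.

```python
# Substrings of the broker errors seen in the log above (heuristic match).
STALE_COMMIT_MARKERS = (
    "generation id is not valid",   # commit from a pre-rebalance generation
    "unknown member",               # member id invalidated by the group
    "rebalance in progress",
)

def safe_commit(commit_fn, msg):
    """Commit via `commit_fn` (a stand-in for Consumer.commit).

    Returns True on success. Returns False when the commit was rejected
    because the group rebalanced underneath us: the partition was revoked,
    its offset will be re-consumed by the new owner, and the consume loop
    should simply keep polling the new assignment.
    """
    try:
        commit_fn(msg)
        return True
    except Exception as exc:
        text = str(exc).lower()
        if any(marker in text for marker in STALE_COMMIT_MARKERS):
            return False  # drop the stale commit instead of wedging the loop
        raise             # anything else is a real error
```

With manual commits and long processing times, a rebalance mid-batch is always possible, so the loop has to tolerate losing a commit like this.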

%7|1603382226.705|CGRPCOORD|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: broker: Group "my_unique_group" coordinator is broker id 5
%7|1603382229.908|HEARTBEAT|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: GroupCoordinator/5: Heartbeat for group "my_unique_group" generation id 2014
%7|1603382232.909|HEARTBEAT|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: GroupCoordinator/5: Heartbeat for group "my_unique_group" generation id 2014
%7|1603382235.910|HEARTBEAT|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: GroupCoordinator/5: Heartbeat for group "my_unique_group" generation id 2014
%7|1603382238.910|HEARTBEAT|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: GroupCoordinator/5: Heartbeat for group "my_unique_group" generation id 2014
%7|1603382241.910|HEARTBEAT|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: GroupCoordinator/5: Heartbeat for group "my_unique_group" generation id 2014
%7|1603382244.910|HEARTBEAT|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: GroupCoordinator/5: Heartbeat for group "my_unique_group" generation id 2014
%7|1603382247.910|HEARTBEAT|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: GroupCoordinator/5: Heartbeat for group "my_unique_group" generation id 2014
%7|1603382250.910|HEARTBEAT|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: GroupCoordinator/5: Heartbeat for group "my_unique_group" generation id 2014
%7|1603382253.911|HEARTBEAT|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: GroupCoordinator/5: Heartbeat for group "my_unique_group" generation id 2014
%7|1603382256.428|CGRPOP|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Group "my_unique_group" received op GET_ASSIGNMENT (v0) in state up (join state assigned, v7 vs 0)
%7|1603382256.428|CGRPOP|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Group "my_unique_group" received op OFFSET_FETCH (v0) in state up (join state assigned, v7 vs 0)
%7|1603382256.428|OFFSET|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: GroupCoordinator/5: Fetch committed offsets for 1/1 partition(s)
%7|1603382256.439|OFFSET|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Topic my-topic [20]: setting default offset INVALID
%7|1603382256.439|CGRPOP|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Group "my_unique_group" received op OFFSET_FETCH (v0) in state up (join state assigned, v7 vs 0)
%7|1603382256.439|OFFSET|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: GroupCoordinator/5: Fetch committed offsets for 1/1 partition(s)
%7|1603382256.440|OFFSET|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Topic my-topic [21]: setting default offset INVALID
15:57:36 MainThread:abstract_kafka_consumer.py:78 INFO     Took 30000 millis to poll. 0 Records. {Assigned Partitions: storedOffsets} {20: 442895, 21: 478702} //Messages exist after these offsets!
%7|1603382256.911|HEARTBEAT|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: GroupCoordinator/5: Heartbeat for group "my_unique_group" generation id 2014
%7|1603382259.911|HEARTBEAT|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: GroupCoordinator/5: Heartbeat for group "my_unique_group" generation id 2014
%7|1603382262.911|HEARTBEAT|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: GroupCoordinator/5: Heartbeat for group "my_unique_group" generation id 2014
%7|1603382265.912|HEARTBEAT|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: GroupCoordinator/5: Heartbeat for group "my_unique_group" generation id 2014
%7|1603382268.912|HEARTBEAT|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: GroupCoordinator/5: Heartbeat for group "my_unique_group" generation id 2014
%7|1603382271.912|HEARTBEAT|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: GroupCoordinator/5: Heartbeat for group "my_unique_group" generation id 2014
%7|1603382274.912|HEARTBEAT|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: GroupCoordinator/5: Heartbeat for group "my_unique_group" generation id 2014
%7|1603382277.912|HEARTBEAT|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: GroupCoordinator/5: Heartbeat for group "my_unique_group" generation id 2014
%7|1603382280.913|HEARTBEAT|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: GroupCoordinator/5: Heartbeat for group "my_unique_group" generation id 2014
%7|1603382283.913|HEARTBEAT|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: GroupCoordinator/5: Heartbeat for group "my_unique_group" generation id 2014
%7|1603382286.441|CGRPOP|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Group "my_unique_group" received op GET_ASSIGNMENT (v0) in state up (join state assigned, v7 vs 0)
%7|1603382286.441|CGRPOP|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Group "my_unique_group" received op OFFSET_FETCH (v0) in state up (join state assigned, v7 vs 0)
%7|1603382286.441|OFFSET|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: GroupCoordinator/5: Fetch committed offsets for 1/1 partition(s)
%7|1603382286.441|OFFSET|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Topic my-topic [20]: setting default offset INVALID
%7|1603382286.441|CGRPOP|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Group "my_unique_group" received op OFFSET_FETCH (v0) in state up (join state assigned, v7 vs 0)
%7|1603382286.441|OFFSET|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: GroupCoordinator/5: Fetch committed offsets for 1/1 partition(s)
%7|1603382286.442|OFFSET|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Topic my-topic [21]: setting default offset INVALID
15:58:06 MainThread:abstract_kafka_consumer.py:78 INFO     Took 30000 millis to poll. 0 Records. {Assigned Partitions: storedOffsets} {20: 442895, 21: 478702}
%7|1603382286.913|HEARTBEAT|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: GroupCoordinator/5: Heartbeat for group "my_unique_group" generation id 2014
%7|1603382289.913|HEARTBEAT|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: GroupCoordinator/5: Heartbeat for group "my_unique_group" generation id 2014
%7|1603382292.913|HEARTBEAT|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: GroupCoordinator/5: Heartbeat for group "my_unique_group" generation id 2014
%7|1603382295.913|HEARTBEAT|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: GroupCoordinator/5: Heartbeat for group "my_unique_group" generation id 2014
%7|1603382298.914|HEARTBEAT|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: GroupCoordinator/5: Heartbeat for group "my_unique_group" generation id 2014
%7|1603382301.914|HEARTBEAT|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: GroupCoordinator/5: Heartbeat for group "my_unique_group" generation id 2014
%7|1603382304.914|HEARTBEAT|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: GroupCoordinator/5: Heartbeat for group "my_unique_group" generation id 2014
%7|1603382307.914|HEARTBEAT|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: GroupCoordinator/5: Heartbeat for group "my_unique_group" generation id 2014
%7|1603382310.915|HEARTBEAT|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: GroupCoordinator/5: Heartbeat for group "my_unique_group" generation id 2014
%7|1603382313.915|HEARTBEAT|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: GroupCoordinator/5: Heartbeat for group "my_unique_group" generation id 2014
%7|1603382316.442|CGRPOP|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Group "my_unique_group" received op GET_ASSIGNMENT (v0) in state up (join state assigned, v7 vs 0)
%7|1603382316.442|CGRPOP|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Group "my_unique_group" received op OFFSET_FETCH (v0) in state up (join state assigned, v7 vs 0)
%7|1603382316.442|OFFSET|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: GroupCoordinator/5: Fetch committed offsets for 1/1 partition(s)
%7|1603382316.443|OFFSET|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Topic my-topic [20]: setting default offset INVALID
%7|1603382316.443|CGRPOP|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Group "my_unique_group" received op OFFSET_FETCH (v0) in state up (join state assigned, v7 vs 0)
%7|1603382316.443|OFFSET|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: GroupCoordinator/5: Fetch committed offsets for 1/1 partition(s)
%7|1603382316.444|OFFSET|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Topic my-topic [21]: setting default offset INVALID
15:58:36 MainThread:abstract_kafka_consumer.py:78 INFO     Took 30000 millis to poll. 0 Records. {Assigned Partitions: storedOffsets} {20: 442895, 21: 478702}
%7|1603382316.915|HEARTBEAT|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: GroupCoordinator/5: Heartbeat for group "my_unique_group" generation id 2014
%7|1603382319.915|HEARTBEAT|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: GroupCoordinator/5: Heartbeat for group "my_unique_group" generation id 2014
%7|1603382322.915|HEARTBEAT|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: GroupCoordinator/5: Heartbeat for group "my_unique_group" generation id 2014
%7|1603382325.916|HEARTBEAT|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: GroupCoordinator/5: Heartbeat for group "my_unique_group" generation id 2014
%7|1603382328.916|HEARTBEAT|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: GroupCoordinator/5: Heartbeat for group "my_unique_group" generation id 2014
%7|1603382331.916|HEARTBEAT|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: GroupCoordinator/5: Heartbeat for group "my_unique_group" generation id 2014
%7|1603382334.916|HEARTBEAT|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: GroupCoordinator/5: Heartbeat for group "my_unique_group" generation id 2014
%7|1603382337.916|HEARTBEAT|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: GroupCoordinator/5: Heartbeat for group "my_unique_group" generation id 2014
%7|1603382340.917|HEARTBEAT|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: GroupCoordinator/5: Heartbeat for group "my_unique_group" generation id 2014
%7|1603382343.917|HEARTBEAT|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: GroupCoordinator/5: Heartbeat for group "my_unique_group" generation id 2014
%7|1603382346.445|CGRPOP|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Group "my_unique_group" received op GET_ASSIGNMENT (v0) in state up (join state assigned, v7 vs 0)
%7|1603382346.445|CGRPOP|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Group "my_unique_group" received op OFFSET_FETCH (v0) in state up (join state assigned, v7 vs 0)
%7|1603382346.445|OFFSET|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: GroupCoordinator/5: Fetch committed offsets for 1/1 partition(s)
%7|1603382346.445|OFFSET|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Topic my-topic [20]: setting default offset INVALID
%7|1603382346.446|CGRPOP|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Group "my_unique_group" received op OFFSET_FETCH (v0) in state up (join state assigned, v7 vs 0)
%7|1603382346.446|OFFSET|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: GroupCoordinator/5: Fetch committed offsets for 1/1 partition(s)
%7|1603382346.446|OFFSET|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Topic my-topic [21]: setting default offset INVALID
15:59:06 MainThread:abstract_kafka_consumer.py:78 INFO     Took 30000 millis to poll. 0 Records. {Assigned Partitions: storedOffsets} {20: 442895, 21: 478702}
%7|1603382346.917|HEARTBEAT|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: GroupCoordinator/5: Heartbeat for group "my_unique_group" generation id 2014
%7|1603382349.917|HEARTBEAT|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: GroupCoordinator/5: Heartbeat for group "my_unique_group" generation id 2014
%7|1603382352.917|HEARTBEAT|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: GroupCoordinator/5: Heartbeat for group "my_unique_group" generation id 2014
%7|1603382355.917|HEARTBEAT|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: GroupCoordinator/5: Heartbeat for group "my_unique_group" generation id 2014
%7|1603382358.917|HEARTBEAT|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: GroupCoordinator/5: Heartbeat for group "my_unique_group" generation id 2014
%7|1603382361.917|HEARTBEAT|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: GroupCoordinator/5: Heartbeat for group "my_unique_group" generation id 2014
%7|1603382364.918|HEARTBEAT|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: GroupCoordinator/5: Heartbeat for group "my_unique_group" generation id 2014
%7|1603382367.918|HEARTBEAT|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: GroupCoordinator/5: Heartbeat for group "my_unique_group" generation id 2014
%7|1603382370.918|HEARTBEAT|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: GroupCoordinator/5: Heartbeat for group "my_unique_group" generation id 2014
%7|1603382373.919|HEARTBEAT|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: GroupCoordinator/5: Heartbeat for group "my_unique_group" generation id 2014
%7|1603382376.447|CGRPOP|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Group "my_unique_group" received op GET_ASSIGNMENT (v0) in state up (join state assigned, v7 vs 0)
%7|1603382376.447|CGRPOP|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Group "my_unique_group" received op OFFSET_FETCH (v0) in state up (join state assigned, v7 vs 0)
%7|1603382376.447|OFFSET|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: GroupCoordinator/5: Fetch committed offsets for 1/1 partition(s)
%7|1603382376.447|OFFSET|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Topic my-topic [20]: setting default offset INVALID
%7|1603382376.447|CGRPOP|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Group "my_unique_group" received op OFFSET_FETCH (v0) in state up (join state assigned, v7 vs 0)
%7|1603382376.447|OFFSET|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: GroupCoordinator/5: Fetch committed offsets for 1/1 partition(s)
%7|1603382376.447|OFFSET|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Topic my-topic [21]: setting default offset INVALID
15:59:36 MainThread:abstract_kafka_consumer.py:78 INFO     Took 30000 millis to poll. 0 Records. {Assigned Partitions: storedOffsets} {20: 442895, 21: 478702}
%7|1603382376.919|HEARTBEAT|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: GroupCoordinator/5: Heartbeat for group "my_unique_group" generation id 2014
%7|1603382379.919|HEARTBEAT|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: GroupCoordinator/5: Heartbeat for group "my_unique_group" generation id 2014
%7|1603382382.919|HEARTBEAT|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: GroupCoordinator/5: Heartbeat for group "my_unique_group" generation id 2014
%7|1603382385.919|HEARTBEAT|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: GroupCoordinator/5: Heartbeat for group "my_unique_group" generation id 2014
%7|1603382388.919|HEARTBEAT|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: GroupCoordinator/5: Heartbeat for group "my_unique_group" generation id 2014
%7|1603382391.919|HEARTBEAT|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: GroupCoordinator/5: Heartbeat for group "my_unique_group" generation id 2014
%7|1603382394.919|HEARTBEAT|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: GroupCoordinator/5: Heartbeat for group "my_unique_group" generation id 2014
%7|1603382397.920|HEARTBEAT|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: GroupCoordinator/5: Heartbeat for group "my_unique_group" generation id 2014
%7|1603382400.920|HEARTBEAT|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: GroupCoordinator/5: Heartbeat for group "my_unique_group" generation id 2014
%7|1603382403.920|HEARTBEAT|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: GroupCoordinator/5: Heartbeat for group "my_unique_group" generation id 2014
%7|1603382406.448|CGRPOP|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Group "my_unique_group" received op GET_ASSIGNMENT (v0) in state up (join state assigned, v7 vs 0)
%7|1603382406.448|CGRPOP|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Group "my_unique_group" received op OFFSET_FETCH (v0) in state up (join state assigned, v7 vs 0)
%7|1603382406.448|OFFSET|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: GroupCoordinator/5: Fetch committed offsets for 1/1 partition(s)
%7|1603382406.449|OFFSET|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Topic my-topic [20]: setting default offset INVALID
%7|1603382406.449|CGRPOP|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Group "my_unique_group" received op OFFSET_FETCH (v0) in state up (join state assigned, v7 vs 0)
%7|1603382406.449|OFFSET|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: GroupCoordinator/5: Fetch committed offsets for 1/1 partition(s)
%7|1603382406.449|OFFSET|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Topic my-topic [21]: setting default offset INVALID
16:00:06 MainThread:abstract_kafka_consumer.py:78 INFO     Took 30001 millis to poll. 0 Records. {Assigned Partitions: storedOffsets} {20: 442895, 21: 478702}
%7|1603382406.920|HEARTBEAT|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: GroupCoordinator/5: Heartbeat for group "my_unique_group" generation id 2014
%7|1603382409.920|HEARTBEAT|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: GroupCoordinator/5: Heartbeat for group "my_unique_group" generation id 2014
%7|1603382412.920|HEARTBEAT|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: GroupCoordinator/5: Heartbeat for group "my_unique_group" generation id 2014
%7|1603382415.920|HEARTBEAT|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: GroupCoordinator/5: Heartbeat for group "my_unique_group" generation id 2014
%7|1603382418.921|HEARTBEAT|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: GroupCoordinator/5: Heartbeat for group "my_unique_group" generation id 2014
%7|1603382421.921|HEARTBEAT|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: GroupCoordinator/5: Heartbeat for group "my_unique_group" generation id 2014
%7|1603382424.921|HEARTBEAT|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: GroupCoordinator/5: Heartbeat for group "my_unique_group" generation id 2014
%7|1603382427.921|HEARTBEAT|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: GroupCoordinator/5: Heartbeat for group "my_unique_group" generation id 2014
%7|1603382430.921|HEARTBEAT|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: GroupCoordinator/5: Heartbeat for group "my_unique_group" generation id 2014
%7|1603382433.921|HEARTBEAT|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: GroupCoordinator/5: Heartbeat for group "my_unique_group" generation id 2014
%7|1603382436.450|CGRPOP|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Group "my_unique_group" received op GET_ASSIGNMENT (v0) in state up (join state assigned, v7 vs 0)
%7|1603382436.450|CGRPOP|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: Group "my_unique_group" received op OFFSET_FETCH (v0) in state up (join state assigned, v7 vs 0)
%7|1603382436.450|OFFSET|CONSUMER_UUID_CLIENT_ID#consumer-1| [thrd:main]: GroupCoordinator/5: Fetch committed offsets for 1/1 partition(s)

// The consumer (or a few in the group) loops like this, with no active fetchers, until one of them is killed to trigger a rebalance.
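Until the underlying bug is fixed, one application-side mitigation is to detect the stall the logs above show: `consume()` repeatedly returning zero records while partitions are still assigned. The sketch below is illustrative only (the class name and threshold are assumptions, not from the reporter's code); when it trips, the application can close and recreate the consumer, mimicking the manual "terminate a consumer" fix described above.

```python
# Hypothetical stall detector: flag the consumer for recreation after N
# consecutive empty polls while partitions are still assigned.
# Thresholds and names are illustrative assumptions.

class StalledConsumerWatchdog:
    def __init__(self, max_empty_polls=10):
        self.max_empty_polls = max_empty_polls
        self.empty_polls = 0

    def record_poll(self, num_records, num_assigned_partitions):
        """Call once per consume() cycle; returns True when the consumer
        looks stalled and should be closed and recreated."""
        # Receiving records, or holding no partitions, is normal: reset.
        if num_records > 0 or num_assigned_partitions == 0:
            self.empty_polls = 0
            return False
        self.empty_polls += 1
        return self.empty_polls >= self.max_empty_polls
```

With the 30-second polls shown in the log, `max_empty_polls=10` would trigger a restart after roughly five minutes of silence on assigned partitions; lag could additionally be checked before restarting to avoid false positives on genuinely idle topics.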

@jeffjnh (Author) commented Oct 23, 2020

After some further digging into open librdkafka issues, this appears to be the same problem observed in confluentinc/librdkafka#2933.

I see that it is part of the 1.5.2 milestone. Do you have an expected release date for this fix, and for when it will be bundled into confluent-kafka-python?

@edenhill (Contributor) commented:

Sorry for missing this issue, great report!

The v1.6.0 release is due this week.
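Once that release is out, upgrading and confirming the bundled librdkafka version can be done as follows (an install fragment, assuming a pip-based environment; the version pin is illustrative):

```shell
# Upgrade to a release that bundles the fixed librdkafka (>= 1.6.0)
pip install --upgrade "confluent-kafka>=1.6.0"

# Print the client version and the linked librdkafka version
python -c "import confluent_kafka; print(confluent_kafka.version(), confluent_kafka.libversion())"
```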
