Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add confluent kafka producer poll and flush returns #2527

Merged
merged 9 commits into from
May 30, 2024
Prev Previous commit
Next Next commit
fix lint
  • Loading branch information
dferrochio committed May 16, 2024
commit a33309976816ee53d0e8316b8e4035912f557aa9
Original file line number Diff line number Diff line change
Expand Up @@ -277,4 +277,4 @@ def test_producer_flush(self) -> None:
producer = instrumentation.instrument_producer(producer)
producer.produce(topic="topic-1", key="key-1", value="value-1")
msg = producer.flush()
self.assertIsNotNone(msg)
self.assertIsNotNone(msg)
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from confluent_kafka import Consumer, Producer
from typing import Optional

from confluent_kafka import Consumer, Producer


class MockConsumer(Consumer):
def __init__(self, queue, config):
self._queue = queue
Expand All @@ -21,14 +23,14 @@ def poll(self, timeout=None):

class MockedMessage:
def __init__(
self,
topic: str,
partition: int,
offset: int,
headers,
key: Optional[str]=None,
value=Optional[str]=None
):
self,
topic: str,
partition: int,
offset: int,
headers,
key: Optional[str] = None,
value: Optional[str] = None,
):
self._topic = topic
self._partition = partition
self._offset = offset
Expand Down Expand Up @@ -70,12 +72,12 @@ def produce(
offset=0,
headers=[],
key=kwargs.get("key"),
value=kwargs.get("value")
value=kwargs.get("value"),
)
)

def poll(self, timeout=None):
def poll(self, *args, **kwargs):
return len(self._queue)

def flush(self, timeout=None):
def flush(self, *args, **kwargs):
return len(self._queue)
Loading