The ctodd-python-lib-kafka project is responsible for interacting with Apache Kafka. This includes producing records to and consuming records from topics, working with the Avro (.avro) format, and other tasks involved in building event-driven applications with Python.
Dependencies:
- confluent-kafka==0.11.6
- simplejson==3.16.0
This library is used to interact with Kafka Admin functionality. This includes getting the admin client object, which returns details about the state of Kafka.
Functions:
def get_kafka_admin_client(kafka_brokers):
"""
Purpose:
Get a Kafka Admin Client Object. Allows for polling information about Kafka
configuration and creating objects in Kafka
Args:
kafka_brokers (List of Strings): List of host:port combinations for kafka
brokers
Return:
kafka_admin_client (Kafka Admin Client Obj): Kafka Admin Client Obj for the
brokers
"""
This library is used to aid in creating kafka consumers.
Functions:
def get_kafka_consumer(
kafka_brokers,
consumer_group="default",
timeout=6000,
offset_start="latest",
get_stats=True
):
"""
Purpose:
Get a Kafka Consumer Object (not yet connected to a topic)
Args:
kafka_brokers (List of Strings): List of host:port combinations for kafka brokers
consumer_group (String): Consumer group to consume as. Default is "default"
timeout (Int): Timeout in ms if no messages are found (during poll). Default
is 6000
offset_start (String): Where to start consuming with respect to the consumer
group/topic offset. Default is "latest", which ignores any messages in the
topic before the consumer begins consuming
get_stats (Bool): Whether or not to print statistics. Default is True
Return:
kafka_consumer (Kafka Consumer Obj): Kafka Consumer Object
"""
def consume_topic(kafka_consumer, kafka_topics):
"""
Purpose:
Consume Kafka Topics
Args:
kafka_consumer (Kafka Consumer Obj): Kafka Consumer Object
kafka_topics (List of Strings): List of Kafka Topics to Consume.
Yields:
msg (Kafka Message Obj): Message Obj returned from the topic
"""
File for holding custom exception types that will be raised by the kafka_helpers libraries
Classes:
class TopicNotFound(Exception):
"""
Purpose:
The TopicNotFound will be raised when attempting to consume a topic that
does not exist
"""
This library is used to interact with kafka in ways not specifically related to consuming or producing messages
Functions:
This library is used to aid in creating kafka producers.
Functions:
def get_kafka_producer(kafka_brokers, get_stats=True):
"""
Purpose:
Get a Kafka Producer Object (not yet connected to a topic)
Args:
kafka_brokers (List of Strings): List of host:port combinations for kafka brokers
get_stats (Bool): Whether or not to print statistics. Default is True
Return:
kafka_producer (Kafka Producer Obj): Kafka Producer Object
"""
def produce_message(kafka_producer, kafka_topic, msg):
"""
Purpose:
Produce a message to a Kafka Topic
Args:
kafka_producer (Kafka Producer Obj): Kafka Producer Object
kafka_topic (String): Kafka Topic to Produce message to.
msg (String): Message to produce to Kafka
Returns:
N/A
"""
def produce_results_callback(err, msg):
"""
Purpose:
Optional per-message delivery callback (triggered by poll() or
flush()) when a message has been successfully delivered or
permanently failed delivery (after retries).
Args:
err (KafkaError or None): Delivery error, or None if the message was delivered
msg (Object): Kafka Callback Message Object
Return:
N/A
"""
This library is used to interact with kafka topics. This includes getting a list of the topics, finding details about a topic, creating topics, and more.
Functions:
def get_topics(kafka_admin_client, return_system_topics=False):
"""
Purpose:
Get a List of Kafka Topics.
Args:
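kafka_admin_client (Kafka Admin Client Obj): Kafka Admin Client Obj for the
brokers
return_system_topics (Bool): Whether or not to include Kafka system (internal)
topics. Default is False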
Return:
kafka_topics (Dict of Kafka Topics): Key is the topic name and value is a
Kafka metadata object that has basic topic information
"""
def create_kafka_topic(
kafka_admin_client, topic_name, topic_replication=1, topic_partitions=1
):
"""
Purpose:
Create a Kafka Topic
Args:
kafka_admin_client (Kafka Admin Client Obj): Kafka Admin Client Obj for the
brokers
topic_name (String): Name of the topic to create
topic_replication (Int): Replication factor for the new topic
topic_partitions (Int): Number of partitions to divide the topic into
Return:
N/A
"""
Example executable Python scripts/modules for testing and interacting with the library. These show example use-cases for the libraries and can be used as templates for developing with the libraries or to use as one-off development efforts.
Purpose:
Consume from a Kafka Topic
Steps:
- Connect to Kafka
- Create Consumer Object
- Poll Topic
- Parse Message
- Print Message
example script call:
python3 consume_from_kafka_topic.py --topic="test-env-topic" \
--broker="0.0.0.0:9092" --consumer-group="test-env-consumer"
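A minimal sketch of a script following these steps, calling confluent-kafka directly rather than the library's helpers. The `"default"` consumer group fallback, the `"latest"` offset reset, and the 6-second poll are assumptions, and UTF-8 decoding stands in for the "Parse Message" step.

```python
import argparse
from typing import List, Optional


def parse_args(argv: Optional[List[str]] = None) -> argparse.Namespace:
    """Parse the --topic, --broker, and --consumer-group flags."""
    parser = argparse.ArgumentParser(description="Consume from a Kafka topic")
    parser.add_argument("--topic", required=True, help="Topic to consume")
    parser.add_argument("--broker", required=True, help="host:port of a Kafka broker")
    parser.add_argument("--consumer-group", default="default", help="Consumer group to consume as")
    return parser.parse_args(argv)


def main() -> None:
    args = parse_args()
    from confluent_kafka import Consumer  # lazy import; needs confluent-kafka

    consumer = Consumer({
        "bootstrap.servers": args.broker,
        "group.id": args.consumer_group,
        "auto.offset.reset": "latest",
    })
    consumer.subscribe([args.topic])
    try:
        while True:
            msg = consumer.poll(6.0)
            if msg is None or msg.error():
                continue
            value = msg.value()
            # Parse: decode the raw payload bytes as UTF-8 text
            print(value.decode("utf-8") if value is not None else None)
    except KeyboardInterrupt:
        pass
    finally:
        consumer.close()  # commit offsets and leave the group
```

A real script would end with the usual `if __name__ == "__main__": main()` guard.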
Purpose:
Produce to a Kafka Topic
Steps:
- Connect to Kafka
- Create Producer Object
- Prompt for Input
- Parse Input
- Produce Input to Kafka
example script call:
python3 produce_to_kafka_topic.py --topic="test-env-topic" \
--broker="localhost:9092"
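A minimal sketch of this flow using confluent-kafka directly. The `prompt_messages` helper, the `message> ` prompt, ending input on EOF (Ctrl-D), and treating "Parse Input" as stripping whitespace and skipping blank lines are all hypothetical choices.

```python
import argparse
from typing import Callable, Iterator, List, Optional


def parse_args(argv: Optional[List[str]] = None) -> argparse.Namespace:
    """Parse the --topic and --broker flags."""
    parser = argparse.ArgumentParser(description="Produce to a Kafka topic")
    parser.add_argument("--topic", required=True, help="Topic to produce to")
    parser.add_argument("--broker", required=True, help="host:port of a Kafka broker")
    return parser.parse_args(argv)


def prompt_messages(read_line: Callable[[str], str] = input) -> Iterator[str]:
    """Prompt for input, yielding stripped non-empty lines until EOF."""
    while True:
        try:
            line = read_line("message> ")
        except EOFError:
            return
        line = line.strip()
        if line:  # hypothetical parse step: skip blank input
            yield line


def main() -> None:
    args = parse_args()
    from confluent_kafka import Producer  # lazy import; needs confluent-kafka

    producer = Producer({"bootstrap.servers": args.broker})
    for text in prompt_messages():
        producer.produce(args.topic, value=text.encode("utf-8"))
        producer.poll(0)  # serve delivery callbacks without blocking
    producer.flush()  # deliver anything still queued before exiting
```

Injecting `read_line` keeps the prompt loop testable without a terminal.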
Purpose:
Create a Kafka Topic. Takes in replication and partition information
Steps:
- Connect to Kafka
- Create Kafka Admin Client
- Create Topic In Kafka
example script call:
python3 create_kafka_topic.py --topic-name="test-env-topic" \
--topic-replication=3 --topic-partitions=4 \
--broker="localhost:9092"
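A minimal sketch of the script, assuming it uses confluent-kafka's `AdminClient` and `NewTopic` directly. Defaulting replication and partitions to 1 mirrors the defaults of `create_kafka_topic`; `type=int` makes argparse convert the numeric flags.

```python
import argparse
from typing import List, Optional


def parse_args(argv: Optional[List[str]] = None) -> argparse.Namespace:
    """Parse the topic name, replication, partition, and broker flags."""
    parser = argparse.ArgumentParser(description="Create a Kafka topic")
    parser.add_argument("--topic-name", required=True, help="Name of the topic to create")
    parser.add_argument("--topic-replication", type=int, default=1, help="Replication factor")
    parser.add_argument("--topic-partitions", type=int, default=1, help="Number of partitions")
    parser.add_argument("--broker", required=True, help="host:port of a Kafka broker")
    return parser.parse_args(argv)


def main() -> None:
    args = parse_args()
    from confluent_kafka.admin import AdminClient, NewTopic  # lazy import

    admin = AdminClient({"bootstrap.servers": args.broker})
    new_topic = NewTopic(
        args.topic_name,
        num_partitions=args.topic_partitions,
        replication_factor=args.topic_replication,
    )
    # create_topics() returns {topic_name: future}; result() raises on failure
    for topic, future in admin.create_topics([new_topic]).items():
        future.result()
        print(f"Created topic {topic}")
```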
- Relies on f-string notation, which requires Python 3.6 or later. A refactor to remove f-strings could allow development with Python 3.0 through 3.5
- Unittest framework in place, but lacking tests