Kafka Asynchronous Consumer

BY Xamta Infotech

To create an asyncio Kafka consumer in Python using the aiokafka library, you first need to install the library if you haven't already. You can install it using pip:

pip install aiokafka

Here's an example of how you can create an asyncio Kafka consumer:

import asyncio
from aiokafka import AIOKafkaConsumer

async def consume():
    consumer = AIOKafkaConsumer(
        'your_topic_name',
        bootstrap_servers='your_bootstrap_servers',  # e.g., 'localhost:9092'
        group_id='your_group_id',
        auto_offset_reset='earliest',  # or 'latest' or 'none'
        enable_auto_commit=True  # Set it to False if you want manual commit
    )

    # Get cluster layout and join consumer group
    await consumer.start()

    try:
        # Consume messages
        async for message in consumer:
            print("Received message:", message.value.decode())
    finally:
        # Will leave consumer group; perform autocommit if enabled.
        await consumer.stop()

# Run the consumer in the event loop
loop = asyncio.get_event_loop()
loop.run_until_complete(consume())

In this example:

  • Replace 'your_topic_name' with the name of the Kafka topic you want to consume messages from.
  • Set 'your_bootstrap_servers' to the address of your Kafka bootstrap servers (e.g., 'localhost:9092').
  • 'your_group_id' is the consumer group id. Kafka uses consumer groups to balance the load across a group of consumers.
  • auto_offset_reset specifies what to do when there is no initial offset in Kafka or if the current offset does not exist anymore (either because the data was deleted or the offset is out of range). You can set it to 'earliest', 'latest', or 'none'.
  • enable_auto_commit specifies whether the consumer's offset should be periodically committed to Kafka. Set it to True for automatic offset commits, or False if you want to commit offsets manually.

Make sure to customize the values according to your Kafka setup. This code sets up an asyncio Kafka consumer that listens for messages on the specified topic and prints the received messages to the console. You can integrate this code into your application and handle the messages as needed.

we are happy to serve you

Let's start a project.

Gujarat, India

based Development Company

We deliver web and mobile app development services to Indian businesses since 2013, with 100% project delivery success. Hire the best programmers at affordable prices. Our design-focused approach and project execution processes help you to deliver the right solutions.

Schedule Meeting !


Kafka Asyncio Producer
By Xamta Infotech