
Handling Kafka Messages in Laravel: A Complete Guide

Published October 19, 2025
10 min read
Written by Serhii Dychko
Apache Kafka · message broker

This article provides a step-by-step guide to running Kafka locally with Laravel and Docker. All the code examples and a full working project are available on GitHub: https://github.com/dychkos/laravel-kafka.

Table of Contents

- Why You Need Kafka for Your App
- Installation and Configuration
- Configuring Laravel to Connect to Kafka
- Provider and Service for Handling Kafka
- Kafka Service
- Producing and Consuming Kafka Messages in Laravel
- Creating a Kafka Consumer
- Next Steps and Best Practices

Why You Need Kafka for Your App

Kafka is a powerful message broker that helps your application handle data streams efficiently and reliably. It lets different parts of your system communicate asynchronously, making your app more scalable and resilient. Kafka persists messages to disk and delivers them in order within each partition, reducing the risk of data loss or duplication. This makes it ideal for event-driven architectures, microservices, or any system that processes large volumes of data in real time.

Installation and Configuration

You can clone the GitHub repository to get a fully working project, but here are the key steps you need to follow to have a functioning Kafka example in Laravel.

First, you need to ensure the Kafka extension is enabled for your PHP application. We'll use the arnaud-lb/php-rdkafka package. In your PHP Dockerfile, you should install the required Kafka library and extension as follows:

To install the Kafka library:

    RUN apt-get update && apt-get install -y \
        librdkafka-dev

Then install and enable the PHP Kafka extension:

    RUN pecl install rdkafka-6.0.5 && \
        docker-php-ext-enable rdkafka
 

A full Dockerfile example is available in the GitHub repository.

After this, your PHP container will be ready to work with Kafka.
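To confirm the extension was built and enabled correctly, you can run the following inside the PHP container:

```shell
# Check that the rdkafka extension is registered with PHP
php -m | grep rdkafka
```

If the command prints nothing, revisit the `pecl install` and `docker-php-ext-enable` steps above.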

Next, let's add the Kafka service to our Docker Compose file. This setup ensures that Kafka runs locally and can communicate with your Laravel application.

kafka:
    image: confluentinc/cp-kafka:7.8.0
    container_name: kafka
    ports:
        - "9092:9092"
    environment:
        KAFKA_NODE_ID: 1 # Unique ID for this broker
        KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT # Maps listeners to protocols
        KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092 # How clients connect
        KAFKA_PROCESS_ROLES: broker,controller # Defines the roles of this Kafka instance
        KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 # Required for offset storage
        KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:29093 # Kafka controller quorum configuration
        KAFKA_LISTENERS: PLAINTEXT://kafka:29092,CONTROLLER://kafka:29093,PLAINTEXT_HOST://0.0.0.0:9092 # Actual listener ports
        KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT # Inter-broker communication
        KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER # Controller listener name
        CLUSTER_ID: 'EmptNWtoR4GGWx-BH6nGLQ' # Unique cluster ID
    volumes:
        - kafka-data:/var/lib/kafka/data
    networks:
        - kafka-net
    healthcheck:
        test: ["CMD", "kafka-broker-api-versions", "--bootstrap-server", "localhost:9092"]
        interval: 10s
        timeout: 5s
        retries: 5

To make it easier to visualize and interact with Kafka messages, you can add a Kafka UI service:

kafka-ui:
    image: kafbat/kafka-ui:main
    container_name: kafka-ui
    ports:
        - "8080:8080"
    environment:
        DYNAMIC_CONFIG_ENABLED: "true"
        KAFKA_CLUSTERS_0_NAME: local
        KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
    depends_on:
        - kafka
    networks:
        - kafka-net

This configuration gives you a web interface at http://localhost:8080 to monitor topics, messages, and consumers.

A full docker-compose example is available in the GitHub repository.
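With both services defined, you can bring the stack up and check that the broker passes its healthcheck (assuming the compose file sits in your project root):

```shell
# Start Kafka and the Kafka UI in the background
docker compose up -d kafka kafka-ui

# Inspect the broker's status; it should report "healthy" after a few seconds
docker compose ps kafka
```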

Configuring Laravel to Connect to Kafka

To connect your Laravel application to Kafka, you first need to define the necessary environment variables. Add the following to your .env file:

    KAFKA_BROKERS=kafka:29092
    KAFKA_GROUP_ID=laravel-consumer-group
    KAFKA_CLIENT_ID=laravel-client
    KAFKA_AUTO_OFFSET_RESET=earliest
    KAFKA_AUTO_COMMIT=true
    KAFKA_SESSION_TIMEOUT=6000
    KAFKA_SOCKET_TIMEOUT=60000

Next, create a config/kafka.php file in your Laravel project to map these environment variables for easy use in your app:

    return [
        'brokers' => env('KAFKA_BROKERS', 'localhost:9092'),
        'group_id' => env('KAFKA_GROUP_ID', 'laravel-consumer-group'),
        'client_id' => env('KAFKA_CLIENT_ID', 'laravel-client'),
        'auto_offset_reset' => env('KAFKA_AUTO_OFFSET_RESET', 'earliest'),
        'enable_auto_commit' => env('KAFKA_AUTO_COMMIT', true),
        'session_timeout_ms' => env('KAFKA_SESSION_TIMEOUT', 6000),
        'socket_timeout_ms' => env('KAFKA_SOCKET_TIMEOUT', 60000),
    ];

Provider and Service for Handling Kafka

Now that we have Kafka running, it's time to connect our Laravel application and manage message production and consumption. We'll start by creating a KafkaProvider that sets up the basic configuration for producing and consuming messages.

    namespace App\Components\Kafka;
 
    use App\Components\Kafka\Interfaces\KafkaServiceInterface;
    use Illuminate\Support\ServiceProvider;
    use RdKafka\Conf;
    use RdKafka\KafkaConsumer;
    use RdKafka\Producer;
 
    class KafkaProvider extends ServiceProvider
    {
        public function register(): void
        {
            $this->app->singleton('kafka.producer', function () {
                return $this->createProducer();
            });
 
            $this->app->singleton('kafka.consumer', function () {
                return $this->createConsumer();
            });
 
            $this->app->singleton(KafkaServiceInterface::class, function ($app) {
                return new KafkaService(
                    $app->make('kafka.producer'),
                    $app->make('kafka.consumer')
                );
            });
        }

        protected function createProducer(): Producer
        {
            $config = config('kafka');
            $conf = new Conf();

            $conf->set('metadata.broker.list', $config['brokers']);
            $conf->set('client.id', $config['client_id']);
            $conf->set('socket.timeout.ms', $config['socket_timeout_ms']);

            return new Producer($conf);
        }

        protected function createConsumer(): KafkaConsumer
        {
            $config = config('kafka');
            $conf = new Conf();

            $conf->set('metadata.broker.list', $config['brokers']);
            $conf->set('group.id', $config['group_id']);
            $conf->set('auto.offset.reset', $config['auto_offset_reset']);
            $conf->set('enable.auto.commit', $config['enable_auto_commit']);
            $conf->set('session.timeout.ms', $config['session_timeout_ms']);
            $conf->set('socket.timeout.ms', $config['socket_timeout_ms']);

            return new KafkaConsumer($conf);
        }
    }

This provider creates two main entities: a producer for sending messages and a consumer for reading them. Both are registered as singletons so the same instance is used throughout your application. Additionally, we register a top-level KafkaService that wraps these entities and provides an easy interface for different messaging scenarios.

Register the provider in the providers array in bootstrap/providers.php (note that the namespace matches the class defined above):

    return [
        // ...
        App\Components\Kafka\KafkaProvider::class,
    ];

Kafka Service

The KafkaService class is the top layer for managing Kafka messages. It provides methods to publish individual messages, publish batches of messages, and consume messages from topics. Here's an example:

    namespace App\Components\Kafka;

    use App\Components\Kafka\Enums\KafkaTopicEnum;
    use App\Components\Kafka\Interfaces\KafkaServiceInterface;
    use Illuminate\Support\Facades\Log;
    use RdKafka\KafkaConsumer;
    use RdKafka\Message;
    use RdKafka\Producer;

    class KafkaService implements KafkaServiceInterface
    {
        private Producer $producer;
        private KafkaConsumer $consumer;

        public function __construct(Producer $producer, KafkaConsumer $consumer)
        {
            $this->producer = $producer;
            $this->consumer = $consumer;
        }

        public function publishMessage(KafkaTopicEnum $topic, array $payload, ?string $key = null): void
        {
            $topicProducer = $this->producer->newTopic($topic->value);
            $topicProducer->produce(RD_KAFKA_PARTITION_UA, 0, json_encode($payload), $key);

            // Retry the flush a few times so a slow broker doesn't drop the message
            for ($i = 0; $i < 10; $i++) {
                if ($this->producer->flush(1000) === 0) {
                    break;
                }
            }
        }

        public function publishBatch(KafkaTopicEnum $topic, array $messages): void
        {
            $topicProducer = $this->producer->newTopic($topic->value);

            foreach ($messages as $message) {
                $payload = is_array($message['payload']) ? json_encode($message['payload']) : $message['payload'];
                $key = $message['key'] ?? null;
                $topicProducer->produce(RD_KAFKA_PARTITION_UA, 0, $payload, $key);
            }

            $remaining = $this->producer->flush(30000);
            if ($remaining > 0) {
                Log::warning("Failed to flush {$remaining} messages");
            }
        }

        public function consume(KafkaTopicEnum|array $topics, callable $callback, int $timeout = 120000, ?int $maxMessages = null): void
        {
            // Accept a single enum case or an array of cases/strings
            $topics = is_array($topics)
                ? array_map(fn ($t) => $t instanceof KafkaTopicEnum ? $t->value : $t, $topics)
                : [$topics->value];

            $this->consumer->subscribe($topics);

            $count = 0;

            try {
                while (true) {
                    $message = $this->consumer->consume($timeout);

                    if ($message->err === RD_KAFKA_RESP_ERR_NO_ERROR) {
                        $this->handleMessage($message, $callback);
                        // Count only successfully received messages toward the limit
                        $count++;
                    } elseif (!in_array($message->err, [RD_KAFKA_RESP_ERR__TIMED_OUT, RD_KAFKA_RESP_ERR__PARTITION_EOF], true)) {
                        Log::error('Kafka consumer error: '.$message->errstr());
                    }

                    if ($maxMessages && $count >= $maxMessages) {
                        break;
                    }
                }
            } finally {
                $this->consumer->unsubscribe();
            }
        }

        private function handleMessage(Message $message, callable $callback): void
        {
            try {
                $payload = json_decode($message->payload, true) ?? $message->payload;
                $callback([
                    'topic' => $message->topic_name,
                    'partition' => $message->partition,
                    'offset' => $message->offset,
                    'key' => $message->key,
                    'payload' => $payload,
                    'timestamp' => $message->timestamp,
                ]);
                Log::debug('Processed message', ['topic' => $message->topic_name, 'offset' => $message->offset]);
            } catch (\Exception $e) {
                Log::error('Error processing message: '.$e->getMessage(), ['topic' => $message->topic_name, 'offset' => $message->offset]);
                throw $e;
            }
        }

        public function getProducer(): Producer
        {
            return $this->producer;
        }

        public function getConsumer(): KafkaConsumer
        {
            return $this->consumer;
        }
    }

Method Overview:

- publishMessage sends a single message to a topic and immediately flushes it to ensure delivery.
- publishBatch sends multiple messages in one go, which is more efficient for bulk operations but requires a longer flush timeout.
- consume subscribes to one or more topics and processes each message with a callback.
- handleMessage is an internal helper that decodes messages and handles logging and error management.

This setup gives you a clean, reusable interface for Kafka messaging in Laravel, making it easy to produce and consume messages without repeating boilerplate code.
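As a quick illustration, once the provider is registered, the service can be resolved anywhere through Laravel's container (the topic and payload below are placeholders):

```php
use App\Components\Kafka\Enums\KafkaTopicEnum;
use App\Components\Kafka\Interfaces\KafkaServiceInterface;

// Resolve the service from the container...
$kafka = app(KafkaServiceInterface::class);

// ...and publish a message without touching any rdkafka boilerplate
$kafka->publishMessage(KafkaTopicEnum::DEFAULT, ['ping' => true]);
```

In controllers and commands you can also type-hint KafkaServiceInterface and let Laravel inject it, as the examples below do.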

Producing and Consuming Kafka Messages in Laravel

Now that we have the basic Kafka setup, let's create a full working example where we produce and consume messages. It's a good practice to define Kafka topics in an enum to manage them easily and avoid typos:

    namespace App\Components\Kafka\Enums;
 
    enum KafkaTopicEnum: string
    {
        case DEFAULT = 'default';
        case USER_REGISTRATION = 'user_registration';
    }
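If you later want to subscribe to every topic at once, a small helper on the enum can return the raw topic names; this helper is an addition for illustration, not part of the repository code:

```php
// Inside KafkaTopicEnum: return all topic names as plain strings,
// ready to pass to KafkaService::consume() as an array
public static function values(): array
{
    return array_map(fn (self $case) => $case->value, self::cases());
}
```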

Next, we create two routes and a controller to publish messages:

    class KafkaTestController extends Controller
    {
        public function publishMessage(KafkaServiceInterface $kafkaService): JsonResponse
        {
            $kafkaService->publishMessage(
                topic: KafkaTopicEnum::USER_REGISTRATION,
                payload: [
                    'user_id' => 123,
                    'action' => 'user.created',
                    'timestamp' => now()->toIso8601String(),
                    'data' => ['name' => 'John Doe', 'email' => 'john@example.com'],
                ],
                key: 'user_123'
            );

            return response()->json(['status' => 'published']);
        }

        public function publishBatch(KafkaServiceInterface $kafkaService): JsonResponse
        {
            $kafkaService->publishBatch(KafkaTopicEnum::DEFAULT, [
                ['payload' => ['event' => 'event1', 'data' => 'value1'], 'key' => 'event_1'],
                ['payload' => ['event' => 'event2', 'data' => 'value2'], 'key' => 'event_2'],
                ['payload' => ['event' => 'event3', 'data' => 'value3'], 'key' => 'event_3'],
            ]);

            return response()->json(['status' => 'batch published']);
        }
    }
 

When a user visits /kafka/publish, a single message is sent to the USER_REGISTRATION topic, and visiting /kafka/publishBatch sends multiple messages to the DEFAULT topic. The key parameter determines which partition a message is routed to: messages with the same key always land in the same partition, which preserves their relative order.
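The route file itself isn't shown in the article, but wiring the controller up in routes/web.php could look like this (the route paths are taken from the URLs above; the controller namespace is an assumption):

```php
use App\Http\Controllers\KafkaTestController;
use Illuminate\Support\Facades\Route;

// Publish a single message to the USER_REGISTRATION topic
Route::get('/kafka/publish', [KafkaTestController::class, 'publishMessage']);

// Publish several messages to the DEFAULT topic in one batch
Route::get('/kafka/publishBatch', [KafkaTestController::class, 'publishBatch']);
```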

Creating a Kafka Consumer

To consume messages, the recommended approach is to create a Laravel command that runs as a daemon. This process continuously listens for new messages from a topic:

    class KafkaConsumeCommand extends Command
    {
        protected $signature = 'kafka:consume {topic} {--timeout=120000} {--max-messages=}';
        protected $description = 'Consume messages from Kafka topic';

        public function handle(KafkaServiceInterface $kafkaService)
        {
            $topic = $this->argument('topic');
            $error = $this->validatePrompt($topic, ['required', 'string', Rule::enum(KafkaTopicEnum::class)]);

            if ($error) {
                $this->error("Invalid topic: {$topic}");

                return CommandAlias::FAILURE;
            }

            $timeout = $this->option('timeout');
            $maxMessages = $this->option('max-messages');

            $this->info("Consuming from topic: {$topic}");

            $kafkaService->consume(
                KafkaTopicEnum::tryFrom($topic),
                function ($message) {
                    $this->line(json_encode($message, JSON_PRETTY_PRINT));
                },
                timeout: (int) $timeout,
                maxMessages: $maxMessages ? (int) $maxMessages : null
            );
        }
    }

Inside this command, Laravel validates the topic, then calls the Kafka service's consume method. The service subscribes to the given topic and continuously fetches messages. For each message received, the provided callback prints the message in a readable JSON format. The optional timeout and max-messages parameters allow controlling how long the consumer listens and how many messages to process before stopping.

With this setup, your Laravel application can both produce messages to Kafka topics and run a persistent consumer to handle incoming messages, enabling a full message-driven workflow.
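In production you would typically keep the consumer command alive with a process manager such as Supervisor, which restarts it if it crashes. A minimal sketch of such a config, where the paths, program name, and user are assumptions for a typical Docker setup:

```ini
[program:kafka-consumer]
command=php /var/www/html/artisan kafka:consume user_registration
autostart=true
autorestart=true
stopasgroup=true
user=www-data
stdout_logfile=/var/log/kafka-consumer.log
```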

Next Steps and Best Practices

Now that your Laravel application can produce and consume Kafka messages, there are several practices you can follow to ensure your message-driven system is reliable and maintainable. First, always define your Kafka topics using enums or constants to prevent typos and make managing topics easier. For production systems, consider using multiple partitions for high throughput and consumer groups to scale message processing across multiple workers.

Make sure to handle errors and retries gracefully. In the Kafka service, logging is already implemented, but in production you might want to add dead-letter topics for messages that consistently fail processing. Monitor your Kafka cluster using tools like Kafka UI or Prometheus to track message lag, broker health, and consumer performance.
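A dead-letter topic can be sketched on top of the existing service by wrapping the consumer callback and republishing failures to a separate topic. The DEAD_LETTER enum case and the processUserEvent function below are assumptions for illustration, not part of the article's code:

```php
use App\Components\Kafka\Enums\KafkaTopicEnum;

// Hypothetical wrapper: on failure, forward the raw message to a DLQ topic
$kafkaService->consume(KafkaTopicEnum::USER_REGISTRATION, function ($message) use ($kafkaService) {
    try {
        processUserEvent($message['payload']); // your domain logic goes here
    } catch (\Throwable $e) {
        // Assumes a DEAD_LETTER case was added to KafkaTopicEnum
        $kafkaService->publishMessage(KafkaTopicEnum::DEAD_LETTER, [
            'original_topic' => $message['topic'],
            'payload' => $message['payload'],
            'error' => $e->getMessage(),
        ], $message['key']);
    }
});
```

A separate consumer can then inspect or replay the dead-letter topic without blocking the main stream.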

Get In Touch

What's next? Feel free to reach out if you're looking for a good developer, have a question, or simply want to connect.

dychkos@proton.me

+ 380 73 404 25 36

You may also find me on these platforms!