Apache Kafka: The Future of Real-Time Data Processing
Blog

Apache Kafka: The Future of Real-Time Data Processing

Apache Kafka is an open-source software platform that functions as a distributed publish-subscribe messaging system allowing the exchange of data between applications, servers, and processors, while also providing a robust queue that can handle a high volume of data and enables messages to be passed from one end-point to another.

Apache Kafka was originally developed by LinkedIn, later it was donated to the Apache Software Foundation and became an open-sourced Apache project in early 2011. Currently, it is maintained by Confluent under Apache Software Foundation. Kafka is written in Scala and Java. More than 80% of all Fortune 100 companies trust and use Kafka.                   

Benefits of Kafka:

  • Open source: It is freely available and can be easily customized and extended by developers.
  • Scalability: Kafka is designed to scale horizontally and can handle high volumes of data in real-time, making it suitable for use in large-scale data processing applications.
  • High throughput: It is capable of handling trillions of data events in a day.
  • Low latency: It is suitable for real-time streaming applications that require fast and immediate responses.
  • Fault tolerance: It ensures that the data is not lost in the event of node failure or network outages.
  • Flexibility: It can be customized to fit a wide range of use cases, from data ingestion and stream processing to messaging and log aggregation.
  • Ecosystem: It has a rich ecosystem of tools and technologies that integrate with it, such as connectors, stream processing frameworks, and monitoring tools, making it a powerful platform for building data processing pipelines and streaming applications.

Use cases of Kafka:

  • Data ingestion: It can be used to ingest large volumes of data from multiple sources into a centralized data pipeline, allowing organizations to collect, process, and analyze data in real-time.
  • Stream processing: It can be used as a stream processing engine for real-time analytics, such as monitoring web traffic, analyzing social media feeds, or tracking machine sensor data.
  • Messaging: It can be used as a messaging system for building event-driven architectures that allow services and applications to communicate with each other in a decoupled, asynchronous way.
  • Log aggregation: It can be used to aggregate logs from multiple servers and applications, making it easier to manage and analyze log data in real-time.
  • Commit log: It can be used as a commit log for distributed systems, ensuring that data is reliably stored and replicated across multiple nodes in a cluster.
  • Microservices: It can be used as a communication backbone for microservices architectures, enabling services to communicate with each other in a scalable and fault-tolerant manner.

Apache Kafka core APIs:

  • Admin API: This is used to manage and inspect topics, brokers, and other Kafka objects.
  • Producer API: This is used to publish (write) a stream of events to one or more Kafka topics.
  • Consumer API: This is used to subscribe to (read) one or more topics and to process the stream of events produced to them.
  • Kafka Streams API: This is used to implement stream processing applications and microservices. It provides high-level functions to process event streams, including transformations, stateful operations like aggregations and joins, windowing, processing based on event time, and more. Input is read from one or more topics to generate output to one or more topics, effectively transforming the input streams into output streams.
  • Kafka Connect API: This is used to build and run reusable data import/export connectors that consume (read) or produce (write) streams of events from and to external systems and applications so they can integrate with Kafka. For example, a connector to a relational database like PostgreSQL might capture every change to a set of tables. However, in practice, you typically don’t need to implement your own connectors because the Kafka community already provides hundreds of ready-to-use connectors.

Hands-on Example:

I assume that you have gained a basic overview of Kafka, including its benefits, use cases, and core APIs.

In this part, I will focus primarily on two of its widely-used core APIs:

  1. Producer API
  2. Consumer API

I will be using Bitnami Kafka Docker image, Python programming language, and kafka-python package to gain a better understanding of these two APIs.

Step 1: Downloading the Bitnami Kafka Docker image

To download the functional docker-compose.yml file of bitnami/kafka, run the following curl command.

Run the below command to download and set up the required functional containers such as Zookeeper and Kafka

Step 2: Project setup

Create a new folder for this project or run the below command in the terminal.

Open the newly created folder/directory and create a virtual environment in it using the below command.                   

Activate the virtual environment, and then proceed to install kafka-python package in it using the below command.

Create 3 files named data.py, producer.py, and consumer.py in the main directory that we have created and make sure all the files and folders are created properly.

Step 3: Adding dummy data in data.py

Open the file data.py and add the following car's data to it. We will be using this car's data to produce and consume using Kafka Producer and Consumer API later.

Step 4: Creating Kafka Producer

It's time to start creating the Kafka producer using the kafka-python package. Place the below code in the producer.py file created earlier.                   

Let's go through the above code line-by-line to create the Kafka producer server.                   
The code begins with importing dumps, KafkaProducer, and sleep from three packages - json, kafka, and time respectively - followed by importing the CARS list of objects from the previously created data.py file. We will understand the purpose and usage of these when we start using them.

After importing the necessary modules, created a KafkaProducer object named producer and passed the required parameters:

  • bootstrap_servers: This accepts a list of IP addresses along with the port(default: 9092) number. As there can be multiple brokers located in different regions which can receive messages from the same producer. In this session, I will be running the Kafka server locally in my system. So that I am passing a single value in a list with the IP address as ‘localhost’ and the default port number that is ‘9092’.
  • value_serializer: The messages passed by the producers should be of type string or ASCII converted. As we are passing car objects, I am using the lambda function to dump the data and encode it.

Followed by a print statement, just to indicate the Producer is Started.                   
And then, there is a for loop iterating through the CARS - a list of objects.                   
In which there is a print statement stating ‘Sending car <car-name>’.                   
Followed by a producer.send method with 2 parameters:

  • ‘cars_topic’: This is the topic name to which the producers will be sending their messages. In Kafka, there is a concept called topic to which the producers will be sending the messages and the consumers will be subscribing to the topic to consume the messages.
  • car: This is a car object with specific car details that need to be sent by the producers through Kafka.     

Lastly, there is a sleep method with 2 seconds. This is not mandatory, I added this statement just to feel the working of Kafka Producer and Consumer. The producer waits for 2 seconds after sending each message. 

Step 5: Creating Kafka Consumer

So far we have created the Kafka Producer service to produce/publish the messages to the Kafka topic. Let’s now create a Kafka consumer service to consume the messages that are sent by the producer.                   
Place the below code in the consumer.py file created earlier.

Similar to the producer.py, we have imported KafkaConsumer and loads from two packages kafka and json respectively at the beginning of the file. As usual, let’s understand the purpose and usage of these when we start using them.

After importing the necessary modules, created a KafkaConsumer object named consumer and passed the required parameters:

  • ‘cars_topic’: This is the topic name to which the consumer will be subscribed. To receive messages in Kafka, consumers must subscribe to a particular topic. Consumers will only receive messages from the producers if they are subscribed to the topic to which the producer is sending the messages. If messages are sent to different topics, the consumers will not be able to receive them. As you can see we are subscribing to the same topic to which we are sending messages from the producers.
  • bootstrap_servers: As discussed earlier while creating the producer. We need to provide an IP address along with the port number in order to connect to a particular broker. As I am sending the messages to my local Kafka broker, I will be connecting my consumer to the same local Kafka broker to receive the messages with an IP address as ‘localhost’ and the default port number ‘9092’.
  • auto_offset_reset: This is a policy for resetting offsets. I will be explaining the offset with a print statement below. auto_offset_reset accepts one of the below values:
  1. earliest: This will fetch the oldest value available in the offset first.

  2. latest(default): This is the default value and fetches the newest value available in the offset first.

We are using the earliest, as we accept/consume the first message first.

  • group_id: This is the name of the consumer group used for fetching. The Kafka consumer alone is not allowed to consume the messages in the Kafka service/broker. The consumer should be assigned to any consumer group in order to consume the messages. The default value is None. If no value is provided to this variable, it assigns the default value None/null and generates a random group_id for that particular consumer. We are using ‘cars-group-id’.
  • value_deserializer: This acts contrary to the parameter value_serializer used in the producer. This is an optional parameter. As we serialized our message in the producer, deserialization is required in the consumer to receive a proper message. The deserialization process is done using the loads functions which we have imported at the beginning from json package with a callable function. The value_deserializer is an optional callable parameter. As it is a callable parameter, we are using the lambda function for deserialization.                   
    After setting all the above parameters, our basic Kafka consumer constructor is now ready.

    Next, there is a print statement stating ‘Consumer started…’.                   
    Followed by a for loop, iterating through the consumer instance object to fetch a ConsumerRecord containing all the metadata about a particular message as a message.                   
    Added a few print statements in the for loop to verify the details. Let me explain them with the following points:

  • Topic: In the first print statement, we are printing the topic name from which that particular message is consumed. The topics are present inside each broker/Kafka server.
  • Partition: In the second print statement, we are printing the partition number. Each topic will contain partitions in them with an id starting from 0. And each partition is of a type list starting with index 0. Each item in a list is called offset.
  • Offset: In the third print statement, we are printing the offset number. Each item in a list of a partition is called offset. The index of an item in a list is called the offset number. All the message data sent by the Kafka Producer are stored in each offset.
  • Value: The last print statement is used to print the message data - the actual value sent by the producer.                   
    We are done with creating our Kafka Consumer.

Step 6: Configuring the docker-compose.yml file

The docker-compose.yml file downloaded in Step 1 looks like this:                   

Let me walk you through this docker-compose.yml file.

This is a Docker Compose file that describes a multi-container application that runs Apache ZooKeeper and Apache Kafka using Docker images provided by Bitnami. The application consists of two services, ‘zookeeper’ and ‘kafka’, and two volumes, ‘zookeeper_data’ and ‘kafka_data’.

Services:

  • zookeeper: This service uses the ‘bitnami/zookeeper:3.8’ Docker image and exposes port 2181 to the host machine. It also mounts the ‘zookeeper_data’ volume to ‘/bitnami’ in the container, which is where Zookeeper stores its data. The ‘ALLOW_ANONYMOUS_LOGIN’ environment variable is also set to ‘yes’, which allows anonymous clients to connect to ZooKeeper.
  • kafka: This service uses the ‘bitnami/kafka:3.4’ Docker image and exposes port 9092 to the host machine. It also mounts the ‘kafka_data’ volume to ‘/bitnami’ in the container, which is where Kafka stores its data. The ‘KAFKA_CFG_ZOOKEEPER_CONNECT’ environment variable is set to ‘zookeeper:2181’, which tells Kafka to use ZooKeeper for cluster coordination. The ‘ALLOW_PLAINTEXT_LISTENER’ environment variable is also set to ‘yes’, which enables Kafka to listen for unsecured (plaintext) client connections. The kafka service depends on the ‘zookeeper’ service, which means that the ‘zookeeper’ service must be started before the ‘kafka’ service. This ensures that Kafka can connect to ZooKeeper for cluster coordination.

Volumes:

The ‘zookeeper_data’ and ‘kafka_data’ volumes are both defined with a ‘local’ driver, which means that they are stored on the local host machine. This allows data to persist across container restarts and makes it easy to back up or migrate the data to a different machine.

All the above data is prewritten in the downloaded file. We need to add two more kafka environment variables as per our project dependency.                   
Add the below two lines under kafka environment:

The ‘KAFKA_CFG_ADVERTISED_LISTENERS’ environment variable is set to ‘PLAINTEXT://127.0.0.1:9092’, which tells Kafka to advertise its listener endpoint as ‘PLAINTEXT://127.0.0.1:9092’.                   
The ‘KAFKA_CFG_AUTO_CREATE_TOPICS’ environment variable is set to ‘cars_topic:1:1’, which creates a new Kafka topic called ‘cars_topic’ with one partition and one replica.                   
 

Step 7: Visualize the working of Kafka

Let’s start the Apache ZooKeeper and Apache Kafka server by executing the below command

Sample output:                   

Make sure you are in the working project directory and the Python virtual environment is activated.

Now, start the Kafka Consumer server first using the below command

You should see the message ‘Consumer started…’ and ready to consume messages.

Consumer output:

Finally, start the Kafka Producer server in a new terminal with Python virtual environment activated by using the below command

As all the servers are up and running, you will see a message ‘Producer started…’ and start publishing the messages from the CARS list of objects one by one with a delay of 2 seconds. 

Producer output:

Consumer output after the producer server gets started:

Thank you for reading this blog on Apache Kafka. I hope you found it informative and gained a basic understanding of the topic. You can find the source code of this project here kafka-producer-and-consumer.

Contact us today to schedule a consultation and learn how we can help you implement Apache Kafka in your organization. We offer a variety of services, including: Consulting & Support We are committed to helping our customers succeed with Apache Kafka.