Normally, we consume Kafka messages from the beginning or end of a topic, or from the last committed offsets. For backfilling or troubleshooting, however, we occasionally need to consume messages from a certain timestamp. The KafkaConsumer class of the kafka-python package has a method to seek to a particular offset for a topic partition. Therefore, if we know which topic partition to read from (for example, because we assign the topic partition manually), we can easily override the fetch offset. When we deploy multiple consumer instances together, however, we make them subscribe to a topic, and topic partitions are dynamically assigned. This means we do not know in advance which topic partition will be assigned to a consumer instance. In this post, we will discuss how to configure the Kafka consumer to seek offsets by timestamp when topic partitions are dynamically assigned by subscription.
Kafka Docker Environment
A single node Kafka cluster is created as a docker-compose service together with Zookeeper, which is used to store the cluster metadata. Note that the Kafka and Zookeeper data directories are mapped to host directories so that Kafka topics and messages are preserved when the services are restarted. As discussed below, fake messages are published into a Kafka topic by a producer application that runs outside the docker network (kafkanet). For the producer to access the Kafka cluster, we need to add an external listener, which is configured on port 9093. Finally, the Kafka UI is added for monitoring the Kafka broker and related resources. The source can be found in the post’s GitHub repository.
Before we start the services, we need to create the directories used for volume mapping and update their permissions. The services can then be started as usual. A Kafka topic with two partitions is used in this post. It is created manually because its partition count differs from the default configuration.
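The post creates the topic manually. One possible alternative is to create the same two-partition topic from Python with kafka-python's admin client, sketched below. The helper name `create_topic`, the replication factor of 1 (chosen because the cluster has a single node) and the use of the external listener address are assumptions of this sketch, not details from the post.

```python
def create_topic(bootstrap_servers: str = "localhost:9093", name: str = "orders",
                 partitions: int = 2, replication_factor: int = 1) -> None:
    """Create the topic if it does not exist yet (hypothetical helper)."""
    # Imported lazily so that defining this helper does not require kafka-python.
    from kafka.admin import KafkaAdminClient, NewTopic
    from kafka.errors import TopicAlreadyExistsError

    admin = KafkaAdminClient(bootstrap_servers=bootstrap_servers)
    try:
        admin.create_topics(
            new_topics=[NewTopic(name=name, num_partitions=partitions,
                                 replication_factor=replication_factor)]
        )
    except TopicAlreadyExistsError:
        pass  # re-running the setup step is harmless
    finally:
        admin.close()
```

Calling `create_topic()` once after the services are up should leave an orders topic with two partitions, which can then be checked in the Kafka UI.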
The topic can be checked in the Kafka UI as shown below.
A Kafka producer is created to send messages to the orders topic, and fake messages are generated using the Faker package.
The Order class generates one or more fake order records with its create method. An order record includes an order ID, an order timestamp, a customer and order items.
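The post builds its records with Faker. As a rough illustration of the same idea that needs only the standard library, the sketch below swaps Faker for `random` and `uuid`. The field names (`order_id`, `ordered_at`, `user_id`, `order_items`), the item structure and the `seed` parameter are hypothetical and are not taken from the post's Order class.

```python
import datetime as dt
import random
import uuid
from typing import List, Optional


class Order:
    """Hypothetical stdlib stand-in for the post's Faker-based Order class."""

    def __init__(self, seed: Optional[int] = None):
        self.rng = random.Random(seed)  # seedable for reproducible test data

    def _uuid(self) -> str:
        # Random (version 4) UUID drawn from the seeded generator.
        return str(uuid.UUID(int=self.rng.getrandbits(128), version=4))

    def _order_items(self) -> List[dict]:
        # One to three line items, each with a product ID and a quantity.
        return [
            {"product_id": self.rng.randint(1, 1000), "quantity": self.rng.randint(1, 5)}
            for _ in range(self.rng.randint(1, 3))
        ]

    def create(self, num: int) -> List[dict]:
        """Return `num` fake order records."""
        return [
            {
                "order_id": self._uuid(),
                "ordered_at": dt.datetime.now(dt.timezone.utc).isoformat(timespec="milliseconds"),
                "user_id": self._uuid(),
                "order_items": self._order_items(),
            }
            for _ in range(num)
        ]
```

For example, `Order(seed=1).create(3)` returns three dictionaries with distinct order IDs.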
A sample order record is shown below.
The Kafka producer sends one or more order records. A message is made up of an order ID as the key and an order record as the value. Both the key and the value are serialised as JSON. Once started, the producer sends order messages to the topic indefinitely, ten messages per loop iteration. Note that the external listener (localhost:9093) is specified as the bootstrap server because the producer runs outside the docker network. We can run the producer app simply by executing python producer.py.
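The sketch below shows one way to express the producer with kafka-python. It uses JSON serialisers for the key and value, keys each message by its order ID, and sends batches of ten. The helper names (`to_json_bytes`, `create_producer`, `send_orders`, `run`), the `make_orders` callable and the `max_batches` cap are assumptions added so the loop can be stopped and tested. They are not the post's code.

```python
import json
from typing import Any, Callable, Iterable, Optional


def to_json_bytes(obj: Any) -> bytes:
    """Serialise a key or value as UTF-8 JSON; unknown types (e.g. datetimes) fall back to str()."""
    return json.dumps(obj, default=str).encode("utf-8")


def create_producer(bootstrap_servers: str = "localhost:9093"):
    """Build a KafkaProducer that reaches the broker through the external listener."""
    from kafka import KafkaProducer  # lazy import: the helpers below also work with a stub

    return KafkaProducer(
        bootstrap_servers=bootstrap_servers,
        key_serializer=to_json_bytes,
        value_serializer=to_json_bytes,
    )


def send_orders(producer, topic: str, orders: Iterable[dict]) -> int:
    """Send each order keyed by its order ID; return the number of messages sent."""
    sent = 0
    for order in orders:
        producer.send(topic, key=order["order_id"], value=order)
        sent += 1
    producer.flush()  # block until the buffered records are delivered
    return sent


def run(producer, make_orders: Callable[[int], list], topic: str = "orders",
        batch_size: int = 10, max_batches: Optional[int] = None) -> None:
    """Send batches of `batch_size` orders; loop forever when `max_batches` is None."""
    batches = 0
    while max_batches is None or batches < max_batches:
        send_orders(producer, topic, make_orders(batch_size))
        batches += 1
```

With a real broker this would be started as `run(create_producer(), make_orders)`, where `make_orders` is any callable returning a list of order dictionaries.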
After a while, we can see that messages are sent to the orders topic. Out of 2390 messages, 1179 and 1211 are sent to partitions 0 and 1, respectively.
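The roughly even split is what keyed partitioning should produce. When a message has a key and no explicit partition, the default partitioner hashes the key bytes with murmur2 and takes the result modulo the partition count. The pure-Python sketch below is modelled on Kafka's Java `Utils.murmur2` routine. It was written from memory and has not been checked against a broker, so treat its exact hash values as unverified. The names `murmur2` and `partition_for_key` are hypothetical. The sample keys are JSON-encoded strings, which mirrors how the producer above serialises its keys.

```python
MASK32 = 0xFFFFFFFF


def murmur2(data: bytes) -> int:
    """32-bit murmur2 hash modelled on Kafka's default partitioner (unsigned result)."""
    length = len(data)
    m = 0x5BD1E995
    h = (0x9747B28C ^ length) & MASK32
    # Body: mix four bytes (little-endian) at a time.
    for i in range(length // 4):
        k = int.from_bytes(data[4 * i:4 * i + 4], "little")
        k = (k * m) & MASK32
        k ^= k >> 24
        k = (k * m) & MASK32
        h = (h * m) & MASK32
        h ^= k
    # Tail: the remaining 1 to 3 bytes (fall-through as in the Java switch).
    tail, rem = length & ~3, length % 4
    if rem == 3:
        h ^= data[tail + 2] << 16
    if rem >= 2:
        h ^= data[tail + 1] << 8
    if rem >= 1:
        h ^= data[tail]
        h = (h * m) & MASK32
    # Final avalanche.
    h ^= h >> 13
    h = (h * m) & MASK32
    h ^= h >> 15
    return h


def partition_for_key(key: bytes, num_partitions: int) -> int:
    """Drop the sign bit, then take the hash modulo the partition count."""
    return (murmur2(key) & 0x7FFFFFFF) % num_partitions
```

Hashing a couple of thousand distinct keys into two partitions should give a split close to 50/50, in line with the 1179/1211 counts above. The same key always maps to the same partition.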
Two consumer instances are deployed in the same consumer group. As the topic has two partitions, each instance is expected to be assigned a single topic partition. A custom consumer rebalance listener is registered so that, when a topic partition is assigned, the fetch offset is overridden using an offset timestamp supplied through an environment variable (OFFSET_STR).
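Why two instances should get one partition each can be seen from a simplified model of the range assignment strategy, which kafka-python lists first in its default assignment strategies. The function below is a hypothetical illustration for a single topic. It does not call the library's assignor.

```python
from typing import Dict, List, Sequence


def range_assign(partitions: Sequence[int], members: Sequence[str]) -> Dict[str, List[int]]:
    """Split one topic's partitions into contiguous ranges, one range per sorted member ID."""
    ordered_members = sorted(members)
    ordered_partitions = sorted(partitions)
    per_member, extra = divmod(len(ordered_partitions), len(ordered_members))
    assignment: Dict[str, List[int]] = {}
    start = 0
    for i, member in enumerate(ordered_members):
        # The first `extra` members each take one additional partition.
        count = per_member + (1 if i < extra else 0)
        assignment[member] = ordered_partitions[start:start + count]
        start += count
    return assignment
```

With two partitions and two members, each member receives exactly one partition. A third member would receive none and sit idle, which is why the number of instances matches the partition count here.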
Custom Consumer Rebalance Listener
The consumer rebalance listener is a callback interface where custom actions can be implemented when topic partitions are assigned or revoked. For each assigned topic partition, the listener uses the offsets_for_times method to obtain the earliest offset in that partition whose timestamp is greater than or equal to the given timestamp. It then overrides the fetch offset using the seek method. Note that consumer instances can be rebalanced many times over their lifetime, and with a fixed environment variable every subsequent rebalance would seek back to the same timestamp. It is therefore better to store the OFFSET_STR value in an external configuration store. This way, we can control whether to override fetch offsets by changing the configuration externally.
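A minimal sketch of such a listener with kafka-python follows. It converts the timestamp string to epoch milliseconds, asks `offsets_for_times` for each assigned partition, and seeks wherever an offset is found. The class name, the ISO-8601 input format and the treatment of naive timestamps as UTC are assumptions. The `ImportError` fallback exists only so the helpers can be exercised without kafka-python installed. With the real library, the class subclasses `ConsumerRebalanceListener`, which `subscribe` requires.

```python
import datetime as dt
import logging

try:
    from kafka import ConsumerRebalanceListener
except ImportError:  # fallback only so the helpers below can be exercised without kafka-python
    ConsumerRebalanceListener = object

EPOCH = dt.datetime(1970, 1, 1, tzinfo=dt.timezone.utc)


def to_epoch_millis(offset_str: str) -> int:
    """Convert an ISO-8601 string to epoch milliseconds; naive values are assumed to be UTC."""
    ts = dt.datetime.fromisoformat(offset_str)
    if ts.tzinfo is None:
        ts = ts.replace(tzinfo=dt.timezone.utc)
    # Integer timedelta division avoids float rounding errors.
    return (ts - EPOCH) // dt.timedelta(milliseconds=1)


class SeekByTimestampListener(ConsumerRebalanceListener):
    """Seek each newly assigned partition to the first offset at or after offset_str."""

    def __init__(self, consumer, offset_str=None):
        self.consumer = consumer
        self.offset_str = offset_str

    def on_partitions_revoked(self, revoked):
        logging.info("revoked: %s", revoked)

    def on_partitions_assigned(self, assigned):
        logging.info("assigned: %s", assigned)
        if not self.offset_str or not assigned:
            return  # nothing to override; keep the committed or reset position
        ts_ms = to_epoch_millis(self.offset_str)
        # Maps each partition to an OffsetAndTimestamp, or None if no message is that recent.
        found = self.consumer.offsets_for_times({tp: ts_ms for tp in assigned})
        for tp, offset_and_ts in found.items():
            if offset_and_ts is None:
                continue
            logging.info("seeking %s to offset %d", tp, offset_and_ts.offset)
            self.consumer.seek(tp, offset_and_ts.offset)
```

Partitions with no message at or after the timestamp are left at their current position rather than being forced to the end.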
While it is common practice to pass one or more Kafka topics to the Kafka consumer class when it is instantiated, the consumer omits them in the create method. This allows the topic to be subscribed to later together with the custom rebalance listener. In the process method, the consumer subscribes to the orders topic while registering the custom listener. After subscribing to the topic, it polls a single message at a time for ease of tracking.
Docker-compose is used to deploy multiple instances of the consumer. Note that the compose service uses the same docker network (kafkanet) so that it can use kafka:9092 as the bootstrap server address. The OFFSET_STR environment variable is used to override the fetch offset.
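On the application side, the environment could be read as sketched below. An unset or empty OFFSET_STR means "do not override", and the in-network address is the default bootstrap server. The `BOOTSTRAP_SERVERS` variable and the `Settings`/`load_settings` names are hypothetical and do not come from the post's compose file.

```python
import os
from typing import Mapping, NamedTuple, Optional


class Settings(NamedTuple):
    bootstrap_servers: str
    offset_str: Optional[str]  # None means: keep the committed/reset offsets


def load_settings(env: Mapping[str, str] = os.environ) -> Settings:
    """Read consumer settings from environment variables (names partly hypothetical)."""
    offset_str = env.get("OFFSET_STR", "").strip() or None
    return Settings(
        bootstrap_servers=env.get("BOOTSTRAP_SERVERS", "kafka:9092"),
        offset_str=offset_str,
    )
```

Treating an empty value as unset lets the override be switched off without removing the variable from the service definition.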
We can start two consumer instances by scaling the consumer service to two replicas (for example, with the --scale consumer=2 option of docker-compose up).
Soon after the instances start to poll messages, we can see that their fetch offsets have been overridden, as the current offset values are much higher than 0.
We can check the logs of the consumer instances to examine their behaviour further. The logs of one of the instances are shown below.
We see that partition 1 is assigned to this instance. The fetch offset is overridden with offset 901, and the timestamp of that message is 2023-01-06T19:20:16.107000, which is later than the OFFSET_STR value.
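We can also confirm that the correct offset is obtained. The timestamp of the message at offset 900 is earlier than the OFFSET_STR value, so offset 901 is indeed the earliest offset whose timestamp is at or after the given timestamp.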
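The check above follows the documented semantics of offsets_for_times: it returns the earliest offset whose timestamp is greater than or equal to the target, or None if there is no such message. The function below is a hypothetical in-memory model of that lookup. It assumes timestamps never decrease along the offsets, which the broker does not guarantee for producer-assigned timestamps. The sample offsets and timestamps in the usage note are made up.

```python
from bisect import bisect_left
from typing import List, Optional, Tuple


def earliest_offset_at_or_after(records: List[Tuple[int, int]], target_ms: int) -> Optional[int]:
    """records: (offset, timestamp_ms) pairs sorted by offset, timestamps non-decreasing (assumed)."""
    timestamps = [ts for _, ts in records]
    i = bisect_left(timestamps, target_ms)  # first index with timestamp >= target
    return records[i][0] if i < len(records) else None
```

For records `[(899, 100), (900, 200), (901, 300)]`, a target of 250 yields 901. That offset's predecessor (900) has an earlier timestamp, which is the same relationship the log check confirms.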
In this post, we discussed how to configure Kafka consumers to seek offsets by timestamp. A single node Kafka cluster was created using docker compose and a Kafka producer was used to send fake order messages. While subscribing to the orders topic, the consumer registered a custom consumer rebalance listener that overrides the fetch offsets by timestamp. Two consumer instances were deployed using docker compose and their behaviour was analysed in detail.