In this blog post, we’re going to get back to basics and walk through how to get started using Apache Kafka with your Python applications.
We will assume some basic knowledge of Kafka. If you’re new to the project, the introduction and design sections of the Apache documentation are an excellent place to start. The Confluent blog is also packed with great information; Jay Kreps’s A Practical Guide to Building a Streaming Platform covers many of the core Kafka concepts again, but with a focus on Kafka’s role at a company-wide scale. Also noteworthy are Ben Stopford’s microservices blog posts (The Data Dichotomy, Services on a Backbone of Events) for his unique take on the relationship between applications and data.
For our examples we’ll use Plateforme Confluent. This is a source-available, open distribution of Kafka that includes connectors for various data systems, a REST layer for Kafka, and a schema registry. On OS X this is easily installed via the tar archive. Instructions for all platforms are available on the Confluent website.
The Confluent Python client confluent-kafka-python leverages the high performance C client librdkafka (also developed and supported by Confluent). Starting with version 1.0, these are distributed as self-contained binary wheels for OS X and Linux on PyPi. You can install (generally inside a virtual environment) with:
pip install confluent-kafka
You can get a single-broker Kafka cluster up and running quickly using default configuration files included with the Confluent Platform.
First, you’ll need to start a ZooKeeper instance, which Kafka utilizes for providing various distributed system related services. Assuming you used the zip or tar archive to install Confluent Platform, you can start ZooKeeper from the installation directory as follows:
Then to start a Kafka broker:
That’s it! You now have a Kafka broker to play with.
Here’s a simple program that writes a message with key ‘
hello‘ and value ‘
world‘ to the Kafka topic
After importing the
Producer class from the
confluent_kafka package, we construct a
Producer instance and assign it to the variable
p. The constructor takes a single argument: a dictionary of configuration parameters. Because confluent-kafka uses librdkafka for its underlying implementation, it shares the same set of configuration properties.
The only required property is
bootstrap.servers which is used to specify the address of one or more brokers in your Kafka cluster. In our case, there is only one, but a real-world Kafka cluster may grow to tens or hundreds of nodes. It doesn’t matter which broker(s) you specify here; this setting simply provides a starting point for the client to query the cluster – any broker can answer metadata requests about the cluster.
In the call to the
produce method, both the
value parameters need to be either a byte-like object (in Python 2.x this includes strings), a Unicode object, or
None. In Python 3.x, strings are Unicode and will be converted to a sequence of bytes using the UTF-8 encoding. In Python 2.x, objects of type
unicode will be encoded using the default encoding. Often, you will want to serialize objects of a particular type before writing them to Kafka. A common pattern for doing this is to subclass
Producer and override the
produce method with one that performs the required serialization.
The produce method returns immediately without waiting for confirmation that the message has been successfully produced to Kafka (or otherwise). The
flush method blocks until all outstanding produce commands have completed, or the optional timeout (specified as a number of seconds) has been exceeded. You can test to see whether all produce commands have completed by checking the value returned by the
flush method: if it is greater than zero, there are still produce commands that have yet to complete. Note that you should typically call
flush only at application teardown, not during normal flow of execution, as it will prevent requests from being streamlined in a performant manner.
To be notified when produce commands have completed, you can specify a callback function in the produce call. Here’s an example:
The callback method has two parameters – the first sends information about any error that occured whilst producing the message and the second information about the message produced. Callbacks are executed and sent as a side-effect of calls to the
flush methods. Unlike the
flush method, the
poll method always blocks for the specified timeout period (measured in seconds). An advantage of the poll based callback mechanism is that it allows you to keep everything single threaded and easy to reason about.
Data is read from Kafka using consumers that are generally working together as part of a consumer group. Different consumers subscribe to one or more topics and are automatically assigned to a subset of each topic’s partitions. If consumers are added or removed (perhaps due to failure) from the group, the group will automatically rebalance so that one and only one consumer is ever reading from each partition in each topic of the subscription set. For more detailed information on how consumer groups work, Jason Gustafson’s blog post covering the Java consumer is an excellent reference.
Below is a simple example that creates a Kafka consumer that joins consumer group
mygroup and reads messages from its assigned partitions until Ctrl-C is pressed:
A number of configuration parameters are worth noting:
bootstrap.servers: As with the producer, bootstrap servers specifies the initial point of contact with the Kafka cluster.
group.id: The name of the consumer group the consumer is part of. If the consumer group does not yet exist when the consumer is constructed (there are no existing consumers that are part of the group), the group id will be created automatically. Similarly, if all consumers in a group leave the group, the group and group id will be automatically destroyed.
client.id: Although optional, each consumer in a group should be assigned a unique id – this allows you to differentiate between clients in Kafka error logs and monitoring aggregates.
default.topic.config: A number of topic related configuration properties are grouped together under this high level property. One commonly used topic-level property is
auto.offset.resetwhich specifies which offset to start reading from if there have been no offsets committed to a topic/partition yet. This defaults to
latest, however you will often want this to be smallest so that old messages are not ignored when you first start reading from a topic.
enable.auto.commit: By default, as the consumer reads messages from Kafka, it will periodically commit its current offset (defined as the offset of the next message to be read) for the partitions it is reading from back to Kafka. Often you would like more control over exactly when offsets are committed. In this case you can set enable.auto.commit to
Falseand call the
commitmethod on the consumer. For simplicity, we have left auto offset commit enabled in this example.
After constructing the consumer, the
subscribe method is called to inform Kafka that we wish to join the consumer group
mygroup (specified in the configuration) and read messages from a single topic
mytopic. It’s possible to subscribe to more than one topic by specifying more than one topic name in the list provided to the
subscribe method. Note that you can’t do this by calling the
subscribe method a second time – this would result in the consumer first unsubscribing from the original subscription set and then subscribing to only the topic(s) in the newly specified one.
Having subscribed to a set of topic groups, we enter the main poll loop. This is wrapped in a
try/except block that allows controlled shutdown of the consumer via the
close method when the user interrupts program execution. If the
close method is omitted, the consumer group would not rebalance immediately – removal of the consumer from the group would occur as per the consumer group failure detection protocol after the
session.timeout.ms has elapsed.
On the consumer, the
poll method blocks until a Message object is ready for consumption, or until the timeout period (specified in seconds) has elapsed, in which case the return value is
None. When a
Message object is available, there are essentially three cases to consider, differentiated by the value returned by
Messageobject represents a consumed message. The message key, value and other relevant information can be obtained via the
offset()methods of the
Messageobject does not encapsulate any consumed message – it simply signals that the end of a partition has been reached. You can use the
topic()methods to determine the pertinent partition.
Messageobject methods may return valid values. For most error types, use of
That concludes our introduction on how to integrate Apache Kafka with your Python applications. In order to keep this post to a reasonable length, we’ve omitted some of the more advanced features of kafka python integration provided by the library. For example, you can hook into the partition assignment process that happens after you call
subscribe on the consumer but before any messages are read. This allows you to do things like pre-load state associated with the partition assignment for joining with the consumed messages. The client also ships with
AvroConsumer classes that allow you to serialize data in Avro format and manage the evolution of the associated schemas using schema registry. For further information of kafka python integration, refer to the API documentation, the examples in the github repo, or user’s guide on our website.
For expert advice on deploying or operating Kafka, we’ve released a range of training and technical consulting services covering all levels of expertise for you to consume and learn from. For large-scale deployments of Kafka, we offer Confluent Platform, which not only provides a number of powerful features in addition to those under the Confluent Community License but also provides enterprise grade support. Finally, a hosted and fully managed version Apache Kafka is just around the corner with the up-coming Confluent Cloud.