Engineers & Developers

Updating Topicctl to Support the Future of Kafka

Oct 26, 2021

By Benjamin Yolken


At Segment, we use Apache Kafka extensively to store customer events and connect the key pieces of our data processing pipeline. Last year we open-sourced topicctl, a tool that we developed for safer and easier management of the topics in our Kafka clusters; see our previous blog post for more details.

Since the initial release of topicctl, we’ve been working on several enhancements to the tool, with a particular focus on removing its dependencies on the Apache ZooKeeper APIs; as described more below, this is needed for a future world in which Kafka runs without ZooKeeper. We’ve also added authentication on broker API calls and fixed a number of user-reported bugs.

After months of internal testing, we’re pleased to announce that the new version, which we’re referring to as “v1”, is now ready for general use! See the repo README for more details on installing and using the latest version.

In the remainder of this post, we’d like to go into more detail on these changes and explain some of the technical challenges we faced in the process.

Kafka, ZooKeeper, and topicctl

A Kafka cluster consists of one or more brokers (i.e. nodes), which expose a set of APIs that allow clients to read and write data, among other use cases. The brokers coordinate to ensure that each has the latest version of the configuration metadata, that there is a single, agreed-upon leader for each partition, that messages are replicated to the right locations, and so forth.

Original architecture

Topicctl original architecture

Historically, the coordination described above has been done via a distributed key-value store called Apache ZooKeeper. The latter system stores shared metadata about everything in the cluster (brokers, topics, partitions, etc.) and has primitives to support coordination activities like leader election.

ZooKeeper was not just used internally by Kafka, but also externally by clients as an interface for interacting with cluster metadata. To fetch all topics in the cluster, for instance, a client would hit the ZooKeeper API and read the keys and values in a particular place in the ZooKeeper data hierarchy. Similarly, updates to metadata, e.g. changing the brokers assigned to each partition in a topic, were done by writing JSON blobs into ZooKeeper with the expected format in the expected place.

Some of these operations could also be done through the broker APIs, but many could only be done via ZooKeeper.

Given these conditions, we decided to use ZooKeeper APIs extensively in the original version of topicctl. Although it might have been possible to provide some subset of functionality without going through ZooKeeper, this “mixed access” mode would have made the code significantly more complex and made troubleshooting connection issues harder because different operations would be talking to different systems.

Towards a ZooKeeper-less World

In 2019, a proposal was made to remove the ZooKeeper dependency from Kafka. This would require handling all coordination activities internally within the cluster (involving some significant architectural changes) and also adding new APIs so that clients would no longer need to hit ZooKeeper for any administrative operations.

The motivation behind this proposal was pretty straightforward — ZooKeeper is a robust system and generally works well for the coordination use cases of Kafka, but can be complex to set up and manage. Removing it would significantly simplify the Kafka architecture and improve its scalability.

This proposal was on our radar when we originally created topicctl, but the implementation was so far off in the future that we weren’t worried about it interfering with our initial release. Recently, however, the first Kafka version that can run without ZooKeeper landed. We realized that we needed to embrace this new world so the tool would work continue to work with newer Kafka versions.

At the same time, we got feedback both internally and externally that depending on ZooKeeper APIs for the tool would make security significantly harder. ZooKeeper does have its own ACL system, but managing this in parallel with the Kafka one is a pain, so many companies just block ZooKeeper API access completely for everything except the Kafka brokers. Many users would be reluctant to open this access up (rightfully so!) and thus the ZooKeeper requirement was blocking the adoption of the tool in many environments.

Given these multiple factors, removing the ZooKeeper requirement from topicctl became a high priority.

Removing the ZooKeeper requirement

In the original code for topicctl, all cluster admin access went through a single struct type, the admin client, which then used a private ZooKeeper client for fetching configs, updating topics, etc. This struct exposed methods that could be called by other parts of the tool; the golang code for triggering a leader election in the cluster, for instance, looked like the following (some details omitted for simplicity):

type Client struct { // Private ZooKeeper client zkClient zk.Client } // Lots of other methods omitted here ... func (c *Client) RunLeaderElection( ctx context.Context, topic string, partitions []int, ) error { // Populate a struct in the right format zkElectionObj := zkElection{ Version: 1, Partitions: []zkElectionTopicPartition{}, } for _, partition := range partitions { zkElectionObj.Partitions = append( zkElectionObj.Partitions, zkElectionTopicPartition{ Topic: topic, Partition: partition, }, ) } // Serialize the struct to JSON and write it to a specific // place in ZooKeeper zNode := c.zNode("/admin/preferred_replica_election") return c.zkClient.CreateJSON( ctx, zNode, zkElectionObj, false, ) }

Note that the client in this case isn’t actually communicating with the Kafka brokers or using any Kafka APIs. It’s just writing some JSON into /admin/preferred_replica_election, which, by convention, is where the Kafka brokers will look to start the process of running a leader election.

Our first step was to convert the APIs exposed by this struct into a golang interface with two implementations- one that depended on ZooKeeper, i.e. using the code from our original version of the admin client, and a second that only used Kafka broker APIs. 

So, the Client above became:

type Client interface { GetClusterID(ctx context.Context) (string, error) GetBrokers(ctx context.Context, ids []int) ([]BrokerInfo, error) // Lots of other methods ... RunLeaderElection( ctx context.Context, topic string, partitions []int, ) error }

with the RunLeaderElection implementations becoming the following for the ZooKeeper and ZooKeeper-less versions, respectively:

type ZKAdminClient struct { // Private ZooKeeper client zkClient zk.Client } // Lots of other methods ... func (c *ZKAdminClient) RunLeaderElection( ctx context.Context, topic string, partitions []int, ) error { // Same implementation as above }
type BrokerAdminClient struct { // Private, kafka-go broker client client *kafka.Client } // Lots of other methods ... func (c *BrokerAdminClient) RunLeaderElection( ctx context.Context, topic string, partitions []int, ) error { // New implementation- to be filled in! return nil }

The next step was to fill out the details of the broker-based admin client so that it actually worked. topicctl was already using the excellent kafka-go library for its functionality that depended on broker APIs (e.g., tailing topics), so we wanted to use that here as well. Unfortunately, however, this library was designed primarily for reading and writing data, as opposed to metadata, so it only supported a subset of the admin-related Kafka API.

After doing an inventory of our client’s requirements, we determined that there were six API calls we needed that were not yet supported by kafka-go:

api calls

Our next step was to update kafka-go to support these! At first, it looked easy- this library already had a nice interface for adding new Kafka APIs; all you had to do was create go structs to match the API message specs, and then add some helper functions to do the calls.

But, as often happens, we ran into a wrinkle: a new variant of the Kafka protocol had been recently introduced (described here) to make API messages more space-efficient. Although most of the APIs we needed had versions predating the update, a few only supported the new protocol format. To add all of the APIs we needed, we’d have to update kafka-go to support the new format.

Thus, we first went through all of the protocol code in kafka-go, updating it to support both the old and new formats. The proposal linked above didn’t have 100% of the details we needed, so in several cases, we also had to consult the Kafka code to fully understand how newer messages were formatted. After much trial and error, we eventually got this code working and merged.

Once that was done, we were unblocked from adding the additional APIs, which we did in this change. Finally, we could go back to the topicctl code and fill out the implementation of the broker-based admin client.

Returning to the RunLeaderElection example from above, we now had something like:

type BrokerAdminClient struct { // Private, kafka-go broker client- now with all the // extra APIs added! client *kafka.Client } // Lots of other methods ... func (c *BrokerAdminClient) RunLeaderElection( ctx context.Context, topic string, partitions []int, ) error { // Using one of the new APIs added to kafka-go req := kafka.ElectLeadersRequest{ Topic: topic, Partitions: partitions, Timeout: defaultTimeout, } _, err := c.client.ElectLeaders(ctx, &req) return err }

The end result is that we were able to get topicctl working end-to-end with either the ZooKeeper-based implementation (required for older clusters) or the ZooKeeper-less one (for newer clusters), with only minimal changes in the other parts of the code.

Security updates

In addition to removing the ZooKeeper requirement from topicctl , we also got several requests to support secure communication between the tool and the brokers in a cluster. We didn’t include these in the original version because we don’t (yet) depend on these features internally at Segment; but, they’re becoming increasingly important, particularly as users adopt externally hosted Kafka solutions like AWS MSK and Confluent Cloud.

We went ahead and fixed this, at least for the most common security mechanisms that Kafka supports. First, and most significantly, topicctl can now use TLS (called “SSL” for historical reasons in the Kafka documentation) to encrypt all communication between the tool and the brokers. 

In addition to TLS, we also added support for SASL authentication on these links. This provides a secure way for a client to present a username and password to the API; the permissions for each authenticated user can then be controlled in a fine-grained way via Kafka’s authorization settings.

Topicctl v1 architecture

Testing and release

As we updated the internals of topicctl, we extended our unit tests to run through the core flows like applying a topic change under multiple conditions, e.g. using ZooKeeper vs. only using Kafka APIs. We also used docker-compose to create local clusters with different combinations of Kafka versions, security settings, and client settings to ensure that the tool worked as expected in all cases. 

Once this initial testing was done, we updated the internal tooling that Segment engineers use to run topicctl to use either the old version or the new one, depending on the cluster. In this way, we could roll out to newer, lower-risk clusters first, then eventually work up to the bigger, riskier ones. 

After several months of usage, we felt confident enough to use v1 for all of our clusters and deprecate the old version for both internal and external users of the tool.

Conclusion

topicctl v1 is ready for general use! You might find it a useful addition to your Kafka toolkit for understanding the data and metadata in your clusters, and for making config changes. Also, feel free to create issues in our Github repository to report problems or request features for future versions.

The State of Personalization 2023

Our annual look at how attitudes, preferences, and experiences with personalization have evolved over the past year.

Get the report
SOP 2023

The State of Personalization 2023

Our annual look at how attitudes, preferences, and experiences with personalization have evolved over the past year.

Get the report
SOP 2023

Share article

Want to keep updated on Segment launches, events, and updates?

We’ll share a copy of this guide and send you content and updates about Twilio Segment’s products as we continue to build the world’s leading CDP. We use your information according to our privacy policy. You can update your preferences at any time.