Categories
Software Engineering

Tracking Consumer Lag in Kafka with Burrow & Go

At the highest of high levels, consumer groups in Apache Kafka consist of one or more Kafka consumers that function as a single unit by distributing available partitions from one or more Kafka topics assigned to the group. Think of them as a team of workers in a factory, where each individual worker represents a consumer in the group and is assigned some number of conveyor belts to process (ie partitions). Assuming this is a well managed factory, the ideal would be to distribute the conveyor belts up evenly between all the employees on the team. Like the division of labor among each team member in our factory, by distributing partitions among consumers in a consumer group, Kafka is able to deliver better throughput and fault tolerance than just consuming entire topics using a single consumer.

Continuing with our factory metaphor*, should anyone leave, get hired, or if the number of belts increase, then the belts assigned to each member should be redistributed among the team. Consumer groups in Kafka support a similar sort of rebalancing, wherein Kafka aims to evenly redistribute partitions between members of the group. Similar to our factory, this can happen if consumers are added or removed from the consumer group, if the number of partitions for a consumed topic is increased, or if one of the consumers fails or becomes unavailable.

Kafka maintains metadata for each consumer group that consists of information such as the id of the consumer w/in the consumer group, the last offsets read by the consumer w/in the consumer group, the current consumer lag (more to come on that) for that consumer. This metadata is used as internal book-keeping which allows for fault tolerance and consistency, which allows for consumers to pick up from where they left off in the event of restarts.

Partition Skew and Consumer Lag

Now what would happen if a glitch in the belt system caused an overwhelming amount of items to go to a single belt? Woe to the employee who has been dealt that lane! To make matters even worse for our poor over-burdened soul, the other stubborn employees are in a strict union that allow them to refuse work outside of their assigned belts. What does this mean for the throughput of our factory? Operators of other belts may be completely caught up on their work while the one worker sweats away trying to manage the growing backfill. Customers are expecting deliveries posthaste. This is not a good look for our factory if many orders are waiting on single worker.

This issue, when applied to Kafka, is what is referred to as partition skew and manifests itself in Kafka consumer groups as consumer lag w/in the consumer group metrics. Heavy partition skew can create serious issues for throughput in Kafka consumers because, at its extremes, it can reduce processing to a single serial thread. Ideally, the priority should be to fix whatever is causing the partition skew on in the first place (usually a Kafka producer-sided issue), but we should probably do a better job tracking consumer lag going forward. Confluent does provide a very nice CLI tool for tracking consumer lag, however unless you want someone awake at all hours of the day running CLI commands, a programmatic solution for tracking consumer lag seems to be in order.

Burrow

Burrow is a delightful service that, provided the right configuration, will take care of the hard work of tracking consumer lag for all configured consumer groups within one or more Kafka clusters for us. Burrow works under the hood by collecting consumer group metrics for each group at periodic intervals and sets the status of each partition within the group and the group as a whole based on the change in consumer lag since the last time the consumer group was polled. Burrow also exposes the consumer group metrics that it collects via a REST service. Seems like the perfect tool for the issue at hand! Although Burrow provides some out-of-the-box utilities for alerting via email, let’s see how we can use Go to fetch the status of a heavily skewed consumer group programmatically via the Burrow REST service.

But first, lets briefly discuss the setup of the test environment.

Test Environment

Obviously, we will need some sort of Kafka cluster that we can create topics in and deliver messages to. Here’s a quick docker-compose file that we can use to spin up a mock Kafka cluster with 3 brokers:

version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:6.2.12
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - "2181:2181"

  kafka1:
    image: confluentinc/cp-kafka:6.2.12
    hostname: kafka1
    ports:
      - "9092:9092"
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:19092,PLAINTEXT_INTERNAL://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
  
  kafka2:
    image: confluentinc/cp-kafka:6.2.12
    hostname: kafka2
    ports:
      - "9093:9093"
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka2:29092,PLAINTEXT_INTERNAL://localhost:9093
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
  
  kafka3:
    image: confluentinc/cp-kafka:6.2.12
    hostname: kafka3
    ports:
      - "9094:9094"
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 3
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka3:39092,PLAINTEXT_INTERNAL://localhost:9094
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  burrow:
    image: "coco/burrow"
    hostname: burrow
    ports:
      - "8000:8000"
    depends_on:
      - zookeeper
      - kafka1
      - kafka2
      - kafka3  
    volumes:
      - ./burrow.toml:/config/burrow.toml
    restart: always

  kafka-consumer-lag-go:
    image: kafka-consumer-lag-go
    depends_on:
      - "burrow"
    environment:
      BOOTSTRAP_SERVERS: "kafka1:19092,kafka2:29092,kafka3:39092"

As part of our test infrastructure. we will also need a Kafka topic that we can use to delivery our skewed messages. I’ve used the Confluent library for other projects, so I will use it here to create an Admin client that we can use to create Kafka topics in our test cluster as we see fit:

/*
Creates topic in our dummy cluster
*/
func CreateTopic(ctx context.Context, topic string, numPartitions int) error {
	adminClient, err := kafka.NewAdminClient(&config)
	if err != nil {
		return err
	}
	defer adminClient.Close()

	topicSpec := kafka.TopicSpecification{
		Topic:             topic,
		NumPartitions:     numPartitions,
		ReplicationFactor: 3,
	}

	_, err = adminClient.CreateTopics(ctx, []kafka.TopicSpecification{topicSpec})
	return err
}


Now that we have some test infrastructure set up, we need a way to mock the partition-skew scenario in a test topic. I will leverage the Confluent Kafka API here again to produce integer messages to our Kafka topic and route every even-valued integer message to the first partition in the Kafka topic.

/*
Produces n dummy messages to topic,
attempts to replicate partition skew by assigning more messages to one partition
*/
func ProduceSkewedMessagesToTopic(topic string, numMessages int, numPartitions int) error {
	producer, err := kafka.NewProducer(&config)
	if err != nil {
		return err
	}
	defer producer.Close()

	msgArr := make([]int, numMessages)
	for i := 0; i < numMessages; i++ {
		msgArr[i] = i
	}

	for _, msg := range msgArr {
		var partitionNum int32
		// all even message vals get routed to the first partition to replicate partition skew
		if msg%2 == 0 {
			partitionNum = 1
		} else {
			partitionNum = int32(msg % numPartitions)
		}

		producer.Produce(&kafka.Message{
			TopicPartition: kafka.TopicPartition{
				Topic:     &topic,
				Partition: partitionNum,
			},
			Value:     []byte(string(msg)),
			Timestamp: time.Now(),
		}, nil)
	}

	producer.Flush(10 * 1000)
	fmt.Println("Messages produced")
	return nil
}

Half of the messages produced to our Kafka topic should be going to the same partition, so after we produce enough messages, the topic should have some observable partition-skew. Last but not least, let’s initialize our consumer group ‘skewed-consumer-group’ by reading messages from our skewed topic until we have read at least one message from each partition:

/*
Reads one at least one message from each partition to initiate the consumer group
*/
func CreateConsumerGroup(topic string, consumerGroupName string, numPartitions int) error {
	config.SetKey("group.id", consumerGroupName)
	config.SetKey("auto.offset.reset", "earliest")

	consumer, err := kafka.NewConsumer(&config)
	if err != nil {
		return err
	}
	defer consumer.Close()
	consumer.Subscribe(topic, nil)

	readPartitionsBitMap := map[int32]bool{}
	for len(readPartitionsBitMap) != numPartitions {
		message, _ := consumer.ReadMessage(10 * time.Second)
		fmt.Printf("Read message from partition: %v, message: %v\n", message.TopicPartition.Partition, message.Value)
		readPartitionsBitMap[message.TopicPartition.Partition] = true
	}
	consumer.Commit()
	return nil
}

Tie it all together:

const (
	SKEWED_TOPIC_NAME        = "skewed-topic"
	SKEWED_CONSUMER_GROUP_ID = "skewed-consumer-group"
)

var (
	config = kafka.ConfigMap{
		"bootstrap.servers": kafka.ConfigValue(os.Getenv("BOOTSTRAP_SERVERS")),
	}
)

/*
Sets up test environment with partition skew in topic
*/
func SetupConsumerGroupLagInTopic(ctx context.Context) {
	fmt.Println(fmt.Sprintf("Creating skewed topic, '%v'", SKEWED_TOPIC_NAME))
	if err := CreateTopic(ctx, SKEWED_TOPIC_NAME, 3); err != nil {
		panic(err)
	}

	fmt.Println("Producing messages to Kafka topic")
	if err := ProduceSkewedMessagesToTopic(SKEWED_TOPIC_NAME, 1000, 3); err != nil {
		panic(err)
	}

	fmt.Println("Creating consumer group for skewed topic")
	if err := CreateConsumerGroup(SKEWED_TOPIC_NAME, SKEWED_CONSUMER_GROUP_ID, 3); err != nil {
		panic(err)
	}
}

The last piece that we need is our burrow.toml file that we will use to configure our Burrow service:

[client-profile.client]
client-id="burrow-myclient"

[zookeeper]
servers=[ "zookeeper:2181" ]
timeout=6
root-path="/burrow"

[cluster.docker]
class-name="kafka"
servers=[ "kafka1:19092","kafka2:29092","kafka3:39092" ]
topic-refresh=10
offset-refresh=5

[consumer.docker]
class-name="kafka"
cluster="docker"
servers=[ "kafka1:19092","kafka2:29092","kafka3:39092" ]
group-blacklist="^(console-consumer-|python-kafka-consumer-).*$"
group-whitelist=""

[consumer.zk]
class-name="kafka_zk"
cluster="docker"
servers=[ "zookeeper:2181" ]
root-path="/burrow"
zookeeper-timeout=30
group-blacklist="^(console-consumer-|python-kafka-consumer-).*$"
group-whitelist=""

[httpserver.default]
address=":8000"

Among other things, burrow.toml specifies the hostnames in our dockerized Kafka cluster as well as where to find the parent Zookeeper instance. Under ‘cluster.docker’, we also use topic-refresh and offset-refresh to specify how frequently we should refresh the topic list in our cluster and how frequently to refresh the broker offset for each partition in seconds, respectively.

Burrow Consumer Lag Endpoints

For our use-case, we are concerned with tracking consumer-lag for the partitions in our consumer group that is reading from the skewed topic. Burrow exposes a few different endpoints for tracking consumer-lag that we can retrieve w/ GET requests:

  • /v3/kafka/(cluster)/consumer/(group): returns all information for a single consumer group
  • /v3/kafka/(cluster)/consumer/(group)/status: returns an object that only contains partitions within the consumer group that are in a bad state
  • /v3/kafka/(cluster)/consumer/(group)/lag: returns an object that includes all partitions for consumer, regardless if that partition has been evaluated yet

For our cluster and consumer-group, these will resolve to:

  • http://localhost:8000/v3/kafka/docker/consumer/skewed-consumer-group
  • http://localhost:8000/v3/kafka/docker/consumer/skewed-consumer-group/status
  • http://localhost:8000/v3/kafka/docker/consumer/skewed-consumer-group/lag

Knowing this, let’s implement some simple Go code to print out the response from these endpoints:

package main

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"io/ioutil"
	"net/http"
	"time"
)

func main() {
	// set up the topic and consumer group
	ctx := context.Background()
	SetupConsumerGroupLagInTopic(ctx)

	// sleep until the dust settles
	time.Sleep(time.Second * 10)

	// check consumer lag endpoints
	skewGroupEndpoint := fmt.Sprintf("http://localhost:8000/v3/kafka/docker/consumer/%v", SKEWED_CONSUMER_GROUP_ID)
	err := MakeRequestToBurrow(skewGroupEndpoint)
	if err != nil {
		panic(err)
	}

	lagEndpoint := fmt.Sprintf("%v/lag", skewGroupEndpoint)
	err = MakeRequestToBurrow(lagEndpoint)
	if err != nil {
		panic(err)
	}

	statusEndpoint := fmt.Sprintf("%v/status", skewGroupEndpoint)
	err = MakeRequestToBurrow(statusEndpoint)
	if err != nil {
		panic(err)
	}

	fmt.Printf("Deleting Kafka topic")
	DeleteTopic(ctx, SKEWED_TOPIC_NAME)
}

func MakeRequestToBurrow(url string) error {
	fmt.Println(fmt.Sprintf("Making request to Burrow endpoint: %v", url))
	response, err := http.Get(url)
	if err != nil {
		return err
	}
	defer response.Body.Close() // don't forget to close this or else suffer a memory leak!

	body, err := ioutil.ReadAll(response.Body)
	if err != nil {
		return err
	}

	var prettyJson bytes.Buffer
	err = json.Indent(&prettyJson, body, "", "\t")
	if err != nil {
		panic(err)
	}

	fmt.Println(fmt.Sprintf("Response Payload JSON: %v", string(prettyJson.Bytes())))
	return nil
}

This code formats a base path based on the host/port that we configured the Burrow service to be exposed to in our docker-compose, the cluster name, ‘docker’, from our burrow.toml, and the name of our consumer group. For this simple example, it will just make requests to each endpoint and print out the response from Burrow.

Everything should now be set up. Let’s run our Zookeeper/Kafka cluster, Burrow service, and our Go app with ‘docker-compose up -d’.

After some amount of waiting the Go app that our docker-compose file spun up should be exit out, so we can take a look at the payload that Burrow returned by tracking down the container with ‘docker container ls -a’ and then printing the logs with ‘docker logs <container id>’.

And the results are in:

Making request to Burrow endpoint: http://localhost:8000/v3/kafka/docker/consumer/skewed-consumer-group
Response Payload JSON: {
        "error": false,
        "message": "consumer detail returned",
        "topics": {
                "skewed-topic": [
                        {
                                "offsets": [
                                        null,
                                        null,
                                        null,
                                        null,
                                        null,
                                        null,
                                        null,
                                        null,
                                        {
                                                "offset": 1,
                                                "timestamp": 1699504844020,
                                                "lag": 166
                                        },
                                        {
                                                "offset": 4,
                                                "timestamp": 1699504844022,
                                                "lag": 163
                                        }
                                ],
                                "owner": "",
                                "client_id": "",
                                "current-lag": 163
                        },
                        {
                                "offsets": [
                                        null,
                                        null,
                                        null,
                                        null,
                                        null,
                                        null,
                                        null,
                                        null,
                                        {
                                                "offset": 667,
                                                "timestamp": 1699504844020,
                                                "lag": 0
                                        },
                                        {
                                                "offset": 667,
                                                "timestamp": 1699504844022,
                                                "lag": 0
                                        }
                                ],
                                "owner": "",
                                "client_id": "",
                                "current-lag": 0
                        },
                        {
                                "offsets": [
                                        null,
                                        null,
                                        null,
                                        null,
                                        null,
                                        null,
                                        null,
                                        null,
                                        {
                                                "offset": 166,
                                                "timestamp": 1699504844020,
                                                "lag": 0
                                        },
                                        {
                                                "offset": 166,
                                                "timestamp": 1699504844022,
                                                "lag": 0
                                        }
                                ],
                                "owner": "",
                                "client_id": "",
                                "current-lag": 0
                        }
                ]
        },
        "request": {
                "url": "/v3/kafka/docker/consumer/skewed-consumer-group",
                "host": "burrow"
        }
}
Making request to Burrow endpoint: http://localhost:8000/v3/kafka/docker/consumer/skewed-consumer-group/status
Response Payload JSON: {
        "error": false,
        "message": "consumer status returned",
        "status": {
                "cluster": "docker",
                "group": "skewed-consumer-group",
                "status": "OK",
                "complete": 0,
                "partitions": [],
                "partition_count": 3,
                "maxlag": {
                        "topic": "skewed-topic",
                        "partition": 0,
                        "owner": "",
                        "client_id": "",
                        "status": "OK",
                        "start": {
                                "offset": 1,
                                "timestamp": 1699504844020,
                                "lag": 166
                        },
                        "end": {
                                "offset": 4,
                                "timestamp": 1699504844022,
                                "lag": 163
                        },
                        "current_lag": 163,
                        "complete": 0.2
                },
                "totallag": 163
        },
        "request": {
                "url": "/v3/kafka/docker/consumer/skewed-consumer-group/status",
                "host": "burrow"
        }
}
Making request to Burrow endpoint: http://localhost:8000/v3/kafka/docker/consumer/skewed-consumer-group/lag
Response Payload JSON: {
        "error": false,
        "message": "consumer status returned",
        "status": {
                "cluster": "docker",
                "group": "skewed-consumer-group",
                "status": "OK",
                "complete": 0,
                "partitions": [
                        {
                                "topic": "skewed-topic",
                                "partition": 0,
                                "owner": "",
                                "client_id": "",
                                "status": "OK",
                                "start": {
                                        "offset": 1,
                                        "timestamp": 1699504844020,
                                        "lag": 166
                                },
                                "end": {
                                        "offset": 4,
                                        "timestamp": 1699504844022,
                                        "lag": 163
                                },
                                "current_lag": 163,
                                "complete": 0.2
                        },
                        {
                                "topic": "skewed-topic",
                                "partition": 1,
                                "owner": "",
                                "client_id": "",
                                "status": "OK",
                                "start": {
                                        "offset": 667,
                                        "timestamp": 1699504844020,
                                        "lag": 0
                                },
                                "end": {
                                        "offset": 667,
                                        "timestamp": 1699504844022,
                                        "lag": 0
                                },
                                "current_lag": 0,
                                "complete": 0.2
                        },
                        {
                                "topic": "skewed-topic",
                                "partition": 2,
                                "owner": "",
                                "client_id": "",
                                "status": "OK",
                                "start": {
                                        "offset": 166,
                                        "timestamp": 1699504844020,
                                        "lag": 0
                                },
                                "end": {
                                        "offset": 166,
                                        "timestamp": 1699504844022,
                                        "lag": 0
                                },
                                "current_lag": 0,
                                "complete": 0.2
                        }
                ],
                "partition_count": 3,
                "maxlag": {
                        "topic": "skewed-topic",
                        "partition": 0,
                        "owner": "",
                        "client_id": "",
                        "status": "OK",
                        "start": {
                                "offset": 1,
                                "timestamp": 1699504844020,
                                "lag": 166
                        },
                        "end": {
                                "offset": 4,
                                "timestamp": 1699504844022,
                                "lag": 163
                        },
                        "current_lag": 163,
                        "complete": 0.2
                },
                "totallag": 163
        },
        "request": {
                "url": "/v3/kafka/docker/consumer/skewed-consumer-group/lag",
                "host": "burrow"
        }
}

The first response is a grab-bag of all the information that Burrow has on all partitions within the consumer-group. This does contain information about consumer-lag for each partition, which is useful but it also contains a lot of information that we don’t necessarily need since we are really only concerned with tracking partitions with problematic skew. The second endpoint, ‘/lag’, is slightly more useful as it also includes the status for each partition within the consumer group and a useful blob ‘maxlag’ which details the most problematic partition in the group in terms of consumer-lag. Like the consumer-group detail endpoint, the ‘/lag’ endpoint also contains information on a lot of healthy partitions, which isn’t terribly useful. The last endpoint, ‘/status’, seems to be the most relevant for our use-case as it returns all the useful fields from the ‘/lag’ endpoint but only for partitions which Burrow has flagged as unhealthy for the given consumer-group.

Conclusions

In this article, we explored the basic definitions of Kafka consumer groups and how to track consumer lag by leveraging the Burrow service to collect consumer metrics. We then came up with a small toy application in Go to simulate partition skew and retrieve consumer group metrics from the Burrow REST service. From here, we could develop this application further by leveraging specific the consumer group metrics to do custom alerting or persist them elsewhere for better visibility. One limitation that is worth noting is that even though the Burrow service is written in Go, the structs that are used to define the consumer-group lag responses are declared as private, so they can’t be used by just importing the Burrow package, though a sort of clumsy workaround would be to just copy-paste them into your own application. That said, Burrow is a great service that allows for quick and easy tracking of consumer group metrics that is easy to set up and offers a nice REST service that can be used with any language.

* The factory metaphor is admittedly a bit crusty if stretched too far as messages from Kafka can be consumed many times by many consumers, whereas items on a conveyor belt are processed a single time by one worker and then (hopefully) never again.

Links

https://github.com/McKalvan/go-kafka-consumer-lag

https://github.com/linkedin/Burrow/tree/master

https://docs.confluent.io/platform/current/clients/consumer.html#consumer-groups

Leave a Reply

Your email address will not be published.