Monday, December 15, 2014

The number of Kafka partitions when using compression/batching

Early on in my Kafka performance testing, I started as simple as possible: non-compressed, non-batched messages with one broker, one partition, one producer and one consumer, so I could understand the relative performance of each aspect of the system. Over time, I added more of each. I found pretty quickly that in my environment (without compression and batching), the main bottleneck was bandwidth. Specifically, it was easy to max out the gigabit ethernet on the EC2 broker instance by adding just a few more producers.

At first, my broker was 4 cores (an m3.xlarge instance) and later 8 cores (an i2.2xlarge). To make sure I maxed out the new broker, I ran a few more producers. At that point I found an interesting "problem": what looked like a lock being taken whenever any single message was written to a leader log file. This wasn't all that interesting on its own, since one partition on one broker isn't realistic, and after adding a few partitions it was no longer an issue.

While it was easy to max out the 8 core system's bandwidth, the CPU utilization was low. With Kafka there are two ways to increase throughput in this situation. First, you can enable batching of messages on the producer. Second, you can compress the messages within the batch. Upon adding batching and compression, I hit the same issue that I saw in the single-partition-per-broker case: I wasn't able to drive the system to saturation even though there was additional bandwidth and CPU available. The issue was again the lock taken when writing to the leader log file. From the stack traces (see below), it looks like Kafka takes a lock for writing to the leader partition and then decompresses and works with the batch while holding that lock. Given that the compression work is CPU intensive and the batch is large, the lock can be held for a while. On a wide SMP system this can lead to poor overall utilization.
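
For reference, here is a minimal sketch of what enabling batching and compression looks like with the plain 0.8.x high-level producer API, rather than the Netflix wrapper library I actually used. The broker list, topic name and message payload are placeholders, and the wrapper may expose these settings differently (its "batch size of 8192" may not map one-to-one onto batch.num.messages).

```java
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class BatchedCompressedProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Placeholder broker list; replace with your own brokers
        props.put("metadata.broker.list", "broker1:9092,broker2:9092,broker3:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        // The two levers discussed above: batch on the producer side...
        props.put("producer.type", "async");
        props.put("batch.num.messages", "8192");
        // ...and compress the whole batch (snappy, as in my tests)
        props.put("compression.codec", "snappy");
        // Ack from the leader only
        props.put("request.required.acks", "1");

        Producer<String, String> producer =
                new Producer<String, String>(new ProducerConfig(props));
        producer.send(new KeyedMessage<String, String>("perf-test-topic",
                "{\"example\":\"2-2.5k of JSON would go here\"}"));
        producer.close();
    }
}
```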

To show this, below is a run of the same Kafka producers and brokers with nothing changing but the number of partitions. In each case, given that CPU utilization wasn't yet the bottleneck, I expected to be able to drive each broker to its bandwidth bottleneck (125,000 KBytes/sec, which is gigabit ethernet divided by eight bits per byte). You will see that when I started, I was only at under 80,000 KBytes/sec regardless of the large number of producers.

First, my configuration:


Producers
  • 15 m3.xlarge instances (4 core)
  • Each running 20 producers (total of 300)
  • Using a Netflix library that wraps the kafka_2.9.2-0.8.2-beta client
  • Using snappy compression
  • Using a batch size of 8192
  • Using request required acks of 1
  • Using a replay of realistic data (each message being JSON formatted and 2-2.5K in size)
Brokers
  • 3 i2.2xlarge instances (8 core)
  • Using a Netflix service that wraps the kafka_2.9.2-0.8.1.1 broker code
  • With topics that are always replication factor of 2
  • With topics that range from 6 partitions (2 leader partitions per broker) to 15 partitions (5 leader partitions per broker); see the topic creation sketch after this list
Consumers
  • None in this case to stress the problem
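
As a concrete example of what varying the partition count looks like, here is a sketch of creating one of these topics with the 0.8.x admin utilities from Java. The ZooKeeper connect string and topic name are placeholders, and in my tests the topics were actually managed through the Netflix wrapper service rather than code like this.

```java
import java.util.Properties;
import kafka.admin.AdminUtils;
import kafka.utils.ZKStringSerializer$;
import org.I0Itec.zkclient.ZkClient;

public class CreateTestTopic {
    public static void main(String[] args) {
        // Placeholder ZooKeeper connect string
        ZkClient zkClient = new ZkClient("zookeeper:2181", 30000, 30000,
                ZKStringSerializer$.MODULE$);
        try {
            // 15 partitions spread over 3 brokers = 5 leader partitions per broker,
            // replication factor of 2, no per-topic config overrides
            AdminUtils.createTopic(zkClient, "perf-test-topic", 15, 2, new Properties());
        } finally {
            zkClient.close();
        }
    }
}
```

The bundled kafka-topics.sh script does the same thing from the command line with its --partitions and --replication-factor options.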

Next, the results:


6 partitions (2 leader partitions per broker)
- Average CPU utilization: 30%
- Average Network:  79,000 KBytes/sec




9 partitions (3 leader partitions per broker)
- Average CPU utilization: 38%
- Average Network:  95,000 KBytes/sec



12 partitions (4 leader partitions per broker)
- Average CPU utilization: 42%
- Peak Network:  119,000 KBytes/sec



15 partitions (5 leader partitions per broker)
- Average CPU utilization: 50%
- Peak Network:  125,000 KBytes/sec


So you can see that by changing only the number of partitions, I can increase utilization up to the bandwidth bottleneck of each broker.  I believe the reason five leader partitions per broker was the peak is that, out of eight cores, it looks like by default there is one core handling the network, one core handling replication, and five cores handling writing to the logs.  I could have tried going up to 18 partitions (6 leader partitions per broker), but at this level of load my test driver became unstable rather quickly (as evidenced by the "spiky" bandwidth you see in Network KBytes).  I later found this instability in the producers to be a bug in snappy compression 1.1.1.5.  After upgrading snappy, I now have more stable runs, but it didn't change the relative results presented here.

I tend to focus on the bottlenecks in a system rather than the overall throughput until I understand the system.  If you look at the throughput instead, you can see that the difference is measurable.  Here is the throughput measured by Kafka in AllTopicsMessagesPerSecond across the three brokers for each of the above configurations:


Here you can see that the throughput per broker is:

  • 6 partitions - 40K messages/sec
  • 9 partitions - 48K messages/sec
  • 12 partitions - 50K messages/sec
  • 15 partitions - 54K messages/sec

So, by increasing the number of partitions from 6 to 15, I was able to increase the per-broker throughput by 35% (from 40K to 54K messages/sec).

To confirm my suspicion, I took thread dumps.  Here are two thread dumps from the run with only 6 partitions (2 leader partitions per broker), which shows the problem at its worst.  You'll see that Kafka has 8 request handler threads.  In both dumps, only 2 of those threads are doing work under kafka.cluster.Partition.appendMessagesToLeader, while the other six are blocked waiting for the lock.

All that said, my test was specifically set up to show the issue, and some real-world aspects would soften it.  First, I had no consumers.  If I had consumers, they would be using CPU and bandwidth without waiting on the lock.  Also, I am running on a fairly wide SMP VM (8 cores); the issue would be less noticeable on a 4 core box in terms of CPU utilization.  However, given that I was able to hit the issue, I wanted to share it in case others see the same thing.  It isn't out of the question that others could hit a similar problem on very hot topics with lower numbers of partitions when consumers aren't listening.

I will work with the Kafka team to see if this is something that should be addressed, or if it is just something to consider when tuning Kafka configurations.  I wonder if the compression work could be done before taking the lock to decrease the time spent in the critical section.  However, I am still new to Kafka and haven't read the code in depth, so I may be missing something.  If this turns out to be a real issue, I'll get an issue opened with the Kafka project and will then update this blog post.
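
To make the idea concrete, here is a deliberately simplified sketch of the pattern the stack traces suggest, and of the variant I'm wondering about. This is illustrative Java, not Kafka's actual code; the class and method names are made up.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical illustration of the critical-section issue described above.
// None of these names correspond to real Kafka classes.
class LeaderLogIllustration {
    private final ReentrantLock leaderLock = new ReentrantLock();

    // Pattern the thread dumps suggest: the CPU-heavy (de)compression work
    // happens while the per-partition lock is held, so the other request
    // handler threads for the same partition block for the whole duration.
    void appendCompressedInsideLock(byte[] compressedBatch) {
        leaderLock.lock();
        try {
            byte[] messages = decompress(compressedBatch);   // expensive, inside lock
            byte[] recompressed = recompress(messages);      // expensive, inside lock
            writeToLog(recompressed);                        // the part that truly needs the lock
        } finally {
            leaderLock.unlock();
        }
    }

    // The variant I'm wondering about: do the expensive work first, and only
    // hold the lock for the actual log write.
    void appendCompressedOutsideLock(byte[] compressedBatch) {
        byte[] messages = decompress(compressedBatch);       // expensive, outside lock
        byte[] recompressed = recompress(messages);          // expensive, outside lock
        leaderLock.lock();
        try {
            writeToLog(recompressed);
        } finally {
            leaderLock.unlock();
        }
    }

    // Stand-ins so the sketch compiles; real implementations are beside the point.
    private byte[] decompress(byte[] in) { return in; }
    private byte[] recompress(byte[] in) { return in; }
    private void writeToLog(byte[] in) { /* append to the leader log */ }
}
```

There may well be good reasons the real broker has to do that work under the lock (for example, assigning offsets in order), which is exactly the kind of thing I'd want the Kafka team to weigh in on.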