Thursday, December 18, 2014

Kafka network utilization (in vs. out)

One thing that confused me as I looked into the metrics from my Kafka performance testing (as shown in any of the graphs in my previous blog post) was the approximately 2x factor of input network bytes vs. output network bytes. Given that I was doing replication, shouldn't the number of bytes be the same, since I have to replicate the messages exactly to a following broker?

For a simple example, let's assume one producer sending at 300 MB/sec to three brokers, with three partitions, a replication factor of two, and no consumers.

A single broker (bid = 0) receives 100 MB/sec from the producer (eth0 in) for a partition (say pid = 0), as it is the leader for one of the partitions.

The single broker turns around and sends (eth0 out) 100 MB/sec to the broker that is the in sync replica (say bid = 1) for all of that traffic.  To be accurate, it is actually the in sync replica (bid = 1) that pulls.

Here is what I was missing when thinking quickly about this problem ...

The single broker (bid = 0) also gets sent (actually, it pulls) 100 MB/sec from another broker's (say bid = 2) partition (say pid = 2), for which it is not the leader but an in sync replica.

This means the traffic load for this broker is:
  • producer to broker (bid = 0) for partition (pid = 0)
    • eth0 in = 100 MB/sec
  • broker (bid = 0) for partition (pid = 0) to in sync replica broker (bid = 1)
    • eth0 out = 100 MB/sec
  • broker (bid = 2) for partition (pid = 2) to in sync replica broker (bid = 0)
    • eth0 in = 100 MB/sec
  • total:
    • eth0 in = 200 MB/sec
    • eth0 out = 100 MB/sec
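
To make the bookkeeping concrete, here is a tiny sketch of the arithmetic above (plain Java, no Kafka involved; the numbers are just the ones from the example) that tallies eth0 in and out for a single broker:

// Toy model of the example above: one producer at 300 MB/sec, 3 brokers,
// 3 partitions, replication factor of 2, no consumers.
public class SimpleBrokerTraffic {
    public static void main(String[] args) {
        double producerRate = 300.0;  // MB/sec into the whole topic
        int partitions = 3;
        int brokers = 3;
        int replicationFactor = 2;

        double ratePerPartition = producerRate / partitions;          // 100 MB/sec
        int leaderPartitionsPerBroker = partitions / brokers;         // 1
        int followerPartitionsPerBroker =
                partitions * (replicationFactor - 1) / brokers;       // 1

        double inFromProducers = ratePerPartition * leaderPartitionsPerBroker;    // 100
        double outToFollowers  = inFromProducers * (replicationFactor - 1);       // 100
        double inAsFollower    = ratePerPartition * followerPartitionsPerBroker;  // 100

        System.out.printf("eth0 in  = %.0f MB/sec%n", inFromProducers + inAsFollower); // 200
        System.out.printf("eth0 out = %.0f MB/sec%n", outToFollowers);                 // 100
    }
}
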
Notice I originally said no consumers.  If I added consumers, they would start to add to the eth0 out figure and possibly balance in vs. out, but only if they were consuming at the same rate as the producers.  If there were more consumers than producers, the consumer traffic could easily exceed the input rate, which would be common for streams that are heavily fanned out to different consumer groups.

Now, let's consider what happens when we make the configuration more interesting: specifically, a larger number of brokers, more partitions, and a larger replication factor.

Let's consider the case of 300 brokers with 3000 partitions and the same replication factor of 2.  Let's imagine a producer group that could send at 3000 MB/sec.  That means every partition will receive 1 MB/sec (eth0 in).  Every broker will be a leader for 10 of these partitions.  So broker 0 would receive 10 MB/sec of the producer traffic.  It would need to send that traffic to 10 followers (one per partition), sending 10 MB/sec of replication "out" traffic.  It would be an ISR non-leader for 10 partitions, so it would receive 10 MB/sec of replication "in" traffic.  That would mean 20 MB/sec in and 10 MB/sec out.  Again, a 2x factor.


Let's now consider 300 brokers with 3000 partitions and a replication factor of 6.  Again imagine a producer group that could send at 3000 MB/sec.  That means every partition will receive 1 MB/sec (eth0 in).  Every broker will be a leader for 10 of these partitions.  So broker 0 would receive 10 MB/sec of the producer traffic.  It would need to send that traffic to 50 followers (5 for each of the 10 partitions), sending 50 MB/sec of replication "out" traffic.  It would be an ISR non-leader for 50 partitions, so it would receive 50 MB/sec of replication "in" traffic.  That would mean 60 MB/sec in and 50 MB/sec out.  Now a 1.2x factor.  So increasing the replication factor decreases the relative difference (the ratio) between in and out traffic.
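
Here is the same counting as a small sketch covering both of the larger scenarios (again plain Java, with only the example numbers from above):

// Per-broker traffic for the larger scenarios, using the same counting.
public class ClusterBrokerTraffic {
    static void perBroker(double totalRate, int partitions, int brokers, int rf) {
        double ratePerPartition = totalRate / partitions;          // 1 MB/sec
        int leadersPerBroker = partitions / brokers;               // 10
        int followersPerBroker = partitions * (rf - 1) / brokers;  // 10 or 50

        double in  = ratePerPartition * (leadersPerBroker + followersPerBroker);
        double out = ratePerPartition * leadersPerBroker * (rf - 1);
        System.out.printf("rf=%d: in=%.0f MB/sec, out=%.0f MB/sec, in/out=%.1f%n",
                rf, in, out, in / out);
    }

    public static void main(String[] args) {
        perBroker(3000, 3000, 300, 2);  // in=20, out=10, in/out=2.0
        perBroker(3000, 3000, 300, 6);  // in=60, out=50, in/out=1.2
    }
}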

Let's do some algebra to generalize this:

Let:
np = number of partitions
nb = number of brokers
rf = replication factor
tr = transfer rate total

tr_p_p = transfer rate per partition = tr / np
nlp_p_b = number of leader partitions per broker = np / nb
f_p_p = number of followers per partition = rf - 1
nl_p_p = number of leaders per partition = 1
f_tot = total number of following partitions = f_p_p * np
f_tot_p_b = total number of following partitions per broker = f_tot / nb

Tr in from producers for a single broker =
    tr_p_p * nlp_p_b =
    tr / np * np / nb =
    tr / nb (let's call this transfer rate per broker = tr_p_b)

Tr out to followers for a single broker =
    nlp_p_b * tr_p_p * f_p_p = 
    np / nb * tr / np * (rf - 1) =
    tr / nb * (rf - 1) =
    tr_p_b * (rf - 1)

Tr in as a follower =
    f_tot_p_b * tr_p_p =
    f_tot / nb * tr / np = 
    f_p_p * np / nb * tr / np = 
    (rf - 1) * np / nb * tr / np = 
    (rf - 1) / nb * tr =
    tr / nb * (rf - 1) =
    tr_p_b * (rf - 1)

Total in for a single broker =
  tr_p_b + tr_p_b * (rf - 1) =
  tr_p_b * rf

Total out for a single broker =
  tr_p_b * (rf - 1)

So the ratio of in to out is rf / (rf - 1): 2 for a replication factor of 2 and 1.2 for a replication factor of 6, matching the examples above.

The above generalization assumes a large enough number of brokers that the replication factor fully comes into play as it relates to networking.  If you run with three brokers (as in my examples above), the replication factor is limited to 3 regardless of how many partitions you have, so the difference between input and output would still be around 2x.  So it is better to read rf above as the minimum of rf and the number of brokers.  If you add more brokers to scale this out, the generalization should apply.
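
Expressed as code, the closed form works out like this (plain Java; the Math.min guard is just the caveat above about rf being capped by the number of brokers):

// Closed-form per-broker rates from the derivation above:
//   in  = tr_p_b * rf
//   out = tr_p_b * (rf - 1)
// where tr_p_b = tr / nb and rf is capped at the number of brokers.
public class BrokerTrafficFormula {
    static double[] perBrokerInOut(double tr, int nb, int rf) {
        int effectiveRf = Math.min(rf, nb);   // rf can't usefully exceed the broker count
        double trPerBroker = tr / nb;         // tr_p_b
        double in = trPerBroker * effectiveRf;           // producer in + follower in
        double out = trPerBroker * (effectiveRf - 1);    // replication out to followers
        return new double[] { in, out };
    }

    public static void main(String[] args) {
        double[] small = perBrokerInOut(300, 3, 2);     // {200, 100} -- first example
        double[] large = perBrokerInOut(3000, 300, 6);  // {60, 50}   -- rf=6 example
        System.out.printf("small cluster: in=%.0f out=%.0f%n", small[0], small[1]);
        System.out.printf("large cluster: in=%.0f out=%.0f%n", large[0], large[1]);
    }
}

Note that the number of partitions cancels out of the closed form, which is why the in vs. out ratio depends only on the replication factor.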

In summary, in a large enough cluster, a single broker will take in its share of the front end (producer) traffic, plus that same share multiplied by one less than the replication factor as incoming replication traffic when acting as a follower.  The broker will also send out that same share multiplied by one less than the replication factor to other followers.

Monday, December 15, 2014

The number of Kafka partitions when using compression/batching

Early on in my Kafka performance testing, I started as simple as possible: non-compressed, non-batched messages with one broker, one partition, one producer, and one consumer, to understand the relative performance of each aspect of the system. Over time, I added more of each. I found pretty quickly that in my environment (without compression and batching), the main bottleneck in Kafka performance testing was bandwidth. Specifically, it was easy to max out the gigabit Ethernet on the EC2 broker instance by adding just a few more producers.

At first, my broker was 4 cores (an m3.xlarge instance) and later 8 cores (an i2.2xlarge). To ensure I maxed out the new broker, I ran a few more producers. I found an interesting "problem" at that point. Specifically, I found what looked like contention on a lock taken whenever a message was written to a leader log file. This wasn't all that interesting, as one partition on one broker isn't very realistic, and after adding a few partitions it was no longer an issue.

While it was easy to max out the 8 core system's bandwidth, the CPU utilization was low. With Kafka there are two ways to increase throughput in this situation. First, you can enable batching of messages from the producer. Second, you can compress the messages within the batch. Upon adding batching and compression, I hit the same issue that I saw in the single-partition-per-broker case: I wasn't able to drive the system to saturation even though there was additional bandwidth and CPU available. The issue again was the lock taken when writing to the leader log file. From the stack traces (see below), it looks like Kafka takes a lock for writing to the leader partition and then decompresses and works with the batch. Given that the compression work is CPU intensive and the batch is large, the lock can be held for a while. On a wide SMP system this can lead to poor system utilization.
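
To illustrate the pattern, here is a simplified sketch of an append path that holds a per-partition lock while the CPU-heavy batch work happens. To be clear, this is not the actual Kafka code, just my reading of what the stack traces below suggest; the helper methods are stand-ins:

import java.util.Collections;
import java.util.List;

// Simplified illustration of the contention pattern the stack traces below
// suggest -- NOT the actual Kafka code. The CPU-heavy batch work (decompress,
// assign offsets, recompress) happens while the per-partition lock is held,
// so on a many-core box the other request handler threads queue up behind it.
public class LeaderAppendSketch {
    private final Object leaderLock = new Object();

    // Pattern suggested by the stack traces: everything under the lock.
    void appendToLeader(byte[] compressedBatch) {
        synchronized (leaderLock) {
            List<byte[]> messages = decompress(compressedBatch);
            byte[] rebuilt = recompressWithOffsets(messages);
            writeToLog(rebuilt);
        }
    }

    // One possible shape of the idea raised at the end of this post: do the
    // compression work first so the critical section is roughly just the write.
    void appendWithSmallerCriticalSection(byte[] compressedBatch) {
        List<byte[]> messages = decompress(compressedBatch);
        byte[] rebuilt = recompressWithOffsets(messages);
        synchronized (leaderLock) {
            writeToLog(rebuilt);
        }
    }

    // Stand-ins for the expensive and cheap parts of the real append path.
    private List<byte[]> decompress(byte[] batch) { return Collections.singletonList(batch); }
    private byte[] recompressWithOffsets(List<byte[]> messages) { return messages.get(0); }
    private void writeToLog(byte[] bytes) { /* placeholder for the log append */ }
}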

To show this, below is a run of the same Kafka producers and brokers with nothing changing but the number of partitions. In each case, given that CPU utilization wasn't yet the bottleneck, I expected I could drive the broker to the point where bandwidth becomes the bottleneck (125,000 KBytes/sec). You will see that when I started, I was only at around 70,000 regardless of the large number of producers.

First my configuration:


Producers
  • 15 m3.xlarge instances (4 core)
  • Each running 20 producers (total of 300)
  • Using a Netflix library that wraps the kafka_2.9.2-0.8.2-beta client
  • Using snappy compression
  • Using a batch size of 8192
  • Using request required acks of 1
  • Using a replay of realistic data (each message being JSON formatted and 2-2.5k in size); see the sketch after these configuration lists for roughly how these settings map to the plain producer API
Brokers
  • 3 i2.2xlarge instances (8 core)
  • Using a Netflix service that wraps the kafka_2.9.2-0.8.1.1 broker code
  • With topics that are always replication factor of 2
  • With topics that range from 6 partitions (2 leader partitions per broker) to 15 partitions (5 leader partitions per broker)
Consumers
  • None in this case to stress the problem
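
As a rough reference, the producer settings above map to something like the following with the plain Kafka 0.8 producer API. The tests actually went through the Netflix wrapper around the 0.8.2-beta client, so the exact property names (and what the 8192 batch size maps to) may differ; the broker list and topic name are placeholders:

import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

// Approximate translation of the test settings to the plain Kafka 0.8
// producer API (the real tests used a Netflix wrapper around the
// 0.8.2-beta client). Broker list and topic name are placeholders.
public class TestProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("metadata.broker.list", "broker1:9092,broker2:9092,broker3:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("producer.type", "async");        // enable producer-side batching
        props.put("batch.num.messages", "8192");    // assumption: the test's batch size of 8192
        props.put("compression.codec", "snappy");   // snappy compression
        props.put("request.required.acks", "1");    // request required acks of 1

        Producer<String, String> producer =
                new Producer<String, String>(new ProducerConfig(props));
        producer.send(new KeyedMessage<String, String>(
                "test-topic", "{\"example\": \"2-2.5k JSON payload\"}"));
        producer.close();
    }
}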

Next, the results:


6 partitions (2 leader partitions per broker)
- Average CPU utilization: 30%
- Average Network:  79,000 KBytes/sec




9 partitions (3 leader partitions per broker)
- Average CPU utilization: 38%
- Average Network:  95,000 KBytes/sec



12 partitions (4 leader partitions per broker)
- Average CPU utilization: 42%
- Peak Network:  119,000 KBytes/sec



15 partitions (5 leader partitions per broker)
- Average CPU utilization: 50%
- Peak Network:  125,000 KBytes/sec


So you can see that by only changing the number of partitions, I can increase the utilization up to the bandwidth bottleneck of each of the brokers.  I believe the reason five leader partitions per broker was the peak is that, out of eight cores, it looks like by default one core handles networking, one core handles replication, and five cores handle writing to the logs.  I could have tried going up to 18 partitions (6 leader partitions per broker), but at this level of load my test driver became unstable rather quickly (as evidenced by the "spiky" bandwidth you see in Network KBytes).  I later found this instability in the producers to be a bug in snappy compression in 1.1.1.5.  After upgrading snappy, I now have more stable runs, but it didn't change the relative results presented here.

I tend to focus on bottlenecks in a system, as opposed to the overall throughput, until I understand the system.  If you look at the throughput instead, you can see that the difference is measurable.  Here you will see the throughput measured by Kafka in AllTopicsMessagesPerSecond across the three brokers for each of the above configurations:


Here you can see that the throughput per broker is:

  • 6 partitions - 40K messages/sec
  • 9 partitions - 48K messages/sec
  • 12 partitions - 50K messages/sec
  • 15 partitions - 54K messages/sec

So, by increasing the number of partitions, I was able to increase the performance by 35%.

To confirm my suspicion, I did thread dumps.  Here are two thread dumps from the run with only 6 partitions (2 leader partitions per broker), which shows the problem in its worst case.  You'll see that Kafka has 8 request handler threads.  Also, in both dumps, only 2 threads are doing work under kafka.cluster.Partition.appendMessagesToLeader, while the other six threads are blocked waiting for the lock.

This all said, there are some aspects of my test that are specifically set up to show the issue.  First, I had no consumers.  If I had consumers, they would be using CPU and bandwidth without waiting on the lock.  Also, I am running on a wide SMP VM (8 cores); the issue would be less noticeable on a 4 core box in terms of CPU utilization.  However, given I was able to hit the issue, I wanted to share it in case others see the same thing.  It isn't out of the question that others could hit a similar problem on very hot topics with lower numbers of partitions when consumers aren't listening.

I will work with the Kafka team to see if this is something that should be addressed, or if it is just something to consider when tuning Kafka configurations.  I wonder if the work for compression could be done before taking the lock to decrease the time in the critical section.  However, I am still new to Kafka and haven't read the code, so I may be missing something.  If this is a real issue, I'll get a github issue opened and will then update this blog post.