I’ve been working with the Apache Kafka ecosystem since 2016: multiple versions, different deployment scenarios and different client libraries.
I want to share my experience and the questions I ask myself when building, maintaining and fixing Kafka-based systems. My main focus won’t be on administering a cluster, but on how producers, consumers and brokers work (or don’t) together. I (still) don’t have experience with Redpanda at scale, so everything below is about the Apache/Confluent distributions.
There is no “one ring to rule them all”.
Your data and its flow have a major impact, and there is no single optimization strategy that can be reused everywhere. The one thing that is crucial in every scenario: monitor your disk usage, even if you use tiered storage (e.g. archiving to S3). Over time your flow will probably change, and so will the data itself. Don’t make decisions you won’t be able to roll back.
For most of the examples, I’m using a 3-broker cluster on AWS, with each broker running in a different AZ. Clients run in the same VPC. Default replication factor N=2.
Kafka is very stable and can withstand anything.
A very common misconception. While Kafka is very stable in most scenarios, many things can break, especially ZooKeeper. While adding brokers to a cluster generally increases its capacity, adding nodes to the ZooKeeper ensemble has very little effect.
If you run on k8s with a multi-AZ cloud provider, you’ll probably configure a Service for ZK, which will use EndpointSlices. In most cases this means that a broker will first try to connect to the ZK node in the same AZ. If that particular node is unresponsive, the broker can crash.
And of course everyone talks about how easily Kafka survives a single node crash. On paper, our example cluster can survive one too. Then one node goes down and things don’t behave as expected.
Detecting that a broker is down is fast, but not immediate. You have 2 more brokers. Do they have enough disk space, network and CPU capacity to handle all the data?
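A back-of-the-envelope way to approach that capacity question, assuming load is spread evenly across brokers and redistributes evenly after a failure (real clusters are rarely this tidy). The function name and the 60% starting point are made up for illustration.

```python
def utilization_after_failure(current_utilization: float, brokers: int, failed: int = 1) -> float:
    """Per-broker utilization once `failed` brokers are gone.

    Assumes the same total load is spread evenly over the survivors.
    """
    survivors = brokers - failed
    if survivors <= 0:
        raise ValueError("no brokers left to take the load")
    return current_utilization * brokers / survivors


# Example cluster: 3 brokers, each at 60% of its disk/network/CPU budget.
after = utilization_after_failure(0.60, brokers=3)
print(f"{after:.0%}")  # 90%
```

Under the same assumption, anything above roughly 66% per broker before the failure puts the two survivors over 100% afterwards.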
Partition leader re-election is triggered very quickly and doesn’t take long, even if you have thousands of partitions. But then all clients need to pull new metadata, which creates additional load. The default metadata.max.age.ms is 300 seconds; you might consider decreasing it. Most clients come with a built-in retry mechanism, but the backoff is not (yet) exponential.
When do you commit an offset if both the input and the output of your service are Kafka topics? Most guides will tell you to commit only after you have finished producing. That’s not the hardest thing to achieve, but what happens if you suddenly fail to commit an offset? Would you monitor that? Every scenario calls for a different decision.
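A minimal sketch of the "commit only after producing" ordering, with the failed-commit path made visible. It is not tied to any client library: `produce`, `flush`, `commit` and `on_commit_failure` are hypothetical callables you would wire to your own consumer, producer and metrics.

```python
from typing import Callable, Iterable


def process_batch(
    records: Iterable[bytes],
    transform: Callable[[bytes], bytes],
    produce: Callable[[bytes], None],
    flush: Callable[[], None],
    commit: Callable[[], None],
    on_commit_failure: Callable[[Exception], None],
) -> bool:
    """Produce every output record, wait for delivery, then commit input offsets.

    Returns True if the commit succeeded. A failed commit is reported, not
    swallowed: the batch will be re-read later, so duplicates are possible.
    """
    for record in records:
        produce(transform(record))
    flush()  # block until the output topic has acknowledged everything
    try:
        commit()
    except Exception as exc:  # in real code, catch the client's commit error type
        on_commit_failure(exc)
        return False
    return True


# In-memory stand-ins to show the ordering and the failure path.
out = []
failures = []


def failing_commit() -> None:
    raise RuntimeError("commit failed")


ok = process_batch([b"a", b"b"], bytes.upper, out.append, lambda: None,
                   failing_commit, failures.append)
print(ok, out, len(failures))  # False [b'A', b'B'] 1
```

The output was already produced when the commit failed, so this gives at-least-once behaviour. Whether you alert on `on_commit_failure`, retry, or stop the service is exactly the per-scenario decision.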
I hope your cluster didn’t have an active partition reassignment when a broker went down. And don’t get me started on issues with a broken state of the __consumer_offsets topic. Most probably you shouldn’t try to make manual changes there.
Hidden costs.
Are generally found at scale. We have over 1PB of total disk storage and the journey wasn’t easy. I’m not talking about complexity costs, only about those that really increase your bill.
Disks. Some still use HDDs because they are cheap. If you have a very stable streaming platform, without bursts and hiccups, most reads will be served from memory and it’s all great. It won’t be easy to optimize your cluster for batch-based consumers or sudden bursts (>2x higher producing rate). I’ve seen some companies use cloud ephemeral drives: they generally have uncapped IOPS (only throughput is capped), but they don’t scale up on demand and don’t survive machine failure.
EBS and similar drives are great for Kafka clusters, but have some very important limitations:
- They can only increase in size. No decreasing.
- Moving between AZs is possible only via snapshots. (Generally doesn’t help during outages).
- For gp2/gp3 you can only increase the size once every 6 hours, and only if the volume is not in the optimizing state.
Cross-AZ costs: In my understanding, for cloud providers it’s a money-printing feature.
In our example cluster, let’s say you produce 1GB of data and consume it only once: 66% of the data crosses AZs on produce, another 66% on consume, and 100% on replication. Overall: 0.66 + 0.66 + 1 = 2.32 GB. And in AWS you pay for cross-AZ traffic on both sides, so you’ll pay for at least 4.64GB. Yep, it’s only cents, but if you ingest hundreds of terabytes of data a day, it will cost a lot. Even after discounts.
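The same arithmetic as a small function, so you can plug in your own numbers. It assumes clients and partition leaders are spread evenly across AZs, consumers fetch from the leader, and every follower replica lives in a different AZ than its leader. With an exact 2/3 instead of the rounded 66% the totals come out slightly higher than 2.32 and 4.64.

```python
def cross_az_gb(produced_gb: float, azs: int = 3, replication_factor: int = 2,
                reads: int = 1) -> float:
    """Data crossing AZ boundaries, in GB, under the assumptions above."""
    remote_fraction = (azs - 1) / azs  # chance the partition leader is in another AZ
    produce = produced_gb * remote_fraction
    replicate = produced_gb * (replication_factor - 1)
    consume = produced_gb * remote_fraction * reads
    return produce + replicate + consume


total = cross_az_gb(1.0)
print(round(total, 2))      # 2.33
print(round(total * 2, 2))  # 4.67, since both sides of the transfer are billed
```

Raising `reads` shows how quickly the consume side dominates once several consumer groups read the same data.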
In most use cases data is read more than once by each consumer group, and there are multiple groups. And if you are using tiered storage, reading from S3 isn’t free either. Use rack-awareness (KIP-392) to decrease these costs by ~15%. But it’s hard to verify whether your consumer client read from a replica or from the leader. (If you know the solution, please ping me on twitter or leave a comment here!).
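For reference, a sketch of the settings KIP-392 (available since Kafka 2.4) relies on, written as plain Python dictionaries. The AZ id, broker address and group id are placeholders, and the broker-side keys belong in server.properties rather than in client code.

```python
# Broker side (server.properties), shown as a dict for illustration only.
broker_overrides = {
    "broker.rack": "use1-az1",  # placeholder: the AZ this broker runs in
    "replica.selector.class": "org.apache.kafka.common.replica.RackAwareReplicaSelector",
}

# Consumer side: tell the cluster which AZ the client lives in.
consumer_config = {
    "bootstrap.servers": "broker-1:9092",  # placeholder address
    "group.id": "example-group",           # placeholder group
    "client.rack": "use1-az1",             # should match broker.rack of a nearby replica
}


def same_az(consumer: dict, broker: dict) -> bool:
    """True when the consumer could be served by this broker without leaving its AZ."""
    return consumer.get("client.rack") == broker.get("broker.rack")


print(same_az(consumer_config, broker_overrides))  # True
```

If `client.rack` is missing or doesn’t match any replica’s rack, the consumer simply keeps fetching from the leader, which is part of why misconfiguration is easy to miss.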
Version misalignment. Famous quote: Kafka is backwards compatible to 0.10. Software engineers understand that this is very hard to achieve. What actually happens behind the scenes is that your client also sends its API version to a broker when authentication happens. If it’s not the same version (even 2.2.0 vs 2.3.1), the broker will need to spend additional time converting each record to the expected schema. And if your client is librdkafka-based or unofficial like sarama, you should closely monitor metrics like (mbeans):
kafka.network:type=RequestMetrics,name=MessageConversionsTimeMs,request={Produce|Fetch}
kafka.server:type=BrokerTopicMetrics,name=ProduceMessageConversionsPerSec,topic=([-.\w]+)
Aligning versions would lower the broker CPU usage.
Client libraries and frameworks.
They are different, use different paradigms and have different use cases. Please don’t throw stones at me if I didn’t mention your favorite one.
- The official Java (or librdkafka-based) library is great: very well tested, with great performance. It follows the one-consumer-per-partition rule and relies on a broker to coordinate (balance) a consumer group. Probably not the best solution if you want to build a massively parallel data processing system. Great in most use cases.
- Apache Spark, Flink and others. They are becoming very popular, but require more understanding. Internally they run the same Java client, but use low-level API calls to manage everything: partition subscription, offset commits, etc. You can have more consumers than partitions*, do complex stateful transformations, run on spots**, scale resources up and down based on requirements, but you’d better understand how things work. You can try Apache Beam, which comes with higher-level abstractions but has fewer capabilities. Example scenarios: data ingestion, stateful transformations, ML training.
- Kafka Streams. Created by Confluent, it excels in a limited number of scenarios. If your input and output topics are on the same cluster, consider it as an option. Pair it with RocksDB and you’ll get a fast and easy-to-use service. Note: using KTables creates an unexpected amount of temporary data in internal topics. Example scenarios: data transformation like enrichment, quality checks, format conversion.
- Kafka Connect. Also created by Confluent. Great at getting data into or out of Kafka. Note: it’s great when you need to move data “as is”. Use any framework above to make a transformation. Don’t run on spots. Note: all connector details will be stored in internal Kafka topics. Yes, that includes credentials to the DB. Example scenarios: exporting data to S3 or ElasticSearch, CDC source connector.
- Alpakka, ZIO Kafka, Kafka for Spring Boot and other integrations for popular frameworks are generally based on official clients. Just read the documentation. If you are already using a framework like Akka, it would be a rational choice. Don’t forget to read the documentation section on limitations and known issues!
- Sarama, Segment, kafka-python and other unofficial clients. Use at your own risk. Some say deep diving on TCP connections with libpcap can be fun.
*For Spark only after version 3.0
** Works great with Spark Operator, less so with others.
I didn’t mention the Confluent Parallel Consumer because I’ve never worked with it. Yep, key concurrency and extendable non-blocking IO sound amazing.
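To make the one-consumer-per-partition rule from the first bullet concrete, here is a toy assignment function. It is a simplification for illustration, not one of Kafka’s actual assignor implementations; consumer and partition ids are just integers.

```python
from typing import Dict, List


def assign_round_robin(partitions: int, consumers: int) -> Dict[int, List[int]]:
    """Spread partition ids over consumer ids; each partition gets exactly one owner."""
    assignment: Dict[int, List[int]] = {c: [] for c in range(consumers)}
    for p in range(partitions):
        assignment[p % consumers].append(p)
    return assignment


# 6 partitions, 8 consumers in the same group.
assignment = assign_round_robin(partitions=6, consumers=8)
idle = [c for c, owned in assignment.items() if not owned]
print(idle)  # [6, 7]
```

Within a single group, consumers beyond the partition count just sit idle, which is why extra parallelism with the plain client means more partitions or a different processing model.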
Did you know that
- You can’t increase a topic’s partition count while a partition reassignment is in progress. Learned it the hard way, thank you CruiseControl.
- You can specify the API version for a broker. Use it when upgrading to a new broker version; it keeps the upgrade from breaking the cluster.
- In Apache Flink and Kafka Streams you can have standby nodes: they are part of the cluster and start handling data if one of the nodes (TaskManagers) fails. This increases reliability.
- If lag is close to zero, in most cases reads will be served from memory (faster!); if not, from disk.
- Most disk writes on a broker are faster than reads. In the default configuration a broker accumulates records before writing a segment to disk as a single file. Reading a single record requires disk seeks, i.e. more IO requests.
- Some configurations contradict each other and don’t always override each other’s behaviour. For example, delivery.timeout.ms and retries will interfere with each other. (Same with linger.ms and batch.size.)
- Changing offsets.topic.replication.factor from its initial value is a headache. Never set it to 1.
- Don’t go above 1000 partitions on a single topic. Learned it the hard way.
- When a broker starts, it verifies the data in its data directories. If you have slow and large disks, this can take a while. num.recovery.threads.per.data.dir to the rescue!
- Negative offset-based lag is real. If you have an empty topic with an active subscriber, delete it and recreate it (while the consumer still lives) and then produce a record, it will mess up the lag exporter.
- A simple Kafka Streams service that filters out bad packets (based on proto fields, avg record size ~50kb) can handle 1k packets per second per core with close to no optimizations.
- A Kafka broker doesn’t need much heap memory. It will use native memory to cache data. But when you do configure it, Xms and Xmx should have the same value. Thank me later.
- Airbus is running Kafka clusters on polar stations!
Questions I’d ask myself before starting a new project
- What kind of a problem would using Kafka solve?
- Should upstream (producer) be aware of downstream (consumers)? And the opposite?
- How scalable should the system be?
- What are the possible bottlenecks of the system? Will it be persisting data to disk?
- Stream or batch? Or maybe even micro-batch?
- Seasonality? Difference between peak and off hours?
- How can I tell that the system is healthy if monitoring isn’t working?
These are only some of the first questions I’d ask myself.
Pick your tools correctly.
There are multiple ways to run clusters and many ways to run clients, from directly on a VM to k8s operators and managed services. Generally, hosted solutions are very easy to use: they have it all and can easily scale up to a certain point. Administration APIs, monitoring, DR: everything is taken care of, but they come at a higher cost.
Kafka was built more than a decade ago to maximize utilization of generic hardware and solve some specific use cases. So many things have changed since then. Measure seven times, cut once.