Breaking the bank: the most expensive mistakes in data pipelines.
Initially I wanted to share these ideas during hayaData conference, but it wasn’t selected for this year program. (No hard feelings!). Not claiming to be a FinOps expert, I’m only sharing my learnings.
Production incident.
For the most software engineers production incident means downtime, failure of a major component, bug that causes performance downgrade or malfunction. So how do you call a scenario, when the system that you’ve built and responsible for works great, very fast, all data is intact, customers are extremely happy, but your CFO is angry? Isn’t it an incident, too?
Please don’t tell you don’t care about. It’s not someone else headache. You built — you own it. Most probably you’ll require help from your colleagues, but cost effectiveness of your data pipeline is a KPI like everything else.
It’s worth checking why your cloud bill grew 10 times while amount of ingested data increased only by 50%.
Multi-AZ deployment is the only way ! (Fake news)
Imagine multistep ingestion pipeline built with Flink and Kafka. You’ve built state of the art ingestion pipeline with autoscaling, disaster recover scenarios, deployed it on k8s on multi-AZ AWS. All the buzz words. But your cloud bill grows faster than AI hype on twitter. So what happened (in our case) ?
- We didn’t utilize Kafka rack-awareness. Data transfer costs between broker and consumer in different AZ aren’t free (except Azure). It’s not a very good idea to put all Kafka brokers on the single AZ as they also fail sometimes. It’s a simple change, supported by most clients and Apache Kafka version 2.4+, it saves about 15% of data transfer costs.
- EBS disks can’t migrate easily between AZ, but Flink Job and TaskManagers can. Although it’s a stateful computation framework, its state is preferred to store at s3 bucket. Due to the nature of the multi-tenant system we’ve used shuffling and rebalancing between operators. As a classic example of Pareto principle: 20% of topic partitions had 80% of data. In order to avoid skew we were rebalancing load between Flink operators. How we’ve solved it without impacting performance? Deployed whole each Flink cluster on a single AZ. AZ goes down? Redeploy to a different one.
- Beware of AZs. While most regions have only 3 AZ’s,
us-east-1
has 6 of them. Don’t deploy producers and consumers on AZ where Brokers aren’t deployed. - Not utilizing autoscaling. Seasonality is common theme for data pipelines. Volume of data grows during working hours. It’s not an easy job to configure autoscaling right. There is no “one ring to rule them all”. Like in Anna Karenina: All unstable systems are unstable in its own way. It’s a story of trial and error till you’ll find when you should trigger scaling up or down and which type of scheduler is right for you. Cloud provider will bill you whether you fully utilize resources or not.
Migrating to k8s from managed service will easily save us lots of money!
The case of another Flink based pipeline, running on managed Kinesis Flink, reading from Kinesis and writing its output to MSK. AWS managed Kinesis pricing is based on amount of used resources. So reading data from Kinesis has close to zero data transfer cost. While it’s good on paper, there are lots of reasons to migrate away from it: autoscaling isn’t so good, limited amount of versions, metrics are reported with ~10 mins delay, switching between RocksDB and HashMap based state requires ticket to the support and etc.
- While in managed service you pay for KPU (Kinesis processing unit) and you can’t change size of the TaskManagers you only pay for them. If you run Flink Operator on your own k8s — look at binpacking. Cloud provider will bill you for all EC2/VM that you’ve used, disregarding how good your utilization is. In most cases try reaching a “sweet spot” of 70% for underlying VM and under 80% for each TM during peak hours in order to prevent cpu throttling and make your CFO smile.
- Determine whether your service CPU or IO bound. Or both. Using Flink as a writer for Apache Iceberg is very common. It’s easy, fast and doesn’t require a lot of CPU. But smaller instances on AWS have quite unreliable network connection. Even instance like
m5a.8xlarge
, that comes with 32 vCPU and 128 GB of memory has network performance of Up to 10 Gigabit. Use bin packing and prevent having very different size of underlying instances. It will cause askew! - RocksDB is great option for state backend in a lot of cases. But not only you’ll have to figure out correct way of using it and fighting OOM issues coming from Native memory, but also you’ll require more expensive disk — either NVMe or some fast EBS with high provisioned IOPS. Don’t forget to enable EBS optimization!
- Generally deploying Flink or Spark app in dedicated k8s namespace is more than enough. It’s really, really hard to achieve high utilization if you are dedicate node group / provisioner per service.
- Illustration of binpacking:
Test the hell out of regex.
Not only that happened early in my career, I was really scared I will be fired. I’ve worked on pipeline, that processed HTML files, extracted assets out of them — mostly images and would store them on object storage. While the service was running on premise datacenter we had a direct link to the cloud provider. After sometime I’ve decided to use dedicated Key-Value storage to reduce amount of calls to the blob storage, reduce the costs and increase performance. But I’ve also introduced a bug. Some responses of verification for the nested files were disregarded and file was re-uploaded.
How did we found it? Well, DevOps started to investigate why the uplink from datacenter to cloud is close to 100% utilized, while average was below 40% during peak hours. On a regular day a service would create anywhere between 50 to 100GB of data, but the bug “helped” to upload over 5 TB in less than 12 hours.
It was close to impossible to find this bug in our test environment, but since then I either prefer not to use Regex or test the hell out of it.
Multi-tenant systems are hard.
Oh, how much did it hurt. We built a multi-tenant ingestion pipeline. Each tenant had multiple sites. All events were stored in a dedicated kafka topic, which naming convention had both tenant and site id. Everyone knows Pareto rule, but in this case it was put to extreme: 5% of topics had close to 90% of data across multiple clusters (cell-based architecture).
- The initial idea was great on paper. Downstream services would only need to read relevant events, without filtering out irrelevant events. Most of the topics had low amount of data, with default amount of partition N=3. While initially scaling rule was to use amount of topics as a multiplier of brokers or overall disk count (in most clusters each broker had multiple disks) to distribute the load more evenly, automating it became necessary to reduce manual work for on-call engineers and SRE.
- Each client (both producers and consumers) had to have low value for metadata refresh. Some clusters had over 20k topics. Default metadata age of 5 mins would cause some unexpected lags, which required more compute to process them. Also, metadata fetches can also fail.
- Some services required subscribing to a big amount of topics — thousands of partitions. We even did an experiment: Same spark job (v3.0+) required 2x more resources to read same amount of data from a single topic with 1200 partitions, than from ±200 topics with 1400 partitions. Sub-cells on a service level were the solution to this issues. Each list of topic-partitions were split up into several smaller groups. It introduced more complexity and on-call scenarios, but saved at least 40% on compute. Also amount of retries (and refetching the records) was drastically reduced.
- Dev teams reviewed this after a year or so of running with this design. Although not everyone agreed, most devs agreed that having topic per tenant would be sufficient. Yes, filtering out (dropping irrelevant) events is not costs effective, but amount of time that was spent due to the way too broad naming convention was way too big. We are talking about man-years. It’s not always a cloud bill.
Let's solve all our issues with RocksDB !
I’ve worked on couple Spark-based pipelines, which can be categorized into a category: “find a needle in a haystack”. Filtering huge event stream with complex patterns, regex and custom rules. While switching to Spark Operator was a great initiative to control every aspect of our deployment, making it right was very hard.
- It didn’t matter how would we re-optimize our state, smaller or bigger files, executors would go “black” — unavailable no mater how I’d tweak heartbeat configuration. So the root cause was elsewhere. My old friend EBS. Back in the day gp3 didn’t exists and other types of EBS were very much expensive. I’ve started to look for patterns on what’s happening before the failure. The metric was sourcing from underlying machine — the IOWAIT. It was a small gp2 disk for running OS and container images. No, it wasn’t running out of space. But it was running out of IOPS to use. Solution? Switch to
d
type instances on AWS, create a logical path for all mounted disks and tell spark executors to use host path. NVMe that didn’t care about how many actual IO operations did we do. - Are you wondering why NVMe was a great solution for cost reduction? Well, first it unblocked the data pipeline, reduced the occascional lag and additional resources to close the lag. But eventually, after optimizing the state for “unlimited” IO the actual amount of resources went down by 20–50%. Given that NVMe instances cost on average 20% more it was still way worth it.
Use the right tools.
This one was funny. Especially looking back at it. I was asked to help couple research analysts with performance of their pipeline. It had multiple logic steps, each one implemented as a different process, all orchestrated by AirFlow, written in Python and used SQS as a medium between them.
- First step had a processing rate of couple thousands a second and last one less than a 100 messages per second. “Give us a bigger EC2, this one is too slow” they said. For me the answer was very simple — implement backpressure and parallelize busiest steps. For them it sounded like I was speaking Chinese. I did try to convince them, to teach about amazing magic of applying known software engineering principles. They needed something different. *Magic*
- Initially DevOps guys provided couple huge EC2’s, all of them had couple busy threads and overall less than 10% of CPU usage.
- Let them run on EMR. With PySpark. Yeah, managed services are generally in the opposite category from “cost effective tools”. But PySpark was the magic that they’ve looked for. Py4J has its own issues, but it solved performance issues without team spending months on studying all these new concepts. Backpressure outside of the box and without knowing that it’s there. When I’ve looked at the bill the cost for this component went down by more than 50%.
Leave microservice architecture somewhere else.
Decoupling data pipeline into a mesh of services doesn’t always pays off. It increases the complexity and makes scaling very hard. Modern frameworks like Apache Flink and Apache Spark come built in with multiple optimizations to create complex “monoliths”. General rule of thumb: make the design decision based on rule of 3 V: Volume, Velocity and Variety.
- Leave ETL as ETL (same for ELT). Breaking it into 3 different services isn’t worth it (in most cases).
- We distinguish whether operation is CPU or IO bound, but for ETL we should also consider how sink destination is working. I’d rather keep the “L” as part of ETL if we stream processed data to Database or Data Warehouse like ElasticSeach, Clickhouse, Redshift or etc. In case of using Data Lake (for example: Apache Iceberg) it can be worthwhile to decouple it from the “ET”, due to the fact that commits are periodical.
- Why does it really matter? Optimize your system utilization. If 80% of processing time is spent on loading processed data to its destination — design your system accordingly.
Data loss vs duplicate data (aka Disaster recovery)
Unless you have specific regulation and you are using transactional processing you have to make two decisions (and assign dollar values to it):
- Whether your system can tolerate and detect duplicate data.
- What is the impact of data loss.
One of the hardest challenges I’ve seen is to detect major issues early. One of the general approaches to handling issues in ingestion and processing pipelines is backfil. You re-run your pipeline with providing it start and end positions. The hardest part is to determine those positions, based on two decisions, that you’ve previously made. You need also to look for an edge cases. If you are working on very high volume system it’s not a question of whether (or when) it can happen, but how ready you are.
- Reprocessing data or altering data stores to remove duplicates are extremely time consuming and generally expensive.
- Checkpointing under backpressure is a known Apache Flink problem. I’ve worked on Flink cluster, that sources and sinks data to Kafka topics. Due to overloaded cluster checkpoints where failing, and kafka commit were not committed — Flink commits asynchronously, only after checkpoint is finalized. Kafka lag exporter metric fired an alert, though most of data was processed. I knew that there is an actual lag by looking at event timestamps of records in sink topic, but determining exact positions for backfill was still really hard — there were hundreds of topics. Simply restarting the cluster would cause lots of duplicate data: both latest completed checkpoint and commited offsets were before couple hours. (In 2024 you can already switch to Adaptive scheduling, that allows scaling up parallelism without cluster restart).
Select * on a data lake.
Just block it. Don’t let anyone select *
on a data lake. At least enforce it using with limit
and where
statements.
Hopefully, now you can make your CFO very happy ;)