How to ETL at Petabyte-Scale with Trino

Trino (formerly known as PrestoSQL) is widely appreciated as a fast distributed SQL query engine, but there is precious little information online about using it for batch extract, transform, and load (ETL) ingestion (outside of the original Facebook paper), particularly at petabyte+ scale. After deciding to use Trino as a key piece of Salesforce’s Big Data platform, we developed Huron, a new internal system that uses Trino for running online analytical processing (OLAP) queries against our app log data. In this blog post, we share some of the key insights we discovered on our journey to using Trino to ETL petabytes of data with relatively low data latency.

Why Trino for ETL?

While Trino certainly has some shortcomings when it comes to ETL, such as lack of mid-query fault tolerance and limited expressive power, there are also some highly underrated advantages to using Trino for ETL:

Simple: ETLs in Trino are merely standard SQL statements of the form
INSERT INTO target_table SELECT transform(field) FROM source_table
The overall simplicity of this framework makes the ETL not only easy to implement, but also easy to reason about, and focus can be shifted to tuning other aspects of the system. Moreover, this simplicity means that almost anyone can do ETLs, including end users of the system who may want to perform additional ad hoc transformations specific to their use case.

Fast: Trino is fast because it does all processing in memory. This also means it doesn’t handle mid-query failures, and that immediately scares away a lot of people when they think about ETL. What is less appreciated, however, is that there are many scenarios where a system like Trino can come out ahead versus a system that does checkpointing — specifically, where the cost of recovering from failure is less than the cost of checkpointing.

Extensive connector framework: Being able to read from many different data sources is one of the main reasons why organizations choose to use Trino, and it’s also a strength when it comes to ETL, as any connector can be used as a source for ETL, and most connectors can be used as a sink as well. For example, we developed a Salesforce connector that issues SOQL queries to read Salesforce data. This connector is now used as a source in an ETL pipeline to store Salesforce objects in our data lakehouse.

Highly optimized ORC writer: If you know you’ll be querying your data primarily through Trino, then you’ll likely want your data in Optimized Row Columnar (ORC) format, as Trino is optimized for reading ORC. Trino happens to have one of the most optimized ORC writers, which was written natively for Trino.

User defined functions: If standard SQL functions don’t give you the “transform” you’re looking for, it’s fairly easy to implement a custom UDF, which turns Trino into a powerful distributed compute engine. We wrote a UDF that can parse our internal log serialization formats, allowing us to easily distribute our “extract/transform” workload across our Trino cluster. A sketch of what such a query can look like follows below.
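To make the pattern concrete, here is a minimal sketch of what such an ETL query can look like. The table and column names (raw_log_lines, app_logs, ingest_date) and the parse_log_line UDF are hypothetical stand-ins for our internal objects; only the INSERT INTO ... SELECT shape and the built-in from_iso8601_timestamp function are standard Trino.

-- Hypothetical names throughout; parse_log_line stands in for a custom log-parsing UDF
INSERT INTO app_logs
SELECT
    parse_log_line(raw_line, 'request_id') AS request_id,
    parse_log_line(raw_line, 'org_id') AS org_id,
    from_iso8601_timestamp(parse_log_line(raw_line, 'timestamp')) AS event_ts
FROM raw_log_lines
WHERE ingest_date = DATE '2021-06-01'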

Tips and tricks for running ETL workloads on Trino

Keep it short and sweet: Because any infrastructure hiccup will cause the entire Trino query to fail, you want to keep your ETL queries short, so that failures are relatively low cost to recover from. In our ETL pipeline, queries generally complete within a few minutes, and no query runs longer than 10 minutes. Running frequent, short queries also helps us keep our data latency low.

Compact small files: A related point is that frequent INSERT statements can result in a large number of small files, which can impact read query performance. This can be managed by a periodic compaction process that collates these files into larger ones. We do this by simply having two tables, one partitioned by ~10 minute windows, and one partitioned by day. Users query the data through a view which performs a disjoint union of the two tables, such that they never see partially compacted or duplicate data:

CREATE VIEW user_view
AS
SELECT *
FROM fine_grained_partitions
WHERE event_ts > last_compact_ts
UNION ALL
SELECT *
FROM daily_partitions
WHERE event_ts <= last_compact_ts

Compaction itself is a SQL statement, shown here in a simplified example:

INSERT INTO daily_partitions
SELECT *
FROM fine_grained_partitions
WHERE event_ts > last_compact_ts
AND event_ts <= current_compact_ts

Keep your workers fed: When ingesting large numbers of small files (we have up to 120k in a single query), we noticed cluster utilization would stall. The cause was the coordinator not generating splits fast enough to keep the workers saturated, as workers would immediately burn through any small files they were assigned. We solved this by increasing hive.split-loader-concurrency (see the catalog property sketch after this list), as well as creating more partitions on the source table.

Watch for metastore bottlenecks: We use Hive as our metastore, and found that at large numbers of concurrent queries, it would quickly become a bottleneck. Make sure to scale out your Hive installation and backing relational database management system (RDBMS) according to the number of concurrent queries you intend to run. We found that disabling column stats also helped significantly in alleviating this bottleneck.

Use a separate cluster for ETL and read queries: Generally, the cadence and intensity of ETL queries are known a priori, and you are tuning the ETL to maximize throughput and usage of cluster resources. Isolating ETL on its own cluster ensures interactive user queries aren’t impacted.

Use the right JVM settings: We noticed queries hanging on what appeared to be a few slow workers stuck on processing certain splits. It turns out that these Trino JVM settings fixed it:
-XX:PerMethodRecompilationCutoff=10000
-XX:PerBytecodeRecompilationCutoff=10000
Certain pieces of data (in our case, timestamps) can cause the JVM to do a dynamic “deoptimization.” You then get stuck in a loop unless you set these cutoffs.
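For the split-loading and column stats tips above, the relevant knobs live in the Hive catalog properties file rather than in SQL. Here is a minimal sketch, assuming a catalog named hive; the values are illustrative rather than recommendations, and you should verify the exact property names against the documentation for your Trino version:

# etc/catalog/hive.properties (excerpt)
# Generate splits more aggressively so workers stay saturated on many small files
hive.split-loader-concurrency=16
# Skip column statistics collection on write to reduce metastore load
hive.collect-column-statistics-on-write=false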

Scaling writes to object store

Our ETL pipelines write data to S3 using the Hive connector, and managing the writes here is perhaps the trickiest part of doing ETL at large scale with Trino. There is a delicate Goldilocks balance to be managed along multiple related dimensions:

Number of files — large enough to get good write throughput, yet not so large as to hit S3 throttling or to impact read query performance (via too many small files).

Number of writers — large enough to get good overall parallelism, but the number of files scales with this.

Number of partitions — large enough to get good partition pruning on the query side, but not so large as to impact write throughput and file counts/sizes.

There are several different configs that control this overall balance, and it can be initially difficult to grok how they all interact with each other. Here’s a diagram that makes the process a little easier to understand:

Every INSERT in Trino is executed by a number of writer nodes (i.e. Trino workers), and each of these nodes can have multiple threads (labeled here as “task_writer”) doing the actual writes. Each thread can potentially write to every partition (in this example, data for each partition is spread across all writers).

There are a few session properties (as of Trino 356) associated with these:

task_writer_count – number of threads per writer per query.

scale_writers – if true, Trino starts with a single writer, then adds additional writers as the writer_min_size threshold is surpassed for each writer. If false, all nodes (up to hash_partition_count) are used to write.

use_preferred_write_partitioning – if true, each partition is written only by a single writer.

The number of files that ends up getting created is thus a function of: num_writers * task_writer_count * num_partitions.
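To make the arithmetic concrete (with purely illustrative numbers): an INSERT running on 20 writer nodes with task_writer_count = 4 that touches 12 partitions can, in the worst case, produce 20 * 4 * 12 = 960 files from a single query.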

Getting good overall performance on both the write and read side is a matter of tuning all these variables to get reasonable file counts and sizes. In the end, the right settings to use will depend on a multitude of variables, like the size of the data and skew. For example, we use scale_writers=true when we know a given INSERT will have a relatively small amount of data.
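As a sketch of what that tuning can look like at query time, the session properties listed above (as of Trino 356) can be set per query before the INSERT runs; the specific values here are assumptions chosen for illustration, not recommendations:

SET SESSION task_writer_count = 4;
SET SESSION scale_writers = true;
SET SESSION writer_min_size = '256MB';
SET SESSION use_preferred_write_partitioning = true;

-- an INSERT issued in the same session picks up these settings
INSERT INTO daily_partitions
SELECT *
FROM fine_grained_partitions
WHERE event_ts > last_compact_ts
AND event_ts <= current_compact_ts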

Some final pointers to keep in mind as you go about your tuning process:

As described earlier, compaction can collate files after they are initially written.

Object stores like S3 typically throttle requests based on prefix. Ensure your partitioning scheme results in a good distribution across prefixes (a partitioning sketch follows this list). You may potentially want to work with e.g. AWS to pre-partition your bucket at the S3 level to ensure you don’t get throttled.

Trino writes through the Hive connector are not transactional, but they are semi-transactional in the sense that files are first written out to object store, and only after all the files are successfully written is the partition committed to the metastore. Your system should account for this — for example, we write to a new S3 prefix on every retry attempt.
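As one illustration of the partitioning point, here is a minimal sketch of a partitioned table definition. The column names and S3 location are hypothetical (the table name echoes the fine_grained_partitions table from the compaction example above); format, partitioned_by, and external_location are standard Hive connector table properties, and each partition value becomes its own directory, and hence its own S3 prefix, under the table location:

CREATE TABLE fine_grained_partitions (
    request_id varchar,
    org_id varchar,
    event_ts timestamp,
    raw_payload varchar,
    -- partition columns must come last in the column list
    ingest_window varchar
)
WITH (
    format = 'ORC',
    partitioned_by = ARRAY['ingest_window'],
    external_location = 's3://example-bucket/huron/fine_grained/'
)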

Should I use Trino to ETL?

While it might not be suitable for every ETL scenario out there, we’ve seen that Trino does quite well for a certain class of ETL use cases. Trino does best where the ETL can be designed around some of Trino’s shortcomings (like keeping ETL queries short-running for easy failure recovery), and where retries and state management are handled by a robust external system like Apache Airflow. It also takes some careful partition planning and config tuning, but the end result can be a simple, fast, and effective system.

Despite our legacy log serialization formats being compute-intensive to parse, we’ve managed to ingest at peak rates of >2.5 GB/sec, with data available for querying in under 15 minutes on average. Going forward, we hope to improve upon our design by using Iceberg, which provides snapshot/serializable isolation and removes the metastore bottleneck, optimizing Trino further for our workloads, and extending our framework to other dataset types.

Thanks to Chinmay Kulkarni, Conor McAvoy, and Laura Lindeman for their review and feedback.

Join our Talent Network to see all of the open positions at Salesforce Engineering!
