{"id":488,"date":"2021-10-12T15:57:44","date_gmt":"2021-10-12T15:57:44","guid":{"rendered":"https:\/\/fde.cat\/index.php\/2021\/10\/12\/how-to-etl-at-petabyte-scale-with-trino\/"},"modified":"2021-10-12T15:57:44","modified_gmt":"2021-10-12T15:57:44","slug":"how-to-etl-at-petabyte-scale-with-trino","status":"publish","type":"post","link":"https:\/\/fde.cat\/index.php\/2021\/10\/12\/how-to-etl-at-petabyte-scale-with-trino\/","title":{"rendered":"How to ETL at Petabyte-Scale with Trino"},"content":{"rendered":"<p><a href=\"https:\/\/trino.io\/\">Trino<\/a> (formerly known as PrestoSQL) is widely appreciated as a fast distributed SQL query engine, but there is precious little information online about using it for batch extract, transform, and load (ETL) ingestion (outside of the <a href=\"https:\/\/trino.io\/paper.html\">original Facebook paper<\/a>), particularly at petabyte+ scale. After <a href=\"https:\/\/engineering.salesforce.com\/big-data-big-decisions-690a00fad88d\">deciding to use Trino<\/a> as a key piece of Salesforce\u2019s Big Data platform, we developed Huron, a new internal system that uses Trino for running online analytical processing (OLAP) queries against our app log data. In this blog post, we share some of the key insights we discovered on our journey to using Trino to ETL petabytes of data with relatively low data\u00a0latency.<\/p>\n<h3>Why Trino for\u00a0ETL?<\/h3>\n<p>While Trino certainly has some shortcomings when it comes to ETL, such as lack of mid-query fault tolerance and limited expressive power, there are also some highly underrated advantages to using Trino for\u00a0ETL:<\/p>\n<p><strong>Simple<\/strong>: ETLs in Trino are merely standard SQL statements of the form<br \/> INSERT INTO target_table SELECT transform(field) FROM source_table<br \/>The overall simplicity of this framework makes the ETL not only easy to implement, but also easy to reason about, and focus can be shifted to tuning other aspects of the system. Moreover, this simplicity means that almost anyone can do ETLs, including end users of the system who may want to perform additional ad hoc transformations specific to their use\u00a0case.<strong>Fast<\/strong>: Trino is fast because it does all processing in memory. This also means it doesn\u2019t handle mid-query failures, and that immediately scares away a lot of people when they think about ETL. What is less appreciated, however, is that there are many scenarios where a system like Trino can come out ahead versus a system that does checkpointing\u200a\u2014\u200aspecifically, where the cost of recovering from failure is less than the cost of checkpointing.<strong>Extensive connector framework<\/strong>: Being able to read from many different data sources is one of the main reasons why organizations choose to use Trino, and it\u2019s also a strength when it comes to ETL, as any connector can be used as a source for ETL, and most connectors can be used as a sink as well. For example, we developed a Salesforce connector that issues <a href=\"https:\/\/developer.salesforce.com\/docs\/atlas.en-us.soql_sosl.meta\/soql_sosl\/sforce_api_calls_soql.htm\">SOQL<\/a> queries to read Salesforce data. This connector is now used as a source in an ETL pipeline to store Salesforce objects in our data lakehouse.<strong>Highly optimized ORC writer<\/strong>: If you know you\u2019ll be querying your data primarily through Trino, then you\u2019ll likely want your data in Optimized Row Columnar (ORC) format, as Trino is optimized for reading ORC. Trino happens to have one of the most optimized ORC writers, which was written natively for\u00a0Trino.<strong>User defined functions<\/strong>: If standard SQL functions don\u2019t give you the \u201ctransform\u201d you\u2019re looking for, it\u2019s fairly easy to implement a <a href=\"https:\/\/trino.io\/docs\/current\/develop\/functions.html\">custom UDF<\/a>, which turns Trino into a powerful distributed compute engine. We wrote a UDF that can parse our internal log serialization formats, allowing us to easily distribute our \u201cextract\/transform\u201d workload across our Trino\u00a0cluster.<\/p>\n<h3>Tips and tricks for running ETL workloads on\u00a0Trino<\/h3>\n<p><strong>Keep it short and sweet<\/strong>: Because any infrastructure hiccup will cause the entire Trino query to fail, you want to keep your ETL queries short, so that failures are relatively low cost to recover from. In our ETL pipeline, queries generally complete within a few minutes, and no query runs longer than 10 minutes. Running frequent, short queries also helps us keep our data latency\u00a0low.<strong>Compact small files<\/strong>: A related point is that frequent INSERT statements can result in a large number of small files, which can impact read query performance. This can be managed by a periodic compaction process that collates these files into larger ones. We do this by simply having two tables, one partitioned by ~10 minute windows, and one partitioned by day. Users query the data through a view which performs a disjoint union of the two tables, such that they never see partially compacted or duplicate data:CREATE VIEW user_view<br \/>AS<br \/> SELECT *<br \/> FROM fine_grained_partitions<br \/> WHERE event_ts &gt; last_compact_ts<br \/> UNION ALL<br \/> SELECT *<br \/> FROM daily_partitions<br \/> WHERE event_ts &lt;= last_compact_ts<\/p>\n<p>Compaction itself is a SQL statement, shown here in a simplified example:<\/p>\n<p>INSERT INTO daily_partitions<br \/>SELECT *<br \/>FROM fine_grained_partitions<br \/>WHERE event_ts &gt; last_compact_ts<br \/> AND event_ts &lt;= current_compact_ts<strong>Keep your workers fed<\/strong>: When ingesting large numbers of small files (we have up to 120k in a single query), we noticed cluster utilization would stall. The cause was the coordinator not generating splits fast enough to keep the workers saturated, as workers would immediately burn through any small files they were assigned. We solved this by increasing hive.split-loader-concurrency, as well as creating more partitions on the source\u00a0table.<strong>Watch for metastore bottlenecks<\/strong>: We use <a href=\"https:\/\/hive.apache.org\/\">Hive<\/a> as our metastore, and found that at large numbers of concurrent queries, it would quickly become a bottleneck. Make sure to scale out your Hive installation and backing relational database management system (RDBMS) according to the number of concurrent queries you intend to run. We found that disabling column stats also helped significantly in alleviating this bottleneck.<strong>Use a separate cluster for ETL and read queries<\/strong>: Generally, the cadence and intensity of ETL queries are known a priori, and you are tuning the ETL to maximize throughput and usage of cluster resources. Separating these into a separate cluster ensures interactive user queries aren\u2019t impacted.<strong>Use the right JVM settings<\/strong>: We noticed queries hanging on what appeared to be a few slow workers stuck on processing certain splits. It turns out that these Trino JVM settings fixed it:<br \/>-XX:PerMethodRecompilationCutoff=10000<br \/>-XX:PerBytecodeRecompilationCutoff=10000<br \/>Certain pieces of data (in our case, timestamps) can cause the JVM to do a dynamic \u201cdeoptimization.\u201d You then get stuck in a loop unless you set these\u00a0cutoffs.<\/p>\n<h3>Scaling writes to object\u00a0store<\/h3>\n<p>Our ETL pipelines write data to S3 using the Hive connector, and managing the writes here is perhaps the trickiest part to doing ETL at large scale with Trino. There is a delicate <a href=\"https:\/\/en.wikipedia.org\/wiki\/Goldilocks_principle\">Goldilocks<\/a> balance to be managed along multiple related dimensions:<\/p>\n<p><strong>Number of files<\/strong>\u200a\u2014\u200alarge enough to get good write throughput, yet not so large as to hit S3 throttling or to impact read query performance (via too many small\u00a0files).<strong>Number of writers<\/strong>\u200a\u2014\u200alarge enough to get good overall parallelism, but the number of files scales with\u00a0this.<strong>Number of partitions<\/strong>\u200a\u2014\u200alarge enough to get good partition pruning on the query side, but not so large as to impact write throughput and file counts\/sizes.<\/p>\n<p>There are several different configs that control this overall balance, and it can be initially difficult to grok how they all interact with each other. Here\u2019s a diagram that makes the process a little easier to understand:<\/p>\n<p>Every INSERT in Trino is executed by a number of writer nodes (i.e. Trino workers), and each of these nodes can have multiple threads (labeled here as \u201ctask_writer\u201d) doing the the actual writes. Each thread can potentially write to every partition (in this example, data for each partition is spread across all writers).<\/p>\n<p>There are a few <a href=\"https:\/\/trino.io\/docs\/current\/admin\/properties-writer-scaling.html\">session properties<\/a> (as of Trino 356) associated with\u00a0these:<\/p>\n<p>task_writer_count &#8211; number of threads per writer per\u00a0query.scale_writers &#8211; if true, Trino starts with a single writer, then adds additional writers as the writer_min_size threshold is surpassed for each writer. If false, all nodes (up to hash_partition_count) are used to\u00a0write.use_preferred_write_partitioning &#8211; if true, each partition is written only by a single\u00a0writer.<\/p>\n<p>The number of files that ends up getting created is thus a function of: num_writers * task_writer_count * num_partitions.<\/p>\n<p>Getting good overall performance on both the write and read side is a matter of tuning all these variables to get reasonable file counts and sizes. In the end, the right settings to use will depend on a multitude of variables, like the size of the data and skew. For example, we use scale_writers=true when we know a given INSERT will have a relatively small amount of data.<\/p>\n<p>Some final pointers to keep in mind as you go about your tuning\u00a0process:<\/p>\n<p>As described earlier, compaction can collate files after they are initially written.Object stores like S3 typically throttle requests based on prefix. Ensure your partitioning scheme results in a good distribution across prefixes. You may potentially want to work with e.g. AWS to pre-partition your bucket at the S3 level to ensure you don\u2019t get throttled.Trino writes through the Hive connector are not transactional, but they are semi-transactional in the sense that files are first written out to object store, and only after all the files are successfully written is the partition committed to the metastore. Your system should account for this\u200a\u2014\u200afor example, we write to a new s3 prefix on every retry\u00a0attempt.<\/p>\n<h3>Should I use Trino to\u00a0ETL?<\/h3>\n<p>While it might not be suitable for every ETL scenario out there, we\u2019ve seen that Trino does quite well for a certain class of ETL use cases. Trino does best where the ETL can be designed around some of Trino\u2019s shortcomings (like keeping ETL queries short-running for easy failure recovery), and where retries and state management are handled by a robust external system like <a href=\"https:\/\/airflow.apache.org\/\">Apache Airflow<\/a>. It also takes some careful partition planning and config tuning, but the end result can be a simple, fast, and effective system.<\/p>\n<p>Despite our legacy log serialization formats being compute intensive to parse, we\u2019ve managed to ingest at peak rates of &gt;2.5 GB\/sec, with data being available for querying within an average of &lt;15 minutes. Going forward, we hope to improve upon our design by using <a href=\"https:\/\/iceberg.apache.org\/\">Iceberg<\/a>, which provides snapshot\/serializable isolation and removes the metastore bottleneck, optimizing Trino further for our workloads, and extending our framework to other dataset\u00a0types.<\/p>\n<p><em>Thanks to Chinmay Kulkarni, Conor McAvoy, and Laura Lindeman for their review and feedback.<\/em><\/p>\n<p><a href=\"https:\/\/careers.mail.salesforce.com\/tpil-blogs\"><em>Join our Talent Network<\/em><\/a><em> to see all of the open positions at Salesforce Engineering!<\/em><\/p>\n<p><a href=\"https:\/\/engineering.salesforce.com\/how-to-etl-at-petabyte-scale-with-trino-5fe8ac134e36\">How to ETL at Petabyte-Scale with Trino<\/a> was originally published in <a href=\"https:\/\/engineering.salesforce.com\/\">Salesforce Engineering<\/a> on Medium, where people are continuing the conversation by highlighting and responding to this story.<\/p>\n<p><a href=\"https:\/\/engineering.salesforce.com\/how-to-etl-at-petabyte-scale-with-trino-5fe8ac134e36?source=rss----cfe1120185d3---4\">Read More<\/a><\/p>","protected":false},"excerpt":{"rendered":"<p>Trino (formerly known as PrestoSQL) is widely appreciated as a fast distributed SQL query engine, but there is precious little information online about using it for batch extract, transform, and load (ETL) ingestion (outside of the original Facebook paper), particularly at petabyte+ scale. After deciding to use Trino as a key piece of Salesforce\u2019s Big&hellip; <a class=\"more-link\" href=\"https:\/\/fde.cat\/index.php\/2021\/10\/12\/how-to-etl-at-petabyte-scale-with-trino\/\">Continue reading <span class=\"screen-reader-text\">How to ETL at Petabyte-Scale with Trino<\/span><\/a><\/p>\n","protected":false},"author":0,"featured_media":0,"comment_status":"","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"spay_email":"","footnotes":""},"categories":[7],"tags":[],"class_list":["post-488","post","type-post","status-publish","format-standard","hentry","category-technology","entry"],"jetpack_featured_media_url":"","jetpack-related-posts":[{"id":636,"url":"https:\/\/fde.cat\/index.php\/2022\/09\/27\/a-peek-at-datoramas-aws-s3-sql-query-tool\/","url_meta":{"origin":488,"position":0},"title":"A Peek at Datorama\u2019s AWS S3 SQL Query Tool","date":"September 27, 2022","format":false,"excerpt":"Datorama Reports for Marketing Cloud enables you to generate, view, and share a detailed analysis of your Email, Push, and Journey campaign-level data. For that, Datorama extracts large volumes of data to its Data Lake solution for the Marketing Cloud marketing analytics, which is stored in a structured table compatible\u2026","rel":"","context":"In &quot;Technology&quot;","img":{"alt_text":"","src":"","width":0,"height":0},"classes":[]},{"id":900,"url":"https:\/\/fde.cat\/index.php\/2024\/07\/22\/data-clouds-lightning-fast-migration-from-amazon-ec2-to-kubernetes-in-6-months\/","url_meta":{"origin":488,"position":1},"title":"Data Cloud\u2019s Lightning-Fast Migration: From Amazon EC2 to Kubernetes in 6 Months","date":"July 22, 2024","format":false,"excerpt":"In our \u201cEngineering Energizers\u201d Q&A series, we delve into the journeys of distinguished engineering leaders. Today, we feature Archana Kumari, Director of Software Engineering at Salesforce. Archana leads our India-based Data Cloud Compute Layer team, which played a pivotal role in a recent transition from Amazon EC2 to Kubernetes for\u2026","rel":"","context":"In &quot;Technology&quot;","img":{"alt_text":"","src":"","width":0,"height":0},"classes":[]},{"id":694,"url":"https:\/\/fde.cat\/index.php\/2023\/03\/23\/big-data-processing-driving-data-migration-for-salesforce-data-cloud\/","url_meta":{"origin":488,"position":2},"title":"Big Data Processing: Driving Data Migration  for Salesforce Data Cloud","date":"March 23, 2023","format":false,"excerpt":"The tsunami of data \u2014 set to exceed 180 zettabytes by 2025 \u2014 places significant pressure on companies. Simply having access to customer information is not enough \u2014 companies must also analyze and refine the data to find actionable pieces that power new business. As businesses collect these volumes of\u2026","rel":"","context":"In &quot;Technology&quot;","img":{"alt_text":"","src":"","width":0,"height":0},"classes":[]},{"id":753,"url":"https:\/\/fde.cat\/index.php\/2023\/08\/29\/scheduling-jupyter-notebooks-at-meta\/","url_meta":{"origin":488,"position":3},"title":"Scheduling Jupyter Notebooks at Meta","date":"August 29, 2023","format":false,"excerpt":"At Meta, Bento is our internal Jupyter notebooks platform that is leveraged by many internal users. Notebooks are also being used widely for creating reports and workflows (for example, performing data ETL) that need to be repeated at certain intervals. Users with such notebooks would have to remember to manually\u2026","rel":"","context":"In &quot;Technology&quot;","img":{"alt_text":"","src":"","width":0,"height":0},"classes":[]},{"id":278,"url":"https:\/\/fde.cat\/index.php\/2021\/08\/31\/a-peek-at-datoramas-virtual-sandbox\/","url_meta":{"origin":488,"position":4},"title":"A Peek at Datorama\u2019s Virtual Sandbox","date":"August 31, 2021","format":false,"excerpt":"What is a\u00a0Sandbox?A Sandbox is an isolated environment where you can safely experiment and test changes that you intend to apply to your live environment. The idea is to learn about the implications of these intended changes in advance, in order to decide on how to best conduct them, without\u2026","rel":"","context":"In &quot;Technology&quot;","img":{"alt_text":"","src":"","width":0,"height":0},"classes":[]},{"id":892,"url":"https:\/\/fde.cat\/index.php\/2024\/07\/08\/unlocking-data-clouds-secret-for-scaling-massive-data-volumes-and-slashing-processing-bottlenecks\/","url_meta":{"origin":488,"position":5},"title":"Unlocking Data Cloud\u2019s Secret for Scaling Massive Data Volumes and Slashing Processing Bottlenecks","date":"July 8, 2024","format":false,"excerpt":"In our Engineering Energizers Q&A series, we explore engineers who have pioneered advancements in their fields. Today, we meet Rahul Singh, Vice President of Software Engineering, leading the India-based Data Cloud team. His team is focused on delivering a robust, scalable, and efficient Data Cloud platform that consolidates customer data\u2026","rel":"","context":"In &quot;Technology&quot;","img":{"alt_text":"","src":"","width":0,"height":0},"classes":[]}],"_links":{"self":[{"href":"https:\/\/fde.cat\/index.php\/wp-json\/wp\/v2\/posts\/488","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/fde.cat\/index.php\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/fde.cat\/index.php\/wp-json\/wp\/v2\/types\/post"}],"replies":[{"embeddable":true,"href":"https:\/\/fde.cat\/index.php\/wp-json\/wp\/v2\/comments?post=488"}],"version-history":[{"count":0,"href":"https:\/\/fde.cat\/index.php\/wp-json\/wp\/v2\/posts\/488\/revisions"}],"wp:attachment":[{"href":"https:\/\/fde.cat\/index.php\/wp-json\/wp\/v2\/media?parent=488"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/fde.cat\/index.php\/wp-json\/wp\/v2\/categories?post=488"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/fde.cat\/index.php\/wp-json\/wp\/v2\/tags?post=488"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}