{"id":672,"date":"2023-01-31T14:00:27","date_gmt":"2023-01-31T14:00:27","guid":{"rendered":"https:\/\/fde.cat\/index.php\/2023\/01\/31\/asynchronous-computing-at-meta-overview-and-learnings\/"},"modified":"2023-01-31T14:00:27","modified_gmt":"2023-01-31T14:00:27","slug":"asynchronous-computing-at-meta-overview-and-learnings","status":"publish","type":"post","link":"https:\/\/fde.cat\/index.php\/2023\/01\/31\/asynchronous-computing-at-meta-overview-and-learnings\/","title":{"rendered":"Asynchronous computing at Meta: Overview and learnings"},"content":{"rendered":"<p><span>We\u2019ve made architecture changes to Meta\u2019s event-driven asynchronous computing platform that have enabled easy integration with multiple event sources.\u00a0<\/span><br \/>\n<span>We\u2019re sharing our learnings from handling various workloads and how we tackled the trade-offs made with certain design choices in building the platform.<\/span><\/p>\n<p><span><a href=\"https:\/\/engineering.fb.com\/2020\/08\/17\/production-engineering\/async\/\" target=\"_blank\" rel=\"noopener\">Asynchronous computing<\/a> is a paradigm where the user does not expect a workload to be executed immediately; instead, it gets scheduled for execution sometime in the near future without blocking the latency-critical path of the application. At Meta, we have built a platform for serverless <\/span><span>asynchronous computing<\/span><span> that is provided as a service for other engineering teams. They register asynchronous functions on the platform and then submit workloads for execution via our SDK. The platform executes these workloads in the background on a large fleet of workers and provides additional capabilities such as load balancing, rate limiting, quota management, downstream protection, and many others. 
We refer to this infrastructure internally as \u201c<a href=\"https:\/\/engineering.fb.com\/2020\/08\/17\/production-engineering\/async\/\">Async tier<\/a>.\u201d\u00a0<\/span><\/p>\n<p><span>Currently, we support myriad customer use cases that result in trillions of workloads being executed each day.<\/span><\/p>\n<p><span>There is already a great article from 2020 that dives into the details of the <\/span><a href=\"https:\/\/engineering.fb.com\/2020\/08\/17\/production-engineering\/async\/\"><span>architecture of Async tier<\/span><\/a><span>, the features it provided, and how these features could be applied at scale. In the following material we will focus more on design and implementation aspects and explain how we re-architected the platform to enable five-fold growth over the past two years.<\/span><\/p>\n<h2><span>General high-level architecture<\/span><\/h2>\n<p><span>Any asynchronous computing platform is composed of the following building blocks:<\/span><\/p>\n<p><span>Ingestion and storage<\/span><br \/>\n<span>Transport and routing<\/span><br \/>\n<span>Computation<\/span><\/p>\n<h3><span>Ingestion and storage<\/span><\/h3>\n<p><span>Our platform is responsible for accepting the workloads and storing them for execution. Here, both latency and reliability are critical: This layer must accept the workload and respond back as soon as possible, and it must store the workload reliably all the way to successful execution.\u00a0<\/span><\/p>\n<h3><span>Transport and routing<\/span><\/h3>\n<p><span>This deals with transferring an adequate number of workloads from storage into the computation layer, where they will be executed. Sending too few will underutilize the <\/span><span>computation<\/span><span> layer and cause unnecessary processing delay, whereas sending too many will overwhelm the machines responsible for the <\/span><span>computation<\/span><span> and can cause failures. 
Thus, we define sending the correct number as \u201cflow-control.\u201d<\/span><\/p>\n<p><span>This layer is also responsible for maintaining optimal utilization of resources in the <\/span><span>computation<\/span><span> layer, as well as for providing additional features such as cross-regional load balancing, quota management, rate limiting, downstream protection, backoff and retry capabilities, and many others.<\/span><\/p>\n<h3><span>Computation<\/span><\/h3>\n<p><span>This usually refers to a specific worker runtime where the actual function execution takes place.<\/span><\/p>\n<h2><span>Back in 2020<\/span><\/h2>\n<p><span>In the past, Meta built its own distributed priority queue, equivalent to some of the queuing solutions provided by public cloud providers. It is called the <a href=\"https:\/\/engineering.fb.com\/2021\/02\/22\/production-engineering\/foqs-scaling-a-distributed-priority-queue\/\" target=\"_blank\" rel=\"noopener\">Facebook Ordered Queuing Service<\/a> (since it was built when the company was called Facebook), and has a famous acronym: FOQS. FOQS is critical to our story, because it formed the core of the ingestion and storage components.<\/span><\/p>\n<h3><span>Facebook Ordered Queuing Service (FOQS)<\/span><\/h3>\n<p><a href=\"https:\/\/engineering.fb.com\/2021\/02\/22\/production-engineering\/foqs-scaling-a-distributed-priority-queue\/\" target=\"_blank\" rel=\"noopener\"><span>FOQS<\/span><\/a><span>, our in-house distributed priority queuing service, was developed on top of MySQL and provides the ability to put items in the queue with a timestamp after which they should become available for consumption (an enqueue operation). The available items can be consumed later with a dequeue operation. While dequeuing, the consumer holds a lease on an item, and once the item is processed successfully, they \u201cACK\u201d (acknowledge) it back to FOQS. 
Otherwise, they \u201cNACK\u201d (negative acknowledgment) the item and it becomes available immediately for someone else to dequeue. The lease can also expire before either of these actions takes place, and the item gets auto-NACKed owing to a lease timeout. Also, this is non-blocking, meaning that customers can take a lease on subsequently enqueued, available items even though the oldest item was neither ACKed nor NACKed. There\u2019s already a great article on the subject if you are interested in diving deeply into <a href=\"https:\/\/engineering.fb.com\/2021\/02\/22\/production-engineering\/foqs-scaling-a-distributed-priority-queue\/\" target=\"_blank\" rel=\"noopener\">how we scaled FOQS<\/a>.<\/span><\/p>\n<p><span>Async tier leveraged FOQS by introducing a lightweight service, called \u201cSubmitter,\u201d that customers could use to submit their workloads to the queue. Submitter would perform basic validation and overload protection, then enqueue these items into FOQS. The transport layer consisted of a component called \u201cDispatcher,\u201d which pulled items from FOQS and sent them to the computation layer for execution.<\/span><\/p>\n\n<h2><span>Challenges<\/span><\/h2>\n<h3><span>Increasing complexity of the system<\/span><\/h3>\n<p><span>Over time we started to see that the dispatcher was taking on more and more responsibility, growing in size, and becoming almost the single place for all the new features and logic that the team was working on. 
It was:<\/span><\/p>\n<p><span>Consuming items from FOQS and managing their lifecycle.<\/span><br \/>\n<span>Protecting FOQS from overload by adaptively adjusting dequeue rates.<\/span><br \/>\n<span>Providing all regular features such as rate limiting, quota management, workload prioritization, and downstream protection.<\/span><br \/>\n<span>Sending workloads to multiple worker runtimes for execution and managing job lifecycle.<\/span><br \/>\n<span>Providing both local and cross-regional load balancing and flow control.<\/span><\/p>\n<p><span>Consolidating a significant amount of logic in a single component eventually made it hard for us to work on new capabilities in parallel and scale the team operationally.<\/span><\/p>\n<h3><span>External data sources<\/span><\/h3>\n<p><span>At the same time, we started to see more and more requests from customers who wanted to execute their workloads based on data already stored in other systems, such as streams, data warehouses, blob storage, pub\/sub queues, and many others. Although this was possible in the existing system, it came with certain downsides.<\/span><\/p>\n\n<p><span>The limitations of the above architecture are:<\/span><\/p>\n<p><span>Customers had to write their own solutions to read data from the original storage and submit it to our platform via the Submitter API. This caused recurring duplicate work across multiple different use cases.<\/span><br \/>\n<span>Data always had to be copied to FOQS, causing major inefficiency at scale. In addition, some storage systems were more suitable for particular types of data and load patterns than others. 
For example, the cost of storing data from high-traffic streams or large data warehouse tables in the queue can be significantly higher than keeping it in the original storage.<\/span><\/p>\n<h2><span>Re-architecture<\/span><\/h2>\n<p><span>To solve the above problems, we had to break down the system into more granular components with clear responsibilities and add first-class support for external data sources.<\/span><\/p>\n<p><span>Our re-imagined version of Async tier would look like this:<\/span><\/p>\n\n<h3><span>Generic transport layer<\/span><\/h3>\n<p><span>In the old system, our transport layer consisted of the dispatcher, which pulled workloads from FOQS. As the first step on the path to multi-source support, we decoupled the storage-reading logic from the transport layer and moved it upstream. This left the transport layer as a data-source-agnostic component responsible for managing the execution and providing a compute-related set of capabilities such as rate limiting, quota management, load balancing, etc. We call this the \u201cscheduler\u201d\u2014an independent service with a generic API.<\/span><\/p>\n<h3><span>Reading workloads<\/span><\/h3>\n<p><span>Every data source can be different\u2014for example, immutable vs. mutable, or fast-moving vs. large-batch\u2014and eventually requires some specific code and settings to read from it. We created adapters to house this \u201cread logic\u201d: the various <\/span><span>mechanisms for reading different data sources<\/span><span>. 
These adapters act like the UNIX tail command, tailing the data source for new workloads\u2014so we call them \u201ctailers.\u201d During onboarding, for each data source that the customer utilizes, the platform launches corresponding tailer instances for reading that data.\u00a0<\/span><\/p>\n<p><span>With these changes in place, our architecture looks like this:<\/span><\/p>\n\n<h3><span>Push versus pull and consequences<\/span><\/h3>\n<p><span>To facilitate these changes, the tailers now \u201cpushed\u201d data to the transport layer (the scheduler) instead of the transport layer \u201cpulling\u201d it.\u00a0<\/span><\/p>\n<p><span>The benefit of this change was the ability to provide a generic scheduler API and make it data-source agnostic. In push mode, tailers would send the workloads as RPCs to the scheduler and did not have to wait for an ACK\/NACK or lease timeout to know whether they had succeeded or failed.<\/span><\/p>\n\n<p><span>Cross-regional load balancing also became more accurate with this change, since it would be controlled centrally by the tailer instead of each region pulling independently.<\/span><\/p>\n\n<p><span>These changes collectively improved the cross-region load distribution and the end-to-end latency of our platform, together with getting rid of data duplication (owing to buffering in FOQS) and treating all data sources as first-class citizens on our platform.\u00a0<\/span><\/p>\n<p><span>However, there were a couple of drawbacks to these changes as well. As push mode is essentially an RPC, it\u2019s not a great fit for long-running workloads. It requires both client and server to allocate resources for the connection and hold them during the entire function running time, which can become a significant problem at scale. Also, synchronous workloads that run for a while have an increased chance of failure due to transient errors that force them to start over again completely. 
Based on the usage statistics of our platform, the majority of the workloads were finishing within seconds, so it was not a blocker, but it\u2019s important to consider this limitation if a significant share of your functions takes multiple minutes, or even tens of minutes, to finish.<\/span><\/p>\n<h2><span>Re-architecture: Results<\/span><\/h2>\n<p><span>Let\u2019s quickly look at the main benefits we achieved from the re-architecture:<\/span><\/p>\n<p><span>Workloads are no longer copied into FOQS for the sole purpose of buffering.<\/span><br \/>\n<span>Customers don\u2019t need to invest extra effort in building their own solutions.<\/span><br \/>\n<span>We managed to break down the system into granular components with a clean contract, which makes it easier to scale our operations and work on new features in parallel.<\/span><br \/>\n<span>Moving to push mode improved our end-to-end latency and cross-regional load distribution.<\/span><\/p>\n<p><span>By enabling first-class support for various data sources, we have created space for further efficiency wins, owing to the ability to choose the most efficient storage for each individual use case. Over time we noticed two popular options that customers choose: queue (<\/span><a href=\"https:\/\/engineering.fb.com\/2021\/02\/22\/production-engineering\/foqs-scaling-a-distributed-priority-queue\/\" target=\"_blank\" rel=\"noopener\"><span>FOQS<\/span><\/a><span>) and stream (<\/span><a href=\"https:\/\/engineering.fb.com\/2019\/10\/07\/data-infrastructure\/scribe\/\" target=\"_blank\" rel=\"noopener\"><span>Scribe<\/span><\/a><span>). 
Since we have enough operational experience with both of them, we are now in a position to compare the two options and understand the trade-offs of using each for powering asynchronous computations.<\/span><\/p>\n<h3><span>Queues versus streams<\/span><\/h3>\n<p><span>With a queue as the choice of storage, customers have full flexibility when it comes to retry policies, granular per-item access, and variable function running time, mainly due to the concept of a lease and support for arbitrary ordering. If computation was unsuccessful for some workloads, they could be granularly retried by NACKing the item back to the queue with an arbitrary delay. However, the concept of a lease comes at the cost of an internal item-lifecycle management system. In the same way, priority-based ordering comes at the cost of a secondary index on items. These made queues a great universal choice with a lot of flexibility, at a moderate cost.<\/span><\/p>\n<p><span>Streams are less flexible, since they provide immutable data in batches and cannot support granular retries or random access per item. However, they are more efficient if the customer needs only fast sequential access to a large volume of incoming traffic. So, compared to queues, streams provide lower cost at scale by trading off flexibility.<\/span><\/p>\n<h3><span>The problem of retries in streams<\/span><\/h3>\n<h4><span>Clogged stream<\/span><\/h4>\n<p><span>While we explained above that granular message-level retries are not possible in streams, we could not compromise on the At-Least-Once delivery guarantee that we had been providing to our customers. This meant we had to build the capability to provide source-agnostic retries for failed workloads.<\/span><\/p>\n\n<p><span>For streams, the tailers would read workloads in batches and advance a checkpoint demarcating how far down the stream the read had progressed. 
These batches would be sent for computation, and the tailer would read the next batch and advance the checkpoint further once all items were processed. As this continued, if even one of the items in the last batch failed, the system wouldn\u2019t be able to make forward progress until, after a few retries, that item was processed successfully. For a high-traffic stream, this would build up significant lag ahead of the checkpoint, and the platform would eventually struggle to catch up. The other option was to drop the failed workload and not block the stream, which would violate the At-Least-Once (ALO) guarantee.<\/span><\/p>\n<h4><span>Delay service<\/span><\/h4>\n\n<p><span>To solve this problem, we created another service that can store items and retry them after an arbitrary delay without blocking the entire stream. This service accepts the workloads along with their intended delay intervals (exponential-backoff retry intervals can be used here), and upon completion of the delay interval, it sends the items to computation. We call this the controlled-delay service.\u00a0<\/span><\/p>\n<p><span>We have explored two possible ways to offer this capability:<\/span><\/p>\n<p><span>Use a priority queue as intermediate storage and rely on the assumption that most of the traffic will go through the main stream and we will only need to deal with a small fraction of outliers. In that case, it\u2019s important to make sure that during a massive increase in errors (for example, when 100% of jobs are failing), we clog the stream completely rather than copy it into the Delay service.<\/span><br \/>\n<span>Create multiple predefined delay-streams that are blocked by a fixed amount of time (for example, 30 seconds, one minute, five minutes, 30 minutes) such that every item entering them gets delayed by that amount of time before being read. Then we can combine the available delay-streams to achieve the amount of delay time a specific workload requires before sending it back. 
As it uses only sequential-access streams under the hood, this approach can potentially allow the Delay service to run at a larger scale with lower cost.<\/span><\/p>\n<h2><span>Observations and learnings<\/span><\/h2>\n<p><span>The main takeaway from our observations is that there is no one-size-fits-all solution when it comes to running async computation at scale. You will have to constantly evaluate trade-offs and choose an approach based on the specifics of your particular use cases. We noted that streams with RPC are best suited to support high-traffic, short-running workloads, whereas long execution times or granular retries are supported well by queues, at the cost of maintaining the ordering and lease-management system. Also, if a strict delivery guarantee is crucial for a stream-based architecture with a high ingestion rate, investing in a separate service to handle the retriable workloads can be beneficial.<\/span><\/p>\n<p>The post <a href=\"https:\/\/engineering.fb.com\/2023\/01\/31\/production-engineering\/meta-asynchronous-computing\/\">Asynchronous computing at Meta: Overview and learnings<\/a> appeared first on <a href=\"https:\/\/engineering.fb.com\/\">Engineering at Meta<\/a>.<\/p>","protected":false},"excerpt":{"rendered":"<p>We\u2019ve made architecture changes to Meta\u2019s event-driven asynchronous computing platform that have enabled easy integration with multiple event sources. We\u2019re sharing our learnings from handling various workloads and how we tackled the trade-offs made with certain design choices in building the platform. 
Asynchronous computing is a paradigm where the user does not expect a workload&hellip; <a class=\"more-link\" href=\"https:\/\/fde.cat\/index.php\/2023\/01\/31\/asynchronous-computing-at-meta-overview-and-learnings\/\">Continue reading <span class=\"screen-reader-text\">Asynchronous computing at Meta: Overview and learnings<\/span><\/a><\/p>\n","protected":false},"author":0,"featured_media":0,"comment_status":"","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"spay_email":"","footnotes":""},"categories":[7],"tags":[],"class_list":["post-672","post","type-post","status-publish","format-standard","hentry","category-technology","entry"],"jetpack_featured_media_url":"","jetpack-related-posts":[{"id":805,"url":"https:\/\/fde.cat\/index.php\/2023\/12\/19\/how-meta-built-the-infrastructure-for-threads\/","url_meta":{"origin":672,"position":0},"title":"How Meta built the infrastructure for Threads","date":"December 19, 2023","format":false,"excerpt":"On July 5, 2023, Meta launched Threads, the newest product in our family of apps, to an unprecedented success that saw it garner over 100 million sign ups in its first five days. A small, nimble team of engineers built Threads over the course of only five months of technical\u2026","rel":"","context":"In &quot;Technology&quot;","img":{"alt_text":"","src":"","width":0,"height":0},"classes":[]},{"id":221,"url":"https:\/\/fde.cat\/index.php\/2021\/02\/02\/asyncapi-and-openapi-an-api-modeling-approach\/","url_meta":{"origin":672,"position":1},"title":"AsyncAPI and OpenAPI: an API Modeling Approach","date":"February 2, 2021","format":false,"excerpt":"AsyncAPI is gaining traction in the ecosystem of API tools. It solves an important problem: it provides a convenient way of describing the interface of event-driven systems independently of the underlying technology. 
With AsyncAPI, evented systems can be treated as any other API product: a productizable and reusable, self-describing building\u2026","rel":"","context":"In &quot;Technology&quot;","img":{"alt_text":"","src":"","width":0,"height":0},"classes":[]},{"id":875,"url":"https:\/\/fde.cat\/index.php\/2024\/06\/10\/serverless-jupyter-notebooks-at-meta\/","url_meta":{"origin":672,"position":2},"title":"Serverless Jupyter Notebooks at Meta","date":"June 10, 2024","format":false,"excerpt":"At Meta, Bento, our internal Jupyter notebooks platform, is a popular tool that allows our engineers to mix code, text, and multimedia in a single document. Use cases run the entire spectrum from what we call \u201clite\u201d workloads that involve simple prototyping to heavier and more complex machine learning workflows.\u2026","rel":"","context":"In &quot;Technology&quot;","img":{"alt_text":"","src":"","width":0,"height":0},"classes":[]},{"id":274,"url":"https:\/\/fde.cat\/index.php\/2021\/08\/31\/foqs-scaling-a-distributed-priority-queue\/","url_meta":{"origin":672,"position":3},"title":"FOQS: Scaling a distributed priority queue","date":"August 31, 2021","format":false,"excerpt":"We will be hosting a talk about our work on Scaling a Distributed Priority Queue during our virtual Systems @Scale event at 11 am PT on Wednesday, February 24, followed by a live Q&A session. Please submit any questions to systemsatscale@fb.com before the event. The entire Facebook ecosystem is powered\u2026","rel":"","context":"In &quot;Technology&quot;","img":{"alt_text":"","src":"","width":0,"height":0},"classes":[]},{"id":768,"url":"https:\/\/fde.cat\/index.php\/2023\/10\/05\/meta-contributes-new-features-to-python-3-12\/","url_meta":{"origin":672,"position":4},"title":"Meta contributes new features to Python 3.12","date":"October 5, 2023","format":false,"excerpt":"Python 3.12 is out! 
It includes new features and performance improvements \u2013 some contributed by Meta \u2013 that we believe will benefit all Python users. We\u2019re sharing details about these new features that we worked closely with the Python community to develop. This week\u2019s release of Python 3.12 marks a\u2026","rel":"","context":"In &quot;Technology&quot;","img":{"alt_text":"","src":"","width":0,"height":0},"classes":[]},{"id":758,"url":"https:\/\/fde.cat\/index.php\/2023\/09\/07\/using-chakra-execution-traces-for-benchmarking-and-network-performance-optimization\/","url_meta":{"origin":672,"position":5},"title":"Using Chakra execution traces for benchmarking and network performance optimization","date":"September 7, 2023","format":false,"excerpt":"Meta presents Chakra execution traces, an open graph-based representation of AI\/ML workload execution, laying the foundation for benchmarking and network performance optimization. Chakra execution traces represent key operations, such as compute, memory, and communication, data and control dependencies, timing, and resource constraints. 
In collaboration with MLCommons, we are seeking industry-wide\u2026","rel":"","context":"In &quot;Technology&quot;","img":{"alt_text":"","src":"","width":0,"height":0},"classes":[]}],"_links":{"self":[{"href":"https:\/\/fde.cat\/index.php\/wp-json\/wp\/v2\/posts\/672","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/fde.cat\/index.php\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/fde.cat\/index.php\/wp-json\/wp\/v2\/types\/post"}],"replies":[{"embeddable":true,"href":"https:\/\/fde.cat\/index.php\/wp-json\/wp\/v2\/comments?post=672"}],"version-history":[{"count":0,"href":"https:\/\/fde.cat\/index.php\/wp-json\/wp\/v2\/posts\/672\/revisions"}],"wp:attachment":[{"href":"https:\/\/fde.cat\/index.php\/wp-json\/wp\/v2\/media?parent=672"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/fde.cat\/index.php\/wp-json\/wp\/v2\/categories?post=672"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/fde.cat\/index.php\/wp-json\/wp\/v2\/tags?post=672"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}