{"id":512,"date":"2021-12-07T16:08:14","date_gmt":"2021-12-07T16:08:14","guid":{"rendered":"https:\/\/fde.cat\/index.php\/2021\/12\/07\/applying-the-micro-batching-pattern-to-data-transfer\/"},"modified":"2021-12-07T16:08:14","modified_gmt":"2021-12-07T16:08:14","slug":"applying-the-micro-batching-pattern-to-data-transfer","status":"publish","type":"post","link":"https:\/\/fde.cat\/index.php\/2021\/12\/07\/applying-the-micro-batching-pattern-to-data-transfer\/","title":{"rendered":"Applying the Micro Batching Pattern to Data Transfer"},"content":{"rendered":"<h3>Building consistent data\u00a0replicas<\/h3>\n<p>If you have worked on data-rich software systems, chances are you have worked with a distributed architecture where one part of your system needs access to data owned by another part of the system. Whether that architecture is a modern distributed microservices architecture or a set of stand-alone applications looking to exchange data, you will have one party owning that data (let\u2019s call it the <strong><em>producer<\/em><\/strong>) and another party needing that data (let\u2019s call it the <strong><em>consumer<\/em><\/strong>).<\/p>\n<p>How should your consumers obtain the data of your producers, while striking the right balance between <a href=\"https:\/\/queue.acm.org\/detail.cfm?id=3236388\">latency and consistency<\/a>?<\/p>\n<p>One option, of course, would be to run real-time queries against the source system for maximum consistency. Blocking patterns like this mostly work well for small datasets but incur latency for larger result sets, especially when your data comes in the wrong representation or requires transformation.<\/p>\n<p>Another option is to proactively stream data from your producers into a central hub (or data lake) for consumers to consume from. 
Here the challenges are in synchronization of read\/write rates between producers and consumers, delivery guarantees by the data hub, and the sheer volume of\u00a0data.<\/p>\n<p>While an event streaming system like <a href=\"https:\/\/kafka.apache.org\/\">Kafka<\/a> is certainly the ideal option for real-time integration, it introduces complexities and comes at a cost that the business has to absorb. Not all use cases require low-latency, event-driven integration, and those that do not stand to gain nothing from the additional investment. An eventually consistent replica is sufficient for many use cases in CRM or ecommerce, especially for back-office-facing domains like catalog management, inventory management, or order processing.<\/p>\n<p>What I want to discuss in this blog post is a lower-cost option that applies the same idea as pub\/sub-based eventing systems, but on <strong>datasets<\/strong>. The dataset can itself be the result of a query or an aggregated event stream. We control the size of the dataset either by volume (i.e. number of records or bytes) or by the time elapsed between datasets.<\/p>\n<p>If the dataset is sufficient to <a href=\"https:\/\/docs.microsoft.com\/en-us\/sql\/relational-databases\/replication\/snapshot-replication?view=sql-server-ver15\">describe the full current state of the producer<\/a>, I will call it a <strong><em>snapshot<\/em><\/strong>. A <strong><em>delta<\/em><\/strong>, on the other hand, is a dataset that contains only the changes <a href=\"https:\/\/docs.microsoft.com\/en-us\/sql\/relational-databases\/replication\/transactional\/transactional-replication?view=sql-server-ver15\">since the last snapshot was taken<\/a>. 
The producer can control the frequency of the delta file production and in fact build them incrementally small\u200a\u2014\u200aa technique called <a href=\"https:\/\/hazelcast.com\/glossary\/micro-batch-processing\/\"><em>micro batching<\/em><\/a>.<\/p>\n<p>Snapshots are ideal for the consumer to reset itself (e.g. if applying the delta causes inconsistencies or the consumer is simply not able to keep up with the rate of delta files being produced by the producer). That way, the consumer can opt to skip delta files and simply consume the latest snapshot to reset itself. The frequency of the snapshot production itself can be controlled either by the producer or the consumer.<\/p>\n<p>In the following sections I will describe the capabilities of the system that runs in production at Salesforce today.<\/p>\n<h3>Data and\u00a0Metadata<\/h3>\n<p>One of the main principles we followed during development of the micro batching solution is the clear separation of data and metadata. Metadata is used to describe the properties of every snapshot and\/or delta file and includes attributes like the type of data producer, the producer tenant ID, the kind of data file along with the file identity, production timestamp etc., and whether this is a snapshot file or a delta file with the sequence number and reference to the applicable snapshot.<\/p>\n<p>The metadata layer is flexible and can take any custom attribute that a domain deems necessary to describe their files with. What we mean here by <strong>domain<\/strong> is a set of entities described in our <a href=\"https:\/\/livebook.manning.com\/book\/functional-and-reactive-domain-modeling\/chapter-1\">functional domain<\/a> model and can be any part of the system that manages data, such as the <em>products and catalog<\/em> service, the <em>pricing<\/em> service, or the <em>order<\/em> service. 
A <strong>tenant<\/strong> is the owner of its data across all domains in a <a href=\"https:\/\/engineering.salesforce.com\/l33t-m10y-f04f38127b82?source=friends_link&amp;sk=785c13a7675907d6f0fa41a0a0d48f7c\">multi-tenant architecture<\/a>, and we identify the tenant with every\u00a0dataset.<\/p>\n<p>Here is an example metadata object for a tenant \u201cgoodstuff_01\u201d looking to publish its catalog domain data for one of its stores. The custom attributes describe exactly which store:<\/p>\n<p># System attributes<br \/>PRODUCER_TYPE=ecommerce<br \/>PRODUCER_TENANT_ID=goodstuff_01<br \/>DATASET_DOMAIN=catalog<br \/>DATASET_TYPE=snapshot<br \/>DATASET_FORMAT_TYPE=plain-text<br \/>DATASET_FORMAT_VERSION=1.0<br \/>DATASET_IS_COMPRESSED=false<br \/>DATASET_IS_ENCRYPTED=false<br \/># Custom attributes<br \/>CUSTOM:WEBSTORE=goodstuff.com<br \/>CUSTOM:DESCRIPTION=Product catalog of the GoodStuff eCommerce store<\/p>\n<p>For the data itself, we simply provide a protected and isolated storage area for every producing tenant. If you use hierarchical data stores, you can group delta files and snapshots easily.<\/p>\n<p>There is one client library that can be linked with the producer or consumer as a <a href=\"https:\/\/docs.spring.io\/spring-framework\/docs\/current\/reference\/html\/core.html#beans-dependencies\">dependency<\/a>. With this client you assume the role of data producer, data consumer, or both. If your role is that of a consumer, you can either long poll the metadata APIs for new records or register a callback endpoint to get notified with a <a href=\"https:\/\/en.wikipedia.org\/wiki\/Webhook\">webhook<\/a>. With an update from the metadata layer, you can then fetch the data file\u00a0itself.<\/p>\n<p>The diagram depicts a typical flow where producers (two in this case) deliver their data to a consumer. 
First, the producers describe the details of their respective snapshot or delta files into the metadata store (1a + 2a). Next, they deliver the actual data file into the object store for consumption (1b + 2b). Both these actions are combined into a single API endpoint implemented with the producer library. Given that a single endpoint results in two actions (metadata post and data upload), we build <a href=\"https:\/\/resilience4j.readme.io\/docs\">resiliency<\/a> and fault tolerance into the producer to provide transactional guarantees. A transaction is considered complete only after the data files have been uploaded to the object store. At that point we mark the metadata record as visible to the consumer with a reference to the data\u00a0object.<\/p>\n<p>The consumer finally receives the updates from the metadata store (either through notification or by query) for new snapshot files (3a) or new delta files (4a) to consume from the object store (3b +\u00a04b).<\/p>\n<h3>System Synchronization<\/h3>\n<p>The architecture I have shown thus far makes one strong assumption: all data posted to the object store is immediately \u201clive\u201d and available in production. In reality, however, many systems have data lifecycle phases where data moves from development \u2192 staging \u2192 production environments. Wouldn\u2019t it be great if we could replicate the data set from the pre-production environment and then set it \u201clive\u201d with a simple trigger event? With the addition of event handling on top of our metadata store we can do exactly that. This small addition elevates our data replication solution to a system synchronization solution.<\/p>\n<p>To illustrate this, let us consider an ecommerce platform with multiple functional domains; for example, a product and a search domain. 
A merchant may load a product catalog into the staging environment of this ecommerce platform (marked yellow in the diagram) and then continue to work on the product catalog until they want to promote it to their production environment. They post the data and metadata to the object and metadata store respectively (1a + 1b) and mark the data ready for consumption (1c).<\/p>\n<p>The consuming domain here is the search and discovery engine, which has to index the product catalog but make searchable on the storefront only those products that have already been promoted to the production environment. The search system consumes the product catalog (2a + 2b) but keeps the product index offline for the time being. When the product catalog is actually pushed to production, a single metadata event (3) is posted to the event queue. Consuming that event (4), the search system now updates the state of the already-indexed product catalog to activate it. Meanwhile, the product catalog can keep changing in the staging environment, which the search system consumes for the next index\u00a0build.<\/p>\n<h3>CQRS for on-demand data\u00a0delivery<\/h3>\n<p>Taking the idea of an event queue one level further, we can actually build system synchronization where the consumer <em>requests<\/em> the data they want the producer to <em>respond<\/em> with next. <a href=\"https:\/\/martinfowler.com\/bliki\/CQRS.html\">CQRS (Command Query Responsibility Segregation)<\/a> is a pattern that can be applied\u00a0here.<\/p>\n<p>What looks like a task queue in the diagram is really just a metadata object without any associated data, created by the target system to communicate its requests to the data producers (1). The producers are configured as consumers in this case and get notified of new data requests (2). 
Notification here can be realized through webhooks, long-poll APIs, or even time-based queries triggered by the data-owning\u00a0domain.<\/p>\n<p>The producing domain then delivers data back to the metadata and object stores (3a + 3b), along with an event to the response queue (3c) indicating to the consumer that the data is available. The target system in turn subscribes to those results (4a) and can update itself with the produced data (4b +\u00a04c).<\/p>\n<h3>Micro batching<\/h3>\n<p>Now that I have discussed how to replicate datasets, how to replicate datasets eagerly, and how to replicate datasets on demand, I want to look at one more optimization technique: micro batching.<\/p>\n<p>\u201c<em>Micro batching is a technique where incoming data to be moved is grouped into small batches to achieve some of the performance advantages of batch processing, without increasing the latency for each data element too much. Micro batching is typically applied in systems where the frequency and distribution of data arrival is variable. The system will grab whatever incoming data has been received in a time window and will process it in a batch. The process is executed repeatedly.\u201d <\/em>[1]<\/p>\n<p>Many streaming systems, such as <a href=\"https:\/\/databricks.com\/blog\/2015\/07\/30\/diving-into-apache-spark-streamings-execution-model.html\">Spark Streaming<\/a>, actually use micro batch processing to achieve <strong>scalability<\/strong>. In our case, we want to approach the problem from the other end to achieve better <strong>latency<\/strong> by breaking our large batch requests into incrementally small micro batch requests.<\/p>\n<p>Instead of synchronizing our target system by delivering snapshot and delta files, we now configure our producers to deliver every record change as an event onto a time-ordered event log. 
The \u201cbatching\u201d itself is done by parallel stream processors that aggregate the stream into incrementally small delta files and post them into the object store along with a metadata record to the metadata store. The consumer continues to consume the micro batches at its own read rate, albeit in much smaller increments as configured by the\u00a0system.<\/p>\n<p>The configured size (in time or space) along with the number of concurrent stream processing nodes controls the latency we can achieve for our eventually consistent replicas.<\/p>\n<h3>Conclusion<\/h3>\n<p>There are many approaches to building high-scale, low-latency integrations for data-rich systems. This blog introduced one purpose-built solution we leverage in our ecommerce offering here at Salesforce. It uses datasets and applies key principles familiar to data integration technologies, including:<\/p>\n<ul>\n<li>snapshot and delta file separation, as used in replication<\/li>\n<li>data and metadata separation, as used in\u00a0ETL<\/li>\n<li>micro batching, as used in stream processing<\/li>\n<\/ul>\n<p>We use our implementation to produce eventually consistent replicas, to synchronize multiple domain services, and to distribute data across the compute environments.<\/p>\n<p>As with everything in technology, no one solution meets all needs. Our decoupling of producer from consumer means no delivery guarantees and some guesswork when it comes to expiring both data and metadata. At the same time, we don\u2019t apply any filter predicates and err on the side of over-delivering data on the producer side. 
All required data transformation adds additional latency and should ideally be pushed down into the producer domain, as close to the data as possible.<\/p>\n<h3>References:<\/h3>\n<p>[0] <a href=\"https:\/\/queue.acm.org\/detail.cfm?id=3236388\">https:\/\/queue.acm.org\/detail.cfm?id=3236388<\/a><br \/>[1] <a href=\"http:\/\/tutorials.jenkov.com\/java-performance\/micro-batching.html\">http:\/\/tutorials.jenkov.com\/java-performance\/micro-batching.html<\/a><br \/>[2] <a href=\"https:\/\/hazelcast.com\/glossary\/micro-batch-processing\/\">https:\/\/hazelcast.com\/glossary\/micro-batch-processing\/<\/a><br \/>[3] <a href=\"https:\/\/docs.microsoft.com\/en-us\/sql\/relational-databases\/replication\/types-of-replication?view=sql-server-ver15\">https:\/\/docs.microsoft.com\/en-us\/sql\/relational-databases\/replication\/types-of-replication<\/a><br \/>[4] <a href=\"https:\/\/livebook.manning.com\/book\/functional-and-reactive-domain-modeling\/chapter-1\">https:\/\/livebook.manning.com\/book\/functional-and-reactive-domain-modeling<\/a><br \/>[5] <a href=\"https:\/\/docs.spring.io\/spring-framework\/docs\/current\/reference\/html\/core.html\">https:\/\/docs.spring.io\/spring-framework\/docs\/current\/reference\/html\/core.html<\/a><br \/>[6] <a href=\"https:\/\/en.wikipedia.org\/wiki\/Webhook\">https:\/\/en.wikipedia.org\/wiki\/Webhook<\/a><br \/>[7] <a href=\"https:\/\/resilience4j.readme.io\/docs\">https:\/\/resilience4j.readme.io\/docs<\/a><br \/>[8] <a href=\"https:\/\/martinfowler.com\/bliki\/CQRS.html\">https:\/\/martinfowler.com<\/a><br \/>[9] <a href=\"https:\/\/databricks.com\/blog\/2015\/07\/30\/diving-into-apache-spark-streamings-execution-model.html\">https:\/\/databricks.com\/blog\/2015\/07\/30\/diving-into-apache-spark-streamings-execution-model.html<\/a><br \/>[10] <a href=\"https:\/\/engineering.salesforce.com\/l33t-m10y-f04f38127b82\">https:\/\/engineering.salesforce.com\/l33t-m10y-f04f38127b82<\/a><\/p>\n<p><em>Learn how you can join our team to 
tackle problems like this one by <\/em><a href=\"https:\/\/careers.mail.salesforce.com\/tpil-blogs\"><em>joining our Talent\u00a0Network<\/em><\/a><em>!<\/em><\/p>\n<p><a href=\"https:\/\/engineering.salesforce.com\/applying-the-micro-batching-pattern-to-data-transfer-5b5f72509da2\">Applying the Micro Batching Pattern to Data Transfer<\/a> was originally published in <a href=\"https:\/\/engineering.salesforce.com\/\">Salesforce Engineering<\/a> on Medium.<\/p>","protected":false},"excerpt":{"rendered":"<p>Building consistent data\u00a0replicas If you have worked on data-rich software systems, chances are you have worked with a distributed architecture where one part of your system needs access to data owned by another part of the system. 
Whether that architecture is a modern distributed microservices architecture or a set of stand-alone applications looking to exchange&hellip; <a class=\"more-link\" href=\"https:\/\/fde.cat\/index.php\/2021\/12\/07\/applying-the-micro-batching-pattern-to-data-transfer\/\">Continue reading <span class=\"screen-reader-text\">Applying the Micro Batching Pattern to Data Transfer<\/span><\/a><\/p>\n","protected":false},"author":0,"featured_media":0,"comment_status":"","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"spay_email":"","footnotes":""},"categories":[7],"tags":[],"class_list":["post-512","post","type-post","status-publish","format-standard","hentry","category-technology","entry"],"jetpack_featured_media_url":"","jetpack-related-posts":[{"id":542,"url":"https:\/\/fde.cat\/index.php\/2022\/02\/15\/embracing-mutable-big-data\/","url_meta":{"origin":512,"position":0},"title":"Embracing Mutable Big Data","date":"February 15, 2022","format":false,"excerpt":"Maintaining billions of mutable activity records and serving them in millions of\u00a0ways Summary Salesforce Activity Platform (AP) ingests, stores, and serves user\u2019s activity data as time-sorted data sets. The accumulated data maintained by Activity Platform appears to be time-series big data. Even so, due to application requirements, the data stores\u2026","rel":"","context":"In &quot;Technology&quot;","img":{"alt_text":"","src":"","width":0,"height":0},"classes":[]},{"id":587,"url":"https:\/\/fde.cat\/index.php\/2022\/02\/15\/embracing-mutable-big-data-2\/","url_meta":{"origin":512,"position":1},"title":"Embracing Mutable Big Data","date":"February 15, 2022","format":false,"excerpt":"Summary Salesforce Activity Platform (AP) ingests, stores, and serves user\u2019s activity data as time-sorted data sets. The accumulated data maintained by Activity Platform appears to be time-series big data. 
Even so, due to application requirements, the data stores for activity records need capabilities from\u00a0OLTP\u00a0style databases. Managing mutable big data is\u2026","rel":"","context":"In &quot;Technology&quot;","img":{"alt_text":"","src":"","width":0,"height":0},"classes":[]},{"id":322,"url":"https:\/\/fde.cat\/index.php\/2021\/08\/31\/consolidating-facebook-storage-infrastructure-with-tectonic-file-system\/","url_meta":{"origin":512,"position":2},"title":"Consolidating Facebook storage infrastructure with Tectonic file system","date":"August 31, 2021","format":false,"excerpt":"What the research is:\u00a0 Tectonic, our data center scale distributed file system, enables better resource utilization, promotes simpler services, and requires less operational complexity than our previous approach. Our previous storage infrastructure consisted of a set of use-case specific storage systems. Clusters, or instances of these storage systems, used to\u2026","rel":"","context":"In &quot;Technology&quot;","img":{"alt_text":"","src":"","width":0,"height":0},"classes":[]},{"id":179,"url":"https:\/\/fde.cat\/index.php\/2021\/01\/06\/fastingest-low-latency-gobblin-with-apache-iceberg-and-orc-format\/","url_meta":{"origin":512,"position":3},"title":"FastIngest: Low-latency Gobblin with  Apache Iceberg and ORC format","date":"January 6, 2021","format":false,"excerpt":"Co-authors: Zihan Li, Sudarshan Vasudevan, Lei Sun, and Shirshanka Das Data analytics and AI power many business-critical use cases at LinkedIn. 
We need to ingest data in a timely and reliable way from a variety of sources, including Kafka, Oracle, and Espresso, bringing it into our Hadoop data lake for\u2026","rel":"","context":"In &quot;External&quot;","img":{"alt_text":"","src":"","width":0,"height":0},"classes":[]},{"id":672,"url":"https:\/\/fde.cat\/index.php\/2023\/01\/31\/asynchronous-computing-at-meta-overview-and-learnings\/","url_meta":{"origin":512,"position":4},"title":"Asynchronous computing at Meta: Overview and learnings","date":"January 31, 2023","format":false,"excerpt":"We\u2019ve made architecture changes to Meta\u2019s event driven asynchronous computing platform that have\u00a0 enabled easy integration with multiple event-sources.\u00a0 We\u2019re sharing our learnings from handling various workloads and how to tackle trade offs made with certain design choices in building the platform. Asynchronous computing is a paradigm where the user\u2026","rel":"","context":"In &quot;Technology&quot;","img":{"alt_text":"","src":"","width":0,"height":0},"classes":[]},{"id":342,"url":"https:\/\/fde.cat\/index.php\/2021\/08\/31\/how-we-built-a-general-purpose-key-value-store-for-facebook-with-zippydb\/","url_meta":{"origin":512,"position":5},"title":"How we built a general purpose key value store for Facebook with ZippyDB","date":"August 31, 2021","format":false,"excerpt":"ZippyDB is the largest strongly consistent, geographically distributed key-value store at Facebook. 
Since we first deployed ZippyDB in 2012, this key-value store has expanded rapidly, and today, ZippyDB serves a number of use cases, ranging from metadata for a distributed filesystem, counting events for both internal and external purposes, to\u2026","rel":"","context":"In &quot;Technology&quot;","img":{"alt_text":"","src":"","width":0,"height":0},"classes":[]}],"_links":{"self":[{"href":"https:\/\/fde.cat\/index.php\/wp-json\/wp\/v2\/posts\/512","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/fde.cat\/index.php\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/fde.cat\/index.php\/wp-json\/wp\/v2\/types\/post"}],"replies":[{"embeddable":true,"href":"https:\/\/fde.cat\/index.php\/wp-json\/wp\/v2\/comments?post=512"}],"version-history":[{"count":0,"href":"https:\/\/fde.cat\/index.php\/wp-json\/wp\/v2\/posts\/512\/revisions"}],"wp:attachment":[{"href":"https:\/\/fde.cat\/index.php\/wp-json\/wp\/v2\/media?parent=512"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/fde.cat\/index.php\/wp-json\/wp\/v2\/categories?post=512"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/fde.cat\/index.php\/wp-json\/wp\/v2\/tags?post=512"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}