Maintaining billions of mutable activity records and serving them in millions of ways
Summary
Salesforce Activity Platform (AP) ingests, stores, and serves users’ activity data as time-sorted data sets. The accumulated data maintained by Activity Platform looks like time-series big data. Even so, due to application requirements, the data stores for activity records need capabilities of OLTP-style databases. Managing mutable big data is very different from managing immutable big data. Activity Platform builds its own data stores from open source NoSQL databases. However, the total solution lies not just in the use of NoSQL databases, but also in processing pipelines that fully embrace eventual consistency.
The Mutable Nature of the Activity Data
It walks like a duck, swims like a duck, and quacks like a duck, yet it is not a duck.
Users’ activity data, such as emails, meetings, and voice and video calls, are served to applications as “time-series” data. But those activity data are not really time series in the strict technical sense. This can be illustrated by comparing the time-series data served by time-series databases with the activity data managed by Activity Platform.
Time-Series Data and Time-Series Database
A record of time-series data usually has two time values, or timestamps, associated with it. One is the activity time, which indicates when an activity takes place. The other is the capture time, which is the time when the existence of the activity is made known to the system. The capture time can be the time when a log or activity record is ingested into the system, such as the database insertion time or the message enqueue time. Applications such as Salesforce Einstein Activity Capture (EAC) lean on activity time to reconstruct the timeline for customer engagement. On the other hand, stream processing frameworks such as Kafka Streams use capture time to implement stream-processing algorithms such as windowing or checkpointing. Activity time and capture time may or may not be the same, depending on the nature of the activity. Either way, each timestamp serves a different purpose.
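To make the distinction concrete, here is a minimal sketch of an activity record that carries both timestamps explicitly. The field names are ours for illustration, not Activity Platform’s actual schema.

```java
import java.time.Instant;
import java.util.Comparator;

// A minimal sketch of an activity record carrying both timestamps.
// Field names are illustrative, not Activity Platform's actual schema.
public record ActivityRecord(
        String activityId,
        String tenantId,
        Instant activityTime,   // when the activity takes place (e.g., meeting start)
        Instant captureTime,    // when the system learned about it (e.g., ingestion time)
        String payload) {

    // Timeline reconstruction (e.g., for EAC) sorts by activity time ...
    public static final Comparator<ActivityRecord> TIMELINE_ORDER =
            Comparator.comparing(ActivityRecord::activityTime);

    // ... while stream-processing concerns such as windowing key off capture time.
    public static final Comparator<ActivityRecord> CAPTURE_ORDER =
            Comparator.comparing(ActivityRecord::captureTime);
}
```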
Any database with decent indexing capability can be used to serve “time-series” data. After all, a timestamp is an index-friendly data type. Off-the-shelf time-series databases and storage engines go a step further to optimize for these use cases. They optimize their storage in the file system through compaction and carefully chosen compression algorithms, while indexing and partitioning the data files to make file I/O efficient and caching effective. To achieve this in the realm of time-ordered and time-ranged data access, they usually require data ingestion in a timely fashion. For example,
Historical data — importing last year’s traffic data in a few hours and then closing the corresponding data shards for writes.
Fresh data — having a 48-hour window for data ingestion for a daily log collection. Once the 48-hour window is closed, so are the corresponding data shards to any write request.
In the case of historical data, data may not be available for reads while importing. In the case of fresh data, though ingested data may be available for read requests before the data shards are closed, the performance would be sub-optimal. In both cases, once the storage engine recognizes that there are no more writes to the concerned data files, it applies optimization procedures to the relevant data files.
Time-series databases and storage engines employ this subsequent optimization step to achieve fast data retrieval and aggregation. To facilitate that, they require either
the span of capture time being much shorter than the span of activity time, like in the case of historical data import, or
the capture time to follow activity time closely, as in the case of fresh data.
The need for optimization also implies data immutability, as no more write operations are accepted afterwards. Any addition, subtraction, or mutation to the data set would require repeating the optimization process.
With this understanding, let’s examine the activity data in Activity Platform as time-series data.
Activities in Activity Platform
Each data store in Activity Platform is a multi-tenant system. Activity data from all tenants share data storage. Data are served per tenant, filtered by customer relationship such as account, opportunity, or contact.
Activity Platform supports near real-time access to newly ingested data as well as historical data up to the Time-To-Live (TTL) period set by a tenant. It also acts as the backing store for certain reporting functionality and the source of training data for machine learning.
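As a rough sketch of how shared, multi-tenant storage with a per-tenant TTL could shape a partition key, the key might combine the tenant with a time bucket. This is an assumption for illustration only; the post does not describe Activity Platform’s actual schema.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// A sketch of a multi-tenant, time-bucketed partition key (an assumption,
// not Activity Platform's actual schema).
public final class ActivityPartitionKey {
    private static final DateTimeFormatter MONTH_BUCKET =
            DateTimeFormatter.ofPattern("yyyy-MM").withZone(ZoneOffset.UTC);

    private ActivityPartitionKey() {}

    // Prefixing with the tenant keeps tenants isolated within shared storage;
    // bucketing by month keeps partitions bounded and makes TTL-based expiry
    // (six months to two years, set per tenant) straightforward per bucket.
    public static String of(String tenantId, Instant activityTime) {
        return tenantId + ":" + MONTH_BUCKET.format(activityTime);
    }
}
```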
Before going deep into how Activity Platform designs its data stores and processing pipelines, here are some characteristics that make activity data a poor fit for off-the-shelf time-series databases.
A meeting has to be created ahead of meeting time.
— A meeting invite can be created a few minutes or a few months before the meeting starts. That makes the activity time later than its capture time. A time-series store based on activity time would have to create the entry ahead of its proper time and leave it un-optimized for a long period of time.
— This is unlike a web log, where the time of visit is always earlier than the log’s capture time. No link-click history would have an entry like “This link is to be clicked in 5 seconds.”
An instance of a recurring meeting in the past is modified long after its activity time.
— One example is adding new attendees so they can access attachments and recordings.
Activity data may be captured long after it was created.
— When onboarding a new user, Activity Platform acquires the user’s activity data, such as emails and meeting records, from up to six months back. Imported data are additions to the existing data sets.
The consideration of GDPR.
— The General Data Protection Regulation (GDPR) requires support for data erasure on demand. The activity data in Activity Platform may have a TTL of six months to two years. This is a subtraction from existing time-series data sets.
System-generated data fields for new features.
— To support new features, Activity Platform generates new fields via different mechanisms to augment the raw activity data. Sometimes it is necessary to backfill such system-generated fields for the new features to work adequately. This applies updates, or mutations, to existing data sets.
One can observe the effects of mutable time series by watching the timeline UI for an account. A user may see a meeting scheduled for tomorrow in the morning, then discover by evening that the meeting has disappeared (it was canceled). A user may also notice emails from last month that were not visible an hour ago appearing in the timeline just now, because a coworker has just enabled their seat for EAC.
New views of the data are made available to applications as soon as the corresponding write operations are complete. There is no extra ETL to another database for online queries. In many respects, the databases of Activity Platform require OLTP capabilities more than OLAP functions.
Off-the-shelf time-series databases do not meet the requirements that Activity Platform needs to support. Traditional relational databases do not scale to the accumulated data volume [1]. To serve activities as a time series, Activity Platform needs a big-data store that supports mutability, global indexing, and stream-like, write-to-read data visibility [2]. Activity Platform had to look to other open source technologies to build its platform.
The Design
When Activity Platform started its charter, there was no large-scale distributed relational database, such as Google Spanner, on the market. Activity Platform built its data stores on NoSQL databases such as Apache Cassandra and ElasticSearch, and used AWS S3 as the object store. To cope with the different consistency models and performance characteristics of these heterogeneous stores, Activity Platform implemented a web of asynchronous processing flows to carry out data ingestion, mutation, and deletion.
The asynchronous processing graph is composed of many stream-processing runtimes. They are an integral part of the platform and help mitigate the disadvantages of a non-externally-consistent system built on top of heterogeneous, eventually consistent data stores.
Embrace Eventual Consistency
The fact that the underlying databases are eventually consistent systems is not the main reason Activity Platform is a non-externally-consistent system. It is because Activity Platform’s processing graph is a combination of many subsystems, and each subsystem has its own NoSQL stores and processing pipelines composed of stream-processing runtimes, such as Apache Storm, Kafka Streams, and Spark Streaming, each with different latency characteristics.
Besides the main subsystem that acquires and stores activity data, there are subsystems for activity insights, customer engagements, and alerts. The subsystems also cross-pollinate after a new activity finishes its journey in one subsystem and produces its final result in that subsystem’s store. Insights generated in the activity-insights subsystem are sent back to the activity store to augment the original activity data. Engagement activities are recognized and correlated with prior emails to generate alerts to users. Therefore, data available in one subsystem will have some delay before it is visible or takes effect in another subsystem. A system like Activity Platform is very sensitive to latency and availability; another post discusses this effect [3].
Knowing what we have at our disposal, we embraced eventual consistency in everything we design and implement, including data schemas and processing pipelines.
Architectural Assumptions
These are the main architectural assumptions in the design of our data schema and processing pipelines.
Idempotent Operation
In-Order Execution
No Assumption on Sequence of Events
Data Convergence
Idempotent Operation
All write operations to the data stores have to have guaranteed delivery. These write operations include insertions, updates, and deletions. In many cases, due to the limitations of the underlying NoSQL databases, we need to create companion tables alongside the main data table to support different access patterns. One logical write to an activity record therefore translates into multiple database write operations.
As there can be more than one side effect from a processing procedure, it is hard to implement “exactly-once” semantics. The design adopts “at-least-once” semantics for retry-on-error handling, using message replay. Idempotent operations are required to make “at-least-once” semantics work. [4]
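A minimal sketch of the idea, using in-memory maps as stand-ins for the NoSQL tables (the real system writes to Cassandra and ElasticSearch, which this does not attempt to model): because each physical write is a keyed upsert, replaying the same logical write converges to the same end state instead of creating duplicates.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// A sketch of why idempotent writes make at-least-once delivery safe.
// The "tables" here are in-memory maps standing in for NoSQL tables.
public class IdempotentWriter {
    private final Map<String, String> mainTable = new ConcurrentHashMap<>();
    private final Map<String, String> companionTable = new ConcurrentHashMap<>();

    // One logical write fans out to multiple physical writes (main + companion
    // tables). Because each write is a keyed upsert, applying the same message
    // twice leaves the stores in exactly the same state.
    public void apply(String activityKey, String body, String companionIndexKey) {
        mainTable.put(activityKey, body);                  // upsert, not append
        companionTable.put(companionIndexKey, activityKey); // e.g., a lookup row
    }
}
```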
In-Order Execution
To support “at-least-once” semantics, the flow has to process messages strictly in order. We don’t want to modify an activity that has been deleted, or apply stale information over the latest update. In-order execution is a must-have.
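One common way to get in-order execution at scale is to route all messages for the same activity key to the same single-threaded worker, so messages for one activity are applied in arrival order while different keys still process in parallel. The sketch below illustrates that technique only; it is an assumption, not a description of how Activity Platform’s runtimes are actually wired.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.IntStream;

// A sketch of per-key in-order processing via hash routing to single-threaded workers.
public class OrderedDispatcher implements AutoCloseable {
    private final List<ExecutorService> workers;

    public OrderedDispatcher(int parallelism) {
        workers = IntStream.range(0, parallelism)
                .mapToObj(i -> Executors.newSingleThreadExecutor())
                .toList();
    }

    // All operations for the same activity key land on the same worker,
    // which executes them FIFO; different keys run concurrently.
    public void dispatch(String activityKey, Runnable writeOperation) {
        int slot = Math.floorMod(activityKey.hashCode(), workers.size());
        workers.get(slot).submit(writeOperation);
    }

    @Override
    public void close() {
        workers.forEach(ExecutorService::shutdown);
    }
}
```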
No Assumption on Sequence of Events
As messages flow down the processing graph and fan out to downstream runtimes, a message may appear twice due to a message replay from upstream. Downstream processing runtimes would involuntarily carry out the same computation for the same information more than once. Though this is already covered by idempotent operations, there is another reason for the computation not to depend on the “natural sequence of events.”
One example that illustrates this point is capturing engagement activities from a contact. The ingestion pipeline flushes down the message for a campaign email that a user sent out. This email would be logged as “campaign sent, waiting for response.” Later, when the contact opens the email and clicks a link in it, the corresponding email would transition to the next state, “email opened by contact” or “link clicked.” This would be a very intuitive design of state transitions: no email can be opened if it has not been sent yet. Within the pipeline, however, a processing unit may receive another message containing the same campaign email, due to an upstream replay, after the link click has already been recorded. This can happen if the contact receives the campaign email soon enough and opens it immediately. This out-of-order perception of events by the pipeline can also happen if the email ingestion pipeline slows down or stops while the response-capturing pipeline is running properly (see [3]).
As the capability of Check-And-Set (CAS) is either missing from the NoSQL databases or expensive to use performance-wise, the pipeline design requires more consideration [5]. There is no universal solution for this requirement. For this particular scenario, one alternative is to log states instead of transitioning state. Each message represents a state, independent of the current state in the database. All states that have happened are added to the database. A reader can then interpret the latest state from the overall state data. This way, the pipeline processing can be idempotent and faithfully process messages in order.
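Here is a minimal sketch of that state-log alternative. The states and their precedence are illustrative only, not Activity Platform’s engagement model; the point is that recording “link clicked” before “campaign sent” (a replayed or late message) still converges to the same data, and the reader derives the latest state at read time.

```java
import java.util.Comparator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// A sketch of "log states instead of transitioning state."
// Each message adds an independent fact; the reader interprets the latest state.
public class EngagementStateLog {
    public enum State { CAMPAIGN_SENT, EMAIL_OPENED, LINK_CLICKED }

    private final Map<String, Set<State>> statesByEmail = new ConcurrentHashMap<>();

    // Idempotent and order-independent: replays and out-of-order arrivals
    // simply re-add facts that are already present.
    public void record(String emailKey, State state) {
        statesByEmail.computeIfAbsent(emailKey, k -> ConcurrentHashMap.newKeySet())
                     .add(state);
    }

    // The most advanced state is derived from everything logged so far.
    public State latest(String emailKey) {
        return statesByEmail.getOrDefault(emailKey, Set.of()).stream()
                .max(Comparator.naturalOrder())
                .orElse(null);
    }
}
```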
Data Convergence
An activity record such as an email or meeting accrues many augmented fields on top of the original activity artifact. To name a few:
Machine-learning-model-generated insights
Privacy and sharing
Affiliation to other Salesforce systems
Email threads
Recipient engagements, such as email opens and replies
The augmented data fields come into the system in their own due time. The variation in time of arrival is partly due to the latency of each pipeline and its position in the flow. The nature of the event also plays a part; for example, an email may be opened three days after it was received.
To design for these use cases, one could use a hub-and-spoke-like data schema. The main artifact of an email or meeting record resides in the hub table, while augmented data lives in spoke tables pointing back to the hub table. This design can work if the data in the spoke tables can live without the main entry in the hub table for a while. For example, the ML-model-generated insights may arrive in the database a few milliseconds before the main record is inserted into the hub table.
It is impossible to design a reliable pipeline-join mechanism, as there is no hard limit on the maximum latency of either pipeline.
We cannot read back a table on each write operation in the system. It does not scale. Also, most of the stores have no transaction semantics to guard against race conditions.
Activity Platform adopted a “natural key” scheme that can generate the same identifier from the artifact at hand (in memory), so data can be inserted or updated without looking into the database first. Therefore, all the data, whether user-generated or system-generated, falls into place. This is an integral part of embracing eventual consistency and supporting idempotent operations.
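A sketch of what such a natural key could look like: a deterministic digest of fields already present in the artifact, so every pipeline that holds the artifact derives the same identifier. Which fields actually feed the key is an assumption here for illustration, not a description of the production key derivation.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HexFormat;

// A sketch of a "natural key": the identifier is derived from the artifact itself,
// so writers never need to look up the database first. The input fields are
// illustrative assumptions, not Activity Platform's actual derivation.
public final class NaturalKey {
    private NaturalKey() {}

    public static String forEmail(String tenantId, String rfc822MessageId) {
        return sha256(tenantId + "|email|" + rfc822MessageId);
    }

    public static String forMeeting(String tenantId, String calendarUid, String occurrenceStart) {
        // Recurring meetings include the occurrence start so each instance keys separately.
        return sha256(tenantId + "|meeting|" + calendarUid + "|" + occurrenceStart);
    }

    private static String sha256(String input) {
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            return HexFormat.of().formatHex(digest.digest(input.getBytes(StandardCharsets.UTF_8)));
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }
}
```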
One extra benefit of using column-based storage is that we can merge some of the spoke tables directly into the hub table. Augmented data can be “upserted” at any time, regardless of whether the main body of the activity record exists yet. This further expedites read operations. Though the readiness of a “complete” record still depends on the slowest pipeline, the whole system behaves “as soon as possible,” just as eventual consistency suggests.
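The effect can be sketched as a wide row keyed by the natural key, where each pipeline upserts only the columns it owns, in any order, and the row fills in as the slowest pipeline catches up. The column names below are illustrative, and the in-memory map stands in for the column-based store.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// A sketch of column-wise convergence on a single wide row keyed by the natural key.
public class WideRowStore {
    private final Map<String, Map<String, String>> rows = new ConcurrentHashMap<>();

    // Each pipeline upserts only the columns it owns; order of arrival does not matter.
    public void upsertColumns(String naturalKey, Map<String, String> columns) {
        rows.computeIfAbsent(naturalKey, k -> new ConcurrentHashMap<>()).putAll(columns);
    }

    public Map<String, String> read(String naturalKey) {
        return rows.getOrDefault(naturalKey, Map.of());
    }
}

// Usage: the insights pipeline may land before the ingestion pipeline,
// and the record still converges to the same final row.
//   store.upsertColumns(key, Map.of("insight.sentiment", "positive"));
//   store.upsertColumns(key, Map.of("subject", "Renewal", "body", "..."));
```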
Conclusion
In a parallel universe, this system could have been implemented in a totally different architecture.
An ETL-heavy processing flow with frequent micro-batches, backed by analytics-friendly databases.
A micro-service-proliferated, Staged Event-Driven Architecture (SEDA) system backed by a massive distributed relational database.
A smaller set of asynchronous processing pipelines to supplement the above designs.
Today’s Activity Platform, an asynchronous processing graph backed by NoSQL databases, is the result of a few factors, such as the availability of technology, the talent on the team, and driving forces such as product requirements at each stage of development [6].
This at-least-once, in-order, idempotent processing system that promotes data convergence is an example that it is possible to implement an OLTP-like system while embracing one of the inevitable shortcomings of NoSQL databases: eventual consistency. Implementing and growing such a system takes an equal amount of consideration at every part of the system, not just at the data persistence layer.
To perform updates and deletions, the system needs to be able to search through large sets of data. This is not entirely a consequence of data denormalization. For this purpose, we built application-level index tables and sometimes leverage ElasticSearch indices built for the API and UI. Support for truly global secondary indices in the database would be extremely useful and cost-effective. However, a distributed and replicated index needs no less consideration in design, implementation, and scale planning. This is probably true even for a distributed relational database that promises the familiarity of a traditional relational database with the scalability of NoSQL databases.
Dealing with the differences among database technologies is an important aspect of the system design. For example, the use of Apache Cassandra and ElasticSearch as one indexed data store represents a constant design challenge. We need to
— make sure the data in both stores are in sync, except for brief moments, given that they are two separate stores.
— manage differences such as their consistency models, throughput and latency, scalability strengths and limitations, and life-cycle management for the data in tables and indices.
This is still an evolving system. As new types of activities are ingested into Activity Platform, and more system-generated augmentation fields are added to the records, pipelines are inserted or re-allocated. The processing graph changes as a result.
Managing mutable big data is very different from managing immutable big data, such as that used in machine learning, data analytics, or cyber security. While the latter has been addressed extensively by the industry, Activity Platform is at the forefront of this domain, providing our unique solution.
NOTES
[1] At the time of writing, the amount of daily ingestion into Activity Platform may not be on par with the log volume at certain big social networking sites. However, each activity record, such as an email or a meeting, is much bigger than a log entry. Also, Activity Platform has to keep the records for up to two years, and the value of an activity record does not decrease as fast as that of log entries. It is the accumulated data volume, and the mutation operations associated with it, that make the subject interesting.
[2] I was going to use the term “near real-time,” but decided not to. Some pipeline units have latencies of a few seconds 99% of the time. As the data and computation propagate downstream, the perceived latency increases, though in general it is still in the range of seconds. Even so, unlike in a batch processing system, new data may still see a delay of up to 15 to 20 minutes in the worst case.
[3] The blog post “Analytical Model for Capacity and Degradation in Distributed Systems” has an extensive discussion of the effect of availability on latency measurement. A non-transactional big-data system like Activity Platform, if it is to support UI and API use cases similar to a system backed by an OLTP database, is very sensitive to latency and availability.
[4] In a long chain of stream-processing runtimes, using exactly-once processing in any runtime in the chain can imply that every runtime in the chain, except for the leaf runtime units, needs to apply exactly-once semantics. If any upstream runtime exercises at-least-once semantics and generates the same message twice to the downstream, the two messages carrying the same information are two separate messages to the downstream. The downstream processing unit may or may not be able to discern the two and deduplicate the messages. This can result in errors that are hard to troubleshoot.
[5] There is no point in building a distributed transaction facility on top of a system composed of NoSQL databases. Those stores generally do not support commit-rollback semantics, their support for atomic operations is thin or nonexistent, there is no isolation between sessions, and their consistency models differ. Among the desired ACID properties, only durability is secured.
[6] This is an architecture and system distilled from many years of engineering effort, from RelateIQ to SalesforceIQ and now to Activity Platform.
Related Blog Posts
Real-time Einstein Insights Using Kafka Streams, https://engineering.salesforce.com/real-time-einstein-insights-using-kafka-streams-ca94008c2c6f
Analytical Model for Capacity and Degradation in Distributed Systems, https://engineering.salesforce.com/analytical-model-for-capacity-and-degradation-in-distributed-systems-f0888ec62ecc