Real-time Einstein Insights Using Kafka Streams

Sales representatives deal with hundreds of emails every day. To help them prioritize, Salesforce offers critical insights on the emails they receive. These insights are either generated by our deep learning models or defined by customers through keyword matching with regular expressions. Insights are generated in real time in our microservice architecture, which is built using Kafka Streams. Kafka acts as the central nervous system of the architecture: crawled emails are published to Kafka, and subsequent Kafka Streams jobs perform a variety of operations on them to generate useful insights that increase sales representative productivity.

Problem Statement and Solution

When we started building our insights data pipeline, Kafka was already part of our infrastructure, acting as a messaging system. We had the option of going with Apache Storm or Apache Spark, whose clusters and versions are managed by our infrastructure team, or going with Kafka Streams, which can be containerized and deployed on our existing container orchestration framework.

We decided to go with Kafka Streams for the following reasons:

- Simple Data Operations: The majority of our jobs are data transformers that perform map/filter operations. These jobs read from Kafka, transform or filter the data, and then write it back to Kafka. Such operations are native to Kafka Streams (see the sketch after this list).
- Minimal Code for Basic Operations: We realized that, for simple map/filter operations, using Storm was excessive. We needed a simpler way to write such jobs without writing Storm spouts and bolts and wiring them together in a tedious way. Kafka Streams offers a higher-level Domain Specific Language (DSL) to do this with a minimal amount of code.
- Version Upgrades: In our organization, the versions of Storm and Spark are maintained by the infrastructure team. With so many stakeholders, upgrading these frameworks is painful without a major effort from every involved team. By contrast, we can upgrade our Streams library at will as long as it remains compatible with our Kafka broker version (compatibility matrix).
- No Scheduler Needed for Long-running Processes: Unlike Spark, where we need a separate Azkaban or Airflow scheduler, Kafka Streams doesn’t need its own scheduler, which suits our use case of continuously running streaming jobs. We can schedule a job using the Docker orchestrator’s REST API (Nomad, in our case) and rely on the orchestration framework to bring the job back up if it goes down for some reason.
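To make the “minimal code” point concrete, here is a hedged sketch of a generic map/filter transformer written with the Streams DSL. The topic names and the transform() function are illustrative assumptions, not one of our actual jobs:

import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.kstream.Consumed
import org.apache.kafka.streams.kstream.Produced

// Hypothetical transformation; a real job would apply its domain-specific logic here.
fun transform(value: String): String = value.trim()

fun buildTransformerTopology(): Topology {
    val builder = StreamsBuilder()
    builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
        .filter { _, value -> value.isNotBlank() }   // drop records we don't care about
        .mapValues { value -> transform(value) }     // transform the remaining records
        .to("output-topic", Produced.with(Serdes.String(), Serdes.String()))
    return builder.build()
}

The equivalent Storm topology would need a spout, at least one bolt, and the wiring between them; here the whole read-transform-write pipeline fits in a single expression.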

Kafka Streams also has the following characteristics that made it an ideal candidate for our use case:

- It’s a library, not a framework: Hence, no cluster creation, management, or monitoring is required, and it’s easier to upgrade a library than a cluster. By simply adding the Kafka Streams dependency to your JVM application, you can create a data processing topology (see the sketch after this list).
- Containerization of the application: You can package your application as a container and run it on your container orchestration framework. Today we run our jobs on Nomad; tomorrow we could migrate them to Kubernetes. In fact, we did migrate our Kafka Streams jobs from DC/OS to Nomad seamlessly.
- No single point of failure: There is no Nimbus or name node acting as a central point of failure to manage, as there is when running Storm or Spark jobs. You can build a resilient pipeline that tolerates the failure of nodes; the failure of a single container won’t bring down your topology.
- Scale up without downtime: You can spin up more instances as you need them, or scale down if required, with no downtime, unlike with Spark or Storm. Kafka’s group management protocol is used to assign Kafka partitions to new instances of the streams job.
- No backpressure: In complex data pipelines consisting of multiple jobs that are downstream of each other, handling backpressure is a major requirement. Because Kafka Streams is a higher-level abstraction over the Kafka consumer, it uses a pull-based model when reading data from a topic, so downstream jobs can control the rate at which they consume data. Also, records are not buffered ahead of processing, so no backpressure mechanism is needed.
- Fault tolerance: Kafka Streams’ fault tolerance is built on the Kafka consumer’s ability to handle failures. If one of the instances running the application fails, Kafka Streams reassigns its tasks (and the underlying partitions) to one of the running instances.
- Developer productivity: Last but not least, it’s very easy to onboard new developers onto Kafka Streams, and the API is very easy to use. It takes less time to get a job running in production than with other similar frameworks.
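To make the “library, not a framework” point concrete, here is a hedged sketch of a complete Streams application: an ordinary JVM main method with a trivial pass-through topology, a handful of properties, and no cluster submission step. The application id, broker address, and topic names are illustrative assumptions:

import java.util.Properties
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.StreamsConfig
import org.apache.kafka.streams.kstream.Consumed
import org.apache.kafka.streams.kstream.Produced

fun main() {
    val props = Properties().apply {
        // Every instance that shares this application.id joins the same consumer group,
        // so scaling out is just starting more copies of this same process.
        put(StreamsConfig.APPLICATION_ID_CONFIG, "example-streams-job")    // illustrative name
        put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker:9092")   // illustrative address
    }

    // A trivial pass-through topology, just to show the lifecycle.
    val builder = StreamsBuilder()
    builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
        .to("output-topic", Produced.with(Serdes.String(), Serdes.String()))

    val streams = KafkaStreams(builder.build(), props)
    streams.start()
    // Close cleanly when the container is stopped or rescheduled by the orchestrator.
    Runtime.getRuntime().addShutdownHook(Thread { streams.close() })
}

Because this is just a JVM process, it can be packaged into a Docker image and handed to Nomad or Kubernetes like any other service.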

Kafka Streams Jobs to Generate Insights In Activity Platform

Activity Platform is a big data event processing engine that ingests and analyzes 100+ million customer interactions every day to automatically capture data and generate insights and recommendations. The following diagram gives a very high-level overview of our insights pipeline built using Kafka Streams.

The first job in the pipeline is the Gatekeeper Kafka Streams job. It reads data from the email input topic, filters out spam and automated emails using an NLP library, validates that all required fields are present in each record, and writes valid emails to another topic (Filtered Email). A sketch of its shape follows.
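The sketch below is a hedged illustration of the Gatekeeper topology, assuming hypothetical isSpamOrAutomated() and hasRequiredFields() helpers in place of the real NLP-based classification and field validation, plus illustrative topic names:

import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.kstream.Consumed
import org.apache.kafka.streams.kstream.Produced

// Hypothetical stand-ins for the NLP classifier and the field validation.
fun isSpamOrAutomated(email: String): Boolean = email.contains("unsubscribe", ignoreCase = true)
fun hasRequiredFields(email: String): Boolean = email.isNotBlank()

fun buildGatekeeperTopology(): Topology {
    val builder = StreamsBuilder()
    builder.stream("email-input", Consumed.with(Serdes.String(), Serdes.String()))
        .filterNot { _, email -> isSpamOrAutomated(email) }   // drop spam and automated emails
        .filter { _, email -> hasRequiredFields(email) }      // keep only records with all required fields
        .to("filtered-email", Produced.with(Serdes.String(), Serdes.String()))
    return builder.build()
}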

From the Filtered Email Kafka topic, a number of extractor jobs read the emails independently and run inference.
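Each extractor can consume the same Filtered Email topic independently because it runs as its own Streams application with its own application.id, and therefore its own consumer group. A hedged sketch of the relevant configuration (the ids and broker address are illustrative):

import java.util.Properties
import org.apache.kafka.streams.StreamsConfig

// Distinct application.id values put the extractors in separate consumer groups,
// so each one receives its own full copy of the filtered-email topic.
val tensorflowExtractorProps = Properties().apply {
    put(StreamsConfig.APPLICATION_ID_CONFIG, "tensorflow-extractor")
    put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker:9092")
}
val regexExtractorProps = Properties().apply {
    put(StreamsConfig.APPLICATION_ID_CONFIG, "regex-extractor")
    put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker:9092")
}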

There are four distinct types of extractors that are responsible for generating insights:

- TensorFlow Extractors: We run a Kafka Streams application that reads from and writes to Kafka alongside a TensorFlow model server inside a single container. How the Streams application calls the TensorFlow model server to run inference is beyond the scope of this blog and will be covered in a subsequent article.
- Regex/Keyword Extractors: We offer sales reps the ability to create custom insights. A custom regex extractor Streams job pulls the custom configuration stored in Postgres, applies the rules to incoming streaming emails, and generates custom insights when the rules are satisfied (see the sketch after this list).
- Spark ML Extractors: We still have a few old Spark jobs that run Spark ML models. These will be migrated to Kafka Streams when TensorFlow models become available.
- Einstein Conversation Insights Extractor: Einstein Conversation Insights are important moments generated from sales calls. This extractor pipeline is not owned by the Insights team, but once the data is extracted, it’s written to an Extracted Data Kafka topic, from which it’s converted to insights and persisted by the insights Streams jobs.
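The following is a hedged sketch of what a Regex/Keyword extractor can look like, assuming the customer-defined rules have already been loaded from Postgres into a simple in-memory list; the RegexRule type, the rule loading, and the topic names are illustrative, not our actual implementation:

import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.kstream.Consumed
import org.apache.kafka.streams.kstream.Produced

// Illustrative representation of a customer-defined rule loaded from Postgres.
data class RegexRule(val insightType: String, val pattern: Regex)

fun buildRegexExtractorTopology(rules: List<RegexRule>): Topology {
    val builder = StreamsBuilder()
    builder.stream("filtered-email", Consumed.with(Serdes.String(), Serdes.String()))
        // Emit one extracted-data record per rule whose pattern matches the email body.
        .flatMapValues { emailBody ->
            rules.filter { it.pattern.containsMatchIn(emailBody) }
                 .map { it.insightType }   // stand-in for the real extracted-data payload
        }
        .to("extracted-data", Produced.with(Serdes.String(), Serdes.String()))
    return builder.build()
}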

The Email to Activity Transformer Streams job converts each email to an activity, our internal representation of email data, which the Activity Persister Streams job then stores in Cassandra. This Cassandra store powers multiple product features, such as reply/bounce detection in High Velocity Sales and compound insights.
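The sketch below illustrates the shape of this transform-then-persist pattern. The Activity type, the toActivity()/serialize() helpers, ActivityDao, and the topic names are illustrative assumptions; in production the transformer and the persister are two separate Streams jobs:

import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.kstream.Consumed
import org.apache.kafka.streams.kstream.Produced

// Illustrative stand-ins for the real conversion and persistence logic.
data class Activity(val subject: String)
fun toActivity(email: String): Activity = Activity(subject = email.lineSequence().first())
fun serialize(activity: Activity): String = activity.subject
object ActivityDao { fun save(serializedActivity: String) { /* write to Cassandra (illustrative) */ } }

// Email to Activity Transformer (one Streams job)
fun buildEmailToActivityTransformer(): Topology {
    val builder = StreamsBuilder()
    builder.stream("filtered-email", Consumed.with(Serdes.String(), Serdes.String()))
        .mapValues { email -> serialize(toActivity(email)) }   // email -> internal Activity representation
        .to("activity", Produced.with(Serdes.String(), Serdes.String()))
    return builder.build()
}

// Activity Persister (a separate Streams job)
fun buildActivityPersister(): Topology {
    val builder = StreamsBuilder()
    builder.stream("activity", Consumed.with(Serdes.String(), Serdes.String()))
        .foreach { _, activity -> ActivityDao.save(activity) }   // persist each activity to Cassandra
    return builder.build()
}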

The Insight Publisher Streams job is responsible for converting the extracted data produced by the extractors into insight artifacts. It pulls insight configuration from a Postgres store, caches it in memory, and applies the configured rules to generate the final insight data, which is written to the insight Kafka topic.

Finally, the Insight Persistor job stores the data read from the insight Kafka topic in a Cassandra store. This Cassandra store powers the insights presented in the activity timeline and the Salesforce mobile inbox.

Advantages of Kafka Streams

On top of the advantages we originally identified that make Kafka Streams the best fit for our use case, we’ve seen additional benefits since adopting it.

- Streams DSL for Minimal Code: The Kafka Streams DSL provides a nicer way to write a topology with a minimal amount of code. This makes it extremely easy to understand what each job is doing by reading just a few lines of code, and to debug failures. For example, the following code snippet is from the Insight Publisher job: it reads data from the extracted data input topic, applies rules from an insight rule cache, converts the data to insight objects, and writes them to the insight topic. All of that happens in a few lines of code (a sketch of how such a topology can be unit-tested follows after this list):

val builder = StreamsBuilder()
val inputStream = builder.stream(inputTopic, Consumed.with(byteArraySerde, byteArraySerde))

inputStream
    .filter { _, value -> value != null }                                       // drop null payloads
    .mapValues { it -> insightGenerator.deserializeData(it) }                   // bytes -> extracted data
    .flatMapValues { it -> insightGenerator.toInsights(it, insightTypeCache) }  // apply the cached insight rules
    .mapValues { it -> insightGenerator.serializeData(it) }                     // insight objects -> bytes
    .to(insightTopic)                                                           // write to the insight topic

return builder.build()

- Use of an Existing Container Orchestration Framework: Our infra team was already running a Docker orchestration framework (DC/OS) to support various backend services, so we didn’t need to create a new framework or new clusters to support Kafka Streams jobs. We containerized the jobs and deployed them without any issues. Later, we also migrated them from DC/OS to Nomad without much effort.
- Fault Tolerance: Because there is no single point of failure, Kafka Streams makes for very resilient data pipelines. The failure of a few containers won’t bring down the whole pipeline; the partitions of failed containers get automatically reassigned to running containers. This has significantly improved the uptime of the insights pipeline. By contrast, we frequently see a Storm worker going down and taking the whole Storm topology with it, or a name node failure in a Spark cluster causing issues with scheduling new Spark jobs. We also save on infrastructure effort by not having to keep specialized clusters up all the time.
- Scaling Up on Demand: Unlike with Storm or Spark, you don’t have to bring down a Kafka Streams job to scale the topology up or down. You can scale up the number of containers and process increased traffic without disrupting the running job. The catch is that the maximum parallelism you can achieve is bounded by the number of partitions of the input Kafka topic.
- Saving Resources for Simple Jobs: Jobs involving simple map/filter operations don’t need a huge amount of memory or CPU. We run the individual containers with a minimal amount of resources.
- Contributing to Open Source: Kafka has a very vibrant and welcoming open source community. It encourages newcomers like me to participate and contribute, which led to the following changes to the Kafka codebase:
  - Add currentSystemTimeMs and currentStreamTimeMs to ProcessorContext (pull request)
  - TopologyTestDriver should not require a Properties argument (pull request)
  - Add tests that cover all of the cases for ReplicatedLog’s validateOffsetAndEpoch
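Since TopologyTestDriver came up above, here is a hedged sketch of how a topology like these can be unit-tested in-process, with no running broker. It uses a trivial filter topology and the Properties-free TopologyTestDriver constructor added by the contribution mentioned above (available in newer Kafka versions); it also requires the kafka-streams-test-utils dependency, and the topic names are illustrative:

import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.TopologyTestDriver
import org.apache.kafka.streams.kstream.Consumed
import org.apache.kafka.streams.kstream.Produced

fun main() {
    // A trivial topology: drop blank records, pass everything else through.
    val builder = StreamsBuilder()
    builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
        .filter { _, value -> value.isNotBlank() }
        .to("output-topic", Produced.with(Serdes.String(), Serdes.String()))

    val driver = TopologyTestDriver(builder.build())
    try {
        val input = driver.createInputTopic("input-topic", StringSerializer(), StringSerializer())
        val output = driver.createOutputTopic("output-topic", StringDeserializer(), StringDeserializer())

        input.pipeInput("key-1", "important email")
        input.pipeInput("key-2", "")   // should be filtered out

        check(output.readValue() == "important email")
        check(output.isEmpty())
    } finally {
        driver.close()
    }
}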

Future Use Cases

We continue to build new data processing jobs using Kafka Streams. For stateful operations where we want to perform aggregations or joins, we want to use KTables, which will eliminate the need for another database like Redis or Postgres.
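As a hedged sketch of the kind of stateful operation we have in mind, the following counts insights per user into a KTable backed by a local state store (RocksDB by default) with a Kafka changelog, so no external database is involved. The topic names, store name, and the assumption that the record key is a user id are all illustrative:

import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.common.utils.Bytes
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.kstream.Consumed
import org.apache.kafka.streams.kstream.Grouped
import org.apache.kafka.streams.kstream.KTable
import org.apache.kafka.streams.kstream.Materialized
import org.apache.kafka.streams.kstream.Produced
import org.apache.kafka.streams.state.KeyValueStore

fun buildInsightCountTopology(): Topology {
    val builder = StreamsBuilder()
    // Count insights per user; the state lives in a local, changelog-backed store
    // instead of an external store such as Redis or Postgres.
    val insightCountsPerUser: KTable<String, Long> =
        builder.stream("insight", Consumed.with(Serdes.String(), Serdes.String()))   // key assumed to be a user id
            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
            .count(Materialized.`as`<String, Long, KeyValueStore<Bytes, ByteArray>>("insight-counts-per-user"))
    // The table could be joined against other streams or exposed via interactive queries;
    // here it is simply streamed back out to a topic.
    insightCountsPerUser.toStream().to("insight-counts", Produced.with(Serdes.String(), Serdes.Long()))
    return builder.build()
}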

For writing data from Kafka to Cassandra or Elasticsearch, we want to introduce Kafka Connect into our pipeline, which will help us move a large amount of data out of Kafka and into indexing or database systems.

References:

- Kafka Streams Architecture: https://docs.confluent.io/current/streams/architecture.html#stream-partitions-and-tasks
- Kafka Streams Work Allocation: https://medium.com/@andy.bryant/kafka-streams-work-allocation-4f31c24753cc
- Applying Kafka Streams for Internal Message Delivery Pipeline: https://engineering.linecorp.com/en/blog/applying-kafka-streams-for-internal-message-delivery-pipeline/
