Building the data delivery architecture that powers Twilio Engage

Solon Aguiar, Brian Lai on January 20th 2022

Back in October, during Signal, we announced the launch of our forthcoming product, Twilio Engage. With it, our customers could harness the combined power of Segment, Segment Personas, and Twilio to deliver one-to-one customer experiences seamlessly across channels and drive personalized engagement at scale.

In this blog post, we'll cover the story behind all that was needed to change the Personas data delivery system to support the new platform features. You will learn about the questions we asked ourselves, the architecture of our data delivery system pre- and post-launch, the scaling challenges, the metrics that we watched throughout our work, and many other aspects of the process. In the end, you'll understand our decision-making process, how that impacted our systems, and the end result.

We hope that this post not only tells the story of how we did it, but also serves as an inspiration to anyone needing to deliver extensive system enhancements in the face of ambiguity and uncertainty. We built on top of an existing, well-thought-out platform and see it as our responsibility to share the story and pass our lessons forward.

Let's chat!

Let's chat!  

See how redundant double messaging is? ;) Powerful messaging channels within Personas are one of the main features of the Twilio Engage platform. With them, our customers can deliver highly personalized communications to their own customers and drive increased business value. 

Despite its value, messaging must be handled carefully. Saturated channels can erode customer trust, so it is important to make sure that customer communications are not only effective, but also not overwhelming. Prior to Twilio Engage, the Personas data delivery layer was built on the principle of data completeness. Our systems have always guaranteed that data will be delivered at least once, which means that in some cases the same message was sent multiple times.

When we set out to build the growth automation platform, we quickly realized that we couldn't have the messaging features in the new platform rely on an infrastructure that didn't guarantee exactly-once delivery. We had to solve this problem in a scalable and extensible way.

Before that, we needed to name our problem to better discuss it. Proper nomenclature puts all the stakeholders on the same page and facilitates communication and alignment, ultimately driving value and reducing confusion. We called our project the “deduping problem.”

The Personas data delivery architecture

Segment Personas is a complex system comprised of hundreds of different components. In a nutshell, you can think of it as a massive multi-tenant data pipeline that starts with the data input components and flows through identity, the computations engine, and data delivery. The identity pipeline components perform identity resolution and create a list of profile updates for the computation engine, which, given the space configuration with its audiences, computed traits, journeys, or SQL traits, calculates the profile computation membership. The profile computation membership updates are sent to the data delivery layer, which is responsible for syncing them to any configured destination.

The diagram below shows the high-level components of that part of the pipeline.


Figure 1 - Personas data delivery components

In this diagram, we deliberately left out the components responsible for actually making the calls to partner APIs as these were not relevant to understanding the work needed for deduping. Personas leverages the power of Segment connections, so it doesn't handle data delivery - it simply forwards messages to the Segment core connections product. This architecture gives customers the guarantee and reliability of our battle-tested core product.

The subsections below detail the responsibilities of each component in Figure 1 and provide necessary background knowledge for the changes needed to dedupe.

Compute-engine

Based on real-time updates, the compute pipeline calculates if a profile computation has:

  1. Been merged into another profile computation

  2. Entered or exited the criteria for a computation

  3. Had an external identifier added or removed

For the purposes of the data sync components, it didn't matter which of the three above occurred. Upon receiving an update, the engine forwards it downstream by writing one message for each affected computation (audience, computed trait, or SQL trait) to the compute-output topic - herein referred to as the profile computation message. These messages contain everything that the data delivery pipeline needs in order to determine if the profile is part of an existing computation that needs to be updated downstream.

To achieve data completeness (and due to the infrastructure that it is built upon), the compute pipeline doesn't guarantee the uniqueness of each profile computation message. Duplicate messages are rare but do happen, and that was not a concern for the data delivery layer because, up until dedupe, Destinations use cases were targeted towards marketing. For those use cases, repeated updates were not a problem.

Profile-identifier

The updates in the compute-output topic refer to the canonical user profile in Personas. That works very well in the compute and identity components, as these systems must refer to a single view of the profile. It doesn't work for partner tools, since these rely on customer-defined IDs (herein called external IDs) to join profiles. This is where the profile-identifier comes in.

Once an update for a profile computation is received, the profile-identifier is responsible for mapping the canonical ID in the message to the complete list of external IDs attached to that profile. It does so by calling the identity-service, a component responsible for navigating the identity graph and finding all of the relevant external IDs for that profile. After retrieving the list of external IDs, the profile-identifier updates the message and writes it in the identified-output Kafka topic.
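The mapping step can be sketched roughly as follows; the `IdentityService` interface and the `identify` helper are hypothetical stand-ins for the internal identity-service client.

```go
package main

import "fmt"

// IdentityService abstracts the identity-graph lookup; the real service's
// API is internal to Segment, so this interface is an assumption.
type IdentityService interface {
	ExternalIDsFor(canonicalID string) ([]string, error)
}

// fakeIdentityService is an in-memory stand-in used for the sketch.
type fakeIdentityService map[string][]string

func (f fakeIdentityService) ExternalIDsFor(id string) ([]string, error) {
	return f[id], nil
}

// Update models the message flowing through the pipeline: profile-identifier
// enriches the canonical ID with the full external ID list before writing
// to the identified-output topic.
type Update struct {
	CanonicalID string
	ExternalIDs []string
}

// identify resolves the canonical ID into external IDs and attaches them.
func identify(svc IdentityService, u Update) (Update, error) {
	ids, err := svc.ExternalIDsFor(u.CanonicalID)
	if err != nil {
		return u, err
	}
	u.ExternalIDs = ids
	return u, nil
}

func main() {
	svc := fakeIdentityService{"use_1": {"email:a@example.com", "phone:+15550100"}}
	out, _ := identify(svc, Update{CanonicalID: "use_1"})
	fmt.Println(out.ExternalIDs)
}
```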

Sync-worker

The responsibility of sync-worker is to determine which Destinations the updates should go to and how to generate them. It is a Kafka consumer that receives the updates in the identified-output topic, fetches the list of integrations from the personas-integrations-service and, based on each integration instance configuration, generates appropriate messages to be sent to the core pipeline for delivery.

Since each computation can be connected to multiple Destinations at once (and each destination has its own configuration parameters) sync-worker performs multiple calculations per update and creates a fan out of messages downstream. The job of sync-worker is simple in principle (figure out which Destinations need which messages) but complex in execution. There are many different variables (integration configuration, computation type, type of update) that all affect how a message should be sent to a partner.

Personas-integrations-service

The personas-integrations-service is a service that crosses plane boundaries within Segment's architecture. It sits on top of a PostgreSQL database to save Destination instance configuration.

It is used by customers to store the computation Destinations configuration and also by the data pipeline to fetch the list of Destinations connected to a computation. It is a central component of the data delivery layer that tells the sync system where the data should go and how it should be sent.

Solving the dedupe problem

There's only one simple requirement for deduping:

  • Each profile that is part of a computation list should receive each message once and only once.

The trick is how to implement that on a data pipeline that doesn't guarantee exactly-once message delivery! As mentioned in earlier sections, guaranteeing that each message is delivered once and only once is a must for messaging from Personas. The risk of sending the same communication multiple times to a customer was not something that we could accept.

To solve that, we had to think about which pieces of information we needed to determine if a message should go through and how these fit with the responsibilities of each pipeline component. In the end, we realized that:

  1. The computation engine should emit results with additional context related to why computed values were emitted.

  2. Consumers of the computation engine output shouldn't just send whatever they received downstream. From now on, they need to decide if a message should go through based on attributes of the computed values and where the message was intended to go. 

These two realizations made sense in the architecture as they respected the responsibility of each component of the pipeline as it was designed. The compute pipeline would still perform the calculations as before, just outputting the reason for an update (which it already had internally but didn't expose), whilst the data delivery layer would still just be responsible for sending updates.

The extra data sent by the compute pipeline could mean a leak of encapsulation, but it made sense for our team as syncs were no longer as straightforward as before. Syncs now depended on each integration the profile computation was connected to and that was a responsibility of the data delivery pipeline already. Each section below will dive into the changes that we made and the reasons why.

Personas-integrations-service

Although deduplication of messages was critical to customer communications, we strived for a solution that was flexible enough to support future Destinations and that would not affect all of the existing marketing Destinations that didn’t need deduping. We solved this by leveraging the existing Personas integrations database. We added an additional layer of configurability so that when customers created specific Destinations, an additional setting called dedupes_on was also persisted.

The dedupes_on field would be read in sync-worker to decide which messaging channels would be used for an update. For example, messages going to SendGrid would dedupe on email while Twilio would dedupe on phone.
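A minimal sketch of how such a setting might be read, assuming a hypothetical `DestinationConfig` record; an empty value leaves marketing Destinations on the legacy, non-deduped path.

```go
package main

import "fmt"

// DestinationConfig is an illustrative stand-in for a row in the
// personas-integrations-service database; the field names are assumptions.
type DestinationConfig struct {
	Name      string
	DedupesOn string // "", "email", "phone", ... - empty means no deduping
}

// dedupeKeyType returns the external ID type a destination dedupes on,
// and whether deduping applies at all.
func dedupeKeyType(cfg DestinationConfig) (string, bool) {
	if cfg.DedupesOn == "" {
		return "", false // marketing destinations keep the legacy behavior
	}
	return cfg.DedupesOn, true
}

func main() {
	configs := []DestinationConfig{
		{Name: "SendGrid", DedupesOn: "email"},
		{Name: "Twilio", DedupesOn: "phone"},
		{Name: "Marketing Destination"}, // no dedupes_on: untouched by the feature
	}
	for _, c := range configs {
		key, ok := dedupeKeyType(c)
		fmt.Println(c.Name, key, ok)
	}
}
```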

Compute-engine

In addition to communicating computation membership, the compute engine needed to include the reason why a computation was emitted, herein known as a trigger reason. We didn't see a huge shift in the engine's responsibility; it just needed to provide the details on why it made a specific decision. This new field would be added to the payload of the compute-output Kafka message.

Sync-worker

This is where a majority of the changes happened. For deduping, sync-worker's job had to be more specific in how it handled messages. Rather than just looking at the destination configuration and sending the update with the list of external IDs, it also needed to consider why the update happened and then decide what to do. It would either:

  1. Send update with all external IDs downstream

  2. Send update with some external IDs downstream

  3. Drop the update

The decision tree was somewhat complex when it considered the destination configuration (whether it had dedupes_on or not and the type of trigger reason). The diagram below is a simplification of the process for sending an email message.

Zoom with margin

Figure 2 - Dedupe Decision Tree (simplified)

Apply trigger reasons

When the dedupes_on configuration setting was detected, the trigger reasons dictated whether the system should ignore irrelevant external IDs to avoid over-sending messages to profiles. There are three reasons that would cause a computed value to reach sync-worker:

  • Value changed: Profiles that became part of a computation for the first time should receive messages. When processing profile computation updates triggered by a value change, sync-worker filtered out external IDs that were not of the type defined in the dedupes_on configuration. It then used the latest relevant external IDs based on their timestamps. Conversely, profiles that exited a list should not receive messages.

  • New external ID: When new external IDs were added to a user’s profile that was already in a computation, the engine would emit a new computed value to sync-worker. It needed to figure out if: A) the new external IDs were one of the types defined in dedupes_on, and B) the profile had not already received a message from a prior computation. Only when both A and B were true would sync-worker find the latest external ID and forward the message to the core pipeline.

  • User profile merge: When the computation pipeline detected that two profiles belonged to the same person, a merge of the two happened. In this merge scenario, sync-worker checked if either of the profiles prior to the merge were already in the computation. If neither one was in the list, that meant that the profile never received any messages and it should emit a message using the latest external IDs defined in dedupes_on.
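The three trigger reasons above can be sketched as a single decision function. Everything here is illustrative: the trigger-reason strings, the `ExternalID` shape, and the `alreadyMessaged` flag are assumptions, not Segment's actual types.

```go
package main

import (
	"fmt"
	"sort"
)

// ExternalID is an illustrative stand-in for an identity-graph entry.
type ExternalID struct {
	Type      string // e.g. "email", "phone", "user_id"
	Value     string
	Timestamp int64 // when the ID was attached to the profile
}

// latestOfType filters ids down to the dedupes_on type and returns the
// most recently attached one, mirroring the "latest relevant external ID"
// rule described above.
func latestOfType(ids []ExternalID, dedupesOn string) (ExternalID, bool) {
	matches := make([]ExternalID, 0, len(ids))
	for _, id := range ids {
		if id.Type == dedupesOn {
			matches = append(matches, id)
		}
	}
	if len(matches) == 0 {
		return ExternalID{}, false
	}
	sort.Slice(matches, func(i, j int) bool {
		return matches[i].Timestamp > matches[j].Timestamp
	})
	return matches[0], true
}

// decide sketches the three trigger reasons. alreadyMessaged stands in for
// "the profile already received a message from a prior computation".
func decide(trigger string, entered, alreadyMessaged bool, ids []ExternalID, dedupesOn string) (ExternalID, bool) {
	switch trigger {
	case "value_changed":
		if !entered { // profiles exiting a list get nothing
			return ExternalID{}, false
		}
		return latestOfType(ids, dedupesOn)
	case "new_external_id", "profile_merged":
		if alreadyMessaged { // the profile was already messaged once
			return ExternalID{}, false
		}
		return latestOfType(ids, dedupesOn)
	}
	return ExternalID{}, false
}

func main() {
	ids := []ExternalID{
		{Type: "email", Value: "old@example.com", Timestamp: 1},
		{Type: "email", Value: "new@example.com", Timestamp: 2},
		{Type: "phone", Value: "+15550100", Timestamp: 3},
	}
	id, ok := decide("value_changed", true, false, ids, "email")
	fmt.Println(id.Value, ok) // new@example.com true
}
```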

Create ID groups

Once sync-worker deduped the external IDs that would be sent for a profile, it published an event to the Segment pipeline. An additional layer of complexity that we needed to account for during implementation was how to maintain backwards compatibility with the existing data flow. The worker was originally designed to send data to multiple partner Destinations by sharing the same event published to the core pipeline. This was a good strategy to reduce costs and simplify the flow of data going through our pipeline. However, that process was unsuitable for the dedupe use case because it relied on the assumption that events going to partner Destinations all shared the same set of external IDs. Now that deduplication was contingent on a Destination’s configuration, there was no guarantee that one set of external IDs would apply to all target Destinations.

In order to address this new problem, we created an IDGroup data structure. It was made up of the set of external IDs and the list of Destinations these IDs should be sent to. In order to maintain the existing behavior of sending messages as efficiently as possible, sync-worker generated a SHA128 hash based on an IDGroup’s external IDs. The resulting hash was used to associate Destinations that received the same external IDs and could therefore share the same outbound event.
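A rough sketch of that grouping step, assuming plain strings for external IDs and using SHA-256 in place of the internal hash:

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"sort"
)

// IDGroup mirrors the structure described above: a set of external IDs and
// the Destinations those IDs should be sent to.
type IDGroup struct {
	ExternalIDs  []string
	Destinations []string
}

// groupKey hashes the sorted external IDs so that Destinations receiving
// an identical ID set can share one outbound event.
func groupKey(externalIDs []string) string {
	sorted := append([]string(nil), externalIDs...)
	sort.Strings(sorted)
	h := sha256.New()
	for _, id := range sorted {
		h.Write([]byte(id))
		h.Write([]byte{0}) // separator to avoid ambiguous concatenation
	}
	return hex.EncodeToString(h.Sum(nil))
}

// groupDestinations buckets each destination under the hash of the ID set
// it should receive, producing one IDGroup per distinct set.
func groupDestinations(perDest map[string][]string) map[string]*IDGroup {
	groups := make(map[string]*IDGroup)
	dests := make([]string, 0, len(perDest))
	for d := range perDest {
		dests = append(dests, d)
	}
	sort.Strings(dests) // deterministic iteration for the sketch
	for _, dest := range dests {
		ids := perDest[dest]
		key := groupKey(ids)
		if g, ok := groups[key]; ok {
			g.Destinations = append(g.Destinations, dest)
		} else {
			groups[key] = &IDGroup{ExternalIDs: ids, Destinations: []string{dest}}
		}
	}
	return groups
}

func main() {
	groups := groupDestinations(map[string][]string{
		"sendgrid": {"email:a@example.com"},
		"twilio":   {"phone:+15550100"},
		"webhook":  {"email:a@example.com"},
	})
	fmt.Println(len(groups)) // 2: sendgrid and webhook share one event
}
```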

As we continued to introduce additional complexity like IDGroups on top of how sync-worker sent messages, we came to the realization that we needed to take a broader look at rearchitecting and streamlining sync-worker’s process for sending events to Segment's core pipeline.

The opportunity

Similarly to what we did before to work on deduping, we defined our engineering requirements to release a new feature (more details on this below). A major piece of this vision was to release the new code slowly and be able to disable it in case there was an issue. In practice, this meant that the code that existed before should remain as an executable branch (turned on/off at runtime) or that the new code would be tucked away and easily turned on or off (also at runtime) on demand.

It is important to realize that even with deduping, the responsibility of sync-worker was generally the same: send requests to all connected Destinations based on a profile computation update. The new requirements were subtle but relevant in the process of how we created the requests for each destination.

As we began writing the code for deduping with the need to keep both new and old code around, we realized that the existing sync-worker code was very coupled to the existing behavior and that branching somewhere to add dedupe support would be a major challenge. 

Despite its central role in the Personas data delivery architecture and the complex job that it performs, sync-worker is not a very large codebase. All of the business logic, including error handling, slow queue processing, and message processing, is around 500 dense lines of code.

This scenario presented an opportunity: given that we had to implement this especially important new feature and keep the old code around during release (or easily branch off into the new code) could we go one step further and also make the whole sync-worker codebase better?

Yes, we could! To do that, we would need to rewrite sync-worker. Although in hindsight it seems like a deliberate decision taken before we started, the process could be categorized as a slow discovery as we sat down, looked through the code base to understand how the new requirements fit together, and understood the current implementation.

When we were done with the rewrite, we ended up with code similar to this:


if isNewCodeOn := checkIfNewCodeSwitchIsOn(); isNewCodeOn {
  executeNewSyncWorkerFunctions()
} else {
  executeOldSyncWorkerFunctions()
}

For us, a “better codebase” meant code that was simple (short, pure functions with clearly defined responsibilities), had good automated test coverage, and was easily extensible. One of the most important aspects of this approach was that, because we kept the public interfaces the same, we re-used (and trusted) the existing tests to make sure that the new code didn't introduce any regressions. As we wrote tests for each new scenario, we slowly improved the test coverage and documentation of our work, because the tests became living documents describing the new features.

Release process

While we implemented dedupe, we also conceptualized a set of principles to govern our approach to delivering this feature, similar to how other features are released at Segment.

Feature parity

Our customers collect and send data through Segment in a million and one different ways. We could not compromise data integrity by introducing regressions to the content of their data or to where it is sent.

Simple validation

Determining whether deduplication worked as expected should not require assumptions about what might be happening within the system.

Observable changes in system performance and latency

Even if we could not guarantee that dedupe would retain the exact same latency, we could, at the very least, ensure that the performance impact was measurable and within an acceptable range for our customers.

Gradual rollout

As a means of derisking such a large redesign, message traffic should be transitioned slowly onto the new code paths instead of all at once.

Simple and fast toggles

Segment’s realtime data pipeline processes customer data 24/7. Turning off the dedupe feature during an operational issue should be doable by anyone on the team.

How we accomplished it

Test driven development (yes!)

Test driven development (TDD) is often used as a buzzword, but for this project we saw it as a crucial part of our development process. Because dedupe introduced more complex responsibility on sync-worker, we developed our testing strategy upfront.

Writing tests for all the different scenarios and possible edge cases allowed us to more deeply understand customer use cases and make more informed decisions about what the expected results should be. Our adoption of TDD during development directly impacted how we implemented parts of our system. For example, we created a test scenario where a message was intended to be routed to some Destinations using dedupe, and some Destinations without it. This test case was directly responsible for the ID grouping strategy mentioned earlier.

The main result of the rigorous unit and integration testing procedure was a much higher confidence in our changes once we began our stage testing end-to-end. We also used this as an opportunity to set our systems up for success in the long term by using the new tests as an indicator for regressions. We wanted to leave the system in a better state by having our tests act as artifacts to describe expected behavior and make future iterations safer.

Metrics to validate all code paths not just the outputs

No matter how carefully you test, there is no substitute for deploying the new code. You cannot just throw in some data and hope that everything comes out correctly. We wanted confidence that sync-worker sent the correct messages and external IDs. What was equally important was to ensure that sync-worker arrived at that decision by following the right steps. With so many branching paths and decisions being made along the way, validating only the output coming out of the system was simply not good enough for us.

In order to achieve higher confidence, we took validation one step further by instrumenting relevant metrics at each step in the workflow as a message underwent deduping. By doing this, we gave ourselves the ability to see in realtime how sync-worker was processing each piece of data without having to make assumptions on what was happening. 

During our functional testing, we inspected exactly how data was going through our code and whether our code was respecting the inputs provided - trigger reasons, target Destinations, and ID groupings. The more observability one bakes into the code, the easier the validation becomes. Trust us (and the data)!


Figure 3 - Instrumenting metrics across code paths provides detailed insight into code behavior beyond simple input and expected output

Performance metric all the things!

Going into the project, we did not expect to guarantee the exact same processing latency or performance. New code paths and processing meant more work for the system which required more time. However, at the very least we could guarantee observability on the performance impact of the new dedupe logic.


Like many services at Segment, the sync-worker service emits metrics using Datadog. By measuring the average, P95, and P99 latencies of each method in the dedupe process, we saw exactly how much time it took to dedupe a message in comparison to the legacy code. We could also identify performance bottlenecks at a more granular level. By meticulously introducing metrics during development instead of as a post-deploy afterthought, we could not only de-risk the performance impact, but optimize the code during feature development.

Comparing existing and experiment output

Rewriting large portions of the sync-worker codebase along with the dedupe feature meant we increased the blast radius and potential impact to existing customers. Although we had strong validation from our aforementioned testing strategy, we decided to implement a lightweight realtime output comparator. There were a lot of tests in the codebase already, but there was no foolproof way to know if any corner cases were missing.

By comparing the experimental output generated using the new flow with existing output from the legacy flow, we could further validate that we did not introduce any regressions. The count and content of messages that were not configured for dedupe should be identical for both flows.

Similar to the ID group implementation, we stored messages generated in either flow in separate maps. For each outbound message, we generated a SHA128 hash and, using it as a key, stored an integer that was incremented every time we saw that key. In the final step, we compared the contents of each map.
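The comparator logic can be sketched as two hash-count maps and an equality check; SHA-256 stands in for the internal hash, and message payloads are plain strings here.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// messageKey hashes an outbound message's content so that equal messages
// from the legacy and dedupe flows collide on the same map key.
func messageKey(payload string) string {
	sum := sha256.Sum256([]byte(payload))
	return hex.EncodeToString(sum[:])
}

// countMessages builds the per-flow map of content hash -> occurrence count.
func countMessages(msgs []string) map[string]int {
	counts := make(map[string]int)
	for _, m := range msgs {
		counts[messageKey(m)]++
	}
	return counts
}

// flowsMatch compares the two maps: identical counts for every key means
// the new flow produced exactly the legacy output.
func flowsMatch(legacy, experimental map[string]int) bool {
	if len(legacy) != len(experimental) {
		return false
	}
	for k, n := range legacy {
		if experimental[k] != n {
			return false
		}
	}
	return true
}

func main() {
	legacy := countMessages([]string{"m1", "m2", "m2"})
	experiment := countMessages([]string{"m2", "m1", "m2"})
	fmt.Println(flowsMatch(legacy, experiment)) // true: ordering doesn't matter
}
```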

Deployment gates

Segment’s systems are designed for continuous availability, and Segment’s global customer base expects to be able to send their data at all times. No matter the feature, large or small, new code always has the potential to introduce regressions or operational issues. Due to the scope and complexity of the dedupe feature, it was important for us to plan for safety: gradually rolling out the changes and quickly repairing the system if a regression was detected.

Segment uses a proprietary feature flag system called Flagon which allows for two types of operational levers. Flags are a simple mechanism that allowed traffic to be diverted to old or new paths based on a percentage. Gates are more granular and allow traffic to be routed depending on Segment’s various customer tiers: Free Tier, Self-Service, Business Tier, etc.

With that in mind, we adopted a combination flag + gate approach for releasing the feature. The flag dictated whether sync-worker should generate a second set of events using the new dedupe flow and compare them against events generated using the legacy flow. It also acted as our emergency shutoff valve to completely disable the entire dedupe codepath if necessary. The gate, on the other hand, controlled how we rolled the feature out at a per-customer basis. This gate governed if sync-worker should send messages to the destination Kafka topic that were generated using the dedupe or legacy flow. 
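A simplified model of the flag + gate routing described above; Flagon is proprietary, so the interface, flag name, and gate name below are all invented for illustration.

```go
package main

import "fmt"

// FeatureFlags fakes Flagon's two levers so the routing logic is testable.
type FeatureFlags interface {
	FlagEnabled(name string) bool           // percentage-based kill switch
	GateOpen(name, workspaceID string) bool // per-customer rollout
}

// staticFlags is an in-memory implementation used for the sketch.
type staticFlags struct {
	flags map[string]bool
	gates map[string]map[string]bool
}

func (s staticFlags) FlagEnabled(name string) bool       { return s.flags[name] }
func (s staticFlags) GateOpen(name, ws string) bool      { return s.gates[name][ws] }

// route returns which flow's output should be sent downstream for a
// workspace: the flag controls whether the dedupe path runs at all, and
// the gate controls whose events it actually emits.
func route(ff FeatureFlags, workspaceID string) string {
	if !ff.FlagEnabled("dedupe-enabled") {
		return "legacy" // emergency shutoff: dedupe path fully disabled
	}
	if ff.GateOpen("dedupe-send", workspaceID) {
		return "dedupe"
	}
	return "legacy" // dedupe runs for comparison, but legacy output is sent
}

func main() {
	ff := staticFlags{
		flags: map[string]bool{"dedupe-enabled": true},
		gates: map[string]map[string]bool{"dedupe-send": {"ws_1": true}},
	}
	fmt.Println(route(ff, "ws_1"), route(ff, "ws_2")) // dedupe legacy
}
```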

Over the course of several days, we slowly migrated customers over to the new deduplication flow until all customers were on a single codepath. All of this was done without any code changes.

Project outcomes

Out in the wild

The new dedupe functionality we released was designed and implemented as a backend-only feature on Segment’s data plane, which meant that the best news was no news at all. Although this feature was designed to support the new Twilio Engage platform launching in 2022, we were rebuilding our infrastructure in the present to support the new dedupe use case and set our systems up for success in the long term.

Over the course of the week-long rollout to our customers, we kept our eyes locked on our metrics dashboards and on all the relevant escalation Slack channels. During that time we did not detect any mismatched events between our two message streams and we received no customer complaints. And most importantly, we did not have to revert any of our changes. That is not to say we were completely free from incidents (which we will detail in our lessons learned) but considering we rewrote one of Twilio Engage’s most critical and high volume systems, we were very pleased with the outcome.

Some numbers

After implementing dedupe and moving every customer onto the new feature, the end result was:

  • 50 New Test Cases

  • 105 Commits

  • 3,845 New Lines of Code (including tests)

And most importantly…

  • 0 Customer Zendesk Tickets related to dedupe

Take-aways

There were countless take-aways from this project. Below are some of the most important ones for us.

Re-writing is not always the answer

Completely rewriting sync-worker was not an obvious decision. It slowly dawned on us as we analyzed the unique characteristics of the codebase, the new requirements, the timeline, and the existing test coverage.

Even then, it didn't come for free. There were times when we questioned whether this was the best path. During those times, we always remembered that not rewriting it would be even costlier. Because of our decision to rewrite, and due to the importance of this component to Personas, we had to come up with the elaborate release process described above to double-check the work.

A full rewrite is not always the silver bullet. It is a decision that shouldn't be taken lightly and that doesn't come for free. It is important to understand the trade-offs and explore possibilities before going down this path.

Ah, that edge case scenario

When we initially deployed our changes to production, we kept a close eye on memory usage and response time. We knew that we were going to take a hit, since we essentially did twice the work (remember that both the test (new code) and the control (old code) were running simultaneously so that we could compare outputs). The main question was, how much of a hit? For a couple of days, we kept a close eye on our metrics, which were very promising: no considerable memory or latency increase.

Until a fateful Wednesday at 1:00 AM, when we received alerts that hosts were being killed off due to out of memory (OOM) errors. Our Kafka partitions were starting to build up lag, leading to delays, and we would start to lose data if they exceeded the retention window. We had not encountered this during our stage testing or throughout the previous days that our code was being gradually released to all customers.

At this point during the incident, we did not have conclusive evidence that there was a problem with the dedupe feature or a general problem with the system. However, we quickly validated this by using the emergency shutoff switch and sure enough, the OOM issues subsided and data began processing as usual.

This was a clear indicator that our deploy was responsible, and we had to find out why. Looking for a root cause in a Kafka topic that processes millions of messages a day is the modern-day equivalent of looking for a needle in a haystack. To our advantage, during the incident we saved the messages at the current offset of the stuck partitions, and upon inspection, the reason for the OOM became clear.

Since our comparison tester was checking the new and legacy outputs, our memory footprint was doubled (as expected). This memory overhead was not a problem for a majority of our customers, but for some highly complex and extremely rare profile configurations, the resulting amplification factor could be as high as 150,000x. That meant one input message resulted in over 150,000 outbound messages! With the double processing, that would amount to 300k messages stored in memory.

At this point, our changes had been diffed for many days without any regressions. We were very confident that the new code worked well and didn't introduce any different behavior. Our mitigation strategy was to disable the comparator altogether and complete the release. Our main learning from this story is that while we were diligent about our code correctness and didn’t see any performance discrepancies between the new and old flows, we failed to account for the performance impact of our validator in very rare cases. Our testing and validation mechanisms must be held to the same level of scrutiny.
