
Engineering

Calvin French-Owen on June 5th 2020

This blog should not be construed as legal advice. Please discuss with your counsel what you need to do to comply with the GDPR, CCPA, and other similar laws.

Under the GDPR and CCPA, any company which serves users in the EU or users in California must allow its users to request that their data is either deleted or suppressed.

  • Deletion: all identifying info related to the user must be properly deleted.

  • Suppression: the user should be able to specify where their data is used and sent (e.g. for a marketing, advertising, or product use case).

When you get a deletion request, it doesn’t just mean deleting a few rows of data in your database. It’s your responsibility to purge data about your users from all of your tools – email, advertising, and push notifications.

Typically, this process is incredibly time-consuming. We have seen companies create custom JIRA workflows, in-depth checklists, and other manual work to comply with the law. 

In this article we’ll show you how to automate and easily respect user privacy by:

  • Managing consent with our open source consent manager.

  • Issuing DSARs (Data Subject Access Requests) on behalf of your users.

  • Federating those requests to downstream tools.

Let's dive in.

Step 1: Set up a Javascript source and identify calls

If you haven’t already, you’ll want to make sure you have a data source set up on your website that is collecting your user data through Segment.

The easiest way to do this is via our Javascript source and analytics.identify calls.

// when a user first logs in, identify them with name and email
analytics.identify('my-user-id', {
  email: 'jkim@email.com',
  firstName: 'Jane',
  lastName: 'Kim'
})

Generally, we recommend you first:

  • Generate user IDs in your database: a user ID should never change! It’s best to generate these in your database, so they can stay constant even if a user changes their email address. We’ll handle anonymous IDs automatically.

  • Collect the traits you have: you don’t have to worry about collecting all traits with every call. We’ll automatically merge them for you, so just collect what you have.

  • Start with messaging: if you’re trying to come up with a list of traits to collect, start with email personalization. Most customers start by collecting email, first and last name, age, phone, role, and company info so they can send personalized emails or push notifications.

Once you’ve collected data, you’re ready to start your compliance efforts.

Step 2: Enable the open-source consent manager

Giving users the ability to control what personal data is collected is a huge part of any privacy compliance regime. 

We’ve built an open source drop-in consent manager that automatically works with Analytics.js.

Adding it in is straightforward.

Updating the snippet

First, you’ll want to remove the following two lines from your analytics.js snippet.

analytics.load("<Your Write Key>") // <-- delete me
analytics.page() // <-- delete me

These will automatically be called by the consent manager.

Add in your config

We’ve included some boilerplate configuration, which dictates when the consent manager is shown and what the text looks like. You’ll want to add this somewhere and customize it to your liking.

You’ll also want to add a target container for the manager to load into:

<div id="target-container"></div>

You can and should also customize this to your liking.

Load the consent manager

Finally, we’re ready to load the consent manager.

<script src="https://unpkg.com/@segment/consent-manager@5.0.0/standalone/consent-manager.js" defer></script>

Once you’re done, it should look like this.

Great, now we can let users manage their preferences! They can opt-in to all data collection, or just the portion they want to. 

Step 3: Collecting deletion requests

Now it’s time to allow users to delete their data. The simplest way to do this is to start an Airtable sheet to keep track of user requests, and then create a form from it.

At a minimum, you’ll want to have columns for:

  • The user identifier – either an email or user ID.

  • A confirmation step if your form is public (to make sure the user was actually authenticated).

  • A checkbox indicating that the deletion was submitted.

From there, we can automatically turn it into an Airtable form to collect this data.

To automate this you can use our GDPR Deletion APIs. You can automatically script these so that you don’t need to worry about public form submissions. We’ve done this internally at Segment. 
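
As a rough illustration of what such a script might look like in Go: the endpoint URL and request body below are placeholders invented for this sketch, not Segment’s documented deletion API, so check the current docs for the real route and schema.

package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
	"os"
)

func main() {
	// Hypothetical payload shape -- consult the current GDPR deletion API docs.
	body, _ := json.Marshal(map[string]string{
		"regulation_type": "DELETE",
		"user_id":         "my-user-id",
	})

	// Hypothetical endpoint; replace with the real deletion/regulation route.
	req, err := http.NewRequest(http.MethodPost,
		"https://api.example.com/v1/regulations", bytes.NewReader(body))
	if err != nil {
		panic(err)
	}
	req.Header.Set("Content-Type", "application/json")
	// Read the token from the environment so no credentials live in the script.
	req.Header.Set("Authorization", "Bearer "+os.Getenv("SEGMENT_API_TOKEN"))

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	fmt.Println("deletion request status:", resp.Status)
}

Running something like this from an internal admin tool keeps deletion requests off public forms entirely.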

Tip: Make sure deletions are guarded by some sort of confirmation step, or only accessible when the user is logged in.

Step 4: Issuing deletions and receipts

Now we’re ready to put it all together. We can issue deletion requests within Segment for individual users.

This will remove user records from:

  • Segment archives

  • Your warehouses and data lakes

  • Downstream destinations that support deletion

To do so, simply go to the deletion manager under Workspace Settings > End User Privacy.

This will allow you to make a new request by ID.

Simply select “New Request”, and enter the user ID from your database.

This will automatically kick off deletions in any end tools which support them. You’ll see receipts in Segment indicating that these deletions went through.

As your different destinations begin processing this data, they will send you notifications as well.

And just like that, we’ve built deletion and suppression into our pipeline, all with minimal work!

Wrapping up

Here’s what we’ve accomplished in this article. We’ve:

  • Collected our user data thoughtfully and responsibly by asking for consent with the Segment open source consent manager.

  • Accepted deletion requests via Airtable or the Segment deletion API.

  • Automated that deletion in downstream tools with the deletion requests.

Try this recipe for yourself...

Get help implementing this use case by talking with a Segment Team member or by signing up for a free Segment workspace here.

All Engineering articles

Calvin French-Owen on May 23rd 2018

Today, we’re excited to share the architecture for Centrifuge–Segment’s system for reliably sending billions of messages per day to hundreds of public APIs. This post explores the problems Centrifuge solves, as well as the data model we use to run it in production.

The Centrifuge problem

At Segment, our core product collects, processes, and delivers hundreds of thousands of analytics events per second. These events consist of user actions like viewing a page, buying an item from Amazon, or liking a friend’s playlist. No matter what the event is, it’s almost always the result of some person on the internet doing something.

We take these incoming events and forward them to hundreds of downstream endpoints like Google Analytics, Salesforce, and per-customer Webhooks.

At any point in time, dozens of these endpoints will be in a state of failure. We’ll see 10x increases in response latency, spikes in 5xx status codes, and aggressive rate limiting for single large customers. 

To give you a flavor, here are the sorts of latencies and uptimes I pulled from our internal monitoring earlier today. 

In the best case, these API failures cause delays. In the worst case, data loss.

As it turns out, ‘scheduling’ that many requests in a faulty environment is a complex problem. You have to think hard about fairness (what data should you prioritize?), buffering semantics (how should you enqueue data?), and retry behavior (does retrying now add unwanted load to the system?).

Across all of the literature, we couldn't find a lot of good 'prior art' for delivering messages reliably in high-failure environments. The closest thing is network scheduling and routing, but that discipline has very different strategies concerning buffer allocation (very small) and backpressure strategies (adaptive, and usually routing to a single place).

So we decided to build our own general-purpose, fully distributed job scheduler to schedule and execute HTTP requests reliably. We've called it Centrifuge.

You can think of Centrifuge as the layer that sits between our infrastructure and the outside world–it’s the system responsible for sending data to all of our customers’ destinations. When third-party APIs fail, Centrifuge is there to absorb the traffic.

Under normal operation, Centrifuge has three responsibilities: it delivers messages to third-party endpoints, retries messages upon failure, and archives any undelivered messages.

We've written this first post as a guide to understand the problems Centrifuge solves, its data model, and the building blocks we’ve used to operate it in production. In subsequent posts, we’ll share how we’ve verified the system’s correctness and made it blindingly fast.

Let’s dive in. 

When queues stop working

Before discussing Centrifuge itself, you might be thinking “why not just use a queue here? Building a fully distributed job scheduler seems a bit overdone.”

We've asked ourselves the same question. We already use Kafka extensively at Segment (we're passing nearly 1M messages per second through it), and it’s been the core building block of all of our streaming pipelines. 

The problem with using any sort of queue is that you are fundamentally limited in terms of how you access data. After all, a queue only supports two operations (push and pop).

To see where queues break down, let's walk through a series of queueing topologies that we’ve implemented at Segment.

Architecture 1: a single queue

To start, let’s first consider a naive approach. We can run a group of workers that read jobs from a single queue. 

Workers will read a single message off the queue, send it to whatever third-party APIs are required, and then acknowledge the message. It seems like it should protect us from failure, right?

This works okay for a while, but what happens when we start seeing a single endpoint get slow? Unfortunately, it creates backpressure on the entire message flow.

Clearly, this isn’t ideal. A single slow endpoint can bring down the entire pipeline, and an endpoint with 99.9% availability is down for almost nine hours a year. Multiply that across 200+ endpoints, and we should expect to be dealing with an hour-long outage somewhere nearly every day.

Architecture 2: queues per destination

After seeing repeated slowdowns on our ingestion pipeline, we decided to re-architect. We updated our queueing topology to route events into separate queues based upon the downstream endpoints they would hit.

To do this, we added a router in front of each queue. The router would only publish messages to a queue destined for a specific API endpoint.

Suppose you had three destinations enabled: Google Analytics, Mixpanel, and Salesforce. The router would publish three messages, one to each dedicated queue for Google Analytics, Mixpanel, and Salesforce, respectively.

The benefit of this approach is that a single failing API will only affect messages bound for a single endpoint (which is what we want!).

Unfortunately, this approach has problems in practice. If we look at the distribution of messages which should be delivered to a single endpoint, things become a little more nuanced.

Segment is a large, multi-tenant system, so some sources of data will generate substantially more load than others. As you might imagine, among our customer base, this follows a fairly consistent power law:

When that translates to messages within our queues, the breakdown looks more like this:

In this case, we have data for customers A, B, and C, all trying to send to the same downstream endpoint. Customer A dominates the load, but B and C have a handful of calls mixed in.

Let’s suppose that the API endpoint we are sending to is rated to 1,000 calls per second, per customer. When the endpoint receives more than 1,000 calls in a second for a given customer API key, it will respond with a 429 status code (rate limit exceeded).

Now let’s assume that customer A is trying to send 50,000 messages to the API. Those messages are all ordered contiguously in our queue.

At this point we have a few options:

  • we can try and send a hard cap of 1,000 messages per second, but this delays traffic for B and C by 50 seconds.

  • we can try and send more messages to the API for customer A, but we will see 429 (rate limit exceeded) errors. We’ll want to retry those failed messages, possibly causing more slowdowns for B and C.

  • we can detect that we are nearing a rate-limit after sending 1,000 messages for customer A in the first second, so we can then copy the next 49,000 messages for customer A into a dead-letter queue, and allow the traffic for B and C to proceed.

None of these options are ideal. We’ll either end up blocking the queue for all customers in the case where a single customer sends a large batch of data, or we’ll end up copying terabytes of data between dead-letter queues.

Ideal state: queues per <source, destination>

Instead, we want an architecture that looks more like the following diagram, where we have separate queues per combination of customer and endpoint. This architecture gives us much better isolation, as well as the ability to dynamically adjust throughput on a per-customer basis.

However, in a large, multi-tenant system, like Segment, this number of queues becomes difficult to manage.

We have hundreds of thousands of these source-destination pairs. Today, we have 42,000 active sources of data sending to an average of 2.1 downstream endpoints. That’s 88,000 total queues that we’d like to support (and we’re growing quickly). 

To implement per source-destination queues with full isolation, we’d need hundreds of thousands of different queues. Across Kafka, RabbitMQ, NSQ, or Kinesis–we haven’t seen any queues which support that level of cardinality with simple scaling primitives. SQS is the only queue we’ve found which manages to do this, but is totally cost prohibitive. We need a new primitive to solve this problem of high-cardinality isolation.

Getting to “virtual” queues

We now have our ideal end-state: tens of thousands of little queues. Amongst those queues, we can easily decide to consume messages at different rates from Customers A, B, and C.

But when we start to think about implementation, how do we actually manage that many queues? 

We started with a few core requirements for our virtual queue system:

1) Provide per-customer isolation

First and foremost, we need to provide per-customer isolation. One customer sending a significant amount of failing traffic shouldn’t slow down any other data delivery. Our system must absorb failures without slowing the global delivery rate. 

2) Allow us to re-order messages without copying terabytes of data

Our second constraint is that our system must be able to quickly shuffle its delivery order without copying terabytes of data over the network. 

In our experience working with large datasets, having the ability to immediately re-order messages for delivery is essential. We’ve frequently run into cases which create large backlogs in data processing, where our consumers are spinning on a set of consistently failing messages.

Traditionally there are two ways to handle a large set of bad messages. The first is to stop your consumers and retry the same set of messages after a backoff period. This is clearly unacceptable in a multi-tenant architecture, where valid messages should still be delivered.

The second technique is to publish failed messages to a dead-letter queue and re-consume them later. Unfortunately, re-publishing messages to dead-letter queues or ‘tiers’ of topics with copies of the same event incurs significant storage and network overhead. 

In either case, if your data is sitting in Kafka–the delivery order for your messages is effectively ‘set’ by the producer to the topic:

We want the ability to quickly recover from errors without having to shuffle terabytes of data around the network. So neither of these approaches works efficiently for us. 

3) Evenly distribute the workload between many different workers

Finally, we need a system which cleanly scales as we increase our event volume. We don’t want to be continually adding partitions or doing additional sharding as we add customers. Our system should scale out horizontally based upon the throughput of traffic that we need. 

Data in Centrifuge

By this point, we have a good idea of the problems that Centrifuge solves (reliable message delivery), the issues of various queueing topologies, and our core requirements. So let’s look at the Centrifuge data layer to understand how we’ve solved for the constraints we just listed above.

The core delivery unit of Centrifuge is what we call a job.

Jobs require both a payload of data to send, as well as an endpoint indicating where to send the data. You can optionally supply headers to govern things like retry logic, message encoding, and timeout behavior.

In our case, a job is a single event which should be delivered to a partner API. To give you an idea of what jobs look like in practice, here’s a sample Segment job:

Looking back at our requirements, we want a way of quickly altering the delivery order for our jobs, without having to create many copies of the jobs themselves. 

A queue won’t solve this problem in-place for us. Our consumer would have to read and then re-write all of the data in our new ordering. But a database, on the other hand, does.

By storing the execution order inside a relational database, we can immediately change the quality of service by running a single SQL statement. 

Similarly, whenever we want to change the delivery semantics for our messages, we don’t have to re-shuffle terabytes of data or double-publish to a new datastore. Instead, we can just deploy a new version of our service, and it can start using the new queries right away. 
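
To make that concrete, here is a hedged sketch of the idea: the dequeue query is the only thing that changes between deployments. The table and column names below are invented for the example, not Centrifuge’s actual schema.

package centrifuge

import "database/sql"

// nextJobs pulls the next batch of undelivered jobs (illustrative schema).
// Changing delivery priority is just a matter of shipping a new query here;
// no data has to be copied or re-published.
func nextJobs(db *sql.DB, limit int) (*sql.Rows, error) {
	// Today: deliver strictly in creation order.
	return db.Query(`SELECT id, endpoint, payload FROM jobs ORDER BY id ASC LIMIT ?`, limit)
	// Tomorrow we could deploy `ORDER BY retry_at ASC`, or weight by customer,
	// without touching the data at rest.
}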

Using a database gives us the flexibility in execution that queues are critically lacking.

For that reason, we decided to store Centrifuge data inside Amazon’s RDS instances running on MySQL. RDS gives us managed datastores, and MySQL provides us with the ability to re-order our jobs. 

The database-as-a-queue

The Centrifuge database model has a few core properties that allow it to perform well:

  • immutable rows: we don’t want to frequently update rows; instead, we append new rows whenever a new state is entered. We’ve modeled all job execution plans as completely immutable, so we never run updates in the database itself.

  • no database JOINs: rather than needing a lot of coordination, with locks across databases or tables, Centrifuge need only query data on a per-job basis. This allows us to massively parallelize our databases since we never need to join data across separate jobs.

  • predominantly write-heavy, with a small working set: because Centrifuge is mostly accepting and delivering new data, we rarely end up reading from the database. Instead, we can cache most new items in memory, and then age entries out of cache as they are delivered.

To give you a sense of how these three properties interact, let’s take a closer look at how jobs are actually stored in our Centrifuge databases.

The jobs table

First, we have the jobs table. This table is responsible for storing all jobs and payloads, including the metadata governing how jobs should be delivered.

While the endpoint, payload, and headers fields govern message transmission, the expire_at field is used to indicate when a given job should be archived.

By splitting expire_at into a separate field, our operations team can easily adjust if we ever need to flush a large number of failing messages to S3, so that we can process them out-of-band. 

Looking at the indexes for the jobs table, we’ve been careful to minimize the overhead of building and maintaining indexes on each field. We keep only a single index on the primary key.

The jobs table primary key is a KSUID, which means that our IDs are both k-sortable by timestamp and globally unique. This allows us to effectively kill two birds with one stone–we can query by a single job ID, as well as sort by the time that the job was created, with a single index.
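
Putting those pieces together, a job row can be pictured roughly as the struct below. The field names are a sketch based on the description above, not the actual schema.

package centrifuge

import "time"

// Job mirrors a row in the jobs table (illustrative only).
type Job struct {
	ID       string            // KSUID primary key: k-sortable by time and globally unique
	Endpoint string            // where the payload should be delivered
	Payload  []byte            // the event body to send
	Headers  map[string]string // retry, encoding, and timeout behavior
	ExpireAt time.Time         // when an undelivered job should be archived to S3
}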

Since the median size of the payload and settings for a single job is about 5kb (and can be as big as 750kb), we’ve done our best to limit reads from and updates to the jobs table. 

Under normal operation, the jobs table is immutable and append-only. The golang process responsible for inserting jobs (which we call a Director) keeps a cached version of the payloads and settings in-memory. Most of the time, jobs can be immediately expired from memory after they are delivered, keeping our overall memory footprint low.

In production, we set our jobs to expire after 4 hours, with an exponential backoff strategy.

Of course, we also want to keep track of what state each job is in, whether it is waiting to be delivered, in the process of executing, or awaiting retry. For that, we use a separate table, the job_state_transitions table. 

The job state transitions table

The job_state_transitions table is responsible for logging all of the state transitions a single job may pass through.

Within the database, the job state machine looks like this:

A job first enters with the awaiting_scheduling state. It has yet to be executed and delivered to the downstream endpoint.

From there, a job will begin executing, and the result will transition to one of three separate states. 

If the job succeeds (and receives a 200 HTTP response from the endpoint), Centrifuge will mark the job as succeeded. There’s nothing more to be done here, and we can expire it from our in-memory cache.  

Similarly, if the job fails (in the case of a 400 HTTP response), then Centrifuge will mark the job as discarded. Even if we try to re-send the same job multiple times, the server will reject it. So we’ve reached another terminal state.

However, it’s possible that we may hit an ephemeral failure like a timeout, network disconnect, or a 500 response code. In this case, retrying can actually bring up our delivery rate for the data we collect (we see this happen across roughly 1.5% of the data for our entire userbase), so we will retry delivery.

Finally, any jobs which exceed their expiration time transition from awaiting_retry to archiving. Once they are successfully stored on S3, the jobs are finally transitioned to a terminal archived state.
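
Summarizing the states described above as a small enum; this is a sketch of the state machine as written, not Centrifuge’s actual code.

package centrifuge

// JobState enumerates the lifecycle a job passes through.
type JobState string

const (
	AwaitingScheduling JobState = "awaiting_scheduling" // accepted, not yet executed
	Executing          JobState = "executing"           // delivery attempt in flight
	Succeeded          JobState = "succeeded"           // 200 HTTP response; terminal
	Discarded          JobState = "discarded"           // 400 HTTP response; terminal
	AwaitingRetry      JobState = "awaiting_retry"      // ephemeral failure, will retry
	Archiving          JobState = "archiving"           // expired, being flushed to S3
	Archived           JobState = "archived"            // stored on S3; terminal
)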

If we look deeper into the transitions, we can see the fields governing this execution:

Like the jobs table, rows in the job_state_transitions table are also immutable and append-only. Every time a new attempt is made, the attempt number is increased. Upon job execution failure, a retry is scheduled at a retry_at time determined by the retry behavior specified in the job itself.

In terms of indexing strategy, we keep a composite index on two fields: a monotonically incrementing ID, as well as the ID of the job that is being executed.

You can see here in one of our production databases that the first index in the sequence is always on the job_id, which is guaranteed to be globally unique. From there, the incrementing ID ensures that each entry in the transitions table for a single job’s execution is sequential. 

To give you a flavor of what this looks like in action, here’s a sample execution trace for a single job pulled from production. 

Notice that the job first starts in the awaiting_scheduling state before quickly transitioning to its first delivery attempt. From there, the job consistently fails, so it oscillates between executing and awaiting_retry.

While this trace is certainly useful for internal debugging, the main benefit it provides is the ability to actually surface the execution path for a given event to the end customer. (Stay tuned for this feature, coming soon!)

Interacting with the database: the Director

Up until this point, we’ve focused exclusively on the data model for our jobs. We’ve shown how they are stored in our RDS instance, and how the jobs table and job_state_transitions table are both populated.

But we still need to understand the service writing data to the database and actually executing our HTTP requests. We call this service the Centrifuge Director.

Traditionally, web-services have many readers and writers interacting with a single, centralized database. There is a stateless application tier, which is backed by any number of sharded databases.

Remember though, that Segment’s workload looks very different than a traditional web-service. 

Our workload is extremely write-heavy, has no reads, and requires no JOINs or query coordination across separate jobs. Instead, our goal is to minimize the contention between separate writers to keep the writes as fast as possible. 

To do that, we’ve adopted an architecture where a single Director interacts with a given database. The Director manages all of its caching, locks, and querying in-process. Because the Director is the sole writer, it can manage all of its cache invalidations with zero-coordination. 

The only thing a Director needs to globally coordinate is to which particular database it is writing. We call the attached database a JobDB, and what follows is a view into the architecture for how Directors coordinate to acquire and send messages to a JobDB.

When a Director first boots up, it goes through the following lifecycle:

Acquire a spare JobDB via Consul – to begin operating, a Director first does a lookup and acquires a consul session on the key for a given JobDB. If another Director already holds the lock, the current Director will retry until it finds an available spare JobDB (a sketch of this step appears below).

Consul sessions ensure that a given database is never concurrently written to by multiple Directors. They are mutually exclusive and held by a single writer. Sessions also allow us to lock an entire keyspace so that a director can freely update the status for the JobDB in Consul while it continues to hold the lock. 

Connect to the JobDB, and create new tables – once a Director has connected to a spare JobDB, it needs to create the necessary tables within the connected DB. 

Rather than use an ORM layer, we’ve used the standard database/sql golang interface, backed by the go-sql-driver/mysql implementation. Many of these queries and prepared statements are generated via go:generate, but a handful are handwritten.

Begin listening for new jobs and register itself in Consul – after the Director has finished creating the necessary tables, it registers itself in Consul so that clients may start sending the Director traffic. 

Start executing jobs – once the Director is fully running, it begins accepting jobs. Those jobs are first logged to the paired JobDB; then the Director begins delivering each job to its specified endpoint.
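
Coming back to the first step, here is a rough sketch of what acquiring a JobDB lock via a Consul session might look like with the hashicorp/consul/api client. The key layout and session settings are assumptions for illustration, not Segment’s actual code.

package centrifuge

import "github.com/hashicorp/consul/api"

// acquireJobDB tries to take exclusive ownership of one JobDB's key in Consul.
// Only one Director can hold the session-backed lock at a time; if the
// Director dies, Consul releases the lock so another Director can take over.
func acquireJobDB(client *api.Client, jobDBKey string) (sessionID string, acquired bool, err error) {
	sessionID, _, err = client.Session().Create(&api.SessionEntry{
		Name:     "centrifuge-director",
		TTL:      "15s", // must be renewed periodically while the lock is held
		Behavior: api.SessionBehaviorRelease,
	}, nil)
	if err != nil {
		return "", false, err
	}

	acquired, _, err = client.KV().Acquire(&api.KVPair{
		Key:     jobDBKey, // e.g. "centrifuge/jobdbs/db-42" (illustrative)
		Session: sessionID,
	}, nil)
	return sessionID, acquired, err
}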

Now that we understand the relationship between Directors and JobDBs, we can look back at the properties of the system (immutable, extremely write-heavy with a small working set, no database JOINs), and understand how Centrifuge is able to quickly absorb traffic.

Under normal operation, the Director rarely has to read from the attached JobDB. Because all jobs are immutable and the Director is the sole writer, it can cache all jobs in-memory and expire them immediately once they are delivered. The only time it needs to read from the database is when recovering from a failure. 

Looking at the pprof for our memory profile, we can see that a significant proportion of heap objects do indeed fall into the category of cached jobs:

And thanks to the cache, our writes dominate our reads. Here are example CloudWatch metrics pulled from a single active database.

Since all jobs are extremely short-lived (typically only a few hundred milliseconds while they are being executed), we can quickly expire delivered jobs from our cache.

All together now

Taking a step back, we can now combine the concepts of the Centrifuge data model with the Director and JobDB.

First, the Director is responsible for accepting new jobs via RPC. When it receives the RPC request, it will go ahead and log those jobs to the attached JobDB, and respond with a transaction ID once the jobs have been successfully persisted.

From there, the Director makes requests to all of the specified endpoints, retrying jobs where necessary, and logging all state transitions to the JobDB.

If the Director fails to deliver any jobs after their expiration time (4 hours in our case), they are archived on S3 to be re-processed later.

Scaling with load

Of course, a single Director wouldn’t be able to handle all of the load on our system.

In production, we run many individual Directors, each of which handles a small slice of our traffic. Over the past month, we’ve been running anywhere from 80 to 300 Directors at peak load.

Like all of our other services at Segment, the Directors scale themselves up and down based upon CPU usage. If our system starts running under load, ECS auto-scaling rules will add Directors. If we are over capacity, ECS removes them.

However, Centrifuge introduced an interesting new scaling problem for us. We needed to appropriately scale our storage layer (individual JobDBs) up and down to match the scaling in our compute layer (instances of Director containers).

To do that, we created a separate binary called the JobDB Manager. The Manager’s job is to constantly adjust the number of databases to match the number of Directors. It keeps a pool of ‘spare’ databases around in case we need to scale up suddenly. And it will retire old databases during off-peak hours. 

To keep the ‘small working set’ even smaller, we cycle these JobDBs roughly every 30 minutes. The manager cycles a JobDB when the amount of data it holds is about to exceed what fits in available RAM.

This cycling of databases ensures that no single database slows down because its working set has outgrown RAM.

Instead of issuing a large number of random deletes, we end up batching the deletes into a single drop table for better performance. And if a Director exits and has to restart, it must only read a small amount of data from the JobDB into memory.

By the time 30 minutes have passed, 99.9% of all events have either failed or been delivered, and a small subset are currently in the process of being retried. The manager is then responsible for pairing a small drainer process with each JobDB, which will migrate currently retrying jobs into another database before fully dropping the existing tables. 

In production

Today, we are using Centrifuge to fully deliver all events through Segment. By the numbers this means:

  • 800 commits from 5 engineers

  • 50,000 lines of Go code

  • 9 months of build, correctness testing, and deployment to production

  • 400,000 outbound HTTP requests per second

  • 2 million load-tested HTTP requests per second

  • 340 billion jobs executed in the last month

On average, we find about 1.5% of all global data succeeds on a retry, where it did not succeed on the first delivery attempt. 

Depending on your perspective, 1.5% may or may not sound like a big number. For an early-stage startup, an extra 1.5% of data is almost meaningless. For a large retailer making billions of dollars in revenue, 1.5% of data is incredibly significant.

On the graph below, you can see all successful retries split by ‘attempt number’. We typically deliver the majority of messages on their second try (the large yellow bar), but about 50% of retries succeed only on the third through the tenth attempts. 

Of course, seeing the system operate at ‘steady-state’ isn’t really the most interesting part of Centrifuge. It’s designed to absorb traffic in high-load failure scenarios.

We had tested many of these scenarios in a staging account, but had yet to really see a third-party outage happen in production. One month after the full rollout, we finally got to observe the system operating in a high-failure state.

At 4:45 pm on March 17th, one of our more popular integrations started reporting high latencies and elevated 500s. Under normal operation, this API receives 16,000 requests per second, which is a fairly significant portion of our outbound traffic load. 

From 4:45pm until 6:30pm, our monitoring saw a sharp decline and steeply degraded performance. The percentage of successful calls dropped to about 15% of normal traffic load.

Here you can see the graph of successful calls in dark red, plotted against the data from one week before as the dashed thin line.  

During this time, Centrifuge began to rapidly retry the failed requests. Our exponential backoff strategy started to kick in, and we started attempting to re-send any requests which had failed.

Here you can see the request volume to the third-party’s endpoint. Admittedly this strategy still needs some tuning–at peak, we were sending around 100,000 requests per second to the partner’s API. 

You can see the requests rapidly start retrying over the first few minutes, but then smooth out as they hit their exponential backoff period. 

This outage was the first time we’d really demonstrated the true power of Centrifuge. Over a 90-minute period, we managed to absorb about 85 million analytics events in Segment’s infrastructure. In the subsequent 30 minutes after the outage, the system successfully delivered all of the queued traffic. 

Watching the event was incredibly validating. The system worked as anticipated: it scaled up, absorbed the load, and then flushed it once the API had recovered. Even better, our mutual customers barely noticed. A handful saw delays in delivering their data to the third-party tool, but none saw data loss. 

Best of all, this single outage didn’t affect data delivery for any other integrations we support! 

All told, there’s a lot more we could say about Centrifuge. Which is why we’re saving a number of the implementation details around it for further posts.

In our next posts in the series, we plan to share: 

  • how we’ve verified correctness and exactly-once delivery while moving jobs into Centrifuge

  • how we’ve optimized the system to achieve high performance, and low-cost writes

  • how we’ve built upon the Centrifuge primitives to launch an upcoming visibility project

  • which choices and properties we plan on re-thinking for future versions

Until then, you can expect that Centrifuge will continue evolving under the hood. And we’ll continue our quest for no data left behind. 

Interested in joining us on that quest? We’re hiring.


See what Segment is all about

Since you've made it this far, perhaps you want to check out Segment? Sign up for a free workspace here or get a demo here 👉


Centrifuge is the result of a 9-month development and roll-out period. 

Rick Branson designed and architected the system (as well as christened it with the name). Achille Roussel built out most of the core functionality, the QA process, and performance optimizations. Maxence Charriere was responsible for building the initial JobDB Manager as well as numerous integration tests and checks. Alexandra Noonan built the functionality for pairing drainers to JobDBs and helped optimize the system to meet our cost-efficiency goals. And Tom Holmes wrote most of the archiving code, the drainer processes, and tracked down countless edge cases and bugs. Julien Fabre helped architect and build our load testing environment. Micheal Lopez designed the amazing logo.

Special thanks to James Cowling for advising on the technical design and helping us think through our two-phase-commit semantics.

To close, we wanted to share a few of the moments in development and rollout:

June 23rd, 2017: Max, Rick, and Achille begin testing Centrifuge on a subset of production traffic for the first time. They are stoked.

Sept 22, 2017: Achille gets some exciting ideas for cycling databases. Feverish whiteboarding ensues.

January 12, 2018: we hit a major milestone of 70% traffic flowing through the system. Tom bows for the camera.

Mar 14, 2018: We hit a new load test record of 2M messages per second in our “Black Hole” account.

May 22, 2018: Tom, Calvin, Alexandra, and Max take a group picture, since we forgot to earlier. Rick and Achille are traveling.

Evan Johnson on March 6th 2018

Segment receives billions of events from our customers daily and has grown into dozens of AWS accounts. Expanding into many more accounts was necessary in order to best align with our GDPR and security initiatives, but it comes at a large complexity cost. In order to continue scaling gracefully, we are investing in building tooling for employees to use with many accounts, and centrally managing employee access to AWS with terraform and our identity provider.

Segment began in a single AWS account and last year finished our move to dev, stage, prod, and “ops” accounts. For the past few months we’ve been growing at about one new AWS account every week or two, and plan to continue this expansion into per-team and per-system accounts. Having many “micro-accounts” provides superior security isolation between systems, as well as reliability benefits by limiting the blast radius of AWS rate-limits.

When Segment had only a few accounts, employees would log in to the AWS “ops” account using their email, password, and 2FA token. Employees would then connect to the ops-admin role in the dev, stage, and prod accounts using the AssumeRole API.

Segment now has a few dozen AWS accounts and plans to continue adding more! In order to organize this expansion we needed a mechanism to control our accounts, which accounts employees have access to, and each employee’s permissions in each account. 

We also hate using AWS API keys when we don’t absolutely have to, so we moved to a system where no employees have any AWS keys. Instead, employees only access AWS through our identity provider. Today we have zero employees with AWS keys, and there is no future need for employees to have a personal AWS key. This is a massive security win!

Designing a scalable IAM architecture

Segment uses Okta as an identity provider, and consulted their integration guide for managing multiple AWS accounts, but improved on it with a minor change for a better employee experience. The integration guide recommends connecting the identity provider to each AWS account, but this breaks AWS’ built-in support for account switching and makes it more complicated to audit which teams have access to which roles.

Instead, employees use our identity provider to connect to our “ops” account and then use the Security Token Service (STS) AssumeRole API to connect to each account they have access to. Using our identity provider, each team is assigned to a different role in our hub account, and each team role has access to different roles in each account. This is the classic “hub-and-spoke” architecture.
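
In practice aws-okta and the SAML flow handle this hop for you, but to make the hub-and-spoke idea concrete, here is a minimal sketch using the AWS Go SDK’s STS client. The account ID, role name, and session name are placeholders.

package main

import (
	"fmt"
	"log"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/sts"
)

func main() {
	// Credentials for the hub role come from the environment (e.g. via aws-okta).
	sess := session.Must(session.NewSession())
	svc := sts.New(sess)

	// Hop from the hub account into a role in a spoke account (placeholder ARN).
	out, err := svc.AssumeRole(&sts.AssumeRoleInput{
		RoleArn:         aws.String("arn:aws:iam::111111111111:role/ReadOnly"),
		RoleSessionName: aws.String("jane.engineer"),
	})
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("temporary access key:", *out.Credentials.AccessKeyId)
}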

In order to make maintaining our hub-and-spoke architecture simple, we built a terraform module for creating a role in our spoke accounts, and a separate terraform module for creating a role in our hub account. Both modules simply create a role and attach a policy ARN to it, which is part of the module’s input.  

The only difference between the modules is their trust relationships. The hub role module allows access from our identity provider, while the spoke module only allows access from the hub account. Below is the module we use for allowing access to a hub role from our identity provider.

In order to provide each team with granular access to only the resources the team needs, we create a role for each team in the hub account using our hub role terraform module. These roles mostly contain IAM policies for sts:AssumeRole into other accounts, but it is also possible to grant granular access in the hub role too.

One concrete and simple example of a granular policy is the role for our Financial Planning and Analysis (FP&A) team, who keep a close watch on our AWS spend. Our FP&A team only has access to billing information and information about our reserved capacity.

The FP&A team does not have access to our spoke accounts, though. One team that needs full access to much of our infrastructure and all of our accounts is our Foundation and Reliability team, who participate in our on-call rotation. We provide both a ReadOnly role, and an Administrator role to our foundation team in all of our accounts.

After per-team roles are created for each team in the hub account, employees are assigned to groups that represent their teams in Okta, and each team can then be assigned to their associated role in the hub account.

Okta allows each group to be assigned different IAM roles in the hub account, and using their UI we can assign the FP&A team to our “Amazon Web Services” app, and restrict their access to the fpa role that we created for them in the hub account. 

After building this, we needed the tooling to provide our employees with an amazing engineering experience. Even though this system is far more secure, we wanted it to be just as usable and efficient as our setup with only a handful of AWS accounts.

Maintaining usability with aws-okta

One great thing about our old IAM setup was each employee with AWS access could use AWS APIs from their local computer using aws-vault. Each employee had their IAM user credentials securely stored in their laptop’s keychain. However, accessing AWS entirely through Okta is a massive breaking change for our old workflows.

Our tooling team took up the challenge and created a (near) drop-in replacement for aws-vault, which our engineering team used extensively, called aws-okta. aws-okta is now open source and available on GitHub.

The quality of aws-okta is the principal reason that Segment engineers were able to smoothly have their AWS credentials revoked. Employees are able to execute commands using the permissions and roles they are granted, exactly like they did when using aws-vault.

aws-okta handles a lot of new complexity that aws-vault does not. While aws-vault uses IAM user credentials to run commands, aws-okta uses your Okta password (stored in your keychain) to authenticate with Okta, waits for a response to a push notification for 2FA, and finally provides AWS with a SAML assertion to retrieve temporary credentials.

In order to authenticate with Okta, aws-okta needs to know your Okta “application id”. We took the liberty of extending the ~/.aws/config ini file to add in the necessary id.

When Segment had only a few AWS accounts and the ops-admin role, Segment engineers all shared the same ~/.aws/config. Once each team had access to different accounts and systems, we needed a better system to manage each team’s ~/.aws/config. Our system also needed a way to update the access that employees had quickly, when new accounts and roles are created.

We decided to integrate this solution closely with prior art that Segment had built. Each team’s config is stored in a git repo that has our company dotfiles in it. Each team can initialize their aws config by using our internal tool called robo, which is a tool to share helpful commands between employees.

This was only possible to add because all Segment engineers already had an environment variable called SEGMENT_TEAM, which denotes the team the engineer is a part of. Running robo aws.config will clone the dotfiles repo, save the old ~/.aws/config, and initialize the most recent config for their team.

AWS bookmarks were the primary way that engineers navigated our environment when we utilized fewer accounts. When we got rid of the ops-admin role, the engineers’ sign-in bookmarks stopped working. Additionally, AWS bookmarks only support up to five different AssumeRole targets, and we now have many more than five accounts.

In order to support having many more accounts, we mostly abandoned bookmarks and instead ensured that aws-okta supports engineers who needed to switch AWS accounts often. Our previous use of aws-vault meant many of us were familiar with the aws-vault login command. We found that adding a login command to aws-okta helped engineers who switched accounts often. 

After responding to the Duo push notification aws-okta will open a browser and log in to the specified role in only a couple of seconds. This feature is supported by the AWS Custom Federated Login feature, but feels more like magic when using it. It makes logging in a breeze.

Beyond 100 accounts

We expect to be near 50 AWS accounts by the end of this year. The security of having an account be completely closed by default, and the reliability benefits of having isolated per-account rate-limits are compelling.

This system we have built is plenty robust and usable enough to scale our AWS usage to many hundreds of AWS accounts and many more engineering teams.

Deleting all employee AWS keys was extremely satisfying from a security perspective, and this alone is a compelling enough reason to integrate your identity provider with your AWS hub account.


Our world-class engineering organization is hiring. If you are excited about building amazing tools to empower our engineers, our tooling team is hiring.

Alan Braithwaite on December 21st 2017

Go has a robust built-in testing library.  If you write Go, you already know this.  In this post we will discuss a handful of strategies to level up your Go testing.   We have learned from experience on our large Go codebase that these strategies work to save time and effort maintaining the code.

Use test suites

If you take away just one thing from this post, it should be: use test suites.  For those unfamiliar with the pattern, suite testing is the process of developing a test against a common interface which can be used against multiple implementations of that interface.  Below, you can see how we can pass in multiple different Thinger implementations and have them run against the same tests.
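
The embedded sample from the original post isn’t preserved here, so here is a minimal sketch of the pattern. The Thinger interface name comes from the post; the factory and in-memory implementation are invented for the example.

package thing_test

import "testing"

// Thinger is the interface the suite exercises.
type Thinger interface {
	DoThing(input string) (string, error)
}

// TestSuite runs the same behavioral checks against any Thinger implementation.
func TestSuite(t *testing.T, factory func() Thinger) {
	t.Run("does the thing", func(t *testing.T) {
		got, err := factory().DoThing("foo")
		if err != nil {
			t.Fatal(err)
		}
		if got != "foo" {
			t.Fatalf("got %q, want %q", got, "foo")
		}
	})
}

// Each implementation gets a one-line entry point into the shared suite.
func TestInMemoryThinger(t *testing.T) {
	TestSuite(t, func() Thinger { return inMemoryThinger{} })
}

type inMemoryThinger struct{}

func (inMemoryThinger) DoThing(input string) (string, error) { return input, nil }

Swapping in a second implementation (say, a remote database) is then just another one-line TestXxx entry point into the same suite.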

Fortunate readers may have worked with codebases which use this technique.  Frequently used in plugin-based systems, tests which are written against the interface are usable by all implementations of that interface to determine if the behavior requirements are met.

Using this strategy will save potentially hours, days, or perhaps even enough time to solve the P versus NP problem.  Also when swapping two underlying systems you don’t have to write (many) additional tests and it provides confidence that doing so won’t break your application.  This implicitly requires that you create an interface defining the surface area of that which you’re testing.  Using dependency injection, you set up the suite from your package passing in the implementation for the package.

A complete example is provided here.  While this example is fairly contrived, you can imagine one implementation being a remote database whilst the other being an in-memory database.

Another fantastic example of this in the standard library is the golang.org/x/net/nettest package.  It provides the means to verify a net.Conn satisfies its interface.  

Avoid interface pollution

We can’t talk about testing in Go without talking about interfaces.  

Interfaces are important in the context of testing because they are the most powerful tool in our test arsenal, so it’s important to use them correctly.  Packages frequently export an interface for consumers to use, which in turn leads to either: a. consumers implementing their own mock of the package implementation, or b. the package exporting its own mock.

The bigger the interface, the weaker the abstraction.

— Rob Pike, Go Proverbs

Interfaces should be carefully considered before exporting them.  Developers are often tempted to export interfaces as a way for consumers to mock out their behavior.  Instead, document what interfaces your structs satisfy such that you don’t create a hard dependency between the consumer package and your own.  A great example of this is the errors package.

When we have an interface in our program which we don’t want to export, we can use an internal/ package subtree to keep it scoped to the package.  By doing this, we remove the concern that other consumers might depend on it and therefore can be flexible in the evolution of the interface as new requirements present themselves.  We usually create interfaces around external dependencies and use dependency injection so we can run tests locally.

This enables the consumer to implement small interfaces of their own, only wrapping the consumed surface of the library for their own testing.  For more details about this concept, see rakyll’s post on interface pollution.
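
As a tiny sketch of what that looks like from the consumer’s side (all names here are invented for the example), the consuming package declares only the slice of behavior it actually uses:

package report

import (
	"context"
	"io"
)

// uploader is the consumer's own, unexported view of a much larger storage
// client (e.g. a cloud SDK); tests can swap in a fake with a single method.
type uploader interface {
	Upload(ctx context.Context, key string, body io.Reader) error
}

// Publisher depends on the small interface, not on the concrete client.
type Publisher struct {
	store uploader
}

func (p *Publisher) Publish(ctx context.Context, key string, body io.Reader) error {
	return p.store.Upload(ctx, key, body)
}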

Don’t export concurrency primitives

Go offers easy-to-use concurrency primitives which can also sometimes lead to their overuse.  We’re primarily concerned about channels and the sync package.  It is sometimes tempting to export a channel from your package for consumers to use.  Additionally, it’s a common mistake to embed sync.Mutex without making it private.  As with anything, this isn’t always bad but it does pose challenges when testing your program.

If you’re exporting channels, you’re exposing the consumer of the package to additional complexity they shouldn’t care about.  As soon as a channel is exported from a package, you’re opening up challenges in testing for the one consuming that channel.  To test well, the consumer will need to know:

  • When data is finished being sent on the channel

  • Whether or not there were any errors receiving the data

  • How does the package clean up the channel after completion, if at all?

  • How can I wrap an interface around the package API so I don’t have to call it directly?

Consider an example of reading a queue.  Here’s an example library which reads from the queue and exposes a channel for the consumer to read from.

Now a user of your library wants to implement a test for their consumer:

The user might then decide that dependency injection is a good idea here and write their own messages along the channel:

But wait, what about errors?

Now, how do we generate events to actually write into this mock which sufficiently replicate the behavior of the actual library we’re using?  If the library simply wrote a synchronous API, then we could add all this concurrency in our client code and it becomes much simpler to test.

When in doubt, remember that it’s always easy to add concurrency in a consuming package and hard/impossible to remove once exported from a library.  Finally, don’t forget to mention in package documentation whether or not a struct/package is safe for concurrent access by multiple goroutines!

Sometimes, it’s still desirable or necessary to export a channel.  To alleviate some of the problems above, you can expose channels through accessors instead of directly, and force them to be read-only (<-chan) or write-only (chan<-) channels in their declaration.
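
A minimal sketch of that accessor pattern (the types are invented for the example):

package queue

// Message is whatever the consumer reads off the queue.
type Message struct {
	Body []byte
}

// Consumer keeps its channel unexported...
type Consumer struct {
	msgs chan Message
}

// ...and exposes it only as receive-only, so callers can't send on it or close it.
func (c *Consumer) Messages() <-chan Message {
	return c.msgs
}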

Use net/http/httptest

 httptest allows you to exercise your http.Handler code without actually spinning up a server or binding to a port.  This speeds up tests and allows them to run in parallel with less effort.

 Here’s an example of the same test implemented using both methods.  It doesn’t look like much, but it saves you a considerable amount of code and resources.
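
The original side-by-side example isn’t reproduced here, but the httptest flavor of such a test looks roughly like this (the handler and assertions are invented for the sketch):

package hello

import (
	"net/http"
	"net/http/httptest"
	"testing"
)

func helloHandler(w http.ResponseWriter, r *http.Request) {
	w.WriteHeader(http.StatusOK)
	w.Write([]byte("hello"))
}

func TestHelloHandler(t *testing.T) {
	// No server, no port: build a request and a recorder, call the handler directly.
	req := httptest.NewRequest(http.MethodGet, "/hello", nil)
	rec := httptest.NewRecorder()

	helloHandler(rec, req)

	if rec.Code != http.StatusOK {
		t.Fatalf("expected 200, got %d", rec.Code)
	}
	if rec.Body.String() != "hello" {
		t.Fatalf("unexpected body: %q", rec.Body.String())
	}
}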

Perhaps the biggest thing using httptest buys you is the ability to compartmentalize your test to just the function you want to test.  No routers, middleware or any other side-effects that come from setting up servers, services, handler factories, handler factory factories or whatever abominations are thrown upon you by ideas your former self thought were good.

To see more of this pattern in action, see this post by Mark Berger.

Use a separate _test package

Most tests in the ecosystem are created in files pkg_test.go but still live in the same package: package pkg.  A separate test package is a package you create in a new file, foo_test.go, inside the directory of the package you wish to test, foo/, with the declaration package foo_test.  From there, you can import github.com/example/foo and other dependencies.  This feature enables a number of things.  It is the recommended workaround for cyclic dependencies in tests, it prevents brittle tests, and it allows the developer to feel what it’s like to consume their own package.  If your package is hard to use, it will likely also be hard to test using this method.

This strategy prevents brittle tests by restricting access to private variables.  In particular, if your tests break and you’re using a separate test package, it’s almost guaranteed that a client using the feature which broke in tests will also break when called.

Finally, this aids in avoiding import cycles in tests.  Most packages likely depend on other packages you wrote aside from those being tested, so you’ll eventually run into a situation where an import cycle happens as a natural consequence.  An external package sits above both packages in the package hierarchy.   To take an example from The Go Programming Language (Chp. 11 Sec 2.4), net/url implements a URL parser which net/http imports for use.  However, net/url would like to test using a real use case by importing net/http.  Thus net/url_test was born.

Now that you’re using a separate test package you might require access to unexported entities in your package where they were previously accessible.  Most people hit this first when testing something time based (e.g. time.Now gets mocked via a function).  In this situation, we can use an additional file to expose them exclusively during testing since _test.go files are excluded from regular builds.
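
A common shape for this (file and identifier names are illustrative) is a pair of files like the following, where the _test.go file re-exports the unexported hook:

// foo/clock.go -- package internals: an unexported hook for the current time.
package foo

import "time"

var now = time.Now

// foo/export_test.go -- compiled only during `go test`, but declared as
// package foo (not foo_test), so it can re-export internals for the external
// test package to stub.
package foo

var Now = &now

A test in package foo_test can then assign *foo.Now = func() time.Time { ... } and restore the original value when the test finishes.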

Something to remember

It’s important to remember that none of the methods suggested above are silver bullets.  The best solution is to always apply critical analysis to the situation and decide upon the best solution that fits the problem.

Want to learn even more Go testing techniques?

Check out these posts:

Or these videos:

Fouad Matin on October 12th 2017

It’s not hyperbole to say that Segment would not exist, if not for open source. We’re heavy users of Kafka, Redis, Terraform, Docker, Golang, and Node.js, just to name a few of the tools we use. And we literally got our start as an open source library launched on Hacker News.

Today, our engineering team actively contributes to hundreds of open source repos on GitHub. And while it’s a good start, we wanted to do more. So six months ago, we embarked on a bit of an experiment.

Following in the footsteps of Stripe and Google, we opened a request for “Open Source Fellows.”

The pitch was simple: we give you $24,000 and three months to develop open source software, no strings attached. At the end, you present your results.

Today, we’re excited to announce the results of the program and the progress our fellows have made.

  • Tobias Koppers’ work on Webpack

  • Ben Weinstein’s work on DeepMeerkat

  • Justin Keyes’ work on Neovim

  • Julia Evans’ upcoming work on a Ruby Debugger and Profiler

For more information on the progress each participant made, read on!

Tobias Koppers

Tobias is one of the core maintainers and project lead for Webpack–a module bundler for web applications. Tens of thousands of companies (including us here at Segment!) use Webpack to bundle together their image assets, javascript files, and CSS into single ‘bundled files’.

Instead of writing various pieces of inlined javascript, Webpack is the “universal tool” to get all of your raw source code into a single set of optimized, dependency-managed files. 

Tobias started Webpack back in 2012 while working on his master’s thesis. As part of his thesis, he was trying to create a simple webapp, and wanted a way to bundle his javascript together. But he couldn’t find a lot of good prior art out there.

At that time, the frontend Javascript ecosystem looked very different. You didn’t really see require statements or dependency management. Most libraries were referenced through global variables stored on the window object (think jQuery’s window.$ object).

In the best case, you’d have an asset pipeline like Rails that injected all of your core libraries into a single HTML template.

Worst case (let’s be real, average case), you’d manage them by hand like this:

And while the odd project might be using a tool like require.js or browserify–at the time they weren’t really mainstream.

So Tobias started by looking at Medikoo’s modules-webmake to package together the dependencies in his webapp. He liked the idea of not having to worry about the dependencies themselves and instead focusing just on the core development. 

Tobias had previously used GWT’s code splitting feature, so he pull-requested it as an addition to modules-webmake. But it was such a drastic change that he ended up forking the project into a new repository dubbed modules-webpack, which eventually became webpack/webpack.

As part of his work over the past three months, Tobias has focused on three separate additions to Webpack: implementing properly ordered scope hoisting for modules, speeding up the core parser, and in-lining ‘pure’ modules.

The features themselves are really cool–so it’s worth digging a little bit into what’s going on under the hood.

Scope Hoisting

For a bundler like Webpack, it’s a top priority to respect the ECMAScript specification when it comes to re-writing and generating spec-compliant code. And previously, Webpack had a few issues here. 

As a quick example, suppose that we have two files, an index.js containing most of our code, and a corresponding b.js file which contains additional code. It should be possible to structure these modules like this:

index.js

b.js

While this is a contrived example, it matches the hoisting rules for ES6 functions and modules (all defined functions should be ‘hoisted’ to the scope of the module). 

You can see the fix for yourself here.

Parser Speedup

Second, Tobias turned his focus towards speeding up the core webpack parser. Since Webpack might re-build its source javascript hundreds of times locally in a given development cycle, one of the core requirements is that it has to be fast.

In particular, he implemented what he terms a StackedSetMap. The general idea is that it combines the stack-based rules of variable and function scoping with the quick access of maps and sets. You can think of it as a “stack” of “maps”, with each map representing the variables in scope at a certain point.

Instead of repeatedly copying large amounts of memory, the StackedSetMap just tracks parent references (typically another stack frame) and then searches the map of locally defined variables.

In this way, the StackedSetMap keeps both variable access and writing fast:

  • creating child scope: O(1)

  • reading a value the first time: O(n), where n is the number of parent scopes

  • reading a value the second time: O(1)

  • writing a value: O(1)

If the parser ever wants to know whether a given variable is in scope, it can simply query the StackedSetMap.
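To illustrate the idea (a rough sketch of our own, written in Go for brevity–webpack itself is JavaScript, and this is not Tobias’ implementation):

package main

import "fmt"

// scope is one "map" in the stack: its own variables plus a parent reference.
// Creating a child is O(1) because nothing is copied.
type scope struct {
    parent *scope
    vars   map[string]bool
}

func newChild(parent *scope) *scope {
    return &scope{parent: parent, vars: map[string]bool{}}
}

// inScope walks up the parent chain: O(n) in the worst case on a first read.
// The real StackedSetMap also caches the result locally, making later reads O(1).
func (s *scope) inScope(name string) bool {
    for cur := s; cur != nil; cur = cur.parent {
        if cur.vars[name] {
            return true
        }
    }
    return false
}

func main() {
    module := &scope{vars: map[string]bool{"require": true}}
    fn := newChild(module)
    fn.vars["x"] = true
    fmt.Println(fn.inScope("x"), fn.inScope("require"), fn.inScope("y")) // true true false
}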

If you’d like to see the change in action, you can find it here.

Side-effect-free pure modules

Finally, Tobias added support for side-effect-free ‘pure’ modules. As a toy example, suppose we have the following folder structure.

Here we have an a.js, a b.js and an index.js file which exports them both. Suppose our index file looks something like this:

In most cases, the compiler or bundler will have to re-package the entire module (index.js, a.js, and b.js) to meet the spec. After all, ES7 specifies that the entire module be loaded, just in case something from a.js modifies something from b.js

However, Webpack module authors can specifically mark their fields as being "pure-module": true in their package.json

When this happens, Webpack knows that each individual file won’t have any sort of side-effects when imported directly. So it can introduce an optimization and effectively omit including the index.js file, and instead require my-module/a and my-module/b directly. 

This creates smaller builds which skip the extra set of lookups. It’s a win for everyone! And here’s the PR.

You can find Tobias’ full write-up of all his improvements (and those of the full Webpack team) on the Webpack Medium blog.

Ben Weinstein

Ben differs from the majority of the applicants to the fellowship. His focus isn’t on developer tooling or infrastructure. And while he has some software engineering background, he self-identifies primarily as a field research biologist. 

Ben is a postdoc at Oregon State University, where he studies ecology. He spends his time applying statistical methods to ecology, and is one of the world’s experts on taxonomic, phylogenetic, and trait beta diversity in South American hummingbirds.

Ben tells us that the most common means for ecologists to collect data is by observing animals directly in their natural habitat.

Here’s the typical setup. There’s a stationary camera somewhere, filming a congregation spot for hummingbirds, butterflies, or even sharks. The camera films the location for fifteen days in a row (!), and then the ecologist comes and retrieves the footage.

In his case, his camera takes a still frame several times per second, gathering 46,000 images over the course of the day.

The ecologist then combs through the video, hoping to catch a fleeting glimpse of the animal to identify it. Once they catch the handful of ‘good frames’, they’ll loop those frames again and again to properly tag the specimen. 

It’s a needle-in-a-haystack search to find 10–15 MB of “good frames” amongst nearly 25 GB of noise.

Obviously, this process is time-consuming, error-prone, and tedious. So Ben thought it was a ripe candidate for a little bit of automation.

The result is a project he calls “DeepMeerkat”. It’s designed to run on your laptop: you feed it an animal video, and it automatically skips the empty frames and highlights the useful or interesting ones.

Under the hood, DeepMeerkat uses Tensorflow to build a model to help identify the animals in videos. Each video is loaded into memory and first analyzed using OpenCV.

For each frame in the video, OpenCV will filter for background vs foreground noise by using a Gaussian image filter and then draw ‘bounding boxes’ around any distinct shapes. 

If the difference in the foreground is significant, the frame is then passed to Tensorflow as training data for the model.

Ben’s been testing his model against a variety of animals and landscapes:

And he’s uploaded a number of sample videos to his youtube channel to help train his dataset against.

Over the course of the fellowship, Ben built out most of the software, and just added the ability to substitute your own training models in Tensorflow. His idea is that anyone can train their own Tensorflow model for their particular dataset–though right now he’s trained his net against ImageNet.

Additionally, Ben has started work to do the full model training in Tensorflow, rather than passing it to OpenCV. While pushing frames into Tensorflow currently is insanely slow (2-3fps), he’d like to be able to do a first “filter” with OpenCV and then actually identify the animal within Tensorflow.

Currently, everything runs locally on his laptop, but he’s been experimenting with feeding the data into Google’s Cloud Dataflow to run it in the cloud. It’s his dream that someday he can just upload a video and have it automatically tagged. 

He’s hoping in the next six months he can spread it more widely. Over the past year, he’s seen about 800 downloads of DeepMeerkat, and he’d like to start onboarding more contributors. You can find the source on his github.

Justin Keyes

Justin Keyes has been working on Neovim, a modernized fork of the popular text editor vim. If you’ve SSH’d into a unix-based server at some point in the past 10 years, you’ve probably used vim to edit your files.

Vim itself has been around since 1991, when Bram Moolenaar ported the stevie editor from the Atari ST (which in turn, was based upon Bill Joy’s vi for Unix) to the Amiga. 

And while vim has shipped on practically every linux, unix, and mac distribution since 1991, that longevity means there are a lot of parts of the codebase that were added to handle 20 years’ worth of quirks and inconsistencies.

Enter Neovim–an effort to create a fork of vim that is extensible, pluggable, and hackable.

Instead of starting from scratch and then re-building all of the battle-tested functionality that vim has acquired over its 20-year history, the Neovim team started with vim itself as a foundation. But with the explicit goal of encouraging contributions, hacking, and pluggable functionality.

While Vim still just has a single named contributor (Bram Moolenaar), Neovim takes commits from hundreds of different engineers and enthusiasts.

Over the three months of the fellowship, Justin has been hard at work closing issues, merging contributions, and generally focusing on cleaning up various parts of the codebase. 

In addition, he’s been working on a significant new feature: multicursor support. The goal is to allow users to queue up multiple actions at once, and then apply them in one go. 

But when he started to build out the new feature, Justin realized that creating multi-cursor support in Neovim wasn’t actually feasible with the current codebase. 

Most projects that re-implement vim model sets of operations as “pipelines” that produce nice, composable “commands”, which are then “executed”. But Neovim wasn’t implemented like that.

Instead, in Neovim, many normal-mode operations are implemented by pushing keys onto the internal input queue, as if the user had typed them. It’s clever, and preserves parity, but hard to reason about. And even harder to leverage programmatically.

So Justin started work to fix the root problem by introducing two new primitives: atoms and contexts

Atoms

Atoms are simple: they capture a combination of user actions rather than individual keystrokes. And as the name implies, they can be grouped and applied atomically. As an example, instead of treating 3 and j as separate single keystrokes when you type 3j to jump down three lines, this update groups them into an ‘atom’ that can be repeated and re-used.

Previously, Neovim would track macros as a string buffer of characters. A given macro might look like the raw input stream defined below:

With the multicursor work, Neovim now tracks the individual atoms:

By grouping individual commands into actions, user scripts, plugins, and remote clients can query the atoms queue at any time via the Neovim API.

For example, here's the last 3 items in the atoms queue from a sample editing session:

It's easy to see that this is useful for applications beyond multicursor, such as introspection of user editing patterns, or re-invoking the last "thing" or the Nth last "thing". Using the current implementation, the following code maps "space" to repeat the last atom (unlike dot-repeat, this repeats motions, navigation, etc.):

With that mapping, a motion like 3j or CTRL-D can be repeated by pressing "space".

Context

The context contains the current Neovim state. It allows you to serialize a given Neovim session and actions, and retrieve it via Neovim’s API. It should enable multiple frontends to read from the same Neovim state to enable functionality like remote vim UIs.

In the current implementation, you can call the nvim_get_context() API call, and it will return you the full context–no complicated parsing required:

Justin expects the work to land just after the upcoming release in 0.2.1. And it helps ensure that everything in Neovim is API first.

If you’d like to check out Neovim’s source, you can find it on Github

Julia Evans

Last, we have Julia Evans, who deferred her participation until early next year. She hasn’t yet started the program, but we’re excited to help fund her next project: a new profiling tool for Ruby.

If you haven’t yet read her blog, Julia runs a wonderful set of descriptions and zines on linux performance and tracing tools. She’s written invaluable posts on everything from strace to pprof to eBPF.

When Julia applied, she said she was excited to work on better profiling tools for ruby. And naturally, she’d already outlined some initial work in a post. :D

The inspiration for her work comes from using interactive tools like gdb or perf, and a desire to provide that same kind of experience for Ruby. 

Today, if you want to get a CPU profile for an arbitrary Ruby program, you… can’t. You can use tools like stackprof (which is great), but to use stackprof you need to instrument your program in advance. The initial goal of this project is to be able to get a CPU profile for any Ruby program. You can do this for C/C++ programs, Rust programs, Go programs, and Java programs… so why should Ruby have worse tooling? 

As she talks about in her post, you can interactively connect to a Ruby process with gdb. And you can even get the currently executing spot within that program using your gdb session. This excerpt is taken from her blog post:

If you only know the process number, you can still understand exactly what your program is doing at this very moment.

Yet this still isn’t great. gdb uses the ptrace system call, which causes the program to stop in its tracks and then intensely query it for its internals. It’s not really a ‘passive’ profiler at all, and won’t work in cases where your program is actually under load.  

In just a few days, Julia built a prototype in Rust which could interactively introspect the system calls, function calls, and even spy on the memory contents using the process_vm_readv call (a syscall which can directly read memory from a user-space program). 

Here’s an example of a flamegraph generated using her prototype:

It still has a little ways to go, and she plans to make it more portable so it works quickly and reliably across any sort of Linux distribution.

She’ll be starting early next year to work full-time on the fellowship. And during that time, she’ll be on sabbatical from her work at Stripe.

If you’d like to check out the early code, you can find it on her Github here, and her full blog post outlining the project here.

Looking Ahead

We’ve been quite pleased with the breadth and depth of open source work that has come out of the fellowship. And we’ve been impressed with what a single focused individual or small team can accomplish in as little as three months.

A number of the fellows agreed that having the funding and 3-month period allowed them to really focus on tackling ‘bigger’ projects that they wouldn’t have had time for otherwise. 

We’re hopeful that we can continue the program again next year, and encourage another batch of fellows to help spread the open source love. If you’re interested in applying next year, leave your email as part of our Open Fellowships email list and we’ll send you a reminder once applications are open.

Achille Roussel, Rick Branson on September 19th 2017

Memory management can be tricky, to say the least. However, after reading the literature, one might be led to believe that all the problems are solved: sophisticated automated systems that manage the lifecycle of memory allocation free us from these burdens. 

However, if you’ve ever tried to tune the garbage collector of a JVM program or optimized the allocation pattern of a Go codebase, you understand that this is far from a solved problem. Automated memory management helpfully rules out a large class of errors, but that’s only half the story. The hot paths of our software must be built in a way that these systems can work efficiently.

We found inspiration to share our learnings in this area while building a high-throughput service in Go called Centrifuge, which processes hundreds of thousands of events per second. Centrifuge is a critical part of Segment’s infrastructure. Consistent, predictable behavior is a requirement. Tidy, efficient, and precise use of memory is a major part of achieving this consistency.

In this post we’ll cover common patterns that lead to inefficiency and production surprises related to memory allocation as well as practical ways of blunting or eliminating these issues. We’ll focus on the key mechanics of the allocator that provide developers a way to get a handle on their memory usage.

Tools of the Trade

Our first recommendation is to avoid premature optimization. Go provides excellent profiling tools that can point directly to the allocation-heavy portions of a code base. There’s no reason to reinvent the wheel, so instead of taking readers through it here, we’ll refer to this excellent post on the official Go blog. It has a solid walkthrough of using pprof for both CPU and allocation profiling. These are the same tools that we use at Segment to find bottlenecks in our production Go code, and should be the first thing you reach for as well.

Use data to drive your optimization!

Analyzing Our Escape

Go manages memory allocation automatically. This prevents a whole class of potential bugs, but it doesn’t completely free the programmer from reasoning about the mechanics of allocation. Since Go doesn’t provide a direct way to manipulate allocation, developers must understand the rules of this system so that it can be maximized for our own benefit.

If you remember one thing from this entire post, this would be it: stack allocation is cheap and heap allocation is expensive. Now let’s dive into what that actually means.

Go allocates memory in two places: a global heap for dynamic allocations and a local stack for each goroutine. Go prefers allocation on the stack — most of the allocations within a given Go program will be on the stack. It’s cheap because it only requires two CPU instructions: one to push onto the stack for allocation, and another to release from the stack.

Unfortunately not all data can use memory allocated on the stack. Stack allocation requires that the lifetime and memory footprint of a variable can be determined at compile time. Otherwise a dynamic allocation onto the heap occurs at runtime. malloc must search for a chunk of free memory large enough to hold the new value. Later down the line, the garbage collector scans the heap for objects which are no longer referenced. It probably goes without saying that it is significantly more expensive than the two instructions used by stack allocation.

The compiler uses a technique called escape analysis to choose between these two options. The basic idea is to do the work of garbage collection at compile time. The compiler tracks the scope of variables across regions of code. It uses this data to determine which variables pass a set of checks proving that their lifetime is entirely knowable at compile time. If a variable passes these checks, the value can be allocated on the stack. If not, it is said to escape, and must be heap allocated.

The rules for escape analysis aren’t part of the Go language specification. For Go programmers, the most straightforward way to learn about these rules is experimentation. The compiler will output the results of the escape analysis by building with go build -gcflags '-m'. Let’s look at an example:

package main

import "fmt"

func main() {
    x := 42
    fmt.Println(x)
}

$ go build -gcflags '-m' ./main.go
# command-line-arguments
./main.go:7: x escapes to heap
./main.go:7: main ... argument does not escape

See here that the variable x “escapes to the heap,” which means it will be dynamically allocated on the heap at runtime. This example is a little puzzling. To human eyes, it is immediately obvious that x will not escape the main() function. The compiler output doesn’t explain why it thinks the value escapes. For more details, pass the -m option multiple times, which makes the output more verbose:

$ go build -gcflags '-m -m' ./main.go
# command-line-arguments
./main.go:5: cannot inline main: non-leaf function
./main.go:7: x escapes to heap
./main.go:7:         from ... argument (arg to ...) at ./main.go:7
./main.go:7:         from *(... argument) (indirection) at ./main.go:7
./main.go:7:         from ... argument (passed to call[argument content escapes]) at ./main.go:7
./main.go:7: main ... argument does not escape

Ah, yes! This shows that x escapes because it is passed to a function argument which escapes itself — more on this later.

The rules may continue to seem arbitrary at first, but after some trial and error with these tools, patterns do begin to emerge. For those short on time, here’s a list of some patterns we’ve found which typically cause variables to escape to the heap:

  • Sending pointers or values containing pointers to channels. At compile time there’s no way to know which goroutine will receive the data on a channel. Therefore the compiler cannot determine when this data will no longer be referenced.

  • Storing pointers or values containing pointers in a slice. An example of this is a type like []*string. This always causes the contents of the slice to escape. Even though the backing array of the slice may still be on the stack, the referenced data escapes to the heap.

  • Backing arrays of slices that get reallocated because an append would exceed their capacity. In cases where the initial size of a slice is known at compile time, it will begin its allocation on the stack. If this slice’s underlying storage must be expanded based on data only known at runtime, it will be allocated on the heap.

  • Calling methods on an interface type. Method calls on interface types are a dynamic dispatch — the actual concrete implementation to use is only determinable at runtime. Consider a variable r with an interface type of io.Reader. A call to r.Read(b) will cause both the value of r and the backing array of the byte slice b to escape and therefore be allocated on the heap.
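To experiment with these rules yourself, a small throwaway program (ours, not from Centrifuge) makes the first two patterns easy to observe under go build -gcflags '-m':

package main

// A throwaway program for observing escape analysis with:
//   go build -gcflags '-m' ./escape.go

var sink []*string // package-level slice: anything stored here must outlive every function

func keep(name string) {
    sink = append(sink, &name) // &name escapes: it is stored in a slice of pointers
}

func send(ch chan<- *int, n int) {
    ch <- &n // &n escapes: the compiler can't know which goroutine will receive it
}

func main() {
    keep("segment")

    ch := make(chan *int, 1)
    send(ch, 42)
    <-ch
}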

In our experience these four cases are the most common sources of mysterious dynamic allocation in Go programs. Fortunately there are solutions to these problems! Next we’ll go deeper into some concrete examples of how we’ve addressed memory inefficiencies in our production software.

Some Pointers

The rule of thumb is: pointers point to data allocated on the heap. Ergo, reducing the number of pointers in a program reduces the number of heap allocations. This is not an axiom, but we’ve found it to be the common case in real-world Go programs.

It has been our experience that developers become proficient and productive in Go without understanding the performance characteristics of values versus pointers. A common hypothesis derived from intuition goes something like this: “copying values is expensive, so instead I’ll use a pointer.” However, in many cases copying a value is much less expensive than the overhead of using a pointer. “Why” you might ask?

  • The compiler generates checks when dereferencing a pointer. The purpose is to avoid memory corruption by running panic() if the pointer is nil. This is extra code that must be executed at runtime. When data is passed by value, it cannot be nil.

  • Pointers often have poor locality of reference. All of the values used within a function are collocated in memory on the stack. Locality of reference is an important aspect of efficient code. It dramatically increases the chance that a value is warm in CPU caches and reduces the risk of a miss penalty during prefetching.

  • Copying objects within a cache line is roughly equivalent to copying a single pointer. CPUs move memory between caching layers and main memory in cache lines of constant size. On x86 this is 64 bytes. Further, Go uses a technique called Duff’s device to make common memory operations like copies very efficient.

Pointers should primarily be used to reflect ownership semantics and mutability. In practice, the use of pointers to avoid copies should be infrequent. Don’t fall into the trap of premature optimization. It’s good to develop a habit of passing data by value, only falling back to passing pointers when necessary. An extra bonus is the increased safety of eliminating nil.
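As a tiny illustration of that habit (our own toy example, not from Centrifuge):

package main

import "fmt"

// point is small (16 bytes): copying it costs about as much as copying a
// pointer, and the copy can never be nil.
type point struct{ x, y float64 }

// byValue receives a copy; the data stays local to this stack frame.
func byValue(p point) float64 { return p.x + p.y }

func main() {
    p := point{x: 1, y: 2}
    fmt.Println(byValue(p)) // p itself never needs to be heap allocated
}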

Reducing the number of pointers in a program can yield another helpful result as the garbage collector will skip regions of memory that it can prove will contain no pointers. For example, regions of the heap which back slices of type []byte aren’t scanned at all. This also holds true for arrays of struct types that don’t contain any fields with pointer types.

Not only does reducing pointers result in less work for the garbage collector, it produces more cache-friendly code. Reading memory moves data from main memory into the CPU caches. Caches are finite, so some other piece of data must be evicted to make room. Evicted data may still be relevant to other portions of the program. The resulting cache thrashing can cause unexpected and sudden shifts in the behavior of production services.

Digging for Pointer Gold

Reducing pointer usage often means digging into the source code of the types used to construct our programs. Our service, Centrifuge, retains a queue of failed operations to retry as a circular buffer with a set of data structures that look something like this:

type retryQueue struct {
    buckets       [][]retryItem // each bucket represents a 1 second interval
    currentTime   time.Time
    currentOffset int
}

type retryItem struct {
    id   ksuid.KSUID // ID of the item to retry
    time time.Time   // exact time at which the item has to be retried
}

The size of the outer array in buckets is constant, but the number of items in the contained []retryItem slice will vary at runtime. The more retries, the larger these slices will grow. 

Digging into the implementation details of each field of a retryItem, we learn that KSUID is a type alias for [20]byte, which has no pointers, and therefore can be ruled out. currentOffset is an int, which is a fixed-size primitive, and can also be ruled out. Next, let’s look at the implementation of the time.Time type[1]:

type Time struct {
    sec  int64
    nsec int32
    loc  *Location // pointer to the time zone structure
}

The time.Time struct contains an internal pointer for the loc field. Using it within the retryItem type causes the GC to chase the pointers on these structs each time it passes through this area of the heap.

We’ve found that this is a typical case of cascading effects under unexpected circumstances. During normal operation failures are uncommon. Only a small amount of memory is used to store retries. When failures suddenly spike, the number of items in the retry queue can increase by thousands per second, bringing with it a significantly increased workload for the garbage collector.

For this particular use case, the timezone information in time.Time isn’t necessary. These timestamps are kept in memory and are never serialized. Therefore these data structures can be refactored to avoid this type entirely:

type retryItem struct {
    id   ksuid.KSUID
    nsec uint32
    sec  int64
}

func (item *retryItem) time() time.Time {
    return time.Unix(item.sec, int64(item.nsec))
}

func makeRetryItem(id ksuid.KSUID, time time.Time) retryItem {
    return retryItem{
        id:   id,
        nsec: uint32(time.Nanosecond()),
        sec:  time.Unix(),
    }
}

Now the retryItem doesn’t contain any pointers. This dramatically reduces the load on the garbage collector as the entire footprint of retryItem is knowable at compile time[2].

Pass Me a Slice

Slices are fertile ground for inefficient allocation behavior in hot code paths. Unless the compiler knows the size of the slice at compile time, the backing arrays for slices (and maps!) are allocated on the heap. Let’s explore some ways to keep slices on the stack and avoid heap allocation.

Centrifuge uses MySQL intensively. Overall program efficiency depends heavily on the efficiency of the MySQL driver. After using pprof to analyze allocator behavior, we found that the code which serializes time.Time values in Go’s MySQL driver was particularly expensive.

The profiler showed a large percentage of the heap allocations were in code that serializes a time.Time value so that it can be sent over the wire to the MySQL server.

This particular code was calling the Format() method on time.Time, which returns a string. Wait, aren’t we talking about slices? Well, according to the official Go blog, a string is just a “read-only slice of bytes with a bit of extra syntactic support from the language.” Most of the same rules around allocation apply!

The profile tells us that a massive 12.38% of the allocations were occurring when running this Format method. What does Format do?

It turns out there is a much more efficient way to do the same thing that uses a common pattern across the standard library. While the Format() method is easy and convenient, code using AppendFormat() can be much easier on the allocator. Peering into the source code for the time package, we notice that all internal uses are AppendFormat() and not Format(). This is a pretty strong hint that AppendFormat() is going to yield more performant behavior.

In fact, the Format method just wraps the AppendFormat method:

func (t Time) Format(layout string) string {
    const bufSize = 64
    var b []byte
    max := len(layout) + 10
    if max < bufSize {
        var buf [bufSize]byte
        b = buf[:0]
    } else {
        b = make([]byte, 0, max)
    }
    b = t.AppendFormat(b, layout)
    return string(b)
}

Most importantly, AppendFormat() gives the programmer far more control over allocation. It requires passing the slice to mutate rather than returning a string that it allocates internally like Format(). Using AppendFormat() instead of Format() allows the same operation to use a fixed-size allocation[3] and thus is eligible for stack placement.

Let’s look at the change we upstreamed to Go’s MySQL driver in this PR.

The first thing to notice is that var a [64]byte is a fixed-size array. Its size is known at compile-time and its use is scoped entirely to this function, so we can deduce that this will be allocated on the stack.

However, this type can’t be passed to AppendFormat(), which accepts type []byte. Using the a[:0] notation converts the fixed-size array to a slice type represented by b that is backed by this array. This will pass the compiler’s checks and be allocated on the stack.

Most critically, the memory that would otherwise be dynamically allocated is passed to AppendFormat(), a method which itself passes the compiler’s stack allocation checks. In the previous version, Format() is used, which contains allocations of sizes that can’t be determined at compile time and therefore do not qualify for stack allocation.
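The shape of the change looks roughly like this (a simplified sketch of the pattern described above, not the driver’s exact code; the layout string here is illustrative):

package main

import (
    "fmt"
    "time"
)

// appendTime formats t into the caller's buffer using a fixed-size scratch
// array. The array's size is known at compile time and it never leaves this
// function, so it is eligible for stack allocation.
func appendTime(dst []byte, t time.Time) []byte {
    var a [64]byte // fixed-size scratch buffer
    b := t.AppendFormat(a[:0], "2006-01-02 15:04:05.999999")
    return append(dst, b...) // copy the bytes out; the scratch array can stay on the stack
}

func main() {
    buf := appendTime(nil, time.Now())
    fmt.Printf("%s\n", buf)
}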

The result of this relatively small change massively reduced allocations in this code path! Similar to using the “Append pattern” in the MySQL driver, an Append() method was added to the KSUID type in this PR. Converting our hot paths to use Append() on KSUID against a fixed-size buffer instead of the String() method saved a similarly significant amount of dynamic allocation. Also noteworthy is that the strconv package has equivalent append methods for converting strings that contain numbers to numeric types.

Interface Types and You

It is fairly common knowledge that method calls on interface types are more expensive than those on struct types. Method calls on interface types are executed via dynamic dispatch. This severely limits the ability for the compiler to determine the way that code will be executed at runtime. So far we’ve largely discussed shaping code so that the compiler can understand its behavior best at compile-time. Interface types throw all of this away!

Unfortunately interface types are a very useful abstraction — they let us write more flexible code. A common case of interfaces being used in the hot path of a program is the hashing functionality provided by the standard library’s hash package. The hash package defines a set of generic interfaces and provides several concrete implementations. Let’s look at an example:

package main

import (
    "fmt"
    "hash/fnv"
)

func hashIt(in string) uint64 {
    h := fnv.New64a()
    h.Write([]byte(in))
    out := h.Sum64()
    return out
}

func main() {
    s := "hello"
    fmt.Printf("The FNV64a hash of '%v' is '%v'\n", s, hashIt(s))
}

Building this code with escape analysis output yields the following:

./foo1.go:9:17: inlining call to fnv.New64a
./foo1.go:10:16: ([]byte)(in) escapes to heap
./foo1.go:9:17: hash.Hash64(&fnv.s·2) escapes to heap
./foo1.go:9:17: &fnv.s·2 escapes to heap
./foo1.go:9:17: moved to heap: fnv.s·2
./foo1.go:8:24: hashIt in does not escape
./foo1.go:17:13: s escapes to heap
./foo1.go:17:59: hashIt(s) escapes to heap
./foo1.go:17:12: main ... argument does not escape

This means the hash object, the input string, and the []byte representation of the input will all escape to the heap. To human eyes these variables obviously do not escape, but the interface type ties the compiler’s hands. And there’s no way to safely use the concrete implementations without going through the hash package’s interfaces. So what is an efficiency-concerned developer to do?

We ran into this problem when constructing Centrifuge, which performs non-cryptographic hashing on small strings in its hot paths. So we built the fasthash library as an answer. It was straightforward to build — the code that does the hard work is part of the standard library. fasthash just repackages the standard library code with an API that is usable without heap allocations.

Let’s examine the fasthash version of our test program:

package main

import (
    "fmt"
    "github.com/segmentio/fasthash/fnv1a"
)

func hashIt(in string) uint64 {
    out := fnv1a.HashString64(in)
    return out
}

func main() {
    s := "hello"
    fmt.Printf("The FNV64a hash of '%v' is '%v'\n", s, hashIt(s))
}

And the escape analysis output?

./foo2.go:9:24: hashIt in does not escape
./foo2.go:16:13: s escapes to heap
./foo2.go:16:59: hashIt(s) escapes to heap
./foo2.go:16:12: main ... argument does not escape

The only remaining escapes are due to the dynamic nature of the fmt.Printf() function. While we’d strongly prefer to use the standard library from an ergonomics perspective, in some cases it is worth the trade-off to go to such lengths for allocation efficiency.

One Weird Trick

Our final anecdote is more amusing than practical. However, it is a useful example for understanding the mechanics of the compiler’s escape analysis. When reviewing the standard library for the optimizations covered, we came across a rather curious piece of code.

// noescape hides a pointer from escape analysis.  noescape is
// the identity function but escape analysis doesn't think the
// output depends on the input.  noescape is inlined and currently
// compiles down to zero instructions.
// USE CAREFULLY!
//go:nosplit
func noescape(p unsafe.Pointer) unsafe.Pointer {
    x := uintptr(p)
    return unsafe.Pointer(x ^ 0)
}

This function will hide the passed pointer from the compiler’s escape analysis functionality. What does this actually mean though? Well, let’s set up an experiment to see!

package main

import (
    "unsafe"
)

type Foo struct {
    S *string
}

func (f *Foo) String() string {
    return *f.S
}

type FooTrick struct {
    S unsafe.Pointer
}

func (f *FooTrick) String() string {
    return *(*string)(f.S)
}

func NewFoo(s string) Foo {
    return Foo{S: &s}
}

func NewFooTrick(s string) FooTrick {
    return FooTrick{S: noescape(unsafe.Pointer(&s))}
}

func noescape(p unsafe.Pointer) unsafe.Pointer {
    x := uintptr(p)
    return unsafe.Pointer(x ^ 0)
}

func main() {
    s := "hello"
    f1 := NewFoo(s)
    f2 := NewFooTrick(s)
    s1 := f1.String()
    s2 := f2.String()
    _, _ = s1, s2 // referenced so the example compiles (Go rejects unused variables)
}

This code contains two implementations that perform the same task: they hold a string and return the contained string using the String() method. However, the escape analysis output from the compiler shows us that the FooTrick version does not escape!

./foo3.go:24:16: &s escapes to heap
./foo3.go:23:23: moved to heap: s
./foo3.go:27:28: NewFooTrick s does not escape
./foo3.go:28:45: NewFooTrick &s does not escape
./foo3.go:31:33: noescape p does not escape
./foo3.go:38:14: main &s does not escape
./foo3.go:39:19: main &s does not escape
./foo3.go:40:17: main f1 does not escape
./foo3.go:41:17: main f2 does not escape

These two lines are the most relevant:

./foo3.go:24:16: &s escapes to heap
./foo3.go:23:23: moved to heap: s

This is the compiler recognizing that the NewFoo() function takes a reference to the string and stores it in the struct, causing it to escape. However, no such output appears for the NewFooTrick() function. If the call to noescape() is removed, the escape analysis moves the data referenced by the FooTrick struct to the heap. What is happening here?

func noescape(p unsafe.Pointer) unsafe.Pointer {
    x := uintptr(p)
    return unsafe.Pointer(x ^ 0)
}

The noescape() function masks the dependency between the input argument and the return value. The compiler does not think that p escapes via x, because the uintptr() produces a reference that is opaque to the compiler. The builtin uintptr type’s name may lead one to believe this is a bona fide pointer type, but from the compiler’s perspective it is just an integer that happens to be large enough to store a pointer. The final line of code constructs and returns an unsafe.Pointer value from a seemingly arbitrary integer value. Nothing to see here folks!

noescape() is used in dozens of functions in the runtime package that use unsafe.Pointer. It is useful in cases where the author knows for certain that data referenced by an unsafe.Pointer doesn’t escape, but the compiler naively thinks otherwise.

Just to be clear — we’re not recommending the use of such a technique. There’s a reason why the package being referenced is called unsafe and the source code contains the comment “USE CAREFULLY!”

Takeaways

Building a state-intensive Go service that must be efficient and stable under a wide range of real world conditions has been a tremendous learning experience for our team. Let’s review our key learnings:

  1. Don’t prematurely optimize! Use data to drive your optimization work.

  2. Stack allocation is cheap, heap allocation is expensive.

  3. Understanding the rules of escape analysis allows us to write more efficient code.

  4. Pointers make stack allocation mostly infeasible.

  5. Look for APIs that provide allocation control in performance-critical sections of code.

  6. Use interface types sparingly in hot paths.

We’ve used these relatively straightforward techniques to improve our own Go code, and hope that others find these hard-earned learnings helpful in constructing their own Go programs.

Happy coding, fellow gophers!

Notes

[1] The time.Time struct type has changed in Go 1.9.

[2] You may have also noticed that we switched the order of the nsec and sec fields. The reason is that, due to alignment rules, Go would generate 4 bytes of padding after the KSUID. The nanosecond field happens to be 4 bytes, so placing it right after the KSUID means Go no longer needs to add any padding, because the fields are already aligned. This dropped the size of the data structure from 40 to 32 bytes, reducing the memory used by the retry queue by 20%.

[3] Fixed-size arrays in Go are similar to slices, but have their size encoded directly into their type signature. While most APIs accept slices and not arrays, slices can be made out of arrays!

Evan Johnson on August 28th 2017

The way companies manage application secrets is critical. Even today, improper secrets management has resulted in an astonishing number of high profile breaches.

Having internet-facing credentials is like leaving your house key under a doormat that millions of people walk over daily. Even if the secrets are hard to find, it’s a game of hide and seek that you will eventually lose.

At Segment we centrally and securely manage our secrets with AWS Parameter Store, lots of Terraform configuration, and chamber.

While tools like Vault, Credstash, and Confidant have gotten a lot of buzz recently, Parameter Store is consistently overlooked when discussing secrets management. 

After using Parameter Store for a few months, we’ve come to a separate conclusion: if you are running all of your infrastructure on AWS, and you are not using Parameter Store to manage your secrets, then you are crazy! This post has all of the information you need to get running with Parameter Store in production.

Service Identity

Before diving into Parameter Store itself, it’s worth briefly discussing how service identity works within an AWS account.

At Segment, we run hundreds of services that communicate with one another, AWS APIs, and third party APIs. The services we run have different needs and should only have access to systems that are strictly necessary. This is called the ‘principle of least privilege’.

As an example, our main webserver should never have access to read security audit logs. Without giving containers and services an identity, it's not possible to protect and restrict access to secrets with access control policies.

Our services identify themselves using IAM roles.  From the AWS docs:

An IAM role … is an AWS identity with permission policies that determine what the identity can and cannot do in AWS.

IAM roles can be assumed by almost anything: AWS users, running programs, lambdas, or ec2 instances. They all describe what the user or service can and cannot do. 

For example, our IAM roles for instances have write-only access to an s3 bucket for appending audit logs, but prevent deletion and reading of those logs.

How do containers get their role securely?

A requirement of using ECS is that every instance must run the EC2 Container Service Agent (ecs-agent). The agent itself runs as a container and provides an API that the other containers can communicate with. It is the central nervous system that decides which containers are scheduled on which instances, and which IAM role credentials should be provided to a given container.

In order to function properly, the ecs-agent runs an HTTP API that must be accessible to the other containers that are running in the cluster. The API itself is used for healthchecks, and injecting credentials into each container. 

To make this API available inside a container on the host, an iptables rule is set on the host instance. This iptables rule forwards traffic destined for a magic IP address to the ecs-agent container.

Before ecs-agent starts a container, it first fetches credentials for the container’s task role from the AWS credential service. The ecs-agent next sets the credentials key ID, a UUID, as the AWS_CONTAINER_CREDENTIALS_RELATIVE_URI environment variable inside the container when it is started. 

From inside the container, this variable holds a relative path that includes the per-container UUID.

Using this relative URI and UUID, containers fetch AWS credentials from the ecs-agent over HTTP. 

One container cannot access the authentication credentials to impersonate another container because the UUID is sufficiently difficult to guess.
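To make this concrete, here’s a rough sketch (not Segment’s code; in practice the AWS SDKs do this for you) of how a process inside a container could fetch its task-role credentials. 169.254.170.2 is the link-local “magic IP” that the iptables rule forwards to the ecs-agent:

package main

import (
    "fmt"
    "io"
    "net/http"
    "os"
)

func main() {
    // Set by ecs-agent when it starts the container; contains the per-container
    // UUID, e.g. something like /v2/credentials/<uuid>.
    uri := os.Getenv("AWS_CONTAINER_CREDENTIALS_RELATIVE_URI")

    // The request is forwarded to the ecs-agent by the iptables rule on the host.
    resp, err := http.Get("http://169.254.170.2" + uri)
    if err != nil {
        panic(err)
    }
    defer resp.Body.Close()

    body, _ := io.ReadAll(resp.Body)
    fmt.Printf("%s\n", body) // JSON with temporary credentials for the task role
}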

Additional security details

As heavy ECS users we did find security foot-guns associated with ECS task roles. 

First, it’s very important to realize that any container that can access the EC2 metadata service can assume any other task role on the system. If you aren’t careful, it’s easy to overlook that a given container can circumvent access control policies and gain access to unauthorized systems.

The two ways a container can access the metadata service are a) by using host networking and b) via the docker bridge.

When a container is run with --network='host', it is able to connect to the EC2 metadata service using its host’s network. Setting the ECS_ENABLE_TASK_IAM_ROLE_NETWORK_HOST variable to false in the ecs.config prevents containers from running with this privilege.

Additionally, it’s important to block access to the metadata service IP address over the docker bridge using iptables. The IAM task role documentation recommends preventing access to the EC2 metadata service with this specific rule.

The principle of least privilege is always important to keep in mind when building a security system. Setting ECS_DISABLE_PRIVILEGED to true in the host’s ecs.config can prevent privileged Docker containers from being run and causing other more nuanced security problems.

Now that we’ve established how containers establish identity and exchange secret keys, that brings us to the second part of the equation: Parameter Store.

AWS Parameter Store

Parameter Store is an AWS service that stores strings. It can store secret data and non-secret data alike. Secrets stored in parameter store are “secure strings”, and encrypted with a customer specific KMS key.

Under the hood, a service that requests secure strings from the AWS Parameter Store has a lot of things happening behind the scenes.

  1. ecs-agent requests the host instance’s temporary credentials.

  2. The ecs agent continuously generates temporary credentials for each ecs task role running on ECS, using an undocumented service called ECS ACS.

  3. When the ecs agent starts each task it sets a secret UUID in the environment of the container.

  4. When the task needs its task role credentials, it requests them from the ecs-agent API and authenticates with the secret UUID.

  5. The ecs task requests its secrets from the parameter store using the task role credentials.

  6. Parameter store transparently decrypts these secure strings before returning them to the ecs task.

Using roles with Parameter Store is especially nice because it doesn’t require maintaining additional authentication tokens. This would create additional headache and additional secrets to manage!
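From the application’s point of view, fetching a secure string with the task role credentials is a single SDK call. A rough sketch with aws-sdk-go (the parameter name here is hypothetical):

package main

import (
    "fmt"

    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/aws/session"
    "github.com/aws/aws-sdk-go/service/ssm"
)

func main() {
    // The SDK picks up the task role credentials injected by ecs-agent automatically.
    sess := session.Must(session.NewSession())
    svc := ssm.New(sess)

    out, err := svc.GetParameters(&ssm.GetParametersInput{
        Names:          aws.StringSlice([]string{"my-service/db-password"}), // hypothetical name
        WithDecryption: aws.Bool(true),                                      // decrypt the secure string via KMS
    })
    if err != nil {
        panic(err)
    }
    for _, p := range out.Parameters {
        fmt.Println(aws.StringValue(p.Name))
    }
}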

Parameter Store IAM Policies

Each role that accesses the Parameter Store requires the ssm:GetParameters permission. “SSM” stands for “Simple Systems Manager”, and is how AWS denotes Parameter Store operations.

The ssm:GetParameters permission is the policy used to enforce access control and protect one service’s secrets from another. Segment gives all services an IAM role that grants access to secrets that match the format  {{service_name}}/*.  

In addition to the access control policies, Segment uses a dedicated AWS KMS key to encrypt secure strings within the Parameter Store. Each IAM role is granted a small set of KMS permissions in order to decrypt the secrets they store in Parameter Store.

Of course, creating all of these pieces of boilerplate by hand quickly becomes tiresome. Instead, we automate the creation and configuration of these roles via Terraform modules.  

Automating service identity and policies

Segment has a small Terraform module that abstracts away the creation of a unique IAM role, load balancers, DNS records, autoscaling, and CloudWatch alarms. Below we show how our nginx load balancer is defined using our service module.

Under the hood, the task role given to each service has all of the IAM policies we previously listed, restricting access to the parameter store by the value in the name field. No configuration required.

Additionally, developers have the option to override which secrets their service has access to by providing a “secret label”. This secret label will replace their service name in their IAM policy. If nginx were to need the same secrets as an HAProxy instance, the two services can share credentials by using the same secret label.

Parameter Store in production

All Segment employees authenticate with AWS using aws-vault, which can securely store AWS credentials in the macOS keychain or in an encrypted file for Linux users.

Segment has several AWS accounts. Engineers can interact with each account by using the aws-vault command and executing commands locally with their AWS credentials populated in their environment.

This is great for AWS APIs, but the AWS CLI leaves a bit to be desired when it comes to interacting with parameter store. For that, we use Chamber.

Using Chamber with Parameter Store

Chamber is a CLI tool that we built internally to allow developers and code to communicate with Parameter Store in a consistent manner. 

By allowing developers to use the same tools that run in production, we decrease the number of differences between code running in development, staging, and production.

Chamber works out of the box with aws-vault, and has only a few key subcommands:

  • exec - runs a command after loading secrets into the environment.

  • history - shows the history of changes of a secret in parameter store.

  • list - lists the names of all secrets in a secret namespace.

  • write - writes a secret to the Parameter Store.

Chamber leverages Parameter Store’s built in search and history mechanisms to implement the list and history subcommands. All strings stored in Parameter Store are automatically versioned as well, so we gain a built-in audit trail. 

The subcommand used to fetch secrets from the Parameter Store is exec. When developers use the exec subcommand they use it with aws-vault.

When run this way, chamber executes with the credentials and permissions of the employee in the development account, and it fetches the secrets associated with loadbalancers from Parameter Store. After chamber populates the environment, it runs the nginx server.

Running chamber in production

In order to populate secrets in production, chamber is packaged inside our docker containers as a binary and is set as the entrypoint of the container. Chamber will pass signals to the program it executes in order to allow the program to gracefully handle them.

Here’s a diff of what it took to make our main website chamber-ready.

Non-docker containers can also use chamber to populate the environment before creating configuration files out of templates, run daemons, etc. We simply need to wrap their command with the chamber executable, and we’re off to the races.

Auditing

As a last piece of our security story, we want to make sure that every action we described above is logged and audited. Fortunately for us, all access to the AWS Parameter Store is logged with CloudTrail.

This makes keeping a full audit trail for all parameters simple and inexpensive. It also makes building custom alerting and audit logging straightforward.

CloudTrail makes it possible to determine exactly what secrets are used and can make discovering unused secrets or unauthorized access to secrets possible.

AWS logs all Parameter Store access for free as a CloudTrail management event. Most Security Information and Event Management (SIEM) solutions can be configured to watch and read data from S3.

Takeaways

By using Parameter Store and IAM, we were able to build a small tool that gives us all of the properties that were most important to us in a secrets management system, with none of the management overhead. In particular, we’re able to:

  • Protect the secrets at rest with strong encryption.

  • Enforce strong access control policies.

  • Create audit logs of authentication and access history.

  • Provide a great developer experience.

Best of all, these features are made possible with only a little configuration and zero service management. 

Secrets management is very challenging to get right. Many products have been built to manage secrets, but none fit the use cases needed by Segment better than Parameter Store.

Daniel Fuente on July 26th 2017

Today we’re excited to open source the various pieces of our logging pipeline. We’ve released a rate-limiting-syslog proxy, a journald fanout service, and a cloudwatch logs CLI. To understand how they work in concert, read on.

If you’re running a production system which doesn’t have logs or metrics, you’re flying blind. But if your logs are getting dropped… how does one find out?

A few months ago, this started happening to us at Segment. 

It started gradually. Every few weeks, someone on the Engineering team would ping the #engineering channel and ask why they weren’t seeing log messages for a given container. 

At first we thought these missing logs were the result of ephemeral failures. 

But, soon, we’d start seeing containers that seemed to be working based on all of our metrics–but not logging a thing. No output in our log aggregators. Nothing in docker logs. No trace whatsoever that the program was running. 

Here’s the story of how we dug through our logging pipeline, got to a root cause, and open-sourced our tooling for logging with Docker, ECS, Cloudwatch, and Go.

System Overview

Before diving in, it’s worth taking a cursory look at how logging works at Segment. 

Currently, we’re pushing multiple TB of log data per day into our logging pipeline. Some services log frequently with request-level information, while others contain only important audits.

In order to get good utilization, we also make heavy use of service bin-packing, often running hundreds of containers on a single host.

To get an idea of how our logging pipeline fits together, let’s first take a look at the high level architecture for our logging setup. Here’s the architecture diagram showing how logs propagate through a given host:

Containers run via the docker daemon are configured to log straight to journald. Journald is the default logger that ships with systemd and recent Ubuntu distributions (we use Xenial in production), and collects system-wide logs across our pipeline. 

From Journald, we run a daemon on the host that tails our logs and handles our fanout to different logging providers (more on that shortly).

This design covers a few core tenets:

  • structured logging – clients should log structured fields to query by, not just plain text

  • use system-wide journald – rather than buffering in memory, we log to journald (which writes directly to disk) and then tail logs from there

  • multi-destination adapter – instead of coupling to a single provider, allow multi-destination log fanout (via our open-sourced ecs-logs)

Here’s what each tenet looks like in more detail:

Structured logging

At the application level, we encourage our developers to use structured logs. To make structured logging as easy as possible, we’ve created logging helper libraries for our two main languages (Go and JS) which provide consistent formatters. 

For Go, we use  ecs-logs-go which provides a set of logrus (or events) compatible formatters and outputs. For JS, we have ecs-logs-js which provides a logger built on winston to do the same. 

By using these libraries, we ensure that logging from applications built in either language looks identical.

This uniformity in log structure helps us build tooling for routing and querying logs without needing to worry about where the logs originated.
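For example, a structured log line from a Go service looks roughly like this (plain logrus shown here; in practice the ecs-logs-go formatter is wired in so every service emits the same shape):

package main

import "github.com/sirupsen/logrus"

func main() {
    log := logrus.New()
    // In production the output is formatted by ecs-logs-go so it matches the
    // structured format the rest of the pipeline expects; the default formatter
    // is used here just to keep the sketch self-contained.
    log.WithFields(logrus.Fields{
        "program": "example",
        "method":  "GET",
        "path":    "/v1/track",
        "status":  200,
    }).Info("request handled")
}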

Use system-wide journald

From the application, we write our logs into journald via the journald Docker logging driver. This plays well with our structured logging approach, as journald easily allows you to do structured logging by attaching metadata to logs. 

journald logs first to a local file rather than a remote server.  That means if there’s a network partition or we lose instance connectivity, we can still reboot the instance and identify the root cause for the problem. 

Furthermore, because journald logs information from all systemd processes, the logging tooling on a host is consistent. It preserves the behavior of journalctl <service-name> as well as docker logs <container-id>, allowing us to debug once logged into a host in the case of a complex problem. 

The Docker logging driver also adds metadata about which specific container each log line comes from, which is often very valuable debugging information. It allows us to separate the logs coming from different containers.

Multi-destination adapter

Our log forwarder (ecs-logs), tails journald on each host and forwards those logs to downstream aggregators. This forwarder is also responsible for taking unstructured logs from legacy applications, or other open source applications we run, and converting them to our structured log format.

We have experimented with a few log aggregators; internally we use Loggly, LogDNA and AWS Cloudwatch Logs. Loggly and LogDNA are good for fulfilling an indexed ‘search’ use case, where you need to answer a ‘needle in the haystack’ type query. 

Cloudwatch is great for tailing and auditing. Even if you persist logs indefinitely, it’s almost an order of magnitude cheaper than any other logging provider we’ve seen–and it integrates nicely with a variety of AWS tooling. 

Between the three of them, we’re able to handle different use cases with different retentions, from different users inside the company. 

The case of the missing logs

This setup has worked well for us, but we noticed that sometimes we'd be missing a few log lines. The problem was sporadic, but affected all services on a host when it did happen. 

After digging in, we managed to isolate the root cause: systemd-journal wasn’t keeping up with our logging throughput.

To understand what was going on, it’s worth looking at the way that journald is implemented. journald runs as a single process on each host. When set as the logging driver for Docker, all logs are sent from the docker daemon directly to journald.

As we run tens or hundreds of containers on a given host, we were seeing times where a single host’s journald process would be completely CPU-bound. 

Fortunately, journald allows you to set a rate limit in its configuration.
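The relevant knobs live in /etc/systemd/journald.conf; a configuration like the one we ran looks something like this (the exact option names vary slightly across systemd versions):

[Journal]
RateLimitInterval=1min
RateLimitBurst=200000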

This means that a given process can send up to the burst number of messages within the specified interval (in our case, 200,000 every minute).

Unfortunately for us, journald applies rate limits based on systemd service, not per process. 

To understand why this is an issue, know that we run all of our applications in Docker containers via ECS. Our systemd unit for docker looks like this:
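(The unit below is trimmed to the relevant parts; it follows the stock docker.service shape and is illustrative rather than our exact configuration.)

[Unit]
Description=Docker Application Container Engine
After=network.target docker.socket

[Service]
# One dockerd process, run as one systemd service, so one shared journald rate limit.
ExecStart=/usr/bin/dockerd -H fd://
Restart=always

[Install]
WantedBy=multi-user.target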

We have one Docker process, run as a single systemd process. And that Docker process manages all other services running on a host. 

What that means is that all logs from all containers on a host come from the exact same systemd service. What’s more, these log lines are dropped silently when any process causes us to exceed the rate limit.

With this setup, one misbehaving container on a host can cause logs from all other containers on that host to be dropped without any sort of notification whatsoever.

In order for us to be able to guarantee the same log reliability to any application running on our infrastructure, we needed a way to apply this rate limiting per container, rather than globally.

Our first thought was to apply this rate limiting at the docker logging driver level. 

However, none of the docker drivers provide this capability out of the box. We could write our own logging driver, but that would mean compiling it into docker and maintaining a fork–something we didn’t want to do (note: the latest docker version has added support for dynamically loading logging drivers, which might be a possibility in the future).

Instead, we created a small proxy (rate-limiting-log-proxy) to sit between the Docker daemon and journald. This proxy presents itself as a syslog server, so that we can use the built-in Docker syslog driver to forward logs to it.

The proxy is a tiny golang binary, with extremely low memory footprint and CPU overhead. You can grab it from github or install it straight from docker hub. 

Once the logs have been shipped to our proxy, we use the log tag (container ID), to rate limit messages from individual containers. Messages that obey the rate limit are forwarded along to journald, while messages from noisy containers will be dropped to fit within the limit.

Here’s a detailed look at the handle method. Notice that we first pull out the message tag, then look up the container by ID to pull its full info from the Docker daemon. 
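In condensed form, the logic looks something like the sketch below (simplified from the real handler: parsing, error handling, and the full journald field set are trimmed, and a token bucket from golang.org/x/time/rate stands in for the real limiter):

package main

import (
    "context"
    "sync"

    "github.com/coreos/go-systemd/journal"
    "github.com/docker/docker/client"
    "golang.org/x/time/rate"
)

var (
    docker, _ = client.NewClientWithOpts(client.FromEnv)
    mu        sync.Mutex
    limiters  = map[string]*rate.Limiter{}
)

// limiterFor lazily creates a token bucket per container ID (roughly 200k messages/minute).
func limiterFor(id string) *rate.Limiter {
    mu.Lock()
    defer mu.Unlock()
    if _, ok := limiters[id]; !ok {
        limiters[id] = rate.NewLimiter(rate.Limit(200000.0/60.0), 200000)
    }
    return limiters[id]
}

// handle processes a single syslog message: the tag set by the Docker syslog
// driver carries the container ID, which keys the per-container rate limit.
func handle(tag, msg string) {
    info, err := docker.ContainerInspect(context.Background(), tag)
    if err != nil {
        return // the container may already be gone
    }
    if !limiterFor(info.ID).Allow() {
        return // over this container's limit: drop its message, not its neighbors'
    }
    journal.Send(msg, journal.PriInfo, map[string]string{
        "CONTAINER_ID":   info.ID,
        "CONTAINER_NAME": info.Name,
    })
}

func main() {
    // The real proxy calls handle for every message read off its unixgram socket.
    handle("1a2b3c4d5e6f", "hello from a container")
}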

After each message has been tagged, we see whether the message has exceeded the limit. If it hasn’t, we log it. The rate-limiting uses a simple token-bucket approach to drop messages. 

Now, regardless of noisy neighbors, well-behaved containers will always get their logs into journald (and from there, into the rest of our logging pipeline).

What’s more, instead of silently dropping log lines like the journald rate limit does, we wanted to give developers a much clearer ‘heads up’ when their logs are getting dropped.

If you’re logging too much, the proxy will now inject warning lines into your log output telling you that messages are being dropped.

It’s now much easier to understand where and why your logs may be getting dropped, as opposed to the silent failures we encountered earlier. 

Where performance is concerned, Go performs admirably (as per usual). The log-proxy is capable of handling hundreds of thousands of messages per second, all with minimal CPU and memory overhead. 

Even with hundreds of containers running on a single host, logging tens of thousands of lines per second, CPU hovers at 2-3% and memory is capped at 10MB.

There’s still some work we could do here. From our early profiling, the majority of the CPU is being consumed by the syslog server parsing code. But this is a step which can easily be optimized.

If you’d like to use this yourself, the code is freely available on GitHub, and the docker image is runnable today via Docker Hub.

To get up and running, first run the image as a standalone syslog server, listening on a unixgram socket. 

Then configure the Docker logging driver for a given service to point at the proxy server.
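With the stock Docker syslog driver options, that looks roughly like this (the socket path below is just an example; use whatever path the proxy is listening on):

docker run \
  --log-driver syslog \
  --log-opt syslog-address=unixgram:///var/run/rate-limit-proxy.sock \
  --log-opt tag="{{.ID}}" \
  my-service-image

The tag option is what stamps each message with its container ID, which the proxy then uses as the rate-limiting key.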

And voila! Rate-limited logs tagged with your service and container information!

Useful once usable

We’d successfully managed to debug where logs were being dropped, but our job wasn’t over yet. Logs aren’t actually useful until they are usable.

On that note, the biggest issues we had here were with Cloudwatch. The Cloudwatch logs UI is clunky and slow for our common use cases, and the aws-cli tool is hard to use effectively for day-to-day debugging.

To ease this process for developers, we created cwlogs, a more user-friendly CLI for accessing Cloudwatch logs. 

When using the rate-limiting-proxy and ecs-logs, cloudwatch logs are grouped by “ECS service”. Within each service, individual containers log to a given “stream”. This allows you to filter either by service-wide logs, or a particular running instance of the program. 

In turn, we designed cwlogs around our most common logging use cases:

  • Listing the available log streams for a given service

  • Fetching/tailing logs from a particular service

  • Fetching/tailing logs from a particular ECS Task (stream)

  • Formatting log output for analysis with common cli tooling (grep, jq, etc)

The formatting string passed to cwlogs is a Go template string, so you can easily tune the ‘verbosity’ of your log output (more details can be found in the cwlogs repository). Because we have a consistent schema across all of our logs, it’s easy for our developers to construct format strings to pull out the data they need and know that it will exist.
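If you haven’t used Go templates before, the idea is simple field substitution over the structured log record. Here’s a generic, self-contained illustration (the field names are made up for the example rather than cwlogs’ exact schema):

package main

import (
    "os"
    "text/template"
)

func main() {
    // A structured log record, like the ones our ecs-logs pipeline emits.
    entry := map[string]string{
        "time":    "2017-03-01T12:00:00Z",
        "level":   "info",
        "message": "request handled",
    }

    // The format string is a plain Go template over those fields.
    tmpl := template.Must(template.New("line").Parse("[{{.level}}] {{.time}} {{.message}}\n"))
    tmpl.Execute(os.Stdout, entry)
}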

Our cwlogs CLI also supports fuzzy “did you mean” matching of various streams in case you forget the exact name of the service you’re trying to query. 

Accessing the right logs from the terminal now only takes a few seconds. No more complicated login process, or tens of clicks to identify the log streams you’re interested in.

EOF

While all the usual advice when it comes to logging still applies (use structured logs, log straight to disk, adapt to common interchange formats, etc.), we’ve found the biggest piece still missing from most implementations is clear rate limits.

Many logging providers don’t provide service-level rate limits at all, or will silently drop messages when those limits are applied. At best, this causes confusion. At worst, it may cost you hours of debugging headaches. 

At the end of the day, the I/O on a machine is a shared resource. If you’re stacking hundreds of containers on a given host, that I/O can start to disappear quickly. By introducing rate-limiting with clear indication when it is invoked, we’ve been able to spot problematic services much more quickly, and adjust their log volume accordingly. 

If you’d like to try rate-limiting-log-proxy, you can find the source on GitHub and the image on Docker Hub. Same goes for the cwlogs CLI to access Cloudwatch. If you do end up kicking the tires, let us know what you think. Happy (rate-limited) logging!

Amir Abu Shareb on June 29th 2017

The single requirement of all data pipelines is that they cannot lose data. Data can usually be delayed or re-ordered–but never dropped. 

To satisfy this requirement, most distributed systems guarantee at-least-once delivery. The techniques to achieve at-least-once delivery typically amount to: “retry, retry, retry”. You never consider a message ‘delivered’ until you receive a firm acknowledgement from the consumer.

But as a user, at-least-once delivery isn’t really what I want. I want messages to be delivered once. And only once.

Unfortunately, achieving anything close to exactly-once delivery requires a bullet-proof design. Each failure case has to be carefully considered as part of the architecture–it can’t be “bolted on” to an existing implementation after the fact. And even then, it’s pretty much impossible to have messages only ever be delivered once. 

In the past three months we’ve built an entirely new de-duplication system to get as close as possible to exactly-once delivery, in the face of a wide variety of failure modes. 

The new system is able to track 100x the number of messages of the old system, with increased reliability, at a fraction of the cost. Here’s how. 

The problem

Most of Segment’s internal systems handle failures gracefully using retries, message re-delivery, locking, and two-phase commits. But, there’s one notable exception: clients that send data directly to our public API.

Clients (particularly mobile clients) have frequent network issues, where they might send data, but then miss the response from our API.

Imagine, you’re riding the bus, booking a room off your iPhone using HotelTonight. The app starts uploading usage data to Segment’s servers, but you suddenly pass through a tunnel and lose connectivity. Some of the events you’ve sent have already been processed, but the client never receives a server response. 

In these cases, clients retry and re-send the same events to Segment’s API, even though the server has technically already received those exact messages.

From our server metrics, approximately 0.6% of events that are ingested within a 4-week window are duplicate messages that we’ve already received. 

This error rate might sound insignificant. But for an e-commerce app generating billions of dollars in revenue, a 0.6% discrepancy can mean the difference between a profit and a loss of millions. 

De-duplicating our messages

So we understand the meat of the problem–we have to remove duplicate messages sent to the API. But how?

Thinking through the high-level API for any sort of dedupe system is simple. In Python (aka pseudo-pseudocode), we could represent it as the following:
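A sketch (the helper functions here are stand-ins, not our actual code):

def dedupe(stream):
    for message in stream:
        if has_seen(message.id):            # seen this id within the de-dupe window?
            discard(message)                # duplicate: drop it
        else:
            publish_and_commit(message)     # new: re-publish and commit atomically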

For each message in our stream, we first check if we’ve seen that particular message, keyed by its id (which we assume to be unique). If we’ve seen a message before, discard it. If it’s new, we re-publish the message and commit the message atomically. 

To avoid storing all messages for all time, we keep a ‘de-duplication window’–defined as the time duration to store our keys before we expire them. As messages fall outside the window, we age them out. We want to guarantee that there exists only a single message with a given ID sent within the window.

The behavior here is easy to describe, but there are two aspects which require special attention: read/write performance and correctness.

We want our system to be able to de-duplicate the billions of events passing through our pipeline–and do so in a way that is both low-latency and cost efficient. 

What’s more, we want to ensure the information about which events we’ve seen is written durably so we can recover from a crash, and that we never produce duplicate messages in our output.

Architecture

To achieve this, we’ve created a ‘two-phase’ architecture which reads off Kafka, and de-duplicates all events coming in within a 4-week window.

The dedupe high-level architecture

Kafka topology

To understand how this works, we’ll first look at the Kafka stream topology. All incoming API calls are split up as individual messages, and read off a Kafka input topic. 

First, each incoming message is tagged with a unique messageId, generated by the client. In most cases this is a UUIDv4 (though we are considering a switch to ksuids). If a client does not supply a messageId, we’ll automatically assign one at the API layer.

We don’t use vector clocks or sequence numbers because we want to reduce the client complexity. Using UUIDs allows anyone to easily send data to our API, as almost every major language supports it.

Individual messages are logged to Kafka for durability and replay-ability. They are partitioned by messageId so that we can ensure the same messageId will always be processed by the same consumer.

This is an important piece when it comes to our data processing. Instead of searching a central database for whether we’ve seen a key amongst hundreds of billions of messages, we’re able to narrow our search space by orders of magnitude simply by routing to the right partition. 
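Conceptually, the routing is nothing more than a stable hash of the messageId modulo the partition count (Kafka’s default partitioner does the equivalent when messages are keyed by messageId); a toy version:

package main

import (
    "fmt"
    "hash/fnv"
)

// partitionFor maps a messageId to a partition. The same id always lands on the
// same partition, so a single worker owns all of that id's potential duplicates.
func partitionFor(messageID string, numPartitions uint32) uint32 {
    h := fnv.New32a()
    h.Write([]byte(messageID))
    return h.Sum32() % numPartitions
}

func main() {
    fmt.Println(partitionFor("8f14e45f-ceea-4e7a-9c3e-2b4d1a6f0c21", 64))
}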

The dedupe “worker” is a Go program which reads off the Kafka input partitions. It is responsible for reading messages, checking whether they are duplicates, and if they are new, sending them to the Kafka output topic. 

In our experience, the worker and Kafka topology are both extremely easy to manage. We no longer have a set of large Memcached instances which require failover replicas. Instead we use embedded RocksDB databases which require zero coordination, and get us persistent storage for an extremely low cost. More on that now!

The RocksDB worker

Each worker stores a local RocksDB database on its attached EBS drive. RocksDB is an embedded key-value store developed at Facebook, and is optimized for incredibly high performance.

Whenever an event is consumed from the input topic, the consumer queries RocksDB to determine whether we have seen that event’s messageId.

If the message does not exist in RocksDB, we add the key to RocksDB and then publish the message to the Kafka output topic.  

If the message already exists in RocksDB, the worker will not publish it to the output topic; it simply updates the offset of the input partition, acknowledging that it has processed the message.
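Schematically, the per-message logic looks like the sketch below, with the Kafka output and the local RocksDB store reduced to minimal interfaces (this is an illustration of the flow, not the worker’s actual code):

package dedupe

// Message is an event read off an input partition.
type Message struct {
    ID    string
    Value []byte
}

// Store is the worker's view of its local RocksDB instance.
type Store interface {
    Has(key string) (bool, error)
    Put(key string) error
}

// Output is the worker's view of the Kafka output topic.
type Output interface {
    Publish(m Message) error
}

// process handles one message: duplicates are skipped (the input offset still
// advances), new messages are recorded and then published downstream.
func process(m Message, db Store, out Output) error {
    seen, err := db.Has(m.ID)
    if err != nil {
        return err
    }
    if seen {
        return nil
    }
    if err := db.Put(m.ID); err != nil {
        return err
    }
    return out.Publish(m)
}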

Performance

In order to get high performance from our database, we have to satisfy three query patterns for every event that comes through:

  1. detecting existence of random keys that come in, but likely don’t exist in our DB. These may be found anywhere within our keyspace.

  2. writing new keys at a high write throughput

  3. aging out old keys that have passed outside of our ‘de-duplication window’

In effect, we have to constantly scan the entire database, append new keys, and age out old keys. And ideally, it happens all within the same data model.

Our database has to satisfy three very separate query patterns

Generally speaking, the majority of these performance gains come from our database performance–so it’s worth understanding the internals that make RocksDB perform so well. 

RocksDB is a log-structured merge-tree (LSM) database–meaning that it is constantly appending new keys to a write-ahead-log on disk, as well as storing the sorted keys in-memory as part of a memtable.

Keys are sorted in-memory as part of a memtable

Writing keys is an extremely fast process. New items are journaled straight to disk in append-only fashion (for immediate persistence and failure recovery), and the data entries are sorted in-memory to provide a combination of fast search and batched writes. 

Whenever enough entries have been written to the memtable, it is persisted to disk as an SSTable (sorted-string table). Since the strings have already been sorted in memory, they can be flushed directly to disk. 

The current memtable is flushed to disk as an SSTable at Level 0

In our production logs, we can see these flushes happen as each memtable fills up.

Each SSTable is immutable–once it has been created, it is never changed–which is what makes writing new keys so fast. No files need to be updated, and there is no write amplification. Instead, multiple SSTables at the same ‘level’ are merged together into a new file during an out-of-band compaction phase. 

When individual SSTables at the same level are compacted, their keys are merged together, and then the new file is promoted to the next higher level.

Looking through our production logs, we can see an example of these compaction jobs. In this case, job 41 is compacting 4 level 0 files, and merging them into a single, larger, level 1 file. 

After a compaction completes, the newly merged SSTables become the definitive set of database records, and the old SSTables are unlinked.

If we log onto a production instance, we can see this write-ahead-log being updated–as well as the individual SSTables being written, read, and merged. 

The log and the most recent SSTable dominate the I/O

If we look at the SSTable statistics from production, we can see that we have four total ‘levels’ of files, with larger and larger files found at each higher level.

RocksDB keeps indexes and bloom filters for each SSTable stored on the SSTable itself–and these are loaded into memory. The filters and indexes are then queried to find a particular key, and the relevant SSTable data is loaded into memory and cached on an LRU basis.

In the vast majority of cases, we see new messages–which makes our dedupe system the textbook use case for bloom filters. 

Bloom filters will tell us whether a key is ‘possibly in the set’, or ‘definitely not in the set’. To do this, the bloom filter keeps bits set for the various hash functions of every element which has been seen. If all the bits for a key’s hash functions are set, the filter will return that the message is ‘possibly in the set’.

Querying for w in our bloom filter, when our set contains {x, y, z}. Our bloom filter will return ‘not in set’ as one of the bits is not set.

If the response is ‘possibly in the set’, then RocksDB can query the raw data from our SSTables to determine whether the item actually exists within the set. But in most cases, we can avoid querying any SSTables whatsoever, since the filter will return a ‘definitely not in the set’ response. 
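To make that concrete, here’s a toy bloom filter showing the fast ‘definitely not in the set’ path (RocksDB’s real filters are far more compact and carefully tuned):

package main

import (
    "fmt"
    "hash/fnv"
)

type bloom struct {
    bits []bool
    k    int // number of hash functions
}

func newBloom(m, k int) *bloom { return &bloom{bits: make([]bool, m), k: k} }

// idx derives the i-th bit position for a key by salting an FNV hash.
func (b *bloom) idx(key string, i int) int {
    h := fnv.New64a()
    fmt.Fprintf(h, "%d:%s", i, key)
    return int(h.Sum64() % uint64(len(b.bits)))
}

func (b *bloom) Add(key string) {
    for i := 0; i < b.k; i++ {
        b.bits[b.idx(key, i)] = true
    }
}

// Has returns false only when the key is definitely not in the set; true means
// "possibly in the set" and the caller must check the real data.
func (b *bloom) Has(key string) bool {
    for i := 0; i < b.k; i++ {
        if !b.bits[b.idx(key, i)] {
            return false
        }
    }
    return true
}

func main() {
    f := newBloom(1<<16, 3)
    for _, k := range []string{"x", "y", "z"} {
        f.Add(k)
    }
    fmt.Println(f.Has("w")) // almost certainly false: at least one of w's bits is unset
}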

When we query RocksDB, we issue a MultiGet for all of the relevant messageIds  we’d like to query. We issue these as part of a batch for performance, and to avoid many concurrent locking operations. It also allows us to batch the data coming from Kafka and generally avoid random writes in favor of sequential ones. 

This answers the question of how the read/write workload gets good performance–but there’s still the question of how stale data is aged out. 

Deletion: size-bound, not time-bound

With our de-dupe process, we had to decide whether to bound our system by a strict ‘de-duplication window’, or by the total database size on disk.

To avoid the system suddenly falling over and halting de-dupe collection for all customers, we decided to limit by size rather than by a set time window. This allows us to set a max size for each RocksDB instance, and deal with sudden spikes or increases in load. The side-effect is that this can lower the de-duplication window to under 24 hours, at which point it will page our on-call engineer.

We periodically age out old keys from RocksDB to keep it from growing to an unbounded size. To do this, we keep a secondary index of the keys based upon sequence number, so that we can delete the oldest received keys first.  

Rather than using the RocksDB TTL, which would require that we keep a fixed TTL when opening the database–we instead delete objects ourselves using the sequence number for each inserted key.

Because the sequence number is stored as a secondary index, we can query for it quickly, and ‘mark’ it as being deleted. Here’s our deletion function, when passed a sequence number. 
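Schematically (over a minimal key/value interface rather than the actual RocksDB API), it walks the sequence-number index in order and removes both the index entry and the primary key:

package dedupe

import "fmt"

// KV is a minimal view of the local store: ordered iteration over the
// sequence-number index, plus point deletes.
type KV interface {
    // ScanIndex calls yield with (sequenceNumber, messageId) pairs in ascending
    // order, stopping when yield returns false.
    ScanIndex(yield func(seq uint64, messageID string) bool)
    Delete(key string) error
}

// deleteUpTo removes every key whose sequence number is at or below cutoff.
func deleteUpTo(db KV, cutoff uint64) {
    db.ScanIndex(func(seq uint64, messageID string) bool {
        if seq > cutoff {
            return false // the index is ordered, so everything after this is newer
        }
        db.Delete(indexKey(seq)) // secondary index entry
        db.Delete(messageID)     // primary messageId key
        return true
    })
}

func indexKey(seq uint64) string {
    return fmt.Sprintf("seq/%020d", seq)
}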

To continue ensuring write speed, RocksDB doesn’t immediately go back and delete a key (remember these SSTables are immutable!). Instead, RocksDB will append a ‘tombstone’ which then gets removed as part of the compaction process. Thus, we can age out quickly with sequential writes, and avoid thrashing our memory by removing old items.

Ensuring Correctness

We’ve now discussed how we ensure speed, scale, and low-cost searching across billions of messages. The last remaining piece is how we ensure correctness of the data in various failure modes. 

EBS-snapshots and attachments

To ensure that our RocksDB instances are not corrupted by a bad code push or underlying EBS outage, we take periodic snapshots of each of our hard drives. While EBS is already replicated under the hood, this step guards against the database becoming corrupted from some underlying mechanism. 

If we need to cycle an instance–the consumer can be paused, and the associated EBS drive detached and then re-attached to the new instance. So long as we keep the partition ID the same, re-assigning the disk is a fairly painless process that still guarantees correctness. 

In the case of a worker crash, we rely on RocksDB’s built-in write-ahead-log to ensure that we don’t lose messages. Messages are not committed from the input topic unless we have a guarantee that RocksDB has persisted the message in the log. 

Reading the output topic

You may notice that up until this point, there is no ‘atomic’ step here which allows us to ensure that we’ve delivered messages just once. It’s possible that our worker could crash at any point: writing to RocksDB, publishing to the output topic, or acknowledging the input messages.

We need a ‘commit’ point that is atomic–and ensures that it covers the transaction for all of these separate systems. We need some “source of truth” for our data. 

That’s where reading from the output topic comes in. 

If the dedupe worker crashes for any reason or encounters an error from Kafka, when it re-starts it will first consult the “source of truth” for whether an event was published: the output topic.

If a message was found in the output topic but not in RocksDB (or vice-versa), the dedupe worker will make the necessary repairs to keep RocksDB and the output topic in sync. In essence, we’re using the output topic as both our write-ahead-log and our end source of truth, with RocksDB checkpointing and verifying it.

In Production

We’ve now been running our de-dupe system in production for 3 months, and are incredibly pleased with the results. By the numbers, we have:

  • 1.5 TB worth of keys stored on disk in RocksDB

  • a 4-week window of de-duplication before aging out old keys

  • approximately 60B keys stored inside our RocksDB instances

  • 200B messages passed through the dedupe system

The system has generally been fast, efficient, and fault tolerant–as well as extremely easy to reason about. 

In particular, our v2 system has a number of advantages over our old de-duplication system.

Previously we stored all of our keys in Memcached and used Memcached’s atomic CAS (check-and-set) operator to set keys if they didn’t exist. Memcached served as the commit point and ‘atomicity’ for publishing keys. 

While this worked well enough, it required a large amount of memory to fit all of our keys. Furthermore, we had to decide between accepting the occasional Memcached failures, or doubling our spend with high-memory failover replicas. 

The Kafka/RocksDB approach allows us to get almost all of the benefits of the old system, with increased reliability. To sum up the biggest wins:

Data stored on disk: keeping a full set of keys or full indexing in-memory was prohibitively expensive. By moving more of the data to disk, and leveraging various levels of files and indexes, we were able to cut the cost of our bookkeeping by a wide margin. We are able to push the failover to cold storage (EBS) rather than running additional hot failover instances.

Partitioning: of course, in order to narrow our search space and avoid loading too many indexes in memory, we need a guarantee that certain messages are routed to the right workers. Partitioning upstream in Kafka allows us to consistently route these messages so we can cache and query much more efficiently. 

Explicit age-out: with Memcached, we would set a TTL on each key to age them out, and then rely on the Memcached process to handle evictions. This caused us to exhaust our memory in the face of large batches of data, and spike the Memcached CPU in the face of a large number of evictions. By having the client handle key deletion, we’re able to fail gracefully by shortening our ‘window of deduplication’.

Kafka as the source of truth: to truly avoid duplication in the face of multiple commit points, we have to use a source of truth that’s common to all of our downstream consumers. Using Kafka as that ‘source of truth’ has worked amazingly well. In the case of most failures (aside from Kafka failures), messages will either be written to Kafka, or they won’t. And using Kafka ensures that published messages are delivered in-order, and replicated on-disk across multiple machines, without needing to keep much data in memory.

Batching reads and writes: by making batched I/O calls to Kafka and RocksDB, we’re able to get much better performance by leveraging sequential reads and writes. Instead of the random access we had before with Memcached, we’re able to achieve much better throughput by leaning into our disk performance, and keeping only the indexes in memory. 

Overall, we’ve been quite happy with the guarantees provided by the de-duplication system we’ve built. Using Kafka and RocksDB as the primitives for streaming applications has started to become more and more the norm. And we’re excited to continue building atop these primitives to build new distributed applications. 


Thanks to Rick Branson, Calvin French-Owen, Fouad Matin, Peter Reinhardt, Albert Strasheim, Josh Ma and Alan Braithwaite for providing feedback around this post.

Rick Branson on June 7th 2017

Today we’re releasing ksuid, a Golang library for unique ID generation. It borrows core ideas from the ubiquitous UUID standard, adding time-based ordering and more friendly representation formats. In doing the research that went into this library, we uncovered a compelling story that we wanted to share with a larger audience.

Ever since two or more machines found themselves exchanging information on a network, they’ve needed a way to uniquely identify things.

The first networks that resemble our contemporary ideas began with the construction of the first telephone exchanges in the 1870s. Before this crucial tipping point, telecom wires were entirely point-to-point links. While amazing at the time, they were expensive, inflexible, and unreliable. It even resulted in an almost comical explosion of copper lines zig-zagging above the roads of major cities.

While telegraphs were mainly used for important governmental and business communications, telephones were an extreme luxury. Compared to the speed of the telegraph, tying up an expensive copper line for a quick chat was incredibly frivolous. The key invention that brought telecommunications to the masses was the switchboard, enabling the creation of exchanges, and in turn vastly increased the utility of these lines. With it brought the first unique identifier in a network: the telephone number.

Fast forward many decades later to the advent of the networked computer. Suddenly the granularity of things had to become orders of magnitude finer. 

Until this tipping point, data sent across telecom wires had been ephemeral — the networks were just a conduit. Now, it became routine to store and retrieve data on demand, and thus the titanic explosion of data that has inundated the world ever since. Given these new capabilities, the things on a network shifted from physical machines to logical pieces of data.

These networks needed a way to uniquely address these pieces of data. The old systems of central control during the telecom age just wouldn’t scale. This is a mathematical inevitability as a network’s capacity for storage and retrieval increases linearly with size. This scale also brings with it a bit of chaos — failures and machine ephemerality move from a yak shaving problem to the routine. Data no longer lives in one place, it flows freely across the network.

A Networking Event for Computing

This brings us to the 1980s. At this point in time, using a computer to share data actually meant sharing a physical computer. Institutions exchanged information using minicomputers and powerful mainframes with hundreds or thousands of dumb terminals. 

In other words, data was colocated with computation. While PCs had revolutionized computing, they lacked networking capabilities, and therefore were very fancy calculators. 

Founded in 1980, Apollo Computer was one of the first companies to enter the nascent workstation market. Workstations were really the first networked computers. It sounds kind of ridiculous to use this term, but it’s worth stating that at this time most of the networking technology we take for granted had yet to blossom. And in stark contrast to the mainframe world, data and compute were distributed across many interconnected computers. Thus the idea of distributed computing entered the mainstream.

Like its contemporary, Sun Microsystems, Apollo was truly full-stack. Everything had to be built from scratch as the hardware and software of that era was not designed for the use cases they imagined. The asynchrony of networks and the demanding nature of these tasks required vastly more capable computers. Multi-tasking, security controls, networking, and mass storage were all too expensive or impractical to include in PCs at the time. However, they were considered table stakes for the vision of the workstation.

Despite an impressive technological boom in the workstation market, all of these vendors ran into the same road block: few developers knew anything about networks. In order to make a business case for their pricey workstations, they needed a programming environment. Developers needed a way to easily build applications that could fully exploit the networking capabilities of their respective products.

Apollo’s answer was their Network Computing System (NCS). NCS borrowed ideas from object-oriented programming and was built around Remote Procedure Calls (RPC). While now mostly obsolete, this approach achieved the end result Apollo was hoping for: any developer knew how to call a function, and object-oriented was the programming paradigm du jour.

In an article published in Network World in 1989 about RPC, one MIS Director at Burlington Coat Factory made a particularly salient observation: “It takes a good programmer only a day or so to learn how to build distributed applications using RPCs.” Cha-ching. That year Apollo sold to Hewlett-Packard for a whopping $476 million USD, almost a billion dollars when adjusted for inflation.

Things (objects, interfaces, operations [methods], etc), or “entities” in NCS terminology, all needed unique identities to be addressed in a networked environment. In the standard Von Neumann architecture this is trivial: the memory or mass storage address easily serves this purpose. In a distributed computing model which allows many computers to operate independently, this becomes non-trivial. The scale of their use case meant that coordination across the network was off the table — it was just too slow and prone to failure. 

NCS introduced the concept of the UID (Universal IDentifier), which served as the unique primary identity for entities. UIDs are 64-bit numbers that combine a monotonic clock with a unique host ID permanently embedded in the hardware of all of their workstations. Under this scheme, identifiers could be generated thousands of times per second at each host and remain globally unique for all time with no scaling bottleneck. The only point of coordination was at Apollo’s factories — where the machines were permanently branded with their respective identifiers.

The First UUID

When Apollo began to approach standardizing the principles of NCS as the Network Computing Architecture (NCA), it became clear that the existing UID design was insufficient. Apollo wanted all the workstation vendors to standardize on NCA, and while they all embedded host IDs in their workstations, the bit size varied from vendor to vendor. 

Apollo used 20-bits, good enough for around a million machines. While laughable at today’s scale, to run out, Apollo would have to sell more than $10 billion worth of hardware to a market that was an order of magnitude smaller.

NCA introduced UUIDs, which built on the UID design, but accommodated a broader range of vendors by extending the number space to 128-bits. Thus the UUID was born. This concept was so useful that even after NCA became a distant memory and RPC fell out of fashion, the UUID remained popular, eventually being standardized by ISO, IETF, and ITU.

Readers somewhat familiar with UUIDs will recognize that their contents were quite a bit different than the popular UUID Version 4 that is most used today. NCA UUIDs had a 48-bit timestamp, 16 reserved bits, an 8-bit indicator for network address family, and a 56-bit host ID. All told, pretty similar in concept to the Version 1 UUIDs defined in the current-day IETF standard.

All of this history sparked my curiosity about the implementations, and fortunately I was able to dig up fragments of the original Apollo NCS source code online. If you’re like me, looking at source code that is many decades old usually makes for a good time. The first peculiar thing I noticed about this code was the use of the dollar-sign ($) in identifiers like variable and function names.

It turns out that NCS uses a language called “Domain C” introduced by Apollo as part of their “Domain/OS” operating system. Courtesy of Bitsavers, I was able to find a reference manual from 1988 in PDF format. Domain C extends ANSI C in several ways, most importantly here that it allows $ to appear after the first character in any identifier.

In contemporary times, dollar-signs are used as syntax for variables in uncool programming languages, currency in economics, and to adorn the name of self-aggrandi$ing musicians. To understand their actual purpose in the now extinct world of Apollo Computer, it took digging through the trace amounts of code and documentation that still remain.

After poring through what could be found, a rather anti-climactic conclusion is reached. While not explicitly stated, it seems that this is just a coding convention. Whatever comes before _$ identifies a particular module. The _$t indicates the “default type” such as the uuid_$t used above. It also appears it was useful as a way to easily pick out which identifiers were part of libraries that followed Apollo’s programming style. It was a bit disorienting that Apollo would make extensions to C just to suit a particular coding style.

I digress.

NCA UUIDs became the basis for the standardized Version 1 UUIDs. Reiterating from before: they included a high-precision timestamp and the hardware-based unique host identifier. It almost goes without saying that system clocks can’t be used to reliably generate unique sequence numbers as clock skew or even chance can cause repeated timestamps. Apollo addressed this by using a global file (literally /tmp/last_uuid) to coordinate across processes.

The file had to be globally writable by any user. While not particularly secure, Apollo sold end-user workstations used on somewhat trusted networks, so it was at least a somewhat reasonable decision. This technique carried forward into the IETF specification for UUIDs.

The source code for a current-day implementation of DCE somewhat surprisingly comes from Apple. They appear to use it mostly to communicate with Microsoft systems like Active Directory and Windows-based File Servers. This implementation, which bears the Open Software Foundation copyright, places the actual working stable store behind a preprocessor flag called UUID_NONVOLATILE_CLOCK.

I couldn’t find any code on the Internet that actually implements the non-volatile clock for DCE RPC’s UUID generation. However, libuuid, included in the package repositories of most Linux distributions, does include a non-volatile UUID clock implementation that can be inspected. Similar to NCS, it uses a file for monotonicity, but places it at a more sensible /var/lib/libuuid/clock.txt. It does attempt to manage the permissions in a slightly more sane way, but the same security caveats apply.

Both the NCS and the libuuid implementations spin to acquire a lock on the state file. This is fertile ground for some really nasty problems.

What libuuid does bring to the table is a daemon somewhat confusingly named uuidd, which attempts to add some safety. uuidd can make a strong guarantee if one abides by its rules. Combined with the assumed uniqueness of Ethernet MAC addresses, this delivers a pretty strong guarantee within a given distributed system.

In practice, however, this is all quite a bit to ask. The file-based synchronization has a large number of problematic failure cases. The daemon-based solution is better, but it never really caught on. It is exceedingly rare to use a system that comes with it configured out-of-the-box.

It also turns out that MAC addresses are not actually globally unique as they can be user-modified. Their inclusion in UUIDs also poses a threat to privacy and security. Given their opaque nature, developers tend to not realize that UUIDs could come with machine-identifying information. The creator of the Melissa virus that impacted Windows in the late 90s was identified using the MAC address from a UUID found in the virus’ code. As the untrustworthy Internet became the dominant networking platform, UUID generation which depended on trust became obsolete. All of these concerns have led most to abandon leveraging hardware identifiers in UUIDs.

In fact, the default paths in libuuid avoid time-based UUIDs on any system which provides a pseudo-random number generating block device at /dev/(u?)random, which has been available on popular UNIX variants since the 1990s. This has been a factor in the rise of UUID Version 4, which contains only random data: 122-bits of it. The simplicity of implementation has driven its ubiquity.

When Worlds Collide

When I first came across these random Version 4 UUIDs, the threat of a collision was concerning. While UUIDs should not be used in such a way that collisions would create a security threat, as a developer I would like some level of confidence that my own systems aren’t going to trip over themselves. The bad news is that UUID generation still requires some small amount of trust.

The most important aspect of collision safety is the source of entropy. Consider two common cases: a modern version of Linux deployed into a trusted cloud computing environment, and an untrusted mobile device. In the Linux on cloud case, we’re provided with a cryptographically secure Pseudorandom Number Generator (PRNG) in the form of /dev/urandom. This is the “cryptographer-approved” and non-blocking source of entropy. It blends several sources like the “noise” generated by hardware interrupts and I/O activity metadata with a cryptographic function.

However, on a mobile device, almost anything goes: mobile devices cannot be trusted. While most of these are just as good as what’s available in the scenario above, it’s routine that the PRNG source on these devices isn’t very random at all. Given that there’s no way to certify the quality of these, it’s a big gamble to bet on mobile PRNGs. ID generation on low-trust mobile devices is an interesting and active area of academic research[1].

Even in an environment with a trustworthy PRNG device, implementation bugs can lead to collisions. In one particular case, an obscure bug in how OpenSSL deals with process forking led to a high collision rate for a pure PHP UUID library. This might sound a bit overboard, but it’s probably worth testing your UUID implementation for obvious collision-creating bugs. It’s more common than you might think. At the system level, dieharder is one of many well-regarded tools for analyzing the quality of a system’s PRNG.

Given the proper environment, there’s a vanishingly low risk of collision, bordering on the infeasible. To be able to depend on this uniqueness, a large enough number space must be used to make it much more likely that other extremely uncommon events will occur far sooner than a collision.

With 122 random bits, it would take petabyte-sized pile of UUIDs (2^46) to even bring the chance of a collision into the realm of feasibility, around 1 in 50 billion. It would take billions of petabytes of UUIDs to make it likely.

The threat of an implementation bug or misconfiguration is vastly more important than the threat of random collision. Those concerned with UUID collisions in a properly-configured system would find their time better spent pondering the impact of far more probable events like solar flares, thermonuclear war, and alien invasion on their systems. Just make sure your systems are properly configured.

Time Is On Our Side

In some cases the timestamp component of UUID Version 1 is actually quite useful. The first time I ran into this was in Apache Cassandra, where they’re called “TimeUUID.” In Cassandra, TimeUUIDs are sortable by timestamp, quite useful when needing to roughly order by time. The implementation swaps some of the random bits with a timestamp and a host identifier. The host ID is derived from the node’s IP addresses, which also form the unique identifier in a Cassandra cluster. 

The implementation has suffered from weaknesses in preserving uniqueness in the face of clock skew (see CASSANDRA-11991). More importantly, host-identifiable information is embedded in the UUIDs, which, if we’ve learned anything in the past, is not a great idea. Even if these IDs are derived from local network addresses, security best practices discourage actively exposing this information to the outside world, even indirectly.

Flakey Friends

The ability to sort IDs by time was largely the motivation behind Twitter’s Snowflake, which popularized the concept of k-ordering by timestamp[2]. Twitter needed a way to sort piles of arbitrary tweets by creation time without global coordination. Embedding a timestamp in the ID provides this functionality without the overhead of an additional timestamp field.

K-ordering is a more precise way of saying roughly sorted. In Snowflake, a large amount of the design was driven by the need to fit these IDs into a 64-bit number space. This includes requiring dedicated ID-generation servers that use a separate strong coordination mechanism (ZooKeeper) to assign host IDs and store sequence checkpoints.

Inspired by Snowflake, the team at Boundary released Flake in early 2012. It also uses dedicated ID-generation server processes, but does not require a strong coordination mechanism. Flake is similar to UUID Version 1 in that it uses a much larger 128-bit number space and a 48-bit host identifier derived from the hardware address to protect against overlap in a distributed environment.

It primarily differs from UUID Version 1 in that it’s structured for lexicographic ordering. The bits of a Flake ID are arranged in such a way that users can expect that they will be ordered by their timestamp regardless of where they’re written. In contrast, Cassandra must implement specific collation logic to get the same behavior from their TimeUUID. 

Unfortunately it appears that Flake ID can expose host identification information to end users as this information is embedded in the generated IDs. While the implementation provided defends against clock skew situations, their uniqueness property does depend heavily on the forward movement of the wall clock.

One noteworthy feature included in Flake is base62 encoding, which provides a much more “portable” representation than UUID. The string representation of UUIDs is one of its weaker features. This may seem trivial, but the inclusion of the dash (-) character makes them less usable. An example of this is when UUIDs are indexed by a search engine, where the dashes will likely be interpreted as token delimiters. The base62 encoding avoids this pitfall and retains the lexicographic ordering properties of the binary encoding.

The Best of Both Worlds

When implementing an internal system at Segment, the team began using UUID Version 4 for generating unique identifiers. It was simple and required no additional dependencies.

After a few weeks, a requirement to order these identifiers by time emerged. This requirement wasn’t strict: the initial purpose was to enable log archiving to Amazon S3 where they are keyed by ranges of message identifiers. The existing UUIDs would have resulted in random dispersion of messages with no natural grouping property. However, if we could take advantage of the arrow of time, it would result in a natural grouping and a feasible number of objects in S3.

Thus KSUID was born. KSUID is an abbreviation for K-Sortable Unique IDentifier. It combines the simplicity and security of UUID Version 4 with the lexicographic k-ordering properties of Flake. KSUID makes some trade-offs to achieve these goals, but we believe these to be reasonable for both our use cases and many others out there.

KSUIDs are larger than UUIDs and Flake IDs, weighing in at 160 bits. They consist of a 32-bit timestamp and a 128-bit randomly generated payload. The uniqueness property does not depend on any host-identifiable information or the wall clock. Instead it depends on the improbability of random collisions in such a large number space, just like UUID Version 4. To reduce implementation complexity, the 122-bits of UUID Version 4 are rounded up to 128-bits, making it 64-times more collision resistant as a bonus, even when the additional 32-bit timestamp is not taken into account.

The timestamp provides 1-second resolution, which we found to be acceptable for a broad range of use cases. If a higher resolution timestamp is desired, payload bits can be traded for more timestamp bits. While high-resolution timestamp support is not included in our implementation, it is backwards compatible. Any implementation which uses 32-bit timestamps can safely work with KSUIDs that use higher resolution timestamps. 

A “custom” epoch is used that ensures >100 years of useful life. The epoch offset (14e8) was also chosen to be easily remembered and quickly singled out by human eyes.

KSUID provides two fixed-length encodings: a 20-byte binary encoding and a 27-character base62 encoding. The lexicographic ordering property is provided by encoding the timestamp using big endian byte ordering. The base62 encoding is tailored to map to the lexicographic ordering of characters in terms of their ASCII order.
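Putting the layout together is only a few lines; this sketch constructs the 20-byte binary form described above (an illustration of the format, not the library’s implementation):

package main

import (
    "crypto/rand"
    "encoding/binary"
    "fmt"
    "time"
)

func main() {
    var id [20]byte

    // First 4 bytes: big-endian seconds since the custom 14e8 epoch, so a plain
    // byte-wise (or base62 string) comparison also sorts by time.
    binary.BigEndian.PutUint32(id[:4], uint32(time.Now().Unix()-1400000000))

    // Remaining 16 bytes: cryptographically random payload, as in UUID Version 4.
    if _, err := rand.Read(id[4:]); err != nil {
        panic(err)
    }

    fmt.Printf("%x\n", id[:])
}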

Fixed-length encodings result in simpler and safer implementations. As a small bonus they are sometimes more efficient, such as in SQL databases, where variable-length data types result in additional storage overhead. Regardless of the chosen format, KSUIDs can be lexicographically ordered by time. The string representation is entirely alphanumeric, and thus avoids the problem of tokenized dashes in UUIDs.

Our Implementation

Today we’re open sourcing our KSUID implementation, which is written in Go. It implements popular idiomatic interfaces, making it easy to integrate into an existing codebase and use with other Go libraries. It comes packaged with a command-line tool for generating and inspecting KSUIDs.
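Generating and inspecting an ID takes only a couple of lines (a minimal example against the library’s basic API; see the repository for the full surface):

package main

import (
    "fmt"

    "github.com/segmentio/ksuid"
)

func main() {
    id := ksuid.New()
    fmt.Println(id.String()) // 27-character base62 form, lexicographically ordered by time
    fmt.Println(id.Time())   // the embedded 1-second-resolution timestamp
}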

Acknowledgements

This post would not have been possible without the efforts of Bitsavers, which gathered and archived the source material related to Apollo Computing. Thanks to Albert Strasheim, Calvin French-Owen, Evan Johnson, Peter Reinhardt, and Tido Carriero for their insightful comments and feedback.

References

[1] P. Jesus, C. Baquero, and P. Almeida: ID Generation in Mobile Environments (2006)
[2] T. Altman, Y. Igarashi: Roughly sorting: sequential and parallel approach (1989)

