How we 30x'd our Node parallelism


Updated on December 13, 2019

What's the best way to safely increase parallelism in a production Node service? That's a question my team needed to answer a couple of months ago.

We were running 4,000 Node containers (or "workers") for our bank integration service. The service was originally designed such that each worker would process only a single request at a time. This design lessened the impact of integrations that accidentally blocked the event loop, and allowed us to ignore the variability in resource usage across different integrations. But since our total capacity was capped at 4,000 concurrent requests, the system did not gracefully scale. Most requests were network-bound, so we could improve our capacity and costs if we could just figure out how to increase parallelism safely.

In our research, we couldn't find a good playbook for going from "no parallelism" to "lots of parallelism" in a Node service. So we put together our own plan, which relied on careful planning, good tooling and observability, and a healthy dose of debugging. In the end, we were able to 30x our parallelism, which equated to a cost savings of about $300k annually. This post will outline how we increased the performance and efficiency of our Node workers and describe the lessons that we learned in the process.

Why we invested in parallelism

It might be surprising that we got this far without using parallelism – how did we do it? Only 10% of Plaid's data pulls involve a user who is present and linking their account to an app. The rest are periodic transaction updates that occur without the user present. By adding logic in our load balancing layer to prioritize user-present requests over transaction updates, we could handle API spikes of 1,000% or more at the expense of transaction freshness.

While this stopgap approach had worked for a long time, there were several pain points that we knew would eventually affect our service reliability:

  • API request spikes from our clients had gotten larger and larger, and we were worried about a coordinated traffic spike exhausting our worker capacity.
  • Latency spikes on bank requests were similarly causing our worker capacity to decrease. Due to the variability of bank infrastructure, we set conservative timeouts on our outbound requests, and a full data pull can take several minutes. If a large bank's latency spiked, more and more workers would be stuck waiting on responses.
  • ECS deployment times had become painfully slow, and even though we improved our deployment speed, we didn't have a desire to further increase our cluster size.

We decided that increasing parallelism was the best way to remove application bottlenecks and improve our service reliability. As side effects, we believed that we could reduce infrastructure costs and implement better observability and monitoring, both of which would pay dividends in the future.

How we rolled out changes reliably

Tooling and observability

We have a custom load balancer that routes requests to our Node workers. Each Node worker runs a gRPC server to handle requests and uses Redis to advertise its availability back to the load balancer. This means that adding parallelism is as simple as changing a few lines of logic: a worker should continue to advertise availability until it is processing N in-flight requests (each its own Promise), instead of reporting unavailable as soon as it picks up a task.

It's not that simple, though. Our primary goal during any rollout is to maintain reliability, and we can't just YOLO a parallelism increase. We expected this rollout to be especially risky: it would affect our CPU usage, memory usage, and task latency in hard-to-predict ways. Since Node’s V8 runtime processes tasks on an event loop, our main concern was that we might do too much work on the event loop and reduce our throughput.

To mitigate these risks, we made sure that the following tooling and observability was in place before the first parallel worker was ever deployed to production:

  • Our existing ELK stack had enough existing logging for ad-hoc investigations.
  • We added several Prometheus metrics, including:
    • V8 heap size using process.memoryUsage()
    • Garbage collection stats using the gc-stats package
    • Task latency statistics, grouped by the type of bank integration and level of parallelism, to reliably measure how parallelism affected our throughput.
  • We created Grafana dashboards to measure the effects of parallelism.
  • Changing application behavior without having to re-deploy our service was critical, so we created a set of LaunchDarkly feature flags to control various parameters. Tuning the maximum parallelism per worker this way eventually allowed us to rapidly iterate and find the best parallelism value – in a span of minutes.
  • To learn about the distribution of CPU time spent in various parts of the application, we built flamegraphs into our production service.
    • We used the 0x package because the Node instrumentation was easy to integrate into our service, and the resulting HTML visualization was searchable and provided a good level of detail.
    • We added a profiling mode where a worker would start with 0x enabled and write the resulting traces to S3 on exit. We could then download these logs from S3 and view them locally with 0x --visualize-only ./flamegraph
    • We only ran profiling for a single worker at a time. CPU profiling increases resource utilization and degrades performance, and we wanted to isolate this impact to a single worker.

Starting the rollout

After this preliminary work was done, we created a new ECS cluster for our "parallel workers". These are workers that use LaunchDarkly feature flags to dynamically set their maximum parallelism.

Our rollout plan involved gradually routing increasing amounts of traffic from the old cluster to the new cluster while closely monitoring the performance of the new cluster. At each traffic level, we would adjust the parallelism on each worker until it was as high as possible without causing any degradation in task latency or other metrics. If we saw issues, we could dynamically route traffic back to the old cluster in a matter of seconds.

As expected, several challenges presented themselves along the way. We needed to investigate and fix these tricky issues in order to meaningfully increase our parallelism. This is where the real fun started!

Deploy, investigate, repeat

Increasing Node’s max heap size

As we began the rollout process, we started getting alerts that our tasks had exited with a non-zero code – an auspicious start. We dug into Kibana and found the associated log:


FATAL ERROR: CALL_AND_RETRY_LAST Allocation failed - Javascript heap out of memory
 1: node::Abort()
 2: node::FatalException(v8::Isolate*, v8::Local, v8::Local)
 3: v8::internal::V8::FatalProcessOutOfMemory(char const*, bool)
 4: v8::internal::Factory::NewFixedArray(int, v8::internal::PretenureFlag)

This was reminiscent of memory leaks we had experienced in the past, where V8 would exit unexpectedly after spitting out a similar error message. It made a lot of sense: greater task parallelism leads to higher memory usage.

We hypothesized that increasing the Node maximum heap size from the default 1.7GB may help. To solve this problem, we started running Node with the max heap size set to 6GB (--max-old-space-size=6144 command-line flag), which was an arbitrary higher value that still fit within our EC2 instances. To our delight, this fixed the "allocation failed" messages in production.

Identifying a memory bottleneck

With our memory allocation problem solved, we began seeing poor task throughput on our parallel workers. One of the graphs on our dashboard immediately stood out. This was the per-process heap usage of our parallel workers:

Some of these lines continually grow until they plateau at the maximum heap size – bad news!

We used our system metrics in Prometheus to rule out file descriptor or network socket leaks as the root cause. Our best guess was that GC wasn’t happening frequently enough on old objects, causing the worker to accumulate more allocated objects as it processed more tasks. We hypothesized that this would decrease our throughput as follows:

  • Worker receives new task and performs some work
  • While performing its task, some objects are allocated on the heap
  • Due to an (as-yet undetermined) fire-and-forget operation not completing, object references are maintained after the task completes
  • Garbage collection becomes slower because V8 needs to scan through more objects on the heap
  • Since V8 implements a stop-the-world GC, new tasks will inevitably receive less CPU time, reducing the worker’s throughput

We searched through our code for fire-and-forget operations, also known as "floating promises". This was easy: we just looked for lines of code where our no-floating-promises linter rule was turned off. One method in particular caught our eye. It spawned a call to compressAndUploadDebuggingPayload without awaiting the result. This call seemed like it could easily continue running long after the task had completed.

const postTaskDebugging = async (data: TypedData) => {
	const payload = await generateDebuggingPayload(data);
    
    // Don't await this operation since it's not required
    // for generating a response.
    // tslint:disable-next-line:no-floating-promises
    compressAndUploadDebuggingPayload(payload)
    	.catch((err) => logger.error('failed to upload data', err));
}

We wanted to test our hypothesis that these floating promises were the main source of bottlenecks. Suppose we skipped these calls, which were not crucial to the correctness of our system — would task latencies improve? Here's what our heap usage looked like after temporarily removing calls to postTaskDebugging.

Bingo! Heap usage on our parallel workers now remains stable for an extended period of time.

There seemed to be a “backlog” of compressAndUploadDebuggingPayload calls that slowly built up as the associated tasks finished. If a worker received tasks faster than it was able to prune this backlog, then any objects that were being allocated in memory would never be garbage collected, leading to the heap saturation we observed in the previous graph.

We began to wonder what made these floating promises so slow. We weren’t willing to permanently remove compressAndUploadDebuggingPayload from our code, because it was crucial to helping engineers debug production tasks on their local machines. We could technically fix the problem by awaiting this call before finishing a task, thereby removing the floating promise. Yet this would add a nontrivial amount of latency to each task we were processing.

Keeping this potential solution as a backup plan, we decided to investigate code optimizations. How could we speed up this operation?

Fixing an S3 bottleneck

The logic in compressAndUploadDebuggingPayload is easy to follow. We compress our debugging data, which can be quite large because it includes network traffic. We then upload the compressed data to S3.

export const compressAndUploadDebuggingPayload = async (
    logger: Logger,
    data: any,
) => {
    const compressionStart = Date.now();
    const base64CompressedData = await streamToString(
    	bfj.streamify(data)
        	.pipe(zlib.createDeflate())
        	.pipe(new b64.Encoder()),
    );
    logger.trace('finished compressing data', {
        compression_time_ms: Date.now() - compressionStart,
    );
        
    const uploadStart = Date.now();
    s3Client.upload({
		Body: base64CompressedData,
		Bucket: bucket,
		Key: key,
	});
    logger.trace('finished uploading data', {
        upload_time_ms: Date.now() - uploadStart,
    );
}

Our Kibana logs were showing long S3 upload times even for small payloads. We didn't initially think of sockets as a bottleneck, since Node's default HTTPS agent sets maxSockets to Infinity. However, we eventually dug into the AWS Node documentation and found something surprising: the S3 client reduces maxSockets from Infinity to 50. Needless to say, this is not the most intuitive behavior.

Since we had been pushing our workers past 50 concurrent tasks, the upload step was bottlenecked waiting on sockets to S3. We improved upload latency with the following change to our S3 client initialization:

const s3Client = new AWS.S3({
	httpOptions: {
		agent: new https.Agent({
			// Setting this to an arbitrarily high amount for
			// maximum S3 upload concurrency.
			maxSockets: 1024 * 20,
        }),
	},
	region,
});

Speeding up JSON serialization

Our S3 improvements slowed down the heap size increase, but they didn't fully solve the problem. There was another clear culprit: according to our timing metrics, the compression step in the preceding code occasionally took as long as 4 minutes. This was much longer than the average per-task latency of only 4 seconds. Incredulous of how this step could take such a long time, we decided to run local benchmarks and optimize this block of code.

The compression involved three steps (using Node Streams to limit memory usage): JSON stringification, zlib compression, and base64 encoding. We suspected that the third-party stringification library we used – bfj – might be the problem. We wrote a script that performs some benchmarking on various stream-based stringification libraries (see our code here). Turns out that Big Friendly JSON, as the package was named, wasn’t very friendly after all. Just take a look at the two results we got from our experiment:

benchBFJ*100:        67652.616ms
benchJSONStream*100: 14094.825ms

The results were astonishing. Even with a minimal test, bfj was around 5x slower than another package, JSONStream. We quickly replaced bfj with JSONStream, and immediately observed significant increases in performance.

Reducing garbage collection time

With our memory problems solved, we began to focus on the ratio of task latencies between parallel workers and single workers for the same type of bank integration. This was an apples to apples comparison of how our parallel workers were performing, so a ratio close to 1 gave us confidence in further rolling out traffic to the parallel workers. At this point in the rollout, here's what our Grafana dashboard looked like:


Notice that some of the ratios run as high as 8:1, even at a reasonably low average parallelism (around 30 at this point). We knew that our bank integrations weren’t performing CPU-intensive work, nor were our containers limited by any other bottleneck we could think of. Having no more leads, we looked for online resources on optimizing Node’s performance. Despite the paucity of such articles, we stumbled upon this blog post detailing how the author had reached 600k concurrent websocket connections on a Node process.

In particular, the particular usage of --nouse-idle-notification caught our attention. Could our Node process be spending too much time performing GC? Conveniently, the gc-stats package gave us visibility into the average time spent on garbage collection:

Seems like our processes were spending around 30% of their time performing "scavenge" garbage collection. We’re not going to go into the technical details of the different types of GC in Node, but this is a great resource to peruse. Essentially, scavenging runs frequently to clean up small objects in Node’s “new space” region of the heap.

So garbage collection is being run too often on our Node processes. Could we disable V8 garbage collection and run it ourselves? Is there a way to reduce the frequency of garbage collection? Turns out that the former isn’t possible, but the latter is! We could just increase the size of the new space by bumping the limit on the “semi space” in Node (--max-semi-space-size=1024 command-line flag). This allows for more allocations of short-lived objects before V8 runs its scavenging, thereby reducing frequency of GC:

Another victory! Increasing the new space size resulted in a precipitous drop of time spent on scavenge garbage collection, from 30% down to 2%.

Optimizing CPU usage

After all this work, we were satisfied with the results. Tasks running on parallel workers had latencies almost on par with those running on single workers at a concurrency of around 20. It seemed to us that we had addressed all the bottlenecks, but we still didn't fully know what operations in production were actually slowing us down. Since we were out of low hanging fruit, we decided to look into profiling the CPU usage of our workers.

We generated a flamegraph on one of our parallel workers, and voilà, we got a neat interactive viz that we could play around with locally. Tidbit: it takes up 60MB on disk! Here's what we saw when we searched for "logger" in the 0x flamegraph:

The teal-highlighted bars indicate that at least 15% of our CPU time was spent generating worker logs. We were eventually able to cut this time by 75% – although how we did it is a topic worthy of another blog post. (Hint: it involved regexes and lots of property enumeration.)

After this final bit of optimization, we were able to support up to 30 parallel tasks per worker with no impact on task latencies!

Results and learnings

The migration to parallel workers has reduced our annual EC2 instance spend by around $300k and greatly simplified our architecture. We now run about 30x fewer containers in production, and our system is more robust to increases in external request latencies or spikes in API traffic from our customers.

We learned a lot while parallelizing our bank integration service:

  • Never underestimate the importance of having low-level metrics for a system. Being able to monitor GC and memory statistics during rollout was essential.
  • Flamegraphs are awesome! Now that we've instrumented them, we can easily diagnose other performance bottlenecks in our system.
  • Understanding the Node runtime empowers us to write better application code. For example, knowing the V8 object allocation and garbage collection model is a motivating reason to reuse objects as much as possible. Sometimes it takes interacting directly with V8 or toying with Node command line flags to develop a strong mental model here.
  • Make sure to read the docs for every layer of your system! We had trusted the Node documentation on maxSockets, but it took lots of digging before we figured out that the AWS package had overridden the default Node behavior. Every infrastructure project seems to have one of these "gotcha" moments.

We hope this blog post has provided some useful strategies for optimizing Node parallelism and safely rolling out an infrastructure migration. If you're interested in working on challenging engineering projects like this one, we're hiring!

Join us!