https://github.com/frankmcsherry/blog
Tip revision: 341b15d9847889f1b556154eaa797b80af8ec8a3 authored by Frank McSherry on 18 March 2024, 22:10:07 UTC
2017-03-28.md
## Differential dataflow roadmap (Early 2017)

Almost a year back, I wrote a [differential dataflow roadmap](https://github.com/frankmcsherry/blog/blob/master/posts/2016-07-26.md), which indicated several outstanding issues with differential dataflow, set some goals in terms of measuring performance using terms that do not side-step these issues, and outlined some candidate solutions and constraints.

Of course, then I went on holiday for a while and only just (February) got back to work on this stuff.

It seems like we've got some good progress on the two main issues. Of course nothing is ever *done*, but it seemed like a fine time to report on the larger-picture progress in the project, and outline some new issues and associated goals.

Let's recap the two issues, the associated goals, and take some measurements.

### Old issue 1: Sustained performance degrades

Differential dataflow manages computation by tracking the changes its collections undergo, and back at the time of the post differential dataflow kept all changes forever. This meant that to find the actual value of a collection at some point in time, we would need to flip through an increasing amount of data, slowing down the longer we run. That sucks.

In particular, for a breadth-first search / distance labeling computation, we had a series of latency curves describing the time it takes the system to respond to changes in the input, grouped by number of changes previously experienced. 

![1k nodes](https://github.com/frankmcsherry/blog/blob/master/assets/resolution/1k-old.png)

There is a pronounced slowdown as we accept more and more inputs, the only upside of which is that the slowdown lowers the rate at which we accept new updates, slowing down the slowdown. That's a joke; there is no upside.

The problem here was that the evaluation methodology was "start running a cool computation, collect some latency statistics, ^C". Worse, by aiming for "big" computations, the changes didn't much coincide at the same keys and the misery of arbitrarily increasing histories was "measured-around".

The solution is to change what we measure to make sure we cannot ignore this sort of behavior. Of course, I typed some code too, but insisting on measuring the right things is the real source of the solution, in my eyes. We put forward the following goal:

> **Sustained latency**: For windowed computations (or those with bounded inputs, generally), the latency distribution should stabilize with time. The latency distribution for 1,000 node 2,000 edge reachability computations after one million updates should be pretty much the same as the distribution after one thousand updates. Minimize the difference, and report only the former.

The 1,000 node 2,000 edge graph sounds "small", but it antagonizes the "accumulation of history" defect much more than a graph in which the million updates could be spread over one million nodes. 

Let's look at the current state of affairs. For the 1,000 node 2,000 edge graph, we introduce one million single-element updates, and capture the distributions of the first 1,000 latencies and the last 1,000 latencies. One is the solid line and one is the dashed line. I hope you agree that it doesn't matter which is which.

![1k nodes](https://github.com/frankmcsherry/blog/blob/master/assets/status2017/problem1.png)

These latencies largely stabilize. The code is performing compaction on the histories using its understanding of when logical times become indistinguishable, and if the underlying state and computation do not stabilize with time there is a bug. But, it seems like they mostly do. 
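
To make the compaction idea concrete, here is a minimal sketch, assuming totally ordered `u64` times and a single record's history of `(time, diff)` pairs. The name `compact` and the `frontier` parameter are illustrative, not differential dataflow's API, and the real system works with partially ordered times, which this does not cover.

```rust
/// Compact one record's history of `(time, diff)` updates, given that no
/// future reader will ask about any time earlier than `frontier`.
///
/// Assumption: times are totally ordered `u64`s (a simplification).
fn compact(history: &mut Vec<(u64, i64)>, frontier: u64) {
    // Times before the frontier are indistinguishable to future readers,
    // so advance each of them to the frontier.
    for (time, _) in history.iter_mut() {
        if *time < frontier {
            *time = frontier;
        }
    }
    // Sort by time and sum the diffs of updates that now share a time.
    history.sort();
    let mut merged: Vec<(u64, i64)> = Vec::new();
    for &(time, diff) in history.iter() {
        if let Some(last) = merged.last_mut() {
            if last.0 == time {
                last.1 += diff;
                continue;
            }
        }
        merged.push((time, diff));
    }
    // Updates that cancel completely no longer need to be stored.
    merged.retain(|&(_, diff)| diff != 0);
    *history = merged;
}
```

The accumulated value at any time at or after `frontier` is unchanged, but the history only grows with the number of distinct times still distinguishable, not with the total number of updates ever seen.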

### Old issue 2: Scaling versus resolution

Differential dataflow computations are data-parallel, and can take advantage of multiple workers to increase the rate at which they perform work. However, to achieve this scaling the workers must have enough work that can be performed concurrently. If we only supply single updates at a time, there is not usually enough work to interest more than one worker, and often additional workers simply slow down the computation. To increase the throughput of the computation, at the time of the post, one would need to batch updates together at the same logical time.

This figure shows latency distributions for a single worker (solid line) and two workers (dashed line) for different batch sizes. The additional worker is counter-productive for small batches, and helpful only for larger batches.

![batching](https://github.com/frankmcsherry/blog/blob/master/assets/roadmap/batching.png)

Batching updates at the same logical time improves performance, but it changes the semantics of the computation: we get a different set of output updates than if the input updates had occurred at distinct times. We no longer get to see the precise impact of each input update, which may be crucially important for some computations. At the least, we would like you to be able to write such a computation if you need, and to get scaling benefits from multiple workers.

The problem here was that the evaluation methodology was "measure either latency or throughput, as appropriate" ignoring the fact that as we change the batch size we are changing the computation. We did not actually have a latency-throughput trade-off for a *fixed* computation.

The solution is again to change what we measure to make sure that we cannot (even unintentionally, I swear) cheat in this way. To evaluate any claimed latency-throughput trade-offs, we must be able to produce the same outputs in each configuration. We put forward the following goal:

> **Single-update throughput scaling**: The throughput of single-element updates should improve with multiple workers (up to a point). The single-update throughput for 1,000 node 2,000 edge reachability computations should scale nearly linearly with (a few) workers. Maximize the throughput, reporting single-element updates per second per worker.

I think this goal might be a bit over-zealous, in that we don't simply want to maximize the throughput but rather present a latency-throughput trade-off. But, we should only present that trade-off while holding the computation fixed, and any claims of high throughput or scaling with workers should not involve a change to the computation to expose more concurrency.

Let's take a look at the current state of affairs. We can now trade off latency against throughput by altering the sizes of batches we expose to timely dataflow, without altering the logical times associated with updates. This amortizes out some system overhead, and also allows each operator to re-organize its work to improve efficiency.
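
The distinction between the two kinds of batching can be sketched as follows. This is only an illustration with made-up names (`Update`, `into_batches`, `collapse_times`) and an assumed `u64` time and `i64` diff; it is not how timely dataflow represents batches.

```rust
/// One input change, shaped like `(data, time, diff)`.
/// Assumption: edges as `(u32, u32)`, times as `u64`, diffs as `i64`.
type Update = ((u32, u32), u64, i64);

/// Physical batching: group updates into chunks of `batch_size` while each
/// update keeps its own logical time. The computation is unchanged.
fn into_batches(updates: Vec<Update>, batch_size: usize) -> Vec<Vec<Update>> {
    assert!(batch_size > 0, "batch_size must be positive");
    updates.chunks(batch_size).map(|chunk| chunk.to_vec()).collect()
}

/// Logical batching: give every update in a batch the same time. This is
/// the older approach, and it changes which output updates are produced.
fn collapse_times(batch: &[Update]) -> Vec<Update> {
    let shared = batch.iter().map(|u| u.1).max().unwrap_or(0);
    batch.iter().map(|&(data, _, diff)| (data, shared, diff)).collect()
}
```

With `into_batches` the per-update times survive, so the outputs can still be attributed to individual input changes; with `collapse_times` that information is gone.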

Here is the most recent round of numbers for the small and large graphs: elapsed times to process one million updates in various batch sizes. Each row is labeled as batch size x number of batches, and `-w2` marks the two-worker run.

|  experiment |   1k / 2k |  1m / 10m | 1m / 10m -w2 |
|------------:|----------:|----------:|-------------:|
| 1 x 1000000 |         - |         - |            - |
| 10 x 100000 |      117s |       82s |          68s |
| 100 x 10000 |       75s |       65s |          46s |
| 1000 x 1000 |       63s |       54s |          36s |
| 10000 x 100 |       61s |       43s |          28s |
| 100000 x 10 |      127s |       31s |          20s |
| 1000000 x 1 |      385s |       23s |          16s |

To read these numbers differently, we can report each line as a (latency, throughput) pair, reporting the average latency and aggregate throughput. Each quantity can be determined from the elapsed times, number of updates, and number of batches (and only because the number of updates per batch is held constant).

|  experiment |           1k / 2k |         1m / 10m |
|------------:|------------------:|-----------------:|
| 1 x 1000000 |                 - |                - |
| 10 x 100000 |  1.17ms, 8.547k/s | 820us, 12.195k/s |
| 100 x 10000 |  7.5ms, 13.333k/s | 6.5ms, 15.384k/s |
| 1000 x 1000 |   63ms, 15.873k/s |  54ms, 18.518k/s |
| 10000 x 100 |  610ms, 16.393k/s | 430ms, 23.255k/s |
| 100000 x 10 |   12.7s, 7.874k/s |  3.1s, 32.258k/s |
| 1000000 x 1 |    385s, 2.597k/s |   23s, 43.478k/s |

I left out the multiple worker data because it was already badly enough formatted. Please help me.
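
The conversion from elapsed time to a (latency, throughput) pair is simple enough to write down. A small sketch (the function name is mine), which relies on every batch having the same number of updates:

```rust
/// Average per-batch latency in seconds and aggregate throughput in
/// updates per second, from the total elapsed time of an experiment.
/// Only meaningful when each batch holds the same number of updates.
fn latency_throughput(elapsed_secs: f64, updates: u64, batches: u64) -> (f64, f64) {
    let latency = elapsed_secs / batches as f64;
    let throughput = updates as f64 / elapsed_secs;
    (latency, throughput)
}
```

For example, the `10000 x 100` row of the small graph is one million updates in 100 batches over 61s, so `latency_throughput(61.0, 1_000_000, 100)` gives 0.61s per batch and roughly 16,393 updates per second.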

You can see from the numbers that not all is yet perfect with the small graph (there is some lingering non-linearity in `group`, I think), but that otherwise (and especially for the large graph) there is a clear trade-off between latency and throughput. The more we are willing to wait, the higher throughput we can accommodate. For a given workload with a given throughput, the latency should stabilize somewhere and correct itself if the load climbs or dips (unless we are working on the small graph, in which case everything just melts; computers are hard).

I should also stress that these absolute numbers probably suck. We are going to work on improving them in the coming roadmap steps, but at least we are looking at the right measurements now, rather than citing the latency of single-element updates and the throughput of large batches of updates with one timestamp.

### Old issue 0: Data structures for high-resolution times

If you have read (or re-read) the roadmap post, you might recall an "issue 0", that the data structures underlying our collections needed a bit of a rework. A fair bit of work was done to make these more modular, and hidden behind tasteful traits that should allow a greater degree of flexibility in implementation.

Already, we have a few specialized implementations based on whether we (i) have values with our keys or not, and (ii) want to use hashing or sorting to order the data. We will talk about a few more candidates later in this post. I imagine these implementations will continue to evolve as a place where core performance improvements land, as this is where a fair volume of the compute actually happens. At the same time, as we learn more we may change the trait interfaces to expose more helpful functionality.

We didn't have a concrete goal here, which was fine. We will start to have some performance related goals in just a moment, and we may frame some in terms of the performance of these data structures.

### Constraints 1 and 2

We also imposed some constraints on our design, one of which was well respected and one of which was deferred (approximately until this post). They were:

> **Constraint 1**: The representation of a trace should not be so large that I can’t fit normal graphs in memory on my laptop. Ideally the memory footprint should be not much larger than required to write the data on disk.

> **Constraint 2**: Index structures are currently shared between operators, so that a collection only needs to be indexed and maintained once per computation, for each key on which it is indexed.

The second constraint is live and in effect. If you have a graph computation that needs to use the graph seven different times indexed by the source vertex, you just need to maintain one copy in memory. If you need the graph indexed by source vertex, destination vertex, and by the pair together (the only three ways I can think of) then you just need to maintain the three copies in memory, independent of the number of times you use these collections.

This is a bit of a lie, in that it is only true for uses within the same scope. If you write breadth-first search and connected components, differential dataflow is currently stressed out by the fact that there are different scopes for the two computations and you end up with the same data indexed multiple times. This should be pretty easy to avoid, as the underlying data are identical, and we just need to provide the right implementations of the `Trace` trait. I think this is a "weekend project" scale change. Certainly not last weekend, though, and probably not next weekend.
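
As a rough picture of the within-scope sharing described above, here is a sketch using reference counting. Everything in it (`Index`, `Operator`, `share`) is hypothetical and much simpler than differential dataflow's actual `Trace` machinery; it only shows that seven uses of one index need not mean seven copies.

```rust
use std::collections::BTreeMap;
use std::rc::Rc;

/// A hypothetical index: source vertex -> destination vertices.
type Index = BTreeMap<u32, Vec<u32>>;

/// Build the index once from a list of edges.
fn index_by_source(edges: &[(u32, u32)]) -> Rc<Index> {
    let mut index = Index::new();
    for &(src, dst) in edges {
        index.entry(src).or_insert_with(Vec::new).push(dst);
    }
    Rc::new(index)
}

/// A stand-in for an operator that reads the shared index.
struct Operator {
    index: Rc<Index>,
}

impl Operator {
    /// Number of edges leaving `src`, read from the shared index.
    fn out_degree(&self, src: u32) -> usize {
        self.index.get(&src).map_or(0, |dsts| dsts.len())
    }
}

/// Hand the same index to `count` operators; only the pointer is cloned.
fn share(index: &Rc<Index>, count: usize) -> Vec<Operator> {
    (0..count).map(|_| Operator { index: Rc::clone(index) }).collect()
}
```

Calling `share(&index, 7)` leaves a single `BTreeMap` in memory with a reference count of eight (the original handle plus seven operators).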

The first constraint was largely ignored in the rethink that accompanied the fixes to problems one and two. The reasoning here was that we should solve fundamental problems first, and optimize memory use and computations second. We are going to promote this constraint to a "problem" this time around though, which means we will have to address it.

## New Problems

The previous problems aren't "done", but they are in a state where they seem to work correctly (there are tests!). I think it is now time to return to performance and see about shaving off some of the execution time and memory footprint we packed on in the process of improving differential dataflow's fundamental characteristics.

### Problem 1: Absolute execution times

The absolute execution times got worse for batch computations, which are neither long-running nor require high-resolution changes to their inputs. At some level this degradation is unavoidable, in that we are now doing more than we did before. 

Actually, here is a first question we could try and answer: how long *should* one million breadth-first search computations take on slightly changed graphs? Any guesses? Me neither.

**Challenge**: Write a single-threaded baseline for performing one million single-element updates to breadth-first search on an arbitrary graph. 

This baseline doesn't seem to be too hard to write, given that we have only a bounded amount of work to apply, and a very specific computation to perform. We *should* be able to take the million updates as one batch, and then work iteration by iteration updating the histories of distances for each node. At least, this is how I imagine a sweet implementation would look. I'm going to write it tomorrow!
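
Before any clever baseline exists, it helps to have something obviously correct to test it against. The sketch below is *not* the batch, iteration-by-iteration implementation described above; it is a naive oracle that reruns breadth-first search from scratch after every single-edge update. The names, the use of directed edges, and the `(edge, insert)` update encoding are all assumptions made for illustration.

```rust
use std::collections::{HashSet, VecDeque};

/// Distances from `root` by plain breadth-first search over directed edges.
/// `None` marks an unreachable node.
fn bfs(nodes: usize, edges: &HashSet<(usize, usize)>, root: usize) -> Vec<Option<u32>> {
    let mut adjacency: Vec<Vec<usize>> = vec![Vec::new(); nodes];
    for &(src, dst) in edges {
        adjacency[src].push(dst);
    }
    let mut dist: Vec<Option<u32>> = vec![None; nodes];
    let mut queue: VecDeque<usize> = VecDeque::new();
    dist[root] = Some(0);
    queue.push_back(root);
    while let Some(node) = queue.pop_front() {
        let next = dist[node].unwrap() + 1;
        for &nbr in &adjacency[node] {
            if dist[nbr].is_none() {
                dist[nbr] = Some(next);
                queue.push_back(nbr);
            }
        }
    }
    dist
}

/// Apply single-edge updates one at a time (`true` inserts, `false` removes),
/// recomputing distances after each. Returns, per update, the list of
/// `(node, old distance, new distance)` changes it caused.
fn replay(
    nodes: usize,
    edges: &mut HashSet<(usize, usize)>,
    root: usize,
    updates: &[((usize, usize), bool)],
) -> Vec<Vec<(usize, Option<u32>, Option<u32>)>> {
    let mut current = bfs(nodes, edges, root);
    let mut changes = Vec::with_capacity(updates.len());
    for &(edge, insert) in updates {
        if insert {
            edges.insert(edge);
        } else {
            edges.remove(&edge);
        }
        let next = bfs(nodes, edges, root);
        let delta: Vec<_> = (0..nodes)
            .filter(|&n| current[n] != next[n])
            .map(|n| (n, current[n], next[n]))
            .collect();
        changes.push(delta);
        current = next;
    }
    changes
}
```

Each update costs a full traversal here, so this is only useful for checking outputs on small inputs, not as a performance target.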

My interest in this baseline (aside from measuring the [COST](https://github.com/frankmcsherry/blog/blob/master/posts/2015-01-15.md)!) is that we can contrast it with any differential dataflow implementation and see, quantitatively, where we are spending time that perhaps we do not need to spend. Each instance of such should either be correctable, or at least discussed as to why we aren't planning on doing the correction (maybe there is a good reason).

How fast will this go? We will find out tomorrow (or, whenever the code works; I have tests!), but let's take a guess and put a challenge forward for what we should be able to do using differential dataflow.

**Challenge**: For either or both of the small (1,000 node 2,000 edge) and large (1,000,000 node 10,000,000 edge) graphs, perform one million single-element updates in under one second.

Given that the current performance numbers are off of this target by 23-61x, I think this is definitely a stretch goal. But I'm willing to bet that our single-threaded implementation above is going to come in at under a second. Moreover, given our newfound ability to batch single-element updates in differential dataflow, I don't see why it shouldn't be achievable; we do need to average less than a microsecond of work per update, but batching allows us to avoid a great deal of system overhead. Plus, we have multiple cores and stuff, and shouldn't be afraid to use them. 

Also we can cheat. Watch me.

### Problem 2: Memory footprint

The new data structures for representing compactable high-resolution times are not nearly as tight as the collections that do not support these features. Mostly, the new data structures have `Time: Lattice` fields scattered throughout them, and compared with the `u32` sorts of node values we often see in graph computation, the `(usize, usize)` timestamp pairs we often see in iterative graph computation are really quite large.
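
To put rough numbers on "really quite large", one can ask Rust for the sizes directly. This is a sketch: the `isize` diff in the update triple is my assumption, and Rust does not formally guarantee tuple layout, so treat the results as typical rather than promised.

```rust
use std::mem::size_of;

/// Bytes for one edge with `u32` node identifiers.
fn edge_bytes() -> usize {
    size_of::<(u32, u32)>()
}

/// Bytes for one `(usize, usize)` timestamp pair.
fn time_bytes() -> usize {
    size_of::<(usize, usize)>()
}

/// Bytes for a full `(edge, time, diff)` update, assuming an `isize` diff.
fn update_bytes() -> usize {
    size_of::<((u32, u32), (usize, usize), isize)>()
}
```

On a typical 64-bit target I would expect these to come out as 8, 16, and 32 bytes, which is to say the timestamp alone is twice the size of the edge it annotates.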

This space requirement doesn't seem to be fundamental to every collection, and we could hope to avoid it for large classes of computations. Yes, if everything about the graph changes at every moment, we will need to have that all written down. On the other hand, for large datasets it is much more common that relatively little changes at a time, with the rest of the collection just "committed" rather than differentiated by time. As time advances, all of the changes get folded in to the large committed collection and we can mostly discard their times, rather than keep them around.

**Challenge**: For the `twitter_rv` graph, with 42 million nodes and 1.4 billion edges, perform breadth-first search with 1.4 billion (or however many edges it has) single-element updates, on my laptop with 16GB of memory.

The 1.4 billion edges should fit in 6GB of memory, if we didn't have to worry about them changing so much. Also if they just magically appeared in pre-allocated memory, rather than started as undifferentiated `((src, dst), 0, +1)` pairs. Also if we can manage to avoid accidentally sending 1.4 billion messages about graph distances all at once from our `join` operator. And if the `group` operator consumes and compacts them in-place. There are lots of ways to screw this up, and I'm familiar with several of them (I won't say "all", but I'll wager "most"). Each of them has a story to tell about how to make differential dataflow (and similar systems) not fall over in some different and thought-provoking way.
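
One way to arrive at a figure close to 6GB is a compressed adjacency layout: an 8-byte offset per node plus a 4-byte destination per edge. That layout is my guess at the intended accounting, not something stated here, and the 24-byte triple (8-byte edge, 8-byte time, 8-byte diff) is likewise an assumption. A back-of-the-envelope sketch:

```rust
/// Bytes for a "committed" adjacency layout: one 8-byte offset per node
/// and one 4-byte destination per edge. (Assumed layout.)
fn committed_bytes(nodes: u64, edges: u64) -> u64 {
    nodes * 8 + edges * 4
}

/// Bytes if every edge is held as an `((src, dst), time, diff)` triple with
/// `u32` endpoints, an 8-byte time, and an 8-byte diff. (Assumed sizes.)
fn differentiated_bytes(edges: u64) -> u64 {
    edges * (8 + 8 + 8)
}
```

With 42 million nodes and 1.4 billion edges, `committed_bytes` gives about 5.9GB while `differentiated_bytes` gives 33.6GB, which is the gap between fitting on a 16GB laptop and not.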

Nonetheless, this is a totally excellent challenge that should test whether our trait abstractions in place of collections are tasteful enough that we can put some clever representations behind them, without having to open up the kimono.

### Next steps: COST

I'm optimistic that I'll be able to write a pretty slick single-threaded bfs implementation, and that it will perform well enough that my smugness about its performance will mask the otherwise inescapable chasm between it and how differential dataflow currently performs. I'm also optimistic that we'll be able to walk through at least four or five details about the implementation and connect them back to things that differential dataflow *could* be doing, with enough care. And then we'll start looking at how to do each of them.