# Parallel Iterators These are some notes on the design of the parallel iterator traits. This file does not describe how to **use** parallel iterators. ## The challenge Parallel iterators are more complicated than sequential iterators. The reason is that they have to be able to split themselves up and operate in parallel across the two halves. The current design for parallel iterators has two distinct modes in which they can be used; as we will see, not all iterators support both modes (which is why there are two): - **Pull mode** (the `Producer` and `UnindexedProducer` traits): in this mode, the iterator is asked to produce the next item using a call to `next`. This is basically like a normal iterator, but with a twist: you can split the iterator in half to produce disjoint items in separate threads. - in the `Producer` trait, splitting is done with `split_at`, which accepts an index where the split should be performed. Only indexed iterators can work in this mode, as they know exactly how much data they will produce, and how to locate the requested index. - in the `UnindexedProducer` trait, splitting is done with `split`, which simply requests that the producer divide itself *approximately* in half. This is useful when the exact length and/or layout is unknown, as with `String` characters, or when the length might exceed `usize`, as with `Range` on 32-bit platforms. - In theory, any `Producer` could act unindexed, but we don't currently use that possibility. When you know the exact length, a `split` can simply be implemented as `split_at(length/2)`. - **Push mode** (the `Consumer` and `UnindexedConsumer` traits): in this mode, the iterator instead is *given* each item in turn, which is then processed. This is the opposite of a normal iterator. It's more like a `for_each` call: each time a new item is produced, the `consume` method is called with that item. (The traits themselves are a bit more complex, as they support state that can be threaded through and ultimately reduced.) Unlike producers, there are two variants of consumers. The difference is how the split is performed: - in the `Consumer` trait, splitting is done with `split_at`, which accepts an index where the split should be performed. All iterators can work in this mode. The resulting halves thus have an idea about how much data they expect to consume. - in the `UnindexedConsumer` trait, splitting is done with `split_off_left`. There is no index: the resulting halves must be prepared to process any amount of data, and they don't know where that data falls in the overall stream. - Not all consumers can operate in this mode. It works for `for_each` and `reduce`, for example, but it does not work for `collect_into_vec`, since in that case the position of each item is important for knowing where it ends up in the target collection. ## How iterator execution proceeds We'll walk through this example iterator chain to start. This chain demonstrates more-or-less the full complexity of what can happen. ```rust vec1.par_iter() .zip(vec2.par_iter()) .flat_map(some_function) .for_each(some_other_function) ``` To handle an iterator chain, we start by creating consumers. This works from the end. So in this case, the call to `for_each` is the final step, so it will create a `ForEachConsumer` that, given an item, just calls `some_other_function` with that item. (`ForEachConsumer` is a very simple consumer because it doesn't need to thread any state between items at all.) Now, the `for_each` call will pass this consumer to the base iterator, which is the `flat_map`. It will do this by calling the `drive_unindexed` method on the `ParallelIterator` trait. `drive_unindexed` basically says "produce items for this iterator and feed them to this consumer"; it only works for unindexed consumers. (As an aside, it is interesting that only some consumers can work in unindexed mode, but all producers can *drive* an unindexed consumer. In contrast, only some producers can drive an *indexed* consumer, but all consumers can be supplied indexes. Isn't variance neat.) As it happens, `FlatMap` only works with unindexed consumers anyway. This is because flat-map basically has no idea how many items it will produce. If you ask flat-map to produce the 22nd item, it can't do it, at least not without some intermediate state. It doesn't know whether processing the first item will create 1 item, 3 items, or 100; therefore, to produce an arbitrary item, it would basically just have to start at the beginning and execute sequentially, which is not what we want. But for unindexed consumers, this doesn't matter, since they don't need to know how much data they will get. Therefore, `FlatMap` can wrap the `ForEachConsumer` with a `FlatMapConsumer` that feeds to it. This `FlatMapConsumer` will be given one item. It will then invoke `some_function` to get a parallel iterator out. It will then ask this new parallel iterator to drive the `ForEachConsumer`. The `drive_unindexed` method on `flat_map` can then pass the `FlatMapConsumer` up the chain to the previous item, which is `zip`. At this point, something interesting happens. ## Switching from push to pull mode If you think about `zip`, it can't really be implemented as a consumer, at least not without an intermediate thread and some channels or something (or maybe coroutines). The problem is that it has to walk two iterators *in lockstep*. Basically, it can't call two `drive` methods simultaneously, it can only call one at a time. So at this point, the `zip` iterator needs to switch from *push mode* into *pull mode*. You'll note that `Zip` is only usable if its inputs implement `IndexedParallelIterator`, meaning that they can produce data starting at random points in the stream. This need to switch to push mode is exactly why. If we want to split a zip iterator at position 22, we need to be able to start zipping items from index 22 right away, without having to start from index 0. Anyway, so at this point, the `drive_unindexed` method for `Zip` stops creating consumers. Instead, it creates a *producer*, a `ZipProducer`, to be exact, and calls the `bridge` function in the `internals` module. Creating a `ZipProducer` will in turn create producers for the two iterators being zipped. This is possible because they both implement `IndexedParallelIterator`. The `bridge` function will then connect the consumer, which is handling the `flat_map` and `for_each`, with the producer, which is handling the `zip` and its preecessors. It will split down until the chunks seem reasonably small, then pull items from the producer and feed them to the consumer. ## The base case The other time that `bridge` gets used is when we bottom out in an indexed producer, such as a slice or range. There is also a `bridge_unindexed` equivalent for - you guessed it - unindexed producers, such as string characters. ## What on earth is `ProducerCallback`? We saw that when you call a parallel action method like `par_iter.reduce()`, that will create a "reducing" consumer and then invoke `par_iter.drive_unindexed()` (or `par_iter.drive()`) as appropriate. This may create yet more consumers as we proceed up the parallel iterator chain. But at some point we're going to get to the start of the chain, or to a parallel iterator (like `zip()`) that has to coordinate multiple inputs. At that point, we need to start converting parallel iterators into producers. The way we do this is by invoking the method `with_producer()`, defined on `IndexedParallelIterator`. This is a callback scheme. In an ideal world, it would work like this: ```rust base_iter.with_producer(|base_producer| { // here, `base_producer` is the producer for `base_iter` }); ``` In that case, we could implement a combinator like `map()` by getting the producer for the base iterator, wrapping it to make our own `MapProducer`, and then passing that to the callback. Something like this: ```rust struct MapProducer<'f, P, F: 'f> { base: P, map_op: &'f F, } impl IndexedParallelIterator for Map where I: IndexedParallelIterator, F: MapOp, { fn with_producer(self, callback: CB) -> CB::Output { let map_op = &self.map_op; self.base_iter.with_producer(|base_producer| { // Here `producer` is the producer for `self.base_iter`. // Wrap that to make a `MapProducer` let map_producer = MapProducer { base: base_producer, map_op: map_op }; // invoke the callback with the wrapped version callback(map_producer) }); } }); ``` This example demonstrates some of the power of the callback scheme. It winds up being a very flexible setup. For one thing, it means we can take ownership of `par_iter`; we can then in turn give ownership away of its bits and pieces into the producer (this is very useful if the iterator owns an `&mut` slice, for example), or create shared references and put *those* in the producer. In the case of map, for example, the parallel iterator owns the `map_op`, and we borrow references to it which we then put into the `MapProducer` (this means the `MapProducer` can easily split itself and share those references). The `with_producer` method can also create resources that are needed during the parallel execution, since the producer does not have to be returned. Unfortunately there is a catch. We can't actually use closures the way I showed you. To see why, think about the type that `map_producer` would have to have. If we were going to write the `with_producer` method using a closure, it would have to look something like this: ```rust pub trait IndexedParallelIterator: ParallelIterator { type Producer; fn with_producer(self, callback: CB) -> R where CB: FnOnce(Self::Producer) -> R; ... } ``` Note that we had to add this associated type `Producer` so that we could specify the argument of the callback to be `Self::Producer`. Now, imagine trying to write that `MapProducer` impl using this style: ```rust impl IndexedParallelIterator for Map where I: IndexedParallelIterator, F: MapOp, { type MapProducer = MapProducer<'f, P::Producer, F>; // ^^ wait, what is this `'f`? fn with_producer(self, callback: CB) -> R where CB: FnOnce(Self::Producer) -> R { let map_op = &self.map_op; // ^^^^^^ `'f` is (conceptually) the lifetime of this reference, // so it will be different for each call to `with_producer`! } } ``` This may look familiar to you: it's the same problem that we have trying to define an `Iterable` trait. Basically, the producer type needs to include a lifetime (here, `'f`) that refers to the body of `with_producer` and hence is not in scope at the impl level. If we had [associated type constructors][1598], we could solve this problem that way. But there is another solution. We can use a dedicated callback trait like `ProducerCallback`, instead of `FnOnce`: [1598]: https://github.com/rust-lang/rfcs/pull/1598 ```rust pub trait ProducerCallback { type Output; fn callback(self, producer: P) -> Self::Output where P: Producer; } ``` Using this trait, the signature of `with_producer()` looks like this: ```rust fn with_producer>(self, callback: CB) -> CB::Output; ``` Notice that this signature **never has to name the producer type** -- there is no associated type `Producer` anymore. This is because the `callback()` method is generically over **all** producers `P`. The problem is that now the `||` sugar doesn't work anymore. So we have to manually create the callback struct, which is a mite tedious. So our `MapProducer` code looks like this: ```rust impl IndexedParallelIterator for Map where I: IndexedParallelIterator, F: MapOp, { fn with_producer(self, callback: CB) -> CB::Output where CB: ProducerCallback { return self.base.with_producer(Callback { callback: callback, map_op: self.map_op }); // ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ // Manual version of the closure sugar: create an instance // of a struct that implements `ProducerCallback`. // The struct declaration. Each field is something that need to capture from the // creating scope. struct Callback { callback: CB, map_op: F, } // Implement the `ProducerCallback` trait. This is pure boilerplate. impl ProducerCallback for Callback where F: MapOp, CB: ProducerCallback { type Output = CB::Output; fn callback(self, base: P) -> CB::Output where P: Producer { // The body of the closure is here: let producer = MapProducer { base: base, map_op: &self.map_op }; self.callback.callback(producer) } } } } ``` OK, a bit tedious, but it works!
(self, producer: P) -> Self::Output where P: Producer; } ``` Using this trait, the signature of `with_producer()` looks like this: ```rust fn with_producer>(self, callback: CB) -> CB::Output; ``` Notice that this signature **never has to name the producer type** -- there is no associated type `Producer` anymore. This is because the `callback()` method is generically over **all** producers `P`. The problem is that now the `||` sugar doesn't work anymore. So we have to manually create the callback struct, which is a mite tedious. So our `MapProducer` code looks like this: ```rust impl IndexedParallelIterator for Map where I: IndexedParallelIterator, F: MapOp, { fn with_producer(self, callback: CB) -> CB::Output where CB: ProducerCallback { return self.base.with_producer(Callback { callback: callback, map_op: self.map_op }); // ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ // Manual version of the closure sugar: create an instance // of a struct that implements `ProducerCallback`. // The struct declaration. Each field is something that need to capture from the // creating scope. struct Callback { callback: CB, map_op: F, } // Implement the `ProducerCallback` trait. This is pure boilerplate. impl ProducerCallback for Callback where F: MapOp, CB: ProducerCallback { type Output = CB::Output; fn callback(self, base: P) -> CB::Output where P: Producer { // The body of the closure is here: let producer = MapProducer { base: base, map_op: &self.map_op }; self.callback.callback(producer) } } } } ``` OK, a bit tedious, but it works!
(self, base: P) -> CB::Output where P: Producer { // The body of the closure is here: let producer = MapProducer { base: base, map_op: &self.map_op }; self.callback.callback(producer) } } } } ``` OK, a bit tedious, but it works!