class: center, middle An introduction to # Async Iterators ## in JavaScript by Jamie McCrindle find me at [jamie.mccrindle.org](https://jamie.mccrindle.org) ??? Head of Technology for Business Banking at Investec No um, or uh --- class: middle ### Why Async Generators ### Async Iterators ### Using Async Iterators ### vs Rx ### Performance ### Q&A --- ### History * 2014: First Proposal * 2015: Available in Babel * 2017: Available in TypeScript 2.3 * 2018: Part of the ECMAScript Standard ??? In the wild: graphql-js --- class: middle, center ### Why Async Generators ??? Before I talk about Async Generators I'm going to talk about why generators and async await are useful --- ## Generators are great ### Before ```javascript function naturalNumbers(callback) { while(true) // break out of the loop if the // callback returns false if(!callback(i++)) break; } ``` -- ### After ```javascript function* naturalNumbers() { let i = 0; while(true) yield i++; } ``` --- ## Generators are great: interview edition ```javascript function* infiniteFibonacci() { // start with n1 = 1 and n2 = 2 let [n1, n2] = [1, 1]; // yield 1 yield n1; // continue indefinitely while (true) { // yield the next value yield n2; // n1 = n2, n2 = n1 + n2 and we cunningly avoid a temporary variable [n1, n2] = [n2, n1 + n2]; } } ``` ??? This is my favourite way to write fibonacci. Not only because it's pretty neat using generators but also because the destructuring saves a temporary variable --- ## So is async await ### Before ```javascript function getLiveRate() { // get the rates return fetch('https://api.exchangeratesapi.io/latest?base=GBP') // parse the json body .then(response => response.json()) // return the current GBPUSD exchange rate .then(json => json.rates.USD) } ``` ??? Talk about what the code is doing In this instance, the promise chain doesn't look too bad but as you add more of them, it becomes harder to reason about the code. --- ## So is async await ### After ```javascript async function getLiveRate() { // get the rates const response = await fetch('https://api.exchangeratesapi.io/latest?base=GBP'); // parse the json body const json = await response.json(); // return the current GBPUSD exchange rate return json.rates.USD; } ``` ??? Talk about the code The great thing about async await is that it makes asynchronous code appear synchronous which means, for example that try catch finally blocks work --- class: center ## Why not both? .max-height-image[![Why not both](images/dog-cat.jpg)] --- class: center ## Why not both? .max-width-image[![Why not both](images/spider-mouse.gif)] --- class: center ## Why not both? [![Why not both](images/croc.png) ??? This is a crocodile with butterflies on it --- ## Why not both? ```javascript // get the GBP / USD rate once a day async function* getLiveRates() { // continue indefinitely while(true) { // fetch the rates const response = await fetch('https://api.exchangeratesapi.io/latest?base=GBP'); // parse the json body const json = await response.json(); // return the current GBPUSD exchange rate yield json.rates.USD; // pause for a day await delay(24 * 60 * 60 * 1000); } } // a function that pauses for a set amount of time const delay = (timeout) => new Promise(resolve => setTimeout(resolve, timeout)); ``` ??? Talk about the code --- ## Using our rate watch ```javascript // go through the prices as they come in for await(const price of getLiveRates()) { if(price <= 1) { console.log('brexit means brexit'); break; } } ``` -- .center[![Celebration](images/theresa-may.gif)] ??? The for await syntax is now part of the language and it's how you loop through an async iterator much like you can loop through a generator using for of --- ## Sleep Sort! ```javascript const deferred = () => { let resolve; return { promise: new Promise(r => (resolve = r)), resolve }; }; export async function* sleepSort(values) { // create an array of deferreds let deferreds = Array.from(Array(values.length), deferred); let index = 0; for (const value of values) { // delay for value * 500 millisconds and resolve them into the // deferreds in order setTimeout(() => deferreds[index++].resolve(value), value * 500); } for (let i = 0; i < values.length; i++) { // return the values as they come in over time yield await deferreds[i].promise; } } ``` ??? This was going to be my first example of how async generators can solve problems that regular ones can't but it ended up being more complicated than I expected. For those of you who don't know, sleep sort sorts numeric values by adding a delay that's a multiple of the item in the unsorted collection and just returning the values as they come in. Talk about the code --- class: middle, center ### Async Iterators --- ## The AsyncIterator interface Async generators are built on top of the AsyncIterator interface - A method called: `[Symbol.asyncIterator]()` - `[Symbol.asyncIterator]()` returns an object with a `next()` method - next() returns a `Promise` - If this is the last value, the `Promise` should resolve to `{ done: true }` - Otherwise it should resolve to `{ value: value, done: false }` where value is the next value - The object returned by `[Symbol.asyncIterator]()` can optionally have a `throw()` and `return()` method ??? Read the slide. --- ## interval as an AsyncIterator ```javascript function interval(milliseconds) { let running = true; let i = 0; return { [Symbol.asyncIterator]() { return this; }, next(value) { return running ? delay(milliseconds) .then(() => ({ value: i++, done: false })) : Promise.resolve({ done: true }); }, throw(error) { running = false; return Promise.resolve({ done: true }); }, return(value) { running = false; return Promise.resolve({ done: true }); } }; } ``` ??? Talk about the code. --- ## Live Rates as an Async Iterator ```javascript function getLiveRates() { let firstTime = true; return { // async iterables have a method called [Symbol.asyncIterator] [Symbol.asyncIterator]() { return { // [Symbol.asyncIterator] must return an object with a next method async next() { const response = await fetch('https://api.exchangeratesapi.io/latest?base=GBP'); const json = await response.json(); // only delay after the first time if (!firstTime) await delay(24 * 60 * 60 * 1000); else firstTime = false; return { value: json.rates.USD, done: false }; } }; } }; } ``` ??? Talk about the code --- class: middle, center ### Using Async Iterators --- ## Turning a stream of events into an async iterable Async iterators make it possible to turn code that requires callbacks to one that has control flow that's easier to follow e.g. Instead of this syntax: ```javascript lineReader.on("line", line => { console.log(line); }); lineReader.on("close", () => { console.log("done"); }); ``` -- We could use this instead: ```javascript for await (const line of fromLineReader(lineReader)) { console.log(line); } console.log('done'); ``` --- ## Subjects It turns out that doing this generically is somewhat complicated because: - lineReader could send lines before the `for await` consumes them - lineReader may need to pause sending lines so that we don't run out of memory - `for await` could run before there are any lines available - the async iterator next method could be called directly multiple times without waiting for new lines Turns out we can borrow the concept of a `Subject` from RxJS. --- ## Using a Subject ```javascript function fromLineReader(lineReader) { const subject = new Subject(); // send a line to the subject lineReader.on('line' => subject.onNext(line)); // close the subject when the line reader closes lineReader.on('close' => subject.onCompleted()); // close the line reader when the subject closes subject.finally(() => lineReader.close()); // return the async iterator return subject.iterable; } ``` ??? Talk about the code --- ## Axax Axax is a library for async iterators found here https://github.com/jamiemccrindle/axax It includes: * map, reduce, filter, flatMap etc. * fromLineReader, fromNodeStream * concurrentMap * Subject --- ## Axax concurrentMap ```javascript import { concurrentMap } from "axax/esnext/concurrentMap"; import { of } from "axax/esnext/of"; // fetch urls in parallel const mapped = concurrentMap( async url => await (await fetch(url)).json() 3 // run at most 3 in parallel ) (of( "http://www.example.com/api/product1.json", "http://www.example.com/api/product2.json", // ... )); // iterate over the json payloads for await(const content of mapped) { console.log(content); } ``` ??? Talk about the code --- ## Cancellation and leaks ```javascript // an async generator that never returns async function* neverEnds() { await new Promise(() => {}); } async function* asyncIterable() { try { // waits indefinitely for neverEnds() for await (const item of neverEnds()) { yield item; } } finally { // never called console.log("clean up resources"); } } const iter = asyncIterable(); // doesn't do much as the iterable is stuck waiting for neverEnds() await iter.return(); ``` --- class: middle, center ### vs RxJS --- ## vs RxJS ### Control flow for async iterators ```javascript console.log('starting'); try { for await (const item of source) { console.log('in the loop'); } console.log('succeeded'); } catch(error) { console.log('error'); } finally { console.log('done'); } console.log('on to the next thing'); ``` --- ## vs RxJS ### Control flow for RxJS ```javascript console.log("starting"); const subscribe = source .pipe( finalize(() => { console.log("done"); console.log("on to the next thing"); }) ) .subscribe( item => console.log("in the loop"), error => console.log("error"), () => console.log("succeeded") ); ``` --- class: middle, center ### Performance --- ## Performance vs RxJS | Property | Value | | ---------------- | --------------------- | | Node | 10.9.0 | | Hardware | 2.7 GHz Intel Core i7 | | Operating System | Mac OS X | --- ## RxJS Schedulers | Scheduler | Purpose | | ------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ | | `null` | By not passing any scheduler, notifications are delivered synchronously and recursively. Use this for constant-time operations or tail recursive operations. | | `queueScheduler` | Schedules on a queue in the current event frame (trampoline scheduler). Use this for iteration operations. | | `asapScheduler` | Schedules on the micro task queue, which is the same queue used for promises. Basically after the current job, but before the next job. Use this for asynchronous conversions. | | `asyncScheduler` | Schedules work with `setInterval`. Use this for time-based operations. | | `animationFrameScheduler` | Schedules task that will happen just before next browser content repaint. Can be used to create smooth browser animations. | --- class: center, middle ## RxJS default scheduler _It's a result of default recursive scheduling on synchronous observable types. Where RxJS 4 used trampoline scheduling by default, we're using recursive scheduling for performance reasons. The side effect to this is that if you retry or repeat more than 255 times, you blow the stack._ \- [@benlesh](https://github.com/ReactiveX/rxjs/issues/651#issuecomment-153944205) --- ## reduce In this benchmark we implemented sum as a reduce over 1000 numbers. #### native ```javascript async function sum(source) { let accumulator = 0; for await (const item of source) { accumulator += item; } return accumulator; } ``` #### RxJS ```javascript Rx.from(source) .pipe(RxOperators.reduce((a, n) => a + n, 0)) ``` --- class: center ## reduce results .benchmark-results[ | Implementation | Ops Per Second\* | | ---------------- | -----------------------: | | **RxJS default** | **15,912.45** per second | | RxJS queued | 5,353.40 per second | | RxJS asap | 1,022.07 per second | | AsyncIterators | 1,815.12 per second | \* higher is better ] --- ## map filter reduce In this case we combine a map, filter and a reduce over 1000 numbers. #### native ```javascript async function mapFilterReduce(source) { let accumulator = 0; for await (const x of Native.from(source)) { if (x % 2 === 0) { accumulator = accumulator + x * 2; } } } ``` #### RxJS ```javascript Rx.from(source) .pipe( RxOperators.filter(x => x % 2 === 0), RxOperators.map(x => x * 2), RxOperators.reduce((a, n) => a + n, 0) ) ``` --- class: center ## map filter reduce results .benchmark-results[ | Implementation | Ops Per Second\* | | ---------------- | -----------------------: | | **RxJS default** | **10,599.15** per second | | RxJS queued | 2,711.63 per second | | RxJS asap | 801.94 per second | | AsyncIterators | 1,781.83 per second | \* higher is better ] --- ## concat map reduce In this case the source is an array of arrays of numbers that is 100x100. #### native ```javascript async function concatMapReduce(source) { let accumulator = 0; for await (const outer of source) { for await (const inner of from(outer)) { accumulator = accumulator + inner; } } } ``` #### RxJS ```javascript Rx.from(source) .pipe( RxOperators.concatMap(value => Rx.from(value)), RxOperators.reduce((a, n) => a + n, 0) ) ``` --- class: center ## concatMap reduce results .benchmark-results[ | Implementation | Ops Per Second\* | | ---------------- | ----------------------: | | **RxJS default** | **2,889.56** per second | | RxJS queued | 1,845.89 per second | | RxJS asap | 1,242.96 per second | | AsyncIterators | 182.97 per second | \* higher is better ] --- class: center ## vs `most` .benchmark-results[ | Variation | Implementation | Ops Per Second\* | | --------------- | ---------------- | -----------------------: | | reduce | RxJS default | 15,912.45 per second | | reduce | AsyncIterators | 1,815.12 per second | | reduce | **Most** | **66,560.13** per second | | mapFilterReduce | RxJS default | 10,599.15 per second | | mapFilterReduce | AsyncIterators | 1,781.83 per second | | mapFilterReduce | **Most** | **34,252.84** per second | | concatMapReduce | **RxJS default** | **2,889.56** per second | | concatMapReduce | AsyncIterators | 182.97 per second | | concatMapReduce | Most | 2,318.10 per second | \* higher is better ] --- ## Native vs Transpiled It's likely that you'll want to transpile your async iterators to support all browsers. | Browser | Supported | | --------- | --------- | | IE 11 | No | | Edge | No | | Firefox | Yes | | Chrome | Yes | | Safari 12 | Yes | --- class: center ## reduce results .benchmark-results[ | Variation | Implementation | Ops Per Second\* | | --------- | -------------------- | -----------------------: | | reduce | TypeScript 3.0.2 | 8,967.30 per second | | reduce | Babel 6.26.0 | 15,101.20 per second | | reduce | **Native** | **19,129.27** per second | \* higher is better ] --- class: center # When should I use Async Iterators? -- .center[###When readability is important and your performance is not cpu bound ###e.g. when it is IO or network bound] --- class: center, middle # Thanks! --- class: center, middle # Any questions? --- class: center, middle # Where can I find these benchmarks? https://github.com/jamiemccrindle/async-iterator-benchmarks