Functional programming

Pipe data through (pure) functions to manipulate datasets without changing the underlying data (immutable data).


// POJS
const data = [[1,2], [1,2,3], [1]];
const result = data
  .map(x => x.length)
  .filter(x => x > 1)
  .reduce((acc, len) => acc + len, 0);
console.log(result); // will return 5
          
jsbin

// RxJS
const data = [[1,2], [1,2,3], [1]];
Rx.Observable.from(data)
  .map(x => x.length)
  .filter(x => x > 1)
  .reduce((acc, len) => acc + len, 0)
  .subscribe(x => console.log(x)); // will return 5
          
jsbin

Reactive programming

  • Write pipeline with no side effects
  • Put side effects at a common place (in subscribe call)
  • Application state is put into the Observables

// Describe data pipeline
const stream$ = new Rx.Subject()
  .map(x => x.length)
  .filter(x => x > 1)
  .scan((acc, length) => acc + length, 0);

// Subscription adds side effects
stream$.subscribe(x => console.log(x));

// Feed new data into the data stream
stream$.next([1,2]);
stream$.next([1,2,3]);
stream$.next([1]);
            
jsbin

Observables

  • Cold by default (do nothing until subscribed to)
  • Can be unsubscribed from (in contrast to Promises)
  • Data transformations don't change the original streams but create new streams
  • Are cheap to produce
  • Can be combined through operators with other observables
  • Many producers are available: from, of, interval, fromPromise, etc.
  • Many operators are available: map, filter, merge, flatMap, etc.
  • Custom Observables and Operators can be written if required

Caution!
Values of Observables are not immutable by default!


const stream$ = Rx.Observable.of({ test: true });
const stream2$ = stream$
  .map(x => {
    x.myProp = false;
    return x;
  });

stream$.subscribe(x => console.log(x));
stream2$.subscribe(x => console.log(x)); // Here the underlying data
                                         // is changed
stream$.subscribe(x => console.log(x));  // Because the underlying
                                         // object is mutable, this
                                         // subsciption is now also
                                         // affected
          
jsbin, other example

Solutions: Take care ... or use immutable.js

Subscription

Observable's life span: (next)* (complete|error)

  • Run forever
  • Complete
  • Throw an error

Thus subscriptions provide three callback methods


const stream$ = Rx.Observable.from( ... );
stream$.subscribe(
  (next)  => { /* called on next value */ },
  (error) => { /* called in case of an (unhandled) error */  },
  ()      => { /* called when the observable completes */ }
);
            

Marble diagrams

Marble diagrams can be used to get a visual understanding of the data and operations.


stream$:            ---o---o-------o--o---------|
streamNeverending$: ---o---o-------o--o----------
streamWithError$:   ---o---o-------o--o---------X

Legend:
o : Value
| : Complete signal
X : Error signal
            


stream$:            ---o---o-------o--o---------------|

                    vvvvvv  map (x => 1)  vvvvvvvvvvvvv

                    ---1---1-------1--1---------------|

                    v  scan ((acc, x) => acc + x, 0)  v

                    ---1---2-------3--4---------------|
              

Shared Observables

  • Observables are not shared accross subscribers by default
  • Each subscriber usually gets its own instance
  • Shared Observables are always hot

const stream$ = Rx.Observable
  .interval(500)
  .take(10)
  .share(); // Share data across subscribers (at least one
            // subscriber is necessary).
            // This is the same as publish().refCount() and
            // automatically makes the observable hot.

stream$.subscribe(x => console.log(`stream$ (1): ${x}`));

setTimeout(() => {
  stream$.subscribe(x => console.log(`stream$ (2): ${x}`));
}, 2000);
            
jsbin

Combining Observables

  • Demo of how to deal with async data.
  • Change mergeMap to switchMap to get rid of any overlaps of the async responses.

const dataset = [
  {id: 1, timeout: 2000},
  {id: 2, timeout: 600},
  {id: 3, timeout: 600}
];

// 0----1----2----3----4- Seconds

// x----x----x|
// 1----2----3|        this could be a request
// --------2-1--3|     this could be a response from the server


const interval = 200; // time in ms
const interval$ = Rx.Observable.interval(interval)
  .map(i => i+1)
  .startWith(0)
  .map(i => `Elapsed time: ${i * interval}`);

const demo$ = Rx.Observable.interval(1000)
  .map(i => i+1)
  .startWith(0)
  .take(dataset.length)
  .map(i => dataset[i])
  .mergeMap(x => Rx.Observable.timer(x.timeout).mapTo(x));
  // use switchMap in the next line to automatically
  // unsubscribe from observable branches that are not
  // the newest (in RxJS 4: flatMapLatest)

interval$.merge(demo$)
  .takeUntil(Rx.Observable.timer(3000))
  .subscribe(x => console.log(x));
          
jsbin

Error handling

Errors can be handled in the error callback.


const interval$ = Rx.Observable.interval(1000);
const result$ = interval$
  .mergeMap(x =>
    x === 3 ?
      Rx.Observable.throw(`I don't like threes`) :
      Rx.Observable.of('a', 'b', 'c')
  );

result$.subscribe(
  x => console.log(x),
  e => console.error(e)  // Here the error can be handled
);
            
jsbin

Error and retry

  • Error handling can also be added to the pipeline
  • Operators such as retry, retryWhen and catch are available

const interval$ = Rx.Observable.interval(100);
const result$ = interval$
  .mergeMap(x =>
    x === 3 ?
      Rx.Observable.throw(`I don't like threes`) :
      Rx.Observable.of(`passed ${x}`)
  )
  .retry(2)
  .catch((error) => Rx.Observable.of(
    `My fallback value. Error was: ${error}`));

result$.subscribe(x => console.log(x));
            
jsbin
### Code samples - [Redux: A minimalistic RxJS implementation](https://jsbin.com/furelam/edit?js,console) - [Angular 2: Isolate components with RxJS](http://plnkr.co/edit/zL67mT7mbJ6kbjsAPtCJ?p=preview) - Drag and drop implementation: [Easy](https://jsfiddle.net/djwfyxs5/), [Advanced](https://jsfiddle.net/bq8s4dbt/) - Error handling: [Automatic retry with `retryWhen`](https://jsbin.com/behofo/edit?js,console)
### References / Resources - [RxJS API Doc](http://reactivex.io/rxjs/class/es6/Observable.js~Observable.html) and [Manual](http://reactivex.io/rxjs/manual/index.html) - [The introduction to Reactive Programming you've been missing](https://gist.github.com/staltz/868e7e9bc2a7b8c1f754) (RxJS 4) - [RxJS Doc: Error handling](https://github.com/Reactive-Extensions/RxJS/blob/master/doc/gettingstarted/errors.md) - [rx-book](http://xgrommx.github.io/rx-book/) (RxJS 4) - [Introduction to Reactive Programming](https://egghead.io/courses/introduction-to-reactive-programming) (egghead.io) - [Creating Observables from scratch](https://egghead.io/courses/rxjs-beyond-the-basics-creating-observables-from-scratch) (egghead.io) - [Operators in Depth](https://egghead.io/courses/rxjs-beyond-the-basics-creating-observables-from-scratch) (egghead.io) - [RxJS with Matthew Podwysocki](https://devchat.tv/js-jabber/182-jsj-rxjs-with-matthew-podwysocki) (JSJ Podcast) - [Podcastwysocki - RxJS Banter with Matt Podwysocki and Ben Lesh](http://modernweb.podbean.com/e/podcastwysocki-rxjs-banter-with-matt-podwysocki-and-ben-lesh/) (Modern Web Podcast)
### References / Resources - Angular 2 - [Build Redux Style Applications with Angular2, RxJS and ngrx/store](https://egghead.io/courses/building-a-time-machine-with-angular-2-and-rxjs) (egghead.io) - [Learn the Basics of Angular 2 Forms](https://egghead.io/courses/intro-to-angular-2-forms) (egghead.io) - [Functional Reactive Programming for Angular 2 Developers - RxJs and Observables](http://blog.angular-university.io/functional-reactive-programming-for-angular-2-developers-rxjs-and-observables/)