A visual intro to Rx.js

Consuming an Observable

              
            import {appendResults} from 'utils'
            import Rx from 'Rx'

            let source = Rx.Observable.interval(125)

            source
              .filter(x => x % 2 === 0)
              .map(x => '.')
              .subscribe(x => appendResults(x, 'results'))
              
            

Results

Cold Observables

Source 1

Source 2

Hot Observables

              
            import {appendResults} from 'utils'
            import Rx from 'Rx'

            let cold = Rx.Observable.interval(500)
              , hot = Rx.Observable.interval(500).publish().refCount()

            cold.subscribe(x => appendResults(x, 'cold1'))
            setTimeout(function(){
              cold.subscribe(x => appendResults(x, 'cold2'))
            }, 1500)


            hot.subscribe(x => appendResults(x, 'hot1'))
            setTimeout(function(){
              hot.subscribe(x => appendResults(x, 'hot2'))
            }, 1500)
              
            

Source 1

Source 2

Creating your own observables

              
              import {appendResults} from 'utils'
              import Rx from 'Rx'

              let myObservable = Rx.Observable.create((observer) => {

                let number = Math.random() * 100

                try{
                  if (number > 100 ){ throw new Error('RandomIsBroken') }
                } catch(err){
                  observer.onError(err)
                }

                observer.onNext(number)
                observer.onCompleted()

                // disposal function
                return x => console.log('myObservable has been disposed.')
              })

              myObservable
                .subscribe( results => appendResults(results, 'custom')
                          , err => document.getElementById('custom-error').checked = true
                          , () => document.getElementById('custom-success').checked = true
                          )

              
            

Consuming Data

Rest Api

From Scratch

                
              import {appendResults} from 'utils'
              import Rx from 'Rx'

              const weatherUrl = 'http://api.openweathermap.org/data/2.5/weather?q=NewYork,us'


              let weatherObservable = Rx.Observable.create((observer) => {
                jQuery.getJSON(weatherUrl)
                .done((response) => { observer.onNext(response) })
                .fail((jqXHR, status, error) => { observer.onError(error) })
                .always(() =>{ observer.onCompleted() })
              })

              weatherObservable.subscribe(result => appendResults(`${result.name} | ${result.weather[0].description}`, 'weather'))
                
              

Weather in NYC

With Built-in functions

                
              import {appendResults} from 'utils'
              import Rx from 'Rx'

              const flightUrl = 'http://services.faa.gov/airport/status/JFK?format=json'


              // Or we can use fromPromise for convenience
              let flightDelayObservable = Rx.Observable.fromPromise(jQuery.getJSON(flightUrl))

              flightDelayObservable
               .subscribe(delays => appendResults(`${delays.status.type}: ${delays.status.reason}`, 'flights'))
                
              

JFK Flight Delays

Observables clean up themselves

Disposing of observables

              
                import {appendResults} from 'utils'
                import Rx from 'Rx'


                let toBeDisposedObservable = Rx.Observable.create((observer) => {
                  let id = setTimeout(() => {
                    observer.onNext('I made it!')
                    observer.onCompleted()
                  }, 1000)

                  // Disposal function
                  return () => clearTimeout(id)
                })


                let subscription1 = toBeDisposedObservable
                    .subscribe(x => appendResults(x, 'not-disposed'))

                let subscription2 = toBeDisposedObservable
                    .subscribe(x => appendResults(x, 'disposed'))

                // Dispose the observable before we can consume it
                setTimeout(() => { subscription2.dispose() }, 500)
             
            

Not disposed

Disposed