RxJS-toolbox: a set of custom operators and handy factory functions for RxJS

Note: updated to Angular 16.x in version 2.2.0.

Installation

Install using the npm CLI:

npm install --save rxjs-toolbox

forkJoin-transparent

A combination operator that combines multiple source observables and returns their last emitted values, along with the percentage of sources that have completed so far.

Usage

forkJoinWithProgress

import { ajax } from 'rxjs/ajax';
import { merge } from 'rxjs';
import { forkJoinWithProgress } from 'rxjs-toolbox';
import { tap, mergeMap, ignoreElements } from 'rxjs/operators';

const getUserDetails = userIdsList => {
  
  const arrayOfObservables = userIdsList.map(userId =>
    ajax('https://jsonplaceholder.typicode.com/comments/' + userId)
  )
  
  return forkJoinWithProgress(arrayOfObservables)
}


const result$ = getUserDetails([1, 2, 15]);

result$.pipe(
  mergeMap(([finalResult, progress]) => merge(
    progress.pipe(
      tap((value) => console.log(`${value} completed`)),
      ignoreElements()
    ),
    finalResult
  ))
).subscribe({
  next: values => console.log('final value: ', values),
  error: console.warn
});

// Output:
// 33.333333333333336 completed
// 66.66666666666667 completed
// 100 completed
// final value:  (3) [{…}, {…}, {…}]
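The progress values in the output above are plain arithmetic: each time one of the sources completes, the share of completed sources is reported as a percentage. Here is a minimal sketch of that calculation in plain TypeScript, without RxJS. The helper name `progressSteps` is hypothetical and not part of rxjs-toolbox, and the formula `completed * 100 / total` is an assumption inferred from the sample output, not taken from the library source.

```typescript
// Hypothetical helper, not part of rxjs-toolbox: lists the percentage
// that would be reported after each of `total` sources completes.
function progressSteps(total: number): number[] {
  const steps: number[] = [];
  for (let completed = 1; completed <= total; completed++) {
    // Assumed formula, inferred from the sample output above.
    steps.push((completed * 100) / total);
  }
  return steps;
}

// Three sources, as in getUserDetails([1, 2, 15]).
const steps = progressSteps(3);
console.log(steps.map((p) => p.toFixed(2))); // [ '33.33', '66.67', '100.00' ]
```

With three sources the steps are thirds, which is why the raw values in the sample output carry floating-point noise; rounding before display (as with `toFixed` here) is usually enough for a progress bar.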

waitUntil

An interesting custom RxJS operator found in the auth0-angular interceptor codebase: waitUntil.

It holds the source back until the parameter observable emits its first value, and then switches to the source.

Params: isLoaded$, the observable to wait for.

// isLoaded$: some observable we wait for
...
of(route).pipe(
  waitUntil(isLoaded$)
)
...
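One way an operator with the behavior described above could be written is sketched here. This is an illustration under stated assumptions, not the auth0-angular or rxjs-toolbox source: the name `waitUntilSketch` is hypothetical, and the sketch assumes the source should only be subscribed once the signal has emitted. It needs `rxjs` installed.

```typescript
import { Observable, Subject, of } from 'rxjs';
import { mergeMap, take } from 'rxjs/operators';

// Hypothetical re-implementation, for illustration only.
function waitUntilSketch<S>(signal$: Observable<S>) {
  return <T>(source$: Observable<T>): Observable<T> =>
    signal$.pipe(
      take(1), // react to the first signal value only
      mergeMap(() => source$) // then subscribe to the source
    );
}

// Demo: a Subject plays the role of isLoaded$.
const isLoaded$ = new Subject<boolean>();
const received: string[] = [];

of('/dashboard')
  .pipe(waitUntilSketch(isLoaded$))
  .subscribe((route) => received.push(route));

const countBeforeSignal = received.length; // nothing has passed through yet
isLoaded$.next(true); // the signal fires, the source is released
console.log(countBeforeSignal, received); // 0 [ '/dashboard' ]
```

Because this sketch subscribes to the source late, a hot source would lose any values it emitted before the signal; for a cold source such as `of(route)` that makes no difference.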

Helper functions

timeRange

Creates an Observable that emits values with the specified delays.

Params:

range - an array of objects of the form [{value: <some value>, delay: <delayInMs>}, ...]

isRelative - if true, each emission is scheduled only after the previous one has been emitted, so the delays add up. If false, all values are scheduled at once, and each delay is measured from the moment of subscription.

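// Absolute delays: each one is counted from the moment of subscription
const range$ = timeRange([
  {value: 15, delay: 1500}, // emitted at 1500ms
  {value: 15, delay: 2500}  // emitted at 2500ms
]);

// Relative delays: each one is counted from the previous emission
const range2$ = timeRange([
  {value: 15, delay: 1500}, // emitted at 1500ms
  {value: 15, delay: 2500}  // emitted at 1500 + 2500 = 4000ms
], true);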
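The difference between the two modes comes down to how the delays are summed. The sketch below computes, in plain TypeScript without RxJS, the moment (in ms after subscription) at which each value would be emitted. The names `TimedValue` and `emissionTimes` are hypothetical and not part of rxjs-toolbox, and defaulting `isRelative` to false is an assumption based on the first example above.

```typescript
// Hypothetical types and helper, for illustration only.
interface TimedValue<T> {
  value: T;
  delay: number; // milliseconds
}

// Returns the time after subscription at which each item would be emitted.
function emissionTimes<T>(range: TimedValue<T>[], isRelative = false): number[] {
  let elapsed = 0;
  return range.map(({ delay }) => {
    if (!isRelative) {
      return delay; // absolute: counted from the moment of subscription
    }
    elapsed += delay; // relative: counted from the previous emission
    return elapsed;
  });
}

const sampleRange: TimedValue<number>[] = [
  { value: 15, delay: 1500 },
  { value: 15, delay: 2500 },
];

const absoluteTimes = emissionTimes(sampleRange);
const relativeTimes = emissionTimes(sampleRange, true);
console.log(absoluteTimes); // [ 1500, 2500 ]
console.log(relativeTimes); // [ 1500, 4000 ]
```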

finalizeWithValue

Unlike the original finalize from RxJS, it passes source$'s last emitted value (if any) to the callback, in the format {value: <lastValue>}.

If source$ completes without emitting a value, the callback receives undefined.

Author - Ben Lesh, taken here


from([1, 3]).pipe(
  finalizeWithValue((lastEmittedValue) => console.log(lastEmittedValue)) // 3
).subscribe(); // the callback only runs once the stream is subscribed and finishes

Want to learn RxJS?

Try my "Hands-on RxJS for Web Development" video-course!