RxJS Custom Operators

The RxJS library, due to its wide choice of operators, is rightfully considered an extremely powerful tool in the developer's arsenal. In this post, I want to introduce you to the concept of RxJS custom operators with examples of implementation.





RxJS . , . RxJS .





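An RxJS operator is a function that takes an observable as input and returns a new observable. So, to create a custom RxJS operator, all you need is a plain JavaScript (TypeScript) function. Let's start with the simplest operator (identity), which just returns the source observable unchanged: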





import { interval, Observable } from "rxjs";
import { take } from "rxjs/operators";

// Demo source: a timer-driven stream limited to its first three values.
const source$ = interval(1000).pipe(take(3));

/**
 * The smallest possible custom operator: it hands back the very observable
 * it receives, without wrapping or transforming it.
 *
 * @param source$ The upstream observable.
 * @returns The same `source$` instance.
 */
function identity<T>(source$: Observable<T>): Observable<T> {
  return source$;
}

// Any function of shape Observable -> Observable can be passed to `pipe`.
const results$ = source$.pipe(identity);

results$.subscribe(value => console.log(value));
  
// console output: 0, 1, 2
      
      



- .





( ) :





<Copy>
import { interval, Observable } from "rxjs";
import { take, tap } from "rxjs/operators";

// Demo source: a timer-driven stream limited to its first three values.
const source$ = interval(1000).pipe(take(3));

/**
 * Custom operator that prints every value passing through it, prefixed with
 * `log: `, and forwards the value unchanged.
 *
 * @param source$ The upstream observable.
 * @returns An observable built from `source$` with the logging side effect attached.
 */
function log<T>(source$: Observable<T>): Observable<T> {
  const printValue = tap<T>(value => console.log(`log: ${value}`));

  return source$.pipe(printValue);
}

const results$ = source$.pipe(log);

results$.subscribe(value => console.log(value));
  
// console output: log: 0, 0, log: 1, 1, log: 2, 2
      
      



source$, pipe.





. , . :





import { interval, Observable } from "rxjs";
import { take, tap } from "rxjs/operators";

// Demo source: a timer-driven stream limited to its first three values.
const source$ = interval(1000).pipe(take(3));

/**
 * Operator factory: takes a configuration argument and returns the actual
 * operator (a function from observable to observable).
 *
 * @param tag Label included in every log line.
 * @returns An operator that logs each value as `logWithTag(<tag>): <value>`
 *          and forwards it unchanged.
 */
function logWithTag<T>(tag: string): (source$: Observable<T>) => Observable<T> {
  function attachLogger(source$: Observable<T>): Observable<T> {
    const logValue = tap<T>(value =>
      console.log(`logWithTag(${tag}): ${value}`)
    );

    return source$.pipe(logValue);
  }

  return attachLogger;
}

const results$ = source$.pipe(logWithTag("RxJS"));

results$.subscribe(value => console.log(value));
  
// console output: logWithTag(RxJS): 0, 0, logWithTag(RxJS): 1, 1, logWithTag(RxJS): 2, 2
      
      



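The return type can be simplified with the MonoTypeOperatorFunction type provided by RxJS. In addition, the standalone pipe function lets us shorten the implementation: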





import { interval, MonoTypeOperatorFunction, pipe } from "rxjs";
import { take, tap } from "rxjs/operators";

// Demo source: a timer-driven stream limited to its first three values.
const source$ = interval(1000).pipe(take(3));

/**
 * Operator factory written with the standalone `pipe` helper, so the source
 * observable never has to be named explicitly.
 *
 * @param tag Label included in every log line.
 * @returns An operator that logs each value as `logWithTag(<tag>): <value>`
 *          and forwards it unchanged.
 */
function logWithTag<T>(tag: string): MonoTypeOperatorFunction<T> {
  const logValue = tap<T>(value =>
    console.log(`logWithTag(${tag}): ${value}`)
  );

  return pipe(logValue);
}

const results$ = source$.pipe(logWithTag("RxJS"));

results$.subscribe(value => console.log(value));
  
// console output: logWithTag(RxJS): 0, 0, logWithTag(RxJS): 1, 1, logWithTag(RxJS): 2, 2
      
      



RxJS .





. :





import { interval, MonoTypeOperatorFunction, pipe } from "rxjs";
import { take, tap } from "rxjs/operators";

// Demo source: a timer-driven stream limited to its first three values.
const source$ = interval(1000).pipe(take(3));

/**
 * Runs `job` with the first value that passes through the operator and
 * ignores every later value. Values themselves are forwarded unchanged.
 *
 * Caveat (intentional in this example): the `isFirst` flag is created once
 * per `tapOnce(...)` call, not once per subscription. Every subscriber of the
 * resulting observable therefore shares the same flag, and `job` runs only
 * for whichever subscriber receives a value first.
 *
 * @param job Callback invoked with the first value seen.
 * @returns An operator that triggers `job` a single time.
 */
function tapOnce<T>(job: (value: T) => void): MonoTypeOperatorFunction<T> {
  // Lives in the factory closure, so it is shared by all subscriptions.
  let isFirst = true;

  return pipe(
    tap((v: T) => {
      if (!isFirst) {
        return;
      }

      job(v);
      isFirst = false;
    })
  );
}

const results$ = source$.pipe(tapOnce(() => console.log("First value emitted")));

// Two subscriptions, but "First value emitted" is printed only once.
results$.subscribe(console.log);
results$.subscribe(console.log);
  
// console output: First value emitted, 0, 0, 1, 1, 2, 2
      
      



To give every subscriber its own state, wrap the operator body in defer:





import { defer, interval, MonoTypeOperatorFunction } from "rxjs";
import { take, tap } from "rxjs/operators";

// Demo source: a timer-driven stream limited to its first three values.
const source$ = interval(1000).pipe(take(3));

/**
 * Runs `job` with the first value of each subscription and ignores every
 * later value of that subscription. Values are forwarded unchanged.
 *
 * The operator body is wrapped in `defer`, whose factory runs on every
 * subscribe, so each subscriber gets its own `isFirst` flag.
 *
 * @param job Callback invoked with the first value each subscriber receives.
 * @returns An operator that triggers `job` once per subscription.
 */
function tapOnceUnique<T>(job: (value: T) => void): MonoTypeOperatorFunction<T> {
  return source$ =>
    defer(() => {
      // Declared inside the defer factory: a fresh flag for every subscription.
      let isFirst = true;

      return source$.pipe(
        tap((v: T) => {
          if (!isFirst) {
            return;
          }

          job(v);
          isFirst = false;
        })
      );
    });
}

const results$ = source$.pipe(tapOnceUnique(() => console.log("First value emitted")));

// Two subscriptions: "First value emitted" is printed once for each of them.
results$.subscribe(console.log);
results$.subscribe(console.log);
  
// console output: First value emitted, 0, First value emitted, 0, 1, 1, 2, 2
      
      



tapOnce



.





firstTruthy



:





import { MonoTypeOperatorFunction, of, pipe } from "rxjs";
import { first } from "rxjs/operators";

// Demo source: two falsy values followed by two truthy ones.
const source1$ = of(0, "", "foo", 69);

/**
 * Operator that lets through only the first truthy value of the source.
 *
 * NOTE(review): built on RxJS `first` with a predicate and no default value;
 * per the RxJS documentation that operator errors if the source completes
 * without a match. Confirm this is acceptable for sources that may contain
 * no truthy value.
 *
 * @returns An operator emitting the first value for which `Boolean(value)` is true.
 */
function firstTruthy<T>(): MonoTypeOperatorFunction<T> {
  const isTruthy = (value: T): boolean => !!value;

  return pipe(first(isTruthy));
}

const result1$ = source1$.pipe(firstTruthy());

result1$.subscribe(value => console.log(value));

// console output: foo
      
      



evenMultiplied



:





import { interval, MonoTypeOperatorFunction, pipe } from "rxjs";
import { filter, map, take } from "rxjs/operators";

// Demo source: a timer-driven stream limited to its first three values.
const source2$ = interval(10).pipe(take(3));

/**
 * Operator that drops odd numbers and multiplies the remaining (even) ones.
 *
 * @param multiplier Factor applied to every even value.
 * @returns An operator emitting `value * multiplier` for each even `value`.
 */
function evenMultiplied(multiplier: number): MonoTypeOperatorFunction<number> {
  const keepEven = filter((value: number) => value % 2 === 0);
  const scale = map((value: number) => value * multiplier);

  return pipe(keepEven, scale);
}

const result2$ = source2$.pipe(evenMultiplied(3));

result2$.subscribe(value => console.log(value));
  
// console output: 0, 6
      
      



liveSearch



:





import { ObservableInput, of, OperatorFunction, pipe  } from "rxjs";
import { debounceTime, delay, distinctUntilChanged, switchMap } from "rxjs/operators";

// Demo source: two search queries emitted back to back.
const source3$ = of("politics", "sport");

/** Turns a search query into an observable-like source of results. */
type DataProducer<T> = (q: string) => ObservableInput<T>;

/**
 * Live-search operator: debounces the incoming queries, skips a query that
 * equals the previous one, and switches to the data stream produced for the
 * latest query.
 *
 * @param time Debounce interval passed to `debounceTime`.
 * @param dataProducer Function that loads data for a query.
 * @returns An operator mapping a stream of queries to a stream of results.
 */
function liveSearch<R>(
  time: number,
  dataProducer: DataProducer<R>
): OperatorFunction<string, R> {
  return pipe(
    debounceTime(time),
    distinctUntilChanged(),
    // Call the producer explicitly instead of passing it by reference:
    // switchMap also supplies the emission index as a second argument, which
    // would otherwise leak into a producer that has an optional second parameter.
    switchMap((query: string) => dataProducer(query))
  );
}

// Fake backend call: answers with a single message after a delay.
const newsProducer = (q: string) =>
  of(`Data fetched for ${q}`).pipe(delay(2000));

const result3$ = source3$.pipe(liveSearch(500, newsProducer));

result3$.subscribe(console.log);
  
// console output: Data fetched for sport
      
      



RxJS . , pipe-.





: [ ]





, - .






"JavaScript Developer. Professional". , , .








All Articles