Schedulers in RxJS

What do you know about Schedulers in RxJS? They hide from the developers working with the Observable execution context. Like those house elves from Harry Potter who do all the dirty work at Hogwarts, and no one even heard of them. Let's fix that and find out a little more about them.

What is Scheduler

Scheduler Observable Observer. ( )

, Scheduler Observable. , .

import { of } from "rxjs";

console.log("Start");
of("Observable").subscribe(console.log);
console.log("End");

// Logs:
// Start
// Observable
// End

console.log Observable . . , Observable , observeOn Scheduler.

import { asyncScheduler, of } from "rxjs";
import { observeOn } from "rxjs/operators";

console.log("Start");
of("Observable")
  .pipe(observeOn(asyncScheduler))
  .subscribe(console.log);
console.log("End");

// Logs:
// Start
// End
// Observable

StackBlitz

Observable . . , setTimeout(…, 0), ?

, asyncScheduler . Schedulers. .

Schedulers

Schedulers , Event Loop JavaScript. .

, :

  1. (callstack)

  2. (Promise)

  3. (setTimeout, setInterval, XMLHttpRequest ..).

  4. , . (requestAnimationFrame)

RxJS Scheduler :

queueScheduler

asapScheduler

asyncScheduler

animationFrameScheduler

VirtualTimeScheduler  TestScheduler, . .

.

import { of, merge, asapScheduler,asyncScheduler,queueScheduler, animationFrameScheduler } from "rxjs";
import { observeOn } from "rxjs/operators";

const async$ = of("asyncScheduler").pipe(observeOn(asyncScheduler));
const asap$ = of("asapScheduler").pipe(observeOn(asapScheduler));
const queue$ = of("queueScheduler").pipe(observeOn(queueScheduler));
const animationFrame$ = of("animationFrameScheduler").pipe(
  observeOn(animationFrameScheduler)
);
merge(async$, asap$, queue$, animationFrame$).subscribe(console.log);

console.log("synchronous code");

// Logs:
// queueScheduler
// synchronous code
// asapScheduler
// animationFrameScheduler
// asyncScheduler

StackBlitz

, "queueScheduler" , "synchronous code". "asapScheduler" "asyncScheduler", .

Schedulers

Scheduler observeOn subscribeOn. Scheduler, delay, .

import { of, asyncScheduler } from "rxjs";
import { observeOn, subscribeOn } from "rxjs/operators";

of("observeOn")
  .pipe(observeOn(asyncScheduler, 100))
  .subscribe(console.log);

of("subscribeOn")
  .pipe(subscribeOn(asyncScheduler, 50))
  .subscribe(console.log);

// Logs:
// subscribeOn
// observeOn

StackBlitz

, observeOn observer β€” next, error complete Scheduler . subscribeOn subscriber β€” subscribe .

, delay observeOn/subscribeOn, Scheduler, asyncScheduler. β€” observeOn(animationFrameScheduler, 100).

RxJS 6.5.0 Scheduler of, from, merge, range .. RxJS deprecated,   scheduled .

import { of, scheduled, asapScheduler } from 'rxjs';

// DEPRECATED
// of(2, asapScheduler).subscribe(console.log);

scheduled(of('scheduled'), asapScheduler).subscribe(console.log);

Scheduler

Schedulers RxJS, . , Scheduler , . . . :

const cache = new Map<number, any>();
function get(id: number): Observable<any> {
 if (cache.has(id)) {
   return of(cache.get(id));
 }
 return http.get(β€˜some-url\’ + id).pipe(
   tap(data => {
     cache.set(id, data);
   }),
 );
}

. . , , , - . , . , .

scheduled asyncScheduler 4 , .

return scheduled(of(cache.get(id)), asyncScheduler);

. , .

Schedulers . RxJS . . , Scheduler.

Schedulers . !




All Articles