Javascript

RxJS – Hot vs Cold Observable, operator create

Operator create pozwala na szybkie tworzenie własnych Observables na podstawie przekazanej jako parametr funkcji anonimowej. Najlepszą drogą do jego zrozumienia – jest praktyka.

Składnia

Przyjrzyjmy się poniższemu przykładowi:


const source = Observable.create((observer) => {
  observer.next(1);

  setTimeout(() => observer.next(2), 1000);
  setTimeout(() => {
    observer.next(3);
    observer.complete();
  }, 2000);

  return () => {
    console.log('Disposed');
  };
});

Okej, czas na analizę. create jest statyczną metodą klasy Observable, która zwraca nam gotowy obiekt typu Observable. Funkcja, którą przekazaliśmy jako parametr determinuje zachowanie naszego obiektu i zwraca kolejną funkcję, która wykona się w celu ewentualnego zwolnienia zasobów (programiści C# powinni skojarzyć to z obiektami implementującymi interfejs IDisposable). Wywołanie metody next() oznacza przekazanie obserwatorowi wartości podanej jako parametr (w naszym przypadku kolejno liczby 1, 2 i 3) natomiast metoda complete() powoduje zakończenie działania Observable i powiadomienie o tym obserwatora.

Istnieje jeszcze jedna metoda – error(), której nie wykorzystaliśmy w powyższym przykładzie. Jej wywołanie sygnalizuje pojawienie się błędu, skutkuje poinformowaniem o tym obserwatora oraz zakończeniu działania subskrypcji.

Observable stworzone w ten sposób (przykład) należą do grupy tzw. Cold Observables – oznacza to, że generują wartości dopiero wtedy, kiedy obserwator zacznie subskrybować obiekt i robią to niezależnie dla każdego z nich.

Pierwszy obserwator

Przyszedł czas na utworzenie pierwszego observera.


const subscription$ = source.subscribe(
  value => console.log('Value from Observable: ', value),
  error => console.error('Error: ', error),
  () => console.warn('Completed'));

Metoda subscribe pozwala na podpięcie nowego obserwatora do obiektu. Jako parametry przyjmuje 3 funkcje, kolejno – przechwytującą przekazaną wartość, informacje o błędzie i sygnał zakończenia subskrypcji.

Zmienna subscription$ przechowuje obiekt typu Subscription udostępniający m.in. metodę unsubscribe(), która pozwala „odwołać subskrypcję”. W wielu przypadkach warto o niej pamiętać, aby zapobiec wyciekom pamięci.

Znak dolara umieszczony na końcu nazwy zmiennej subscription$ wynika z konwencji nazw, która przyjęła się przykładowo w środowisku programistów Angular 2.

Rezultat wywołania naszego kodu:

Cold Observable z pojedynczym obserwatorem

Cold Observable z pojedynczym obserwatorem

Hot vs Cold?

Jak zdążyłem wspomnieć nieco wyżej, na ten moment nasze Observable jest Cold – generuje wartości niezależnie dla konkretnego obserwatora. Do naszego kodu dorzućmy drugiego obserwatora i sprawdźmy efekt (aby rozróżnić komunikaty poszczególnych obserwatorów, do drugiego dorzuciłem trzy znaki ):

Cold Observable z dwoma obserwatorami
Cold Observable z dwoma obserwatorami

Widzimy, że zgodnie z założeniem, każdy obserwator jest traktowany niezależnie i otrzymuje ten sam zestaw wartości. Co w sytuacji, gdybyśmy chcieli, aby Observable rozpoczynało realizację swojego „zadania” natychmiastowo? To właśnie cechuje Hot Observable.

W jaki sposób możemy uzyskać Hot z naszego Cold? Z pomocą przychodzi nam metoda publish(), której wywołanie zwraca ConnectableObservable:


const published = this.source.publish();

Teraz, aby nasze Observable rozpoczęło wykonywanie zadania niezależnie, wystarczy wywołać metodę connect() obiektu published.


published.connect();

Współdzielenie stanu

Pamiętamy, że na ten moment każdy obserwator funkcjonuje niezależnie. Co w sytuacji, gdybyśmy chcieli współdzielić stan Observable pomiędzy naszymi obserwatorami? Możemy to osiągnąć właśnie przy użyciu publish() i connect(). Przykład (opóźnimy w nim subskrypcję drugiego obserwatora o 900ms):


const published = this.source.publish();
published.subscribe(
    value => console.log('Value from Observable: ', value),
    error => console.error('Error: ', error),
    () => console.warn('Completed'));

setTimeout(() => {
    published.subscribe(
        value => console.log('--- Value from Observable: ', value),
        error => console.error('Error: ', error),
        () => console.warn('--- Completed'));
}, 900);
published.connect();

Rezultat można zaobserwować poniżej:

Hot Observable z dwoma obserwatorami
Hot Observable z dwoma obserwatorami

Widzimy, że obserwator numer dwa nie rozpoczął już od wartości 1, a od kolejnego wywołania. Co w sytuacji, kiedy spróbujemy zasubskrybować Observable po tym jak zakończyła się realizacja współdzielonego zadania? W tym celu dorzucimy kolejnego obserwatora i opóźnimy jego subskrypcję o 4 sekundy:


setTimeout(() => {
  published.subscribe(
    value => console.log('------ Value from Observable: ', value),
    error => console.error('Error: ', error),
    () => console.warn('------ Completed'));
}, 4000);

Rezultat? Zerowy. Obserwator nie zareaguje w żaden sposób. Aby to rozwiązać, możemy wykorzystać metodę publishLast(), która spowoduje, że emitowane będą tylko ostatnie wartości, również w przypadku zakończonego zadania.

Hot Observable z wykorzystaniem operatora publishLast
Hot Observable z wykorzystaniem operatora publishLast

Jeżeli nawet po zakończeniu zadania, chcemy aby mimo wszystko wyemitowane zostały wszystkie wartości, a nie tylko ostatnia – możemy wykorzystać operator publishReplay():

>Hot Observable z wykorzystaniem operatora publishReplay
Hot Observable z wykorzystaniem operatora publishReplay

Podsumowanie

Celem wpisu było przedstawienie możliwości operatora create i różnic pomiędzy Cold a Hot Observable. W następnym wpisie skupimy się na tworzeniu Observable ze zdarzeń (z ang. events).