Operator create
pozwala na szybkie tworzenie własnych Observables
na podstawie przekazanej jako parametr funkcji anonimowej. Najlepszą drogą do jego zrozumienia – jest praktyka.
Składnia
Przyjrzyjmy się poniższemu przykładowi:
const source = Observable.create((observer) => {
observer.next(1);
setTimeout(() => observer.next(2), 1000);
setTimeout(() => {
observer.next(3);
observer.complete();
}, 2000);
return () => {
console.log('Disposed');
};
});
Okej, czas na analizę. create
jest statyczną metodą klasy Observable
, która zwraca nam gotowy obiekt typu Observable
. Funkcja, którą przekazaliśmy jako parametr determinuje zachowanie naszego obiektu i zwraca kolejną funkcję, która wykona się w celu ewentualnego zwolnienia zasobów (programiści C# powinni skojarzyć to z obiektami implementującymi interfejs IDisposable). Wywołanie metody next()
oznacza przekazanie obserwatorowi wartości podanej jako parametr (w naszym przypadku kolejno liczby 1, 2 i 3) natomiast metoda complete()
powoduje zakończenie działania Observable
i powiadomienie o tym obserwatora.
Istnieje jeszcze jedna metoda – error()
, której nie wykorzystaliśmy w powyższym przykładzie. Jej wywołanie sygnalizuje pojawienie się błędu, skutkuje poinformowaniem o tym obserwatora oraz zakończeniu działania subskrypcji.
Observable
stworzone w ten sposób (przykład) należą do grupy tzw. Cold Observables – oznacza to, że generują wartości dopiero wtedy, kiedy obserwator zacznie subskrybować obiekt i robią to niezależnie dla każdego z nich.
Pierwszy obserwator
Przyszedł czas na utworzenie pierwszego observera
.
const subscription$ = source.subscribe(
value => console.log('Value from Observable: ', value),
error => console.error('Error: ', error),
() => console.warn('Completed'));
Metoda subscribe
pozwala na podpięcie nowego obserwatora do obiektu. Jako parametry przyjmuje 3 funkcje, kolejno – przechwytującą przekazaną wartość, informacje o błędzie i sygnał zakończenia subskrypcji.
Zmienna subscription$
przechowuje obiekt typu Subscription
udostępniający m.in. metodę unsubscribe()
, która pozwala „odwołać subskrypcję”. W wielu przypadkach warto o niej pamiętać, aby zapobiec wyciekom pamięci.
Znak dolara umieszczony na końcu nazwy zmiennej subscription$ wynika z konwencji nazw, która przyjęła się przykładowo w środowisku programistów Angular 2.
Rezultat wywołania naszego kodu:
Hot vs Cold?
Jak zdążyłem wspomnieć nieco wyżej, na ten moment nasze Observable
jest Cold – generuje wartości niezależnie dla konkretnego obserwatora. Do naszego kodu dorzućmy drugiego obserwatora i sprawdźmy efekt (aby rozróżnić komunikaty poszczególnych obserwatorów, do drugiego dorzuciłem trzy znaki —):
Widzimy, że zgodnie z założeniem, każdy obserwator jest traktowany niezależnie i otrzymuje ten sam zestaw wartości. Co w sytuacji, gdybyśmy chcieli, aby Observable
rozpoczynało realizację swojego „zadania” natychmiastowo? To właśnie cechuje Hot Observable.
W jaki sposób możemy uzyskać Hot z naszego Cold? Z pomocą przychodzi nam metoda publish()
, której wywołanie zwraca ConnectableObservable
:
const published = this.source.publish();
Teraz, aby nasze Observable
rozpoczęło wykonywanie zadania niezależnie, wystarczy wywołać metodę connect()
obiektu published
.
published.connect();
Współdzielenie stanu
Pamiętamy, że na ten moment każdy obserwator funkcjonuje niezależnie. Co w sytuacji, gdybyśmy chcieli współdzielić stan Observable
pomiędzy naszymi obserwatorami? Możemy to osiągnąć właśnie przy użyciu publish()
i connect()
. Przykład (opóźnimy w nim subskrypcję drugiego obserwatora o 900ms):
const published = this.source.publish();
published.subscribe(
value => console.log('Value from Observable: ', value),
error => console.error('Error: ', error),
() => console.warn('Completed'));
setTimeout(() => {
published.subscribe(
value => console.log('--- Value from Observable: ', value),
error => console.error('Error: ', error),
() => console.warn('--- Completed'));
}, 900);
published.connect();
Rezultat można zaobserwować poniżej:
Widzimy, że obserwator numer dwa nie rozpoczął już od wartości 1, a od kolejnego wywołania. Co w sytuacji, kiedy spróbujemy zasubskrybować Observable
po tym jak zakończyła się realizacja współdzielonego zadania? W tym celu dorzucimy kolejnego obserwatora i opóźnimy jego subskrypcję o 4 sekundy:
setTimeout(() => {
published.subscribe(
value => console.log('------ Value from Observable: ', value),
error => console.error('Error: ', error),
() => console.warn('------ Completed'));
}, 4000);
Rezultat? Zerowy. Obserwator nie zareaguje w żaden sposób. Aby to rozwiązać, możemy wykorzystać metodę publishLast()
, która spowoduje, że emitowane będą tylko ostatnie wartości, również w przypadku zakończonego zadania.
Jeżeli nawet po zakończeniu zadania, chcemy aby mimo wszystko wyemitowane zostały wszystkie wartości, a nie tylko ostatnia – możemy wykorzystać operator publishReplay()
:
Podsumowanie
Celem wpisu było przedstawienie możliwości operatora create
i różnic pomiędzy Cold a Hot Observable. W następnym wpisie skupimy się na tworzeniu Observable
ze zdarzeń (z ang. events).