AsyncSubject
AsyncSubject is a variant of Subject that keeps only the last value emitted by a source observable and delivers it once the source completes. It has to wait for completion because only then can it know that the stored value is the final one; at that point it emits the value to all existing subscribers and replays it to any future ones.
This means that, once the subject has completed, a subscriber receives the last emitted value immediately on subscription, even if it subscribes long after the value was stored.
In a way, AsyncSubject is similar to a Promise: both hand a single final value to every consumer. The biggest difference between the two is that a Promise is always eager, meaning the function you pass to it executes immediately. With AsyncSubject, however, you control when it subscribes to a source observable. And because observables are lazy, the producer function in the source observable runs only when that subscription happens.
If the source observable errors, AsyncSubject does not emit the latest value. Instead, it passes the error notification from the source observable to all existing subscriptions and to any new ones.
AsyncSubject works in the following way:
- Create an internal subscriptions container
- When a new subscription occurs, add it to the container
- When a source observable emits a new value or the method next is called on the subject, set the emitted value as the current latest value, overriding any existing value
- If the source observable completes or the method complete is called on the subject, set the state of the subject to stopped and send the current latest value alongside the complete notification to all existing subscriptions; remove the subscriptions from the container
- If the source observable throws an error or the method error is called on the subject, set the state of the subject to stopped and set the error notification as the current value; send the error notification to all existing subscriptions; remove the subscriptions from the container
- If the subject is stopped, do not add any new subscriptions to the container. Instead, if the subject hasn't errored, send the current latest value alongside the complete notification immediately on subscription to the corresponding observer; otherwise send the error notification without the latest value
- If the stopped subject is subscribed to a new source observable, ignore the values from this source
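The steps above can be sketched as a small class with no dependencies. `MiniAsyncSubject` and its field names are hypothetical and meant only as a teaching aid; this is not the RxJS source, and it leaves out details such as subscription objects and scheduling:

```javascript
// Hypothetical teaching sketch of the steps above, not the RxJS implementation.
class MiniAsyncSubject {
  constructor() {
    this.observers = new Set(); // internal subscriptions container
    this.stopped = false;
    this.hasValue = false;
    this.latest = undefined;
    this.hasError = false;
    this.thrownError = undefined;
  }

  subscribe(observer) {
    if (this.stopped) {
      // Stopped: replay the terminal state instead of storing the observer.
      this.finish(observer);
      return () => {};
    }
    this.observers.add(observer);
    return () => this.observers.delete(observer); // unsubscribe function
  }

  next(value) {
    if (this.stopped) return; // values arriving after stop are ignored
    this.latest = value; // overrides any previous value
    this.hasValue = true;
  }

  complete() {
    if (this.stopped) return;
    this.stopped = true;
    this.flush();
  }

  error(err) {
    if (this.stopped) return;
    this.stopped = true;
    this.hasError = true;
    this.thrownError = err;
    this.flush();
  }

  // Notify every stored observer, then empty the container.
  flush() {
    const observers = [...this.observers];
    this.observers.clear();
    observers.forEach((observer) => this.finish(observer));
  }

  // Send either the error, or the latest value followed by complete.
  finish(observer) {
    if (this.hasError) {
      observer.error?.(this.thrownError);
      return;
    }
    if (this.hasValue) observer.next?.(this.latest);
    observer.complete?.();
  }
}

const mini = new MiniAsyncSubject();
const seen = [];

mini.subscribe({ next: (v) => seen.push(`a:${v}`), complete: () => seen.push('a:done') });
mini.next('x');
mini.next('y');
mini.complete();
mini.next('z'); // ignored: the subject is already stopped
mini.subscribe({ next: (v) => seen.push(`b:${v}`), complete: () => seen.push('b:done') });

console.log(seen); // [ 'a:y', 'a:done', 'b:y', 'b:done' ]
```

Because the object exposes `next`, `error` and `complete`, it has the shape of an observer, which is what lets a subject be handed to a source observable's `subscribe` call.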
Usage
AsyncSubject is a great choice if you need to fetch and cache one-shot resources. When making a network request, we're generally interested only in the final output, which is the response. To get it, we need to wait until the request has fully loaded, at which point the observable stream completes.
Here’s an example that demonstrates that scenario:
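import { AsyncSubject } from 'rxjs';

// One AsyncSubject per URL; each holds the response once the request finishes.
const cache = {};

function getResource(url) {
  if (!cache[url]) {
    const subject = new AsyncSubject();
    cache[url] = subject;
    fetch(url)
      .then((response) => response.json())
      .then((data) => {
        subject.next(data);
        subject.complete(); // the value is delivered only on completion
      })
      // forward failures, otherwise subscribers would wait forever
      .catch((err) => subject.error(err));
  }
  // expose a read-only view so callers cannot call next/complete/error
  return cache[url].asObservable();
}

const url = 'https://api.mocki.io/v1/ce5f60e2';

// the first call triggers the request
getResource(url).subscribe((data) => console.log(data));

setTimeout(() => {
  // no request is made, data is served from the AsyncSubject's cache
  getResource(url).subscribe((data) => console.log(data));
}, 3000);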