
RxJS used in Angular – Knowledge in a Nutshell

One of the harder challenges facing less experienced developers who want to dive into the Angular ecosystem is RxJS. Many consider this library a major reason for the slightly higher entry threshold of our favorite framework compared to the competition. Mastering it does require learning to think in a more reactive way, but the effort is definitely worth it.

In this article, we present, from a slightly more theoretical angle, the concepts you need to understand in order to work with RxJS properly.

  1. Reactive programming
  2. RxJS Principles
    2.1. Function Composition
    2.2. Lazy Execution
    2.3. Push-Based Architecture
    2.4. Behavioral Patterns
  3. Observable
  4. Subscription and Observer
  5. Cold vs Hot, Unicast vs Multicast
  6. Subject
    6.1. Subject
    6.2. BehaviorSubject
    6.3. ReplaySubject
    6.4. AsyncSubject
  7. Summary

 

Reactive programming

Reactive programming is a programming paradigm (like imperative, object-oriented, functional, or declarative programming) that focuses on asynchronous and non-blocking data processing. 

Here the data takes the form of events, and processing means defining data streams in which those events can be modified (e.g., transformed, merged, or split). Events are created and published by a producer and read by consumers.

This paradigm is extremely useful in web applications, where asynchronous events occur constantly (user interaction with the application interface, events generated by browser APIs, or extensive communication with the server) and we want to process them in the background.

RxJS Principles

RxJS is a JavaScript library that facilitates writing reactive code. It is built on a few important concepts which, once noticed and understood, make working with reactive code much easier.

Function Composition

This concept comes from the functional programming paradigm and applies to RxJS operators. Defining how data is processed in a stream is simple because it is done by composing many small operators.

Operators are pure functions, so their results only depend on their arguments (in particular, arguments for an operator are going to be specific values originating from a stream, or even entire streams themselves). They have a single and well-defined responsibility (often, but not always, their behavior is easy to decode directly from the operator name, e.g. “filter” filters, “map” maps, and “catchError” catches errors).

A chain of pure functions is clear, easy to understand, and easy to test (because each pure function can be tested separately).
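As a minimal sketch of operator composition, the snippet below builds a reusable operator from two simple ones using the standalone `pipe` function exported by RxJS. The name `evenSquares` and the sample values are assumptions made for illustration only.

```typescript
import { of, pipe } from "rxjs";
import { filter, map } from "rxjs/operators";

// A reusable operator composed from two small, single-purpose operators.
// The name `evenSquares` is hypothetical.
const evenSquares = pipe(
  filter((n: number) => n % 2 === 0), // keep even numbers only
  map((n: number) => n * n),          // square what is left
);

// Because the composed operator is just a function, it can be exercised
// in isolation against a small, synchronous source.
const result: number[] = [];
of(1, 2, 3, 4)
  .pipe(evenSquares)
  .subscribe((n) => result.push(n));

console.log(result); // [4, 16]
```

Each step stays small enough to test on its own, and the composed operator can be reused in any stream of numbers.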

Lazy Execution

This is the opposite of eager execution. In the case of RxJS streams it means that (apart from certain exceptions) the operations defined within a stream are executed when a subscription is created (i.e., when a consumer starts listening to the stream values), not when the stream is defined.

JavaScript Promises, whose processing begins immediately after they are defined, work the opposite way.

For example, if we define a Promise that makes an HTTP call and an Observable that makes the same HTTP call, then in the case of Promise, the request will be sent as soon as it is defined, and in the case of Observable it will only be executed when the subscription is created (which may happen much later, or not at all).
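The difference can be sketched without any HTTP call. In the snippet below a shared `log` array (an illustrative name) records when the work of each construct actually starts; the "work" is a stand-in for sending a request.

```typescript
import { Observable } from "rxjs";

const log: string[] = [];

// Eager: the executor function runs as soon as the Promise is constructed.
const promise = new Promise<string>((resolve) => {
  log.push("promise work started");
  resolve("done");
});

// Lazy: the function passed to the constructor runs only on subscribe().
const lazy$ = new Observable<string>((subscriber) => {
  log.push("observable work started");
  subscriber.next("done");
  subscriber.complete();
});

log.push("both defined");

// Only now does the Observable do its work.
lazy$.subscribe();

// The Promise result is still delivered asynchronously.
promise.then((value) => console.log("promise resolved with", value));

console.log(log);
// ["promise work started", "both defined", "observable work started"]
```

If `lazy$.subscribe()` were never called, the Observable's work would never run at all.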

Push-Based Architecture

This is the opposite of a pull-based architecture. What do the two terms mean?

  • pull-based means that when we need some data, we have to actively query a mechanism that returns it to us (real-life example: we visit the angular.love blog to check the list of recent articles and see whether a new entry has appeared),
  • push-based relies on predefined streams into which we “push” data that goes out to all subscribed consumers (real-life example: after liking the angular.love fanpage, you receive a notification about a new blog post).

The entire stream serves as a contract between the producer and the consumers at the same time. Consumers can listen for events even when the producer does not yet exist or conversely, the producer can send data to the stream even when no one is listening yet.
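The contrast between pulling and pushing can be sketched as follows. A generator plays the pull-based source and an RxJS Subject plays the push-based stream; all names (`recentArticles`, `newPosts$`, `readerA`, `readerB`) are hypothetical.

```typescript
import { Subject } from "rxjs";

// Pull: the consumer decides when to ask for the next value.
function* recentArticles(): Generator<string> {
  yield "Article A";
  yield "Article B";
}

const pulled: string[] = [];
for (const title of recentArticles()) {
  // for...of asks the iterator for each value in turn
  pulled.push(title);
}

// Push: the producer decides when a value is delivered,
// and every subscribed consumer is notified.
const newPosts$ = new Subject<string>();
const readerA: string[] = [];
const readerB: string[] = [];

newPosts$.subscribe((title) => readerA.push(title));
newPosts$.subscribe((title) => readerB.push(title));

newPosts$.next("New post"); // pushed to both readers

console.log(pulled, readerA, readerB);
```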

Behavioral Patterns

Behavioral patterns are those design patterns that focus on managing, organizing, and connecting behaviors.

In RxJS, the key terms are producer, consumer, subscription, observable, subject, and operator, among others.

Every problem we want to solve with RxJS should be examined in terms of behavioral patterns: we have to establish who plays which role (in particular, who is the producer and who is the consumer) and how we want to combine different behaviors (by composing operators whose behavior is well-defined, combining multiple streams, etc.).

If we become proficient in the reactive mindset combined with a behavioral approach to problem solving, then using RxJS correctly will become extremely easy and intuitive to us.

Observable

Streams that we are talking about are just a specific type of collection, into which values are being pushed lazily (lazy push). Such a collection in the RxJS library is represented by the Observable (which, by the way, is a generic class whose generic type describes the type of values in the collection).

Consumers can listen for values in the stream using the ‘subscribe’ method.

The ‘pipe’ method, in turn, takes RxJS operators as arguments and returns a new stream with those operators applied (e.g. with unwanted values filtered out).
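A small sketch of both methods, assuming a hand-made stream of numbers (the names `temperatures$` and `labels$` are illustrative). It also shows that the generic type changes along the pipeline and that the source stream itself is left untouched.

```typescript
import { Observable } from "rxjs";
import { filter, map } from "rxjs/operators";

// Observable<number>: the generic type describes the values in the stream.
const temperatures$ = new Observable<number>((subscriber) => {
  subscriber.next(18);
  subscriber.next(-3);
  subscriber.next(25);
  subscriber.complete();
});

// pipe() returns a new Observable; here the value type becomes string.
const labels$: Observable<string> = temperatures$.pipe(
  filter((t) => t >= 0),
  map((t) => `${t} degrees`),
);

const shown: string[] = [];
labels$.subscribe((label) => shown.push(label));

// The original stream still emits every value, including the negative one.
const raw: number[] = [];
temperatures$.subscribe((t) => raw.push(t));

console.log(shown); // ["18 degrees", "25 degrees"]
console.log(raw);   // [18, -3, 25]
```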

Many built-in mechanisms in Angular return a stream (Observable). For example:

  • HttpClient methods (get, post, patch, delete, etc.),
  • Router.events,
  • AbstractControl getters (valueChanges, statusChanges),
  • ActivatedRoute fields (url, params, queryParams, fragment, data)

Subscription and Observer

A subscription is an object created each time a new consumer starts listening for values in a stream.

A reference to such an object is returned by the ‘subscribe’ method. As mentioned earlier, creating a subscription usually triggers the start of data processing in the stream (apart from exceptions). So, for example, if we define a stream with HttpClient that sends a request, the request is sent not when the stream is created but when the subscription is created, and a separate request is made for each subscription.

A subscription connects a particular consumer with a stream and exposes a very important ‘unsubscribe’ method. It stops that consumer from listening for values in the stream and, at the same time, cancels the data processing in the stream related to that consumer (for the HTTP stream described above, it can cancel the request sent by the browser).
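The behavior described above can be sketched without Angular. Here `fakeRequest$` is a hypothetical stand-in for an HttpClient call: it counts how many times it was started and runs a teardown function when a consumer unsubscribes.

```typescript
import { Observable } from "rxjs";

let requestsSent = 0;
let teardowns = 0;

// Hypothetical stand-in for an HTTP request that answers after one second.
const fakeRequest$ = new Observable<string>((subscriber) => {
  requestsSent++;
  const timer = setTimeout(() => {
    subscriber.next("response");
    subscriber.complete();
  }, 1000);

  // Teardown logic: runs on unsubscribe (and also on completion or error).
  return () => {
    clearTimeout(timer);
    teardowns++;
  };
});

// Each subscription starts its own, independent "request".
const first = fakeRequest$.subscribe();
const second = fakeRequest$.subscribe();
console.log(requestsSent); // 2

// Unsubscribing cancels the pending work for that consumer only.
first.unsubscribe();
second.unsubscribe();
console.log(teardowns, first.closed, second.closed); // 2 true true
```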

In an Angular application, the most common practice is to cancel all active subscriptions at the component level in the OnDestroy hook, at the end of the component lifecycle. It is important not to forget this: otherwise subscriptions remain active after the component is destroyed, which at best leads to memory leaks.

Tip: 

The recommended way to cancel a subscription in a component is to use the ‘takeUntil’ operator.

Alternatively, you can use the popular @ngneat/until-destroy library
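A minimal sketch of the ‘takeUntil’ pattern follows. To keep it runnable without Angular, `ComponentSketch` is a plain class that only imitates the lifecycle hooks; in a real component the hooks would be called by the framework. The names `destroy$`, `ticks$` and `received` are assumptions.

```typescript
import { Observable, Subject } from "rxjs";
import { takeUntil } from "rxjs/operators";

// Plain class standing in for an Angular component (no decorator here).
class ComponentSketch {
  readonly received: number[] = [];
  private readonly destroy$ = new Subject<void>();
  private readonly source$: Observable<number>;

  constructor(source$: Observable<number>) {
    this.source$ = source$;
  }

  ngOnInit(): void {
    this.source$
      .pipe(takeUntil(this.destroy$)) // completes when destroy$ emits
      .subscribe((value) => this.received.push(value));
  }

  ngOnDestroy(): void {
    this.destroy$.next();
    this.destroy$.complete();
  }
}

const ticks$ = new Subject<number>();
const component = new ComponentSketch(ticks$);

component.ngOnInit();
ticks$.next(1);
ticks$.next(2);
component.ngOnDestroy();
ticks$.next(3); // ignored: the subscription ended on destroy

console.log(component.received); // [1, 2]
```

Placing ‘takeUntil’ last in the pipe is the usual convention, so that no later operator keeps an inner subscription alive.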

In Angular templates we have AsyncPipe, which creates the subscription itself and cancels it at the right moment. An additional advantage is that each new value received by the pipe marks the component as ‘dirty’ (which results in change detection being triggered). In most cases, using AsyncPipe is a much better solution than creating and cancelling subscriptions manually in the component logic.

The observer is simply our consumer, represented in RxJS by an object of type Observer. The subscription links the stream with the consumer, and the observer is the concrete implementation of how events from the stream are consumed.

The ‘subscribe’ method of an Observable type object takes an (optionally partial) observer as an argument.
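As a sketch, a full observer implements all three callbacks, while a partial one supplies only those it cares about. The `events` array and the `failing$` stream are illustrative.

```typescript
import { Observable, of } from "rxjs";
import type { Observer } from "rxjs";

const events: string[] = [];

// A full observer: one callback per notification type.
const fullObserver: Observer<number> = {
  next: (value) => events.push(`next ${value}`),
  error: (err) => events.push(`error ${String(err)}`),
  complete: () => events.push("complete"),
};

of(1, 2).subscribe(fullObserver);

// A stream that fails immediately, to exercise the error callback.
const failing$ = new Observable<number>((subscriber) => {
  subscriber.error(new Error("boom"));
});

// A partial observer: only the error callback is provided.
failing$.subscribe({
  error: (err) => events.push(`partial error: ${(err as Error).message}`),
});

console.log(events);
// ["next 1", "next 2", "complete", "partial error: boom"]
```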

Cold vs Hot, Unicast vs Multicast

We can classify streams in many ways, but one of the most important differences is the way we process logic inside the defined stream.

It was mentioned earlier that the logic inside the stream is executed after the subscription is created (see the example with HttpClient). Those streams are called COLD (easy to remember as dormant/frozen streams whose processing starts after the consumer shows up).

Opposite to the above, we have streams called HOT, i.e. streams inside which processing takes place regardless of the consumer’s presence/absence. For example, in the Router.events stream, events related to navigation are emitted even if we don’t create a subscription.

Now consider a stream with multiple consumers (multiple subscriptions). When the logic in the stream is executed independently for each consumer, we are dealing with a UNICAST stream. In RxJS, streams are unicast by default, but that can be changed with special operators.

When processing is performed only once and the result is distributed to all consumers, the stream is MULTICAST.

Combining both classifications gives four cases:

  • hot unicast – a contradictory combination: processing cannot be independent of subscriptions and, at the same time, run separately for each subscription,
  • hot multicast – processing is independent of subscriptions and the result is distributed to all consumers. Example: Subject,
  • cold unicast – processing starts only after a subscription is created and runs independently for each subscriber. Example: HttpClient.get,
  • cold multicast – processing starts only after a subscription is created, but the result is shared among all consumers. Example: HttpClient.get(…).pipe(shareReplay(1)).
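The difference between cold unicast and cold multicast can be sketched by counting how often the producer logic runs. The stream `cold$` is a hypothetical stand-in for an HTTP call, and ‘shareReplay(1)’ is used as the multicasting operator.

```typescript
import { Observable } from "rxjs";
import { shareReplay } from "rxjs/operators";

let executions = 0;

// Cold source: the producer logic runs once per subscription.
const cold$ = new Observable<string>((subscriber) => {
  executions++;
  subscriber.next(`result #${executions}`);
  subscriber.complete();
});

// Unicast: two subscriptions, two independent executions.
const unicastResults: string[] = [];
cold$.subscribe((v) => unicastResults.push(v));
cold$.subscribe((v) => unicastResults.push(v));
const executionsAfterUnicast = executions; // 2

// Multicast: the first subscription triggers a single execution,
// later subscribers receive the replayed result.
const shared$ = cold$.pipe(shareReplay(1));
const multicastResults: string[] = [];
shared$.subscribe((v) => multicastResults.push(v));
shared$.subscribe((v) => multicastResults.push(v));

console.log(unicastResults);   // ["result #1", "result #2"]
console.log(multicastResults); // ["result #3", "result #3"]
console.log(executions);       // 3
```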

Tip:

Let’s imagine that we have a cold unicast stream that sends an HTTP request (creating the subscription sends the request, and the response from the server is pushed into the stream). A common practice in Angular is to inject the service that exposes such a stream into a component, pass a reference to the stream to the view, and create the subscription with AsyncPipe. In this scenario, the application has to wait for the module, the component, and then the view to initialize, until the pipe instance is created, before the request is sent. If we turn this stream into a hot multicast one, for example with the ‘publishReplay’ operator (deprecated since RxJS 7) connected when the service instance is created, the request is sent to the server much earlier and the data from the response reaches the application view sooner.
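The idea from this tip can be sketched without Angular and without ‘publishReplay’. The snippet below swaps in a different technique: it subscribes a ReplaySubject to the cold stream in the service constructor, which has the same effect of starting the request eagerly and replaying the response to later consumers. `fakeRequest$` and `DataServiceSketch` are hypothetical names, and the class is a plain class rather than an injectable Angular service.

```typescript
import { Observable, ReplaySubject } from "rxjs";

let requestsSent = 0;

// Hypothetical cold stream standing in for an HttpClient call.
const fakeRequest$ = new Observable<string>((subscriber) => {
  requestsSent++;
  subscriber.next("payload");
  subscriber.complete();
});

class DataServiceSketch {
  // Caches the latest value for consumers that arrive later.
  private readonly cache$ = new ReplaySubject<string>(1);
  readonly data$: Observable<string> = this.cache$.asObservable();

  constructor() {
    // The request starts as soon as the service is created,
    // before any view has subscribed.
    fakeRequest$.subscribe(this.cache$);
  }
}

const service = new DataServiceSketch();
const sentBeforeAnyConsumer = requestsSent; // 1

const viewA: string[] = [];
const viewB: string[] = [];
service.data$.subscribe((v) => viewA.push(v));
service.data$.subscribe((v) => viewB.push(v));

console.log(requestsSent, viewA, viewB); // 1 ["payload"] ["payload"]
```

The trade-off is that the request is made even if no view ever consumes the data, so this fits data that is almost certainly needed.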

Subject

Subject is a special Observable variant (and therefore also a stream) that is always hot multicast. It is possible to subscribe to a subject, but it also offers the observer methods (next/error/complete), which allow us to push new events into the stream imperatively. An Angular example of a subject is EventEmitter (used in components together with the @Output decorator).

The RxJS library offers several types of subjects:

Subject

A basic variant that doesn’t store any information about the values in the stream. Values that are pushed into the stream before the subscription is created won’t be delivered.
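A short sketch of this behavior, with illustrative names: values pushed before a consumer subscribes never reach it.

```typescript
import { Subject } from "rxjs";

const subject = new Subject<string>();
const early: string[] = [];
const late: string[] = [];

subject.next("a"); // nobody is listening yet, so "a" is lost

subject.subscribe((v) => early.push(v));
subject.next("b"); // delivered to `early` only

subject.subscribe((v) => late.push(v));
subject.next("c"); // delivered to both consumers
subject.complete();

console.log(early); // ["b", "c"]
console.log(late);  // ["c"]
```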

BehaviorSubject

A subject variant that introduces the concept of the current value in the stream. By creating a BehaviorSubject instance, we give the stream an initial value (which also becomes the current value), and then any subsequent value pushed into the stream overwrites it. By subscribing to such a subject, the observer immediately gets the current value. BehaviorSubject also allows the current value to be read synchronously (using a getter named ‘value’).
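A minimal sketch, assuming a numeric counter stream named `count$`:

```typescript
import { BehaviorSubject } from "rxjs";

// The constructor argument is the initial (and first current) value.
const count$ = new BehaviorSubject<number>(0);
count$.next(1); // overwrites the current value

const seen: number[] = [];
count$.subscribe((v) => seen.push(v)); // immediately receives 1

count$.next(2);

// Synchronous read of the current value.
const current = count$.value;

console.log(seen);    // [1, 2]
console.log(current); // 2
```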

ReplaySubject

This variant is similar to BehaviorSubject (an observer can receive a value that was pushed into the stream before the subscription was created), but ReplaySubject is not limited to a single value and can cache (and then send to new subscribers) multiple values which were previously pushed into the stream. The number of values that the subject will cache can be limited by the arguments taken by the constructor (specifying the maximum number of values cached, or the length of the time window for which we cache events).
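A brief sketch with a buffer of two values (the name `history$` is illustrative). The optional second constructor argument, a time window in milliseconds, is not used here.

```typescript
import { ReplaySubject } from "rxjs";

// Cache at most the two most recent values.
const history$ = new ReplaySubject<string>(2);

history$.next("a");
history$.next("b");
history$.next("c");

const replayed: string[] = [];
history$.subscribe((v) => replayed.push(v)); // receives "b" and "c" at once

history$.next("d"); // delivered live

console.log(replayed); // ["b", "c", "d"]
```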

AsyncSubject

This variant only provides the subscribers with the last value pushed into the stream and only after the ‘complete’ event has been emitted in the stream. AsyncSubject is similar, in its behavior, to the ‘last()’ operator which also waits for the ‘complete’ event and returns the last value.
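A short sketch of this behavior (names are illustrative). Nothing is delivered until ‘complete’ is called, and a consumer that subscribes after completion still receives the last value.

```typescript
import { AsyncSubject } from "rxjs";

const result$ = new AsyncSubject<number>();
const got: number[] = [];

result$.subscribe((v) => got.push(v));
result$.next(1);
result$.next(2);

// Nothing has been delivered yet.
const deliveredBeforeComplete = got.length; // 0

result$.complete(); // now the last value (2) is emitted

// A subscriber arriving after completion also receives the last value.
const lateGot: number[] = [];
result$.subscribe((v) => lateGot.push(v));

console.log(deliveredBeforeComplete, got, lateGot); // 0 [2] [2]
```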

Summary

The information above provides a solid base for using RxJS consciously, feeling comfortable when working with RxJS code, and being able to discuss the library (you may score at a job interview!).

Of course much more could be said about each of the topics mentioned above and even more topics have been left out. 

Let us know in the comments below if you would be interested in discussing any of the addressed topics further or whether you would like to read an article about something completely different (concerning RxJS).

Sources

  1. https://rxjs.dev/
  2. https://www.learnrxjs.io/ 
  3. https://angular.io/guide/rx-library
  4. https://anchor.fm/angular-master/episodes/AMP-4-Target-RxJS-part-I-with-Michael-Hladky-e121imn
  5. https://www.youtube.com/watch?v=y2aBiA5N4h8

About the author

Mateusz Dobrowolski

Angular Developer at House of Angular. Mateusz is a Typescript sympathizer with several years of experience in developing Angular applications.
