RxJS in Angular. Cookbook

Sergey Gultyayev
Dec 26, 2022

Let’s review the most common scenarios that RxJS helps you solve. With RxJS they are much easier to handle than with fully manual code. Let’s begin.

Autocompletion

It’s common to have an input and fetch a list of available options from the backend based on what the user has typed.

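import { Component, inject } from '@angular/core';
import { HttpClient } from '@angular/common/http';
import { FormControl } from '@angular/forms';
import { debounceTime, distinctUntilChanged, switchMap } from 'rxjs';

@Component({
  template: `
    <input [formControl]="control" type="text">

    <div *ngFor="let item of list$ | async">{{ item.title }}</div>
  `
})
export class MyComponent {
  // inject HttpClient so this.httpClient is available to the pipeline below
  private httpClient = inject(HttpClient);

  // create a control so we can subscribe to the valueChanges observable
  control = new FormControl("");

  list$ = this.control.valueChanges.pipe(
    debounceTime(200), // wait until the value has stopped changing for 200ms
    distinctUntilChanged(), // only emit when the value differs from the previous one
    switchMap(value => this.httpClient.get(...)), // make a request to the server with the input’s value
  );
}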

In conjunction with the async pipe it’s a piece of cake. There is no need to assign values manually or to implement the 200ms delay ourselves to make sure we don’t flood our backend with requests. switchMap ensures that we unsubscribe from the inner GET request when a new value arrives, so we don’t need to worry about stale responses. We work only with the most recent one.

You might want to make requests only when the value is longer than N characters. That’s easily achieved by adding one more operator before switchMap.

filter(value => (value ?? "").length > 3), // only lets values with more than 3 characters through (valueChanges may emit null, e.g. after reset())
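
To see why stale responses are not a concern, here is a minimal sketch in plain RxJS, outside Angular. The names input$, fakeSearch and responses are hypothetical stand-ins: input$ plays the role of control.valueChanges, and each Subject plays the role of one pending GET request.

```typescript
import { Subject, switchMap } from 'rxjs';

// Hypothetical stand-in for control.valueChanges
const input$ = new Subject<string>();
// One Subject per "pending request", keyed by search term (assumption for the demo)
const responses = new Map<string, Subject<string>>();
const received: string[] = [];

// Hypothetical stand-in for httpClient.get(...): the response arrives
// whenever we call next() on the returned Subject.
function fakeSearch(term: string): Subject<string> {
  const response$ = new Subject<string>();
  responses.set(term, response$);
  return response$;
}

input$
  .pipe(switchMap((term) => fakeSearch(term)))
  .subscribe((result) => received.push(result));

input$.next('ang');     // the first "request" starts
input$.next('angular'); // switchMap unsubscribes from the first one

responses.get('ang')!.next('results for ang');         // ignored: nobody listens anymore
responses.get('angular')!.next('results for angular'); // delivered

console.log(received); // ['results for angular']
```

The response to the outdated term arrives first but is dropped, because switchMap had already unsubscribed from it.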

Filtering & pagination

This recipe reloads a list of posts whenever the selected tags or the current page change. Let’s break it down.

  1. We start with combineLatest: it creates one stream out of the latest values emitted by each observable in the array. It emits an array in which each element is the latest value emitted by the corresponding observable. Since both the tags and the page can change over time, we want to make a new request whenever either of them changes.
  2. We pipe startWith to the this.tags.valueChanges observable so that it emits an empty array right away. We have two reasons for that: combineLatest requires every observable to emit at least one value before it emits anything, and valueChanges emits only when the input changes.
  3. We use switchMap to make a request. In our case I’m just returning a JSON to show that values are updated in real time with no actions required from our side. of creates an observable that emits the values you pass to it as arguments.
  4. Finally, we put the posts$ observable into the template and use the async pipe to subscribe to it.
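
The steps above can be sketched in plain RxJS as follows. This is an illustration under assumptions, not the original component: tagsChanges$ is a hypothetical stand-in for this.tags.valueChanges, page$ is assumed to be a BehaviorSubject holding the current page, and the "request" simply echoes its parameters back through of.

```typescript
import { BehaviorSubject, Subject, combineLatest, of, startWith, switchMap } from 'rxjs';

// Hypothetical response shape for the demo
interface PostsResponse {
  tags: string[];
  page: number;
}

// Hypothetical stand-in for this.tags.valueChanges: emits only on change
const tagsChanges$ = new Subject<string[]>();
// Assumption: the current page is kept in a BehaviorSubject starting at 1
const page$ = new BehaviorSubject<number>(1);

const posts$ = combineLatest([
  // startWith gives combineLatest an initial value for the tags
  tagsChanges$.pipe(startWith([] as string[])),
  page$,
]).pipe(
  // A real app would call the backend here; of(...) just echoes the parameters
  switchMap(([tags, page]) => of<PostsResponse>({ tags, page })),
);

const responses: PostsResponse[] = [];
posts$.subscribe((response) => responses.push(response));

tagsChanges$.next(['rxjs']); // a tag is selected
page$.next(2);               // the user goes to page 2

console.log(responses);
// [
//   { tags: [], page: 1 },
//   { tags: ['rxjs'], page: 1 },
//   { tags: ['rxjs'], page: 2 }
// ]
```

Without startWith, nothing would be emitted until the tags change for the first time, so the initial page would never load.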

That’s it. There is not much manual work needed to dynamically update the page based on 2 different values (imagine doing it fully manually without RxJS). RxJS is great, isn’t it?

Fixing change detection

Let’s say you have a component where you have just added a subscription, but the data is not displayed in the component. Most commonly this is because your component, or one of its parent components, has the OnPush change detection strategy set. In that case your component doesn’t get checked when asynchronous events happen in the app. To fix it, you need to tell Angular that there are changes to check for.

export class MyComponent implements OnInit {
  data: unknown;

  constructor(
    private httpClient: HttpClient,
    private changeDetectorRef: ChangeDetectorRef
  ) {}

  ngOnInit(): void {
    this.httpClient.get(...).subscribe((data) => {
      this.data = data;
      // Tell Angular to check this component during the next change detection cycle
      this.changeDetectorRef.markForCheck();
    });
  }
}

You also need to remember that you have to subscribe to an observable for its code to be executed. For example, if you want to make a GET request using HttpClient, you have to subscribe. If you don’t subscribe, the request won’t be made. If you subscribe twice, the request will be made twice.
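
A minimal sketch of this behavior in plain RxJS. fakeGet$ is a hypothetical stand-in for the observable returned by HttpClient, and the counter only exists to make the executions visible.

```typescript
import { Observable } from 'rxjs';

let requestsMade = 0;

// Hypothetical stand-in for httpClient.get(...): the callback runs
// once per subscription, just like a real request would be sent.
const fakeGet$ = new Observable<string>((subscriber) => {
  requestsMade++;
  subscriber.next('response');
  subscriber.complete();
});

console.log(requestsMade); // 0 (nothing happens until someone subscribes)

fakeGet$.subscribe();
fakeGet$.subscribe();

console.log(requestsMade); // 2 (one execution per subscription)
```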

Refreshing the access token

Usually when you have an access token, you also have a refresh token which has a longer TTL (time to live) than the access token. It’s used to renew the access token when the old one expires without the need for the user to enter their credentials once again.

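import { Injectable } from '@angular/core';
import {
  HttpErrorResponse,
  HttpEvent,
  HttpHandler,
  HttpInterceptor,
  HttpRequest,
} from '@angular/common/http';
import { Observable, defer, retry, tap, throwError } from 'rxjs';
// AuthService is the application's own service; its import is omitted here

@Injectable()
export class AuthInterceptor implements HttpInterceptor {
  constructor(private authService: AuthService) {}

  intercept(
    req: HttpRequest<unknown>,
    next: HttpHandler
  ): Observable<HttpEvent<unknown>> {
    // defer re-runs this callback on every (re)subscription,
    // so a retry picks up the refreshed token
    return defer(() => {
      const token = this.authService.getAccessToken();
      const newReq = req.clone({
        headers: req.headers.set(`Authorization`, `Bearer ${token}`),
      });

      return next.handle(newReq);
    }).pipe(
      retry({
        count: 1,
        delay: (error) => {
          // on 401, refresh the token; the retry happens once the returned observable emits
          if (error instanceof HttpErrorResponse && error.status === 401) {
            return this.authService.refreshAccessToken();
          }

          // any other error is passed through untouched
          return throwError(() => error);
        },
      }),
      tap({
        error: (error) => {
          // still unauthorized after the retry: log the user out
          if (error instanceof HttpErrorResponse && error.status === 401) {
            this.authService.logout();
          }
        },
      })
    );
  }
}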

Let’s break it down:

  1. We created (and provided) an HttpInterceptor as usual.
  2. We wrap our request logic into a defer function. This way, when we retry, the whole function is executed once again, so the new request picks up the refreshed token and we write the headers logic only once.
  3. The defer callback returns next.handle(newReq) in order to make the request and have it go through the whole chain of interceptors.
  4. We pipe the retry operator to the observable and pass it a config object with the following properties: count: 1, so that we attempt to refresh the token only once per request; delay, a function that checks whether the error is a 401. If it is, we refresh the token (this returns an observable); if it is not, we re-throw the error.
  5. We use the tap operator with an observer object, the same shape that subscribe accepts. We use it to un-authenticate the user in case the request still fails with a 401 error.

If the observable returned from the delay function fails, the whole observable will fail with that error. For us that’s OK, as both an unauthorized request and a failed access token refresh request usually fail with the same error.
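
The interplay of defer and retry can be tried outside Angular with a small sketch. Everything here is a hypothetical stand-in: fakeRequest plays the role of next.handle(newReq), refreshAccessToken plays the role of the auth service method, and the error is a plain object with a status field instead of an HttpErrorResponse. One detail worth keeping in mind: retry re-subscribes only when the observable returned from delay emits a value. If it completes without emitting, the result completes without retrying.

```typescript
import { Observable, defer, of, retry, tap, throwError } from 'rxjs';

// Hypothetical stand-ins for the auth service state and the backend
let accessToken = 'expired';
let attempts = 0;

// Plays the role of next.handle(newReq): fails with 401 unless the token is fresh
function fakeRequest(token: string): Observable<string> {
  attempts++;
  return token === 'fresh'
    ? of(`ok with ${token} token`)
    : throwError(() => ({ status: 401 }));
}

// Plays the role of authService.refreshAccessToken().
// It must emit a value: retry re-subscribes when this notifier emits.
function refreshAccessToken(): Observable<string> {
  return of('fresh').pipe(tap((token) => (accessToken = token)));
}

const results: string[] = [];

// defer re-reads accessToken on every (re)subscription
defer(() => fakeRequest(accessToken))
  .pipe(
    retry({
      count: 1,
      delay: (error) =>
        error.status === 401 ? refreshAccessToken() : throwError(() => error),
    }),
  )
  .subscribe((value) => results.push(value));

console.log(attempts); // 2 (the original attempt plus one retry)
console.log(results);  // ['ok with fresh token']
```

Without defer, the retry would re-send the request built with the expired token, because the token would have been read only once.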

Making observables multi-cast

First off, let’s recap the difference between single-cast and multi-cast observables. By default observables are single-cast, which means that their initializing logic (the callback contents) is executed for each subscription separately. Each subscriber will receive their own values emitted independently.

For multi-cast observables, no matter how many subscriptions to one observable there are, its logic will be executed only once, and all of its subscribers will receive emissions simultaneously.

So, if you store an HttpClient.get request in a property and subscribe to it using the async pipe in multiple places, the request will be re-made for each pipe. To fix this, use either share or shareReplay. They make a single-cast observable multi-cast. This way the request is made only once.

Let’s see how they are different:

import { Observable, share } from 'rxjs';

const observable = new Observable<number>(subscriber => {
  subscriber.next(1);

  setTimeout(() => subscriber.next(2), 1000);
  setTimeout(() => subscriber.next(3), 2000);
}).pipe(
  share()
);

observable.subscribe(console.log);
// will log 1, 2, 3

setTimeout(() => {
  observable.subscribe(console.log);
  // will log 2, 3
}, 500);

The same source with shareReplay:

import { Observable, shareReplay } from 'rxjs';

const observable = new Observable<number>(subscriber => {
  subscriber.next(1);

  setTimeout(() => subscriber.next(2), 1000);
  setTimeout(() => subscriber.next(3), 2000);
}).pipe(
  shareReplay({
    bufferSize: 1,
    refCount: true
  })
);

observable.subscribe(console.log);
// will log 1, 2, 3

setTimeout(() => {
  observable.subscribe(console.log);
  // will log 1, 2, 3
}, 500);

setTimeout(() => {
  observable.subscribe(console.log);
  // will log 3
}, 3500);

As you can see from the examples above, share only makes the subscribers share the source observable and doesn’t affect the way values get emitted: a late subscriber misses everything emitted before it subscribed.

shareReplay accepts a config object where bufferSize specifies how many of the emissions that happened before the subscription should be replayed to the new subscriber.

For templates it might be a bit safer to use shareReplay, so that if there are any nested ngIfs you won’t have any problems with them not receiving the values.

However, keep in mind that you should specify refCount: true. Otherwise the subscription to the source stays alive even after every subscriber has unsubscribed, which may cause a memory leak.
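
To tie this back to the HttpClient case, here is a minimal sketch in plain RxJS. source$ is a hypothetical stand-in for a stored HttpClient.get request, and the two subscriptions play the role of two async pipes in a template.

```typescript
import { Observable, shareReplay } from 'rxjs';

let executions = 0;

// Hypothetical stand-in for a stored httpClient.get(...) observable
const source$ = new Observable<string>((subscriber) => {
  executions++;
  subscriber.next('response');
  subscriber.complete();
});

const shared$ = source$.pipe(shareReplay({ bufferSize: 1, refCount: true }));

const received: string[] = [];

// Two subscriptions, as if two async pipes used the same property
shared$.subscribe((value) => received.push(`A: ${value}`));
shared$.subscribe((value) => received.push(`B: ${value}`));

console.log(executions); // 1 (the "request" ran only once)
console.log(received);   // ['A: response', 'B: response']
```

The second subscriber arrives after the source has already emitted and completed, and still gets the value from the replay buffer.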
