RxJS in Angular. Cookbook
Let’s review the most common scenarios that RxJS helps you solve. Each of them is much easier with RxJS than doing the same work by hand. Let’s begin.
Autocompletion
It’s common to have an input and, based on what the user types, fetch a list of available options from the backend.
import { Component } from "@angular/core";
import { HttpClient } from "@angular/common/http";
import { FormControl } from "@angular/forms";
import { debounceTime, distinctUntilChanged, switchMap } from "rxjs";

@Component({
  template: `
    <input [formControl]="control" type="text">
    <div *ngFor="let item of list$ | async">{{ item.title }}</div>
  `
})
export class MyComponent {
  control = new FormControl(""); // create a control so we can subscribe to the valueChanges observable
  list$ = this.control.valueChanges.pipe(
    debounceTime(200), // wait 200ms for the value to stop changing
    distinctUntilChanged(), // only emit when the current value differs from the previous one
    switchMap(value => this.httpClient.get(...)), // make a request to the server with the input’s value
  );

  constructor(private httpClient: HttpClient) {}
}
In conjunction with the async pipe it’s a piece of cake. There’s no need to assign values manually or to hand-roll a 200ms delay to make sure we don’t DDoS our backend. switchMap ensures that we unsubscribe from the inner GET request when a new value arrives, so we don’t need to worry about stale requests. We work only with the most recent one.
You might want to make requests only when the value is longer than N characters. That’s easy to achieve by adding one more operator before the switchMap.
filter(value => value.length > 3), // will only let values through that have more than 3 characters
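Put together, the whole chain can be pulled out of the component and tried on its own. The following is a minimal sketch rather than the component’s actual code: `typeahead`, `fakeSearch`, `minLength`, and `debounceMs` are hypothetical names introduced for illustration, and `fakeSearch` stands in for the real HTTP request.

```typescript
import { Observable, debounceTime, distinctUntilChanged, filter, of, switchMap } from "rxjs";

// Hypothetical helper: turns a stream of raw input values into a stream of results.
// `search` stands in for the real backend call (for example an HttpClient request).
function typeahead<T>(
  input$: Observable<string>,
  search: (term: string) => Observable<T>,
  minLength = 3,
  debounceMs = 200
): Observable<T> {
  return input$.pipe(
    debounceTime(debounceMs),                  // wait until the value stops changing
    distinctUntilChanged(),                    // skip values equal to the previous one
    filter((term) => term.length > minLength), // let only long enough terms through
    switchMap((term) => search(term))          // drop the previous request, start a new one
  );
}

// Usage with a fake search function (an assumption; replace it with a real request).
const fakeSearch = (term: string) => of([`${term} one`, `${term} two`]);
const results$ = typeahead(of("a", "an", "angular"), fakeSearch);
```

Keeping the chain in a plain function makes it easy to exercise without a template or a form control.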
Filtering & pagination
Let’s break it down.
- We start with combineLatest: it creates one stream out of the latest values emitted by each observable in the array. It emits an array where each element is the latest value emitted by the corresponding observable. Since both the tags and the page can change over time, we want to make a new request whenever either of them changes.
- We pipe startWith to the this.tags.valueChanges observable so that it emits an empty array first. We have two reasons for that: combineLatest requires all observables to emit at least one value, and valueChanges emits values only when the input changes.
- We use switchMap to make a request. In our case I’m just returning a JSON to show that values are updated in real time with no actions required from our side. of returns an observable out of the values that you pass to it as arguments.
- Finally we put the posts$ observable into the template and use the async pipe to subscribe to it.
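The listing this breakdown refers to is not reproduced here. As a rough, framework-free sketch of the same steps, the code below uses assumptions throughout: `tags$` and `page$` are hypothetical subjects standing in for `this.tags.valueChanges` and a page selector, `loadPosts` is a fake request that echoes its query, and starting on page 1 is a guess.

```typescript
import { Observable, Subject, combineLatest, of, startWith, switchMap } from "rxjs";

interface PostsQuery {
  tags: string[];
  page: number;
}

// Hypothetical stand-ins for `this.tags.valueChanges` and a page selector.
const tags$ = new Subject<string[]>();
const page$ = new Subject<number>();

// Fake request: echoes the query back, similar to returning JSON with `of`.
function loadPosts(query: PostsQuery): Observable<PostsQuery> {
  return of(query);
}

const posts$ = combineLatest([
  tags$.pipe(startWith([] as string[])), // emit an empty array before any tag is picked
  page$.pipe(startWith(1)),              // assumption: start on the first page
]).pipe(
  // a new "request" is made every time either source emits
  switchMap(([tags, page]) => loadPosts({ tags, page }))
);

const seen: PostsQuery[] = [];
posts$.subscribe((posts) => seen.push(posts));
tags$.next(["rxjs"]); // triggers a request with the new tags and the current page
page$.next(2);        // triggers a request with the current tags and the new page
```

In a component the manual `subscribe` would be replaced by the async pipe in the template.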
That’s it. Not much manual work is needed to dynamically update the page based on two different values (imagine doing it all by hand without RxJS). RxJS is great, isn’t it?
Fixing change detection
Let’s say you have just added a subscription to a component, but the data is not displayed. Most commonly this is because your component, or one of its parent components, uses the OnPush change detection strategy. In that case the component doesn’t get checked when asynchronous events happen in the app. To fix it, you need to tell Angular that there are changes to check for.
export class MyComponent implements OnInit {
  data: unknown;

  constructor(
    private httpClient: HttpClient,
    private changeDetectorRef: ChangeDetectorRef
  ) {}

  ngOnInit(): void {
    this.httpClient.get(...).subscribe((data) => {
      this.data = data;
      this.changeDetectorRef.markForCheck(); // Tell Angular to check this component during the next change detection cycle
    });
  }
}
You also need to remember that an observable’s code runs only when you subscribe to it. For example, if you want to make a GET request using HttpClient, you have to subscribe. If you don’t subscribe, the request won’t be made. If you subscribe twice, the request will be made twice.
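To see this subscribe-to-execute behaviour in isolation, here is a small sketch that uses `defer` as a stand-in for `HttpClient.get`. The `fakeGet` helper and the `requestCount` counter are assumptions made for illustration only.

```typescript
import { Observable, defer, of } from "rxjs";

let requestCount = 0;

// Hypothetical stand-in for HttpClient.get: the callback runs once per subscription.
function fakeGet(): Observable<string> {
  return defer(() => {
    requestCount++; // counts how many times the "request" is actually made
    return of("response");
  });
}

const request$ = fakeGet();
const countBeforeSubscribe = requestCount; // 0: creating the observable does nothing

request$.subscribe();
request$.subscribe();
const countAfterTwoSubscriptions = requestCount; // 2: one "request" per subscription
```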
Refreshing the access token
Usually when you have an access token, you also have a refresh token which has a longer TTL (time to live) than the access token. It’s used to renew the access token when the old one expires without the need for the user to enter their credentials once again.
import { Injectable } from "@angular/core";
import {
  HttpErrorResponse,
  HttpEvent,
  HttpHandler,
  HttpInterceptor,
  HttpRequest,
} from "@angular/common/http";
import { Observable, defer, retry, tap, throwError } from "rxjs";
// AuthService is the application's own service (import path omitted)

@Injectable()
export class AuthInterceptor implements HttpInterceptor {
  constructor(private authService: AuthService) {}

  intercept(
    req: HttpRequest<unknown>,
    next: HttpHandler
  ): Observable<HttpEvent<unknown>> {
    return defer(() => {
      // runs again on every retry, so the latest token is always used
      const token = this.authService.getAccessToken();
      const newReq = req.clone({
        headers: req.headers.set(`Authorization`, `Bearer ${token}`),
      });
      return next.handle(newReq);
    }).pipe(
      retry({
        count: 1,
        delay: (error) => {
          if (error instanceof HttpErrorResponse && error.status === 401) {
            return this.authService.refreshAccessToken();
          }
          return throwError(() => error);
        },
      }),
      tap({
        error: (error) => {
          if (error instanceof HttpErrorResponse && error.status === 401) {
            this.authService.logout();
          }
        },
      })
    );
  }
}
Let’s break it down:
- We created (and provided) an HttpInterceptor as usual.
- We wrap our request logic in a defer function. This way, when we retry, the whole function is executed again, so we write the headers logic only once.
- defer returns next.handle(newReq) in order to make the request and have it pass through the whole chain of interceptors.
- We pipe the retry operator to the observable and pass it a config object with the following properties:
  - count: 1, so that we attempt to refresh the token only once per request;
  - delay: checks whether the error is a 401. If so, we refresh the token (this returns an observable); if not, we re-throw the error.
- We use a tap operator with a config object. It has the same signature as subscribe. We use it to un-authenticate the user in case the request still fails with a 401 error.
If the observable returned from the delay property fails, the whole observable fails with that error. For us that’s OK, as both an unauthorized request and a failed access token refresh request usually fail with the same error.
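The retry flow can also be tried outside Angular. The sketch below is a simplified model, not the interceptor itself: `request`, `refreshAccessToken`, the `token` variable, and the `{ status: 401 }` error shape are hypothetical stand-ins for `next.handle`, `AuthService`, and `HttpErrorResponse`.

```typescript
import { Observable, defer, of, retry, throwError } from "rxjs";

let token = "expired";
let refreshCalls = 0;

// Hypothetical backend: rejects every token except "fresh" with a 401-like error.
function request(currentToken: string): Observable<string> {
  return currentToken === "fresh"
    ? of("data")
    : throwError(() => ({ status: 401 }));
}

// Hypothetical refresh call: stores a new token and emits once.
function refreshAccessToken(): Observable<string> {
  return defer(() => {
    refreshCalls++;
    token = "fresh";
    return of(token);
  });
}

const result$ = defer(() => request(token)).pipe( // defer re-reads `token` on every attempt
  retry({
    count: 1, // refresh at most once per request
    delay: (error) =>
      error?.status === 401
        ? refreshAccessToken()     // retry as soon as the refresh emits
        : throwError(() => error), // any other error is passed through
  })
);

const received: string[] = [];
result$.subscribe((value) => received.push(value));
```

The first attempt fails with the expired token, the `delay` callback refreshes it, and the second attempt succeeds because `defer` builds the request again with the new token.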
Making observables multi-cast
First off, let’s recap the difference between single-cast and multi-cast observables. By default observables are single-cast, which means that their initializing logic (the callback contents) is executed for each subscription separately. Each subscriber receives its own values, emitted independently.
For multi-cast observables, no matter how many subscriptions to one observable there are, its logic is executed only once, and all of its subscribers receive emissions simultaneously.
So, if you store an HttpClient.get request in a property and subscribe to it using the async pipe in multiple places, the request will be re-made for each pipe. To fix this you want to use either share or shareReplay. They make a single-cast observable multi-cast. This way the request is made only once.
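As a quick check of that claim, the sketch below counts how often the source logic runs with and without `shareReplay`. The `single$` observable is a hypothetical stand-in for an `HttpClient.get` request stored in a property, and the counter exists only for illustration.

```typescript
import { defer, of, shareReplay } from "rxjs";

let executions = 0;

// Hypothetical stand-in for a stored HttpClient.get request.
const single$ = defer(() => {
  executions++; // runs once per subscription to the source
  return of({ title: "post" });
});

// Single-cast: every subscription runs the source logic again.
single$.subscribe();
single$.subscribe();
const executionsWithoutSharing = executions; // 2

// Multi-cast: the source runs once and the result is replayed to later subscribers.
executions = 0;
const shared$ = single$.pipe(shareReplay({ bufferSize: 1, refCount: true }));
shared$.subscribe();
shared$.subscribe();
const executionsWithShareReplay = executions; // 1
```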
Let’s see how they are different:
// Example 1: share
const observable = new Observable(subscriber => {
  subscriber.next(1);
  setTimeout(() => subscriber.next(2), 1000);
  setTimeout(() => subscriber.next(3), 2000);
}).pipe(
  share()
);
observable.subscribe(console.log);
// will log 1, 2, 3
setTimeout(() => {
  observable.subscribe(console.log);
  // will log 2, 3
}, 500);

// Example 2: shareReplay
const observable = new Observable(subscriber => {
  subscriber.next(1);
  setTimeout(() => subscriber.next(2), 1000);
  setTimeout(() => subscriber.next(3), 2000);
}).pipe(
  shareReplay({
    bufferSize: 1,
    refCount: true
  })
);
observable.subscribe(console.log);
// will log 1, 2, 3
setTimeout(() => {
  observable.subscribe(console.log);
  // will log 1, 2, 3
}, 500);
setTimeout(() => {
  observable.subscribe(console.log);
  // will log 3
}, 3500);
As you can see from the examples above, share only makes both subscribers share the source observable and doesn’t affect the way values get emitted.
shareReplay accepts a config object where you can specify how many of the emissions that happened before the subscription should be replayed to a new subscriber.
For templates it might be a bit safer to use shareReplay, so that nested ngIfs that subscribe later still receive the values.
However, keep in mind that you should specify refCount: true. Without it, shareReplay stays subscribed to the source even after every subscriber has unsubscribed, which may cause a memory leak.
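The effect of refCount can be observed directly. In the sketch below, `trackedSource` is a hypothetical helper that builds a never-completing source and records when it is unsubscribed from. The log arrays are there only to make the behaviour visible.

```typescript
import { Observable, shareReplay } from "rxjs";

// Hypothetical helper: a never-completing source that logs its own teardown.
function trackedSource(log: string[]): Observable<number> {
  return new Observable<number>((subscriber) => {
    subscriber.next(1);
    return () => log.push("source unsubscribed");
  });
}

// refCount: true releases the source once the last subscriber leaves.
const withRefCount: string[] = [];
trackedSource(withRefCount)
  .pipe(shareReplay({ bufferSize: 1, refCount: true }))
  .subscribe()
  .unsubscribe();
// withRefCount is now ["source unsubscribed"]

// refCount: false keeps the source subscription alive with no subscribers left.
const withoutRefCount: string[] = [];
trackedSource(withoutRefCount)
  .pipe(shareReplay({ bufferSize: 1, refCount: false }))
  .subscribe()
  .unsubscribe();
// withoutRefCount is still []
```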