RxJS Basics
RxJS is the foundation of reactive programming in Angular and one of the most challenging concepts for React developers to learn.
1. What Are Observables?
An Observable represents a stream of data over time. It provides a unified API for handling anything that produces values asynchronously — mouse clicks, HTTP responses, WebSocket messages, and more.
Promise vs Observable
| Promise | Observable | |
|---|---|---|
| Number of values | One (resolve/reject) | Zero to infinite |
| Execution | Eager — runs on new Promise() | Lazy — runs only when subscribe() is called |
| Cancellation | Not possible | Cancellable via unsubscribe() |
| Operators | .then() / .catch() only | 100+ operators for transformation and combination |
| Angular examples | — | HttpClient, Router.events, FormControl.valueChanges |
import { Observable, of } from 'rxjs';
// Creating a simple Observable
const numbers$ = new Observable<number>(subscriber => {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
subscriber.complete();
});
// Using of() for convenience
const greet$ = of('Hello', 'World');
By convention, Observable variables are suffixed with $ (e.g. users$, loading$).
2. Basic Subscription Patterns
An Observable does nothing on its own. Calling subscribe() starts the stream.
Using subscribe()
import { of } from 'rxjs';
const source$ = of(1, 2, 3);
// Short form: next callback only
source$.subscribe(value => console.log(value));
// => 1, 2, 3
// Full observer object
source$.subscribe({
next: value => console.log('Value:', value),
error: err => console.error('Error:', err),
complete: () => console.log('Done')
});
unsubscribe() and Preventing Memory Leaks
If a subscription outlives the component, it causes memory leaks and stale event handlers.
import { Component, OnInit, OnDestroy } from '@angular/core';
import { Subscription, interval } from 'rxjs';
@Component({ selector: 'app-example', template: '' })
export class ExampleComponent implements OnInit, OnDestroy {
private subscription!: Subscription;
ngOnInit() {
this.subscription = interval(1000).subscribe(n => console.log(n));
}
ngOnDestroy() {
this.subscription.unsubscribe(); // Always clean up
}
}
Observables returned by HttpClient complete automatically after the response, so manual unsubscribe() is not needed. However, interval, Subject, and similar sources require explicit unsubscription.
3. Common Operators
Operators are pure functions composed inside pipe() to transform, filter, or combine Observables.
Transformation Operators
import { of } from 'rxjs';
import { map, switchMap, mergeMap, concatMap } from 'rxjs/operators';
// map: transforms each value (like Array.map)
of(1, 2, 3).pipe(
map(n => n * 2)
).subscribe(console.log); // 2, 4, 6
// switchMap: cancels the previous inner Observable when a new one arrives
// → ideal for search-as-you-type where only the latest result matters
this.searchControl.valueChanges.pipe(
switchMap(query => this.api.search(query))
).subscribe(results => this.results = results);
| Operator | Parallel? | Previous stream | Primary use |
|---|---|---|---|
switchMap | No | Cancelled | Search, navigation |
mergeMap | Yes | Kept alive | Parallel HTTP (e.g. likes) |
concatMap | No | Awaited | Order-sensitive operations |
exhaustMap | No | Ignored | Prevent button double-submit |
Filtering Operators
import { filter, distinctUntilChanged, debounceTime } from 'rxjs/operators';
this.searchControl.valueChanges.pipe(
debounceTime(300), // Wait until typing pauses for 300ms
distinctUntilChanged(), // Ignore unchanged values
filter(query => query.length >= 2) // At least 2 characters
).subscribe(query => this.search(query));
Combination Operators
import { combineLatest, merge, forkJoin } from 'rxjs';
// combineLatest: emits the latest combination whenever any source emits
combineLatest([user$, permissions$]).subscribe(([user, perms]) => { /* ... */ });
// forkJoin: waits for all to complete, then emits once (like Promise.all)
forkJoin([this.api.getUser(), this.api.getPosts()]).subscribe(([user, posts]) => { /* ... */ });
// merge: interleaves multiple streams into one
merge(click$, keydown$).subscribe(event => this.handleEvent(event));
Utility Operators
import { tap, catchError, retry, finalize } from 'rxjs/operators';
import { EMPTY } from 'rxjs';
this.api.getData().pipe(
tap(data => console.log('Received:', data)), // Side effect (great for debugging)
retry(3), // Retry up to 3 times on error
catchError(err => {
this.error = err.message;
return EMPTY; // Swallow error and complete
}),
finalize(() => this.loading = false) // Runs on both success and error
).subscribe(data => this.data = data);
4. Subject and BehaviorSubject
A Subject is both an Observable and an Observer. You can call next() on it externally and it multicasts values to all subscribers simultaneously.
Subject
import { Subject } from 'rxjs';
const subject = new Subject<string>();
subject.subscribe(v => console.log('A:', v));
subject.subscribe(v => console.log('B:', v));
subject.next('Hello'); // A: Hello B: Hello
subject.next('World'); // A: World B: World
// Values emitted before subscribe() are lost
BehaviorSubject
Holds the current value and delivers it immediately to any new subscriber.
import { BehaviorSubject } from 'rxjs';
const count$ = new BehaviorSubject<number>(0); // initial value: 0
count$.subscribe(v => console.log('Component A:', v)); // immediately receives 0
count$.next(1);
count$.next(2);
// Late subscriber immediately receives the latest value (2)
count$.subscribe(v => console.log('Component B:', v)); // immediately receives 2
// Synchronous access to the current value
console.log(count$.getValue()); // 2
Conceptual Comparison with React
| Angular (RxJS) | React Equivalent |
|---|---|
BehaviorSubject | useState |
BehaviorSubject + asObservable() | Context (read-only exposure) |
Root-scoped service with BehaviorSubject | Zustand / Jotai |
5. AsyncPipe
The async pipe is the official way to consume Observables in templates. It handles subscribe() and unsubscribe() automatically.
@Component({
template: `
@if (users$ | async; as users) {
<ul>
@for (user of users; track user.id) {
<li>{{ user.name }}</li>
}
</ul>
} @else {
<p>Loading...</p>
}
`
})
export class UsersComponent {
users$ = this.userService.getUsers(); // No subscribe() needed
constructor(private userService: UserService) {}
}
Benefits of AsyncPipe
- Automatically unsubscribes when the component is destroyed
- Works seamlessly with
OnPushchange detection for better performance - Keeps component code declarative — no
ngOnDestroycleanup needed
Avoid calling subscribe() inside components whenever possible. Prefer AsyncPipe or the toSignal() interop described below.
6. RxJS and Signals
Angular 16 introduced Signals as a simpler reactive primitive for synchronous state. RxJS Observables and Signals each excel in different scenarios, and Angular provides first-class utilities to convert between them.
toSignal() — Observable → Signal
import { toSignal } from '@angular/core/rxjs-interop';
import { Component, inject } from '@angular/core';
@Component({
template: `
<!-- Signal syntax: call as a function, no async pipe needed -->
<p>{{ users() }}</p>
`
})
export class UsersComponent {
private userService = inject(UserService);
// Converts Observable to Signal (auto subscribe/unsubscribe)
users = toSignal(this.userService.getUsers(), { initialValue: [] });
}
toObservable() — Signal → Observable
import { toObservable } from '@angular/core/rxjs-interop';
import { signal } from '@angular/core';
import { switchMap } from 'rxjs/operators';
const userId = signal(1);
// Convert a Signal to an Observable to use RxJS operators
const userDetails$ = toObservable(userId).pipe(
switchMap(id => this.api.getUser(id))
);
When to Use Each
| Scenario | Recommendation |
|---|---|
| Simple component state | Signal |
| HTTP requests, WebSockets | Observable |
Existing RxJS APIs (e.g. valueChanges) | Observable |
| Template bindings (new code) | Signal or toSignal() |
| Complex transformations (debounce, switchMap, etc.) | Observable |
7. Practical Patterns
Handling HTTP Requests
import { HttpClient } from '@angular/common/http';
import { inject, Component } from '@angular/core';
import { catchError, finalize } from 'rxjs/operators';
import { EMPTY } from 'rxjs';
@Component({ /* ... */ })
export class ProductListComponent {
private http = inject(HttpClient);
loading = false;
error = '';
products$ = this.http.get<Product[]>('/api/products').pipe(
catchError(err => {
this.error = err.message;
return EMPTY;
}),
finalize(() => this.loading = false)
);
}
Debounced Search Input
import { FormControl } from '@angular/forms';
import { debounceTime, distinctUntilChanged, switchMap } from 'rxjs/operators';
import { of } from 'rxjs';
export class SearchComponent {
searchControl = new FormControl('');
results$ = this.searchControl.valueChanges.pipe(
debounceTime(300),
distinctUntilChanged(),
switchMap(query => query ? this.api.search(query) : of([]))
);
}
Unsubscription Patterns
Angular 16+ makes takeUntilDestroyed() the cleanest option.
import { takeUntilDestroyed } from '@angular/core/rxjs-interop';
import { DestroyRef, inject, Component, OnInit } from '@angular/core';
@Component({ /* ... */ })
export class DashboardComponent implements OnInit {
private destroyRef = inject(DestroyRef);
ngOnInit() {
interval(5000).pipe(
takeUntilDestroyed(this.destroyRef) // Auto-unsubscribes on component destroy
).subscribe(() => this.refresh());
}
}
| Pattern | Angular Version | Notes |
|---|---|---|
takeUntilDestroyed() | 16+ | Recommended. Works in inject() context |
AsyncPipe | All | Best for template bindings |
toSignal() | 16+ | When Signals integration is needed |
Manual unsubscribe() | All | Simple, but requires ngOnDestroy |