Skip to main content

RxJS Basics

RxJS is the foundation of reactive programming in Angular and one of the most challenging concepts for React developers to learn.

1. What Are Observables?

An Observable represents a stream of data over time. It provides a unified API for handling anything that produces values asynchronously — mouse clicks, HTTP responses, WebSocket messages, and more.

Promise vs Observable

PromiseObservable
Number of valuesOne (resolve/reject)Zero to infinite
ExecutionEager — runs on new Promise()Lazy — runs only when subscribe() is called
CancellationNot possibleCancellable via unsubscribe()
Operators.then() / .catch() only100+ operators for transformation and combination
Angular examplesHttpClient, Router.events, FormControl.valueChanges
import { Observable, of } from 'rxjs';

// Creating a simple Observable
const numbers$ = new Observable<number>(subscriber => {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
subscriber.complete();
});

// Using of() for convenience
const greet$ = of('Hello', 'World');
Naming Convention

By convention, Observable variables are suffixed with $ (e.g. users$, loading$).

2. Basic Subscription Patterns

An Observable does nothing on its own. Calling subscribe() starts the stream.

Using subscribe()

import { of } from 'rxjs';

const source$ = of(1, 2, 3);

// Short form: next callback only
source$.subscribe(value => console.log(value));
// => 1, 2, 3

// Full observer object
source$.subscribe({
next: value => console.log('Value:', value),
error: err => console.error('Error:', err),
complete: () => console.log('Done')
});

unsubscribe() and Preventing Memory Leaks

If a subscription outlives the component, it causes memory leaks and stale event handlers.

import { Component, OnInit, OnDestroy } from '@angular/core';
import { Subscription, interval } from 'rxjs';

@Component({ selector: 'app-example', template: '' })
export class ExampleComponent implements OnInit, OnDestroy {
private subscription!: Subscription;

ngOnInit() {
this.subscription = interval(1000).subscribe(n => console.log(n));
}

ngOnDestroy() {
this.subscription.unsubscribe(); // Always clean up
}
}
caution

Observables returned by HttpClient complete automatically after the response, so manual unsubscribe() is not needed. However, interval, Subject, and similar sources require explicit unsubscription.

3. Common Operators

Operators are pure functions composed inside pipe() to transform, filter, or combine Observables.

Transformation Operators

import { of } from 'rxjs';
import { map, switchMap, mergeMap, concatMap } from 'rxjs/operators';

// map: transforms each value (like Array.map)
of(1, 2, 3).pipe(
map(n => n * 2)
).subscribe(console.log); // 2, 4, 6

// switchMap: cancels the previous inner Observable when a new one arrives
// → ideal for search-as-you-type where only the latest result matters
this.searchControl.valueChanges.pipe(
switchMap(query => this.api.search(query))
).subscribe(results => this.results = results);
OperatorParallel?Previous streamPrimary use
switchMapNoCancelledSearch, navigation
mergeMapYesKept aliveParallel HTTP (e.g. likes)
concatMapNoAwaitedOrder-sensitive operations
exhaustMapNoIgnoredPrevent button double-submit

Filtering Operators

import { filter, distinctUntilChanged, debounceTime } from 'rxjs/operators';

this.searchControl.valueChanges.pipe(
debounceTime(300), // Wait until typing pauses for 300ms
distinctUntilChanged(), // Ignore unchanged values
filter(query => query.length >= 2) // At least 2 characters
).subscribe(query => this.search(query));

Combination Operators

import { combineLatest, merge, forkJoin } from 'rxjs';

// combineLatest: emits the latest combination whenever any source emits
combineLatest([user$, permissions$]).subscribe(([user, perms]) => { /* ... */ });

// forkJoin: waits for all to complete, then emits once (like Promise.all)
forkJoin([this.api.getUser(), this.api.getPosts()]).subscribe(([user, posts]) => { /* ... */ });

// merge: interleaves multiple streams into one
merge(click$, keydown$).subscribe(event => this.handleEvent(event));

Utility Operators

import { tap, catchError, retry, finalize } from 'rxjs/operators';
import { EMPTY } from 'rxjs';

this.api.getData().pipe(
tap(data => console.log('Received:', data)), // Side effect (great for debugging)
retry(3), // Retry up to 3 times on error
catchError(err => {
this.error = err.message;
return EMPTY; // Swallow error and complete
}),
finalize(() => this.loading = false) // Runs on both success and error
).subscribe(data => this.data = data);

4. Subject and BehaviorSubject

A Subject is both an Observable and an Observer. You can call next() on it externally and it multicasts values to all subscribers simultaneously.

Subject

import { Subject } from 'rxjs';

const subject = new Subject<string>();

subject.subscribe(v => console.log('A:', v));
subject.subscribe(v => console.log('B:', v));

subject.next('Hello'); // A: Hello B: Hello
subject.next('World'); // A: World B: World

// Values emitted before subscribe() are lost

BehaviorSubject

Holds the current value and delivers it immediately to any new subscriber.

import { BehaviorSubject } from 'rxjs';

const count$ = new BehaviorSubject<number>(0); // initial value: 0

count$.subscribe(v => console.log('Component A:', v)); // immediately receives 0

count$.next(1);
count$.next(2);

// Late subscriber immediately receives the latest value (2)
count$.subscribe(v => console.log('Component B:', v)); // immediately receives 2

// Synchronous access to the current value
console.log(count$.getValue()); // 2

Conceptual Comparison with React

Angular (RxJS)React Equivalent
BehaviorSubjectuseState
BehaviorSubject + asObservable()Context (read-only exposure)
Root-scoped service with BehaviorSubjectZustand / Jotai

5. AsyncPipe

The async pipe is the official way to consume Observables in templates. It handles subscribe() and unsubscribe() automatically.

@Component({
template: `
@if (users$ | async; as users) {
<ul>
@for (user of users; track user.id) {
<li>{{ user.name }}</li>
}
</ul>
} @else {
<p>Loading...</p>
}
`
})
export class UsersComponent {
users$ = this.userService.getUsers(); // No subscribe() needed

constructor(private userService: UserService) {}
}

Benefits of AsyncPipe

  • Automatically unsubscribes when the component is destroyed
  • Works seamlessly with OnPush change detection for better performance
  • Keeps component code declarative — no ngOnDestroy cleanup needed
Best Practice

Avoid calling subscribe() inside components whenever possible. Prefer AsyncPipe or the toSignal() interop described below.

6. RxJS and Signals

Angular 16 introduced Signals as a simpler reactive primitive for synchronous state. RxJS Observables and Signals each excel in different scenarios, and Angular provides first-class utilities to convert between them.

toSignal() — Observable → Signal

import { toSignal } from '@angular/core/rxjs-interop';
import { Component, inject } from '@angular/core';

@Component({
template: `
<!-- Signal syntax: call as a function, no async pipe needed -->
<p>{{ users() }}</p>
`
})
export class UsersComponent {
private userService = inject(UserService);

// Converts Observable to Signal (auto subscribe/unsubscribe)
users = toSignal(this.userService.getUsers(), { initialValue: [] });
}

toObservable() — Signal → Observable

import { toObservable } from '@angular/core/rxjs-interop';
import { signal } from '@angular/core';
import { switchMap } from 'rxjs/operators';

const userId = signal(1);

// Convert a Signal to an Observable to use RxJS operators
const userDetails$ = toObservable(userId).pipe(
switchMap(id => this.api.getUser(id))
);

When to Use Each

ScenarioRecommendation
Simple component stateSignal
HTTP requests, WebSocketsObservable
Existing RxJS APIs (e.g. valueChanges)Observable
Template bindings (new code)Signal or toSignal()
Complex transformations (debounce, switchMap, etc.)Observable

7. Practical Patterns

Handling HTTP Requests

import { HttpClient } from '@angular/common/http';
import { inject, Component } from '@angular/core';
import { catchError, finalize } from 'rxjs/operators';
import { EMPTY } from 'rxjs';

@Component({ /* ... */ })
export class ProductListComponent {
private http = inject(HttpClient);

loading = false;
error = '';
products$ = this.http.get<Product[]>('/api/products').pipe(
catchError(err => {
this.error = err.message;
return EMPTY;
}),
finalize(() => this.loading = false)
);
}

Debounced Search Input

import { FormControl } from '@angular/forms';
import { debounceTime, distinctUntilChanged, switchMap } from 'rxjs/operators';
import { of } from 'rxjs';

export class SearchComponent {
searchControl = new FormControl('');

results$ = this.searchControl.valueChanges.pipe(
debounceTime(300),
distinctUntilChanged(),
switchMap(query => query ? this.api.search(query) : of([]))
);
}

Unsubscription Patterns

Angular 16+ makes takeUntilDestroyed() the cleanest option.

import { takeUntilDestroyed } from '@angular/core/rxjs-interop';
import { DestroyRef, inject, Component, OnInit } from '@angular/core';

@Component({ /* ... */ })
export class DashboardComponent implements OnInit {
private destroyRef = inject(DestroyRef);

ngOnInit() {
interval(5000).pipe(
takeUntilDestroyed(this.destroyRef) // Auto-unsubscribes on component destroy
).subscribe(() => this.refresh());
}
}
PatternAngular VersionNotes
takeUntilDestroyed()16+Recommended. Works in inject() context
AsyncPipeAllBest for template bindings
toSignal()16+When Signals integration is needed
Manual unsubscribe()AllSimple, but requires ngOnDestroy