RxJSの基礎
RxJSはAngularのリアクティブプログラミングの基盤であり、React開発者にとって最も学習コストの高い概念のひとつです。
1. Observableとは何か
Observable はデータの「ストリーム(流れ)」を表すオブジェクトです。マウスクリック・HTTPレスポンス・WebSocketメッセージなど、時間の経過とともに発生するあらゆる値を一貫したAPIで扱えます。
PromiseとObservableの比較
| 比較軸 | Promise | Observable |
|---|---|---|
| 値の数 | 1つだけ(resolve/reject) | 0個〜無限に複数の値 |
| 実行タイミング | new Promise() 時点で即座に実行 | subscribe() されて初めて実行(遅延実行) |
| キャンセル | 不可 | unsubscribe() でキャンセル可 |
| 演算子 | .then() .catch() のみ | 100以上の演算子で強力に変換・結合 |
| Angularでの代表例 | — | HttpClient、Router.events、FormControl.valueChanges |
import { Observable, of } from 'rxjs';
// シンプルなObservableの作成
const numbers$ = new Observable<number>(subscriber => {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
subscriber.complete();
});
// of() でも簡単に作れる
const greet$ = of('Hello', 'World');
RxJSのObservableは変数名の末尾に $ を付ける慣習があります(例: users$、loading$)。
2. 基本的な購読パターン
Observableはそのままでは何も起きません。subscribe() を呼ぶことで初めてストリームが開始されます。
subscribe() の基本
import { of } from 'rxjs';
const source$ = of(1, 2, 3);
// 簡略形: next コールバックのみ
source$.subscribe(value => console.log(value));
// => 1, 2, 3
// 完全なオブザーバーオブジェクト
source$.subscribe({
next: value => console.log('値:', value),
error: err => console.error('エラー:', err),
complete: () => console.log('完了')
});
unsubscribe() とメモリリークの防止
コンポーネントが破棄されても購読が残り続けると、メモリリークやゾンビイベントの原因になります。
import { Component, OnInit, OnDestroy } from '@angular/core';
import { Subscription, interval } from 'rxjs';
@Component({ selector: 'app-example', template: '' })
export class ExampleComponent implements OnInit, OnDestroy {
private subscription!: Subscription;
ngOnInit() {
this.subscription = interval(1000).subscribe(n => console.log(n));
}
ngOnDestroy() {
this.subscription.unsubscribe(); // 必ず解除
}
}
HttpClient が返す Observable はリクエスト完了後に自動的に complete されるため、手動での unsubscribe は不要です。一方、interval や Subject などは明示的に解除が必要です。
3. よく使うオペレータ
オペレータはパイプライン(pipe())の中でObservableを変換・結合・フィルタリングする関数です。
変換系
import { of, from } from 'rxjs';
import { map, switchMap, mergeMap, concatMap } from 'rxjs/operators';
// map: 各値を変換(Array.map と同じ感覚)
of(1, 2, 3).pipe(
map(n => n * 2)
).subscribe(console.log); // 2, 4, 6
// switchMap: 新しい内部Observableが来たら前のをキャンセル
// → 検索候補のような「最新だけ必要」なケースに最適
this.searchControl.valueChanges.pipe(
switchMap(query => this.api.search(query))
).subscribe(results => this.results = results);
// concatMap: 順番を保って直列実行
// mergeMap: 並列実行(順不同)
| オペレータ | 並列性 | 前の処理 | 主なユース |
|---|---|---|---|
switchMap | × | キャンセル | 検索、ナビゲーション |
mergeMap | ○ | 継続 | 並列HTTP like いいね |
concatMap | × | 完了待ち | 順序が重要な操作 |
exhaustMap | × | 無視 | ボタン連打防止 |
フィルタリング系
import { filter, distinctUntilChanged, debounceTime } from 'rxjs/operators';
this.searchControl.valueChanges.pipe(
debounceTime(300), // 300ms タイピングが止まるまで待機
distinctUntilChanged(), // 前と同じ値は流さない
filter(query => query.length >= 2) // 2文字以上のみ
).subscribe(query => this.search(query));
結合系
import { combineLatest, merge, forkJoin } from 'rxjs';
// combineLatest: いずれかが更新されるたびに最新の組合せを発行
combineLatest([user$, permissions$]).subscribe(([user, perms]) => { ... });
// forkJoin: 全て完了してからまとめて結果を受け取る(Promise.all 相当)
forkJoin([this.api.getUser(), this.api.getPosts()]).subscribe(([user, posts]) => { ... });
// merge: 複数ストリームをひとつにまとめる(発行順)
merge(click$, keydown$).subscribe(event => this.handleEvent(event));
ユーティリティ系
import { tap, catchError, retry, finalize } from 'rxjs/operators';
import { EMPTY } from 'rxjs';
this.api.getData().pipe(
tap(data => console.log('受信:', data)), // 副作用(デバッグなどに)
retry(3), // エラー時に最大3回リトライ
catchError(err => {
this.error = err.message;
return EMPTY; // エラーを吸収してストリーム終了
}),
finalize(() => this.loading = false) // 成功・失敗どちらでも実行
).subscribe(data => this.data = data);
4. SubjectとBehaviorSubject
Subject はObservableとObserverを兼ねる特殊なクラスです。外部から next() を呼んで値を発行でき、複数の購読者に同時に届けるマルチキャストが可能です。
Subject
import { Subject } from 'rxjs';
const subject = new Subject<string>();
subject.subscribe(v => console.log('A:', v));
subject.subscribe(v => console.log('B:', v));
subject.next('Hello'); // A: Hello, B: Hello
subject.next('World'); // A: World, B: World
// subscribe() より前に next() した値は受け取れない
BehaviorSubject
現在の値(初期値)を持ち、新しい購読者は即座に最新値を受け取ります。
import { BehaviorSubject } from 'rxjs';
const count$ = new BehaviorSubject<number>(0); // 初期値 0
count$.subscribe(v => console.log('Component A:', v)); // 即 0 が届く
count$.next(1);
count$.next(2);
// 後から購読しても最新値(2)を即受け取れる
count$.subscribe(v => console.log('Component B:', v)); // 即 2 が届く
// 現在値の同期取得
console.log(count$.getValue()); // 2
Reactとの概念対比
| Angular (RxJS) | React |
|---|---|
BehaviorSubject | useState |
BehaviorSubject + asObservable() | Context (読み取り専用公開) |
Service in providedIn: 'root' に BehaviorSubject | Zustand / Jotai |
5. AsyncPipe
テンプレートで Observable を直接扱う公式の方法です。subscribe() と unsubscribe() を自動的に処理してくれます。
// component.ts
@Component({
template: `
@if (users$ | async; as users) {
<ul>
@for (user of users; track user.id) {
<li>{{ user.name }}</li>
}
</ul>
} @else {
<p>読み込み中...</p>
}
`
})
export class UsersComponent {
users$ = this.userService.getUsers(); // subscribe しない!
constructor(private userService: UserService) {}
}
AsyncPipe を使うメリット
- コンポーネントが破棄されると自動的に
unsubscribe() OnPush変更検知と相性が良く、パフォーマンス最適化に効果的- コードが宣言的になり、
ngOnDestroyでの手動クリーンアップが不要
subscribe() はできる限りコンポーネントのテンプレート外で使わず、AsyncPipe か後述の toSignal() を優先しましょう。
6. RxJSとSignalsの関係
Angular 16以降、Signals が导入されました。RxJSのObservableとSignalsはそれぞれ得意な場面が異なり、相互変換することで両者の強みを活かせます。
toSignal() — Observable → Signal
import { toSignal } from '@angular/core/rxjs-interop';
import { Component, inject } from '@angular/core';
@Component({
template: `
<!-- signal なので () が必要、AsyncPipe は不要 -->
<p>{{ users() }}</p>
`
})
export class UsersComponent {
private userService = inject(UserService);
// Observable を Signal に変換(自動購読・解除)
users = toSignal(this.userService.getUsers(), { initialValue: [] });
}
toObservable() — Signal → Observable
import { toObservable } from '@angular/core/rxjs-interop';
import { signal } from '@angular/core';
import { switchMap } from 'rxjs/operators';
const userId = signal(1);
// Signal を Observable に変換して RxJS オペレータを適用
const userDetails$ = toObservable(userId).pipe(
switchMap(id => this.api.getUser(id))
);
どちらを使うべきか
| ケース | 推奨 |
|---|---|
| コンポーネントの状態(単純な値) | Signal |
| HTTPリクエスト・WebSocket など非同期ストリーム | Observable |
フォームの valueChanges など既存RxJS API | Observable |
| テンプレートでのバインディング(新規コード) | Signal または toSignal() |
| 複雑な変換・結合(debounce, switchMap など) | Observable(RxJSオペレータを活用) |
7. 実践パターン
HTTPリクエストの処理
import { HttpClient } from '@angular/common/http';
import { inject, Component } from '@angular/core';
import { catchError, finalize } from 'rxjs/operators';
import { EMPTY } from 'rxjs';
@Component({ /* ... */ })
export class ProductListComponent {
private http = inject(HttpClient);
loading = false;
error = '';
products$ = this.http.get<Product[]>('/api/products').pipe(
catchError(err => {
this.error = err.message;
return EMPTY;
}),
finalize(() => this.loading = false)
);
}
フォーム入力のデバウンス処理
import { FormControl } from '@angular/forms';
import { debounceTime, distinctUntilChanged, switchMap } from 'rxjs/operators';
export class SearchComponent {
searchControl = new FormControl('');
results$ = this.searchControl.valueChanges.pipe(
debounceTime(300),
distinctUntilChanged(),
switchMap(query => query ? this.api.search(query) : of([]))
);
}
コンポーネント破棄時の購読解除パターン
Angular 16以降は takeUntilDestroyed() が最も簡潔です。
import { takeUntilDestroyed } from '@angular/core/rxjs-interop';
import { DestroyRef, inject, Component, OnInit } from '@angular/core';
@Component({ /* ... */ })
export class DashboardComponent implements OnInit {
private destroyRef = inject(DestroyRef);
ngOnInit() {
interval(5000).pipe(
takeUntilDestroyed(this.destroyRef) // コンポーネント破棄で自動解除
).subscribe(() => this.refresh());
}
}
| 解除パターン | Angular バージョン | 説明 |
|---|---|---|
takeUntilDestroyed() | 16+ | 最も推奨。inject() コンテキストで使用 |
AsyncPipe | 全バージョン | テンプレートで使うなら最善 |
toSignal() | 16+ | Signals との連携が必要な場合 |
手動 unsubscribe() | 全バージョン | シンプルだが ngOnDestroy が必要 |