メインコンテンツまでスキップ

RxJSの基礎

RxJSはAngularのリアクティブプログラミングの基盤であり、React開発者にとって最も学習コストの高い概念のひとつです。

1. Observableとは何か

Observable はデータの「ストリーム(流れ)」を表すオブジェクトです。マウスクリック・HTTPレスポンス・WebSocketメッセージなど、時間の経過とともに発生するあらゆる値を一貫したAPIで扱えます。

PromiseとObservableの比較

比較軸PromiseObservable
値の数1つだけ(resolve/reject)0個〜無限に複数の値
実行タイミングnew Promise() 時点で即座に実行subscribe() されて初めて実行(遅延実行)
キャンセル不可unsubscribe() でキャンセル可
演算子.then() .catch() のみ100以上の演算子で強力に変換・結合
Angularでの代表例HttpClientRouter.eventsFormControl.valueChanges
import { Observable, of } from 'rxjs';

// シンプルなObservableの作成
const numbers$ = new Observable<number>(subscriber => {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
subscriber.complete();
});

// of() でも簡単に作れる
const greet$ = of('Hello', 'World');
命名規則

RxJSのObservableは変数名の末尾に $ を付ける慣習があります(例: users$loading$)。

2. 基本的な購読パターン

Observableはそのままでは何も起きません。subscribe() を呼ぶことで初めてストリームが開始されます。

subscribe() の基本

import { of } from 'rxjs';

const source$ = of(1, 2, 3);

// 簡略形: next コールバックのみ
source$.subscribe(value => console.log(value));
// => 1, 2, 3

// 完全なオブザーバーオブジェクト
source$.subscribe({
next: value => console.log('値:', value),
error: err => console.error('エラー:', err),
complete: () => console.log('完了')
});

unsubscribe() とメモリリークの防止

コンポーネントが破棄されても購読が残り続けると、メモリリークやゾンビイベントの原因になります。

import { Component, OnInit, OnDestroy } from '@angular/core';
import { Subscription, interval } from 'rxjs';

@Component({ selector: 'app-example', template: '' })
export class ExampleComponent implements OnInit, OnDestroy {
private subscription!: Subscription;

ngOnInit() {
this.subscription = interval(1000).subscribe(n => console.log(n));
}

ngOnDestroy() {
this.subscription.unsubscribe(); // 必ず解除
}
}
注意

HttpClient が返す Observable はリクエスト完了後に自動的に complete されるため、手動での unsubscribe は不要です。一方、intervalSubject などは明示的に解除が必要です。

3. よく使うオペレータ

オペレータはパイプライン(pipe())の中でObservableを変換・結合・フィルタリングする関数です。

変換系

import { of, from } from 'rxjs';
import { map, switchMap, mergeMap, concatMap } from 'rxjs/operators';

// map: 各値を変換(Array.map と同じ感覚)
of(1, 2, 3).pipe(
map(n => n * 2)
).subscribe(console.log); // 2, 4, 6

// switchMap: 新しい内部Observableが来たら前のをキャンセル
// → 検索候補のような「最新だけ必要」なケースに最適
this.searchControl.valueChanges.pipe(
switchMap(query => this.api.search(query))
).subscribe(results => this.results = results);

// concatMap: 順番を保って直列実行
// mergeMap: 並列実行(順不同)
オペレータ並列性前の処理主なユース
switchMap×キャンセル検索、ナビゲーション
mergeMap継続並列HTTP like いいね
concatMap×完了待ち順序が重要な操作
exhaustMap×無視ボタン連打防止

フィルタリング系

import { filter, distinctUntilChanged, debounceTime } from 'rxjs/operators';

this.searchControl.valueChanges.pipe(
debounceTime(300), // 300ms タイピングが止まるまで待機
distinctUntilChanged(), // 前と同じ値は流さない
filter(query => query.length >= 2) // 2文字以上のみ
).subscribe(query => this.search(query));

結合系

import { combineLatest, merge, forkJoin } from 'rxjs';

// combineLatest: いずれかが更新されるたびに最新の組合せを発行
combineLatest([user$, permissions$]).subscribe(([user, perms]) => { ... });

// forkJoin: 全て完了してからまとめて結果を受け取る(Promise.all 相当)
forkJoin([this.api.getUser(), this.api.getPosts()]).subscribe(([user, posts]) => { ... });

// merge: 複数ストリームをひとつにまとめる(発行順)
merge(click$, keydown$).subscribe(event => this.handleEvent(event));

ユーティリティ系

import { tap, catchError, retry, finalize } from 'rxjs/operators';
import { EMPTY } from 'rxjs';

this.api.getData().pipe(
tap(data => console.log('受信:', data)), // 副作用(デバッグなどに)
retry(3), // エラー時に最大3回リトライ
catchError(err => {
this.error = err.message;
return EMPTY; // エラーを吸収してストリーム終了
}),
finalize(() => this.loading = false) // 成功・失敗どちらでも実行
).subscribe(data => this.data = data);

4. SubjectとBehaviorSubject

Subject はObservableとObserverを兼ねる特殊なクラスです。外部から next() を呼んで値を発行でき、複数の購読者に同時に届けるマルチキャストが可能です。

Subject

import { Subject } from 'rxjs';

const subject = new Subject<string>();

subject.subscribe(v => console.log('A:', v));
subject.subscribe(v => console.log('B:', v));

subject.next('Hello'); // A: Hello, B: Hello
subject.next('World'); // A: World, B: World

// subscribe() より前に next() した値は受け取れない

BehaviorSubject

現在の値(初期値)を持ち、新しい購読者は即座に最新値を受け取ります。

import { BehaviorSubject } from 'rxjs';

const count$ = new BehaviorSubject<number>(0); // 初期値 0

count$.subscribe(v => console.log('Component A:', v)); // 即 0 が届く

count$.next(1);
count$.next(2);

// 後から購読しても最新値(2)を即受け取れる
count$.subscribe(v => console.log('Component B:', v)); // 即 2 が届く

// 現在値の同期取得
console.log(count$.getValue()); // 2

Reactとの概念対比

Angular (RxJS)React
BehaviorSubjectuseState
BehaviorSubject + asObservable()Context (読み取り専用公開)
Service in providedIn: 'root'BehaviorSubjectZustand / Jotai

5. AsyncPipe

テンプレートで Observable を直接扱う公式の方法です。subscribe()unsubscribe() を自動的に処理してくれます。

// component.ts
@Component({
template: `
@if (users$ | async; as users) {
<ul>
@for (user of users; track user.id) {
<li>{{ user.name }}</li>
}
</ul>
} @else {
<p>読み込み中...</p>
}
`
})
export class UsersComponent {
users$ = this.userService.getUsers(); // subscribe しない!

constructor(private userService: UserService) {}
}

AsyncPipe を使うメリット

  • コンポーネントが破棄されると自動的に unsubscribe()
  • OnPush 変更検知と相性が良く、パフォーマンス最適化に効果的
  • コードが宣言的になり、ngOnDestroy での手動クリーンアップが不要
ベストプラクティス

subscribe() はできる限りコンポーネントのテンプレート外で使わず、AsyncPipe か後述の toSignal() を優先しましょう。

6. RxJSとSignalsの関係

Angular 16以降、Signals が导入されました。RxJSのObservableとSignalsはそれぞれ得意な場面が異なり、相互変換することで両者の強みを活かせます。

toSignal() — Observable → Signal

import { toSignal } from '@angular/core/rxjs-interop';
import { Component, inject } from '@angular/core';

@Component({
template: `
<!-- signal なので () が必要、AsyncPipe は不要 -->
<p>{{ users() }}</p>
`
})
export class UsersComponent {
private userService = inject(UserService);

// Observable を Signal に変換(自動購読・解除)
users = toSignal(this.userService.getUsers(), { initialValue: [] });
}

toObservable() — Signal → Observable

import { toObservable } from '@angular/core/rxjs-interop';
import { signal } from '@angular/core';
import { switchMap } from 'rxjs/operators';

const userId = signal(1);

// Signal を Observable に変換して RxJS オペレータを適用
const userDetails$ = toObservable(userId).pipe(
switchMap(id => this.api.getUser(id))
);

どちらを使うべきか

ケース推奨
コンポーネントの状態(単純な値)Signal
HTTPリクエスト・WebSocket など非同期ストリームObservable
フォームの valueChanges など既存RxJS APIObservable
テンプレートでのバインディング(新規コード)Signal または toSignal()
複雑な変換・結合(debounce, switchMap など)Observable(RxJSオペレータを活用)

7. 実践パターン

HTTPリクエストの処理

import { HttpClient } from '@angular/common/http';
import { inject, Component } from '@angular/core';
import { catchError, finalize } from 'rxjs/operators';
import { EMPTY } from 'rxjs';

@Component({ /* ... */ })
export class ProductListComponent {
private http = inject(HttpClient);

loading = false;
error = '';
products$ = this.http.get<Product[]>('/api/products').pipe(
catchError(err => {
this.error = err.message;
return EMPTY;
}),
finalize(() => this.loading = false)
);
}

フォーム入力のデバウンス処理

import { FormControl } from '@angular/forms';
import { debounceTime, distinctUntilChanged, switchMap } from 'rxjs/operators';

export class SearchComponent {
searchControl = new FormControl('');

results$ = this.searchControl.valueChanges.pipe(
debounceTime(300),
distinctUntilChanged(),
switchMap(query => query ? this.api.search(query) : of([]))
);
}

コンポーネント破棄時の購読解除パターン

Angular 16以降は takeUntilDestroyed() が最も簡潔です。

import { takeUntilDestroyed } from '@angular/core/rxjs-interop';
import { DestroyRef, inject, Component, OnInit } from '@angular/core';

@Component({ /* ... */ })
export class DashboardComponent implements OnInit {
private destroyRef = inject(DestroyRef);

ngOnInit() {
interval(5000).pipe(
takeUntilDestroyed(this.destroyRef) // コンポーネント破棄で自動解除
).subscribe(() => this.refresh());
}
}
解除パターンAngular バージョン説明
takeUntilDestroyed()16+最も推奨。inject() コンテキストで使用
AsyncPipe全バージョンテンプレートで使うなら最善
toSignal()16+Signals との連携が必要な場合
手動 unsubscribe()全バージョンシンプルだが ngOnDestroy が必要