SSE接続とSession Affinity
Server-Sent Events (SSE) は、サーバーからクライアントへの一方向リアルタイム通信を実現する技術です。複数サーバー環境でSSEを運用する際、Session Affinity(セッションアフィニティ) や Sticky Session(スティッキーセッション) の理解が不可欠です。
SSE (Server-Sent Events) とは
SSEは、HTTP接続を維持しながらサーバーからクライアントへイベントをプッシュする標準化された技術です。
特徴
- 一方向通信: サーバー → クライアント
- HTTP/1.1ベース: 既存のHTTPインフラを活用
- 自動再接続: クライアント側で自動的に再接続
- シンプル: WebSocketより実装が容易
- テキストベース:
text/event-streamContent-Type
使用例
- リアルタイム通知
- ダッシュボードの更新
- ログストリーミング
- 株価や為替レートの更新
- プログレス表示
Session Affinity が必要な理由
問題: 長時間接続とロードバランサー
SSE接続は以下の特性を持ちます:
- 長時間の接続維持: 数分〜数時間にわたり接続を保持
- 状態を持つ: サーバー側でクライアントごとの接続を管理
- 特定のサーバーインスタンスに依存: 接続が確立されたサーバーでのみイベントを送信可能
Session Affinityなしの場合、以下の問題が発生します:
- 再接続時に異なるサーバーにルーティングされる
- 元のサーバーに残った接続情報が使用できない
- クライアントがイベントを受信できない
Session Affinity(Sticky Session)の仕組み
Session Affinityは、同一クライアントからのリクエストを常に同じサーバーインスタンスにルーティングする技術です。
実装方式
1. Cookie ベース
ロードバランサーがCookieを設定し、後続リクエストで同じサーバーを選択します。
2. IPアドレスベース
クライアントのIPアドレスをハッシュ化し、サーバーを決定します。
メリット:
- Cookie不要
- 透過的
デメリット:
- NAT環境で複数クライアントが同一IPに見える
- 負荷分散が不均等になる可能性
ASP.NET Core でのSSE実装
基本実装
[ApiController]
[Route("api/[controller]")]
public class EventsController : ControllerBase
{
private readonly ILogger<EventsController> _logger;
public EventsController(ILogger<EventsController> logger)
{
_logger = logger;
}
[HttpGet("stream")]
public async Task StreamEvents(CancellationToken cancellationToken)
{
// SSE用のContent-Typeを設定
Response.Headers.Append("Content-Type", "text/event-stream");
Response.Headers.Append("Cache-Control", "no-cache");
Response.Headers.Append("Connection", "keep-alive");
// レスポンスのバッファリングを無効化
Response.Headers.Append("X-Accel-Buffering", "no");
try
{
// イベント送信ループ
while (!cancellationToken.IsCancellationRequested)
{
var eventData = $"data: {DateTime.UtcNow:O}\n\n";
await Response.WriteAsync(eventData, cancellationToken);
await Response.Body.FlushAsync(cancellationToken);
_logger.LogInformation("Event sent to client");
await Task.Delay(1000, cancellationToken); // 1秒ごと
}
}
catch (OperationCanceledException)
{
_logger.LogInformation("Client disconnected");
}
}
}
イベントIDと再接続
[HttpGet("stream")]
public async Task StreamEventsWithId(
[FromHeader(Name = "Last-Event-ID")] string? lastEventId,
CancellationToken cancellationToken)
{
Response.Headers.Append("Content-Type", "text/event-stream");
Response.Headers.Append("Cache-Control", "no-cache");
Response.Headers.Append("Connection", "keep-alive");
var eventId = long.TryParse(lastEventId, out var id) ? id : 0;
try
{
while (!cancellationToken.IsCancellationRequested)
{
eventId++;
// イベントIDを含める
var eventData = $"id: {eventId}\n" +
$"data: Event at {DateTime.UtcNow:O}\n\n";
await Response.WriteAsync(eventData, cancellationToken);
await Response.Body.FlushAsync(cancellationToken);
await Task.Delay(1000, cancellationToken);
}
}
catch (OperationCanceledException)
{
_logger.LogInformation($"Client disconnected at event ID: {eventId}");
}
}
クライアント側 (JavaScript)
const eventSource = new EventSource('/api/events/stream');
eventSource.onmessage = (event) => {
console.log('Received:', event.data);
// UI更新処理
};
eventSource.onerror = (error) => {
console.error('SSE Error:', error);
// エラーハンドリング
};
// 接続を明示的に閉じる
// eventSource.close();
Azure環境でのSession Affinity設定
Azure Application Gateway
Application GatewayはCookie-Based Affinityをサポートしています。
Azure Portal での設定
- Application Gateway → バックエンド設定
- Cookie ベースのアフィニティ を「有効」に設定
- アフィニティ Cookie 名を設定(省略可)
ARM テンプレート
{
"type": "Microsoft.Network/applicationGateways",
"properties": {
"backendHttpSettingsCollection": [
{
"properties": {
"port": 80,
"protocol": "Http",
"cookieBasedAffinity": "Enabled",
"affinityCookieName": "ApplicationGatewayAffinity",
"requestTimeout": 300
}
}
]
}
}
Bicep
resource appGateway 'Microsoft.Network/applicationGateways@2023-05-01' = {
name: 'myAppGateway'
location: location
properties: {
backendHttpSettingsCollection: [
{
name: 'appGatewayBackendHttpSettings'
properties: {
port: 80
protocol: 'Http'
cookieBasedAffinity: 'Enabled'
affinityCookieName: 'ApplicationGatewayAffinity'
requestTimeout: 300
}
}
]
}
}
Azure Kubernetes Service (AKS)
NGINX Ingress Controller
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
name: sse-app-ingress
annotations:
nginx.ingress.kubernetes.io/affinity: "cookie"
nginx.ingress.kubernetes.io/affinity-mode: "persistent"
nginx.ingress.kubernetes.io/session-cookie-name: "sse-affinity"
nginx.ingress.kubernetes.io/session-cookie-expires: "172800"
nginx.ingress.kubernetes.io/session-cookie-max-age: "172800"
# SSE用のタイムアウト設定
nginx.ingress.kubernetes.io/proxy-read-timeout: "3600"
nginx.ingress.kubernetes.io/proxy-send-timeout: "3600"
spec:
ingressClassName: nginx
rules:
- host: sse-app.example.com
http:
paths:
- path: /
pathType: Prefix
backend:
service:
name: sse-app-service
port:
number: 80
重要なアノテーション:
affinity: "cookie": Cookie-based Session Affinityaffinity-mode: "persistent": Cookieの永続化session-cookie-name: Cookie名を指定session-cookie-expires: Cookie有効期限(秒)proxy-read-timeout: 読み取りタイムアウト(SSEで重要)
Azure Load Balancer (Service)
apiVersion: v1
kind: Service
metadata:
name: sse-app-service
annotations:
service.beta.kubernetes.io/azure-load-balancer-internal: "true"
spec:
type: LoadBalancer
sessionAffinity: ClientIP
sessionAffinityConfig:
clientIP:
timeoutSeconds: 10800 # 3時間
ports:
- port: 80
targetPort: 8080
protocol: TCP
selector:
app: sse-app
sessionAffinityオプション:
None: アフィニティなし(デフォルト)ClientIP: クライアントIPベース
Gateway API と HTTPRoute
Kubernetes Gateway API を使用する場合、HTTPRoute で Session Affinity を設定できます。
Application Gateway for Containers (ALB) の場合:
apiVersion: gateway.networking.k8s.io/v1
kind: HTTPRoute
metadata:
name: sse-app-route
namespace: default
spec:
parentRefs:
- name: gateway-01
namespace: default
hostnames:
- "sse-app.example.com"
rules:
- backendRefs:
- name: sse-app-service
port: 80
sessionAffinity:
type: Cookie
cookie:
name: affinity
maxAge: 3600 # 1時間(秒単位)
Gateway API での Session Affinity 設定のポイント:
sessionAffinity.type: Cookie: Cookie-based Session Affinity を有効化cookie.name: セッションCookie名を指定cookie.maxAge: Cookie の有効期限(秒単位)
NGINX Gateway Fabric の場合:
apiVersion: gateway.networking.k8s.io/v1
kind: HTTPRoute
metadata:
name: sse-app-route
namespace: default
annotations:
# NGINX固有のアノテーション
nginx.org/session-cookie-name: "sse-affinity"
nginx.org/session-cookie-expires: "3600"
nginx.org/session-cookie-max-age: "3600"
spec:
parentRefs:
- name: nginx-gateway
namespace: nginx-gateway
hostnames:
- "sse-app.example.com"
rules:
- matches:
- path:
type: PathPrefix
value: /
backendRefs:
- name: sse-app-service
port: 80
Gateway と IngressClass の比較:
| 機能 | NGINX Ingress | Gateway API (ALB) | Gateway API (NGINX) |
|---|---|---|---|
| Session Affinity | Annotation | HTTPRoute Spec | Annotation |
| 標準化 | × | ○ | △ |
| Cookie制御 | 詳細 | 基本 | 詳細 |
| タイムアウト設定 | Annotation | 制限あり | Annotation |
Gateway API を使用する利点:
- 標準化: Kubernetes 標準の Gateway API を使用
- ロールベースルーティング: 複数の Gateway を管理しやすい
- 将来性: Ingress よりも拡張性が高い
- Azure ネイティブ: Application Gateway for Containers と統合
SSE 用の追加設定(ALB):
Application Gateway for Containers では、BackendTLSPolicy でタイムアウトを設定できます。
apiVersion: alb.networking.azure.io/v1
kind: BackendTLSPolicy
metadata:
name: sse-backend-policy
namespace: default
spec:
targetRef:
group: ""
kind: Service
name: sse-app-service
namespace: default
override:
timeout:
request: 3600s # SSE用に長めに設定
Azure Front Door
Front DoorはSession Affinityをサポートしていますが、SSEの長時間接続には注意が必要です。
resource frontDoor 'Microsoft.Cdn/profiles@2023-05-01' = {
name: 'myFrontDoor'
location: 'Global'
sku: {
name: 'Premium_AzureFrontDoor'
}
properties: {
originResponseTimeoutSeconds: 60 // 最大60秒
}
}
注意点:
- Front Doorのタイムアウトは最大60秒
- SSEの長時間接続にはApplication Gateway + AKSの組み合わせを推奨
Session Affinity の制限と代替手段
制限事項
-
スケールアウト時の影響
- 新しいインスタンスが追加されても、既存接続は移行しない
- 接続が特定サーバーに集中する可能性
-
サーバー障害時の影響
- サーバーダウン時、すべての接続が切断
- 再接続時に別サーバーへルーティング
-
水平スケーリングの制約
- 負荷分散が不均等になる可能性
- アイドル接続がリソースを占有
代替手段: SignalR
複数サーバー環境では、Azure SignalR Service の使用を検討してください。
SignalRの利点:
- スケールアウト対応: Azure SignalR Serviceが接続を管理
- 自動フェイルオーバー: サーバー障害時も接続維持
- 双方向通信: クライアント ⇔ サーバー
- 複数プロトコル: WebSocket、SSE、Long Pollingを自動選択
SignalR実装例
// Startup.cs / Program.cs
builder.Services.AddSignalR()
.AddAzureSignalR(options =>
{
options.ConnectionString = builder.Configuration["Azure:SignalR:ConnectionString"];
});
app.MapHub<EventHub>("/eventhub");
// EventHub.cs
public class EventHub : Hub
{
public async Task SendEvent(string message)
{
await Clients.All.SendAsync("ReceiveEvent", message);
}
}
// クライアント側
const connection = new signalR.HubConnectionBuilder()
.withUrl("/eventhub")
.withAutomaticReconnect()
.build();
connection.on("ReceiveEvent", (message) => {
console.log("Received:", message);
});
await connection.start();
代替手段: Redis Backplane
複数サーバー間でイベントを共有する方法として、Redisを使用できます。
// Program.cs
builder.Services.AddStackExchangeRedisCache(options =>
{
options.Configuration = builder.Configuration["Redis:ConnectionString"];
});
// イベントの発行
public class EventPublisher
{
private readonly IConnectionMultiplexer _redis;
public EventPublisher(IConnectionMultiplexer redis)
{
_redis = redis;
}
public async Task PublishEventAsync(string eventData)
{
var subscriber = _redis.GetSubscriber();
await subscriber.PublishAsync("sse-events", eventData);
}
}
// イベントの購読
public class EventSubscriber : BackgroundService
{
private readonly IConnectionMultiplexer _redis;
private readonly ILogger<EventSubscriber> _logger;
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
var subscriber = _redis.GetSubscriber();
await subscriber.SubscribeAsync("sse-events", (channel, message) =>
{
_logger.LogInformation($"Received: {message}");
// 接続中のクライアントへブロードキャスト
});
}
}
ベストプラクティス
1. タイムアウト設定を適切に
SSE接続は長時間維持されるため、各レイヤーでタイムアウトを調整します。
// ASP.NET Core
builder.WebHost.ConfigureKestrel(options =>
{
options.Limits.KeepAliveTimeout = TimeSpan.FromMinutes(30);
options.Limits.RequestHeadersTimeout = TimeSpan.FromMinutes(30);
});
2. ハートビートの実装
定期的にデータを送信し、接続の生存確認を行います。
while (!cancellationToken.IsCancellationRequested)
{
// 30秒ごとにコメント(ハートビート)を送信
await Response.WriteAsync(": heartbeat\n\n", cancellationToken);
await Response.Body.FlushAsync(cancellationToken);
await Task.Delay(30000, cancellationToken);
}
3. 再接続ロジックの実装
クライアント側で自動再接続を実装します。
let retryCount = 0;
const maxRetries = 5;
function connectSSE() {
const eventSource = new EventSource('/api/events/stream');
eventSource.onopen = () => {
console.log('SSE Connected');
retryCount = 0;
};
eventSource.onerror = (error) => {
console.error('SSE Error:', error);
eventSource.close();
if (retryCount < maxRetries) {
retryCount++;
const delay = Math.min(1000 * Math.pow(2, retryCount), 30000);
console.log(`Reconnecting in ${delay}ms...`);
setTimeout(connectSSE, delay);
}
};
return eventSource;
}
const eventSource = connectSSE();
4. リソース管理
接続数を監視し、リソースリークを防ぎます。
public class SseConnectionManager
{
private readonly ConcurrentDictionary<string, HttpResponse> _connections = new();
private readonly ILogger<SseConnectionManager> _logger;
public void AddConnection(string connectionId, HttpResponse response)
{
_connections.TryAdd(connectionId, response);
_logger.LogInformation($"Active connections: {_connections.Count}");
}
public void RemoveConnection(string connectionId)
{
_connections.TryRemove(connectionId, out _);
_logger.LogInformation($"Active connections: {_connections.Count}");
}
public async Task BroadcastAsync(string message, CancellationToken cancellationToken)
{
var tasks = _connections.Values
.Select(async response =>
{
try
{
await response.WriteAsync($"data: {message}\n\n", cancellationToken);
await response.Body.FlushAsync(cancellationToken);
}
catch (Exception ex)
{
_logger.LogWarning(ex, "Failed to send message to client");
}
});
await Task.WhenAll(tasks);
}
}
5. 認証・認可
SSE接続でも適切な認証を実装します。
[Authorize]
[HttpGet("stream")]
public async Task StreamEvents(CancellationToken cancellationToken)
{
var userId = User.FindFirstValue(ClaimTypes.NameIdentifier);
_logger.LogInformation($"SSE connection from user: {userId}");
// ユーザー固有のイベントのみを送信
// ...
}
// Bearer Token認証の例
const token = localStorage.getItem('access_token');
const eventSource = new EventSource('/api/events/stream', {
headers: {
'Authorization': `Bearer ${token}`
}
});
注意: EventSource APIは標準でカスタムヘッダーをサポートしていません。以下のいずれかの方法を使用します:
-
クエリパラメータでトークンを渡す
const eventSource = new EventSource(`/api/events/stream?token=${token}`); -
EventSource Polyfillを使用
import { EventSourcePolyfill } from 'event-source-polyfill';const eventSource = new EventSourcePolyfill('/api/events/stream', {headers: {'Authorization': `Bearer ${token}`}});
まとめ
- SSEは、サーバーからクライアントへのリアルタイムプッシュ通信に有効
- 複数サーバー環境ではSession Affinityが必須
- Azure環境ではApplication GatewayやNGINX Ingressで設定可能
- より高度なシナリオではAzure SignalR Serviceの使用を検討
- タイムアウト、ハートビート、再接続ロジックの実装が重要