Message Queues (メッセージキュー)
アプリケーション間の通信を非同期化し、システム全体の疎結合性を高めるための重要なパターンであるメッセージキューについて解説します。
メッセージキューとは
メッセージキューは、システム間やプロセス間でデータを非同期に送受信するための仕組みです。送信側(Producer)はメッセージをキューに送り、受信側(Consumer)はキューからメッセージを取り出して処理します。
主なメリット
- 疎結合(Decoupling): 送信側と受信側が互いの存在や状態(稼働中かどうか)を詳しく知る必要がありません。
- 負荷分散(Load Leveling): 受信側の処理能力を超えたリクエストが来ても、キューにメッセージを蓄積することで急激なスパイクを吸収できます。
- 信頼性(Reliability): 受信側がダウンしていてもメッセージはキューに保持され、復旧後に処理できます。
主要なメッセージブローカー
.NET開発においてよく利用されるメッセージブローカーを紹介します。
RabbitMQ
オープンソースのメッセージブローカーで、広く普及している標準的な選択肢です。
- プロトコル: AMQP (Advanced Message Queuing Protocol) を主に使用。
- 特徴: 軽量でデプロイが容易(Dockerコンテナ等)。柔軟なルーティング機能(Exchange/Queue)。
- ユースケース: オンプレミス環境、またはクラウドベンダーに依存しない構成が必要な場合。
Azure Service Bus
Microsoft Azureが提供するフルマネージドなエンタープライズメッセージングサービスです。
- プロトコル: AMQP, HTTP/REST。
- 特徴: サーバーレス、高い信頼性、トランザクションサポート、重複排除、順序保証。
- ユースケース: Azure上のシステム連携、高い信頼性が求められる金融系システムなど。
MassTransit による抽象化
.NETエコシステムでは、生のクライアントライブラリ(RabbitMQ.Client や Azure.Messaging.ServiceBus)を直接使うよりも、MassTransit のような高レベルの抽象化ライブラリを使用することが推奨されます。
MassTransit のメリット
- ブローカーの抽象化: コードの大部分を変更することなく、RabbitMQ から Azure Service Bus へ(またはその逆へ)移行できます。
- 消費パターンの簡素化: コンシューマー(受信側)の実装がシンプルになります。
- 回復性の組み込み: リトライポリシー、サーキットブレーカー、Dead Letter Queue (エラー時の退避) の処理が容易に設定できます。
実装例
MassTransit を使用して、簡単な Producer/Consumer パターンを実装します。ここではバックエンドとして RabbitMQ を例にしますが、設定を少し変えるだけで Azure Service Bus にも対応できます。
前提条件
必要なNuGetパッケージをインストールします。
dotnet add package MassTransit.RabbitMQ
dotnet add package MassTransit.Extensions.DependencyInjection
1. メッセージ契約の定義
送信側と受信側で共有するメッセージの型(クラスまたはインターフェース)を定義します。
namespace Shared.Contracts
{
public interface OrderCreated
{
Guid OrderId { get; }
string CustomerName { get; }
DateTime CreatedAt { get; }
}
}
2. Consumer (受信側) の実装
メッセージを受け取って処理するクラスを作成します。IConsumer<T> インターフェースを実装します。
using MassTransit;
using Shared.Contracts;
using Microsoft.Extensions.Logging;
public class OrderCreatedConsumer : IConsumer<OrderCreated>
{
private readonly ILogger<OrderCreatedConsumer> _logger;
public OrderCreatedConsumer(ILogger<OrderCreatedConsumer> logger)
{
_logger = logger;
}
public async Task Consume(ConsumeContext<OrderCreated> context)
{
_logger.LogInformation("注文を受け付けました: OrderId={OrderId}, Customer={Customer}",
context.Message.OrderId, context.Message.CustomerName);
// ここに実際のビジネスロジック (DB保存、メール送信など) を記述
await Task.CompletedTask;
}
}
3. 設定 (Program.cs)
DIコンテナに MassTransit を登録します。
using MassTransit;
var builder = WebApplication.CreateBuilder(args);
builder.Services.AddMassTransit(x =>
{
// Consumerの登録
x.AddConsumer<OrderCreatedConsumer>();
// RabbitMQの設定
x.UsingRabbitMq((context, cfg) =>
{
cfg.Host("localhost", "/", h => {
h.Username("guest");
h.Password("guest");
});
// 受信エンドポイントの設定
cfg.ReceiveEndpoint("order-created-queue", e =>
{
e.ConfigureConsumer<OrderCreatedConsumer>(context);
});
});
// Azure Service Busの場合 (コメントアウト解除して切り替え可能)
/*
x.UsingAzureServiceBus((context, cfg) =>
{
cfg.Host("Endpoint=sb://YOUR-NAMESPACE.servicebus.windows.net/;...");
cfg.ConfigureEndpoints(context);
});
*/
});
var app = builder.Build();
app.Run();
4. Producer (送信側) の実装
APIコントローラーなどでメッセージを送信(Publish)します。
using MassTransit;
using Microsoft.AspNetCore.Mvc;
using Shared.Contracts;
[ApiController]
[Route("[controller]")]
public class OrdersController : ControllerBase
{
private readonly IPublishEndpoint _publishEndpoint;
public OrdersController(IPublishEndpoint publishEndpoint)
{
_publishEndpoint = publishEndpoint;
}
[HttpPost]
public async Task<IActionResult> CreateOrder(string customerName)
{
// イベントの発行 (Publish)
// Exchangeを通して、購読している全てのQueueに配信されます
await _publishEndpoint.Publish<OrderCreated>(new
{
OrderId = Guid.NewGuid(),
CustomerName = customerName,
CreatedAt = DateTime.UtcNow
});
return Accepted();
}
}
Transactional Outbox パターン
分散システムにおいて、データベースへの保存とメッセージの送信を原子的に(両方成功するか、両方失敗するか)行うことは困難です。メッセージブローカーへの送信が失敗した場合、データ不整合が発生する可能性があります。
Transactional Outbox パターンは、メッセージを一旦データベース(Outboxテーブル)に保存することでこの問題を解決します。MassTransitはこのパターンをネイティブにサポートしています。
MassTransitでの設定 (EF Core)
EntityFramework Coreを使用する場合の実装例です。
- DbContextの設定:
AddInboxStateEntity,AddOutboxMessageEntity,AddOutboxStateEntityをモデルに追加します。
public class OrderDbContext : DbContext
{
public OrderDbContext(DbContextOptions<OrderDbContext> options) : base(options) { }
protected override void OnModelCreating(ModelBuilder modelBuilder)
{
base.OnModelCreating(modelBuilder);
// Outbox/Inboxテーブルの定義を追加
modelBuilder.AddInboxStateEntity();
modelBuilder.AddOutboxMessageEntity();
modelBuilder.AddOutboxStateEntity();
}
}
- DIの設定:
AddEntityFrameworkOutboxを使用して設定します。
builder.Services.AddMassTransit(x =>
{
// ... Consumer設定 ...
x.AddEntityFrameworkOutbox<OrderDbContext>(o =>
{
// プロバイダーに応じたロック戦略を選択
o.UseSqlServer();
// メッセージをBusだけでなく、Outboxにも送るように設定
o.UseBusOutbox();
});
// ... RabbitMQ/Azure Service Bus設定 ...
});
これにより、IPublishEndpoint.Publish を呼び出しても即座にブローカーへ送信されず、現在のDBトランザクションの一部としてOutboxテーブルに保存されます。その後、MassTransitのバックグラウンドサービスが自動的にメッセージを取り出し、ブローカーへ送信します。
図解: MassTransitのトポロジー
まとめ
- 非同期メッセージングは、システムのスケーラビリティと耐障害性を向上させます。
- MassTransitを使用することで、特定のブローカー(RabbitMQ, Azure Service Bus)への依存度を下げ、クリーンなアーキテクチャを実現できます。
- 開発環境ではRabbitMQ、本番環境ではAzure Service Busといった使い分けも容易になります。