本文档提供了在 .NET Core 8 环境中设置和使用 RabbitMQ 和 MassTransit 的综合指南。它涵盖了安装、配置和基本使用示例,以帮助开发人员将消息传递集成到他们的应用程序中。
从 Microsoft .NET 官方网站下载并安装 .NET Core 8 SDK。
按照 RabbitMQ 网站上的说明安装 RabbitMQ。
使用 NuGet 将 MassTransit 添加到 .NET Core 项目。
dotnet add package MassTransit
dotnet add package MassTransit.RabbitMQ
应用程序由两个主要服务组成:a 和 a。这些服务使用 MassTransit 通过 RabbitMQ 进行通信。以下是该流程的详细说明:QueueSenderServiceQueueReceiverService
QueueSenderService:
QueueReceiverService:
该项目的结构如下:
/app
│
├── QueueSenderService
│ ├── Controllers
│ │ └── QueueSenderController.cs
│ ├── Program.cs
│ └── ... (other files)
│
├── QueueReceiverService
│ ├── QueueServices
│ │ ├── PublishConsumer
│ │ │ └── PublisherService.cs
│ │ ├── RequestResponseConsumer
│ │ │ └── RequestResponseService.cs
│ │ ├── SenderConsumer
│ │ │ └── SenderService.cs
│ ├── Program.cs
│ └── ... (other files)
│
├── CommonResources
│ ├── Models
│ │ └── (model files)
│ └── ... (other files)
│
在您的文件中,使用 RabbitMQ 配置 MassTransit。Program.cs
using CommonResources;
using MassTransit;
var builder = WebApplication.CreateBuilder(args);
builder.Services.AddControllers();
builder.Services.AddEndpointsApiExplorer();
builder.Services.AddSwaggerGen();
builder.Services.AddMassTransit(x =>
{
x.AddRequestClient<TransferData>();
x.AddBus(provider => Bus.Factory.CreateUsingRabbitMq(config =>
{
config.Host(new Uri("rabbitmq://localhost"), h =>
{
h.Username("guest");
h.Password("guest");
});
}));
});
builder.Services.AddMassTransitHostedService();
var app = builder.Build();
if (app.Environment.IsDevelopment())
{
app.UseSwagger();
app.UseSwaggerUI();
}
app.UseHttpsRedirection();
app.UseAuthorization();
app.MapControllers();
app.Run();
在QueueSenderController.cs
using CommonResources;
using MassTransit;
using Microsoft.AspNetCore.Mvc;
using System;
using System.Threading.Tasks;
namespace QueueSenderService.Controllers
{
[ApiController]
[Route("[controller]")]
public class QueueSenderController : ControllerBase
{
private readonly IBus _bus;
private readonly IRequestClient<TransferData> _client;
public QueueSenderController(IBus bus, IRequestClient<TransferData> client)
{
_bus = bus;
_client = client;
}
[HttpPost("send-command")]
public async Task<IActionResult> SendCommand()
{
var account = new Account()
{
Name = "David Bytyqi",
Deposit = 500
};
var url = new Uri("rabbitmq://localhost/send-command");
var endpoint = await _bus.GetSendEndpoint(url);
await endpoint.Send(account);
return Ok("Command sent successfully");
}
[HttpPost("publish-event")]
public async Task<IActionResult> PublishEvent()
{
await _bus.Publish(new Client()
{
Name = "David Bytyqi",
Pin = 123456
});
return Ok("Event published successfully");
}
[HttpPost("request-response")]
public async Task<IActionResult> RequestResponse()
{
var requestData = new TransferData()
{
Type = "Withdrawal",
Amount = 25
};
var request = _client.Create(requestData);
var response = await request.GetResponse<CurrentBalance>();
return Ok(response);
}
}
}
Program.cs针对 QueueReceiverService
using MassTransit;
using QueueReceiverService.QueueServices.PublishConsumer;
using QueueReceiverService.QueueServices.RequestResponseConsumer;
using QueueReceiverService.QueueServices.SenderConsumer;
var builder = WebApplication.CreateBuilder(args);
builder.Services.AddControllers();
builder.Services.AddEndpointsApiExplorer();
builder.Services.AddSwaggerGen();
builder.Services.AddMassTransit(x =>
{
x.AddConsumer<PublisherService>();
x.AddConsumer<SenderService>();
x.AddConsumer<RequestResponseService>();
x.UsingRabbitMq((context, config) =>
{
config.Host(new Uri("rabbitmq://localhost"), h =>
{
h.Username("guest");
h.Password("guest");
});
config.ReceiveEndpoint("send-command", e =>
{
e.Consumer<SenderService>(context);
});
config.ReceiveEndpoint("publish-event", e =>
{
e.Consumer<PublisherService>(context);
});
config.ReceiveEndpoint("request-response", e =>
{
e.Consumer<RequestResponseService>(context);
});
});
});
builder.Services.AddMassTransitHostedService();
var app = builder.Build();
if (app.Environment.IsDevelopment())
{
app.UseSwagger();
app.UseSwaggerUI();
}
app.UseHttpsRedirection();
app.UseAuthorization();
app.MapControllers();
app.Run();
PublisherService.cs
using CommonResources;
using MassTransit;
namespace QueueReceiverService.QueueServices.PublishConsumer
{
public class PublisherService : IConsumer<Client>
{
public async Task Consume(ConsumeContext<Client> context)
{
var info = context.Message;
}
}
}
RequestResponseService.cs
using CommonResources;
using MassTransit;
namespace QueueReceiverService.QueueServices.RequestResponseConsumer
{
public class RequestResponseService : IConsumer<TransferData>
{
public async Task Consume(ConsumeContext<TransferData> context)
{
var data = context.Message;
var nowBalance = new CurrentBalance()
{
Balance = 1000 - data.Amount
};
await context.RespondAsync(nowBalance);
}
}
}
SenderService.cs
using CommonResources;
using MassTransit;
namespace QueueReceiverService.QueueServices.SenderConsumer
{
public class SenderService : IConsumer<Account>
{
public async Task Consume(ConsumeContext<Account> context)
{
var product = context.Message;
}
}
}
确保 RabbitMQ 正在运行且可访问。您应该看到消息正在按照配置发送和使用。