2025-12-29 11:31:11 +08:00
|
|
|
|
using Infrastructure.Helpers;
|
|
|
|
|
|
using RabbitMQ.Client;
|
|
|
|
|
|
using System.Text;
|
|
|
|
|
|
|
|
|
|
|
|
namespace OpenAuth.WebApi.Model.RabbitMQService
|
|
|
|
|
|
{
|
|
|
|
|
|
/// <summary>
/// Publishes UTF-8 text messages to RabbitMQ queues over a lazily created,
/// cached connection. A fresh channel is opened for every publish and
/// disposed when the publish completes.
/// </summary>
/// <remarks>
/// Connection settings are read from the <c>RabbitMQ:HostName</c>,
/// <c>RabbitMQ:UserName</c>, <c>RabbitMQ:Password</c> and <c>RabbitMQ:Port</c>
/// configuration keys.
/// NOTE(review): access to the cached connection is not synchronized. If this
/// service is registered as a singleton and <see cref="PublishAsync"/> is called
/// concurrently while no connection is open, more than one connection may be
/// created - confirm the DI lifetime and add a lock if needed.
/// </remarks>
public class RabbitMqSenderService : IAsyncDisposable
{
    private readonly ILogger<RabbitMqSenderService> _logger;
    private readonly ConnectionFactory _factory;
    private readonly IConfiguration _configuration;

    // Created on first publish and re-created whenever it is found closed.
    private IConnection? _connection;

    /// <summary>
    /// Builds the connection factory from configuration. No connection is opened here.
    /// </summary>
    /// <param name="logger">Logger used for publish and connection diagnostics.</param>
    /// <param name="configuration">Application configuration holding the <c>RabbitMQ:*</c> keys.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="logger"/> or <paramref name="configuration"/> is null.
    /// </exception>
    /// <exception cref="FormatException"><c>RabbitMQ:Port</c> is present but is not a valid integer.</exception>
    public RabbitMqSenderService(ILogger<RabbitMqSenderService> logger, IConfiguration configuration)
    {
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
        _configuration = configuration ?? throw new ArgumentNullException(nameof(configuration));

        var factory = new ConnectionFactory();

        // Only override a setting when it is actually configured, so that a missing
        // key leaves the value ConnectionFactory initialized instead of assigning null.
        var hostName = _configuration["RabbitMQ:HostName"];
        if (!string.IsNullOrWhiteSpace(hostName))
        {
            factory.HostName = hostName;
        }

        var userName = _configuration["RabbitMQ:UserName"];
        if (!string.IsNullOrWhiteSpace(userName))
        {
            factory.UserName = userName;
        }

        var password = _configuration["RabbitMQ:Password"];
        if (!string.IsNullOrEmpty(password))
        {
            factory.Password = password;
        }

        var portText = _configuration["RabbitMQ:Port"];
        if (!string.IsNullOrWhiteSpace(portText))
        {
            // Invariant culture: the port is machine-readable configuration, not UI text.
            if (!int.TryParse(
                    portText,
                    System.Globalization.NumberStyles.Integer,
                    System.Globalization.CultureInfo.InvariantCulture,
                    out var port))
            {
                throw new FormatException(
                    $"Configuration value 'RabbitMQ:Port' is not a valid integer: '{portText}'.");
            }

            factory.Port = port;
        }

        _factory = factory;
    }

    /// <summary>
    /// Publishes a message to the specified queue.
    /// </summary>
    /// <remarks>
    /// The queue is declared (durable, non-exclusive, not auto-deleted) before the
    /// message is published through the default exchange with the queue name as
    /// routing key. The message is marked persistent and given a new GUID message id.
    /// Any failure is logged and rethrown.
    /// </remarks>
    /// <param name="queueName">Name of the target queue; must not be null, empty or whitespace.</param>
    /// <param name="message">Message text; sent as UTF-8 bytes.</param>
    /// <param name="cancellationToken">Token used to cancel connecting, declaring and publishing.</param>
    /// <exception cref="ArgumentException"><paramref name="queueName"/> is null, empty or whitespace.</exception>
    /// <exception cref="ArgumentNullException"><paramref name="message"/> is null.</exception>
    public async Task PublishAsync(string queueName, string message, CancellationToken cancellationToken = default)
    {
        if (string.IsNullOrWhiteSpace(queueName))
        {
            throw new ArgumentException("queueName cannot be empty", nameof(queueName));
        }

        // Reject a null payload up front, before any broker round-trip is made.
        if (message is null)
        {
            throw new ArgumentNullException(nameof(message));
        }

        try
        {
            var connection = await GetOrCreateConnectionAsync(cancellationToken);
            await using var channel = await connection.CreateChannelAsync(cancellationToken: cancellationToken);

            await channel.QueueDeclareAsync(
                queue: queueName,
                durable: true,
                exclusive: false,
                autoDelete: false,
                arguments: null,
                cancellationToken: cancellationToken);

            var body = Encoding.UTF8.GetBytes(message);

            var props = new BasicProperties
            {
                Persistent = true,
                MessageId = Guid.NewGuid().ToString()
            };

            await channel.BasicPublishAsync(
                exchange: "",
                routingKey: queueName,
                mandatory: false,
                basicProperties: props,
                body: body,
                cancellationToken: cancellationToken);

            _logger.LogInformation(
                "RabbitMQ message published. Queue={Queue}, MessageId={MessageId}",
                queueName, props.MessageId);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex,
                "RabbitMQ publish failed. Queue={Queue}", queueName);
            throw;
        }
    }

    /// <summary>
    /// Returns the cached connection when it is open; otherwise disposes the stale
    /// one (if any) and opens a new connection.
    /// </summary>
    /// <param name="token">Token used to cancel the connection attempt.</param>
    /// <returns>An open connection created by the configured factory.</returns>
    private async Task<IConnection> GetOrCreateConnectionAsync(CancellationToken token)
    {
        var existing = _connection;
        if (existing is { IsOpen: true })
        {
            return existing;
        }

        if (existing is not null)
        {
            // Drop the reference first so a failed reconnect below never leaves
            // the field pointing at an already disposed connection.
            _connection = null;
            existing.Dispose();
        }

        var created = await _factory.CreateConnectionAsync(token);
        _connection = created;

        _logger.LogInformation("RabbitMQ sender connected.");
        return created;
    }

    /// <summary>
    /// Closes and disposes the cached connection, if one exists. Calling this more
    /// than once is harmless because the field is cleared on the first call.
    /// </summary>
    public async ValueTask DisposeAsync()
    {
        var connection = _connection;
        _connection = null;

        if (connection is not null)
        {
            try
            {
                if (connection.IsOpen)
                {
                    await connection.CloseAsync();
                }
            }
            catch (Exception ex)
            {
                // Disposal should not throw; record the failure and still release the connection.
                _logger.LogWarning(ex, "RabbitMQ sender connection close failed.");
            }
            finally
            {
                connection.Dispose();
            }
        }

        GC.SuppressFinalize(this);
    }
}
|
|
|
|
|
|
}
|