LASAPlatform/OpenAuth.WebApi/Model/RabbitMQService/RabbitMqSenderService.cs

98 lines
3.3 KiB
C#

using Infrastructure.Helpers;
using RabbitMQ.Client;
using System.Text;
namespace OpenAuth.WebApi.Model.RabbitMQService
{
public class RabbitMqSenderService : IAsyncDisposable
{
private readonly ILogger<RabbitMqSenderService> _logger;
private readonly ConnectionFactory _factory;
private IConnection? _connection;
private readonly IConfiguration _configuration;
public RabbitMqSenderService(ILogger<RabbitMqSenderService> logger, IConfiguration configuration)
{
_logger = logger;
_configuration = configuration;
_factory = new ConnectionFactory
{
HostName = _configuration["RabbitMQ:HostName"],
UserName = _configuration["RabbitMQ:UserName"],
Password = _configuration["RabbitMQ:Password"],
Port = int.Parse(_configuration["RabbitMQ:Port"])
};
}
/// <summary>
/// 发送消息到指定队列
/// </summary>
public async Task PublishAsync(string queueName,string message,CancellationToken cancellationToken = default)
{
if (string.IsNullOrWhiteSpace(queueName))
throw new ArgumentException("queueName cannot be empty", nameof(queueName));
try
{
var connection = await GetOrCreateConnectionAsync(cancellationToken);
await using var channel = await connection.CreateChannelAsync();
await channel.QueueDeclareAsync(
queue: queueName,
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
var body = Encoding.UTF8.GetBytes(message);
var props = new BasicProperties
{
Persistent = true,
MessageId = Guid.NewGuid().ToString()
};
await channel.BasicPublishAsync(
exchange: "",
routingKey: queueName,
mandatory: false,
basicProperties: props,
body: body,
cancellationToken: cancellationToken);
_logger.LogInformation(
"RabbitMQ message published. Queue={Queue}, MessageId={MessageId}",
queueName, props.MessageId);
}
catch (Exception ex)
{
_logger.LogError(ex,
"RabbitMQ publish failed. Queue={Queue}", queueName);
throw;
}
}
private async Task<IConnection> GetOrCreateConnectionAsync(CancellationToken token)
{
if (_connection is { IsOpen: true })
return _connection;
_connection?.Dispose();
_connection = await _factory.CreateConnectionAsync(token);
_logger.LogInformation("RabbitMQ sender connected.");
return _connection;
}
public async ValueTask DisposeAsync()
{
//if (_channel != null)
// await _channel.CloseAsync();
if (_connection != null)
await _connection.CloseAsync();
}
}
}