using Infrastructure.Helpers;
using RabbitMQ.Client;
using System.Globalization;
using System.Text;

namespace OpenAuth.WebApi.Model.RabbitMQService
{
    /// <summary>
    /// Publishes UTF-8 text messages to RabbitMQ queues through the default exchange,
    /// reusing one lazily created connection and opening a short-lived channel per publish.
    /// </summary>
    /// <remarks>
    /// NOTE(review): the constructor takes the non-generic <c>ILogger</c>; the built-in DI container
    /// normally only registers <c>ILogger&lt;T&gt;</c> — confirm how this service is constructed.
    /// </remarks>
    public class RabbitMqSenderService : IAsyncDisposable
    {
        private const string PortKey = "RabbitMQ:Port";

        private readonly ILogger _logger;
        private readonly ConnectionFactory _factory;
        private readonly IConfiguration _configuration;

        // Serializes (re)connection so concurrent publishers cannot create duplicate connections
        // or dispose a connection that another caller has just created.
        private readonly SemaphoreSlim _connectionGate = new(1, 1);

        private IConnection? _connection;

        /// <summary>
        /// Builds the connection factory from the <c>RabbitMQ:HostName</c>, <c>RabbitMQ:UserName</c>,
        /// <c>RabbitMQ:Password</c> and <c>RabbitMQ:Port</c> configuration keys.
        /// </summary>
        /// <param name="logger">Logger used for publish and connection diagnostics.</param>
        /// <param name="configuration">Application configuration holding the RabbitMQ settings.</param>
        /// <exception cref="InvalidOperationException">
        /// <c>RabbitMQ:Port</c> is present but is not a valid integer.
        /// </exception>
        public RabbitMqSenderService(ILogger logger, IConfiguration configuration)
        {
            _logger = logger;
            _configuration = configuration;

            var factory = new ConnectionFactory
            {
                HostName = _configuration["RabbitMQ:HostName"],
                UserName = _configuration["RabbitMQ:UserName"],
                Password = _configuration["RabbitMQ:Password"]
            };

            // A missing port leaves the factory's own default in place; a malformed one fails
            // with a message that names the offending key instead of a bare FormatException.
            var portText = _configuration[PortKey];
            if (!string.IsNullOrWhiteSpace(portText))
            {
                if (!int.TryParse(portText, NumberStyles.Integer, CultureInfo.InvariantCulture, out var port))
                {
                    throw new InvalidOperationException(
                        $"Configuration value '{PortKey}' is not a valid integer: '{portText}'.");
                }

                factory.Port = port;
            }

            _factory = factory;
        }

        /// <summary>
        /// Sends a message to the specified queue.
        /// </summary>
        /// <remarks>
        /// The queue is declared durable, non-exclusive and non-auto-delete before publishing, and the
        /// message is marked persistent with a freshly generated <c>MessageId</c>.
        /// Failures are logged and rethrown.
        /// </remarks>
        /// <param name="queueName">Target queue name; also used as the routing key on the default exchange.</param>
        /// <param name="message">Message text, encoded as UTF-8.</param>
        /// <param name="cancellationToken">Token observed while connecting, declaring and publishing.</param>
        /// <exception cref="ArgumentException"><paramref name="queueName"/> is null, empty or whitespace.</exception>
        /// <exception cref="ArgumentNullException"><paramref name="message"/> is null.</exception>
        public async Task PublishAsync(string queueName, string message, CancellationToken cancellationToken = default)
        {
            if (string.IsNullOrWhiteSpace(queueName))
            {
                throw new ArgumentException("queueName cannot be empty", nameof(queueName));
            }

            ArgumentNullException.ThrowIfNull(message);

            try
            {
                var connection = await GetOrCreateConnectionAsync(cancellationToken);
                await using var channel = await connection.CreateChannelAsync(cancellationToken: cancellationToken);

                await channel.QueueDeclareAsync(
                    queue: queueName,
                    durable: true,
                    exclusive: false,
                    autoDelete: false,
                    arguments: null,
                    cancellationToken: cancellationToken);

                var body = Encoding.UTF8.GetBytes(message);
                var props = new BasicProperties
                {
                    Persistent = true,
                    MessageId = Guid.NewGuid().ToString()
                };

                await channel.BasicPublishAsync(
                    exchange: "",
                    routingKey: queueName,
                    mandatory: false,
                    basicProperties: props,
                    body: body,
                    cancellationToken: cancellationToken);

                _logger.LogInformation(
                    "RabbitMQ message published. Queue={Queue}, MessageId={MessageId}",
                    queueName,
                    props.MessageId);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "RabbitMQ publish failed. Queue={Queue}", queueName);
                throw;
            }
        }

        /// <summary>
        /// Returns the cached connection when it is open; otherwise disposes the stale one (if any)
        /// and creates a new connection from the factory.
        /// </summary>
        /// <param name="token">Token observed while waiting for the gate and while connecting.</param>
        /// <returns>An open connection.</returns>
        private async Task<IConnection> GetOrCreateConnectionAsync(CancellationToken token)
        {
            // Fast path: no locking needed while the cached connection is healthy.
            if (_connection is { IsOpen: true } cached)
            {
                return cached;
            }

            await _connectionGate.WaitAsync(token);
            try
            {
                // Re-check: another caller may have reconnected while we were waiting.
                var current = _connection;
                if (current is { IsOpen: true })
                {
                    return current;
                }

                if (current is not null)
                {
                    _connection = null;
                    await current.DisposeAsync();
                }

                var created = await _factory.CreateConnectionAsync(token);
                _connection = created;
                _logger.LogInformation("RabbitMQ sender connected.");
                return created;
            }
            finally
            {
                _connectionGate.Release();
            }
        }

        /// <summary>
        /// Closes and disposes the cached connection, if one exists. Close failures are logged
        /// rather than thrown so that disposal does not fault.
        /// </summary>
        public async ValueTask DisposeAsync()
        {
            // Detach the field first so a second DisposeAsync call becomes a no-op.
            var connection = Interlocked.Exchange(ref _connection, null);
            if (connection is null)
            {
                return;
            }

            try
            {
                if (connection.IsOpen)
                {
                    await connection.CloseAsync();
                }
            }
            catch (Exception ex)
            {
                _logger.LogWarning(ex, "RabbitMQ sender connection close failed.");
            }
            finally
            {
                await connection.DisposeAsync();
            }
        }
    }
}