using Infrastructure.Helpers;
using RabbitMQ.Client;
using System.Globalization;
using System.Text;

namespace OpenAuth.WebApi.Model.RabbitMQService
{
    /// <summary>
    /// Publishes UTF-8 text messages to RabbitMQ queues over a single lazily created,
    /// reused connection. A short-lived channel is opened for every publish.
    /// </summary>
    public class RabbitMqSenderService : IAsyncDisposable
    {
        private readonly ILogger _logger;
        private readonly ConnectionFactory _factory;

        // Serialises connection (re)creation so concurrent publishers cannot each open
        // a connection and leak all but the last one written to _connection.
        private readonly SemaphoreSlim _connectionLock = new SemaphoreSlim(1, 1);

        private IConnection? _connection;

        /// <summary>
        /// Builds the connection factory from the <c>RabbitMQ:HostName</c>,
        /// <c>RabbitMQ:UserName</c>, <c>RabbitMQ:Password</c> and <c>RabbitMQ:Port</c>
        /// configuration values. No connection is opened until the first publish.
        /// </summary>
        /// <param name="logger">Logger used for publish and connection diagnostics.</param>
        /// <exception cref="FormatException">
        /// <c>RabbitMQ:Port</c> is present but is not a valid integer.
        /// </exception>
        public RabbitMqSenderService(ILogger logger)
        {
            _logger = logger;

            var config = ConfigHelper.GetConfigRoot();

            _factory = new ConnectionFactory
            {
                HostName = config["RabbitMQ:HostName"],
                UserName = config["RabbitMQ:UserName"],
                Password = config["RabbitMQ:Password"]
            };

            // A missing port keeps the client's default port instead of failing with an
            // opaque ArgumentNullException; a malformed one fails with a message that
            // names the offending configuration key.
            var portText = config["RabbitMQ:Port"];
            if (!string.IsNullOrWhiteSpace(portText))
            {
                if (!int.TryParse(portText, NumberStyles.Integer, CultureInfo.InvariantCulture, out var port))
                {
                    throw new FormatException(
                        $"Configuration value 'RabbitMQ:Port' is not a valid integer: '{portText}'.");
                }

                _factory.Port = port;
            }
        }

        /// <summary>
        /// Publishes a message to the specified queue.
        /// </summary>
        /// <remarks>
        /// The queue is declared as durable (non-exclusive, not auto-deleted) before
        /// publishing, and the message is sent through the default exchange with the
        /// queue name as routing key, marked persistent, and tagged with a new GUID
        /// message id. Failures are logged and rethrown.
        /// </remarks>
        /// <param name="queueName">Target queue name; must not be null, empty or whitespace.</param>
        /// <param name="message">Message text; encoded as UTF-8.</param>
        /// <param name="cancellationToken">Token used to cancel the operation.</param>
        /// <exception cref="ArgumentException"><paramref name="queueName"/> is null, empty or whitespace.</exception>
        /// <exception cref="ArgumentNullException"><paramref name="message"/> is null.</exception>
        public async Task PublishAsync(string queueName, string message, CancellationToken cancellationToken = default)
        {
            if (string.IsNullOrWhiteSpace(queueName))
            {
                throw new ArgumentException("queueName cannot be empty", nameof(queueName));
            }

            // Validate up front so a caller bug is not reported as a broker failure.
            if (message == null)
            {
                throw new ArgumentNullException(nameof(message));
            }

            try
            {
                var connection = await GetOrCreateConnectionAsync(cancellationToken);

                await using var channel = await connection.CreateChannelAsync(
                    cancellationToken: cancellationToken);

                await channel.QueueDeclareAsync(
                    queue: queueName,
                    durable: true,
                    exclusive: false,
                    autoDelete: false,
                    arguments: null,
                    cancellationToken: cancellationToken);

                var body = Encoding.UTF8.GetBytes(message);

                var props = new BasicProperties
                {
                    Persistent = true,
                    MessageId = Guid.NewGuid().ToString()
                };

                await channel.BasicPublishAsync(
                    exchange: "",
                    routingKey: queueName,
                    mandatory: false,
                    basicProperties: props,
                    body: body,
                    cancellationToken: cancellationToken);

                _logger.LogInformation(
                    "RabbitMQ message published. Queue={Queue}, MessageId={MessageId}",
                    queueName,
                    props.MessageId);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "RabbitMQ publish failed. Queue={Queue}", queueName);
                throw;
            }
        }

        /// <summary>
        /// Returns the cached connection when it is open; otherwise disposes the stale
        /// one (if any) and opens a new connection.
        /// </summary>
        /// <param name="token">Token used to cancel waiting for or creating the connection.</param>
        /// <returns>An open connection.</returns>
        private async Task<IConnection> GetOrCreateConnectionAsync(CancellationToken token)
        {
            // Fast path: no locking while the cached connection is healthy.
            var cached = _connection;
            if (cached is { IsOpen: true })
            {
                return cached;
            }

            await _connectionLock.WaitAsync(token);
            try
            {
                // Another caller may have reconnected while this one was waiting.
                if (_connection is { IsOpen: true } reopened)
                {
                    return reopened;
                }

                var stale = _connection;
                _connection = null;

                if (stale != null)
                {
                    try
                    {
                        await stale.DisposeAsync();
                    }
                    catch (Exception ex)
                    {
                        // A broken connection that fails to dispose must not block reconnecting.
                        _logger.LogWarning(ex, "RabbitMQ stale connection dispose failed.");
                    }
                }

                var created = await _factory.CreateConnectionAsync(token);
                _connection = created;

                _logger.LogInformation("RabbitMQ sender connected.");

                return created;
            }
            finally
            {
                _connectionLock.Release();
            }
        }

        /// <summary>
        /// Closes and disposes the cached connection, if one exists. Errors raised while
        /// shutting down are logged rather than thrown.
        /// </summary>
        public async ValueTask DisposeAsync()
        {
            // Detach the field first so a repeated call finds nothing left to dispose.
            var connection = Interlocked.Exchange(ref _connection, null);
            if (connection == null)
            {
                return;
            }

            try
            {
                try
                {
                    await connection.CloseAsync();
                }
                finally
                {
                    await connection.DisposeAsync();
                }
            }
            catch (Exception ex)
            {
                _logger.LogWarning(ex, "RabbitMQ connection shutdown failed.");
            }
        }
    }
}