You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

76 lines
3.2 KiB
C#

using RabbitMQ.Client;
using System.Text;
namespace OpenAuth.WebApi.Model.RabbitMQService
{
public class RabbitMqSender : IAsyncDisposable
{
private readonly IConnection _connection;
private readonly IChannel _channel;
private const string ExchangeName = "mps_event";
private const string ExchangeType = "topic";
public RabbitMqSender(IConnection connection, IChannel channel)
{
_connection = connection;
_channel = channel;
}
public static async Task<RabbitMqSender> CreateAsync(string hostName, int port, string userName, string password)
{
var factory = new ConnectionFactory()
{
HostName = hostName,
UserName = userName,
Password = password,
Port = port,
AutomaticRecoveryEnabled = true, // 自动重连
NetworkRecoveryInterval = TimeSpan.FromSeconds(5),
};
var connection = await factory.CreateConnectionAsync();
var channel = await connection.CreateChannelAsync();
await channel.ExchangeDeclareAsync(exchange: ExchangeName, type: ExchangeType, durable: true);
//await channel.QueueDeclareAsync("sdhc", durable: true, exclusive: false, autoDelete: false);
var queueOk = await channel.QueueDeclareAsync(queue: "",
durable: false,
exclusive: true,
autoDelete: true);
await channel.QueueBindAsync(queueOk.QueueName, "mps_event", "event.capturePic.1.sdhc04");
return new RabbitMqSender(connection, channel);
}
public async Task SendAsync(string routeKey, string message)
{
var body = Encoding.UTF8.GetBytes(message);
var properties = new BasicProperties()
{
Persistent = true
};
// 监听 BasicReturn 事件
_channel.BasicReturnAsync += (sender, args) =>
{
var returnedMessage = Encoding.UTF8.GetString(args.Body.ToArray());
Console.WriteLine($"[BasicReturn] Message returned: {returnedMessage}");
Console.WriteLine($"ReplyCode: {args.ReplyCode}, ReplyText: {args.ReplyText}");
Console.WriteLine($"Exchange: {args.Exchange}, RoutingKey: {args.RoutingKey}");
return Task.CompletedTask;
};
await _channel.BasicPublishAsync(exchange: "mps_event",
routingKey: routeKey,
mandatory: true,
basicProperties: properties,
body: body);
Console.WriteLine($"[RabbitMQ] 已发送到routeKey {routeKey}{message}");
}
public async ValueTask DisposeAsync()
{
if (_channel != null)
await _channel.CloseAsync();
if (_connection != null)
await _connection.CloseAsync();
}
}
}