76 lines
3.2 KiB
C#
76 lines
3.2 KiB
C#
using RabbitMQ.Client;
|
||
using System.Text;
|
||
|
||
namespace OpenAuth.WebApi.Model.RabbitMQService
|
||
{
|
||
/// <summary>
/// Publishes UTF-8 text messages to the durable <c>mps_event</c> topic exchange.
/// The instance closes and disposes the connection and channel it was given when
/// <see cref="DisposeAsync"/> is called.
/// </summary>
public class RabbitMqSender : IAsyncDisposable
{
    private const string ExchangeName = "mps_event";
    private const string ExchangeType = "topic";

    // Routing key the temporary queue declared in CreateAsync is bound to.
    private const string BoundRoutingKey = "event.capturePic.1.sdhc04";

    private readonly IConnection _connection;
    private readonly IChannel _channel;
    private bool _disposed;

    /// <summary>
    /// Wraps an already opened connection and channel.
    /// </summary>
    /// <param name="connection">Connection that owns <paramref name="channel"/>.</param>
    /// <param name="channel">Channel used for publishing.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="connection"/> or <paramref name="channel"/> is null.
    /// </exception>
    public RabbitMqSender(IConnection connection, IChannel channel)
    {
        _connection = connection ?? throw new ArgumentNullException(nameof(connection));
        _channel = channel ?? throw new ArgumentNullException(nameof(channel));

        // Subscribe exactly once per channel. Subscribing inside SendAsync would add one
        // more handler per published message and log every return multiple times.
        _channel.BasicReturnAsync += OnBasicReturnAsync;
    }

    /// <summary>
    /// Connects to the broker, opens a channel, declares the topic exchange and returns a
    /// ready-to-use sender.
    /// </summary>
    /// <param name="hostName">Broker host name.</param>
    /// <param name="port">Broker port.</param>
    /// <param name="userName">User name used to authenticate.</param>
    /// <param name="password">Password used to authenticate.</param>
    /// <returns>A sender that owns the created connection and channel.</returns>
    public static async Task<RabbitMqSender> CreateAsync(string hostName, int port, string userName, string password)
    {
        var factory = new ConnectionFactory()
        {
            HostName = hostName,
            UserName = userName,
            Password = password,
            Port = port,
            AutomaticRecoveryEnabled = true, // reconnect automatically
            NetworkRecoveryInterval = TimeSpan.FromSeconds(5),
        };

        var connection = await factory.CreateConnectionAsync();
        try
        {
            var channel = await connection.CreateChannelAsync();
            await channel.ExchangeDeclareAsync(exchange: ExchangeName, type: ExchangeType, durable: true);

            // Server-named, exclusive, auto-delete queue bound to a single routing key.
            // NOTE(review): nothing in this class consumes from it; presumably it exists so
            // that mandatory publishes with that key have a route - confirm with the owner.
            var queueOk = await channel.QueueDeclareAsync(queue: "",
                durable: false,
                exclusive: true,
                autoDelete: true);
            await channel.QueueBindAsync(queueOk.QueueName, ExchangeName, BoundRoutingKey);

            return new RabbitMqSender(connection, channel);
        }
        catch
        {
            // Do not leak the open connection when any setup step fails.
            await connection.DisposeAsync();
            throw;
        }
    }

    /// <summary>
    /// Publishes <paramref name="message"/> as a persistent UTF-8 message to the exchange.
    /// </summary>
    /// <param name="routeKey">Routing key to publish with.</param>
    /// <param name="message">Message text; encoded as UTF-8.</param>
    /// <remarks>
    /// The message is published with <c>mandatory: true</c>; messages handed back through
    /// the channel's BasicReturn event are written to the console.
    /// </remarks>
    public async Task SendAsync(string routeKey, string message)
    {
        var body = Encoding.UTF8.GetBytes(message);
        var properties = new BasicProperties()
        {
            Persistent = true
        };

        await _channel.BasicPublishAsync(exchange: ExchangeName,
            routingKey: routeKey,
            mandatory: true,
            basicProperties: properties,
            body: body);

        Console.WriteLine($"[RabbitMQ] 已发送到routeKey {routeKey}:{message}");
    }

    /// <summary>
    /// Closes and disposes the channel, then the connection. Calling it again is a no-op.
    /// </summary>
    public async ValueTask DisposeAsync()
    {
        if (_disposed)
        {
            return;
        }

        _disposed = true;

        try
        {
            try
            {
                await _channel.CloseAsync();
            }
            finally
            {
                _channel.BasicReturnAsync -= OnBasicReturnAsync;
                await _channel.DisposeAsync();
            }
        }
        finally
        {
            // Runs even when closing the channel failed, so the connection is never leaked.
            try
            {
                await _connection.CloseAsync();
            }
            finally
            {
                await _connection.DisposeAsync();
            }
        }
    }

    /// <summary>
    /// Logs a message handed back through the channel's BasicReturn event.
    /// </summary>
    private static Task OnBasicReturnAsync(object sender, global::RabbitMQ.Client.Events.BasicReturnEventArgs args)
    {
        var returnedMessage = Encoding.UTF8.GetString(args.Body.Span);
        Console.WriteLine($"[BasicReturn] Message returned: {returnedMessage}");
        Console.WriteLine($"ReplyCode: {args.ReplyCode}, ReplyText: {args.ReplyText}");
        Console.WriteLine($"Exchange: {args.Exchange}, RoutingKey: {args.RoutingKey}");
        return Task.CompletedTask;
    }
}
|
||
}
|