修改rabbit

main
zhangbin 2025-11-26 15:32:18 +08:00
parent ddf1f27bf9
commit 2b18e0089b
1 changed files with 54 additions and 143 deletions

View File

@ -9,173 +9,84 @@ namespace OpenAuth.WebApi.Model.RabbitMQService
{ {
public class RabbitMqListenerService : BackgroundService public class RabbitMqListenerService : BackgroundService
{ {
private readonly ConnectionFactory _factory; private readonly ILogger<RabbitMqListenerService> _logger;
private readonly DaHuaAiApp _app;
private IConnection _connection; private IConnection _connection;
private IChannel _channel; private IChannel _channel;
private readonly Dictionary<string, string> _queueRoutingMap = new();
private readonly object _lock = new(); private readonly ConnectionFactory _factory = new()
private readonly List<QueueBinding> _bindings = new(); {
private readonly ILogger<RabbitMqListenerService> _logger; HostName = "123.132.248.154",
private DaHuaAiApp _app; UserName = "DHCloudg1",
Password = "Cloud0#4fCraQrm",
Port = 9103
};
public RabbitMqListenerService(ILogger<RabbitMqListenerService> logger, DaHuaAiApp app) public RabbitMqListenerService(ILogger<RabbitMqListenerService> logger, DaHuaAiApp app)
{ {
_logger = logger; _logger = logger;
_app = app; _app = app;
_factory = new ConnectionFactory
{
HostName = "123.132.248.154",
UserName = "DHCloudg1",
Password = "Cloud0#4fCraQrm",
Port = 9103
//HostName = "localhost",
//UserName = "guest",
//Password = "guest",
//Port = 5672
};
} }
public async Task AddQueueBindingAsync(string exchange, string exchangeType, string routingKey)
{
_bindings.Add(new QueueBinding
{
Exchange = exchange,
ExchangeType = exchangeType,
RoutingKey = routingKey
});
if (_channel != null)
{
await BindQueueAsync(_channel, exchange, exchangeType, routingKey);
}
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken) protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{ {
_connection = await _factory.CreateConnectionAsync(stoppingToken); _logger.LogInformation("RabbitMQ listener starting...");
_channel = await _connection.CreateChannelAsync();
//await BindQueueAsync(_channel, "processing_event", "topic", "event.trafficJunction.1.sdhc"); while (!stoppingToken.IsCancellationRequested)
var queueOk = await _channel.QueueDeclareAsync(queue: "",
durable: false,
exclusive: true,
autoDelete: true);
//await _channel.QueueBindAsync(queueOk.QueueName, "processing_event", "event.eventNotAlarm.picture.1.#");
await _channel.QueueBindAsync(queueOk.QueueName, "processing_event", "event.behaviorAlarm.picture.1.#");
await _channel.QueueBindAsync(queueOk.QueueName, "storage_event", "event.trafficEvent.1.#");
var consumer = new AsyncEventingBasicConsumer(_channel);
consumer.ReceivedAsync += async (sender, ea) =>
{ {
try
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
//Console.WriteLine($"接收数据[processing_event]: {message}");
// Exchange 来做不同处理
switch (ea.Exchange)
{ {
case "processing_event": // 建立连接
await _app.AddDaHuaImg(message); _connection = await _factory.CreateConnectionAsync(stoppingToken);
break; _channel = await _connection.CreateChannelAsync();
case "storage_event":
await _app.AddDaHuaSmokeDetectionImg(message); _logger.LogInformation("RabbitMQ connected successfully.");
break;
default: await StartConsumeAsync(_channel, stoppingToken);
Console.WriteLine($"[Other:{ea.RoutingKey}] {message}");
break; // 阻塞直到断开
await Task.Delay(Timeout.Infinite, stoppingToken);
} }
catch (Exception ex)
await Task.Yield(); {
}; _logger.LogError(ex, "RabbitMQ connection failed. Retrying in 5 seconds...");
await Task.Delay(5000, stoppingToken);
await _channel.BasicConsumeAsync("", autoAck: true, consumer); }
//foreach (var binding in _bindings) }
//{
// await BindQueueAsync(_channel, binding.Exchange, binding.ExchangeType, binding.RoutingKey);
//}
} }
private async Task BindQueueAsync(IChannel channel, string exchange, string exchangeType, string routingKey) private async Task StartConsumeAsync(IChannel channel, CancellationToken token)
{ {
//await channel.ExchangeDeclareAsync(exchange, exchangeType, durable: true); var q = await channel.QueueDeclareAsync("", false, true, true);
//await channel.QueueDeclareAsync("", durable: true, exclusive: false, autoDelete: false); await channel.QueueBindAsync(q.QueueName, "processing_event", "event.behaviorAlarm.picture.1.#");
//var queueOk = await _channel.QueueDeclareAsync(queue: "", await channel.QueueBindAsync(q.QueueName, "storage_event", "event.trafficEvent.1.#");
// durable: false,
// exclusive: true,
// autoDelete: true);
//await channel.QueueBindAsync(queueOk.QueueName, exchange, routingKey);
var consumer = new AsyncEventingBasicConsumer(channel); var consumer = new AsyncEventingBasicConsumer(channel);
consumer.ReceivedAsync += async (sender, ea) => consumer.ReceivedAsync += async (_, ea) =>
{ {
var message = Encoding.UTF8.GetString(ea.Body.ToArray());
var body = ea.Body.ToArray(); try
var message = Encoding.UTF8.GetString(body);
Console.WriteLine($"[processing_event]: {message}");
// Exchange 来做不同处理
switch (ea.Exchange)
{ {
case "processing_event": switch (ea.Exchange)
Console.WriteLine($"[processing_event]: {message}"); {
break; case "processing_event":
case "thing_event": await _app.AddDaHuaImg(message);
Console.WriteLine($"[thing_event] {message}"); break;
break; case "storage_event":
default: await _app.AddDaHuaSmokeDetectionImg(message);
Console.WriteLine($"[Other:{ea.RoutingKey}] {message}"); break;
break; }
}
catch (Exception ex)
{
_logger.LogError(ex, "处理消息失败");
} }
await Task.Yield();
}; };
await channel.BasicConsumeAsync("", autoAck: true, consumer); await channel.BasicConsumeAsync(q.QueueName, autoAck: true, consumer);
}
//public async Task AddListenerAsync(string exchange, string exchangeType, string routingKey)
//{
// _connection = await _factory.CreateConnectionAsync();
// foreach (var binding in _bindings) _logger.LogInformation("RabbitMQ Consumer started.");
// {
// var channel = await _connection.CreateChannelAsync();
// // 声明交换机
// await channel.ExchangeDeclareAsync(binding.Exchange, binding.ExchangeType, durable: true);
// await channel.QueueDeclareAsync("", durable: true, exclusive: false, autoDelete: false);
// // 绑定队列到交换机
// await channel.QueueBindAsync(queue: "", exchange: binding.Exchange, routingKey: binding.RoutingKey);
// var consumer = new AsyncEventingBasicConsumer(channel);
// consumer.ReceivedAsync += async (model, ea) =>
// {
// var body = ea.Body.ToArray();
// var message = Encoding.UTF8.GetString(body);
// using var scope = _serviceProvider.CreateScope();
// foreach (var handler in handlers)
// {
// await handler.HandleMessageAsync(binding.Exchange, ea.RoutingKey, message);
// }
// await channel.BasicAckAsync(ea.DeliveryTag, multiple: false);
// };
// await channel.BasicConsumeAsync(queue: "", autoAck: false, consumer: consumer);
// _channels.Add(channel);
// }
//}
//public async ValueTask DisposeAsync()
//{
// foreach (var channel in _channels)
// {
// if (channel != null)
// await channel.CloseAsync();
// }
// if (_connection != null)
// await _connection.CloseAsync();
//}
private class QueueBinding
{
public string Exchange { get; set; } = "";
public string ExchangeType { get; set; } = "";
public string RoutingKey { get; set; } = "";
} }
} }
} }