From b889e575ce204746c37e3273266c75ed664c21cb Mon Sep 17 00:00:00 2001 From: zhangbin <460190368@qq.com> Date: Mon, 18 Aug 2025 10:48:43 +0800 Subject: [PATCH] =?UTF-8?q?mqtt=E9=80=9A=E8=AE=AF=E5=8F=8A=E5=A4=A7?= =?UTF-8?q?=E5=8D=8E=E7=AE=97=E6=B3=95=E6=8E=A5=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../ServiceApp/Subscribe/ConfigSubscribe.cs | 2 +- .../ServiceControllers/DaHuaAiController.cs | 23 ++- .../RabbitMqListenerService.cs | 139 +++++++++++++++++ OpenAuth.WebApi/OpenAuth.WebApi.csproj | 147 +++++++++--------- OpenAuth.WebApi/Startup.cs | 6 + 5 files changed, 241 insertions(+), 76 deletions(-) create mode 100644 OpenAuth.WebApi/Model/RabbitMQService/RabbitMqListenerService.cs diff --git a/OpenAuth.App/ServiceApp/Subscribe/ConfigSubscribe.cs b/OpenAuth.App/ServiceApp/Subscribe/ConfigSubscribe.cs index f5fc5b6..f4b70a0 100644 --- a/OpenAuth.App/ServiceApp/Subscribe/ConfigSubscribe.cs +++ b/OpenAuth.App/ServiceApp/Subscribe/ConfigSubscribe.cs @@ -96,7 +96,7 @@ public class ConfigSubscribe : IJob var result = JsonConvert.DeserializeObject>(message); var method = result.method; var data = result.data; - _logger.LogDebug($"主题:{topic}\n消息:{message}"); + //_logger.LogDebug($"主题:{topic}\n消息:{message}"); long code = 0; switch (tempStr) { diff --git a/OpenAuth.WebApi/Controllers/ServiceControllers/DaHuaAiController.cs b/OpenAuth.WebApi/Controllers/ServiceControllers/DaHuaAiController.cs index d398f94..b52b8e9 100644 --- a/OpenAuth.WebApi/Controllers/ServiceControllers/DaHuaAiController.cs +++ b/OpenAuth.WebApi/Controllers/ServiceControllers/DaHuaAiController.cs @@ -3,9 +3,12 @@ using Infrastructure; using Infrastructure.Helpers; using Microsoft.AspNetCore.Authorization; using Microsoft.AspNetCore.Mvc; +using Microsoft.CodeAnalysis.Text; using Newtonsoft.Json; using OpenAuth.App.ServiceApp.Request; using OpenAuth.Repository.Domain; +using OpenAuth.WebApi.Model.RabbitMQService; +using RabbitMQ.Client; using System.Globalization; using System.Security.Cryptography; using System.Text; @@ -22,6 +25,11 @@ namespace OpenAuth.WebApi.Controllers.ServiceControllers private readonly string ak = "e3d019924e7f40089bcffba4"; private readonly string sk = "cf83a12caa155b994eb34fa9"; private readonly string baseUrl = "https://123.132.248.154:6405"; + private readonly RabbitMqListenerService _listener; + public DaHuaAiController(RabbitMqListenerService listener) + { + _listener = listener; + } private async Task> PostJsonAsync(string path, object body) { var handler = new HttpClientHandler @@ -159,9 +167,20 @@ namespace OpenAuth.WebApi.Controllers.ServiceControllers /// [HttpPost] [AllowAnonymous] - public Task> CreateTasks(DaHuaTaskInfo data) + public async Task> CreateTasks(DaHuaTaskInfo data) { - return PostJsonAsync("/processing/behavior/realtime/tasks", data); + await _listener.AddQueueBindingAsync("processing_event1", "topic", $"event.behaviorAlarm.1.{data.channelId}"); + return await PostJsonAsync("/processing/behavior/realtime/tasks", data); + } + /// + /// 模拟添加队列 + /// + /// + [HttpPost] + [AllowAnonymous] + public async void AddQueueBinding(string channelId) + { + await _listener.AddQueueBindingAsync("processing_event", "topic", $"event.trafficJunction.1.{channelId}"); } /// /// 删除行为场景分析任务 diff --git a/OpenAuth.WebApi/Model/RabbitMQService/RabbitMqListenerService.cs b/OpenAuth.WebApi/Model/RabbitMQService/RabbitMqListenerService.cs new file mode 100644 index 0000000..f88707d --- /dev/null +++ b/OpenAuth.WebApi/Model/RabbitMQService/RabbitMqListenerService.cs @@ -0,0 +1,139 @@ +using Microsoft.AspNetCore.Connections; +using RabbitMQ.Client.Events; +using RabbitMQ.Client; +using System.Text; + +namespace OpenAuth.WebApi.Model.RabbitMQService +{ + public class RabbitMqListenerService : BackgroundService + { + private readonly ConnectionFactory _factory; + private IConnection _connection; + private IChannel _channel; + private readonly Dictionary _queueRoutingMap = new(); + private readonly object _lock = new(); + private readonly List _bindings = new(); + public RabbitMqListenerService() + { + _factory = new ConnectionFactory + { + //HostName = "123.132.248.154", + //UserName = "DHCloudg1", + //Password = "Cloud0#4fCraQrm", + //Port = 9103 + HostName = "localhost", + UserName = "guest", + Password = "guest", + Port = 5672 + }; + } + public async Task AddQueueBindingAsync(string exchange, string exchangeType, string routingKey) + { + _bindings.Add(new QueueBinding + { + Exchange = exchange, + ExchangeType = exchangeType, + RoutingKey = routingKey + }); + + if (_channel != null) + { + await BindQueueAsync(_channel, exchange, exchangeType, routingKey); + } + } + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + _connection = await _factory.CreateConnectionAsync(stoppingToken); + _channel = await _connection.CreateChannelAsync(); + await BindQueueAsync(_channel, "processing_event", "topic", "event.trafficJunction.1.sdhc"); + foreach (var binding in _bindings) + { + await BindQueueAsync(_channel, binding.Exchange, binding.ExchangeType, binding.RoutingKey); + } + } + + private async Task BindQueueAsync(IChannel channel, string exchange, string exchangeType, string routingKey) + { + await channel.ExchangeDeclareAsync(exchange, exchangeType, durable: true); + await channel.QueueDeclareAsync("", durable: true, exclusive: false, autoDelete: false); + await channel.QueueBindAsync("", exchange, routingKey); + + var consumer = new AsyncEventingBasicConsumer(channel); + consumer.ReceivedAsync += async (sender, ea) => + { + + var body = ea.Body.ToArray(); + var message = Encoding.UTF8.GetString(body); + Console.WriteLine($"[processing_event]: {message}"); + // Exchange 来做不同处理 + switch (ea.Exchange) + { + case "processing_event": + Console.WriteLine($"[processing_event]: {message}"); + break; + case "thing_event": + Console.WriteLine($"[thing_event] {message}"); + break; + default: + Console.WriteLine($"[Other:{ea.RoutingKey}] {message}"); + break; + } + + await Task.Yield(); + }; + + await channel.BasicConsumeAsync("", autoAck: true, consumer); + } + //public async Task AddListenerAsync(string exchange, string exchangeType, string routingKey) + //{ + // _connection = await _factory.CreateConnectionAsync(); + + // foreach (var binding in _bindings) + // { + // var channel = await _connection.CreateChannelAsync(); + + // // 声明交换机 + // await channel.ExchangeDeclareAsync(binding.Exchange, binding.ExchangeType, durable: true); + // await channel.QueueDeclareAsync("", durable: true, exclusive: false, autoDelete: false); + // // 绑定队列到交换机 + // await channel.QueueBindAsync(queue: "", exchange: binding.Exchange, routingKey: binding.RoutingKey); + + // var consumer = new AsyncEventingBasicConsumer(channel); + // consumer.ReceivedAsync += async (model, ea) => + // { + // var body = ea.Body.ToArray(); + // var message = Encoding.UTF8.GetString(body); + + // using var scope = _serviceProvider.CreateScope(); + + // foreach (var handler in handlers) + // { + // await handler.HandleMessageAsync(binding.Exchange, ea.RoutingKey, message); + // } + + // await channel.BasicAckAsync(ea.DeliveryTag, multiple: false); + // }; + // await channel.BasicConsumeAsync(queue: "", autoAck: false, consumer: consumer); + + // _channels.Add(channel); + // } + //} + + //public async ValueTask DisposeAsync() + //{ + // foreach (var channel in _channels) + // { + // if (channel != null) + // await channel.CloseAsync(); + // } + // if (_connection != null) + // await _connection.CloseAsync(); + //} + private class QueueBinding + { + public string Exchange { get; set; } = ""; + public string ExchangeType { get; set; } = ""; + public string RoutingKey { get; set; } = ""; + } + } +} diff --git a/OpenAuth.WebApi/OpenAuth.WebApi.csproj b/OpenAuth.WebApi/OpenAuth.WebApi.csproj index ec8a294..b66ef05 100644 --- a/OpenAuth.WebApi/OpenAuth.WebApi.csproj +++ b/OpenAuth.WebApi/OpenAuth.WebApi.csproj @@ -1,86 +1,87 @@  - - net6.0 - enable - Linux - + + net6.0 + enable + Linux + - - bin\Debug\OpenAuth.WebApi.xml - 1701;1702;1591;1573;1572;1570 - + + bin\Debug\OpenAuth.WebApi.xml + 1701;1702;1591;1573;1572;1570 + - - bin\Release\OpenAuth.WebApi.xml - + + bin\Release\OpenAuth.WebApi.xml + - - - - - - - - - - - + + + + + + + + + + + - - - - - - - - - - - - - - - - - - - - - + + + + + + + + + + + + + + + + + + + + + + - - - - - + + + + + - - - ..\..\..\Users\Administrator\.nuget\packages\autofac.extensions.dependencyinjection\4.0.0\lib\netstandard1.1\Autofac.Extensions.DependencyInjection.dll - - + + + ..\..\..\Users\Administrator\.nuget\packages\autofac.extensions.dependencyinjection\4.0.0\lib\netstandard1.1\Autofac.Extensions.DependencyInjection.dll + + - - - PreserveNewest - - - Always - - - Always - - + + + PreserveNewest + + + Always + + + Always + + - - - Always - - + + + Always + + - - - + + + diff --git a/OpenAuth.WebApi/Startup.cs b/OpenAuth.WebApi/Startup.cs index 235590b..99bdaba 100644 --- a/OpenAuth.WebApi/Startup.cs +++ b/OpenAuth.WebApi/Startup.cs @@ -26,6 +26,7 @@ using OpenAuth.App.ServiceApp; using OpenAuth.Repository; using OpenAuth.WebApi.Model; using OpenAuth.WebApi.Model.mqtt; +using OpenAuth.WebApi.Model.RabbitMQService; using SqlSugar; using Swashbuckle.AspNetCore.SwaggerUI; using Yitter.IdGenerator; @@ -181,6 +182,11 @@ namespace OpenAuth.WebApi services.AddHostedService(); #endregion + #region rabbitmq + services.AddSingleton(); + services.AddHostedService(sp => sp.GetRequiredService()); + #endregion + #region AppSetting services.Configure(Configuration.GetSection("AppSetting"));