mqtt通讯及大华算法接口

main
zhangbin 3 weeks ago
parent 0cca478475
commit b889e575ce

@ -96,7 +96,7 @@ public class ConfigSubscribe : IJob
var result = JsonConvert.DeserializeObject<TopicServicesRequest<dynamic>>(message);
var method = result.method;
var data = result.data;
_logger.LogDebug($"主题:{topic}\n消息{message}");
//_logger.LogDebug($"主题:{topic}\n消息{message}");
long code = 0;
switch (tempStr)
{

@ -3,9 +3,12 @@ using Infrastructure;
using Infrastructure.Helpers;
using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.Mvc;
using Microsoft.CodeAnalysis.Text;
using Newtonsoft.Json;
using OpenAuth.App.ServiceApp.Request;
using OpenAuth.Repository.Domain;
using OpenAuth.WebApi.Model.RabbitMQService;
using RabbitMQ.Client;
using System.Globalization;
using System.Security.Cryptography;
using System.Text;
@ -22,6 +25,11 @@ namespace OpenAuth.WebApi.Controllers.ServiceControllers
private readonly string ak = "e3d019924e7f40089bcffba4";
private readonly string sk = "cf83a12caa155b994eb34fa9";
private readonly string baseUrl = "https://123.132.248.154:6405";
private readonly RabbitMqListenerService _listener;
public DaHuaAiController(RabbitMqListenerService listener)
{
_listener = listener;
}
private async Task<Response<string>> PostJsonAsync(string path, object body)
{
var handler = new HttpClientHandler
@ -159,9 +167,20 @@ namespace OpenAuth.WebApi.Controllers.ServiceControllers
/// <returns></returns>
[HttpPost]
[AllowAnonymous]
public Task<Response<string>> CreateTasks(DaHuaTaskInfo data)
public async Task<Response<string>> CreateTasks(DaHuaTaskInfo data)
{
await _listener.AddQueueBindingAsync("processing_event1", "topic", $"event.behaviorAlarm.1.{data.channelId}");
return await PostJsonAsync("/processing/behavior/realtime/tasks", data);
}
/// <summary>
/// 模拟添加队列
/// </summary>
/// <param name="channelId"></param>
[HttpPost]
[AllowAnonymous]
public async void AddQueueBinding(string channelId)
{
return PostJsonAsync("/processing/behavior/realtime/tasks", data);
await _listener.AddQueueBindingAsync("processing_event", "topic", $"event.trafficJunction.1.{channelId}");
}
/// <summary>
/// 删除行为场景分析任务

@ -0,0 +1,139 @@
using Microsoft.AspNetCore.Connections;
using RabbitMQ.Client.Events;
using RabbitMQ.Client;
using System.Text;
namespace OpenAuth.WebApi.Model.RabbitMQService
{
public class RabbitMqListenerService : BackgroundService
{
private readonly ConnectionFactory _factory;
private IConnection _connection;
private IChannel _channel;
private readonly Dictionary<string, string> _queueRoutingMap = new();
private readonly object _lock = new();
private readonly List<QueueBinding> _bindings = new();
public RabbitMqListenerService()
{
_factory = new ConnectionFactory
{
//HostName = "123.132.248.154",
//UserName = "DHCloudg1",
//Password = "Cloud0#4fCraQrm",
//Port = 9103
HostName = "localhost",
UserName = "guest",
Password = "guest",
Port = 5672
};
}
public async Task AddQueueBindingAsync(string exchange, string exchangeType, string routingKey)
{
_bindings.Add(new QueueBinding
{
Exchange = exchange,
ExchangeType = exchangeType,
RoutingKey = routingKey
});
if (_channel != null)
{
await BindQueueAsync(_channel, exchange, exchangeType, routingKey);
}
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_connection = await _factory.CreateConnectionAsync(stoppingToken);
_channel = await _connection.CreateChannelAsync();
await BindQueueAsync(_channel, "processing_event", "topic", "event.trafficJunction.1.sdhc");
foreach (var binding in _bindings)
{
await BindQueueAsync(_channel, binding.Exchange, binding.ExchangeType, binding.RoutingKey);
}
}
private async Task BindQueueAsync(IChannel channel, string exchange, string exchangeType, string routingKey)
{
await channel.ExchangeDeclareAsync(exchange, exchangeType, durable: true);
await channel.QueueDeclareAsync("", durable: true, exclusive: false, autoDelete: false);
await channel.QueueBindAsync("", exchange, routingKey);
var consumer = new AsyncEventingBasicConsumer(channel);
consumer.ReceivedAsync += async (sender, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine($"[processing_event]: {message}");
// Exchange 来做不同处理
switch (ea.Exchange)
{
case "processing_event":
Console.WriteLine($"[processing_event]: {message}");
break;
case "thing_event":
Console.WriteLine($"[thing_event] {message}");
break;
default:
Console.WriteLine($"[Other:{ea.RoutingKey}] {message}");
break;
}
await Task.Yield();
};
await channel.BasicConsumeAsync("", autoAck: true, consumer);
}
//public async Task AddListenerAsync(string exchange, string exchangeType, string routingKey)
//{
// _connection = await _factory.CreateConnectionAsync();
// foreach (var binding in _bindings)
// {
// var channel = await _connection.CreateChannelAsync();
// // 声明交换机
// await channel.ExchangeDeclareAsync(binding.Exchange, binding.ExchangeType, durable: true);
// await channel.QueueDeclareAsync("", durable: true, exclusive: false, autoDelete: false);
// // 绑定队列到交换机
// await channel.QueueBindAsync(queue: "", exchange: binding.Exchange, routingKey: binding.RoutingKey);
// var consumer = new AsyncEventingBasicConsumer(channel);
// consumer.ReceivedAsync += async (model, ea) =>
// {
// var body = ea.Body.ToArray();
// var message = Encoding.UTF8.GetString(body);
// using var scope = _serviceProvider.CreateScope();
// foreach (var handler in handlers)
// {
// await handler.HandleMessageAsync(binding.Exchange, ea.RoutingKey, message);
// }
// await channel.BasicAckAsync(ea.DeliveryTag, multiple: false);
// };
// await channel.BasicConsumeAsync(queue: "", autoAck: false, consumer: consumer);
// _channels.Add(channel);
// }
//}
//public async ValueTask DisposeAsync()
//{
// foreach (var channel in _channels)
// {
// if (channel != null)
// await channel.CloseAsync();
// }
// if (_connection != null)
// await _connection.CloseAsync();
//}
private class QueueBinding
{
public string Exchange { get; set; } = "";
public string ExchangeType { get; set; } = "";
public string RoutingKey { get; set; } = "";
}
}
}

@ -1,86 +1,87 @@
<Project Sdk="Microsoft.NET.Sdk.Web">
<PropertyGroup>
<TargetFramework>net6.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<DockerDefaultTargetOS>Linux</DockerDefaultTargetOS>
</PropertyGroup>
<PropertyGroup>
<TargetFramework>net6.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<DockerDefaultTargetOS>Linux</DockerDefaultTargetOS>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|AnyCPU'">
<DocumentationFile>bin\Debug\OpenAuth.WebApi.xml</DocumentationFile>
<NoWarn>1701;1702;1591;1573;1572;1570</NoWarn>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|AnyCPU'">
<DocumentationFile>bin\Debug\OpenAuth.WebApi.xml</DocumentationFile>
<NoWarn>1701;1702;1591;1573;1572;1570</NoWarn>
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)' == 'Release' ">
<DocumentationFile>bin\Release\OpenAuth.WebApi.xml</DocumentationFile>
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)' == 'Release' ">
<DocumentationFile>bin\Release\OpenAuth.WebApi.xml</DocumentationFile>
</PropertyGroup>
<ItemGroup>
<Compile Remove="log\**" />
<Compile Remove="wwwroot\**" />
<Content Remove="log\**" />
<Content Remove="wwwroot\**" />
<EmbeddedResource Remove="log\**" />
<EmbeddedResource Remove="wwwroot\**" />
<None Remove="log\**" />
<None Remove="wwwroot\**" />
<EmbeddedResource Include="index.html" />
</ItemGroup>
<ItemGroup>
<Compile Remove="log\**" />
<Compile Remove="wwwroot\**" />
<Content Remove="log\**" />
<Content Remove="wwwroot\**" />
<EmbeddedResource Remove="log\**" />
<EmbeddedResource Remove="wwwroot\**" />
<None Remove="log\**" />
<None Remove="wwwroot\**" />
<EmbeddedResource Include="index.html" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="log4net" Version="2.0.15" />
<PackageReference Include="IdentityServer4.AccessTokenValidation" Version="3.0.1" />
<PackageReference Include="Microsoft.AspNetCore.Mvc.NewtonsoftJson" Version="6.0.16" />
<PackageReference Include="Microsoft.AspNetCore.Mvc.Testing" Version="6.0.16" />
<PackageReference Include="Microsoft.AspNetCore.TestHost" Version="6.0.16" />
<PackageReference Include="Microsoft.Extensions.Caching.Memory" Version="6.0.1" />
<PackageReference Include="Microsoft.Extensions.DependencyModel" Version="6.0.0" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="6.0.1" />
<PackageReference Include="Microsoft.Extensions.Logging.Log4Net.AspNetCore" Version="6.0.0" />
<PackageReference Include="Microsoft.VisualStudio.Azure.Containers.Tools.Targets" Version="1.17.2" />
<PackageReference Include="Microsoft.VisualStudio.Web.CodeGeneration.Design" Version="6.0.13" />
<PackageReference Include="MiniProfiler.AspNetCore" Version="4.2.22" />
<PackageReference Include="MiniProfiler.AspNetCore.Mvc" Version="4.2.22" />
<PackageReference Include="MiniProfiler.Shared" Version="4.2.22" />
<PackageReference Include="Newtonsoft.Json" Version="13.0.2" />
<PackageReference Include="NPOI" Version="2.5.6" />
<PackageReference Include="NUnit" Version="3.13.1" />
<PackageReference Include="NUnit3TestAdapter" Version="3.17.0" />
<PackageReference Include="Swashbuckle.AspNetCore" Version="6.5.0" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="log4net" Version="2.0.15" />
<PackageReference Include="IdentityServer4.AccessTokenValidation" Version="3.0.1" />
<PackageReference Include="Microsoft.AspNetCore.Mvc.NewtonsoftJson" Version="6.0.16" />
<PackageReference Include="Microsoft.AspNetCore.Mvc.Testing" Version="6.0.16" />
<PackageReference Include="Microsoft.AspNetCore.TestHost" Version="6.0.16" />
<PackageReference Include="Microsoft.Extensions.Caching.Memory" Version="6.0.1" />
<PackageReference Include="Microsoft.Extensions.DependencyModel" Version="6.0.0" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="6.0.1" />
<PackageReference Include="Microsoft.Extensions.Logging.Log4Net.AspNetCore" Version="6.0.0" />
<PackageReference Include="Microsoft.VisualStudio.Azure.Containers.Tools.Targets" Version="1.17.2" />
<PackageReference Include="Microsoft.VisualStudio.Web.CodeGeneration.Design" Version="6.0.13" />
<PackageReference Include="MiniProfiler.AspNetCore" Version="4.2.22" />
<PackageReference Include="MiniProfiler.AspNetCore.Mvc" Version="4.2.22" />
<PackageReference Include="MiniProfiler.Shared" Version="4.2.22" />
<PackageReference Include="Newtonsoft.Json" Version="13.0.2" />
<PackageReference Include="NPOI" Version="2.5.6" />
<PackageReference Include="NUnit" Version="3.13.1" />
<PackageReference Include="NUnit3TestAdapter" Version="3.17.0" />
<PackageReference Include="Swashbuckle.AspNetCore" Version="6.5.0" />
<PackageReference Include="RabbitMQ.Client" Version="7.1.2" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\Infrastructure\Infrastructure.csproj" />
<ProjectReference Include="..\OpenAuth.App\OpenAuth.App.csproj" />
<ProjectReference Include="..\OpenAuth.Repository\OpenAuth.Repository.csproj" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\Infrastructure\Infrastructure.csproj" />
<ProjectReference Include="..\OpenAuth.App\OpenAuth.App.csproj" />
<ProjectReference Include="..\OpenAuth.Repository\OpenAuth.Repository.csproj" />
</ItemGroup>
<ItemGroup>
<Reference Include="Autofac.Extensions.DependencyInjection">
<HintPath>..\..\..\Users\Administrator\.nuget\packages\autofac.extensions.dependencyinjection\4.0.0\lib\netstandard1.1\Autofac.Extensions.DependencyInjection.dll</HintPath>
</Reference>
</ItemGroup>
<ItemGroup>
<Reference Include="Autofac.Extensions.DependencyInjection">
<HintPath>..\..\..\Users\Administrator\.nuget\packages\autofac.extensions.dependencyinjection\4.0.0\lib\netstandard1.1\Autofac.Extensions.DependencyInjection.dll</HintPath>
</Reference>
</ItemGroup>
<ItemGroup>
<Content Update="appsettings.Production.json">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</Content>
<Content Update="appsettings.json">
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</Content>
<Content Update="Model\mqtt\hms.json">
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</Content>
</ItemGroup>
<ItemGroup>
<Content Update="appsettings.Production.json">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</Content>
<Content Update="appsettings.json">
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</Content>
<Content Update="Model\mqtt\hms.json">
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</Content>
</ItemGroup>
<ItemGroup>
<None Update="Dockerfile11111">
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</None>
</ItemGroup>
<ItemGroup>
<None Update="Dockerfile11111">
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</None>
</ItemGroup>
<ItemGroup>
<Folder Include="Controllers\BaseControllers\Permission\" />
</ItemGroup>
<ItemGroup>
<Folder Include="Controllers\BaseControllers\Permission\" />
</ItemGroup>
</Project>

@ -26,6 +26,7 @@ using OpenAuth.App.ServiceApp;
using OpenAuth.Repository;
using OpenAuth.WebApi.Model;
using OpenAuth.WebApi.Model.mqtt;
using OpenAuth.WebApi.Model.RabbitMQService;
using SqlSugar;
using Swashbuckle.AspNetCore.SwaggerUI;
using Yitter.IdGenerator;
@ -181,6 +182,11 @@ namespace OpenAuth.WebApi
services.AddHostedService<MqttHostedService>();
#endregion
#region rabbitmq
services.AddSingleton<RabbitMqListenerService>();
services.AddHostedService(sp => sp.GetRequiredService<RabbitMqListenerService>());
#endregion
#region AppSetting
services.Configure<AppSetting>(Configuration.GetSection("AppSetting"));

Loading…
Cancel
Save