机场日志添加通道

main
zhangbin 2026-03-12 11:18:37 +08:00
parent 5d0314fe78
commit 4e2a0cffe8
5 changed files with 239 additions and 5 deletions

View File

@ -45,7 +45,7 @@ namespace OpenAuth.App.ServiceApp.Algo
private readonly IConfiguration _configuration; private readonly IConfiguration _configuration;
public DaHuaAiApp(ISugarUnitOfWork<SugarDbContext> unitWork, MinioService minioService, ILogger<DaHuaAiApp> logger, MqttClientManager mqttClientManager, public DaHuaAiApp(ISugarUnitOfWork<SugarDbContext> unitWork, MinioService minioService, ILogger<DaHuaAiApp> logger, MqttClientManager mqttClientManager,
ISimpleClient<LasaAlgorithmsRepository> repository, IAuth auth, ISimpleClient<LasaAlgorithmsRepository> repository, IAuth auth,
IConfiguration configuration) : base(unitWork, repository, auth) IConfiguration configuration) : base(unitWork, repository, auth)
{ {
_minioService = minioService; _minioService = minioService;
_logger = logger; _logger = logger;
@ -667,6 +667,129 @@ namespace OpenAuth.App.ServiceApp.Algo
}; };
} }
} }
/// <summary>
///
/// </summary>
/// <param name="taskid">任务id,改为算法任务id</param>
/// <param name="path">图片地址</param>
/// <param name="aiid">算法id</param>
/// <param name="drone_info">无人机信息</param>
/// <param name="tag">标签信息</param>
/// <returns></returns>
public async Task<Response<bool>> AddImgnew(string taskid, string path, string aiid, dynamic drone_info, List<TagItem> tag)
{
using (var db = UnitWork.CreateContext())
{
_logger.LogError("标签信息:" + JsonConvert.SerializeObject(tag));
_logger.LogError("aiid" + aiid);
var Ip = "http://" + _configuration["Minio:Endpoint"] + "/" + _configuration["Minio:BucketName"] + "/";
foreach (var item in tag)
{
//var aitaskinfo = await db.LasaTaskAi.GetFirstAsync(r => r.AiTaskId == taskid);
//if (aitaskinfo == null)
// continue;
var info = await db.LasaAiAchievement.GetFirstAsync(r => r.TaskId == taskid && r.Tag == item.class_id.ToString());
var modelinfo = await db.LasaModelLabel.AsQueryable().Where(r => r.PId == aiid).ToListAsync();
double lat = 0.0, lng = 0.0;
if (drone_info != null)
{
//var root = JsonNode.Parse(drone_info)?.AsObject();
//lat = root?["data"]?["latitude"]?.GetValue<double>() ?? 0.0;
//lng = root?["data"]?["longitude"]?.GetValue<double>() ?? 0.0;
lat = (double?)drone_info.data?.latitude ?? 0.0;
lng = (double?)drone_info.data?.longitude ?? 0.0;
}
if (info == null)
{
LasaAiAchievement lasaAiAchievement = new LasaAiAchievement();
lasaAiAchievement.Id = Guid.NewGuid().ToString();
lasaAiAchievement.CreateTime = DateTime.Now;
lasaAiAchievement.TaskId = taskid;
lasaAiAchievement.AiModel = "yolo12x";
lasaAiAchievement.Tag = item.class_id.ToString();
lasaAiAchievement.AlgoId = aiid;
var modelid = modelinfo.Where(r => r.EnumValue == item.class_id).FirstOrDefault();
if (modelid != null)
{
lasaAiAchievement.Title = modelid.Name;
}
else
{
lasaAiAchievement.Title = "";
}
//var confidence = tag.Select(r => r.confidence).Max();
var confidence = item.confidence;
if (confidence < 0.3)
{
break;
}
lasaAiAchievement.ConfidenceLevel = (float)Math.Round(confidence, 2) * 100;
//插入详情
LasaAiAchievementDetail lasaAiAchievementDetail = new LasaAiAchievementDetail()
{
Id = Guid.NewGuid().ToString(),
//Image = "http://175.27.168.120:6013/test/" + path,
Image = Ip + path,
AiAchievementId = lasaAiAchievement.Id,
Lat = lat,
Lng = lng
};
//lasaAiAchievement.Cover = "http://175.27.168.120:6013/test/" + path;
lasaAiAchievement.Cover = Ip + path;
lasaAiAchievement.Lat = (float)lat;
lasaAiAchievement.Lng = (float)lng;
await db.LasaAiAchievement.InsertAsync(lasaAiAchievement);
await db.LasaAiAchievementDetail.InsertAsync(lasaAiAchievementDetail);
}
else
{
//插入详情
LasaAiAchievementDetail lasaAiAchievementDetail = new LasaAiAchievementDetail()
{
Id = Guid.NewGuid().ToString(),
Image = Ip + path,
AiAchievementId = info.Id,
Lat = lat,
Lng = lng
};
await db.LasaAiAchievementDetail.InsertAsync(lasaAiAchievementDetail);
}
}
#region 推送到防火平台
//await _mqttClientManager.PublishAsync("fireclueinfo", JsonConvert.SerializeObject(
// new
// {
// ReportPerson = "",
// Describe = "",
// Lng = "",
// La = "",
// Address = "",
// AreaName = "",
// Image = path,
// DegreeType = 3
// }));
#endregion
if (db.Commit())
return new Response<bool>
{
Result = true,
Message = "添加成功"
};
else
return new Response<bool>
{
Result = false,
Message = "添加失败"
};
}
}
public async void PublishMsg(List<string> topic, string payload) public async void PublishMsg(List<string> topic, string payload)
{ {
foreach (var topicItem in topic) foreach (var topicItem in topic)

View File

@ -0,0 +1,79 @@
using Microsoft.AspNetCore.HttpOverrides;
using OpenAuth.App.ServiceApp;
using OpenAuth.Repository.Domain;
using Org.BouncyCastle.Crypto.Tls;
using SqlSugar;
namespace OpenAuth.WebApi.Model.mqtt.ChannelDB
{
public class LogBatchWriter : BackgroundService
{
private readonly LogQueueService _queue;
private readonly IServiceScopeFactory _scopeFactory;
private readonly ILogger<LogBatchWriter> _logger;
private const int BatchSize = 100;
private const int FlushIntervalMs = 10000;
public LogBatchWriter(LogQueueService queue, IServiceScopeFactory scopeFactory, ILogger<LogBatchWriter> logger)
{
_queue = queue;
_scopeFactory = scopeFactory;
_logger = logger;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
var buffer = new List<LasaLog>(BatchSize);
while (!stoppingToken.IsCancellationRequested)
{
buffer.Clear();
var deadline = DateTime.UtcNow.AddMilliseconds(FlushIntervalMs);
while (buffer.Count < BatchSize && DateTime.UtcNow < deadline)
{
var remaining = deadline - DateTime.UtcNow;
if (remaining <= TimeSpan.Zero) break;
try
{
using var cts = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken);
cts.CancelAfter(remaining);
var log = await _queue.Reader.ReadAsync(cts.Token);
buffer.Add(log);
}
catch (OperationCanceledException)
{
break;
}
}
if (buffer.Count > 0)
{
await FlushAsync(buffer, stoppingToken);
}
}
}
private async Task FlushAsync(List<LasaLog> logs, CancellationToken ct)
{
try
{
using var scope = _scopeFactory.CreateScope();
// 直接从 DI 拿 SqlSugar 的 ISqlSugarClient
var db = scope.ServiceProvider.GetRequiredService<ISqlSugarClient>();
// SqlSugar 批量插入
await db.Insertable(logs).ExecuteCommandAsync();
_logger.LogDebug($"批量写入 {logs.Count} 条日志");
}
catch (Exception ex)
{
_logger.LogError(ex, $"批量写入失败,丢失 {logs.Count} 条");
}
}
}
}

View File

@ -0,0 +1,17 @@
using OpenAuth.Repository.Domain;
using System.Threading.Channels;
namespace OpenAuth.WebApi.Model.mqtt.ChannelDB
{
public class LogQueueService
{
private readonly Channel<LasaLog> _channel = Channel.CreateBounded<LasaLog>(
new BoundedChannelOptions(10000)
{
FullMode = BoundedChannelFullMode.DropOldest
});
public ChannelWriter<LasaLog> Writer => _channel.Writer;
public ChannelReader<LasaLog> Reader => _channel.Reader;
}
}

View File

@ -3,6 +3,7 @@ using Infrastructure.Cache;
using Infrastructure.CloudSdk.mqttmessagecenter; using Infrastructure.CloudSdk.mqttmessagecenter;
using OpenAuth.App.ServiceApp; using OpenAuth.App.ServiceApp;
using OpenAuth.Repository.Domain; using OpenAuth.Repository.Domain;
using OpenAuth.WebApi.Model.mqtt.ChannelDB;
using System.Text.Json.Nodes; using System.Text.Json.Nodes;
namespace OpenAuth.WebApi.Model.mqtt namespace OpenAuth.WebApi.Model.mqtt
@ -12,11 +13,13 @@ namespace OpenAuth.WebApi.Model.mqtt
private readonly ILogger<ThingRequestHandler> _logger; private readonly ILogger<ThingRequestHandler> _logger;
AirportMaintenanceApp _app; AirportMaintenanceApp _app;
private readonly ICacheContext _cache; private readonly ICacheContext _cache;
public ThingOsdHandler(ILogger<ThingRequestHandler> logger, AirportMaintenanceApp app, ICacheContext cache) private readonly LogQueueService _logQueue;
public ThingOsdHandler(ILogger<ThingRequestHandler> logger, AirportMaintenanceApp app, ICacheContext cache, LogQueueService logQueue)
{ {
_logger = logger; _logger = logger;
_app = app; _app = app;
_cache = cache; _cache = cache;
_logQueue = logQueue;
} }
public bool CanHandle(string topic) public bool CanHandle(string topic)
@ -29,7 +32,16 @@ namespace OpenAuth.WebApi.Model.mqtt
//_logger.LogError($"[osd] Topic={topic}, Payload={payload}"); //_logger.LogError($"[osd] Topic={topic}, Payload={payload}");
//Console.WriteLine($"[osd] Topic={topic}, Payload={payload}"); //Console.WriteLine($"[osd] Topic={topic}, Payload={payload}");
var root = JsonNode.Parse(payload)?.AsObject(); var root = JsonNode.Parse(payload)?.AsObject();
await _app.AddLog(new LasaLog //await _app.AddLog(new LasaLog
//{
// Id = Guid.NewGuid().ToString(),
// Topic = topic,
// Method = root["method"]?.ToString() ?? "",
// CreateTime = DateTime.Now,
// Data = payload
//});
// ✅ 只入队,不写库
await _logQueue.Writer.WriteAsync(new LasaLog
{ {
Id = Guid.NewGuid().ToString(), Id = Guid.NewGuid().ToString(),
Topic = topic, Topic = topic,

View File

@ -1,5 +1,4 @@
using System.Reflection; using Autofac;
using Autofac;
using Autofac.Extensions.DependencyInjection; using Autofac.Extensions.DependencyInjection;
using ce.autofac.extension; using ce.autofac.extension;
using IdentityServer4.AccessTokenValidation; using IdentityServer4.AccessTokenValidation;
@ -27,10 +26,12 @@ using OpenAuth.App.ServiceApp;
using OpenAuth.Repository; using OpenAuth.Repository;
using OpenAuth.WebApi.Model; using OpenAuth.WebApi.Model;
using OpenAuth.WebApi.Model.mqtt; using OpenAuth.WebApi.Model.mqtt;
using OpenAuth.WebApi.Model.mqtt.ChannelDB;
using OpenAuth.WebApi.Model.RabbitMQService; using OpenAuth.WebApi.Model.RabbitMQService;
using OpenAuth.WebApi.SystemTask; using OpenAuth.WebApi.SystemTask;
using SqlSugar; using SqlSugar;
using Swashbuckle.AspNetCore.SwaggerUI; using Swashbuckle.AspNetCore.SwaggerUI;
using System.Reflection;
using Yitter.IdGenerator; using Yitter.IdGenerator;
namespace OpenAuth.WebApi namespace OpenAuth.WebApi
@ -186,6 +187,8 @@ namespace OpenAuth.WebApi
services.AddSingleton<IMqttMessageHandler, ThingAiTaskHandler>(); services.AddSingleton<IMqttMessageHandler, ThingAiTaskHandler>();
services.AddSingleton<IMqttMessageHandler, ThingOnboardCaseHandler>(); services.AddSingleton<IMqttMessageHandler, ThingOnboardCaseHandler>();
services.AddHostedService<MqttHostedService>(); services.AddHostedService<MqttHostedService>();
services.AddSingleton<LogQueueService>();
services.AddHostedService<LogBatchWriter>();
#endregion #endregion
services.AddHostedService<DelayedExecutionService>(); services.AddHostedService<DelayedExecutionService>();