1. bugfix: 直播失败修复

2. bugfix: 计划任务调用ai 模型兼容性问题
main
陈伟 1 week ago
parent 53addaf7a7
commit 67b7cd9439

@ -226,7 +226,7 @@ public class MqttClientManager
var result = await _outBoundClient.PublishAsync(mqttMsg, CancellationToken.None);
if (result.IsSuccess)
{
//Console.WriteLine($"{topic} {message}发布成功");
Console.WriteLine($"{topic} {message}发布成功");
}
else
{

@ -1,3 +1,4 @@
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Drawing;
using System.Dynamic;
@ -19,6 +20,7 @@ using Microsoft.Net.Http.Headers;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using OpenAuth.App.BaseApp.Base;
using OpenAuth.App.BaseApp.Subscribe;
using OpenAuth.App.BasicQueryService;
using OpenAuth.App.Interface;
using OpenAuth.App.Request;
@ -44,6 +46,8 @@ namespace OpenAuth.App.ServiceApp
private readonly OpenJobApp _openJobApp;
private readonly ILogger<LasaDronePort> _logger;
CommonDataManager _commonDataManager;
private readonly ConcurrentDictionary<string, DateTime> _processedMessages = new();
private readonly TimeSpan _deduplicationWindow = TimeSpan.FromMinutes(1);
public ManageApp(ISugarUnitOfWork<SugarDbContext> unitWork, ISimpleClient<LasaDronePort> repository, IAuth auth,
MqttClientManager mqttClientManager, CommonDataManager commonDataManager, MinioService minioService,
@ -2406,6 +2410,14 @@ namespace OpenAuth.App.ServiceApp
// 更新
await db.Updateable(aiInspection).IgnoreNullColumns().ExecuteCommandAsync();
}
else
{
var aiInspection = Repository
.ChangeRepository<SugarRepositiry<LasaAiInspection>>()
.AsQueryable().Where(x => x.TaskId == req.TaskId).First();
req.AlgoInstanceId = aiInspection.AlgoInstanceId;
}
var algoInstances = await db
.Queryable<LasaAlgoInstance>()
.Where(x => x.Id == req.AlgoInstanceId)
@ -2438,13 +2450,14 @@ namespace OpenAuth.App.ServiceApp
await db.Updateable(taskRecord).IgnoreNullColumns().ExecuteCommandAsync();
var tag = await db
.Queryable<LasaModelLabel>()
.Where(x => tagsIds.Contains(x.Id))
.Select(x => x.EnumValue)
.Where(l => tagsIds.Contains(l.Id))
.Select(l => l.EnumValue)
.ToArrayAsync();
json.tag = new int [0, 1, 2, 3, 4, 5];
var content = new StringContent(JsonConvert.SerializeObject(json), Encoding.UTF8, "application/json");
using var httpClient = new HttpClient();
var response = await httpClient.PostAsync("http://192.168.10.131:9025/start_detection", content);
_logger.LogDebug($"成功调用{response.IsSuccessStatusCode}");
db.Ado.CommitTran();
}
catch (Exception ex)
@ -2465,7 +2478,7 @@ namespace OpenAuth.App.ServiceApp
var task = new LasaTask()
{
Id = taskid,
// ScheduledEndTime = DateTime.Now,
// ScheduledEndTime = DateTime.Now,
CompletedTime = DateTime.Now,
Status = 5 // 成功
};
@ -2491,5 +2504,148 @@ namespace OpenAuth.App.ServiceApp
Result = true
};
}
public bool IsDuplicate(string messageId)
{
var now = DateTime.UtcNow;
// 清理过期消息
foreach (var kvp in _processedMessages)
{
if (now - kvp.Value > _deduplicationWindow)
{
_processedMessages.TryRemove(kvp.Key, out _);
}
}
// 检查是否已存在
if (_processedMessages.ContainsKey(messageId))
{
return true;
}
_processedMessages[messageId] = now;
return false;
}
public async Task<Response<bool>> TestZhiBao(string message)
{
var sn = "8UUXN5400A079H";
var result = JsonConvert.DeserializeObject<TopicServicesRequest<dynamic>>(message);
var method = result.method;
var data = result.data;
//_logger.LogInformation($"主题:{topic}\n消息{message}");
long code = 0;
var isHandle = IsDuplicate(Md5.Encrypt(message));
//_logger.LogDebug($"md5: {isHandle} 重复否:{IsDuplicate(Md5.Encrypt(message))} 信息:{message} ");
if (isHandle)
{
_logger.LogInformation("跳过处理");
return null;
}
_logger.LogDebug("航线进度未跳过处理");
code = data.result; // result
var waylineMissionState = (int)data.output.ext.wayline_mission_state;
string flightId1 = (string)data.output.ext.flight_id;
var taskAssign1 = GetTaskAssignByFlightId(flightId1);
// 处理航线进度 ,也有可能是失败
var step = (int)data.output.progress.current_step;
_logger.LogDebug($"航线进度:{waylineMissionState} {step} {message}");
if (step.Equals(25)) // 航线执行
{
var task = await Repository.AsSugarClient().Queryable<LasaTask>()
.FirstAsync(y => y.Id == taskAssign1.TaskId);
if (task != null && !string.IsNullOrEmpty(task.AIInspection) &&
task.AIInspection.Equals("true") && string.IsNullOrEmpty(task.PushUrl))
{
_logger.LogDebug("执行AI 智能巡检。。。。。");
var rtmp = "rtmp://box.wisestcity.com:1935/live/55";
var param =
@$"{{""bid"": ""{Guid.NewGuid().ToString()}"",""method"": ""live_start_push"",""tid"": ""{Guid.NewGuid().ToString()}"",
""timestamp"": {DateTimeOffset.Now.ToUnixTimeMilliseconds()},
""data"": {{
""url_type"": 1,
""url"": ""{rtmp}"",
""video_id"": ""8UUXN5400A079H/165-0-7/normal-0"",
""video_quality"": 3
}}
}}";
//thing/product/{gateway_sn}/services
var topicRequest = $"thing/product/{sn}/services";
var x = RemoveSpecificChars(param);
_logger.LogDebug($"直播参数:{x}");
await _mqttClientManager.PublishAsync(topicRequest, x);
var req = new CallAiModel { TaskId = taskAssign1.TaskId, RtmpUrl = rtmp };
await CallAiModel(req);
}
}
// 航线成功
/*
if (waylineMissionState.Equals(9)) // 航结结束,更新任务状态
{
var task = await Repository.AsSugarClient().Queryable<LasaTask>()
.FirstAsync(y => y.Id == taskAssign1.TaskId);
if (!string.IsNullOrEmpty(task.AIInspection) && task.AIInspection.Equals(true) &&
!string.IsNullOrEmpty(task.PushUrl))
{
// todo 停止直播
// todo 停止 aimodel 运行
var para = @$"{{
""bid"": ""{Guid.NewGuid().ToString()}"",
""data"": {{
""video_id"": ""8UUXN5400A079H/165-0-7/normal-0""
}},
""tid"":""{Guid.NewGuid().ToString()}"",
""timestamp:"": {DateTimeOffset.Now.ToUnixTimeMilliseconds()},
""method"": ""live_stop_push""
}}";
var topicRequest = $"thing/product/{sn}/services";
await _mqttClientManager.PublishAsync(topicRequest, RemoveSpecificChars(para));
using var httpClient = new HttpClient();
await httpClient.PostAsync("http://192.168.10.131:9025/stop_detection", null);
}
var record = new LasaTask()
{
Id = taskAssign1.TaskId,
Status = 5
};
// await Repository.AsSugarClient().Updateable(record).IgnoreNullColumns().ExecuteCommandAsync();
}
*/
return null;
}
public void CloseZhibo(string videoId)
{
var sn = "8UUXN5400A079H";
var para = @$"{{
""bid"": ""{Guid.NewGuid().ToString()}"",
""data"": {{
""video_id"": ""8UUXN5400A079H/165-0-7/normal-0""
}},
""tid"":""{Guid.NewGuid().ToString()}"",
""timestamp:"": {DateTimeOffset.Now.ToUnixTimeMilliseconds()},
""method"": ""live_stop_push""
}}";
var topicRequest = $"thing/product/{sn}/services";
_mqttClientManager.PublishAsync(topicRequest, para);
using var httpClient = new HttpClient();
httpClient.PostAsync("http://192.168.10.131:9025/stop_detection", null);
}
public static string RemoveSpecificChars(string input)
{
if (string.IsNullOrEmpty(input))
return input;
var charsToRemove = new char[] { ' ', '\r', '\n', '\t' };
return string.Concat(input.Where(c => !charsToRemove.Contains(c)));
}
}
}

@ -153,12 +153,6 @@ public class ConfigSubscribe : IJob
case "flight_areas_get":
//Console.WriteLine("跳过自定义飞行区文件获取");
break;
//{"bid":"f936a236-030c-4358-bee9-b5075e1e2ddf",
//"data":{"flight_id":"e5ce8433-c264-4357-84d9-b701faf90d9e"},
//"method":"flighttask_resource_get",
//"tid":"61b6389a-7b72-49ae-bb46-0729e85c95d2",
//"timestamp":1750554644321,
//"gateway":"8UUXN5400A079H"}
// 获取航线
case "flighttask_resource_get":
string flightId = data.flight_id + "";
@ -573,12 +567,14 @@ public class ConfigSubscribe : IJob
// "28":"降落以后的关盖"
// "29":"机场退出工作模式"
// "30":"机场异常恢复","31":"机场上传飞行系统日志","32":"相机录像状态检查","33":"获取媒体文件数量","34":"机场起飞开盖的异常恢复","35":"通知任务结果","36":"日志列表拉取 - 飞行器列表","37":"日志列表拉取 - 拉取机场列表","38":"日志列表拉取 - 上传日志列表结果","39":"日志拉取-拉取飞行器日志","40":"日志拉取-拉取机场日志","41":"日志拉取-压缩飞行器日志","42":"日志拉取-压缩机场日志","43":"日志拉取-上传飞行器日志","44":"日志拉取-上传机场日志","45":"日志拉取-通知结果","46":"自定义飞行区文件更新准备中","47":"自定义飞行区更新中","48":"离线地图更新准备中","49":"离线地图更新中","65533":"结束后等待服务回包","65534":"无具体状态","65535":"UNKNOWN"}
_logger.LogDebug($"md5: {Md5.Encrypt(message)} 重复否:{IsDuplicate(Md5.Encrypt(message))} 信息:{message} ");
if (IsDuplicate(Md5.Encrypt(message)))
var isHandle = IsDuplicate(Md5.Encrypt(message));
_logger.LogDebug($"md5: {isHandle} 重复否:{IsDuplicate(Md5.Encrypt(message))} 信息:{message} ");
if (isHandle)
{
_logger.LogInformation("跳过处理");
break;
}
_logger.LogDebug("航线进度未跳过处理");
code = data.result; // result
var waylineMissionState = (int)data.output.ext.wayline_mission_state;
@ -632,14 +628,15 @@ public class ConfigSubscribe : IJob
task.AIInspection.Equals("true") && string.IsNullOrEmpty(task.PushUrl))
{
_logger.LogDebug("执行AI 智能巡检。。。。。");
var rtmp = "rtmp://box.wisestcity.com:1935/live/7";
var param = @$"{{
""bid"": {Guid.NewGuid().ToString()},
""bid"": ""{Guid.NewGuid().ToString()}"",
""method"": ""live_start_push"",
""tid"": {Guid.NewGuid().ToString()},
""tid"": ""{Guid.NewGuid().ToString()}"",
""timestamp"": {DateTimeOffset.Now.ToUnixTimeMilliseconds()},
""data"": {{
""url_type"": 1,
""url"": ""rtmp://box.wisestcity.com:1935/live/7"",
""url"": ""{rtmp}"",
""video_id"": ""1581F8HGX254V00A0BUY/99-0-0/normal-0"",
""video_quality"": 3
}}
@ -647,7 +644,7 @@ public class ConfigSubscribe : IJob
_logger.LogDebug($"直播参数:{param}");
var topicRequest = $"thing/product/{sn}/services";
await _mqttClientManager.PublishAsync(topicRequest, param);
var req = new CallAiModel { TaskId = taskAssign1.TaskId };
var req = new CallAiModel { TaskId = taskAssign1.TaskId ,RtmpUrl = rtmp};
await _manageApp.CallAiModel(req);
}
}
@ -726,8 +723,15 @@ public class ConfigSubscribe : IJob
switch (method)
{
case "live_start_push":
if (IsDuplicate(Md5.Encrypt(message)))
{
break;
}
_logger.LogDebug($"开启直播成功 {message}");
break;
case "live_stop_push":
_logger.LogDebug($"停止直播成功 {message}");
break;
case "flighttask_prepare": // 下发任务响应
// 顺序处理,多余的不再处理
lock (_locker)

@ -891,5 +891,18 @@ namespace OpenAuth.WebApi.Controllers.ServiceControllers
{
return await _app.EndHandFlyTask(taskid);
}
[HttpPost]
[AllowAnonymous]
public async Task<Response<bool>> TestZhiBao(string message)
{
return await _app.TestZhiBao(message);
}
[HttpPost]
[AllowAnonymous]
public async void CloseZhibo(string videoId)
{
_app.CloseZhibo(videoId);
}
}
}
Loading…
Cancel
Save