feat(mqtt): 优化MQTT消息处理逻辑并调整摄像头索引配置

- 将HandleTopic方法从private改为public以支持外部调用
- 修改LasaTask查询方式从FirstAsync为SingleAsync确保唯一性
- 调整M3TD类型设备的摄像头索引值从99-0-1为81-0-0
- 重新组织直播参数构建逻辑并提前设置_liveInfo字典
- 移除冗余的命名空间引用减少代码体积
- 注入ConfigSubscribe服务并在ManageController中添加测试接口
- 添加TestConfSubscribe接口用于调试MQTT消息处理流程
main
陈伟 2025-11-22 08:44:37 +08:00
parent 2447f290ba
commit c946b67ee0
3 changed files with 29 additions and 15 deletions

View File

@ -82,7 +82,7 @@ public class ConfigSubscribe : IJob
}
private async Task HandleTopic(MqttApplicationMessageReceivedEventArgs args, string topic,
public async Task HandleTopic(MqttApplicationMessageReceivedEventArgs args, string topic,
string message)
{
// 序列号提取
@ -675,9 +675,8 @@ public class ConfigSubscribe : IJob
_logger.LogInformation("跳过处理");
break;
}
var task = await _sqlSugarClient.Queryable<LasaTask>()
.FirstAsync(y => y.Id == taskAssign1.TaskId);
.SingleAsync(y => y.Id == taskAssign1.TaskId);
if (task != null && !string.IsNullOrEmpty(task.AIInspection) &&
task.AIInspection.Equals("true") && string.IsNullOrEmpty(task.PushUrl))
{
@ -693,7 +692,8 @@ public class ConfigSubscribe : IJob
var cameraIndex = "99-0-0";
if (lasaDronePort.TypeId == "M3TD")
{
cameraIndex = "99-0-1";
// 81-0-0
cameraIndex = "81-0-0";
}
//var rtmp = "rtmp://box.wisestcity.com:1935/live/7";
@ -707,15 +707,17 @@ public class ConfigSubscribe : IJob
""data"": {{
""url_type"": 1,
""url"": ""{rtmp}{lasaDronePort.UavSn}"",
""video_id"": ""{lasaDronePort.uavSn}/{cameraIndex}/normal-0"",
""video_id"": ""{lasaDronePort.UavSn}/{cameraIndex}/normal-0"",
""video_quality"": 3
}}
}}";
// todo 到这里未执行?? 确定是否是123上执行的 但如何后面没执行,怎么开启的直播呢?
// todo 关于开启直播的,再在这里调用开启直播,是否还会继续执行下去?
_logger.LogDebug($"直播参数:{param}");
_liveInfo[$"{tid}{bid}"] = taskAssign1.TaskId + "," + lasaDronePort.UavSn;
var topicRequest = $"thing/product/{sn}/services";
// 开启直播
await _mqttClientManager.PublishAsync(topicRequest, param);
_liveInfo[$"{tid}{bid}"] = taskAssign1.TaskId + "," + lasaDronePort.UavSn;
}
}
}
@ -770,7 +772,6 @@ public class ConfigSubscribe : IJob
var tempTaskId = _liveInfo[$"{result.tid}{result.bid}"].Split(",");
_logger.LogDebug($"智能巡检TaskId:{tempTaskId[0]}");
_liveInfo.TryRemove($"{result.tid}{result.bid}", out _);
// todo
var req = new CallAiModel { TaskId = tempTaskId[0], RtmpUrl = rtmp + tempTaskId[1] };
_logger.LogDebug($"智能巡检调用参数:{JsonConvert.SerializeObject(req)}");
_ = _manageApp.CallAiModel(req);

View File

@ -1,9 +1,4 @@
using SqlSugar;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace OpenAuth.Repository.Domain
{
@ -153,9 +148,9 @@ namespace OpenAuth.Repository.Domain
/// 断点信息
/// </summary>
public string BreakPoint { get; set; }
public string PushUrl { get; set; }
public int? FlyNumber { get; set; }
[SugarColumn(IsIgnore = true)] public string AlgoInstanceId { get; set; }

View File

@ -6,6 +6,7 @@ using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.Mvc;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using OpenAuth.App.BaseApp.Subscribe;
using OpenAuth.App.ServiceApp;
using OpenAuth.App.ServiceApp.AirLine.Request;
using OpenAuth.App.ServiceApp.FlyTask.Request;
@ -26,10 +27,13 @@ namespace OpenAuth.WebApi.Controllers.ServiceControllers
private readonly ManageApp _app;
private readonly MqttClientManager _mqttClientManager;
public ManageController(ManageApp app, MqttClientManager mqttClientManager)
private readonly ConfigSubscribe _configSubscribe;
public ManageController(ManageApp app, MqttClientManager mqttClientManager, ConfigSubscribe configSubscribe)
{
_app = app;
_mqttClientManager = mqttClientManager;
_configSubscribe = configSubscribe;
}
#region 机场管理
@ -986,6 +990,7 @@ namespace OpenAuth.WebApi.Controllers.ServiceControllers
{
return await _app.ListDronePort(lng, lat);
}
// todo
[HttpGet]
[AllowAnonymous]
@ -993,5 +998,18 @@ namespace OpenAuth.WebApi.Controllers.ServiceControllers
{
return await _app.GetDronePortInfo(dronePortSn);
}
/// <summary>
/// 用于测试各种mqtt消息
/// </summary>
/// <param name="topic"></param>
/// <param name="message"></param>
[HttpPost]
//[AllowAnonymous]
public async void TestConfSubscribe(String topic, String message)
{
await _configSubscribe.HandleTopic(null, topic, message);
}
}
}