|
|
|
|
@ -35,7 +35,7 @@ public class ConfigSubscribe : IJob
|
|
|
|
|
private object _dockUploadFileLocker = new();
|
|
|
|
|
private readonly ILogger<ConfigSubscribe> _logger;
|
|
|
|
|
private readonly ConcurrentDictionary<string, DateTime> _processedMessages = new();
|
|
|
|
|
private readonly ConcurrentDictionary<string, string> liveInfo = new();
|
|
|
|
|
private readonly ConcurrentDictionary<string, string> _liveInfo = new();
|
|
|
|
|
private readonly TimeSpan _deduplicationWindow = TimeSpan.FromMinutes(1);
|
|
|
|
|
|
|
|
|
|
public ConfigSubscribe(MqttClientManager mqttClientManager, ISqlSugarClient sqlSugarClient,
|
|
|
|
|
@ -83,19 +83,7 @@ public class ConfigSubscribe : IJob
|
|
|
|
|
private async Task HandleTopic(MqttApplicationMessageReceivedEventArgs args, string topic,
|
|
|
|
|
string message)
|
|
|
|
|
{
|
|
|
|
|
/*
|
|
|
|
|
thing/product/{device_sn}/osd 设备端定频向云平台推送的设备属性(properties),
|
|
|
|
|
具体内容范围参见物模型内容
|
|
|
|
|
thing/product/{device_sn}/state 设备端按需上报向云平台推送的设备属性(properties),
|
|
|
|
|
具体内容范围参见物模型内容
|
|
|
|
|
thing/product/{gateway_sn}/services_reply 设备对 service 的回复、处理结果 (航线任务下发之类的)
|
|
|
|
|
thing/product/{gateway_sn}/events 设备端向云平台发送的,需要关注和处理的事件。
|
|
|
|
|
比如SD满了,飞机解禁禁飞区等信息(事件范围参见物模型内容)
|
|
|
|
|
thing/product/{gateway_sn}/requests 设备端向云平台发送请求,为了获取一些信息,比如上传的临时凭证
|
|
|
|
|
sys/product/{gateway_sn}/status 设备上下线、更新拓扑
|
|
|
|
|
thing/product/{gateway_sn}/property/set_reply 设备属性设置的响应
|
|
|
|
|
thing/product/{gateway_sn}/drc/up DRC 协议上行*/
|
|
|
|
|
|
|
|
|
|
// 序列号提取
|
|
|
|
|
var sn = topic.Split("/")[2];
|
|
|
|
|
var tempStr = topic.Replace(sn, "*");
|
|
|
|
|
//Console.WriteLine($"成功调用主题 [{topic}] 的消息: {message}");
|
|
|
|
|
@ -106,10 +94,11 @@ public class ConfigSubscribe : IJob
|
|
|
|
|
//_logger.LogInformation($"主题:{topic}\n消息:{message}");
|
|
|
|
|
long code = 0;
|
|
|
|
|
// rtmp://175.27.168.120:6019/live/
|
|
|
|
|
var rtmp = "rtmp://box.wisestcity.com:1935/live/7";
|
|
|
|
|
//var rtmp = "rtmp://175.27.168.120:6019/live/7";
|
|
|
|
|
//var rtmp = "rtmp://box.wisestcity.com:1935/live/7";
|
|
|
|
|
var rtmp = "rtmp://175.27.168.120:6019/live/7";
|
|
|
|
|
switch (tempStr)
|
|
|
|
|
{
|
|
|
|
|
// 目前主要处理了获取航线文件及临时凭证上传
|
|
|
|
|
case "thing/product/*/requests":
|
|
|
|
|
switch (method)
|
|
|
|
|
{
|
|
|
|
|
@ -149,7 +138,6 @@ public class ConfigSubscribe : IJob
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
// thing/product/{gateway_sn}/requests_reply
|
|
|
|
|
var tempTopic = $"thing/product/{sn}/requests_reply";
|
|
|
|
|
await _mqttClientManager.PublishAsync(tempTopic,
|
|
|
|
|
JsonConvert.SerializeObject(storageConfigRequest));
|
|
|
|
|
@ -205,6 +193,7 @@ public class ConfigSubscribe : IJob
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
break;
|
|
|
|
|
// 主要处理了文件回传及航线进度
|
|
|
|
|
case "thing/product/*/events":
|
|
|
|
|
switch (method)
|
|
|
|
|
{
|
|
|
|
|
@ -552,7 +541,7 @@ public class ConfigSubscribe : IJob
|
|
|
|
|
var waylineMissionState = (int)data.output.ext.wayline_mission_state;
|
|
|
|
|
_logger.LogDebug("航线进度未跳过处理");
|
|
|
|
|
code = data.result; // result
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
var taskAssign1 = _manageApp.GetTaskAssignByFlightId(flightId1);
|
|
|
|
|
// 处理航线进度 ,也有可能是失败
|
|
|
|
|
if (code != 0)
|
|
|
|
|
@ -592,7 +581,6 @@ public class ConfigSubscribe : IJob
|
|
|
|
|
}
|
|
|
|
|
else
|
|
|
|
|
{
|
|
|
|
|
|
|
|
|
|
// 航线成功
|
|
|
|
|
if (waylineMissionState.Equals(9)) // 航结结束,更新任务状态
|
|
|
|
|
{
|
|
|
|
|
@ -626,6 +614,7 @@ public class ConfigSubscribe : IJob
|
|
|
|
|
};
|
|
|
|
|
await _sqlSugarClient.Updateable(record).IgnoreNullColumns().ExecuteCommandAsync();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
var step = (int)data.output.progress.current_step;
|
|
|
|
|
_logger.LogDebug($"航线进度:{waylineMissionState} {step} {message}");
|
|
|
|
|
if (step.Equals(25)) // todo 关于会接收到不同消息问题,如何处理
|
|
|
|
|
@ -639,12 +628,15 @@ public class ConfigSubscribe : IJob
|
|
|
|
|
_logger.LogInformation("跳过处理");
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
var task = await _sqlSugarClient.Queryable<LasaTask>()
|
|
|
|
|
.FirstAsync(y => y.Id == taskAssign1.TaskId);
|
|
|
|
|
if (task != null && !string.IsNullOrEmpty(task.AIInspection) &&
|
|
|
|
|
task.AIInspection.Equals("true") && string.IsNullOrEmpty(task.PushUrl))
|
|
|
|
|
{
|
|
|
|
|
_logger.LogDebug("执行AI 智能巡检。。。。。");
|
|
|
|
|
var cameraIndex = "99-0-0";
|
|
|
|
|
// todo 查询获取
|
|
|
|
|
var uavSn = "1581F8HGX254V00A0BUY";
|
|
|
|
|
//var rtmp = "rtmp://box.wisestcity.com:1935/live/7";
|
|
|
|
|
var bid = Guid.NewGuid().ToString();
|
|
|
|
|
var tid = Guid.NewGuid().ToString();
|
|
|
|
|
@ -656,7 +648,7 @@ public class ConfigSubscribe : IJob
|
|
|
|
|
""data"": {{
|
|
|
|
|
""url_type"": 1,
|
|
|
|
|
""url"": ""{rtmp}"",
|
|
|
|
|
""video_id"": ""1581F8HGX254V00A0BUY/99-0-0/normal-0"",
|
|
|
|
|
""video_id"": ""{uavSn}/{cameraIndex}/normal-0"",
|
|
|
|
|
""video_quality"": 3
|
|
|
|
|
}}
|
|
|
|
|
}}";
|
|
|
|
|
@ -664,10 +656,9 @@ public class ConfigSubscribe : IJob
|
|
|
|
|
var topicRequest = $"thing/product/{sn}/services";
|
|
|
|
|
// 开启直播
|
|
|
|
|
await _mqttClientManager.PublishAsync(topicRequest, param);
|
|
|
|
|
liveInfo[$"{tid}{bid}"] = taskAssign1.TaskId;
|
|
|
|
|
_liveInfo[$"{tid}{bid}"] = taskAssign1.TaskId;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*if (result.need_reply.Equals(1))
|
|
|
|
|
@ -703,9 +694,6 @@ public class ConfigSubscribe : IJob
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
break;
|
|
|
|
|
// 任务资源处理
|
|
|
|
|
// 航线进度处理
|
|
|
|
|
// 任务取消 thing/product/*/services_reply
|
|
|
|
|
case "thing/product/*/services_reply":
|
|
|
|
|
switch (method)
|
|
|
|
|
{
|
|
|
|
|
@ -714,13 +702,14 @@ public class ConfigSubscribe : IJob
|
|
|
|
|
{
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 已验证tid bid 是相同的
|
|
|
|
|
// 开启直播成功调用ai model
|
|
|
|
|
_logger.LogDebug($"开启直播成功 {message}");
|
|
|
|
|
// 关于直播是否开启成功
|
|
|
|
|
// 取得taskid 然后从liveInfo中移除
|
|
|
|
|
var tempTaskId = liveInfo[$"{result.tid}{result.bid}"];
|
|
|
|
|
liveInfo.TryRemove($"{result.tid}{result.bid}", out _);
|
|
|
|
|
var tempTaskId = _liveInfo[$"{result.tid}{result.bid}"];
|
|
|
|
|
_liveInfo.TryRemove($"{result.tid}{result.bid}", out _);
|
|
|
|
|
var req = new CallAiModel { TaskId = tempTaskId, RtmpUrl = rtmp };
|
|
|
|
|
await _manageApp.CallAiModel(req);
|
|
|
|
|
break;
|
|
|
|
|
|