LASAPlatform/OpenAuth.App/ServiceApp/Subscribe/ConfigSubscribe.cs

936 lines
50 KiB
C#
Raw Normal View History

using System.Collections.Concurrent;
using System.Dynamic;
2025-08-08 14:45:57 +08:00
using System.Net;
using System.Net.Http.Headers;
using System.Text;
using Infrastructure;
using Infrastructure.Cache;
using Infrastructure.CloudSdk;
using Infrastructure.CloudSdk.minio;
using Infrastructure.CloudSdk.wayline;
using Infrastructure.Extensions;
using Infrastructure.Helpers;
using MetadataExtractor;
using MetadataExtractor.Formats.Exif;
2025-08-21 10:09:46 +08:00
using MetadataExtractor.Formats.Xmp;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using MQTTnet.Client;
using Newtonsoft.Json;
using OpenAuth.App.ServiceApp;
using OpenAuth.App.ServiceApp.FlyTask.Request;
using OpenAuth.App.ServiceApp.Subscribe;
using OpenAuth.Repository.Domain;
using OpenAuth.WebApi;
using Quartz;
using SqlSugar;
namespace OpenAuth.App.BaseApp.Subscribe;
public class ConfigSubscribe : IHostedService
{
private readonly MqttClientManager _mqttClientManager;
private readonly ISqlSugarClient _sqlSugarClient;
private readonly RedisCacheContext _redisCacheContext;
private readonly ManageApp _manageApp;
private readonly MinioService _minioService;
private object _locker = new();
private object _dockUploadFileLocker = new();
private readonly ILogger<ConfigSubscribe> _logger;
private readonly ConcurrentDictionary<string, DateTime> _processedMessages = new();
2025-09-11 14:49:56 +08:00
private readonly ConcurrentDictionary<string, string> _liveInfo = new();
private readonly TimeSpan _deduplicationWindow = TimeSpan.FromMinutes(1);
public ConfigSubscribe(MqttClientManager mqttClientManager, ISqlSugarClient sqlSugarClient,
ICacheContext redisCacheContext, ManageApp manageApp, MinioService minioService,
ILogger<ConfigSubscribe> logger)
{
_mqttClientManager = mqttClientManager;
_sqlSugarClient = sqlSugarClient;
_redisCacheContext = redisCacheContext as RedisCacheContext;
_manageApp = manageApp;
_minioService = minioService;
_logger = logger;
}
private async Task Subscribe()
{
2025-06-30 09:11:36 +08:00
// 或者 thing/product/#
// sys/product/#
string[] topicList =
{
"thing/product/+/services_reply",
"thing/product/+/events",
2025-07-08 09:30:35 +08:00
"thing/product/+/requests",
"thing/aircraft/+/service_reply" //大飞机推流消息
//"thing/product/+/osd",
//"thing/product/+/status"
};
2025-08-05 11:04:27 +08:00
_logger.LogInformation("开启监听");
await _mqttClientManager
.SubscribeAsync(topicList,
async (args) =>
{
//args.IsHandled = false; // todo 待实验确定
//args.AutoAcknowledge = true; // todo 待实验确定
await HandleTopic(args, args.ApplicationMessage.Topic,
Encoding.UTF8.GetString(args.ApplicationMessage.Payload));
});
}
2025-07-31 10:15:42 +08:00
public async Task HandleTopic(MqttApplicationMessageReceivedEventArgs args, string topic,
string message)
{
2025-09-11 17:21:33 +08:00
// 序列号提取
var sn = topic.Split("/")[2];
2025-06-20 15:58:52 +08:00
var tempStr = topic.Replace(sn, "*");
//Console.WriteLine($"成功调用主题 [{topic}] 的消息: {message}");
// 主题方法
var result = JsonConvert.DeserializeObject<TopicServicesRequest<dynamic>>(message);
var method = result.method;
var data = result.data;
2025-08-27 16:33:05 +08:00
//_logger.LogInformation($"主题:{topic}\n消息{message}");
long code = 0;
2025-09-09 09:40:41 +08:00
// rtmp://175.27.168.120:6019/live/
2025-09-11 14:49:56 +08:00
//var rtmp = "rtmp://box.wisestcity.com:1935/live/7";
2025-09-26 15:07:02 +08:00
// 机场推流地址地址
var config = ConfigHelper.GetConfigRoot();
var rtmp = config["AIModelApi:DronePortRtmp"];
//var rtmp = "rtmp://175.27.168.120:6019/live/";
switch (tempStr)
{
2025-09-11 17:21:33 +08:00
// 目前主要处理了获取航线文件及临时凭证上传
case "thing/product/*/requests":
switch (method)
{
// 临时凭证上传
case "storage_config_get":
_logger.LogInformation($"进入临时凭证获取处理");
// 配置中读取minio配置
var storageConfigRequest = new TopicServicesRequest<object>()
{
method = "storage_config_get",
tid = result.tid,
bid = result.bid,
timestamp = DateTimeOffset.Now.ToUnixTimeMilliseconds(),
data = new
{
result = 0,
output = new
{
2025-08-08 14:21:16 +08:00
bucket = _minioService._bucketName,
endpoint = $"http://{_minioService.endPoint}",
2025-07-15 08:44:51 +08:00
object_key_prefix = Guid.NewGuid().ToString(), // todo 是否设计任务id
provider = "minio",
region = "linyi",
credentials = new
{
2025-08-08 14:21:16 +08:00
access_key_id = $"{_minioService.AccessKey}",
access_key_secret = $"{_minioService.SecretKey}",
2025-07-15 08:44:51 +08:00
expire = 3600,
security_token = ""
}
}
}
};
var tempTopic = $"thing/product/{sn}/requests_reply";
await _mqttClientManager.PublishAsync(tempTopic,
JsonConvert.SerializeObject(storageConfigRequest));
break;
case "flight_areas_get":
//Console.WriteLine("跳过自定义飞行区文件获取");
break;
// 获取航线
case "flighttask_resource_get":
string flightId = data.flight_id + "";
_logger.LogInformation($"进入资源获取处理: 任务id {flightId}");
// eb87b257-5af1-4bf1-9aba-4267be9fdb12 flight
// http://175.27.168.120:6013/test/2025062209390863860047.kmz
// md5 585c833012ddb794eaac1050ef71aa31
2025-07-17 09:17:45 +08:00
// 这一小段运行异常
var taskAssign = await _sqlSugarClient
.Queryable<LasaTaskAssign>()
.Where(x => x.FlightId == flightId)
.SingleAsync();
_logger.LogInformation($"任务信息:{JsonConvert.SerializeObject(taskAssign)}");
/*var taskAssign =
manageApp.GetTaskAssignByBidAndTid(result.bid, result.tid, flightId);*/
var flightTaskResourceGetTopic = $"thing/product/{sn}/requests_reply";
dynamic outData = new ExpandoObject();
outData.result = 0;
outData.output = new
{
file = new
{
fingerprint = taskAssign.Md5,
url = taskAssign.Wpml
}
};
var outRequest = new TopicServicesRequest<object>()
{
bid = result.bid,
data = outData,
tid = result.tid,
timestamp = DateTimeOffset.Now.ToUnixTimeMilliseconds(),
method = "flighttask_resource_get"
};
2025-06-30 09:11:36 +08:00
/*Console.WriteLine(
$"topic: {flightTaskResourceGetTopic} 发送资源获取处理结果:{JsonConvert.SerializeObject(outRequest)}");*/
await _mqttClientManager.PublishAsync(flightTaskResourceGetTopic,
JsonConvert.SerializeObject(outRequest));
break;
case "config":
break;
default:
2025-11-14 10:12:23 +08:00
//_logger.LogInformation($"未处理消息:{message}");
break;
}
break;
2025-09-11 17:21:33 +08:00
// 主要处理了文件回传及航线进度
case "thing/product/*/events":
2025-06-30 09:11:36 +08:00
switch (method)
{
2025-06-30 09:11:36 +08:00
case "file_upload_callback":
// 文件上传
_logger.LogDebug("进入文件上传处理");
_logger.LogDebug($"文件上传处理:{message}");
//飞行任务 0 手飞任务 1
// todo 如果是手飞任务,生成任务 关于任务类型?
int flightType = data.flight_task.flight_type;
2025-07-15 08:44:51 +08:00
string flightId = data.file.ext.flight_id;
2025-08-21 10:09:46 +08:00
// 关于flightId 没有值的问题怎么办???
2025-07-17 09:17:45 +08:00
var taskAssign = _manageApp.GetTaskAssignByFlightId(flightId);
2025-08-21 10:09:46 +08:00
var taskId = "";
var taskName = "";
var workspaceId = "";
LasaTask executeTask = null;
2025-08-21 10:09:46 +08:00
if (taskAssign != null)
{
taskId = taskAssign.TaskId;
executeTask = await _sqlSugarClient
2025-08-21 10:09:46 +08:00
.Queryable<LasaTask>()
.SingleAsync(a => a.Id == taskId);
if (!string.IsNullOrEmpty(executeTask.TaskName))
{
taskName = executeTask.TaskName;
}
if (!string.IsNullOrEmpty(executeTask.WorkspaceId))
{
workspaceId = executeTask.WorkspaceId;
}
}
else
{
2025-08-25 16:54:30 +08:00
taskName = "手飞任务";
2025-08-21 10:09:46 +08:00
}
2025-07-16 09:18:15 +08:00
string objectKey = data.file.object_key;
var folderKey = ((string)data.file.object_key).Split("/");
2025-07-18 14:23:28 +08:00
var parentKey = folderKey[2];
if (flightType.Equals(1))
{
parentKey = flightId;
}
var isExist = await _sqlSugarClient
.Queryable<LasaMediaFile>()
.Where(x => x.Id.Equals(parentKey)).CountAsync();
2025-09-26 15:07:02 +08:00
if (isExist == 0) //
{
2025-08-05 11:04:27 +08:00
var date = DateTime.Now;
var timeStr = date.ToString("yyyy-MM-dd HH:mm:ss");
var parent1 = new LasaMediaFile()
{
Id = parentKey,
FlightId = flightId,
TaskId = taskId,
ParentKey = "0",
2025-08-21 10:09:46 +08:00
Name = $"{taskName} {timeStr}",
WorkspaceId = workspaceId,
2025-08-05 11:04:27 +08:00
CreateTime = date,
};
await _sqlSugarClient.Insertable(parent1).ExecuteCommandAsync();
}
2025-07-16 09:18:15 +08:00
// 重复检测
var mediaFile = await _sqlSugarClient
.Queryable<LasaMediaFile>()
.Where(a => a.FlightId.Equals(flightId))
.Where(a => a.ObjectKey.Equals(objectKey)).SingleAsync();
if (mediaFile == null)
2025-06-30 09:11:36 +08:00
{
var type = 0;
var preSize = 1;
// 判断是不是图片
if (objectKey.EndsWith(".jpeg")) // todo 是否有其它类型的图片,待确定
{
preSize = 65535;
var fileName = Path.GetFileNameWithoutExtension(objectKey);
var fileNameParts = fileName.Split("_");
var suffix = fileNameParts[^1];
type = suffix switch
{
// 0 未知 1 可见光 2 红外 3 变焦 4.广角 5 视频
"V" => 1,
"T" => 2,
"Z" => 3, // 变焦
"W" => 4, // 广角
_ => type
};
}
else if (objectKey.EndsWith(".mp4"))
{
type = 5;
}
2025-08-08 14:21:16 +08:00
string suoluokey = "";
long? fileSize = 0;
int width = 0, height = 0, focalLength = 0;
int offset = 0, length = 0;
2025-08-08 09:51:12 +08:00
string model = "";
float? gimbalRoll = 0, gimbalPitch = 0;
2025-08-21 13:50:23 +08:00
float? digitalZoomRatio = 1;
var fileUrl = "http://" + _minioService.endPoint + "/" + _minioService._bucketName +
"/" + objectKey;
using (var httpClient = new HttpClient())
{
suoluokey = "minipic/" + data.file.name.ToString();
// 目前读取64KB
// 添加Range请求头
httpClient.DefaultRequestHeaders.Range =
new RangeHeaderValue(0, preSize);
try
{
var response = httpClient
.GetAsync(fileUrl, HttpCompletionOption.ResponseHeadersRead).Result;
if (response.StatusCode == HttpStatusCode.PartialContent)
{
var contentRange = response.Content.Headers.ContentRange;
if (contentRange != null)
{
fileSize = contentRange.Length.Value;
}
if (objectKey.ToLower().EndsWith("jpeg"))
{
2025-08-12 16:25:47 +08:00
// 成功获取部分内容
var y = response.Content.ReadAsByteArrayAsync().Result;
2025-08-21 10:09:46 +08:00
var ms = new MemoryStream(y);
2025-08-12 16:25:47 +08:00
var directories = ImageMetadataReader.ReadMetadata(ms);
2025-08-21 10:09:46 +08:00
var xmpDirectory = directories.OfType<XmpDirectory>().FirstOrDefault();
if (xmpDirectory != null)
{
var xmpXml = xmpDirectory.GetXmpProperties();
foreach (var keyValuePair in xmpXml)
{
switch (keyValuePair.Key)
{
2025-08-21 13:50:23 +08:00
case "drone-dji:GimbalPitchDegree":
2025-08-21 10:09:46 +08:00
gimbalPitch = float.Parse(keyValuePair.Value);
break;
2025-08-21 13:50:23 +08:00
case "drone-dji:GimbalRollDegree":
2025-08-21 10:09:46 +08:00
gimbalRoll = float.Parse(keyValuePair.Value);
break;
}
}
}
2025-08-12 16:25:47 +08:00
foreach (var directory in directories)
{
2025-08-12 16:25:47 +08:00
if (directory is ExifDirectoryBase)
2025-08-08 09:51:12 +08:00
{
2025-08-12 16:25:47 +08:00
if (directory.Name.Equals("Exif IFD0"))
2025-08-08 09:51:12 +08:00
{
2025-08-12 16:25:47 +08:00
foreach (var tag in directory.Tags)
2025-08-08 09:51:12 +08:00
{
2025-08-12 16:25:47 +08:00
if (tag.Name.Equals("Model"))
{
model = tag.Description;
}
2025-08-08 09:51:12 +08:00
}
}
2025-08-12 16:25:47 +08:00
if (directory.Name.Equals("Exif SubIFD"))
{
2025-08-21 13:50:23 +08:00
// Digital Zoom Ratio: 1 Exif SubIFD
2025-08-12 16:25:47 +08:00
foreach (var tag in directory.Tags)
{
2025-08-21 13:50:23 +08:00
if (tag.Name.Equals("Digital Zoom Ratio"))
{
digitalZoomRatio = float.Parse(tag.Description);
}
2025-08-12 16:25:47 +08:00
if (tag.Name.Equals("Exif Image Width"))
{
width = int.Parse(tag.Description.Replace("pixels", "")
.Trim());
}
2025-08-12 16:25:47 +08:00
if (tag.Name.Equals("Exif Image Height"))
{
height = int.Parse(tag.Description.Replace("pixels", "")
.Trim());
}
2025-08-12 16:25:47 +08:00
if (tag.Name.Equals("Focal Length 35"))
{
focalLength = int.Parse(tag.Description
.Replace("mm", "")
.Trim());
}
}
}
2025-08-12 16:25:47 +08:00
//Console.WriteLine(directory.Name);
if (directory.Name.Equals("Exif Thumbnail"))
{
2025-08-12 16:25:47 +08:00
foreach (var tag in directory.Tags)
{
2025-08-12 16:25:47 +08:00
if (tag.Name.Equals("Thumbnail Offset"))
{
offset = int.Parse(tag.Description.Replace("bytes", "")
.Trim());
}
2025-08-12 16:25:47 +08:00
if (tag.Name.Equals("Thumbnail Length"))
{
length = int.Parse(tag.Description.Replace("bytes", "")
.Trim());
}
}
}
}
}
2025-08-12 16:25:47 +08:00
ms.Seek(offset + 6, SeekOrigin.Begin);
byte[] buffer = new byte[length];
int bytesRead = ms.Read(buffer, 0, length);
// 上传缩略图到MinIO
await _minioService.PutObjectAsync("", data.file.name.ToString(), suoluokey,
new MemoryStream(buffer));
}
}
else if (response.StatusCode == HttpStatusCode.OK)
{
// 服务器不支持Range请求返回完整内容
throw new InvalidOperationException("服务器不支持Range请求");
}
else
{
throw new HttpRequestException($"请求失败: {response.StatusCode}");
}
}
catch (Exception ex)
{
throw new Exception($"执行错误: {ex.Message}", ex);
}
}
2025-08-12 16:25:47 +08:00
var createdTimeStr = (string)data.file.metadata.created_time;
var createTime = string.IsNullOrEmpty(createdTimeStr)
? DateTime.Now
: createdTimeStr.ToDateTime();
_logger.LogDebug("执行到保存媒体文件之前");
2025-07-16 09:18:15 +08:00
var fileUpload = new LasaMediaFile()
{
Id = Guid.NewGuid().ToString(),
FlightId = flightId, // 计划id
TaskId = taskId, // 任务id
2025-07-16 09:18:15 +08:00
DroneModelKey = data.file.ext.drone_model_key, // 无人机型号
2025-08-05 11:04:27 +08:00
PayloadModelKey = data.file.ext.payload_model_key, //这应该可以标明是什么设置
2025-07-16 09:18:15 +08:00
IsOriginal = data.file.ext.is_original,
MediaIndex = data.file.ext.media_index,
AbsoluteAltitude = data.file.metadata.absolute_altitude, // 拍摄绝对高度
GimbalYawDegree = data.file.metadata.gimbal_yaw_degree, //云台偏航角度
RelativeAltitude = data.file.metadata.relative_altitude, // 拍摄相对高度
Lat = data.file.metadata.shoot_position.lat,
Lng = data.file.metadata.shoot_position.lng,
Name = data.file.name,
ObjectKey = data.file.object_key,
Path = data.file.path, // 目前这个好像没有值
2025-08-12 16:25:47 +08:00
CreateTime = createTime,
2025-08-21 10:09:46 +08:00
WorkspaceId = workspaceId,
ParentKey = parentKey,
2025-07-18 13:48:38 +08:00
Tid = result.tid,
Bid = result.bid,
2025-07-24 14:54:59 +08:00
FlightType = flightType,
Width = width,
Height = height,
minipic = suoluokey,
Size = fileSize,
ShowOnMap = 1,
display = 1,
2025-08-08 09:51:12 +08:00
FocalLength = focalLength,
PayloadModelName = model,
2025-08-21 10:09:46 +08:00
Type = type,
GimbalPitchDegree = gimbalPitch,
2025-08-21 13:50:23 +08:00
GimbalRollDegree = gimbalRoll,
DigitalZoomRatio = digitalZoomRatio
2025-07-16 09:18:15 +08:00
};
if (executeTask != null)
{
2025-09-03 09:37:23 +08:00
_logger.LogDebug($"任务信息:{JsonConvert.SerializeObject(executeTask)}");
if (!string.IsNullOrEmpty(executeTask.CreateUserName))
{
fileUpload.CreateUserName = executeTask.CreateUserName;
}
if (executeTask.CreateId != null)
{
fileUpload.CreateUserId = executeTask.CreateId;
}
}
2025-09-01 16:44:24 +08:00
2025-08-21 10:09:46 +08:00
// 添加事务
2025-07-16 09:18:15 +08:00
await _sqlSugarClient.Insertable(fileUpload).ExecuteCommandAsync();
}
2025-07-15 08:44:51 +08:00
if (result.need_reply.Equals(1))
{
var fileUploadCallbackEventReply = new FileUploadCallbackEventReply<object>()
{
2025-07-15 08:44:51 +08:00
bid = result.bid,
tid = result.tid,
method = "file_upload_callback",
gateway = sn,
data = new
{
2025-07-15 08:44:51 +08:00
result = 0
},
timestamp = DateTimeOffset.Now.ToUnixTimeMilliseconds(),
};
2025-07-15 08:44:51 +08:00
_ = _mqttClientManager.PublishAsync($"thing/product/{sn}/events_reply",
JsonConvert.SerializeObject(fileUploadCallbackEventReply));
}
2025-07-15 08:44:51 +08:00
var expectFileCount = data.flight_task.expected_file_count;
var uploadedFileCount = data.flight_task.uploaded_file_count;
var taskRecord = new LasaTask()
{
Id = taskId,
ExpectedFileCount = expectFileCount, // 期望文件数量
UploadedFileCount = uploadedFileCount // 已上传文件数量
};
// 当expectFileCount 等于uploadedFileCount时则表示航线执行完成
2025-07-24 16:21:53 +08:00
/*
2025-07-15 08:44:51 +08:00
if (uploadedFileCount.Equals(expectFileCount))
{
taskRecord.Status = 5; // 成功状态
2025-07-08 09:30:35 +08:00
}
2025-07-24 16:21:53 +08:00
*/
2025-07-15 08:44:51 +08:00
await _sqlSugarClient.Updateable(taskRecord)
.IgnoreNullColumns().ExecuteCommandAsync();
2025-06-30 09:11:36 +08:00
break;
case "release_terminal_control_area":
//暂不处理
break;
case "flighttask_progress":
{
string flightId1 = (string)data.output.ext.flight_id;
var waylineMissionState = (int)data.output.ext.wayline_mission_state;
_logger.LogDebug("航线进度未跳过处理");
2025-07-24 16:21:53 +08:00
code = data.result; // result
2025-09-11 17:21:33 +08:00
var taskAssign1 = _manageApp.GetTaskAssignByFlightId(flightId1);
2025-06-30 09:11:36 +08:00
// 处理航线进度 ,也有可能是失败
if (code != 0)
{
Console.WriteLine($"航线进度错误信息:{DjiErrorMapHelper.ErrorMap[code]} {message}");
2025-06-30 09:11:36 +08:00
// 取消任务
var cancelTaskTopic = $"thing/product/{sn}/services";
var cancelTaskRequest = new TopicServicesRequest<object>()
{
2025-06-30 09:11:36 +08:00
method = "flighttask_undo",
tid = result.tid,
bid = result.bid,
timestamp = DateTimeOffset.Now.ToUnixTimeMilliseconds(),
data = new
{
flight_ids = new[] { data.output.ext.flight_id }
}
};
await _mqttClientManager.PublishAsync(cancelTaskTopic,
JsonConvert.SerializeObject(cancelTaskRequest));
2025-07-25 15:21:37 +08:00
//更新任务状态及失败原因?
2025-08-21 10:09:46 +08:00
// "773":"低电量返航导致航线中断"
int reasonCode = data.output.ext.break_point.break_reason;
2025-07-25 15:21:37 +08:00
// 添加断点信息
var taskRecord1 = new LasaTask()
{
Id = taskAssign1.TaskId,
2025-07-25 15:21:37 +08:00
Status = 2,
Reason = DjiErrorMapHelper.ErrorMap[code],
2025-07-25 15:21:37 +08:00
BreakPoint = JsonConvert.SerializeObject(data.output.ext.break_point)
};
2025-08-21 10:09:46 +08:00
// 创建一个条件任务怎么样?
2025-07-25 15:21:37 +08:00
await _sqlSugarClient.Updateable(taskRecord1)
.IgnoreNullColumns().ExecuteCommandAsync();
2025-08-21 10:09:46 +08:00
// todo 关于断点原因为773(电量低),处置
2025-09-26 15:07:02 +08:00
var taskHistory = await _sqlSugarClient
.Queryable<LasaTaskHistory>()
.Where(x => x.FlightId == flightId1)
2025-09-26 15:07:02 +08:00
.FirstAsync();
var taskHistoryUpdate = new LasaTaskHistory()
{
Id = taskHistory.Id,
Status = 2, //
Reason =
$"{DateTime.Now:yyyy-MM-dd HH:mm:ss} {DjiErrorMapHelper.ErrorMap[code]}(错误码: {code}"
2025-09-26 15:07:02 +08:00
};
await _sqlSugarClient.Updateable(taskHistoryUpdate)
.IgnoreNullColumns().ExecuteCommandAsync();
2025-06-30 09:11:36 +08:00
}
else
{
var isHandle = IsDuplicate(Md5.Encrypt($"{result.bid}{flightId1}"));
if (isHandle)
{
break;
}
// ManageApp
var task = _manageApp.GetTaskById(taskAssign1.TaskId);
2025-07-24 16:21:53 +08:00
// 航线成功
if (waylineMissionState.Equals(9)) // 航结结束,更新任务状态
{
// 如果开启了智能巡检
if (!string.IsNullOrEmpty(task.AIInspection) && task.AIInspection.Equals("true") &&
!string.IsNullOrEmpty(task.PushUrl))
{
2025-11-11 14:47:17 +08:00
var lasaDronePort = await _sqlSugarClient.Queryable<LasaDronePort>()
.LeftJoin<LasaUav>((a, b) => a.Id == b.PId)
.Where((a, b) => a.Sn == sn).Select<dynamic>(
(a, b) => new
{
UavSn = b.Sn,
b.TypeId
}
).FirstAsync();
var cameraIndex = "99-0-0";
if (lasaDronePort.TypeId == "M3TD")
{
cameraIndex = "81-0-0";
2025-11-11 14:47:17 +08:00
}
var para = @$"{{
""bid"": {Guid.NewGuid().ToString()},
""data"": {{
2025-11-11 14:47:17 +08:00
""video_id"": ""{lasaDronePort.UavSn}/{cameraIndex}/normal-0""
}},
""tid"":{Guid.NewGuid().ToString()},
""timestamp:"": {DateTimeOffset.Now.ToUnixTimeMilliseconds()},
""method"": ""live_stop_push""
}}";
var topicRequest = $"thing/product/{sn}/services";
await _mqttClientManager.PublishAsync(topicRequest, para);
2025-09-01 16:44:24 +08:00
var url = config["AIModelApi:Url"];
using var httpClient = new HttpClient();
2025-09-01 16:44:24 +08:00
await httpClient.PostAsync($"{url}/stop_detection", null);
}
2025-07-24 16:21:53 +08:00
var record = new LasaTask()
{
Id = taskAssign1.TaskId,
Status = 5,
CompletedTime = DateTime.Now,
FlyNumber = task.FlyNumber + 1
};
_logger.LogDebug($"要更新的任务信息:{JsonConvert.SerializeObject(record)}");
2025-07-24 16:21:53 +08:00
await _sqlSugarClient.Updateable(record).IgnoreNullColumns().ExecuteCommandAsync();
2025-09-26 15:07:02 +08:00
var taskHistory = await _sqlSugarClient
.Queryable<LasaTaskHistory>()
.Where(x => x.FlightId == flightId1)
2025-09-26 15:07:02 +08:00
.FirstAsync();
var taskHistoryUpdate = new LasaTaskHistory()
{
Id = taskHistory.Id,
Status = 5,
CompletedTime = DateTime.Now,
//ActualExecuteDuration =(((DateTimeOffset)DateTime.Now).ToUnixTimeSeconds() - (DateTimeOffset)taskHistory.ExecuteTime);
2025-09-26 15:07:02 +08:00
};
await _sqlSugarClient.Updateable(taskHistoryUpdate)
.IgnoreNullColumns().ExecuteCommandAsync();
}
2025-09-11 17:21:33 +08:00
var step = (int)data.output.progress.current_step;
_logger.LogDebug($"航线进度:{waylineMissionState} {step} {message}");
if (step.Equals(25)) // todo 关于会接收到不同消息问题,如何处理
{
Thread.Sleep(10);
_logger.LogDebug("诡异错误之后");
if (task != null && !string.IsNullOrEmpty(task.AIInspection) &&
task.AIInspection.Equals("true") && string.IsNullOrEmpty(task.PushUrl))
{
var lasaDronePort = await _manageApp.GetDronePortWithUavInfo(sn);
var cameraIndex = "99-0-0";
2025-11-11 14:47:17 +08:00
if (lasaDronePort.TypeId == "M3TD")
{
// 81-0-0
cameraIndex = "81-0-0";
2025-11-11 14:47:17 +08:00
}
//var rtmp = "rtmp://box.wisestcity.com:1935/live/7";
var bid = Guid.NewGuid().ToString();
var tid = Guid.NewGuid().ToString();
var param = @$"{{
""bid"": ""{bid}"",
""method"": ""live_start_push"",
""tid"": ""{tid}"",
""timestamp"": {DateTimeOffset.Now.ToUnixTimeMilliseconds()},
""data"": {{
""url_type"": 1,
""url"": ""{rtmp}{lasaDronePort.UavSn}"",
""video_id"": ""{lasaDronePort.UavSn}/{cameraIndex}/normal-0"",
""video_quality"": 3
}}
}}";
// todo 到这里未执行?? 确定是否是123上执行的 但如何后面没执行,怎么开启的直播呢?
// todo 关于开启直播的,再在这里调用开启直播,是否还会继续执行下去?
_logger.LogDebug($"直播参数:{param}");
_liveInfo[$"{tid}{bid}"] = taskAssign1.TaskId + "," + lasaDronePort.UavSn;
var topicRequest = $"thing/product/{sn}/services";
// 开启直播
await _mqttClientManager.PublishAsync(topicRequest, param);
}
}
2025-07-24 16:21:53 +08:00
}
2025-06-30 09:11:36 +08:00
break;
}
default:
{
if (!method.Equals("hms"))
{
2025-07-02 15:05:59 +08:00
//Console.WriteLine($"未处理事件events{message}");
2025-06-30 09:11:36 +08:00
}
break;
}
}
break;
2025-06-20 15:58:52 +08:00
case "thing/product/*/services_reply":
switch (method)
{
case "live_start_push":
if (IsDuplicate(Md5.Encrypt(message)))
{
break;
}
2025-09-11 17:21:33 +08:00
2025-09-08 16:55:39 +08:00
// 已验证tid bid 是相同的
// 开启直播成功调用ai model
_logger.LogDebug($"开启直播成功 {message}");
2025-09-08 16:55:39 +08:00
// 关于直播是否开启成功
// 取得taskid 然后从liveInfo中移除
var tempTaskId = _liveInfo[$"{result.tid}{result.bid}"].Split(",");
_logger.LogDebug($"智能巡检TaskId:{tempTaskId[0]}");
2025-09-11 14:49:56 +08:00
_liveInfo.TryRemove($"{result.tid}{result.bid}", out _);
var req = new CallAiModel { TaskId = tempTaskId[0], RtmpUrl = rtmp + tempTaskId[1] };
_logger.LogDebug($"智能巡检调用参数:{JsonConvert.SerializeObject(req)}");
_ = _manageApp.CallAiModel(req);
break;
case "live_stop_push":
_logger.LogDebug($"停止直播成功 {message}");
break;
case "flighttask_prepare": // 下发任务响应
2025-07-08 09:30:35 +08:00
// 顺序处理,多余的不再处理
2025-06-30 09:11:36 +08:00
lock (_locker)
{
2025-06-30 09:11:36 +08:00
// 报错处理
Console.WriteLine("进入prepare订阅消息");
code = data.result;
var taskAssign = _manageApp.GetTaskAssignByBidAndTid(result.bid, result.tid);
//Console.WriteLine($"prepare 任务信息:{JsonConvert.SerializeObject(taskAssign)}");
if (code == 0)
{
2025-06-30 09:11:36 +08:00
if (taskAssign == null)
{
Console.WriteLine("已跳过prepare处理");
return; // 不存在不操作
}
2025-06-30 09:11:36 +08:00
var flightId = taskAssign.FlightId;
var request = new TopicServicesRequest<object>();
dynamic data1 = new ExpandoObject();
data1.flight_id = flightId;
// todo 检查设备是否在线
request.SetMethod("flighttask_execute")
.SetTid(result.tid)
.SetBid(result.bid)
.SetTimestamp(DateTimeOffset.Now.ToUnixTimeMilliseconds())
.SetData(data1);
// 任务执行
_ = _mqttClientManager.PublishAsync($"thing/product/{sn}/services",
JsonConvert.SerializeObject(request));
var taskAssignRecord = new LasaTaskAssign()
{
Id = taskAssign.Id,
Status = 1
};
_sqlSugarClient.Updateable(taskAssignRecord).IgnoreNullColumns()
.ExecuteCommand();
2025-09-26 15:07:02 +08:00
/*
var taskRecord = new LasaTask()
2025-06-30 09:11:36 +08:00
{
Id = taskAssign.TaskId,
FlightId = flightId,
};
_sqlSugarClient.Updateable(taskRecord).IgnoreNullColumns()
.ExecuteCommand();
2025-09-26 15:07:02 +08:00
// todo 锁定这个机场 ,不再执行其它任务*/
2025-06-30 09:11:36 +08:00
}
else
{
2025-06-30 09:11:36 +08:00
// 错误处理
var errorMsg = DjiErrorMapHelper.ErrorMap[code];
2025-06-30 09:11:36 +08:00
var taskAssignRecord = new LasaTaskAssign()
{
Id = taskAssign.Id,
2025-09-10 16:15:24 +08:00
Reason = $"{DateTime.Now:yyyy-MM-dd HH:mm:ss} {errorMsg}(错误码: {code}",
2025-06-30 09:11:36 +08:00
Status = 2
};
_sqlSugarClient.Updateable(taskAssignRecord).IgnoreNullColumns().ExecuteCommand();
2025-06-30 11:37:34 +08:00
var taskUpdate = new LasaTask
{
Id = taskAssign.TaskId,
Status = 2, // todo 状态待定
2025-09-10 16:15:24 +08:00
Reason = $"{DateTime.Now:yyyy-MM-dd HH:mm:ss} {errorMsg}(错误码: {code}"
2025-06-30 11:37:34 +08:00
};
2025-06-30 15:26:10 +08:00
_sqlSugarClient.Updateable(taskUpdate).IgnoreNullColumns().ExecuteCommand();
2025-09-26 15:07:02 +08:00
var taskHistory1 = _sqlSugarClient
.Queryable<LasaTaskHistory>()
.Where(x => x.TaskId == taskAssign.FlightId)
.First();
var taskHistoryUpdate1 = new LasaTaskHistory()
{
Id = taskHistory1.Id,
Status = 2,
Reason = $"{DateTime.Now:yyyy-MM-dd HH:mm:ss} {errorMsg}(错误码: {code}"
};
_sqlSugarClient.Updateable(taskHistoryUpdate1).IgnoreNullColumns().ExecuteCommand();
2025-06-30 09:11:36 +08:00
}
}
break;
case "flighttask_execute": // 执行任务响应
code = data.result;
2025-06-30 09:11:36 +08:00
var taskAssignExecute = _manageApp.GetTaskAssignByBidAndTid(result.bid, result.tid, 1);
var taskRecordExecute = new LasaTask()
{
Id = taskAssignExecute.TaskId,
};
2025-09-26 15:07:02 +08:00
var taskHistory = await _sqlSugarClient
.Queryable<LasaTaskHistory>()
.Where(x => x.TaskId == taskAssignExecute.FlightId)
.FirstAsync();
var taskHistoryUpdate = new LasaTaskHistory()
{
Id = taskHistory.Id
};
2025-09-06 10:20:08 +08:00
var flyTask = await _sqlSugarClient.Queryable<LasaTask>()
.Where(x => x.Id == taskAssignExecute.TaskId)
.FirstAsync();
if (code != 0)
{
var errorMsg = DjiErrorMapHelper.ErrorMap[code];
2025-06-30 09:11:36 +08:00
//Console.WriteLine($"任务失败: 错误码 {code} 错误信息 {errorMsg}");
2025-06-30 15:26:10 +08:00
// 任务执行失败
// 和航线进度方法中返回的错误有没有区别
2025-06-30 15:26:10 +08:00
taskRecordExecute.Status = 2;
taskRecordExecute.Reason = $"{DateTime.Now:yyyy-MM-dd HH:mm:ss} {errorMsg}(错误码: {code}";
2025-09-26 15:07:02 +08:00
taskHistoryUpdate.Reason = $"{DateTime.Now:yyyy-MM-dd HH:mm:ss} {errorMsg}(错误码: {code}";
taskHistoryUpdate.Status = 2;
}
else
{
2025-06-30 09:11:36 +08:00
// 任务开始执行
2025-07-25 14:46:12 +08:00
taskRecordExecute.Status = 1; // 任务执行中
//taskRecordExecute.FlyNumber = flyTask.FlyNumber + 1;
2025-09-26 15:07:02 +08:00
taskHistoryUpdate.Status = 1;
Console.WriteLine($"任务执行响应 {code} {message}");
}
2025-09-06 10:20:08 +08:00
// if 查看是否是部级飞行任务
// 先得到部级任务id , 然后查看有哪些任务 ,是否全部完成
2025-09-06 10:20:08 +08:00
var detail = await _sqlSugarClient.Queryable<DroneDocktaskdetail>()
.Where(x => x.flighttaskid == taskAssignExecute.FlightId)
.FirstAsync();
if (detail != null)
{
var droneTask = new DroneDocktask()
{
2025-09-26 15:07:02 +08:00
id = detail.taskid, // 这里的id是部级任务表的id
2025-09-06 10:20:08 +08:00
state = 1
};
await _sqlSugarClient.Updateable(droneTask).IgnoreNullColumns().ExecuteCommandAsync();
}
2025-09-26 15:07:02 +08:00
await _sqlSugarClient.Updateable(taskHistoryUpdate).IgnoreNullColumns().ExecuteCommandAsync();
2025-06-30 15:26:10 +08:00
await _sqlSugarClient.Updateable(taskRecordExecute).IgnoreNullColumns().ExecuteCommandAsync();
break;
}
2025-07-08 09:30:35 +08:00
break;
case "thing/product/*/osd":
//Console.WriteLine($"osd消息: {message}");
break;
default:
//Console.WriteLine($"未进入主题处理");
break;
}
}
2025-07-18 13:48:38 +08:00
public DateTime GetDateTimeFromSeconds(long miliseconds)
{
var seconds = miliseconds / 1000; // 去除末尾三位
return DateTimeOffset.FromUnixTimeSeconds(seconds).DateTime;
}
public bool IsDuplicate(string messageId)
{
var now = DateTime.UtcNow;
// 清理过期消息
foreach (var kvp in _processedMessages)
{
if (now - kvp.Value > _deduplicationWindow)
{
_processedMessages.TryRemove(kvp.Key, out _);
}
}
// 检查是否已存在
if (_processedMessages.ContainsKey(messageId))
{
return true;
}
_processedMessages[messageId] = now;
return false;
}
public Task StartAsync(CancellationToken cancellationToken)
{
return Subscribe();
}
public Task StopAsync(CancellationToken cancellationToken)
{
// todo 取消订阅
throw new NotImplementedException();
}
}