using System.Collections.Concurrent; using System.Dynamic; using System.Net; using System.Net.Http.Headers; using System.Text; using Infrastructure.Cache; using Infrastructure.CloudSdk; using Infrastructure.CloudSdk.minio; using Infrastructure.CloudSdk.wayline; using Infrastructure.Extensions; using Infrastructure.Helpers; using MetadataExtractor; using MetadataExtractor.Formats.Exif; using MetadataExtractor.Formats.Xmp; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using MQTTnet.Client; using Newtonsoft.Json; using OpenAuth.App.ServiceApp; using OpenAuth.App.ServiceApp.FlyTask.Request; using OpenAuth.App.ServiceApp.Subscribe; using OpenAuth.Repository.Domain; using OpenAuth.WebApi; using SqlSugar; namespace OpenAuth.App.BaseApp.Subscribe; public class ConfigSubscribe : IHostedService { private readonly MqttClientManager _mqttClientManager; private readonly ISqlSugarClient _sqlSugarClient; private readonly RedisCacheContext _redisCacheContext; private readonly ManageApp _manageApp; private readonly MinioService _minioService; private object _locker = new(); private object _dockUploadFileLocker = new(); private readonly ILogger _logger; private readonly ConcurrentDictionary _processedMessages = new(); private readonly ConcurrentDictionary _liveInfo = new(); private readonly TimeSpan _deduplicationWindow = TimeSpan.FromMinutes(1); private readonly IConfiguration _configuration; public ConfigSubscribe(MqttClientManager mqttClientManager, ISqlSugarClient sqlSugarClient, ICacheContext redisCacheContext, ManageApp manageApp, MinioService minioService, ILogger logger, IConfiguration configuration) { _mqttClientManager = mqttClientManager; _sqlSugarClient = sqlSugarClient; _redisCacheContext = redisCacheContext as RedisCacheContext; _manageApp = manageApp; _minioService = minioService; _logger = logger; _configuration = configuration; } private async Task Subscribe() { // 或者 thing/product/# // sys/product/# string[] topicList = { "thing/product/+/services_reply", "thing/product/+/events", "thing/product/+/requests", "thing/aircraft/+/service_reply" //大飞机推流消息 //"thing/product/+/osd", //"thing/product/+/status" }; _logger.LogInformation("开启监听"); await _mqttClientManager .SubscribeAsync(topicList, async (args) => { //args.IsHandled = false; // todo 待实验确定 //args.AutoAcknowledge = true; // todo 待实验确定 await HandleTopic(args, args.ApplicationMessage.Topic, Encoding.UTF8.GetString(args.ApplicationMessage.Payload)); }); } public async Task HandleTopic(MqttApplicationMessageReceivedEventArgs args, string topic, string message) { // 序列号提取 var sn = topic.Split("/")[2]; var tempStr = topic.Replace(sn, "*"); //Console.WriteLine($"成功调用主题 [{topic}] 的消息: {message}"); // 主题方法 var result = JsonConvert.DeserializeObject>(message); var method = result.method; var data = result.data; //_logger.LogInformation($"主题:{topic}\n消息:{message}"); long code = 0; // rtmp://175.27.168.120:6019/live/ //var rtmp = "rtmp://box.wisestcity.com:1935/live/7"; // 机场推流地址地址 //var config = ConfigHelper.GetConfigRoot(); var rtmp = _configuration["AIModelApi:DronePortRtmp"]; //var rtmp = "rtmp://175.27.168.120:6019/live/"; switch (tempStr) { // 目前主要处理了获取航线文件及临时凭证上传 case "thing/product/*/requests": switch (method) { // 临时凭证上传 case "storage_config_get": _logger.LogInformation($"进入临时凭证获取处理"); // 配置中读取minio配置 var storageConfigRequest = new TopicServicesRequest() { method = "storage_config_get", tid = result.tid, bid = result.bid, timestamp = DateTimeOffset.Now.ToUnixTimeMilliseconds(), data = new { result = 0, output = new { bucket = _minioService._bucketName, endpoint = $"http://{_minioService.endPoint}", object_key_prefix = Guid.NewGuid().ToString(), // todo 是否设计任务id ? provider = "minio", region = "linyi", credentials = new { access_key_id = $"{_minioService.AccessKey}", access_key_secret = $"{_minioService.SecretKey}", expire = 3600, security_token = "" } } } }; var tempTopic = $"thing/product/{sn}/requests_reply"; await _mqttClientManager.PublishAsync(tempTopic, JsonConvert.SerializeObject(storageConfigRequest)); break; case "flight_areas_get": //Console.WriteLine("跳过自定义飞行区文件获取"); break; // 获取航线 case "flighttask_resource_get": string flightId = data.flight_id + ""; _logger.LogInformation($"进入资源获取处理: 任务id {flightId}"); // eb87b257-5af1-4bf1-9aba-4267be9fdb12 flight // http://175.27.168.120:6013/test/2025062209390863860047.kmz // md5 585c833012ddb794eaac1050ef71aa31 // 这一小段运行异常 var taskAssign = await _sqlSugarClient .Queryable() .Where(x => x.FlightId == flightId) .SingleAsync(); _logger.LogInformation($"任务信息:{JsonConvert.SerializeObject(taskAssign)}"); /*var taskAssign = manageApp.GetTaskAssignByBidAndTid(result.bid, result.tid, flightId);*/ var flightTaskResourceGetTopic = $"thing/product/{sn}/requests_reply"; dynamic outData = new ExpandoObject(); outData.result = 0; outData.output = new { file = new { fingerprint = taskAssign.Md5, url = taskAssign.Wpml } }; var outRequest = new TopicServicesRequest() { bid = result.bid, data = outData, tid = result.tid, timestamp = DateTimeOffset.Now.ToUnixTimeMilliseconds(), method = "flighttask_resource_get" }; /*Console.WriteLine( $"topic: {flightTaskResourceGetTopic} 发送资源获取处理结果:{JsonConvert.SerializeObject(outRequest)}");*/ await _mqttClientManager.PublishAsync(flightTaskResourceGetTopic, JsonConvert.SerializeObject(outRequest)); break; case "config": break; default: //_logger.LogInformation($"未处理消息:{message}"); break; } break; // 主要处理了文件回传及航线进度 case "thing/product/*/events": switch (method) { case "file_upload_callback": // 文件上传 _logger.LogDebug("进入文件上传处理"); _logger.LogDebug($"文件上传处理:{message}"); //飞行任务 0 手飞任务 1 // todo 如果是手飞任务,生成任务 关于任务类型? int flightType = data.flight_task.flight_type; string flightId = data.file.ext.flight_id; // 关于flightId 没有值的问题怎么办??? var taskAssign = _manageApp.GetTaskAssignByFlightId(flightId); var taskId = ""; var taskName = ""; var workspaceId = ""; LasaTask executeTask = null; if (taskAssign != null) { taskId = taskAssign.TaskId; executeTask = await _sqlSugarClient .Queryable() .SingleAsync(a => a.Id == taskId); if (!string.IsNullOrEmpty(executeTask.TaskName)) { taskName = executeTask.TaskName; } if (!string.IsNullOrEmpty(executeTask.WorkspaceId)) { workspaceId = executeTask.WorkspaceId; } } else { taskName = "手飞任务"; } string objectKey = data.file.object_key; var folderKey = ((string)data.file.object_key).Split("/"); var parentKey = folderKey[2]; if (flightType.Equals(1)) { parentKey = flightId; } var isExist = await _sqlSugarClient .Queryable() .Where(x => x.Id.Equals(parentKey)).CountAsync(); if (isExist == 0) // { var date = DateTime.Now; var timeStr = date.ToString("yyyy-MM-dd HH:mm:ss"); var parent1 = new LasaMediaFile() { Id = parentKey, FlightId = flightId, TaskId = taskId, ParentKey = "0", Name = $"{taskName} {timeStr}", WorkspaceId = workspaceId, CreateTime = date, }; await _sqlSugarClient.Insertable(parent1).ExecuteCommandAsync(); } // 重复检测 var mediaFile = await _sqlSugarClient .Queryable() .Where(a => a.FlightId.Equals(flightId)) .Where(a => a.ObjectKey.Equals(objectKey)).SingleAsync(); if (mediaFile == null) { var type = 0; var preSize = 1; // 判断是不是图片 if (objectKey.EndsWith(".jpeg")) // todo 是否有其它类型的图片,待确定 { preSize = 65535; var fileName = Path.GetFileNameWithoutExtension(objectKey); var fileNameParts = fileName.Split("_"); var suffix = fileNameParts[^1]; type = suffix switch { // 0 未知 1 可见光 2 红外 3 变焦 4.广角 5 视频 "V" => 1, "T" => 2, "Z" => 3, // 变焦 "W" => 4, // 广角 _ => type }; } else if (objectKey.EndsWith(".mp4")) { type = 5; } string suoluokey = ""; long? fileSize = 0; int width = 0, height = 0, focalLength = 0; int offset = 0, length = 0; string model = ""; float? gimbalRoll = 0, gimbalPitch = 0; float? digitalZoomRatio = 1; var fileUrl = "http://" + _minioService.endPoint + "/" + _minioService._bucketName + "/" + objectKey; using (var httpClient = new HttpClient()) { suoluokey = "minipic/" + data.file.name.ToString(); // 目前读取64KB // 添加Range请求头 httpClient.DefaultRequestHeaders.Range = new RangeHeaderValue(0, preSize); try { var response = httpClient .GetAsync(fileUrl, HttpCompletionOption.ResponseHeadersRead).Result; if (response.StatusCode == HttpStatusCode.PartialContent) { var contentRange = response.Content.Headers.ContentRange; if (contentRange != null) { fileSize = contentRange.Length.Value; } if (objectKey.ToLower().EndsWith("jpeg")) { // 成功获取部分内容 var y = response.Content.ReadAsByteArrayAsync().Result; var ms = new MemoryStream(y); var directories = ImageMetadataReader.ReadMetadata(ms); var xmpDirectory = directories.OfType().FirstOrDefault(); if (xmpDirectory != null) { var xmpXml = xmpDirectory.GetXmpProperties(); foreach (var keyValuePair in xmpXml) { switch (keyValuePair.Key) { case "drone-dji:GimbalPitchDegree": gimbalPitch = float.Parse(keyValuePair.Value); break; case "drone-dji:GimbalRollDegree": gimbalRoll = float.Parse(keyValuePair.Value); break; } } } foreach (var directory in directories) { if (directory is ExifDirectoryBase) { if (directory.Name.Equals("Exif IFD0")) { foreach (var tag in directory.Tags) { if (tag.Name.Equals("Model")) { model = tag.Description; } } } if (directory.Name.Equals("Exif SubIFD")) { // Digital Zoom Ratio: 1 Exif SubIFD foreach (var tag in directory.Tags) { if (tag.Name.Equals("Digital Zoom Ratio")) { digitalZoomRatio = float.Parse(tag.Description); } if (tag.Name.Equals("Exif Image Width")) { width = int.Parse(tag.Description.Replace("pixels", "") .Trim()); } if (tag.Name.Equals("Exif Image Height")) { height = int.Parse(tag.Description.Replace("pixels", "") .Trim()); } if (tag.Name.Equals("Focal Length 35")) { focalLength = int.Parse(tag.Description .Replace("mm", "") .Trim()); } } } //Console.WriteLine(directory.Name); if (directory.Name.Equals("Exif Thumbnail")) { foreach (var tag in directory.Tags) { if (tag.Name.Equals("Thumbnail Offset")) { offset = int.Parse(tag.Description.Replace("bytes", "") .Trim()); } if (tag.Name.Equals("Thumbnail Length")) { length = int.Parse(tag.Description.Replace("bytes", "") .Trim()); } } } } } ms.Seek(offset + 6, SeekOrigin.Begin); byte[] buffer = new byte[length]; int bytesRead = ms.Read(buffer, 0, length); // 上传缩略图到MinIO await _minioService.PutObjectAsync("", data.file.name.ToString(), suoluokey, new MemoryStream(buffer)); } } else if (response.StatusCode == HttpStatusCode.OK) { // 服务器不支持Range请求,返回完整内容 throw new InvalidOperationException("服务器不支持Range请求"); } else { throw new HttpRequestException($"请求失败: {response.StatusCode}"); } } catch (Exception ex) { throw new Exception($"执行错误: {ex.Message}", ex); } } var createdTimeStr = (string)data.file.metadata.created_time; var createTime = string.IsNullOrEmpty(createdTimeStr) ? DateTime.Now : createdTimeStr.ToDateTime(); _logger.LogDebug("执行到保存媒体文件之前"); var fileUpload = new LasaMediaFile() { Id = Guid.NewGuid().ToString(), FlightId = flightId, // 计划id TaskId = taskId, // 任务id DroneModelKey = data.file.ext.drone_model_key, // 无人机型号 PayloadModelKey = data.file.ext.payload_model_key, //这应该可以标明是什么设置 IsOriginal = data.file.ext.is_original, MediaIndex = data.file.ext.media_index, AbsoluteAltitude = data.file.metadata.absolute_altitude, // 拍摄绝对高度 GimbalYawDegree = data.file.metadata.gimbal_yaw_degree, //云台偏航角度 RelativeAltitude = data.file.metadata.relative_altitude, // 拍摄相对高度 Lat = data.file.metadata.shoot_position.lat, Lng = data.file.metadata.shoot_position.lng, Name = data.file.name, ObjectKey = data.file.object_key, Path = data.file.path, // 目前这个好像没有值 CreateTime = createTime, WorkspaceId = workspaceId, ParentKey = parentKey, Tid = result.tid, Bid = result.bid, FlightType = flightType, Width = width, Height = height, minipic = suoluokey, Size = fileSize, ShowOnMap = 1, display = 1, FocalLength = focalLength, PayloadModelName = model, Type = type, GimbalPitchDegree = gimbalPitch, GimbalRollDegree = gimbalRoll, DigitalZoomRatio = digitalZoomRatio }; if (executeTask != null) { _logger.LogDebug($"任务信息:{JsonConvert.SerializeObject(executeTask)}"); if (!string.IsNullOrEmpty(executeTask.CreateUserName)) { fileUpload.CreateUserName = executeTask.CreateUserName; } if (executeTask.CreateId != null) { fileUpload.CreateUserId = executeTask.CreateId; } } // 添加事务 await _sqlSugarClient.Insertable(fileUpload).ExecuteCommandAsync(); } if (result.need_reply.Equals(1)) { var fileUploadCallbackEventReply = new FileUploadCallbackEventReply() { bid = result.bid, tid = result.tid, method = "file_upload_callback", gateway = sn, data = new { result = 0 }, timestamp = DateTimeOffset.Now.ToUnixTimeMilliseconds(), }; _ = _mqttClientManager.PublishAsync($"thing/product/{sn}/events_reply", JsonConvert.SerializeObject(fileUploadCallbackEventReply)); } var expectFileCount = data.flight_task.expected_file_count; var uploadedFileCount = data.flight_task.uploaded_file_count; var taskRecord = new LasaTask() { Id = taskId, ExpectedFileCount = expectFileCount, // 期望文件数量 UploadedFileCount = uploadedFileCount // 已上传文件数量 }; // 当expectFileCount 等于uploadedFileCount时,则表示航线执行完成 /* if (uploadedFileCount.Equals(expectFileCount)) { taskRecord.Status = 5; // 成功状态 } */ await _sqlSugarClient.Updateable(taskRecord) .IgnoreNullColumns().ExecuteCommandAsync(); break; case "release_terminal_control_area": //暂不处理 break; case "flighttask_progress": { _logger.LogDebug($"任务进度信息:{message}"); string flightId1 = (string)data.output.ext.flight_id; code = data.result; // result var taskAssign1 = _manageApp.GetTaskAssignByFlightId(flightId1); // 处理航线进度 ,也有可能是失败 if (code != 0) { Console.WriteLine($"航线进度错误信息:{DjiErrorMapHelper.ErrorMap[code]} {message}"); // 取消任务 var cancelTaskTopic = $"thing/product/{sn}/services"; var cancelTaskRequest = new TopicServicesRequest() { method = "flighttask_undo", tid = result.tid, bid = result.bid, timestamp = DateTimeOffset.Now.ToUnixTimeMilliseconds(), data = new { flight_ids = new[] { data.output.ext.flight_id } } }; await _mqttClientManager.PublishAsync(cancelTaskTopic, JsonConvert.SerializeObject(cancelTaskRequest)); //更新任务状态及失败原因? // "773":"低电量返航导致航线中断" int reasonCode = data.output.ext.break_point.break_reason; // 添加断点信息 var taskRecord1 = new LasaTask() { Id = taskAssign1.TaskId, Status = 2, Reason = DjiErrorMapHelper.ErrorMap[code], BreakPoint = JsonConvert.SerializeObject(data.output.ext.break_point) }; // 创建一个条件任务怎么样? await _sqlSugarClient.Updateable(taskRecord1) .IgnoreNullColumns().ExecuteCommandAsync(); // todo 关于断点原因为773(电量低),处置 var taskHistory = await _sqlSugarClient .Queryable() .Where(x => x.FlightId == flightId1) .FirstAsync(); var taskHistoryUpdate = new LasaTaskHistory() { Id = taskHistory.Id, Status = 2, // Reason = $"{DateTime.Now:yyyy-MM-dd HH:mm:ss} {DjiErrorMapHelper.ErrorMap[code]}(错误码: {code})" }; await _sqlSugarClient.Updateable(taskHistoryUpdate) .IgnoreNullColumns().ExecuteCommandAsync(); } else { var task = _manageApp.GetTaskById(taskAssign1.TaskId); var step = (int)data.output.progress.current_step; _logger.LogDebug($"当前步骤:{step} 任务信息:{JsonConvert.SerializeObject(task)}"); // 航线成功 _logger.LogDebug($"航线成功,数据如下::{JsonConvert.SerializeObject(data)}"); var waylineMissionState = (int)data.output.ext.wayline_mission_state; if (waylineMissionState.Equals(9)) // 航结结束,更新任务状态 { var isHandle = IsDuplicate(Md5.Encrypt($"{result.bid}{flightId1}")); if (isHandle) { break; } // 如果开启了智能巡检 if (!string.IsNullOrEmpty(task.AIInspection) && task.AIInspection.Equals("true") && !string.IsNullOrEmpty(task.PushUrl)) { var lasaDronePort = _manageApp.GetDronePortWithUavInfo(sn); var cameraIndex = "99-0-0"; if (lasaDronePort.TypeId == "M3TD") { cameraIndex = "81-0-0"; } var para = @$"{{ ""bid"": {Guid.NewGuid().ToString()}, ""data"": {{ ""video_id"": ""{lasaDronePort.UavSn}/{cameraIndex}/normal-0"" }}, ""tid"":{Guid.NewGuid().ToString()}, ""timestamp:"": {DateTimeOffset.Now.ToUnixTimeMilliseconds()}, ""method"": ""live_stop_push"" }}"; var topicRequest = $"thing/product/{sn}/services"; await _mqttClientManager.PublishAsync(topicRequest, para); var url = _configuration["AIModelApi:Url"]; using var httpClient = new HttpClient(); await httpClient.PostAsync($"{url}/stop_detection", null); } var record = new LasaTask() { Id = taskAssign1.TaskId, Status = 5, CompletedTime = DateTime.Now, FlyNumber = task.FlyNumber + 1 }; _logger.LogDebug($"要更新的任务信息:{JsonConvert.SerializeObject(record)}"); await _sqlSugarClient.Updateable(record).IgnoreNullColumns().ExecuteCommandAsync(); var taskHistory = await _sqlSugarClient .Queryable() .Where(x => x.FlightId == flightId1) .FirstAsync(); var taskHistoryUpdate = new LasaTaskHistory() { Id = taskHistory.Id, Status = 5, CompletedTime = DateTime.Now, //ActualExecuteDuration =(((DateTimeOffset)DateTime.Now).ToUnixTimeSeconds() - (DateTimeOffset)taskHistory.ExecuteTime); }; await _sqlSugarClient.Updateable(taskHistoryUpdate) .IgnoreNullColumns().ExecuteCommandAsync(); } if (step.Equals(25)) { var isHandle = IsDuplicate(Md5.Encrypt($"{result.bid}{flightId1}")); if (isHandle) { break; } if (task != null && !string.IsNullOrEmpty(task.AIInspection) && task.AIInspection.Equals("true") && string.IsNullOrEmpty(task.PushUrl)) { _logger.LogDebug("机场无人机信息查询"); var lasaDronePort = _manageApp.GetDronePortWithUavInfo(sn); _logger.LogDebug($"无人机信息:{JsonConvert.SerializeObject(lasaDronePort)}"); var cameraIndex = "99-0-0"; if (lasaDronePort.TypeId == "M3TD") { // 81-0-0 cameraIndex = "81-0-0"; } //var rtmp = "rtmp://box.wisestcity.com:1935/live/7"; var bid = Guid.NewGuid().ToString(); var tid = Guid.NewGuid().ToString(); var param = @$"{{ ""bid"": ""{bid}"", ""method"": ""live_start_push"", ""tid"": ""{tid}"", ""timestamp"": {DateTimeOffset.Now.ToUnixTimeMilliseconds()}, ""data"": {{ ""url_type"": 1, ""url"": ""{rtmp}{lasaDronePort.UavSn}"", ""video_id"": ""{lasaDronePort.UavSn}/{cameraIndex}/normal-0"", ""video_quality"": 3 }} }}"; _logger.LogDebug($"直播参数:{param}"); _liveInfo[$"{tid}{bid}"] = taskAssign1.TaskId + "," + lasaDronePort.UavSn; var topicRequest = $"thing/product/{sn}/services"; // 开启直播 await _mqttClientManager.PublishAsync(topicRequest, param); } } } break; } default: { if (!method.Equals("hms")) { //Console.WriteLine($"未处理事件events:{message}"); } break; } } break; case "thing/product/*/services_reply": switch (method) { case "live_start_push": if (IsDuplicate(Md5.Encrypt(message))) { break; } // 已验证tid bid 是相同的 // 开启直播成功调用ai model _logger.LogDebug($"开启直播成功 {message}"); // 关于直播是否开启成功 // 取得taskid 然后从liveInfo中移除 var tempTaskId = _liveInfo[$"{result.tid}{result.bid}"].Split(","); _logger.LogDebug($"智能巡检TaskId:{tempTaskId[0]}"); _liveInfo.TryRemove($"{result.tid}{result.bid}", out _); var req = new CallAiModel { TaskId = tempTaskId[0], RtmpUrl = rtmp + tempTaskId[1] }; _logger.LogDebug($"智能巡检调用参数:{JsonConvert.SerializeObject(req)}"); _ = _manageApp.CallAiModel(req); break; case "live_stop_push": _logger.LogDebug($"停止直播成功 {message}"); break; case "flighttask_prepare": // 下发任务响应 // 顺序处理,多余的不再处理 lock (_locker) { // 报错处理 Console.WriteLine("进入prepare订阅消息"); code = data.result; var taskAssign = _manageApp.GetTaskAssignByBidAndTid(result.bid, result.tid); //Console.WriteLine($"prepare 任务信息:{JsonConvert.SerializeObject(taskAssign)}"); if (code == 0) { if (taskAssign == null) { Console.WriteLine("已跳过prepare处理"); return; // 不存在不操作 } var flightId = taskAssign.FlightId; var request = new TopicServicesRequest(); dynamic data1 = new ExpandoObject(); data1.flight_id = flightId; // todo 检查设备是否在线 request.SetMethod("flighttask_execute") .SetTid(result.tid) .SetBid(result.bid) .SetTimestamp(DateTimeOffset.Now.ToUnixTimeMilliseconds()) .SetData(data1); // 任务执行 _ = _mqttClientManager.PublishAsync($"thing/product/{sn}/services", JsonConvert.SerializeObject(request)); var taskAssignRecord = new LasaTaskAssign() { Id = taskAssign.Id, Status = 1 }; _sqlSugarClient.Updateable(taskAssignRecord).IgnoreNullColumns() .ExecuteCommand(); /* 冗余逻辑去除 var taskRecord = new LasaTask() { Id = taskAssign.TaskId, FlightId = flightId, }; _sqlSugarClient.Updateable(taskRecord).IgnoreNullColumns() .ExecuteCommand(); // todo 锁定这个机场 ,不再执行其它任务*/ } else { // 错误处理 var errorMsg = DjiErrorMapHelper.ErrorMap[code]; var taskAssignRecord = new LasaTaskAssign() { Id = taskAssign.Id, Reason = $"{DateTime.Now:yyyy-MM-dd HH:mm:ss} {errorMsg}(错误码: {code})", Status = 2 }; _sqlSugarClient.Updateable(taskAssignRecord).IgnoreNullColumns().ExecuteCommand(); var taskUpdate = new LasaTask { Id = taskAssign.TaskId, Status = 2, // todo 状态待定 Reason = $"{DateTime.Now:yyyy-MM-dd HH:mm:ss} {errorMsg}(错误码: {code})" }; _sqlSugarClient.Updateable(taskUpdate).IgnoreNullColumns().ExecuteCommand(); var taskHistory1 = _sqlSugarClient .Queryable() .Where(x => x.TaskId == taskAssign.FlightId) .First(); var taskHistoryUpdate1 = new LasaTaskHistory() { Id = taskHistory1.Id, Status = 2, Reason = $"{DateTime.Now:yyyy-MM-dd HH:mm:ss} {errorMsg}(错误码: {code})" }; _sqlSugarClient.Updateable(taskHistoryUpdate1).IgnoreNullColumns().ExecuteCommand(); } } break; case "flighttask_execute": // 执行任务响应 code = data.result; var taskAssignExecute = _manageApp.GetTaskAssignByBidAndTid(result.bid, result.tid, 1); var taskRecordExecute = new LasaTask() { Id = taskAssignExecute.TaskId, }; var taskHistory = await _sqlSugarClient .Queryable() .Where(x => x.TaskId == taskAssignExecute.FlightId) .FirstAsync(); var taskHistoryUpdate = new LasaTaskHistory() { Id = taskHistory.Id }; var flyTask = await _sqlSugarClient.Queryable() .Where(x => x.Id == taskAssignExecute.TaskId) .FirstAsync(); if (code != 0) { var errorMsg = DjiErrorMapHelper.ErrorMap[code]; //Console.WriteLine($"任务失败: 错误码 {code} 错误信息 {errorMsg}"); // 任务执行失败 // 和航线进度方法中返回的错误有没有区别 ???? taskRecordExecute.Status = 2; taskRecordExecute.Reason = $"{DateTime.Now:yyyy-MM-dd HH:mm:ss} {errorMsg}(错误码: {code})"; taskHistoryUpdate.Reason = $"{DateTime.Now:yyyy-MM-dd HH:mm:ss} {errorMsg}(错误码: {code})"; taskHistoryUpdate.Status = 2; } else { // 任务开始执行 taskRecordExecute.Status = 1; // 任务执行中 //taskRecordExecute.FlyNumber = flyTask.FlyNumber + 1; taskHistoryUpdate.Status = 1; Console.WriteLine($"任务执行响应 {code} {message}"); } // if 查看是否是部级飞行任务 // 先得到部级任务id , 然后查看有哪些任务 ,是否全部完成 var detail = await _sqlSugarClient.Queryable() .Where(x => x.flighttaskid == taskAssignExecute.FlightId) .FirstAsync(); if (detail != null) { var droneTask = new DroneDocktask() { id = detail.taskid, // 这里的id是部级任务表的id state = 1 }; await _sqlSugarClient.Updateable(droneTask).IgnoreNullColumns().ExecuteCommandAsync(); } await _sqlSugarClient.Updateable(taskHistoryUpdate).IgnoreNullColumns().ExecuteCommandAsync(); await _sqlSugarClient.Updateable(taskRecordExecute).IgnoreNullColumns().ExecuteCommandAsync(); break; } break; case "thing/product/*/osd": //Console.WriteLine($"osd消息: {message}"); break; default: //Console.WriteLine($"未进入主题处理"); break; } } public DateTime GetDateTimeFromSeconds(long miliseconds) { var seconds = miliseconds / 1000; // 去除末尾三位 return DateTimeOffset.FromUnixTimeSeconds(seconds).DateTime; } public bool IsDuplicate(string messageId) { var now = DateTime.UtcNow; // 清理过期消息 foreach (var kvp in _processedMessages) { if (now - kvp.Value > _deduplicationWindow) { _processedMessages.TryRemove(kvp.Key, out _); } } // 检查是否已存在 if (_processedMessages.ContainsKey(messageId)) { return true; } _processedMessages[messageId] = now; return false; } public Task StartAsync(CancellationToken cancellationToken) { return Subscribe(); } public Task StopAsync(CancellationToken cancellationToken) { // todo 取消订阅 throw new NotImplementedException(); } }