LASAPlatform/OpenAuth.App/ServiceApp/Subscribe/ConfigSubscribe.cs

933 lines
49 KiB
C#
Raw Permalink Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

using System.Collections.Concurrent;
using System.Dynamic;
using System.Net;
using System.Net.Http.Headers;
using System.Text;
using Infrastructure.Cache;
using Infrastructure.CloudSdk;
using Infrastructure.CloudSdk.minio;
using Infrastructure.CloudSdk.wayline;
using Infrastructure.Extensions;
using Infrastructure.Helpers;
using MetadataExtractor;
using MetadataExtractor.Formats.Exif;
using MetadataExtractor.Formats.Xmp;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using MQTTnet.Client;
using Newtonsoft.Json;
using OpenAuth.App.ServiceApp;
using OpenAuth.App.ServiceApp.FlyTask.Request;
using OpenAuth.App.ServiceApp.Subscribe;
using OpenAuth.Repository.Domain;
using OpenAuth.WebApi;
using SqlSugar;
namespace OpenAuth.App.BaseApp.Subscribe;
public class ConfigSubscribe : IHostedService
{
private readonly MqttClientManager _mqttClientManager;
private readonly ISqlSugarClient _sqlSugarClient;
private readonly RedisCacheContext _redisCacheContext;
private readonly ManageApp _manageApp;
private readonly MinioService _minioService;
private object _locker = new();
private object _dockUploadFileLocker = new();
private readonly ILogger<ConfigSubscribe> _logger;
private readonly ConcurrentDictionary<string, DateTime> _processedMessages = new();
private readonly ConcurrentDictionary<string, string> _liveInfo = new();
private readonly TimeSpan _deduplicationWindow = TimeSpan.FromMinutes(1);
private readonly IConfiguration _configuration;
public ConfigSubscribe(MqttClientManager mqttClientManager, ISqlSugarClient sqlSugarClient,
ICacheContext redisCacheContext, ManageApp manageApp, MinioService minioService,
ILogger<ConfigSubscribe> logger, IConfiguration configuration)
{
_mqttClientManager = mqttClientManager;
_sqlSugarClient = sqlSugarClient;
_redisCacheContext = redisCacheContext as RedisCacheContext;
_manageApp = manageApp;
_minioService = minioService;
_logger = logger;
_configuration = configuration;
}
private async Task Subscribe()
{
// 或者 thing/product/#
// sys/product/#
string[] topicList =
{
"thing/product/+/services_reply",
"thing/product/+/events",
"thing/product/+/requests",
"thing/aircraft/+/service_reply" //大飞机推流消息
//"thing/product/+/osd",
//"thing/product/+/status"
};
_logger.LogInformation("开启监听");
await _mqttClientManager
.SubscribeAsync(topicList,
async (args) =>
{
//args.IsHandled = false; // todo 待实验确定
//args.AutoAcknowledge = true; // todo 待实验确定
await HandleTopic(args, args.ApplicationMessage.Topic,
Encoding.UTF8.GetString(args.ApplicationMessage.Payload));
});
}
public async Task HandleTopic(MqttApplicationMessageReceivedEventArgs args, string topic,
string message)
{
// 序列号提取
var sn = topic.Split("/")[2];
var tempStr = topic.Replace(sn, "*");
//Console.WriteLine($"成功调用主题 [{topic}] 的消息: {message}");
// 主题方法
var result = JsonConvert.DeserializeObject<TopicServicesRequest<dynamic>>(message);
var method = result.method;
var data = result.data;
//_logger.LogInformation($"主题:{topic}\n消息{message}");
long code = 0;
// rtmp://175.27.168.120:6019/live/
//var rtmp = "rtmp://box.wisestcity.com:1935/live/7";
// 机场推流地址地址
//var config = ConfigHelper.GetConfigRoot();
var rtmp = _configuration["AIModelApi:DronePortRtmp"];
//var rtmp = "rtmp://175.27.168.120:6019/live/";
switch (tempStr)
{
// 目前主要处理了获取航线文件及临时凭证上传
case "thing/product/*/requests":
switch (method)
{
// 临时凭证上传
case "storage_config_get":
_logger.LogInformation($"进入临时凭证获取处理");
// 配置中读取minio配置
var storageConfigRequest = new TopicServicesRequest<object>()
{
method = "storage_config_get",
tid = result.tid,
bid = result.bid,
timestamp = DateTimeOffset.Now.ToUnixTimeMilliseconds(),
data = new
{
result = 0,
output = new
{
bucket = _minioService._bucketName,
endpoint = $"http://{_minioService.endPoint}",
object_key_prefix = Guid.NewGuid().ToString(), // todo 是否设计任务id
provider = "minio",
region = "linyi",
credentials = new
{
access_key_id = $"{_minioService.AccessKey}",
access_key_secret = $"{_minioService.SecretKey}",
expire = 3600,
security_token = ""
}
}
}
};
var tempTopic = $"thing/product/{sn}/requests_reply";
await _mqttClientManager.PublishAsync(tempTopic,
JsonConvert.SerializeObject(storageConfigRequest));
break;
case "flight_areas_get":
//Console.WriteLine("跳过自定义飞行区文件获取");
break;
// 获取航线
case "flighttask_resource_get":
string flightId = data.flight_id + "";
_logger.LogInformation($"进入资源获取处理: 任务id {flightId}");
// eb87b257-5af1-4bf1-9aba-4267be9fdb12 flight
// http://175.27.168.120:6013/test/2025062209390863860047.kmz
// md5 585c833012ddb794eaac1050ef71aa31
// 这一小段运行异常
var taskAssign = await _sqlSugarClient
.Queryable<LasaTaskAssign>()
.Where(x => x.FlightId == flightId)
.SingleAsync();
_logger.LogInformation($"任务信息:{JsonConvert.SerializeObject(taskAssign)}");
/*var taskAssign =
manageApp.GetTaskAssignByBidAndTid(result.bid, result.tid, flightId);*/
var flightTaskResourceGetTopic = $"thing/product/{sn}/requests_reply";
dynamic outData = new ExpandoObject();
outData.result = 0;
outData.output = new
{
file = new
{
fingerprint = taskAssign.Md5,
url = taskAssign.Wpml
}
};
var outRequest = new TopicServicesRequest<object>()
{
bid = result.bid,
data = outData,
tid = result.tid,
timestamp = DateTimeOffset.Now.ToUnixTimeMilliseconds(),
method = "flighttask_resource_get"
};
/*Console.WriteLine(
$"topic: {flightTaskResourceGetTopic} 发送资源获取处理结果:{JsonConvert.SerializeObject(outRequest)}");*/
await _mqttClientManager.PublishAsync(flightTaskResourceGetTopic,
JsonConvert.SerializeObject(outRequest));
break;
case "config":
break;
default:
//_logger.LogInformation($"未处理消息:{message}");
break;
}
break;
// 主要处理了文件回传及航线进度
case "thing/product/*/events":
switch (method)
{
case "file_upload_callback":
// 文件上传
_logger.LogDebug("进入文件上传处理");
_logger.LogDebug($"文件上传处理:{message}");
//飞行任务 0 手飞任务 1
// todo 如果是手飞任务,生成任务 关于任务类型?
int flightType = data.flight_task.flight_type;
string flightId = data.file.ext.flight_id;
// 关于flightId 没有值的问题怎么办???
var taskAssign = _manageApp.GetTaskAssignByFlightId(flightId);
var taskId = "";
var taskName = "";
var workspaceId = "";
LasaTask executeTask = null;
if (taskAssign != null)
{
taskId = taskAssign.TaskId;
executeTask = await _sqlSugarClient
.Queryable<LasaTask>()
.SingleAsync(a => a.Id == taskId);
if (!string.IsNullOrEmpty(executeTask.TaskName))
{
taskName = executeTask.TaskName;
}
if (!string.IsNullOrEmpty(executeTask.WorkspaceId))
{
workspaceId = executeTask.WorkspaceId;
}
}
else
{
taskName = "手飞任务";
}
string objectKey = data.file.object_key;
var folderKey = ((string)data.file.object_key).Split("/");
var parentKey = folderKey[2];
if (flightType.Equals(1))
{
parentKey = flightId;
}
var isExist = await _sqlSugarClient
.Queryable<LasaMediaFile>()
.Where(x => x.Id.Equals(parentKey)).CountAsync();
if (isExist == 0) //
{
var date = DateTime.Now;
var timeStr = date.ToString("yyyy-MM-dd HH:mm:ss");
var parent1 = new LasaMediaFile()
{
Id = parentKey,
FlightId = flightId,
TaskId = taskId,
ParentKey = "0",
Name = $"{taskName} {timeStr}",
WorkspaceId = workspaceId,
CreateTime = date,
};
await _sqlSugarClient.Insertable(parent1).ExecuteCommandAsync();
}
// 重复检测
var mediaFile = await _sqlSugarClient
.Queryable<LasaMediaFile>()
.Where(a => a.FlightId.Equals(flightId))
.Where(a => a.ObjectKey.Equals(objectKey)).SingleAsync();
if (mediaFile == null)
{
var type = 0;
var preSize = 1;
// 判断是不是图片
if (objectKey.EndsWith(".jpeg")) // todo 是否有其它类型的图片,待确定
{
preSize = 65535;
var fileName = Path.GetFileNameWithoutExtension(objectKey);
var fileNameParts = fileName.Split("_");
var suffix = fileNameParts[^1];
type = suffix switch
{
// 0 未知 1 可见光 2 红外 3 变焦 4.广角 5 视频
"V" => 1,
"T" => 2,
"Z" => 3, // 变焦
"W" => 4, // 广角
_ => type
};
}
else if (objectKey.EndsWith(".mp4"))
{
type = 5;
}
string suoluokey = "";
long? fileSize = 0;
int width = 0, height = 0, focalLength = 0;
int offset = 0, length = 0;
string model = "";
float? gimbalRoll = 0, gimbalPitch = 0;
float? digitalZoomRatio = 1;
var fileUrl = "http://" + _minioService.endPoint + "/" + _minioService._bucketName +
"/" + objectKey;
using (var httpClient = new HttpClient())
{
suoluokey = "minipic/" + data.file.name.ToString();
// 目前读取64KB
// 添加Range请求头
httpClient.DefaultRequestHeaders.Range =
new RangeHeaderValue(0, preSize);
try
{
var response = httpClient
.GetAsync(fileUrl, HttpCompletionOption.ResponseHeadersRead).Result;
if (response.StatusCode == HttpStatusCode.PartialContent)
{
var contentRange = response.Content.Headers.ContentRange;
if (contentRange != null)
{
fileSize = contentRange.Length.Value;
}
if (objectKey.ToLower().EndsWith("jpeg"))
{
// 成功获取部分内容
var y = response.Content.ReadAsByteArrayAsync().Result;
var ms = new MemoryStream(y);
var directories = ImageMetadataReader.ReadMetadata(ms);
var xmpDirectory = directories.OfType<XmpDirectory>().FirstOrDefault();
if (xmpDirectory != null)
{
var xmpXml = xmpDirectory.GetXmpProperties();
foreach (var keyValuePair in xmpXml)
{
switch (keyValuePair.Key)
{
case "drone-dji:GimbalPitchDegree":
gimbalPitch = float.Parse(keyValuePair.Value);
break;
case "drone-dji:GimbalRollDegree":
gimbalRoll = float.Parse(keyValuePair.Value);
break;
}
}
}
foreach (var directory in directories)
{
if (directory is ExifDirectoryBase)
{
if (directory.Name.Equals("Exif IFD0"))
{
foreach (var tag in directory.Tags)
{
if (tag.Name.Equals("Model"))
{
model = tag.Description;
}
}
}
if (directory.Name.Equals("Exif SubIFD"))
{
// Digital Zoom Ratio: 1 Exif SubIFD
foreach (var tag in directory.Tags)
{
if (tag.Name.Equals("Digital Zoom Ratio"))
{
digitalZoomRatio = float.Parse(tag.Description);
}
if (tag.Name.Equals("Exif Image Width"))
{
width = int.Parse(tag.Description.Replace("pixels", "")
.Trim());
}
if (tag.Name.Equals("Exif Image Height"))
{
height = int.Parse(tag.Description.Replace("pixels", "")
.Trim());
}
if (tag.Name.Equals("Focal Length 35"))
{
focalLength = int.Parse(tag.Description
.Replace("mm", "")
.Trim());
}
}
}
//Console.WriteLine(directory.Name);
if (directory.Name.Equals("Exif Thumbnail"))
{
foreach (var tag in directory.Tags)
{
if (tag.Name.Equals("Thumbnail Offset"))
{
offset = int.Parse(tag.Description.Replace("bytes", "")
.Trim());
}
if (tag.Name.Equals("Thumbnail Length"))
{
length = int.Parse(tag.Description.Replace("bytes", "")
.Trim());
}
}
}
}
}
ms.Seek(offset + 6, SeekOrigin.Begin);
byte[] buffer = new byte[length];
int bytesRead = ms.Read(buffer, 0, length);
// 上传缩略图到MinIO
await _minioService.PutObjectAsync("", data.file.name.ToString(), suoluokey,
new MemoryStream(buffer));
}
}
else if (response.StatusCode == HttpStatusCode.OK)
{
// 服务器不支持Range请求返回完整内容
throw new InvalidOperationException("服务器不支持Range请求");
}
else
{
throw new HttpRequestException($"请求失败: {response.StatusCode}");
}
}
catch (Exception ex)
{
throw new Exception($"执行错误: {ex.Message}", ex);
}
}
var createdTimeStr = (string)data.file.metadata.created_time;
var createTime = string.IsNullOrEmpty(createdTimeStr)
? DateTime.Now
: createdTimeStr.ToDateTime();
_logger.LogDebug("执行到保存媒体文件之前");
var fileUpload = new LasaMediaFile()
{
Id = Guid.NewGuid().ToString(),
FlightId = flightId, // 计划id
TaskId = taskId, // 任务id
DroneModelKey = data.file.ext.drone_model_key, // 无人机型号
PayloadModelKey = data.file.ext.payload_model_key, //这应该可以标明是什么设置
IsOriginal = data.file.ext.is_original,
MediaIndex = data.file.ext.media_index,
AbsoluteAltitude = data.file.metadata.absolute_altitude, // 拍摄绝对高度
GimbalYawDegree = data.file.metadata.gimbal_yaw_degree, //云台偏航角度
RelativeAltitude = data.file.metadata.relative_altitude, // 拍摄相对高度
Lat = data.file.metadata.shoot_position.lat,
Lng = data.file.metadata.shoot_position.lng,
Name = data.file.name,
ObjectKey = data.file.object_key,
Path = data.file.path, // 目前这个好像没有值
CreateTime = createTime,
WorkspaceId = workspaceId,
ParentKey = parentKey,
Tid = result.tid,
Bid = result.bid,
FlightType = flightType,
Width = width,
Height = height,
minipic = suoluokey,
Size = fileSize,
ShowOnMap = 1,
display = 1,
FocalLength = focalLength,
PayloadModelName = model,
Type = type,
GimbalPitchDegree = gimbalPitch,
GimbalRollDegree = gimbalRoll,
DigitalZoomRatio = digitalZoomRatio
};
if (executeTask != null)
{
_logger.LogDebug($"任务信息:{JsonConvert.SerializeObject(executeTask)}");
if (!string.IsNullOrEmpty(executeTask.CreateUserName))
{
fileUpload.CreateUserName = executeTask.CreateUserName;
}
if (executeTask.CreateId != null)
{
fileUpload.CreateUserId = executeTask.CreateId;
}
}
// 添加事务
await _sqlSugarClient.Insertable(fileUpload).ExecuteCommandAsync();
}
if (result.need_reply.Equals(1))
{
var fileUploadCallbackEventReply = new FileUploadCallbackEventReply<object>()
{
bid = result.bid,
tid = result.tid,
method = "file_upload_callback",
gateway = sn,
data = new
{
result = 0
},
timestamp = DateTimeOffset.Now.ToUnixTimeMilliseconds(),
};
_ = _mqttClientManager.PublishAsync($"thing/product/{sn}/events_reply",
JsonConvert.SerializeObject(fileUploadCallbackEventReply));
}
var expectFileCount = data.flight_task.expected_file_count;
var uploadedFileCount = data.flight_task.uploaded_file_count;
var taskRecord = new LasaTask()
{
Id = taskId,
ExpectedFileCount = expectFileCount, // 期望文件数量
UploadedFileCount = uploadedFileCount // 已上传文件数量
};
// 当expectFileCount 等于uploadedFileCount时则表示航线执行完成
/*
if (uploadedFileCount.Equals(expectFileCount))
{
taskRecord.Status = 5; // 成功状态
}
*/
await _sqlSugarClient.Updateable(taskRecord)
.IgnoreNullColumns().ExecuteCommandAsync();
break;
case "release_terminal_control_area":
//暂不处理
break;
case "flighttask_progress":
{
_logger.LogDebug($"任务进度信息:{message}");
string flightId1 = (string)data.output.ext.flight_id;
code = data.result; // result
var taskAssign1 = _manageApp.GetTaskAssignByFlightId(flightId1);
// 处理航线进度 ,也有可能是失败
if (code != 0)
{
Console.WriteLine($"航线进度错误信息:{DjiErrorMapHelper.ErrorMap[code]} {message}");
// 取消任务
var cancelTaskTopic = $"thing/product/{sn}/services";
var cancelTaskRequest = new TopicServicesRequest<object>()
{
method = "flighttask_undo",
tid = result.tid,
bid = result.bid,
timestamp = DateTimeOffset.Now.ToUnixTimeMilliseconds(),
data = new
{
flight_ids = new[] { data.output.ext.flight_id }
}
};
await _mqttClientManager.PublishAsync(cancelTaskTopic,
JsonConvert.SerializeObject(cancelTaskRequest));
//更新任务状态及失败原因?
// "773":"低电量返航导致航线中断"
int reasonCode = data.output.ext.break_point.break_reason;
// 添加断点信息
var taskRecord1 = new LasaTask()
{
Id = taskAssign1.TaskId,
Status = 2,
Reason = DjiErrorMapHelper.ErrorMap[code],
BreakPoint = JsonConvert.SerializeObject(data.output.ext.break_point)
};
// 创建一个条件任务怎么样?
await _sqlSugarClient.Updateable(taskRecord1)
.IgnoreNullColumns().ExecuteCommandAsync();
// todo 关于断点原因为773(电量低),处置
var taskHistory = await _sqlSugarClient
.Queryable<LasaTaskHistory>()
.Where(x => x.FlightId == flightId1)
.FirstAsync();
var taskHistoryUpdate = new LasaTaskHistory()
{
Id = taskHistory.Id,
Status = 2, //
Reason =
$"{DateTime.Now:yyyy-MM-dd HH:mm:ss} {DjiErrorMapHelper.ErrorMap[code]}(错误码: {code}"
};
await _sqlSugarClient.Updateable(taskHistoryUpdate)
.IgnoreNullColumns().ExecuteCommandAsync();
}
else
{
var task = _manageApp.GetTaskById(taskAssign1.TaskId);
var step = (int)data.output.progress.current_step;
_logger.LogDebug($"当前步骤:{step} 任务信息:{JsonConvert.SerializeObject(task)}");
// 航线成功
_logger.LogDebug($"航线成功,数据如下::{JsonConvert.SerializeObject(data)}");
var waylineMissionState = (int)data.output.ext.wayline_mission_state;
if (waylineMissionState.Equals(9)) // 航结结束,更新任务状态
{
var isHandle = IsDuplicate(Md5.Encrypt($"{result.bid}{flightId1}"));
if (isHandle)
{
break;
}
// 如果开启了智能巡检
if (!string.IsNullOrEmpty(task.AIInspection) && task.AIInspection.Equals("true") &&
!string.IsNullOrEmpty(task.PushUrl))
{
var lasaDronePort = _manageApp.GetDronePortWithUavInfo(sn);
var cameraIndex = "99-0-0";
if (lasaDronePort.TypeId == "M3TD")
{
cameraIndex = "81-0-0";
}
var para = @$"{{
""bid"": {Guid.NewGuid().ToString()},
""data"": {{
""video_id"": ""{lasaDronePort.UavSn}/{cameraIndex}/normal-0""
}},
""tid"":{Guid.NewGuid().ToString()},
""timestamp:"": {DateTimeOffset.Now.ToUnixTimeMilliseconds()},
""method"": ""live_stop_push""
}}";
var topicRequest = $"thing/product/{sn}/services";
await _mqttClientManager.PublishAsync(topicRequest, para);
var url = _configuration["AIModelApi:Url"];
using var httpClient = new HttpClient();
await httpClient.PostAsync($"{url}/stop_detection", null);
}
var record = new LasaTask()
{
Id = taskAssign1.TaskId,
Status = 5,
CompletedTime = DateTime.Now,
FlyNumber = task.FlyNumber + 1
};
_logger.LogDebug($"要更新的任务信息:{JsonConvert.SerializeObject(record)}");
await _sqlSugarClient.Updateable(record).IgnoreNullColumns().ExecuteCommandAsync();
var taskHistory = await _sqlSugarClient
.Queryable<LasaTaskHistory>()
.Where(x => x.FlightId == flightId1)
.FirstAsync();
var taskHistoryUpdate = new LasaTaskHistory()
{
Id = taskHistory.Id,
Status = 5,
CompletedTime = DateTime.Now,
//ActualExecuteDuration =(((DateTimeOffset)DateTime.Now).ToUnixTimeSeconds() - (DateTimeOffset)taskHistory.ExecuteTime);
};
await _sqlSugarClient.Updateable(taskHistoryUpdate)
.IgnoreNullColumns().ExecuteCommandAsync();
}
if (step.Equals(25))
{
var isHandle = IsDuplicate(Md5.Encrypt($"{result.bid}{flightId1}"));
if (isHandle)
{
break;
}
if (task != null && !string.IsNullOrEmpty(task.AIInspection) &&
task.AIInspection.Equals("true") && string.IsNullOrEmpty(task.PushUrl))
{
_logger.LogDebug("机场无人机信息查询");
var lasaDronePort = _manageApp.GetDronePortWithUavInfo(sn);
_logger.LogDebug($"无人机信息:{JsonConvert.SerializeObject(lasaDronePort)}");
var cameraIndex = "99-0-0";
if (lasaDronePort.TypeId == "M3TD")
{
// 81-0-0
cameraIndex = "81-0-0";
}
//var rtmp = "rtmp://box.wisestcity.com:1935/live/7";
var bid = Guid.NewGuid().ToString();
var tid = Guid.NewGuid().ToString();
var param = @$"{{
""bid"": ""{bid}"",
""method"": ""live_start_push"",
""tid"": ""{tid}"",
""timestamp"": {DateTimeOffset.Now.ToUnixTimeMilliseconds()},
""data"": {{
""url_type"": 1,
""url"": ""{rtmp}{lasaDronePort.UavSn}"",
""video_id"": ""{lasaDronePort.UavSn}/{cameraIndex}/normal-0"",
""video_quality"": 3
}}
}}";
_logger.LogDebug($"直播参数:{param}");
_liveInfo[$"{tid}{bid}"] = taskAssign1.TaskId + "," + lasaDronePort.UavSn;
var topicRequest = $"thing/product/{sn}/services";
// 开启直播
await _mqttClientManager.PublishAsync(topicRequest, param);
}
}
}
break;
}
default:
{
if (!method.Equals("hms"))
{
//Console.WriteLine($"未处理事件events{message}");
}
break;
}
}
break;
case "thing/product/*/services_reply":
switch (method)
{
case "live_start_push":
if (IsDuplicate(Md5.Encrypt(message)))
{
break;
}
// 已验证tid bid 是相同的
// 开启直播成功调用ai model
_logger.LogDebug($"开启直播成功 {message}");
// 关于直播是否开启成功
// 取得taskid 然后从liveInfo中移除
var tempTaskId = _liveInfo[$"{result.tid}{result.bid}"].Split(",");
_logger.LogDebug($"智能巡检TaskId:{tempTaskId[0]}");
_liveInfo.TryRemove($"{result.tid}{result.bid}", out _);
var req = new CallAiModel { TaskId = tempTaskId[0], RtmpUrl = rtmp + tempTaskId[1] };
_logger.LogDebug($"智能巡检调用参数:{JsonConvert.SerializeObject(req)}");
_ = _manageApp.CallAiModel(req);
break;
case "live_stop_push":
_logger.LogDebug($"停止直播成功 {message}");
break;
case "flighttask_prepare": // 下发任务响应
// 顺序处理,多余的不再处理
lock (_locker)
{
// 报错处理
Console.WriteLine("进入prepare订阅消息");
code = data.result;
var taskAssign = _manageApp.GetTaskAssignByBidAndTid(result.bid, result.tid);
//Console.WriteLine($"prepare 任务信息:{JsonConvert.SerializeObject(taskAssign)}");
if (code == 0)
{
if (taskAssign == null)
{
Console.WriteLine("已跳过prepare处理");
return; // 不存在不操作
}
var flightId = taskAssign.FlightId;
var request = new TopicServicesRequest<object>();
dynamic data1 = new ExpandoObject();
data1.flight_id = flightId;
// todo 检查设备是否在线
request.SetMethod("flighttask_execute")
.SetTid(result.tid)
.SetBid(result.bid)
.SetTimestamp(DateTimeOffset.Now.ToUnixTimeMilliseconds())
.SetData(data1);
// 任务执行
_ = _mqttClientManager.PublishAsync($"thing/product/{sn}/services",
JsonConvert.SerializeObject(request));
var taskAssignRecord = new LasaTaskAssign()
{
Id = taskAssign.Id,
Status = 1
};
_sqlSugarClient.Updateable(taskAssignRecord).IgnoreNullColumns()
.ExecuteCommand();
/* 冗余逻辑去除
var taskRecord = new LasaTask()
{
Id = taskAssign.TaskId,
FlightId = flightId,
};
_sqlSugarClient.Updateable(taskRecord).IgnoreNullColumns()
.ExecuteCommand();
// todo 锁定这个机场 ,不再执行其它任务*/
}
else
{
// 错误处理
var errorMsg = DjiErrorMapHelper.ErrorMap[code];
var taskAssignRecord = new LasaTaskAssign()
{
Id = taskAssign.Id,
Reason = $"{DateTime.Now:yyyy-MM-dd HH:mm:ss} {errorMsg}(错误码: {code}",
Status = 2
};
_sqlSugarClient.Updateable(taskAssignRecord).IgnoreNullColumns().ExecuteCommand();
var taskUpdate = new LasaTask
{
Id = taskAssign.TaskId,
Status = 2, // todo 状态待定
Reason = $"{DateTime.Now:yyyy-MM-dd HH:mm:ss} {errorMsg}(错误码: {code}"
};
_sqlSugarClient.Updateable(taskUpdate).IgnoreNullColumns().ExecuteCommand();
var taskHistory1 = _sqlSugarClient
.Queryable<LasaTaskHistory>()
.Where(x => x.TaskId == taskAssign.FlightId)
.First();
var taskHistoryUpdate1 = new LasaTaskHistory()
{
Id = taskHistory1.Id,
Status = 2,
Reason = $"{DateTime.Now:yyyy-MM-dd HH:mm:ss} {errorMsg}(错误码: {code}"
};
_sqlSugarClient.Updateable(taskHistoryUpdate1).IgnoreNullColumns().ExecuteCommand();
}
}
break;
case "flighttask_execute": // 执行任务响应
code = data.result;
var taskAssignExecute = _manageApp.GetTaskAssignByBidAndTid(result.bid, result.tid, 1);
var taskRecordExecute = new LasaTask()
{
Id = taskAssignExecute.TaskId,
};
var taskHistory = await _sqlSugarClient
.Queryable<LasaTaskHistory>()
.Where(x => x.TaskId == taskAssignExecute.FlightId)
.FirstAsync();
var taskHistoryUpdate = new LasaTaskHistory()
{
Id = taskHistory.Id
};
var flyTask = await _sqlSugarClient.Queryable<LasaTask>()
.Where(x => x.Id == taskAssignExecute.TaskId)
.FirstAsync();
if (code != 0)
{
var errorMsg = DjiErrorMapHelper.ErrorMap[code];
//Console.WriteLine($"任务失败: 错误码 {code} 错误信息 {errorMsg}");
// 任务执行失败
// 和航线进度方法中返回的错误有没有区别
taskRecordExecute.Status = 2;
taskRecordExecute.Reason = $"{DateTime.Now:yyyy-MM-dd HH:mm:ss} {errorMsg}(错误码: {code}";
taskHistoryUpdate.Reason = $"{DateTime.Now:yyyy-MM-dd HH:mm:ss} {errorMsg}(错误码: {code}";
taskHistoryUpdate.Status = 2;
}
else
{
// 任务开始执行
taskRecordExecute.Status = 1; // 任务执行中
//taskRecordExecute.FlyNumber = flyTask.FlyNumber + 1;
taskHistoryUpdate.Status = 1;
Console.WriteLine($"任务执行响应 {code} {message}");
}
// if 查看是否是部级飞行任务
// 先得到部级任务id , 然后查看有哪些任务 ,是否全部完成
var detail = await _sqlSugarClient.Queryable<DroneDocktaskdetail>()
.Where(x => x.flighttaskid == taskAssignExecute.FlightId)
.FirstAsync();
if (detail != null)
{
var droneTask = new DroneDocktask()
{
id = detail.taskid, // 这里的id是部级任务表的id
state = 1
};
await _sqlSugarClient.Updateable(droneTask).IgnoreNullColumns().ExecuteCommandAsync();
}
await _sqlSugarClient.Updateable(taskHistoryUpdate).IgnoreNullColumns().ExecuteCommandAsync();
await _sqlSugarClient.Updateable(taskRecordExecute).IgnoreNullColumns().ExecuteCommandAsync();
break;
}
break;
case "thing/product/*/osd":
//Console.WriteLine($"osd消息: {message}");
break;
default:
//Console.WriteLine($"未进入主题处理");
break;
}
}
public DateTime GetDateTimeFromSeconds(long miliseconds)
{
var seconds = miliseconds / 1000; // 去除末尾三位
return DateTimeOffset.FromUnixTimeSeconds(seconds).DateTime;
}
public bool IsDuplicate(string messageId)
{
var now = DateTime.UtcNow;
// 清理过期消息
foreach (var kvp in _processedMessages)
{
if (now - kvp.Value > _deduplicationWindow)
{
_processedMessages.TryRemove(kvp.Key, out _);
}
}
// 检查是否已存在
if (_processedMessages.ContainsKey(messageId))
{
return true;
}
_processedMessages[messageId] = now;
return false;
}
public Task StartAsync(CancellationToken cancellationToken)
{
return Subscribe();
}
public Task StopAsync(CancellationToken cancellationToken)
{
// todo 取消订阅
throw new NotImplementedException();
}
}