930 lines
49 KiB
C#
930 lines
49 KiB
C#
using System.Collections.Concurrent;
|
||
using System.Dynamic;
|
||
using System.Net;
|
||
using System.Net.Http.Headers;
|
||
using System.Text;
|
||
using Infrastructure.Cache;
|
||
using Infrastructure.CloudSdk;
|
||
using Infrastructure.CloudSdk.minio;
|
||
using Infrastructure.CloudSdk.wayline;
|
||
using Infrastructure.Extensions;
|
||
using Infrastructure.Helpers;
|
||
using MetadataExtractor;
|
||
using MetadataExtractor.Formats.Exif;
|
||
using MetadataExtractor.Formats.Xmp;
|
||
using Microsoft.Extensions.Hosting;
|
||
using Microsoft.Extensions.Logging;
|
||
using MQTTnet.Client;
|
||
using Newtonsoft.Json;
|
||
using OpenAuth.App.ServiceApp;
|
||
using OpenAuth.App.ServiceApp.FlyTask.Request;
|
||
using OpenAuth.App.ServiceApp.Subscribe;
|
||
using OpenAuth.Repository.Domain;
|
||
using OpenAuth.WebApi;
|
||
using SqlSugar;
|
||
|
||
namespace OpenAuth.App.BaseApp.Subscribe;
|
||
|
||
public class ConfigSubscribe : IHostedService
|
||
{
|
||
private readonly MqttClientManager _mqttClientManager;
|
||
private readonly ISqlSugarClient _sqlSugarClient;
|
||
private readonly RedisCacheContext _redisCacheContext;
|
||
private readonly ManageApp _manageApp;
|
||
private readonly MinioService _minioService;
|
||
private object _locker = new();
|
||
private object _dockUploadFileLocker = new();
|
||
private readonly ILogger<ConfigSubscribe> _logger;
|
||
private readonly ConcurrentDictionary<string, DateTime> _processedMessages = new();
|
||
private readonly ConcurrentDictionary<string, string> _liveInfo = new();
|
||
private readonly TimeSpan _deduplicationWindow = TimeSpan.FromMinutes(1);
|
||
|
||
public ConfigSubscribe(MqttClientManager mqttClientManager, ISqlSugarClient sqlSugarClient,
|
||
ICacheContext redisCacheContext, ManageApp manageApp, MinioService minioService,
|
||
ILogger<ConfigSubscribe> logger)
|
||
{
|
||
_mqttClientManager = mqttClientManager;
|
||
_sqlSugarClient = sqlSugarClient;
|
||
_redisCacheContext = redisCacheContext as RedisCacheContext;
|
||
_manageApp = manageApp;
|
||
_minioService = minioService;
|
||
_logger = logger;
|
||
}
|
||
|
||
|
||
private async Task Subscribe()
|
||
{
|
||
// 或者 thing/product/#
|
||
// sys/product/#
|
||
string[] topicList =
|
||
{
|
||
"thing/product/+/services_reply",
|
||
"thing/product/+/events",
|
||
"thing/product/+/requests",
|
||
"thing/aircraft/+/service_reply" //大飞机推流消息
|
||
//"thing/product/+/osd",
|
||
//"thing/product/+/status"
|
||
};
|
||
_logger.LogInformation("开启监听");
|
||
await _mqttClientManager
|
||
.SubscribeAsync(topicList,
|
||
async (args) =>
|
||
{
|
||
//args.IsHandled = false; // todo 待实验确定
|
||
//args.AutoAcknowledge = true; // todo 待实验确定
|
||
await HandleTopic(args, args.ApplicationMessage.Topic,
|
||
Encoding.UTF8.GetString(args.ApplicationMessage.Payload));
|
||
});
|
||
}
|
||
|
||
|
||
public async Task HandleTopic(MqttApplicationMessageReceivedEventArgs args, string topic,
|
||
string message)
|
||
{
|
||
// 序列号提取
|
||
var sn = topic.Split("/")[2];
|
||
var tempStr = topic.Replace(sn, "*");
|
||
//Console.WriteLine($"成功调用主题 [{topic}] 的消息: {message}");
|
||
// 主题方法
|
||
var result = JsonConvert.DeserializeObject<TopicServicesRequest<dynamic>>(message);
|
||
var method = result.method;
|
||
var data = result.data;
|
||
//_logger.LogInformation($"主题:{topic}\n消息:{message}");
|
||
long code = 0;
|
||
// rtmp://175.27.168.120:6019/live/
|
||
//var rtmp = "rtmp://box.wisestcity.com:1935/live/7";
|
||
|
||
// 机场推流地址地址
|
||
var config = ConfigHelper.GetConfigRoot();
|
||
var rtmp = config["AIModelApi:DronePortRtmp"];
|
||
//var rtmp = "rtmp://175.27.168.120:6019/live/";
|
||
switch (tempStr)
|
||
{
|
||
// 目前主要处理了获取航线文件及临时凭证上传
|
||
case "thing/product/*/requests":
|
||
switch (method)
|
||
{
|
||
// 临时凭证上传
|
||
case "storage_config_get":
|
||
_logger.LogInformation($"进入临时凭证获取处理");
|
||
// 配置中读取minio配置
|
||
var storageConfigRequest = new TopicServicesRequest<object>()
|
||
{
|
||
method = "storage_config_get",
|
||
tid = result.tid,
|
||
bid = result.bid,
|
||
timestamp = DateTimeOffset.Now.ToUnixTimeMilliseconds(),
|
||
data = new
|
||
{
|
||
result = 0,
|
||
output = new
|
||
{
|
||
bucket = _minioService._bucketName,
|
||
endpoint = $"http://{_minioService.endPoint}",
|
||
object_key_prefix = Guid.NewGuid().ToString(), // todo 是否设计任务id ?
|
||
provider = "minio",
|
||
region = "linyi",
|
||
credentials = new
|
||
{
|
||
access_key_id = $"{_minioService.AccessKey}",
|
||
access_key_secret = $"{_minioService.SecretKey}",
|
||
expire = 3600,
|
||
security_token = ""
|
||
}
|
||
}
|
||
}
|
||
};
|
||
var tempTopic = $"thing/product/{sn}/requests_reply";
|
||
await _mqttClientManager.PublishAsync(tempTopic,
|
||
JsonConvert.SerializeObject(storageConfigRequest));
|
||
break;
|
||
case "flight_areas_get":
|
||
//Console.WriteLine("跳过自定义飞行区文件获取");
|
||
break;
|
||
// 获取航线
|
||
case "flighttask_resource_get":
|
||
string flightId = data.flight_id + "";
|
||
_logger.LogInformation($"进入资源获取处理: 任务id {flightId}");
|
||
// eb87b257-5af1-4bf1-9aba-4267be9fdb12 flight
|
||
// http://175.27.168.120:6013/test/2025062209390863860047.kmz
|
||
// md5 585c833012ddb794eaac1050ef71aa31
|
||
// 这一小段运行异常
|
||
var taskAssign = await _sqlSugarClient
|
||
.Queryable<LasaTaskAssign>()
|
||
.Where(x => x.FlightId == flightId)
|
||
.SingleAsync();
|
||
_logger.LogInformation($"任务信息:{JsonConvert.SerializeObject(taskAssign)}");
|
||
/*var taskAssign =
|
||
manageApp.GetTaskAssignByBidAndTid(result.bid, result.tid, flightId);*/
|
||
var flightTaskResourceGetTopic = $"thing/product/{sn}/requests_reply";
|
||
dynamic outData = new ExpandoObject();
|
||
outData.result = 0;
|
||
outData.output = new
|
||
{
|
||
file = new
|
||
{
|
||
fingerprint = taskAssign.Md5,
|
||
url = taskAssign.Wpml
|
||
}
|
||
};
|
||
var outRequest = new TopicServicesRequest<object>()
|
||
{
|
||
bid = result.bid,
|
||
data = outData,
|
||
tid = result.tid,
|
||
timestamp = DateTimeOffset.Now.ToUnixTimeMilliseconds(),
|
||
method = "flighttask_resource_get"
|
||
};
|
||
|
||
/*Console.WriteLine(
|
||
$"topic: {flightTaskResourceGetTopic} 发送资源获取处理结果:{JsonConvert.SerializeObject(outRequest)}");*/
|
||
await _mqttClientManager.PublishAsync(flightTaskResourceGetTopic,
|
||
JsonConvert.SerializeObject(outRequest));
|
||
break;
|
||
case "config":
|
||
break;
|
||
default:
|
||
//_logger.LogInformation($"未处理消息:{message}");
|
||
break;
|
||
}
|
||
|
||
break;
|
||
// 主要处理了文件回传及航线进度
|
||
case "thing/product/*/events":
|
||
switch (method)
|
||
{
|
||
case "file_upload_callback":
|
||
// 文件上传
|
||
_logger.LogDebug("进入文件上传处理");
|
||
_logger.LogDebug($"文件上传处理:{message}");
|
||
//飞行任务 0 手飞任务 1
|
||
// todo 如果是手飞任务,生成任务 关于任务类型?
|
||
int flightType = data.flight_task.flight_type;
|
||
string flightId = data.file.ext.flight_id;
|
||
// 关于flightId 没有值的问题怎么办???
|
||
var taskAssign = _manageApp.GetTaskAssignByFlightId(flightId);
|
||
var taskId = "";
|
||
var taskName = "";
|
||
var workspaceId = "";
|
||
LasaTask executeTask = null;
|
||
if (taskAssign != null)
|
||
{
|
||
taskId = taskAssign.TaskId;
|
||
executeTask = await _sqlSugarClient
|
||
.Queryable<LasaTask>()
|
||
.SingleAsync(a => a.Id == taskId);
|
||
if (!string.IsNullOrEmpty(executeTask.TaskName))
|
||
{
|
||
taskName = executeTask.TaskName;
|
||
}
|
||
|
||
if (!string.IsNullOrEmpty(executeTask.WorkspaceId))
|
||
{
|
||
workspaceId = executeTask.WorkspaceId;
|
||
}
|
||
}
|
||
else
|
||
{
|
||
taskName = "手飞任务";
|
||
}
|
||
|
||
string objectKey = data.file.object_key;
|
||
var folderKey = ((string)data.file.object_key).Split("/");
|
||
var parentKey = folderKey[2];
|
||
if (flightType.Equals(1))
|
||
{
|
||
parentKey = flightId;
|
||
}
|
||
|
||
var isExist = await _sqlSugarClient
|
||
.Queryable<LasaMediaFile>()
|
||
.Where(x => x.Id.Equals(parentKey)).CountAsync();
|
||
if (isExist == 0) //
|
||
{
|
||
var date = DateTime.Now;
|
||
var timeStr = date.ToString("yyyy-MM-dd HH:mm:ss");
|
||
var parent1 = new LasaMediaFile()
|
||
{
|
||
Id = parentKey,
|
||
FlightId = flightId,
|
||
TaskId = taskId,
|
||
ParentKey = "0",
|
||
Name = $"{taskName} {timeStr}",
|
||
WorkspaceId = workspaceId,
|
||
CreateTime = date,
|
||
};
|
||
await _sqlSugarClient.Insertable(parent1).ExecuteCommandAsync();
|
||
}
|
||
|
||
// 重复检测
|
||
var mediaFile = await _sqlSugarClient
|
||
.Queryable<LasaMediaFile>()
|
||
.Where(a => a.FlightId.Equals(flightId))
|
||
.Where(a => a.ObjectKey.Equals(objectKey)).SingleAsync();
|
||
if (mediaFile == null)
|
||
{
|
||
var type = 0;
|
||
var preSize = 1;
|
||
// 判断是不是图片
|
||
if (objectKey.EndsWith(".jpeg")) // todo 是否有其它类型的图片,待确定
|
||
{
|
||
preSize = 65535;
|
||
var fileName = Path.GetFileNameWithoutExtension(objectKey);
|
||
var fileNameParts = fileName.Split("_");
|
||
var suffix = fileNameParts[^1];
|
||
type = suffix switch
|
||
{
|
||
// 0 未知 1 可见光 2 红外 3 变焦 4.广角 5 视频
|
||
"V" => 1,
|
||
"T" => 2,
|
||
"Z" => 3, // 变焦
|
||
"W" => 4, // 广角
|
||
_ => type
|
||
};
|
||
}
|
||
else if (objectKey.EndsWith(".mp4"))
|
||
{
|
||
type = 5;
|
||
}
|
||
|
||
string suoluokey = "";
|
||
long? fileSize = 0;
|
||
int width = 0, height = 0, focalLength = 0;
|
||
int offset = 0, length = 0;
|
||
string model = "";
|
||
float? gimbalRoll = 0, gimbalPitch = 0;
|
||
float? digitalZoomRatio = 1;
|
||
var fileUrl = "http://" + _minioService.endPoint + "/" + _minioService._bucketName +
|
||
"/" + objectKey;
|
||
using (var httpClient = new HttpClient())
|
||
{
|
||
suoluokey = "minipic/" + data.file.name.ToString();
|
||
// 目前读取64KB
|
||
// 添加Range请求头
|
||
httpClient.DefaultRequestHeaders.Range =
|
||
new RangeHeaderValue(0, preSize);
|
||
try
|
||
{
|
||
var response = httpClient
|
||
.GetAsync(fileUrl, HttpCompletionOption.ResponseHeadersRead).Result;
|
||
if (response.StatusCode == HttpStatusCode.PartialContent)
|
||
{
|
||
var contentRange = response.Content.Headers.ContentRange;
|
||
if (contentRange != null)
|
||
{
|
||
fileSize = contentRange.Length.Value;
|
||
}
|
||
|
||
if (objectKey.ToLower().EndsWith("jpeg"))
|
||
{
|
||
// 成功获取部分内容
|
||
var y = response.Content.ReadAsByteArrayAsync().Result;
|
||
var ms = new MemoryStream(y);
|
||
var directories = ImageMetadataReader.ReadMetadata(ms);
|
||
var xmpDirectory = directories.OfType<XmpDirectory>().FirstOrDefault();
|
||
if (xmpDirectory != null)
|
||
{
|
||
var xmpXml = xmpDirectory.GetXmpProperties();
|
||
foreach (var keyValuePair in xmpXml)
|
||
{
|
||
switch (keyValuePair.Key)
|
||
{
|
||
case "drone-dji:GimbalPitchDegree":
|
||
gimbalPitch = float.Parse(keyValuePair.Value);
|
||
break;
|
||
case "drone-dji:GimbalRollDegree":
|
||
gimbalRoll = float.Parse(keyValuePair.Value);
|
||
break;
|
||
}
|
||
}
|
||
}
|
||
|
||
foreach (var directory in directories)
|
||
{
|
||
if (directory is ExifDirectoryBase)
|
||
{
|
||
if (directory.Name.Equals("Exif IFD0"))
|
||
{
|
||
foreach (var tag in directory.Tags)
|
||
{
|
||
if (tag.Name.Equals("Model"))
|
||
{
|
||
model = tag.Description;
|
||
}
|
||
}
|
||
}
|
||
|
||
if (directory.Name.Equals("Exif SubIFD"))
|
||
{
|
||
// Digital Zoom Ratio: 1 Exif SubIFD
|
||
foreach (var tag in directory.Tags)
|
||
{
|
||
if (tag.Name.Equals("Digital Zoom Ratio"))
|
||
{
|
||
digitalZoomRatio = float.Parse(tag.Description);
|
||
}
|
||
|
||
if (tag.Name.Equals("Exif Image Width"))
|
||
{
|
||
width = int.Parse(tag.Description.Replace("pixels", "")
|
||
.Trim());
|
||
}
|
||
|
||
if (tag.Name.Equals("Exif Image Height"))
|
||
{
|
||
height = int.Parse(tag.Description.Replace("pixels", "")
|
||
.Trim());
|
||
}
|
||
|
||
if (tag.Name.Equals("Focal Length 35"))
|
||
{
|
||
focalLength = int.Parse(tag.Description
|
||
.Replace("mm", "")
|
||
.Trim());
|
||
}
|
||
}
|
||
}
|
||
|
||
//Console.WriteLine(directory.Name);
|
||
if (directory.Name.Equals("Exif Thumbnail"))
|
||
{
|
||
foreach (var tag in directory.Tags)
|
||
{
|
||
if (tag.Name.Equals("Thumbnail Offset"))
|
||
{
|
||
offset = int.Parse(tag.Description.Replace("bytes", "")
|
||
.Trim());
|
||
}
|
||
|
||
if (tag.Name.Equals("Thumbnail Length"))
|
||
{
|
||
length = int.Parse(tag.Description.Replace("bytes", "")
|
||
.Trim());
|
||
}
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
ms.Seek(offset + 6, SeekOrigin.Begin);
|
||
byte[] buffer = new byte[length];
|
||
int bytesRead = ms.Read(buffer, 0, length);
|
||
// 上传缩略图到MinIO
|
||
await _minioService.PutObjectAsync("", data.file.name.ToString(), suoluokey,
|
||
new MemoryStream(buffer));
|
||
}
|
||
}
|
||
else if (response.StatusCode == HttpStatusCode.OK)
|
||
{
|
||
// 服务器不支持Range请求,返回完整内容
|
||
throw new InvalidOperationException("服务器不支持Range请求");
|
||
}
|
||
else
|
||
{
|
||
throw new HttpRequestException($"请求失败: {response.StatusCode}");
|
||
}
|
||
}
|
||
catch (Exception ex)
|
||
{
|
||
throw new Exception($"执行错误: {ex.Message}", ex);
|
||
}
|
||
}
|
||
|
||
var createdTimeStr = (string)data.file.metadata.created_time;
|
||
var createTime = string.IsNullOrEmpty(createdTimeStr)
|
||
? DateTime.Now
|
||
: createdTimeStr.ToDateTime();
|
||
_logger.LogDebug("执行到保存媒体文件之前");
|
||
var fileUpload = new LasaMediaFile()
|
||
{
|
||
Id = Guid.NewGuid().ToString(),
|
||
FlightId = flightId, // 计划id
|
||
TaskId = taskId, // 任务id
|
||
DroneModelKey = data.file.ext.drone_model_key, // 无人机型号
|
||
PayloadModelKey = data.file.ext.payload_model_key, //这应该可以标明是什么设置
|
||
IsOriginal = data.file.ext.is_original,
|
||
MediaIndex = data.file.ext.media_index,
|
||
AbsoluteAltitude = data.file.metadata.absolute_altitude, // 拍摄绝对高度
|
||
GimbalYawDegree = data.file.metadata.gimbal_yaw_degree, //云台偏航角度
|
||
RelativeAltitude = data.file.metadata.relative_altitude, // 拍摄相对高度
|
||
Lat = data.file.metadata.shoot_position.lat,
|
||
Lng = data.file.metadata.shoot_position.lng,
|
||
Name = data.file.name,
|
||
ObjectKey = data.file.object_key,
|
||
Path = data.file.path, // 目前这个好像没有值
|
||
CreateTime = createTime,
|
||
WorkspaceId = workspaceId,
|
||
ParentKey = parentKey,
|
||
Tid = result.tid,
|
||
Bid = result.bid,
|
||
FlightType = flightType,
|
||
Width = width,
|
||
Height = height,
|
||
minipic = suoluokey,
|
||
Size = fileSize,
|
||
ShowOnMap = 1,
|
||
display = 1,
|
||
FocalLength = focalLength,
|
||
PayloadModelName = model,
|
||
Type = type,
|
||
GimbalPitchDegree = gimbalPitch,
|
||
GimbalRollDegree = gimbalRoll,
|
||
DigitalZoomRatio = digitalZoomRatio
|
||
};
|
||
if (executeTask != null)
|
||
{
|
||
_logger.LogDebug($"任务信息:{JsonConvert.SerializeObject(executeTask)}");
|
||
if (!string.IsNullOrEmpty(executeTask.CreateUserName))
|
||
{
|
||
fileUpload.CreateUserName = executeTask.CreateUserName;
|
||
}
|
||
|
||
if (executeTask.CreateId != null)
|
||
{
|
||
fileUpload.CreateUserId = executeTask.CreateId;
|
||
}
|
||
}
|
||
|
||
// 添加事务
|
||
await _sqlSugarClient.Insertable(fileUpload).ExecuteCommandAsync();
|
||
}
|
||
|
||
if (result.need_reply.Equals(1))
|
||
{
|
||
var fileUploadCallbackEventReply = new FileUploadCallbackEventReply<object>()
|
||
{
|
||
bid = result.bid,
|
||
tid = result.tid,
|
||
method = "file_upload_callback",
|
||
gateway = sn,
|
||
data = new
|
||
{
|
||
result = 0
|
||
},
|
||
timestamp = DateTimeOffset.Now.ToUnixTimeMilliseconds(),
|
||
};
|
||
_ = _mqttClientManager.PublishAsync($"thing/product/{sn}/events_reply",
|
||
JsonConvert.SerializeObject(fileUploadCallbackEventReply));
|
||
}
|
||
|
||
var expectFileCount = data.flight_task.expected_file_count;
|
||
var uploadedFileCount = data.flight_task.uploaded_file_count;
|
||
|
||
var taskRecord = new LasaTask()
|
||
{
|
||
Id = taskId,
|
||
ExpectedFileCount = expectFileCount, // 期望文件数量
|
||
UploadedFileCount = uploadedFileCount // 已上传文件数量
|
||
};
|
||
// 当expectFileCount 等于uploadedFileCount时,则表示航线执行完成
|
||
/*
|
||
if (uploadedFileCount.Equals(expectFileCount))
|
||
{
|
||
taskRecord.Status = 5; // 成功状态
|
||
}
|
||
*/
|
||
|
||
await _sqlSugarClient.Updateable(taskRecord)
|
||
.IgnoreNullColumns().ExecuteCommandAsync();
|
||
break;
|
||
case "release_terminal_control_area":
|
||
//暂不处理
|
||
break;
|
||
case "flighttask_progress":
|
||
{
|
||
_logger.LogDebug($"任务进度信息:{message}");
|
||
string flightId1 = (string)data.output.ext.flight_id;
|
||
code = data.result; // result
|
||
var taskAssign1 = _manageApp.GetTaskAssignByFlightId(flightId1);
|
||
// 处理航线进度 ,也有可能是失败
|
||
if (code != 0)
|
||
{
|
||
Console.WriteLine($"航线进度错误信息:{DjiErrorMapHelper.ErrorMap[code]} {message}");
|
||
// 取消任务
|
||
var cancelTaskTopic = $"thing/product/{sn}/services";
|
||
var cancelTaskRequest = new TopicServicesRequest<object>()
|
||
{
|
||
method = "flighttask_undo",
|
||
tid = result.tid,
|
||
bid = result.bid,
|
||
timestamp = DateTimeOffset.Now.ToUnixTimeMilliseconds(),
|
||
data = new
|
||
{
|
||
flight_ids = new[] { data.output.ext.flight_id }
|
||
}
|
||
};
|
||
await _mqttClientManager.PublishAsync(cancelTaskTopic,
|
||
JsonConvert.SerializeObject(cancelTaskRequest));
|
||
//更新任务状态及失败原因?
|
||
|
||
// "773":"低电量返航导致航线中断"
|
||
int reasonCode = data.output.ext.break_point.break_reason;
|
||
// 添加断点信息
|
||
var taskRecord1 = new LasaTask()
|
||
{
|
||
Id = taskAssign1.TaskId,
|
||
Status = 2,
|
||
Reason = DjiErrorMapHelper.ErrorMap[code],
|
||
BreakPoint = JsonConvert.SerializeObject(data.output.ext.break_point)
|
||
};
|
||
// 创建一个条件任务怎么样?
|
||
await _sqlSugarClient.Updateable(taskRecord1)
|
||
.IgnoreNullColumns().ExecuteCommandAsync();
|
||
// todo 关于断点原因为773(电量低),处置
|
||
var taskHistory = await _sqlSugarClient
|
||
.Queryable<LasaTaskHistory>()
|
||
.Where(x => x.FlightId == flightId1)
|
||
.FirstAsync();
|
||
var taskHistoryUpdate = new LasaTaskHistory()
|
||
{
|
||
Id = taskHistory.Id,
|
||
Status = 2, //
|
||
Reason =
|
||
$"{DateTime.Now:yyyy-MM-dd HH:mm:ss} {DjiErrorMapHelper.ErrorMap[code]}(错误码: {code})"
|
||
};
|
||
await _sqlSugarClient.Updateable(taskHistoryUpdate)
|
||
.IgnoreNullColumns().ExecuteCommandAsync();
|
||
}
|
||
else
|
||
{
|
||
var task = _manageApp.GetTaskById(taskAssign1.TaskId);
|
||
var step = (int)data.output.progress.current_step;
|
||
|
||
_logger.LogDebug($"当前步骤:{step} 任务信息:{JsonConvert.SerializeObject(task)}");
|
||
// 航线成功
|
||
_logger.LogDebug($"航线成功,数据如下::{JsonConvert.SerializeObject(data)}");
|
||
var waylineMissionState = (int)data.output.ext.wayline_mission_state;
|
||
if (waylineMissionState.Equals(9)) // 航结结束,更新任务状态
|
||
{
|
||
var isHandle = IsDuplicate(Md5.Encrypt($"{result.bid}{flightId1}"));
|
||
if (isHandle)
|
||
{
|
||
break;
|
||
}
|
||
|
||
// 如果开启了智能巡检
|
||
if (!string.IsNullOrEmpty(task.AIInspection) && task.AIInspection.Equals("true") &&
|
||
!string.IsNullOrEmpty(task.PushUrl))
|
||
{
|
||
var lasaDronePort = _manageApp.GetDronePortWithUavInfo(sn);
|
||
var cameraIndex = "99-0-0";
|
||
if (lasaDronePort.TypeId == "M3TD")
|
||
{
|
||
cameraIndex = "81-0-0";
|
||
}
|
||
|
||
var para = @$"{{
|
||
""bid"": {Guid.NewGuid().ToString()},
|
||
""data"": {{
|
||
""video_id"": ""{lasaDronePort.UavSn}/{cameraIndex}/normal-0""
|
||
}},
|
||
""tid"":{Guid.NewGuid().ToString()},
|
||
""timestamp:"": {DateTimeOffset.Now.ToUnixTimeMilliseconds()},
|
||
""method"": ""live_stop_push""
|
||
}}";
|
||
var topicRequest = $"thing/product/{sn}/services";
|
||
await _mqttClientManager.PublishAsync(topicRequest, para);
|
||
var url = config["AIModelApi:Url"];
|
||
using var httpClient = new HttpClient();
|
||
await httpClient.PostAsync($"{url}/stop_detection", null);
|
||
}
|
||
|
||
var record = new LasaTask()
|
||
{
|
||
Id = taskAssign1.TaskId,
|
||
Status = 5,
|
||
CompletedTime = DateTime.Now,
|
||
FlyNumber = task.FlyNumber + 1
|
||
};
|
||
_logger.LogDebug($"要更新的任务信息:{JsonConvert.SerializeObject(record)}");
|
||
await _sqlSugarClient.Updateable(record).IgnoreNullColumns().ExecuteCommandAsync();
|
||
var taskHistory = await _sqlSugarClient
|
||
.Queryable<LasaTaskHistory>()
|
||
.Where(x => x.FlightId == flightId1)
|
||
.FirstAsync();
|
||
var taskHistoryUpdate = new LasaTaskHistory()
|
||
{
|
||
Id = taskHistory.Id,
|
||
Status = 5,
|
||
CompletedTime = DateTime.Now,
|
||
//ActualExecuteDuration =(((DateTimeOffset)DateTime.Now).ToUnixTimeSeconds() - (DateTimeOffset)taskHistory.ExecuteTime);
|
||
};
|
||
await _sqlSugarClient.Updateable(taskHistoryUpdate)
|
||
.IgnoreNullColumns().ExecuteCommandAsync();
|
||
}
|
||
|
||
if (step.Equals(25))
|
||
{
|
||
var isHandle = IsDuplicate(Md5.Encrypt($"{result.bid}{flightId1}"));
|
||
if (isHandle)
|
||
{
|
||
break;
|
||
}
|
||
|
||
if (task != null && !string.IsNullOrEmpty(task.AIInspection) &&
|
||
task.AIInspection.Equals("true") && string.IsNullOrEmpty(task.PushUrl))
|
||
{
|
||
_logger.LogDebug("机场无人机信息查询");
|
||
var lasaDronePort = _manageApp.GetDronePortWithUavInfo(sn);
|
||
_logger.LogDebug($"无人机信息:{JsonConvert.SerializeObject(lasaDronePort)}");
|
||
var cameraIndex = "99-0-0";
|
||
if (lasaDronePort.TypeId == "M3TD")
|
||
{
|
||
// 81-0-0
|
||
cameraIndex = "81-0-0";
|
||
}
|
||
|
||
//var rtmp = "rtmp://box.wisestcity.com:1935/live/7";
|
||
var bid = Guid.NewGuid().ToString();
|
||
var tid = Guid.NewGuid().ToString();
|
||
var param = @$"{{
|
||
""bid"": ""{bid}"",
|
||
""method"": ""live_start_push"",
|
||
""tid"": ""{tid}"",
|
||
""timestamp"": {DateTimeOffset.Now.ToUnixTimeMilliseconds()},
|
||
""data"": {{
|
||
""url_type"": 1,
|
||
""url"": ""{rtmp}{lasaDronePort.UavSn}"",
|
||
""video_id"": ""{lasaDronePort.UavSn}/{cameraIndex}/normal-0"",
|
||
""video_quality"": 3
|
||
}}
|
||
}}";
|
||
_logger.LogDebug($"直播参数:{param}");
|
||
_liveInfo[$"{tid}{bid}"] = taskAssign1.TaskId + "," + lasaDronePort.UavSn;
|
||
var topicRequest = $"thing/product/{sn}/services";
|
||
// 开启直播
|
||
await _mqttClientManager.PublishAsync(topicRequest, param);
|
||
}
|
||
}
|
||
}
|
||
|
||
break;
|
||
}
|
||
default:
|
||
{
|
||
if (!method.Equals("hms"))
|
||
{
|
||
//Console.WriteLine($"未处理事件events:{message}");
|
||
}
|
||
|
||
break;
|
||
}
|
||
}
|
||
|
||
break;
|
||
case "thing/product/*/services_reply":
|
||
switch (method)
|
||
{
|
||
case "live_start_push":
|
||
if (IsDuplicate(Md5.Encrypt(message)))
|
||
{
|
||
break;
|
||
}
|
||
|
||
// 已验证tid bid 是相同的
|
||
// 开启直播成功调用ai model
|
||
_logger.LogDebug($"开启直播成功 {message}");
|
||
// 关于直播是否开启成功
|
||
// 取得taskid 然后从liveInfo中移除
|
||
var tempTaskId = _liveInfo[$"{result.tid}{result.bid}"].Split(",");
|
||
_logger.LogDebug($"智能巡检TaskId:{tempTaskId[0]}");
|
||
_liveInfo.TryRemove($"{result.tid}{result.bid}", out _);
|
||
var req = new CallAiModel { TaskId = tempTaskId[0], RtmpUrl = rtmp + tempTaskId[1] };
|
||
_logger.LogDebug($"智能巡检调用参数:{JsonConvert.SerializeObject(req)}");
|
||
_ = _manageApp.CallAiModel(req);
|
||
break;
|
||
case "live_stop_push":
|
||
_logger.LogDebug($"停止直播成功 {message}");
|
||
break;
|
||
case "flighttask_prepare": // 下发任务响应
|
||
// 顺序处理,多余的不再处理
|
||
lock (_locker)
|
||
{
|
||
// 报错处理
|
||
Console.WriteLine("进入prepare订阅消息");
|
||
code = data.result;
|
||
var taskAssign = _manageApp.GetTaskAssignByBidAndTid(result.bid, result.tid);
|
||
//Console.WriteLine($"prepare 任务信息:{JsonConvert.SerializeObject(taskAssign)}");
|
||
if (code == 0)
|
||
{
|
||
if (taskAssign == null)
|
||
{
|
||
Console.WriteLine("已跳过prepare处理");
|
||
return; // 不存在不操作
|
||
}
|
||
|
||
var flightId = taskAssign.FlightId;
|
||
var request = new TopicServicesRequest<object>();
|
||
dynamic data1 = new ExpandoObject();
|
||
data1.flight_id = flightId;
|
||
// todo 检查设备是否在线
|
||
request.SetMethod("flighttask_execute")
|
||
.SetTid(result.tid)
|
||
.SetBid(result.bid)
|
||
.SetTimestamp(DateTimeOffset.Now.ToUnixTimeMilliseconds())
|
||
.SetData(data1);
|
||
// 任务执行
|
||
_ = _mqttClientManager.PublishAsync($"thing/product/{sn}/services",
|
||
JsonConvert.SerializeObject(request));
|
||
var taskAssignRecord = new LasaTaskAssign()
|
||
{
|
||
Id = taskAssign.Id,
|
||
Status = 1
|
||
};
|
||
_sqlSugarClient.Updateable(taskAssignRecord).IgnoreNullColumns()
|
||
.ExecuteCommand();
|
||
/* 冗余逻辑去除
|
||
var taskRecord = new LasaTask()
|
||
{
|
||
Id = taskAssign.TaskId,
|
||
FlightId = flightId,
|
||
};
|
||
_sqlSugarClient.Updateable(taskRecord).IgnoreNullColumns()
|
||
.ExecuteCommand();
|
||
// todo 锁定这个机场 ,不再执行其它任务*/
|
||
}
|
||
else
|
||
{
|
||
// 错误处理
|
||
var errorMsg = DjiErrorMapHelper.ErrorMap[code];
|
||
var taskAssignRecord = new LasaTaskAssign()
|
||
{
|
||
Id = taskAssign.Id,
|
||
Reason = $"{DateTime.Now:yyyy-MM-dd HH:mm:ss} {errorMsg}(错误码: {code})",
|
||
Status = 2
|
||
};
|
||
_sqlSugarClient.Updateable(taskAssignRecord).IgnoreNullColumns().ExecuteCommand();
|
||
var taskUpdate = new LasaTask
|
||
{
|
||
Id = taskAssign.TaskId,
|
||
Status = 2, // todo 状态待定
|
||
Reason = $"{DateTime.Now:yyyy-MM-dd HH:mm:ss} {errorMsg}(错误码: {code})"
|
||
};
|
||
_sqlSugarClient.Updateable(taskUpdate).IgnoreNullColumns().ExecuteCommand();
|
||
|
||
var taskHistory1 = _sqlSugarClient
|
||
.Queryable<LasaTaskHistory>()
|
||
.Where(x => x.TaskId == taskAssign.FlightId)
|
||
.First();
|
||
var taskHistoryUpdate1 = new LasaTaskHistory()
|
||
{
|
||
Id = taskHistory1.Id,
|
||
Status = 2,
|
||
Reason = $"{DateTime.Now:yyyy-MM-dd HH:mm:ss} {errorMsg}(错误码: {code})"
|
||
};
|
||
_sqlSugarClient.Updateable(taskHistoryUpdate1).IgnoreNullColumns().ExecuteCommand();
|
||
}
|
||
}
|
||
|
||
break;
|
||
case "flighttask_execute": // 执行任务响应
|
||
code = data.result;
|
||
var taskAssignExecute = _manageApp.GetTaskAssignByBidAndTid(result.bid, result.tid, 1);
|
||
var taskRecordExecute = new LasaTask()
|
||
{
|
||
Id = taskAssignExecute.TaskId,
|
||
};
|
||
|
||
var taskHistory = await _sqlSugarClient
|
||
.Queryable<LasaTaskHistory>()
|
||
.Where(x => x.TaskId == taskAssignExecute.FlightId)
|
||
.FirstAsync();
|
||
var taskHistoryUpdate = new LasaTaskHistory()
|
||
{
|
||
Id = taskHistory.Id
|
||
};
|
||
|
||
|
||
var flyTask = await _sqlSugarClient.Queryable<LasaTask>()
|
||
.Where(x => x.Id == taskAssignExecute.TaskId)
|
||
.FirstAsync();
|
||
if (code != 0)
|
||
{
|
||
var errorMsg = DjiErrorMapHelper.ErrorMap[code];
|
||
//Console.WriteLine($"任务失败: 错误码 {code} 错误信息 {errorMsg}");
|
||
// 任务执行失败
|
||
// 和航线进度方法中返回的错误有没有区别 ????
|
||
taskRecordExecute.Status = 2;
|
||
taskRecordExecute.Reason = $"{DateTime.Now:yyyy-MM-dd HH:mm:ss} {errorMsg}(错误码: {code})";
|
||
taskHistoryUpdate.Reason = $"{DateTime.Now:yyyy-MM-dd HH:mm:ss} {errorMsg}(错误码: {code})";
|
||
taskHistoryUpdate.Status = 2;
|
||
}
|
||
else
|
||
{
|
||
// 任务开始执行
|
||
taskRecordExecute.Status = 1; // 任务执行中
|
||
//taskRecordExecute.FlyNumber = flyTask.FlyNumber + 1;
|
||
taskHistoryUpdate.Status = 1;
|
||
Console.WriteLine($"任务执行响应 {code} {message}");
|
||
}
|
||
|
||
// if 查看是否是部级飞行任务
|
||
// 先得到部级任务id , 然后查看有哪些任务 ,是否全部完成
|
||
var detail = await _sqlSugarClient.Queryable<DroneDocktaskdetail>()
|
||
.Where(x => x.flighttaskid == taskAssignExecute.FlightId)
|
||
.FirstAsync();
|
||
if (detail != null)
|
||
{
|
||
var droneTask = new DroneDocktask()
|
||
{
|
||
id = detail.taskid, // 这里的id是部级任务表的id
|
||
state = 1
|
||
};
|
||
await _sqlSugarClient.Updateable(droneTask).IgnoreNullColumns().ExecuteCommandAsync();
|
||
}
|
||
|
||
await _sqlSugarClient.Updateable(taskHistoryUpdate).IgnoreNullColumns().ExecuteCommandAsync();
|
||
await _sqlSugarClient.Updateable(taskRecordExecute).IgnoreNullColumns().ExecuteCommandAsync();
|
||
break;
|
||
}
|
||
|
||
break;
|
||
case "thing/product/*/osd":
|
||
//Console.WriteLine($"osd消息: {message}");
|
||
break;
|
||
default:
|
||
//Console.WriteLine($"未进入主题处理");
|
||
break;
|
||
}
|
||
}
|
||
|
||
public DateTime GetDateTimeFromSeconds(long miliseconds)
|
||
{
|
||
var seconds = miliseconds / 1000; // 去除末尾三位
|
||
return DateTimeOffset.FromUnixTimeSeconds(seconds).DateTime;
|
||
}
|
||
|
||
|
||
public bool IsDuplicate(string messageId)
|
||
{
|
||
var now = DateTime.UtcNow;
|
||
// 清理过期消息
|
||
foreach (var kvp in _processedMessages)
|
||
{
|
||
if (now - kvp.Value > _deduplicationWindow)
|
||
{
|
||
_processedMessages.TryRemove(kvp.Key, out _);
|
||
}
|
||
}
|
||
|
||
// 检查是否已存在
|
||
if (_processedMessages.ContainsKey(messageId))
|
||
{
|
||
return true;
|
||
}
|
||
|
||
_processedMessages[messageId] = now;
|
||
return false;
|
||
}
|
||
|
||
public Task StartAsync(CancellationToken cancellationToken)
|
||
{
|
||
return Subscribe();
|
||
}
|
||
|
||
public Task StopAsync(CancellationToken cancellationToken)
|
||
{
|
||
// todo 取消订阅
|
||
throw new NotImplementedException();
|
||
}
|
||
} |