1. 事件回复

2. 优化import
3. 排除可能的任务返航高度被更新问题
4. 订阅消息日志
feature-flyModify
陈伟 2025-07-10 15:34:26 +08:00
parent 202cfbe72f
commit a5b069e2ac
5 changed files with 139 additions and 75 deletions

View File

@ -0,0 +1,24 @@
namespace Infrastructure.CloudSdk;
public class FileUploadCallbackEventReply<T>
{
public string tid { get; set; }
public string bid { get; set; }
public long? timestamp { get; set; }
public string gateway { get; set; }
public string method { get; set; }
public T data { get; set; }
public override string ToString()
{
return $"CommonTopicRequest{{tid='{tid}', bid='{bid}', timestamp={timestamp}, data={data}}}";
}
public FileUploadCallbackEventReply<T> SetTid(string tid)
{
this.tid = tid;
return this;
}
}

View File

@ -1,13 +1,7 @@
using System; using Microsoft.Extensions.Configuration;
using System.IO;
using System.Net.Security;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using MQTTnet; using MQTTnet;
using MQTTnet.Client; using MQTTnet.Client;
using MQTTnet.Packets;
using MQTTnet.Protocol; using MQTTnet.Protocol;
namespace OpenAuth.WebApi; namespace OpenAuth.WebApi;

View File

@ -32,6 +32,7 @@ public class TopicServicesRequest<T> : CommonTopicRequest<T>
return this; return this;
} }
public string method { get; set; } public string method { get; set; }
public int need_reply { get; set; }
public TopicServicesRequest<T> SetMethod(string method) public TopicServicesRequest<T> SetMethod(string method)
{ {
this.method = method; this.method = method;

View File

@ -1,11 +1,14 @@
using System.Dynamic; using System.Dynamic;
using System.Text; using System.Text;
using Infrastructure.Cache; using Infrastructure.Cache;
using Infrastructure.CloudSdk;
using Infrastructure.CloudSdk.minio; using Infrastructure.CloudSdk.minio;
using Infrastructure.CloudSdk.wayline; using Infrastructure.CloudSdk.wayline;
using MQTTnet; using Infrastructure.Extensions;
using Microsoft.Extensions.Logging;
using MQTTnet.Client; using MQTTnet.Client;
using Newtonsoft.Json; using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using OpenAuth.App.ServiceApp; using OpenAuth.App.ServiceApp;
using OpenAuth.Repository.Domain; using OpenAuth.Repository.Domain;
using OpenAuth.WebApi; using OpenAuth.WebApi;
@ -22,15 +25,19 @@ public class ConfigSubscribe : IJob
private readonly ManageApp _manageApp; private readonly ManageApp _manageApp;
private readonly MinioService _minioService; private readonly MinioService _minioService;
private object _locker = new(); private object _locker = new();
private object _dockUploadFileLocker = new();
private readonly ILogger<ConfigSubscribe> _logger;
public ConfigSubscribe(MqttClientManager mqttClientManager, ISqlSugarClient sqlSugarClient, public ConfigSubscribe(MqttClientManager mqttClientManager, ISqlSugarClient sqlSugarClient,
ICacheContext redisCacheContext, ManageApp manageApp, MinioService minioService) ICacheContext redisCacheContext, ManageApp manageApp, MinioService minioService,
ILogger<ConfigSubscribe> logger)
{ {
_mqttClientManager = mqttClientManager; _mqttClientManager = mqttClientManager;
_sqlSugarClient = sqlSugarClient; _sqlSugarClient = sqlSugarClient;
_redisCacheContext = redisCacheContext as RedisCacheContext; _redisCacheContext = redisCacheContext as RedisCacheContext;
_manageApp = manageApp; _manageApp = manageApp;
_minioService = minioService; _minioService = minioService;
_logger = logger;
} }
private async Task Subscribe() private async Task Subscribe()
@ -42,16 +49,16 @@ public class ConfigSubscribe : IJob
"thing/product/+/services_reply", "thing/product/+/services_reply",
"thing/product/+/events", "thing/product/+/events",
"thing/product/+/requests", "thing/product/+/requests",
"thing/product/+/osd", //"thing/product/+/osd",
"thing/product/+/status" //"thing/product/+/status"
}; };
await _mqttClientManager await _mqttClientManager
.SubscribeAsync(topicList, .SubscribeAsync(topicList,
async (args) => async (args) =>
{ {
args.IsHandled = false; // todo 待实验确定 //args.IsHandled = false; // todo 待实验确定
args.AutoAcknowledge = true; // todo 待实验确定 //args.AutoAcknowledge = true; // todo 待实验确定
await HandleTopic(args, args.ApplicationMessage.Topic, await HandleTopic(args, args.ApplicationMessage.Topic,
Encoding.UTF8.GetString(args.ApplicationMessage.Payload)); Encoding.UTF8.GetString(args.ApplicationMessage.Payload));
}); });
@ -86,8 +93,7 @@ public class ConfigSubscribe : IJob
var result = JsonConvert.DeserializeObject<TopicServicesRequest<dynamic>>(message); var result = JsonConvert.DeserializeObject<TopicServicesRequest<dynamic>>(message);
var method = result.method; var method = result.method;
var data = result.data; var data = result.data;
_logger.LogInformation($"主题:{topic}\n消息{message}");
long code = 0; long code = 0;
switch (tempStr) switch (tempStr)
{ {
@ -101,8 +107,9 @@ public class ConfigSubscribe : IJob
// todo 处理资源获取请求 // todo 处理资源获取请求
switch (method) switch (method)
{ {
// 临时凭证上传
case "storage_config_get": case "storage_config_get":
Console.WriteLine($"进入临时凭证获取处理 {message}"); _logger.LogInformation($"进入临时凭证获取处理");
// {"bid":"afc5c13e-da1c-4a15-aec1-a765aac34c57", // {"bid":"afc5c13e-da1c-4a15-aec1-a765aac34c57",
// "data":{"module":0}, // "data":{"module":0},
// "method":"storage_config_get", // "method":"storage_config_get",
@ -116,22 +123,21 @@ public class ConfigSubscribe : IJob
tid = result.tid, tid = result.tid,
bid = result.bid, bid = result.bid,
timestamp = DateTimeOffset.Now.ToUnixTimeMilliseconds(), timestamp = DateTimeOffset.Now.ToUnixTimeMilliseconds(),
data = new data = new
{ {
result = 0, result = 0,
output = new output = new
{ {
bucket = _minioService._bucketName, bucket = "test",
endpoint = $"{_minioService.endPoint}", endpoint = "http://175.27.168.120:6013",
object_key_prefix = Guid.NewGuid().ToString(), // todo 是否设计任务id object_key_prefix = $"{Guid.NewGuid().ToString()}", // todo 是否设计任务id
provider = "minio", provider = "minio",
region = "linyi", region = "linyi",
credentials = new credentials = new
{ {
access_key_id = _minioService.AccessKey, access_key_id = "minioadmin",
access_key_secret = _minioService.SecretKey, access_key_secret = "minioadmin",
expire = 7200, expire = 480,
security_token = "" security_token = ""
} }
} }
@ -147,14 +153,8 @@ public class ConfigSubscribe : IJob
break; break;
// 获取航线 // 获取航线
case "flighttask_resource_get": case "flighttask_resource_get":
Console.WriteLine("进入资源获取处理");
string flightId = data.flight_id + ""; string flightId = data.flight_id + "";
Console.WriteLine($"任务ID{flightId}"); _logger.LogInformation($"进入资源获取处理: 任务id {flightId}");
if (_sqlSugarClient != null)
{
Console.WriteLine("manageApp 注入没有问题");
}
// eb87b257-5af1-4bf1-9aba-4267be9fdb12 flight // eb87b257-5af1-4bf1-9aba-4267be9fdb12 flight
// http://175.27.168.120:6013/test/2025062209390863860047.kmz // http://175.27.168.120:6013/test/2025062209390863860047.kmz
// md5 585c833012ddb794eaac1050ef71aa31 // md5 585c833012ddb794eaac1050ef71aa31
@ -163,7 +163,7 @@ public class ConfigSubscribe : IJob
.Queryable<LasaTaskAssign>() .Queryable<LasaTaskAssign>()
.Where(x => x.FlightId == flightId) .Where(x => x.FlightId == flightId)
.SingleAsync(); .SingleAsync();
Console.WriteLine($"任务信息:{JsonConvert.SerializeObject(taskAssign)}"); _logger.LogInformation($"任务信息:{JsonConvert.SerializeObject(taskAssign)}");
/*var taskAssign = /*var taskAssign =
manageApp.GetTaskAssignByBidAndTid(result.bid, result.tid, flightId);*/ manageApp.GetTaskAssignByBidAndTid(result.bid, result.tid, flightId);*/
var flightTaskResourceGetTopic = $"thing/product/{sn}/requests_reply"; var flightTaskResourceGetTopic = $"thing/product/{sn}/requests_reply";
@ -194,7 +194,7 @@ public class ConfigSubscribe : IJob
case "config": case "config":
break; break;
default: default:
Console.WriteLine($"未知请求{message}"); _logger.LogInformation($"未处理消息{message}");
break; break;
} }
@ -204,50 +204,76 @@ public class ConfigSubscribe : IJob
{ {
case "file_upload_callback": case "file_upload_callback":
// 文件上传 // 文件上传
int flightType = int.Parse(data.flight_task.flight_type); Console.WriteLine("进入文件上传处理");
Console.WriteLine($"文件上传处理:{message}");
int flightType = data.flight_task.flight_type;
if (flightType == 1) // 不处理一键起飞媒体信息 if (flightType == 1) // 不处理一键起飞媒体信息
{ {
break; break;
} }
string flightId = data.file.ext.flight_id; lock (_dockUploadFileLocker)
string taskId = _manageApp.GetTaskAssignByFlightId(flightId).TaskId;
var fileUpload = new LasaMediaFile()
{ {
Id = Guid.NewGuid().ToString(), string flightId = data.file.ext.flight_id;
FlightId = flightId, // 计划id string taskId = _manageApp.GetTaskAssignByFlightId(flightId).TaskId;
TaskId = taskId, // 任务id var fileUpload = new LasaMediaFile()
DroneModelKey = data.file.ext.drone_model_key, {
IsOriginal = data.file.ext.is_original, Id = Guid.NewGuid().ToString(),
MediaIndex = data.file.ext.media_index, FlightId = flightId, // 计划id
PayloadModelKey = data.file.ext.payload_model_key, TaskId = taskId, // 任务id
AbsoluteAltitude = data.file.metadata.absolute_altitude, DroneModelKey = data.file.ext.drone_model_key, // 无人机型号
GimbalYawDegree = data.file.metadata.gimbal_yaw_degree, //云台偏航角度 IsOriginal = data.file.ext.is_original,
RelativeAltitude = data.file.metadata.relative_altitude, MediaIndex = data.file.ext.media_index,
Lat = data.file.metadata.shoot_position.lat, PayloadModelKey = data.file.ext.payload_model_key, //这应该可以标明是什么设置
Lng = data.file.metadata.shoot_position.lng, AbsoluteAltitude = data.file.metadata.absolute_altitude, // 拍摄绝对高度
Name = data.file.name, GimbalYawDegree = data.file.metadata.gimbal_yaw_degree, //云台偏航角度
ObjectKey = data.file.object_key, RelativeAltitude = data.file.metadata.relative_altitude, // 拍摄相对高度
Path = data.file.path, Lat = data.file.metadata.shoot_position.lat,
CreateTime = data.file.metadata.create_time.ToDateTime(), Lng = data.file.metadata.shoot_position.lng,
}; Name = data.file.name,
await _sqlSugarClient.Insertable(fileUpload).ExecuteCommandAsync(); ObjectKey = data.file.object_key,
var expectFileCount = data.flight_task.expected_file_count; Path = data.file.path, // 目前这个好像没有值
var uploadedFileCount = data.flight_task.uploaded_file_count; CreateTime = ((string)data.file.metadata.created_time).ToDateTime(),
};
// todo 添加事务
_sqlSugarClient.Insertable(fileUpload).ExecuteCommand();
if (result.need_reply.Equals(1))
{
var fileUploadCallbackEventReply = new FileUploadCallbackEventReply<object>()
{
bid = result.bid,
tid = result.tid,
method = "file_upload_callback",
gateway = sn,
data = new
{
result = 0
},
timestamp = DateTimeOffset.Now.ToUnixTimeMilliseconds(),
};
_ = _mqttClientManager.PublishAsync($"thing/product/{sn}/events_reply",
JsonConvert.SerializeObject(fileUploadCallbackEventReply));
}
var taskRecord = new LasaTask() var expectFileCount = data.flight_task.expected_file_count;
{ var uploadedFileCount = data.flight_task.uploaded_file_count;
Id = taskId,
ExpectedFileCount = expectFileCount, // 期望文件数量 var taskRecord = new LasaTask()
UploadedFileCount = uploadedFileCount // 已上传文件数量 {
}; Id = taskId,
// 当expectFileCount 等于uploadedFileCount时则表示航线执行完成 ExpectedFileCount = expectFileCount, // 期望文件数量
if (uploadedFileCount.Equals(expectFileCount)) UploadedFileCount = uploadedFileCount // 已上传文件数量
{ };
taskRecord.Status = 5; // 成功状态 // 当expectFileCount 等于uploadedFileCount时则表示航线执行完成
if (uploadedFileCount.Equals(expectFileCount))
{
taskRecord.Status = 5; // 成功状态
}
_sqlSugarClient.Updateable(taskRecord)
.IgnoreNullColumns().ExecuteCommand();
} }
await _sqlSugarClient.Updateable(taskRecord)
.IgnoreNullColumns().ExecuteCommandAsync();
break; break;
case "release_terminal_control_area": case "release_terminal_control_area":
//暂不处理 //暂不处理
@ -284,6 +310,24 @@ public class ConfigSubscribe : IJob
} }
else else
{ {
if (result.need_reply.Equals(1))
{
var fileUploadCallbackEventReply = new FileUploadCallbackEventReply<object>()
{
bid = result.bid,
tid = result.tid,
method = "flighttask_progress",
gateway = sn,
data = new
{
result = 0
},
timestamp = DateTimeOffset.Now.ToUnixTimeMilliseconds(),
};
await _mqttClientManager.PublishAsync($"thing/product/{sn}/events_reply",
JsonConvert.SerializeObject(fileUploadCallbackEventReply));
}
Console.WriteLine($"航线进度:{message}"); Console.WriteLine($"航线进度:{message}");
} }
@ -402,6 +446,7 @@ public class ConfigSubscribe : IJob
await _sqlSugarClient.Updateable(taskRecordExecute).IgnoreNullColumns().ExecuteCommandAsync(); await _sqlSugarClient.Updateable(taskRecordExecute).IgnoreNullColumns().ExecuteCommandAsync();
break; break;
} }
break; break;
case "thing/product/*/osd": case "thing/product/*/osd":
Console.WriteLine($"osd消息: {message}"); Console.WriteLine($"osd消息: {message}");

View File

@ -47,12 +47,12 @@ namespace OpenAuth.Repository.Domain
/// <summary> /// <summary>
/// 返航高度 /// 返航高度
/// </summary> /// </summary>
public int ReturnAltitude { get; set; } public int? ReturnAltitude { get; set; }
/// <summary> /// <summary>
/// 返航失控动作 /// 返航失控动作
/// </summary> /// </summary>
public int LossOfControlAction { get; set; } public int? LossOfControlAction { get; set; }
/// <summary> /// <summary>
/// 续飞模式 /// 续飞模式
@ -67,7 +67,7 @@ namespace OpenAuth.Repository.Domain
/// <summary> /// <summary>
/// 状态 0-待执行任务 1-任务执行中 2-任务执行失败 3. 任务挂起(挂起的任务超时后,怎么处理?) 5. 成功 /// 状态 0-待执行任务 1-任务执行中 2-任务执行失败 3. 任务挂起(挂起的任务超时后,怎么处理?) 5. 成功
/// </summary> /// </summary>
public int Status { get; set; } public int? Status { get; set; }
/// <summary> /// <summary>
/// 周期公式 /// 周期公式
@ -85,7 +85,7 @@ namespace OpenAuth.Repository.Domain
/// <summary> /// <summary>
/// 航线精度类型 /// 航线精度类型
/// </summary> /// </summary>
public int WaylinePrecisionType { get; set; } public int? WaylinePrecisionType { get; set; }
public DateTime? ScheduledStartTime { get; set; } public DateTime? ScheduledStartTime { get; set; }
@ -122,15 +122,15 @@ namespace OpenAuth.Repository.Domain
/// <summary> /// <summary>
/// 期望媒体数量 /// 期望媒体数量
/// </summary> /// </summary>
public int ExpectedFileCount { get; set; } public int? ExpectedFileCount { get; set; }
/// <summary> /// <summary>
/// 已上传媒体数量 /// 已上传媒体数量
/// </summary> /// </summary>
public int UploadedFileCount { get; set; } public int? UploadedFileCount { get; set; }
public string FlightId { get; set; } public string FlightId { get; set; }
public string Reason { get; set; } public string? Reason { get; set; }
} }
} }