1. minio 添加对象连接获取

2. 订阅消息处理代码修改
3. 任务下发修改
4. 其它
feature-flyModify
陈伟 2025-06-24 15:03:49 +08:00
parent 69a4457a27
commit 5007f16a42
5 changed files with 221 additions and 106 deletions

View File

@ -212,4 +212,14 @@ public class MinioService
throw new Exception($"上传文件失败: {ex.Message}");
}
}
public async Task<string> GetObjectUrl(string bucket, string myobject)
{
PresignedGetObjectArgs args = new PresignedGetObjectArgs()
.WithBucket(bucket)
.WithObject(myobject)
.WithExpiry(60 * 60 * 24);
var url = await _minioClient.PresignedGetObjectAsync(args);
return url;
}
}

View File

@ -64,11 +64,21 @@ public class MqttClientManager
public async Task SubscribeAsync(string topic,
Func<MqttApplicationMessageReceivedEventArgs, Task> handler)
{
await _inBoundClient.SubscribeAsync(topic, MqttQualityOfServiceLevel.AtLeastOnce, CancellationToken.None);
{
await _inBoundClient.SubscribeAsync(topic, MqttQualityOfServiceLevel.AtMostOnce, CancellationToken.None);
_inBoundClient.ApplicationMessageReceivedAsync += handler;
}
public async Task SubscribeAsync(string[] topics,
Func<MqttApplicationMessageReceivedEventArgs, Task> handler)
{
foreach (var topic in topics)
{
await _inBoundClient.SubscribeAsync(topic, MqttQualityOfServiceLevel.AtMostOnce, CancellationToken.None);
_inBoundClient.ApplicationMessageReceivedAsync += handler;
}
}
public async Task UnsubscribeAsync(string topic)
{
await _inBoundClient.UnsubscribeAsync(topic, CancellationToken.None);
@ -87,8 +97,16 @@ public class MqttClientManager
.WithTopic(topic)
.WithPayload(message)
// 级别 0 1 2
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce)
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtMostOnce)
.Build();
await _outBoundClient.PublishAsync(mqttMsg, CancellationToken.None);
var result = await _outBoundClient.PublishAsync(mqttMsg, CancellationToken.None);
if (result.IsSuccess)
{
Console.WriteLine($"{topic} {message}发布成功");
}
else
{
throw new Exception($"{topic} {message}发布失败");
}
}
}

View File

@ -5,7 +5,7 @@ namespace Infrastructure.CloudSdk.wayline;
public class TopicServicesRequest<T> : CommonTopicRequest<T>
{
public string method { get; set; }
public TopicServicesRequest<T> SetTid(string tid)
@ -31,7 +31,7 @@ public class TopicServicesRequest<T> : CommonTopicRequest<T>
base.data = data;
return this;
}
public string method { get; set; }
public TopicServicesRequest<T> SetMethod(string method)
{
this.method = method;

View File

@ -17,6 +17,7 @@ public class ConfigSubscribe : IJob
private readonly ISqlSugarClient sqlSugarClient;
private readonly RedisCacheContext redisCacheContext;
private readonly ManageApp manageApp;
private object locker = new();
public ConfigSubscribe(MqttClientManager mqttClientManager, ISqlSugarClient sqlSugarClient,
ICacheContext redisCacheContext, ManageApp manageApp)
@ -30,12 +31,14 @@ public class ConfigSubscribe : IJob
private async Task Subscribe()
{
string gatewaySn = "8UUXN5400A079H";
string[] topicList =
{
$"thing/product/{gatewaySn}/services_reply",
$"thing/product/{gatewaySn}/events",
$"thing/product/{gatewaySn}/requests"
};
await mqttClientManager
.SubscribeAsync($"thing/product/{gatewaySn}/services_reply",
async (args) => await HandleTopic(gatewaySn, args.ApplicationMessage.Topic,
Encoding.UTF8.GetString(args.ApplicationMessage.Payload)));
await mqttClientManager
.SubscribeAsync($"thing/product/{gatewaySn}/events",
.SubscribeAsync(topicList,
async (args) => await HandleTopic(gatewaySn, args.ApplicationMessage.Topic,
Encoding.UTF8.GetString(args.ApplicationMessage.Payload)));
}
@ -48,14 +51,14 @@ public class ConfigSubscribe : IJob
// todo 暂时不使用
public static readonly string[] TopicList =
{
"thing/product/**/osd",
"thing/product/**/state",
"thing/product/**/services_reply",
"thing/product/**/events",
"thing/product/**/requests",
"sys/product/**/status",
"thing/product/**/property/set_reply",
"thing/product/**/drc/up"
"thing/product/*/osd",
"thing/product/*/state",
"thing/product/*/services_reply",
"thing/product/*/events",
"thing/product/*/requests",
"sys/product/*/status",
"thing/product/*/property/set_reply",
"thing/product/*/drc/up"
};
@ -74,34 +77,16 @@ public class ConfigSubscribe : IJob
thing/product/{gateway_sn}/property/set_reply
thing/product/{gateway_sn}/drc/up DRC */
// thing/product/8UUXN5400A079H/requests
if (topic.Equals("thing/product/8UUXN5400A079H/requests"))
{
Console.WriteLine("收到的资源请求-未处理字符串");
}
var tempStr = topic.Replace(sn, "*");
if (tempStr.Equals("thing/product/*/requests"))
{
Console.WriteLine("收到的资源请求");
}
//Console.WriteLine($"成功调用主题 [{topic}] 的消息: {message}");
// 主题方法
var result = JsonConvert.DeserializeObject<TopicServicesRequest<dynamic>>(message);
var method = result.method;
var data = result.data;
var hasProperty = false;
foreach (var property in data.GetType().GetProperties())
{
if (property.Name != "result") continue;
hasProperty = true;
break;
}
long code = 0;
if (hasProperty)
{
code = data.result;
}
switch (tempStr)
{
case "thing/product/*/requests":
@ -114,33 +99,99 @@ public class ConfigSubscribe : IJob
// todo 处理资源获取请求
switch (method)
{
case "flighttask_resource_get":
Console.WriteLine($"获取资源请求:{JsonConvert.SerializeObject(data)}");
var flightId = data.flight_id;
LasaTaskAssign taskAssign =
manageApp.GetTaskAssignByBidAndTidAndFlightId(result.bid, result.tid, flightId);
var flightTaskResourceGetTopic = $"thing/product/{sn}/requests_reply";
dynamic outData = new ExpandoObject();
var outRequest = new TopicServicesRequest<object>()
case "storage_config_get":
Console.WriteLine("进入临时凭证获取处理");
// {"bid":"afc5c13e-da1c-4a15-aec1-a765aac34c57",
// "data":{"module":0},
// "method":"storage_config_get",
// "need_reply":0,"tid":"50e8102c-da72-42b1-a899-a82a519456d9",
// "timestamp":1750575776430,
// "gateway":"8UUXN5400A079H"}
var storageConfigRequest = new TopicServicesRequest<object>()
{
method = "flighttask_resource_get",
method = "storage_config_get",
tid = result.tid,
bid = result.bid,
timestamp = DateTimeOffset.Now.ToUnixTimeMilliseconds(),
data = outData
data = new
{
result = 0,
output = new
{
bucket = "test",
endpoint = "http://175.27.168.120:6013",
object_key_prefix = "xxx", // todo 是否设计任务id
provider = "minio",
region = "linyi",
credentials = new
{
access_key_id = "minioadmin",
access_key_secret = "minioadmin",
expire = 7200,
security_token = ""
}
}
}
};
// thing/product/{gateway_sn}/requests_reply
var tempTopic = $"thing/product/{sn}/requests_reply";
//Console.WriteLine($"主题:{tempTopic} request:{JsonConvert.SerializeObject(storageConfigRequest)}");
// todo 临时注释
await mqttClientManager.PublishAsync(tempTopic,
JsonConvert.SerializeObject(storageConfigRequest));
//Console.WriteLine("临时凭证获取处理完成");
break;
case "flight_areas_get":
//Console.WriteLine("跳过自定义飞行区文件获取");
break;
// 获取航线
case "flighttask_resource_get":
Console.WriteLine("进入资源获取处理");
string flightId = data.flight_id + "";
Console.WriteLine($"任务ID{flightId}");
if (sqlSugarClient != null)
{
Console.WriteLine("manageApp 注入没有问题");
}
// eb87b257-5af1-4bf1-9aba-4267be9fdb12 flight
// http://175.27.168.120:6013/test/2025062209390863860047.kmz
// md5 585c833012ddb794eaac1050ef71aa31
// todo 这一小段运行异常
/*var taskAssign = await sqlSugarClient
.Queryable<LasaTaskAssign>()
.Where(x => x.FlightId == flightId)
.SingleAsync();*/
//Console.WriteLine($"任务信息:{JsonConvert.SerializeObject(taskAssign)}");
/*var taskAssign =
manageApp.GetTaskAssignByBidAndTid1(result.bid, result.tid, flightId);*/
var flightTaskResourceGetTopic = $"thing/product/{sn}/requests_reply";
dynamic outData = new ExpandoObject();
outData.result = 0;
outData.output = new
{
file = new
{
fingerprint = taskAssign.Md5,
url = taskAssign.Wpml
fingerprint = "585c833012ddb794eaac1050ef71aa31",
url = "http://175.27.168.120:6013/test/2025062209390863860047.kmz"
}
};
var outRequest = new TopicServicesRequest<object>()
{
bid = result.bid,
data = outData,
tid = result.tid,
timestamp = DateTimeOffset.Now.ToUnixTimeMilliseconds(),
method = "flighttask_resource_get"
};
Console.WriteLine(
$"topic: {flightTaskResourceGetTopic} 发送资源获取处理结果:{JsonConvert.SerializeObject(outRequest)}");
await mqttClientManager.PublishAsync(flightTaskResourceGetTopic,
JsonConvert.SerializeObject(outRequest));
break;
case "config":
break;
default:
Console.WriteLine($"未知请求:{message}");
break;
@ -157,12 +208,32 @@ public class ConfigSubscribe : IJob
case "thing/product/*/events":
if (method.Equals("flighttask_progress"))
{
code = data.result;
// todo 处理航线进度 ,也有可能是失败
if (code != 0)
{
Console.WriteLine($"航线进度错误信息:{ErrorMap[code]}");
Console.WriteLine($"航线进度错误信息:{ErrorMap[code]} {message}");
// todo 取消任务
var cancelTaskTopic = $"thing/product/{sn}/services";
var cancelTaskRequest = new TopicServicesRequest<object>()
{
method = "flighttask_undo",
tid = result.tid,
bid = result.bid,
timestamp = DateTimeOffset.Now.ToUnixTimeMilliseconds(),
data = new
{
flight_ids = new[] { data.output.ext.flight_id }
}
};
await mqttClientManager.PublishAsync(cancelTaskTopic,
JsonConvert.SerializeObject(cancelTaskRequest));
}
}
else if (!method.Equals("hms"))
{
Console.WriteLine($"未知事件events{message}");
}
break;
// todo
@ -173,40 +244,61 @@ public class ConfigSubscribe : IJob
switch (method)
{
case "flighttask_prepare": // 下发任务响应
// 报错处理
Console.WriteLine("处理prepare消息");
var taskAssign = manageApp.GetTaskAssignByBidAndTid(result.bid, result.tid);
if (code == 0)
// todo 同一prepare消息只能处理一次
lock (locker)
{
if (taskAssign == null) return; // 不存在不操作
var flightId = taskAssign.FlightId;
var request = new TopicServicesRequest<object>();
dynamic data1 = new ExpandoObject();
data1.flight_id = flightId;
request.SetMethod("flighttask_execute")
.SetTid(Guid.NewGuid().ToString())
.SetBid(Guid.NewGuid().ToString())
.SetTimestamp(DateTimeOffset.Now.ToUnixTimeMilliseconds())
.SetData(data1);
// 任务执行
await mqttClientManager.PublishAsync($"thing/product/{sn}/services",
JsonConvert.SerializeObject(request));
}
else
{
// 错误处理
var errorMsg = ErrorMap[code];
var taskAssignRecord = new LasaTaskAssign()
// 报错处理
Console.WriteLine("进入prepare订阅消息");
code = data.result;
var taskAssign = manageApp.GetTaskAssignByBidAndTid(result.bid, result.tid);
Console.WriteLine($"prepare 任务信息:{JsonConvert.SerializeObject(taskAssign)}");
if (code == 0)
{
Id = taskAssign.Id,
Reason = errorMsg,
Status = 2
};
await sqlSugarClient.Updateable(taskAssignRecord).ExecuteCommandAsync();
if (taskAssign == null)
{
Console.WriteLine("已跳过prepare处理");
return; // 不存在不操作
}
var flightId = taskAssign.FlightId;
var request = new TopicServicesRequest<object>();
dynamic data1 = new ExpandoObject();
data1.flight_id = flightId;
// todo 检查设备是否在线
request.SetMethod("flighttask_execute")
.SetTid(result.tid)
.SetBid(result.bid)
.SetTimestamp(DateTimeOffset.Now.ToUnixTimeMilliseconds())
.SetData(data1);
// 任务执行
_ = mqttClientManager.PublishAsync($"thing/product/{sn}/services",
JsonConvert.SerializeObject(request));
var taskAssignRecord = new LasaTaskAssign()
{
Id = taskAssign.Id,
Status = 1
};
sqlSugarClient.Updateable(taskAssignRecord).IgnoreNullColumns()
.ExecuteCommand();
// todo 锁定这个机场 ,不再执行其它任务
}
else
{
// 错误处理
var errorMsg = ErrorMap[code];
var taskAssignRecord = new LasaTaskAssign()
{
Id = taskAssign.Id,
Reason = errorMsg,
Status = 2
};
sqlSugarClient.Updateable(taskAssignRecord).IgnoreNullColumns().ExecuteCommand();
}
}
break;
case "flighttask_execute": // 执行任务响应
code = data.result;
if (code != 0)
{
var errorMsg = ErrorMap[code];
@ -215,7 +307,7 @@ public class ConfigSubscribe : IJob
}
else
{
Console.WriteLine($"flighttask_execute 任务成功");
Console.WriteLine($"任务执行响应 {code} {message}");
}
break;

View File

@ -578,26 +578,18 @@ namespace OpenAuth.App.ServiceApp
bid = Guid.NewGuid().ToString(),
timestamp = DateTimeOffset.Now.ToUnixTimeMilliseconds()
};
var builder = new ConfigurationBuilder()
.SetBasePath(Directory.GetCurrentDirectory())
.AddJsonFile("appsettings.json", optional: false, reloadOnChange: true)
.AddJsonFile(
$"appsettings.{Environment.GetEnvironmentVariable("ASPNETCORE_ENVIRONMENT") ?? "Development"}.json",
optional: true)
.AddEnvironmentVariables();
// 构建配置
var configuration = builder.Build();
var wpmlDir = configuration["WpmlDir"];
// 读取连接字符串
//var serverIp = configuration["MQTT:Server"];
dynamic data = new ExpandoObject();
data.flight_id = Guid.NewGuid().ToString();
data.flight_id = Guid.NewGuid().ToString(); // 用任务id 作为
data.execute_time = DateTimeOffset.Now.ToUnixTimeMilliseconds();
//{"0":"立即任务","1":"定时任务","2":"条件任务"}
// 立即任务和定时任务均由execute_time指定执行时间条件任务支持ready_conditions字段指定任务就绪条件设备可在指定时间段内满足就绪条件后即可执行立即任务媒体上传优先级最高定时任务和条件任务媒体上传优先级相同
// 若task_type任务类型指定为“立即执行”时设备端限制了30s的时间误差若设备收到指令的时间与execute_time相差超过30s将报错且该任务无法正常执行。
data.task_type = task.TaskType;
var md5 = await _minioService.GetMetaObject(wpml, "");
// todo 临时固定代码
// http://175.27.168.120:6013/test/2025062209390863860047.kmz
//wpml = await _minioService.GetObjectUrl("test", "2025062209390863860047.kmz");
data.file = new
{
url = wpml,
@ -612,7 +604,7 @@ namespace OpenAuth.App.ServiceApp
// 2. 多少图片需要多少容量
// 3. 要存多少图片
// 4.
storage_capacity = 1000
storage_capacity = 1000 // todo 50GB左右共 ,其它型号不一定啊? 先不考虑等它报错
};
// todo 1. 查询上报断点 2. 断点续飞方法支持
if (false)
@ -627,21 +619,23 @@ namespace OpenAuth.App.ServiceApp
}
// 返航高度 {"max":1500,"min":20,"step":"","unit_name":"米 / m"}
data.rth_altitude = 150; // todo 取自任务
data.rth_altitude = 50; // todo 取自任务
// 返航高度模式 {"0":"智能高度","1":"设定高度"}
// 智能返航模式下,飞行器将自动规划最佳返航高度。大疆机场当前不支持设置返航高度模式,只能选择'设定高度'模式。当环境,光线不满足视觉系统要求时(譬如傍晚阳光直射、夜间弱光无光),飞行器将使用您设定的返航高度进行直线返航
data.rth_mode = 1;
data.rth_mode = 0;
// {"0":"返航","1":"悬停","2":"降落"}
// 失控动作,当前固定传的值是 0即返航。注意该枚举值定义跟飞控跟机场定义的不一致机场端会进行转换。
data.out_of_control_action = 0;
// 航线失控动作 保持跟 KMZ 文件一致
// {"0":"继续执行航线任务","1":"退出航线任务,执行遥控器失控动作"}
data.exit_wayline_when_rc_lost = 0;
data.exit_wayline_when_rc_lost = 1;
// 航线精度类型 {"0":"GPS 任务","1":"高精度 RTK 任务"}
// 高精度 RTK 任务:飞行器起飞后会在空中等待 RTK 收敛后再执行任务,等待 RTK 收敛的过程中无法暂停任务。默认场景建议使用该模式。GPS 任务:飞行器无需等待 RTK 收敛便可以直接开始执行。精度要求不高的任务或对起飞时效性要求较高的任务建议使用该模式。
data.wayline_precision_type = task.WaylinePrecisionType; // 值来自任务
//todo 临时注释
// data.wayline_precision_type = task.WaylinePrecisionType; // 值来自任务
data.wayline_precision_type = 1;
// 是否在模拟器中执行任务 todo 调试时使用
data.simulate_mission = new
/*data.simulate_mission = new
{
//118.309405,35.14035 应用科学城坐标
//是否开启模拟器任务
@ -652,8 +646,8 @@ namespace OpenAuth.App.ServiceApp
// 经度 {"max":"180.0","min":"-180.0"}
longitude = 118.309419,
// 高度 {"max":"9999.9","min":"-9999.9"unit_name":"米 / m"}
altitude = 150.0
};
altitude = 120.0
};*/
// 飞行安全预检查
// {"0":"关闭","1":"开启"}
// 设置一键起飞和航线任务中的飞行安全是否预先检查。此字段为可选默认为0值为0表示关闭1表示开启。飞行安全预先检查表示: 飞行器执行任务前,检查自身作业区文件是否与云端一致,如果不一致则拉取文件更新,如果一致则不处理
@ -684,7 +678,7 @@ namespace OpenAuth.App.ServiceApp
FlightId = data.flight_id,
GatewaySn = dronePort.Sn,
AirlineId = task.AirLineId,
Status = 1,
Status = 0,
CreateTime = DateTime.Now,
UpdateTime = DateTime.Now,
TaskId = taskId,
@ -902,14 +896,15 @@ namespace OpenAuth.App.ServiceApp
{
return Repository
.ChangeRepository<SugarRepositiry<LasaTaskAssign>>()
.GetSingle(r => r.Bid == bid && r.Tid == tid);
.GetSingle(r => r.Bid == bid && r.Tid == tid && r.Status == 0);
}
public LasaTaskAssign GetTaskAssignByBidAndTidAndFlightId(string bid, string tid, string flightId)
public LasaTaskAssign GetTaskAssignByFlightId(string flightId)
{
return Repository
.ChangeRepository<SugarRepositiry<LasaTaskAssign>>()
.GetSingle(r => r.Bid == bid && r.Tid == tid && r.FlightId == flightId);
.GetSingle(r => r.FlightId == flightId);
}
}
}