Compare commits

...

3 Commits

Author SHA1 Message Date
陈伟 268d19d1dd Merge remote-tracking branch 'origin/main' 2025-06-30 09:11:44 +08:00
陈伟 fe1d28e1d4 订阅数据处理(部分) 2025-06-30 09:11:36 +08:00
陈伟 5ff3914e11 重连时,主题重新订阅 2025-06-27 15:09:53 +08:00
7 changed files with 315 additions and 87 deletions

View File

@ -15,6 +15,8 @@ public class MinioService
public string _bucketName; public string _bucketName;
public string endPoint; public string endPoint;
public bool UseSSL; public bool UseSSL;
public string AccessKey;
public string SecretKey;
public MinioService() public MinioService()
{ {
@ -36,9 +38,11 @@ public class MinioService
_bucketName = configuration["Minio:BucketName"]; _bucketName = configuration["Minio:BucketName"];
endPoint = configuration["Minio:Endpoint"]; endPoint = configuration["Minio:Endpoint"];
UseSSL = configuration["Minio:UseSSL"].ToBool(); UseSSL = configuration["Minio:UseSSL"].ToBool();
AccessKey = configuration["Minio:AccessKey"];
SecretKey = configuration["Minio:SecretKey"];
_minioClient = new MinioClient() _minioClient = new MinioClient()
.WithEndpoint(endPoint) .WithEndpoint(endPoint)
.WithCredentials(configuration["Minio:AccessKey"], configuration["Minio:SecretKey"]) .WithCredentials(AccessKey, SecretKey)
.Build(); .Build();
} }

View File

@ -7,6 +7,7 @@ using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using MQTTnet; using MQTTnet;
using MQTTnet.Client; using MQTTnet.Client;
using MQTTnet.Packets;
using MQTTnet.Protocol; using MQTTnet.Protocol;
namespace OpenAuth.WebApi; namespace OpenAuth.WebApi;
@ -26,6 +27,7 @@ public class MqttClientManager
// 指数退避参数 // 指数退避参数
private readonly TimeSpan _minReconnectInterval = TimeSpan.FromSeconds(1); private readonly TimeSpan _minReconnectInterval = TimeSpan.FromSeconds(1);
private readonly TimeSpan _maxReconnectInterval = TimeSpan.FromMinutes(2); private readonly TimeSpan _maxReconnectInterval = TimeSpan.FromMinutes(2);
private List<String> _topics = new();
public MqttClientManager(ILogger<MqttClientManager> logger) public MqttClientManager(ILogger<MqttClientManager> logger)
@ -86,8 +88,21 @@ public class MqttClientManager
{ {
_logger?.LogInformation("MQTT 连接成功"); _logger?.LogInformation("MQTT 连接成功");
_inboundReconnectAttempts = 0; // 重置重连计数 _inboundReconnectAttempts = 0; // 重置重连计数
// todo 重新订阅主题 // 重新订阅主题
return null; return ResubscribeTopicsAsync();
}
private async Task ResubscribeTopicsAsync()
{
if (!_inBoundClient.IsConnected || _topics.Count == 0)
return;
_logger?.LogInformation($"重新订阅 {_topics.Count} 个主题");
foreach (var topic in _topics)
{
await _inBoundClient.SubscribeAsync(topic, MqttQualityOfServiceLevel.AtLeastOnce,
_cancellationTokenSource.Token);
}
} }
private async Task OnOutboundDisconnectedAsync(MqttClientDisconnectedEventArgs e) private async Task OnOutboundDisconnectedAsync(MqttClientDisconnectedEventArgs e)
@ -179,6 +194,7 @@ public class MqttClientManager
public async Task SubscribeAsync(string topic, public async Task SubscribeAsync(string topic,
Func<MqttApplicationMessageReceivedEventArgs, Task> handler) Func<MqttApplicationMessageReceivedEventArgs, Task> handler)
{ {
_topics.Add(topic);
await _inBoundClient.SubscribeAsync(topic, MqttQualityOfServiceLevel.AtMostOnce, CancellationToken.None); await _inBoundClient.SubscribeAsync(topic, MqttQualityOfServiceLevel.AtMostOnce, CancellationToken.None);
_inBoundClient.ApplicationMessageReceivedAsync += handler; _inBoundClient.ApplicationMessageReceivedAsync += handler;
} }
@ -188,6 +204,7 @@ public class MqttClientManager
{ {
foreach (var topic in topics) foreach (var topic in topics)
{ {
_topics.Add(topic);
await _inBoundClient.SubscribeAsync(topic, MqttQualityOfServiceLevel.AtMostOnce, CancellationToken.None); await _inBoundClient.SubscribeAsync(topic, MqttQualityOfServiceLevel.AtMostOnce, CancellationToken.None);
_inBoundClient.ApplicationMessageReceivedAsync += handler; _inBoundClient.ApplicationMessageReceivedAsync += handler;
} }

View File

@ -32,15 +32,15 @@ public class ConfigSubscribe : IJob
private async Task Subscribe() private async Task Subscribe()
{ {
// 或者 thing/product/# // 或者 thing/product/#
// sys/product/# // sys/product/#
string[] topicList = string[] topicList =
{ {
"thing/product/+/services_reply", "thing/product/+/services_reply",
"thing/product/+/events", "thing/product/+/events",
"thing/product/+/requests" "thing/product/+/requests"
}; };
await _mqttClientManager await _mqttClientManager
.SubscribeAsync(topicList, .SubscribeAsync(topicList,
async (args) => async (args) =>
@ -57,7 +57,6 @@ public class ConfigSubscribe : IJob
await Subscribe(); await Subscribe();
} }
private async Task HandleTopic(MqttApplicationMessageReceivedEventArgs args, string topic, private async Task HandleTopic(MqttApplicationMessageReceivedEventArgs args, string topic,
string message) string message)
@ -74,7 +73,7 @@ public class ConfigSubscribe : IJob
sys/product/{gateway_sn}/status 线 sys/product/{gateway_sn}/status 线
thing/product/{gateway_sn}/property/set_reply thing/product/{gateway_sn}/property/set_reply
thing/product/{gateway_sn}/drc/up DRC */ thing/product/{gateway_sn}/drc/up DRC */
var sn = topic.Split("/")[2]; var sn = topic.Split("/")[2];
var tempStr = topic.Replace(sn, "*"); var tempStr = topic.Replace(sn, "*");
//Console.WriteLine($"成功调用主题 [{topic}] 的消息: {message}"); //Console.WriteLine($"成功调用主题 [{topic}] 的消息: {message}");
@ -105,6 +104,7 @@ public class ConfigSubscribe : IJob
// "need_reply":0,"tid":"50e8102c-da72-42b1-a899-a82a519456d9", // "need_reply":0,"tid":"50e8102c-da72-42b1-a899-a82a519456d9",
// "timestamp":1750575776430, // "timestamp":1750575776430,
// "gateway":"8UUXN5400A079H"} // "gateway":"8UUXN5400A079H"}
//todo 配置中读取minio配置
var storageConfigRequest = new TopicServicesRequest<object>() var storageConfigRequest = new TopicServicesRequest<object>()
{ {
method = "storage_config_get", method = "storage_config_get",
@ -118,7 +118,7 @@ public class ConfigSubscribe : IJob
{ {
bucket = "test", bucket = "test",
endpoint = "http://175.27.168.120:6013", endpoint = "http://175.27.168.120:6013",
object_key_prefix = "xxx", // todo 是否设计任务id object_key_prefix = Guid.NewGuid().ToString(), // todo 是否设计任务id
provider = "minio", provider = "minio",
region = "linyi", region = "linyi",
credentials = new credentials = new
@ -180,8 +180,8 @@ public class ConfigSubscribe : IJob
method = "flighttask_resource_get" method = "flighttask_resource_get"
}; };
Console.WriteLine( /*Console.WriteLine(
$"topic: {flightTaskResourceGetTopic} 发送资源获取处理结果:{JsonConvert.SerializeObject(outRequest)}"); $"topic: {flightTaskResourceGetTopic} 发送资源获取处理结果:{JsonConvert.SerializeObject(outRequest)}");*/
await _mqttClientManager.PublishAsync(flightTaskResourceGetTopic, await _mqttClientManager.PublishAsync(flightTaskResourceGetTopic,
JsonConvert.SerializeObject(outRequest)); JsonConvert.SerializeObject(outRequest));
break; break;
@ -194,33 +194,89 @@ public class ConfigSubscribe : IJob
break; break;
case "thing/product/*/events": case "thing/product/*/events":
if (method.Equals("flighttask_progress")) switch (method)
{ {
code = data.result; case "file_upload_callback":
// 处理航线进度 ,也有可能是失败 // 文件上传
if (code != 0) int flightType = int.Parse(data.flight_task.flight_type);
{ if (flightType == 1)
Console.WriteLine($"航线进度错误信息:{ErrorMap[code]} {message}");
// 取消任务
var cancelTaskTopic = $"thing/product/{sn}/services";
var cancelTaskRequest = new TopicServicesRequest<object>()
{ {
method = "flighttask_undo", break;
tid = result.tid, }
bid = result.bid,
timestamp = DateTimeOffset.Now.ToUnixTimeMilliseconds(), var fileUpload = new LasaMediaFile()
data = new {
{ Id = Guid.NewGuid().ToString(),
flight_ids = new[] { data.output.ext.flight_id } FlightId = data.file.ext.flight_id,
} DroneModelKey = data.file.ext.drone_model_key,
IsOriginal = data.file.ext.is_original,
MediaIndex = data.file.ext.media_index,
PayloadModelKey = data.file.ext.payload_model_key,
AbsoluteAltitude = data.file.metadata.absolute_altitude,
GimbalYawDegree = data.file.metadata.gimbal_yaw_degree, //云台偏航角度
RelativeAltitude = data.file.metadata.relative_altitude,
Lat = data.file.metadata.shoot_position.lat,
Lng = data.file.metadata.shoot_position.lng,
Name = data.file.name,
ObjectKey = data.file.object_key,
Path = data.file.path,
CreateTime = data.file.metadata.create_time.ToDateTime(),
}; };
await _mqttClientManager.PublishAsync(cancelTaskTopic, await _sqlSugarClient.Insertable(fileUpload).ExecuteCommandAsync();
JsonConvert.SerializeObject(cancelTaskRequest)); var expectFileCount = data.flight_task.expected_file_count;
var uploadedFileCount = data.flight_task.uploaded_file_count;
var taskRecord = new LasaTask()
{
ExpectedFileCount = expectFileCount,
UploadedFileCount = uploadedFileCount
};
await _sqlSugarClient.Updateable(taskRecord)
.Where(t => t.FlightId == fileUpload.FlightId)
.ExecuteCommandAsync();
break;
case "release_terminal_control_area":
//暂不处理
break;
case "flighttask_progress":
{
code = data.result;
// 处理航线进度 ,也有可能是失败
if (code != 0)
{
Console.WriteLine($"航线进度错误信息:{ErrorMap[code]} {message}");
// 取消任务
var cancelTaskTopic = $"thing/product/{sn}/services";
var cancelTaskRequest = new TopicServicesRequest<object>()
{
method = "flighttask_undo",
tid = result.tid,
bid = result.bid,
timestamp = DateTimeOffset.Now.ToUnixTimeMilliseconds(),
data = new
{
flight_ids = new[] { data.output.ext.flight_id }
}
};
await _mqttClientManager.PublishAsync(cancelTaskTopic,
JsonConvert.SerializeObject(cancelTaskRequest));
// todo 更新任务状态及失败原因?
}
else
{
Console.WriteLine($"航线进度:{message}");
}
break;
}
default:
{
if (!method.Equals("hms"))
{
Console.WriteLine($"未处理事件events{message}");
}
break;
} }
}
else if (!method.Equals("hms"))
{
Console.WriteLine($"未处理事件events{message}");
} }
break; break;
@ -233,71 +289,87 @@ public class ConfigSubscribe : IJob
{ {
case "flighttask_prepare": // 下发任务响应 case "flighttask_prepare": // 下发任务响应
// todo 同一prepare消息只能处理一次 // todo 同一prepare消息只能处理一次
//lock (locker) lock (_locker)
// {
// 报错处理
Console.WriteLine("进入prepare订阅消息");
code = data.result;
var taskAssign = _manageApp.GetTaskAssignByBidAndTid(result.bid, result.tid);
Console.WriteLine($"prepare 任务信息:{JsonConvert.SerializeObject(taskAssign)}");
if (code == 0)
{ {
if (taskAssign == null) // 报错处理
Console.WriteLine("进入prepare订阅消息");
code = data.result;
var taskAssign = _manageApp.GetTaskAssignByBidAndTid(result.bid, result.tid);
//Console.WriteLine($"prepare 任务信息:{JsonConvert.SerializeObject(taskAssign)}");
if (code == 0)
{ {
Console.WriteLine("已跳过prepare处理"); if (taskAssign == null)
return; // 不存在不操作 {
} Console.WriteLine("已跳过prepare处理");
return; // 不存在不操作
}
var flightId = taskAssign.FlightId; var flightId = taskAssign.FlightId;
var request = new TopicServicesRequest<object>(); var request = new TopicServicesRequest<object>();
dynamic data1 = new ExpandoObject(); dynamic data1 = new ExpandoObject();
data1.flight_id = flightId; data1.flight_id = flightId;
// todo 检查设备是否在线 // todo 检查设备是否在线
request.SetMethod("flighttask_execute") request.SetMethod("flighttask_execute")
.SetTid(result.tid) .SetTid(result.tid)
.SetBid(result.bid) .SetBid(result.bid)
.SetTimestamp(DateTimeOffset.Now.ToUnixTimeMilliseconds()) .SetTimestamp(DateTimeOffset.Now.ToUnixTimeMilliseconds())
.SetData(data1); .SetData(data1);
// 任务执行 // 任务执行
_ = _mqttClientManager.PublishAsync($"thing/product/{sn}/services", _ = _mqttClientManager.PublishAsync($"thing/product/{sn}/services",
JsonConvert.SerializeObject(request)); JsonConvert.SerializeObject(request));
var taskAssignRecord = new LasaTaskAssign() var taskAssignRecord = new LasaTaskAssign()
{
Id = taskAssign.Id,
Status = 1
};
_sqlSugarClient.Updateable(taskAssignRecord).IgnoreNullColumns()
.ExecuteCommand();
var taskRecord = new LasaTask()
{
Id = taskAssign.TaskId,
FlightId = flightId,
};
_sqlSugarClient.Updateable(taskRecord).IgnoreNullColumns()
.ExecuteCommand();
// todo 锁定这个机场 ,不再执行其它任务
}
else
{ {
Id = taskAssign.Id, // 错误处理
Status = 1 var errorMsg = ErrorMap[code];
}; var taskAssignRecord = new LasaTaskAssign()
_sqlSugarClient.Updateable(taskAssignRecord).IgnoreNullColumns() {
.ExecuteCommand(); Id = taskAssign.Id,
// todo 锁定这个机场 ,不再执行其它任务 Reason = errorMsg,
Status = 2
};
_sqlSugarClient.Updateable(taskAssignRecord).IgnoreNullColumns().ExecuteCommand();
}
} }
else
{
// 错误处理
var errorMsg = ErrorMap[code];
var taskAssignRecord = new LasaTaskAssign()
{
Id = taskAssign.Id,
Reason = errorMsg,
Status = 2
};
_sqlSugarClient.Updateable(taskAssignRecord).IgnoreNullColumns().ExecuteCommand();
}
// }
break; break;
case "flighttask_execute": // 执行任务响应 case "flighttask_execute": // 执行任务响应
code = data.result; code = data.result;
var taskAssignExecute = _manageApp.GetTaskAssignByBidAndTid(result.bid, result.tid, 1);
var taskRecordExecute = new LasaTask()
{
Id = taskAssignExecute.TaskId,
};
if (code != 0) if (code != 0)
{ {
var errorMsg = ErrorMap[code]; var errorMsg = ErrorMap[code];
Console.WriteLine($"任务失败: 错误码 {code} 错误信息 {errorMsg}"); //Console.WriteLine($"任务失败: 错误码 {code} 错误信息 {errorMsg}");
// todo 任务执行失败 // todo 任务执行失败
// TOdo 和航线进度方法中返回的错误有没有区别
} }
else else
{ {
// 任务开始执行
taskRecordExecute.Status = 1; // 任务执行中
Console.WriteLine($"任务执行响应 {code} {message}"); Console.WriteLine($"任务执行响应 {code} {message}");
} }
await _sqlSugarClient.Updateable(taskRecordExecute).ExecuteCommandAsync();
break; break;
} }

View File

@ -750,7 +750,6 @@ namespace OpenAuth.App.ServiceApp
}; };
await Repository.ChangeRepository<SugarRepositiry<LasaTaskAssign>>().InsertAsync(taskAssign); await Repository.ChangeRepository<SugarRepositiry<LasaTaskAssign>>().InsertAsync(taskAssign);
// todo 更新任务状态? // todo 更新任务状态?
} }
public async Task PendingFlyTask(string taskId) public async Task PendingFlyTask(string taskId)
@ -954,11 +953,11 @@ namespace OpenAuth.App.ServiceApp
return droneDock; return droneDock;
} }
public LasaTaskAssign GetTaskAssignByBidAndTid(string bid, string tid) public LasaTaskAssign GetTaskAssignByBidAndTid(string bid, string tid, int status = 0)
{ {
return Repository return Repository
.ChangeRepository<SugarRepositiry<LasaTaskAssign>>() .ChangeRepository<SugarRepositiry<LasaTaskAssign>>()
.GetSingle(r => r.Bid == bid && r.Tid == tid && r.Status == 0); .GetSingle(r => r.Bid == bid && r.Tid == tid && r.Status == status);
} }

View File

@ -0,0 +1,88 @@
using SqlSugar;
namespace OpenAuth.Repository.Domain;
using System;
/// <summary>
/// 媒体文件实体类
/// </summary>
[SugarTable("lasa_mediafile")]
public class LasaMediaFile
{
/// <summary>
/// 主键ID
/// </summary>
[SugarColumn(IsPrimaryKey = true, ColumnName = "Id")]
public string Id { get; set; }
/// <summary>
/// 计划ID
/// </summary>
public string FlightId { get; set; }
/// <summary>
/// 飞行器产品枚举
/// </summary>
public string DroneModelKey { get; set; }
/// <summary>
/// 是否是原图 0否1是
/// </summary>
public bool? IsOriginal { get; set; }
/// <summary>
/// 文件索引
/// </summary>
public long? MediaIndex { get; set; }
/// <summary>
/// 负载产品枚举
/// </summary>
public string PayloadModelKey { get; set; }
/// <summary>
/// 拍摄绝对高度
/// </summary>
public float? AbsoluteAltitude { get; set; }
/// <summary>
/// 云台偏航角
/// </summary>
public float? GimbalYawDegree { get; set; }
/// <summary>
/// 拍摄相对高度
/// </summary>
public float? RelativeAltitude { get; set; }
/// <summary>
/// 拍摄位置纬度
/// </summary>
public float? Lat { get; set; }
/// <summary>
/// 拍摄位置经度
/// </summary>
public float? Lng { get; set; }
/// <summary>
/// 文件名称
/// </summary>
public string Name { get; set; }
/// <summary>
/// 对象Key
/// </summary>
public string ObjectKey { get; set; }
/// <summary>
/// 路径
/// </summary>
public string Path { get; set; }
/// <summary>
/// 文件创建时间
/// </summary>
public DateTime? CreateTime { get; set; }
}

View File

@ -47,7 +47,7 @@ namespace OpenAuth.Repository.Domain
/// <summary> /// <summary>
/// 返航失控动作 /// 返航失控动作
/// </summary> /// </summary>
public string LossOfControlAction { get; set; } public int LossOfControlAction { get; set; }
/// <summary> /// <summary>
/// 续飞模式 /// 续飞模式
@ -60,7 +60,7 @@ namespace OpenAuth.Repository.Domain
public string AIInspection { get; set; } public string AIInspection { get; set; }
/// <summary> /// <summary>
/// 状态 /// 状态 0-待执行任务 1-任务执行中 2-任务执行失败 3. 任务挂起(挂起的任务超时后,怎么处理?)
/// </summary> /// </summary>
public int Status { get; set; } public int Status { get; set; }
@ -73,11 +73,57 @@ namespace OpenAuth.Repository.Domain
/// 航线id /// 航线id
/// </summary> /// </summary>
public string AirLineId { get; set; } public string AirLineId { get; set; }
public long CreateId { get; set; } public long CreateId { get; set; }
public DateTime? CreateTime { get; set; } public DateTime? CreateTime { get; set; }
/// <summary>
/// 航线精度类型
/// </summary>
public int WaylinePrecisionType { get; set; } public int WaylinePrecisionType { get; set; }
// todo 关联openjob id public DateTime? ScheduledStartTime { get; set; }
/// <summary>
///
/// </summary>
public DateTime? ScheduledEndTime { get; set; }
/// <summary>
/// 实际开始时间
/// </summary>
public DateTime? ExecuteTime { get; set; }
/// <summary>
/// 实际结束时间
/// </summary>
public DateTime? CompletedTime { get; set; }
/// <summary>
///计划执行时长
/// </summary>
public long PlanExecuteDuration { get; set; }
/// <summary>
/// 实际执行时长 todo 后面看实际情况修改
/// </summary>
public long ActualExecuteDuration { get; set; }
/// <summary>
/// 项目id
/// </summary>
public string WorkspaceId { get; set; }
/// <summary>
/// 期望媒体数量
/// </summary>
public int ExpectedFileCount { get; set; }
/// <summary>
/// 已上传媒体数量
/// </summary>
public int UploadedFileCount { get; set; }
public string FlightId { get; set; }
} }
} }

View File

@ -429,6 +429,7 @@ namespace OpenAuth.WebApi.Controllers.ServiceControllers
/// <param name="taskId"></param> /// <param name="taskId"></param>
[HttpPost] [HttpPost]
[AllowAnonymous] [AllowAnonymous]
// todo 根据项目设置的起飞条件设置是否起飞
public async Task ExecuteFlyTask(string taskId) public async Task ExecuteFlyTask(string taskId)
{ {
await _app.ExecuteFlyTask(taskId); await _app.ExecuteFlyTask(taskId);
@ -441,6 +442,7 @@ namespace OpenAuth.WebApi.Controllers.ServiceControllers
[HttpPost] [HttpPost]
public async Task PendingFlyTask(string taskId) public async Task PendingFlyTask(string taskId)
{ {
// todo 先只针对立即任务实现
await _app.PendingFlyTask(taskId); await _app.PendingFlyTask(taskId);
} }