zhangbin 3 months ago
commit a985719f3e

@ -1,8 +1,10 @@
using System;
using System.IO;
using System.Net.Security;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Protocol;
@ -13,10 +15,22 @@ public class MqttClientManager
{
private IMqttClient _outBoundClient;
private IMqttClient _inBoundClient;
private MqttClientOptions _inboundOptions;
private MqttClientOptions _outboundOptions;
private readonly ILogger<MqttClientManager> _logger;
private int _inboundReconnectAttempts = 0;
private int _outboundReconnectAttempts = 0;
private readonly CancellationTokenSource _cancellationTokenSource = new();
public MqttClientManager()
// 指数退避参数
private readonly TimeSpan _minReconnectInterval = TimeSpan.FromSeconds(1);
private readonly TimeSpan _maxReconnectInterval = TimeSpan.FromMinutes(2);
public MqttClientManager(ILogger<MqttClientManager> logger)
{
_logger = logger;
var mqttFactory = new MqttFactory();
_outBoundClient = mqttFactory.CreateMqttClient();
_inBoundClient = mqttFactory.CreateMqttClient();
@ -60,8 +74,106 @@ public class MqttClientManager
.WithCredentials(username, password)
.Build();
await _outBoundClient.ConnectAsync(inboundOptions, CancellationToken.None);
await _inBoundClient.ConnectAsync(outboundOptions, CancellationToken.None);
await _outBoundClient.ConnectAsync(inboundOptions, _cancellationTokenSource.Token);
await _inBoundClient.ConnectAsync(outboundOptions, _cancellationTokenSource.Token);
_outBoundClient.ConnectedAsync += OnConnectedAsync;
_inBoundClient.ConnectedAsync += OnConnectedAsync;
_outBoundClient.DisconnectedAsync += OnOutboundDisconnectedAsync;
_inBoundClient.DisconnectedAsync += OnDisconnectedAsync;
}
private Task OnConnectedAsync(MqttClientConnectedEventArgs e)
{
_logger?.LogInformation("MQTT 连接成功");
_inboundReconnectAttempts = 0; // 重置重连计数
// todo 重新订阅主题
return null;
}
private async Task OnOutboundDisconnectedAsync(MqttClientDisconnectedEventArgs e)
{
_logger?.LogWarning($"MQTT 连接断开: {e.Reason}");
// 仅在主动断开连接时不尝试重连
if (e.ClientWasConnected)
{
await AttemptReconnectAsync(false);
}
}
private async Task OnDisconnectedAsync(MqttClientDisconnectedEventArgs e)
{
_logger?.LogWarning($"MQTT 连接断开: {e.Reason}");
// 仅在主动断开连接时不尝试重连
if (e.ClientWasConnected)
{
await AttemptReconnectAsync(true);
}
}
// 尝试重连(带指数退避)
private async Task AttemptReconnectAsync(bool isInbound)
{
if (_cancellationTokenSource.IsCancellationRequested)
return;
// 计算退避时间: min(初始间隔 * 2^尝试次数, 最大间隔)
var delay = TimeSpan.FromMilliseconds(
Math.Min(
_minReconnectInterval.TotalMilliseconds * Math.Pow(2, _inboundReconnectAttempts),
_maxReconnectInterval.TotalMilliseconds
)
);
if (isInbound)
{
_inboundReconnectAttempts++;
}
else
{
_outboundReconnectAttempts++;
}
var temp = isInbound ? _inboundReconnectAttempts : _outboundReconnectAttempts;
_logger?.LogInformation(
$"将在 {delay.TotalSeconds:F1} 秒后尝试重连 (尝试次数: {temp})");
try
{
await Task.Delay(delay, _cancellationTokenSource.Token);
if (isInbound)
{
// 尝试重连
if (!_inBoundClient.IsConnected)
{
await _inBoundClient.ConnectAsync(_inboundOptions, _cancellationTokenSource.Token);
}
}
else
{
// 尝试重连
if (!_outBoundClient.IsConnected)
{
await _outBoundClient.ConnectAsync(_outboundOptions, _cancellationTokenSource.Token);
}
}
}
catch (OperationCanceledException)
{
_logger?.LogInformation("重连操作被取消");
}
catch (Exception ex)
{
_logger?.LogError(ex, "重连失败");
// 继续尝试重连
if (!_cancellationTokenSource.IsCancellationRequested)
{
await AttemptReconnectAsync(isInbound);
}
}
}
public async Task SubscribeAsync(string topic,

@ -2,6 +2,8 @@
using System.Text;
using Infrastructure.Cache;
using Infrastructure.CloudSdk.wayline;
using MQTTnet;
using MQTTnet.Client;
using Newtonsoft.Json;
using OpenAuth.App.ServiceApp;
using OpenAuth.Repository.Domain;
@ -13,34 +15,41 @@ namespace OpenAuth.App.BaseApp.Subscribe;
public class ConfigSubscribe : IJob
{
private readonly MqttClientManager mqttClientManager;
private readonly ISqlSugarClient sqlSugarClient;
private readonly RedisCacheContext redisCacheContext;
private readonly ManageApp manageApp;
private object locker = new();
private readonly MqttClientManager _mqttClientManager;
private readonly ISqlSugarClient _sqlSugarClient;
private readonly RedisCacheContext _redisCacheContext;
private readonly ManageApp _manageApp;
private object _locker = new();
public ConfigSubscribe(MqttClientManager mqttClientManager, ISqlSugarClient sqlSugarClient,
ICacheContext redisCacheContext, ManageApp manageApp)
{
this.mqttClientManager = mqttClientManager;
this.sqlSugarClient = sqlSugarClient;
this.redisCacheContext = redisCacheContext as RedisCacheContext;
this.manageApp = manageApp;
_mqttClientManager = mqttClientManager;
_sqlSugarClient = sqlSugarClient;
_redisCacheContext = redisCacheContext as RedisCacheContext;
_manageApp = manageApp;
}
private async Task Subscribe()
{
string gatewaySn = "8UUXN5400A079H";
// 或者 thing/product/#
// sys/product/#
string[] topicList =
{
$"thing/product/{gatewaySn}/services_reply",
$"thing/product/{gatewaySn}/events",
$"thing/product/{gatewaySn}/requests"
"thing/product/+/services_reply",
"thing/product/+/events",
"thing/product/+/requests"
};
await mqttClientManager
await _mqttClientManager
.SubscribeAsync(topicList,
async (args) => await HandleTopic(gatewaySn, args.ApplicationMessage.Topic,
Encoding.UTF8.GetString(args.ApplicationMessage.Payload)));
async (args) =>
{
args.IsHandled = false; // todo 待实验确定
args.AutoAcknowledge = true; // todo 待实验确定
await HandleTopic(args, args.ApplicationMessage.Topic,
Encoding.UTF8.GetString(args.ApplicationMessage.Payload));
});
}
public async Task Execute(IJobExecutionContext context)
@ -48,21 +57,10 @@ public class ConfigSubscribe : IJob
await Subscribe();
}
// todo 暂时不使用
public static readonly string[] TopicList =
{
"thing/product/*/osd",
"thing/product/*/state",
"thing/product/*/services_reply",
"thing/product/*/events",
"thing/product/*/requests",
"sys/product/*/status",
"thing/product/*/property/set_reply",
"thing/product/*/drc/up"
};
private async Task HandleTopic(string sn, string topic, string message)
private async Task HandleTopic(MqttApplicationMessageReceivedEventArgs args, string topic,
string message)
{
/*
thing/product/{device_sn}/osd properties
@ -76,8 +74,8 @@ public class ConfigSubscribe : IJob
sys/product/{gateway_sn}/status 线
thing/product/{gateway_sn}/property/set_reply
thing/product/{gateway_sn}/drc/up DRC */
// thing/product/8UUXN5400A079H/requests
var sn = topic.Split("/")[2];
var tempStr = topic.Replace(sn, "*");
//Console.WriteLine($"成功调用主题 [{topic}] 的消息: {message}");
// 主题方法
@ -135,7 +133,7 @@ public class ConfigSubscribe : IJob
};
// thing/product/{gateway_sn}/requests_reply
var tempTopic = $"thing/product/{sn}/requests_reply";
await mqttClientManager.PublishAsync(tempTopic,
await _mqttClientManager.PublishAsync(tempTopic,
JsonConvert.SerializeObject(storageConfigRequest));
break;
case "flight_areas_get":
@ -146,7 +144,7 @@ public class ConfigSubscribe : IJob
Console.WriteLine("进入资源获取处理");
string flightId = data.flight_id + "";
Console.WriteLine($"任务ID{flightId}");
if (sqlSugarClient != null)
if (_sqlSugarClient != null)
{
Console.WriteLine("manageApp 注入没有问题");
}
@ -155,7 +153,7 @@ public class ConfigSubscribe : IJob
// http://175.27.168.120:6013/test/2025062209390863860047.kmz
// md5 585c833012ddb794eaac1050ef71aa31
// todo 这一小段运行异常
var taskAssign = await sqlSugarClient
var taskAssign = await _sqlSugarClient
.Queryable<LasaTaskAssign>()
.Where(x => x.FlightId == flightId)
.SingleAsync();
@ -170,8 +168,7 @@ public class ConfigSubscribe : IJob
file = new
{
fingerprint = taskAssign.Md5,
url = "http://175.27.168.120:6013/test/2025062415430484240112.kmz"
// url = "http://175.27.168.120:6013/test/2025062209390863860047.kmz"
url = taskAssign.Wpml
}
};
var outRequest = new TopicServicesRequest<object>()
@ -185,7 +182,7 @@ public class ConfigSubscribe : IJob
Console.WriteLine(
$"topic: {flightTaskResourceGetTopic} 发送资源获取处理结果:{JsonConvert.SerializeObject(outRequest)}");
await mqttClientManager.PublishAsync(flightTaskResourceGetTopic,
await _mqttClientManager.PublishAsync(flightTaskResourceGetTopic,
JsonConvert.SerializeObject(outRequest));
break;
case "config":
@ -196,22 +193,15 @@ public class ConfigSubscribe : IJob
}
break;
// thing/product/8UUXN5400A079H/events
// {"bid":"0ebc789e-7f06-4830-a58a-8c4753cd493c",
// "data":{"output":{"ext":{"current_waypoint_index":0,
// "flight_id":"d6c00cc5-ec59-4bab-8508-fcb259f00a60",
// "media_count":0,"track_id":"","wayline_id":65535,"wayline_mission_state":2},
// "progress":{"current_step":36,"percent":15},"status":"failed"},"result":314013},
// "method":"flighttask_progress","need_reply":1,"tid":"84b048f7-43db-41c5-86c5-8938dc446ea4","timestamp":1750411329943,"gateway":"8UUXN5400A079H"}
case "thing/product/*/events":
if (method.Equals("flighttask_progress"))
{
code = data.result;
// todo 处理航线进度 ,也有可能是失败
// 处理航线进度 ,也有可能是失败
if (code != 0)
{
Console.WriteLine($"航线进度错误信息:{ErrorMap[code]} {message}");
// todo 取消任务
// 取消任务
var cancelTaskTopic = $"thing/product/{sn}/services";
var cancelTaskRequest = new TopicServicesRequest<object>()
{
@ -224,13 +214,13 @@ public class ConfigSubscribe : IJob
flight_ids = new[] { data.output.ext.flight_id }
}
};
await mqttClientManager.PublishAsync(cancelTaskTopic,
await _mqttClientManager.PublishAsync(cancelTaskTopic,
JsonConvert.SerializeObject(cancelTaskRequest));
}
}
else if (!method.Equals("hms"))
{
Console.WriteLine($"未事件events{message}");
Console.WriteLine($"未处理事件events{message}");
}
break;
@ -248,7 +238,7 @@ public class ConfigSubscribe : IJob
// 报错处理
Console.WriteLine("进入prepare订阅消息");
code = data.result;
var taskAssign = manageApp.GetTaskAssignByBidAndTid(result.bid, result.tid);
var taskAssign = _manageApp.GetTaskAssignByBidAndTid(result.bid, result.tid);
Console.WriteLine($"prepare 任务信息:{JsonConvert.SerializeObject(taskAssign)}");
if (code == 0)
{
@ -269,14 +259,14 @@ public class ConfigSubscribe : IJob
.SetTimestamp(DateTimeOffset.Now.ToUnixTimeMilliseconds())
.SetData(data1);
// 任务执行
_ = mqttClientManager.PublishAsync($"thing/product/{sn}/services",
_ = _mqttClientManager.PublishAsync($"thing/product/{sn}/services",
JsonConvert.SerializeObject(request));
var taskAssignRecord = new LasaTaskAssign()
{
Id = taskAssign.Id,
Status = 1
};
sqlSugarClient.Updateable(taskAssignRecord).IgnoreNullColumns()
_sqlSugarClient.Updateable(taskAssignRecord).IgnoreNullColumns()
.ExecuteCommand();
// todo 锁定这个机场 ,不再执行其它任务
}
@ -290,7 +280,7 @@ public class ConfigSubscribe : IJob
Reason = errorMsg,
Status = 2
};
sqlSugarClient.Updateable(taskAssignRecord).IgnoreNullColumns().ExecuteCommand();
_sqlSugarClient.Updateable(taskAssignRecord).IgnoreNullColumns().ExecuteCommand();
}
// }

@ -80,6 +80,7 @@ namespace OpenAuth.App.ServiceApp
return new Response<bool> { Result = false, Message = "编辑失败" };
}
}
/// <summary>
/// 修改机场固件信息
/// </summary>
@ -160,7 +161,6 @@ namespace OpenAuth.App.ServiceApp
pName = b.Name, // 机场名称
workSpaceId = a.WorkSpaceId,
firmwareVersion = a.FirmwareVersion,
})
.ToPageListAsync(page, limit, totalCount);
return new Response<PageInfo<List<dynamic>>>
@ -612,11 +612,14 @@ namespace OpenAuth.App.ServiceApp
#endregion
public async Task<String> ExecuteFlyTask(string taskId)
public async Task ExecuteFlyTask(string taskId)
{
// 任务信息
var task = await Repository.ChangeRepository<SugarRepositiry<LasaTask>>().GetByIdAsync(taskId);
// 航线文件信息
var airLine = await Repository.ChangeRepository<SugarRepositiry<LasaAirLine>>()
.GetByIdAsync(task.AirLineId);
// 航线文件链接
var wpml = airLine.WPML;
// 查询sn
var dronePort = await Repository.ChangeRepository<SugarRepositiry<LasaDronePort>>()
@ -635,7 +638,7 @@ namespace OpenAuth.App.ServiceApp
bid = Guid.NewGuid().ToString(),
timestamp = DateTimeOffset.Now.ToUnixTimeMilliseconds()
};
dynamic data = new ExpandoObject();
data.flight_id = Guid.NewGuid().ToString(); // 用任务id 作为
data.execute_time = DateTimeOffset.Now.ToUnixTimeMilliseconds();
@ -646,7 +649,7 @@ namespace OpenAuth.App.ServiceApp
var md5 = await _minioService.GetMetaObject(wpml, "");
// todo 临时固定代码
// http://175.27.168.120:6013/test/2025062209390863860047.kmz
// wpml = await _minioService.GetObjectUrl("test", "2025062209390863860047.kmz");
// wpml = await _minioService.GetObjectUrl("test", "2025062209390863860047.kmz");
data.file = new
{
url = wpml,
@ -676,21 +679,22 @@ namespace OpenAuth.App.ServiceApp
}
// 返航高度 {"max":1500,"min":20,"step":"","unit_name":"米 / m"}
data.rth_altitude = 90; // todo 取自任务
data.rth_altitude = task.ReturnAltitude; // todo 取自任务
// 返航高度模式 {"0":"智能高度","1":"设定高度"}
// 智能返航模式下,飞行器将自动规划最佳返航高度。大疆机场当前不支持设置返航高度模式,只能选择'设定高度'模式。当环境,光线不满足视觉系统要求时(譬如傍晚阳光直射、夜间弱光无光),飞行器将使用您设定的返航高度进行直线返航
// 智能返航模式下,飞行器将自动规划最佳返航高度。
// 大疆机场当前不支持设置返航高度模式,只能选择'设定高度'模式。当环境,光线不满足视觉系统要求时(譬如傍晚阳光直射、夜间弱光无光),飞行器将使用您设定的返航高度进行直线返航
data.rth_mode = 0;
// {"0":"返航","1":"悬停","2":"降落"}
// 失控动作,当前固定传的值是 0即返航。注意该枚举值定义跟飞控跟机场定义的不一致机场端会进行转换。
data.out_of_control_action = 0;
data.out_of_control_action = task.LossOfControlAction;
// 航线失控动作 保持跟 KMZ 文件一致
// {"0":"继续执行航线任务","1":"退出航线任务,执行遥控器失控动作"}
data.exit_wayline_when_rc_lost = 1;
// 航线精度类型 {"0":"GPS 任务","1":"高精度 RTK 任务"}
// 高精度 RTK 任务:飞行器起飞后会在空中等待 RTK 收敛后再执行任务,等待 RTK 收敛的过程中无法暂停任务。默认场景建议使用该模式。GPS 任务:飞行器无需等待 RTK 收敛便可以直接开始执行。精度要求不高的任务或对起飞时效性要求较高的任务建议使用该模式。
//todo 临时注释
// 高精度 RTK 任务:飞行器起飞后会在空中等待 RTK 收敛后再执行任务,等待 RTK 收敛的过程中无法暂停任务。
// 默认场景建议使用该模式。GPS 任务:飞行器无需等待 RTK 收敛便可以直接开始执行。精度要求不高的任务或对起飞时效性要求较高的任务建议使用该模式。
// data.wayline_precision_type = task.WaylinePrecisionType; // 值来自任务
data.wayline_precision_type = 1;
data.wayline_precision_type = task.WaylinePrecisionType;
// 是否在模拟器中执行任务 todo 调试时使用
/*data.simulate_mission = new
{
@ -726,7 +730,9 @@ namespace OpenAuth.App.ServiceApp
}
request.SetData(data);
// 任务下发
await _mqttClientManager.PublishAsync(topic, JsonConvert.SerializeObject(request));
// 记录任务中间信息
var taskAssign = new LasaTaskAssign
{
Id = Guid.NewGuid().ToString(),
@ -743,9 +749,8 @@ namespace OpenAuth.App.ServiceApp
Wpml = wpml
};
await Repository.ChangeRepository<SugarRepositiry<LasaTaskAssign>>().InsertAsync(taskAssign);
// 任务下发
await _mqttClientManager.PublishAsync(topic, JsonConvert.SerializeObject(request));
return JsonConvert.SerializeObject(request);
// todo 更新任务状态?
}
public async Task PendingFlyTask(string taskId)

@ -65,6 +65,7 @@ namespace OpenAuth.WebApi.Controllers.ServiceControllers
{
return await _app.EditDronePort(info);
}
/// <summary>
/// 修改机场固件信息
/// </summary>
@ -125,6 +126,7 @@ namespace OpenAuth.WebApi.Controllers.ServiceControllers
{
return await _app.EditUav(info);
}
/// <summary>
/// 修改无人机固件信息
/// </summary>
@ -136,6 +138,7 @@ namespace OpenAuth.WebApi.Controllers.ServiceControllers
{
return await _app.EditUavFirmware(id, firmwareVersion);
}
/// <summary>
/// 删除无人机
/// </summary>
@ -427,10 +430,10 @@ namespace OpenAuth.WebApi.Controllers.ServiceControllers
/// </summary>
/// <param name="taskId"></param>
[HttpPost]
[AllowAnonymous] // todo 临沂
public async Task<string> ExecuteFlyTask(string taskId)
[AllowAnonymous]
public async Task ExecuteFlyTask(string taskId)
{
return await _app.ExecuteFlyTask(taskId);
await _app.ExecuteFlyTask(taskId);
}
/// <summary>
@ -501,6 +504,7 @@ namespace OpenAuth.WebApi.Controllers.ServiceControllers
}
#region 添加地图作业区域
/// <summary>
/// 添加地图作业区域
/// </summary>
@ -519,6 +523,7 @@ namespace OpenAuth.WebApi.Controllers.ServiceControllers
res.Code = 500;
res.Message = ex.InnerException?.Message ?? ex.Message;
}
return res;
}
@ -540,6 +545,7 @@ namespace OpenAuth.WebApi.Controllers.ServiceControllers
res.Code = 500;
res.Message = ex.InnerException?.Message ?? ex.Message;
}
return res;
}
@ -577,6 +583,7 @@ namespace OpenAuth.WebApi.Controllers.ServiceControllers
return result;
}
#endregion
}
}

@ -47,7 +47,7 @@ namespace OpenAuth.WebApi
public void ConfigureServices(IServiceCollection services)
{
services.AddHttpContextAccessor();
services.AddSingleton(_ => new MqttClientManager());
services.AddSingleton<MqttClientManager>();
// minio client
services.AddSingleton(_ => new MinioService());

Loading…
Cancel
Save