|
|
|
@ -1,11 +1,14 @@
|
|
|
|
|
using System.Dynamic;
|
|
|
|
|
using System.Text;
|
|
|
|
|
using Infrastructure.Cache;
|
|
|
|
|
using Infrastructure.CloudSdk;
|
|
|
|
|
using Infrastructure.CloudSdk.minio;
|
|
|
|
|
using Infrastructure.CloudSdk.wayline;
|
|
|
|
|
using MQTTnet;
|
|
|
|
|
using Infrastructure.Extensions;
|
|
|
|
|
using Microsoft.Extensions.Logging;
|
|
|
|
|
using MQTTnet.Client;
|
|
|
|
|
using Newtonsoft.Json;
|
|
|
|
|
using Newtonsoft.Json.Linq;
|
|
|
|
|
using OpenAuth.App.ServiceApp;
|
|
|
|
|
using OpenAuth.Repository.Domain;
|
|
|
|
|
using OpenAuth.WebApi;
|
|
|
|
@ -22,15 +25,19 @@ public class ConfigSubscribe : IJob
|
|
|
|
|
private readonly ManageApp _manageApp;
|
|
|
|
|
private readonly MinioService _minioService;
|
|
|
|
|
private object _locker = new();
|
|
|
|
|
private object _dockUploadFileLocker = new();
|
|
|
|
|
private readonly ILogger<ConfigSubscribe> _logger;
|
|
|
|
|
|
|
|
|
|
public ConfigSubscribe(MqttClientManager mqttClientManager, ISqlSugarClient sqlSugarClient,
|
|
|
|
|
ICacheContext redisCacheContext, ManageApp manageApp, MinioService minioService)
|
|
|
|
|
ICacheContext redisCacheContext, ManageApp manageApp, MinioService minioService,
|
|
|
|
|
ILogger<ConfigSubscribe> logger)
|
|
|
|
|
{
|
|
|
|
|
_mqttClientManager = mqttClientManager;
|
|
|
|
|
_sqlSugarClient = sqlSugarClient;
|
|
|
|
|
_redisCacheContext = redisCacheContext as RedisCacheContext;
|
|
|
|
|
_manageApp = manageApp;
|
|
|
|
|
_minioService = minioService;
|
|
|
|
|
_logger = logger;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private async Task Subscribe()
|
|
|
|
@ -41,15 +48,17 @@ public class ConfigSubscribe : IJob
|
|
|
|
|
{
|
|
|
|
|
"thing/product/+/services_reply",
|
|
|
|
|
"thing/product/+/events",
|
|
|
|
|
"thing/product/+/requests"
|
|
|
|
|
"thing/product/+/requests",
|
|
|
|
|
//"thing/product/+/osd",
|
|
|
|
|
//"thing/product/+/status"
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
await _mqttClientManager
|
|
|
|
|
.SubscribeAsync(topicList,
|
|
|
|
|
async (args) =>
|
|
|
|
|
{
|
|
|
|
|
args.IsHandled = false; // todo 待实验确定
|
|
|
|
|
args.AutoAcknowledge = true; // todo 待实验确定
|
|
|
|
|
//args.IsHandled = false; // todo 待实验确定
|
|
|
|
|
//args.AutoAcknowledge = true; // todo 待实验确定
|
|
|
|
|
await HandleTopic(args, args.ApplicationMessage.Topic,
|
|
|
|
|
Encoding.UTF8.GetString(args.ApplicationMessage.Payload));
|
|
|
|
|
});
|
|
|
|
@ -84,8 +93,7 @@ public class ConfigSubscribe : IJob
|
|
|
|
|
var result = JsonConvert.DeserializeObject<TopicServicesRequest<dynamic>>(message);
|
|
|
|
|
var method = result.method;
|
|
|
|
|
var data = result.data;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
_logger.LogInformation($"主题:{topic}\n消息:{message}");
|
|
|
|
|
long code = 0;
|
|
|
|
|
switch (tempStr)
|
|
|
|
|
{
|
|
|
|
@ -99,8 +107,9 @@ public class ConfigSubscribe : IJob
|
|
|
|
|
// todo 处理资源获取请求
|
|
|
|
|
switch (method)
|
|
|
|
|
{
|
|
|
|
|
// 临时凭证上传
|
|
|
|
|
case "storage_config_get":
|
|
|
|
|
Console.WriteLine($"进入临时凭证获取处理 {message}");
|
|
|
|
|
_logger.LogInformation($"进入临时凭证获取处理");
|
|
|
|
|
// {"bid":"afc5c13e-da1c-4a15-aec1-a765aac34c57",
|
|
|
|
|
// "data":{"module":0},
|
|
|
|
|
// "method":"storage_config_get",
|
|
|
|
@ -114,23 +123,21 @@ public class ConfigSubscribe : IJob
|
|
|
|
|
tid = result.tid,
|
|
|
|
|
bid = result.bid,
|
|
|
|
|
timestamp = DateTimeOffset.Now.ToUnixTimeMilliseconds(),
|
|
|
|
|
|
|
|
|
|
data = new
|
|
|
|
|
{
|
|
|
|
|
result = 0,
|
|
|
|
|
output = new
|
|
|
|
|
{
|
|
|
|
|
bucket = _minioService._bucketName,
|
|
|
|
|
// todo 这里待确定
|
|
|
|
|
endpoint = $"http://{_minioService.endPoint}",
|
|
|
|
|
object_key_prefix = Guid.NewGuid().ToString(), // todo 是否设计任务id ?
|
|
|
|
|
bucket = "test",
|
|
|
|
|
endpoint = "http://175.27.168.120:6013",
|
|
|
|
|
object_key_prefix = $"{Guid.NewGuid().ToString()}", // todo 是否设计任务id ?
|
|
|
|
|
provider = "minio",
|
|
|
|
|
region = "linyi",
|
|
|
|
|
credentials = new
|
|
|
|
|
{
|
|
|
|
|
access_key_id = _minioService.AccessKey,
|
|
|
|
|
access_key_secret = _minioService.SecretKey,
|
|
|
|
|
expire = 7200,
|
|
|
|
|
access_key_id = "minioadmin",
|
|
|
|
|
access_key_secret = "minioadmin",
|
|
|
|
|
expire = 480,
|
|
|
|
|
security_token = ""
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@ -146,14 +153,8 @@ public class ConfigSubscribe : IJob
|
|
|
|
|
break;
|
|
|
|
|
// 获取航线
|
|
|
|
|
case "flighttask_resource_get":
|
|
|
|
|
Console.WriteLine("进入资源获取处理");
|
|
|
|
|
string flightId = data.flight_id + "";
|
|
|
|
|
Console.WriteLine($"任务ID:{flightId}");
|
|
|
|
|
if (_sqlSugarClient != null)
|
|
|
|
|
{
|
|
|
|
|
Console.WriteLine("manageApp 注入没有问题");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
_logger.LogInformation($"进入资源获取处理: 任务id {flightId}");
|
|
|
|
|
// eb87b257-5af1-4bf1-9aba-4267be9fdb12 flight
|
|
|
|
|
// http://175.27.168.120:6013/test/2025062209390863860047.kmz
|
|
|
|
|
// md5 585c833012ddb794eaac1050ef71aa31
|
|
|
|
@ -162,7 +163,7 @@ public class ConfigSubscribe : IJob
|
|
|
|
|
.Queryable<LasaTaskAssign>()
|
|
|
|
|
.Where(x => x.FlightId == flightId)
|
|
|
|
|
.SingleAsync();
|
|
|
|
|
Console.WriteLine($"任务信息:{JsonConvert.SerializeObject(taskAssign)}");
|
|
|
|
|
_logger.LogInformation($"任务信息:{JsonConvert.SerializeObject(taskAssign)}");
|
|
|
|
|
/*var taskAssign =
|
|
|
|
|
manageApp.GetTaskAssignByBidAndTid(result.bid, result.tid, flightId);*/
|
|
|
|
|
var flightTaskResourceGetTopic = $"thing/product/{sn}/requests_reply";
|
|
|
|
@ -193,7 +194,7 @@ public class ConfigSubscribe : IJob
|
|
|
|
|
case "config":
|
|
|
|
|
break;
|
|
|
|
|
default:
|
|
|
|
|
Console.WriteLine($"未知请求:{message}");
|
|
|
|
|
_logger.LogInformation($"未处理消息:{message}");
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -203,45 +204,76 @@ public class ConfigSubscribe : IJob
|
|
|
|
|
{
|
|
|
|
|
case "file_upload_callback":
|
|
|
|
|
// 文件上传
|
|
|
|
|
int flightType = int.Parse(data.flight_task.flight_type);
|
|
|
|
|
Console.WriteLine("进入文件上传处理");
|
|
|
|
|
Console.WriteLine($"文件上传处理:{message}");
|
|
|
|
|
int flightType = data.flight_task.flight_type;
|
|
|
|
|
if (flightType == 1) // 不处理一键起飞媒体信息
|
|
|
|
|
{
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
string flightId = data.file.ext.flight_id;
|
|
|
|
|
string taskId = _manageApp.GetTaskAssignByFlightId(flightId).TaskId;
|
|
|
|
|
var fileUpload = new LasaMediaFile()
|
|
|
|
|
lock (_dockUploadFileLocker)
|
|
|
|
|
{
|
|
|
|
|
Id = Guid.NewGuid().ToString(),
|
|
|
|
|
FlightId = flightId, // 计划id
|
|
|
|
|
TaskId = taskId, // 任务id
|
|
|
|
|
DroneModelKey = data.file.ext.drone_model_key,
|
|
|
|
|
IsOriginal = data.file.ext.is_original,
|
|
|
|
|
MediaIndex = data.file.ext.media_index,
|
|
|
|
|
PayloadModelKey = data.file.ext.payload_model_key,
|
|
|
|
|
AbsoluteAltitude = data.file.metadata.absolute_altitude,
|
|
|
|
|
GimbalYawDegree = data.file.metadata.gimbal_yaw_degree, //云台偏航角度
|
|
|
|
|
RelativeAltitude = data.file.metadata.relative_altitude,
|
|
|
|
|
Lat = data.file.metadata.shoot_position.lat,
|
|
|
|
|
Lng = data.file.metadata.shoot_position.lng,
|
|
|
|
|
Name = data.file.name,
|
|
|
|
|
ObjectKey = data.file.object_key,
|
|
|
|
|
Path = data.file.path,
|
|
|
|
|
CreateTime = data.file.metadata.create_time.ToDateTime(),
|
|
|
|
|
};
|
|
|
|
|
await _sqlSugarClient.Insertable(fileUpload).ExecuteCommandAsync();
|
|
|
|
|
var expectFileCount = data.flight_task.expected_file_count;
|
|
|
|
|
var uploadedFileCount = data.flight_task.uploaded_file_count;
|
|
|
|
|
string flightId = data.file.ext.flight_id;
|
|
|
|
|
string taskId = _manageApp.GetTaskAssignByFlightId(flightId).TaskId;
|
|
|
|
|
var fileUpload = new LasaMediaFile()
|
|
|
|
|
{
|
|
|
|
|
Id = Guid.NewGuid().ToString(),
|
|
|
|
|
FlightId = flightId, // 计划id
|
|
|
|
|
TaskId = taskId, // 任务id
|
|
|
|
|
DroneModelKey = data.file.ext.drone_model_key, // 无人机型号
|
|
|
|
|
IsOriginal = data.file.ext.is_original,
|
|
|
|
|
MediaIndex = data.file.ext.media_index,
|
|
|
|
|
PayloadModelKey = data.file.ext.payload_model_key, //这应该可以标明是什么设置
|
|
|
|
|
AbsoluteAltitude = data.file.metadata.absolute_altitude, // 拍摄绝对高度
|
|
|
|
|
GimbalYawDegree = data.file.metadata.gimbal_yaw_degree, //云台偏航角度
|
|
|
|
|
RelativeAltitude = data.file.metadata.relative_altitude, // 拍摄相对高度
|
|
|
|
|
Lat = data.file.metadata.shoot_position.lat,
|
|
|
|
|
Lng = data.file.metadata.shoot_position.lng,
|
|
|
|
|
Name = data.file.name,
|
|
|
|
|
ObjectKey = data.file.object_key,
|
|
|
|
|
Path = data.file.path, // 目前这个好像没有值
|
|
|
|
|
CreateTime = ((string)data.file.metadata.created_time).ToDateTime(),
|
|
|
|
|
};
|
|
|
|
|
// todo 添加事务
|
|
|
|
|
_sqlSugarClient.Insertable(fileUpload).ExecuteCommand();
|
|
|
|
|
if (result.need_reply.Equals(1))
|
|
|
|
|
{
|
|
|
|
|
var fileUploadCallbackEventReply = new FileUploadCallbackEventReply<object>()
|
|
|
|
|
{
|
|
|
|
|
bid = result.bid,
|
|
|
|
|
tid = result.tid,
|
|
|
|
|
method = "file_upload_callback",
|
|
|
|
|
gateway = sn,
|
|
|
|
|
data = new
|
|
|
|
|
{
|
|
|
|
|
result = 0
|
|
|
|
|
},
|
|
|
|
|
timestamp = DateTimeOffset.Now.ToUnixTimeMilliseconds(),
|
|
|
|
|
};
|
|
|
|
|
_ = _mqttClientManager.PublishAsync($"thing/product/{sn}/events_reply",
|
|
|
|
|
JsonConvert.SerializeObject(fileUploadCallbackEventReply));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
var expectFileCount = data.flight_task.expected_file_count;
|
|
|
|
|
var uploadedFileCount = data.flight_task.uploaded_file_count;
|
|
|
|
|
|
|
|
|
|
var taskRecord = new LasaTask()
|
|
|
|
|
{
|
|
|
|
|
Id = taskId,
|
|
|
|
|
ExpectedFileCount = expectFileCount, // 期望文件数量
|
|
|
|
|
UploadedFileCount = uploadedFileCount // 已上传文件数量
|
|
|
|
|
};
|
|
|
|
|
// 当expectFileCount 等于uploadedFileCount时,则表示航线执行完成
|
|
|
|
|
if (uploadedFileCount.Equals(expectFileCount))
|
|
|
|
|
{
|
|
|
|
|
taskRecord.Status = 5; // 成功状态
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
_sqlSugarClient.Updateable(taskRecord)
|
|
|
|
|
.IgnoreNullColumns().ExecuteCommand();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
var taskRecord = new LasaTask()
|
|
|
|
|
{
|
|
|
|
|
Id = taskId,
|
|
|
|
|
ExpectedFileCount = expectFileCount, // 期望文件数量
|
|
|
|
|
UploadedFileCount = uploadedFileCount // 已上传文件数量
|
|
|
|
|
};
|
|
|
|
|
await _sqlSugarClient.Updateable(taskRecord)
|
|
|
|
|
.IgnoreNullColumns().ExecuteCommandAsync();
|
|
|
|
|
break;
|
|
|
|
|
case "release_terminal_control_area":
|
|
|
|
|
//暂不处理
|
|
|
|
@ -278,6 +310,24 @@ public class ConfigSubscribe : IJob
|
|
|
|
|
}
|
|
|
|
|
else
|
|
|
|
|
{
|
|
|
|
|
if (result.need_reply.Equals(1))
|
|
|
|
|
{
|
|
|
|
|
var fileUploadCallbackEventReply = new FileUploadCallbackEventReply<object>()
|
|
|
|
|
{
|
|
|
|
|
bid = result.bid,
|
|
|
|
|
tid = result.tid,
|
|
|
|
|
method = "flighttask_progress",
|
|
|
|
|
gateway = sn,
|
|
|
|
|
data = new
|
|
|
|
|
{
|
|
|
|
|
result = 0
|
|
|
|
|
},
|
|
|
|
|
timestamp = DateTimeOffset.Now.ToUnixTimeMilliseconds(),
|
|
|
|
|
};
|
|
|
|
|
await _mqttClientManager.PublishAsync($"thing/product/{sn}/events_reply",
|
|
|
|
|
JsonConvert.SerializeObject(fileUploadCallbackEventReply));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
Console.WriteLine($"航线进度:{message}");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -303,7 +353,7 @@ public class ConfigSubscribe : IJob
|
|
|
|
|
switch (method)
|
|
|
|
|
{
|
|
|
|
|
case "flighttask_prepare": // 下发任务响应
|
|
|
|
|
// todo 同一prepare消息只能处理一次
|
|
|
|
|
// 顺序处理,多余的不再处理
|
|
|
|
|
lock (_locker)
|
|
|
|
|
{
|
|
|
|
|
// 报错处理
|
|
|
|
@ -397,6 +447,9 @@ public class ConfigSubscribe : IJob
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
break;
|
|
|
|
|
case "thing/product/*/osd":
|
|
|
|
|
Console.WriteLine($"osd消息: {message}");
|
|
|
|
|
break;
|
|
|
|
|
default:
|
|
|
|
|
Console.WriteLine($"未进入主题处理");
|
|
|
|
|