订阅数据处理(部分)

main
陈伟 2 months ago
parent 5ff3914e11
commit fe1d28e1d4

@ -15,6 +15,8 @@ public class MinioService
public string _bucketName;
public string endPoint;
public bool UseSSL;
public string AccessKey;
public string SecretKey;
public MinioService()
{
@ -36,9 +38,11 @@ public class MinioService
_bucketName = configuration["Minio:BucketName"];
endPoint = configuration["Minio:Endpoint"];
UseSSL = configuration["Minio:UseSSL"].ToBool();
AccessKey = configuration["Minio:AccessKey"];
SecretKey = configuration["Minio:SecretKey"];
_minioClient = new MinioClient()
.WithEndpoint(endPoint)
.WithCredentials(configuration["Minio:AccessKey"], configuration["Minio:SecretKey"])
.WithCredentials(AccessKey, SecretKey)
.Build();
}

@ -32,15 +32,15 @@ public class ConfigSubscribe : IJob
private async Task Subscribe()
{
// 或者 thing/product/#
// sys/product/#
// 或者 thing/product/#
// sys/product/#
string[] topicList =
{
"thing/product/+/services_reply",
"thing/product/+/events",
"thing/product/+/requests"
};
await _mqttClientManager
.SubscribeAsync(topicList,
async (args) =>
@ -57,7 +57,6 @@ public class ConfigSubscribe : IJob
await Subscribe();
}
private async Task HandleTopic(MqttApplicationMessageReceivedEventArgs args, string topic,
string message)
@ -74,7 +73,7 @@ public class ConfigSubscribe : IJob
sys/product/{gateway_sn}/status 线
thing/product/{gateway_sn}/property/set_reply
thing/product/{gateway_sn}/drc/up DRC */
var sn = topic.Split("/")[2];
var tempStr = topic.Replace(sn, "*");
//Console.WriteLine($"成功调用主题 [{topic}] 的消息: {message}");
@ -105,6 +104,7 @@ public class ConfigSubscribe : IJob
// "need_reply":0,"tid":"50e8102c-da72-42b1-a899-a82a519456d9",
// "timestamp":1750575776430,
// "gateway":"8UUXN5400A079H"}
//todo 配置中读取minio配置
var storageConfigRequest = new TopicServicesRequest<object>()
{
method = "storage_config_get",
@ -118,7 +118,7 @@ public class ConfigSubscribe : IJob
{
bucket = "test",
endpoint = "http://175.27.168.120:6013",
object_key_prefix = "xxx", // todo 是否设计任务id
object_key_prefix = Guid.NewGuid().ToString(), // todo 是否设计任务id
provider = "minio",
region = "linyi",
credentials = new
@ -180,8 +180,8 @@ public class ConfigSubscribe : IJob
method = "flighttask_resource_get"
};
Console.WriteLine(
$"topic: {flightTaskResourceGetTopic} 发送资源获取处理结果:{JsonConvert.SerializeObject(outRequest)}");
/*Console.WriteLine(
$"topic: {flightTaskResourceGetTopic} 发送资源获取处理结果:{JsonConvert.SerializeObject(outRequest)}");*/
await _mqttClientManager.PublishAsync(flightTaskResourceGetTopic,
JsonConvert.SerializeObject(outRequest));
break;
@ -194,33 +194,89 @@ public class ConfigSubscribe : IJob
break;
case "thing/product/*/events":
if (method.Equals("flighttask_progress"))
switch (method)
{
code = data.result;
// 处理航线进度 ,也有可能是失败
if (code != 0)
case "file_upload_callback":
// 文件上传
int flightType = int.Parse(data.flight_task.flight_type);
if (flightType == 1)
{
break;
}
var fileUpload = new LasaMediaFile()
{
Id = Guid.NewGuid().ToString(),
FlightId = data.file.ext.flight_id,
DroneModelKey = data.file.ext.drone_model_key,
IsOriginal = data.file.ext.is_original,
MediaIndex = data.file.ext.media_index,
PayloadModelKey = data.file.ext.payload_model_key,
AbsoluteAltitude = data.file.metadata.absolute_altitude,
GimbalYawDegree = data.file.metadata.gimbal_yaw_degree, //云台偏航角度
RelativeAltitude = data.file.metadata.relative_altitude,
Lat = data.file.metadata.shoot_position.lat,
Lng = data.file.metadata.shoot_position.lng,
Name = data.file.name,
ObjectKey = data.file.object_key,
Path = data.file.path,
CreateTime = data.file.metadata.create_time.ToDateTime(),
};
await _sqlSugarClient.Insertable(fileUpload).ExecuteCommandAsync();
var expectFileCount = data.flight_task.expected_file_count;
var uploadedFileCount = data.flight_task.uploaded_file_count;
var taskRecord = new LasaTask()
{
ExpectedFileCount = expectFileCount,
UploadedFileCount = uploadedFileCount
};
await _sqlSugarClient.Updateable(taskRecord)
.Where(t => t.FlightId == fileUpload.FlightId)
.ExecuteCommandAsync();
break;
case "release_terminal_control_area":
//暂不处理
break;
case "flighttask_progress":
{
Console.WriteLine($"航线进度错误信息:{ErrorMap[code]} {message}");
// 取消任务
var cancelTaskTopic = $"thing/product/{sn}/services";
var cancelTaskRequest = new TopicServicesRequest<object>()
code = data.result;
// 处理航线进度 ,也有可能是失败
if (code != 0)
{
method = "flighttask_undo",
tid = result.tid,
bid = result.bid,
timestamp = DateTimeOffset.Now.ToUnixTimeMilliseconds(),
data = new
Console.WriteLine($"航线进度错误信息:{ErrorMap[code]} {message}");
// 取消任务
var cancelTaskTopic = $"thing/product/{sn}/services";
var cancelTaskRequest = new TopicServicesRequest<object>()
{
flight_ids = new[] { data.output.ext.flight_id }
}
};
await _mqttClientManager.PublishAsync(cancelTaskTopic,
JsonConvert.SerializeObject(cancelTaskRequest));
method = "flighttask_undo",
tid = result.tid,
bid = result.bid,
timestamp = DateTimeOffset.Now.ToUnixTimeMilliseconds(),
data = new
{
flight_ids = new[] { data.output.ext.flight_id }
}
};
await _mqttClientManager.PublishAsync(cancelTaskTopic,
JsonConvert.SerializeObject(cancelTaskRequest));
// todo 更新任务状态及失败原因?
}
else
{
Console.WriteLine($"航线进度:{message}");
}
break;
}
default:
{
if (!method.Equals("hms"))
{
Console.WriteLine($"未处理事件events{message}");
}
break;
}
}
else if (!method.Equals("hms"))
{
Console.WriteLine($"未处理事件events{message}");
}
break;
@ -233,71 +289,87 @@ public class ConfigSubscribe : IJob
{
case "flighttask_prepare": // 下发任务响应
// todo 同一prepare消息只能处理一次
//lock (locker)
// {
// 报错处理
Console.WriteLine("进入prepare订阅消息");
code = data.result;
var taskAssign = _manageApp.GetTaskAssignByBidAndTid(result.bid, result.tid);
Console.WriteLine($"prepare 任务信息:{JsonConvert.SerializeObject(taskAssign)}");
if (code == 0)
lock (_locker)
{
if (taskAssign == null)
// 报错处理
Console.WriteLine("进入prepare订阅消息");
code = data.result;
var taskAssign = _manageApp.GetTaskAssignByBidAndTid(result.bid, result.tid);
//Console.WriteLine($"prepare 任务信息:{JsonConvert.SerializeObject(taskAssign)}");
if (code == 0)
{
Console.WriteLine("已跳过prepare处理");
return; // 不存在不操作
}
if (taskAssign == null)
{
Console.WriteLine("已跳过prepare处理");
return; // 不存在不操作
}
var flightId = taskAssign.FlightId;
var request = new TopicServicesRequest<object>();
dynamic data1 = new ExpandoObject();
data1.flight_id = flightId;
// todo 检查设备是否在线
request.SetMethod("flighttask_execute")
.SetTid(result.tid)
.SetBid(result.bid)
.SetTimestamp(DateTimeOffset.Now.ToUnixTimeMilliseconds())
.SetData(data1);
// 任务执行
_ = _mqttClientManager.PublishAsync($"thing/product/{sn}/services",
JsonConvert.SerializeObject(request));
var taskAssignRecord = new LasaTaskAssign()
{
Id = taskAssign.Id,
Status = 1
};
_sqlSugarClient.Updateable(taskAssignRecord).IgnoreNullColumns()
.ExecuteCommand();
// todo 锁定这个机场 ,不再执行其它任务
}
else
{
// 错误处理
var errorMsg = ErrorMap[code];
var taskAssignRecord = new LasaTaskAssign()
var flightId = taskAssign.FlightId;
var request = new TopicServicesRequest<object>();
dynamic data1 = new ExpandoObject();
data1.flight_id = flightId;
// todo 检查设备是否在线
request.SetMethod("flighttask_execute")
.SetTid(result.tid)
.SetBid(result.bid)
.SetTimestamp(DateTimeOffset.Now.ToUnixTimeMilliseconds())
.SetData(data1);
// 任务执行
_ = _mqttClientManager.PublishAsync($"thing/product/{sn}/services",
JsonConvert.SerializeObject(request));
var taskAssignRecord = new LasaTaskAssign()
{
Id = taskAssign.Id,
Status = 1
};
_sqlSugarClient.Updateable(taskAssignRecord).IgnoreNullColumns()
.ExecuteCommand();
var taskRecord = new LasaTask()
{
Id = taskAssign.TaskId,
FlightId = flightId,
};
_sqlSugarClient.Updateable(taskRecord).IgnoreNullColumns()
.ExecuteCommand();
// todo 锁定这个机场 ,不再执行其它任务
}
else
{
Id = taskAssign.Id,
Reason = errorMsg,
Status = 2
};
_sqlSugarClient.Updateable(taskAssignRecord).IgnoreNullColumns().ExecuteCommand();
// 错误处理
var errorMsg = ErrorMap[code];
var taskAssignRecord = new LasaTaskAssign()
{
Id = taskAssign.Id,
Reason = errorMsg,
Status = 2
};
_sqlSugarClient.Updateable(taskAssignRecord).IgnoreNullColumns().ExecuteCommand();
}
}
// }
break;
case "flighttask_execute": // 执行任务响应
code = data.result;
var taskAssignExecute = _manageApp.GetTaskAssignByBidAndTid(result.bid, result.tid, 1);
var taskRecordExecute = new LasaTask()
{
Id = taskAssignExecute.TaskId,
};
if (code != 0)
{
var errorMsg = ErrorMap[code];
Console.WriteLine($"任务失败: 错误码 {code} 错误信息 {errorMsg}");
//Console.WriteLine($"任务失败: 错误码 {code} 错误信息 {errorMsg}");
// todo 任务执行失败
// TOdo 和航线进度方法中返回的错误有没有区别
}
else
{
// 任务开始执行
taskRecordExecute.Status = 1; // 任务执行中
Console.WriteLine($"任务执行响应 {code} {message}");
}
await _sqlSugarClient.Updateable(taskRecordExecute).ExecuteCommandAsync();
break;
}

@ -750,7 +750,6 @@ namespace OpenAuth.App.ServiceApp
};
await Repository.ChangeRepository<SugarRepositiry<LasaTaskAssign>>().InsertAsync(taskAssign);
// todo 更新任务状态?
}
public async Task PendingFlyTask(string taskId)
@ -954,11 +953,11 @@ namespace OpenAuth.App.ServiceApp
return droneDock;
}
public LasaTaskAssign GetTaskAssignByBidAndTid(string bid, string tid)
public LasaTaskAssign GetTaskAssignByBidAndTid(string bid, string tid, int status = 0)
{
return Repository
.ChangeRepository<SugarRepositiry<LasaTaskAssign>>()
.GetSingle(r => r.Bid == bid && r.Tid == tid && r.Status == 0);
.GetSingle(r => r.Bid == bid && r.Tid == tid && r.Status == status);
}

@ -0,0 +1,88 @@
using SqlSugar;
namespace OpenAuth.Repository.Domain;
using System;
/// <summary>
/// 媒体文件实体类
/// </summary>
[SugarTable("lasa_mediafile")]
public class LasaMediaFile
{
/// <summary>
/// 主键ID
/// </summary>
[SugarColumn(IsPrimaryKey = true, ColumnName = "Id")]
public string Id { get; set; }
/// <summary>
/// 计划ID
/// </summary>
public string FlightId { get; set; }
/// <summary>
/// 飞行器产品枚举
/// </summary>
public string DroneModelKey { get; set; }
/// <summary>
/// 是否是原图 0否1是
/// </summary>
public bool? IsOriginal { get; set; }
/// <summary>
/// 文件索引
/// </summary>
public long? MediaIndex { get; set; }
/// <summary>
/// 负载产品枚举
/// </summary>
public string PayloadModelKey { get; set; }
/// <summary>
/// 拍摄绝对高度
/// </summary>
public float? AbsoluteAltitude { get; set; }
/// <summary>
/// 云台偏航角
/// </summary>
public float? GimbalYawDegree { get; set; }
/// <summary>
/// 拍摄相对高度
/// </summary>
public float? RelativeAltitude { get; set; }
/// <summary>
/// 拍摄位置纬度
/// </summary>
public float? Lat { get; set; }
/// <summary>
/// 拍摄位置经度
/// </summary>
public float? Lng { get; set; }
/// <summary>
/// 文件名称
/// </summary>
public string Name { get; set; }
/// <summary>
/// 对象Key
/// </summary>
public string ObjectKey { get; set; }
/// <summary>
/// 路径
/// </summary>
public string Path { get; set; }
/// <summary>
/// 文件创建时间
/// </summary>
public DateTime? CreateTime { get; set; }
}

@ -47,7 +47,7 @@ namespace OpenAuth.Repository.Domain
/// <summary>
/// 返航失控动作
/// </summary>
public string LossOfControlAction { get; set; }
public int LossOfControlAction { get; set; }
/// <summary>
/// 续飞模式
@ -60,7 +60,7 @@ namespace OpenAuth.Repository.Domain
public string AIInspection { get; set; }
/// <summary>
/// 状态
/// 状态 0-待执行任务 1-任务执行中 2-任务执行失败 3. 任务挂起(挂起的任务超时后,怎么处理?)
/// </summary>
public int Status { get; set; }
@ -73,11 +73,57 @@ namespace OpenAuth.Repository.Domain
/// 航线id
/// </summary>
public string AirLineId { get; set; }
public long CreateId { get; set; }
public DateTime? CreateTime { get; set; }
/// <summary>
/// 航线精度类型
/// </summary>
public int WaylinePrecisionType { get; set; }
// todo 关联openjob id
public DateTime? ScheduledStartTime { get; set; }
/// <summary>
///
/// </summary>
public DateTime? ScheduledEndTime { get; set; }
/// <summary>
/// 实际开始时间
/// </summary>
public DateTime? ExecuteTime { get; set; }
/// <summary>
/// 实际结束时间
/// </summary>
public DateTime? CompletedTime { get; set; }
/// <summary>
///计划执行时长
/// </summary>
public long PlanExecuteDuration { get; set; }
/// <summary>
/// 实际执行时长 todo 后面看实际情况修改
/// </summary>
public long ActualExecuteDuration { get; set; }
/// <summary>
/// 项目id
/// </summary>
public string WorkspaceId { get; set; }
/// <summary>
/// 期望媒体数量
/// </summary>
public int ExpectedFileCount { get; set; }
/// <summary>
/// 已上传媒体数量
/// </summary>
public int UploadedFileCount { get; set; }
public string FlightId { get; set; }
}
}

@ -431,6 +431,7 @@ namespace OpenAuth.WebApi.Controllers.ServiceControllers
/// <param name="taskId"></param>
[HttpPost]
[AllowAnonymous]
// todo 根据项目设置的起飞条件设置是否起飞
public async Task ExecuteFlyTask(string taskId)
{
await _app.ExecuteFlyTask(taskId);
@ -443,6 +444,7 @@ namespace OpenAuth.WebApi.Controllers.ServiceControllers
[HttpPost]
public async Task PendingFlyTask(string taskId)
{
// todo 先只针对立即任务实现
await _app.PendingFlyTask(taskId);
}

Loading…
Cancel
Save