From 5598ae48010c17c85ca851ceea37621ddae6e63d Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E9=99=88=E4=BC=9F?= <421281095@qq.com>
Date: Thu, 19 Jun 2025 15:34:01 +0800
Subject: [PATCH 01/10] =?UTF-8?q?1.=20=E6=B7=BB=E5=8A=A0=E6=98=AF=E5=90=A6?=
=?UTF-8?q?=E6=98=AFhttps=E9=85=8D=E7=BD=AE?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
Infrastructure/CloudSdk/minio/MinioService.cs | 10 +++++++---
OpenAuth.WebApi/appsettings.json | 1 +
2 files changed, 8 insertions(+), 3 deletions(-)
diff --git a/Infrastructure/CloudSdk/minio/MinioService.cs b/Infrastructure/CloudSdk/minio/MinioService.cs
index 36c5970..c6736fe 100644
--- a/Infrastructure/CloudSdk/minio/MinioService.cs
+++ b/Infrastructure/CloudSdk/minio/MinioService.cs
@@ -1,4 +1,6 @@
-using Infrastructure.Helpers;
+using System.Configuration;
+using Infrastructure.Extensions;
+using Infrastructure.Helpers;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Configuration;
using Minio;
@@ -13,6 +15,7 @@ public class MinioService
private IMinioClient _minioClient;
public string _bucketName;
public string endPoint;
+ public bool UseSSL;
public MinioService()
{
@@ -33,6 +36,7 @@ public class MinioService
var configuration = builder.Build();
_bucketName = configuration["Minio:BucketName"];
endPoint = configuration["Minio:Endpoint"];
+ UseSSL = configuration["Minio:UseSSL"].ToBool();
_minioClient = new MinioClient()
.WithEndpoint(endPoint)
.WithCredentials(configuration["Minio:AccessKey"], configuration["Minio:SecretKey"])
@@ -181,7 +185,7 @@ public class MinioService
await _minioClient.MakeBucketAsync(mbArgs).ConfigureAwait(false);
}
- var objectName = $"{GenerateId.GenerateOrderNumber()}.wpml";
+ var objectName = $"{GenerateId.GenerateOrderNumber()}.kmz";
// 使用内存流上传
using var stream = new MemoryStream();
await file.CopyToAsync(stream);
@@ -201,7 +205,7 @@ public class MinioService
.WithContentType("application/octet-stream");
//.WithContentType(file.ContentType);
await _minioClient.PutObjectAsync(putArgs);
- return "http://" + endPoint + "/" + bucketName + "/" + objectName;
+ return UseSSL ? "https://" : "http://" + endPoint + "/" + bucketName + "/" + objectName;
}
catch (Exception ex)
{
diff --git a/OpenAuth.WebApi/appsettings.json b/OpenAuth.WebApi/appsettings.json
index ffe292e..2d9c47e 100644
--- a/OpenAuth.WebApi/appsettings.json
+++ b/OpenAuth.WebApi/appsettings.json
@@ -75,6 +75,7 @@
"Password": ""
},
"Minio": {
+ "UseSSL": false,
"Endpoint": "175.27.168.120:6013",
"AccessKey": "minioadmin",
"SecretKey": "minioadmin",
From ac0b033efb05437cc2cee59914cd580dba661f1c Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E9=99=88=E4=BC=9F?= <421281095@qq.com>
Date: Fri, 20 Jun 2025 15:09:32 +0800
Subject: [PATCH 02/10] =?UTF-8?q?1.=20=E8=AE=A2=E9=98=85=E5=A4=84=E7=90=86?=
=?UTF-8?q?=EF=BC=88=E9=83=A8=E5=88=86=EF=BC=89=202.=20=E5=AE=9A=E6=97=B6?=
=?UTF-8?q?=E4=BB=BB=E5=8A=A1=E8=B0=83=E6=95=B4=203.=20=E6=B7=BB=E5=8A=A0?=
=?UTF-8?q?=E6=96=B0=E8=A1=A8=204.=20=E5=85=B6=E5=AE=83?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
Infrastructure/CloudSdk/minio/MinioService.cs | 3 +-
.../CloudSdk/mqtt/MqttClientManager.cs | 6 +-
.../BaseApp/HostedService/GlobalSubscribe.cs | 51 --
.../BaseApp/HostedService/QuartzService.cs | 42 +-
.../BaseApp/Subscribe/ConfigSubscribe.cs | 586 ++++++++++++++++++
.../BaseApp/Subscribe/GlobalSubscribe.cs | 69 +++
OpenAuth.App/ServiceApp/ManageApp.cs | 100 ++-
OpenAuth.Repository/Domain/LasaAirLine.cs | 1 +
OpenAuth.Repository/Domain/LasaTask.cs | 1 +
OpenAuth.Repository/Domain/LasaTaskAssign.cs | 20 +
.../ServiceControllers/ManageController.cs | 12 +-
OpenAuth.WebApi/Startup.cs | 1 -
OpenAuth.WebApi/appsettings.json | 2 +-
OpenAuth.WebApi/boot/ConfigSubscribe.cs | 35 --
OpenAuth.WebApi/boot/TestAbc.cs | 19 +
15 files changed, 822 insertions(+), 126 deletions(-)
delete mode 100644 OpenAuth.App/BaseApp/HostedService/GlobalSubscribe.cs
create mode 100644 OpenAuth.App/BaseApp/Subscribe/ConfigSubscribe.cs
create mode 100644 OpenAuth.App/BaseApp/Subscribe/GlobalSubscribe.cs
create mode 100644 OpenAuth.Repository/Domain/LasaTaskAssign.cs
delete mode 100644 OpenAuth.WebApi/boot/ConfigSubscribe.cs
create mode 100644 OpenAuth.WebApi/boot/TestAbc.cs
diff --git a/Infrastructure/CloudSdk/minio/MinioService.cs b/Infrastructure/CloudSdk/minio/MinioService.cs
index c6736fe..0992f72 100644
--- a/Infrastructure/CloudSdk/minio/MinioService.cs
+++ b/Infrastructure/CloudSdk/minio/MinioService.cs
@@ -1,5 +1,4 @@
-using System.Configuration;
-using Infrastructure.Extensions;
+using Infrastructure.Extensions;
using Infrastructure.Helpers;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Configuration;
diff --git a/Infrastructure/CloudSdk/mqtt/MqttClientManager.cs b/Infrastructure/CloudSdk/mqtt/MqttClientManager.cs
index ebd0a4a..4f86cc9 100644
--- a/Infrastructure/CloudSdk/mqtt/MqttClientManager.cs
+++ b/Infrastructure/CloudSdk/mqtt/MqttClientManager.cs
@@ -1,4 +1,8 @@
-using Microsoft.Extensions.Configuration;
+using System;
+using System.IO;
+using System.Threading;
+using System.Threading.Tasks;
+using Microsoft.Extensions.Configuration;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Protocol;
diff --git a/OpenAuth.App/BaseApp/HostedService/GlobalSubscribe.cs b/OpenAuth.App/BaseApp/HostedService/GlobalSubscribe.cs
deleted file mode 100644
index f0de2a4..0000000
--- a/OpenAuth.App/BaseApp/HostedService/GlobalSubscribe.cs
+++ /dev/null
@@ -1,51 +0,0 @@
-using System.Text;
-using ce.autofac.extension;
-using Infrastructure.Cache;
-using OpenAuth.WebApi;
-using Quartz;
-
-namespace OpenAuth.App.BaseApp.HostedService;
-///
-/// 离线去订阅
-///
-public class GlobalSubscribe : IJob
-{
- public async Task Execute(IJobExecutionContext context)
- {
- Console.WriteLine($"running !{DateTime.Now}");
- var redisCacheContext = context.JobDetail.JobDataMap.Get("redisCacheContext") as RedisCacheContext;
- var serviceProvider = context.JobDetail.JobDataMap.Get("serviceProvider") as IServiceProvider;
- var mqttManager = serviceProvider.GetService(typeof(MqttClientManager)) as MqttClientManager;
- /*var ioc = IocManager.Instance;
- var redisCacheContext = IocManager.Instance.GetService();*/
- // todo 如果无人机不在线,则订阅
- if (redisCacheContext == null) return;
- var keys = redisCacheContext.GetAllKeys("online:*");
- foreach (var redisKey in keys)
- {
- // todo 取得sn值
- // todo 取得设备信息
- // todo
- // todo 需要判断是不是要订阅,避免重复订阅
- if (mqttManager != null)
- await mqttManager.SubscribeAsync("thing/product/{gateway_sn}/services_reply", async (args) =>
- {
- var topic = args.ApplicationMessage.Topic;
- var payload = args.ApplicationMessage.Payload;
- var message = Encoding.UTF8.GetString(payload);
- // todo 解析是否是需要的
- // flighttask_prepare method
- });
- //statusSubscribe.subscribe(gateway);
- //stateSubscribe.subscribe(gateway, true);
- //osdSubscribe.subscribe(gateway, true);
- //servicesSubscribe.subscribe(gateway);
- //eventsSubscribe.subscribe(gateway, true);
- //requestsSubscribe.subscribe(gateway);
- //propertySetSubscribe.subscribe(gateway);
- }
-
- // todo
- return ;
- }
-}
\ No newline at end of file
diff --git a/OpenAuth.App/BaseApp/HostedService/QuartzService.cs b/OpenAuth.App/BaseApp/HostedService/QuartzService.cs
index 1b3f535..1689ad1 100644
--- a/OpenAuth.App/BaseApp/HostedService/QuartzService.cs
+++ b/OpenAuth.App/BaseApp/HostedService/QuartzService.cs
@@ -5,7 +5,7 @@ using Infrastructure.Cache;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
-using OpenAuth.App.BaseApp.HostedService;
+using OpenAuth.App.BaseApp.Subscribe;
using Quartz;
using Quartz.Impl;
@@ -32,7 +32,39 @@ namespace OpenAuth.App.HostedService
public Task StartAsync(CancellationToken cancellationToken)
{
- var map = new JobDataMap();
+ _scheduler.Start(cancellationToken);
+ // 此种方法创建任务,可用构造函数,注入对象
+ var jobBuilderType = typeof(JobBuilder);
+ var method = jobBuilderType.GetMethods().FirstOrDefault(
+ x => x.Name.Equals("Create", StringComparison.OrdinalIgnoreCase) &&
+ x.IsGenericMethod && x.GetParameters().Length == 0)
+ ?.MakeGenericMethod(typeof(GlobalSubscribe));
+
+ var jobBuilder = (JobBuilder)method.Invoke(null, null);
+
+ IJobDetail jobDetail = jobBuilder.WithIdentity("GlobalSubscribe").Build();
+ var trigger = TriggerBuilder.Create()
+ .WithIdentity("SubscribeJobTrigger", "GlobalSubscribe")
+ .WithSimpleSchedule(x => { x.WithIntervalInSeconds(30).RepeatForever(); })
+ .Build();
+
+
+ _scheduler.ScheduleJob(jobDetail, trigger, cancellationToken);
+
+ var method1 = jobBuilderType.GetMethods().FirstOrDefault(
+ x => x.Name.Equals("Create", StringComparison.OrdinalIgnoreCase) &&
+ x.IsGenericMethod && x.GetParameters().Length == 0)
+ ?.MakeGenericMethod(typeof(ConfigSubscribe));
+
+ var jobBuilder1 = (JobBuilder)method.Invoke(null, null);
+
+ IJobDetail jobDetail1 = jobBuilder.WithIdentity("GlobalSubscribe").Build();
+ var bootTrigger = TriggerBuilder.Create()
+ .WithIdentity("bootTrigger")
+ .WithSimpleSchedule(x => x.WithRepeatCount(0)).Build();
+ _scheduler.ScheduleJob(jobDetail1, bootTrigger, cancellationToken);
+
+ /*var map = new JobDataMap();
map.Put("redisCacheContext", _serviceProvider.GetService());
map.Put("ServiceProvider", _serviceProvider);
var job = JobBuilder.Create().UsingJobData(map)
@@ -40,13 +72,13 @@ namespace OpenAuth.App.HostedService
//创建一个触发条件
var trigger = TriggerBuilder.Create()
.WithIdentity("SubscribeJobTrigger", "GlobalSubscribe")
- .WithSimpleSchedule(x => { x.WithIntervalInSeconds(3).RepeatForever(); })
+ .WithSimpleSchedule(x => { x.WithIntervalInSeconds(30).RepeatForever(); })
.Build();
var onceTrigger = TriggerBuilder.Create()
.WithIdentity("onceTrigger")
.WithSimpleSchedule(x => x.WithRepeatCount(0)).Build();
- _scheduler.Start(cancellationToken);
- _scheduler.ScheduleJob(job, trigger, cancellationToken);
+
+ _scheduler.ScheduleJob(job, trigger, cancellationToken);*/
var result = _openJobApp.StartAll();
return result;
}
diff --git a/OpenAuth.App/BaseApp/Subscribe/ConfigSubscribe.cs b/OpenAuth.App/BaseApp/Subscribe/ConfigSubscribe.cs
new file mode 100644
index 0000000..5949656
--- /dev/null
+++ b/OpenAuth.App/BaseApp/Subscribe/ConfigSubscribe.cs
@@ -0,0 +1,586 @@
+using System.Dynamic;
+using System.Text;
+using Infrastructure.Cache;
+using Infrastructure.CloudSdk.wayline;
+using Newtonsoft.Json;
+using OpenAuth.App.ServiceApp;
+using OpenAuth.Repository.Domain;
+using OpenAuth.WebApi;
+using Quartz;
+using SqlSugar;
+
+namespace OpenAuth.App.BaseApp.Subscribe;
+
+public class ConfigSubscribe : IJob
+{
+ private readonly MqttClientManager mqttClientManager;
+ private readonly ISqlSugarClient sqlSugarClient;
+ private readonly RedisCacheContext redisCacheContext;
+ private readonly ManageApp manageApp;
+
+ public ConfigSubscribe(MqttClientManager mqttClientManager, ISqlSugarClient sqlSugarClient,
+ ICacheContext redisCacheContext, ManageApp manageApp)
+ {
+ this.mqttClientManager = mqttClientManager;
+ this.sqlSugarClient = sqlSugarClient;
+ this.redisCacheContext = redisCacheContext as RedisCacheContext;
+ this.manageApp = manageApp;
+ }
+
+ private async Task Subscribe()
+ {
+ string gatewaySn = "8UUXN5400A079H";
+ await mqttClientManager
+ .SubscribeAsync($"thing/product/{gatewaySn}/services_reply",
+ async (args) => await HandleTopic(gatewaySn, args.ApplicationMessage.Topic,
+ Encoding.UTF8.GetString(args.ApplicationMessage.Payload)));
+ }
+
+ public async Task Execute(IJobExecutionContext context)
+ {
+ await Subscribe();
+ }
+
+ public static readonly string[] TopicList =
+ {
+ "thing/product/**/osd",
+ "thing/product/**/state",
+ "thing/product/**/services_reply",
+ "thing/product/**/events",
+ "thing/product/**/requests",
+ "sys/product/**/status",
+ "thing/product/**/property/set_reply",
+ "thing/product/**/drc/up"
+ };
+
+
+ private async Task HandleTopic(string sn, string topic, string message)
+ {
+ /*
+ thing/product/{device_sn}/osd 设备端定频向云平台推送的设备属性(properties),
+ 具体内容范围参见物模型内容
+ thing/product/{device_sn}/state 设备端按需上报向云平台推送的设备属性(properties),
+ 具体内容范围参见物模型内容
+ thing/product/{gateway_sn}/services_reply 设备对 service 的回复、处理结果 (航线任务下发之类的)
+ thing/product/{gateway_sn}/events 设备端向云平台发送的,需要关注和处理的事件。
+ 比如SD满了,飞机解禁禁飞区等信息(事件范围参见物模型内容)
+ thing/product/{gateway_sn}/requests 设备端向云平台发送请求,为了获取一些信息,比如上传的临时凭证
+ sys/product/{gateway_sn}/status 设备上下线、更新拓扑
+ thing/product/{gateway_sn}/property/set_reply 设备属性设置的响应
+ thing/product/{gateway_sn}/drc/up DRC 协议上行*/
+ var tempStr = sn.Replace("sn", "*");
+ Console.WriteLine($"成功调用主题 [{topic}] 的消息: {message}");
+ // 主题方法
+ var result = JsonConvert.DeserializeObject>(message);
+ var method = result.method;
+ var data = result.data;
+ long code = data.code;
+ switch (tempStr)
+ {
+ // todo
+ // 任务资源处理
+ // 航线进度处理
+ // 任务取消
+ case "thing/product/**/services_reply":
+
+ switch (method)
+ {
+ case "flighttask_prepare": // 下发任务响应
+ // 报错处理
+
+ var taskAssign = manageApp.GetTaskAssignByBidAndTid(result.bid, result.tid);
+ if (code == 0)
+ {
+ if (taskAssign == null) return; // 不存在不操作
+ var flightId = taskAssign.FlightId;
+ var request = new TopicServicesRequest