diff --git a/Infrastructure/CloudSdk/GatewayManager.cs b/Infrastructure/CloudSdk/GatewayManager.cs
index 8b03ae4..04ab8ea 100644
--- a/Infrastructure/CloudSdk/GatewayManager.cs
+++ b/Infrastructure/CloudSdk/GatewayManager.cs
@@ -15,7 +15,7 @@ public class GatewayManager
///
/// 下发任务 down(向设备端发布消息) flighttask_prepare(method) thing/product/{gateway_sn}/services(topic)
///
- public static string FlightTaskPrepare = TopicConst.ThingModelPre + TopicConst.Product + "%s" + TopicConst.ServicesSuffix;
+ public static string FlightTaskPrepare = TopicConst.ThingModelPre + TopicConst.Product + "{0}" + TopicConst.ServicesSuffix;
public GatewayManager(string gatewaySn, string droneSn, GatewayType gatewayType)
{
diff --git a/Infrastructure/CloudSdk/minio/MinioService.cs b/Infrastructure/CloudSdk/minio/MinioService.cs
index 36c5970..5506d20 100644
--- a/Infrastructure/CloudSdk/minio/MinioService.cs
+++ b/Infrastructure/CloudSdk/minio/MinioService.cs
@@ -1,4 +1,5 @@
-using Infrastructure.Helpers;
+using Infrastructure.Extensions;
+using Infrastructure.Helpers;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Configuration;
using Minio;
@@ -13,6 +14,7 @@ public class MinioService
private IMinioClient _minioClient;
public string _bucketName;
public string endPoint;
+ public bool UseSSL;
public MinioService()
{
@@ -33,6 +35,7 @@ public class MinioService
var configuration = builder.Build();
_bucketName = configuration["Minio:BucketName"];
endPoint = configuration["Minio:Endpoint"];
+ UseSSL = configuration["Minio:UseSSL"].ToBool();
_minioClient = new MinioClient()
.WithEndpoint(endPoint)
.WithCredentials(configuration["Minio:AccessKey"], configuration["Minio:SecretKey"])
@@ -181,7 +184,8 @@ public class MinioService
await _minioClient.MakeBucketAsync(mbArgs).ConfigureAwait(false);
}
- var objectName = $"{GenerateId.GenerateOrderNumber()}.wpml";
+ var suffix = Path.GetExtension(file.FileName);
+ var objectName = $"{GenerateId.GenerateOrderNumber()}{suffix}";
// 使用内存流上传
using var stream = new MemoryStream();
await file.CopyToAsync(stream);
@@ -201,7 +205,7 @@ public class MinioService
.WithContentType("application/octet-stream");
//.WithContentType(file.ContentType);
await _minioClient.PutObjectAsync(putArgs);
- return "http://" + endPoint + "/" + bucketName + "/" + objectName;
+ return UseSSL ? "https://" : "http://" + endPoint + "/" + bucketName + "/" + objectName;
}
catch (Exception ex)
{
diff --git a/Infrastructure/CloudSdk/mqtt/MqttClientManager.cs b/Infrastructure/CloudSdk/mqtt/MqttClientManager.cs
index ebd0a4a..5f3b2e1 100644
--- a/Infrastructure/CloudSdk/mqtt/MqttClientManager.cs
+++ b/Infrastructure/CloudSdk/mqtt/MqttClientManager.cs
@@ -1,4 +1,8 @@
-using Microsoft.Extensions.Configuration;
+using System;
+using System.IO;
+using System.Threading;
+using System.Threading.Tasks;
+using Microsoft.Extensions.Configuration;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Protocol;
@@ -77,6 +81,7 @@ public class MqttClientManager
/// json
public async Task PublishAsync(string topic, string message)
{
+ Console.WriteLine($"Publish: {topic} - {message}");
// MqttChannelAdapter
var mqttMsg = new MqttApplicationMessageBuilder()
.WithTopic(topic)
diff --git a/OpenAuth.App/BaseApp/HostedService/GlobalSubscribe.cs b/OpenAuth.App/BaseApp/HostedService/GlobalSubscribe.cs
deleted file mode 100644
index f0de2a4..0000000
--- a/OpenAuth.App/BaseApp/HostedService/GlobalSubscribe.cs
+++ /dev/null
@@ -1,51 +0,0 @@
-using System.Text;
-using ce.autofac.extension;
-using Infrastructure.Cache;
-using OpenAuth.WebApi;
-using Quartz;
-
-namespace OpenAuth.App.BaseApp.HostedService;
-///
-/// 离线去订阅
-///
-public class GlobalSubscribe : IJob
-{
- public async Task Execute(IJobExecutionContext context)
- {
- Console.WriteLine($"running !{DateTime.Now}");
- var redisCacheContext = context.JobDetail.JobDataMap.Get("redisCacheContext") as RedisCacheContext;
- var serviceProvider = context.JobDetail.JobDataMap.Get("serviceProvider") as IServiceProvider;
- var mqttManager = serviceProvider.GetService(typeof(MqttClientManager)) as MqttClientManager;
- /*var ioc = IocManager.Instance;
- var redisCacheContext = IocManager.Instance.GetService();*/
- // todo 如果无人机不在线,则订阅
- if (redisCacheContext == null) return;
- var keys = redisCacheContext.GetAllKeys("online:*");
- foreach (var redisKey in keys)
- {
- // todo 取得sn值
- // todo 取得设备信息
- // todo
- // todo 需要判断是不是要订阅,避免重复订阅
- if (mqttManager != null)
- await mqttManager.SubscribeAsync("thing/product/{gateway_sn}/services_reply", async (args) =>
- {
- var topic = args.ApplicationMessage.Topic;
- var payload = args.ApplicationMessage.Payload;
- var message = Encoding.UTF8.GetString(payload);
- // todo 解析是否是需要的
- // flighttask_prepare method
- });
- //statusSubscribe.subscribe(gateway);
- //stateSubscribe.subscribe(gateway, true);
- //osdSubscribe.subscribe(gateway, true);
- //servicesSubscribe.subscribe(gateway);
- //eventsSubscribe.subscribe(gateway, true);
- //requestsSubscribe.subscribe(gateway);
- //propertySetSubscribe.subscribe(gateway);
- }
-
- // todo
- return ;
- }
-}
\ No newline at end of file
diff --git a/OpenAuth.App/BaseApp/HostedService/QuartzService.cs b/OpenAuth.App/BaseApp/HostedService/QuartzService.cs
index 1b3f535..d30c5e5 100644
--- a/OpenAuth.App/BaseApp/HostedService/QuartzService.cs
+++ b/OpenAuth.App/BaseApp/HostedService/QuartzService.cs
@@ -5,7 +5,7 @@ using Infrastructure.Cache;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
-using OpenAuth.App.BaseApp.HostedService;
+using OpenAuth.App.BaseApp.Subscribe;
using Quartz;
using Quartz.Impl;
@@ -32,7 +32,41 @@ namespace OpenAuth.App.HostedService
public Task StartAsync(CancellationToken cancellationToken)
{
- var map = new JobDataMap();
+ _scheduler.Start(cancellationToken);
+ // 此种方法创建任务,可用构造函数,注入对象
+ var jobBuilderType = typeof(JobBuilder);
+ var method = jobBuilderType.GetMethods().FirstOrDefault(
+ x => x.Name.Equals("Create", StringComparison.OrdinalIgnoreCase) &&
+ x.IsGenericMethod && x.GetParameters().Length == 0)
+ ?.MakeGenericMethod(typeof(GlobalSubscribe));
+
+ var jobBuilder = (JobBuilder)method.Invoke(null, null);
+
+ IJobDetail jobDetail = jobBuilder.WithIdentity("GlobalSubscribe").Build();
+ var trigger = TriggerBuilder.Create()
+ .WithIdentity("SubscribeJobTrigger", "GlobalSubscribe")
+ .WithSimpleSchedule(x => { x.WithIntervalInSeconds(30).RepeatForever(); })
+ .Build();
+
+
+ //todo 临时注销
+ //_scheduler.ScheduleJob(jobDetail, trigger, cancellationToken);
+
+ var method1 = jobBuilderType.GetMethods().FirstOrDefault(
+ x => x.Name.Equals("Create", StringComparison.OrdinalIgnoreCase) &&
+ x.IsGenericMethod && x.GetParameters().Length == 0)
+ ?.MakeGenericMethod(typeof(ConfigSubscribe));
+
+ var jobBuilder1 = (JobBuilder)method1.Invoke(null, null);
+
+ IJobDetail jobDetail1 = jobBuilder1.WithIdentity("ConfigSubscribe").Build();
+ var bootTrigger = TriggerBuilder.Create()
+ .WithIdentity("bootTrigger")
+ .StartNow()
+ .WithSimpleSchedule(x => x.WithRepeatCount(0)).Build();
+ _scheduler.ScheduleJob(jobDetail1, bootTrigger, cancellationToken);
+
+ /*var map = new JobDataMap();
map.Put("redisCacheContext", _serviceProvider.GetService());
map.Put("ServiceProvider", _serviceProvider);
var job = JobBuilder.Create().UsingJobData(map)
@@ -40,13 +74,13 @@ namespace OpenAuth.App.HostedService
//创建一个触发条件
var trigger = TriggerBuilder.Create()
.WithIdentity("SubscribeJobTrigger", "GlobalSubscribe")
- .WithSimpleSchedule(x => { x.WithIntervalInSeconds(3).RepeatForever(); })
+ .WithSimpleSchedule(x => { x.WithIntervalInSeconds(30).RepeatForever(); })
.Build();
var onceTrigger = TriggerBuilder.Create()
.WithIdentity("onceTrigger")
.WithSimpleSchedule(x => x.WithRepeatCount(0)).Build();
- _scheduler.Start(cancellationToken);
- _scheduler.ScheduleJob(job, trigger, cancellationToken);
+
+ _scheduler.ScheduleJob(job, trigger, cancellationToken);*/
var result = _openJobApp.StartAll();
return result;
}
diff --git a/OpenAuth.App/BaseApp/Subscribe/ConfigSubscribe.cs b/OpenAuth.App/BaseApp/Subscribe/ConfigSubscribe.cs
new file mode 100644
index 0000000..62e7cd8
--- /dev/null
+++ b/OpenAuth.App/BaseApp/Subscribe/ConfigSubscribe.cs
@@ -0,0 +1,677 @@
+using System.Dynamic;
+using System.Text;
+using Infrastructure.Cache;
+using Infrastructure.CloudSdk.wayline;
+using Newtonsoft.Json;
+using OpenAuth.App.ServiceApp;
+using OpenAuth.Repository.Domain;
+using OpenAuth.WebApi;
+using Quartz;
+using SqlSugar;
+
+namespace OpenAuth.App.BaseApp.Subscribe;
+
+public class ConfigSubscribe : IJob
+{
+ private readonly MqttClientManager mqttClientManager;
+ private readonly ISqlSugarClient sqlSugarClient;
+ private readonly RedisCacheContext redisCacheContext;
+ private readonly ManageApp manageApp;
+
+ public ConfigSubscribe(MqttClientManager mqttClientManager, ISqlSugarClient sqlSugarClient,
+ ICacheContext redisCacheContext, ManageApp manageApp)
+ {
+ this.mqttClientManager = mqttClientManager;
+ this.sqlSugarClient = sqlSugarClient;
+ this.redisCacheContext = redisCacheContext as RedisCacheContext;
+ this.manageApp = manageApp;
+ }
+
+ private async Task Subscribe()
+ {
+ string gatewaySn = "8UUXN5400A079H";
+ await mqttClientManager
+ .SubscribeAsync($"thing/product/{gatewaySn}/services_reply",
+ async (args) => await HandleTopic(gatewaySn, args.ApplicationMessage.Topic,
+ Encoding.UTF8.GetString(args.ApplicationMessage.Payload)));
+ await mqttClientManager
+ .SubscribeAsync($"thing/product/{gatewaySn}/events",
+ async (args) => await HandleTopic(gatewaySn, args.ApplicationMessage.Topic,
+ Encoding.UTF8.GetString(args.ApplicationMessage.Payload)));
+ }
+
+ public async Task Execute(IJobExecutionContext context)
+ {
+ await Subscribe();
+ }
+
+ // todo 暂时不使用
+ public static readonly string[] TopicList =
+ {
+ "thing/product/**/osd",
+ "thing/product/**/state",
+ "thing/product/**/services_reply",
+ "thing/product/**/events",
+ "thing/product/**/requests",
+ "sys/product/**/status",
+ "thing/product/**/property/set_reply",
+ "thing/product/**/drc/up"
+ };
+
+
+ private async Task HandleTopic(string sn, string topic, string message)
+ {
+ /*
+ thing/product/{device_sn}/osd 设备端定频向云平台推送的设备属性(properties),
+ 具体内容范围参见物模型内容
+ thing/product/{device_sn}/state 设备端按需上报向云平台推送的设备属性(properties),
+ 具体内容范围参见物模型内容
+ thing/product/{gateway_sn}/services_reply 设备对 service 的回复、处理结果 (航线任务下发之类的)
+ thing/product/{gateway_sn}/events 设备端向云平台发送的,需要关注和处理的事件。
+ 比如SD满了,飞机解禁禁飞区等信息(事件范围参见物模型内容)
+ thing/product/{gateway_sn}/requests 设备端向云平台发送请求,为了获取一些信息,比如上传的临时凭证
+ sys/product/{gateway_sn}/status 设备上下线、更新拓扑
+ thing/product/{gateway_sn}/property/set_reply 设备属性设置的响应
+ thing/product/{gateway_sn}/drc/up DRC 协议上行*/
+ // thing/product/8UUXN5400A079H/requests
+ if (topic.Equals("thing/product/8UUXN5400A079H/requests"))
+ {
+ Console.WriteLine("收到的资源请求-未处理字符串");
+ }
+ var tempStr = topic.Replace(sn, "*");
+ if (tempStr.Equals("thing/product/*/requests"))
+ {
+ Console.WriteLine("收到的资源请求");
+ }
+ //Console.WriteLine($"成功调用主题 [{topic}] 的消息: {message}");
+ // 主题方法
+ var result = JsonConvert.DeserializeObject>(message);
+ var method = result.method;
+ var data = result.data;
+ var hasProperty = false;
+ foreach (var property in data.GetType().GetProperties())
+ {
+ if (property.Name != "result") continue;
+ hasProperty = true;
+ break;
+ }
+
+ long code = 0;
+ if (hasProperty)
+ {
+ code = data.result;
+ }
+
+ switch (tempStr)
+ {
+ case "thing/product/*/requests":
+ //{"bid":"f936a236-030c-4358-bee9-b5075e1e2ddf",
+ //"data":{"flight_id":"e5ce8433-c264-4357-84d9-b701faf90d9e"},
+ //"method":"flighttask_resource_get",
+ //"tid":"61b6389a-7b72-49ae-bb46-0729e85c95d2",
+ //"timestamp":1750554644321,
+ //"gateway":"8UUXN5400A079H"}
+ // todo 处理资源获取请求
+ switch (method)
+ {
+ case "flighttask_resource_get":
+ Console.WriteLine($"获取资源请求:{JsonConvert.SerializeObject(data)}");
+ var flightId = data.flight_id;
+ LasaTaskAssign taskAssign =
+ manageApp.GetTaskAssignByBidAndTidAndFlightId(result.bid, result.tid, flightId);
+ var flightTaskResourceGetTopic = $"thing/product/{sn}/requests_reply";
+ dynamic outData = new ExpandoObject();
+ var outRequest = new TopicServicesRequest