You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

51 lines
2.0 KiB
C#

using System.Text;
using ce.autofac.extension;
using Infrastructure.Cache;
using OpenAuth.WebApi;
using Quartz;
namespace OpenAuth.App.BaseApp.HostedService;
/// <summary>
/// 离线去订阅
/// </summary>
public class GlobalSubscribe : IJob
{
public async Task Execute(IJobExecutionContext context)
{
Console.WriteLine($"running !{DateTime.Now}");
var redisCacheContext = context.JobDetail.JobDataMap.Get("redisCacheContext") as RedisCacheContext;
var serviceProvider = context.JobDetail.JobDataMap.Get("serviceProvider") as IServiceProvider;
var mqttManager = serviceProvider.GetService(typeof(MqttClientManager)) as MqttClientManager;
/*var ioc = IocManager.Instance;
var redisCacheContext = IocManager.Instance.GetService<ICacheContext>();*/
// todo 如果无人机不在线,则订阅
if (redisCacheContext == null) return;
var keys = redisCacheContext.GetAllKeys("online:*");
foreach (var redisKey in keys)
{
// todo 取得sn值
// todo 取得设备信息
// todo
// todo 需要判断是不是要订阅,避免重复订阅
if (mqttManager != null)
await mqttManager.SubscribeAsync("thing/product/{gateway_sn}/services_reply", async (args) =>
{
var topic = args.ApplicationMessage.Topic;
var payload = args.ApplicationMessage.Payload;
var message = Encoding.UTF8.GetString(payload);
// todo 解析是否是需要的
// flighttask_prepare method
});
//statusSubscribe.subscribe(gateway);
//stateSubscribe.subscribe(gateway, true);
//osdSubscribe.subscribe(gateway, true);
//servicesSubscribe.subscribe(gateway);
//eventsSubscribe.subscribe(gateway, true);
//requestsSubscribe.subscribe(gateway);
//propertySetSubscribe.subscribe(gateway);
}
// todo
return ;
}
}