洁 任 3 months ago
commit 7175ca51fa

@ -0,0 +1,14 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace Infrastructure.CloudSdk.mqttmessagecenter
{
public interface IMqttMessageHandler
{
bool CanHandle(string topic);
Task HandleAsync(string topic, string payload);
}
}

@ -0,0 +1,64 @@
using Infrastructure.CloudSdk.mqttmessagecenter;
using MQTTnet;
using MQTTnet.Client;
using System.Text;
public class MqttMessageCenter
{
private readonly IMqttClient _mqttClient;
private readonly MqttClientOptions _options;
private readonly IEnumerable<IMqttMessageHandler> _handlers;
public MqttMessageCenter(IEnumerable<IMqttMessageHandler> handlers, string server, int port, string clientId, string username = null, string password = null)
{
_handlers = handlers;
var factory = new MqttFactory();
_mqttClient = factory.CreateMqttClient();
var optionsBuilder = new MqttClientOptionsBuilder()
.WithTcpServer(server, port)
.WithCredentials(username,password)
.WithClientId(clientId)
.WithCleanSession();
//if (!string.IsNullOrEmpty(username))
//{
// optionsBuilder.WithCredentials(username, password);
//}
_options = optionsBuilder.Build();
_mqttClient.ApplicationMessageReceivedAsync += async e =>
{
var topic = e.ApplicationMessage.Topic;
var payload = Encoding.UTF8.GetString(e.ApplicationMessage.Payload);
foreach (var handler in _handlers)
{
if (handler.CanHandle(topic))
{
await handler.HandleAsync(topic, payload);
break;
}
}
};
}
public async Task ConnectAndSubscribeAsync(params string[] topics)
{
if (!_mqttClient.IsConnected)
{
await _mqttClient.ConnectAsync(_options);
}
var subscribeOptions = new MqttClientSubscribeOptionsBuilder();
foreach (var topic in topics)
{
subscribeOptions.WithTopicFilter(f => f.WithTopic(topic));
}
await _mqttClient.SubscribeAsync(subscribeOptions.Build());
}
}

@ -10,6 +10,7 @@ using OpenAuth.App.Interface;
using SqlSugar;
using Infrastructure;
using OpenAuth.App.ServiceApp.Response;
using DocumentFormat.OpenXml.EMMA;
namespace OpenAuth.App.ServiceApp
{
@ -69,5 +70,39 @@ namespace OpenAuth.App.ServiceApp
}
}
/// <summary>
/// 修改注册码状态
/// </summary>
/// <param name="DeviceBindingCode"></param>
/// <returns></returns>
public async Task<bool> UpdateCodeStatus(string DeviceBindingCode)
{
using (var db = UnitWork.CreateContext())
{
var flag = await db.LasaDeviceBindingCode.UpdateAsync(it => new LasaDeviceBindingCode()
{
BindStatus = 1,
}, it => it.DeviceBindingCode == DeviceBindingCode);
if (db.Commit())
return true;
else
return false;
}
}
#region 健康报警
//添加健康报警
public bool AddManageDeviceHms(List<LasaManageDeviceHms> info)
{
using (var db = UnitWork.CreateContext())
{
var flag = db.LasaManageDeviceHms.InsertRange(info);
if (db.Commit())
return true;
else
return false;
}
}
#endregion
}
}

@ -0,0 +1,74 @@
using SqlSugar;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace OpenAuth.Repository.Domain
{
[SugarTable("lasa_managedevicehms")]
public class LasaManageDeviceHms
{
/// <summary>
/// 报警ID
/// </summary>
[SugarColumn(IsPrimaryKey = true)]
public string Id { get; set; }
/// <summary>
/// uuid
/// </summary>
public string HmsId { get; set; }
/// <summary>
/// tid
/// </summary>
public string TId { get; set; }
/// <summary>
/// bid
/// </summary>
public string BId { get; set; }
/// <summary>
/// Sn
/// </summary>
public string Sn { get; set; }
/// <summary>
/// 等级
/// </summary>
public int? Level { get; set; }
/// <summary>
/// 1:device manage; 2: media; 3: hms
/// </summary>
public int? Module { get; set; }
/// <summary>
/// 消息密钥
/// </summary>
public string HmsKey { get; set; }
/// <summary>
/// 中文报警
/// </summary>
public string MessageZh { get; set; }
/// <summary>
/// 英文报警
/// </summary>
public string MessageEn { get; set; }
/// <summary>
/// 创建时间
/// </summary>
public DateTime? CreateTime { get; set; }
/// <summary>
/// 修改时间
/// </summary>
public DateTime? UpdateTime { get; set; }
}
}

@ -71,6 +71,7 @@ namespace OpenAuth.Repository
public SugarRepositiry<LasaSpaceLockFly> LasaSpaceLockFly { get; set; }
public SugarRepositiry<LasaDeviceBindingCode> LasaDeviceBindingCode { get; set; }
public SugarRepositiry<LasaManageDeviceHms> LasaManageDeviceHms { get; set; }
#endregion
}
}

@ -15,10 +15,10 @@ namespace OpenAuth.WebApi.Controllers
[Route("api/[controller]/[action]")]
[ApiController]
//[ApiExplorerSettings(GroupName = "系统日志_SysLogs")]
class SysLogsController : ControllerBase
public class SysLogsController : ControllerBase
{
private readonly SysLogApp _app;
//获取详情
[HttpGet]
public Response<SysLog> Get(string id)
@ -40,7 +40,7 @@ namespace OpenAuth.WebApi.Controllers
/// <summary>
/// 添加
/// </summary>
[HttpPost]
[HttpPost]
public Response Add(SysLog obj)
{
var result = new Response();
@ -57,7 +57,7 @@ namespace OpenAuth.WebApi.Controllers
return result;
}
/// <summary>
/// 修改日志(建议废弃)
/// </summary>
@ -83,7 +83,7 @@ namespace OpenAuth.WebApi.Controllers
/// 加载列表
/// </summary>
[HttpGet]
public async Task<TableData> Load([FromQuery]QuerySysLogListReq request)
public async Task<TableData> Load([FromQuery] QuerySysLogListReq request)
{
return await _app.Load(request);
}
@ -91,8 +91,8 @@ namespace OpenAuth.WebApi.Controllers
/// <summary>
/// 批量删除
/// </summary>
[HttpPost]
public Response Delete([FromBody]string[] ids)
[HttpPost]
public Response Delete([FromBody] string[] ids)
{
var result = new Response();
try
@ -109,7 +109,7 @@ namespace OpenAuth.WebApi.Controllers
return result;
}
public SysLogsController(SysLogApp app)
public SysLogsController(SysLogApp app)
{
_app = app;
}

@ -0,0 +1,103 @@
using OpenAuth.Repository.Domain;
using System.Text.Json.Nodes;
namespace OpenAuth.WebApi.Model.mqtt
{
public class HmsAlarmParser
{
private readonly JsonObject _hmsMessages;
public HmsAlarmParser(string hmsJsonPath)
{
string json = File.ReadAllText(hmsJsonPath);
_hmsMessages = JsonNode.Parse(json)?.AsObject() ?? throw new Exception("hms.json 格式错误");
}
public List<LasaManageDeviceHms> ParseAlarmMessages(string rawJson)
{
List<LasaManageDeviceHms> result= new List<LasaManageDeviceHms> ();
var root = JsonNode.Parse(rawJson)?.AsObject();
if (root == null)
{
return null;
}
string method = root["method"]?.ToString() ?? "";
string bid = root["bid"]?.ToString() ?? "";
string tid = root["tid"]?.ToString() ?? "";
if (method != "hms")
{
return null;
}
var alarmList = root["data"]?["list"]?.AsArray();
if (alarmList == null || alarmList.Count == 0)
{
return null;
}
foreach (var alarmItem in alarmList)
{
string code = alarmItem?["code"]?.ToString() ?? "";
int module = alarmItem?["module"]?.GetValue<int>() ?? -1;
int inTheSky = alarmItem?["in_the_sky"]?.GetValue<int>() ?? 0;
var args = alarmItem["args"]?.AsObject() ?? new JsonObject();
int sensorIndex = args["sensor_index"]?.GetValue<int>() ?? 0;
int componentIndex = args["component_index"]?.GetValue<int>() ?? 0;
int level = alarmItem?["level"]?.GetValue<int>() ?? 0;
// 拼接 key
string key = module switch
{
3 => $"dock_tip_{code}",
0 => inTheSky == 1 ? $"fpv_tip_{code}_in_the_sky" : $"fpv_tip_{code}",
_ => $"fpv_tip_{code}"
};
if (!_hmsMessages.TryGetPropertyValue(key, out var node))
{
continue;
}
string zh = node["zh"]?.ToString() ?? "未提供中文文案";
string en = node["en"]?.ToString() ?? "No English message provided";
zh = ReplacePlaceholders(zh, code, sensorIndex, componentIndex);
en = ReplacePlaceholders(en, code, sensorIndex, componentIndex);
result.Add(new LasaManageDeviceHms
{
Id = Guid.NewGuid().ToString(),
BId = bid,
TId = tid,
Level = level,
Module = module,
CreateTime=DateTime.Now,
MessageEn = en,
MessageZh = zh,
});
}
return result;
}
private string ReplacePlaceholders(string template, string code, int sensorIndex, int componentIndex)
{
return template
.Replace("%alarmid", code)
.Replace("%index", (sensorIndex + 1).ToString())
.Replace("%component_index", Math.Clamp(componentIndex + 1, 1, 2).ToString())
.Replace("%battery_index", sensorIndex == 0 ? "左" : "右")
.Replace("%dock_cover_index", sensorIndex == 0 ? "左" : "右")
.Replace("%charging_rod_index", sensorIndex switch
{
0 => "前",
1 => "后",
2 => "左",
3 => "右",
_ => "未知"
});
}
}
}

@ -0,0 +1,47 @@
using System.Text.Json.Serialization;
namespace OpenAuth.WebApi.Model.mqtt.ModelResponse
{
public class HmsMessage
{
public string bid { get; set; }
public string tid { get; set; }
public long timestamp { get; set; }
public string method { get; set; }
public HmsData data { get; set; }
}
public class HmsData
{
public List<HmsItem> list { get; set; }
}
public class HmsItem
{
public string code { get; set; }
public string device_type { get; set; }
public int module { get; set; }
public int component_index => args?.component_index ?? -1;
[JsonPropertyName("sensor_index")]
public int sensor_index => args?.sensor_index ?? -1;
public int level { get; set; }
public int imminent { get; set; }
public int in_the_sky { get; set; }
public HmsArgs args { get; set; }
}
public class HmsArgs
{
public int component_index { get; set; }
public int sensor_index { get; set; }
}
}

@ -0,0 +1,37 @@
using Infrastructure.CloudSdk.mqttmessagecenter;
namespace OpenAuth.WebApi.Model.mqtt
{
public class MqttHostedService : IHostedService
{
private readonly IServiceProvider _serviceProvider;
public MqttHostedService(IServiceProvider serviceProvider)
{
_serviceProvider = serviceProvider;
}
public async Task StartAsync(CancellationToken cancellationToken)
{
using var scope = _serviceProvider.CreateScope();
var handlers = scope.ServiceProvider.GetServices<IMqttMessageHandler>();
var mqttCenter = new MqttMessageCenter(
handlers,
server: "175.27.168.120",
port: 6011,
clientId: "mqtt_client_1581",
username: "sdhc",
password: ""
);
await mqttCenter.ConnectAndSubscribeAsync(
//"thing/product/8UUXN5400A079H/osd",
//"thing/product/8UUXN5400A079H/services",
"thing/product/8UUXN5400A079H/events1"
);
}
public Task StopAsync(CancellationToken cancellationToken) => Task.CompletedTask;
}
}

@ -0,0 +1,49 @@
using Infrastructure;
using Infrastructure.CloudSdk.mqttmessagecenter;
using Microsoft.AspNetCore.Mvc;
using NetTopologySuite.Mathematics;
using OpenAuth.App;
using OpenAuth.App.Interface;
using OpenAuth.App.ServiceApp;
using OpenAuth.Repository.Domain;
using OpenAuth.WebApi.Model.mqtt.ModelResponse;
using System.Collections.Generic;
using System.Text.Json;
namespace OpenAuth.WebApi.Model.mqtt
{
public class ThingEventHandler : IMqttMessageHandler
{
private readonly ILogger<ThingRequestHandler> _logger;
AirportMaintenanceApp _app;
public ThingEventHandler(ILogger<ThingRequestHandler> logger, AirportMaintenanceApp app)
{
_logger = logger;
_app = app;
}
public bool CanHandle(string topic)
{
return topic.Contains("/events1");
}
public Task HandleAsync(string topic, string payload)
{
_logger.LogError($"[osd] Topic={topic}, Payload={payload}");
Console.WriteLine($"[osd] Topic={topic}, Payload={payload}");
if (payload.Contains("hms"))//健康告警
{
var jsondata = JsonSerializer.Deserialize<HmsMessage>(payload);
if (jsondata.data != null && jsondata.data.list.Count > 0)
{
var path = Path.Combine(AppContext.BaseDirectory, "hms.json");
var parser = new HmsAlarmParser(path);
var alarmMessages = parser.ParseAlarmMessages(payload);
_app.AddManageDeviceHms(alarmMessages);
}
}
return Task.CompletedTask;
}
}
}

@ -0,0 +1,31 @@
using Infrastructure.CloudSdk.mqttmessagecenter;
namespace OpenAuth.WebApi.Model.mqtt
{
public class ThingOsdHandler : IMqttMessageHandler
{
private readonly ILogger<ThingRequestHandler> _logger;
public ThingOsdHandler(ILogger<ThingRequestHandler> logger)
{
_logger = logger;
}
public bool CanHandle(string topic)
{
return topic.Contains("/osd");
}
public Task HandleAsync(string topic, string payload)
{
_logger.LogError($"[osd] Topic={topic}, Payload={payload}");
Console.WriteLine($"[osd] Topic={topic}, Payload={payload}");
if (payload.Contains(""))
{
}
// 自定义处理逻辑
return Task.CompletedTask;
}
}
}

@ -0,0 +1,164 @@
using Infrastructure.CloudSdk.mqttmessagecenter;
using OpenAuth.App.ServiceApp;
using System.Text.Json;
using System.Text.Json.Nodes;
namespace OpenAuth.WebApi.Model.mqtt
{
public class ThingRequestHandler : IMqttMessageHandler
{
private readonly ILogger<ThingRequestHandler> _logger;
private readonly MqttClientManager _mqttClientManager;
AirportMaintenanceApp _app;
public ThingRequestHandler(ILogger<ThingRequestHandler> logger, MqttClientManager mqttClientManager, AirportMaintenanceApp app)
{
_logger = logger;
_mqttClientManager = mqttClientManager;
_app = app;
}
public bool CanHandle(string topic)
{
return topic.Contains("/requests");
}
string bid, tid, previousgateway;
int timestamp;
public async Task HandleAsync(string topic, string payload)
{
_logger.LogError($"[Request] Topic={topic}, Payload={payload}");
if (payload.Contains("config"))
{
var root = JsonNode.Parse(payload)?.AsObject();
if (root == null)
{
return;
}
string gateway = root["gateway"]?.ToString() ?? "";
if (previousgateway == gateway)
{
return;
}
bid = root["bid"]?.ToString() ?? "";
tid = root["tid"]?.ToString() ?? "";
timestamp = int.Parse(root["timestamp"]?.ToString() ?? "0");
var requestData = new
{
bid = bid,
method = "config",
tid = tid,
timestamp = timestamp,
gateway = gateway,
data = new
{
app_id = "164821",
app_key = "a5f80db847b285fd5fe51db7aa9f606",
app_license = "QXJX7VKLC03wYHujKbchjqIBiSgpjv46hLGY4B9Ez46RllbKZN9H+rVawveJWbbX/l4r6zPM5aZ/EzmCESUg046rSRziVhDOJ0tPw1cWqKAq6TuqaLK5bH88UFyB3ahv66YF3D/NmAgOfj5VRtT55cc67u/OIsF54lxhs/raAv0=",
ntp_server_host = "ntp.aliyun.com",
//ntp_server_host = "175.27.168.120",
//ntp_server_port = 6012
}
};
string payloadreq = JsonSerializer.Serialize(requestData);
await _mqttClientManager.PublishAsync($"thing/product/{gateway}/requests_reply", payloadreq);
await _app.UpdateCodeStatus(gateway);
}
if (payload.Contains("airport_bind_status"))
{
//解析返回的设备sn 先不解析,先测试绑定
var requestData = new
{
bid = Guid.NewGuid().ToString(),
data = new
{
output = new
{
bind_status = new[]
{
new
{
device_callsign = "山东慧创",
is_device_bind_organization = true,
organization_id = "379",
organization_name = "山东慧创",
sn = "8UUXN5400A079H"
},
new
{
device_callsign = "山东慧创",
is_device_bind_organization = true,
organization_id = "379",
organization_name = "山东慧创",
sn = "1581F8HGX254V00A0BUY"
}
}
},
result = 0
},
tid = Guid.NewGuid().ToString(),
timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
method = "airport_bind_status"
};
string payloadreq = JsonSerializer.Serialize(requestData);
string getway = topic.Split('/')[2];
await _mqttClientManager.PublishAsync($"thing/product/{getway}/requests_reply", payloadreq);
}
if (payload.Contains("airport_organization_get"))
{
var requestData = new
{
bid = Guid.NewGuid().ToString(),
data = new
{
output = new
{
organization_name = "山东慧创"
},
result = 0
},
tid = Guid.NewGuid().ToString(),
timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
method = "airport_organization_get"
};
string payloadreq = JsonSerializer.Serialize(requestData);
string getway = topic.Split('/')[2];
await _mqttClientManager.PublishAsync($"thing/product/{getway}/requests_reply", payloadreq);
}
if (payload.Contains("airport_organization_bind"))
{
var requestData = new
{
bid = Guid.NewGuid().ToString(),
data = new
{
output = new
{
err_infos = new[]
{
new
{
err_code = 210231,
sn = "8UUXN5400A079H"
},
new
{
err_code = 210231,
sn = "1581F8HGX254V00A0BUY"
}
}
},
result = 0
},
tid = Guid.NewGuid().ToString(),
timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
method = "airport_organization_bind"
};
string payloadreq = JsonSerializer.Serialize(requestData);
string getway = topic.Split('/')[2];
await _mqttClientManager.PublishAsync($"thing/product/{getway}/requests_reply", payloadreq);
}
}
}
}

@ -0,0 +1,26 @@
using Infrastructure.CloudSdk.mqttmessagecenter;
namespace OpenAuth.WebApi.Model.mqtt
{
public class ThingServiceHandler : IMqttMessageHandler
{
private readonly ILogger<ThingServiceHandler> _logger;
public ThingServiceHandler(ILogger<ThingServiceHandler> logger)
{
_logger = logger;
}
public bool CanHandle(string topic)
{
return topic.Contains("/services");
}
public Task HandleAsync(string topic, string payload)
{
_logger.LogError($"[Service] Topic={topic}, Payload={payload}");
Console.WriteLine($"[Service] Topic={topic}, Payload={payload}");
return Task.CompletedTask;
}
}
}

File diff suppressed because it is too large Load Diff

@ -70,6 +70,9 @@
<Content Update="appsettings.json">
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</Content>
<Content Update="Model\mqtt\hms.json">
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</Content>
</ItemGroup>
<ItemGroup>

@ -2,9 +2,11 @@
using Autofac;
using Autofac.Extensions.DependencyInjection;
using ce.autofac.extension;
using DocumentFormat.OpenXml.Validation;
using IdentityServer4.AccessTokenValidation;
using Infrastructure;
using Infrastructure.CloudSdk.minio;
using Infrastructure.CloudSdk.mqttmessagecenter;
using Infrastructure.Extensions.AutofacManager;
using Infrastructure.Middleware;
using Microsoft.AspNetCore.DataProtection;
@ -23,6 +25,7 @@ using OpenAuth.App.BaseApp.ImMsgManager;
using OpenAuth.App.HostedService;
using OpenAuth.Repository;
using OpenAuth.WebApi.Model;
using OpenAuth.WebApi.Model.mqtt;
using SqlSugar;
using Swashbuckle.AspNetCore.SwaggerUI;
@ -325,6 +328,14 @@ namespace OpenAuth.WebApi
services.AddSignalR();
#endregion
#region mqtt
//services.AddSingleton<IMqttMessageHandler, ThingRequestHandler>();
//services.AddSingleton<IMqttMessageHandler, ThingServiceHandler>();
//services.AddSingleton<IMqttMessageHandler, ThingOsdHandler>();
//services.AddSingleton<IMqttMessageHandler, ThingEventHandler>();
services.AddHostedService<MqttHostedService>();
#endregion
}
public void ConfigureContainer(ContainerBuilder builder)

Loading…
Cancel
Save