消息异常处理和断连重连

main
zhangbin 3 months ago
parent 1e7b03d992
commit f1e3a39253

@ -6,45 +6,95 @@ using System.Text;
public class MqttMessageCenter
{
private readonly IMqttClient _mqttClient;
private readonly MqttClientOptions _options;
private readonly IEnumerable<IMqttMessageHandler> _handlers;
private MqttClientOptions _options;
private IEnumerable<IMqttMessageHandler> _handlers;
public MqttMessageCenter(IEnumerable<IMqttMessageHandler> handlers, string server, int port, string clientId, string username = null, string password = null)
{
_handlers = handlers;
private readonly HashSet<string> _subscribedTopics = new();
private readonly object _lock = new();
public MqttMessageCenter()
{
var factory = new MqttFactory();
_mqttClient = factory.CreateMqttClient();
var optionsBuilder = new MqttClientOptionsBuilder()
.WithTcpServer(server, port)
.WithCredentials(username,password)
.WithClientId(clientId)
.WithCleanSession();
//if (!string.IsNullOrEmpty(username))
//{
// optionsBuilder.WithCredentials(username, password);
//}
_options = optionsBuilder.Build();
_mqttClient.ApplicationMessageReceivedAsync += async e =>
{
var topic = e.ApplicationMessage.Topic;
var payload = Encoding.UTF8.GetString(e.ApplicationMessage.Payload);
foreach (var handler in _handlers)
if (_handlers != null)
{
foreach (var handler in _handlers)
{
if (handler.CanHandle(topic))
{
try
{
await handler.HandleAsync(topic, payload);
}
catch (Exception ex)
{
Console.WriteLine($"[Handler Error] {ex.Message}");
}
break;
}
}
}
};
// 自动重连逻辑
_mqttClient.DisconnectedAsync += async e =>
{
Console.WriteLine($"[MQTT] Disconnected. Reason: {e?.Reason}");
while (!_mqttClient.IsConnected)
{
if (handler.CanHandle(topic))
try
{
await handler.HandleAsync(topic, payload);
break;
Console.WriteLine("[MQTT] Attempting to reconnect...");
await Task.Delay(5000); // 5秒后重试
await _mqttClient.ConnectAsync(_options);
Console.WriteLine("[MQTT] Reconnected.");
// 重新订阅所有已记录的 topic
await SubscribeAsync(_subscribedTopics.ToArray());
}
catch (Exception ex)
{
Console.WriteLine($"[MQTT] Reconnect failed: {ex.Message}");
}
}
};
}
public async Task InitializeAsync(
IEnumerable<IMqttMessageHandler> handlers,
string server,
int port,
string clientId,
string username = null,
string password = null)
{
_handlers = handlers;
var optionsBuilder = new MqttClientOptionsBuilder()
.WithTcpServer(server, port)
.WithClientId(clientId)
.WithCleanSession(false) // 保留会话,避免服务器丢掉订阅信息
.WithKeepAlivePeriod(TimeSpan.FromSeconds(30));
if (!string.IsNullOrEmpty(username))
{
optionsBuilder.WithCredentials(username, password);
}
_options = optionsBuilder.Build();
if (!_mqttClient.IsConnected)
{
await _mqttClient.ConnectAsync(_options);
}
}
public async Task ConnectAndSubscribeAsync(params string[] topics)
{
if (!_mqttClient.IsConnected)
@ -52,13 +102,43 @@ public class MqttMessageCenter
await _mqttClient.ConnectAsync(_options);
}
var subscribeOptions = new MqttClientSubscribeOptionsBuilder();
foreach (var topic in topics)
await SubscribeAsync(topics);
}
public async Task SubscribeAsync(params string[] topics)
{
if (!_mqttClient.IsConnected || topics == null || topics.Length == 0)
return;
List<string> newTopics = new();
lock (_lock)
{
subscribeOptions.WithTopicFilter(f => f.WithTopic(topic));
foreach (var topic in topics)
{
if (_subscribedTopics.Add(topic)) // 避免重复
newTopics.Add(topic);
}
}
await _mqttClient.SubscribeAsync(subscribeOptions.Build());
if (newTopics.Count > 0)
{
var subscribeOptions = new MqttClientSubscribeOptionsBuilder();
foreach (var topic in newTopics)
{
subscribeOptions.WithTopicFilter(f => f.WithTopic(topic));
}
await _mqttClient.SubscribeAsync(subscribeOptions.Build());
Console.WriteLine($"[MQTT] Subscribed: {string.Join(", ", newTopics)}");
}
}
public IReadOnlyCollection<string> GetSubscribedTopics()
{
lock (_lock)
{
return _subscribedTopics.ToList().AsReadOnly();
}
}
}

@ -161,6 +161,23 @@ namespace OpenAuth.App.ServiceApp
}
}
//获取网关
public async Task<List<string>> GetGatewaysnList()
{
using (var db = UnitWork.CreateContext())
{
var info = await db.LasaGateway.AsQueryable()
.Where(r => r.BindStatus == 1).Select(r => r.GatewaySn).ToListAsync();
if (info != null)
{
return info;
}
else
{
return new List<string>();
}
}
}
#region 健康报警
/// <summary>
/// 添加健康报警

@ -1,4 +1,5 @@
using DocumentFormat.OpenXml.Math;
using DocumentFormat.OpenXml.Spreadsheet;
using Infrastructure;
using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.Mvc;
@ -19,11 +20,13 @@ namespace OpenAuth.WebApi.Controllers.ServiceControllers
{
private readonly AirportMaintenanceApp _app;
private readonly MqttClientManager _mqttClientManager;
private readonly MqttMessageCenter _mqttCenter;
public AirportMaintenanceController(AirportMaintenanceApp app, MqttClientManager mqttClientManager)
public AirportMaintenanceController(AirportMaintenanceApp app, MqttClientManager mqttClientManager, MqttMessageCenter mqttCenter)
{
_app = app;
_mqttClientManager = mqttClientManager;
_mqttCenter = mqttCenter;
}
/// <summary>
/// 机场注册 注册码生成
@ -86,6 +89,17 @@ namespace OpenAuth.WebApi.Controllers.ServiceControllers
try
{
result = await _app.GetGateway();
//自动更新主题
string sn = result.Result.GatewaySn;
var topics = new List<string>();
topics.AddRange(new[]
{
$"thing/product/{sn}/osd",
$"thing/product/{sn}/events",
$"thing/product/{sn}/requests",
$"thing/product/{sn}/services"
});
await _mqttCenter.SubscribeAsync(topics.ToArray());
}
catch (Exception ex)
{

@ -1,14 +1,20 @@
using Infrastructure.CloudSdk.mqttmessagecenter;
using NuGet.Packaging;
using OpenAuth.App.ServiceApp;
namespace OpenAuth.WebApi.Model.mqtt
{
public class MqttHostedService : IHostedService
{
private readonly IServiceProvider _serviceProvider;
private readonly MqttMessageCenter _mqttCenter;
private readonly AirportMaintenanceApp _app;
public MqttHostedService(IServiceProvider serviceProvider)
public MqttHostedService(IServiceProvider serviceProvider, MqttMessageCenter mqttCenter, AirportMaintenanceApp app)
{
_serviceProvider = serviceProvider;
_mqttCenter = mqttCenter;
_app = app;
}
public async Task StartAsync(CancellationToken cancellationToken)
@ -16,7 +22,7 @@ namespace OpenAuth.WebApi.Model.mqtt
using var scope = _serviceProvider.CreateScope();
var handlers = scope.ServiceProvider.GetServices<IMqttMessageHandler>();
var mqttCenter = new MqttMessageCenter(
await _mqttCenter.InitializeAsync(
handlers,
server: "175.27.168.120",
port: 6011,
@ -24,16 +30,21 @@ namespace OpenAuth.WebApi.Model.mqtt
username: "sdhc",
password: ""
);
//查询网关,订阅主题
var topics = new List<string>();
var gatewayList = await _app.GetGatewaysnList();
await mqttCenter.ConnectAndSubscribeAsync(
// "thing/product/8UUXN5400A079H/osd",
"thing/product/8UUXN5400A079H/osd"
// "thing/product/8UUXN5400A079H/services",
//"thing/product/8UUXN5400A079H/services_reply"
//"thing/product/8UUXN5400A079H/requests",
//"thing/product/8UUXN5400A079H/services_reply"
//"thing/product/8UUXN5400A079H/events"
);
foreach (var gateway in gatewayList)
{
topics.AddRange(new[]
{
$"thing/product/{gateway}/osd",
$"thing/product/{gateway}/events",
$"thing/product/{gateway}/requests",
$"thing/product/{gateway}/services"
});
}
await _mqttCenter.ConnectAndSubscribeAsync(topics.ToArray());
}
public Task StopAsync(CancellationToken cancellationToken) => Task.CompletedTask;

@ -16,7 +16,7 @@ namespace OpenAuth.WebApi.Model.mqtt
public bool CanHandle(string topic)
{
return topic.Contains("/services_reply");
return topic.Contains("/services");
}
public Task HandleAsync(string topic, string payload)

@ -333,12 +333,15 @@ namespace OpenAuth.WebApi
#endregion
#region mqtt
services.AddSingleton<MqttMessageCenter>();
//services.AddSingleton<IMqttMessageHandler, ThingRequestHandler>();
//services.AddSingleton<IMqttMessageHandler, ThingServiceHandler>();
services.AddSingleton<IMqttMessageHandler, ThingOsdHandler>();
//services.AddSingleton<IMqttMessageHandler, ThingEventHandler>();
services.AddHostedService<MqttHostedService>();
#endregion
}

Loading…
Cancel
Save