重连时,主题重新订阅

feature-flyModify
陈伟 2025-06-27 15:09:53 +08:00
parent 9d07444f65
commit 5ff3914e11
1 changed files with 19 additions and 2 deletions

View File

@ -7,6 +7,7 @@ using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Packets;
using MQTTnet.Protocol;
namespace OpenAuth.WebApi;
@ -26,6 +27,7 @@ public class MqttClientManager
// 指数退避参数
private readonly TimeSpan _minReconnectInterval = TimeSpan.FromSeconds(1);
private readonly TimeSpan _maxReconnectInterval = TimeSpan.FromMinutes(2);
private List<String> _topics = new();
public MqttClientManager(ILogger<MqttClientManager> logger)
@ -86,8 +88,21 @@ public class MqttClientManager
{
_logger?.LogInformation("MQTT 连接成功");
_inboundReconnectAttempts = 0; // 重置重连计数
// todo 重新订阅主题
return null;
// 重新订阅主题
return ResubscribeTopicsAsync();
}
private async Task ResubscribeTopicsAsync()
{
if (!_inBoundClient.IsConnected || _topics.Count == 0)
return;
_logger?.LogInformation($"重新订阅 {_topics.Count} 个主题");
foreach (var topic in _topics)
{
await _inBoundClient.SubscribeAsync(topic, MqttQualityOfServiceLevel.AtLeastOnce,
_cancellationTokenSource.Token);
}
}
private async Task OnOutboundDisconnectedAsync(MqttClientDisconnectedEventArgs e)
@ -179,6 +194,7 @@ public class MqttClientManager
public async Task SubscribeAsync(string topic,
Func<MqttApplicationMessageReceivedEventArgs, Task> handler)
{
_topics.Add(topic);
await _inBoundClient.SubscribeAsync(topic, MqttQualityOfServiceLevel.AtMostOnce, CancellationToken.None);
_inBoundClient.ApplicationMessageReceivedAsync += handler;
}
@ -188,6 +204,7 @@ public class MqttClientManager
{
foreach (var topic in topics)
{
_topics.Add(topic);
await _inBoundClient.SubscribeAsync(topic, MqttQualityOfServiceLevel.AtMostOnce, CancellationToken.None);
_inBoundClient.ApplicationMessageReceivedAsync += handler;
}