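// NOTE: The lines below are residue from the repository web viewer that was captured together
// with this file. They are not part of the program and are kept only as comments.
// "You cannot select more than 25 topics. Topics must start with a letter or number,
// can include dashes ('-') and can be up to 35 characters long."
// 236 lines
// 8.1 KiB
// C#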

using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Protocol;
namespace OpenAuth.WebApi;
/// <summary>
/// Manages two MQTT client connections to the same broker: an inbound client used for
/// subscriptions and an outbound client used for publishing. Each client reconnects on its
/// own with exponential back-off, and the inbound client re-subscribes its topics after
/// every successful (re)connect.
/// </summary>
/// <remarks>
/// Message handlers passed to <c>SubscribeAsync</c> are attached to the inbound client's
/// <c>ApplicationMessageReceivedAsync</c> event and therefore receive every message the
/// inbound client gets, not only messages for the topic they were registered with.
/// </remarks>
public class MqttClientManager : IDisposable
{
    // QoS used for every subscription, both the initial one and the one re-created after a
    // reconnect, so that a reconnect does not silently change delivery guarantees.
    private const MqttQualityOfServiceLevel SubscriptionQos = MqttQualityOfServiceLevel.AtMostOnce;

    private readonly IMqttClient _outBoundClient;
    private readonly IMqttClient _inBoundClient;
    private MqttClientOptions _inboundOptions;
    private MqttClientOptions _outboundOptions;
    private readonly ILogger<MqttClientManager> _logger;
    private int _inboundReconnectAttempts;
    private int _outboundReconnectAttempts;
    private readonly CancellationTokenSource _cancellationTokenSource = new();

    // Captured once so that handlers running during/after Dispose never touch
    // CancellationTokenSource.Token on a disposed source.
    private readonly CancellationToken _shutdownToken;

    // Exponential back-off parameters.
    private readonly TimeSpan _minReconnectInterval = TimeSpan.FromSeconds(1);
    private readonly TimeSpan _maxReconnectInterval = TimeSpan.FromMinutes(2);

    // Topics to restore after a reconnect. Guarded by _topicsLock because subscribe,
    // unsubscribe and the reconnect handler may run concurrently.
    private readonly object _topicsLock = new();
    private readonly List<string> _topics = new();

    private bool _disposed;

    /// <summary>
    /// Creates both MQTT clients, reads the broker settings (<c>MQTT:Server</c>,
    /// <c>MQTT:Port</c>, <c>MQTT:UserName</c>, <c>MQTT:Password</c>) from appsettings /
    /// environment variables and connects to the broker, blocking until both clients are
    /// connected.
    /// </summary>
    /// <param name="logger">Logger for connection events; may be null.</param>
    /// <exception cref="InvalidOperationException">
    /// <c>MQTT:Server</c> is missing or <c>MQTT:Port</c> is not a valid integer.
    /// </exception>
    public MqttClientManager(ILogger<MqttClientManager> logger)
    {
        _logger = logger;
        _shutdownToken = _cancellationTokenSource.Token;

        var mqttFactory = new MqttFactory();
        _outBoundClient = mqttFactory.CreateMqttClient();
        _inBoundClient = mqttFactory.CreateMqttClient();

        // Attach the event handlers exactly once, before the first connect, so that repeated
        // ConnectAsync calls cannot register them multiple times.
        _outBoundClient.ConnectedAsync += OnOutboundConnectedAsync;
        _inBoundClient.ConnectedAsync += OnConnectedAsync;
        _outBoundClient.DisconnectedAsync += OnOutboundDisconnectedAsync;
        _inBoundClient.DisconnectedAsync += OnDisconnectedAsync;

        // Build a one-shot configuration. The configuration object is discarded after the
        // values are read, so change-reloading is disabled to avoid leaving a file watcher
        // behind.
        var environmentName = Environment.GetEnvironmentVariable("ASPNETCORE_ENVIRONMENT") ?? "Development";
        var configuration = new ConfigurationBuilder()
            .SetBasePath(Directory.GetCurrentDirectory())
            .AddJsonFile("appsettings.json", optional: false, reloadOnChange: false)
            .AddJsonFile($"appsettings.{environmentName}.json", optional: true)
            .AddEnvironmentVariables()
            .Build();

        // Read the connection settings.
        var serverIp = configuration["MQTT:Server"];
        var portText = configuration["MQTT:Port"];
        var username = configuration["MQTT:UserName"];
        var password = configuration["MQTT:Password"];

        if (string.IsNullOrWhiteSpace(serverIp))
        {
            throw new InvalidOperationException("Configuration value 'MQTT:Server' is missing.");
        }

        if (!int.TryParse(
                portText,
                System.Globalization.NumberStyles.Integer,
                System.Globalization.CultureInfo.InvariantCulture,
                out var port))
        {
            throw new InvalidOperationException(
                $"Configuration value 'MQTT:Port' is missing or not a valid integer: '{portText}'.");
        }

        // A constructor cannot await; block here as the original contract requires the
        // instance to be connected once construction completes.
        ConnectAsync(serverIp, port, username, password).Wait();
    }

    /// <summary>
    /// Builds the connection options for both clients and connects them to the broker
    /// (outbound first, then inbound).
    /// </summary>
    /// <param name="server">Broker host name or IP address.</param>
    /// <param name="port">Broker TCP port.</param>
    /// <param name="username">Optional user name.</param>
    /// <param name="password">Optional password.</param>
    /// <exception cref="ArgumentException"><paramref name="server"/> is null or blank.</exception>
    public async Task ConnectAsync(string server, int port, string username = null, string password = null)
    {
        if (string.IsNullOrWhiteSpace(server))
        {
            throw new ArgumentException("MQTT server must not be null or empty.", nameof(server));
        }

        _inboundOptions = BuildOptions(server, port, username, password, "_inbound");
        _outboundOptions = BuildOptions(server, port, username, password, "_outbound");

        // Each client must use its own options: the reconnect path uses the same pairing, and
        // mixing them up makes a reconnecting client reuse the other client's id.
        await _outBoundClient.ConnectAsync(_outboundOptions, _shutdownToken);
        await _inBoundClient.ConnectAsync(_inboundOptions, _shutdownToken);
    }

    // Creates client options with a fresh GUID-based client id ending in the given suffix.
    private static MqttClientOptions BuildOptions(
        string server, int port, string username, string password, string clientIdSuffix)
    {
        return new MqttClientOptionsBuilder()
            .WithClientId(Guid.NewGuid() + clientIdSuffix)
            .WithTcpServer(server, port)
            .WithKeepAlivePeriod(TimeSpan.FromSeconds(20))
            .WithCredentials(username, password)
            .Build();
    }

    // Inbound client connected: reset its back-off counter and restore its subscriptions.
    private Task OnConnectedAsync(MqttClientConnectedEventArgs e)
    {
        _logger?.LogInformation("MQTT 连接成功");
        _inboundReconnectAttempts = 0;
        return ResubscribeTopicsAsync();
    }

    // Outbound client connected: reset its back-off counter. It holds no subscriptions.
    private Task OnOutboundConnectedAsync(MqttClientConnectedEventArgs e)
    {
        _logger?.LogInformation("MQTT 连接成功");
        _outboundReconnectAttempts = 0;
        return Task.CompletedTask;
    }

    // Re-subscribes every remembered topic on the inbound client.
    private async Task ResubscribeTopicsAsync()
    {
        // Work on a snapshot: the subscribe calls below await, and the topic list may be
        // modified by SubscribeAsync/UnsubscribeAsync in the meantime.
        string[] topics;
        lock (_topicsLock)
        {
            topics = _topics.ToArray();
        }

        if (!_inBoundClient.IsConnected || topics.Length == 0)
        {
            return;
        }

        _logger?.LogInformation($"重新订阅 {topics.Length} 个主题");
        foreach (var topic in topics)
        {
            await _inBoundClient.SubscribeAsync(topic, SubscriptionQos, _shutdownToken);
        }
    }

    // Outbound client disconnected.
    private async Task OnOutboundDisconnectedAsync(MqttClientDisconnectedEventArgs e)
    {
        _logger?.LogWarning($"MQTT 连接断开: {e.Reason}");
        // Only start a reconnect loop when an established connection was lost. A failed
        // connect attempt also raises this event (with ClientWasConnected == false); those
        // are retried by the loop that made the attempt.
        if (e.ClientWasConnected)
        {
            await AttemptReconnectAsync(false);
        }
    }

    // Inbound client disconnected.
    private async Task OnDisconnectedAsync(MqttClientDisconnectedEventArgs e)
    {
        _logger?.LogWarning($"MQTT 连接断开: {e.Reason}");
        // See OnOutboundDisconnectedAsync for why ClientWasConnected is checked.
        if (e.ClientWasConnected)
        {
            await AttemptReconnectAsync(true);
        }
    }

    // Back-off delay for the given number of previous attempts:
    // min(minInterval * 2^previousAttempts, maxInterval).
    private TimeSpan GetReconnectDelay(int previousAttempts)
    {
        return TimeSpan.FromMilliseconds(
            Math.Min(
                _minReconnectInterval.TotalMilliseconds * Math.Pow(2, previousAttempts),
                _maxReconnectInterval.TotalMilliseconds));
    }

    // Reconnects the selected client, retrying with exponential back-off until it is
    // connected or the manager is shut down. Implemented as a loop (not recursion) so a long
    // outage does not build an ever-growing chain of pending async calls.
    private async Task AttemptReconnectAsync(bool isInbound)
    {
        var client = isInbound ? _inBoundClient : _outBoundClient;

        while (!_shutdownToken.IsCancellationRequested)
        {
            // Each client has its own attempt counter, used for both the delay and the log.
            var attempt = isInbound ? ++_inboundReconnectAttempts : ++_outboundReconnectAttempts;
            var delay = GetReconnectDelay(attempt - 1);

            _logger?.LogInformation(
                $"将在 {delay.TotalSeconds:F1} 秒后尝试重连 (尝试次数: {attempt})");

            try
            {
                await Task.Delay(delay, _shutdownToken);

                if (!client.IsConnected)
                {
                    var options = isInbound ? _inboundOptions : _outboundOptions;
                    await client.ConnectAsync(options, _shutdownToken);
                }

                return;
            }
            catch (OperationCanceledException) when (_shutdownToken.IsCancellationRequested)
            {
                // Only a shutdown stops the retries; any other cancellation is treated as a
                // failed attempt and retried below.
                _logger?.LogInformation("重连操作被取消");
                return;
            }
            catch (Exception ex)
            {
                _logger?.LogError(ex, "重连失败");
                // Fall through to the next loop iteration and try again.
            }
        }
    }

    /// <summary>
    /// Subscribes the inbound client to <paramref name="topic"/> and registers
    /// <paramref name="handler"/> for received messages. The topic is remembered and
    /// re-subscribed after a reconnect.
    /// </summary>
    /// <param name="topic">Topic filter to subscribe to.</param>
    /// <param name="handler">Callback invoked for messages received by the inbound client.</param>
    /// <exception cref="ArgumentNullException">An argument is null.</exception>
    public async Task SubscribeAsync(string topic,
        Func<MqttApplicationMessageReceivedEventArgs, Task> handler)
    {
        if (topic is null)
        {
            throw new ArgumentNullException(nameof(topic));
        }

        if (handler is null)
        {
            throw new ArgumentNullException(nameof(handler));
        }

        // Attach the handler before subscribing so messages delivered right after the
        // subscription is acknowledged are not missed.
        RegisterHandler(handler);
        RememberTopic(topic);
        await _inBoundClient.SubscribeAsync(topic, SubscriptionQos, CancellationToken.None);
    }

    /// <summary>
    /// Subscribes the inbound client to every topic in <paramref name="topics"/> and
    /// registers <paramref name="handler"/> once for received messages. The topics are
    /// remembered and re-subscribed after a reconnect.
    /// </summary>
    /// <param name="topics">Topic filters to subscribe to.</param>
    /// <param name="handler">Callback invoked for messages received by the inbound client.</param>
    /// <exception cref="ArgumentNullException">An argument is null.</exception>
    public async Task SubscribeAsync(string[] topics,
        Func<MqttApplicationMessageReceivedEventArgs, Task> handler)
    {
        if (topics is null)
        {
            throw new ArgumentNullException(nameof(topics));
        }

        if (handler is null)
        {
            throw new ArgumentNullException(nameof(handler));
        }

        if (topics.Length == 0)
        {
            return;
        }

        // Register the handler once for the whole batch; registering it per topic would make
        // it run once per topic for every single message.
        RegisterHandler(handler);

        foreach (var topic in topics)
        {
            RememberTopic(topic);
            await _inBoundClient.SubscribeAsync(topic, SubscriptionQos, CancellationToken.None);
        }
    }

    // Attaches the handler to the inbound client, making sure the same delegate is never
    // attached twice (removing a delegate that is not attached is a no-op).
    private void RegisterHandler(Func<MqttApplicationMessageReceivedEventArgs, Task> handler)
    {
        _inBoundClient.ApplicationMessageReceivedAsync -= handler;
        _inBoundClient.ApplicationMessageReceivedAsync += handler;
    }

    // Records the topic for re-subscription after a reconnect, ignoring duplicates.
    private void RememberTopic(string topic)
    {
        lock (_topicsLock)
        {
            if (!_topics.Contains(topic))
            {
                _topics.Add(topic);
            }
        }
    }

    /// <summary>
    /// Unsubscribes the inbound client from <paramref name="topic"/> and forgets the topic so
    /// it is not re-subscribed after a reconnect.
    /// </summary>
    /// <param name="topic">Topic filter to unsubscribe from.</param>
    public async Task UnsubscribeAsync(string topic)
    {
        lock (_topicsLock)
        {
            _topics.Remove(topic);
        }

        await _inBoundClient.UnsubscribeAsync(topic, CancellationToken.None);
    }

    /// <summary>
    /// Publishes a message to a topic through the outbound client.
    /// </summary>
    /// <param name="topic">Target topic.</param>
    /// <param name="message">Message payload (JSON text).</param>
    /// <exception cref="InvalidOperationException">The publish result reports failure.</exception>
    public async Task PublishAsync(string topic, string message)
    {
        var mqttMsg = new MqttApplicationMessageBuilder()
            .WithTopic(topic)
            .WithPayload(message)
            // QoS level: 0, 1 or 2.
            .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtMostOnce)
            .Build();

        var result = await _outBoundClient.PublishAsync(mqttMsg, CancellationToken.None);
        if (!result.IsSuccess)
        {
            throw new InvalidOperationException($"{topic} {message}发布失败");
        }
    }

    /// <summary>
    /// Stops any pending reconnect attempts and releases both MQTT clients.
    /// </summary>
    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    /// <summary>
    /// Releases the resources owned by this instance.
    /// </summary>
    /// <param name="disposing">True when called from <see cref="Dispose()"/>.</param>
    protected virtual void Dispose(bool disposing)
    {
        if (_disposed)
        {
            return;
        }

        _disposed = true;

        if (!disposing)
        {
            return;
        }

        // Cancel first so reconnect loops exit instead of reconnecting disposed clients.
        _cancellationTokenSource.Cancel();

        _outBoundClient.ConnectedAsync -= OnOutboundConnectedAsync;
        _inBoundClient.ConnectedAsync -= OnConnectedAsync;
        _outBoundClient.DisconnectedAsync -= OnOutboundDisconnectedAsync;
        _inBoundClient.DisconnectedAsync -= OnDisconnectedAsync;

        _inBoundClient.Dispose();
        _outBoundClient.Dispose();
        _cancellationTokenSource.Dispose();
    }
}