using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Logging; using MQTTnet; using MQTTnet.Client; using MQTTnet.Protocol; namespace OpenAuth.WebApi; public class MqttClientManager { private IMqttClient _outBoundClient; private IMqttClient _inBoundClient; private MqttClientOptions _inboundOptions; private MqttClientOptions _outboundOptions; private readonly ILogger _logger; private int _inboundReconnectAttempts = 0; private int _outboundReconnectAttempts = 0; private readonly CancellationTokenSource _cancellationTokenSource = new(); // 指数退避参数 private readonly TimeSpan _minReconnectInterval = TimeSpan.FromSeconds(1); private readonly TimeSpan _maxReconnectInterval = TimeSpan.FromMinutes(2); private List _topics = new(); public MqttClientManager(ILogger logger) { _logger = logger; var mqttFactory = new MqttFactory(); _outBoundClient = mqttFactory.CreateMqttClient(); _inBoundClient = mqttFactory.CreateMqttClient(); // 创建配置构建器 var builder = new ConfigurationBuilder() .SetBasePath(Directory.GetCurrentDirectory()) .AddJsonFile("appsettings.json", optional: false, reloadOnChange: true) .AddJsonFile( $"appsettings.{Environment.GetEnvironmentVariable("ASPNETCORE_ENVIRONMENT") ?? "Development"}.json", optional: true) .AddEnvironmentVariables(); // 构建配置 var configuration = builder.Build(); // 读取连接字符串 var serverIp = configuration["MQTT:Server"]; var port = configuration["MQTT:Port"]; var username = configuration["MQTT:UserName"]; var password = configuration["MQTT:Password"]; ConnectAsync(serverIp, int.Parse(port), username, password).Wait(); } /// /// /// /// /// /// /// public async Task ConnectAsync(string server, int port, string username = null, string password = null) { _inboundOptions = new MqttClientOptionsBuilder() .WithClientId(Guid.NewGuid() + "_inbound") .WithTcpServer(server, port) .WithKeepAlivePeriod(TimeSpan.FromSeconds(20)) .WithCredentials(username, password) .Build(); _outboundOptions = new MqttClientOptionsBuilder() .WithClientId(Guid.NewGuid() + "_outbound") .WithTcpServer(server, port) .WithKeepAlivePeriod(TimeSpan.FromSeconds(20)) .WithCredentials(username, password) .Build(); await _outBoundClient.ConnectAsync(_inboundOptions, _cancellationTokenSource.Token); await _inBoundClient.ConnectAsync(_outboundOptions, _cancellationTokenSource.Token); _outBoundClient.ConnectedAsync += OnConnectedAsync; _inBoundClient.ConnectedAsync += OnConnectedAsync; _outBoundClient.DisconnectedAsync += OnOutboundDisconnectedAsync; _inBoundClient.DisconnectedAsync += OnDisconnectedAsync; } private Task OnConnectedAsync(MqttClientConnectedEventArgs e) { _logger?.LogInformation("MQTT 连接成功"); _inboundReconnectAttempts = 0; // 重置重连计数 // 重新订阅主题 return ResubscribeTopicsAsync(); } private async Task ResubscribeTopicsAsync() { if (!_inBoundClient.IsConnected || _topics.Count == 0) return; _logger?.LogInformation($"重新订阅 {_topics.Count} 个主题"); foreach (var topic in _topics) { await _inBoundClient.SubscribeAsync(topic, MqttQualityOfServiceLevel.AtLeastOnce, _cancellationTokenSource.Token); } } private async Task OnOutboundDisconnectedAsync(MqttClientDisconnectedEventArgs e) { _logger?.LogWarning($"MQTT 连接断开: {e.Reason}"); // 仅在主动断开连接时不尝试重连 if (e.ClientWasConnected) { await AttemptReconnectAsync(false); } } private async Task OnDisconnectedAsync(MqttClientDisconnectedEventArgs e) { _logger?.LogWarning($"MQTT 连接断开: {e.Reason}"); // 仅在主动断开连接时不尝试重连 if (e.ClientWasConnected) { await AttemptReconnectAsync(true); } } // 尝试重连(带指数退避) private async Task AttemptReconnectAsync(bool isInbound) { if (_cancellationTokenSource.IsCancellationRequested) return; // 计算退避时间: min(初始间隔 * 2^尝试次数, 最大间隔) var delay = TimeSpan.FromMilliseconds( Math.Min( _minReconnectInterval.TotalMilliseconds * Math.Pow(2, _inboundReconnectAttempts), _maxReconnectInterval.TotalMilliseconds ) ); if (isInbound) { _inboundReconnectAttempts++; } else { _outboundReconnectAttempts++; } var temp = isInbound ? _inboundReconnectAttempts : _outboundReconnectAttempts; _logger?.LogInformation( $"将在 {delay.TotalSeconds:F1} 秒后尝试重连 (尝试次数: {temp})"); try { await Task.Delay(delay, _cancellationTokenSource.Token); if (isInbound) { // 尝试重连 if (!_inBoundClient.IsConnected) { await _inBoundClient.ConnectAsync(_inboundOptions, _cancellationTokenSource.Token); } } else { // 尝试重连 if (!_outBoundClient.IsConnected) { await _outBoundClient.ConnectAsync(_outboundOptions, _cancellationTokenSource.Token); } } } catch (OperationCanceledException) { _logger?.LogInformation("重连操作被取消"); } catch (Exception ex) { _logger?.LogError(ex, "重连失败"); // 继续尝试重连 if (!_cancellationTokenSource.IsCancellationRequested) { await AttemptReconnectAsync(isInbound); } } } public async Task SubscribeAsync(string topic, Func handler) { _topics.Add(topic); await _inBoundClient.SubscribeAsync(topic, MqttQualityOfServiceLevel.AtMostOnce, CancellationToken.None); _inBoundClient.ApplicationMessageReceivedAsync += handler; } public async Task SubscribeAsync(string[] topics, Func handler) { foreach (var topic in topics) { _topics.Add(topic); await _inBoundClient.SubscribeAsync(topic, MqttQualityOfServiceLevel.AtMostOnce, CancellationToken.None); _inBoundClient.ApplicationMessageReceivedAsync += handler; } } public async Task UnsubscribeAsync(string topic) { await _inBoundClient.UnsubscribeAsync(topic, CancellationToken.None); } /// /// 向主题发布消息 /// /// 主题 /// json public async Task PublishAsync(string topic, string message) { // MqttChannelAdapter var mqttMsg = new MqttApplicationMessageBuilder() .WithTopic(topic) .WithPayload(message) // 级别 0 1 2 .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtMostOnce) .Build(); var result = await _outBoundClient.PublishAsync(mqttMsg, CancellationToken.None); if (result.IsSuccess) { Console.WriteLine($"{topic} {message}发布成功"); } else { throw new Exception($"{topic} {message}发布失败"); } } }