using Infrastructure.CloudSdk.mqttmessagecenter; using MQTTnet; using MQTTnet.Client; using System.Text; public class MqttMessageCenter { private readonly IMqttClient _mqttClient; private MqttClientOptions _options; private IEnumerable _handlers; private readonly HashSet _subscribedTopics = new(); private readonly object _lock = new(); public MqttMessageCenter() { var factory = new MqttFactory(); _mqttClient = factory.CreateMqttClient(); _mqttClient.ApplicationMessageReceivedAsync += async e => { var topic = e.ApplicationMessage.Topic; var payload = Encoding.UTF8.GetString(e.ApplicationMessage.Payload); if (_handlers != null) { foreach (var handler in _handlers) { if (handler.CanHandle(topic)) { try { await handler.HandleAsync(topic, payload); } catch (Exception ex) { Console.WriteLine($"[Handler Error] {ex.Message}"); } break; } } } }; // 自动重连逻辑 _mqttClient.DisconnectedAsync += async e => { Console.WriteLine($"[MQTT] Disconnected. Reason: {e?.Reason}"); while (!_mqttClient.IsConnected) { try { Console.WriteLine("[MQTT] Attempting to reconnect..."); await Task.Delay(5000); // 5秒后重试 await _mqttClient.ConnectAsync(_options); Console.WriteLine("[MQTT] Reconnected."); // 重新订阅所有已记录的 topic await SubscribeAsync(_subscribedTopics.ToArray()); } catch (Exception ex) { Console.WriteLine($"[MQTT] Reconnect failed: {ex.Message}"); } } }; } public async Task InitializeAsync( IEnumerable handlers, string server, int port, string clientId, string username = null, string password = null) { _handlers = handlers; var optionsBuilder = new MqttClientOptionsBuilder() .WithTcpServer(server, port) .WithClientId(clientId) .WithCleanSession(false) // 保留会话,避免服务器丢掉订阅信息 .WithKeepAlivePeriod(TimeSpan.FromSeconds(30)); if (!string.IsNullOrEmpty(username)) { optionsBuilder.WithCredentials(username, password); } _options = optionsBuilder.Build(); if (!_mqttClient.IsConnected) { await _mqttClient.ConnectAsync(_options); } } public async Task ConnectAndSubscribeAsync(params string[] topics) { if (!_mqttClient.IsConnected) { await _mqttClient.ConnectAsync(_options); } await SubscribeAsync(topics); } public async Task SubscribeAsync(params string[] topics) { if (!_mqttClient.IsConnected || topics == null || topics.Length == 0) return; List newTopics = new(); lock (_lock) { foreach (var topic in topics) { if (_subscribedTopics.Add(topic)) // 避免重复 newTopics.Add(topic); } } if (newTopics.Count > 0) { var subscribeOptions = new MqttClientSubscribeOptionsBuilder(); foreach (var topic in newTopics) { subscribeOptions.WithTopicFilter(f => f.WithTopic(topic)); } await _mqttClient.SubscribeAsync(subscribeOptions.Build()); Console.WriteLine($"[MQTT] Subscribed: {string.Join(", ", newTopics)}"); } } public IReadOnlyCollection GetSubscribedTopics() { lock (_lock) { return _subscribedTopics.ToList().AsReadOnly(); } } }