LASAPlatform/Infrastructure/CloudSdk/mqttmessagecenter/MqttMessageCenter.cs

168 lines
4.9 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

using Infrastructure.CloudSdk.mqttmessagecenter;
using MQTTnet;
using MQTTnet.Client;
using System.Text;
/// <summary>
/// Wraps an MQTTnet <see cref="IMqttClient"/>: dispatches every received message to the
/// first registered <see cref="IMqttMessageHandler"/> that accepts its topic, remembers
/// which topics have been subscribed, and reconnects and resubscribes after the
/// connection is lost.
/// </summary>
public class MqttMessageCenter
{
    /// <summary>Delay before each reconnect attempt.</summary>
    private static readonly TimeSpan ReconnectDelay = TimeSpan.FromSeconds(5);

    private readonly IMqttClient _mqttClient;
    private MqttClientOptions _options;
    private IEnumerable<IMqttMessageHandler> _handlers;

    // Topics successfully requested through SubscribeAsync; guarded by _lock.
    private readonly HashSet<string> _subscribedTopics = new();
    private readonly object _lock = new();

    // Ensures that at most one reconnect loop runs at a time; guards _reconnecting.
    private readonly object _reconnectGate = new();
    private bool _reconnecting;

    /// <summary>
    /// Creates the underlying MQTT client and wires up message dispatching and
    /// automatic reconnection. No connection is opened here; call
    /// <see cref="InitializeAsync"/> first.
    /// </summary>
    public MqttMessageCenter()
    {
        var factory = new MqttFactory();
        _mqttClient = factory.CreateMqttClient();

        _mqttClient.ApplicationMessageReceivedAsync += e =>
        {
            var topic = e.ApplicationMessage.Topic;

            // A message without payload carries a null buffer; decoding null would throw
            // outside the per-handler try/catch, so treat it as an empty string.
            var bytes = e.ApplicationMessage.Payload;
            var payload = bytes is null ? string.Empty : Encoding.UTF8.GetString(bytes);

            return DispatchAsync(topic, payload);
        };

        // Automatic reconnect logic.
        _mqttClient.DisconnectedAsync += async e =>
        {
            Console.WriteLine($"[MQTT] Disconnected. Reason: {e?.Reason}");
            await ReconnectAsync();
        };
    }

    /// <summary>
    /// Passes a message to the first handler whose <c>CanHandle</c> accepts the topic.
    /// Handler exceptions are logged and swallowed; later handlers are not tried.
    /// </summary>
    private async Task DispatchAsync(string topic, string payload)
    {
        var handlers = _handlers;
        if (handlers == null)
        {
            return;
        }

        foreach (var handler in handlers)
        {
            if (!handler.CanHandle(topic))
            {
                continue;
            }

            try
            {
                await handler.HandleAsync(topic, payload);
            }
            catch (Exception ex)
            {
                Console.WriteLine($"[Handler Error] {ex.Message}");
            }

            // Only the first matching handler gets the message, even if it failed.
            break;
        }
    }

    /// <summary>
    /// Retries the connection every <see cref="ReconnectDelay"/> until the client is
    /// connected again, then resubscribes all recorded topics.
    /// </summary>
    /// <remarks>
    /// The disconnected event may fire again while this loop is already retrying
    /// (for example after a failed connection attempt). The gate makes those extra
    /// invocations return immediately instead of starting additional loops.
    /// </remarks>
    private async Task ReconnectAsync()
    {
        do
        {
            lock (_reconnectGate)
            {
                if (_reconnecting)
                {
                    return;
                }

                _reconnecting = true;
            }

            try
            {
                while (!_mqttClient.IsConnected)
                {
                    try
                    {
                        Console.WriteLine("[MQTT] Attempting to reconnect...");
                        await Task.Delay(ReconnectDelay); // retry after 5 seconds
                        await _mqttClient.ConnectAsync(_options);
                        Console.WriteLine("[MQTT] Reconnected.");
                    }
                    catch (Exception ex)
                    {
                        Console.WriteLine($"[MQTT] Reconnect failed: {ex.Message}");
                        continue;
                    }

                    try
                    {
                        // Resubscribe every recorded topic.
                        await ResubscribeAllAsync();
                    }
                    catch (Exception ex)
                    {
                        // If the link dropped again the loop condition retries; otherwise
                        // the failure is only logged, as a connection is established.
                        Console.WriteLine($"[MQTT] Resubscribe failed: {ex.Message}");
                    }
                }
            }
            finally
            {
                lock (_reconnectGate)
                {
                    _reconnecting = false;
                }
            }

            // A disconnect that arrived between the last IsConnected check and the release
            // of the gate was ignored by the guard above, so check once more.
        }
        while (!_mqttClient.IsConnected);
    }

    /// <summary>
    /// Forces a resubscription of every recorded topic after a reconnect, bypassing the
    /// de-duplication performed by <see cref="SubscribeAsync"/>.
    /// </summary>
    private async Task ResubscribeAllAsync()
    {
        string[] topics;
        lock (_lock)
        {
            topics = _subscribedTopics.ToArray();
        }

        if (topics.Length == 0)
        {
            return;
        }

        await SendSubscribeAsync(topics);
        Console.WriteLine($"[MQTT] Resubscribed {topics.Length} topics: {string.Join(", ", topics)}");
    }

    /// <summary>Sends one subscribe request containing a filter for each given topic.</summary>
    private async Task SendSubscribeAsync(IEnumerable<string> topics)
    {
        var subscribeOptions = new MqttClientSubscribeOptionsBuilder();
        foreach (var topic in topics)
        {
            subscribeOptions.WithTopicFilter(f => f.WithTopic(topic));
        }

        await _mqttClient.SubscribeAsync(subscribeOptions.Build());
    }

    /// <summary>
    /// Stores the message handlers, builds the connection options and connects to the
    /// broker if the client is not connected yet.
    /// </summary>
    /// <param name="handlers">Handlers that receive incoming messages; may be null.</param>
    /// <param name="server">Broker host name or address.</param>
    /// <param name="port">Broker TCP port.</param>
    /// <param name="clientId">Client identifier sent to the broker.</param>
    /// <param name="username">Optional user name; credentials are only set when it is non-empty.</param>
    /// <param name="password">Password used together with <paramref name="username"/>.</param>
    public async Task InitializeAsync(
        IEnumerable<IMqttMessageHandler> handlers,
        string server,
        int port,
        string clientId,
        string username = null,
        string password = null)
    {
        _handlers = handlers;

        var optionsBuilder = new MqttClientOptionsBuilder()
            .WithTcpServer(server, port)
            .WithClientId(clientId)
            .WithCleanSession(true) // do not rely on a broker-side session after a restart
            .WithKeepAlivePeriod(TimeSpan.FromSeconds(30));

        if (!string.IsNullOrEmpty(username))
        {
            optionsBuilder.WithCredentials(username, password);
        }

        _options = optionsBuilder.Build();

        if (!_mqttClient.IsConnected)
        {
            await _mqttClient.ConnectAsync(_options);
        }
    }

    /// <summary>
    /// Connects with the options built by <see cref="InitializeAsync"/> if necessary and
    /// then subscribes to the given topics.
    /// </summary>
    /// <param name="topics">Topics to subscribe to.</param>
    public async Task ConnectAndSubscribeAsync(params string[] topics)
    {
        if (!_mqttClient.IsConnected)
        {
            await _mqttClient.ConnectAsync(_options);
        }

        await SubscribeAsync(topics);
    }

    /// <summary>
    /// Subscribes to the topics that have not been recorded yet. Does nothing when the
    /// client is disconnected or no topics are given; null or blank entries are skipped.
    /// </summary>
    /// <param name="topics">Topics to subscribe to.</param>
    public async Task SubscribeAsync(params string[] topics)
    {
        if (!_mqttClient.IsConnected || topics == null || topics.Length == 0)
        {
            return;
        }

        List<string> newTopics = new();
        lock (_lock)
        {
            foreach (var topic in topics)
            {
                if (string.IsNullOrWhiteSpace(topic))
                {
                    continue;
                }

                if (_subscribedTopics.Add(topic)) // avoid duplicates
                {
                    newTopics.Add(topic);
                }
            }
        }

        if (newTopics.Count == 0)
        {
            return;
        }

        try
        {
            await SendSubscribeAsync(newTopics);
        }
        catch
        {
            // The broker never accepted these topics. Forget them again, otherwise the
            // de-duplication above would silently skip every later attempt.
            lock (_lock)
            {
                foreach (var topic in newTopics)
                {
                    _subscribedTopics.Remove(topic);
                }
            }

            throw;
        }

        Console.WriteLine($"[MQTT] Subscribed: {string.Join(", ", newTopics)}");
    }

    /// <summary>Returns a read-only snapshot of the currently recorded topics.</summary>
    public IReadOnlyCollection<string> GetSubscribedTopics()
    {
        lock (_lock)
        {
            return _subscribedTopics.ToList().AsReadOnly();
        }
    }
}