LASAPlatform/Infrastructure/CloudSdk/mqttmessagecenter/MqttMessageCenter.cs

168 lines
4.9 KiB
C#
Raw Permalink Normal View History

2025-06-17 13:53:54 +08:00
using Infrastructure.CloudSdk.mqttmessagecenter;
using MQTTnet;
using MQTTnet.Client;
using System.Text;
public class MqttMessageCenter
{
private readonly IMqttClient _mqttClient;
2025-06-19 14:47:46 +08:00
private MqttClientOptions _options;
private IEnumerable<IMqttMessageHandler> _handlers;
2025-06-17 13:53:54 +08:00
2025-06-19 14:47:46 +08:00
private readonly HashSet<string> _subscribedTopics = new();
private readonly object _lock = new();
2025-06-17 13:53:54 +08:00
2025-06-19 14:47:46 +08:00
public MqttMessageCenter()
{
2025-06-17 13:53:54 +08:00
var factory = new MqttFactory();
_mqttClient = factory.CreateMqttClient();
_mqttClient.ApplicationMessageReceivedAsync += async e =>
{
var topic = e.ApplicationMessage.Topic;
var payload = Encoding.UTF8.GetString(e.ApplicationMessage.Payload);
2025-06-19 14:47:46 +08:00
if (_handlers != null)
{
foreach (var handler in _handlers)
{
if (handler.CanHandle(topic))
{
try
{
await handler.HandleAsync(topic, payload);
}
catch (Exception ex)
{
Console.WriteLine($"[Handler Error] {ex.Message}");
}
break;
}
}
}
};
// 自动重连逻辑
_mqttClient.DisconnectedAsync += async e =>
{
Console.WriteLine($"[MQTT] Disconnected. Reason: {e?.Reason}");
while (!_mqttClient.IsConnected)
2025-06-17 13:53:54 +08:00
{
2025-06-19 14:47:46 +08:00
try
2025-06-17 13:53:54 +08:00
{
2025-06-19 14:47:46 +08:00
Console.WriteLine("[MQTT] Attempting to reconnect...");
await Task.Delay(5000); // 5秒后重试
await _mqttClient.ConnectAsync(_options);
Console.WriteLine("[MQTT] Reconnected.");
// 重新订阅所有已记录的 topic
2026-03-06 16:09:15 +08:00
//await SubscribeAsync(_subscribedTopics.ToArray());
await ResubscribeAllAsync();
2025-06-19 14:47:46 +08:00
}
catch (Exception ex)
{
Console.WriteLine($"[MQTT] Reconnect failed: {ex.Message}");
2025-06-17 13:53:54 +08:00
}
}
};
}
2026-03-06 16:09:15 +08:00
/// <summary>
/// 重连后强制重新订阅所有 topic绕过去重逻辑
/// </summary>
private async Task ResubscribeAllAsync()
{
string[] topics;
lock (_lock)
{
topics = _subscribedTopics.ToArray();
}
if (topics.Length == 0) return;
var subscribeOptions = new MqttClientSubscribeOptionsBuilder();
foreach (var topic in topics)
{
subscribeOptions.WithTopicFilter(f => f.WithTopic(topic));
}
await _mqttClient.SubscribeAsync(subscribeOptions.Build());
Console.WriteLine($"[MQTT] Resubscribed {topics.Length} topics: {string.Join(", ", topics)}");
}
2025-06-17 13:53:54 +08:00
2025-06-19 14:47:46 +08:00
public async Task InitializeAsync(
IEnumerable<IMqttMessageHandler> handlers,
string server,
int port,
string clientId,
string username = null,
string password = null)
{
_handlers = handlers;
var optionsBuilder = new MqttClientOptionsBuilder()
.WithTcpServer(server, port)
.WithClientId(clientId)
2026-03-06 16:09:15 +08:00
.WithCleanSession(true) // 重启后不依赖会话
2025-06-19 14:47:46 +08:00
.WithKeepAlivePeriod(TimeSpan.FromSeconds(30));
if (!string.IsNullOrEmpty(username))
{
optionsBuilder.WithCredentials(username, password);
}
_options = optionsBuilder.Build();
if (!_mqttClient.IsConnected)
{
await _mqttClient.ConnectAsync(_options);
}
}
2025-06-17 13:53:54 +08:00
public async Task ConnectAndSubscribeAsync(params string[] topics)
{
if (!_mqttClient.IsConnected)
{
await _mqttClient.ConnectAsync(_options);
}
2025-06-19 14:47:46 +08:00
await SubscribeAsync(topics);
}
public async Task SubscribeAsync(params string[] topics)
{
if (!_mqttClient.IsConnected || topics == null || topics.Length == 0)
return;
List<string> newTopics = new();
lock (_lock)
2025-06-17 13:53:54 +08:00
{
2025-06-19 14:47:46 +08:00
foreach (var topic in topics)
{
if (_subscribedTopics.Add(topic)) // 避免重复
newTopics.Add(topic);
}
2025-06-17 13:53:54 +08:00
}
2025-06-19 14:47:46 +08:00
if (newTopics.Count > 0)
{
var subscribeOptions = new MqttClientSubscribeOptionsBuilder();
foreach (var topic in newTopics)
{
subscribeOptions.WithTopicFilter(f => f.WithTopic(topic));
}
await _mqttClient.SubscribeAsync(subscribeOptions.Build());
Console.WriteLine($"[MQTT] Subscribed: {string.Join(", ", newTopics)}");
}
}
public IReadOnlyCollection<string> GetSubscribedTopics()
{
lock (_lock)
{
return _subscribedTopics.ToList().AsReadOnly();
}
2025-06-17 13:53:54 +08:00
}
}
2025-06-19 14:47:46 +08:00