168 lines
4.9 KiB
C#
168 lines
4.9 KiB
C#
using Infrastructure.CloudSdk.mqttmessagecenter;
|
||
using MQTTnet;
|
||
using MQTTnet.Client;
|
||
using System.Text;
|
||
|
||
public class MqttMessageCenter
|
||
{
|
||
private readonly IMqttClient _mqttClient;
|
||
private MqttClientOptions _options;
|
||
private IEnumerable<IMqttMessageHandler> _handlers;
|
||
|
||
private readonly HashSet<string> _subscribedTopics = new();
|
||
private readonly object _lock = new();
|
||
|
||
public MqttMessageCenter()
|
||
{
|
||
var factory = new MqttFactory();
|
||
_mqttClient = factory.CreateMqttClient();
|
||
|
||
_mqttClient.ApplicationMessageReceivedAsync += async e =>
|
||
{
|
||
var topic = e.ApplicationMessage.Topic;
|
||
var payload = Encoding.UTF8.GetString(e.ApplicationMessage.Payload);
|
||
|
||
if (_handlers != null)
|
||
{
|
||
foreach (var handler in _handlers)
|
||
{
|
||
if (handler.CanHandle(topic))
|
||
{
|
||
try
|
||
{
|
||
await handler.HandleAsync(topic, payload);
|
||
}
|
||
catch (Exception ex)
|
||
{
|
||
Console.WriteLine($"[Handler Error] {ex.Message}");
|
||
}
|
||
break;
|
||
}
|
||
}
|
||
}
|
||
};
|
||
|
||
// 自动重连逻辑
|
||
_mqttClient.DisconnectedAsync += async e =>
|
||
{
|
||
Console.WriteLine($"[MQTT] Disconnected. Reason: {e?.Reason}");
|
||
while (!_mqttClient.IsConnected)
|
||
{
|
||
try
|
||
{
|
||
Console.WriteLine("[MQTT] Attempting to reconnect...");
|
||
await Task.Delay(5000); // 5秒后重试
|
||
await _mqttClient.ConnectAsync(_options);
|
||
Console.WriteLine("[MQTT] Reconnected.");
|
||
|
||
// 重新订阅所有已记录的 topic
|
||
//await SubscribeAsync(_subscribedTopics.ToArray());
|
||
await ResubscribeAllAsync();
|
||
}
|
||
catch (Exception ex)
|
||
{
|
||
Console.WriteLine($"[MQTT] Reconnect failed: {ex.Message}");
|
||
}
|
||
}
|
||
};
|
||
}
|
||
/// <summary>
|
||
/// 重连后强制重新订阅所有 topic,绕过去重逻辑
|
||
/// </summary>
|
||
private async Task ResubscribeAllAsync()
|
||
{
|
||
string[] topics;
|
||
lock (_lock)
|
||
{
|
||
topics = _subscribedTopics.ToArray();
|
||
}
|
||
|
||
if (topics.Length == 0) return;
|
||
|
||
var subscribeOptions = new MqttClientSubscribeOptionsBuilder();
|
||
foreach (var topic in topics)
|
||
{
|
||
subscribeOptions.WithTopicFilter(f => f.WithTopic(topic));
|
||
}
|
||
|
||
await _mqttClient.SubscribeAsync(subscribeOptions.Build());
|
||
Console.WriteLine($"[MQTT] Resubscribed {topics.Length} topics: {string.Join(", ", topics)}");
|
||
}
|
||
|
||
public async Task InitializeAsync(
|
||
IEnumerable<IMqttMessageHandler> handlers,
|
||
string server,
|
||
int port,
|
||
string clientId,
|
||
string username = null,
|
||
string password = null)
|
||
{
|
||
_handlers = handlers;
|
||
|
||
var optionsBuilder = new MqttClientOptionsBuilder()
|
||
.WithTcpServer(server, port)
|
||
.WithClientId(clientId)
|
||
.WithCleanSession(true) // 重启后不依赖会话
|
||
.WithKeepAlivePeriod(TimeSpan.FromSeconds(30));
|
||
|
||
if (!string.IsNullOrEmpty(username))
|
||
{
|
||
optionsBuilder.WithCredentials(username, password);
|
||
}
|
||
|
||
_options = optionsBuilder.Build();
|
||
|
||
if (!_mqttClient.IsConnected)
|
||
{
|
||
await _mqttClient.ConnectAsync(_options);
|
||
}
|
||
}
|
||
|
||
public async Task ConnectAndSubscribeAsync(params string[] topics)
|
||
{
|
||
if (!_mqttClient.IsConnected)
|
||
{
|
||
await _mqttClient.ConnectAsync(_options);
|
||
}
|
||
|
||
await SubscribeAsync(topics);
|
||
}
|
||
|
||
public async Task SubscribeAsync(params string[] topics)
|
||
{
|
||
if (!_mqttClient.IsConnected || topics == null || topics.Length == 0)
|
||
return;
|
||
|
||
List<string> newTopics = new();
|
||
lock (_lock)
|
||
{
|
||
foreach (var topic in topics)
|
||
{
|
||
if (_subscribedTopics.Add(topic)) // 避免重复
|
||
newTopics.Add(topic);
|
||
}
|
||
}
|
||
|
||
if (newTopics.Count > 0)
|
||
{
|
||
var subscribeOptions = new MqttClientSubscribeOptionsBuilder();
|
||
foreach (var topic in newTopics)
|
||
{
|
||
subscribeOptions.WithTopicFilter(f => f.WithTopic(topic));
|
||
}
|
||
|
||
await _mqttClient.SubscribeAsync(subscribeOptions.Build());
|
||
Console.WriteLine($"[MQTT] Subscribed: {string.Join(", ", newTopics)}");
|
||
}
|
||
}
|
||
|
||
public IReadOnlyCollection<string> GetSubscribedTopics()
|
||
{
|
||
lock (_lock)
|
||
{
|
||
return _subscribedTopics.ToList().AsReadOnly();
|
||
}
|
||
}
|
||
}
|
||
|