断线重连

main
zhangbin 2026-03-06 16:09:15 +08:00
parent 9920a7796a
commit a4798f779f
1 changed files with 25 additions and 2 deletions

View File

@ -56,7 +56,8 @@ public class MqttMessageCenter
Console.WriteLine("[MQTT] Reconnected.");
// 重新订阅所有已记录的 topic
await SubscribeAsync(_subscribedTopics.ToArray());
//await SubscribeAsync(_subscribedTopics.ToArray());
await ResubscribeAllAsync();
}
catch (Exception ex)
{
@ -65,6 +66,28 @@ public class MqttMessageCenter
}
};
}
/// <summary>
/// 重连后强制重新订阅所有 topic绕过去重逻辑
/// </summary>
private async Task ResubscribeAllAsync()
{
string[] topics;
lock (_lock)
{
topics = _subscribedTopics.ToArray();
}
if (topics.Length == 0) return;
var subscribeOptions = new MqttClientSubscribeOptionsBuilder();
foreach (var topic in topics)
{
subscribeOptions.WithTopicFilter(f => f.WithTopic(topic));
}
await _mqttClient.SubscribeAsync(subscribeOptions.Build());
Console.WriteLine($"[MQTT] Resubscribed {topics.Length} topics: {string.Join(", ", topics)}");
}
public async Task InitializeAsync(
IEnumerable<IMqttMessageHandler> handlers,
@ -79,7 +102,7 @@ public class MqttMessageCenter
var optionsBuilder = new MqttClientOptionsBuilder()
.WithTcpServer(server, port)
.WithClientId(clientId)
.WithCleanSession(false) // 保留会话,避免服务器丢掉订阅信息
.WithCleanSession(true) // 重启后不依赖会话
.WithKeepAlivePeriod(TimeSpan.FromSeconds(30));
if (!string.IsNullOrEmpty(username))