using Infrastructure.CloudSdk.mqttmessagecenter; using MQTTnet; using MQTTnet.Client; using System.Text; public class MqttMessageCenter { private readonly IMqttClient _mqttClient; private readonly MqttClientOptions _options; private readonly IEnumerable _handlers; public MqttMessageCenter(IEnumerable handlers, string server, int port, string clientId, string username = null, string password = null) { _handlers = handlers; var factory = new MqttFactory(); _mqttClient = factory.CreateMqttClient(); var optionsBuilder = new MqttClientOptionsBuilder() .WithTcpServer(server, port) .WithCredentials(username,password) .WithClientId(clientId) .WithCleanSession(); //if (!string.IsNullOrEmpty(username)) //{ // optionsBuilder.WithCredentials(username, password); //} _options = optionsBuilder.Build(); _mqttClient.ApplicationMessageReceivedAsync += async e => { var topic = e.ApplicationMessage.Topic; var payload = Encoding.UTF8.GetString(e.ApplicationMessage.Payload); foreach (var handler in _handlers) { if (handler.CanHandle(topic)) { await handler.HandleAsync(topic, payload); break; } } }; } public async Task ConnectAndSubscribeAsync(params string[] topics) { if (!_mqttClient.IsConnected) { await _mqttClient.ConnectAsync(_options); } var subscribeOptions = new MqttClientSubscribeOptionsBuilder(); foreach (var topic in topics) { subscribeOptions.WithTopicFilter(f => f.WithTopic(topic)); } await _mqttClient.SubscribeAsync(subscribeOptions.Build()); } }