using System; using System.IO; using System.Threading; using System.Threading.Tasks; using Microsoft.Extensions.Configuration; using MQTTnet; using MQTTnet.Client; using MQTTnet.Protocol; namespace OpenAuth.WebApi; public class MqttClientManager { private IMqttClient _outBoundClient; private IMqttClient _inBoundClient; public MqttClientManager() { var mqttFactory = new MqttFactory(); _outBoundClient = mqttFactory.CreateMqttClient(); _inBoundClient = mqttFactory.CreateMqttClient(); // 创建配置构建器 var builder = new ConfigurationBuilder() .SetBasePath(Directory.GetCurrentDirectory()) .AddJsonFile("appsettings.json", optional: false, reloadOnChange: true) .AddJsonFile( $"appsettings.{Environment.GetEnvironmentVariable("ASPNETCORE_ENVIRONMENT") ?? "Development"}.json", optional: true) .AddEnvironmentVariables(); // 构建配置 var configuration = builder.Build(); // 读取连接字符串 var serverIp = configuration["MQTT:Server"]; var port = configuration["MQTT:Port"]; var username = configuration["MQTT:UserName"]; var password = configuration["MQTT:Password"]; ConnectAsync(serverIp, int.Parse(port), username, password).Wait(); } /// /// /// /// /// /// /// public async Task ConnectAsync(string server, int port, string username = null, string password = null) { var inboundOptions = new MqttClientOptionsBuilder() .WithClientId(Guid.NewGuid() + "_inbound") .WithTcpServer(server, port) .WithCredentials(username, password) .Build(); var outboundOptions = new MqttClientOptionsBuilder() .WithClientId(Guid.NewGuid() + "_outbound") .WithTcpServer(server, port) .WithCredentials(username, password) .Build(); await _outBoundClient.ConnectAsync(inboundOptions, CancellationToken.None); await _inBoundClient.ConnectAsync(outboundOptions, CancellationToken.None); } public async Task SubscribeAsync(string topic, Func handler) { await _inBoundClient.SubscribeAsync(topic, MqttQualityOfServiceLevel.AtLeastOnce, CancellationToken.None); _inBoundClient.ApplicationMessageReceivedAsync += handler; } public async Task UnsubscribeAsync(string topic) { await _inBoundClient.UnsubscribeAsync(topic, CancellationToken.None); } /// /// 向主题发布消息 /// /// 主题 /// json public async Task PublishAsync(string topic, string message) { Console.WriteLine($"Publish: {topic} - {message}"); // MqttChannelAdapter var mqttMsg = new MqttApplicationMessageBuilder() .WithTopic(topic) .WithPayload(message) // 级别 0 1 2 .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce) .Build(); await _outBoundClient.PublishAsync(mqttMsg, CancellationToken.None); } }