|
|
|
|
using System;
|
|
|
|
|
using System.IO;
|
|
|
|
|
using System.Threading;
|
|
|
|
|
using System.Threading.Tasks;
|
|
|
|
|
using Microsoft.Extensions.Configuration;
|
|
|
|
|
using MQTTnet;
|
|
|
|
|
using MQTTnet.Client;
|
|
|
|
|
using MQTTnet.Protocol;
|
|
|
|
|
|
|
|
|
|
namespace OpenAuth.WebApi;
|
|
|
|
|
|
|
|
|
|
/// <summary>
/// Owns two MQTT client connections to the same broker: an outbound client used
/// for publishing and an inbound client used for subscriptions.
/// </summary>
/// <remarks>
/// The constructor reads the broker settings from <c>appsettings.json</c>
/// (keys <c>MQTT:Server</c>, <c>MQTT:Port</c>, <c>MQTT:UserName</c>,
/// <c>MQTT:Password</c>) and blocks until both clients are connected.
/// </remarks>
public class MqttClientManager
{
    private readonly IMqttClient _outBoundClient;
    private readonly IMqttClient _inBoundClient;

    /// <summary>
    /// Creates both MQTT clients, loads the broker settings from configuration
    /// and connects synchronously.
    /// </summary>
    /// <exception cref="InvalidOperationException">
    /// <c>MQTT:Server</c> is missing or <c>MQTT:Port</c> is missing / not an integer.
    /// </exception>
    public MqttClientManager()
    {
        var mqttFactory = new MqttFactory();
        _outBoundClient = mqttFactory.CreateMqttClient();
        _inBoundClient = mqttFactory.CreateMqttClient();

        // Build a throw-away configuration. The values are read exactly once, so
        // reloadOnChange is disabled: enabling it would keep a file watcher alive
        // that nothing ever disposes or listens to.
        var environmentName = Environment.GetEnvironmentVariable("ASPNETCORE_ENVIRONMENT") ?? "Development";
        var configuration = new ConfigurationBuilder()
            .SetBasePath(Directory.GetCurrentDirectory())
            .AddJsonFile("appsettings.json", optional: false, reloadOnChange: false)
            .AddJsonFile($"appsettings.{environmentName}.json", optional: true)
            .AddEnvironmentVariables()
            .Build();

        // Read the broker connection settings.
        var serverIp = configuration["MQTT:Server"];
        var port = configuration["MQTT:Port"];
        var username = configuration["MQTT:UserName"];
        var password = configuration["MQTT:Password"];

        // Fail fast with a message that names the offending key instead of the
        // opaque ArgumentNullException/FormatException int.Parse would produce.
        if (string.IsNullOrWhiteSpace(serverIp))
        {
            throw new InvalidOperationException("Configuration value 'MQTT:Server' is missing.");
        }

        if (!int.TryParse(
                port,
                System.Globalization.NumberStyles.Integer,
                System.Globalization.CultureInfo.InvariantCulture,
                out var portNumber))
        {
            throw new InvalidOperationException("Configuration value 'MQTT:Port' is missing or is not a valid integer.");
        }

        // A constructor cannot await; this intentionally blocks until both clients
        // are connected. Connection failures surface wrapped in AggregateException,
        // exactly as before.
        ConnectAsync(serverIp, portNumber, username, password).Wait();
    }

    /// <summary>
    /// Connects the outbound client and then the inbound client to the broker.
    /// Each connection uses a fresh GUID-based client id suffixed with
    /// <c>_outbound</c> / <c>_inbound</c> respectively.
    /// </summary>
    /// <param name="server">Broker host name or IP address.</param>
    /// <param name="port">Broker TCP port.</param>
    /// <param name="username">User name passed to the broker; may be null.</param>
    /// <param name="password">Password passed to the broker; may be null.</param>
    public async Task ConnectAsync(string server, int port, string username = null, string password = null)
    {
        // Same connection order as before (outbound first), but each client now
        // receives the options that carry its own suffix; previously they were swapped.
        await ConnectClientAsync(_outBoundClient, "_outbound", server, port, username, password);
        await ConnectClientAsync(_inBoundClient, "_inbound", server, port, username, password);
    }

    /// <summary>
    /// Subscribes the inbound client to <paramref name="topic"/> (QoS 0) and
    /// registers <paramref name="handler"/> for received messages.
    /// </summary>
    /// <remarks>
    /// The handler is attached to the client's message-received event, so it is
    /// invoked for every message the inbound client receives, not only for
    /// messages on <paramref name="topic"/>.
    /// </remarks>
    /// <param name="topic">Topic filter to subscribe to.</param>
    /// <param name="handler">Callback invoked for received messages.</param>
    /// <exception cref="ArgumentNullException"><paramref name="handler"/> is null.</exception>
    public async Task SubscribeAsync(string topic,
        Func<MqttApplicationMessageReceivedEventArgs, Task> handler)
    {
        if (handler is null)
        {
            throw new ArgumentNullException(nameof(handler));
        }

        // Attach before subscribing so a message delivered right after the
        // subscription is acknowledged (e.g. a retained one) is not missed.
        _inBoundClient.ApplicationMessageReceivedAsync += handler;
        try
        {
            await _inBoundClient.SubscribeAsync(topic, MqttQualityOfServiceLevel.AtMostOnce, CancellationToken.None);
        }
        catch
        {
            // Nothing was subscribed: do not leave a dangling handler behind.
            _inBoundClient.ApplicationMessageReceivedAsync -= handler;
            throw;
        }
    }

    /// <summary>
    /// Subscribes the inbound client to every topic in <paramref name="topics"/>
    /// (QoS 0) and registers <paramref name="handler"/> once for received messages.
    /// </summary>
    /// <remarks>
    /// The handler is registered a single time regardless of the number of topics;
    /// registering it per topic would invoke it once per topic for each message.
    /// An empty array subscribes to nothing and registers nothing.
    /// </remarks>
    /// <param name="topics">Topic filters to subscribe to.</param>
    /// <param name="handler">Callback invoked for received messages.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="topics"/> or <paramref name="handler"/> is null.
    /// </exception>
    public async Task SubscribeAsync(string[] topics,
        Func<MqttApplicationMessageReceivedEventArgs, Task> handler)
    {
        if (topics is null)
        {
            throw new ArgumentNullException(nameof(topics));
        }

        if (handler is null)
        {
            throw new ArgumentNullException(nameof(handler));
        }

        if (topics.Length == 0)
        {
            return;
        }

        _inBoundClient.ApplicationMessageReceivedAsync += handler;
        var subscribedAny = false;
        try
        {
            foreach (var topic in topics)
            {
                await _inBoundClient.SubscribeAsync(topic, MqttQualityOfServiceLevel.AtMostOnce, CancellationToken.None);
                subscribedAny = true;
            }
        }
        catch when (!subscribedAny)
        {
            // The very first subscription failed, so no topic is active: detach the
            // handler again. If some topics did succeed, the handler stays attached
            // so their messages are still delivered; the exception propagates either way.
            _inBoundClient.ApplicationMessageReceivedAsync -= handler;
            throw;
        }
    }

    /// <summary>
    /// Unsubscribes the inbound client from <paramref name="topic"/>.
    /// Handlers registered through <c>SubscribeAsync</c> are not removed here.
    /// </summary>
    /// <param name="topic">Topic filter to unsubscribe from.</param>
    public async Task UnsubscribeAsync(string topic)
    {
        await _inBoundClient.UnsubscribeAsync(topic, CancellationToken.None);
    }

    /// <summary>
    /// Publishes a message to a topic using the outbound client (QoS 0).
    /// </summary>
    /// <param name="topic">Topic to publish to.</param>
    /// <param name="message">Message payload (JSON text).</param>
    /// <exception cref="InvalidOperationException">The publish result reports failure.</exception>
    public async Task PublishAsync(string topic, string message)
    {
        Console.WriteLine($"Publish: {topic} - {message}");

        var mqttMsg = new MqttApplicationMessageBuilder()
            .WithTopic(topic)
            .WithPayload(message)
            // QoS level: 0, 1 or 2.
            .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtMostOnce)
            .Build();

        var result = await _outBoundClient.PublishAsync(mqttMsg, CancellationToken.None);
        if (!result.IsSuccess)
        {
            // Derived from Exception, so existing catch (Exception) callers still work.
            throw new InvalidOperationException($"{topic} {message}发布失败");
        }

        Console.WriteLine($"{topic} {message}发布成功");
    }

    /// <summary>
    /// Builds TCP connection options with a unique client id ending in
    /// <paramref name="clientIdSuffix"/> and connects <paramref name="client"/>.
    /// </summary>
    private static async Task ConnectClientAsync(
        IMqttClient client,
        string clientIdSuffix,
        string server,
        int port,
        string username,
        string password)
    {
        var options = new MqttClientOptionsBuilder()
            .WithClientId(Guid.NewGuid() + clientIdSuffix)
            .WithTcpServer(server, port)
            .WithCredentials(username, password)
            .Build();

        await client.ConnectAsync(options, CancellationToken.None);
    }
}
|