93 lines
3.0 KiB
C#
93 lines
3.0 KiB
C#
using Microsoft.Extensions.Hosting;
|
|
using Microsoft.Extensions.Logging;
|
|
using Microsoft.Extensions.Options;
|
|
using MQTTnet;
|
|
using MQTTnet.Client;
|
|
using OpenAuth.App.ServiceApp.FireManagement;
|
|
using System;
|
|
using System.Collections.Generic;
|
|
using System.Linq;
|
|
using System.Text;
|
|
using System.Threading.Tasks;
|
|
|
|
namespace Infrastructure.Mqtt
|
|
{
|
|
/// <summary>
/// Hosted background service that keeps an MQTT client connected to the configured
/// broker, subscribes to the <c>fire</c> topic and forwards every received payload
/// (decoded as UTF-8 text) to <see cref="FireManagementApp.ReceiveTaskImage"/>.
/// </summary>
/// <remarks>
/// The connection is re-established automatically after a disconnect until the host
/// requests shutdown.
/// </remarks>
public class MqttService : BackgroundService
{
    /// <summary>Topic this service subscribes to after every successful connect.</summary>
    private const string SubscribeTopic = "fire";

    /// <summary>Pause between two connection attempts and before reacting to a disconnect.</summary>
    private static readonly TimeSpan ReconnectDelay = TimeSpan.FromSeconds(5);

    private readonly ILogger<MqttService> _logger;
    private readonly MqttOptions _options;
    private readonly FireManagementApp _app;

    // Ensures only one reconnect loop runs at a time: the loop started by ExecuteAsync
    // and the one started by the DisconnectedAsync handler must not race each other.
    private readonly SemaphoreSlim _reconnectGate = new SemaphoreSlim(1, 1);

    private IMqttClient _client;
    private MqttClientOptions _clientOptions;

    /// <summary>Creates the service from DI-provided options, logger and business app.</summary>
    /// <param name="options">Broker settings (server, port, credentials).</param>
    /// <param name="logger">Logger used for connection and message-processing diagnostics.</param>
    /// <param name="app">Business component that consumes the received payloads.</param>
    /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
    public MqttService(IOptions<MqttOptions> options, ILogger<MqttService> logger, FireManagementApp app)
    {
        if (options == null) throw new ArgumentNullException(nameof(options));

        _options = options.Value;
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
        _app = app ?? throw new ArgumentNullException(nameof(app));
    }

    /// <summary>
    /// Builds the client, wires the message/disconnect handlers and performs the initial
    /// connect. Returns once the first connection is established (or shutdown is requested);
    /// later reconnects are driven by the disconnect handler.
    /// </summary>
    /// <param name="stoppingToken">Signalled by the host when the service must stop.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        var factory = new MqttFactory();
        _client = factory.CreateMqttClient();

        _clientOptions = new MqttClientOptionsBuilder()
            .WithTcpServer(_options.Server, _options.Port)
            .WithCredentials(_options.UserName, _options.Password)
            .WithCleanSession()
            .Build();

        _client.ApplicationMessageReceivedAsync += async e =>
        {
            try
            {
                var topic = e.ApplicationMessage.Topic;
                var rawPayload = e.ApplicationMessage.Payload;

                // An MQTT message may carry no payload at all; decoding null would throw.
                if (rawPayload == null || rawPayload.Length == 0)
                {
                    _logger.LogWarning("Ignoring MQTT message without payload on topic {Topic}", topic);
                    return;
                }

                var payload = Encoding.UTF8.GetString(rawPayload);
                _logger.LogDebug("MQTT message received on topic {Topic}: {Payload}", topic, payload);

                // Hand the message over to the business logic (persistence etc. happens there).
                await _app.ReceiveTaskImage(payload);
            }
            catch (Exception ex)
            {
                // Never let a processing failure escape into the MQTT client's receive loop.
                _logger.LogError(ex, "mqtt消息处理异常");
            }
        };

        _client.DisconnectedAsync += async e =>
        {
            // A disconnect caused by our own shutdown must not trigger a reconnect.
            if (stoppingToken.IsCancellationRequested) return;

            try
            {
                await Task.Delay(ReconnectDelay, stoppingToken);
                await TryReconnectAsync(stoppingToken);
            }
            catch (OperationCanceledException)
            {
                // Shutdown was requested while waiting; nothing left to do.
            }
        };

        await TryReconnectAsync(stoppingToken);
    }

    /// <summary>
    /// Retries connect + subscribe until the client is connected or shutdown is requested.
    /// Does nothing when the client has not been created yet or when another reconnect
    /// loop is already in progress.
    /// </summary>
    /// <param name="stoppingToken">Cancels the retry loop.</param>
    private async Task TryReconnectAsync(CancellationToken stoppingToken)
    {
        if (_client == null || _clientOptions == null) return;

        // Non-blocking acquire: if a loop is already running, let it finish the job.
        if (!await _reconnectGate.WaitAsync(0)) return;

        try
        {
            while (!_client.IsConnected && !stoppingToken.IsCancellationRequested)
            {
                try
                {
                    await ConnectAndSubscribeAsync(stoppingToken);
                    _logger.LogInformation(
                        "Connected to MQTT broker {Server}:{Port} and subscribed to {Topic}",
                        _options.Server, _options.Port, SubscribeTopic);
                }
                catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
                {
                    return;
                }
                catch (Exception ex)
                {
                    _logger.LogWarning(ex,
                        "MQTT connect/subscribe failed, retrying in {DelaySeconds}s",
                        ReconnectDelay.TotalSeconds);
                    await Task.Delay(ReconnectDelay, stoppingToken);
                }
            }
        }
        catch (OperationCanceledException)
        {
            // The retry delay was cancelled because the service is stopping.
        }
        finally
        {
            _reconnectGate.Release();
        }
    }

    /// <summary>
    /// Connects and subscribes as one unit: when the subscription fails the connection is
    /// dropped again, so the caller's "not connected" retry condition stays truthful and
    /// the client never ends up connected but unsubscribed.
    /// </summary>
    private async Task ConnectAndSubscribeAsync(CancellationToken stoppingToken)
    {
        await _client.ConnectAsync(_clientOptions, stoppingToken);

        try
        {
            // Subscribe to the topic.
            await _client.SubscribeAsync(SubscribeTopic);
        }
        catch
        {
            if (_client.IsConnected)
            {
                await _client.DisconnectAsync();
            }

            throw;
        }
    }

    /// <summary>
    /// Stops the service: first cancels the stopping token (via the base implementation)
    /// so the disconnect handler will not reconnect, then closes the broker connection.
    /// </summary>
    /// <param name="cancellationToken">Limits how long a graceful stop may take.</param>
    public override async Task StopAsync(CancellationToken cancellationToken)
    {
        await base.StopAsync(cancellationToken);

        if (_client != null && _client.IsConnected)
        {
            await _client.DisconnectAsync();
        }
    }

    /// <summary>Releases the MQTT client in addition to the base resources.</summary>
    public override void Dispose()
    {
        // Cancel the stopping token first so in-flight handlers observe shutdown.
        base.Dispose();
        _client?.Dispose();
    }
}
|
|
} |