Infrastructure/OpenAuth.WebApi/Mqtt/MqttService.cs

105 lines
3.5 KiB
C#
Raw Normal View History

2025-11-10 14:35:55 +08:00
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using MQTTnet;
using MQTTnet.Client;
using OpenAuth.App.ServiceApp.FireManagement;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace Infrastructure.Mqtt
{
public class MqttService : BackgroundService
{
private readonly ILogger<MqttService> _logger;
private readonly MqttOptions _options;
private IMqttClient _client;
private MqttClientOptions _clientOptions;
private FireManagementApp _app;
public MqttService(IOptions<MqttOptions> options, ILogger<MqttService> logger, FireManagementApp app)
{
_options = options.Value;
_logger = logger;
_app = app;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
var factory = new MqttFactory();
_client = factory.CreateMqttClient();
_clientOptions = new MqttClientOptionsBuilder()
.WithTcpServer(_options.Server, _options.Port)
.WithCredentials(_options.UserName, _options.Password)
.WithCleanSession()
.Build();
_client.ApplicationMessageReceivedAsync += async e =>
{
try
{
var topic = e.ApplicationMessage.Topic;
var payload = Encoding.UTF8.GetString(e.ApplicationMessage.Payload);
Console.WriteLine(payload);
// TODO: 可在这里写入数据库、调用业务逻辑等
if (topic == "fire")
{
await _app.ReceiveTaskImage(payload);
}
else
{
if (topic == "fireclueinfo")
{
await _app.AddFireClueInfoUAV(payload);
}
}
2025-11-10 14:35:55 +08:00
await Task.CompletedTask;
}
catch (Exception ex)
{
_logger.LogError(ex, "mqtt消息处理异常");
}
};
_client.DisconnectedAsync += async e =>
{
await Task.Delay(TimeSpan.FromSeconds(5), stoppingToken);
await TryReconnectAsync(stoppingToken);
};
await TryReconnectAsync(stoppingToken);
}
private async Task TryReconnectAsync(CancellationToken stoppingToken)
{
if (_client == null || _clientOptions == null) return;
while (!_client.IsConnected && !stoppingToken.IsCancellationRequested)
{
try
{
await _client.ConnectAsync(_clientOptions, stoppingToken);
// 订阅主题
await _client.SubscribeAsync("fire");
await _client.SubscribeAsync("fireclueinfo");
2025-11-10 14:35:55 +08:00
}
catch (Exception ex)
{
await Task.Delay(TimeSpan.FromSeconds(5), stoppingToken);
}
}
}
public override async Task StopAsync(CancellationToken cancellationToken)
{
if (_client != null && _client.IsConnected)
await _client.DisconnectAsync();
}
}
}