using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using MQTTnet;
using MQTTnet.Client;
using OpenAuth.App.ServiceApp.FireManagement;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace Infrastructure.Mqtt
{
    /// <summary>
    /// Hosted background service that connects an MQTT client to the configured broker,
    /// subscribes to the "fire" and "fireclueinfo" topics and forwards every received
    /// payload (decoded as UTF-8 text) to <see cref="FireManagementApp"/>.
    /// </summary>
    public class MqttService : BackgroundService
    {
        private const string FireTopic = "fire";
        private const string FireClueInfoTopic = "fireclueinfo";

        // Pause between connection attempts and before reacting to a disconnect.
        private static readonly TimeSpan ReconnectDelay = TimeSpan.FromSeconds(5);

        private readonly ILogger _logger;
        private readonly MqttOptions _options;
        private readonly FireManagementApp _app;
        private IMqttClient _client;
        private MqttClientOptions _clientOptions;

        // 1 while a reconnect loop is running, 0 otherwise (accessed via Interlocked).
        private int _reconnectInProgress;

        /// <summary>
        /// Creates the service.
        /// </summary>
        /// <param name="options">Broker settings (server, port, user name, password).</param>
        /// <param name="logger">Logger used for message-handling and connection errors.</param>
        /// <param name="app">Application service that receives the message payloads.</param>
        /// <exception cref="ArgumentNullException">Any argument is null.</exception>
        public MqttService(IOptions<MqttOptions> options, ILogger<MqttService> logger, FireManagementApp app)
        {
            if (options == null) throw new ArgumentNullException(nameof(options));

            _options = options.Value;
            _logger = logger ?? throw new ArgumentNullException(nameof(logger));
            _app = app ?? throw new ArgumentNullException(nameof(app));
        }

        /// <summary>
        /// Builds the client, wires the message and disconnect handlers, and performs the
        /// initial connect. Returns once the first connection and subscriptions succeed
        /// or the host begins to stop; reconnects afterwards are driven by the disconnect handler.
        /// </summary>
        /// <param name="stoppingToken">Cancelled by the host when the service must stop.</param>
        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            var factory = new MqttFactory();
            _client = factory.CreateMqttClient();
            _clientOptions = new MqttClientOptionsBuilder()
                .WithTcpServer(_options.Server, _options.Port)
                .WithCredentials(_options.UserName, _options.Password)
                .WithCleanSession()
                .Build();

            _client.ApplicationMessageReceivedAsync += async e =>
            {
                try
                {
                    var topic = e.ApplicationMessage.Topic;
                    var payload = Encoding.UTF8.GetString(e.ApplicationMessage.Payload);
                    _logger.LogDebug("MQTT message received on topic {Topic}: {Payload}", topic, payload);

                    // TODO: persist to the database / invoke further business logic here.
                    await DispatchMessageAsync(topic, payload);
                }
                catch (Exception ex)
                {
                    // A failing message must not take down the client's receive pipeline.
                    _logger.LogError(ex, "mqtt消息处理异常");
                }
            };

            _client.DisconnectedAsync += async e =>
            {
                // A disconnect caused by shutdown must not trigger a reconnect.
                if (stoppingToken.IsCancellationRequested)
                {
                    return;
                }

                try
                {
                    await Task.Delay(ReconnectDelay, stoppingToken);
                    await TryReconnectAsync(stoppingToken);
                }
                catch (OperationCanceledException)
                {
                    // Host is stopping; nothing left to do.
                }
            };

            await TryReconnectAsync(stoppingToken);
        }

        /// <summary>
        /// Routes a decoded payload to the application method that matches its topic.
        /// Messages on any other topic are ignored.
        /// </summary>
        private async Task DispatchMessageAsync(string topic, string payload)
        {
            switch (topic)
            {
                case FireTopic:
                    await _app.ReceiveTaskImage(payload);
                    break;
                case FireClueInfoTopic:
                    await _app.AddFireClueInfoUAV(payload);
                    break;
            }
        }

        /// <summary>
        /// Connects and subscribes, retrying every <see cref="ReconnectDelay"/> until both
        /// succeed or <paramref name="stoppingToken"/> is cancelled. Does nothing when the
        /// client is not built yet, is already connected, or another reconnect loop is running.
        /// </summary>
        /// <param name="stoppingToken">Cancelled by the host when the service must stop.</param>
        private async Task TryReconnectAsync(CancellationToken stoppingToken)
        {
            if (_client == null || _clientOptions == null)
            {
                return;
            }

            if (_client.IsConnected)
            {
                return;
            }

            // Only one loop at a time: the disconnect handler can fire while the initial
            // loop is still retrying (NOTE(review): the client appears to raise
            // DisconnectedAsync for failed connect attempts too - confirm), which would
            // otherwise spawn an ever-growing number of concurrent loops.
            if (Interlocked.CompareExchange(ref _reconnectInProgress, 1, 0) != 0)
            {
                return;
            }

            try
            {
                while (!stoppingToken.IsCancellationRequested)
                {
                    try
                    {
                        // Skip the connect when a previous iteration connected but failed
                        // to subscribe; in that case only the subscriptions are retried.
                        if (!_client.IsConnected)
                        {
                            await _client.ConnectAsync(_clientOptions, stoppingToken);
                        }

                        // Subscribe to topics.
                        await _client.SubscribeAsync(FireTopic);
                        await _client.SubscribeAsync(FireClueInfoTopic);
                        return;
                    }
                    catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
                    {
                        return;
                    }
                    catch (Exception ex)
                    {
                        _logger.LogWarning(ex, "MQTT connect/subscribe failed; retrying in {Delay}", ReconnectDelay);
                    }

                    try
                    {
                        await Task.Delay(ReconnectDelay, stoppingToken);
                    }
                    catch (OperationCanceledException)
                    {
                        return;
                    }
                }
            }
            finally
            {
                Interlocked.Exchange(ref _reconnectInProgress, 0);
            }
        }

        /// <summary>
        /// Stops the service: first lets the base class cancel the stopping token (so the
        /// disconnect handler will not reconnect), then disconnects the client if connected.
        /// </summary>
        /// <param name="cancellationToken">Signals that shutdown should no longer be graceful.</param>
        public override async Task StopAsync(CancellationToken cancellationToken)
        {
            await base.StopAsync(cancellationToken);

            if (_client != null && _client.IsConnected)
            {
                await _client.DisconnectAsync();
            }
        }

        /// <summary>
        /// Releases the MQTT client together with the base service resources.
        /// </summary>
        public override void Dispose()
        {
            _client?.Dispose();
            base.Dispose();
        }
    }
}