// LASAPlatform/OpenAuth.WebApi/Model/mqtt/ChannelDB/LogBatchWriter.cs
//
// 80 lines
// 2.6 KiB
// C#

using Microsoft.AspNetCore.HttpOverrides;
using OpenAuth.App.ServiceApp;
using OpenAuth.Repository.Domain;
using Org.BouncyCastle.Crypto.Tls;
using SqlSugar;
namespace OpenAuth.WebApi.Model.mqtt.ChannelDB
{
/// <summary>
/// Background service that reads <see cref="LasaLog"/> entries from
/// <see cref="LogQueueService"/> and writes them to the database in batches.
/// </summary>
/// <remarks>
/// A batch is flushed when it reaches <see cref="BatchSize"/> entries or when
/// <see cref="FlushIntervalMs"/> milliseconds have elapsed since the batch was started,
/// whichever happens first. Writing is best-effort: a batch that fails to insert is
/// logged and dropped, and entries still waiting in the queue when the host stops
/// are not drained.
/// </remarks>
public class LogBatchWriter : BackgroundService
{
    private readonly LogQueueService _queue;
    private readonly IServiceScopeFactory _scopeFactory;
    private readonly ILogger<LogBatchWriter> _logger;

    /// <summary>Maximum number of entries written by a single insert.</summary>
    private const int BatchSize = 100;

    /// <summary>Maximum time, in milliseconds, spent collecting a batch before it is flushed.</summary>
    private const int FlushIntervalMs = 10000;

    /// <summary>
    /// Creates the writer.
    /// </summary>
    /// <param name="queue">Queue whose reader supplies the log entries.</param>
    /// <param name="scopeFactory">Factory used to create a DI scope for every flush.</param>
    /// <param name="logger">Logger for batch success and failure messages.</param>
    public LogBatchWriter(LogQueueService queue, IServiceScopeFactory scopeFactory, ILogger<LogBatchWriter> logger)
    {
        _queue = queue;
        _scopeFactory = scopeFactory;
        _logger = logger;
    }

    /// <summary>
    /// Collects entries into batches and flushes them until the host requests shutdown
    /// or the queue is completed.
    /// </summary>
    /// <param name="stoppingToken">Signalled by the host when the service should stop.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        var buffer = new List<LasaLog>(BatchSize);
        var queueOpen = true;

        while (queueOpen && !stoppingToken.IsCancellationRequested)
        {
            buffer.Clear();

            // One linked source per batch: it fires when the flush window elapses or the
            // host is stopping. This avoids allocating a new source for every entry and
            // does not depend on the wall clock.
            using (var batchCts = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken))
            {
                batchCts.CancelAfter(FlushIntervalMs);

                try
                {
                    while (buffer.Count < BatchSize)
                    {
                        buffer.Add(await _queue.Reader.ReadAsync(batchCts.Token));
                    }
                }
                catch (OperationCanceledException)
                {
                    // Flush window elapsed or shutdown requested: write what we have.
                }
                catch (System.Threading.Channels.ChannelClosedException)
                {
                    // Thrown by System.Threading.Channels readers once the channel is
                    // completed and empty (assumes _queue.Reader is such a reader).
                    // Nothing more will arrive, so flush the tail and stop instead of
                    // letting the exception fault the service.
                    queueOpen = false;
                }
            }

            if (buffer.Count > 0)
            {
                await FlushAsync(buffer);
            }
        }
    }

    /// <summary>
    /// Inserts one batch of log entries using a freshly scoped <see cref="ISqlSugarClient"/>.
    /// </summary>
    /// <param name="logs">Entries to insert; the list is read but not modified here.</param>
    /// <remarks>
    /// Never throws: any failure is logged and the batch is dropped. The insert is
    /// intentionally not tied to the stopping token, so the last batch collected
    /// before shutdown is still attempted.
    /// </remarks>
    private async Task FlushAsync(List<LasaLog> logs)
    {
        try
        {
            using var scope = _scopeFactory.CreateScope();

            // Resolve SqlSugar's ISqlSugarClient directly from DI.
            var db = scope.ServiceProvider.GetRequiredService<ISqlSugarClient>();

            // SqlSugar bulk insert.
            await db.Insertable(logs).ExecuteCommandAsync();

            // Message templates (not interpolation) keep the count as a structured value.
            _logger.LogDebug("批量写入 {Count} 条日志", logs.Count);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "批量写入失败,丢失 {Count} 条", logs.Count);
        }
    }
}
}