FieldWorkClient/Services/MinioService.cs

301 lines
12 KiB
C#
Raw Blame History

This file contains invisible Unicode characters!

This file contains invisible Unicode characters that may be processed differently from what appears below. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to reveal hidden characters.

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

using System.Collections.Concurrent;
using System.IO;
using System.Security.Cryptography;
using System.Threading.Channels;
using FileUploader.Models;
using Microsoft.Extensions.Configuration;
using Minio;
using Minio.ApiEndpoints;
using Minio.DataModel;
using Minio.DataModel.Args;
using Minio.DataModel.Notification;
using Minio.Exceptions;
using Polly;
using Polly.Retry;
namespace Hopetry.Services
{
class MinioService
{
private readonly IMinioClient _minioClient;
private readonly string _bucketName = null;
AsyncRetryPolicy policy = Policy
.Handle<MinioException>()
.WaitAndRetryAsync(3, retryAttempt =>
TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)),
onRetry: (exception, delay, retryCount, context) =>
{
//日志
// _logger.LogWarning($"Retry {retryCount} for {context.PolicyKey}: {exception.Message}");
});
// 使用 Channel 实现生产者-消费者模式
private static readonly Channel<(string ObjectName, string LocalPath)> _syncChannel =
Channel.CreateBounded<(string, string)>(new BoundedChannelOptions(10000)
{
SingleWriter = false,
SingleReader = false,
FullMode = BoundedChannelFullMode.Wait
});
/// <summary>
/// 同步下载 todo 需要合并另外一个同步方法
/// </summary>
/// <param name="bucket"></param>
/// <param name="localDir"></param>
/// <param name="maxParallel"></param>
public async Task MirrorAsync1(string bucket, string localDir, int maxParallel = 16)
{
var producerTasks = new List<Task>();
var listArgs = new ListObjectsArgs()
.WithBucket(bucket)
.WithRecursive(true);
// 预先生成本地文件索引(大幅减少 File.Exists 调用) 基于字典
var localFileIndex = await BuildLocalFileIndex(localDir);
var x = policy.ExecuteAsync(
() => Task.FromResult(_minioClient.ListObjectsEnumAsync(listArgs)));
await foreach (var item in x.Result)
{
producerTasks.Add(Task.Run(async () =>
{
var localPath = Path.Combine(localDir, item.Key.Replace('/', Path.DirectorySeparatorChar));
// 快速路径判断(存在性检查)
if (!localFileIndex.TryGetValue(localPath, out var localMeta))
{
// 如果不存在,则加入下载队列
await _syncChannel.Writer.WriteAsync((item.Key, localPath));
}
// 并行获取远程元数据(避免串行等待)
var remoteMetaTask = GetObjectMetadata(bucket, item.Key);
// 对比本地缓存元数据
if ((ulong)localMeta.Size != item.Size || localMeta.LastModified.CompareTo(item.LastModified) < 0)
{
var remoteMeta = await remoteMetaTask;
if (!VerifyETag(localPath, remoteMeta.ETag))
{
await _syncChannel.Writer.WriteAsync((item.Key, localPath));
}
}
}));
}
// 阶段2: 并行消费下载任务
var consumerTask = Task.Run(async () =>
{
await Parallel.ForEachAsync(_syncChannel.Reader.ReadAllAsync(),
new ParallelOptions { MaxDegreeOfParallelism = maxParallel },
async (item, _) =>
{
// 经分析 SDK 内部已经做了临时文件处理,不必再画蛇添足
await _minioClient.GetObjectAsync(
new GetObjectArgs()
.WithBucket(bucket)
.WithObject(item.ObjectName)
.WithFile(item.LocalPath));
// todo 可能需要添加文件校验
}
);
});
await Task.WhenAll(producerTasks);
_syncChannel.Writer.Complete(); // 关键步骤!
// 等待消费完成
await consumerTask;
}
/// <summary>
/// 构建本地文件内存索引(路径 → 元数据)
/// </summary>
/// <param name="rootDir"></param>
/// <returns></returns>
private async Task<ConcurrentDictionary<string, (long Size, DateTime LastModified)>> BuildLocalFileIndex(
string rootDir)
{
var index =
new ConcurrentDictionary<string, (long Size, DateTime LastModified)>(StringComparer.OrdinalIgnoreCase);
await Parallel.ForEachAsync(Directory.EnumerateFiles(rootDir, "*", SearchOption.AllDirectories),
async (file, _) =>
{
var info = new FileInfo(file);
index.TryAdd(file, (info.Length, info.LastWriteTimeUtc));
});
return index;
}
public MinioService(IConfiguration config)
{
var minioConfig = config.GetSection("Minio");
_minioClient = new MinioClient()
.WithEndpoint(minioConfig["Endpoint"])
.WithCredentials(minioConfig["AccessKey"], minioConfig["SecretKey"]);
_bucketName = minioConfig["BucketName"]!;
EnsureBucketExistsAsync().Wait();
}
private async Task EnsureBucketExistsAsync()
{
var existsArgs = new BucketExistsArgs().WithBucket(_bucketName);
// 如果存储桶不存在,则创建存储桶
if (!await _minioClient.BucketExistsAsync(existsArgs))
{
var makeArgs = new MakeBucketArgs().WithBucket(_bucketName);
await _minioClient.MakeBucketAsync(makeArgs);
}
}
/// <summary>
/// 上伟文件
/// </summary>
/// <param name="fileRecord"></param>
public async Task UploadFileAsync(FileRecord fileRecord)
{
var putArgs = new PutObjectArgs()
.WithBucket(_bucketName)
.WithObject(fileRecord.FileName)
.WithFileName(fileRecord.LocalPath)
// application/zip
.WithContentType("application/octet-stream");
await _minioClient.PutObjectAsync(putArgs);
}
/// <summary>
/// 列出存储桶内所有文件
/// </summary>
/// <param name="bucket"></param>
public async Task ListAllObject(string bucket)
{
try
{
// Just list of objects
// Check whether 'mybucket' exists or not.
var existsArgs = new BucketExistsArgs().WithBucket(_bucketName);
bool found = await _minioClient.BucketExistsAsync(existsArgs);
if (found)
{
// List objects from 'my-bucketname'
ListObjectsArgs args = new ListObjectsArgs()
.WithBucket("mybucket")
.WithPrefix("prefix")
.WithRecursive(true);
// ListObjectsEnumAsync 新方法
IObservable<Item> observable = _minioClient.ListObjectsAsync(args);
IDisposable subscription = observable.Subscribe(
item => Console.WriteLine("OnNext: {0}", item.Key),
ex => Console.WriteLine("OnError: {0}", ex.Message),
() => Console.WriteLine("OnComplete: {0}"));
}
else
{
Console.WriteLine("mybucket does not exist");
}
}
catch (MinioException e)
{
Console.WriteLine("Error occurred: " + e);
}
}
public async Task ListenBucket(string bucketName, string prefix, string suffix)
{
try
{
var events = new List<EventType> { EventType.ObjectCreatedAll };
ListenBucketNotificationsArgs args = new ListenBucketNotificationsArgs()
.WithBucket(bucketName)
.WithEvents(events)
.WithPrefix(prefix)
.WithSuffix(suffix);
IObservable<MinioNotificationRaw> observable = _minioClient.ListenBucketNotificationsAsync(args);
IDisposable subscription = observable.Subscribe(
notification => Console.WriteLine($"Notification: {notification.Json}"),
ex => Console.WriteLine($"OnError: {ex}"),
() => Console.WriteLine($"Stopped listening for bucket notifications\n"));
}
catch (MinioException e)
{
Console.WriteLine("Error occurred: " + e);
}
}
// 差异检测:通过 ETag 和修改时间对比‌:ml-citation{ref="1,4" data="citationList"}
private async Task<bool> NeedSyncAsync(string bucket, string objectName, string localPath)
{
if (!File.Exists(localPath)) return true;
var remoteMeta = await GetObjectMetadata(bucket, objectName);
var localInfo = new FileInfo(localPath);
return localInfo.Length != remoteMeta.Size ||
localInfo.LastWriteTimeUtc < remoteMeta.LastModified ||
!VerifyETag(localPath, remoteMeta.ETag);
}
// 获取远程对象元数据‌:ml-citation{ref="1" data="citationList"}
public async Task<(long Size, DateTime LastModified, string ETag)> GetObjectMetadata(string bucket,
string objectName)
{
var args = new StatObjectArgs().WithBucket(bucket).WithObject(objectName);
var stat = await _minioClient.StatObjectAsync(args);
return (stat.Size, stat.LastModified, stat.ETag);
}
// 校验本地文件 ETagMinIO 使用 MD5:ml-citation{ref="1,7" data="citationList"}
public bool VerifyETag(string filePath, string remoteETag)
{
using var md5 = MD5.Create();
using var stream = File.OpenRead(filePath);
var localHash = BitConverter.ToString(md5.ComputeHash(stream)).Replace("-", "").ToLower();
return remoteETag.Trim('"').Equals(localHash, StringComparison.OrdinalIgnoreCase);
}
public async Task MirrorAsync(string bucket, string localDir, int maxParallel = 8)
{
var objects =
_minioClient.ListObjectsEnumAsync(new ListObjectsArgs().WithBucket(bucket).WithRecursive(true));
var queue = new ConcurrentQueue<(string ObjectName, string LocalPath)>();
// 差异检测阶段‌:ml-citation{ref="1,4" data="citationList"}
await foreach (var item in objects)
{
if (item.IsDir) continue;
var localPath = Path.Combine(localDir, item.Key.Replace('/', Path.DirectorySeparatorChar));
Directory.CreateDirectory(Path.GetDirectoryName(localPath));
if (await NeedSyncAsync(bucket, item.Key, localPath))
queue.Enqueue((item.Key, localPath));
}
// 并行下载阶段‌:ml-citation{ref="6" data="citationList"}
await Parallel.ForEachAsync(queue, new ParallelOptions { MaxDegreeOfParallelism = maxParallel },
async (item, _) =>
{
var (objectName, localPath) = item;
var tempFile = localPath + ".tmp";
try
{
await _minioClient.GetObjectAsync(
new GetObjectArgs()
.WithBucket(bucket)
.WithObject(objectName)
.WithFile(tempFile));
File.Move(tempFile, localPath,
overwrite: true); // 原子替换‌:ml-citation{ref="1" data="citationList"}
File.SetLastWriteTimeUtc(localPath, DateTime.UtcNow);
}
finally
{
if (File.Exists(tempFile)) File.Delete(tempFile);
}
});
}
}
}