301 lines
12 KiB
C#
301 lines
12 KiB
C#
using System.Collections.Concurrent;
|
||
using System.IO;
|
||
using System.Security.Cryptography;
|
||
using System.Threading.Channels;
|
||
using FileUploader.Models;
|
||
using Microsoft.Extensions.Configuration;
|
||
using Minio;
|
||
using Minio.ApiEndpoints;
|
||
using Minio.DataModel;
|
||
using Minio.DataModel.Args;
|
||
using Minio.DataModel.Notification;
|
||
using Minio.Exceptions;
|
||
using Polly;
|
||
using Polly.Retry;
|
||
|
||
namespace Hopetry.Services
|
||
{
|
||
class MinioService
|
||
{
|
||
// Client used for every MinIO request issued by this service.
private readonly IMinioClient _minioClient;

// Default bucket name; assigned in the constructor from the "Minio:BucketName" setting.
private readonly string _bucketName;

// Retry policy for MinIO calls: up to 3 retries on MinioException, waiting
// 2^attempt seconds (2s, 4s, 8s) before each retry.
private readonly AsyncRetryPolicy policy = Policy
    .Handle<MinioException>()
    .WaitAndRetryAsync(
        3,
        retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)),
        onRetry: (exception, delay, retryCount, context) =>
        {
            // TODO: log the retry (retry count, policy key, exception message) once a logger is available.
        });

// Producer/consumer queue of (object key, local path) download jobs built on a bounded channel:
// at most 10000 pending jobs, writers wait while it is full. The field is static, so the queue is
// shared by every instance, and a channel whose writer has been completed cannot be reopened.
private static readonly Channel<(string ObjectName, string LocalPath)> _syncChannel =
    Channel.CreateBounded<(string, string)>(new BoundedChannelOptions(10000)
    {
        SingleWriter = false,
        SingleReader = false,
        FullMode = BoundedChannelFullMode.Wait
    });
|
||
|
||
/// <summary>
/// Mirrors the objects of <paramref name="bucket"/> into <paramref name="localDir"/>, downloading
/// only objects that are missing locally or whose content differs from the local copy.
/// TODO: merge with <see cref="MirrorAsync"/>, which implements the same synchronisation.
/// </summary>
/// <param name="bucket">Bucket to mirror.</param>
/// <param name="localDir">Local root directory; object keys are mapped to paths below it.</param>
/// <param name="maxParallel">Maximum number of concurrent downloads.</param>
public async Task MirrorAsync1(string bucket, string localDir, int maxParallel = 16)
{
    // A completed channel cannot be reused, so every run gets its own queue; this keeps the
    // method callable more than once and keeps concurrent runs from sharing jobs.
    var jobs = Channel.CreateBounded<(string ObjectName, string LocalPath)>(new BoundedChannelOptions(10000)
    {
        SingleWriter = false,
        SingleReader = false,
        FullMode = BoundedChannelFullMode.Wait
    });

    // On a first run the target directory may not exist yet; create it so indexing cannot fail.
    Directory.CreateDirectory(localDir);

    // Index the local files once up front (path -> size / last write time) instead of
    // probing the file system for every remote object.
    var localFileIndex = await BuildLocalFileIndex(localDir);

    // Consumer: downloads queued jobs in parallel. It is started before the listing so that
    // downloads overlap with change detection.
    var consumerTask = Parallel.ForEachAsync(
        jobs.Reader.ReadAllAsync(),
        new ParallelOptions { MaxDegreeOfParallelism = maxParallel },
        async (job, cancellationToken) =>
        {
            var targetDir = Path.GetDirectoryName(job.LocalPath);
            if (!string.IsNullOrEmpty(targetDir))
            {
                Directory.CreateDirectory(targetDir);
            }

            // Per the original author's analysis the SDK already stages the download in a
            // temporary file, so no extra temp-file handling is done here.
            await _minioClient.GetObjectAsync(
                new GetObjectArgs()
                    .WithBucket(bucket)
                    .WithObject(job.ObjectName)
                    .WithFile(job.LocalPath),
                cancellationToken);

            // TODO: consider verifying the downloaded file.
        });

    var producerTasks = new List<Task>();
    try
    {
        var listArgs = new ListObjectsArgs()
            .WithBucket(bucket)
            .WithRecursive(true);

        // NOTE(review): the listing is presumably lazy, so the retry policy only covers creating
        // the enumerable, not the requests issued while enumerating.
        var objects = await policy.ExecuteAsync(
            () => Task.FromResult(_minioClient.ListObjectsEnumAsync(listArgs)));

        await foreach (var item in objects)
        {
            // Directory placeholders have no content to download.
            if (item.IsDir)
            {
                continue;
            }

            // Producer: change detection runs on the thread pool because the ETag check hashes
            // the whole local file synchronously.
            producerTasks.Add(Task.Run(async () =>
            {
                var localPath = Path.Combine(localDir, item.Key.Replace('/', Path.DirectorySeparatorChar));

                // Fast path: no local copy, so queue the download and stop. There is nothing to compare.
                if (!localFileIndex.TryGetValue(localPath, out var localMeta))
                {
                    await jobs.Writer.WriteAsync((item.Key, localPath));
                    return;
                }

                // Item.LastModified is a string; LastModifiedDateTime is its parsed form. It is
                // normalised to UTC because the index stores LastWriteTimeUtc.
                var remoteModifiedUtc = item.LastModifiedDateTime?.ToUniversalTime();
                var sizeDiffers = (ulong)localMeta.Size != item.Size;
                var localIsOlder = remoteModifiedUtc.HasValue && localMeta.LastModified < remoteModifiedUtc.Value;
                if (!sizeDiffers && !localIsOlder)
                {
                    return;
                }

                // Only now pay for the remote stat request and the local hash.
                var remoteMeta = await GetObjectMetadata(bucket, item.Key);
                if (!VerifyETag(localPath, remoteMeta.ETag))
                {
                    await jobs.Writer.WriteAsync((item.Key, localPath));
                }
            }));
        }

        // A failed download ends the consumer early; in that case stop waiting for producers,
        // which may be blocked on a full queue that nobody drains any more.
        var producersDone = Task.WhenAll(producerTasks);
        if (await Task.WhenAny(producersDone, consumerTask) == producersDone)
        {
            await producersDone;
        }
    }
    finally
    {
        // Key step: always close the queue, even on failure, otherwise the consumer never finishes.
        jobs.Writer.TryComplete();
    }

    // Wait for the remaining downloads (and surface any download error).
    await consumerTask;
}
|
||
|
||
/// <summary>
/// Builds an in-memory index of all files below <paramref name="rootDir"/>
/// (file path -> size and last write time in UTC).
/// </summary>
/// <param name="rootDir">Directory that is scanned recursively.</param>
/// <returns>
/// A dictionary with case-insensitive keys, keyed by the paths exactly as
/// <see cref="Directory.EnumerateFiles(string, string, SearchOption)"/> returns them.
/// The dictionary is empty when <paramref name="rootDir"/> does not exist.
/// </returns>
private async Task<ConcurrentDictionary<string, (long Size, DateTime LastModified)>> BuildLocalFileIndex(
    string rootDir)
{
    var index =
        new ConcurrentDictionary<string, (long Size, DateTime LastModified)>(StringComparer.OrdinalIgnoreCase);

    // A missing directory simply means there is nothing local yet.
    if (!Directory.Exists(rootDir))
    {
        return index;
    }

    await Parallel.ForEachAsync(
        Directory.EnumerateFiles(rootDir, "*", SearchOption.AllDirectories),
        (file, _) =>
        {
            var info = new FileInfo(file);

            // Skip files that disappeared between enumeration and inspection.
            if (info.Exists)
            {
                index.TryAdd(file, (info.Length, info.LastWriteTimeUtc));
            }

            // The work is synchronous, so no async state machine is needed for the callback.
            return ValueTask.CompletedTask;
        });

    return index;
}
|
||
|
||
/// <summary>
/// Creates the MinIO client from the "Minio" configuration section
/// (Endpoint, AccessKey, SecretKey, BucketName) and makes sure the configured bucket exists.
/// </summary>
/// <param name="config">Application configuration containing the "Minio" section.</param>
/// <exception cref="InvalidOperationException">Endpoint or BucketName is not configured.</exception>
public MinioService(IConfiguration config)
{
    var minioConfig = config.GetSection("Minio");

    // Fail early with a clear message instead of passing null into the SDK.
    var endpoint = minioConfig["Endpoint"];
    if (string.IsNullOrWhiteSpace(endpoint))
    {
        throw new InvalidOperationException("Configuration value 'Minio:Endpoint' is missing.");
    }

    var bucketName = minioConfig["BucketName"];
    if (string.IsNullOrWhiteSpace(bucketName))
    {
        throw new InvalidOperationException("Configuration value 'Minio:BucketName' is missing.");
    }

    // Build() is the final step of the SDK's builder chain and completes the client set-up.
    _minioClient = new MinioClient()
        .WithEndpoint(endpoint)
        .WithCredentials(minioConfig["AccessKey"], minioConfig["SecretKey"])
        .Build();
    _bucketName = bucketName;

    // A constructor cannot await, so this blocks until the bucket check has finished.
    // Failures surface wrapped in an AggregateException.
    EnsureBucketExistsAsync().Wait();
}
|
||
|
||
/// <summary>
/// Creates the configured bucket when it does not exist yet; does nothing otherwise.
/// </summary>
private async Task EnsureBucketExistsAsync()
{
    var bucketExists = await _minioClient.BucketExistsAsync(
        new BucketExistsArgs().WithBucket(_bucketName));
    if (bucketExists)
    {
        return;
    }

    // The bucket is missing, so create it.
    await _minioClient.MakeBucketAsync(
        new MakeBucketArgs().WithBucket(_bucketName));
}
|
||
|
||
/// <summary>
/// Uploads a local file to the configured bucket.
/// </summary>
/// <param name="fileRecord">
/// Describes the upload: <c>FileName</c> becomes the object name and <c>LocalPath</c> is the file to send.
/// </param>
public async Task UploadFileAsync(FileRecord fileRecord)
{
    // Generic binary content type ("application/zip" would be the alternative for archives).
    const string contentType = "application/octet-stream";

    var uploadRequest = new PutObjectArgs()
        .WithBucket(_bucketName)
        .WithObject(fileRecord.FileName)
        .WithFileName(fileRecord.LocalPath)
        .WithContentType(contentType);

    await _minioClient.PutObjectAsync(uploadRequest);
}
|
||
|
||
/// <summary>
/// Writes the key of every object in a bucket to the console.
/// The returned task completes once the whole listing has been printed.
/// </summary>
/// <param name="bucket">Bucket whose objects are listed.</param>
/// <param name="prefix">Optional key prefix; when empty, every object in the bucket is listed.</param>
public async Task ListAllObject(string bucket, string prefix = "")
{
    try
    {
        // Check the bucket that was actually requested.
        var found = await _minioClient.BucketExistsAsync(new BucketExistsArgs().WithBucket(bucket));
        if (!found)
        {
            Console.WriteLine($"{bucket} does not exist");
            return;
        }

        var listArgs = new ListObjectsArgs()
            .WithBucket(bucket)
            .WithRecursive(true);
        if (!string.IsNullOrEmpty(prefix))
        {
            listArgs = listArgs.WithPrefix(prefix);
        }

        // Enumerate with the async-enumerable API so the method really awaits the listing
        // instead of returning while a subscription is still running.
        await foreach (var item in _minioClient.ListObjectsEnumAsync(listArgs))
        {
            Console.WriteLine("OnNext: {0}", item.Key);
        }

        Console.WriteLine("OnComplete");
    }
    catch (MinioException e)
    {
        Console.WriteLine("Error occurred: " + e);
    }
}
|
||
|
||
/// <summary>
/// Subscribes to object-created notifications of a bucket and writes each notification,
/// error and the end of the stream to the console.
/// The method returns right after subscribing; the subscription handle is not kept.
/// </summary>
/// <param name="bucketName">Bucket to listen on.</param>
/// <param name="prefix">Key prefix passed to the notification filter.</param>
/// <param name="suffix">Key suffix passed to the notification filter.</param>
public async Task ListenBucket(string bucketName, string prefix, string suffix)
{
    try
    {
        // Only "object created" events are requested.
        var watchedEvents = new List<EventType> { EventType.ObjectCreatedAll };

        var listenArgs = new ListenBucketNotificationsArgs()
            .WithBucket(bucketName)
            .WithEvents(watchedEvents)
            .WithPrefix(prefix)
            .WithSuffix(suffix);

        IObservable<MinioNotificationRaw> notifications =
            _minioClient.ListenBucketNotificationsAsync(listenArgs);

        _ = notifications.Subscribe(
            notification => Console.WriteLine($"Notification: {notification.Json}"),
            ex => Console.WriteLine($"OnError: {ex}"),
            () => Console.WriteLine($"Stopped listening for bucket notifications\n"));
    }
    catch (MinioException e)
    {
        Console.WriteLine("Error occurred: " + e);
    }
}
|
||
|
||
/// <summary>
/// Change detection: decides whether an object has to be (re)downloaded by comparing the local
/// file with the remote object's size, modification time and ETag.
/// </summary>
/// <param name="bucket">Bucket that holds the object.</param>
/// <param name="objectName">Key of the remote object.</param>
/// <param name="localPath">Path of the local copy.</param>
/// <returns>
/// <c>true</c> when the local file is missing, differs in size, is older than the remote object
/// or fails the ETag check; otherwise <c>false</c>.
/// </returns>
private async Task<bool> NeedSyncAsync(string bucket, string objectName, string localPath)
{
    var localInfo = new FileInfo(localPath);
    if (!localInfo.Exists)
    {
        return true;
    }

    var remoteMeta = await GetObjectMetadata(bucket, objectName);

    // Cheap checks first; the ETag check below hashes the whole file.
    if (localInfo.Length != remoteMeta.Size)
    {
        return true;
    }

    // LastWriteTimeUtc is UTC, and DateTime comparison ignores Kind, so the remote timestamp is
    // normalised to UTC first. NOTE(review): assumes the SDK returns a Local- or Utc-kind value.
    if (localInfo.LastWriteTimeUtc < remoteMeta.LastModified.ToUniversalTime())
    {
        return true;
    }

    return !VerifyETag(localPath, remoteMeta.ETag);
}
|
||
|
||
/// <summary>
/// Fetches the metadata of a remote object with a stat request.
/// </summary>
/// <param name="bucket">Bucket that holds the object.</param>
/// <param name="objectName">Key of the object.</param>
/// <returns>The object's size, last-modified time and ETag as reported by the stat call.</returns>
public async Task<(long Size, DateTime LastModified, string ETag)> GetObjectMetadata(string bucket,
    string objectName)
{
    var objectStat = await _minioClient.StatObjectAsync(
        new StatObjectArgs()
            .WithBucket(bucket)
            .WithObject(objectName));

    return (Size: objectStat.Size, LastModified: objectStat.LastModified, ETag: objectStat.ETag);
}
|
||
|
||
/// <summary>
/// Verifies a local file against a remote ETag by comparing the ETag (surrounding quotes removed)
/// with the hex-encoded MD5 hash of the file content, ignoring case.
/// </summary>
/// <remarks>
/// NOTE(review): this only matches ETags that are a plain MD5 of the content; an ETag of any
/// other form can never match and is reported as a mismatch.
/// </remarks>
/// <param name="filePath">Path of the local file.</param>
/// <param name="remoteETag">ETag reported by the server, with or without surrounding quotes.</param>
/// <returns>
/// <c>true</c> when the hashes match; <c>false</c> on a mismatch, when the ETag is null or empty,
/// or when the file does not exist.
/// </returns>
public bool VerifyETag(string filePath, string remoteETag)
{
    // Nothing to compare against: treat it as "not verified" instead of throwing.
    if (string.IsNullOrEmpty(remoteETag) || !File.Exists(filePath))
    {
        return false;
    }

    using var md5 = MD5.Create();
    using var stream = File.OpenRead(filePath);
    var localHash = Convert.ToHexString(md5.ComputeHash(stream));

    // The comparison ignores case, so no lower-casing of the hash is needed.
    return remoteETag.Trim('"').Equals(localHash, StringComparison.OrdinalIgnoreCase);
}
|
||
|
||
/// <summary>
/// Mirrors the objects of <paramref name="bucket"/> into <paramref name="localDir"/> in two phases:
/// first every listed object is checked for changes one after another, then the objects that
/// need syncing are downloaded in parallel.
/// </summary>
/// <param name="bucket">Bucket to mirror.</param>
/// <param name="localDir">Local root directory; object keys are mapped to paths below it.</param>
/// <param name="maxParallel">Maximum number of concurrent downloads.</param>
public async Task MirrorAsync(string bucket, string localDir, int maxParallel = 8)
{
    var listArgs = new ListObjectsArgs().WithBucket(bucket).WithRecursive(true);
    var remoteObjects = _minioClient.ListObjectsEnumAsync(listArgs);
    var pending = new ConcurrentQueue<(string ObjectName, string LocalPath)>();

    // Phase 1: change detection.
    await foreach (var remoteObject in remoteObjects)
    {
        if (!remoteObject.IsDir)
        {
            var relativePath = remoteObject.Key.Replace('/', Path.DirectorySeparatorChar);
            var targetPath = Path.Combine(localDir, relativePath);

            // Make sure the directory for the local copy exists before anything is written.
            Directory.CreateDirectory(Path.GetDirectoryName(targetPath));

            var outdated = await NeedSyncAsync(bucket, remoteObject.Key, targetPath);
            if (outdated)
            {
                pending.Enqueue((remoteObject.Key, targetPath));
            }
        }
    }

    // Phase 2: parallel download.
    var parallelism = new ParallelOptions { MaxDegreeOfParallelism = maxParallel };
    await Parallel.ForEachAsync(pending, parallelism, async (job, _) =>
    {
        // Download next to the target first so a failed transfer never leaves a partial target file.
        var stagingPath = job.LocalPath + ".tmp";

        try
        {
            var downloadArgs = new GetObjectArgs()
                .WithBucket(bucket)
                .WithObject(job.ObjectName)
                .WithFile(stagingPath);
            await _minioClient.GetObjectAsync(downloadArgs);

            // Replace the target in a single move, then stamp it with the download time.
            File.Move(stagingPath, job.LocalPath, overwrite: true);
            File.SetLastWriteTimeUtc(job.LocalPath, DateTime.UtcNow);
        }
        finally
        {
            // Remove the staging file if the move did not happen.
            if (File.Exists(stagingPath))
            {
                File.Delete(stagingPath);
            }
        }
    });
}
|
||
}
|
||
} |