using System.Collections.Concurrent; using System.Globalization; using System.IO; using System.Security.Cryptography; using System.Threading.Channels; using Microsoft.Extensions.Configuration; using Minio; using Minio.DataModel.Args; using Minio.DataModel.Notification; using Minio.Exceptions; using Newtonsoft.Json.Linq; using Polly; using Polly.Retry; namespace Hopetry.Services { public partial class MinioService { private IMinioClient _minioClient; public readonly string _bucketName = null; public MinioService() { var builder = new ConfigurationBuilder() .SetBasePath(Directory.GetCurrentDirectory()) .AddJsonFile("global.json", optional: false, reloadOnChange: true); // 构建配置 var config = builder.Build(); var minioConfig = config.GetSection("Minio"); _minioClient = new MinioClient() .WithEndpoint(minioConfig["Endpoint"]) .WithCredentials(minioConfig["AccessKey"], minioConfig["SecretKey"]).Build(); _bucketName = minioConfig["BucketName"]!; /*_minioClient = new MinioClient() .WithEndpoint("123.132.248.154:9107") .WithCredentials("oZNgo25pNXnKFV9oKGh4", "66GYn0x1XAEInSa9wdCutzvUWKfhH1EhqxPJ6a9u") .Build();*/ EnsureBucketExistsAsync(_bucketName).Wait(); } AsyncRetryPolicy policy = Policy .Handle() .WaitAndRetryAsync(3, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), onRetry: (exception, delay, retryCount, context) => { //日志 // _logger.LogWarning($"Retry {retryCount} for {context.PolicyKey}: {exception.Message}"); }); // 使用 Channel 实现生产者-消费者模式 /*private Channel<(string ObjectName, string ETag)> SyncChannel = Channel.CreateBounded<(string, string)>(new BoundedChannelOptions(10000) { SingleWriter = false, SingleReader = false, FullMode = BoundedChannelFullMode.Wait });*/ public async Task ListObject(string bucketName) { var listArgs = new ListObjectsArgs() .WithBucket(bucketName) .WithVersions(true) .WithRecursive(true); try { var observable = _minioClient.ListObjectsEnumAsync(listArgs); //observable.ConfigureAwait(false); await foreach (var item in observable) { Console.WriteLine("=============="); Console.WriteLine(item.Key); //Console.WriteLine(item.IsDir); Console.WriteLine(item.LastModified); Console.WriteLine(item.Size); Console.WriteLine(item.ETag); Console.WriteLine(item.VersionId); Console.WriteLine("=============="); } } catch (Exception e) { Console.WriteLine(e); Console.WriteLine("我抛出的"); } } /// /// 同步下载 todo 需要合并另外一个同步方法 /// /// /// /// public async Task MirrorAsync1(string bucket, string localDir, int maxParallel = 5) { Channel<(string ObjectName, string ETag)> SyncChannel = Channel.CreateBounded<(string, string)>( new BoundedChannelOptions(1000) { SingleWriter = false, SingleReader = false, FullMode = BoundedChannelFullMode.Wait }); var count = 0; var producerTasks = new List(); var listArgs = new ListObjectsArgs() .WithBucket(bucket) .WithRecursive(true); // 预先生成本地文件索引(大幅减少 File.Exists 调用) 基于字典 var localFileIndex = await BuildLocalFileIndex(localDir); //var x = policy.ExecuteAsync(() => Task.FromResult(_minioClient.ListObjectsEnumAsync(listArgs))); var observable = _minioClient.ListObjectsEnumAsync(listArgs); await foreach (var item in observable) { var index = item.Key.LastIndexOf("/", StringComparison.Ordinal); if (index > 0) { var dir = Path.Combine(localDir, item.Key.Substring(0, index)); if (!Directory.Exists(dir)) { Directory.CreateDirectory(dir); } } producerTasks.Add(Task.Run(async () => { var localPath = Path.Combine(localDir, item.Key.Replace('/', Path.DirectorySeparatorChar)); // 快速路径判断(存在性检查) if (!localFileIndex.TryGetValue(localPath, out var localMeta)) { // 如果不存在,则加入下载队列 await SyncChannel.Writer.WriteAsync((item.Key, item.ETag)); } else { DateTime itemLastModified = DateTime.Parse(item.LastModified, null, DateTimeStyles.RoundtripKind); if ((ulong)localMeta.Size != item.Size || localMeta.LastModified < itemLastModified || !VerifyETag(localPath, item.ETag)) { await SyncChannel.Writer.WriteAsync((item.Key, item.ETag)); } } })); } await Task.WhenAll(producerTasks); SyncChannel.Writer.Complete(); // 关键步骤! // 阶段2: 并行消费下载任务 var consumerTask = Task.Run(async () => { await Parallel.ForEachAsync(SyncChannel.Reader.ReadAllAsync(), new ParallelOptions { MaxDegreeOfParallelism = maxParallel }, async (item, _) => { count++; await DownLoadObject(bucket, item.ObjectName, localDir, item.ETag); } ); }); // 等待消费完成 await consumerTask; Console.WriteLine($"{count}个文件下载完成"); } /// /// 构建本地文件内存索引(路径 → 元数据) /// /// /// private async Task> BuildLocalFileIndex( string rootDir) { var index = new ConcurrentDictionary(StringComparer.OrdinalIgnoreCase); await Parallel.ForEachAsync(Directory.EnumerateFiles(rootDir, "*", SearchOption.AllDirectories), async (path, _) => { var info = new FileInfo(path); index.TryAdd(path, (info.Length, info.LastWriteTimeUtc)); }); return index; } public async Task EnsureBucketExistsAsync(string bucketName) { var existsArgs = new BucketExistsArgs().WithBucket(bucketName); var x = await _minioClient.BucketExistsAsync(existsArgs); Console.WriteLine($" {bucketName} exist status: " + x); // 如果存储桶不存在,则创建存储桶 if (!x) { var makeArgs = new MakeBucketArgs().WithBucket(bucketName); await _minioClient.MakeBucketAsync(makeArgs); } } /// /// 上伟文件 /// /// /*public async Task UploadFileAsync(FileRecord fileRecord) { var putArgs = new PutObjectArgs() .WithBucket(_bucketName) .WithObject(fileRecord.FileName) .WithFileName(fileRecord.LocalPath) // application/zip .WithContentType("application/octet-stream"); await _minioClient.PutObjectAsync(putArgs); }*/ /// /// 列出存储桶内所有文件 /// /// public async Task ListAllObject() { // Just list of objects // Check whether 'mybucket' exists or not. var existsArgs = new BucketExistsArgs().WithBucket(_bucketName); bool found = await _minioClient.BucketExistsAsync(existsArgs); if (found) { // List objects from 'my-bucketname' ListObjectsArgs args = new ListObjectsArgs() .WithBucket(_bucketName) .WithRecursive(false); // ListObjectsEnumAsync 新方法 var observable = _minioClient.ListObjectsEnumAsync(args); var x = observable.GetAsyncEnumerator(); await x.MoveNextAsync(); Console.WriteLine(x.Current.Key); Console.WriteLine(x.Current.Size); Console.WriteLine(x.Current.ETag); Console.WriteLine(x.Current.LastModified); Console.WriteLine(x.Current.IsDir); /*IDisposable subscription = observable.Subscribe( item => Console.WriteLine("OnNext: {0}", item.Key), ex => Console.WriteLine("OnError: {0}", ex.Message), () => Console.WriteLine("OnComplete: {0}"));*/ } else { Console.WriteLine("mybucket does not exist"); } } public async Task DownLoadObject(string bucketName, string objectKey, string localDir, string objectETag) { var index = objectKey.LastIndexOf("/", StringComparison.Ordinal); if (index > 0) { var dir = Path.Combine(localDir, objectKey.Substring(0, index)); if (!Directory.Exists(dir)) { Directory.CreateDirectory(dir); } } var localPath = Path.Combine(localDir, objectKey.Replace('/', Path.DirectorySeparatorChar)); var getArgs = new GetObjectArgs() .WithBucket(string.IsNullOrEmpty(bucketName) ? _bucketName : bucketName) .WithObject(objectKey) .WithFile(localPath); await _minioClient.GetObjectAsync(getArgs); if (VerifyETag(localPath, objectETag)) { // todo 先忽略处理 } Console.WriteLine($"{objectKey} Download complete"); } // 差异检测:通过 ETag 和修改时间对比‌ private async Task NeedSyncAsync(string bucket, string objectName, string localPath) { if (!File.Exists(localPath)) return true; var remoteMeta = await GetObjectMetadata(bucket, objectName); var localInfo = new FileInfo(localPath); return localInfo.Length != remoteMeta.Size || localInfo.LastWriteTimeUtc < remoteMeta.LastModified || !VerifyETag(localPath, remoteMeta.ETag); } // 获取远程对象元数据‌ public async Task<(long Size, DateTime LastModified, string ETag)> GetObjectMetadata(string bucket, string objectName) { var args = new StatObjectArgs().WithBucket(bucket).WithObject(objectName); var stat = await _minioClient.StatObjectAsync(args); return (stat.Size, stat.LastModified, stat.ETag); } // 校验本地文件 ETag(MinIO 使用 MD5) public bool VerifyETag(string filePath, string remoteETag) { using var md5 = MD5.Create(); using var stream = File.OpenRead(filePath); var localHash = BitConverter.ToString(md5.ComputeHash(stream)).Replace("-", "").ToLower(); return remoteETag.Trim('"').Equals(localHash, StringComparison.OrdinalIgnoreCase); } // 实时监听方法 public async Task RealTimeListen(string bucketName, string localDir, string prefix = "", string suffix = "") { var downloadQueue = Channel.CreateBounded<(string bucketName, string objectKey, string localDir, string objectETag)>( new BoundedChannelOptions(1000) { FullMode = BoundedChannelFullMode.Wait }); Task.Run(async () => { await Parallel.ForEachAsync(downloadQueue.Reader.ReadAllAsync(), new ParallelOptions { MaxDegreeOfParallelism = 5 }, async (item, _) => { await DownLoadObject(item.bucketName, item.objectKey, item.localDir, item.objectETag); }); }); try { var events = new List { EventType.ObjectCreatedAll }; ListenBucketNotificationsArgs args = new ListenBucketNotificationsArgs() .WithBucket(bucketName) .WithEvents(events); IObservable observable = _minioClient.ListenBucketNotificationsAsync(args); IDisposable subscription = observable.Subscribe( async notification => { Console.WriteLine($"Received notification: {notification.Json}"); var obj = JObject.Parse(notification.Json); // s3:ObjectCreated:Put var eventType = obj["Records"]?[0]?["eventName"]?.Value(); var objectSize = obj["Records"]?[0]?["s3"]?["object"]?["size"]?.Value(); var objectKey = obj["Records"]?[0]?["s3"]?["object"]?["key"]?.Value(); var objectETag = obj["Records"]?[0]?["s3"]?["object"]?["eTag"]?.Value(); Console.WriteLine("将要下载的Key:" + objectKey); downloadQueue.Writer.TryWrite((bucketName, objectKey, localDir, objectETag)); //await DownLoadObject(bucketName, objectKey, localDir, objectETag); }, ex => Console.WriteLine($"OnError: {ex}"), () => Console.WriteLine($"Stopped listening for bucket notifications\n")); } catch (MinioException e) { Console.WriteLine("Error occurred: " + e); } } } }