using System.Collections.Concurrent; using System.IO; using System.Net.Http; using System.Security.Cryptography; using System.Threading.Channels; using System.Windows.Forms.VisualStyles; using Hopetry.Provider; using Microsoft.Extensions.Configuration; using Minio; using Minio.DataModel; using Minio.DataModel.Args; using Minio.DataModel.Notification; using Minio.Exceptions; using Newtonsoft.Json.Linq; using Polly; using Polly.Retry; namespace Hopetry.Services { public partial class MinioService { // 同步锁 private static readonly object SyncLock = new(); private IMinioClient _minioClient; public readonly string _bucketName = null; public MinioService() { var builder = new ConfigurationBuilder() .SetBasePath(Directory.GetCurrentDirectory()) .AddJsonFile("global.json", optional: false, reloadOnChange: true); // 构建配置 var config = builder.Build(); var minioConfig = config.GetSection("Minio"); _minioClient = new MinioClient() .WithEndpoint(minioConfig["Endpoint"]) .WithCredentials(minioConfig["AccessKey"], minioConfig["SecretKey"]).Build(); _bucketName = minioConfig["BucketName"]!; /*_minioClient = new MinioClient() .WithEndpoint("123.132.248.154:9107") .WithCredentials("oZNgo25pNXnKFV9oKGh4", "66GYn0x1XAEInSa9wdCutzvUWKfhH1EhqxPJ6a9u") .Build();*/ EnsureBucketExistsAsync(_bucketName).Wait(); } // 使用指数退避算法的重试策略 AsyncRetryPolicy policy = Policy .Handle() .WaitAndRetryAsync(2, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), onRetry: (exception, delay, retryCount, context) => { Console.WriteLine($"MinIO操作重试 #{retryCount}: {exception.Message}"); }); // 使用 Channel 实现生产者-消费者模式 /*private Channel<(string ObjectName, string ETag)> SyncChannel = Channel.CreateBounded<(string, string)>(new BoundedChannelOptions(10000) { SingleWriter = false, SingleReader = false, FullMode = BoundedChannelFullMode.Wait });*/ public async Task ListObject(string bucketName) { var listArgs = new ListObjectsArgs() .WithBucket(bucketName) .WithVersions(true) .WithRecursive(true); try { var observable = _minioClient.ListObjectsEnumAsync(listArgs); //observable.ConfigureAwait(false); await foreach (var item in observable) { Console.WriteLine("=============="); Console.WriteLine(item.Key); //Console.WriteLine(item.IsDir); Console.WriteLine(item.LastModified); Console.WriteLine(item.Size); Console.WriteLine(item.ETag); Console.WriteLine(item.VersionId); Console.WriteLine("=============="); } } catch (Exception e) { Console.WriteLine(e); Console.WriteLine("我抛出的"); } } /// /// 同步下载 todo 需要合并另外一个同步方法 /// /// /// /// public async Task MirrorAsync(string bucket, string localDir, Action action, int maxParallel = 5) { Console.WriteLine("开始全量同步"); Channel<(string ObjectName, string ETag)> syncChannel = Channel.CreateBounded<(string, string)>( new BoundedChannelOptions(1000) { SingleWriter = false, SingleReader = false, FullMode = BoundedChannelFullMode.Wait }); var count = 0; var total = 0; var producerTasks = new List(); var listArgs = new ListObjectsArgs() .WithBucket(bucket) .WithRecursive(true); // 预先生成本地文件索引(大幅减少 File.Exists 调用) 基于字典 var localFileIndex = await BuildLocalFileIndex(localDir); //var x = policy.ExecuteAsync(() => Task.FromResult(_minioClient.ListObjectsEnumAsync(listArgs))); var observable = _minioClient.ListObjectsEnumAsync(listArgs); // 阶段2: 并行消费下载任务 var consumerTask = Task.Run(async () => { await Parallel.ForEachAsync(syncChannel.Reader.ReadAllAsync(), new ParallelOptions { MaxDegreeOfParallelism = maxParallel }, async (item, _) => { action(total, count, item.ObjectName); await DownLoadObject(bucket, item.ObjectName, localDir, item.ETag); count++; action(total, count, item.ObjectName); } ); }); await foreach (var item in observable) { total++; // 关于isDir判断 if (item.IsDir) { // 判断文件夹存不存在,不存在则创建 var dir = Path.Combine(localDir, item.Key); if (!Directory.Exists(dir)) { Directory.CreateDirectory(dir); } continue; } var index = item.Key.LastIndexOf("/", StringComparison.Ordinal); if (index > 0) { var dir = Path.Combine(localDir, item.Key.Substring(0, index)); if (!Directory.Exists(dir)) { Directory.CreateDirectory(dir); } } producerTasks.Add(Task.Run(async () => { var localPath = Path.Combine(localDir, item.Key.Replace('/', Path.DirectorySeparatorChar)); // 快速路径判断(存在性检查) var fileExist = localFileIndex.TryGetValue(localPath, out var localMeta); if (fileExist) { // 只要存在就不校验 因为未下载完成的文件为mino文件 //DateTime itemLastModified = DateTime.Parse(item.LastModified, null, DateTimeStyles.RoundtripKind); //if ((ulong)localMeta.Size != item.Size || localMeta.LastModified < itemLastModified || !VerifyETag(localPath, item.ETag)) /*if (!VerifyETag(localPath, item.ETag)) { await syncChannel.Writer.WriteAsync((item.Key, item.ETag)); Console.WriteLine($"{localPath} 加入下载队列"); total++; }*/ } else { // 如果不存在,则加入下载队列 await syncChannel.Writer.WriteAsync((item.Key, item.ETag)); Console.WriteLine($"{localPath} 加入下载队列"); total++; } })); } await Task.WhenAll(producerTasks); syncChannel.Writer.Complete(); // 关键步骤! // 等待消费完成 await consumerTask; Console.WriteLine($"{count}个文件下载完成"); } /// /// 构建本地文件内存索引(路径 → 元数据) /// /// /// private async Task> BuildLocalFileIndex( string rootDir) { var index = new ConcurrentDictionary(StringComparer.OrdinalIgnoreCase); await Parallel.ForEachAsync(Directory.EnumerateFiles(rootDir, "*", SearchOption.AllDirectories), async (path, _) => { var info = new FileInfo(path); index.TryAdd(path, (info.Length, info.LastWriteTimeUtc)); }); return index; } public async Task EnsureBucketExistsAsync(string bucketName) { var existsArgs = new BucketExistsArgs().WithBucket(bucketName); var x = await _minioClient.BucketExistsAsync(existsArgs); Console.WriteLine($" {bucketName} exist status: " + x); // 如果存储桶不存在,则创建存储桶 if (!x) { var makeArgs = new MakeBucketArgs().WithBucket(bucketName); await _minioClient.MakeBucketAsync(makeArgs); } } /// /// 上伟文件 /// /// /*public async Task UploadFileAsync(FileRecord fileRecord) { var speedInBytesPerSecond = 1024 * 1024; using (var sourceStream = File.OpenRead("path_to_file")) using (var throttledStream = new ThrottledStream(sourceStream, speedInBytesPerSecond)) // speedInBytesPerSecond是你想要的速度,例如1024 * 1024表示1MB/s { var putArgs = new PutObjectArgs() .WithBucket(_bucketName) .WithObject(fileRecord.FileName) //.WithStreamData(throttledStream) .WithObjectSize(111l) .WithContentType("application/octet-stream"); await _minioClient.PutObjectAsync(putArgs); } }*/ /// /// 列出存储桶内所有文件 /// /// /// /// public async Task> ListAllObject(string bucketName, string prefix, bool recursive, CancellationToken token = default) { // Just list of objects // Check whether 'mybucket' exists or not. if (string.IsNullOrEmpty(bucketName)) { bucketName = _bucketName; } var existsArgs = new BucketExistsArgs().WithBucket(bucketName); var found = await _minioClient.BucketExistsAsync(existsArgs, token); if (found) { var args = new ListObjectsArgs() .WithBucket(bucketName) .WithPrefix(prefix) .WithRecursive(recursive); return _minioClient.ListObjectsEnumAsync(args, token); } Console.WriteLine("mybucket does not exist"); throw new Exception("bucket not found"); } public async Task DownLoadObject(string bucketName, string objectKey, string localDir, string objectETag, CancellationToken token = default) { await DownLoadObject(bucketName, objectKey, localDir, objectETag, objectKey, token); } public async Task DownLoadObject(string bucketName, string objectKey, string localDir, string objectETag, string name, CancellationToken token = default) { var dir = Path.Combine(localDir); string temp; if (name.Equals(bucketName)) { // todo 桶下载 temp = Path.Combine(localDir, objectKey); } else { temp = objectKey.Substring(objectKey.IndexOf(name, StringComparison.Ordinal)); } temp = temp.Replace("\\","/" ); var index = temp.LastIndexOf("/", StringComparison.Ordinal); if (index > 0) { dir = Path.Combine(localDir, temp.Substring(0, index)); if (!Directory.Exists(dir)) { Directory.CreateDirectory(dir); } } var localPath = Path.Combine(dir, Path.GetFileName(objectKey)); var getArgs = new GetObjectArgs() .WithBucket(string.IsNullOrEmpty(bucketName) ? _bucketName : bucketName) .WithObject(objectKey) .WithFile(localPath); var stat = await _minioClient.GetObjectAsync(getArgs, token); /*if (VerifyETag(localPath, objectETag)) { // todo 先忽略处理 }*/ Console.WriteLine($"{objectKey} Download complete"); } // 差异检测:通过 ETag 和修改时间对比‌ private async Task NeedSyncAsync(string bucket, string objectName, string localPath) { if (!File.Exists(localPath)) return true; var remoteMeta = await GetObjectMetadata(bucket, objectName); var localInfo = new FileInfo(localPath); return localInfo.Length != remoteMeta.Size || localInfo.LastWriteTimeUtc < remoteMeta.LastModified || !VerifyETag(localPath, remoteMeta.ETag); } // 获取远程对象元数据‌ public async Task<(long Size, DateTime LastModified, string ETag)> GetObjectMetadata(string bucket, string objectName) { var args = new StatObjectArgs().WithBucket(bucket).WithObject(objectName); var stat = await _minioClient.StatObjectAsync(args); return (stat.Size, stat.LastModified, stat.ETag); } // 校验本地文件 ETag(MinIO 使用 MD5) public bool VerifyETag(string filePath, string remoteETag) { if (remoteETag.Contains("-")) { return true; } using var md5 = MD5.Create(); using var stream = File.OpenRead(filePath); var localHash = BitConverter.ToString(md5.ComputeHash(stream)).Replace("-", "").ToLower(); var x = remoteETag.Trim('"').Equals(localHash, StringComparison.OrdinalIgnoreCase); return x; } // 实时监听方法 public async Task RealTimeListen(string bucketName, string localDir, Action action, string prefix = "", string suffix = "", CancellationToken cancellationToken = default) { Console.WriteLine("开启实时监听"); var downloadQueue = Channel.CreateBounded<(string bucketName, string objectKey, string localDir, string objectETag)>( new BoundedChannelOptions(3000) { FullMode = BoundedChannelFullMode.Wait }); // 使用LinkedTokenSource组合令牌 //var linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); // 辅助任务(独立取消 + 继承总控) /*var helperCts = CancellationTokenSource.CreateLinkedTokenSource( cancellationToken, // 总控取消时辅助任务也停止 new CancellationTokenSource().Token // 独立取消通道 );*/ var consumerTask = Task.Run(async () => { await Parallel.ForEachAsync(downloadQueue.Reader.ReadAllAsync(), new ParallelOptions { MaxDegreeOfParallelism = 5 }, async (item, _) => { action($"实时同步中: {item.objectKey}"); await DownLoadObject(item.bucketName, item.objectKey, item.localDir, item.objectETag); action("实时同步完成"); }); }); #region 监听事件 try { var events = new List { EventType.ObjectCreatedAll }; var args = new ListenBucketNotificationsArgs() .WithBucket(bucketName) .WithEvents(events); IObservable observable = _minioClient .ListenBucketNotificationsAsync(args); using var subscription = observable.Subscribe( async notification => { //cancellationToken.ThrowIfCancellationRequested(); Console.WriteLine($"Received notification: {notification.Json}"); var obj = JObject.Parse(notification.Json); // s3:ObjectCreated:Put var eventType = obj["Records"]?[0]?["eventName"]?.Value(); var objectSize = obj["Records"]?[0]?["s3"]?["object"]?["size"]?.Value(); var objectKey = obj["Records"]?[0]?["s3"]?["object"]?["key"]?.Value(); var objectETag = obj["Records"]?[0]?["s3"]?["object"]?["eTag"]?.Value(); Console.WriteLine("将要下载的Key:" + objectKey); downloadQueue.Writer.TryWrite((bucketName, objectKey, localDir, objectETag)); }, ex => Console.WriteLine($"OnError: {ex}"), () => Console.WriteLine($"Stopped listening for bucket notifications\n")); // 等待取消请求 await Task.Delay(Timeout.Infinite, cancellationToken); } catch (OperationCanceledException e) { Console.WriteLine("用户取消同步监听: " + e); } catch (Exception e) { Console.WriteLine("Error occurred: " + e); } finally { downloadQueue.Writer.Complete(); // 等待消费完成 await consumerTask; } #endregion } public async Task DownLoadObjectWithCallBack(MinioDownloadTask downTask, string bucketName, string objectKey, string filePath, string fileETag, Action action, long offset = 0) { var updateTask = new MinioDownloadTask { TaskId = downTask.TaskId }; var token = downTask.StopDownTs.Token; long totalBytes = 0; var args = new StatObjectArgs() .WithBucket(string.IsNullOrEmpty(bucketName) ? _bucketName : bucketName) .WithObject(objectKey); var stat = await _minioClient.StatObjectAsync(args, token); totalBytes = stat.Size; //var localPath = Path.Combine(filePath, objectKey.Replace('/', Path.DirectorySeparatorChar)); var localPath = Path.Combine(filePath, objectKey[(objectKey.LastIndexOf('/') + 1)..]); var index = localPath.LastIndexOf("/", StringComparison.Ordinal); if (index > 0) { var dir = localPath.Substring(0, index); if (!Directory.Exists(dir)) { Directory.CreateDirectory(dir); } } var getObjectArgs = new GetObjectArgs() .WithBucket(string.IsNullOrEmpty(bucketName) ? _bucketName : bucketName) .WithObject(objectKey) .WithOffsetAndLength(offset, totalBytes - offset) .WithCallbackStream((stream) => { long bytesRead = 0; byte[] buffer = new byte[64 * 1024]; // 64KB 缓冲区 int read; // 速度计算变量 var speedWindow = TimeSpan.FromSeconds(5); // 5秒时间窗口 var speedRecords = new Queue<(DateTime Time, long Bytes)>(); var lastUpdate = DateTime.MinValue; FileStream fileStream; if (File.Exists(localPath)) { fileStream = new FileStream(localPath, FileMode.Open, FileAccess.Write, FileShare.Write); fileStream.Seek(offset, SeekOrigin.Begin); } else { fileStream = new FileStream(localPath, FileMode.Create, FileAccess.Write, FileShare.Write); } while ((read = stream.Read(buffer, 0, buffer.Length)) > 0) { if (token.IsCancellationRequested) { //保存下载进度 using (var client = SqlSugarConfig.GetSqlSugarScope()) { var temp = new MinioDownloadTask { TaskId = downTask.TaskId, Downloaded = bytesRead + offset, TotalSize = totalBytes, // 不可为null ,每次都有默认值 Status = "下载中" }; // todo 这里有bug,有可能变成0 downTask.Downloaded = temp.Downloaded; lock (SyncLock) { client.Updateable(temp).IgnoreNullColumns().ExecuteCommand(); } } fileStream.Close(); } // 检查取消令牌 token.ThrowIfCancellationRequested(); fileStream.Write(buffer, 0, read); bytesRead += read; // 记录当前数据块 var now = DateTime.Now; speedRecords.Enqueue((now, read)); // 清理过期记录 while (speedRecords.Count > 0 && (now - speedRecords.Peek().Time) > speedWindow) { speedRecords.Dequeue(); } // 计算窗口内总字节 long windowBytes = speedRecords.Sum(r => r.Bytes); // 计算速度(B/s) double speedBps = windowBytes / speedWindow.TotalSeconds; // 每秒触发一次回调 if ((now - lastUpdate).TotalMilliseconds >= 500) { action?.Invoke(bytesRead, totalBytes, speedBps); using (var client = SqlSugarConfig.GetSqlSugarScope()) { // 如何大于30MB,则1秒更新一次进度 if (totalBytes > 31457280) { updateTask.Downloaded = bytesRead + offset; lock (SyncLock) { // 执行更新操作 client.Updateable(updateTask).IgnoreNullColumns().ExecuteCommand(); } } } lastUpdate = now; } } // 触发最终速度回调 action?.Invoke(bytesRead, totalBytes, 0); fileStream.Close(); }); // 会返回stat 怎么减少stat的查询 await _minioClient.GetObjectAsync(getObjectArgs, token); /*if (VerifyETag(localPath, objectETag)) { // todo 先忽略处理 }*/ Console.WriteLine($"{objectKey} Download complete"); } } }