2025-03-20 09:09:14 +08:00
|
|
|
|
using System.Collections.Concurrent;
|
|
|
|
|
|
using System.IO;
|
2026-01-15 09:46:48 +08:00
|
|
|
|
using System.Net.Http;
|
2025-03-20 09:09:14 +08:00
|
|
|
|
using System.Security.Cryptography;
|
|
|
|
|
|
using System.Threading.Channels;
|
2025-07-07 10:13:45 +08:00
|
|
|
|
using System.Windows.Forms.VisualStyles;
|
2025-04-21 13:41:57 +08:00
|
|
|
|
using Hopetry.Provider;
|
2025-03-21 13:12:43 +08:00
|
|
|
|
using Microsoft.Extensions.Configuration;
|
2025-03-20 09:09:14 +08:00
|
|
|
|
using Minio;
|
2025-04-24 16:44:43 +08:00
|
|
|
|
using Minio.DataModel;
|
2025-03-20 09:09:14 +08:00
|
|
|
|
using Minio.DataModel.Args;
|
|
|
|
|
|
using Minio.DataModel.Notification;
|
|
|
|
|
|
using Minio.Exceptions;
|
2025-03-21 11:22:17 +08:00
|
|
|
|
using Newtonsoft.Json.Linq;
|
2025-03-20 09:09:14 +08:00
|
|
|
|
using Polly;
|
|
|
|
|
|
using Polly.Retry;
|
2025-03-13 09:58:16 +08:00
|
|
|
|
|
|
|
|
|
|
namespace Hopetry.Services
|
|
|
|
|
|
{
|
2025-03-20 16:08:11 +08:00
|
|
|
|
public partial class MinioService
|
2025-03-13 09:58:16 +08:00
|
|
|
|
{
|
2025-04-23 14:42:54 +08:00
|
|
|
|
// Process-wide lock shared by all instances; DownLoadObjectWithCallBack takes it around its database updates.
private static readonly object SyncLock = new();
private IMinioClient _minioClient;

// Default bucket name, read from the "Minio:BucketName" setting in global.json.
public readonly string _bucketName = null;

/// <summary>
/// Reads the "Minio" section of global.json (looked up in the current working directory),
/// builds the MinIO client from its Endpoint / AccessKey / SecretKey values and makes sure
/// the configured bucket exists.
/// </summary>
/// <remarks>
/// Credentials must only ever come from configuration; never paste them into this file,
/// not even inside a comment.
/// </remarks>
public MinioService()
{
    // The settings are read exactly once and the configuration object is then dropped,
    // so no reload watcher is requested: with reloadOnChange enabled every instance
    // would leave a file watcher behind that nothing ever uses or disposes.
    var config = new ConfigurationBuilder()
        .SetBasePath(Directory.GetCurrentDirectory())
        .AddJsonFile("global.json", optional: false, reloadOnChange: false)
        .Build();
    var minioConfig = config.GetSection("Minio");

    _minioClient = new MinioClient()
        .WithEndpoint(minioConfig["Endpoint"])
        .WithCredentials(minioConfig["AccessKey"], minioConfig["SecretKey"])
        .Build();
    _bucketName = minioConfig["BucketName"]!;

    // NOTE(review): this blocks the constructing thread on an async call. It is kept as
    // Wait() so failures still surface as AggregateException, as existing callers may expect.
    EnsureBucketExistsAsync(_bucketName).Wait();
}
|
2025-03-13 09:58:16 +08:00
|
|
|
|
|
2026-01-15 09:46:48 +08:00
|
|
|
|
// Retry policy for MinIO operations: on MinioException retry up to 2 times,
// waiting 2^attempt seconds before each retry (exponential back-off), and log every retry.
AsyncRetryPolicy policy = Policy
    .Handle<MinioException>()
    .WaitAndRetryAsync(
        2,
        attempt => TimeSpan.FromSeconds(Math.Pow(2, attempt)),
        onRetry: (error, _, attempt, _) =>
            Console.WriteLine($"MinIO操作重试 #{attempt}: {error.Message}"));
|
|
|
|
|
|
|
|
|
|
|
|
// 使用 Channel 实现生产者-消费者模式
|
2025-03-24 14:15:16 +08:00
|
|
|
|
/*private Channel<(string ObjectName, string ETag)> SyncChannel =
|
2025-03-20 09:09:14 +08:00
|
|
|
|
Channel.CreateBounded<(string, string)>(new BoundedChannelOptions(10000)
|
2025-03-13 09:58:16 +08:00
|
|
|
|
{
|
2025-03-20 09:09:14 +08:00
|
|
|
|
SingleWriter = false,
|
|
|
|
|
|
SingleReader = false,
|
|
|
|
|
|
FullMode = BoundedChannelFullMode.Wait
|
2025-03-24 14:15:16 +08:00
|
|
|
|
});*/
|
2025-03-20 09:09:14 +08:00
|
|
|
|
|
2025-03-21 11:22:17 +08:00
|
|
|
|
/// <summary>
/// Prints key, last-modified time, size, ETag and version id of every object version
/// in the bucket (recursive listing) to the console.
/// Any exception is printed to the console and not rethrown.
/// </summary>
/// <param name="bucketName">Bucket whose objects are listed.</param>
public async Task ListObject(string bucketName)
{
    const string separator = "==============";

    var request = new ListObjectsArgs()
        .WithBucket(bucketName)
        .WithVersions(true)
        .WithRecursive(true);

    try
    {
        await foreach (var entry in _minioClient.ListObjectsEnumAsync(request))
        {
            Console.WriteLine(separator);
            Console.WriteLine(entry.Key);
            Console.WriteLine(entry.LastModified);
            Console.WriteLine(entry.Size);
            Console.WriteLine(entry.ETag);
            Console.WriteLine(entry.VersionId);
            Console.WriteLine(separator);
        }
    }
    catch (Exception error)
    {
        Console.WriteLine(error);
        Console.WriteLine("我抛出的");
    }
}
|
|
|
|
|
|
|
2025-03-20 09:09:14 +08:00
|
|
|
|
/// <summary>
/// Full one-way sync: downloads every object of <paramref name="bucket"/> that has no
/// local counterpart below <paramref name="localDir"/>.
/// TODO: merge with the other sync method.
/// </summary>
/// <remarks>
/// A file that already exists locally is never re-validated or re-downloaded.
/// The listing loop is the producer: it feeds a bounded channel, and a pool of at most
/// <paramref name="maxParallel"/> workers consumes it and downloads.
/// </remarks>
/// <param name="bucket">Bucket to mirror.</param>
/// <param name="localDir">Local root directory that receives the files.</param>
/// <param name="action">
/// Progress callback, invoked as (total, count, objectName) immediately before and
/// immediately after each download. It is called from the download workers, so it may
/// run concurrently on several threads.
/// </param>
/// <param name="maxParallel">Maximum number of concurrent downloads.</param>
public async Task MirrorAsync(string bucket, string localDir, Action<long, long, string> action,
    int maxParallel = 5)
{
    Console.WriteLine("开始全量同步");
    var syncChannel = Channel.CreateBounded<(string ObjectName, string ETag)>(
        new BoundedChannelOptions(1000)
        {
            SingleWriter = false,
            SingleReader = false,
            FullMode = BoundedChannelFullMode.Wait
        });

    // Both counters are touched by the listing loop and by the parallel download workers,
    // so every access goes through Interlocked.
    long count = 0;
    long total = 0;

    var listArgs = new ListObjectsArgs()
        .WithBucket(bucket)
        .WithRecursive(true);

    // Index the local files once up front so the loop below does a dictionary lookup
    // instead of one File.Exists call per object.
    var localFileIndex = await BuildLocalFileIndex(localDir);
    var observable = _minioClient.ListObjectsEnumAsync(listArgs);

    // Stage 2: download queued objects in parallel.
    var consumerTask = Task.Run(async () =>
    {
        try
        {
            await Parallel.ForEachAsync(syncChannel.Reader.ReadAllAsync(),
                new ParallelOptions { MaxDegreeOfParallelism = maxParallel },
                async (item, _) =>
                {
                    action(Interlocked.Read(ref total), Interlocked.Read(ref count), item.ObjectName);
                    await DownLoadObject(bucket, item.ObjectName, localDir, item.ETag);
                    var finished = Interlocked.Increment(ref count);
                    action(Interlocked.Read(ref total), finished, item.ObjectName);
                });
        }
        catch (Exception ex)
        {
            // Close the channel so a producer waiting on a full queue is released
            // instead of blocking forever once nobody reads any more.
            syncChannel.Writer.TryComplete(ex);
            throw;
        }
    });

    // Stage 1: list the bucket and queue everything that is missing locally.
    try
    {
        await foreach (var item in observable)
        {
            Interlocked.Increment(ref total);

            if (item.IsDir)
            {
                // Directory entry: just make sure the folder exists locally.
                Directory.CreateDirectory(Path.Combine(localDir, item.Key));
                continue;
            }

            var index = item.Key.LastIndexOf("/", StringComparison.Ordinal);
            if (index > 0)
            {
                Directory.CreateDirectory(Path.Combine(localDir, item.Key.Substring(0, index)));
            }

            var localPath = Path.Combine(localDir, item.Key.Replace('/', Path.DirectorySeparatorChar));
            if (localFileIndex.ContainsKey(localPath))
            {
                // Existing files are trusted as-is (no size / ETag check).
                continue;
            }

            // Not present locally: queue it. WriteAsync waits while the queue is full,
            // which throttles the listing to the download speed.
            await syncChannel.Writer.WriteAsync((item.Key, item.ETag));
            Console.WriteLine($"{localPath} 加入下载队列");

            // NOTE(review): total was already incremented once for this entry at the top
            // of the loop, so queued objects are counted twice. The arithmetic is kept
            // unchanged - confirm which figure the progress callback is meant to receive.
            Interlocked.Increment(ref total);
        }
    }
    catch (ChannelClosedException)
    {
        // The consumer failed and closed the channel; its exception is rethrown
        // by awaiting consumerTask below.
    }
    finally
    {
        // Always signal end-of-input, otherwise the consumer would wait forever
        // (also when the listing itself throws).
        syncChannel.Writer.TryComplete();
    }

    // Wait until every queued download has finished.
    await consumerTask;
    Console.WriteLine($"{Interlocked.Read(ref count)}个文件下载完成");
}
|
|
|
|
|
|
|
|
|
|
|
|
/// <summary>
/// Builds an in-memory index of every file below <paramref name="rootDir"/>
/// (full path → size and last-write time in UTC). Keys are compared case-insensitively.
/// </summary>
/// <param name="rootDir">Directory that is scanned recursively.</param>
/// <returns>
/// The index; empty when <paramref name="rootDir"/> does not exist yet
/// (for example on the very first sync into a new folder).
/// </returns>
private async Task<ConcurrentDictionary<string, (long Size, DateTime LastModified)>> BuildLocalFileIndex(
    string rootDir)
{
    var index =
        new ConcurrentDictionary<string, (long Size, DateTime LastModified)>(StringComparer.OrdinalIgnoreCase);

    // Nothing to index yet; enumerating a missing directory would throw.
    if (!Directory.Exists(rootDir))
    {
        return index;
    }

    await Parallel.ForEachAsync(Directory.EnumerateFiles(rootDir, "*", SearchOption.AllDirectories),
        (path, _) =>
        {
            var info = new FileInfo(path);
            // Skip files that disappeared between enumeration and inspection.
            if (info.Exists)
            {
                index.TryAdd(path, (info.Length, info.LastWriteTimeUtc));
            }

            return ValueTask.CompletedTask;
        });
    return index;
}
|
|
|
|
|
|
|
|
|
|
|
|
|
2025-03-21 11:22:17 +08:00
|
|
|
|
/// <summary>
/// Checks whether the bucket exists, prints the result to the console and creates
/// the bucket when it is missing.
/// </summary>
/// <param name="bucketName">Name of the bucket to check or create.</param>
public async Task EnsureBucketExistsAsync(string bucketName)
{
    var bucketExists = await _minioClient.BucketExistsAsync(new BucketExistsArgs().WithBucket(bucketName));
    Console.WriteLine($" {bucketName} exist status: " + bucketExists);

    if (bucketExists)
    {
        return;
    }

    // Bucket is missing: create it.
    await _minioClient.MakeBucketAsync(new MakeBucketArgs().WithBucket(bucketName));
}
|
|
|
|
|
|
|
|
|
|
|
|
/// <summary>
|
|
|
|
|
|
/// Upload a file (the implementation below is currently commented out).
|
|
|
|
|
|
/// </summary>
|
|
|
|
|
|
/// <param name="fileRecord"></param>
|
2025-04-21 13:41:57 +08:00
|
|
|
|
/*public async Task UploadFileAsync(FileRecord fileRecord)
|
2025-03-20 09:09:14 +08:00
|
|
|
|
{
|
2025-04-21 13:41:57 +08:00
|
|
|
|
var speedInBytesPerSecond = 1024 * 1024;
|
|
|
|
|
|
using (var sourceStream = File.OpenRead("path_to_file"))
|
|
|
|
|
|
using (var throttledStream =
|
|
|
|
|
|
new ThrottledStream(sourceStream,
|
|
|
|
|
|
speedInBytesPerSecond)) // speedInBytesPerSecond是你想要的速度,例如1024 * 1024表示1MB/s
|
|
|
|
|
|
{
|
|
|
|
|
|
var putArgs = new PutObjectArgs()
|
|
|
|
|
|
.WithBucket(_bucketName)
|
|
|
|
|
|
.WithObject(fileRecord.FileName)
|
|
|
|
|
|
//.WithStreamData(throttledStream)
|
|
|
|
|
|
.WithObjectSize(111l)
|
|
|
|
|
|
.WithContentType("application/octet-stream");
|
|
|
|
|
|
await _minioClient.PutObjectAsync(putArgs);
|
|
|
|
|
|
}
|
|
|
|
|
|
}*/
|
2025-03-20 09:09:14 +08:00
|
|
|
|
|
|
|
|
|
|
/// <summary>
/// Lists the objects stored in a bucket.
/// </summary>
/// <param name="bucketName">Bucket to list; null or empty selects the configured default bucket.</param>
/// <param name="prefix">Object-key prefix passed to the listing request.</param>
/// <param name="recursive">Whether the listing is recursive.</param>
/// <param name="token">Cancellation token for the existence check and the listing.</param>
/// <returns>An asynchronous stream of the matching items.</returns>
/// <exception cref="Exception">The bucket does not exist.</exception>
public async Task<IAsyncEnumerable<Item>> ListAllObject(string bucketName, string prefix, bool recursive,
    CancellationToken token = default)
{
    var targetBucket = string.IsNullOrEmpty(bucketName) ? _bucketName : bucketName;

    // Fail early when the bucket is missing.
    var bucketFound = await _minioClient.BucketExistsAsync(
        new BucketExistsArgs().WithBucket(targetBucket), token);
    if (!bucketFound)
    {
        Console.WriteLine("mybucket does not exist");
        throw new Exception("bucket not found");
    }

    var listing = new ListObjectsArgs()
        .WithBucket(targetBucket)
        .WithPrefix(prefix)
        .WithRecursive(recursive);
    return _minioClient.ListObjectsEnumAsync(listing, token);
}
|
2025-03-27 10:22:59 +08:00
|
|
|
|
|
2025-04-24 16:44:43 +08:00
|
|
|
|
/// <summary>
/// Downloads one object below <paramref name="localDir"/>. Convenience overload that
/// forwards to the six-argument overload with the object key itself as the "name" argument.
/// </summary>
/// <param name="bucketName">Source bucket.</param>
/// <param name="objectKey">Key of the object to download.</param>
/// <param name="localDir">Local root directory.</param>
/// <param name="objectETag">ETag of the object, forwarded unchanged.</param>
/// <param name="token">Cancellation token.</param>
public async Task DownLoadObject(string bucketName, string objectKey, string localDir, string objectETag,
    CancellationToken token = default)
{
    var name = objectKey;
    await DownLoadObject(bucketName, objectKey, localDir, objectETag, name, token);
}
|
|
|
|
|
|
|
|
|
|
|
|
/// <summary>
/// Downloads one object into a sub-folder of <paramref name="localDir"/>.
/// </summary>
/// <remarks>
/// The sub-folder is derived from the object key: when <paramref name="name"/> equals the
/// bucket name the whole key is used, otherwise the part of the key starting at the first
/// occurrence of <paramref name="name"/>. If <paramref name="name"/> does not occur in the
/// key at all, the whole key is used as well.
/// NOTE(review): the key is not checked for ".." segments, so a crafted key could point
/// outside <paramref name="localDir"/> - confirm that keys are trusted.
/// </remarks>
/// <param name="bucketName">Source bucket; null or empty selects the configured default bucket.</param>
/// <param name="objectKey">Key of the object to download.</param>
/// <param name="localDir">Local root directory.</param>
/// <param name="objectETag">ETag of the object (currently not used for verification).</param>
/// <param name="name">Bucket name, or the key segment from which the local folder structure starts.</param>
/// <param name="token">Cancellation token.</param>
public async Task DownLoadObject(string bucketName, string objectKey, string localDir, string objectETag,
    string name, CancellationToken token = default)
{
    string relativePath;
    if (name.Equals(bucketName))
    {
        // todo: whole-bucket download
        // Keep the path relative here; prefixing localDir at this point would duplicate
        // it below whenever localDir is a relative path.
        relativePath = objectKey;
    }
    else
    {
        // IndexOf returns -1 when name is not part of the key; fall back to the full key
        // instead of letting Substring throw.
        var start = objectKey.IndexOf(name, StringComparison.Ordinal);
        relativePath = start >= 0 ? objectKey.Substring(start) : objectKey;
    }

    relativePath = relativePath.Replace("\\", "/");

    var dir = localDir;
    var index = relativePath.LastIndexOf("/", StringComparison.Ordinal);
    if (index > 0)
    {
        dir = Path.Combine(localDir, relativePath.Substring(0, index));
    }

    // CreateDirectory is a no-op when the directory already exists.
    Directory.CreateDirectory(dir);

    var localPath = Path.Combine(dir, Path.GetFileName(objectKey));
    var getArgs = new GetObjectArgs()
        .WithBucket(string.IsNullOrEmpty(bucketName) ? _bucketName : bucketName)
        .WithObject(objectKey)
        .WithFile(localPath);
    await _minioClient.GetObjectAsync(getArgs, token);

    Console.WriteLine($"{objectKey} Download complete");
}
|
2025-03-13 09:58:16 +08:00
|
|
|
|
|
2025-03-21 11:22:17 +08:00
|
|
|
|
/// <summary>
/// Decides whether a local file must be (re)downloaded: true when the file is missing,
/// its size differs from the remote object, it was last written before the remote
/// last-modified time, or its ETag check fails.
/// </summary>
private async Task<bool> NeedSyncAsync(string bucket, string objectName, string localPath)
{
    if (!File.Exists(localPath))
    {
        return true;
    }

    var (remoteSize, remoteModified, remoteETag) = await GetObjectMetadata(bucket, objectName);
    var local = new FileInfo(localPath);

    // Cheap checks first; the hash comparison only runs when size and time look fine.
    if (local.Length != remoteSize)
    {
        return true;
    }

    if (local.LastWriteTimeUtc < remoteModified)
    {
        return true;
    }

    return !VerifyETag(localPath, remoteETag);
}
|
|
|
|
|
|
|
2025-03-21 11:22:17 +08:00
|
|
|
|
/// <summary>
/// Fetches size, last-modified time and ETag of a remote object via a stat request.
/// </summary>
/// <param name="bucket">Bucket that holds the object.</param>
/// <param name="objectName">Key of the object.</param>
public async Task<(long Size, DateTime LastModified, string ETag)> GetObjectMetadata(string bucket,
    string objectName)
{
    var request = new StatObjectArgs()
        .WithBucket(bucket)
        .WithObject(objectName);
    var info = await _minioClient.StatObjectAsync(request);
    return (Size: info.Size, LastModified: info.LastModified, ETag: info.ETag);
}
|
|
|
|
|
|
|
2025-03-21 11:22:17 +08:00
|
|
|
|
/// <summary>
/// Compares the MD5 hash of a local file with a remote ETag.
/// </summary>
/// <param name="filePath">Path of the local file to hash.</param>
/// <param name="remoteETag">ETag reported by the server; surrounding double quotes are ignored.</param>
/// <returns>
/// true when the hex MD5 of the file equals the ETag (case-insensitive), and also whenever
/// the ETag contains a '-' (presumably a multipart-upload ETag, which is not a plain MD5
/// and therefore cannot be checked this way).
/// </returns>
public bool VerifyETag(string filePath, string remoteETag)
{
    if (remoteETag.Contains("-"))
    {
        return true;
    }

    string localHash;
    using (var md5 = MD5.Create())
    using (var stream = File.OpenRead(filePath))
    {
        localHash = Convert.ToHexString(md5.ComputeHash(stream));
    }

    return string.Equals(remoteETag.Trim('"'), localHash, StringComparison.OrdinalIgnoreCase);
}
|
|
|
|
|
|
|
2025-03-21 11:22:17 +08:00
|
|
|
|
/// <summary>
/// Real-time sync: subscribes to "object created" notifications of a bucket and downloads
/// every newly created object below <paramref name="localDir"/> until
/// <paramref name="cancellationToken"/> is cancelled.
/// </summary>
/// <remarks>
/// Notifications are pushed into a bounded queue (3000 entries) that is drained by up to
/// 5 parallel downloads. A failed download or a malformed notification is logged and
/// skipped so that a single bad event does not stop the listener.
/// NOTE(review): <paramref name="prefix"/> and <paramref name="suffix"/> are accepted but
/// currently not applied to the subscription.
/// </remarks>
/// <param name="bucketName">Bucket to watch.</param>
/// <param name="localDir">Local root directory that receives the files.</param>
/// <param name="action">Status callback; called from the download workers, possibly concurrently.</param>
/// <param name="prefix">Reserved: object-key prefix filter (not applied yet).</param>
/// <param name="suffix">Reserved: object-key suffix filter (not applied yet).</param>
/// <param name="cancellationToken">Cancelling this token stops the listener.</param>
public async Task RealTimeListen(string bucketName, string localDir, Action<string> action, string prefix = "",
    string suffix = "",
    CancellationToken cancellationToken = default)
{
    Console.WriteLine("开启实时监听");
    var downloadQueue =
        Channel.CreateBounded<(string bucketName, string objectKey, string localDir, string objectETag)>(
            new BoundedChannelOptions(3000)
            {
                FullMode = BoundedChannelFullMode.Wait
            });

    var consumerTask = Task.Run(async () =>
    {
        await Parallel.ForEachAsync(downloadQueue.Reader.ReadAllAsync(),
            new ParallelOptions
            {
                MaxDegreeOfParallelism = 5
            },
            async (item, _) =>
            {
                try
                {
                    action($"实时同步中: {item.objectKey}");
                    await DownLoadObject(item.bucketName, item.objectKey, item.localDir, item.objectETag);
                    action("实时同步完成");
                }
                catch (Exception ex)
                {
                    // Keep consuming: an exception escaping here would end the whole
                    // consumer loop and silently stop all further syncing.
                    Console.WriteLine($"Real-time sync failed for {item.objectKey}: {ex}");
                }
            });
    });

    #region Listen for events

    try
    {
        var args = new ListenBucketNotificationsArgs()
            .WithBucket(bucketName)
            .WithEvents(new List<EventType> { EventType.ObjectCreatedAll });

        IObservable<MinioNotificationRaw> observable = _minioClient
            .ListenBucketNotificationsAsync(args);

        // Deliberately a synchronous handler: an async lambda here would be "async void",
        // and any exception inside it could take the process down.
        using var subscription = observable.Subscribe(
            notification =>
            {
                try
                {
                    Console.WriteLine($"Received notification: {notification.Json}");
                    var s3Object = JObject.Parse(notification.Json)["Records"]?[0]?["s3"]?["object"];
                    var objectKey = s3Object?["key"]?.Value<string>();
                    var objectETag = s3Object?["eTag"]?.Value<string>();

                    if (string.IsNullOrEmpty(objectKey))
                    {
                        Console.WriteLine("Notification without an object key ignored");
                        return;
                    }

                    Console.WriteLine("将要下载的Key:" + objectKey);
                    if (!downloadQueue.Writer.TryWrite((bucketName, objectKey, localDir,
                            objectETag ?? string.Empty)))
                    {
                        // TryWrite never waits; report the drop instead of losing it silently.
                        Console.WriteLine($"Download queue is full or closed, dropped: {objectKey}");
                    }
                }
                catch (Exception ex)
                {
                    Console.WriteLine($"Failed to handle notification: {ex}");
                }
            },
            ex => Console.WriteLine($"OnError: {ex}"),
            () => Console.WriteLine($"Stopped listening for bucket notifications\n"));

        // Park here until the caller cancels.
        await Task.Delay(Timeout.Infinite, cancellationToken);
    }
    catch (OperationCanceledException e)
    {
        Console.WriteLine("用户取消同步监听: " + e);
    }
    catch (Exception e)
    {
        Console.WriteLine("Error occurred: " + e);
    }
    finally
    {
        downloadQueue.Writer.Complete();
        // Let the downloads that are already queued finish.
        await consumerTask;
    }

    #endregion
}
|
2025-04-15 10:29:41 +08:00
|
|
|
|
|
2025-04-21 16:35:40 +08:00
|
|
|
|
/// <summary>
/// Downloads one object into <paramref name="filePath"/> (using only the last segment of
/// the object key as file name), reporting progress and persisting it to the database.
/// Supports resuming from <paramref name="offset"/>.
/// </summary>
/// <remarks>
/// Cancellation is driven by <c>downTask.StopDownTs</c>: when it is signalled the current
/// progress is saved and an <see cref="OperationCanceledException"/> is thrown.
/// NOTE(review): the progress callback receives the bytes read in this call only (without
/// <paramref name="offset"/>) while the database rows store bytes read plus offset -
/// confirm that callers expect this difference.
/// </remarks>
/// <param name="downTask">Download task record; supplies the task id and the stop token.</param>
/// <param name="bucketName">Source bucket; null or empty selects the configured default bucket.</param>
/// <param name="objectKey">Key of the object to download.</param>
/// <param name="filePath">Local directory that receives the file.</param>
/// <param name="fileETag">ETag of the object (currently not used for verification).</param>
/// <param name="action">Progress callback: (bytesRead, totalBytes, bytesPerSecond).</param>
/// <param name="offset">Byte position to resume from; 0 downloads the whole object.</param>
public async Task DownLoadObjectWithCallBack(MinioDownloadTask downTask, string bucketName, string objectKey,
    string filePath,
    string fileETag, Action<long, long, double> action, long offset = 0)
{
    var updateTask = new MinioDownloadTask
    {
        TaskId = downTask.TaskId
    };
    var token = downTask.StopDownTs.Token;
    var targetBucket = string.IsNullOrEmpty(bucketName) ? _bucketName : bucketName;

    var args = new StatObjectArgs()
        .WithBucket(targetBucket)
        .WithObject(objectKey);
    var stat = await _minioClient.StatObjectAsync(args, token);
    long totalBytes = stat.Size;

    var localPath = Path.Combine(filePath,
        objectKey[(objectKey.LastIndexOf('/') + 1)..]);

    // Use the path API instead of searching for '/': with Windows-style separators the
    // search found nothing and the target directory was never created.
    var targetDir = Path.GetDirectoryName(localPath);
    if (!string.IsNullOrEmpty(targetDir))
    {
        Directory.CreateDirectory(targetDir);
    }

    var getObjectArgs = new GetObjectArgs()
        .WithBucket(targetBucket)
        .WithObject(objectKey)
        .WithOffsetAndLength(offset, totalBytes - offset)
        .WithCallbackStream((stream) =>
        {
            long bytesRead = 0;
            byte[] buffer = new byte[64 * 1024]; // 64 KB buffer
            int read;

            // Speed is averaged over a sliding 5-second window.
            var speedWindow = TimeSpan.FromSeconds(5);
            var speedRecords = new Queue<(DateTime Time, long Bytes)>();
            var lastUpdate = DateTime.MinValue;

            // An existing file is opened and positioned at the resume offset;
            // otherwise a new file is created. "using" guarantees the handle is released
            // on every exit path, including read/write failures and cancellation.
            var resuming = File.Exists(localPath);
            using var fileStream = new FileStream(localPath, resuming ? FileMode.Open : FileMode.Create,
                FileAccess.Write, FileShare.Write);
            if (resuming)
            {
                fileStream.Seek(offset, SeekOrigin.Begin);
            }

            while ((read = stream.Read(buffer, 0, buffer.Length)) > 0)
            {
                if (token.IsCancellationRequested)
                {
                    // Persist the progress reached so far (the chunk just read is not written).
                    using (var client = SqlSugarConfig.GetSqlSugarScope())
                    {
                        var temp = new MinioDownloadTask
                        {
                            TaskId = downTask.TaskId,
                            Downloaded = bytesRead + offset,
                            TotalSize = totalBytes, // must not be null; always has a value
                            Status = "下载中"
                        };
                        // todo: known bug - this value may end up as 0
                        downTask.Downloaded = temp.Downloaded;
                        lock (SyncLock)
                        {
                            client.Updateable(temp).IgnoreNullColumns().ExecuteCommand();
                        }
                    }
                }

                // Abort here when cancellation was requested.
                token.ThrowIfCancellationRequested();
                fileStream.Write(buffer, 0, read);
                bytesRead += read;

                // Record this chunk; UTC avoids jumps from daylight-saving changes.
                var now = DateTime.UtcNow;
                speedRecords.Enqueue((now, read));

                // Drop records that fell out of the window.
                while (speedRecords.Count > 0 &&
                       (now - speedRecords.Peek().Time) > speedWindow)
                {
                    speedRecords.Dequeue();
                }

                // Bytes inside the window, divided by the window length → bytes per second.
                long windowBytes = speedRecords.Sum(r => r.Bytes);
                double speedBps = windowBytes / speedWindow.TotalSeconds;

                // Report at most once every 500 ms.
                if ((now - lastUpdate).TotalMilliseconds >= 500)
                {
                    action?.Invoke(bytesRead, totalBytes, speedBps);
                    using (var client = SqlSugarConfig.GetSqlSugarScope())
                    {
                        // Only objects larger than 30 MB get their progress written
                        // to the database while downloading.
                        if (totalBytes > 31457280)
                        {
                            updateTask.Downloaded = bytesRead + offset;
                            lock (SyncLock)
                            {
                                client.Updateable(updateTask).IgnoreNullColumns().ExecuteCommand();
                            }
                        }
                    }

                    lastUpdate = now;
                }
            }

            // Final callback with speed 0.
            action?.Invoke(bytesRead, totalBytes, 0);
        });

    // GetObjectAsync returns the object stat again; TODO: find a way to avoid the extra stat query.
    await _minioClient.GetObjectAsync(getObjectArgs, token);

    Console.WriteLine($"{objectKey} Download complete");
}
|
2025-03-13 09:58:16 +08:00
|
|
|
|
}
|
2025-03-20 09:09:14 +08:00
|
|
|
|
}
|