FieldWorkClient/Services/MinioService.cs

566 lines
24 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

using System.Collections.Concurrent;
using System.IO;
using System.Security.Cryptography;
using System.Threading.Channels;
using Hopetry.Provider;
using Microsoft.Extensions.Configuration;
using Minio;
using Minio.DataModel;
using Minio.DataModel.Args;
using Minio.DataModel.Notification;
using Minio.Exceptions;
using Newtonsoft.Json.Linq;
using Polly;
using Polly.Retry;
namespace Hopetry.Services
{
public partial class MinioService
{
// 同步锁
private static readonly object SyncLock = new();
private IMinioClient _minioClient;
public readonly string _bucketName = null;
public MinioService()
{
var builder = new ConfigurationBuilder()
.SetBasePath(Directory.GetCurrentDirectory())
.AddJsonFile("global.json", optional: false, reloadOnChange: true);
// 构建配置
var config = builder.Build();
var minioConfig = config.GetSection("Minio");
_minioClient = new MinioClient()
.WithEndpoint(minioConfig["Endpoint"])
.WithCredentials(minioConfig["AccessKey"], minioConfig["SecretKey"]).Build();
_bucketName = minioConfig["BucketName"]!;
/*_minioClient = new MinioClient()
.WithEndpoint("123.132.248.154:9107")
.WithCredentials("oZNgo25pNXnKFV9oKGh4", "66GYn0x1XAEInSa9wdCutzvUWKfhH1EhqxPJ6a9u")
.Build();*/
EnsureBucketExistsAsync(_bucketName).Wait();
}
AsyncRetryPolicy policy = Policy
.Handle<MinioException>()
.WaitAndRetryAsync(3, retryAttempt =>
TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)),
onRetry: (exception, delay, retryCount, context) =>
{
//日志
// _logger.LogWarning($"Retry {retryCount} for {context.PolicyKey}: {exception.Message}");
});
// 使用 Channel 实现生产者-消费者模式
/*private Channel<(string ObjectName, string ETag)> SyncChannel =
Channel.CreateBounded<(string, string)>(new BoundedChannelOptions(10000)
{
SingleWriter = false,
SingleReader = false,
FullMode = BoundedChannelFullMode.Wait
});*/
public async Task ListObject(string bucketName)
{
var listArgs = new ListObjectsArgs()
.WithBucket(bucketName)
.WithVersions(true)
.WithRecursive(true);
try
{
var observable = _minioClient.ListObjectsEnumAsync(listArgs);
//observable.ConfigureAwait(false);
await foreach (var item in observable)
{
Console.WriteLine("==============");
Console.WriteLine(item.Key);
//Console.WriteLine(item.IsDir);
Console.WriteLine(item.LastModified);
Console.WriteLine(item.Size);
Console.WriteLine(item.ETag);
Console.WriteLine(item.VersionId);
Console.WriteLine("==============");
}
}
catch (Exception e)
{
Console.WriteLine(e);
Console.WriteLine("我抛出的");
}
}
/// <summary>
/// 同步下载 todo 需要合并另外一个同步方法
/// </summary>
/// <param name="bucket"></param>
/// <param name="localDir"></param>
/// <param name="maxParallel"></param>
public async Task MirrorAsync(string bucket, string localDir, Action<long, long, string> action,
int maxParallel = 5)
{
Console.WriteLine("开始全量同步");
Channel<(string ObjectName, string ETag)> syncChannel = Channel.CreateBounded<(string, string)>(
new BoundedChannelOptions(1000)
{
SingleWriter = false,
SingleReader = false,
FullMode = BoundedChannelFullMode.Wait
});
var count = 0;
var total = 0;
var producerTasks = new List<Task>();
var listArgs = new ListObjectsArgs()
.WithBucket(bucket)
.WithRecursive(true);
// 预先生成本地文件索引(大幅减少 File.Exists 调用) 基于字典
var localFileIndex = await BuildLocalFileIndex(localDir);
//var x = policy.ExecuteAsync(() => Task.FromResult(_minioClient.ListObjectsEnumAsync(listArgs)));
var observable = _minioClient.ListObjectsEnumAsync(listArgs);
// 阶段2: 并行消费下载任务
var consumerTask = Task.Run(async () =>
{
await Parallel.ForEachAsync(syncChannel.Reader.ReadAllAsync(),
new ParallelOptions { MaxDegreeOfParallelism = maxParallel },
async (item, _) =>
{
action(total, count, item.ObjectName);
await DownLoadObject(bucket, item.ObjectName, localDir, item.ETag);
count++;
action(total, count, item.ObjectName);
}
);
});
await foreach (var item in observable)
{
total++;
// 关于isDir判断
if (item.IsDir)
{
// 判断文件夹存不存在,不存在则创建
var dir = Path.Combine(localDir, item.Key);
if (!Directory.Exists(dir))
{
Directory.CreateDirectory(dir);
}
continue;
}
var index = item.Key.LastIndexOf("/", StringComparison.Ordinal);
if (index > 0)
{
var dir = Path.Combine(localDir, item.Key.Substring(0, index));
if (!Directory.Exists(dir))
{
Directory.CreateDirectory(dir);
}
}
producerTasks.Add(Task.Run(async () =>
{
var localPath = Path.Combine(localDir, item.Key.Replace('/', Path.DirectorySeparatorChar));
// 快速路径判断(存在性检查)
var fileExist = localFileIndex.TryGetValue(localPath, out var localMeta);
if (fileExist)
{
// 只要存在就不校验 因为未下载完成的文件为mino文件
//DateTime itemLastModified = DateTime.Parse(item.LastModified, null, DateTimeStyles.RoundtripKind);
//if ((ulong)localMeta.Size != item.Size || localMeta.LastModified < itemLastModified || !VerifyETag(localPath, item.ETag))
/*if (!VerifyETag(localPath, item.ETag))
{
await syncChannel.Writer.WriteAsync((item.Key, item.ETag));
Console.WriteLine($"{localPath} 加入下载队列");
total++;
}*/
}
else
{
// 如果不存在,则加入下载队列
await syncChannel.Writer.WriteAsync((item.Key, item.ETag));
Console.WriteLine($"{localPath} 加入下载队列");
total++;
}
}));
}
await Task.WhenAll(producerTasks);
syncChannel.Writer.Complete(); // 关键步骤!
// 等待消费完成
await consumerTask;
Console.WriteLine($"{count}个文件下载完成");
}
/// <summary>
/// 构建本地文件内存索引(路径 → 元数据)
/// </summary>
/// <param name="rootDir"></param>
/// <returns></returns>
private async Task<ConcurrentDictionary<string, (long Size, DateTime LastModified)>> BuildLocalFileIndex(
string rootDir)
{
var index =
new ConcurrentDictionary<string, (long Size, DateTime LastModified)>(StringComparer.OrdinalIgnoreCase);
await Parallel.ForEachAsync(Directory.EnumerateFiles(rootDir, "*", SearchOption.AllDirectories),
async (path, _) =>
{
var info = new FileInfo(path);
index.TryAdd(path, (info.Length, info.LastWriteTimeUtc));
});
return index;
}
public async Task EnsureBucketExistsAsync(string bucketName)
{
var existsArgs = new BucketExistsArgs().WithBucket(bucketName);
var x = await _minioClient.BucketExistsAsync(existsArgs);
Console.WriteLine($" {bucketName} exist status: " + x);
// 如果存储桶不存在,则创建存储桶
if (!x)
{
var makeArgs = new MakeBucketArgs().WithBucket(bucketName);
await _minioClient.MakeBucketAsync(makeArgs);
}
}
/// <summary>
/// 上伟文件
/// </summary>
/// <param name="fileRecord"></param>
/*public async Task UploadFileAsync(FileRecord fileRecord)
{
var speedInBytesPerSecond = 1024 * 1024;
using (var sourceStream = File.OpenRead("path_to_file"))
using (var throttledStream =
new ThrottledStream(sourceStream,
speedInBytesPerSecond)) // speedInBytesPerSecond是你想要的速度例如1024 * 1024表示1MB/s
{
var putArgs = new PutObjectArgs()
.WithBucket(_bucketName)
.WithObject(fileRecord.FileName)
//.WithStreamData(throttledStream)
.WithObjectSize(111l)
.WithContentType("application/octet-stream");
await _minioClient.PutObjectAsync(putArgs);
}
}*/
/// <summary>
/// 列出存储桶内所有文件
/// </summary>
/// <param name="bucketName"></param>
/// <param name="prefix"></param>
/// <param name="recursive"></param>
public async Task<IAsyncEnumerable<Item>> ListAllObject(string bucketName, string prefix, bool recursive,
CancellationToken token = default)
{
// Just list of objects
// Check whether 'mybucket' exists or not.
if (string.IsNullOrEmpty(bucketName))
{
bucketName = _bucketName;
}
var existsArgs = new BucketExistsArgs().WithBucket(bucketName);
var found = await _minioClient.BucketExistsAsync(existsArgs, token);
if (found)
{
var args = new ListObjectsArgs()
.WithBucket(bucketName)
.WithPrefix(prefix)
.WithRecursive(recursive);
return _minioClient.ListObjectsEnumAsync(args, token);
}
Console.WriteLine("mybucket does not exist");
throw new Exception("bucket not found");
}
public async Task DownLoadObject(string bucketName, string objectKey, string localDir, string objectETag,
CancellationToken token = default)
{
var index = objectKey.LastIndexOf("/", StringComparison.Ordinal);
if (index > 0)
{
var dir = Path.Combine(localDir, objectKey.Substring(0, index));
if (!Directory.Exists(dir))
{
Directory.CreateDirectory(dir);
}
}
var localPath = Path.Combine(localDir, objectKey.Replace('/', Path.DirectorySeparatorChar));
var getArgs = new GetObjectArgs()
.WithBucket(string.IsNullOrEmpty(bucketName) ? _bucketName : bucketName)
.WithObject(objectKey)
.WithFile(localPath);
var stat = await _minioClient.GetObjectAsync(getArgs, token);
/*if (VerifyETag(localPath, objectETag))
{
// todo 先忽略处理
}*/
Console.WriteLine($"{objectKey} Download complete");
}
// 差异检测:通过 ETag 和修改时间对比‌
private async Task<bool> NeedSyncAsync(string bucket, string objectName, string localPath)
{
if (!File.Exists(localPath)) return true;
var remoteMeta = await GetObjectMetadata(bucket, objectName);
var localInfo = new FileInfo(localPath);
return localInfo.Length != remoteMeta.Size ||
localInfo.LastWriteTimeUtc < remoteMeta.LastModified ||
!VerifyETag(localPath, remoteMeta.ETag);
}
// 获取远程对象元数据‌
public async Task<(long Size, DateTime LastModified, string ETag)> GetObjectMetadata(string bucket,
string objectName)
{
var args = new StatObjectArgs().WithBucket(bucket).WithObject(objectName);
var stat = await _minioClient.StatObjectAsync(args);
return (stat.Size, stat.LastModified, stat.ETag);
}
// 校验本地文件 ETagMinIO 使用 MD5
public bool VerifyETag(string filePath, string remoteETag)
{
if (remoteETag.Contains("-"))
{
return true;
}
using var md5 = MD5.Create();
using var stream = File.OpenRead(filePath);
var localHash = BitConverter.ToString(md5.ComputeHash(stream)).Replace("-", "").ToLower();
var x = remoteETag.Trim('"').Equals(localHash, StringComparison.OrdinalIgnoreCase);
return x;
}
// 实时监听方法
public async Task RealTimeListen(string bucketName, string localDir,Action<string> action, string prefix = "", string suffix = "",
CancellationToken cancellationToken = default)
{
Console.WriteLine("开启实时监听");
var downloadQueue =
Channel.CreateBounded<(string bucketName, string objectKey, string localDir, string objectETag)>(
new BoundedChannelOptions(3000)
{
FullMode = BoundedChannelFullMode.Wait
});
// 使用LinkedTokenSource组合令牌
//var linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
// 辅助任务(独立取消 + 继承总控)
/*var helperCts = CancellationTokenSource.CreateLinkedTokenSource(
cancellationToken, // 总控取消时辅助任务也停止
new CancellationTokenSource().Token // 独立取消通道
);*/
var consumerTask = Task.Run(async () =>
{
await Parallel.ForEachAsync(downloadQueue.Reader.ReadAllAsync(),
new ParallelOptions
{
MaxDegreeOfParallelism = 5
},
async (item, _) =>
{
action($"实时同步中: {item.objectKey}");
await DownLoadObject(item.bucketName, item.objectKey, item.localDir, item.objectETag);
action("实时同步完成");
});
});
#region 监听事件
try
{
var events = new List<EventType> { EventType.ObjectCreatedAll };
var args = new ListenBucketNotificationsArgs()
.WithBucket(bucketName)
.WithEvents(events);
IObservable<MinioNotificationRaw> observable = _minioClient
.ListenBucketNotificationsAsync(args);
using var subscription = observable.Subscribe(
async notification =>
{
//cancellationToken.ThrowIfCancellationRequested();
Console.WriteLine($"Received notification: {notification.Json}");
var obj = JObject.Parse(notification.Json);
// s3:ObjectCreated:Put
var eventType = obj["Records"]?[0]?["eventName"]?.Value<string>();
var objectSize = obj["Records"]?[0]?["s3"]?["object"]?["size"]?.Value<long>();
var objectKey = obj["Records"]?[0]?["s3"]?["object"]?["key"]?.Value<string>();
var objectETag = obj["Records"]?[0]?["s3"]?["object"]?["eTag"]?.Value<string>();
Console.WriteLine("将要下载的Key:" + objectKey);
downloadQueue.Writer.TryWrite((bucketName, objectKey, localDir, objectETag));
},
ex => Console.WriteLine($"OnError: {ex}"),
() => Console.WriteLine($"Stopped listening for bucket notifications\n"));
// 等待取消请求
await Task.Delay(Timeout.Infinite, cancellationToken);
}
catch (OperationCanceledException e)
{
Console.WriteLine("用户取消同步监听: " + e);
}
catch (Exception e)
{
Console.WriteLine("Error occurred: " + e);
}
finally
{
downloadQueue.Writer.Complete();
// 等待消费完成
await consumerTask;
}
#endregion
}
public async Task DownLoadObjectWithCallBack(MinioDownloadTask downTask, string bucketName, string objectKey,
string filePath,
string fileETag, Action<long, long, double> action, long offset = 0)
{
var updateTask = new MinioDownloadTask
{
TaskId = downTask.TaskId
};
var token = downTask.StopDownTs.Token;
long totalBytes = 0;
var args = new StatObjectArgs()
.WithBucket(string.IsNullOrEmpty(bucketName) ? _bucketName : bucketName)
.WithObject(objectKey);
var stat = await _minioClient.StatObjectAsync(args, token);
totalBytes = stat.Size;
//var localPath = Path.Combine(filePath, objectKey.Replace('/', Path.DirectorySeparatorChar));
var localPath = Path.Combine(filePath,
objectKey[(objectKey.LastIndexOf('/') + 1)..]);
var index = localPath.LastIndexOf("/", StringComparison.Ordinal);
if (index > 0)
{
var dir = localPath.Substring(0, index);
if (!Directory.Exists(dir))
{
Directory.CreateDirectory(dir);
}
}
var getObjectArgs = new GetObjectArgs()
.WithBucket(string.IsNullOrEmpty(bucketName) ? _bucketName : bucketName)
.WithObject(objectKey)
.WithOffsetAndLength(offset, totalBytes - offset)
.WithCallbackStream((stream) =>
{
long bytesRead = 0;
byte[] buffer = new byte[64 * 1024]; // 64KB 缓冲区
int read;
// 速度计算变量
var speedWindow = TimeSpan.FromSeconds(5); // 5秒时间窗口
var speedRecords = new Queue<(DateTime Time, long Bytes)>();
var lastUpdate = DateTime.MinValue;
FileStream fileStream;
if (File.Exists(localPath))
{
fileStream = new FileStream(localPath, FileMode.Open, FileAccess.Write, FileShare.Write);
fileStream.Seek(offset, SeekOrigin.Begin);
}
else
{
fileStream = new FileStream(localPath, FileMode.Create, FileAccess.Write, FileShare.Write);
}
while ((read = stream.Read(buffer, 0, buffer.Length)) > 0)
{
if (token.IsCancellationRequested)
{
//保存下载进度
using (var client = SqlSugarConfig.GetSqlSugarScope())
{
var temp = new MinioDownloadTask
{
TaskId = downTask.TaskId,
Downloaded = bytesRead + offset,
TotalSize = totalBytes, // 不可为null ,每次都有默认值
Status = "下载中"
};
// todo 这里有bug,有可能变成0
downTask.Downloaded = temp.Downloaded;
lock (SyncLock)
{
client.Updateable(temp).IgnoreNullColumns().ExecuteCommand();
}
}
fileStream.Close();
}
// 检查取消令牌
token.ThrowIfCancellationRequested();
fileStream.Write(buffer, 0, read);
bytesRead += read;
// 记录当前数据块
var now = DateTime.Now;
speedRecords.Enqueue((now, read));
// 清理过期记录
while (speedRecords.Count > 0 &&
(now - speedRecords.Peek().Time) > speedWindow)
{
speedRecords.Dequeue();
}
// 计算窗口内总字节
long windowBytes = speedRecords.Sum(r => r.Bytes);
// 计算速度B/s
double speedBps = windowBytes / speedWindow.TotalSeconds;
// 每秒触发一次回调
if ((now - lastUpdate).TotalMilliseconds >= 500)
{
action?.Invoke(bytesRead, totalBytes, speedBps);
using (var client = SqlSugarConfig.GetSqlSugarScope())
{
// 如何大于30MB,则1秒更新一次进度
if (totalBytes > 31457280)
{
updateTask.Downloaded = bytesRead + offset;
lock (SyncLock)
{
// 执行更新操作
client.Updateable(updateTask).IgnoreNullColumns().ExecuteCommand();
}
}
}
lastUpdate = now;
}
}
// 触发最终速度回调
action?.Invoke(bytesRead, totalBytes, 0);
fileStream.Close();
});
// 会返回stat 怎么减少stat的查询
await _minioClient.GetObjectAsync(getObjectArgs, token);
/*if (VerifyETag(localPath, objectETag))
{
// todo 先忽略处理
}*/
Console.WriteLine($"{objectKey} Download complete");
}
}
}