FieldWorkClient/Services/MinioService.cs

343 lines
14 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

using System.Collections.Concurrent;
using System.Globalization;
using System.IO;
using System.Security.Cryptography;
using System.Threading.Channels;
using Microsoft.Extensions.Configuration;
using Minio;
using Minio.DataModel.Args;
using Minio.DataModel.Notification;
using Minio.Exceptions;
using Newtonsoft.Json.Linq;
using Polly;
using Polly.Retry;
namespace Hopetry.Services
{
public partial class MinioService
{
private IMinioClient _minioClient;
public readonly string _bucketName = null;
public MinioService()
{
var builder = new ConfigurationBuilder()
.SetBasePath(Directory.GetCurrentDirectory())
.AddJsonFile("global.json", optional: false, reloadOnChange: true);
// 构建配置
var config = builder.Build();
var minioConfig = config.GetSection("Minio");
_minioClient = new MinioClient()
.WithEndpoint(minioConfig["Endpoint"])
.WithCredentials(minioConfig["AccessKey"], minioConfig["SecretKey"]).Build();
_bucketName = minioConfig["BucketName"]!;
/*_minioClient = new MinioClient()
.WithEndpoint("123.132.248.154:9107")
.WithCredentials("oZNgo25pNXnKFV9oKGh4", "66GYn0x1XAEInSa9wdCutzvUWKfhH1EhqxPJ6a9u")
.Build();*/
EnsureBucketExistsAsync(_bucketName).Wait();
}
AsyncRetryPolicy policy = Policy
.Handle<MinioException>()
.WaitAndRetryAsync(3, retryAttempt =>
TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)),
onRetry: (exception, delay, retryCount, context) =>
{
//日志
// _logger.LogWarning($"Retry {retryCount} for {context.PolicyKey}: {exception.Message}");
});
// 使用 Channel 实现生产者-消费者模式
private static readonly Channel<(string ObjectName, string ETag)> SyncChannel =
Channel.CreateBounded<(string, string)>(new BoundedChannelOptions(10000)
{
SingleWriter = false,
SingleReader = false,
FullMode = BoundedChannelFullMode.Wait
});
public async Task ListObject(string bucketName)
{
var listArgs = new ListObjectsArgs()
.WithBucket(bucketName)
.WithVersions(true)
.WithRecursive(true);
try
{
var observable = _minioClient.ListObjectsEnumAsync(listArgs);
//observable.ConfigureAwait(false);
await foreach (var item in observable)
{
Console.WriteLine("==============");
Console.WriteLine(item.Key);
//Console.WriteLine(item.IsDir);
Console.WriteLine(item.LastModified);
Console.WriteLine(item.Size);
Console.WriteLine(item.ETag);
Console.WriteLine(item.VersionId);
Console.WriteLine("==============");
}
}
catch (Exception e)
{
Console.WriteLine(e);
Console.WriteLine("我抛出的");
}
}
/// <summary>
/// 同步下载 todo 需要合并另外一个同步方法
/// </summary>
/// <param name="bucket"></param>
/// <param name="localDir"></param>
/// <param name="maxParallel"></param>
public async Task MirrorAsync1(string bucket, string localDir, int maxParallel = 5)
{
var count = 0;
var producerTasks = new List<Task>();
var listArgs = new ListObjectsArgs()
.WithBucket(bucket)
.WithRecursive(true);
// 预先生成本地文件索引(大幅减少 File.Exists 调用) 基于字典
var localFileIndex = await BuildLocalFileIndex(localDir);
//var x = policy.ExecuteAsync(() => Task.FromResult(_minioClient.ListObjectsEnumAsync(listArgs)));
var observable = _minioClient.ListObjectsEnumAsync(listArgs);
await foreach (var item in observable)
{
var index = item.Key.LastIndexOf("/", StringComparison.Ordinal);
if (index > 0)
{
var dir = Path.Combine(localDir, item.Key.Substring(0, index));
if (!Directory.Exists(dir))
{
Directory.CreateDirectory(dir);
}
}
producerTasks.Add(Task.Run(async () =>
{
var localPath = Path.Combine(localDir, item.Key.Replace('/', Path.DirectorySeparatorChar));
// 快速路径判断(存在性检查)
if (!localFileIndex.TryGetValue(localPath, out var localMeta))
{
// 如果不存在,则加入下载队列
await SyncChannel.Writer.WriteAsync((item.Key, item.ETag));
}
else
{
DateTime itemLastModified =
DateTime.Parse(item.LastModified, null, DateTimeStyles.RoundtripKind);
if ((ulong)localMeta.Size != item.Size || localMeta.LastModified < itemLastModified ||
!VerifyETag(localPath, item.ETag))
{
await SyncChannel.Writer.WriteAsync((item.Key, item.ETag));
}
}
}));
}
await Task.WhenAll(producerTasks);
SyncChannel.Writer.Complete(); // 关键步骤!
// 阶段2: 并行消费下载任务
var consumerTask = Task.Run(async () =>
{
await Parallel.ForEachAsync(SyncChannel.Reader.ReadAllAsync(),
new ParallelOptions { MaxDegreeOfParallelism = maxParallel },
async (item, _) =>
{
count++;
await DownLoadObject(bucket, item.ObjectName, localDir, item.ETag);
}
);
});
// 等待消费完成
await consumerTask;
Console.WriteLine($"{count}个文件下载完成");
}
/// <summary>
/// 构建本地文件内存索引(路径 → 元数据)
/// </summary>
/// <param name="rootDir"></param>
/// <returns></returns>
private async Task<ConcurrentDictionary<string, (long Size, DateTime LastModified)>> BuildLocalFileIndex(
string rootDir)
{
var index =
new ConcurrentDictionary<string, (long Size, DateTime LastModified)>(StringComparer.OrdinalIgnoreCase);
await Parallel.ForEachAsync(Directory.EnumerateFiles(rootDir, "*", SearchOption.AllDirectories),
async (path, _) =>
{
var info = new FileInfo(path);
index.TryAdd(path, (info.Length, info.LastWriteTimeUtc));
});
return index;
}
public async Task EnsureBucketExistsAsync(string bucketName)
{
var existsArgs = new BucketExistsArgs().WithBucket(bucketName);
var x = await _minioClient.BucketExistsAsync(existsArgs);
Console.WriteLine($" {bucketName} exist status: " + x);
// 如果存储桶不存在,则创建存储桶
if (!x)
{
var makeArgs = new MakeBucketArgs().WithBucket(bucketName);
await _minioClient.MakeBucketAsync(makeArgs);
}
}
/// <summary>
/// 上伟文件
/// </summary>
/// <param name="fileRecord"></param>
/*public async Task UploadFileAsync(FileRecord fileRecord)
{
var putArgs = new PutObjectArgs()
.WithBucket(_bucketName)
.WithObject(fileRecord.FileName)
.WithFileName(fileRecord.LocalPath)
// application/zip
.WithContentType("application/octet-stream");
await _minioClient.PutObjectAsync(putArgs);
}*/
/// <summary>
/// 列出存储桶内所有文件
/// </summary>
/// <param name="bucket"></param>
public async Task ListAllObject()
{
// Just list of objects
// Check whether 'mybucket' exists or not.
var existsArgs = new BucketExistsArgs().WithBucket(_bucketName);
bool found = await _minioClient.BucketExistsAsync(existsArgs);
if (found)
{
// List objects from 'my-bucketname'
ListObjectsArgs args = new ListObjectsArgs()
.WithBucket(_bucketName)
.WithRecursive(false);
// ListObjectsEnumAsync 新方法
var observable = _minioClient.ListObjectsEnumAsync(args);
var x = observable.GetAsyncEnumerator();
await x.MoveNextAsync();
Console.WriteLine(x.Current.Key);
Console.WriteLine(x.Current.Size);
Console.WriteLine(x.Current.ETag);
Console.WriteLine(x.Current.LastModified);
Console.WriteLine(x.Current.IsDir);
/*IDisposable subscription = observable.Subscribe(
item => Console.WriteLine("OnNext: {0}", item.Key),
ex => Console.WriteLine("OnError: {0}", ex.Message),
() => Console.WriteLine("OnComplete: {0}"));*/
}
else
{
Console.WriteLine("mybucket does not exist");
}
}
// todo 下载
public async Task DownLoadObject(string bucketName, string objectKey, string localDir, string objectETag)
{
var index = objectKey.LastIndexOf("/", StringComparison.Ordinal);
if (index > 0)
{
var dir = Path.Combine(localDir, objectKey.Substring(0, index));
if (!Directory.Exists(dir))
{
Directory.CreateDirectory(dir);
}
}
var localPath = Path.Combine(localDir, objectKey.Replace('/', Path.DirectorySeparatorChar));
var getArgs = new GetObjectArgs()
.WithBucket(string.IsNullOrEmpty(bucketName) ? _bucketName : bucketName)
.WithObject(objectKey)
.WithFile(localPath);
await _minioClient.GetObjectAsync(getArgs);
if (VerifyETag(localPath, objectETag))
{
// todo 先忽略处理
}
Console.WriteLine($"{objectKey} Download complete");
}
// 差异检测:通过 ETag 和修改时间对比‌
private async Task<bool> NeedSyncAsync(string bucket, string objectName, string localPath)
{
if (!File.Exists(localPath)) return true;
var remoteMeta = await GetObjectMetadata(bucket, objectName);
var localInfo = new FileInfo(localPath);
return localInfo.Length != remoteMeta.Size ||
localInfo.LastWriteTimeUtc < remoteMeta.LastModified ||
!VerifyETag(localPath, remoteMeta.ETag);
}
// 获取远程对象元数据‌
public async Task<(long Size, DateTime LastModified, string ETag)> GetObjectMetadata(string bucket,
string objectName)
{
var args = new StatObjectArgs().WithBucket(bucket).WithObject(objectName);
var stat = await _minioClient.StatObjectAsync(args);
return (stat.Size, stat.LastModified, stat.ETag);
}
// 校验本地文件 ETagMinIO 使用 MD5
public bool VerifyETag(string filePath, string remoteETag)
{
using var md5 = MD5.Create();
using var stream = File.OpenRead(filePath);
var localHash = BitConverter.ToString(md5.ComputeHash(stream)).Replace("-", "").ToLower();
return remoteETag.Trim('"').Equals(localHash, StringComparison.OrdinalIgnoreCase);
}
// 实时监听方法
public async Task RealTimeListen(string bucketName, string localDir, string prefix = "", string suffix = "")
{
try
{
var events = new List<EventType> { EventType.ObjectCreatedAll };
ListenBucketNotificationsArgs args = new ListenBucketNotificationsArgs()
.WithBucket(bucketName)
.WithEvents(events);
IObservable<MinioNotificationRaw> observable = _minioClient.ListenBucketNotificationsAsync(args);
IDisposable subscription = observable.Subscribe(
async notification =>
{
Console.WriteLine($"Received notification: {notification.Json}");
var obj = JObject.Parse(notification.Json);
// s3:ObjectCreated:Put
var eventType = obj["Records"]?[0]?["eventName"]?.Value<string>();
var objectSize = obj["Records"]?[0]?["s3"]?["object"]?["size"]?.Value<long>();
var objectKey = obj["Records"]?[0]?["s3"]?["object"]?["key"]?.Value<string>();
var objectETag = obj["Records"]?[0]?["s3"]?["object"]?["eTag"]?.Value<string>();
await DownLoadObject(bucketName, objectKey, localDir, objectETag);
},
ex => Console.WriteLine($"OnError: {ex}"),
() => Console.WriteLine($"Stopped listening for bucket notifications\n"));
}
catch (MinioException e)
{
Console.WriteLine("Error occurred: " + e);
}
}
}
}