1. 添加json引用

2. 完善minio应用
dev
陈伟 2025-03-21 11:22:17 +08:00
parent 0f6cf2c2df
commit dcb72e2183
2 changed files with 163 additions and 133 deletions

View File

@ -49,12 +49,12 @@
<PackageReference Include="Microsoft.Extensions.Configuration" Version="9.0.3" />
<PackageReference Include="Microsoft.Extensions.Configuration.Abstractions" Version="9.0.3" />
<PackageReference Include="Microsoft.Extensions.Configuration.FileExtensions" Version="9.0.3" />
<PackageReference Include="Microsoft.Extensions.Configuration.Json" Version="9.0.3" />
<PackageReference Include="Microsoft.Extensions.Logging" Version="9.0.3" />
<PackageReference Include="Minio" Version="6.0.4" />
<PackageReference Include="ModernWpfUI" Version="0.9.6" />
<PackageReference Include="Moq" Version="4.20.72" />
<PackageReference Include="MSTest.TestFramework" Version="3.8.2" />
<PackageReference Include="Newtonsoft.Json" Version="13.0.3" />
<PackageReference Include="NLog" Version="5.4.0" />
<PackageReference Include="Polly" Version="8.5.2" />
<PackageReference Include="WPF-UI" Version="4.0.2" />

View File

@ -1,15 +1,13 @@
using System.Collections.Concurrent;
using System.Globalization;
using System.IO;
using System.Security.Cryptography;
using System.Threading.Channels;
using FileUploader.Models;
using Microsoft.Extensions.Configuration;
using Minio;
using Minio.ApiEndpoints;
using Minio.DataModel;
using Minio.DataModel.Args;
using Minio.DataModel.Notification;
using Minio.Exceptions;
using Newtonsoft.Json.Linq;
using Polly;
using Polly.Retry;
@ -17,10 +15,20 @@ namespace Hopetry.Services
{
public partial class MinioService
{
private readonly IMinioClient _minioClient;
private IMinioClient _minioClient;
private readonly string _bucketName = null;
public MinioService()
{
_minioClient = new MinioClient()
.WithEndpoint("123.132.248.154:9107")
.WithCredentials("oZNgo25pNXnKFV9oKGh4", "66GYn0x1XAEInSa9wdCutzvUWKfhH1EhqxPJ6a9u")
.Build();
//WithCredentials("minioadmin", "minioadmin").Build();
_bucketName = "demo";
EnsureBucketExistsAsync(_bucketName).Wait();
}
AsyncRetryPolicy policy = Policy
.Handle<MinioException>()
@ -33,7 +41,7 @@ namespace Hopetry.Services
});
// 使用 Channel 实现生产者-消费者模式
private static readonly Channel<(string ObjectName, string LocalPath)> _syncChannel =
private static readonly Channel<(string ObjectName, string ETag)> SyncChannel =
Channel.CreateBounded<(string, string)>(new BoundedChannelOptions(10000)
{
SingleWriter = false,
@ -41,25 +49,66 @@ namespace Hopetry.Services
FullMode = BoundedChannelFullMode.Wait
});
public async Task ListObject(string bucketName)
{
var listArgs = new ListObjectsArgs()
.WithBucket(bucketName)
.WithVersions(true)
.WithRecursive(true);
try
{
var observable = _minioClient.ListObjectsEnumAsync(listArgs);
//observable.ConfigureAwait(false);
await foreach (var item in observable)
{
Console.WriteLine("==============");
Console.WriteLine(item.Key);
//Console.WriteLine(item.IsDir);
Console.WriteLine(item.LastModified);
Console.WriteLine(item.Size);
Console.WriteLine(item.ETag);
Console.WriteLine(item.VersionId);
Console.WriteLine("==============");
}
}
catch (Exception e)
{
Console.WriteLine(e);
Console.WriteLine("我抛出的");
}
}
/// <summary>
/// 同步下载 todo 需要合并另外一个同步方法
/// </summary>
/// <param name="bucket"></param>
/// <param name="localDir"></param>
/// <param name="maxParallel"></param>
public async Task MirrorAsync1(string bucket, string localDir, int maxParallel = 16)
public async Task MirrorAsync1(string bucket, string localDir, int maxParallel = 5)
{
var count = 0;
var producerTasks = new List<Task>();
var listArgs = new ListObjectsArgs()
.WithBucket(bucket)
.WithRecursive(true);
// 预先生成本地文件索引(大幅减少 File.Exists 调用) 基于字典
var localFileIndex = await BuildLocalFileIndex(localDir);
var x = policy.ExecuteAsync(
() => Task.FromResult(_minioClient.ListObjectsEnumAsync(listArgs)));
await foreach (var item in x.Result)
//var x = policy.ExecuteAsync(() => Task.FromResult(_minioClient.ListObjectsEnumAsync(listArgs)));
var observable = _minioClient.ListObjectsEnumAsync(listArgs);
await foreach (var item in observable)
{
var index = item.Key.LastIndexOf("/", StringComparison.Ordinal);
if (index > 0)
{
var dir = Path.Combine(localDir, item.Key.Substring(0, index));
if (!Directory.Exists(dir))
{
Directory.CreateDirectory(dir);
}
}
producerTasks.Add(Task.Run(async () =>
{
var localPath = Path.Combine(localDir, item.Key.Replace('/', Path.DirectorySeparatorChar));
@ -67,44 +116,39 @@ namespace Hopetry.Services
if (!localFileIndex.TryGetValue(localPath, out var localMeta))
{
// 如果不存在,则加入下载队列
await _syncChannel.Writer.WriteAsync((item.Key, localPath));
await SyncChannel.Writer.WriteAsync((item.Key, item.ETag));
}
// 并行获取远程元数据(避免串行等待)
var remoteMetaTask = GetObjectMetadata(bucket, item.Key);
// 对比本地缓存元数据
if ((ulong)localMeta.Size != item.Size || localMeta.LastModified.CompareTo(item.LastModified) < 0)
else
{
var remoteMeta = await remoteMetaTask;
if (!VerifyETag(localPath, remoteMeta.ETag))
DateTime itemLastModified =
DateTime.Parse(item.LastModified, null, DateTimeStyles.RoundtripKind);
if ((ulong)localMeta.Size != item.Size || localMeta.LastModified < itemLastModified ||
!VerifyETag(localPath, item.ETag))
{
await _syncChannel.Writer.WriteAsync((item.Key, localPath));
await SyncChannel.Writer.WriteAsync((item.Key, item.ETag));
}
}
}));
}
await Task.WhenAll(producerTasks);
SyncChannel.Writer.Complete(); // 关键步骤!
// 阶段2: 并行消费下载任务
var consumerTask = Task.Run(async () =>
{
await Parallel.ForEachAsync(_syncChannel.Reader.ReadAllAsync(),
await Parallel.ForEachAsync(SyncChannel.Reader.ReadAllAsync(),
new ParallelOptions { MaxDegreeOfParallelism = maxParallel },
async (item, _) =>
{
// 经分析 SDK 内部已经做了临时文件处理,不必再画蛇添足
await _minioClient.GetObjectAsync(
new GetObjectArgs()
.WithBucket(bucket)
.WithObject(item.ObjectName)
.WithFile(item.LocalPath));
// todo 可能需要添加文件校验
count++;
await DownLoadObject(bucket, item.ObjectName, localDir, item.ETag);
}
);
);
});
await Task.WhenAll(producerTasks);
_syncChannel.Writer.Complete(); // 关键步骤!
// 等待消费完成
await consumerTask;
Console.WriteLine($"{count}个文件下载完成");
}
/// <summary>
@ -118,31 +162,24 @@ namespace Hopetry.Services
var index =
new ConcurrentDictionary<string, (long Size, DateTime LastModified)>(StringComparer.OrdinalIgnoreCase);
await Parallel.ForEachAsync(Directory.EnumerateFiles(rootDir, "*", SearchOption.AllDirectories),
async (file, _) =>
async (path, _) =>
{
var info = new FileInfo(file);
index.TryAdd(file, (info.Length, info.LastWriteTimeUtc));
var info = new FileInfo(path);
index.TryAdd(path, (info.Length, info.LastWriteTimeUtc));
});
return index;
}
public MinioService(IConfiguration config)
{
var minioConfig = config.GetSection("Minio");
_minioClient = new MinioClient()
.WithEndpoint(minioConfig["Endpoint"])
.WithCredentials(minioConfig["AccessKey"], minioConfig["SecretKey"]);
_bucketName = minioConfig["BucketName"]!;
EnsureBucketExistsAsync().Wait();
}
private async Task EnsureBucketExistsAsync()
public async Task EnsureBucketExistsAsync(string bucketName)
{
var existsArgs = new BucketExistsArgs().WithBucket(_bucketName);
var existsArgs = new BucketExistsArgs().WithBucket(bucketName);
var x = await _minioClient.BucketExistsAsync(existsArgs);
Console.WriteLine($" {bucketName} exist status: " + x);
// 如果存储桶不存在,则创建存储桶
if (!await _minioClient.BucketExistsAsync(existsArgs))
if (!x)
{
var makeArgs = new MakeBucketArgs().WithBucket(_bucketName);
var makeArgs = new MakeBucketArgs().WithBucket(bucketName);
await _minioClient.MakeBucketAsync(makeArgs);
}
}
@ -151,7 +188,7 @@ namespace Hopetry.Services
/// 上伟文件
/// </summary>
/// <param name="fileRecord"></param>
public async Task UploadFileAsync(FileRecord fileRecord)
/*public async Task UploadFileAsync(FileRecord fileRecord)
{
var putArgs = new PutObjectArgs()
.WithBucket(_bucketName)
@ -160,70 +197,73 @@ namespace Hopetry.Services
// application/zip
.WithContentType("application/octet-stream");
await _minioClient.PutObjectAsync(putArgs);
}
}*/
/// <summary>
/// 列出存储桶内所有文件
/// </summary>
/// <param name="bucket"></param>
public async Task ListAllObject(string bucket)
public async Task ListAllObject()
{
try
// Just list of objects
// Check whether 'mybucket' exists or not.
var existsArgs = new BucketExistsArgs().WithBucket(_bucketName);
bool found = await _minioClient.BucketExistsAsync(existsArgs);
if (found)
{
// Just list of objects
// Check whether 'mybucket' exists or not.
var existsArgs = new BucketExistsArgs().WithBucket(_bucketName);
bool found = await _minioClient.BucketExistsAsync(existsArgs);
if (found)
{
// List objects from 'my-bucketname'
ListObjectsArgs args = new ListObjectsArgs()
.WithBucket("mybucket")
.WithPrefix("prefix")
.WithRecursive(true);
// ListObjectsEnumAsync 新方法
IObservable<Item> observable = _minioClient.ListObjectsAsync(args);
IDisposable subscription = observable.Subscribe(
item => Console.WriteLine("OnNext: {0}", item.Key),
ex => Console.WriteLine("OnError: {0}", ex.Message),
() => Console.WriteLine("OnComplete: {0}"));
}
else
{
Console.WriteLine("mybucket does not exist");
}
// List objects from 'my-bucketname'
ListObjectsArgs args = new ListObjectsArgs()
.WithBucket(_bucketName)
.WithRecursive(false);
// ListObjectsEnumAsync 新方法
var observable = _minioClient.ListObjectsEnumAsync(args);
var x = observable.GetAsyncEnumerator();
await x.MoveNextAsync();
Console.WriteLine(x.Current.Key);
Console.WriteLine(x.Current.Size);
Console.WriteLine(x.Current.ETag);
Console.WriteLine(x.Current.LastModified);
Console.WriteLine(x.Current.IsDir);
/*IDisposable subscription = observable.Subscribe(
item => Console.WriteLine("OnNext: {0}", item.Key),
ex => Console.WriteLine("OnError: {0}", ex.Message),
() => Console.WriteLine("OnComplete: {0}"));*/
}
catch (MinioException e)
else
{
Console.WriteLine("Error occurred: " + e);
Console.WriteLine("mybucket does not exist");
}
}
public async Task ListenBucket(string bucketName, string prefix, string suffix)
// todo 下载
public async Task DownLoadObject(string bucketName, string objectKey, string localDir, string objectETag)
{
try
var index = objectKey.LastIndexOf("/", StringComparison.Ordinal);
if (index > 0)
{
var events = new List<EventType> { EventType.ObjectCreatedAll };
ListenBucketNotificationsArgs args = new ListenBucketNotificationsArgs()
.WithBucket(bucketName)
.WithEvents(events)
.WithPrefix(prefix)
.WithSuffix(suffix);
IObservable<MinioNotificationRaw> observable = _minioClient.ListenBucketNotificationsAsync(args);
IDisposable subscription = observable.Subscribe(
notification => Console.WriteLine($"Notification: {notification.Json}"),
ex => Console.WriteLine($"OnError: {ex}"),
() => Console.WriteLine($"Stopped listening for bucket notifications\n"));
var dir = Path.Combine(localDir, objectKey.Substring(0, index));
if (!Directory.Exists(dir))
{
Directory.CreateDirectory(dir);
}
}
catch (MinioException e)
var localPath = Path.Combine(localDir, objectKey.Replace('/', Path.DirectorySeparatorChar));
var getArgs = new GetObjectArgs()
.WithBucket(string.IsNullOrEmpty(bucketName) ? _bucketName : bucketName)
.WithObject(objectKey)
.WithFile(localPath);
await _minioClient.GetObjectAsync(getArgs);
if (VerifyETag(localPath, objectETag))
{
Console.WriteLine("Error occurred: " + e);
// todo 先忽略处理
}
Console.WriteLine($"{objectKey} Download complete");
}
// 差异检测:通过 ETag 和修改时间对比‌:ml-citation{ref="1,4" data="citationList"}
// 差异检测:通过 ETag 和修改时间对比‌
private async Task<bool> NeedSyncAsync(string bucket, string objectName, string localPath)
{
if (!File.Exists(localPath)) return true;
@ -236,7 +276,7 @@ namespace Hopetry.Services
!VerifyETag(localPath, remoteMeta.ETag);
}
// 获取远程对象元数据‌:ml-citation{ref="1" data="citationList"}
// 获取远程对象元数据‌
public async Task<(long Size, DateTime LastModified, string ETag)> GetObjectMetadata(string bucket,
string objectName)
{
@ -245,7 +285,7 @@ namespace Hopetry.Services
return (stat.Size, stat.LastModified, stat.ETag);
}
// 校验本地文件 ETagMinIO 使用 MD5:ml-citation{ref="1,7" data="citationList"}
// 校验本地文件 ETagMinIO 使用 MD5
public bool VerifyETag(string filePath, string remoteETag)
{
using var md5 = MD5.Create();
@ -254,48 +294,38 @@ namespace Hopetry.Services
return remoteETag.Trim('"').Equals(localHash, StringComparison.OrdinalIgnoreCase);
}
public async Task MirrorAsync(string bucket, string localDir, int maxParallel = 8)
// 实时监听方法
public async Task RealTimeListen(string bucketName, string localDir, string prefix = "", string suffix = "")
{
var objects =
_minioClient.ListObjectsEnumAsync(new ListObjectsArgs().WithBucket(bucket).WithRecursive(true));
var queue = new ConcurrentQueue<(string ObjectName, string LocalPath)>();
// 差异检测阶段‌:ml-citation{ref="1,4" data="citationList"}
await foreach (var item in objects)
try
{
if (item.IsDir) continue;
var events = new List<EventType> { EventType.ObjectCreatedAll };
var localPath = Path.Combine(localDir, item.Key.Replace('/', Path.DirectorySeparatorChar));
Directory.CreateDirectory(Path.GetDirectoryName(localPath));
ListenBucketNotificationsArgs args = new ListenBucketNotificationsArgs()
.WithBucket(bucketName)
.WithEvents(events);
if (await NeedSyncAsync(bucket, item.Key, localPath))
queue.Enqueue((item.Key, localPath));
IObservable<MinioNotificationRaw> observable = _minioClient.ListenBucketNotificationsAsync(args);
IDisposable subscription = observable.Subscribe(
async notification =>
{
Console.WriteLine($"Received notification: {notification.Json}");
var obj = JObject.Parse(notification.Json);
// s3:ObjectCreated:Put
var eventType = obj["Records"]?[0]?["eventName"]?.Value<string>();
var objectSize = obj["Records"]?[0]?["s3"]?["object"]?["size"]?.Value<long>();
var objectKey = obj["Records"]?[0]?["s3"]?["object"]?["key"]?.Value<string>();
var objectETag = obj["Records"]?[0]?["s3"]?["object"]?["eTag"]?.Value<string>();
await DownLoadObject(bucketName, objectKey, localDir, objectETag);
},
ex => Console.WriteLine($"OnError: {ex}"),
() => Console.WriteLine($"Stopped listening for bucket notifications\n"));
}
catch (MinioException e)
{
Console.WriteLine("Error occurred: " + e);
}
// 并行下载阶段‌:ml-citation{ref="6" data="citationList"}
await Parallel.ForEachAsync(queue, new ParallelOptions { MaxDegreeOfParallelism = maxParallel },
async (item, _) =>
{
var (objectName, localPath) = item;
var tempFile = localPath + ".tmp";
try
{
await _minioClient.GetObjectAsync(
new GetObjectArgs()
.WithBucket(bucket)
.WithObject(objectName)
.WithFile(tempFile));
File.Move(tempFile, localPath,
overwrite: true); // 原子替换‌:ml-citation{ref="1" data="citationList"}
File.SetLastWriteTimeUtc(localPath, DateTime.UtcNow);
}
finally
{
if (File.Exists(tempFile)) File.Delete(tempFile);
}
});
}
}
}