1. 多线程下载增加默认值

2. 实时监听下载优化支持大数量文件下载
dev
陈伟 2025-03-27 10:22:59 +08:00
parent 4ed33ee6e7
commit 774f57d116
4 changed files with 43 additions and 13 deletions

View File

@ -40,8 +40,12 @@ namespace HeBianGu.App.Disk
if (File.Exists("./settings.xml"))
{
var setting = xmlSerializerService.Load<SystemSetting>("./settings.xml");
if (!string.IsNullOrEmpty(setting.SyncDir) && setting.TaskCount != null)
if (!string.IsNullOrEmpty(setting.SyncDir))
{
if (setting.TaskCount == 0 )
{
setting.TaskCount = 3;
}
minioService.MirrorAsync1(bucketName, setting.SyncDir, setting.TaskCount);
minioService.RealTimeListen(bucketName, setting.SyncDir);
}

View File

@ -87,7 +87,9 @@
<ItemGroup>
<None Remove="global.json" />
<EditorConfigFiles Include="global.json" />
<Content Include="global.json">
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</Content>
</ItemGroup>
</Project>

View File

@ -98,13 +98,13 @@ namespace Hopetry.Services
/// <param name="maxParallel"></param>
public async Task MirrorAsync1(string bucket, string localDir, int maxParallel = 5)
{
Channel<(string ObjectName, string ETag)> SyncChannel = Channel.CreateBounded<(string, string)>(new BoundedChannelOptions(10000)
{
SingleWriter = false,
SingleReader = false,
FullMode = BoundedChannelFullMode.Wait
});
Channel<(string ObjectName, string ETag)> SyncChannel = Channel.CreateBounded<(string, string)>(
new BoundedChannelOptions(10000)
{
SingleWriter = false,
SingleReader = false,
FullMode = BoundedChannelFullMode.Wait
});
var count = 0;
var producerTasks = new List<Task>();
var listArgs = new ListObjectsArgs()
@ -314,10 +314,24 @@ namespace Hopetry.Services
// 实时监听方法
public async Task RealTimeListen(string bucketName, string localDir, string prefix = "", string suffix = "")
{
var downloadQueue =
Channel.CreateBounded<(string bucketName, string objectKey, string localDir, string objectETag)>(
new BoundedChannelOptions(1000)
{
FullMode = BoundedChannelFullMode.Wait
});
Task.Run(async () =>
{
await Parallel.ForEachAsync(downloadQueue.Reader.ReadAllAsync(),
new ParallelOptions { MaxDegreeOfParallelism = 5 },
async (item, _) =>
{
await DownLoadObject(item.bucketName, item.objectKey, item.localDir, item.objectETag);
});
});
try
{
var events = new List<EventType> { EventType.ObjectCreatedAll };
ListenBucketNotificationsArgs args = new ListenBucketNotificationsArgs()
.WithBucket(bucketName)
.WithEvents(events);
@ -333,8 +347,9 @@ namespace Hopetry.Services
var objectSize = obj["Records"]?[0]?["s3"]?["object"]?["size"]?.Value<long>();
var objectKey = obj["Records"]?[0]?["s3"]?["object"]?["key"]?.Value<string>();
var objectETag = obj["Records"]?[0]?["s3"]?["object"]?["eTag"]?.Value<string>();
await DownLoadObject(bucketName, objectKey, localDir, objectETag);
Console.WriteLine("将要下载的Key:" + objectKey);
downloadQueue.Writer.TryWrite((bucketName, objectKey, localDir, objectETag));
//await DownLoadObject(bucketName, objectKey, localDir, objectETag);
},
ex => Console.WriteLine($"OnError: {ex}"),
() => Console.WriteLine($"Stopped listening for bucket notifications\n"));

View File

@ -1,3 +1,4 @@
/*
{
"Minio": {
"Endpoint": "123.132.248.154:9107",
@ -5,4 +6,12 @@
"SecretKey": "66GYn0x1XAEInSa9wdCutzvUWKfhH1EhqxPJ6a9u",
"BucketName": "demo"
}
}*/
{
"Minio": {
"Endpoint": "192.168.10.163:9016",
"AccessKey": "I2c35jD6ayApaneyQZyC",
"SecretKey": "XHlrNeCHK0xf8y2Fo0K5OKyDeaI2ItfEsFbzQPFk",
"BucketName": "demo"
}
}