Compare commits

..

No commits in common. "2cf6d612739ee072643c6ba82f6a2dccdc865a3d" and "821658b06aa802bbfa957c61ec0633e35a3f498e" have entirely different histories.

4 changed files with 13 additions and 43 deletions

View File

@ -40,12 +40,8 @@ namespace HeBianGu.App.Disk
if (File.Exists("./settings.xml")) if (File.Exists("./settings.xml"))
{ {
var setting = xmlSerializerService.Load<SystemSetting>("./settings.xml"); var setting = xmlSerializerService.Load<SystemSetting>("./settings.xml");
if (!string.IsNullOrEmpty(setting.SyncDir)) if (!string.IsNullOrEmpty(setting.SyncDir) && setting.TaskCount != null)
{ {
if (setting.TaskCount == 0 )
{
setting.TaskCount = 3;
}
minioService.MirrorAsync1(bucketName, setting.SyncDir, setting.TaskCount); minioService.MirrorAsync1(bucketName, setting.SyncDir, setting.TaskCount);
minioService.RealTimeListen(bucketName, setting.SyncDir); minioService.RealTimeListen(bucketName, setting.SyncDir);
} }

View File

@ -87,9 +87,7 @@
<ItemGroup> <ItemGroup>
<None Remove="global.json" /> <None Remove="global.json" />
<Content Include="global.json"> <EditorConfigFiles Include="global.json" />
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</Content>
</ItemGroup> </ItemGroup>
</Project> </Project>

View File

@ -98,13 +98,13 @@ namespace Hopetry.Services
/// <param name="maxParallel"></param> /// <param name="maxParallel"></param>
public async Task MirrorAsync1(string bucket, string localDir, int maxParallel = 5) public async Task MirrorAsync1(string bucket, string localDir, int maxParallel = 5)
{ {
Channel<(string ObjectName, string ETag)> SyncChannel = Channel.CreateBounded<(string, string)>(
new BoundedChannelOptions(10000) Channel<(string ObjectName, string ETag)> SyncChannel = Channel.CreateBounded<(string, string)>(new BoundedChannelOptions(10000)
{ {
SingleWriter = false, SingleWriter = false,
SingleReader = false, SingleReader = false,
FullMode = BoundedChannelFullMode.Wait FullMode = BoundedChannelFullMode.Wait
}); });
var count = 0; var count = 0;
var producerTasks = new List<Task>(); var producerTasks = new List<Task>();
var listArgs = new ListObjectsArgs() var listArgs = new ListObjectsArgs()
@ -314,24 +314,10 @@ namespace Hopetry.Services
// 实时监听方法 // 实时监听方法
public async Task RealTimeListen(string bucketName, string localDir, string prefix = "", string suffix = "") public async Task RealTimeListen(string bucketName, string localDir, string prefix = "", string suffix = "")
{ {
var downloadQueue =
Channel.CreateBounded<(string bucketName, string objectKey, string localDir, string objectETag)>(
new BoundedChannelOptions(1000)
{
FullMode = BoundedChannelFullMode.Wait
});
Task.Run(async () =>
{
await Parallel.ForEachAsync(downloadQueue.Reader.ReadAllAsync(),
new ParallelOptions { MaxDegreeOfParallelism = 5 },
async (item, _) =>
{
await DownLoadObject(item.bucketName, item.objectKey, item.localDir, item.objectETag);
});
});
try try
{ {
var events = new List<EventType> { EventType.ObjectCreatedAll }; var events = new List<EventType> { EventType.ObjectCreatedAll };
ListenBucketNotificationsArgs args = new ListenBucketNotificationsArgs() ListenBucketNotificationsArgs args = new ListenBucketNotificationsArgs()
.WithBucket(bucketName) .WithBucket(bucketName)
.WithEvents(events); .WithEvents(events);
@ -347,9 +333,8 @@ namespace Hopetry.Services
var objectSize = obj["Records"]?[0]?["s3"]?["object"]?["size"]?.Value<long>(); var objectSize = obj["Records"]?[0]?["s3"]?["object"]?["size"]?.Value<long>();
var objectKey = obj["Records"]?[0]?["s3"]?["object"]?["key"]?.Value<string>(); var objectKey = obj["Records"]?[0]?["s3"]?["object"]?["key"]?.Value<string>();
var objectETag = obj["Records"]?[0]?["s3"]?["object"]?["eTag"]?.Value<string>(); var objectETag = obj["Records"]?[0]?["s3"]?["object"]?["eTag"]?.Value<string>();
Console.WriteLine("将要下载的Key:" + objectKey);
downloadQueue.Writer.TryWrite((bucketName, objectKey, localDir, objectETag)); await DownLoadObject(bucketName, objectKey, localDir, objectETag);
//await DownLoadObject(bucketName, objectKey, localDir, objectETag);
}, },
ex => Console.WriteLine($"OnError: {ex}"), ex => Console.WriteLine($"OnError: {ex}"),
() => Console.WriteLine($"Stopped listening for bucket notifications\n")); () => Console.WriteLine($"Stopped listening for bucket notifications\n"));

View File

@ -1,4 +1,3 @@
/*
{ {
"Minio": { "Minio": {
"Endpoint": "123.132.248.154:9107", "Endpoint": "123.132.248.154:9107",
@ -6,12 +5,4 @@
"SecretKey": "66GYn0x1XAEInSa9wdCutzvUWKfhH1EhqxPJ6a9u", "SecretKey": "66GYn0x1XAEInSa9wdCutzvUWKfhH1EhqxPJ6a9u",
"BucketName": "demo" "BucketName": "demo"
} }
}*/
{
"Minio": {
"Endpoint": "192.168.10.163:9016",
"AccessKey": "I2c35jD6ayApaneyQZyC",
"SecretKey": "XHlrNeCHK0xf8y2Fo0K5OKyDeaI2ItfEsFbzQPFk",
"BucketName": "demo"
}
} }