diff --git a/App.xaml b/App.xaml index 05fa19d..b8b83f0 100644 --- a/App.xaml +++ b/App.xaml @@ -2,10 +2,12 @@ xmlns="http://schemas.microsoft.com/winfx/2006/xaml/presentation" xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml" xmlns:h="https://github.com/HeBianGu" - xmlns:local="clr-namespace:HeBianGu.App.Disk"> + xmlns:local="clr-namespace:HeBianGu.App.Disk" + xmlns:provider="clr-namespace:Hopetry.Provider"> + diff --git a/App.xaml.cs b/App.xaml.cs index 3bd1d47..b2ad047 100644 --- a/App.xaml.cs +++ b/App.xaml.cs @@ -8,6 +8,7 @@ using HeBianGu.Service.Mvp; using HeBianGu.Systems.About; using HeBianGu.Systems.Identity; using HeBianGu.Systems.Setting; +using Hopetry.Services; namespace HeBianGu.App.Disk { @@ -18,14 +19,20 @@ namespace HeBianGu.App.Disk { protected override MainWindowBase CreateMainWindow(StartupEventArgs e) { - // 主页面 return new ShellWindow(); } protected override void ConfigureServices(IServiceCollection services) { base.ConfigureServices(services); - + // 本地持久化 + services.AddXmlSerialize(); + // minio 服务 + services.AddSingleton(); + var minioService = services.GetService(); + var bucketName = minioService._bucketName; + minioService.MirrorAsync1(bucketName, "d:/test"); + minioService.RealTimeListen(bucketName, "d:/test"); services.AddStart(x => { x.ProductFontSize = 90; @@ -44,6 +51,7 @@ namespace HeBianGu.App.Disk // services.AddIdentity(x => x.ProductFontSize = 50); services.AddMvc(); //services.AddProjectDefault(); + services.AddXmlWebSerializerService(); //// Do :注册软件更新页面 /* services.AddAutoUpgrade(x => @@ -52,9 +60,9 @@ namespace HeBianGu.App.Disk x.UseIEDownload = true; });*/ services.AddSettingPath(); - services.AddXmlSerialize(); services.AddDESCryptService(); services.AddPrintBoxMessage(); + // ??? services.AddWindowExplorer(); #region - More - @@ -83,7 +91,9 @@ namespace HeBianGu.App.Disk #region - WindowCaption - + // services.AddLoginViewPresenter(); + // 引导功能按钮 services.AddGuideViewPresenter(); services.AddHideWindowViewPresenter(); services.AddSettingViewPrenter(); @@ -112,10 +122,10 @@ namespace HeBianGu.App.Disk { base.Configure(app); app.UseStyle(); - // Do:设置默认主题 app.UseLocalTheme(l => { + // 主题色 l.AccentColor = (Color)ColorConverter.ConvertFromString("#FF0093FF"); l.DefaultFontSize = 13D; l.FontSize = FontSize.Normal; diff --git a/Controller/SyncController.cs b/Controller/SyncController.cs new file mode 100644 index 0000000..833090e --- /dev/null +++ b/Controller/SyncController.cs @@ -0,0 +1,15 @@ +using HeBianGu.App.Disk.ViewModel.Sync; +using HeBianGu.Base.WpfBase; +using HeBianGu.Service.Mvc; + +namespace HeBianGu.App.Disk; + +// control 特性名好像绑定了 xaml +[Controller("Sync")] +public class SyncController : Controller +{ + public async Task Sync() + { + return await ViewAsync(); + } +} \ No newline at end of file diff --git a/Hopetry.csproj b/Hopetry.csproj index 51152c1..262e6f0 100644 --- a/Hopetry.csproj +++ b/Hopetry.csproj @@ -36,6 +36,7 @@ + @@ -55,6 +56,7 @@ + @@ -65,4 +67,12 @@ + + + MSBuild:Compile + Wpf + Designer + + + diff --git a/Provider/DataSourceLocator.cs b/Provider/DataSourceLocator.cs new file mode 100644 index 0000000..4f6b73c --- /dev/null +++ b/Provider/DataSourceLocator.cs @@ -0,0 +1,15 @@ +using HeBianGu.App.Disk.ViewModel.Sync; +using HeBianGu.Base.WpfBase; + +namespace Hopetry.Provider +{ + internal class DataSourceLocator + { + public DataSourceLocator() + { + ServiceRegistry.Instance.Register(); + } + + public SyncViewModel SyncViewModel => ServiceRegistry.Instance.GetInstance(); + } +} \ No newline at end of file diff --git a/Services/MinioService.cs b/Services/MinioService.cs index f893804..a295a6e 100644 --- a/Services/MinioService.cs +++ b/Services/MinioService.cs @@ -1,15 +1,14 @@ using System.Collections.Concurrent; +using System.Globalization; using System.IO; using System.Security.Cryptography; using System.Threading.Channels; -using FileUploader.Models; using Microsoft.Extensions.Configuration; using Minio; -using Minio.ApiEndpoints; -using Minio.DataModel; using Minio.DataModel.Args; using Minio.DataModel.Notification; using Minio.Exceptions; +using Newtonsoft.Json.Linq; using Polly; using Polly.Retry; @@ -17,10 +16,31 @@ namespace Hopetry.Services { public partial class MinioService { - private readonly IMinioClient _minioClient; + private IMinioClient _minioClient; - private readonly string _bucketName = null; + public readonly string _bucketName = null; + public MinioService() + { + + var builder = new ConfigurationBuilder() + .SetBasePath(Directory.GetCurrentDirectory()) + .AddJsonFile("global.json", optional: false, reloadOnChange: true); + // 构建配置 + var config = builder.Build(); + var minioConfig = config.GetSection("Minio"); + _minioClient = new MinioClient() + .WithEndpoint(minioConfig["Endpoint"]) + .WithCredentials(minioConfig["AccessKey"], minioConfig["SecretKey"]).Build(); + _bucketName = minioConfig["BucketName"]!; + + /*_minioClient = new MinioClient() + .WithEndpoint("123.132.248.154:9107") + .WithCredentials("oZNgo25pNXnKFV9oKGh4", "66GYn0x1XAEInSa9wdCutzvUWKfhH1EhqxPJ6a9u") + .Build();*/ + + EnsureBucketExistsAsync(_bucketName).Wait(); + } AsyncRetryPolicy policy = Policy .Handle() @@ -33,7 +53,7 @@ namespace Hopetry.Services }); // 使用 Channel 实现生产者-消费者模式 - private static readonly Channel<(string ObjectName, string LocalPath)> _syncChannel = + private static readonly Channel<(string ObjectName, string ETag)> SyncChannel = Channel.CreateBounded<(string, string)>(new BoundedChannelOptions(10000) { SingleWriter = false, @@ -41,25 +61,66 @@ namespace Hopetry.Services FullMode = BoundedChannelFullMode.Wait }); + public async Task ListObject(string bucketName) + { + var listArgs = new ListObjectsArgs() + .WithBucket(bucketName) + .WithVersions(true) + .WithRecursive(true); + + try + { + var observable = _minioClient.ListObjectsEnumAsync(listArgs); + //observable.ConfigureAwait(false); + await foreach (var item in observable) + { + Console.WriteLine("=============="); + Console.WriteLine(item.Key); + //Console.WriteLine(item.IsDir); + Console.WriteLine(item.LastModified); + Console.WriteLine(item.Size); + Console.WriteLine(item.ETag); + Console.WriteLine(item.VersionId); + Console.WriteLine("=============="); + } + } + catch (Exception e) + { + Console.WriteLine(e); + Console.WriteLine("我抛出的"); + } + } + /// /// 同步下载 todo 需要合并另外一个同步方法 /// /// /// /// - public async Task MirrorAsync1(string bucket, string localDir, int maxParallel = 16) + public async Task MirrorAsync1(string bucket, string localDir, int maxParallel = 5) { + var count = 0; var producerTasks = new List(); var listArgs = new ListObjectsArgs() .WithBucket(bucket) .WithRecursive(true); - // 预先生成本地文件索引(大幅减少 File.Exists 调用) 基于字典 var localFileIndex = await BuildLocalFileIndex(localDir); - var x = policy.ExecuteAsync( - () => Task.FromResult(_minioClient.ListObjectsEnumAsync(listArgs))); - await foreach (var item in x.Result) + //var x = policy.ExecuteAsync(() => Task.FromResult(_minioClient.ListObjectsEnumAsync(listArgs))); + var observable = _minioClient.ListObjectsEnumAsync(listArgs); + + await foreach (var item in observable) { + var index = item.Key.LastIndexOf("/", StringComparison.Ordinal); + if (index > 0) + { + var dir = Path.Combine(localDir, item.Key.Substring(0, index)); + if (!Directory.Exists(dir)) + { + Directory.CreateDirectory(dir); + } + } + producerTasks.Add(Task.Run(async () => { var localPath = Path.Combine(localDir, item.Key.Replace('/', Path.DirectorySeparatorChar)); @@ -67,44 +128,39 @@ namespace Hopetry.Services if (!localFileIndex.TryGetValue(localPath, out var localMeta)) { // 如果不存在,则加入下载队列 - await _syncChannel.Writer.WriteAsync((item.Key, localPath)); + await SyncChannel.Writer.WriteAsync((item.Key, item.ETag)); } - - // 并行获取远程元数据(避免串行等待) - var remoteMetaTask = GetObjectMetadata(bucket, item.Key); - // 对比本地缓存元数据 - if ((ulong)localMeta.Size != item.Size || localMeta.LastModified.CompareTo(item.LastModified) < 0) + else { - var remoteMeta = await remoteMetaTask; - if (!VerifyETag(localPath, remoteMeta.ETag)) + DateTime itemLastModified = + DateTime.Parse(item.LastModified, null, DateTimeStyles.RoundtripKind); + if ((ulong)localMeta.Size != item.Size || localMeta.LastModified < itemLastModified || + !VerifyETag(localPath, item.ETag)) { - await _syncChannel.Writer.WriteAsync((item.Key, localPath)); + await SyncChannel.Writer.WriteAsync((item.Key, item.ETag)); } } })); } + await Task.WhenAll(producerTasks); + SyncChannel.Writer.Complete(); // 关键步骤! // 阶段2: 并行消费下载任务 var consumerTask = Task.Run(async () => { - await Parallel.ForEachAsync(_syncChannel.Reader.ReadAllAsync(), + await Parallel.ForEachAsync(SyncChannel.Reader.ReadAllAsync(), new ParallelOptions { MaxDegreeOfParallelism = maxParallel }, async (item, _) => { - // 经分析 SDK 内部已经做了临时文件处理,不必再画蛇添足 - await _minioClient.GetObjectAsync( - new GetObjectArgs() - .WithBucket(bucket) - .WithObject(item.ObjectName) - .WithFile(item.LocalPath)); - // todo 可能需要添加文件校验 + count++; + await DownLoadObject(bucket, item.ObjectName, localDir, item.ETag); } - ); + ); }); - await Task.WhenAll(producerTasks); - _syncChannel.Writer.Complete(); // 关键步骤! + // 等待消费完成 await consumerTask; + Console.WriteLine($"{count}个文件下载完成"); } /// @@ -118,31 +174,24 @@ namespace Hopetry.Services var index = new ConcurrentDictionary(StringComparer.OrdinalIgnoreCase); await Parallel.ForEachAsync(Directory.EnumerateFiles(rootDir, "*", SearchOption.AllDirectories), - async (file, _) => + async (path, _) => { - var info = new FileInfo(file); - index.TryAdd(file, (info.Length, info.LastWriteTimeUtc)); + var info = new FileInfo(path); + index.TryAdd(path, (info.Length, info.LastWriteTimeUtc)); }); return index; } - public MinioService(IConfiguration config) - { - var minioConfig = config.GetSection("Minio"); - _minioClient = new MinioClient() - .WithEndpoint(minioConfig["Endpoint"]) - .WithCredentials(minioConfig["AccessKey"], minioConfig["SecretKey"]); - _bucketName = minioConfig["BucketName"]!; - EnsureBucketExistsAsync().Wait(); - } - private async Task EnsureBucketExistsAsync() + public async Task EnsureBucketExistsAsync(string bucketName) { - var existsArgs = new BucketExistsArgs().WithBucket(_bucketName); + var existsArgs = new BucketExistsArgs().WithBucket(bucketName); + var x = await _minioClient.BucketExistsAsync(existsArgs); + Console.WriteLine($" {bucketName} exist status: " + x); // 如果存储桶不存在,则创建存储桶 - if (!await _minioClient.BucketExistsAsync(existsArgs)) + if (!x) { - var makeArgs = new MakeBucketArgs().WithBucket(_bucketName); + var makeArgs = new MakeBucketArgs().WithBucket(bucketName); await _minioClient.MakeBucketAsync(makeArgs); } } @@ -151,7 +200,7 @@ namespace Hopetry.Services /// 上伟文件 /// /// - public async Task UploadFileAsync(FileRecord fileRecord) + /*public async Task UploadFileAsync(FileRecord fileRecord) { var putArgs = new PutObjectArgs() .WithBucket(_bucketName) @@ -160,70 +209,73 @@ namespace Hopetry.Services // application/zip .WithContentType("application/octet-stream"); await _minioClient.PutObjectAsync(putArgs); - } + }*/ /// /// 列出存储桶内所有文件 /// /// - public async Task ListAllObject(string bucket) + public async Task ListAllObject() { - try + // Just list of objects + // Check whether 'mybucket' exists or not. + var existsArgs = new BucketExistsArgs().WithBucket(_bucketName); + bool found = await _minioClient.BucketExistsAsync(existsArgs); + if (found) { - // Just list of objects - // Check whether 'mybucket' exists or not. - var existsArgs = new BucketExistsArgs().WithBucket(_bucketName); - bool found = await _minioClient.BucketExistsAsync(existsArgs); - if (found) - { - // List objects from 'my-bucketname' - ListObjectsArgs args = new ListObjectsArgs() - .WithBucket("mybucket") - .WithPrefix("prefix") - .WithRecursive(true); - // ListObjectsEnumAsync 新方法 - IObservable observable = _minioClient.ListObjectsAsync(args); - IDisposable subscription = observable.Subscribe( - item => Console.WriteLine("OnNext: {0}", item.Key), - ex => Console.WriteLine("OnError: {0}", ex.Message), - () => Console.WriteLine("OnComplete: {0}")); - } - else - { - Console.WriteLine("mybucket does not exist"); - } + // List objects from 'my-bucketname' + ListObjectsArgs args = new ListObjectsArgs() + .WithBucket(_bucketName) + .WithRecursive(false); + // ListObjectsEnumAsync 新方法 + var observable = _minioClient.ListObjectsEnumAsync(args); + + var x = observable.GetAsyncEnumerator(); + await x.MoveNextAsync(); + Console.WriteLine(x.Current.Key); + Console.WriteLine(x.Current.Size); + Console.WriteLine(x.Current.ETag); + Console.WriteLine(x.Current.LastModified); + Console.WriteLine(x.Current.IsDir); + /*IDisposable subscription = observable.Subscribe( + item => Console.WriteLine("OnNext: {0}", item.Key), + ex => Console.WriteLine("OnError: {0}", ex.Message), + () => Console.WriteLine("OnComplete: {0}"));*/ } - catch (MinioException e) + else { - Console.WriteLine("Error occurred: " + e); + Console.WriteLine("mybucket does not exist"); } } - public async Task ListenBucket(string bucketName, string prefix, string suffix) + // todo 下载 + public async Task DownLoadObject(string bucketName, string objectKey, string localDir, string objectETag) { - try + var index = objectKey.LastIndexOf("/", StringComparison.Ordinal); + if (index > 0) { - var events = new List { EventType.ObjectCreatedAll }; - - ListenBucketNotificationsArgs args = new ListenBucketNotificationsArgs() - .WithBucket(bucketName) - .WithEvents(events) - .WithPrefix(prefix) - .WithSuffix(suffix); - IObservable observable = _minioClient.ListenBucketNotificationsAsync(args); - - IDisposable subscription = observable.Subscribe( - notification => Console.WriteLine($"Notification: {notification.Json}"), - ex => Console.WriteLine($"OnError: {ex}"), - () => Console.WriteLine($"Stopped listening for bucket notifications\n")); + var dir = Path.Combine(localDir, objectKey.Substring(0, index)); + if (!Directory.Exists(dir)) + { + Directory.CreateDirectory(dir); + } } - catch (MinioException e) + + var localPath = Path.Combine(localDir, objectKey.Replace('/', Path.DirectorySeparatorChar)); + var getArgs = new GetObjectArgs() + .WithBucket(string.IsNullOrEmpty(bucketName) ? _bucketName : bucketName) + .WithObject(objectKey) + .WithFile(localPath); + await _minioClient.GetObjectAsync(getArgs); + if (VerifyETag(localPath, objectETag)) { - Console.WriteLine("Error occurred: " + e); + // todo 先忽略处理 } + + Console.WriteLine($"{objectKey} Download complete"); } - // 差异检测:通过 ETag 和修改时间对比‌:ml-citation{ref="1,4" data="citationList"} + // 差异检测:通过 ETag 和修改时间对比‌ private async Task NeedSyncAsync(string bucket, string objectName, string localPath) { if (!File.Exists(localPath)) return true; @@ -236,7 +288,7 @@ namespace Hopetry.Services !VerifyETag(localPath, remoteMeta.ETag); } - // 获取远程对象元数据‌:ml-citation{ref="1" data="citationList"} + // 获取远程对象元数据‌ public async Task<(long Size, DateTime LastModified, string ETag)> GetObjectMetadata(string bucket, string objectName) { @@ -245,7 +297,7 @@ namespace Hopetry.Services return (stat.Size, stat.LastModified, stat.ETag); } - // 校验本地文件 ETag(MinIO 使用 MD5)‌:ml-citation{ref="1,7" data="citationList"} + // 校验本地文件 ETag(MinIO 使用 MD5) public bool VerifyETag(string filePath, string remoteETag) { using var md5 = MD5.Create(); @@ -254,48 +306,38 @@ namespace Hopetry.Services return remoteETag.Trim('"').Equals(localHash, StringComparison.OrdinalIgnoreCase); } - public async Task MirrorAsync(string bucket, string localDir, int maxParallel = 8) + // 实时监听方法 + public async Task RealTimeListen(string bucketName, string localDir, string prefix = "", string suffix = "") { - var objects = - _minioClient.ListObjectsEnumAsync(new ListObjectsArgs().WithBucket(bucket).WithRecursive(true)); - var queue = new ConcurrentQueue<(string ObjectName, string LocalPath)>(); - - // 差异检测阶段‌:ml-citation{ref="1,4" data="citationList"} - await foreach (var item in objects) + try { - if (item.IsDir) continue; + var events = new List { EventType.ObjectCreatedAll }; - var localPath = Path.Combine(localDir, item.Key.Replace('/', Path.DirectorySeparatorChar)); - Directory.CreateDirectory(Path.GetDirectoryName(localPath)); + ListenBucketNotificationsArgs args = new ListenBucketNotificationsArgs() + .WithBucket(bucketName) + .WithEvents(events); - if (await NeedSyncAsync(bucket, item.Key, localPath)) - queue.Enqueue((item.Key, localPath)); + IObservable observable = _minioClient.ListenBucketNotificationsAsync(args); + IDisposable subscription = observable.Subscribe( + async notification => + { + Console.WriteLine($"Received notification: {notification.Json}"); + var obj = JObject.Parse(notification.Json); + // s3:ObjectCreated:Put + var eventType = obj["Records"]?[0]?["eventName"]?.Value(); + var objectSize = obj["Records"]?[0]?["s3"]?["object"]?["size"]?.Value(); + var objectKey = obj["Records"]?[0]?["s3"]?["object"]?["key"]?.Value(); + var objectETag = obj["Records"]?[0]?["s3"]?["object"]?["eTag"]?.Value(); + + await DownLoadObject(bucketName, objectKey, localDir, objectETag); + }, + ex => Console.WriteLine($"OnError: {ex}"), + () => Console.WriteLine($"Stopped listening for bucket notifications\n")); + } + catch (MinioException e) + { + Console.WriteLine("Error occurred: " + e); } - - // 并行下载阶段‌:ml-citation{ref="6" data="citationList"} - await Parallel.ForEachAsync(queue, new ParallelOptions { MaxDegreeOfParallelism = maxParallel }, - async (item, _) => - { - var (objectName, localPath) = item; - var tempFile = localPath + ".tmp"; - - try - { - await _minioClient.GetObjectAsync( - new GetObjectArgs() - .WithBucket(bucket) - .WithObject(objectName) - .WithFile(tempFile)); - - File.Move(tempFile, localPath, - overwrite: true); // 原子替换‌:ml-citation{ref="1" data="citationList"} - File.SetLastWriteTimeUtc(localPath, DateTime.UtcNow); - } - finally - { - if (File.Exists(tempFile)) File.Delete(tempFile); - } - }); } } } \ No newline at end of file diff --git a/View/ShellWindow.xaml b/View/ShellWindow.xaml index 16f7905..114dad3 100644 --- a/View/ShellWindow.xaml +++ b/View/ShellWindow.xaml @@ -31,5 +31,6 @@ DisplayName="网盘空间" Logo="{x:Null}" /> + diff --git a/View/ShellWindow.xaml.cs b/View/ShellWindow.xaml.cs index 792544b..1a24146 100644 --- a/View/ShellWindow.xaml.cs +++ b/View/ShellWindow.xaml.cs @@ -1,13 +1,17 @@ -namespace HeBianGu.App.Disk +using Hopetry.Services; + +namespace HeBianGu.App.Disk { /// /// MainWindow.xaml 的交互逻辑 /// public partial class ShellWindow { + private readonly MinioService _minioService; + public ShellWindow() { InitializeComponent(); } } -} +} \ No newline at end of file diff --git a/View/Sync/SyncControl.xaml b/View/Sync/SyncControl.xaml new file mode 100644 index 0000000..b191743 --- /dev/null +++ b/View/Sync/SyncControl.xaml @@ -0,0 +1,100 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + D:\迅雷云盘 + + + + + 1 + 2 + 3 + 4 + 5 + + + + + + + \ No newline at end of file diff --git a/View/Sync/SyncControl.xaml.cs b/View/Sync/SyncControl.xaml.cs new file mode 100644 index 0000000..d3afc30 --- /dev/null +++ b/View/Sync/SyncControl.xaml.cs @@ -0,0 +1,28 @@ +using System.Windows; +using System.Windows.Controls; +using Hopetry.Provider; + +namespace Hopetry.View.Sync; + +public partial class SyncControl : UserControl +{ + public SyncControl() + { + InitializeComponent(); + } + + public event RoutedEventHandler OKClicked; + public event RoutedEventHandler CancelClicked; + + + + private void btnOK_Click(object sender, RoutedEventArgs e) + { + OKClicked?.Invoke(this, e); + } + + private void btnCancel_Click(object sender, RoutedEventArgs e) + { + CancelClicked?.Invoke(this, e); + } +} \ No newline at end of file diff --git a/ViewModel/Sync/SyncViewModel.cs b/ViewModel/Sync/SyncViewModel.cs new file mode 100644 index 0000000..7254f80 --- /dev/null +++ b/ViewModel/Sync/SyncViewModel.cs @@ -0,0 +1,36 @@ +using System.Windows; +using System.Windows.Threading; +using HeBianGu.Service.Mvc; +using Hopetry.Provider; + +namespace HeBianGu.App.Disk.ViewModel.Sync; + +[ViewModel("Sync")] +public class SyncViewModel : MvcViewModelBase +{ + protected override void Init() + { + /*LinkActions.Add(new LinkAction() { Action = "Space", Controller = "Loyout", DisplayName = "会话", Logo = "\xe613" }); + + Application.Current.Dispatcher.BeginInvoke(DispatcherPriority.Loaded, new Action(() => + { + SelectLink = LinkActions[0]; + }));*/ + } + + protected override void Loaded(string args) + { + } + public string _syncDir; + + public string SyncDir + { + get => _syncDir; + set + { + _syncDir = value; + RaisePropertyChanged(); + } + } + +} \ No newline at end of file diff --git a/global.json b/global.json index 6e8a1cd..07f3ce9 100644 --- a/global.json +++ b/global.json @@ -1,8 +1,8 @@ { "Minio": { - "Endpoint": "192.168.10.141:9000", - "AccessKey": "kr4Lr0v01uXVDP08PDUn", - "SecretKey": "mOtYBfIg9UwDVLI8HnXofemYdZfe7A11fUTEf4La", - "BucketName": "test" + "Endpoint": "123.132.248.154:9107", + "AccessKey": "oZNgo25pNXnKFV9oKGh4", + "SecretKey": "66GYn0x1XAEInSa9wdCutzvUWKfhH1EhqxPJ6a9u", + "BucketName": "demo" } } \ No newline at end of file