using Minio; using Minio.DataModel.Args; using System; using System.Collections.Concurrent; using System.Diagnostics; using System.IO; using System.Net; using System.Net.Http; using System.Net.Http.Headers; using System.Reflection; using System.Threading; using System.Threading.Tasks; namespace Hopetry.Provider { #region 原先代码 //public class ThrottledHttpClientHandler : HttpClientHandler //{ // private readonly long _maxBytesPerSecond; // private readonly SemaphoreSlim _throttleSemaphore = new SemaphoreSlim(1, 1); // private long _bytesTransferred; // private DateTime _lastCheckTime = DateTime.UtcNow; // public ThrottledHttpClientHandler(long maxBytesPerSecond) // { // _maxBytesPerSecond = maxBytesPerSecond; // // 忽略证书错误(仅测试环境使用) // ServerCertificateCustomValidationCallback = // (message, cert, chain, errors) => true; // } // protected override async Task SendAsync( // HttpRequestMessage request, // CancellationToken cancellationToken) // { // try // { // // 添加请求日志 // Debug.WriteLine($"Sending request: {request.Method} {request.RequestUri}"); // // 仅对分片数据上传请求限速(修改点:直接限速请求内容,而非响应) // if (IsUploadPartRequest(request) && request.Content != null) // { // request.Content = new ThrottledHttpContent(request.Content, _maxBytesPerSecond); // } // var response = await base.SendAsync(request, cancellationToken).ConfigureAwait(false); // // 调试:检查初始化请求的响应 // if (request.Method == HttpMethod.Post && request.RequestUri?.Query?.Contains("uploads=") == true) // { // var rawResponse = response.Content != null // ? await response.Content.ReadAsStringAsync().ConfigureAwait(false) // : "No content in response"; // Debug.WriteLine($"初始化请求响应: {rawResponse}"); // } // return response; // } // catch (Exception ex) // { // Debug.WriteLine($"Request failed: {ex}"); // throw; // } // } // private bool IsUploadPartRequest(HttpRequestMessage request) // { // return request.Method == HttpMethod.Put && // request.RequestUri != null && // request.RequestUri.Query.Contains("uploadId=") && // request.Content?.Headers?.ContentType?.MediaType == "application/octet-stream"; // } //} //public class ThrottledHttpContent : HttpContent //{ // private readonly HttpContent _innerContent; // private readonly long _maxBytesPerSecond; // private readonly Stopwatch _stopwatch = Stopwatch.StartNew(); // private long _totalBytesRead; // private long? _contentLength; // public ThrottledHttpContent(HttpContent innerContent, long maxBytesPerSecond) // { // _innerContent = innerContent ?? throw new ArgumentNullException(nameof(innerContent)); // _maxBytesPerSecond = maxBytesPerSecond; // foreach (var header in _innerContent.Headers) // { // Headers.TryAddWithoutValidation(header.Key, header.Value); // } // } // // 覆盖同步方法,强制调用异步版本(避免直接抛出异常) // protected override void SerializeToStream(Stream stream, TransportContext context, CancellationToken cancellationToken) // { // // 同步调用时,直接运行异步方法并阻塞等待(不推荐,但可以避免异常) // SerializeToStreamAsync(stream, context, cancellationToken).GetAwaiter().GetResult(); // } // protected override Task SerializeToStreamAsync(Stream stream, TransportContext context) // { // return SerializeToStreamAsync(stream, context, CancellationToken.None); // } // protected override async Task SerializeToStreamAsync(Stream stream, TransportContext context, CancellationToken cancellationToken) // { // var buffer = new byte[81920]; // 80KB buffer // using (var contentStream = await _innerContent.ReadAsStreamAsync(cancellationToken).ConfigureAwait(false)) // { // while (true) // { // var bytesRead = await contentStream.ReadAsync(buffer, 0, buffer.Length, cancellationToken).ConfigureAwait(false); // if (bytesRead == 0) break; // await stream.WriteAsync(buffer, 0, bytesRead, cancellationToken).ConfigureAwait(false); // _totalBytesRead += bytesRead; // var elapsedSeconds = _stopwatch.Elapsed.TotalSeconds; // var targetSeconds = _totalBytesRead / (double)_maxBytesPerSecond; // if (elapsedSeconds < targetSeconds) // { // var delayMilliseconds = (int)((targetSeconds - elapsedSeconds) * 1000); // await Task.Delay(delayMilliseconds, cancellationToken).ConfigureAwait(false); // } // } // } // } // protected override bool TryComputeLength(out long length) // { // if (_contentLength.HasValue) // { // length = _contentLength.Value; // return true; // } // length = 0; // return false; // } // protected override void Dispose(bool disposing) // { // if (disposing) // { // _innerContent?.Dispose(); // _stopwatch.Stop(); // } // base.Dispose(disposing); // } //} #endregion #region 新代码 //public class ThrottledHttpClientHandler : HttpClientHandler //{ // private readonly int _maxBytesPerSecond; // public ThrottledHttpClientHandler(int maxBytesPerSecond = 1024 * 50) // 默认50KB/s // { // _maxBytesPerSecond = maxBytesPerSecond; // } // protected override async Task SendAsync( // HttpRequestMessage request, // CancellationToken cancellationToken) // { // var originalContent = request.Content; // if (IsUploadPartRequest(request) && request.Content != null) // { // request.Content = new ThrottledStreamContent(originalContent, _maxBytesPerSecond, cancellationToken); // } // try // { // return await base.SendAsync(request, cancellationToken); // } // finally // { // request.Content = originalContent; // } // } // private bool IsUploadPartRequest(HttpRequestMessage request) // { // return request.Method == HttpMethod.Put && // request.RequestUri != null && // request.RequestUri.Query.Contains("uploadId=") && // request.Content?.Headers?.ContentType?.MediaType == "application/octet-stream"; // } // private class ThrottledStreamContent : HttpContent // { // private readonly HttpContent _originalContent; // private readonly int _maxBytesPerSecond; // private long _bytesSent; // private readonly Stopwatch _stopwatch; // private readonly CancellationToken _cancellationToken; // public ThrottledStreamContent(HttpContent originalContent, int maxBytesPerSecond, CancellationToken cancellationToken) // { // _originalContent = originalContent; // _maxBytesPerSecond = maxBytesPerSecond; // _stopwatch = Stopwatch.StartNew(); // _cancellationToken = cancellationToken; // // 复制原始内容的Headers // foreach (var header in originalContent.Headers) // { // Headers.TryAddWithoutValidation(header.Key, header.Value); // } // } // protected override async Task SerializeToStreamAsync( // Stream stream, // TransportContext context) // { // using var originalStream = await _originalContent.ReadAsStreamAsync(); // var buffer = new byte[4096]; // int bytesRead; // while ((bytesRead = await originalStream.ReadAsync(buffer)) > 0) // { // await stream.WriteAsync(buffer, 0, bytesRead); // _bytesSent += bytesRead; // // 计算需要等待的时间 // var elapsedMs = _stopwatch.ElapsedMilliseconds; // var allowedTimeMs = (int)(_bytesSent * 1000d / _maxBytesPerSecond); // var waitMs = allowedTimeMs - (int)elapsedMs; // if (waitMs > 0) // { // await Task.Delay(waitMs, _cancellationToken); // } // } // } // // // protected override bool TryComputeLength(out long length) // { // // 使用反射调用原始内容的TryComputeLength方法 // var method = _originalContent.GetType().GetMethod( // "TryComputeLength", // BindingFlags.NonPublic | BindingFlags.Instance, // null, // new[] { typeof(long).MakeByRefType() }, // 正确构造out参数类型 // null // ); // if (method != null) // { // // 通过对象数组处理out参数 // object[] parameters = new object[] { null }; // bool result = (bool)method.Invoke(_originalContent, parameters); // length = (long)parameters[0]; // return result; // } // length = 0; // return false; // } // protected override void Dispose(bool disposing) // { // _originalContent?.Dispose(); // base.Dispose(disposing); // } // } //} #endregion #region //public class MinIOThrottledStream : Stream //{ // private readonly Stream _innerStream; // private readonly BandwidthLimiter _limiter; // private long _position; // public MinIOThrottledStream(Stream innerStream, BandwidthLimiter limiter) // { // _innerStream = innerStream; // _limiter = limiter; // } // public override async Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) // { // // 计算本次允许读取的最大字节数 // int bytesToRead = await _limiter.GetAllowedBytesAsync(count, cancellationToken); // if (bytesToRead <= 0) return 0; // int bytesRead = await _innerStream.ReadAsync(buffer, offset, bytesToRead, cancellationToken); // _position += bytesRead; // return bytesRead; // } // // 其他必要Stream成员实现... // public override bool CanRead => _innerStream.CanRead; // public override bool CanSeek => false; // public override bool CanWrite => false; // public override long Length => _innerStream.Length; // public override long Position // { // get => _position; // set => throw new NotSupportedException(); // } // public override long Seek(long offset, SeekOrigin origin) // { // throw new NotSupportedException("Seeking is not supported"); // } // public override void SetLength(long value) // { // throw new NotSupportedException("SetLength is not supported"); // } // public override void Write(byte[] buffer, int offset, int count) // { // throw new NotSupportedException("Writing is not supported"); // } // // 同步读取方法(调用异步版本) // public override int Read(byte[] buffer, int offset, int count) // { // return ReadAsync(buffer, offset, count).GetAwaiter().GetResult(); // } // public override void Flush() => _innerStream.Flush(); // // ... 其他必要实现 //} //public class BandwidthLimiter //{ // private readonly long _bytesPerSecond; // private readonly Stopwatch _stopwatch = Stopwatch.StartNew(); // private long _bytesTransferred; // private readonly SemaphoreSlim _semaphore = new(1, 1); // public BandwidthLimiter(long bytesPerSecond) // { // _bytesPerSecond = bytesPerSecond; // } // public async Task GetAllowedBytesAsync(int requestedBytes, CancellationToken ct) // { // await _semaphore.WaitAsync(ct); // try // { // var elapsed = _stopwatch.Elapsed; // _stopwatch.Restart(); // // 计算时间窗口内允许的字节数 // long allowedBytes = (long)(_bytesPerSecond * elapsed.TotalSeconds); // _bytesTransferred = Math.Max(0, _bytesTransferred - allowedBytes); // // 计算本次实际允许的字节数 // int actualBytes = (int)Math.Min(requestedBytes, _bytesPerSecond - _bytesTransferred); // if (actualBytes <= 0) // { // var waitTime = TimeSpan.FromSeconds((_bytesTransferred + requestedBytes - _bytesPerSecond) / (double)_bytesPerSecond); // await Task.Delay(waitTime, ct); // _bytesTransferred = 0; // return requestedBytes; // } // _bytesTransferred += actualBytes; // return actualBytes; // } // finally // { // _semaphore.Release(); // } // } //} //public class ThrottledHttpClientHandler : HttpClientHandler //{ // private readonly BandwidthLimiter _limiter; // public ThrottledHttpClientHandler(long bytesPerSecond) // { // _limiter = new BandwidthLimiter(bytesPerSecond); // ServerCertificateCustomValidationCallback = (msg, cert, chain, errors) => true; // } // protected override async Task SendAsync( // HttpRequestMessage request, // CancellationToken cancellationToken) // { // if (IsUploadPartRequest(request)) // { // var originalContent = request.Content; // var sourceStream = await originalContent.ReadAsStreamAsync(cancellationToken); // var throttledStream = new MinIOThrottledStream(sourceStream, _limiter); // request.Content = new StreamContent(throttledStream); // foreach (var header in originalContent.Headers) // request.Content.Headers.TryAddWithoutValidation(header.Key, header.Value); // } // return await base.SendAsync(request, cancellationToken); // } // private bool IsUploadPartRequest(HttpRequestMessage request) // { // return request.Method == HttpMethod.Put && // request.RequestUri != null && // request.RequestUri.Query.Contains("uploadId=") && // request.Content?.Headers?.ContentType?.MediaType == "application/octet-stream"; // } //} #endregion #region 最新代码 //public class MinIOThrottledStream : Stream //{ // private readonly Stream _innerStream; // private readonly BandwidthLimiter _limiter; // private long _position; // private readonly object _syncLock = new(); // public MinIOThrottledStream(Stream innerStream, BandwidthLimiter limiter) // { // _innerStream = innerStream ?? throw new ArgumentNullException(nameof(innerStream)); // _limiter = limiter ?? throw new ArgumentNullException(nameof(limiter)); // } // public override async ValueTask ReadAsync(Memory buffer, CancellationToken cancellationToken = default) // { // int allowedBytes = await _limiter.GetAllowedBytesAsync(buffer.Length, cancellationToken); // if (allowedBytes <= 0) return 0; // int bytesRead = await _innerStream.ReadAsync(buffer[..allowedBytes], cancellationToken); // Interlocked.Add(ref _position, bytesRead); // return bytesRead; // } // // 其他必要成员实现 // public override bool CanRead => _innerStream.CanRead; // public override bool CanSeek => false; // public override bool CanWrite => false; // public override long Length => _innerStream.Length; // public override long Position { get => _position; set => throw new NotSupportedException(); } // public override int Read(byte[] buffer, int offset, int count) => ReadAsync(buffer, offset, count).GetAwaiter().GetResult(); // public override void Flush() => _innerStream.Flush(); // // ...其他必要实现... // public override long Seek(long offset, SeekOrigin origin) // { // throw new NotSupportedException("Seeking is not supported"); // } // public override void SetLength(long value) // { // throw new NotSupportedException("SetLength is not supported"); // } // public override void Write(byte[] buffer, int offset, int count) // { // throw new NotSupportedException("Writing is not supported"); // } // //// 同步读取方法(调用异步版本) // //public override int Read(byte[] buffer, int offset, int count) // //{ // // return ReadAsync(buffer, offset, count).GetAwaiter().GetResult(); // //} //} //public class BandwidthLimiter : IDisposable //{ // private readonly long _bytesPerSecond; // private readonly Stopwatch _stopwatch = Stopwatch.StartNew(); // private long _bytesTransferred; // private readonly SemaphoreSlim _semaphore = new(1, 1); // private static readonly double _ticksPerSecond = Stopwatch.Frequency; // 动态获取计时器精度 // public BandwidthLimiter(long bytesPerSecond) // { // _bytesPerSecond = bytesPerSecond > 0 ? bytesPerSecond : throw new ArgumentOutOfRangeException(nameof(bytesPerSecond)); // } // public async ValueTask GetAllowedBytesAsync(int requestedBytes, CancellationToken ct) // { // if (requestedBytes <= 0) return 0; // await _semaphore.WaitAsync(ct); // try // { // // 计算自上次请求以来的时间窗口 // long elapsedTicks = _stopwatch.ElapsedTicks; // _stopwatch.Restart(); // // 动态计算允许传输的字节数(基于实际计时器频率) // double elapsedSeconds = (double)elapsedTicks / _ticksPerSecond; // long allowedBytes = (long)(_bytesPerSecond * elapsedSeconds); // // 更新令牌桶:扣除本时间窗口允许的字节数 // _bytesTransferred = Math.Max(0, _bytesTransferred - allowedBytes); // // 计算实际可传输的字节数 // int actualBytes = (int)Math.Min(requestedBytes, _bytesPerSecond - _bytesTransferred); // // 无可用配额时触发等待 // if (actualBytes <= 0) // { // // 计算需要等待的时间(精确到毫秒) // double deficitSeconds = (_bytesTransferred + requestedBytes) / (double)_bytesPerSecond; // await Task.Delay(TimeSpan.FromSeconds(deficitSeconds), ct); // // 重置累计值并允许本次完整传输 // _bytesTransferred = 0; // return requestedBytes; // } // // 更新累计值并返回允许的字节数 // _bytesTransferred += actualBytes; // return actualBytes; // } // finally // { // _semaphore.Release(); // } // } // public void Dispose() => _semaphore.Dispose(); //} //public class ThrottledHttpClientHandler : HttpClientHandler //{ // private readonly BandwidthLimiter _limiter; // public ThrottledHttpClientHandler(BandwidthLimiter limiter) // { // _limiter = limiter ?? throw new ArgumentNullException(nameof(limiter)); // ServerCertificateCustomValidationCallback = (msg, cert, chain, errors) => true; // } // protected override async Task SendAsync( // HttpRequestMessage request, // CancellationToken cancellationToken) // { // if (IsUploadPartRequest(request)) // { // var originalContent = request.Content; // var sourceStream = await originalContent.ReadAsStreamAsync(cancellationToken); // request.Content = new StreamContent(new MinIOThrottledStream(sourceStream, _limiter)); // foreach (var header in originalContent.Headers) // request.Content.Headers.TryAddWithoutValidation(header.Key, header.Value); // } // return await base.SendAsync(request, cancellationToken); // } // private bool IsUploadPartRequest(HttpRequestMessage request) => // request.Method == HttpMethod.Put && // request.RequestUri?.Query.Contains("uploadId=") == true && // request.Content?.Headers?.ContentType?.MediaType == "application/octet-stream"; // protected override void Dispose(bool disposing) // { // _limiter?.Dispose(); // base.Dispose(disposing); // } //} #endregion #region 限制总体进度 //public class MinIOThrottledStream : Stream //{ // private readonly Stream _innerStream; // private readonly BandwidthLimiter _limiter; // private long _position; // private bool _disposed; // private readonly long _totalLength; // private long _remainingBytes; // public MinIOThrottledStream(Stream innerStream, BandwidthLimiter limiter) // { // _innerStream = innerStream ?? throw new ArgumentNullException(nameof(innerStream)); // _limiter = limiter ?? throw new ArgumentNullException(nameof(limiter)); // _totalLength = innerStream.Length; // _remainingBytes = _totalLength; // } // public override async ValueTask ReadAsync(Memory buffer, // CancellationToken cancellationToken = default) // { // if (_disposed) throw new ObjectDisposedException(nameof(MinIOThrottledStream)); // // 关键修改:基于剩余字节数检测完成 // if (_remainingBytes <= 0) // return 0; // int allowedBytes = await _limiter.GetAllowedBytesAsync( // (int)Math.Min(buffer.Length, _remainingBytes), // cancellationToken); // if (allowedBytes <= 0) // return 0; // int bytesRead = await _innerStream.ReadAsync(buffer[..allowedBytes], cancellationToken); // if (bytesRead > 0) // { // Interlocked.Add(ref _position, bytesRead); // Interlocked.Add(ref _remainingBytes, -bytesRead); // } // if (_remainingBytes == 0) // { // await _innerStream.FlushAsync(cancellationToken); // } // return bytesRead; // } // protected override void Dispose(bool disposing) // { // if (_disposed) return; // if (disposing) // { // _innerStream?.Dispose(); // } // base.Dispose(disposing); // _disposed = true; // } // // 其他必要成员实现 // public override bool CanRead => _innerStream.CanRead; // public override bool CanSeek => false; // public override bool CanWrite => false; // public override long Length => _innerStream.Length; // public override long Position { get => _position; set => throw new NotSupportedException(); } // public override int Read(byte[] buffer, int offset, int count) => ReadAsync(buffer, offset, count).GetAwaiter().GetResult(); // public override void Flush() => _innerStream.Flush(); // // ...其他必要实现... // public override long Seek(long offset, SeekOrigin origin) // { // throw new NotSupportedException("Seeking is not supported"); // } // public override void SetLength(long value) // { // throw new NotSupportedException("SetLength is not supported"); // } // public override void Write(byte[] buffer, int offset, int count) // { // throw new NotSupportedException("Writing is not supported"); // } // //// 同步读取方法(调用异步版本) // //public override int Read(byte[] buffer, int offset, int count) // //{ // // return ReadAsync(buffer, offset, count).GetAwaiter().GetResult(); // //} //} //public class BandwidthLimiter : IDisposable //{ // private readonly long _bytesPerSecond; // private readonly SemaphoreSlim _semaphore = new(1, 1); // private readonly Stopwatch _stopwatch = Stopwatch.StartNew(); // private long _bytesTransferred; // private bool _disposed; // public BandwidthLimiter(long bytesPerSecond) // { // _bytesPerSecond = bytesPerSecond > 0 ? bytesPerSecond : // throw new ArgumentOutOfRangeException(nameof(bytesPerSecond)); // } // public async ValueTask GetAllowedBytesAsync(int requestedBytes, CancellationToken ct) // { // if (_disposed) return requestedBytes; // 释放后不再限速 // await _semaphore.WaitAsync(ct); // try // { // var elapsed = _stopwatch.Elapsed; // _stopwatch.Restart(); // // 更精确的令牌桶算法 // double allowance = elapsed.TotalSeconds * _bytesPerSecond; // _bytesTransferred = Math.Max(0, _bytesTransferred - (long)allowance); // if (_bytesTransferred < _bytesPerSecond) // { // int allowed = (int)Math.Min(requestedBytes, _bytesPerSecond - _bytesTransferred); // _bytesTransferred += allowed; // return allowed; // } // // 需要等待 // double deficitSeconds = (_bytesTransferred + requestedBytes - _bytesPerSecond) / (double)_bytesPerSecond; // await Task.Delay(TimeSpan.FromSeconds(deficitSeconds), ct); // _bytesTransferred = 0; // return requestedBytes; // } // finally // { // _semaphore.Release(); // } // } // public void Dispose() // { // if (_disposed) return; // _semaphore.Dispose(); // _disposed = true; // } //} //public class ThrottledHttpClientHandler : HttpClientHandler //{ // private readonly BandwidthLimiter _limiter; // private readonly CancellationTokenSource _linkedCts = new(); // 用于取消挂起的操作 // private bool _disposed; // private readonly ConcurrentDictionary _activeStreams = new(); // public ThrottledHttpClientHandler(BandwidthLimiter limiter) // { // _limiter = limiter ?? throw new ArgumentNullException(nameof(limiter)); // ServerCertificateCustomValidationCallback = (msg, cert, chain, errors) => true; // } // protected override async Task SendAsync( // HttpRequestMessage request, // CancellationToken cancellationToken) // { // if (!IsUploadPartRequest(request)) // return await base.SendAsync(request, cancellationToken); // var originalContent = request.Content; // try // { // var sourceStream = await originalContent.ReadAsStreamAsync(cancellationToken); // var throttledStream = new MinIOThrottledStream(sourceStream, _limiter); // _activeStreams.TryAdd(throttledStream, true); // var throttledContent = new StreamContent(throttledStream); // foreach (var header in originalContent.Headers) // throttledContent.Headers.TryAddWithoutValidation(header.Key, header.Value); // request.Content = throttledContent; // var response = await base.SendAsync(request, cancellationToken); // // 关键修改:确保最后一个分片的完成 // if (IsFinalPart(request)) // { // await CompleteMultipartUpload(response); // } // return response; // } // finally // { // originalContent?.Dispose(); // } // } // private bool IsFinalPart(HttpRequestMessage request) // { // // 通过查询参数检测是否是最后一个分片 // return request.RequestUri.Query.Contains("uploadId=") && // request.RequestUri.Query.Contains("partNumber="); // } // private async Task CompleteMultipartUpload(HttpResponseMessage response) // { // try // { // // 确保响应完全读取 // await response.Content.ReadAsByteArrayAsync(); // // 等待所有限速流完成 // while (!_activeStreams.IsEmpty) // { // await Task.Delay(100); // } // } // catch { /* 忽略清理错误 */ } // } // // 允许外部代码主动取消所有挂起的操作 // public void CancelPendingOperations() // { // _linkedCts.Cancel(); // } // //// 判断是否为需要限速的上传请求 // //private bool IsUploadPartRequest(HttpRequestMessage request) // //{ // // return request.Method == HttpMethod.Put && // // request.RequestUri != null && // // request.RequestUri.Query.Contains("uploadId=") && // // request.Content?.Headers?.ContentType?.MediaType == "application/octet-stream"; // //} // /// // /// 精准识别分片数据上传请求(关键逻辑) // /// // private static bool IsUploadPartRequest(HttpRequestMessage request) // { // return request.Method == HttpMethod.Put && // request.RequestUri != null && // request.RequestUri.Query.Contains("uploadId=") && // request.RequestUri.Query.Contains("partNumber=") && // request.Content?.Headers?.ContentType?.MediaType == "application/octet-stream"; // } // protected override void Dispose(bool disposing) // { // if (_disposed) return; // if (disposing) // { // // 取消所有挂起的操作并释放资源 // CancelPendingOperations(); // _linkedCts.Dispose(); // _limiter?.Dispose(); // } // base.Dispose(disposing); // _disposed = true; // } //} #endregion #region test public class MinIOThrottledHandler : DelegatingHandler { private readonly long _bytesPerSecond; private readonly SemaphoreSlim _semaphore = new(1, 1); private long _bytesTransferred; private DateTime _lastUpdate = DateTime.UtcNow; public MinIOThrottledHandler(long bytesPerSecond, HttpMessageHandler innerHandler) : base(innerHandler) { _bytesPerSecond = bytesPerSecond; } protected override async Task SendAsync( HttpRequestMessage request, CancellationToken cancellationToken) { // 仅对分片数据上传请求限速(关键修复点) if (IsPartUploadRequest(request)) { var originalContent = request.Content; var contentStream = await originalContent.ReadAsStreamAsync(cancellationToken); // 创建限速包装流 var throttledStream = new ThrottledStream( contentStream, _bytesPerSecond, originalContent.Headers.ContentLength ?? -1 ); // 重建请求内容 request.Content = new StreamContent(throttledStream); // 保持原始头信息(避免破坏 MinIO 的签名计算) foreach (var header in originalContent.Headers) { request.Content.Headers.TryAddWithoutValidation(header.Key, header.Value); } } // 放行所有其他请求(包括初始化分片和完成分片请求) return await base.SendAsync(request, cancellationToken); } /// /// 精准识别分片数据上传请求(关键逻辑) /// private static bool IsPartUploadRequest(HttpRequestMessage request) { return request.Method == HttpMethod.Put && request.RequestUri != null && request.RequestUri.Query.Contains("uploadId=") && request.RequestUri.Query.Contains("partNumber=") && request.Content?.Headers?.ContentType?.MediaType == "application/octet-stream"; } /// /// 限速流实现(精确控制每个分片的上传速度) /// private class ThrottledStream : Stream { private readonly Stream _innerStream; private readonly long _maxBytesPerSecond; private readonly long _totalLength; private long _bytesRead; private readonly Stopwatch _stopwatch = Stopwatch.StartNew(); public ThrottledStream(Stream innerStream, long bytesPerSecond, long totalLength) { _innerStream = innerStream; _maxBytesPerSecond = bytesPerSecond; _totalLength = totalLength; } //public override async Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) //{ // // 计算剩余可读取字节数 // var remaining = _totalLength - _bytesRead; // if (remaining <= 0) return 0; // // 动态调整缓冲区大小(优化大文件传输) // int chunkSize = (int)Math.Min( // Math.Min(count, _maxBytesPerSecond), // 每 chunk 不超过限速的 1/10 // remaining // ); // int bytesRead = await _innerStream.ReadAsync(buffer, offset, chunkSize, cancellationToken); // if (bytesRead == 0) return 0; // // 精确限速控制 // var expectedTime = (double)(_bytesRead + bytesRead) / _maxBytesPerSecond; // var actualTime = _stopwatch.Elapsed.TotalSeconds; // if (actualTime < expectedTime) // { // var delay = TimeSpan.FromSeconds(expectedTime - actualTime); // await Task.Delay(delay, cancellationToken); // } // Interlocked.Add(ref _bytesRead, bytesRead); // return bytesRead; //} public override async Task ReadAsync( byte[] buffer, int offset, int count, CancellationToken cancellationToken) { // 计算剩余可读取字节 var remaining = _totalLength - _bytesRead; if (remaining <= 0) return 0; // 优化1:动态调整块大小为限速值的1/2,减少调用次数 int chunkSize = (int)Math.Min( Math.Min(count, _maxBytesPerSecond / 2), remaining ); // 记录读取开始时间 long startTicks = _stopwatch.ElapsedTicks; int bytesRead = await _innerStream.ReadAsync( buffer, offset, chunkSize, cancellationToken ); if (bytesRead == 0) return 0; // 优化2:精确计算需要等待的时间 double requiredSeconds = (double)bytesRead / _maxBytesPerSecond; double elapsedSeconds = (double)(_stopwatch.ElapsedTicks - startTicks) / Stopwatch.Frequency; if (elapsedSeconds < requiredSeconds) { double delaySeconds = requiredSeconds - elapsedSeconds; int delayMs = (int)Math.Ceiling(delaySeconds * 1000); // 优化3:使用高精度延迟(最小化系统调度误差) await Task.Delay( delayMs > 15 ? delayMs : 15, // 绕过Windows最小15ms调度限制 cancellationToken ); } Interlocked.Add(ref _bytesRead, bytesRead); return bytesRead; } // 其他必要成员实现 public override bool CanRead => _innerStream.CanRead; public override bool CanSeek => false; public override bool CanWrite => false; public override long Length => _innerStream.Length; public override long Position { get; set; } public override void Flush() => throw new NotSupportedException(); public override int Read(byte[] buffer, int offset, int count) => ReadAsync(buffer, offset, count).GetAwaiter().GetResult(); public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException(); public override void SetLength(long value) => throw new NotSupportedException(); public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException(); } } public class UploadVerifier { public static async Task VerifyAsync( IMinioClient client, string bucketName, string objectName, int maxRetries = 3, // 减少重试次数以避免过度重试 int initialDelayMs = 500) { int retryCount = 0; while (retryCount++ < maxRetries) { try { var stat = await client.StatObjectAsync(new StatObjectArgs() .WithBucket(bucketName) .WithObject(objectName)); Console.WriteLine($"验证成功!文件大小: {stat.Size} 字节, ETag: {stat.ETag}"); return; } catch (Exception ex) { Console.WriteLine($"验证失败 ({retryCount}/{maxRetries}): {ex.Message}"); await Task.Delay(initialDelayMs * retryCount); } } throw new ApplicationException($"文件 {objectName} 未在服务器上出现"); } } #endregion }