1033 lines
40 KiB
C#
1033 lines
40 KiB
C#
using Minio;
|
||
using Minio.DataModel.Args;
|
||
using System;
|
||
using System.Collections.Concurrent;
|
||
using System.Diagnostics;
|
||
using System.IO;
|
||
using System.Net;
|
||
using System.Net.Http;
|
||
using System.Net.Http.Headers;
|
||
using System.Reflection;
|
||
using System.Threading;
|
||
using System.Threading.Tasks;
|
||
|
||
namespace Hopetry.Provider
|
||
{
|
||
#region 原先代码
|
||
//public class ThrottledHttpClientHandler : HttpClientHandler
|
||
//{
|
||
// private readonly long _maxBytesPerSecond;
|
||
// private readonly SemaphoreSlim _throttleSemaphore = new SemaphoreSlim(1, 1);
|
||
// private long _bytesTransferred;
|
||
// private DateTime _lastCheckTime = DateTime.UtcNow;
|
||
|
||
// public ThrottledHttpClientHandler(long maxBytesPerSecond)
|
||
// {
|
||
// _maxBytesPerSecond = maxBytesPerSecond;
|
||
|
||
// // 忽略证书错误(仅测试环境使用)
|
||
// ServerCertificateCustomValidationCallback =
|
||
// (message, cert, chain, errors) => true;
|
||
// }
|
||
|
||
// protected override async Task<HttpResponseMessage> SendAsync(
|
||
// HttpRequestMessage request,
|
||
// CancellationToken cancellationToken)
|
||
// {
|
||
// try
|
||
// {
|
||
// // 添加请求日志
|
||
// Debug.WriteLine($"Sending request: {request.Method} {request.RequestUri}");
|
||
|
||
// // 仅对分片数据上传请求限速(修改点:直接限速请求内容,而非响应)
|
||
// if (IsUploadPartRequest(request) && request.Content != null)
|
||
// {
|
||
// request.Content = new ThrottledHttpContent(request.Content, _maxBytesPerSecond);
|
||
// }
|
||
|
||
// var response = await base.SendAsync(request, cancellationToken).ConfigureAwait(false);
|
||
|
||
// // 调试:检查初始化请求的响应
|
||
// if (request.Method == HttpMethod.Post && request.RequestUri?.Query?.Contains("uploads=") == true)
|
||
// {
|
||
// var rawResponse = response.Content != null
|
||
// ? await response.Content.ReadAsStringAsync().ConfigureAwait(false)
|
||
// : "No content in response";
|
||
// Debug.WriteLine($"初始化请求响应: {rawResponse}");
|
||
// }
|
||
|
||
// return response;
|
||
// }
|
||
// catch (Exception ex)
|
||
// {
|
||
// Debug.WriteLine($"Request failed: {ex}");
|
||
// throw;
|
||
// }
|
||
// }
|
||
// private bool IsUploadPartRequest(HttpRequestMessage request)
|
||
// {
|
||
// return request.Method == HttpMethod.Put &&
|
||
// request.RequestUri != null &&
|
||
// request.RequestUri.Query.Contains("uploadId=") &&
|
||
// request.Content?.Headers?.ContentType?.MediaType == "application/octet-stream";
|
||
// }
|
||
//}
|
||
//public class ThrottledHttpContent : HttpContent
|
||
//{
|
||
// private readonly HttpContent _innerContent;
|
||
// private readonly long _maxBytesPerSecond;
|
||
// private readonly Stopwatch _stopwatch = Stopwatch.StartNew();
|
||
// private long _totalBytesRead;
|
||
// private long? _contentLength;
|
||
|
||
// public ThrottledHttpContent(HttpContent innerContent, long maxBytesPerSecond)
|
||
// {
|
||
// _innerContent = innerContent ?? throw new ArgumentNullException(nameof(innerContent));
|
||
// _maxBytesPerSecond = maxBytesPerSecond;
|
||
|
||
// foreach (var header in _innerContent.Headers)
|
||
// {
|
||
// Headers.TryAddWithoutValidation(header.Key, header.Value);
|
||
// }
|
||
// }
|
||
|
||
// // 覆盖同步方法,强制调用异步版本(避免直接抛出异常)
|
||
// protected override void SerializeToStream(Stream stream, TransportContext context, CancellationToken cancellationToken)
|
||
// {
|
||
// // 同步调用时,直接运行异步方法并阻塞等待(不推荐,但可以避免异常)
|
||
// SerializeToStreamAsync(stream, context, cancellationToken).GetAwaiter().GetResult();
|
||
// }
|
||
|
||
// protected override Task SerializeToStreamAsync(Stream stream, TransportContext context)
|
||
// {
|
||
// return SerializeToStreamAsync(stream, context, CancellationToken.None);
|
||
// }
|
||
|
||
// protected override async Task SerializeToStreamAsync(Stream stream, TransportContext context, CancellationToken cancellationToken)
|
||
// {
|
||
// var buffer = new byte[81920]; // 80KB buffer
|
||
// using (var contentStream = await _innerContent.ReadAsStreamAsync(cancellationToken).ConfigureAwait(false))
|
||
// {
|
||
// while (true)
|
||
// {
|
||
// var bytesRead = await contentStream.ReadAsync(buffer, 0, buffer.Length, cancellationToken).ConfigureAwait(false);
|
||
// if (bytesRead == 0) break;
|
||
|
||
// await stream.WriteAsync(buffer, 0, bytesRead, cancellationToken).ConfigureAwait(false);
|
||
// _totalBytesRead += bytesRead;
|
||
|
||
// var elapsedSeconds = _stopwatch.Elapsed.TotalSeconds;
|
||
// var targetSeconds = _totalBytesRead / (double)_maxBytesPerSecond;
|
||
|
||
// if (elapsedSeconds < targetSeconds)
|
||
// {
|
||
// var delayMilliseconds = (int)((targetSeconds - elapsedSeconds) * 1000);
|
||
// await Task.Delay(delayMilliseconds, cancellationToken).ConfigureAwait(false);
|
||
// }
|
||
// }
|
||
// }
|
||
// }
|
||
|
||
// protected override bool TryComputeLength(out long length)
|
||
// {
|
||
// if (_contentLength.HasValue)
|
||
// {
|
||
// length = _contentLength.Value;
|
||
// return true;
|
||
// }
|
||
|
||
// length = 0;
|
||
// return false;
|
||
// }
|
||
|
||
// protected override void Dispose(bool disposing)
|
||
// {
|
||
// if (disposing)
|
||
// {
|
||
// _innerContent?.Dispose();
|
||
// _stopwatch.Stop();
|
||
// }
|
||
// base.Dispose(disposing);
|
||
// }
|
||
//}
|
||
#endregion
|
||
#region 新代码
|
||
//public class ThrottledHttpClientHandler : HttpClientHandler
|
||
//{
|
||
// private readonly int _maxBytesPerSecond;
|
||
|
||
// public ThrottledHttpClientHandler(int maxBytesPerSecond = 1024 * 50) // 默认50KB/s
|
||
// {
|
||
// _maxBytesPerSecond = maxBytesPerSecond;
|
||
// }
|
||
|
||
// protected override async Task<HttpResponseMessage> SendAsync(
|
||
// HttpRequestMessage request,
|
||
// CancellationToken cancellationToken)
|
||
// {
|
||
|
||
// var originalContent = request.Content;
|
||
// if (IsUploadPartRequest(request) && request.Content != null)
|
||
// {
|
||
// request.Content = new ThrottledStreamContent(originalContent, _maxBytesPerSecond, cancellationToken);
|
||
// }
|
||
// try
|
||
// {
|
||
// return await base.SendAsync(request, cancellationToken);
|
||
// }
|
||
// finally
|
||
// {
|
||
// request.Content = originalContent;
|
||
// }
|
||
|
||
// }
|
||
// private bool IsUploadPartRequest(HttpRequestMessage request)
|
||
// {
|
||
// return request.Method == HttpMethod.Put &&
|
||
// request.RequestUri != null &&
|
||
// request.RequestUri.Query.Contains("uploadId=") &&
|
||
// request.Content?.Headers?.ContentType?.MediaType == "application/octet-stream";
|
||
// }
|
||
// private class ThrottledStreamContent : HttpContent
|
||
// {
|
||
// private readonly HttpContent _originalContent;
|
||
// private readonly int _maxBytesPerSecond;
|
||
// private long _bytesSent;
|
||
// private readonly Stopwatch _stopwatch;
|
||
// private readonly CancellationToken _cancellationToken;
|
||
|
||
// public ThrottledStreamContent(HttpContent originalContent, int maxBytesPerSecond, CancellationToken cancellationToken)
|
||
// {
|
||
// _originalContent = originalContent;
|
||
// _maxBytesPerSecond = maxBytesPerSecond;
|
||
// _stopwatch = Stopwatch.StartNew();
|
||
// _cancellationToken = cancellationToken;
|
||
|
||
// // 复制原始内容的Headers
|
||
// foreach (var header in originalContent.Headers)
|
||
// {
|
||
// Headers.TryAddWithoutValidation(header.Key, header.Value);
|
||
// }
|
||
// }
|
||
|
||
// protected override async Task SerializeToStreamAsync(
|
||
// Stream stream,
|
||
// TransportContext context)
|
||
// {
|
||
// using var originalStream = await _originalContent.ReadAsStreamAsync();
|
||
// var buffer = new byte[4096];
|
||
// int bytesRead;
|
||
|
||
// while ((bytesRead = await originalStream.ReadAsync(buffer)) > 0)
|
||
// {
|
||
// await stream.WriteAsync(buffer, 0, bytesRead);
|
||
// _bytesSent += bytesRead;
|
||
|
||
// // 计算需要等待的时间
|
||
// var elapsedMs = _stopwatch.ElapsedMilliseconds;
|
||
// var allowedTimeMs = (int)(_bytesSent * 1000d / _maxBytesPerSecond);
|
||
// var waitMs = allowedTimeMs - (int)elapsedMs;
|
||
|
||
// if (waitMs > 0)
|
||
// {
|
||
// await Task.Delay(waitMs, _cancellationToken);
|
||
// }
|
||
// }
|
||
// }
|
||
|
||
// //
|
||
// protected override bool TryComputeLength(out long length)
|
||
// {
|
||
// // 使用反射调用原始内容的TryComputeLength方法
|
||
// var method = _originalContent.GetType().GetMethod(
|
||
// "TryComputeLength",
|
||
// BindingFlags.NonPublic | BindingFlags.Instance,
|
||
// null,
|
||
// new[] { typeof(long).MakeByRefType() }, // 正确构造out参数类型
|
||
// null
|
||
// );
|
||
|
||
// if (method != null)
|
||
// {
|
||
// // 通过对象数组处理out参数
|
||
// object[] parameters = new object[] { null };
|
||
// bool result = (bool)method.Invoke(_originalContent, parameters);
|
||
// length = (long)parameters[0];
|
||
// return result;
|
||
// }
|
||
|
||
// length = 0;
|
||
// return false;
|
||
// }
|
||
|
||
// protected override void Dispose(bool disposing)
|
||
// {
|
||
// _originalContent?.Dispose();
|
||
// base.Dispose(disposing);
|
||
// }
|
||
// }
|
||
//}
|
||
#endregion
|
||
#region
|
||
//public class MinIOThrottledStream : Stream
|
||
//{
|
||
// private readonly Stream _innerStream;
|
||
// private readonly BandwidthLimiter _limiter;
|
||
// private long _position;
|
||
|
||
// public MinIOThrottledStream(Stream innerStream, BandwidthLimiter limiter)
|
||
// {
|
||
// _innerStream = innerStream;
|
||
// _limiter = limiter;
|
||
// }
|
||
|
||
// public override async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
|
||
// {
|
||
// // 计算本次允许读取的最大字节数
|
||
// int bytesToRead = await _limiter.GetAllowedBytesAsync(count, cancellationToken);
|
||
// if (bytesToRead <= 0) return 0;
|
||
|
||
// int bytesRead = await _innerStream.ReadAsync(buffer, offset, bytesToRead, cancellationToken);
|
||
// _position += bytesRead;
|
||
|
||
// return bytesRead;
|
||
// }
|
||
|
||
// // 其他必要Stream成员实现...
|
||
// public override bool CanRead => _innerStream.CanRead;
|
||
// public override bool CanSeek => false;
|
||
// public override bool CanWrite => false;
|
||
// public override long Length => _innerStream.Length;
|
||
// public override long Position
|
||
// {
|
||
// get => _position;
|
||
// set => throw new NotSupportedException();
|
||
// }
|
||
// public override long Seek(long offset, SeekOrigin origin)
|
||
// {
|
||
// throw new NotSupportedException("Seeking is not supported");
|
||
// }
|
||
|
||
// public override void SetLength(long value)
|
||
// {
|
||
// throw new NotSupportedException("SetLength is not supported");
|
||
// }
|
||
|
||
// public override void Write(byte[] buffer, int offset, int count)
|
||
// {
|
||
// throw new NotSupportedException("Writing is not supported");
|
||
// }
|
||
// // 同步读取方法(调用异步版本)
|
||
// public override int Read(byte[] buffer, int offset, int count)
|
||
// {
|
||
// return ReadAsync(buffer, offset, count).GetAwaiter().GetResult();
|
||
// }
|
||
// public override void Flush() => _innerStream.Flush();
|
||
// // ... 其他必要实现
|
||
//}
|
||
|
||
//public class BandwidthLimiter
|
||
//{
|
||
// private readonly long _bytesPerSecond;
|
||
// private readonly Stopwatch _stopwatch = Stopwatch.StartNew();
|
||
// private long _bytesTransferred;
|
||
// private readonly SemaphoreSlim _semaphore = new(1, 1);
|
||
|
||
// public BandwidthLimiter(long bytesPerSecond)
|
||
// {
|
||
// _bytesPerSecond = bytesPerSecond;
|
||
// }
|
||
|
||
// public async Task<int> GetAllowedBytesAsync(int requestedBytes, CancellationToken ct)
|
||
// {
|
||
// await _semaphore.WaitAsync(ct);
|
||
// try
|
||
// {
|
||
// var elapsed = _stopwatch.Elapsed;
|
||
// _stopwatch.Restart();
|
||
|
||
// // 计算时间窗口内允许的字节数
|
||
// long allowedBytes = (long)(_bytesPerSecond * elapsed.TotalSeconds);
|
||
// _bytesTransferred = Math.Max(0, _bytesTransferred - allowedBytes);
|
||
|
||
// // 计算本次实际允许的字节数
|
||
// int actualBytes = (int)Math.Min(requestedBytes, _bytesPerSecond - _bytesTransferred);
|
||
// if (actualBytes <= 0)
|
||
// {
|
||
// var waitTime = TimeSpan.FromSeconds((_bytesTransferred + requestedBytes - _bytesPerSecond) / (double)_bytesPerSecond);
|
||
// await Task.Delay(waitTime, ct);
|
||
// _bytesTransferred = 0;
|
||
// return requestedBytes;
|
||
// }
|
||
|
||
// _bytesTransferred += actualBytes;
|
||
// return actualBytes;
|
||
// }
|
||
// finally
|
||
// {
|
||
// _semaphore.Release();
|
||
// }
|
||
// }
|
||
//}
|
||
|
||
//public class ThrottledHttpClientHandler : HttpClientHandler
|
||
//{
|
||
// private readonly BandwidthLimiter _limiter;
|
||
|
||
// public ThrottledHttpClientHandler(long bytesPerSecond)
|
||
// {
|
||
// _limiter = new BandwidthLimiter(bytesPerSecond);
|
||
// ServerCertificateCustomValidationCallback = (msg, cert, chain, errors) => true;
|
||
// }
|
||
|
||
// protected override async Task<HttpResponseMessage> SendAsync(
|
||
// HttpRequestMessage request,
|
||
// CancellationToken cancellationToken)
|
||
// {
|
||
// if (IsUploadPartRequest(request))
|
||
// {
|
||
// var originalContent = request.Content;
|
||
// var sourceStream = await originalContent.ReadAsStreamAsync(cancellationToken);
|
||
// var throttledStream = new MinIOThrottledStream(sourceStream, _limiter);
|
||
|
||
// request.Content = new StreamContent(throttledStream);
|
||
// foreach (var header in originalContent.Headers)
|
||
// request.Content.Headers.TryAddWithoutValidation(header.Key, header.Value);
|
||
// }
|
||
|
||
// return await base.SendAsync(request, cancellationToken);
|
||
// }
|
||
|
||
// private bool IsUploadPartRequest(HttpRequestMessage request)
|
||
// {
|
||
// return request.Method == HttpMethod.Put &&
|
||
// request.RequestUri != null &&
|
||
// request.RequestUri.Query.Contains("uploadId=") &&
|
||
// request.Content?.Headers?.ContentType?.MediaType == "application/octet-stream";
|
||
// }
|
||
//}
|
||
#endregion
|
||
|
||
#region 最新代码
|
||
//public class MinIOThrottledStream : Stream
|
||
//{
|
||
// private readonly Stream _innerStream;
|
||
// private readonly BandwidthLimiter _limiter;
|
||
// private long _position;
|
||
// private readonly object _syncLock = new();
|
||
|
||
// public MinIOThrottledStream(Stream innerStream, BandwidthLimiter limiter)
|
||
// {
|
||
// _innerStream = innerStream ?? throw new ArgumentNullException(nameof(innerStream));
|
||
// _limiter = limiter ?? throw new ArgumentNullException(nameof(limiter));
|
||
// }
|
||
|
||
// public override async ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
|
||
// {
|
||
// int allowedBytes = await _limiter.GetAllowedBytesAsync(buffer.Length, cancellationToken);
|
||
// if (allowedBytes <= 0) return 0;
|
||
|
||
// int bytesRead = await _innerStream.ReadAsync(buffer[..allowedBytes], cancellationToken);
|
||
// Interlocked.Add(ref _position, bytesRead);
|
||
// return bytesRead;
|
||
// }
|
||
|
||
// // 其他必要成员实现
|
||
// public override bool CanRead => _innerStream.CanRead;
|
||
// public override bool CanSeek => false;
|
||
// public override bool CanWrite => false;
|
||
// public override long Length => _innerStream.Length;
|
||
// public override long Position { get => _position; set => throw new NotSupportedException(); }
|
||
// public override int Read(byte[] buffer, int offset, int count) => ReadAsync(buffer, offset, count).GetAwaiter().GetResult();
|
||
// public override void Flush() => _innerStream.Flush();
|
||
// // ...其他必要实现...
|
||
// public override long Seek(long offset, SeekOrigin origin)
|
||
// {
|
||
// throw new NotSupportedException("Seeking is not supported");
|
||
// }
|
||
|
||
// public override void SetLength(long value)
|
||
// {
|
||
// throw new NotSupportedException("SetLength is not supported");
|
||
// }
|
||
|
||
// public override void Write(byte[] buffer, int offset, int count)
|
||
// {
|
||
// throw new NotSupportedException("Writing is not supported");
|
||
// }
|
||
// //// 同步读取方法(调用异步版本)
|
||
// //public override int Read(byte[] buffer, int offset, int count)
|
||
// //{
|
||
// // return ReadAsync(buffer, offset, count).GetAwaiter().GetResult();
|
||
// //}
|
||
//}
|
||
|
||
|
||
//public class BandwidthLimiter : IDisposable
|
||
//{
|
||
// private readonly long _bytesPerSecond;
|
||
// private readonly Stopwatch _stopwatch = Stopwatch.StartNew();
|
||
// private long _bytesTransferred;
|
||
// private readonly SemaphoreSlim _semaphore = new(1, 1);
|
||
// private static readonly double _ticksPerSecond = Stopwatch.Frequency; // 动态获取计时器精度
|
||
|
||
// public BandwidthLimiter(long bytesPerSecond)
|
||
// {
|
||
// _bytesPerSecond = bytesPerSecond > 0 ? bytesPerSecond : throw new ArgumentOutOfRangeException(nameof(bytesPerSecond));
|
||
// }
|
||
|
||
// public async ValueTask<int> GetAllowedBytesAsync(int requestedBytes, CancellationToken ct)
|
||
// {
|
||
// if (requestedBytes <= 0) return 0;
|
||
|
||
// await _semaphore.WaitAsync(ct);
|
||
// try
|
||
// {
|
||
// // 计算自上次请求以来的时间窗口
|
||
// long elapsedTicks = _stopwatch.ElapsedTicks;
|
||
// _stopwatch.Restart();
|
||
|
||
// // 动态计算允许传输的字节数(基于实际计时器频率)
|
||
// double elapsedSeconds = (double)elapsedTicks / _ticksPerSecond;
|
||
// long allowedBytes = (long)(_bytesPerSecond * elapsedSeconds);
|
||
|
||
// // 更新令牌桶:扣除本时间窗口允许的字节数
|
||
// _bytesTransferred = Math.Max(0, _bytesTransferred - allowedBytes);
|
||
|
||
// // 计算实际可传输的字节数
|
||
// int actualBytes = (int)Math.Min(requestedBytes, _bytesPerSecond - _bytesTransferred);
|
||
|
||
// // 无可用配额时触发等待
|
||
// if (actualBytes <= 0)
|
||
// {
|
||
// // 计算需要等待的时间(精确到毫秒)
|
||
// double deficitSeconds = (_bytesTransferred + requestedBytes) / (double)_bytesPerSecond;
|
||
// await Task.Delay(TimeSpan.FromSeconds(deficitSeconds), ct);
|
||
|
||
// // 重置累计值并允许本次完整传输
|
||
// _bytesTransferred = 0;
|
||
// return requestedBytes;
|
||
// }
|
||
|
||
// // 更新累计值并返回允许的字节数
|
||
// _bytesTransferred += actualBytes;
|
||
// return actualBytes;
|
||
// }
|
||
// finally
|
||
// {
|
||
// _semaphore.Release();
|
||
// }
|
||
// }
|
||
|
||
// public void Dispose() => _semaphore.Dispose();
|
||
//}
|
||
|
||
//public class ThrottledHttpClientHandler : HttpClientHandler
|
||
//{
|
||
// private readonly BandwidthLimiter _limiter;
|
||
|
||
// public ThrottledHttpClientHandler(BandwidthLimiter limiter)
|
||
// {
|
||
// _limiter = limiter ?? throw new ArgumentNullException(nameof(limiter));
|
||
// ServerCertificateCustomValidationCallback = (msg, cert, chain, errors) => true;
|
||
// }
|
||
|
||
// protected override async Task<HttpResponseMessage> SendAsync(
|
||
// HttpRequestMessage request,
|
||
// CancellationToken cancellationToken)
|
||
// {
|
||
// if (IsUploadPartRequest(request))
|
||
// {
|
||
// var originalContent = request.Content;
|
||
// var sourceStream = await originalContent.ReadAsStreamAsync(cancellationToken);
|
||
// request.Content = new StreamContent(new MinIOThrottledStream(sourceStream, _limiter));
|
||
|
||
// foreach (var header in originalContent.Headers)
|
||
// request.Content.Headers.TryAddWithoutValidation(header.Key, header.Value);
|
||
// }
|
||
|
||
// return await base.SendAsync(request, cancellationToken);
|
||
// }
|
||
|
||
// private bool IsUploadPartRequest(HttpRequestMessage request) =>
|
||
// request.Method == HttpMethod.Put &&
|
||
// request.RequestUri?.Query.Contains("uploadId=") == true &&
|
||
// request.Content?.Headers?.ContentType?.MediaType == "application/octet-stream";
|
||
|
||
// protected override void Dispose(bool disposing)
|
||
// {
|
||
// _limiter?.Dispose();
|
||
// base.Dispose(disposing);
|
||
// }
|
||
//}
|
||
#endregion
|
||
|
||
#region 限制总体进度
|
||
//public class MinIOThrottledStream : Stream
|
||
//{
|
||
// private readonly Stream _innerStream;
|
||
// private readonly BandwidthLimiter _limiter;
|
||
// private long _position;
|
||
// private bool _disposed;
|
||
// private readonly long _totalLength;
|
||
// private long _remainingBytes;
|
||
|
||
// public MinIOThrottledStream(Stream innerStream, BandwidthLimiter limiter)
|
||
// {
|
||
// _innerStream = innerStream ?? throw new ArgumentNullException(nameof(innerStream));
|
||
// _limiter = limiter ?? throw new ArgumentNullException(nameof(limiter));
|
||
// _totalLength = innerStream.Length;
|
||
// _remainingBytes = _totalLength;
|
||
// }
|
||
|
||
// public override async ValueTask<int> ReadAsync(Memory<byte> buffer,
|
||
// CancellationToken cancellationToken = default)
|
||
// {
|
||
// if (_disposed) throw new ObjectDisposedException(nameof(MinIOThrottledStream));
|
||
|
||
// // 关键修改:基于剩余字节数检测完成
|
||
// if (_remainingBytes <= 0)
|
||
// return 0;
|
||
|
||
// int allowedBytes = await _limiter.GetAllowedBytesAsync(
|
||
// (int)Math.Min(buffer.Length, _remainingBytes),
|
||
// cancellationToken);
|
||
|
||
// if (allowedBytes <= 0)
|
||
// return 0;
|
||
|
||
// int bytesRead = await _innerStream.ReadAsync(buffer[..allowedBytes], cancellationToken);
|
||
|
||
// if (bytesRead > 0)
|
||
// {
|
||
// Interlocked.Add(ref _position, bytesRead);
|
||
// Interlocked.Add(ref _remainingBytes, -bytesRead);
|
||
// }
|
||
// if (_remainingBytes == 0)
|
||
// {
|
||
// await _innerStream.FlushAsync(cancellationToken);
|
||
// }
|
||
// return bytesRead;
|
||
// }
|
||
|
||
// protected override void Dispose(bool disposing)
|
||
// {
|
||
// if (_disposed) return;
|
||
|
||
// if (disposing)
|
||
// {
|
||
// _innerStream?.Dispose();
|
||
// }
|
||
|
||
// base.Dispose(disposing);
|
||
// _disposed = true;
|
||
// }
|
||
// // 其他必要成员实现
|
||
// public override bool CanRead => _innerStream.CanRead;
|
||
// public override bool CanSeek => false;
|
||
// public override bool CanWrite => false;
|
||
// public override long Length => _innerStream.Length;
|
||
// public override long Position { get => _position; set => throw new NotSupportedException(); }
|
||
// public override int Read(byte[] buffer, int offset, int count) => ReadAsync(buffer, offset, count).GetAwaiter().GetResult();
|
||
// public override void Flush() => _innerStream.Flush();
|
||
// // ...其他必要实现...
|
||
// public override long Seek(long offset, SeekOrigin origin)
|
||
// {
|
||
// throw new NotSupportedException("Seeking is not supported");
|
||
// }
|
||
|
||
// public override void SetLength(long value)
|
||
// {
|
||
// throw new NotSupportedException("SetLength is not supported");
|
||
// }
|
||
|
||
// public override void Write(byte[] buffer, int offset, int count)
|
||
// {
|
||
// throw new NotSupportedException("Writing is not supported");
|
||
// }
|
||
// //// 同步读取方法(调用异步版本)
|
||
// //public override int Read(byte[] buffer, int offset, int count)
|
||
// //{
|
||
// // return ReadAsync(buffer, offset, count).GetAwaiter().GetResult();
|
||
// //}
|
||
//}
|
||
|
||
//public class BandwidthLimiter : IDisposable
|
||
//{
|
||
// private readonly long _bytesPerSecond;
|
||
// private readonly SemaphoreSlim _semaphore = new(1, 1);
|
||
// private readonly Stopwatch _stopwatch = Stopwatch.StartNew();
|
||
// private long _bytesTransferred;
|
||
// private bool _disposed;
|
||
|
||
// public BandwidthLimiter(long bytesPerSecond)
|
||
// {
|
||
// _bytesPerSecond = bytesPerSecond > 0 ? bytesPerSecond :
|
||
// throw new ArgumentOutOfRangeException(nameof(bytesPerSecond));
|
||
// }
|
||
|
||
// public async ValueTask<int> GetAllowedBytesAsync(int requestedBytes, CancellationToken ct)
|
||
// {
|
||
// if (_disposed) return requestedBytes; // 释放后不再限速
|
||
|
||
// await _semaphore.WaitAsync(ct);
|
||
// try
|
||
// {
|
||
// var elapsed = _stopwatch.Elapsed;
|
||
// _stopwatch.Restart();
|
||
|
||
// // 更精确的令牌桶算法
|
||
// double allowance = elapsed.TotalSeconds * _bytesPerSecond;
|
||
// _bytesTransferred = Math.Max(0, _bytesTransferred - (long)allowance);
|
||
|
||
// if (_bytesTransferred < _bytesPerSecond)
|
||
// {
|
||
// int allowed = (int)Math.Min(requestedBytes, _bytesPerSecond - _bytesTransferred);
|
||
// _bytesTransferred += allowed;
|
||
// return allowed;
|
||
// }
|
||
|
||
// // 需要等待
|
||
// double deficitSeconds = (_bytesTransferred + requestedBytes - _bytesPerSecond) / (double)_bytesPerSecond;
|
||
// await Task.Delay(TimeSpan.FromSeconds(deficitSeconds), ct);
|
||
|
||
// _bytesTransferred = 0;
|
||
// return requestedBytes;
|
||
// }
|
||
// finally
|
||
// {
|
||
// _semaphore.Release();
|
||
// }
|
||
// }
|
||
|
||
// public void Dispose()
|
||
// {
|
||
// if (_disposed) return;
|
||
// _semaphore.Dispose();
|
||
// _disposed = true;
|
||
// }
|
||
//}
|
||
|
||
//public class ThrottledHttpClientHandler : HttpClientHandler
|
||
//{
|
||
// private readonly BandwidthLimiter _limiter;
|
||
// private readonly CancellationTokenSource _linkedCts = new(); // 用于取消挂起的操作
|
||
// private bool _disposed;
|
||
// private readonly ConcurrentDictionary<Stream, bool> _activeStreams = new();
|
||
|
||
// public ThrottledHttpClientHandler(BandwidthLimiter limiter)
|
||
// {
|
||
// _limiter = limiter ?? throw new ArgumentNullException(nameof(limiter));
|
||
// ServerCertificateCustomValidationCallback = (msg, cert, chain, errors) => true;
|
||
// }
|
||
|
||
// protected override async Task<HttpResponseMessage> SendAsync(
|
||
// HttpRequestMessage request,
|
||
// CancellationToken cancellationToken)
|
||
// {
|
||
// if (!IsUploadPartRequest(request))
|
||
// return await base.SendAsync(request, cancellationToken);
|
||
|
||
// var originalContent = request.Content;
|
||
// try
|
||
// {
|
||
// var sourceStream = await originalContent.ReadAsStreamAsync(cancellationToken);
|
||
// var throttledStream = new MinIOThrottledStream(sourceStream, _limiter);
|
||
|
||
// _activeStreams.TryAdd(throttledStream, true);
|
||
|
||
// var throttledContent = new StreamContent(throttledStream);
|
||
// foreach (var header in originalContent.Headers)
|
||
// throttledContent.Headers.TryAddWithoutValidation(header.Key, header.Value);
|
||
|
||
// request.Content = throttledContent;
|
||
|
||
// var response = await base.SendAsync(request, cancellationToken);
|
||
|
||
// // 关键修改:确保最后一个分片的完成
|
||
// if (IsFinalPart(request))
|
||
// {
|
||
// await CompleteMultipartUpload(response);
|
||
// }
|
||
|
||
// return response;
|
||
// }
|
||
// finally
|
||
// {
|
||
// originalContent?.Dispose();
|
||
// }
|
||
// }
|
||
|
||
// private bool IsFinalPart(HttpRequestMessage request)
|
||
// {
|
||
// // 通过查询参数检测是否是最后一个分片
|
||
// return request.RequestUri.Query.Contains("uploadId=") &&
|
||
// request.RequestUri.Query.Contains("partNumber=");
|
||
// }
|
||
|
||
// private async Task CompleteMultipartUpload(HttpResponseMessage response)
|
||
// {
|
||
// try
|
||
// {
|
||
// // 确保响应完全读取
|
||
// await response.Content.ReadAsByteArrayAsync();
|
||
|
||
// // 等待所有限速流完成
|
||
// while (!_activeStreams.IsEmpty)
|
||
// {
|
||
// await Task.Delay(100);
|
||
// }
|
||
// }
|
||
// catch { /* 忽略清理错误 */ }
|
||
// }
|
||
|
||
|
||
// // 允许外部代码主动取消所有挂起的操作
|
||
// public void CancelPendingOperations()
|
||
// {
|
||
// _linkedCts.Cancel();
|
||
// }
|
||
|
||
// //// 判断是否为需要限速的上传请求
|
||
// //private bool IsUploadPartRequest(HttpRequestMessage request)
|
||
// //{
|
||
// // return request.Method == HttpMethod.Put &&
|
||
// // request.RequestUri != null &&
|
||
// // request.RequestUri.Query.Contains("uploadId=") &&
|
||
// // request.Content?.Headers?.ContentType?.MediaType == "application/octet-stream";
|
||
// //}
|
||
|
||
// /// <summary>
|
||
// /// 精准识别分片数据上传请求(关键逻辑)
|
||
// /// </summary>
|
||
// private static bool IsUploadPartRequest(HttpRequestMessage request)
|
||
// {
|
||
// return request.Method == HttpMethod.Put &&
|
||
// request.RequestUri != null &&
|
||
// request.RequestUri.Query.Contains("uploadId=") &&
|
||
// request.RequestUri.Query.Contains("partNumber=") &&
|
||
// request.Content?.Headers?.ContentType?.MediaType == "application/octet-stream";
|
||
// }
|
||
|
||
// protected override void Dispose(bool disposing)
|
||
// {
|
||
// if (_disposed) return;
|
||
|
||
// if (disposing)
|
||
// {
|
||
// // 取消所有挂起的操作并释放资源
|
||
// CancelPendingOperations();
|
||
// _linkedCts.Dispose();
|
||
// _limiter?.Dispose();
|
||
// }
|
||
|
||
// base.Dispose(disposing);
|
||
// _disposed = true;
|
||
// }
|
||
//}
|
||
#endregion
|
||
|
||
#region test
|
||
/// <summary>
/// Delegating handler that limits the upload speed of multipart "upload part" requests
/// (PUT requests whose query string carries both <c>uploadId=</c> and <c>partNumber=</c>
/// and whose body is <c>application/octet-stream</c>). Every other request, including the
/// multipart initiate and complete calls, is forwarded unchanged.
/// </summary>
public class MinIOThrottledHandler : DelegatingHandler
{
    private readonly long _bytesPerSecond;

    /// <summary>
    /// Creates the handler.
    /// </summary>
    /// <param name="bytesPerSecond">Maximum body bytes per second for each throttled part; must be positive.</param>
    /// <param name="innerHandler">The handler that actually sends the request.</param>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="bytesPerSecond"/> is zero or negative.</exception>
    public MinIOThrottledHandler(long bytesPerSecond, HttpMessageHandler innerHandler)
        : base(innerHandler)
    {
        // A non-positive rate would make the pacing maths divide by zero or go negative.
        if (bytesPerSecond <= 0)
        {
            throw new ArgumentOutOfRangeException(
                nameof(bytesPerSecond),
                bytesPerSecond,
                "The bandwidth limit must be greater than zero.");
        }

        _bytesPerSecond = bytesPerSecond;
    }

    /// <summary>
    /// Wraps the body of part-upload requests in a rate-limited stream, then forwards the request.
    /// </summary>
    /// <param name="request">The outgoing request.</param>
    /// <param name="cancellationToken">Token that cancels the send and any pacing delay.</param>
    /// <returns>The response produced by the inner handler.</returns>
    protected override async Task<HttpResponseMessage> SendAsync(
        HttpRequestMessage request,
        CancellationToken cancellationToken)
    {
        if (request == null)
        {
            throw new ArgumentNullException(nameof(request));
        }

        // Only part-data uploads are throttled.
        if (IsPartUploadRequest(request))
        {
            HttpContent originalContent = request.Content;
            Stream contentStream = await originalContent
                .ReadAsStreamAsync(cancellationToken)
                .ConfigureAwait(false);

            // -1 marks "length unknown"; the stream then relies on the inner stream's EOF.
            long declaredLength = originalContent.Headers.ContentLength ?? -1;

            var throttledContent = new StreamContent(
                new ThrottledStream(contentStream, _bytesPerSecond, declaredLength));

            // Carry over the original content headers so the request looks the same to the
            // server (the original author notes this keeps MinIO's signature calculation intact).
            foreach (var header in originalContent.Headers)
            {
                throttledContent.Headers.TryAddWithoutValidation(header.Key, header.Value);
            }

            request.Content = throttledContent;
        }

        // Everything else (including initiate/complete multipart requests) passes straight through.
        return await base.SendAsync(request, cancellationToken).ConfigureAwait(false);
    }

    /// <summary>
    /// Identifies a multipart part-data upload: PUT, query containing <c>uploadId=</c> and
    /// <c>partNumber=</c>, and an <c>application/octet-stream</c> body.
    /// </summary>
    private static bool IsPartUploadRequest(HttpRequestMessage request)
    {
        if (request.Method != HttpMethod.Put || request.RequestUri == null)
        {
            return false;
        }

        string query = request.RequestUri.Query;
        return query.Contains("uploadId=") &&
               query.Contains("partNumber=") &&
               request.Content?.Headers?.ContentType?.MediaType == "application/octet-stream";
    }

    /// <summary>
    /// Read-only stream wrapper that paces reads so the cumulative number of bytes handed out
    /// never runs ahead of <c>elapsed time * bytesPerSecond</c>.
    /// </summary>
    private sealed class ThrottledStream : Stream
    {
        private readonly Stream _innerStream;
        private readonly long _maxBytesPerSecond;

        // Negative means the body length is unknown and only the inner stream's EOF ends reading.
        private readonly long _totalLength;

        // Started on the first read so time spent before the body is sent is not counted as credit.
        private readonly Stopwatch _stopwatch = new Stopwatch();

        private long _bytesRead;

        public ThrottledStream(Stream innerStream, long bytesPerSecond, long totalLength)
        {
            _innerStream = innerStream ?? throw new ArgumentNullException(nameof(innerStream));
            _maxBytesPerSecond = bytesPerSecond;
            _totalLength = totalLength;
        }

        /// <summary>
        /// Reads up to <paramref name="count"/> bytes, capped at half a second's worth of the
        /// limit (at least one byte), then waits until the overall rate is back under the limit.
        /// </summary>
        /// <returns>The number of bytes read, or 0 at the end of the body.</returns>
        public override async Task<int> ReadAsync(
            byte[] buffer,
            int offset,
            int count,
            CancellationToken cancellationToken)
        {
            // Keep chunks small relative to the limit, but never 0: a zero-length read
            // would be reported to the caller as end-of-stream.
            long chunkSize = Math.Min(count, Math.Max(1L, _maxBytesPerSecond / 2));

            if (_totalLength >= 0)
            {
                long remaining = _totalLength - Interlocked.Read(ref _bytesRead);
                if (remaining <= 0)
                {
                    return 0;
                }

                chunkSize = Math.Min(chunkSize, remaining);
            }

            if (!_stopwatch.IsRunning)
            {
                _stopwatch.Start();
            }

            // chunkSize <= count, so the narrowing cast cannot overflow.
            int bytesRead = await _innerStream
                .ReadAsync(buffer, offset, (int)chunkSize, cancellationToken)
                .ConfigureAwait(false);

            if (bytesRead == 0)
            {
                return 0;
            }

            long totalRead = Interlocked.Add(ref _bytesRead, bytesRead);

            // Cumulative pacing: compare the time the bytes read so far *should* have taken with
            // the time actually elapsed. Any oversleep caused by coarse timer resolution is
            // automatically compensated on later reads, so no artificial minimum delay is needed.
            TimeSpan targetElapsed = TimeSpan.FromSeconds((double)totalRead / _maxBytesPerSecond);
            TimeSpan wait = targetElapsed - _stopwatch.Elapsed;

            if (wait > TimeSpan.Zero)
            {
                await Task.Delay(wait, cancellationToken).ConfigureAwait(false);
            }

            return bytesRead;
        }

        public override bool CanRead => _innerStream.CanRead;

        public override bool CanSeek => false;

        public override bool CanWrite => false;

        public override long Length => _innerStream.Length;

        /// <summary>Number of bytes handed out so far; setting it is not supported.</summary>
        public override long Position
        {
            get => Interlocked.Read(ref _bytesRead);
            set => throw new NotSupportedException();
        }

        // Flushing a read-only stream is valid and has nothing to do.
        public override void Flush()
        {
        }

        // Synchronous fallback: blocks on the throttled asynchronous read.
        public override int Read(byte[] buffer, int offset, int count)
            => ReadAsync(buffer, offset, count, CancellationToken.None).GetAwaiter().GetResult();

        public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();

        public override void SetLength(long value) => throw new NotSupportedException();

        public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException();

        // Release the wrapped body stream together with this wrapper.
        protected override void Dispose(bool disposing)
        {
            if (disposing)
            {
                _innerStream.Dispose();
            }

            base.Dispose(disposing);
        }
    }
}
|
||
|
||
|
||
/// <summary>
/// Helper that confirms an uploaded object is visible on the server.
/// </summary>
public class UploadVerifier
{
    /// <summary>
    /// Calls <c>StatObjectAsync</c> for the object, retrying with a linearly growing delay
    /// (<paramref name="initialDelayMs"/> × attempt number) between failed attempts.
    /// </summary>
    /// <param name="client">Client used to query the object.</param>
    /// <param name="bucketName">Bucket that should contain the object.</param>
    /// <param name="objectName">Name of the object to look up.</param>
    /// <param name="maxRetries">Maximum number of attempts.</param>
    /// <param name="initialDelayMs">Base delay in milliseconds between attempts.</param>
    /// <exception cref="ApplicationException">
    /// Every attempt failed; the last failure is attached as <c>InnerException</c>.
    /// </exception>
    public static async Task VerifyAsync(
        IMinioClient client,
        string bucketName,
        string objectName,
        int maxRetries = 3,
        int initialDelayMs = 500)
    {
        Exception lastError = null;

        for (int attempt = 1; attempt <= maxRetries; attempt++)
        {
            try
            {
                var stat = await client.StatObjectAsync(new StatObjectArgs()
                    .WithBucket(bucketName)
                    .WithObject(objectName));

                Console.WriteLine($"验证成功!文件大小: {stat.Size} 字节, ETag: {stat.ETag}");
                return;
            }
            catch (Exception ex)
            {
                // Best-effort polling: any failure counts as "not there yet", but the cause is
                // remembered so the final error is diagnosable.
                lastError = ex;
                Console.WriteLine($"验证失败 ({attempt}/{maxRetries}): {ex.Message}");
            }

            // No point sleeping after the final attempt; fail right away instead.
            if (attempt < maxRetries)
            {
                await Task.Delay(initialDelayMs * attempt);
            }
        }

        throw new ApplicationException($"文件 {objectName} 未在服务器上出现", lastError);
    }
}
|
||
#endregion
|
||
|
||
|
||
}
|