// FieldWorkClient/Provider/ThrottledHttpClientHandler.cs
//
// 984 lines
// 38 KiB
// C#

using Minio;
using Minio.DataModel.Args;
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.IO;
using System.Net;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
namespace Hopetry.Provider
{
#region Original code (commented out)
//public class ThrottledHttpClientHandler : HttpClientHandler
//{
// private readonly long _maxBytesPerSecond;
// private readonly SemaphoreSlim _throttleSemaphore = new SemaphoreSlim(1, 1);
// private long _bytesTransferred;
// private DateTime _lastCheckTime = DateTime.UtcNow;
// public ThrottledHttpClientHandler(long maxBytesPerSecond)
// {
// _maxBytesPerSecond = maxBytesPerSecond;
// // 忽略证书错误(仅测试环境使用)
// ServerCertificateCustomValidationCallback =
// (message, cert, chain, errors) => true;
// }
// protected override async Task<HttpResponseMessage> SendAsync(
// HttpRequestMessage request,
// CancellationToken cancellationToken)
// {
// try
// {
// // 添加请求日志
// Debug.WriteLine($"Sending request: {request.Method} {request.RequestUri}");
// // 仅对分片数据上传请求限速(修改点:直接限速请求内容,而非响应)
// if (IsUploadPartRequest(request) && request.Content != null)
// {
// request.Content = new ThrottledHttpContent(request.Content, _maxBytesPerSecond);
// }
// var response = await base.SendAsync(request, cancellationToken).ConfigureAwait(false);
// // 调试:检查初始化请求的响应
// if (request.Method == HttpMethod.Post && request.RequestUri?.Query?.Contains("uploads=") == true)
// {
// var rawResponse = response.Content != null
// ? await response.Content.ReadAsStringAsync().ConfigureAwait(false)
// : "No content in response";
// Debug.WriteLine($"初始化请求响应: {rawResponse}");
// }
// return response;
// }
// catch (Exception ex)
// {
// Debug.WriteLine($"Request failed: {ex}");
// throw;
// }
// }
// private bool IsUploadPartRequest(HttpRequestMessage request)
// {
// return request.Method == HttpMethod.Put &&
// request.RequestUri != null &&
// request.RequestUri.Query.Contains("uploadId=") &&
// request.Content?.Headers?.ContentType?.MediaType == "application/octet-stream";
// }
//}
//public class ThrottledHttpContent : HttpContent
//{
// private readonly HttpContent _innerContent;
// private readonly long _maxBytesPerSecond;
// private readonly Stopwatch _stopwatch = Stopwatch.StartNew();
// private long _totalBytesRead;
// private long? _contentLength;
// public ThrottledHttpContent(HttpContent innerContent, long maxBytesPerSecond)
// {
// _innerContent = innerContent ?? throw new ArgumentNullException(nameof(innerContent));
// _maxBytesPerSecond = maxBytesPerSecond;
// foreach (var header in _innerContent.Headers)
// {
// Headers.TryAddWithoutValidation(header.Key, header.Value);
// }
// }
// // 覆盖同步方法,强制调用异步版本(避免直接抛出异常)
// protected override void SerializeToStream(Stream stream, TransportContext context, CancellationToken cancellationToken)
// {
// // 同步调用时,直接运行异步方法并阻塞等待(不推荐,但可以避免异常)
// SerializeToStreamAsync(stream, context, cancellationToken).GetAwaiter().GetResult();
// }
// protected override Task SerializeToStreamAsync(Stream stream, TransportContext context)
// {
// return SerializeToStreamAsync(stream, context, CancellationToken.None);
// }
// protected override async Task SerializeToStreamAsync(Stream stream, TransportContext context, CancellationToken cancellationToken)
// {
// var buffer = new byte[81920]; // 80KB buffer
// using (var contentStream = await _innerContent.ReadAsStreamAsync(cancellationToken).ConfigureAwait(false))
// {
// while (true)
// {
// var bytesRead = await contentStream.ReadAsync(buffer, 0, buffer.Length, cancellationToken).ConfigureAwait(false);
// if (bytesRead == 0) break;
// await stream.WriteAsync(buffer, 0, bytesRead, cancellationToken).ConfigureAwait(false);
// _totalBytesRead += bytesRead;
// var elapsedSeconds = _stopwatch.Elapsed.TotalSeconds;
// var targetSeconds = _totalBytesRead / (double)_maxBytesPerSecond;
// if (elapsedSeconds < targetSeconds)
// {
// var delayMilliseconds = (int)((targetSeconds - elapsedSeconds) * 1000);
// await Task.Delay(delayMilliseconds, cancellationToken).ConfigureAwait(false);
// }
// }
// }
// }
// protected override bool TryComputeLength(out long length)
// {
// if (_contentLength.HasValue)
// {
// length = _contentLength.Value;
// return true;
// }
// length = 0;
// return false;
// }
// protected override void Dispose(bool disposing)
// {
// if (disposing)
// {
// _innerContent?.Dispose();
// _stopwatch.Stop();
// }
// base.Dispose(disposing);
// }
//}
#endregion
#region New code (commented out)
//public class ThrottledHttpClientHandler : HttpClientHandler
//{
// private readonly int _maxBytesPerSecond;
// public ThrottledHttpClientHandler(int maxBytesPerSecond = 1024 * 50) // 默认50KB/s
// {
// _maxBytesPerSecond = maxBytesPerSecond;
// }
// protected override async Task<HttpResponseMessage> SendAsync(
// HttpRequestMessage request,
// CancellationToken cancellationToken)
// {
// var originalContent = request.Content;
// if (IsUploadPartRequest(request) && request.Content != null)
// {
// request.Content = new ThrottledStreamContent(originalContent, _maxBytesPerSecond, cancellationToken);
// }
// try
// {
// return await base.SendAsync(request, cancellationToken);
// }
// finally
// {
// request.Content = originalContent;
// }
// }
// private bool IsUploadPartRequest(HttpRequestMessage request)
// {
// return request.Method == HttpMethod.Put &&
// request.RequestUri != null &&
// request.RequestUri.Query.Contains("uploadId=") &&
// request.Content?.Headers?.ContentType?.MediaType == "application/octet-stream";
// }
// private class ThrottledStreamContent : HttpContent
// {
// private readonly HttpContent _originalContent;
// private readonly int _maxBytesPerSecond;
// private long _bytesSent;
// private readonly Stopwatch _stopwatch;
// private readonly CancellationToken _cancellationToken;
// public ThrottledStreamContent(HttpContent originalContent, int maxBytesPerSecond, CancellationToken cancellationToken)
// {
// _originalContent = originalContent;
// _maxBytesPerSecond = maxBytesPerSecond;
// _stopwatch = Stopwatch.StartNew();
// _cancellationToken = cancellationToken;
// // 复制原始内容的Headers
// foreach (var header in originalContent.Headers)
// {
// Headers.TryAddWithoutValidation(header.Key, header.Value);
// }
// }
// protected override async Task SerializeToStreamAsync(
// Stream stream,
// TransportContext context)
// {
// using var originalStream = await _originalContent.ReadAsStreamAsync();
// var buffer = new byte[4096];
// int bytesRead;
// while ((bytesRead = await originalStream.ReadAsync(buffer)) > 0)
// {
// await stream.WriteAsync(buffer, 0, bytesRead);
// _bytesSent += bytesRead;
// // 计算需要等待的时间
// var elapsedMs = _stopwatch.ElapsedMilliseconds;
// var allowedTimeMs = (int)(_bytesSent * 1000d / _maxBytesPerSecond);
// var waitMs = allowedTimeMs - (int)elapsedMs;
// if (waitMs > 0)
// {
// await Task.Delay(waitMs, _cancellationToken);
// }
// }
// }
// //
// protected override bool TryComputeLength(out long length)
// {
// // 使用反射调用原始内容的TryComputeLength方法
// var method = _originalContent.GetType().GetMethod(
// "TryComputeLength",
// BindingFlags.NonPublic | BindingFlags.Instance,
// null,
// new[] { typeof(long).MakeByRefType() }, // 正确构造out参数类型
// null
// );
// if (method != null)
// {
// // 通过对象数组处理out参数
// object[] parameters = new object[] { null };
// bool result = (bool)method.Invoke(_originalContent, parameters);
// length = (long)parameters[0];
// return result;
// }
// length = 0;
// return false;
// }
// protected override void Dispose(bool disposing)
// {
// _originalContent?.Dispose();
// base.Dispose(disposing);
// }
// }
//}
#endregion
#region Stream-level throttling with a BandwidthLimiter (commented out)
//public class MinIOThrottledStream : Stream
//{
// private readonly Stream _innerStream;
// private readonly BandwidthLimiter _limiter;
// private long _position;
// public MinIOThrottledStream(Stream innerStream, BandwidthLimiter limiter)
// {
// _innerStream = innerStream;
// _limiter = limiter;
// }
// public override async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
// {
// // 计算本次允许读取的最大字节数
// int bytesToRead = await _limiter.GetAllowedBytesAsync(count, cancellationToken);
// if (bytesToRead <= 0) return 0;
// int bytesRead = await _innerStream.ReadAsync(buffer, offset, bytesToRead, cancellationToken);
// _position += bytesRead;
// return bytesRead;
// }
// // 其他必要Stream成员实现...
// public override bool CanRead => _innerStream.CanRead;
// public override bool CanSeek => false;
// public override bool CanWrite => false;
// public override long Length => _innerStream.Length;
// public override long Position
// {
// get => _position;
// set => throw new NotSupportedException();
// }
// public override long Seek(long offset, SeekOrigin origin)
// {
// throw new NotSupportedException("Seeking is not supported");
// }
// public override void SetLength(long value)
// {
// throw new NotSupportedException("SetLength is not supported");
// }
// public override void Write(byte[] buffer, int offset, int count)
// {
// throw new NotSupportedException("Writing is not supported");
// }
// // 同步读取方法(调用异步版本)
// public override int Read(byte[] buffer, int offset, int count)
// {
// return ReadAsync(buffer, offset, count).GetAwaiter().GetResult();
// }
// public override void Flush() => _innerStream.Flush();
// // ... 其他必要实现
//}
//public class BandwidthLimiter
//{
// private readonly long _bytesPerSecond;
// private readonly Stopwatch _stopwatch = Stopwatch.StartNew();
// private long _bytesTransferred;
// private readonly SemaphoreSlim _semaphore = new(1, 1);
// public BandwidthLimiter(long bytesPerSecond)
// {
// _bytesPerSecond = bytesPerSecond;
// }
// public async Task<int> GetAllowedBytesAsync(int requestedBytes, CancellationToken ct)
// {
// await _semaphore.WaitAsync(ct);
// try
// {
// var elapsed = _stopwatch.Elapsed;
// _stopwatch.Restart();
// // 计算时间窗口内允许的字节数
// long allowedBytes = (long)(_bytesPerSecond * elapsed.TotalSeconds);
// _bytesTransferred = Math.Max(0, _bytesTransferred - allowedBytes);
// // 计算本次实际允许的字节数
// int actualBytes = (int)Math.Min(requestedBytes, _bytesPerSecond - _bytesTransferred);
// if (actualBytes <= 0)
// {
// var waitTime = TimeSpan.FromSeconds((_bytesTransferred + requestedBytes - _bytesPerSecond) / (double)_bytesPerSecond);
// await Task.Delay(waitTime, ct);
// _bytesTransferred = 0;
// return requestedBytes;
// }
// _bytesTransferred += actualBytes;
// return actualBytes;
// }
// finally
// {
// _semaphore.Release();
// }
// }
//}
//public class ThrottledHttpClientHandler : HttpClientHandler
//{
// private readonly BandwidthLimiter _limiter;
// public ThrottledHttpClientHandler(long bytesPerSecond)
// {
// _limiter = new BandwidthLimiter(bytesPerSecond);
// ServerCertificateCustomValidationCallback = (msg, cert, chain, errors) => true;
// }
// protected override async Task<HttpResponseMessage> SendAsync(
// HttpRequestMessage request,
// CancellationToken cancellationToken)
// {
// if (IsUploadPartRequest(request))
// {
// var originalContent = request.Content;
// var sourceStream = await originalContent.ReadAsStreamAsync(cancellationToken);
// var throttledStream = new MinIOThrottledStream(sourceStream, _limiter);
// request.Content = new StreamContent(throttledStream);
// foreach (var header in originalContent.Headers)
// request.Content.Headers.TryAddWithoutValidation(header.Key, header.Value);
// }
// return await base.SendAsync(request, cancellationToken);
// }
// private bool IsUploadPartRequest(HttpRequestMessage request)
// {
// return request.Method == HttpMethod.Put &&
// request.RequestUri != null &&
// request.RequestUri.Query.Contains("uploadId=") &&
// request.Content?.Headers?.ContentType?.MediaType == "application/octet-stream";
// }
//}
#endregion
#region Latest code (commented out)
//public class MinIOThrottledStream : Stream
//{
// private readonly Stream _innerStream;
// private readonly BandwidthLimiter _limiter;
// private long _position;
// private readonly object _syncLock = new();
// public MinIOThrottledStream(Stream innerStream, BandwidthLimiter limiter)
// {
// _innerStream = innerStream ?? throw new ArgumentNullException(nameof(innerStream));
// _limiter = limiter ?? throw new ArgumentNullException(nameof(limiter));
// }
// public override async ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
// {
// int allowedBytes = await _limiter.GetAllowedBytesAsync(buffer.Length, cancellationToken);
// if (allowedBytes <= 0) return 0;
// int bytesRead = await _innerStream.ReadAsync(buffer[..allowedBytes], cancellationToken);
// Interlocked.Add(ref _position, bytesRead);
// return bytesRead;
// }
// // 其他必要成员实现
// public override bool CanRead => _innerStream.CanRead;
// public override bool CanSeek => false;
// public override bool CanWrite => false;
// public override long Length => _innerStream.Length;
// public override long Position { get => _position; set => throw new NotSupportedException(); }
// public override int Read(byte[] buffer, int offset, int count) => ReadAsync(buffer, offset, count).GetAwaiter().GetResult();
// public override void Flush() => _innerStream.Flush();
// // ...其他必要实现...
// public override long Seek(long offset, SeekOrigin origin)
// {
// throw new NotSupportedException("Seeking is not supported");
// }
// public override void SetLength(long value)
// {
// throw new NotSupportedException("SetLength is not supported");
// }
// public override void Write(byte[] buffer, int offset, int count)
// {
// throw new NotSupportedException("Writing is not supported");
// }
// //// 同步读取方法(调用异步版本)
// //public override int Read(byte[] buffer, int offset, int count)
// //{
// // return ReadAsync(buffer, offset, count).GetAwaiter().GetResult();
// //}
//}
//public class BandwidthLimiter : IDisposable
//{
// private readonly long _bytesPerSecond;
// private readonly Stopwatch _stopwatch = Stopwatch.StartNew();
// private long _bytesTransferred;
// private readonly SemaphoreSlim _semaphore = new(1, 1);
// private static readonly double _ticksPerSecond = Stopwatch.Frequency; // 动态获取计时器精度
// public BandwidthLimiter(long bytesPerSecond)
// {
// _bytesPerSecond = bytesPerSecond > 0 ? bytesPerSecond : throw new ArgumentOutOfRangeException(nameof(bytesPerSecond));
// }
// public async ValueTask<int> GetAllowedBytesAsync(int requestedBytes, CancellationToken ct)
// {
// if (requestedBytes <= 0) return 0;
// await _semaphore.WaitAsync(ct);
// try
// {
// // 计算自上次请求以来的时间窗口
// long elapsedTicks = _stopwatch.ElapsedTicks;
// _stopwatch.Restart();
// // 动态计算允许传输的字节数(基于实际计时器频率)
// double elapsedSeconds = (double)elapsedTicks / _ticksPerSecond;
// long allowedBytes = (long)(_bytesPerSecond * elapsedSeconds);
// // 更新令牌桶:扣除本时间窗口允许的字节数
// _bytesTransferred = Math.Max(0, _bytesTransferred - allowedBytes);
// // 计算实际可传输的字节数
// int actualBytes = (int)Math.Min(requestedBytes, _bytesPerSecond - _bytesTransferred);
// // 无可用配额时触发等待
// if (actualBytes <= 0)
// {
// // 计算需要等待的时间(精确到毫秒)
// double deficitSeconds = (_bytesTransferred + requestedBytes) / (double)_bytesPerSecond;
// await Task.Delay(TimeSpan.FromSeconds(deficitSeconds), ct);
// // 重置累计值并允许本次完整传输
// _bytesTransferred = 0;
// return requestedBytes;
// }
// // 更新累计值并返回允许的字节数
// _bytesTransferred += actualBytes;
// return actualBytes;
// }
// finally
// {
// _semaphore.Release();
// }
// }
// public void Dispose() => _semaphore.Dispose();
//}
//public class ThrottledHttpClientHandler : HttpClientHandler
//{
// private readonly BandwidthLimiter _limiter;
// public ThrottledHttpClientHandler(BandwidthLimiter limiter)
// {
// _limiter = limiter ?? throw new ArgumentNullException(nameof(limiter));
// ServerCertificateCustomValidationCallback = (msg, cert, chain, errors) => true;
// }
// protected override async Task<HttpResponseMessage> SendAsync(
// HttpRequestMessage request,
// CancellationToken cancellationToken)
// {
// if (IsUploadPartRequest(request))
// {
// var originalContent = request.Content;
// var sourceStream = await originalContent.ReadAsStreamAsync(cancellationToken);
// request.Content = new StreamContent(new MinIOThrottledStream(sourceStream, _limiter));
// foreach (var header in originalContent.Headers)
// request.Content.Headers.TryAddWithoutValidation(header.Key, header.Value);
// }
// return await base.SendAsync(request, cancellationToken);
// }
// private bool IsUploadPartRequest(HttpRequestMessage request) =>
// request.Method == HttpMethod.Put &&
// request.RequestUri?.Query.Contains("uploadId=") == true &&
// request.Content?.Headers?.ContentType?.MediaType == "application/octet-stream";
// protected override void Dispose(bool disposing)
// {
// _limiter?.Dispose();
// base.Dispose(disposing);
// }
//}
#endregion
#region Limit overall progress (commented out)
//public class MinIOThrottledStream : Stream
//{
// private readonly Stream _innerStream;
// private readonly BandwidthLimiter _limiter;
// private long _position;
// private bool _disposed;
// private readonly long _totalLength;
// private long _remainingBytes;
// public MinIOThrottledStream(Stream innerStream, BandwidthLimiter limiter)
// {
// _innerStream = innerStream ?? throw new ArgumentNullException(nameof(innerStream));
// _limiter = limiter ?? throw new ArgumentNullException(nameof(limiter));
// _totalLength = innerStream.Length;
// _remainingBytes = _totalLength;
// }
// public override async ValueTask<int> ReadAsync(Memory<byte> buffer,
// CancellationToken cancellationToken = default)
// {
// if (_disposed) throw new ObjectDisposedException(nameof(MinIOThrottledStream));
// // 关键修改:基于剩余字节数检测完成
// if (_remainingBytes <= 0)
// return 0;
// int allowedBytes = await _limiter.GetAllowedBytesAsync(
// (int)Math.Min(buffer.Length, _remainingBytes),
// cancellationToken);
// if (allowedBytes <= 0)
// return 0;
// int bytesRead = await _innerStream.ReadAsync(buffer[..allowedBytes], cancellationToken);
// if (bytesRead > 0)
// {
// Interlocked.Add(ref _position, bytesRead);
// Interlocked.Add(ref _remainingBytes, -bytesRead);
// }
// if (_remainingBytes == 0)
// {
// await _innerStream.FlushAsync(cancellationToken);
// }
// return bytesRead;
// }
// protected override void Dispose(bool disposing)
// {
// if (_disposed) return;
// if (disposing)
// {
// _innerStream?.Dispose();
// }
// base.Dispose(disposing);
// _disposed = true;
// }
// // 其他必要成员实现
// public override bool CanRead => _innerStream.CanRead;
// public override bool CanSeek => false;
// public override bool CanWrite => false;
// public override long Length => _innerStream.Length;
// public override long Position { get => _position; set => throw new NotSupportedException(); }
// public override int Read(byte[] buffer, int offset, int count) => ReadAsync(buffer, offset, count).GetAwaiter().GetResult();
// public override void Flush() => _innerStream.Flush();
// // ...其他必要实现...
// public override long Seek(long offset, SeekOrigin origin)
// {
// throw new NotSupportedException("Seeking is not supported");
// }
// public override void SetLength(long value)
// {
// throw new NotSupportedException("SetLength is not supported");
// }
// public override void Write(byte[] buffer, int offset, int count)
// {
// throw new NotSupportedException("Writing is not supported");
// }
// //// 同步读取方法(调用异步版本)
// //public override int Read(byte[] buffer, int offset, int count)
// //{
// // return ReadAsync(buffer, offset, count).GetAwaiter().GetResult();
// //}
//}
//public class BandwidthLimiter : IDisposable
//{
// private readonly long _bytesPerSecond;
// private readonly SemaphoreSlim _semaphore = new(1, 1);
// private readonly Stopwatch _stopwatch = Stopwatch.StartNew();
// private long _bytesTransferred;
// private bool _disposed;
// public BandwidthLimiter(long bytesPerSecond)
// {
// _bytesPerSecond = bytesPerSecond > 0 ? bytesPerSecond :
// throw new ArgumentOutOfRangeException(nameof(bytesPerSecond));
// }
// public async ValueTask<int> GetAllowedBytesAsync(int requestedBytes, CancellationToken ct)
// {
// if (_disposed) return requestedBytes; // 释放后不再限速
// await _semaphore.WaitAsync(ct);
// try
// {
// var elapsed = _stopwatch.Elapsed;
// _stopwatch.Restart();
// // 更精确的令牌桶算法
// double allowance = elapsed.TotalSeconds * _bytesPerSecond;
// _bytesTransferred = Math.Max(0, _bytesTransferred - (long)allowance);
// if (_bytesTransferred < _bytesPerSecond)
// {
// int allowed = (int)Math.Min(requestedBytes, _bytesPerSecond - _bytesTransferred);
// _bytesTransferred += allowed;
// return allowed;
// }
// // 需要等待
// double deficitSeconds = (_bytesTransferred + requestedBytes - _bytesPerSecond) / (double)_bytesPerSecond;
// await Task.Delay(TimeSpan.FromSeconds(deficitSeconds), ct);
// _bytesTransferred = 0;
// return requestedBytes;
// }
// finally
// {
// _semaphore.Release();
// }
// }
// public void Dispose()
// {
// if (_disposed) return;
// _semaphore.Dispose();
// _disposed = true;
// }
//}
//public class ThrottledHttpClientHandler : HttpClientHandler
//{
// private readonly BandwidthLimiter _limiter;
// private readonly CancellationTokenSource _linkedCts = new(); // 用于取消挂起的操作
// private bool _disposed;
// private readonly ConcurrentDictionary<Stream, bool> _activeStreams = new();
// public ThrottledHttpClientHandler(BandwidthLimiter limiter)
// {
// _limiter = limiter ?? throw new ArgumentNullException(nameof(limiter));
// ServerCertificateCustomValidationCallback = (msg, cert, chain, errors) => true;
// }
// protected override async Task<HttpResponseMessage> SendAsync(
// HttpRequestMessage request,
// CancellationToken cancellationToken)
// {
// if (!IsUploadPartRequest(request))
// return await base.SendAsync(request, cancellationToken);
// var originalContent = request.Content;
// try
// {
// var sourceStream = await originalContent.ReadAsStreamAsync(cancellationToken);
// var throttledStream = new MinIOThrottledStream(sourceStream, _limiter);
// _activeStreams.TryAdd(throttledStream, true);
// var throttledContent = new StreamContent(throttledStream);
// foreach (var header in originalContent.Headers)
// throttledContent.Headers.TryAddWithoutValidation(header.Key, header.Value);
// request.Content = throttledContent;
// var response = await base.SendAsync(request, cancellationToken);
// // 关键修改:确保最后一个分片的完成
// if (IsFinalPart(request))
// {
// await CompleteMultipartUpload(response);
// }
// return response;
// }
// finally
// {
// originalContent?.Dispose();
// }
// }
// private bool IsFinalPart(HttpRequestMessage request)
// {
// // 通过查询参数检测是否是最后一个分片
// return request.RequestUri.Query.Contains("uploadId=") &&
// request.RequestUri.Query.Contains("partNumber=");
// }
// private async Task CompleteMultipartUpload(HttpResponseMessage response)
// {
// try
// {
// // 确保响应完全读取
// await response.Content.ReadAsByteArrayAsync();
// // 等待所有限速流完成
// while (!_activeStreams.IsEmpty)
// {
// await Task.Delay(100);
// }
// }
// catch { /* 忽略清理错误 */ }
// }
// // 允许外部代码主动取消所有挂起的操作
// public void CancelPendingOperations()
// {
// _linkedCts.Cancel();
// }
// //// 判断是否为需要限速的上传请求
// //private bool IsUploadPartRequest(HttpRequestMessage request)
// //{
// // return request.Method == HttpMethod.Put &&
// // request.RequestUri != null &&
// // request.RequestUri.Query.Contains("uploadId=") &&
// // request.Content?.Headers?.ContentType?.MediaType == "application/octet-stream";
// //}
// /// <summary>
// /// 精准识别分片数据上传请求(关键逻辑)
// /// </summary>
// private static bool IsUploadPartRequest(HttpRequestMessage request)
// {
// return request.Method == HttpMethod.Put &&
// request.RequestUri != null &&
// request.RequestUri.Query.Contains("uploadId=") &&
// request.RequestUri.Query.Contains("partNumber=") &&
// request.Content?.Headers?.ContentType?.MediaType == "application/octet-stream";
// }
// protected override void Dispose(bool disposing)
// {
// if (_disposed) return;
// if (disposing)
// {
// // 取消所有挂起的操作并释放资源
// CancelPendingOperations();
// _linkedCts.Dispose();
// _limiter?.Dispose();
// }
// base.Dispose(disposing);
// _disposed = true;
// }
//}
#endregion
#region test
/// <summary>
/// Delegating handler that rate-limits the request body of multipart
/// "upload part" requests (PUT with <c>uploadId=</c> and <c>partNumber=</c> in the
/// query string and an <c>application/octet-stream</c> body). Every other request,
/// including the multipart initiate/complete calls, is forwarded untouched.
/// </summary>
/// <remarks>
/// The limit is applied per request: each throttled body gets its own byte counter
/// and stopwatch, so concurrent part uploads are each paced independently.
/// </remarks>
public class MinIOThrottledHandler : DelegatingHandler
{
    /// <summary>Upload ceiling in bytes per second; zero or negative disables throttling.</summary>
    private readonly long _bytesPerSecond;

    /// <summary>
    /// Creates the handler.
    /// </summary>
    /// <param name="bytesPerSecond">
    /// Maximum upload rate for each part request, in bytes per second.
    /// A value of zero or less means "do not throttle".
    /// </param>
    /// <param name="innerHandler">The handler that actually sends the request.</param>
    public MinIOThrottledHandler(long bytesPerSecond, HttpMessageHandler innerHandler)
        : base(innerHandler)
    {
        _bytesPerSecond = bytesPerSecond;
    }

    /// <summary>
    /// Wraps the body of part-upload requests in a rate-limited stream and then
    /// forwards the request to the inner handler.
    /// </summary>
    /// <param name="request">The outgoing request.</param>
    /// <param name="cancellationToken">Token that cancels the send.</param>
    /// <returns>The response produced by the inner handler.</returns>
    protected override async Task<HttpResponseMessage> SendAsync(
        HttpRequestMessage request,
        CancellationToken cancellationToken)
    {
        // Only part-upload requests are throttled. A non-positive rate cannot be
        // paced meaningfully, so it is treated as "unlimited" instead of stalling
        // or truncating the upload.
        if (_bytesPerSecond > 0 && IsPartUploadRequest(request))
        {
            var originalContent = request.Content;
            var contentStream = await originalContent
                .ReadAsStreamAsync(cancellationToken)
                .ConfigureAwait(false);

            // -1 tells the wrapper that the length is unknown and it must read
            // until the source stream itself reports end-of-stream.
            var throttledStream = new ThrottledStream(
                contentStream,
                _bytesPerSecond,
                originalContent.Headers.ContentLength ?? -1);

            var throttledContent = new StreamContent(throttledStream);

            // Carry over the original content headers unchanged (the original
            // author notes this avoids breaking MinIO's signature calculation).
            foreach (var header in originalContent.Headers)
            {
                throttledContent.Headers.TryAddWithoutValidation(header.Key, header.Value);
            }

            request.Content = throttledContent;
        }

        return await base.SendAsync(request, cancellationToken).ConfigureAwait(false);
    }

    /// <summary>
    /// Identifies a multipart part-data upload: a PUT whose query string contains
    /// both <c>uploadId=</c> and <c>partNumber=</c> and whose body is
    /// <c>application/octet-stream</c>.
    /// </summary>
    private static bool IsPartUploadRequest(HttpRequestMessage request)
    {
        return request.Method == HttpMethod.Put &&
               request.RequestUri != null &&
               request.RequestUri.Query.Contains("uploadId=") &&
               request.RequestUri.Query.Contains("partNumber=") &&
               request.Content?.Headers?.ContentType?.MediaType == "application/octet-stream";
    }

    /// <summary>
    /// Read-only, non-seekable stream wrapper that paces reads so the cumulative
    /// number of bytes handed out never gets ahead of the configured rate.
    /// </summary>
    private class ThrottledStream : Stream
    {
        private readonly Stream _innerStream;
        private readonly long _maxBytesPerSecond;
        // Declared body length; a negative value means "unknown".
        private readonly long _totalLength;
        private long _bytesRead;
        private readonly Stopwatch _stopwatch = Stopwatch.StartNew();

        /// <param name="innerStream">Source stream to read from; disposed with this wrapper.</param>
        /// <param name="bytesPerSecond">Rate ceiling in bytes per second; must be positive.</param>
        /// <param name="totalLength">
        /// Maximum number of bytes to hand out, or a negative value when the
        /// length is unknown (read until the source is exhausted).
        /// </param>
        public ThrottledStream(Stream innerStream, long bytesPerSecond, long totalLength)
        {
            _innerStream = innerStream ?? throw new ArgumentNullException(nameof(innerStream));
            if (bytesPerSecond <= 0)
            {
                throw new ArgumentOutOfRangeException(nameof(bytesPerSecond));
            }

            _maxBytesPerSecond = bytesPerSecond;
            _totalLength = totalLength;
        }

        /// <summary>
        /// Reads at most one pacing chunk from the source and, if the stream is
        /// ahead of schedule, waits before returning.
        /// </summary>
        /// <returns>Number of bytes read, or 0 at end of stream.</returns>
        public override async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
        {
            if (count <= 0)
            {
                return 0;
            }

            // Keep each chunk to roughly a tenth of the per-second budget so the
            // pacing is smooth, but never let it drop to zero: a zero-byte read
            // would be mistaken for end-of-stream and truncate the upload.
            long chunkSize = Math.Min(count, Math.Max(1L, _maxBytesPerSecond / 10));

            // Only cap by the declared length when one is actually known.
            if (_totalLength >= 0)
            {
                long remaining = _totalLength - _bytesRead;
                if (remaining <= 0)
                {
                    return 0;
                }

                chunkSize = Math.Min(chunkSize, remaining);
            }

            int bytesRead = await _innerStream
                .ReadAsync(buffer, offset, (int)chunkSize, cancellationToken)
                .ConfigureAwait(false);
            if (bytesRead == 0)
            {
                return 0;
            }

            // Account for the bytes before waiting, so a cancelled delay does not
            // leave the counter behind the data already taken from the source.
            _bytesRead += bytesRead;

            // Time at which this many bytes are allowed to have been sent.
            double expectedSeconds = (double)_bytesRead / _maxBytesPerSecond;
            double elapsedSeconds = _stopwatch.Elapsed.TotalSeconds;
            if (elapsedSeconds < expectedSeconds)
            {
                await Task
                    .Delay(TimeSpan.FromSeconds(expectedSeconds - elapsedSeconds), cancellationToken)
                    .ConfigureAwait(false);
            }

            return bytesRead;
        }

        public override bool CanRead => _innerStream.CanRead;
        public override bool CanSeek => false;
        public override bool CanWrite => false;
        public override long Length => _innerStream.Length;

        /// <summary>Number of bytes handed out so far; the stream cannot be repositioned.</summary>
        public override long Position
        {
            get => _bytesRead;
            set => throw new NotSupportedException();
        }

        // Nothing is buffered for writing, so flushing is a no-op rather than an error.
        public override void Flush()
        {
        }

        // Synchronous callers go through the same throttled path as async ones.
        public override int Read(byte[] buffer, int offset, int count)
            => ReadAsync(buffer, offset, count).GetAwaiter().GetResult();

        public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
        public override void SetLength(long value) => throw new NotSupportedException();
        public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException();

        /// <summary>Releases the wrapped source stream together with this wrapper.</summary>
        protected override void Dispose(bool disposing)
        {
            if (disposing)
            {
                _innerStream.Dispose();
            }

            base.Dispose(disposing);
        }
    }
}
/// <summary>
/// Confirms that an uploaded object is visible on the server by polling its metadata.
/// </summary>
public class UploadVerifier
{
    /// <summary>
    /// Calls <c>StatObjectAsync</c> for the given object until it succeeds or the
    /// attempt budget is used up, waiting a linearly growing delay between attempts.
    /// </summary>
    /// <param name="client">MinIO client used to query the object.</param>
    /// <param name="bucketName">Bucket that should contain the object.</param>
    /// <param name="objectName">Name of the object to look for.</param>
    /// <param name="maxRetries">Maximum number of stat attempts.</param>
    /// <param name="initialDelayMs">
    /// Base delay in milliseconds; the wait after attempt <c>k</c> is <c>initialDelayMs * k</c>.
    /// </param>
    /// <exception cref="ApplicationException">
    /// Thrown when no attempt succeeds; the last failure, if any, is attached as the inner exception.
    /// </exception>
    public static async Task VerifyAsync(
        IMinioClient client,
        string bucketName,
        string objectName,
        int maxRetries = 5,
        int initialDelayMs = 1000)
    {
        Exception lastError = null;

        for (int attempt = 1; attempt <= maxRetries; attempt++)
        {
            try
            {
                var stat = await client.StatObjectAsync(new StatObjectArgs()
                    .WithBucket(bucketName)
                    .WithObject(objectName));
                Console.WriteLine($"验证成功!文件大小: {stat.Size} 字节, ETag: {stat.ETag}");
                return;
            }
            catch (Exception ex)
            {
                lastError = ex;
                Console.WriteLine($"验证失败 ({attempt}/{maxRetries}): {ex.Message}");

                // Back off only when another attempt will follow; sleeping after
                // the final failure would just postpone the exception below.
                if (attempt < maxRetries)
                {
                    await Task.Delay(initialDelayMs * attempt);
                }
            }
        }

        // Keep the root cause available to callers instead of discarding it.
        throw new ApplicationException($"文件 {objectName} 未在服务器上出现", lastError);
    }
}
#endregion
}