# Source: Yolov/windows_utils.py (682 lines, 25 KiB, Python)
#
# NOTE: The repository viewer flagged this file as containing ambiguous Unicode
# characters that may be confused with others in the current locale. If their
# use is intentional and legitimate, this warning can be safely ignored.

# windows_utils.py
import os
import sys
import platform
import subprocess
import ctypes
import traceback
from log import logger
class WindowsSystemUtils:
    """Static helpers for inspecting a Windows host before RTMP streaming.

    Every method is a ``@staticmethod``; the class only serves as a namespace.
    Diagnostics are written through the module-level ``logger``.
    """

    # Marketing names for pre-Windows-10 releases, keyed by (major, minor).
    _LEGACY_VERSION_NAMES = {
        (6, 3): "Windows 8.1",
        (6, 2): "Windows 8",
        (6, 1): "Windows 7",
        (6, 0): "Windows Vista",
        (5, 2): "Windows XP Professional x64",
        (5, 1): "Windows XP",
    }

    # Meaning of the Winsock error codes that ``connect_ex`` may return.
    _WINSOCK_ERRORS = {
        10061: 'Connection refused (服务器拒绝连接)',
        10060: 'Connection timed out (连接超时)',
        10013: 'Permission denied (权限被拒绝)',
        10048: 'Address already in use (地址已被使用)',
        10049: 'Cannot assign requested address (无法分配请求的地址)',
        10050: 'Network is down (网络断开)',
        10051: 'Network is unreachable (网络不可达)',
    }

    @staticmethod
    def is_windows():
        """Return True when the interpreter is running on Windows."""
        return os.name == 'nt' or sys.platform.startswith('win')

    @staticmethod
    def _windows_version_name(major, minor, build):
        """Map a kernel version triple to a readable name, or None if unknown."""
        if major == 10:
            # Windows 10 and 11 share major version 10; only the build differs.
            return f"Windows 10/11 (Build {build})"
        return WindowsSystemUtils._LEGACY_VERSION_NAMES.get((major, minor))

    @staticmethod
    def get_windows_version():
        """Collect Windows version details.

        Returns:
            None when not running on Windows. Otherwise a dict with the
            ``platform`` module fields (``system``, ``release``, ``version``,
            ``machine``, ``processor``, ``architecture``) and, when
            ``RtlGetVersion`` succeeds, ``major_version``, ``minor_version``,
            ``build_number``, ``service_pack`` and (for recognised versions)
            ``name``.
        """
        if not WindowsSystemUtils.is_windows():
            return None

        version_info = {
            'system': platform.system(),
            'release': platform.release(),
            'version': platform.version(),
            'machine': platform.machine(),
            'processor': platform.processor(),
            'architecture': platform.architecture()[0],
        }

        # Query ntdll directly for the detailed version numbers.
        try:
            class OSVERSIONINFOEX(ctypes.Structure):
                _fields_ = [
                    ('dwOSVersionInfoSize', ctypes.c_ulong),
                    ('dwMajorVersion', ctypes.c_ulong),
                    ('dwMinorVersion', ctypes.c_ulong),
                    ('dwBuildNumber', ctypes.c_ulong),
                    ('dwPlatformId', ctypes.c_ulong),
                    ('szCSDVersion', ctypes.c_wchar * 128),
                    ('wServicePackMajor', ctypes.c_ushort),
                    ('wServicePackMinor', ctypes.c_ushort),
                    ('wSuiteMask', ctypes.c_ushort),
                    ('wProductType', ctypes.c_byte),
                    ('wReserved', ctypes.c_byte),
                ]

            os_version = OSVERSIONINFOEX()
            os_version.dwOSVersionInfoSize = ctypes.sizeof(OSVERSIONINFOEX)
            # RtlGetVersion returns 0 (STATUS_SUCCESS) when the struct was filled.
            if ctypes.windll.Ntdll.RtlGetVersion(ctypes.byref(os_version)) == 0:
                version_info['major_version'] = os_version.dwMajorVersion
                version_info['minor_version'] = os_version.dwMinorVersion
                version_info['build_number'] = os_version.dwBuildNumber
                version_info['service_pack'] = os_version.wServicePackMajor

                name = WindowsSystemUtils._windows_version_name(
                    version_info['major_version'],
                    version_info['minor_version'],
                    version_info['build_number'],
                )
                if name is not None:
                    version_info['name'] = name
        except Exception as e:
            # Best effort: the platform-module fields are still returned.
            logger.warning(f"获取Windows版本信息失败: {str(e)}")

        return version_info

    @staticmethod
    def _parse_ffmpeg_version_output(stdout):
        """Extract ``(version_line, first_line)`` from ``ffmpeg -version`` output.

        ``version_line`` is the first line containing "version"
        (case-insensitive), stripped. Either element is ``'unknown'`` when it
        cannot be determined.
        """
        lines = stdout.split('\n') if stdout else []
        version = next(
            (line.strip() for line in lines if 'version' in line.lower()),
            'unknown',
        )
        details = lines[0] if lines and lines[0] else 'unknown'
        return version, details

    @staticmethod
    def check_ffmpeg_installation():
        """Check whether ``ffmpeg`` is on PATH and report its version.

        Returns:
            ``{'installed': True, 'path', 'version', 'details'}`` when found,
            ``{'installed': False, 'error': ...}`` on timeout, otherwise
            ``{'installed': False, 'version': 'not found'}``.
        """
        import shutil

        try:
            # shutil.which performs the PATH lookup without spawning a shell.
            ffmpeg_path = shutil.which('ffmpeg')
            if ffmpeg_path:
                version_result = subprocess.run(
                    [ffmpeg_path, '-version'],
                    capture_output=True,
                    text=True,
                    timeout=5,
                )
                # Defaults cover a failing ``-version`` call; ffmpeg is still
                # reported as installed because the executable was found.
                version, details = 'unknown', 'unknown'
                if version_result.returncode == 0:
                    version, details = (
                        WindowsSystemUtils._parse_ffmpeg_version_output(
                            version_result.stdout
                        )
                    )
                return {
                    'installed': True,
                    'path': ffmpeg_path,
                    'version': version,
                    'details': details,
                }
        except subprocess.TimeoutExpired:
            return {'installed': False, 'error': 'Timeout checking ffmpeg'}
        except FileNotFoundError:
            pass
        except Exception as e:
            logger.warning(f"检查FFmpeg失败: {str(e)}")

        return {'installed': False, 'version': 'not found'}

    @staticmethod
    def check_rtmp_server_accessibility(rtmp_url):
        """Test whether the host/port of an ``rtmp://`` URL accepts TCP connections.

        Args:
            rtmp_url: URL of the form ``rtmp://host[:port]/...``; the port
                defaults to 1935.

        Returns:
            A dict with ``accessible`` (bool). On success it also carries
            ``host`` and ``port``; on a failed connect it adds ``error_code``
            and ``error``; for a malformed URL or any exception only ``error``.
        """
        scheme = 'rtmp://'
        invalid = {'accessible': False, 'error': 'Invalid RTMP URL format'}
        if not isinstance(rtmp_url, str) or not rtmp_url.startswith(scheme):
            return invalid

        try:
            import socket

            # Authority is everything between the scheme and the first '/'.
            parts = rtmp_url[len(scheme):].split('/')[0].split(':')
            host = parts[0]
            if not host:
                # An empty host would otherwise silently probe the local machine.
                return invalid
            port = int(parts[1]) if len(parts) > 1 else 1935

            # The context manager closes the socket even if connect_ex raises
            # (e.g. on a DNS resolution failure).
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                sock.settimeout(5)
                result = sock.connect_ex((host, port))

            if result == 0:
                return {'accessible': True, 'host': host, 'port': port}

            error_msg = WindowsSystemUtils._WINSOCK_ERRORS.get(
                result, f'Unknown error code: {result}'
            )
            return {
                'accessible': False,
                'host': host,
                'port': port,
                'error_code': result,
                'error': error_msg,
            }
        except Exception as e:
            return {'accessible': False, 'error': str(e)}

    @staticmethod
    def optimize_windows_for_streaming():
        """Report the active power scheme and static tuning advice for streaming.

        Nothing is changed on the system; the method only runs
        ``powercfg /getactivescheme`` and returns a dict of recommendations.
        """
        optimizations = {}

        # 1. Power plan: query the active scheme.
        try:
            result = subprocess.run(
                ['powercfg', '/getactivescheme'],
                capture_output=True,
                text=True,
                timeout=10,
            )
            if result.returncode == 0:
                optimizations['power_scheme'] = result.stdout.strip()
            else:
                optimizations['power_scheme'] = 'Cannot check power scheme'
        except Exception as e:
            optimizations['power_scheme'] = f'Error checking power: {str(e)}'

        # The advice is static, so it is reported even when the query fails.
        optimizations['power_recommendation'] = (
            'For best streaming performance, use High Performance power plan. '
            'Run: powercfg /setactive 8c5e7fda-e8bf-4a96-9a85-a6e23a8c635c'
        )

        # 2. Firewall advice.
        optimizations['firewall'] = 'Consider adding firewall rules for RTMP ports (1935, 1936)'

        # 3. Network tuning advice.
        optimizations['network_optimizations'] = [
            'Increase TCP buffer size: netsh int tcp set global autotuninglevel=normal',
            'Disable TCP auto-tuning (if unstable): netsh int tcp set global autotuninglevel=disabled',
            'Enable TCP fast open: netsh int tcp set global fastopen=enabled',
            'Set TCP keepalive: netsh int tcp set global keepalivetime=30000',
        ]

        # 4. GPU advice.
        optimizations['gpu_recommendations'] = [
            'Update graphics drivers to latest version',
            'In NVIDIA Control Panel: Set Power Management Mode to "Prefer Maximum Performance"',
            'In Windows Graphics Settings: Add ffmpeg.exe and set to "High Performance"',
        ]

        return optimizations

    @staticmethod
    def create_windows_firewall_rule(port, name="RTMP Streaming"):
        """Add inbound TCP and UDP allow rules for ``port`` via ``netsh``.

        Args:
            port: Local port the rules apply to.
            name: Rule name; the UDP rule gets a " UDP" suffix.

        Returns:
            One dict per command with ``command``, ``success`` and ``error``
            (plus ``output`` when the command was actually executed). Without
            administrator rights no command is run.
        """
        # NOTE(review): ``name`` and ``port`` are interpolated into a shell
        # command line; callers must not pass untrusted values here.
        commands = [
            f'netsh advfirewall firewall add rule name="{name}" dir=in action=allow protocol=TCP localport={port}',
            f'netsh advfirewall firewall add rule name="{name} UDP" dir=in action=allow protocol=UDP localport={port}',
        ]

        # The privilege check does not depend on the command, so do it once.
        try:
            is_admin = ctypes.windll.shell32.IsUserAnAdmin() != 0
            precheck_error = None if is_admin else '需要管理员权限运行此命令'
        except Exception as e:
            precheck_error = str(e)

        results = []
        for cmd in commands:
            if precheck_error is not None:
                results.append({
                    'command': cmd,
                    'success': False,
                    'error': precheck_error,
                })
                continue
            try:
                result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
                results.append({
                    'command': cmd,
                    'success': result.returncode == 0,
                    'output': result.stdout,
                    'error': result.stderr,
                })
            except Exception as e:
                results.append({
                    'command': cmd,
                    'success': False,
                    'error': str(e),
                })
        return results

    @staticmethod
    def get_system_resources():
        """Snapshot CPU, memory, disk I/O, network I/O and GPU information.

        Requires ``psutil``. ``disk_io`` / ``network_io`` are set to
        ``{'available': False}`` when the counters cannot be read.
        """
        import psutil

        # Take each snapshot once so related fields describe the same instant.
        cpu_freq = psutil.cpu_freq()
        memory = psutil.virtual_memory()

        resources = {
            'cpu_percent': psutil.cpu_percent(interval=0.1),
            'cpu_count': psutil.cpu_count(),
            'cpu_freq': getattr(cpu_freq, 'current', 0) if cpu_freq else 0,
            'memory_percent': memory.percent,
            'memory_used_gb': memory.used / (1024 ** 3),
            'memory_total_gb': memory.total / (1024 ** 3),
            'process_count': len(psutil.pids()),
        }

        # Disk counters may be unavailable on some systems.
        try:
            disk_io = psutil.disk_io_counters()
        except Exception:
            disk_io = None
        if disk_io:
            resources['disk_io'] = {
                'read_bytes': disk_io.read_bytes,
                'write_bytes': disk_io.write_bytes,
                'read_count': disk_io.read_count,
                'write_count': disk_io.write_count,
            }
        else:
            resources['disk_io'] = {'available': False}

        # Network counters.
        try:
            net_io = psutil.net_io_counters()
            resources['network_io'] = {
                'bytes_sent': net_io.bytes_sent,
                'bytes_recv': net_io.bytes_recv,
                'packets_sent': net_io.packets_sent,
                'packets_recv': net_io.packets_recv,
            }
        except Exception:
            resources['network_io'] = {'available': False}

        resources['gpu_info'] = WindowsSystemUtils._get_gpu_info()
        return resources

    @staticmethod
    def _gpus_from_torch():
        """List CUDA devices reported by PyTorch (raises ImportError without torch)."""
        import torch

        if not torch.cuda.is_available():
            return []
        return [
            {
                'name': torch.cuda.get_device_name(i),
                'total_memory_mb': torch.cuda.get_device_properties(i).total_memory / (1024 ** 2),
                'method': 'torch',
            }
            for i in range(torch.cuda.device_count())
        ]

    @staticmethod
    def _gpus_from_dxdiag():
        """List display adapters parsed from a ``dxdiag /t`` text report."""
        import re
        import tempfile

        temp_file = tempfile.NamedTemporaryFile(delete=False, suffix='.txt')
        temp_file.close()
        try:
            # Argument list instead of a shell string so a temp path that
            # contains spaces is passed as a single argument.
            result = subprocess.run(
                ['dxdiag', '/t', temp_file.name],
                capture_output=True,
                text=True,
                timeout=10,
            )
            if result.returncode != 0:
                return []
            with open(temp_file.name, 'r', encoding='utf-8', errors='ignore') as f:
                dxdiag_content = f.read()
        finally:
            # Remove the report on every path, including timeout and failure.
            try:
                os.unlink(temp_file.name)
            except OSError:
                pass

        gpus = []
        # Everything after a "Display Devices" heading is treated as one section.
        for section in re.split(r'Display Devices\n-+', dxdiag_content)[1:]:
            name_match = re.search(r'Card name:\s*(.+)', section)
            memory_match = re.search(r'Display Memory:\s*(\d+)', section)
            driver_match = re.search(r'Driver Version:\s*(.+)', section)
            gpus.append({
                'name': name_match.group(1).strip() if name_match else 'Unknown GPU',
                'display_memory_mb': int(memory_match.group(1)) if memory_match else 0,
                'driver_version': driver_match.group(1).strip() if driver_match else 'Unknown',
                'method': 'dxdiag',
            })
        return gpus

    @staticmethod
    def _gpus_from_registry():
        """List display adapters from the display-class key in the registry."""
        import winreg

        reg_path = r"SYSTEM\CurrentControlSet\Control\Class\{4d36e968-e325-11ce-bfc1-08002be10318}"
        gpus = []
        with winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE, reg_path) as key:
            index = 0
            while True:
                try:
                    subkey_name = winreg.EnumKey(key, index)
                except OSError:
                    # EnumKey raises once the index runs past the last subkey.
                    break
                index += 1
                try:
                    subkey_path = f"{reg_path}\\{subkey_name}"
                    with winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE, subkey_path) as subkey:
                        driver_desc = winreg.QueryValueEx(subkey, "DriverDesc")[0]
                except OSError:
                    # Unreadable subkey or no DriverDesc value: skip it and
                    # keep enumerating instead of aborting the whole scan.
                    continue
                # Skip the generic fallback adapter.
                if "basic display" not in driver_desc.lower():
                    gpus.append({'name': driver_desc, 'method': 'registry'})
        return gpus

    @staticmethod
    def _get_gpu_info():
        """Detect GPUs, trying PyTorch, then dxdiag, then the registry.

        The first probe that finds at least one GPU wins, so a device is
        never listed twice by different probes.

        Returns:
            ``{'method': <'torch'|'dxdiag'|'registry'|'none'>, 'gpus': [...]}``.
        """
        gpu_info = {'method': 'none', 'gpus': []}

        # Probe 1: PyTorch (if installed).
        try:
            gpus = WindowsSystemUtils._gpus_from_torch()
            if gpus:
                return {'method': 'torch', 'gpus': gpus}
        except ImportError:
            logger.debug("torch未安装跳过PyTorch GPU检测")
        except Exception as e:
            logger.debug(f"PyTorch GPU检测失败: {str(e)}")

        # Probe 2: dxdiag text report (Windows built-in tool).
        try:
            gpus = WindowsSystemUtils._gpus_from_dxdiag()
            if gpus:
                return {'method': 'dxdiag', 'gpus': gpus}
        except Exception as e:
            logger.debug(f"dxdiag GPU检测失败: {str(e)}")

        # Probe 3: Windows registry.
        try:
            gpus = WindowsSystemUtils._gpus_from_registry()
            if gpus:
                return {'method': 'registry', 'gpus': gpus}
        except Exception as e:
            logger.debug(f"注册表GPU检测失败: {str(e)}")

        return gpu_info

    @staticmethod
    def test_ffmpeg_streaming():
        """Run two ffmpeg smoke tests and return their results.

        Returns:
            A list with one dict per test (``ffmpeg_version`` and
            ``ffmpeg_basic_encode``), each holding ``test``, ``success`` and
            either ``output`` or, when the run raised, ``error``.
        """
        test_results = []

        # Test 1: ffmpeg starts and prints its version.
        try:
            result = subprocess.run(
                ['ffmpeg', '-version'],
                capture_output=True,
                text=True,
                timeout=5,
            )
            test_results.append({
                'test': 'ffmpeg_version',
                'success': result.returncode == 0,
                'output': result.stdout[:100] if result.stdout else 'No output',
            })
        except Exception as e:
            test_results.append({
                'test': 'ffmpeg_version',
                'success': False,
                'error': str(e),
            })

        # Test 2: encode a synthetic source with libx264 into the null muxer,
        # which exercises the encoder without touching disk or network.
        try:
            test_command = [
                'ffmpeg',
                '-f', 'lavfi',
                '-i', 'testsrc=duration=2:size=640x480:rate=30',
                '-c:v', 'libx264',
                '-t', '1',      # encode one second only
                '-f', 'null',   # discard the output
                '-',
            ]
            result = subprocess.run(
                test_command,
                capture_output=True,
                text=True,
                timeout=10,
            )
            test_results.append({
                'test': 'ffmpeg_basic_encode',
                'success': result.returncode == 0,
                'output': 'Success' if result.returncode == 0 else result.stderr[:200],
            })
        except Exception as e:
            test_results.append({
                'test': 'ffmpeg_basic_encode',
                'success': False,
                'error': str(e),
            })

        return test_results
# System detection and configuration
def detect_and_configure_windows():
    """Run the full Windows readiness check for streaming.

    Gathers version, FFmpeg, GPU, tuning advice and resource usage through
    ``WindowsSystemUtils`` and logs the findings.

    Returns:
        None on non-Windows systems. Otherwise a dict with ``system_info``,
        ``ffmpeg_info``, ``gpu_info``, ``optimizations``, ``resources``,
        ``ffmpeg_tests``, ``issues``, ``recommendations`` and ``status``
        (``'ready'``, ``'warning'`` or ``'error'``; on ``'error'`` an
        ``error`` message is added).
    """
    if not WindowsSystemUtils.is_windows():
        logger.info("非Windows系统跳过Windows特定配置")
        return None

    logger.info("检测到Windows系统进行系统检测和配置...")
    config_result = {
        'system_info': {},
        'ffmpeg_info': {},
        'gpu_info': {},
        'optimizations': {},
        'resources': {},
        'ffmpeg_tests': [],
        'status': 'unknown',
        'issues': [],
        'recommendations': []
    }

    try:
        # 1. System information (guard against a None result).
        system_info = WindowsSystemUtils.get_windows_version() or {}
        config_result['system_info'] = system_info
        logger.info(f"Windows系统: {system_info.get('name', 'Unknown')}")

        # 2. FFmpeg availability.
        ffmpeg_info = WindowsSystemUtils.check_ffmpeg_installation()
        config_result['ffmpeg_info'] = ffmpeg_info
        if not ffmpeg_info['installed']:
            config_result['issues'].append("FFmpeg未安装")
            config_result['recommendations'].append({
                'priority': 'critical',
                'action': '安装FFmpeg',
                'details': '从 https://github.com/BtbN/FFmpeg-Builds/releases 下载 ffmpeg-master-latest-win64-gpl.zip解压并添加bin目录到PATH'
            })
            logger.error("FFmpeg未安装")
        else:
            logger.info(f"FFmpeg版本: {ffmpeg_info['version']}")
            logger.info(f"FFmpeg路径: {ffmpeg_info.get('path', 'unknown')}")

            # Smoke-test ffmpeg only when it is actually installed.
            logger.info("运行FFmpeg功能测试...")
            ffmpeg_tests = WindowsSystemUtils.test_ffmpeg_streaming()
            config_result['ffmpeg_tests'] = ffmpeg_tests

            failed_tests = [t for t in ffmpeg_tests if not t.get('success', False)]
            if failed_tests:
                config_result['issues'].append(f"FFmpeg测试失败 ({len(failed_tests)}个)")
                for test in failed_tests:
                    logger.warning(f"FFmpeg测试失败: {test.get('test')}")
            else:
                logger.info("FFmpeg功能测试成功")

        # 3. GPU information.
        gpu_info = WindowsSystemUtils._get_gpu_info()
        config_result['gpu_info'] = gpu_info
        if gpu_info['gpus']:
            logger.info(f"检测到 {len(gpu_info['gpus'])} 个GPU:")
            for i, gpu in enumerate(gpu_info['gpus']):
                logger.info(f" GPU{i}: {gpu.get('name', 'Unknown')}")
                # The GPU probes report memory under 'total_memory_mb' (torch)
                # or 'display_memory_mb' (dxdiag); registry entries carry none.
                memory_mb = gpu.get('total_memory_mb') or gpu.get('display_memory_mb') or 0
                if memory_mb > 0:
                    logger.info(f" 显存: {memory_mb:.0f} MB")
        else:
            logger.warning("未检测到GPU信息")
            config_result['issues'].append("未检测到GPU信息")

        # 4. Tuning advice.
        optimizations = WindowsSystemUtils.optimize_windows_for_streaming()
        config_result['optimizations'] = optimizations
        logger.info("系统优化建议:")
        for key, value in optimizations.items():
            if isinstance(value, list):
                logger.info(f" {key}:")
                for item in value:
                    logger.info(f" - {item}")
            else:
                logger.info(f" {key}: {value}")

        # 5. Current resource usage.
        resources = WindowsSystemUtils.get_system_resources()
        config_result['resources'] = resources
        logger.info(f"系统资源: CPU {resources['cpu_percent']}%, 内存 {resources['memory_percent']}%")

        # Flag resource pressure that is likely to hurt streaming.
        if resources['cpu_percent'] > 85:
            config_result['issues'].append(f"CPU使用率过高: {resources['cpu_percent']}%")
            config_result['recommendations'].append({
                'priority': 'high',
                'action': '降低CPU使用率',
                'details': '关闭不必要的程序,减少后台进程'
            })
        if resources['memory_percent'] > 90:
            config_result['issues'].append(f"内存使用率过高: {resources['memory_percent']}%")
            config_result['recommendations'].append({
                'priority': 'high',
                'action': '释放内存',
                'details': '关闭内存占用大的程序,重启系统'
            })

        # 6. Network test suggestion.
        config_result['recommendations'].append({
            'priority': 'medium',
            'action': '测试RTMP服务器连接',
            'details': '运行: python -c "import socket; sock=socket.socket(); sock.settimeout(5); print(\'OK\' if sock.connect_ex((\'your-server\', 1935))==0 else \'FAIL\')"'
        })

        # 7. Streaming configuration suggestions.
        config_result['recommendations'].append({
            'priority': 'medium',
            'action': '使用软件编码',
            'details': 'Windows上建议使用libx264而非硬件编码更稳定'
        })
        config_result['recommendations'].append({
            'priority': 'low',
            'action': '调整推流参数',
            'details': '尝试: -preset ultrafast -tune zerolatency -b:v 1500k -maxrate 1500k -bufsize 3000k'
        })

        # Final status: any recorded issue downgrades 'ready' to 'warning'.
        if config_result['issues']:
            config_result['status'] = 'warning'
            logger.warning(f"发现 {len(config_result['issues'])} 个问题")
        else:
            config_result['status'] = 'ready'
            logger.info("Windows系统检测完成状态正常")
    except Exception as e:
        # Top-level boundary: report the failure in the result instead of raising.
        logger.error(f"Windows系统检测异常: {str(e)}")
        logger.error(traceback.format_exc())
        config_result['status'] = 'error'
        config_result['error'] = str(e)

    return config_result
# Quick Windows diagnosis
def quick_windows_diagnosis():
    """Run a short diagnosis, print a summary to stdout and return the data.

    Returns:
        A dict with ``ffmpeg``, ``system`` (may be None on non-Windows),
        ``gpu``, ``timestamp`` and ``resources`` (CPU/memory percentages, or
        ``{'error': ...}`` when they cannot be read).
    """
    # Imported here because the module's top-level imports do not include time.
    import time

    logger.info("开始Windows快速诊断...")
    results = {
        'ffmpeg': WindowsSystemUtils.check_ffmpeg_installation(),
        'system': WindowsSystemUtils.get_windows_version(),
        'gpu': WindowsSystemUtils._get_gpu_info(),
        'timestamp': time.strftime("%Y-%m-%d %H:%M:%S")
    }

    # Quick resource check; psutil is optional here.
    try:
        import psutil
        results['resources'] = {
            'cpu': psutil.cpu_percent(),
            'memory': psutil.virtual_memory().percent
        }
    except Exception:
        results['resources'] = {'error': 'Failed to get resources'}

    # Print the summary.
    separator = "=" * 60
    print("\n" + separator)
    print("Windows快速诊断结果")
    print(separator)

    # get_windows_version returns None on non-Windows hosts.
    system_info = results['system'] or {}
    print(f"\n系统: {system_info.get('name', 'Unknown')}")
    print(f"时间: {results['timestamp']}")

    if results['ffmpeg']['installed']:
        print(f"✓ FFmpeg: {results['ffmpeg']['version']}")
    else:
        print("✗ FFmpeg: 未安装")

    gpus = results['gpu'].get('gpus', [])
    if gpus:
        print(f"✓ GPU: {len(gpus)}个检测到")
        for gpu in gpus[:2]:  # show at most the first two
            print(f" {gpu.get('name', 'Unknown GPU')}")
    else:
        print("⚠ GPU: 未检测到")

    if 'resources' in results and 'error' not in results['resources']:
        print(f"系统负载: CPU {results['resources']['cpu']}%, 内存 {results['resources']['memory']}%")

    print("\n" + separator)
    return results