Yolov/windows_utils.py

682 lines
25 KiB
Python
Raw Normal View History

2025-12-11 13:41:07 +08:00
# windows_utils.py
import os
import sys
import platform
import subprocess
import ctypes
import traceback
from log import logger
class WindowsSystemUtils:
"""Windows系统工具类"""
@staticmethod
def is_windows():
"""检查是否是Windows系统"""
return os.name == 'nt' or sys.platform.startswith('win')
@staticmethod
def get_windows_version():
"""获取Windows版本信息"""
if not WindowsSystemUtils.is_windows():
return None
version_info = {
'system': platform.system(),
'release': platform.release(),
'version': platform.version(),
'machine': platform.machine(),
'processor': platform.processor(),
'architecture': platform.architecture()[0]
}
# 获取详细版本信息
try:
# 使用ctypes获取Windows版本
class OSVERSIONINFOEX(ctypes.Structure):
_fields_ = [
('dwOSVersionInfoSize', ctypes.c_ulong),
('dwMajorVersion', ctypes.c_ulong),
('dwMinorVersion', ctypes.c_ulong),
('dwBuildNumber', ctypes.c_ulong),
('dwPlatformId', ctypes.c_ulong),
('szCSDVersion', ctypes.c_wchar * 128),
('wServicePackMajor', ctypes.c_ushort),
('wServicePackMinor', ctypes.c_ushort),
('wSuiteMask', ctypes.c_ushort),
('wProductType', ctypes.c_byte),
('wReserved', ctypes.c_byte)
]
os_version = OSVERSIONINFOEX()
os_version.dwOSVersionInfoSize = ctypes.sizeof(OSVERSIONINFOEX)
if ctypes.windll.Ntdll.RtlGetVersion(ctypes.byref(os_version)) == 0:
version_info['major_version'] = os_version.dwMajorVersion
version_info['minor_version'] = os_version.dwMinorVersion
version_info['build_number'] = os_version.dwBuildNumber
version_info['service_pack'] = os_version.wServicePackMajor
# 转换为可读版本名称
if version_info['major_version'] == 10:
version_info['name'] = f"Windows 10/11 (Build {version_info['build_number']})"
elif version_info['major_version'] == 6:
if version_info['minor_version'] == 3:
version_info['name'] = "Windows 8.1"
elif version_info['minor_version'] == 2:
version_info['name'] = "Windows 8"
elif version_info['minor_version'] == 1:
version_info['name'] = "Windows 7"
elif version_info['minor_version'] == 0:
version_info['name'] = "Windows Vista"
elif version_info['major_version'] == 5:
if version_info['minor_version'] == 2:
version_info['name'] = "Windows XP Professional x64"
elif version_info['minor_version'] == 1:
version_info['name'] = "Windows XP"
except Exception as e:
logger.warning(f"获取Windows版本信息失败: {str(e)}")
return version_info
@staticmethod
def check_ffmpeg_installation():
"""检查FFmpeg安装"""
try:
# Windows上使用where命令查找ffmpeg
result = subprocess.run(
'where ffmpeg',
shell=True,
capture_output=True,
text=True,
timeout=5
)
if result.returncode == 0:
ffmpeg_path = result.stdout.strip().split('\n')[0]
# 获取版本信息
version_result = subprocess.run(
['ffmpeg', '-version'],
capture_output=True,
text=True,
timeout=5
)
version_info = {}
if version_result.returncode == 0:
lines = version_result.stdout.split('\n')
for line in lines:
if 'version' in line.lower():
version_info['version'] = line.strip()
break
return {
'installed': True,
'path': ffmpeg_path,
'version': version_info.get('version', 'unknown'),
'details': lines[0] if lines else 'unknown'
}
except subprocess.TimeoutExpired:
return {'installed': False, 'error': 'Timeout checking ffmpeg'}
except FileNotFoundError:
pass
except Exception as e:
logger.warning(f"检查FFmpeg失败: {str(e)}")
return {'installed': False, 'version': 'not found'}
@staticmethod
def check_rtmp_server_accessibility(rtmp_url):
"""检查RTMP服务器可达性"""
if not rtmp_url.startswith('rtmp://'):
return {'accessible': False, 'error': 'Invalid RTMP URL format'}
try:
# 提取主机和端口
parts = rtmp_url.replace('rtmp://', '').split('/')[0].split(':')
host = parts[0]
port = int(parts[1]) if len(parts) > 1 else 1935
# Windows网络连接检查
import socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(5)
result = sock.connect_ex((host, port))
sock.close()
if result == 0:
return {'accessible': True, 'host': host, 'port': port}
else:
# Windows错误代码含义
error_map = {
10061: 'Connection refused (服务器拒绝连接)',
10060: 'Connection timed out (连接超时)',
10013: 'Permission denied (权限被拒绝)',
10048: 'Address already in use (地址已被使用)',
10049: 'Cannot assign requested address (无法分配请求的地址)',
10050: 'Network is down (网络断开)',
10051: 'Network is unreachable (网络不可达)',
}
error_msg = error_map.get(result, f'Unknown error code: {result}')
return {
'accessible': False,
'host': host,
'port': port,
'error_code': result,
'error': error_msg
}
except Exception as e:
return {'accessible': False, 'error': str(e)}
@staticmethod
def optimize_windows_for_streaming():
"""优化Windows系统设置以支持推流"""
optimizations = {}
# 1. 检查电源设置
try:
# 使用powercfg检查当前电源方案
result = subprocess.run(
'powercfg /getactivescheme',
shell=True,
capture_output=True,
text=True
)
if result.returncode == 0:
optimizations['power_scheme'] = result.stdout.strip()
else:
optimizations['power_scheme'] = 'Cannot check power scheme'
# 建议使用高性能电源方案
optimizations['power_recommendation'] = (
'For best streaming performance, use High Performance power plan. '
'Run: powercfg /setactive 8c5e7fda-e8bf-4a96-9a85-a6e23a8c635c'
)
except Exception as e:
optimizations['power_scheme'] = f'Error checking power: {str(e)}'
# 2. 防火墙建议
optimizations['firewall'] = 'Consider adding firewall rules for RTMP ports (1935, 1936)'
# 3. 网络优化建议
optimizations['network_optimizations'] = [
'Increase TCP buffer size: netsh int tcp set global autotuninglevel=normal',
'Disable TCP auto-tuning (if unstable): netsh int tcp set global autotuninglevel=disabled',
'Enable TCP fast open: netsh int tcp set global fastopen=enabled',
'Set TCP keepalive: netsh int tcp set global keepalivetime=30000'
]
# 4. 显卡设置建议
optimizations['gpu_recommendations'] = [
'Update graphics drivers to latest version',
'In NVIDIA Control Panel: Set Power Management Mode to "Prefer Maximum Performance"',
'In Windows Graphics Settings: Add ffmpeg.exe and set to "High Performance"'
]
return optimizations
@staticmethod
def create_windows_firewall_rule(port, name="RTMP Streaming"):
"""创建Windows防火墙规则"""
commands = [
f'netsh advfirewall firewall add rule name="{name}" dir=in action=allow protocol=TCP localport={port}',
f'netsh advfirewall firewall add rule name="{name} UDP" dir=in action=allow protocol=UDP localport={port}'
]
results = []
for cmd in commands:
try:
# Windows上以管理员权限运行
import ctypes
is_admin = ctypes.windll.shell32.IsUserAnAdmin() != 0
if not is_admin:
results.append({
'command': cmd,
'success': False,
'error': '需要管理员权限运行此命令'
})
continue
result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
results.append({
'command': cmd,
'success': result.returncode == 0,
'output': result.stdout,
'error': result.stderr
})
except Exception as e:
results.append({
'command': cmd,
'success': False,
'error': str(e)
})
return results
@staticmethod
def get_system_resources():
"""获取Windows系统资源 - 修复版"""
import psutil
resources = {
'cpu_percent': psutil.cpu_percent(interval=0.1),
'cpu_count': psutil.cpu_count(),
'cpu_freq': getattr(psutil.cpu_freq(), 'current', 0) if psutil.cpu_freq() else 0,
'memory_percent': psutil.virtual_memory().percent,
'memory_used_gb': psutil.virtual_memory().used / (1024 ** 3),
'memory_total_gb': psutil.virtual_memory().total / (1024 ** 3),
'process_count': len(psutil.pids()),
}
try:
# 磁盘IO可能在某些系统上不可用
disk_io = psutil.disk_io_counters()
if disk_io:
resources['disk_io'] = {
'read_bytes': disk_io.read_bytes,
'write_bytes': disk_io.write_bytes,
'read_count': disk_io.read_count,
'write_count': disk_io.write_count
}
except:
resources['disk_io'] = {'available': False}
try:
# 网络IO
net_io = psutil.net_io_counters()
resources['network_io'] = {
'bytes_sent': net_io.bytes_sent,
'bytes_recv': net_io.bytes_recv,
'packets_sent': net_io.packets_sent,
'packets_recv': net_io.packets_recv
}
except:
resources['network_io'] = {'available': False}
# GPU信息 - 多种方法获取
resources['gpu_info'] = WindowsSystemUtils._get_gpu_info()
return resources
@staticmethod
def _get_gpu_info():
"""获取GPU信息 - 支持多种方法"""
gpu_info = {
'method': 'none',
'gpus': []
}
# 方法2: 使用torch获取GPU信息 (如果可用)
try:
import torch
if torch.cuda.is_available():
for i in range(torch.cuda.device_count()):
gpu_data = {
'name': torch.cuda.get_device_name(i),
'total_memory_mb': torch.cuda.get_device_properties(i).total_memory / (1024 ** 2),
'method': 'torch'
}
gpu_info['gpus'].append(gpu_data)
if gpu_info['gpus']:
gpu_info['method'] = 'torch'
return gpu_info
except ImportError:
logger.debug("torch未安装跳过PyTorch GPU检测")
except Exception as e:
logger.debug(f"PyTorch GPU检测失败: {str(e)}")
# 方法3: 使用dxdiag命令获取GPU信息 (Windows原生)
try:
# 运行dxdiag并保存到临时文件
import tempfile
temp_file = tempfile.NamedTemporaryFile(delete=False, suffix='.txt')
temp_file.close()
result = subprocess.run(
f'dxdiag /t {temp_file.name}',
shell=True,
capture_output=True,
text=True,
timeout=10
)
if result.returncode == 0:
with open(temp_file.name, 'r', encoding='utf-8', errors='ignore') as f:
dxdiag_content = f.read()
# 解析dxdiag输出
import re
# 查找显示设备部分
display_sections = re.split(r'Display Devices\n-+', dxdiag_content)
if len(display_sections) > 1:
for section in display_sections[1:]:
# 提取显卡名称
name_match = re.search(r'Card name:\s*(.+)', section)
# 提取显存
memory_match = re.search(r'Display Memory:\s*(\d+)', section)
# 提取驱动版本
driver_match = re.search(r'Driver Version:\s*(.+)', section)
gpu_data = {
'name': name_match.group(1).strip() if name_match else 'Unknown GPU',
'display_memory_mb': int(memory_match.group(1)) if memory_match else 0,
'driver_version': driver_match.group(1).strip() if driver_match else 'Unknown',
'method': 'dxdiag'
}
gpu_info['gpus'].append(gpu_data)
if gpu_info['gpus']:
gpu_info['method'] = 'dxdiag'
# 清理临时文件
try:
os.unlink(temp_file.name)
except:
pass
except Exception as e:
logger.debug(f"dxdiag GPU检测失败: {str(e)}")
# 方法4: 使用Windows注册表 (高级方法)
try:
import winreg
# 打开显卡注册表键
reg_path = r"SYSTEM\CurrentControlSet\Control\Class\{4d36e968-e325-11ce-bfc1-08002be10318}"
with winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE, reg_path) as key:
i = 0
while True:
try:
subkey_name = winreg.EnumKey(key, i)
subkey_path = f"{reg_path}\\{subkey_name}"
with winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE, subkey_path) as subkey:
try:
driver_desc = winreg.QueryValueEx(subkey, "DriverDesc")[0]
# 跳过基本显示适配器
if "basic display" not in driver_desc.lower():
gpu_data = {
'name': driver_desc,
'method': 'registry'
}
gpu_info['gpus'].append(gpu_data)
except WindowsError:
pass
i += 1
except WindowsError:
break
if gpu_info['gpus']:
gpu_info['method'] = 'registry'
except Exception as e:
logger.debug(f"注册表GPU检测失败: {str(e)}")
return gpu_info
@staticmethod
def test_ffmpeg_streaming():
"""测试FFmpeg推流功能"""
test_results = []
# 测试1: 基本的FFmpeg功能
try:
result = subprocess.run(
['ffmpeg', '-version'],
capture_output=True,
text=True,
timeout=5
)
test_results.append({
'test': 'ffmpeg_version',
'success': result.returncode == 0,
'output': result.stdout[:100] if result.stdout else 'No output'
})
except Exception as e:
test_results.append({
'test': 'ffmpeg_version',
'success': False,
'error': str(e)
})
# 测试2: 编码器支持
# 测试3: 生成测试视频并推流到null (本地测试)
try:
# 创建一个简单的测试命令
test_command = [
'ffmpeg',
'-f', 'lavfi',
'-i', 'testsrc=duration=2:size=640x480:rate=30',
'-c:v', 'libx264',
'-t', '1', # 只运行1秒
'-f', 'null', # 输出到null
'-'
]
result = subprocess.run(
test_command,
capture_output=True,
text=True,
timeout=10
)
test_results.append({
'test': 'ffmpeg_basic_encode',
'success': result.returncode == 0,
'output': 'Success' if result.returncode == 0 else result.stderr[:200]
})
except Exception as e:
test_results.append({
'test': 'ffmpeg_basic_encode',
'success': False,
'error': str(e)
})
return test_results
# 系统检测和配置
def detect_and_configure_windows():
"""检测并配置Windows系统"""
if not WindowsSystemUtils.is_windows():
logger.info("非Windows系统跳过Windows特定配置")
return None
logger.info("检测到Windows系统进行系统检测和配置...")
config_result = {
'system_info': {},
'ffmpeg_info': {},
'gpu_info': {},
'optimizations': {},
'resources': {},
'ffmpeg_tests': [],
'status': 'unknown',
'issues': [],
'recommendations': []
}
try:
# 1. 获取系统信息
system_info = WindowsSystemUtils.get_windows_version()
config_result['system_info'] = system_info
logger.info(f"Windows系统: {system_info.get('name', 'Unknown')}")
# 2. 检查FFmpeg
ffmpeg_info = WindowsSystemUtils.check_ffmpeg_installation()
config_result['ffmpeg_info'] = ffmpeg_info
if not ffmpeg_info['installed']:
config_result['issues'].append("FFmpeg未安装")
config_result['recommendations'].append({
'priority': 'critical',
'action': '安装FFmpeg',
'details': '从 https://github.com/BtbN/FFmpeg-Builds/releases 下载 ffmpeg-master-latest-win64-gpl.zip解压并添加bin目录到PATH'
})
logger.error("FFmpeg未安装")
else:
logger.info(f"FFmpeg版本: {ffmpeg_info['version']}")
logger.info(f"FFmpeg路径: {ffmpeg_info.get('path', 'unknown')}")
# 运行FFmpeg测试
logger.info("运行FFmpeg功能测试...")
ffmpeg_tests = WindowsSystemUtils.test_ffmpeg_streaming()
config_result['ffmpeg_tests'] = ffmpeg_tests
# 检查测试结果
failed_tests = [t for t in ffmpeg_tests if not t.get('success', False)]
if failed_tests:
config_result['issues'].append(f"FFmpeg测试失败 ({len(failed_tests)}个)")
for test in failed_tests:
logger.warning(f"FFmpeg测试失败: {test.get('test')}")
else:
logger.info(f"FFmpeg功能测试成功")
# 3. 获取GPU信息
gpu_info = WindowsSystemUtils._get_gpu_info()
config_result['gpu_info'] = gpu_info
if gpu_info['gpus']:
logger.info(f"检测到 {len(gpu_info['gpus'])} 个GPU:")
for i, gpu in enumerate(gpu_info['gpus']):
logger.info(f" GPU{i}: {gpu.get('name', 'Unknown')}")
if gpu.get('adapter_ram_mb', 0) > 0:
logger.info(f" 显存: {gpu['adapter_ram_mb']} MB")
else:
logger.warning("未检测到GPU信息")
config_result['issues'].append("未检测到GPU信息")
# 4. 系统优化建议
optimizations = WindowsSystemUtils.optimize_windows_for_streaming()
config_result['optimizations'] = optimizations
# 输出优化建议
logger.info("系统优化建议:")
for key, value in optimizations.items():
if isinstance(value, list):
logger.info(f" {key}:")
for item in value:
logger.info(f" - {item}")
else:
logger.info(f" {key}: {value}")
# 5. 获取系统资源
resources = WindowsSystemUtils.get_system_resources()
config_result['resources'] = resources
logger.info(f"系统资源: CPU {resources['cpu_percent']}%, 内存 {resources['memory_percent']}%")
# 检查资源问题
if resources['cpu_percent'] > 85:
config_result['issues'].append(f"CPU使用率过高: {resources['cpu_percent']}%")
config_result['recommendations'].append({
'priority': 'high',
'action': '降低CPU使用率',
'details': '关闭不必要的程序,减少后台进程'
})
if resources['memory_percent'] > 90:
config_result['issues'].append(f"内存使用率过高: {resources['memory_percent']}%")
config_result['recommendations'].append({
'priority': 'high',
'action': '释放内存',
'details': '关闭内存占用大的程序,重启系统'
})
# 6. 生成网络测试建议
config_result['recommendations'].append({
'priority': 'medium',
'action': '测试RTMP服务器连接',
'details': '运行: python -c "import socket; sock=socket.socket(); sock.settimeout(5); print(\'OK\' if sock.connect_ex((\'your-server\', 1935))==0 else \'FAIL\')"'
})
# 7. 推流配置建议
config_result['recommendations'].append({
'priority': 'medium',
'action': '使用软件编码',
'details': 'Windows上建议使用libx264而非硬件编码更稳定'
})
config_result['recommendations'].append({
'priority': 'low',
'action': '调整推流参数',
'details': '尝试: -preset ultrafast -tune zerolatency -b:v 1500k -maxrate 1500k -bufsize 3000k'
})
# 设置状态
if config_result['issues']:
config_result['status'] = 'warning'
logger.warning(f"发现 {len(config_result['issues'])} 个问题")
else:
config_result['status'] = 'ready'
logger.info("Windows系统检测完成状态正常")
except Exception as e:
logger.error(f"Windows系统检测异常: {str(e)}")
logger.error(traceback.format_exc())
config_result['status'] = 'error'
config_result['error'] = str(e)
return config_result
# Windows快速诊断函数
def quick_windows_diagnosis():
"""快速Windows诊断"""
logger.info("开始Windows快速诊断...")
results = {
'ffmpeg': WindowsSystemUtils.check_ffmpeg_installation(),
'system': WindowsSystemUtils.get_windows_version(),
'gpu': WindowsSystemUtils._get_gpu_info(),
'timestamp': time.strftime("%Y-%m-%d %H:%M:%S")
}
# 快速资源检查
try:
import psutil
results['resources'] = {
'cpu': psutil.cpu_percent(),
'memory': psutil.virtual_memory().percent
}
except:
results['resources'] = {'error': 'Failed to get resources'}
# 输出摘要
print("\n" + "=" * 60)
print("Windows快速诊断结果")
print("=" * 60)
print(f"\n系统: {results['system'].get('name', 'Unknown')}")
print(f"时间: {results['timestamp']}")
if results['ffmpeg']['installed']:
print(f"✓ FFmpeg: {results['ffmpeg']['version']}")
else:
print("✗ FFmpeg: 未安装")
gpu_count = len(results['gpu'].get('gpus', []))
if gpu_count > 0:
print(f"✓ GPU: {gpu_count}个检测到")
for i, gpu in enumerate(results['gpu']['gpus'][:2]): # 只显示前2个
print(f" {gpu.get('name', 'Unknown GPU')}")
else:
print("⚠ GPU: 未检测到")
if 'resources' in results and 'error' not in results['resources']:
print(f"系统负载: CPU {results['resources']['cpu']}%, 内存 {results['resources']['memory']}%")
print("\n" + "=" * 60)
return results