Yolov/platform_utils.py

134 lines
4.2 KiB
Python
Raw Normal View History

2025-12-11 13:41:07 +08:00
# platform_utils.py
"""
跨平台工具模块处理不同操作系统的兼容性问题
"""
import os
import sys
import time
import select
from log import logger
def is_windows():
"""检查是否是Windows系统"""
return os.name == 'nt' or sys.platform.startswith('win')
def is_linux():
"""检查是否是Linux系统"""
return os.name == 'posix' and sys.platform.startswith('linux')
def is_macos():
"""检查是否是macOS系统"""
return os.name == 'posix' and sys.platform.startswith('darwin')
def set_non_blocking(fd):
"""设置文件描述符为非阻塞模式(跨平台)"""
try:
if is_windows():
# Windows系统使用msvcrt或io模块
import msvcrt
import io
if hasattr(fd, 'fileno'):
# 对于文件对象
handle = msvcrt.get_osfhandle(fd.fileno())
# Windows上的非阻塞设置更复杂这里使用io模块
# 实际上对于Windows上的管道通常使用异步I/O
# 这里我们采用简化方案:使用超时读取
pass
else:
# Unix系统使用fcntl
import fcntl
import os as unix_os
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | unix_os.O_NONBLOCK)
return True
except ImportError:
logger.warning("fcntl模块在Windows上不可用")
except Exception as e:
logger.warning(f"设置非阻塞模式失败: {str(e)}")
return False
def read_with_timeout(fd, timeout=0.5):
"""
带超时的读取跨平台
返回(has_data, data) (False, None)
"""
if is_windows():
# Windows: 使用select检查socket但对于普通文件/管道可能不支持
# 这里我们使用简单的轮询
import time
start_time = time.time()
while time.time() - start_time < timeout:
try:
data = fd.read(1) # 尝试读取1个字节
if data:
# 读取剩余部分
remaining = fd.read()
if remaining:
data += remaining
return True, data
except (IOError, OSError) as e:
if "句柄无效" in str(e) or "bad file descriptor" in str(e):
return False, None
# 其他错误,继续等待
pass
time.sleep(0.01)
return False, None
else:
# Unix: 使用select
try:
ready, _, _ = select.select([fd], [], [], timeout)
if ready:
data = fd.read()
return True, data
except (ValueError, OSError):
# 可能文件描述符无效
pass
return False, None
def safe_readline(fd, timeout=1.0):
"""安全读取一行(跨平台)"""
if is_windows():
# Windows: 使用简单的读取,带超时
import time
line = b""
start_time = time.time()
while time.time() - start_time < timeout:
try:
# 尝试读取一个字节
char = fd.read(1)
if char:
line += char
if char == b'\n':
return line.decode('utf-8', errors='ignore').strip()
else:
# 没有数据
if line:
# 有部分数据但没换行符
return line.decode('utf-8', errors='ignore').strip()
return None
except (IOError, OSError) as e:
if "句柄无效" in str(e) or "bad file descriptor" in str(e):
return None
time.sleep(0.01)
# 超时
if line:
return line.decode('utf-8', errors='ignore').strip()
return None
else:
# Unix: 使用普通readline
try:
line = fd.readline()
if line:
return line.decode('utf-8', errors='ignore').strip()
except (IOError, OSError):
pass
return None