Yolov/task_stream_manager_windows.py

491 lines
18 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

# task_stream_manager_windows.py
import threading
import time
import os
import psutil
from log import logger
from ffmpegStreamer import FFmpegStreamer
class WindowsTaskStreamManager:
"""Windows系统专用任务推流管理器"""
def __init__(self):
self.task_streams = {}
self.lock = threading.RLock()
self.health_check_thread = None
self.running = True
self.streaming_check_interval = 3
self.ffmpeg_timeout = 20
# Windows特定配置
self.windows_ffmpeg_path = self._find_windows_ffmpeg()
self.windows_ffmpeg_args = [
'-loglevel', 'verbose',
'-hide_banner',
'-f', 'rawvideo',
'-vcodec', 'rawvideo',
'-pix_fmt', 'bgr24',
'-s', '{}x{}',
'-r', '{}',
'-i', '-',
'-c:v', 'libx264',
'-preset', 'ultrafast',
'-tune', 'zerolatency',
'-f', 'flv',
'-g', '10',
'-bf', '0',
'-max_delay', '0',
'-flags', '+global_header',
'-rtbufsize', '100M',
'-b:v', '2000k',
'-bufsize', '2000k',
'{}'
]
self.last_output_time = 0
self.last_restart_time = time.time()
def _find_windows_ffmpeg(self):
"""查找Windows上的ffmpeg路径"""
# 常见ffmpeg安装位置
possible_paths = [
r'C:\ffmpeg\bin\ffmpeg.exe',
r'C:\Program Files\ffmpeg\bin\ffmpeg.exe',
r'C:\Program Files (x86)\ffmpeg\bin\ffmpeg.exe',
os.path.join(os.getcwd(), 'ffmpeg.exe'),
'ffmpeg' # 如果在PATH中
]
for path in possible_paths:
if os.path.exists(path) or os.system(f'where {path} >nul 2>nul') == 0:
logger.info(f"找到FFmpeg: {path}")
return path
logger.warning("未找到FFmpeg将使用系统PATH中的ffmpeg")
return 'ffmpeg'
def create_streamer_for_task(self, task_id, config, fps, width, height):
"""为任务创建Windows专用的推流器"""
with self.lock:
if task_id in self.task_streams:
logger.warning(f"任务 {task_id} 已有推流器,先清理")
self.stop_task_streamer(task_id)
try:
# 创建Windows优化配置
win_config = self._create_windows_config(config, width, height, fps)
# 创建推流器
streamer = FFmpegStreamer(win_config, fps, width, height)
streamer.task_id = task_id
# 覆盖build_ffmpeg_command方法为Windows专用版本
streamer.build_ffmpeg_command = self._build_windows_ffmpeg_command(
streamer, win_config, width, height, fps
)
# 存储任务信息
self.task_streams[task_id] = {
'streamer': streamer,
'config': win_config,
'fps': fps,
'width': width,
'height': height,
'created_at': time.time(),
'last_active': time.time(),
'frame_count': 0,
'status': 'initializing',
'last_ffmpeg_output': '',
'output_lines': []
}
# 启动推流
streamer.start()
# Windows上需要额外时间启动
time.sleep(1)
if streamer.running and streamer.process:
self.task_streams[task_id]['status'] = 'running'
logger.info(f"Windows任务 {task_id} 推流器创建成功")
# 启动输出监控线程
self._start_output_monitor(task_id)
return streamer
else:
logger.error(f"Windows任务 {task_id} 推流器启动失败")
self.stop_task_streamer(task_id)
return None
except Exception as e:
logger.error(f"Windows创建任务推流器失败 {task_id}: {str(e)}", exc_info=True)
return None
def _create_windows_config(self, config, width, height, fps):
"""创建Windows专用的推流配置"""
win_config = config.copy()
push_config = config.get('push', {}).copy()
# Windows优化参数
push_config.update({
'video_codec': 'libx264', # Windows上软件编码更稳定
'gpu_acceleration': False, # Windows上硬件加速问题多
'preset': 'ultrafast',
'tune': 'zerolatency',
'pixel_format': 'bgr24',
'format': 'flv',
'crf': 23,
'bitrate': '2000k',
'bufsize': '2000k',
'framerate': fps,
'extra_args': [
'-max_delay', '0',
'-flags', '+global_header',
'-rtbufsize', '100M',
'-g', '10',
'-bf', '0'
]
})
win_config['push'] = push_config
return win_config
def _build_windows_ffmpeg_command(self, streamer, config, width, height, fps):
"""Windows专用的FFmpeg命令构建"""
def build_command():
# 基础命令
cmd = [
self.windows_ffmpeg_path,
'-y',
'-loglevel', 'verbose', # 详细日志便于调试
'-hide_banner',
'-f', 'rawvideo',
'-vcodec', 'rawvideo',
'-pix_fmt', config['push'].get('pixel_format', 'bgr24'),
'-s', f'{width}x{height}',
'-r', str(fps),
'-i', '-',
'-c:v', config['push']['video_codec'],
'-preset', config['push']['preset'],
'-tune', config['push']['tune'],
'-f', config['push']['format'],
'-g', '10',
'-bf', '0',
'-max_delay', '0',
'-flags', '+global_header',
'-rtbufsize', '100M',
'-b:v', config['push']['bitrate'],
'-bufsize', config['push']['bufsize'],
]
# 添加额外参数
if 'extra_args' in config['push']:
cmd.extend(config['push']['extra_args'])
# 添加输出URL
cmd.append(config['push']['url'])
logger.info(f"Windows FFmpeg命令: {' '.join(cmd)}")
return cmd
return build_command
def _start_output_monitor(self, task_id):
"""启动FFmpeg输出监控线程"""
def monitor():
task_info = self.task_streams.get(task_id)
if not task_info or not task_info.get('streamer'):
return
streamer = task_info['streamer']
while self.running and task_id in self.task_streams:
try:
if streamer.process and streamer.process.stderr:
try:
line = streamer.process.stderr.readline()
if line:
line = line.decode('utf-8', errors='ignore').strip()
if line:
task_info['last_ffmpeg_output'] = line
task_info['output_lines'].append(line)
# # 只保留最近100行
# if len(task_info['output_lines']) > 100:
# task_info['output_lines'].pop(0)
# 解析关键信息
self._parse_ffmpeg_output(task_id, line)
self.last_output_time = time.time()
# 输出关键错误
if any(keyword in line.lower() for keyword in
['error', 'failed', 'invalid', 'cannot']):
logger.error(f"FFmpeg[{task_id}]: {line}")
elif 'frame=' in line and 'fps=' in line:
logger.debug(f"FFmpeg[{task_id}]: {line}")
except Exception as e:
pass
# 检查是否长时间无输出
if time.time() - self.last_output_time > self.ffmpeg_timeout:
logger.warning(f"FFmpeg[{task_id}] 长时间无输出,可能已崩溃")
self._safe_restart_streamer(task_id)
break
time.sleep(0.1)
except Exception as e:
time.sleep(1)
threading.Thread(target=monitor, daemon=True, name=f"FFmpegMonitor-{task_id}").start()
def _parse_ffmpeg_output(self, task_id, line):
"""解析FFmpeg输出提取关键信息"""
task_info = self.task_streams.get(task_id)
if not task_info:
return
# 提取帧率信息
if 'fps=' in line:
import re
match = re.search(r'fps=\s*(\d+)', line)
if match:
fps = int(match.group(1))
task_info['actual_fps'] = fps
# 检查关键错误
error_keywords = [
'connection refused',
'cannot open',
'invalid data',
'broken pipe',
'timed out',
'access denied'
]
for keyword in error_keywords:
if keyword in line.lower():
logger.error(f"FFmpeg[{task_id}] 关键错误: {line}")
self._safe_restart_streamer(task_id)
break
def _safe_restart_streamer(self, task_id):
"""安全重启推流器"""
with self.lock:
if task_id not in self.task_streams:
return
task_info = self.task_streams[task_id]
# 防止频繁重启
if time.time() - self.last_restart_time < 10:
logger.debug(f"跳过频繁重启: {task_id}")
return
self.last_restart_time = time.time()
logger.info(f"安全重启推流器: {task_id}")
try:
# 停止旧推流器
old_streamer = task_info['streamer']
if old_streamer:
old_streamer.stop()
# 短暂延迟
time.sleep(1)
# 创建新推流器
new_streamer = self.create_streamer_for_task(
task_id,
task_info['config'],
task_info['fps'],
task_info['width'],
task_info['height']
)
if new_streamer:
logger.info(f"推流器重启成功: {task_id}")
else:
logger.error(f"推流器重启失败: {task_id}")
except Exception as e:
logger.error(f"重启推流器异常 {task_id}: {str(e)}")
def push_frame(self, task_id, frame):
"""为指定任务推送帧Windows优化版本"""
with self.lock:
task_info = self.task_streams.get(task_id)
if not task_info:
logger.warning(f"任务 {task_id} 没有推流器")
return False
streamer = task_info['streamer']
if not streamer or not streamer.running:
logger.warning(f"任务 {task_id} 推流器未运行")
self._safe_restart_streamer(task_id)
return False
try:
# Windows上需要更严格的检查
# if streamer.process and streamer.process.poll() is not None:
# logger.warning(f"FFmpeg进程已退出: {task_id}")
# self._safe_restart_streamer(task_id)
# return False
# 推流帧
success = streamer.add_frame(frame)
if success:
task_info['frame_count'] += 1
task_info['last_active'] = time.time()
task_info['status'] = 'active'
# 每100帧记录一次
if task_info['frame_count'] % 100 == 0:
logger.info(f"任务 {task_id} 已推流 {task_info['frame_count']}")
else:
task_info['status'] = 'error'
logger.warning(f"任务 {task_id} 推流失败")
# 连续失败处理
fail_count = task_info.get('consecutive_failures', 0) + 1
task_info['consecutive_failures'] = fail_count
if fail_count >= 10:
logger.error(f"任务 {task_id} 连续失败 {fail_count} 次,尝试重启")
self._safe_restart_streamer(task_id)
task_info['consecutive_failures'] = 0
return success
except Exception as e:
logger.error(f"推流失败 {task_id}: {str(e)}")
task_info['status'] = 'error'
return False
def stop_task_streamer(self, task_id):
"""停止任务推流器Windows专用"""
with self.lock:
if task_id in self.task_streams:
try:
task_info = self.task_streams[task_id]
streamer = task_info['streamer']
if streamer:
# Windows上需要强制终止
try:
if streamer.process:
import signal
try:
streamer.process.terminate()
streamer.process.wait(timeout=3)
except:
streamer.process.kill()
except:
pass
streamer.stop()
del self.task_streams[task_id]
logger.info(f"Windows任务推流器停止成功: {task_id}")
return True
except Exception as e:
logger.error(f"停止任务推流器失败 {task_id}: {str(e)}")
return False
return True
def get_task_stream_info(self, task_id):
"""获取任务推流信息"""
with self.lock:
if task_id not in self.task_streams:
return None
task_info = self.task_streams[task_id]
streamer = task_info['streamer']
info = {
'status': task_info['status'],
'fps': task_info['fps'],
'resolution': f"{task_info['width']}x{task_info['height']}",
'frame_count': task_info['frame_count'],
'last_active': time.time() - task_info['last_active'],
'running': streamer.running if streamer else False,
'process_alive': streamer.process.poll() is None if streamer and streamer.process else False,
'output_lines': task_info['output_lines'][-10:], # 最近10行输出
'last_ffmpeg_output': task_info.get('last_ffmpeg_output', '')
}
return info
def get_all_task_streams_info(self):
"""获取所有任务推流器信息"""
with self.lock:
info = {}
for task_id in self.task_streams:
info[task_id] = self.get_task_stream_info(task_id)
return info
def start_health_monitor(self):
"""启动Windows健康监控线程"""
def health_monitor_loop():
logger.info("Windows推流健康监控启动")
while self.running:
try:
with self.lock:
task_ids = list(self.task_streams.keys())
for task_id in task_ids:
task_info = self.task_streams.get(task_id)
if not task_info:
continue
# 检查是否长时间无活动
inactive_time = time.time() - task_info['last_active']
if inactive_time > 30 and task_info['frame_count'] > 0:
logger.warning(f"任务 {task_id}{inactive_time:.0f} 秒无活动")
self._safe_restart_streamer(task_id)
# 检查FFmpeg进程状态
streamer = task_info['streamer']
if streamer and hasattr(streamer, 'process'):
if streamer.process is not None:
if streamer.process.poll() is not None:
logger.warning(f"任务 {task_id} FFmpeg进程已退出")
self._safe_restart_streamer(task_id)
time.sleep(self.streaming_check_interval)
except Exception as e:
logger.error(f"健康监控异常: {str(e)}")
time.sleep(10)
self.health_check_thread = threading.Thread(
target=health_monitor_loop,
daemon=True,
name="WindowsTaskStreamHealthMonitor"
)
self.health_check_thread.start()
def stop_health_monitor(self):
"""停止健康监控"""
self.running = False
if self.health_check_thread and self.health_check_thread.is_alive():
self.health_check_thread.join(3.0)
def cleanup_all(self):
"""清理所有推流器"""
logger.info("清理所有Windows任务推流器")
with self.lock:
task_ids = list(self.task_streams.keys())
for task_id in task_ids:
self.stop_task_streamer(task_id)
self.stop_health_monitor()
# Windows专用全局推流管理器实例
windows_task_stream_manager = WindowsTaskStreamManager()