Yolov/task_stream_manager.py

243 lines
8.6 KiB
Python
Raw Normal View History

2025-12-11 13:41:07 +08:00
# task_stream_manager.py
import threading
import time
from log import logger
from ffmpegStreamer import FFmpegStreamer
class TaskStreamManager:
"""任务独立的推流管理器"""
def __init__(self):
self.task_streams = {} # task_id -> streamer
self.lock = threading.Lock()
self.health_check_thread = None
self.running = True
def create_streamer_for_task(self, task_id, config, fps, width, height):
"""为任务创建独立的推流器"""
with self.lock:
if task_id in self.task_streams:
logger.warning(f"任务 {task_id} 已有推流器,先清理")
self.stop_task_streamer(task_id)
try:
streamer = FFmpegStreamer(config, fps, width, height)
streamer.task_id = task_id # 标记属于哪个任务
self.task_streams[task_id] = {
'streamer': streamer,
'config': config,
'fps': fps,
'width': width,
'height': height,
'created_at': time.time(),
'last_active': time.time(),
'frame_count': 0,
'status': 'initializing'
}
# 启动推流
streamer.start()
# 等待初始化完成
time.sleep(0.5)
if streamer.running:
self.task_streams[task_id]['status'] = 'running'
logger.info(f"任务 {task_id} 推流器创建成功")
return streamer
else:
logger.error(f"任务 {task_id} 推流器启动失败")
del self.task_streams[task_id]
return None
except Exception as e:
logger.error(f"创建任务推流器失败 {task_id}: {str(e)}")
return None
def get_task_streamer(self, task_id):
"""获取任务的推流器"""
with self.lock:
task_info = self.task_streams.get(task_id)
return task_info['streamer'] if task_info else None
def push_frame(self, task_id, frame):
"""为指定任务推送帧"""
with self.lock:
task_info = self.task_streams.get(task_id)
if not task_info:
logger.warning(f"任务 {task_id} 没有推流器")
return False
streamer = task_info['streamer']
if not streamer or not streamer.running:
logger.warning(f"任务 {task_id} 推流器未运行")
# 尝试重启
self._restart_task_streamer(task_id)
return False
try:
success = streamer.add_frame(frame)
if success:
task_info['frame_count'] += 1
task_info['last_active'] = time.time()
task_info['status'] = 'active'
else:
task_info['status'] = 'error'
# 连续错误处理
if self._check_streamer_health(task_id) == 'unhealthy':
self._restart_task_streamer(task_id)
return success
except Exception as e:
logger.error(f"推流失败 {task_id}: {str(e)}")
task_info['status'] = 'error'
return False
def _check_streamer_health(self, task_id):
"""检查推流器健康状态"""
task_info = self.task_streams.get(task_id)
if not task_info:
return 'not_found'
streamer = task_info['streamer']
if not streamer:
return 'unhealthy'
# 检查进程状态
if hasattr(streamer, 'process_failed') and streamer.process_failed():
return 'unhealthy'
# 检查是否长时间无活动
if time.time() - task_info['last_active'] > 10: # 10秒无活动
return 'inactive'
# 检查帧率是否异常
if task_info['frame_count'] > 0:
elapsed = time.time() - task_info['created_at']
actual_fps = task_info['frame_count'] / elapsed
if actual_fps < task_info['fps'] * 0.1: # 低于目标帧率10%
return 'low_fps'
return 'healthy'
def _restart_task_streamer(self, task_id):
"""重启任务推流器"""
task_info = self.task_streams.get(task_id)
if not task_info:
return False
logger.info(f"重启任务推流器: {task_id}")
try:
# 停止旧推流器
old_streamer = task_info['streamer']
if old_streamer:
old_streamer.stop()
# 创建新推流器
streamer = FFmpegStreamer(
task_info['config'],
task_info['fps'],
task_info['width'],
task_info['height']
)
streamer.task_id = task_id
streamer.start()
# 更新信息
task_info['streamer'] = streamer
task_info['created_at'] = time.time()
task_info['last_active'] = time.time()
task_info['frame_count'] = 0
task_info['status'] = 'restarted'
logger.info(f"任务推流器重启成功: {task_id}")
return True
except Exception as e:
logger.error(f"重启任务推流器失败 {task_id}: {str(e)}")
task_info['status'] = 'failed'
return False
def stop_task_streamer(self, task_id):
"""停止任务推流器"""
with self.lock:
if task_id in self.task_streams:
try:
task_info = self.task_streams[task_id]
streamer = task_info['streamer']
if streamer:
streamer.stop()
del self.task_streams[task_id]
logger.info(f"任务推流器停止成功: {task_id}")
return True
except Exception as e:
logger.error(f"停止任务推流器失败 {task_id}: {str(e)}")
return False
return True
def get_all_task_streams_info(self):
"""获取所有任务推流器信息"""
with self.lock:
info = {}
for task_id, task_info in self.task_streams.items():
streamer = task_info['streamer']
info[task_id] = {
'status': task_info['status'],
'fps': task_info['fps'],
'resolution': f"{task_info['width']}x{task_info['height']}",
'frame_count': task_info['frame_count'],
'last_active': time.time() - task_info['last_active'],
'running': streamer.running if streamer else False,
'health': self._check_streamer_health(task_id)
}
return info
def start_health_monitor(self):
"""启动健康监控线程"""
def health_monitor_loop():
logger.info("任务推流健康监控启动")
while self.running:
try:
with self.lock:
task_ids = list(self.task_streams.keys())
for task_id in task_ids:
health = self._check_streamer_health(task_id)
if health in ['unhealthy', 'inactive']:
logger.warning(f"任务 {task_id} 推流器健康状态异常: {health}")
self._restart_task_streamer(task_id)
time.sleep(5) # 每5秒检查一次
except Exception as e:
logger.error(f"健康监控异常: {str(e)}")
time.sleep(10)
self.health_check_thread = threading.Thread(
target=health_monitor_loop,
daemon=True,
name="TaskStreamHealthMonitor"
)
self.health_check_thread.start()
def stop_health_monitor(self):
"""停止健康监控"""
self.running = False
if self.health_check_thread and self.health_check_thread.is_alive():
self.health_check_thread.join(3.0)
def cleanup_all(self):
"""清理所有推流器"""
logger.info("清理所有任务推流器")
with self.lock:
task_ids = list(self.task_streams.keys())
for task_id in task_ids:
self.stop_task_streamer(task_id)
self.stop_health_monitor()
# 全局推流管理器实例
task_stream_manager = TaskStreamManager()