"""FFmpeg push-streaming worker.

Defines :class:`FFmpegStreamer`, a daemon thread that pulls raw video frames
from a bounded queue, pipes them into an ``ffmpeg`` child process over stdin
and supervises that process (health checks, stderr monitoring, rate-limited
restarts with exponential backoff).
"""
import errno
import os
import queue
import subprocess
import threading
import time

import cv2
import numpy as np
import torch

from log import logger

# Added at the top of ffmpegStreamer.py
try:
    from platform_utils import is_windows, safe_readline
except ImportError:
    # Minimal compatibility fallbacks used when platform_utils is unavailable.
    def is_windows():
        """Return True when running on Windows (``os.name == 'nt'``)."""
        return os.name == 'nt'

    def safe_readline(fd, timeout=1.0):
        """Read one line from *fd* and return it decoded and stripped.

        Returns None when nothing was read or reading failed. *timeout* is
        accepted for signature compatibility but is not used by this fallback.
        """
        try:
            line = fd.readline()
            if line:
                return line.decode('utf-8', errors='ignore').strip()
        except Exception:
            pass
        return None

# Module-wide state used to rate-limit restarts across all streamer instances.
_global_restart_count = 0
_global_last_restart_time = 0
_global_restart_lock = threading.Lock()


class FFmpegStreamer(threading.Thread):
    """Standalone push-streaming thread that feeds raw frames to FFmpeg.

    Frames are submitted with :meth:`add_frame`, consumed by :meth:`run`,
    converted to raw bytes by :meth:`process_frame` and written to the FFmpeg
    stdin pipe by :meth:`write_frame`. A secondary monitor thread reads FFmpeg's
    stderr. Restarts go through :meth:`safe_restart_ffmpeg`, which applies a
    minimum interval, a module-wide rate limit and exponential backoff.
    """

    def __init__(self, config, fps, frame_width, frame_height):
        """Initialise the streamer.

        Args:
            config: Mapping with a ``'push'`` entry holding the push settings
                read elsewhere in this class (``url``, ``video_codec``,
                ``preset``, ``format`` and optional keys such as ``bitrate``).
            fps: Frame rate passed to FFmpeg.
            frame_width: Width of the incoming frames in pixels.
            frame_height: Height of the incoming frames in pixels.
        """
        super().__init__()
        self.config = config['push']
        self.fps = fps
        self.original_width = frame_width
        self.original_height = frame_height

        # Stream at the original resolution.
        self.scaled_width = frame_width
        self.scaled_height = frame_height

        self.queue = queue.Queue(maxsize=10)  # Reasonably sized frame queue.
        self.running = True
        self.process = None
        # Re-entrant: start_ffmpeg() calls stop_ffmpeg() while holding it.
        self.process_lock = threading.RLock()
        self.process_starting = threading.Event()
        self.daemon = True
        self.last_drop_warning = 0
        self.frame_buffer = None

        # Statistics.
        self.total_frames_sent = 0
        self.total_frames_dropped = 0
        self.frame_counter = 0
        self.start_time = time.time()
        self.avg_process_time = 0.033

        # Reconnect parameters (deliberately conservative).
        self.max_restarts = 10
        self.restart_count = 0  # Backoff level; decays after successful restarts.
        self.total_restarts = 0  # Number of successful restarts, for statistics.
        self.min_restart_delay = 2.0
        self.max_restart_delay = 30.0
        self.last_restart_time = 0

        # Restart control.
        self.restart_lock = threading.Lock()
        self.restart_scheduled = False
        self.last_restart_attempt = 0
        self.pipe_broken = False  # Set when the stdin pipe is known to be broken.
        self.last_successful_write = time.time()

        # Failure counter, so that an occasional failure does not trigger a restart.
        self.consecutive_failures = 0
        self.max_consecutive_failures = 10

        # Health checking.
        self.health_check_interval = 5.0  # Seconds between health checks.
        self.last_health_check = time.time()
        self.process_is_healthy = True
        self._last_light_check = 0.0  # Timestamp of the last idle heartbeat.

        # Output monitoring.
        self.last_output_time = time.time()
        # 60 s without output counts as silent (previously 30 s).
        self.silent_threshold = 60.0
        self.silent_start_time = None
        self.output_count = 0
        self.last_process_check = time.time()

        # Monitor thread.
        self.monitor_thread = None
        self.monitor_running = True
        self.stop_event = threading.Event()

        # Size in bytes of one frame at 3 bytes per pixel.
        self.buffer_size = self.scaled_width * self.scaled_height * 3

        logger.info(
            f"推流线程初始化 | 分辨率: {self.scaled_width}x{self.scaled_height} | 缓冲区: {self.buffer_size // 1024}KB")

    def run(self):
        """Main loop: start FFmpeg, then consume, convert and write frames.

        The loop ends when ``running`` becomes False, a ``None`` sentinel is
        dequeued, or a recovery attempt fails. Resources are released on exit.
        """
        logger.info(f"启动推流线程 | 分辨率: {self.scaled_width}x{self.scaled_height} | FPS: {self.fps}")

        # FFmpeg must be running before any frame is consumed.
        if not self.start_ffmpeg():
            logger.error("初始FFmpeg启动失败")
            self.running = False
            return

        self.start_monitor_thread()

        while self.running:
            frame_start = time.perf_counter()

            # Periodic health check; remember whether it asked for recovery so
            # the result is acted upon once a frame is available.
            current_time = time.time()
            recovery_needed = False
            if current_time - self.last_health_check >= self.health_check_interval:
                self.perform_health_check()
                self.last_health_check = current_time
                recovery_needed = self.check_process_health()

            try:
                # Scale the dequeue timeout with the measured processing time.
                timeout = max(0.1, min(1.0, self.avg_process_time * 2))
                frame = self.queue.get(timeout=timeout)

                if frame is None:  # Stop sentinel.
                    logger.info("接收到停止信号")
                    break

                # The process was flagged unhealthy: try to recover first.
                if recovery_needed and not self.attempt_recovery():
                    logger.error("进程恢复失败,停止推流")
                    break

                processed_frame = self.process_frame(frame)
                write_success = self.write_frame(processed_frame)

                if write_success:
                    now = time.time()
                    self.total_frames_sent += 1
                    self.consecutive_failures = 0
                    self.pipe_broken = False
                    self.last_successful_write = now
                    self.last_output_time = now
                    self.process_is_healthy = True
                else:
                    self.total_frames_dropped += 1
                    self.consecutive_failures += 1

                    # Log every fifth failure instead of restarting right away.
                    if self.consecutive_failures % 5 == 0:
                        logger.warning(f"连续写入失败 {self.consecutive_failures} 次")

                    # Recover only after too many consecutive failures.
                    if self.consecutive_failures >= self.max_consecutive_failures:
                        logger.warning(f"连续失败 {self.consecutive_failures} 次,尝试恢复")
                        if not self.attempt_recovery():
                            logger.error("恢复失败,停止推流")
                            break

            except queue.Empty:
                # Idle: run the lightweight heartbeat instead.
                self.light_heartbeat()
                continue
            except Exception as e:
                logger.error(f"处理帧时发生未知错误: {str(e)}", exc_info=True)
                self.total_frames_dropped += 1
                self.consecutive_failures += 1

            # Exponential moving average of the per-frame processing time.
            elapsed = time.perf_counter() - frame_start
            self.avg_process_time = self.avg_process_time * 0.9 + elapsed * 0.1

        # Release resources.
        self.stop_monitor_thread()
        self.stop_ffmpeg()
        self.log_statistics()
        logger.info("推流线程已停止")

    def start_monitor_thread(self):
        """Start the FFmpeg stderr monitor unless disabled or already running.

        Monitoring is skipped when the ``enable_monitoring`` config key is falsy.
        """
        if not self.config.get('enable_monitoring', True):
            return
        if self.monitor_thread and self.monitor_thread.is_alive():
            return

        self.monitor_running = True
        self.monitor_thread = threading.Thread(
            target=self.monitor_ffmpeg_output,
            daemon=True,
            name=f"FFmpegMonitor_{id(self)}"
        )
        self.monitor_thread.start()
        logger.info("FFmpeg监控线程已启动")

    def stop_monitor_thread(self):
        """Ask the monitor thread to stop and wait up to two seconds for it."""
        self.monitor_running = False
        monitor = self.monitor_thread
        # A thread cannot join itself, so skip the wait in that case.
        if monitor and monitor.is_alive() and monitor is not threading.current_thread():
            monitor.join(timeout=2.0)
            if monitor.is_alive():
                logger.warning("监控线程未能及时停止")
            else:
                logger.info("监控线程已停止")

    def _read_stderr_line(self, stderr):
        """Read one line from *stderr*.

        Returns the bytes read (``b''`` at end of file), or None when no data
        became available within the 0.1 s poll window (non-Windows only).
        """
        if is_windows():
            # No select() polling on this path; readline() may block.
            return stderr.readline()

        import select
        readable, _, _ = select.select([stderr], [], [], 0.1)
        if readable:
            return stderr.readline()
        return None

    def monitor_ffmpeg_output(self):
        """Drain and inspect FFmpeg's stderr until monitoring is stopped.

        The current ``self.process`` is re-read on every iteration, so the
        monitor keeps working after FFmpeg has been restarted.
        """
        logger.info("启动FFmpeg监控线程")

        try:
            stderr = self.process.stderr if self.process else None
            if not stderr:
                logger.warning("没有stderr管道,跳过监控")
                return

            last_check_time = time.time()

            while self.monitor_running:
                process = self.process
                stderr = process.stderr if process else None
                if stderr is None:
                    # No process right now (e.g. mid-restart); wait for the next.
                    time.sleep(0.2)
                    continue

                try:
                    current_time = time.time()

                    # Check for prolonged silence every 5 seconds while alive.
                    if current_time - last_check_time >= 5.0:
                        if process.poll() is None:
                            self._check_silent_state(current_time)
                        last_check_time = current_time

                    line = self._read_stderr_line(stderr)
                    if line:
                        self._process_ffmpeg_line(line)
                    elif line is not None:
                        # End of file: the process is gone; do not spin.
                        time.sleep(0.1)

                except Exception:
                    # Read error, most likely a closed pipe.
                    if not self.monitor_running:
                        break
                    time.sleep(0.5)
                    continue

        except Exception as e:
            logger.debug(f"监控线程异常退出: {str(e)}")
        finally:
            logger.info("FFmpeg监控线程已退出")

    def _process_ffmpeg_line(self, line_bytes):
        """Decode one FFmpeg stderr line, update counters and log it.

        Lines containing ``error`` or ``failed`` are logged as warnings; if they
        also contain a critical keyword, ``pipe_broken`` is set.
        """
        try:
            line = line_bytes.decode('utf-8', errors='ignore').strip()
            if not line:
                return

            current_time = time.time()

            # Any output resets the silence timer.
            self.output_count += 1
            self.last_output_time = current_time
            self.silent_start_time = None

            lowered = line.lower()

            # NOTE(review): nesting of the critical-keyword check under the
            # error/failed branch is reconstructed from flattened source.
            if 'error' in lowered or 'failed' in lowered:
                logger.warning(f"FFmpeg错误: {line}")

                # Critical errors mark the pipe as broken.
                if any(keyword in lowered for keyword in [
                    'broken pipe', 'pipe broken', 'connection refused',
                    'cannot open connection', 'timeout'
                ]):
                    logger.error(f"FFmpeg关键错误: {line}")
                    self.pipe_broken = True

            # Progress information.
            elif 'frame=' in line and 'fps=' in line:
                if self.total_frames_sent % 100 == 0:
                    logger.debug(f"FFmpeg状态: {line}")

        except Exception as e:
            logger.debug(f"处理FFmpeg行失败: {str(e)}")

    def _check_silent_state(self, current_time):
        """Track how long FFmpeg has been silent and react past the threshold.

        A recent successful write or recent stderr output (within 5 s) resets
        the timer. Beyond ``silent_threshold`` a deep process check decides
        whether to schedule a restart.
        """
        # A recent successful write means the stream is alive.
        if current_time - self.last_successful_write < 5.0:
            self.silent_start_time = None
            return

        # Recent output also means the stream is alive.
        if current_time - self.last_output_time < 5.0:
            self.silent_start_time = None
            return

        # Start the silence timer.
        if self.silent_start_time is None:
            self.silent_start_time = current_time

        silent_duration = current_time - self.silent_start_time
        if silent_duration > self.silent_threshold:
            # Long silence: run a deep check before deciding to restart.
            logger.warning(f"FFmpeg长时间无输出 ({silent_duration:.1f}秒)")

            if not self._deep_process_check():
                logger.error(f"FFmpeg深度检查失败,准备重启")
                self.schedule_restart()
            else:
                logger.info(f"FFmpeg深度检查通过,继续监控")
                # Rewind the timer to half the threshold to allow more time.
                # NOTE(review): placement in the else branch is reconstructed.
                self.silent_start_time = current_time - (self.silent_threshold / 2)

    def _deep_process_check(self):
        """Return True when the FFmpeg process appears to be alive.

        Checks ``poll()``, signal 0 on non-Windows systems and, when ``psutil``
        is importable, the process status. Unexpected errors count as alive.
        """
        process = self.process
        if not process:
            return False

        try:
            # Method 1: has the process exited?
            if process.poll() is not None:
                return False

            # Method 2: probe the PID with signal 0 (non-Windows).
            if not is_windows():
                try:
                    os.kill(process.pid, 0)
                except OSError:
                    return False

            # Method 3: inspect the process status if psutil is available.
            try:
                import psutil
                ps_process = psutil.Process(process.pid)

                status = ps_process.status()
                if status in (psutil.STATUS_ZOMBIE, psutil.STATUS_DEAD):
                    return False

                # CPU usage is logged for diagnostics only.
                cpu_percent = ps_process.cpu_percent(interval=0.1)
                logger.debug(f"FFmpeg深度检查: CPU={cpu_percent}%, 状态={status}")

            except ImportError:
                # psutil is not installed; skip the detailed check.
                pass
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                return False

            return True

        except (OSError, ProcessLookupError):
            return False
        except Exception as e:
            logger.debug(f"深度检查异常: {str(e)}")
            return True  # Assume the process is fine when the check itself fails.

    def schedule_restart(self):
        """Run a restart on a background thread unless one is already scheduled."""
        if self.restart_scheduled:
            return

        self.restart_scheduled = True

        def safe_restart():
            try:
                if self.running:
                    self.safe_restart_ffmpeg()
            except Exception as e:
                logger.error(f"安全重启失败: {str(e)}")
            finally:
                self.restart_scheduled = False

        threading.Thread(target=safe_restart, daemon=True).start()

    def perform_health_check(self):
        """Update ``process_is_healthy`` from the current process state.

        The process is unhealthy when it is missing, has exited, has had no
        successful write for more than 30 s, or its pipe is marked broken.
        """
        try:
            process = self.process

            # 1. Is there a process at all?
            if not process:
                logger.warning("进程不存在,标记为不健康")
                self.process_is_healthy = False
                return

            # 2. Is it still running?
            if process.poll() is not None:
                logger.warning("进程已退出,标记为不健康")
                self.process_is_healthy = False
                return

            # 3. Has a write succeeded within the last 30 seconds?
            current_time = time.time()
            if current_time - self.last_successful_write > 30.0:
                logger.warning(f"长时间没有成功写入 ({current_time - self.last_successful_write:.1f}秒)")
                self.process_is_healthy = False
                return

            # 4. Is the pipe marked as broken?
            if self.pipe_broken:
                logger.warning("管道破裂状态")
                self.process_is_healthy = False
                return

            self.process_is_healthy = True

        except Exception as e:
            logger.debug(f"健康检查异常: {str(e)}")
            self.process_is_healthy = False

    def check_process_health(self):
        """Return True when the process is currently flagged as UNHEALTHY."""
        return not self.process_is_healthy

    def attempt_recovery(self):
        """Try to recover the stream; return True when streaming may continue.

        Minor trouble (pipe intact, fewer than 5 consecutive failures) is handled
        by waiting 2 s and discarding queued frames. Otherwise FFmpeg is
        restarted and the restart result is returned.
        """
        logger.info("尝试恢复推流进程...")

        # 1. Light recovery that keeps the process.
        if not self.pipe_broken and self.consecutive_failures < 5:
            logger.info("轻微问题,等待2秒后重试")
            time.sleep(2.0)

            # Discard queued frames so streaming resumes with fresh ones.
            try:
                while True:
                    self.queue.get_nowait()
            except queue.Empty:
                pass

            self.consecutive_failures = 0
            return True

        # 2. Serious trouble: restart.
        logger.info("问题较严重,尝试重启FFmpeg")
        return self.safe_restart_ffmpeg()

    def safe_restart_ffmpeg(self):
        """Restart FFmpeg with rate limiting and exponential backoff.

        Returns:
            True when a new FFmpeg process was started. False when the restart
            was skipped (attempted less than 10 s ago, or more than 3 restarts
            module-wide within a minute), cancelled by a stop request, refused
            because ``max_restarts`` was exceeded (which also clears
            ``running``), or failed.
        """
        global _global_restart_count, _global_last_restart_time

        with self.restart_lock:
            current_time = time.time()

            # Enforce a minimum interval between restart attempts.
            min_interval = 10.0
            if current_time - self.last_restart_attempt < min_interval:
                logger.debug(f"跳过重启:距离上次尝试时间过短 ({min_interval:.1f}秒)")
                return False

            self.last_restart_attempt = current_time

            # Module-wide limit: at most 3 restarts per 60-second window.
            with _global_restart_lock:
                if current_time - _global_last_restart_time < 60:
                    if _global_restart_count >= 3:
                        logger.error("全局重启频率过高,等待60秒")
                        return False
                else:
                    # The window has elapsed; start a new one.
                    _global_restart_count = 0
                    _global_last_restart_time = current_time

                _global_restart_count += 1

            # Per-instance restart budget.
            self.restart_count += 1
            max_restarts = self.max_restarts
            if self.restart_count > max_restarts:
                logger.error(f"达到最大重启次数 {max_restarts},停止推流")
                self.running = False
                return False

            # Exponential backoff clamped to [min_restart_delay, max_restart_delay].
            delay = min(
                self.max_restart_delay,
                max(self.min_restart_delay, 2.0 * (2 ** (self.restart_count - 1)))
            )

            logger.info(f"准备重启FFmpeg ({self.restart_count}/{max_restarts}), {delay:.1f}秒后执行...")

            # Keep honouring stop requests while waiting.
            start_time = time.time()
            while time.time() - start_time < delay:
                if not self.running or self.stop_event.is_set():
                    logger.info("收到停止信号,取消重启")
                    return False
                time.sleep(0.5)

            try:
                # Tear down the old process and give the OS time to release it.
                self.stop_ffmpeg()
                time.sleep(1.0)

                success = self.start_ffmpeg()

                if success:
                    now = time.time()
                    self.pipe_broken = False
                    self.consecutive_failures = 0
                    self.process_is_healthy = True
                    self.last_successful_write = now
                    self.last_output_time = now
                    self.silent_start_time = None
                    self.total_restarts += 1

                    logger.info(f"FFmpeg重启成功 (第{self.restart_count}次)")

                    # Let the backoff level decay after a successful restart.
                    if self.restart_count > 1:
                        self.restart_count = max(0, self.restart_count - 1)

                return success

            except Exception as e:
                logger.error(f"重启失败: {str(e)}")
                return False

    def check_process_status(self):
        """Return True when an FFmpeg process exists and has not exited."""
        process = self.process
        if not process:
            return False

        try:
            return process.poll() is None
        except Exception:
            return False

    def light_heartbeat(self):
        """Idle-time liveness check, run at most once every 10 seconds."""
        current_time = time.time()
        if current_time - self._last_light_check < 10:
            return
        self._last_light_check = current_time

        if not self.check_process_status():
            logger.debug("心跳检测:进程不存在")
            if not self.attempt_recovery():
                logger.error("心跳检测恢复失败")

    def start_ffmpeg(self):
        """Start the FFmpeg child process.

        Returns:
            True when FFmpeg is still running one second after launch. False
            when a start is already in progress, the launch raised, or the
            process exited immediately.
        """
        if self.process_starting.is_set():
            logger.debug("FFmpeg已在启动中,跳过重复启动")
            return False

        self.process_starting.set()
        success = False

        try:
            with self.process_lock:
                # Replace any existing process (the lock is re-entrant).
                if self.process:
                    self.stop_ffmpeg()
                    time.sleep(0.5)  # Let the cleanup settle.

                logger.info(f"启动FFmpeg推流进程 | 目标地址: {self.config['url']}")

                command = self.build_ffmpeg_command()

                creationflags = 0
                if is_windows():
                    # Avoid popping up a console window on Windows.
                    creationflags = subprocess.CREATE_NO_WINDOW

                self.process = subprocess.Popen(
                    command,
                    stdin=subprocess.PIPE,
                    stdout=subprocess.DEVNULL,
                    stderr=subprocess.PIPE,
                    bufsize=0,
                    creationflags=creationflags
                )

                # Reset per-process state. restart_count is deliberately left
                # alone so that the restart budget and backoff stay effective.
                now = time.time()
                self.pipe_broken = False
                self.consecutive_failures = 0
                self.process_is_healthy = True
                self.last_successful_write = now
                self.last_output_time = now
                self.silent_start_time = None

                logger.info(f"FFmpeg启动成功 PID: {self.process.pid}")

                # Give the process a moment, then confirm it is still running.
                time.sleep(1.0)

                if not self.check_process_status():
                    logger.error("FFmpeg启动后立即退出")
                    # Best effort: surface whatever FFmpeg wrote to stderr.
                    try:
                        err_output = self.process.stderr.read(1024)
                        if err_output:
                            logger.error(f"FFmpeg错误输出: {err_output.decode('utf-8', errors='ignore')}")
                    except Exception:
                        pass
                    self.process = None
                    return False

                success = True

        except Exception as e:
            logger.error(f"启动FFmpeg失败: {str(e)}", exc_info=True)
            self.process = None
        finally:
            self.process_starting.clear()

        return success

    def build_ffmpeg_command(self):
        """Build the FFmpeg argument list for the configured push target.

        Returns:
            A list of command-line arguments starting with ``'ffmpeg'`` and
            ending with the configured URL.
        """
        w, h = self.scaled_width, self.scaled_height

        command = [
            'ffmpeg',
            '-y', '-an',  # Overwrite output, no audio.
            '-loglevel', 'warning',  # Reduce log output.
            '-f', 'rawvideo',
            '-vcodec', 'rawvideo',
            '-pix_fmt', self.config.get('pixel_format', 'bgr24'),
            '-s', f'{w}x{h}',
            '-r', str(self.fps),
            '-i', '-',  # Read from stdin.
            '-c:v', self.config['video_codec'],
            '-preset', self.config['preset'],
            '-g', str(int(self.fps * 2)),  # Keyframe interval of 2 seconds.
            '-crf', str(self.config.get('crf', 23)),
            '-b:v', self.config.get('bitrate', '2000k'),
            '-maxrate', self.config.get('bitrate', '2000k'),  # Maximum bitrate.
            '-minrate', '500k',  # Minimum bitrate.
            '-bufsize', self.config.get('bufsize', '2000k'),
            '-threads', '2',  # Limit encoder threads.
            '-f', self.config['format'],
            '-fflags', 'nobuffer',
            '-flags', 'low_delay',
            '-strict', 'experimental',
            '-movflags', '+faststart',
            '-tune', 'zerolatency',
            '-rtbufsize', '100M',
            # Network-related parameters.
            '-timeout', '30000000',
            '-reconnect', '1',
            '-reconnect_streamed', '1',
            '-reconnect_delay_max', '5',
            '-rw_timeout', '5000000',
            self.config['url']
        ]

        # Hardware acceleration.
        if self.config.get('gpu_acceleration', False) and torch.cuda.is_available():
            command[1:1] = [
                '-hwaccel', 'cuda',
                '-hwaccel_output_format', 'cuda',
                '-extra_hw_frames', '4'  # Fewer hardware frame buffers.
            ]
            logger.info("启用CUDA硬件加速")

            # NOTE(review): nesting of the NVENC block under the GPU branch is
            # reconstructed from flattened source - confirm.
            if 'nvenc' in self.config['video_codec']:
                # NOTE(review): any codec containing 'nvenc' is replaced with
                # h264_nvenc, as in the original - confirm this is intended.
                idx = command.index('-c:v')
                command[idx + 1] = 'h264_nvenc'

                nvenc_params = [
                    '-rc', 'vbr',  # Variable bitrate.
                    '-cq', '23',  # Constant quality.
                    '-b:v', '2M',
                    '-maxrate', '3M',
                    '-profile:v', 'main',
                    '-level', '4.1',
                    '-rc-lookahead', '0',  # Lower latency.
                    '-surfaces', '4',  # Fewer surfaces.
                    '-g', str(int(self.fps * 2)),
                    '-bf', '0',  # No B-frames.
                    '-refs', '1',  # Fewer reference frames.
                ]

                # Insert right after the codec name, preserving the order.
                command[idx + 2:idx + 2] = nvenc_params

        logger.debug(f"FFmpeg命令: {' '.join(command[:20])}...")  # First 20 arguments only.
        return command

    def write_frame(self, frame_data):
        """Write one raw frame to FFmpeg's stdin.

        Args:
            frame_data: Bytes-like frame payload.

        Returns:
            True when the whole payload was written, False otherwise. A broken
            pipe additionally sets ``pipe_broken``.
        """
        if not self.process:
            return False

        if not self.check_process_status():
            logger.debug("进程已退出,跳过写入")
            return False

        # Do not touch a pipe already known to be broken.
        if self.pipe_broken:
            logger.debug("管道已破裂,跳过写入")
            return False

        try:
            with self.process_lock:
                process = self.process
                if not process or not process.stdin:
                    return False

                stdin = process.stdin

                # The pipe is opened unbuffered (bufsize=0), so write() may
                # accept fewer bytes than offered; loop until all are written.
                remaining = memoryview(frame_data)
                while len(remaining) > 0:
                    written = stdin.write(remaining)
                    if not written:
                        raise OSError(errno.EIO, "short write to FFmpeg stdin")
                    remaining = remaining[written:]

                # A failed flush does not by itself mean the write failed, but a
                # broken pipe must still be reported to the handlers below.
                try:
                    stdin.flush()
                except BrokenPipeError:
                    raise
                except (OSError, ValueError):
                    pass

                return True

        except BrokenPipeError:
            logger.warning("管道破裂")
            self.pipe_broken = True
            return False
        except OSError as e:
            if is_windows():
                # Windows error codes 232 and 109 are reported as a closed pipe.
                if getattr(e, 'winerror', None) in (232, 109):
                    logger.warning("管道已关闭")
                    self.pipe_broken = True
                    return False
            elif getattr(e, 'errno', None) == errno.EPIPE:
                logger.warning("管道破裂")
                self.pipe_broken = True
                return False

            logger.debug(f"写入错误: {str(e)}")
            return False
        except Exception as e:
            logger.debug(f"写入异常: {str(e)}")
            return False

    def stop_ffmpeg(self):
        """Stop the FFmpeg process: close stdin, terminate, then kill if needed."""
        with self.process_lock:
            process = self.process
            if not process:
                return

            pid = process.pid
            logger.info(f"停止FFmpeg进程 PID: {pid}...")

            # Step 1: close the input stream.
            if process.stdin:
                try:
                    process.stdin.close()
                except Exception:
                    pass

            # Step 2: request termination.
            try:
                process.terminate()
            except Exception:
                pass

            # Wait briefly for a graceful exit.
            try:
                process.wait(timeout=1.0)
            except subprocess.TimeoutExpired:
                # Step 3: force-kill.
                try:
                    logger.warning(f"强制终止FFmpeg进程 PID: {pid}")
                    process.kill()
                    process.wait(timeout=0.5)
                except Exception:
                    pass
            except Exception:
                pass

            # Report a process that survived everything above.
            if process.poll() is None:
                logger.warning(f"FFmpeg进程 PID: {pid} 未能正常终止")

            self.process = None
            logger.info(f"FFmpeg进程 PID: {pid} 已停止")

    def process_frame(self, frame):
        """Convert *frame* into the raw byte payload sent to FFmpeg.

        The frame is resized when its size differs from the target size, cast to
        ``uint8``, and converted to 3 channels when it has 4 channels or is 2-D.

        Args:
            frame: Image array exposing ``shape`` and ``dtype``.
                # presumably BGR/BGRA/grayscale as produced by OpenCV - confirm

        Returns:
            The frame contents as ``bytes``.
        """
        # Resize whenever the actual frame size differs from the target size,
        # otherwise FFmpeg would misinterpret the raw stream.
        if frame.shape[1] != self.scaled_width or frame.shape[0] != self.scaled_height:
            frame = cv2.resize(frame, (self.scaled_width, self.scaled_height),
                               interpolation=cv2.INTER_LANCZOS4)

        if frame.dtype != np.uint8:
            frame = frame.astype(np.uint8)

        if len(frame.shape) == 3 and frame.shape[2] == 4:
            frame = cv2.cvtColor(frame, cv2.COLOR_BGRA2BGR)
        elif len(frame.shape) == 2:
            frame = cv2.cvtColor(frame, cv2.COLOR_GRAY2BGR)

        return frame.tobytes()

    def add_frame(self, frame):
        """Queue *frame* for streaming, dropping old frames when the queue is full.

        Returns:
            True when the frame was queued, False when it was dropped.
        """
        # Skip frames while the process is unhealthy and repeatedly failing.
        if not self.process_is_healthy and self.consecutive_failures >= 3:
            logger.debug("进程不健康,跳过帧添加")
            self.total_frames_dropped += 1
            return False

        try:
            # Try to enqueue with a very short timeout.
            self.queue.put(frame, timeout=0.01)
            return True
        except queue.Full:
            # Queue full: discard a few of the oldest frames, counting each.
            for _ in range(min(3, self.queue.qsize() // 3)):
                try:
                    self.queue.get_nowait()
                except queue.Empty:
                    break
                self.total_frames_dropped += 1

            # Retry once.
            try:
                self.queue.put_nowait(frame)
                return True
            except queue.Full:
                self.total_frames_dropped += 1
                return False
        except Exception as e:
            logger.debug(f"帧处理错误: {str(e)}")
            self.total_frames_dropped += 1
            return False

    def log_statistics(self):
        """Log duration, frame counts, drop rate, average FPS and restarts.

        Nothing is logged when no frame was sent or dropped.
        """
        elapsed = time.time() - self.start_time
        sent = self.total_frames_sent
        dropped = self.total_frames_dropped
        total = sent + dropped

        if total > 0:
            fps = sent / elapsed if elapsed > 0 else 0.0
            drop_rate = (dropped / total) * 100

            logger.info(
                "推流统计结果:\n"
                f"  持续时间: {elapsed:.1f}秒\n"
                f"  总帧数: {total}\n"
                f"  成功发送: {sent}\n"
                f"  丢弃帧数: {dropped}\n"
                f"  丢帧率: {drop_rate:.1f}%\n"
                f"  平均FPS: {fps:.1f}\n"
                f"  重启次数: {self.total_restarts}"
            )

    def stop(self):
        """Stop the streaming thread, the monitor thread and FFmpeg.

        Does nothing when the thread is already marked as not running.
        """
        if not self.running:
            return

        logger.info("停止推流线程...")
        self.running = False
        # Cancels a restart that is currently waiting out its backoff delay.
        self.stop_event.set()

        # Send the stop sentinel.
        try:
            self.queue.put(None, block=False)
        except queue.Full:
            pass

        self.stop_monitor_thread()
        self.stop_ffmpeg()

        # Wake the consumer once more in case the first sentinel was discarded.
        try:
            self.queue.put(None, block=False)
        except queue.Full:
            pass

        # Wait for the thread to finish, unless called from the thread itself.
        if self.is_alive() and threading.current_thread() != self:
            self.join(3.0)
            if self.is_alive():
                logger.warning("推流线程未能及时停止")

        logger.info("推流线程已停止")