import errno
import os
import queue
import subprocess
import threading
import time

import cv2
import numpy as np
import torch

from log import logger


class FFmpegStreamer(threading.Thread):
    """Dedicated streaming thread that pipes raw frames into an FFmpeg process.

    Frames are handed over with :meth:`add_frame` (non-blocking, bounded
    queue) and written to FFmpeg's stdin from this thread, so the producer
    (e.g. a detection loop) is not blocked by encoding or network stalls.
    When the FFmpeg process dies or its pipe breaks it is restarted with
    exponential backoff, up to ``max_restarts`` consecutive failures.

    Args:
        config: Mapping with a ``'push'`` section. Keys read from it:
            ``url``, ``video_codec``, ``preset``, ``format`` (required) and
            ``pixel_format``, ``crf``, ``bitrate``, ``bufsize``,
            ``gpu_acceleration`` (optional, with defaults in
            :meth:`build_ffmpeg_command`).
        fps: Frame rate announced to FFmpeg for the raw input.
        frame_width: Width in pixels of the stream sent to FFmpeg.
        frame_height: Height in pixels of the stream sent to FFmpeg.
    """

    def __init__(self, config, fps, frame_width, frame_height):
        super().__init__()
        self.config = config['push']
        self.fps = fps
        self.original_width = frame_width
        self.original_height = frame_height
        # Stream at the source resolution.
        self.scaled_width = frame_width
        self.scaled_height = frame_height
        self.queue = queue.Queue(maxsize=10)  # small bounded queue keeps latency low
        self.running = True
        self.process = None
        # Re-entrant on purpose: start_ffmpeg() holds this lock while calling
        # stop_ffmpeg(), which acquires it again. A plain Lock deadlocks there.
        self.process_lock = threading.RLock()
        self.process_starting = threading.Event()
        self.daemon = True
        self.last_drop_warning = 0
        self.frame_buffer = None

        # Statistics
        self.total_frames_sent = 0
        self.total_frames_dropped = 0
        self.frame_counter = 0
        self.start_time = time.time()
        self.avg_process_time = 0.033  # initial guess: one frame interval at 30 fps (1/30)

        # Reconnect parameters
        self.max_restarts = 5
        self.restart_count = 0
        self.min_restart_delay = 0.5
        self.max_restart_delay = 5.0
        self.last_restart_time = 0

        # Restart coordination
        self.restart_lock = threading.Lock()
        self.restart_scheduled = False
        self.last_restart_attempt = 0
        self.pipe_broken = False  # set when a write or FFmpeg's stderr reports a broken pipe
        self._last_heartbeat = 0

        # Bytes in one 3-channel uint8 frame at the output resolution.
        self.buffer_size = self.scaled_width * self.scaled_height * 3
        logger.info(
            f"推流线程初始化 | 分辨率: {self.scaled_width}x{self.scaled_height} | 缓冲区: {self.buffer_size // 1024}KB")

    def run(self):
        """Thread main loop: take frames from the queue and write them to FFmpeg.

        Exits when a ``None`` sentinel is dequeued or ``self.running`` becomes
        False, then stops FFmpeg and logs statistics.
        """
        logger.info(f"启动推流线程 | 分辨率: {self.scaled_width}x{self.scaled_height} | FPS: {self.fps}")
        # Make sure FFmpeg is up before the first frame arrives.
        self.start_ffmpeg()

        while self.running:
            frame_start = time.perf_counter()
            try:
                # Wait about two frame-processing times, clamped to [0.1, 1.0] s,
                # so idle heartbeats stay responsive without busy-looping.
                timeout = max(0.1, min(1.0, self.avg_process_time * 2))
                frame = self.queue.get(timeout=timeout)
                if frame is None:  # stop sentinel
                    logger.info("接收到停止信号")
                    break

                if self.process_failed():
                    logger.warning("检测到进程失效,尝试重启")
                    self.safe_restart_ffmpeg()

                processed_frame = self.process_frame(frame)

                if self.write_frame(processed_frame):
                    self.total_frames_sent += 1
                    self.pipe_broken = False
                else:
                    self.total_frames_dropped += 1
                    # Restart right away if the write failed because the process died.
                    if self.process_failed():
                        self.safe_restart_ffmpeg()

            except queue.Empty:
                # Idle: use the time to check FFmpeg's health.
                self.process_heartbeat()
                continue
            except Exception as e:
                logger.error(f"处理帧时发生未知错误: {str(e)}", exc_info=True)
                self.total_frames_dropped += 1

            # Exponential moving average of the per-iteration time.
            elapsed = time.perf_counter() - frame_start
            self.avg_process_time = self.avg_process_time * 0.9 + elapsed * 0.1

        self.stop_ffmpeg()
        self.log_statistics()
        logger.info("推流线程已停止")

    def safe_restart_ffmpeg(self):
        """Restart FFmpeg with rate limiting, exponential backoff and a retry cap.

        Attempts closer than 2 s apart are skipped. After ``max_restarts``
        consecutive failed attempts the streamer stops itself
        (``self.running = False``).

        Returns:
            True if a new FFmpeg process was started, otherwise False.
        """
        with self.restart_lock:
            current_time = time.time()
            if current_time - self.last_restart_attempt < 2.0:
                logger.debug("跳过重启:距离上次尝试时间过短")
                return False
            self.last_restart_attempt = current_time

            self.restart_count += 1
            if self.restart_count > self.max_restarts:
                logger.error(f"达到最大重启次数 {self.max_restarts},停止推流")
                self.running = False
                return False

            # Backoff: min_delay * 2^(n-1), capped at max_delay.
            delay = min(
                self.max_restart_delay,
                self.min_restart_delay * (2 ** (self.restart_count - 1))
            )
            logger.info(f"准备重启FFmpeg ({self.restart_count}/{self.max_restarts}), {delay:.1f}秒后执行...")
            time.sleep(delay)
            # stop() may have been called while sleeping; starting a new
            # FFmpeg now would leave an orphaned process behind.
            if not self.running:
                return False
            self.last_restart_time = current_time

            try:
                return self.start_ffmpeg()
            except Exception as e:
                logger.error(f"重启失败: {str(e)}")
                return False

    def process_failed(self):
        """Return True if there is no live FFmpeg process.

        Uses ``Popen.poll()`` first, then psutil (if installed) or ``/proc``
        (only on systems that have it) as extra checks. Any unexpected error
        is treated as "failed".
        """
        process = self.process  # snapshot: another thread may replace it
        if not process:
            return True

        try:
            if process.poll() is not None:
                return True

            try:
                import psutil
            except ImportError:
                psutil = None

            if psutil is not None:
                if not psutil.pid_exists(process.pid):
                    return True
                if psutil.Process(process.pid).status() == psutil.STATUS_ZOMBIE:
                    return True
            elif (os.name == 'posix' and os.path.isdir('/proc')
                  and not os.path.exists(f"/proc/{process.pid}")):
                # Only meaningful where procfs exists; on e.g. macOS there is
                # no /proc and this check would report every process as dead.
                return True

            return False
        except Exception:
            return True

    def process_heartbeat(self):
        """Idle-time health check, run at most once every 5 seconds.

        Restarts FFmpeg if the process is gone or the pipe is flagged as
        broken (while broken, add_frame() drops every frame, so this is the
        only path left that can recover the stream).
        """
        if time.time() - getattr(self, '_last_heartbeat', 0) <= 5:
            return
        if self.process_failed() or self.pipe_broken:
            logger.warning("心跳检测发现进程失效,尝试重启")
            self.safe_restart_ffmpeg()
        self._last_heartbeat = time.time()

    def start_ffmpeg(self):
        """Start a fresh FFmpeg process, replacing any existing one.

        Concurrent calls are skipped via ``process_starting``. After spawning,
        waits 0.5 s and verifies the process is still alive.

        Returns:
            True if FFmpeg is running afterwards, otherwise False.
        """
        if self.process_starting.is_set():
            logger.debug("FFmpeg已在启动中,跳过重复启动")
            return False

        self.process_starting.set()
        success = False
        try:
            with self.process_lock:
                if self.process:
                    self.stop_ffmpeg()  # fine while holding the lock: it is re-entrant

                logger.info(f"启动FFmpeg推流进程 | 目标地址: {self.config['url']}")
                command = self.build_ffmpeg_command()

                process = subprocess.Popen(
                    command,
                    stdin=subprocess.PIPE,
                    stdout=subprocess.DEVNULL,
                    stderr=subprocess.PIPE,
                    bufsize=0  # unbuffered: each write goes straight to the pipe
                )
                self.process = process

                # Bind the monitor to this specific process so it exits when
                # this process does, even after self.process is replaced.
                threading.Thread(
                    target=self.monitor_ffmpeg,
                    args=(process,),
                    daemon=True,
                    name="FFmpegMonitor"
                ).start()

                logger.info(f"FFmpeg启动成功 PID: {process.pid}")
                self.pipe_broken = False

                # Give FFmpeg time to fail fast (bad URL, bad options, ...).
                time.sleep(0.5)

                if self.process_failed():
                    logger.error("FFmpeg启动后立即退出")
                    try:
                        err_output = process.stderr.read()
                        if err_output:
                            logger.error(f"FFmpeg错误输出: {err_output.decode('utf-8', errors='ignore')}")
                    except (OSError, ValueError):
                        pass  # the monitor thread may already have consumed/closed stderr
                    # Close the pipes and reap the process instead of just
                    # dropping the reference.
                    self.stop_ffmpeg()
                    return False

                # Only a process that survived start-up resets the failure
                # counter; resetting earlier kept the backoff from ever growing
                # and made max_restarts unreachable.
                self.restart_count = 0
                success = True
        except Exception as e:
            logger.error(f"启动FFmpeg失败: {str(e)}", exc_info=True)
            self.process = None
        finally:
            self.process_starting.clear()

        return success

    def build_ffmpeg_command(self):
        """Build the FFmpeg argv that reads raw frames from stdin and pushes to ``url``.

        Returns:
            list[str]: The command line, suitable for ``subprocess.Popen``.
        """
        w, h = self.scaled_width, self.scaled_height

        command = [
            'ffmpeg',
            '-y', '-an',  # overwrite output, disable audio
            '-loglevel', 'warning',  # keep stderr quiet
            '-f', 'rawvideo',
            '-vcodec', 'rawvideo',
            '-pix_fmt', self.config.get('pixel_format', 'bgr24'),
            '-s', f'{w}x{h}',
            '-r', str(self.fps),
            '-i', '-',  # read frames from stdin
            '-c:v', self.config['video_codec'],
            '-pix_fmt', 'yuv420p',
            '-preset', self.config['preset'],
            '-g', str(int(self.fps * 1.5)),  # keyframe interval: 1.5 s
            '-crf', str(self.config.get('crf', 23)),
            '-b:v', self.config.get('bitrate', '2000k'),
            '-bufsize', self.config.get('bufsize', '1000k'),
            '-threads', '0',  # let FFmpeg pick the thread count
            '-f', self.config['format'],
            '-fflags', 'nobuffer',
            self.config['url']
        ]

        if self.config.get('gpu_acceleration', False) and torch.cuda.is_available():
            command[1:1] = [
                '-hwaccel', 'cuda',
                '-hwaccel_output_format', 'cuda',
                '-extra_hw_frames', '8'
            ]
            logger.info("启用CUDA硬件加速")

        logger.debug(f"FFmpeg命令: {' '.join(command)}")
        return command

    def monitor_ffmpeg(self, process=None):
        """Forward one FFmpeg process's stderr lines to process_ffmpeg_output().

        Args:
            process: The ``Popen`` object to monitor. Defaults to
                ``self.process`` for backward compatibility.

        Reads until stderr hits EOF (i.e. the process exited) or the streamer
        stops, then closes that stderr pipe so its file descriptor is released.
        """
        logger.info("启动FFmpeg监控线程")
        proc = process if process is not None else self.process
        stderr = proc.stderr if proc is not None else None
        try:
            if stderr is None:
                return
            # Reading to EOF (rather than stopping once poll() reports exit)
            # also captures the final error lines FFmpeg prints before dying.
            for raw_line in iter(stderr.readline, b''):
                if not self.running:
                    break
                self.process_ffmpeg_output(raw_line.decode('utf-8', errors='ignore').strip())
        except (OSError, ValueError) as e:
            logger.error(f"监控线程读取错误: {str(e)}")
        except Exception as e:
            logger.error(f"监控线程异常: {str(e)}", exc_info=True)
        finally:
            if stderr is not None:
                try:
                    stderr.close()
                except OSError:
                    pass
            logger.info("FFmpeg监控线程已退出")

    def process_ffmpeg_output(self, line):
        """Log one FFmpeg stderr line at a matching level and react to broken pipes."""
        if not line:
            return

        if any(tag in line for tag in ['[error]', 'Failed', 'Error']):
            logger.error(f"FFmpeg ERROR: {line}")

            if 'Connection refused' in line:
                logger.error("目标服务器拒绝连接,请检查服务器状态")
            elif 'Cannot open connection' in line:
                logger.error("无法建立网络连接,检查网络和防火墙")
            elif 'Broken pipe' in line:
                logger.critical("检测到底层管道破裂")
                self.pipe_broken = True
                self.schedule_restart()

        elif any(tag in line for tag in ['[warning]', 'WARNING']):
            logger.warning(f"FFmpeg WARN: {line}")
        elif 'frame=' in line and 'fps=' in line:
            pass  # progress lines: ignored to keep the log clean
        else:
            logger.debug(f"FFmpeg INFO: {line}")

    def schedule_restart(self):
        """Run safe_restart_ffmpeg() on a background thread, at most one at a time."""
        if self.restart_scheduled:
            return

        self.restart_scheduled = True

        def safe_restart():
            try:
                if self.running:
                    self.safe_restart_ffmpeg()
            except Exception as e:
                logger.error(f"安全重启失败: {str(e)}")
            finally:
                self.restart_scheduled = False

        threading.Thread(target=safe_restart, daemon=True).start()

    def write_frame(self, frame_data):
        """Write one encoded-ready frame to FFmpeg's stdin.

        Args:
            frame_data: Raw frame bytes (see process_frame()).

        Returns:
            True if every byte was written, False otherwise. Broken pipes and
            other OS errors schedule a background restart; EAGAIN is retried once.
        """
        if not self.process:
            return False

        if self.pipe_broken:
            logger.warning("管道已破裂,跳过写入并触发重启")
            self.schedule_restart()
            return False

        max_retries = 1
        for attempt in range(max_retries + 1):
            try:
                if self.process_failed():
                    return False

                with self.process_lock:
                    process = self.process
                    if process and process.stdin and process.poll() is None:
                        # stdin is a raw (bufsize=0) pipe, whose write() may
                        # accept fewer bytes than given; loop until all are sent.
                        view = memoryview(frame_data).cast('B')
                        while view:
                            written = process.stdin.write(view)
                            view = view[written or 0:]
                        process.stdin.flush()
                        return True
                    logger.debug("写入失败 - 管道已关闭")
                    return False

            except BrokenPipeError:
                logger.warning(f"管道破裂 (尝试 {attempt + 1}/{max_retries + 1})")
                self.pipe_broken = True
                self.schedule_restart()
                return False
            except OSError as e:
                if e.errno == errno.EAGAIN:
                    logger.warning("资源暂时不可用,等待后重试")
                    time.sleep(0.05)
                else:
                    # errno can be None for some OSErrors; os.strerror(None) raises.
                    reason = os.strerror(e.errno) if e.errno is not None else str(e)
                    logger.error(f"系统级写入错误: {reason}")
                    self.schedule_restart()
                    return False
            except Exception as e:
                logger.error(f"写入异常: {str(e)}")
                self.schedule_restart()
                return False

            time.sleep(0.02 * (attempt + 1))

        return False

    def stop_ffmpeg(self):
        """Stop the current FFmpeg process: close stdin, terminate, then kill.

        No-op when no process is running. Always clears ``self.process``.
        """
        with self.process_lock:
            process = self.process
            if not process:
                return

            pid = process.pid
            logger.info(f"停止FFmpeg进程 PID: {pid}...")

            # Step 1: close stdin so FFmpeg sees EOF.
            if process.stdin:
                try:
                    process.stdin.close()
                except OSError:
                    pass  # pipe already broken

            # Step 2: ask it to terminate.
            try:
                process.terminate()
            except OSError:
                pass  # already exited

            try:
                process.wait(timeout=1.0)
            except subprocess.TimeoutExpired:
                # Step 3: force kill.
                try:
                    logger.warning(f"强制终止FFmpeg进程 PID: {pid}")
                    process.kill()
                    process.wait(timeout=0.5)
                except (OSError, subprocess.TimeoutExpired):
                    pass

            if process.poll() is None:
                logger.warning(f"FFmpeg进程 PID: {pid} 未能正常终止")

            self.process = None
            logger.info(f"FFmpeg进程 PID: {pid} 已停止")

    def process_frame(self, frame):
        """Convert a numpy image into raw BGR24 bytes at the configured size.

        Non-uint8 arrays are cast to uint8, 4-channel input is treated as BGRA
        and grayscale (2-D) input is expanded to BGR. Frames whose size differs
        from ``scaled_width x scaled_height`` are resized, because FFmpeg's
        rawvideo input expects exactly that many bytes per frame.

        Returns:
            bytes: ``scaled_height * scaled_width * 3`` bytes for 3-channel input.
        """
        if frame.dtype != np.uint8:
            frame = frame.astype(np.uint8)

        if frame.ndim == 3 and frame.shape[2] == 4:
            frame = cv2.cvtColor(frame, cv2.COLOR_BGRA2BGR)
        elif frame.ndim == 2:
            frame = cv2.cvtColor(frame, cv2.COLOR_GRAY2BGR)

        # Compare the actual frame size, not the configured sizes: a mismatch
        # would otherwise desynchronise FFmpeg's rawvideo demuxer.
        if frame.shape[1] != self.scaled_width or frame.shape[0] != self.scaled_height:
            frame = cv2.resize(frame, (self.scaled_width, self.scaled_height),
                               interpolation=cv2.INTER_LINEAR)

        return frame.tobytes()

    def add_frame(self, frame):
        """Queue a frame for streaming without blocking the caller.

        When the queue is full, up to 5 of the oldest queued frames are
        discarded to make room for the newest one. Every discarded or rejected
        frame is counted in ``total_frames_dropped``.

        Returns:
            True if the frame was queued, False if it was dropped.
        """
        if self.pipe_broken:
            logger.debug("管道破裂状态,跳过帧添加")
            self.total_frames_dropped += 1
            return False

        try:
            queue_size = self.queue.qsize()
            if queue_size < self.queue.maxsize * 0.8:
                self.queue.put_nowait(frame)
                return True

            try:
                self.queue.put_nowait(frame)
                return True
            except queue.Full:
                # Drop the oldest frames (latency matters more than completeness).
                for _ in range(min(5, queue_size // 2)):
                    try:
                        self.queue.get_nowait()
                    except queue.Empty:
                        break
                    self.total_frames_dropped += 1

                try:
                    self.queue.put_nowait(frame)
                    return True
                except queue.Full:
                    self.total_frames_dropped += 1
                    return False

        except Exception as e:
            logger.debug(f"帧处理错误: {str(e)}")
            self.total_frames_dropped += 1
            return False

    def log_statistics(self):
        """Log duration, sent/dropped counts, drop rate and average FPS."""
        elapsed = time.time() - self.start_time
        sent = self.total_frames_sent
        dropped = self.total_frames_dropped
        total = sent + dropped

        if total > 0:
            fps = sent / elapsed
            drop_rate = (dropped / total) * 100
            logger.info(
                "推流统计结果:\n"
                f"  持续时间: {elapsed:.1f}秒\n"
                f"  总帧数: {total}\n"
                f"  成功发送: {sent}\n"
                f"  丢弃帧数: {dropped}\n"
                f"  丢帧率: {drop_rate:.1f}%\n"
                f"  平均FPS: {fps:.1f}\n"
                f"  重启次数: {self.restart_count}"
            )

    def stop(self):
        """Stop streaming: signal the loop, stop FFmpeg and join the thread (3 s max)."""
        if not self.running:
            return

        logger.info("停止推流线程...")
        self.running = False

        # Wake the loop if it is blocked in queue.get(); a full queue is fine,
        # the loop also checks self.running after each timeout.
        try:
            self.queue.put(None, block=False)
        except queue.Full:
            pass

        self.stop_ffmpeg()

        # Second wake-up in case the first sentinel was discarded by add_frame().
        try:
            self.queue.put(None, block=False)
        except queue.Full:
            pass

        if self.is_alive() and threading.current_thread() is not self:
            self.join(3.0)
            if self.is_alive():
                logger.warning("推流线程未能及时停止")

        logger.info("推流线程已停止")