569 lines
20 KiB
Python
569 lines
20 KiB
Python
import errno
|
||
import threading
|
||
import queue
|
||
import time
|
||
import os
|
||
import cv2
|
||
import numpy as np
|
||
import subprocess
|
||
import torch
|
||
from log import logger
|
||
|
||
class FFmpegStreamer(threading.Thread):
|
||
"""独立推流线程 - 避免阻塞检测线程"""
|
||
|
||
def __init__(self, config, fps, frame_width, frame_height):
|
||
super().__init__()
|
||
self.config = config['push']
|
||
self.fps = fps
|
||
self.original_width = frame_width
|
||
self.original_height = frame_height
|
||
|
||
# 使用原始分辨率
|
||
self.scaled_width = frame_width
|
||
self.scaled_height = frame_height
|
||
|
||
self.queue = queue.Queue(maxsize=10) # 合理大小的队列
|
||
self.running = True
|
||
self.process = None
|
||
self.process_lock = threading.Lock()
|
||
self.process_starting = threading.Event()
|
||
self.daemon = True
|
||
self.last_drop_warning = 0
|
||
self.frame_buffer = None
|
||
|
||
# 统计信息
|
||
self.total_frames_sent = 0
|
||
self.total_frames_dropped = 0
|
||
self.frame_counter = 0
|
||
self.start_time = time.time()
|
||
self.avg_process_time = 0.033 # 初始值设为30fps的帧间隔(1/30≈0.033)
|
||
|
||
# 重连参数
|
||
self.max_restarts = 5
|
||
self.restart_count = 0
|
||
self.min_restart_delay = 0.5
|
||
self.max_restart_delay = 5.0
|
||
self.last_restart_time = 0
|
||
|
||
# 新增重启控制变量
|
||
self.restart_lock = threading.Lock()
|
||
self.restart_scheduled = False
|
||
self.last_restart_attempt = 0
|
||
self.pipe_broken = False # 管道破裂标志
|
||
|
||
# 预计算帧处理参数
|
||
self.buffer_size = self.scaled_width * self.scaled_height * 3
|
||
logger.info(
|
||
f"推流线程初始化 | 分辨率: {self.scaled_width}x{self.scaled_height} | 缓冲区: {self.buffer_size // 1024}KB")
|
||
|
||
def run(self):
|
||
"""推流线程主循环"""
|
||
logger.info(f"启动推流线程 | 分辨率: {self.scaled_width}x{self.scaled_height} | FPS: {self.fps}")
|
||
|
||
# 启动时确保FFmpeg已启动
|
||
self.start_ffmpeg()
|
||
|
||
while self.running:
|
||
frame_start = time.perf_counter()
|
||
|
||
try:
|
||
# 根据处理时间动态调整获取超时
|
||
timeout = max(0.1, min(1.0, self.avg_process_time * 2))
|
||
frame = self.queue.get(timeout=timeout)
|
||
|
||
if frame is None: # 停止信号
|
||
logger.info("接收到停止信号")
|
||
break
|
||
|
||
# 检查并处理进程失败
|
||
if self.process_failed():
|
||
logger.warning("检测到进程失效,尝试重启")
|
||
self.safe_restart_ffmpeg()
|
||
|
||
# 处理帧
|
||
processed_frame = self.process_frame(frame)
|
||
|
||
# 安全写入FFmpeg
|
||
write_success = self.write_frame(processed_frame)
|
||
|
||
if write_success:
|
||
self.total_frames_sent += 1
|
||
self.pipe_broken = False # 重置管道破裂标志
|
||
else:
|
||
self.total_frames_dropped += 1
|
||
# 写失败后快速重启
|
||
if self.process_failed():
|
||
self.safe_restart_ffmpeg()
|
||
except queue.Empty:
|
||
# 队列空时进行心跳检测
|
||
self.process_heartbeat()
|
||
continue
|
||
except Exception as e:
|
||
logger.error(f"处理帧时发生未知错误: {str(e)}", exc_info=True)
|
||
self.total_frames_dropped += 1
|
||
|
||
# 更新帧处理时间估计(EMA平滑)
|
||
elapsed = time.perf_counter() - frame_start
|
||
self.avg_process_time = self.avg_process_time * 0.9 + elapsed * 0.1
|
||
|
||
# 清理资源
|
||
self.stop_ffmpeg()
|
||
self.log_statistics()
|
||
logger.info("推流线程已停止")
|
||
|
||
def safe_restart_ffmpeg(self):
|
||
"""安全重启FFmpeg(带速率限制和锁保护)"""
|
||
with self.restart_lock:
|
||
current_time = time.time()
|
||
|
||
# 防止频繁重启
|
||
if current_time - self.last_restart_attempt < 2.0:
|
||
logger.debug("跳过重启:距离上次尝试时间过短")
|
||
return False
|
||
|
||
self.last_restart_attempt = current_time
|
||
|
||
# 检查重启次数
|
||
self.restart_count += 1
|
||
if self.restart_count > self.max_restarts:
|
||
logger.error(f"达到最大重启次数 {self.max_restarts},停止推流")
|
||
self.running = False
|
||
return False
|
||
|
||
# 计算退避延迟
|
||
delay = min(
|
||
self.max_restart_delay,
|
||
self.min_restart_delay * (2 ** (self.restart_count - 1))
|
||
)
|
||
|
||
logger.info(f"准备重启FFmpeg ({self.restart_count}/{self.max_restarts}), {delay:.1f}秒后执行...")
|
||
time.sleep(delay)
|
||
self.last_restart_time = current_time
|
||
|
||
try:
|
||
return self.start_ffmpeg()
|
||
except Exception as e:
|
||
logger.error(f"重启失败: {str(e)}")
|
||
return False
|
||
|
||
def process_failed(self):
|
||
"""检查进程状态(带状态缓存)"""
|
||
if not self.process:
|
||
return True
|
||
|
||
try:
|
||
# 检查进程是否已终止
|
||
if self.process.poll() is not None:
|
||
return True
|
||
|
||
# 额外检查进程是否真实存在
|
||
try:
|
||
# 使用psutil检查进程状态(如果可用)
|
||
import psutil
|
||
if not psutil.pid_exists(self.process.pid):
|
||
return True
|
||
process = psutil.Process(self.process.pid)
|
||
if process.status() == psutil.STATUS_ZOMBIE:
|
||
return True
|
||
except ImportError:
|
||
# 如果没有psutil,使用简单检查
|
||
if os.name == 'posix' and not os.path.exists(f"/proc/{self.process.pid}"):
|
||
return True
|
||
|
||
return False
|
||
except Exception:
|
||
return True
|
||
|
||
def process_heartbeat(self):
|
||
"""空闲时的心跳检测"""
|
||
# 每5秒检测一次进程状态
|
||
if time.time() - getattr(self, '_last_heartbeat', 0) > 5:
|
||
if self.process_failed():
|
||
logger.warning("心跳检测发现进程失效,尝试重启")
|
||
self.safe_restart_ffmpeg()
|
||
self._last_heartbeat = time.time()
|
||
|
||
def start_ffmpeg(self):
|
||
"""安全启动FFmpeg进程(带互斥锁和状态跟踪)"""
|
||
if self.process_starting.is_set():
|
||
logger.debug("FFmpeg已在启动中,跳过重复启动")
|
||
return False
|
||
|
||
self.process_starting.set()
|
||
success = False
|
||
try:
|
||
with self.process_lock:
|
||
# 确保关闭现有进程
|
||
if self.process:
|
||
self.stop_ffmpeg()
|
||
|
||
logger.info(f"启动FFmpeg推流进程 | 目标地址: {self.config['url']}")
|
||
|
||
# 构建FFmpeg命令
|
||
command = self.build_ffmpeg_command()
|
||
|
||
# 启动新进程
|
||
self.process = subprocess.Popen(
|
||
command,
|
||
stdin=subprocess.PIPE,
|
||
stdout=subprocess.DEVNULL,
|
||
stderr=subprocess.PIPE,
|
||
bufsize=0 # 无缓冲
|
||
)
|
||
|
||
# 启动监控线程
|
||
threading.Thread(
|
||
target=self.monitor_ffmpeg,
|
||
daemon=True,
|
||
name="FFmpegMonitor"
|
||
).start()
|
||
|
||
logger.info(f"FFmpeg启动成功 PID: {self.process.pid}")
|
||
self.restart_count = 0 # 重置重启计数
|
||
self.pipe_broken = False # 重置管道破裂标志
|
||
|
||
# 等待进程稳定
|
||
time.sleep(0.5)
|
||
|
||
# 检查进程是否仍在运行
|
||
if self.process_failed():
|
||
logger.error("FFmpeg启动后立即退出")
|
||
# 尝试读取错误输出
|
||
try:
|
||
err_output = self.process.stderr.read()
|
||
if err_output:
|
||
logger.error(f"FFmpeg错误输出: {err_output.decode('utf-8', errors='ignore')}")
|
||
except:
|
||
pass
|
||
self.process = None
|
||
return False
|
||
|
||
success = True
|
||
except Exception as e:
|
||
logger.error(f"启动FFmpeg失败: {str(e)}", exc_info=True)
|
||
self.process = None
|
||
finally:
|
||
self.process_starting.clear()
|
||
return success
|
||
|
||
def build_ffmpeg_command(self):
|
||
"""构建优化的FFmpeg命令"""
|
||
w, h = self.scaled_width, self.scaled_height
|
||
|
||
# 基本命令
|
||
command = [
|
||
'ffmpeg',
|
||
'-y', '-an', # 覆盖输出文件,禁用音频
|
||
'-loglevel', 'warning', # 减少日志输出
|
||
'-f', 'rawvideo',
|
||
'-vcodec', 'rawvideo',
|
||
'-pix_fmt', self.config.get('pixel_format', 'bgr24'),
|
||
'-s', f'{w}x{h}',
|
||
'-r', str(self.fps),
|
||
'-i', '-', # 从标准输入读取
|
||
'-c:v', self.config['video_codec'],
|
||
'-pix_fmt', 'yuv420p',
|
||
'-preset', self.config['preset'],
|
||
'-g', str(int(self.fps * 1.5)), # 关键帧间隔
|
||
'-crf', str(self.config.get('crf', 23)), # 质量参数
|
||
'-b:v', self.config.get('bitrate', '2000k'), # 码率控制
|
||
'-bufsize', self.config.get('bufsize', '1000k'), # 缓冲区大小
|
||
'-threads', '0', # 自动多线程
|
||
'-f', self.config['format'],
|
||
'-fflags', 'nobuffer', # 最小化缓冲
|
||
self.config['url']
|
||
]
|
||
|
||
# 启用硬件加速
|
||
if self.config.get('gpu_acceleration', False) and torch.cuda.is_available():
|
||
command[1:1] = [
|
||
'-hwaccel', 'cuda',
|
||
'-hwaccel_output_format', 'cuda',
|
||
'-extra_hw_frames', '8'
|
||
]
|
||
logger.info("启用CUDA硬件加速")
|
||
# print("command",command)
|
||
logger.debug(f"FFmpeg命令: {' '.join(command)}")
|
||
return command
|
||
|
||
def monitor_ffmpeg(self):
|
||
"""监控FFmpeg输出流(改进退出条件)"""
|
||
logger.info("启动FFmpeg监控线程")
|
||
try:
|
||
stderr = self.process.stderr
|
||
while self.running and self.process and self.process.poll() is None:
|
||
try:
|
||
line = stderr.readline()
|
||
if line:
|
||
line = line.decode('utf-8', errors='ignore').strip()
|
||
self.process_ffmpeg_output(line)
|
||
else:
|
||
# 进程可能已终止
|
||
if self.process.poll() is not None:
|
||
break
|
||
time.sleep(0.1)
|
||
except Exception as e:
|
||
logger.error(f"监控线程读取错误: {str(e)}")
|
||
break
|
||
except Exception as e:
|
||
logger.error(f"监控线程异常: {str(e)}", exc_info=True)
|
||
finally:
|
||
logger.info("FFmpeg监控线程已退出")
|
||
|
||
def process_ffmpeg_output(self, line):
|
||
"""处理FFmpeg输出(根据级别分类)"""
|
||
if not line:
|
||
return
|
||
|
||
# 分类日志级别
|
||
if any(tag in line for tag in ['[error]', 'Failed', 'Error']):
|
||
logger.error(f"FFmpeg ERROR: {line}")
|
||
|
||
# 关键错误处理
|
||
if 'Connection refused' in line:
|
||
logger.error("目标服务器拒绝连接,请检查服务器状态")
|
||
elif 'Cannot open connection' in line:
|
||
logger.error("无法建立网络连接,检查网络和防火墙")
|
||
elif 'Broken pipe' in line:
|
||
logger.critical("检测到底层管道破裂")
|
||
self.pipe_broken = True # 设置管道破裂标志
|
||
# 立即安排重启
|
||
self.schedule_restart()
|
||
|
||
elif any(tag in line for tag in ['[warning]', 'WARNING']):
|
||
logger.warning(f"FFmpeg WARN: {line}")
|
||
elif 'frame=' in line and 'fps=' in line:
|
||
# 忽略进度输出,避免日志污染
|
||
pass
|
||
else:
|
||
logger.debug(f"FFmpeg INFO: {line}")
|
||
|
||
def schedule_restart(self):
|
||
"""安排安全重启(避免重复调度)"""
|
||
if self.restart_scheduled:
|
||
return
|
||
|
||
self.restart_scheduled = True
|
||
|
||
def safe_restart():
|
||
try:
|
||
if self.running:
|
||
self.safe_restart_ffmpeg()
|
||
except Exception as e:
|
||
logger.error(f"安全重启失败: {str(e)}")
|
||
finally:
|
||
self.restart_scheduled = False
|
||
|
||
threading.Thread(target=safe_restart, daemon=True).start()
|
||
|
||
def write_frame(self, frame_data):
|
||
"""安全写入帧数据(带自动重试)"""
|
||
if not self.process:
|
||
return False
|
||
|
||
# 如果管道已经破裂,直接返回失败并触发重启
|
||
if self.pipe_broken:
|
||
logger.warning("管道已破裂,跳过写入并触发重启")
|
||
self.schedule_restart()
|
||
return False
|
||
|
||
max_retries = 1 # 减少重试次数
|
||
for attempt in range(max_retries + 1):
|
||
try:
|
||
# 检查进程状态
|
||
if self.process_failed():
|
||
return False
|
||
|
||
# 尝试写入
|
||
with self.process_lock:
|
||
if self.process and self.process.stdin and self.process.poll() is None:
|
||
self.process.stdin.write(frame_data)
|
||
self.process.stdin.flush()
|
||
return True
|
||
else:
|
||
logger.debug("写入失败 - 管道已关闭")
|
||
return False
|
||
|
||
except BrokenPipeError:
|
||
logger.warning(f"管道破裂 (尝试 {attempt + 1}/{max_retries + 1})")
|
||
self.pipe_broken = True # 设置管道破裂标志
|
||
self.schedule_restart()
|
||
return False
|
||
except OSError as e:
|
||
# 特别处理EAGAIN错误(非阻塞操作)
|
||
if e.errno == errno.EAGAIN:
|
||
logger.warning("资源暂时不可用,等待后重试")
|
||
time.sleep(0.05)
|
||
else:
|
||
logger.error(f"系统级写入错误: {os.strerror(e.errno)}")
|
||
self.schedule_restart()
|
||
return False
|
||
except Exception as e:
|
||
logger.error(f"写入异常: {str(e)}")
|
||
self.schedule_restart()
|
||
return False
|
||
|
||
# 重试前短暂暂停
|
||
time.sleep(0.02 * (attempt + 1))
|
||
|
||
return False
|
||
|
||
def stop_ffmpeg(self):
|
||
"""安全停止FFmpeg(改进清理逻辑)"""
|
||
with self.process_lock:
|
||
if not self.process:
|
||
return
|
||
|
||
pid = self.process.pid
|
||
logger.info(f"停止FFmpeg进程 PID: {pid}...")
|
||
|
||
# 第1层: 关闭输入流
|
||
if self.process.stdin:
|
||
try:
|
||
self.process.stdin.close()
|
||
except:
|
||
pass
|
||
|
||
# 第2层: 发送终止信号
|
||
try:
|
||
self.process.terminate()
|
||
except:
|
||
pass
|
||
|
||
# 等待程序优雅退出
|
||
try:
|
||
self.process.wait(timeout=1.0)
|
||
except subprocess.TimeoutExpired:
|
||
# 第3层: 强制终止
|
||
try:
|
||
logger.warning(f"强制终止FFmpeg进程 PID: {pid}")
|
||
self.process.kill()
|
||
self.process.wait(timeout=0.5)
|
||
except:
|
||
pass
|
||
except:
|
||
pass
|
||
|
||
# 确保进程已终止
|
||
if self.process.poll() is None:
|
||
logger.warning(f"FFmpeg进程 PID: {pid} 未能正常终止")
|
||
|
||
self.process = None
|
||
logger.info(f"FFmpeg进程 PID: {pid} 已停止")
|
||
|
||
def process_frame(self, frame):
|
||
"""高效帧处理(减少内存分配)"""
|
||
# 如果需要缩放
|
||
if (self.scaled_width, self.scaled_height) != (self.original_width, self.original_height):
|
||
# 使用更高效的缩放算法
|
||
frame = cv2.resize(frame, (self.scaled_width, self.scaled_height),
|
||
interpolation=cv2.INTER_LANCZOS4)
|
||
|
||
# 确保格式正确
|
||
if frame.dtype != np.uint8:
|
||
frame = frame.astype(np.uint8)
|
||
|
||
if len(frame.shape) == 3 and frame.shape[2] == 4:
|
||
frame = cv2.cvtColor(frame, cv2.COLOR_BGRA2BGR)
|
||
elif len(frame.shape) == 2:
|
||
frame = cv2.cvtColor(frame, cv2.COLOR_GRAY2BGR)
|
||
|
||
return frame.tobytes()
|
||
|
||
def add_frame(self, frame):
|
||
"""智能帧添加策略(自适应队列管理)"""
|
||
# 如果管道已破裂,跳过帧添加
|
||
if self.pipe_broken:
|
||
logger.debug("管道破裂状态,跳过帧添加")
|
||
self.total_frames_dropped += 1
|
||
return False
|
||
|
||
try:
|
||
# 根据队列状态调整行为
|
||
queue_size = self.queue.qsize()
|
||
queue_capacity = self.queue.maxsize
|
||
|
||
if queue_size < queue_capacity * 0.8:
|
||
# 队列正常 - 直接添加
|
||
self.queue.put_nowait(frame)
|
||
return True
|
||
else:
|
||
# 队列较满 - 尝试优化
|
||
try:
|
||
self.queue.put_nowait(frame)
|
||
return True
|
||
except queue.Full:
|
||
# 使用低开销的缓冲区覆盖方法
|
||
for _ in range(min(5, queue_size // 2)):
|
||
try:
|
||
self.queue.get_nowait()
|
||
except queue.Empty:
|
||
break
|
||
|
||
# 再次尝试放入
|
||
try:
|
||
self.queue.put_nowait(frame)
|
||
self.total_frames_dropped += 1
|
||
return True
|
||
except queue.Full:
|
||
self.total_frames_dropped += 1
|
||
return False
|
||
|
||
except Exception as e:
|
||
logger.debug(f"帧处理错误: {str(e)}")
|
||
self.total_frames_dropped += 1
|
||
return False
|
||
|
||
def log_statistics(self):
|
||
"""输出性能统计"""
|
||
elapsed = time.time() - self.start_time
|
||
sent = self.total_frames_sent
|
||
dropped = self.total_frames_dropped
|
||
total = sent + dropped
|
||
|
||
if total > 0:
|
||
fps = sent / elapsed
|
||
drop_rate = (dropped / total) * 100
|
||
|
||
logger.info(
|
||
"推流统计结果:\n"
|
||
f" 持续时间: {elapsed:.1f}秒\n"
|
||
f" 总帧数: {total}\n"
|
||
f" 成功发送: {sent}\n"
|
||
f" 丢弃帧数: {dropped}\n"
|
||
f" 丢帧率: {drop_rate:.1f}%\n"
|
||
f" 平均FPS: {fps:.1f}\n"
|
||
f" 重启次数: {self.restart_count}"
|
||
)
|
||
|
||
def stop(self):
|
||
"""优雅停止整个线程(改进停止逻辑)"""
|
||
if not self.running:
|
||
return
|
||
|
||
logger.info("停止推流线程...")
|
||
self.running = False
|
||
|
||
# 发送停止信号
|
||
try:
|
||
self.queue.put(None, block=False)
|
||
except:
|
||
pass
|
||
|
||
# 清理FFmpeg进程
|
||
self.stop_ffmpeg()
|
||
|
||
# 强制唤醒
|
||
try:
|
||
self.queue.put(None, block=False)
|
||
except:
|
||
pass
|
||
|
||
# 等待线程结束
|
||
if self.is_alive() and threading.current_thread() != self:
|
||
self.join(3.0)
|
||
if self.is_alive():
|
||
logger.warning("推流线程未能及时停止")
|
||
|
||
logger.info("推流线程已停止") |