From 8fa0757c3a422cb5dc70e250004badd7f2fe098f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=80=E6=A2=A6=E5=8D=83=E5=B9=B4?= <421281095@qq.com> Date: Fri, 9 Jan 2026 14:07:33 +0800 Subject: [PATCH] =?UTF-8?q?feat(detection):=20=E6=B7=BB=E5=8A=A0=E6=8E=A8?= =?UTF-8?q?=E6=B5=81=E5=81=A5=E5=BA=B7=E6=A3=80=E6=9F=A5=E5=92=8C=E4=BB=BB?= =?UTF-8?q?=E5=8A=A1=E7=8A=B6=E6=80=81=E7=AE=A1=E7=90=86=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 实现 check_push_health 方法用于检查推流健康状态 - 添加 original_status 和 _current_status 属性跟踪任务状态 - 在推流连续失败时将任务状态更新为降级状态 - 在推流器恢复成功时恢复任务到正常运行状态 - 当恢复失败且达到最大重启次数时将任务设置为错误状态并停止线程 - 在主循环中集成推流健康检查逻辑 - 添加本地状态跟踪以确保状态同步 --- detectionThread.py | 42 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/detectionThread.py b/detectionThread.py index 088af4d..645069b 100644 --- a/detectionThread.py +++ b/detectionThread.py @@ -293,9 +293,24 @@ class DetectionThread(threading.Thread): 'push_success_rate': 1.0, 'ffmpeg_restarts': 0 } + # 记录原始任务状态,以便恢复时知道应该恢复到什么状态 + self.original_status = 'running' + # 当前任务状态跟踪 + self._current_status = 'initializing' # 密钥验证记录 self.key_verification_results = {} + def check_push_health(self): + """检查推流健康状态,如果长时间失败则更新任务状态""" + # 检查推流失败次数和时间 + current_time = time.time() + + # 如果推流失败次数过多,或长时间没有成功推流,则认为推流健康状态不佳 + if (self.push_error_count > self.max_push_errors or + (current_time - self.last_push_time > 60 and self.last_push_time > 0)): # 60秒内无成功推流 + return False + return True + # 绘制结果 logger.info(f"检测线程初始化完成: {self.taskname}") @@ -606,6 +621,8 @@ class DetectionThread(threading.Thread): # 连续失败处理 if self.push_error_count >= self.max_push_errors: logger.error(f"任务 {self.task_id} 推流连续失败,尝试恢复") + # 将任务状态更新为降级状态 + self.update_task_status('degraded') self.recover_task_streamer() return success @@ -662,9 +679,20 @@ class DetectionThread(threading.Thread): success = self.initialize_task_streamer() if success: self.push_error_count = 0 + # 恢复正常状态 + if hasattr(self, 'original_status') and self.original_status == 'running': + self.update_task_status('running') logger.info(f"任务 {self.task_id} 推流器恢复成功 (第{self.stream_stats['ffmpeg_restarts']}次重启)") else: logger.error(f"任务 {self.task_id} 推流器恢复失败") + # 如果恢复失败,且已达到最大重启次数,将任务状态设置为错误 + if self.stream_stats['ffmpeg_restarts'] >= 3: # 假设最大重启3次 + self.update_task_status('error') + # 同时停止整个检测线程 + self._force_stop = True + self._should_stop.set() + self.stop_event.set() + self.running = False return success @@ -1086,6 +1114,7 @@ class DetectionThread(threading.Thread): # 8. 更新任务状态 self.initialized = True self.update_task_status('running') + self.original_status = 'running' # 记录原始状态为running logger.info("资源初始化完成") # 9. 主循环 @@ -1156,6 +1185,15 @@ class DetectionThread(threading.Thread): self.send_to_websocket(model_detections) # # 上传处理 # self.handle_upload(annotated_frame, model_detections, current_time) + + # 检查推流健康状态 + if self.enable_push: + if not self.check_push_health(): + # 如果推流不健康,且当前不是错误状态,则更新为降级状态 + current_status = getattr(self, '_current_status', 'running') + if current_status != 'error' and current_status != 'degraded': + self.update_task_status('degraded') + self.last_log_time = current_time self.frame_count += 1 @@ -1376,5 +1414,9 @@ class DetectionThread(threading.Thread): if self.task_id in tasks_dict: tasks_dict[self.task_id]['status'] = status logger.debug(f"更新任务 {self.task_id} 状态为: {status}") + + # 更新本地状态跟踪 + self._current_status = status + except Exception as e: logger.warning(f"更新任务状态失败: {str(e)}")