diff --git a/detectionThread.py b/detectionThread.py index 088af4d..645069b 100644 --- a/detectionThread.py +++ b/detectionThread.py @@ -293,9 +293,24 @@ class DetectionThread(threading.Thread): 'push_success_rate': 1.0, 'ffmpeg_restarts': 0 } + # 记录原始任务状态,以便恢复时知道应该恢复到什么状态 + self.original_status = 'running' + # 当前任务状态跟踪 + self._current_status = 'initializing' # 密钥验证记录 self.key_verification_results = {} + def check_push_health(self): + """检查推流健康状态,如果长时间失败则更新任务状态""" + # 检查推流失败次数和时间 + current_time = time.time() + + # 如果推流失败次数过多,或长时间没有成功推流,则认为推流健康状态不佳 + if (self.push_error_count > self.max_push_errors or + (current_time - self.last_push_time > 60 and self.last_push_time > 0)): # 60秒内无成功推流 + return False + return True + # 绘制结果 logger.info(f"检测线程初始化完成: {self.taskname}") @@ -606,6 +621,8 @@ class DetectionThread(threading.Thread): # 连续失败处理 if self.push_error_count >= self.max_push_errors: logger.error(f"任务 {self.task_id} 推流连续失败,尝试恢复") + # 将任务状态更新为降级状态 + self.update_task_status('degraded') self.recover_task_streamer() return success @@ -662,9 +679,20 @@ class DetectionThread(threading.Thread): success = self.initialize_task_streamer() if success: self.push_error_count = 0 + # 恢复正常状态 + if hasattr(self, 'original_status') and self.original_status == 'running': + self.update_task_status('running') logger.info(f"任务 {self.task_id} 推流器恢复成功 (第{self.stream_stats['ffmpeg_restarts']}次重启)") else: logger.error(f"任务 {self.task_id} 推流器恢复失败") + # 如果恢复失败,且已达到最大重启次数,将任务状态设置为错误 + if self.stream_stats['ffmpeg_restarts'] >= 3: # 假设最大重启3次 + self.update_task_status('error') + # 同时停止整个检测线程 + self._force_stop = True + self._should_stop.set() + self.stop_event.set() + self.running = False return success @@ -1086,6 +1114,7 @@ class DetectionThread(threading.Thread): # 8. 更新任务状态 self.initialized = True self.update_task_status('running') + self.original_status = 'running' # 记录原始状态为running logger.info("资源初始化完成") # 9. 主循环 @@ -1156,6 +1185,15 @@ class DetectionThread(threading.Thread): self.send_to_websocket(model_detections) # # 上传处理 # self.handle_upload(annotated_frame, model_detections, current_time) + + # 检查推流健康状态 + if self.enable_push: + if not self.check_push_health(): + # 如果推流不健康,且当前不是错误状态,则更新为降级状态 + current_status = getattr(self, '_current_status', 'running') + if current_status != 'error' and current_status != 'degraded': + self.update_task_status('degraded') + self.last_log_time = current_time self.frame_count += 1 @@ -1376,5 +1414,9 @@ class DetectionThread(threading.Thread): if self.task_id in tasks_dict: tasks_dict[self.task_id]['status'] = status logger.debug(f"更新任务 {self.task_id} 状态为: {status}") + + # 更新本地状态跟踪 + self._current_status = status + except Exception as e: logger.warning(f"更新任务状态失败: {str(e)}")