# task_manager.py
import os
import threading
import uuid
import time
import gc
from datetime import datetime
from log import logger
from global_data import gd
from config import get_default_config


class TaskManager:
    """Task manager that supports running several detection tasks in parallel.

    Task records live in ``self.tasks`` (task_id -> task_info dict). The same
    task_info objects are also registered in the global ``gd`` 'tasks' dict.
    """

    # Fallback concurrency limits; used for any key missing from the config.
    _DEFAULT_RESOURCE_LIMITS = {
        'max_concurrent_tasks': 5,
        'min_concurrent_tasks': 1,
    }

    # Statuses counted as occupying a concurrency slot.
    _ACTIVE_STATUSES = ('running', 'starting', 'creating')

    # Statuses stop_task() treats as already stopped and cleanup_stopped_tasks() collects.
    _FINISHED_STATUSES = ('stopped', 'failed', 'error')

    def __init__(self, resource_config=None):
        """Create the manager.

        Args:
            resource_config: Optional config dict. Its 'resource_limits' entry (if
                present) overrides the default limits key by key. The whole dict is
                also passed to the resource monitor initialiser.
        """
        self.tasks = {}
        # Reentrant lock (switched from a plain Lock to avoid self-deadlock when a
        # thread that already holds it acquires it again).
        self.task_lock = threading.RLock()

        # Merge configured limits over the defaults so a partial 'resource_limits'
        # section cannot leave a required key missing.
        self.resource_limits = dict(self._DEFAULT_RESOURCE_LIMITS)
        if resource_config:
            self.resource_limits.update(resource_config.get('resource_limits') or {})

        # Current max concurrency; refreshed by get_current_max_tasks().
        self.current_max_tasks = self.resource_limits['max_concurrent_tasks']

        self.init_resource_monitor(resource_config)

        # IDs of tasks currently being cleaned up, guarded by cleanup_lock.
        self.cleaning_tasks = set()
        self.cleanup_lock = threading.Lock()

    def init_resource_monitor(self, config):
        """Initialise the external resource monitor (best effort).

        Calls ``resource_monitor.init_resource_monitor(config)`` only when a config
        is given. Any failure (including an import error) is logged and ignored.
        """
        try:
            from resource_monitor import init_resource_monitor
            if config:
                init_resource_monitor(config)
            logger.info("资源监控器初始化完成")
        except Exception as e:
            logger.error(f"初始化资源监控器失败: {str(e)}")

    def get_current_max_tasks(self):
        """Return the effective maximum number of concurrent tasks.

        Reads the dynamic limit stored in gd under 'max_concurrent_tasks'
        (presumably published by the resource monitor), falling back to the
        configured maximum. Clamps it into [min_concurrent_tasks,
        max_concurrent_tasks] and caches it on ``self.current_max_tasks``.
        On any error the configured maximum is returned.
        """
        limits = self.resource_limits
        try:
            current_limit = gd.get_value('max_concurrent_tasks', limits['max_concurrent_tasks'])
            self.current_max_tasks = max(
                limits['min_concurrent_tasks'],
                min(current_limit, limits['max_concurrent_tasks'])
            )
            return self.current_max_tasks
        except Exception:
            return limits['max_concurrent_tasks']

    def can_create_task(self):
        """Return True if a new task may be created under the current limits.

        Below the dynamic maximum, creation is always allowed. At or above it,
        creation is still allowed when the 'system_resources' snapshot in gd shows
        CPU < 70, memory < 75 and GPU load < 80. With no snapshot, or on any error,
        creation is refused.
        """
        try:
            active_count = self.get_active_tasks_count()
            max_tasks = self.get_current_max_tasks()
            logger.info(f"当前活动任务: {active_count}/{max_tasks}")

            if active_count < max_tasks:
                return True

            # At the limit: only over-commit when resource usage is low.
            resources = gd.get_value('system_resources')
            if not resources:
                # No resource data available: refuse conservatively.
                logger.warning("无法获取资源数据,保守拒绝创建新任务")
                return False

            cpu_usage = resources.get('cpu_percent', 0)
            memory_usage = resources.get('memory_percent', 0)
            gpu_usage = 0
            gpus = resources.get('gpus')
            if gpus:
                # NOTE(review): the unit of 'load' is not visible here. If it is a 0-1
                # fraction rather than a percentage, the 80 threshold is never reached.
                gpu_usage = max(gpu['load'] for gpu in gpus)

            cpu_threshold = 70
            memory_threshold = 75
            gpu_threshold = 80

            if (cpu_usage < cpu_threshold and memory_usage < memory_threshold
                    and gpu_usage < gpu_threshold):
                logger.info(
                    f"资源充足,允许创建额外任务 (CPU: {cpu_usage:.1f}%, 内存: {memory_usage:.1f}%, GPU: {gpu_usage:.1f}%)")
                return True

            logger.warning(
                f"资源紧张,拒绝创建新任务 (CPU: {cpu_usage:.1f}%, 内存: {memory_usage:.1f}%, GPU: {gpu_usage:.1f}%)")
            return False
        except Exception as e:
            logger.error(f"检查任务创建条件失败: {str(e)}")
            return False  # refuse conservatively on error

    def create_task(self, config, socketio):
        """Register a new (not yet started) task; every model must carry an encryption key.

        Args:
            config: Task config. Must contain a 'models' list, each entry with a
                non-empty 'encryption_key', and a 'task' dict. Relative model paths
                not already under 'encrypted_models/' are rewritten in place to
                'encrypted_models/<name>.enc'. This mutates the caller's model dicts,
                which are shared with the stored config.
            socketio: Stored on the task; start_task() injects it into the config.

        Returns:
            The new task ID (a UUID4 string).

        Raises:
            Exception: If resource limits are reached or validation fails. The error
                is logged and re-raised.

        NOTE(review): the key check only tests that 'encryption_key' is non-empty;
        it does not verify the key, although stats report key_validated=True.
        """
        try:
            with self.task_lock:
                if not self.can_create_task():
                    raise Exception(f"达到资源限制,当前最大并发任务数: {self.get_current_max_tasks()}")

                task_id = str(uuid.uuid4())

                # The config must describe a multi-model task.
                if 'models' not in config or not isinstance(config['models'], list):
                    raise Exception("配置必须包含models列表")

                for i, model_cfg in enumerate(config['models']):
                    if not model_cfg.get('encryption_key'):
                        raise Exception(f"模型 {i} ({model_cfg.get('path', 'unknown')}) 必须提供加密密钥")

                    # Point relative paths at the encrypted model directory.
                    model_path = model_cfg.get('path', '')
                    if not model_path.startswith('encrypted_models/') and not os.path.isabs(model_path):
                        model_filename = os.path.basename(model_path)
                        if not model_filename.endswith('.enc'):
                            model_filename += '.enc'
                        model_cfg['path'] = f"encrypted_models/{model_filename}"

                task_config = config.copy()
                # Copy the nested 'task' section as well. With only the shallow copy
                # above, writing taskid would mutate the caller's dict, and every task
                # created from a reused config would share the last task ID.
                task_config['task'] = dict(config['task'])
                task_config['task']['taskid'] = task_id

                now = time.time()
                task_info = {
                    'task_id': task_id,
                    'config': task_config,
                    'status': 'creating',
                    'created_at': datetime.now().isoformat(),
                    'last_updated': now,
                    'thread': None,
                    'socketio': socketio,
                    'stats': {
                        'total_frames': 0,
                        'detections': 0,
                        'avg_fps': 0,
                        'start_time': now,
                        'models_loaded': len(config['models']),
                        'encrypted_models': len(config['models']),  # every model is encrypted
                        'key_validation_required': True,
                        'key_validated': True,  # keys checked at creation time
                        'validation_time': now
                    },
                    'key_validation': {
                        'required': True,
                        'validated': True,
                        'validated_at': now,
                        'models_count': len(config['models'])
                    }
                }

                # Register locally and in the global store (same dict object).
                self.tasks[task_id] = task_info
                gd.get_or_create_dict('tasks')[task_id] = task_info

                logger.info(f"创建加密任务成功: {task_id}, 加密模型数: {len(config['models'])}, 密钥已验证")
                return task_id
        except Exception as e:
            logger.error(f"创建任务失败: {str(e)}")
            raise

    def start_task(self, task_id):
        """Start the detection thread for a created task.

        Returns:
            True if the thread was started, or the task is already running or
            starting. False if the task does not exist, is still stopping/cleaning
            after about 5 s, or the thread could not be started (status 'failed').
        """
        try:
            task_info = self.tasks.get(task_id)
            if task_info is None:
                raise Exception(f"任务不存在: {task_id}")

            current_status = task_info.get('status', 'unknown')
            if current_status in ['running', 'starting']:
                logger.warning(f"任务已在运行或启动中: {task_id}, 状态: {current_status}")
                return True  # already running: report success

            if current_status in ['stopping', 'cleaning']:
                logger.warning(f"任务正在停止或清理中: {task_id}, 等待清理完成")
                for _ in range(10):  # poll for up to ~5 s
                    time.sleep(0.5)
                    if task_info.get('status') not in ['stopping', 'cleaning']:
                        break
                if task_info.get('status') in ['stopping', 'cleaning']:
                    return False

            try:
                from detectionThread import DetectionThread

                task_info['config']['socketIO'] = task_info['socketio']

                detection_thread = DetectionThread(task_info['config'])
                detection_thread.task_id = task_id
                detection_thread._force_stop = False  # make sure the stop flag is clear

                task_info['thread'] = detection_thread
                task_info['status'] = 'starting'
                task_info['last_updated'] = time.time()

                detection_thread.start()

                # Give the thread time to initialise.
                time.sleep(1.0)

                # Promote to 'running' only if nobody changed the status meanwhile.
                # A stop_task()/update_task_status() during the wait must not be
                # overwritten, or a stopped/failed task would be reported as running.
                with self.task_lock:
                    if task_info.get('status') == 'starting':
                        task_info['status'] = 'running'
                        task_info['last_updated'] = time.time()
                        logger.info(f"任务启动成功: {task_id}")
                    else:
                        logger.warning(f"任务启动期间状态已变更: {task_id}, 状态: {task_info.get('status')}")
                return True
            except Exception as e:
                logger.error(f"启动任务线程失败: {task_id}, 错误: {str(e)}")
                task_info['status'] = 'failed'
                task_info['last_updated'] = time.time()
                return False
        except Exception as e:
            logger.error(f"启动任务失败: {task_id}, 错误: {str(e)}")
            return False

    def stop_task(self, task_id, force=False):
        """Stop a task's detection thread.

        Args:
            task_id: ID of the task to stop.
            force: If True, wait up to 10 s instead of 5 s, call the thread's
                ``cleanup()`` on timeout, and mark the task stopped even if the
                thread did not exit or stopping raised.

        Returns:
            True if the task is now considered stopped or does not exist. False on
            a non-forced timeout or error; the status then stays 'stopping'.
        """
        try:
            # NOTE(review): task_lock is held for the whole stop, including the
            # thread join (up to 10 s). If the detection thread calls
            # update_task_status() while shutting down, it blocks on this lock until
            # the join times out. Confirm against DetectionThread.
            with self.task_lock:
                task_info = self.tasks.get(task_id)
                if task_info is None:
                    logger.warning(f"任务不存在: {task_id}")
                    return True  # nothing to stop counts as success

                current_status = task_info.get('status', 'unknown')
                if current_status in self._FINISHED_STATUSES:
                    logger.info(f"任务已停止: {task_id}")
                    return True

                logger.info(f"正在停止任务: {task_id}, 当前状态: {current_status}")

                task_info['status'] = 'stopping'
                task_info['last_updated'] = time.time()

                thread = task_info.get('thread')
                if thread and isinstance(thread, threading.Thread):
                    try:
                        if hasattr(thread, 'stop'):
                            thread.stop()
                            logger.info(f"已调用线程停止方法: {task_id}")

                        thread.join(10.0 if force else 5.0)

                        if thread.is_alive():
                            if not force:
                                logger.warning(f"停止任务线程超时: {task_id}")
                                return False
                            logger.warning(f"强制停止任务线程超时: {task_id}")
                            # Last resort: let the thread release its own resources.
                            if hasattr(thread, 'cleanup'):
                                thread.cleanup()
                        else:
                            logger.info(f"任务线程已停止: {task_id}")
                    except Exception as e:
                        logger.error(f"停止任务线程异常 {task_id}: {str(e)}")
                        if not force:
                            return False
                else:
                    logger.info(f"任务线程不存在: {task_id}")

                task_info['status'] = 'stopped'
                task_info['last_updated'] = time.time()

                # Mirror the status into the global store.
                tasks_dict = gd.get_or_create_dict('tasks')
                if task_id in tasks_dict:
                    tasks_dict[task_id]['status'] = 'stopped'

                logger.info(f"任务停止成功: {task_id}")
                return True
        except Exception as e:
            logger.error(f"停止任务异常: {task_id}, 错误: {str(e)}")
            return False

    def get_task_status(self, task_id):
        """Return a status summary dict for a task, or None if it is unknown.

        The summary includes selected config fields, stats, thread performance
        figures and per-model info. Model entries report only whether a key was
        provided, never the key itself.
        """
        # Single lookup: the task may be removed concurrently by cleanup_task().
        task_info = self.tasks.get(task_id)
        if task_info is None:
            return None

        config = task_info['config']
        thread = task_info.get('thread')
        # getattr on a missing/None thread falls back to the defaults.
        avg_process_time = getattr(thread, 'avg_process_time', 0.033)

        result = {
            'task_id': task_id,
            'status': task_info['status'],
            'config': {
                'rtmp_url': config['rtmp']['url'],
                'push_url': config.get('push', {}).get('url', ''),
                'taskname': config['task']['taskname'],
                'enable_push': config.get('push', {}).get('enable_push', False),
                'algoInstancesName': config['task']['algoInstancesName'],
                'uavType': config['task']['uavType']
            },
            'models': [],
            'stats': task_info['stats'],
            'performance': {
                'fps': getattr(thread, 'fps', 0),
                'avg_process_time': avg_process_time,
                'latency': avg_process_time * 1000,  # seconds -> milliseconds
                'last_fps': getattr(thread, 'last_fps', 0)
            },
            'encryption_info': {
                'required': True,
                'models_count': task_info['stats']['encrypted_models'],
                'key_validation_required': task_info['stats']['key_validation_required']
            },
            'created_at': task_info['created_at'],
            'last_updated': task_info.get('last_updated', task_info['created_at'])
        }

        models = config.get('models')
        if isinstance(models, list):
            for i, model_cfg in enumerate(models):
                result['models'].append({
                    'id': i,
                    'name': os.path.basename(model_cfg.get('path', 'unknown')).split('.')[0],
                    'path': model_cfg.get('path', 'unknown'),
                    'enabled': model_cfg.get('enabled', True),
                    'color': model_cfg.get('color'),
                    'conf_thres': model_cfg.get('conf_thres', 0.25),
                    'encrypted': True,
                    'key_provided': bool(model_cfg.get('encryption_key'))
                })

        return result

    def get_all_tasks(self):
        """Return status summaries for all known tasks."""
        # Iterate a snapshot: cleanup_task() may delete entries concurrently.
        statuses = (self.get_task_status(task_id) for task_id in list(self.tasks))
        return [status for status in statuses if status]

    def get_active_tasks_count(self):
        """Return how many tasks are 'running', 'starting' or 'creating' (0 on error)."""
        try:
            # Snapshot so concurrent deletions cannot abort the iteration.
            return sum(
                1 for info in list(self.tasks.values())
                if info.get('status', '') in self._ACTIVE_STATUSES
            )
        except Exception as e:
            logger.error(f"获取活动任务数失败: {str(e)}")
            return 0

    def cleanup_task(self, task_id):
        """Stop a task if needed, release its resources and forget it.

        Concurrent cleanups of the same task are collapsed: a second caller
        returns True immediately.

        Returns:
            True if the task was cleaned up, is already being cleaned or does not
            exist; False if cleanup raised.
        """
        with self.cleanup_lock:
            if task_id in self.cleaning_tasks:
                logger.warning(f"任务已在清理中: {task_id}")
                return True
            self.cleaning_tasks.add(task_id)

        try:
            task_info = self.tasks.get(task_id)
            if task_info is None:
                logger.warning(f"任务不存在,无法清理: {task_id}")
                return True

            logger.info(f"开始清理任务: {task_id}, 状态: {task_info.get('status', 'unknown')}")

            # 1. Stop the task first if it is still running.
            if task_info.get('status') in ['running', 'starting']:
                self.stop_task(task_id, force=True)
                time.sleep(1.0)  # give the stop time to settle

            # 2. Mark as cleaning.
            task_info['status'] = 'cleaning'
            task_info['last_updated'] = time.time()

            # 3. Wait briefly for the thread to finish.
            thread = task_info.get('thread')
            if thread and isinstance(thread, threading.Thread):
                try:
                    if thread.is_alive():
                        logger.info(f"等待线程完全停止: {task_id}")
                        thread.join(3.0)
                        if thread.is_alive():
                            logger.warning(f"线程 {task_id} 未及时停止,强制清理")
                except Exception as e:
                    logger.error(f"等待线程停止失败: {str(e)}")
            task_info['thread'] = None

            # 4. Release the task's streamer, if the thread has one (best effort).
            try:
                if hasattr(thread, 'cleanup_task_streamer'):
                    thread.cleanup_task_streamer()
            except Exception as e:
                logger.warning(f"清理任务推流器失败 {task_id}: {str(e)}")

            # 5. Drop model references so they can be garbage-collected (best effort).
            try:
                if hasattr(thread, 'models'):
                    for model_info in thread.models:
                        try:
                            if 'model' in model_info:
                                del model_info['model']
                        except Exception as e:
                            logger.warning(f"释放模型失败 {task_id}: {str(e)}")
                    thread.models = []
            except Exception as e:
                logger.warning(f"清理模型资源失败 {task_id}: {str(e)}")

            # 6-7. Remove from the global and local registries under task_lock, so
            # lock-holding iterations over self.tasks are not disturbed.
            with self.task_lock:
                tasks_dict = gd.get_or_create_dict('tasks')
                if task_id in tasks_dict:
                    del tasks_dict[task_id]
                self.tasks.pop(task_id, None)

            # 8. Force a garbage-collection pass.
            gc.collect()

            logger.info(f"任务清理完成: {task_id}")
            return True
        except Exception as e:
            logger.error(f"清理任务 {task_id} 失败: {str(e)}")
            return False
        finally:
            with self.cleanup_lock:
                self.cleaning_tasks.discard(task_id)

    def update_task_status(self, task_id, status):
        """Set a task's status (mirrored to gd). Returns False if the task is unknown."""
        with self.task_lock:
            task_info = self.tasks.get(task_id)
            if task_info is None:
                return False

            old_status = task_info.get('status', 'unknown')
            task_info['status'] = status
            task_info['last_updated'] = time.time()
            logger.info(f"更新任务状态: {task_id} {old_status} -> {status}")

            tasks_dict = gd.get_or_create_dict('tasks')
            if task_id in tasks_dict:
                tasks_dict[task_id]['status'] = status
            return True

    def cleanup_all_tasks(self):
        """Clean up every known task; return the number cleaned successfully."""
        logger.info("开始清理所有任务...")

        # Snapshot the IDs first; cleanup_task() removes entries as it goes.
        with self.task_lock:
            task_ids = list(self.tasks)

        cleaned_count = 0
        failed_count = 0
        for task_id in task_ids:
            try:
                if self.cleanup_task(task_id):
                    cleaned_count += 1
                else:
                    failed_count += 1
            except Exception as e:
                logger.error(f"清理任务 {task_id} 异常: {str(e)}")
                failed_count += 1

        logger.info(f"任务清理完成: 成功 {cleaned_count} 个, 失败 {failed_count} 个, 总计 {len(task_ids)} 个")

        gc.collect()
        return cleaned_count

    def cleanup_stopped_tasks(self):
        """Clean up tasks whose status is 'stopped', 'failed' or 'error'; return the count cleaned."""
        logger.info("开始清理已停止的任务...")

        with self.task_lock:
            task_ids = [
                task_id for task_id, task_info in list(self.tasks.items())
                if task_info.get('status', 'unknown') in self._FINISHED_STATUSES
            ]

        cleaned_count = 0
        for task_id in task_ids:
            try:
                if self.cleanup_task(task_id):
                    cleaned_count += 1
            except Exception as e:
                logger.error(f"清理已停止任务 {task_id} 异常: {str(e)}")

        logger.info(f"清理已停止任务完成: 清理了 {cleaned_count} 个")
        return cleaned_count


# Global task manager instance (None if construction fails).
try:
    _config = get_default_config()
    task_manager = TaskManager(_config)
except Exception as e:
    logger.error(f"创建任务管理器实例失败: {str(e)}")
    task_manager = None