303 lines
8.1 KiB
Markdown
303 lines
8.1 KiB
Markdown
# Socket API 接口文档
|
||
|
||
## 项目概述
|
||
|
||
本项目是一个基于 Flask 和 SocketIO 的实时任务管理系统,主要用于处理 AI 模型的实时推理任务,支持多模型并发处理、实时状态更新和WebSocket通信。
|
||
|
||
## SocketIO 服务器配置
|
||
|
||
### 初始化配置
|
||
|
||
```python
|
||
# Windows平台配置
|
||
if platform.system() == 'Windows':
|
||
socketio = SocketIO(app,
|
||
cors_allowed_origins="*",
|
||
async_mode='threading',
|
||
allow_unsafe_werkzeug=True,
|
||
max_http_buffer_size=5 * 1024 * 1024)
|
||
else:
|
||
socketio = SocketIO(app,
|
||
cors_allowed_origins="*",
|
||
async_mode='threading',
|
||
max_http_buffer_size=5 * 1024 * 1024)
|
||
```
|
||
|
||
### 配置说明
|
||
|
||
| 配置项 | 值 | 说明 |
|
||
|-------|-----|------|
|
||
| cors_allowed_origins | "*" | 允许所有跨域请求 |
|
||
| async_mode | 'threading' | 使用线程模式处理异步事件 |
|
||
| allow_unsafe_werkzeug | True | 允许使用Flask的开发服务器(仅Windows) |
|
||
| max_http_buffer_size | 5 * 1024 * 1024 | 最大HTTP缓冲区大小为5MB |
|
||
|
||
## WebSocket 事件列表
|
||
|
||
### 1. 系统事件
|
||
|
||
#### 连接事件
|
||
|
||
| 事件名 | 方向 | 功能描述 | 回调参数 |
|
||
|-------|-----|----------|----------|
|
||
| `connect` | 客户端 → 服务器 | 客户端连接到服务器 | 无 |
|
||
| `disconnect` | 客户端 → 服务器 | 客户端断开连接 | 无 |
|
||
|
||
#### 连接事件处理示例
|
||
|
||
```python
|
||
@socketio.on('connect')
|
||
def handle_connect():
|
||
logger.info(f"Socket客户端已连接: {request.sid}")
|
||
|
||
@socketio.on('disconnect')
|
||
def handle_disconnect():
|
||
logger.info(f"Socket客户端断开: {request.sid}")
|
||
```
|
||
|
||
### 2. 任务相关事件
|
||
|
||
#### 任务订阅事件
|
||
|
||
| 事件名 | 方向 | 功能描述 | 请求参数 | 成功响应 | 失败响应 |
|
||
|-------|-----|----------|----------|----------|----------|
|
||
| `subscribe_task` | 客户端 → 服务器 | 订阅特定任务的实时更新 | `{"task_id": "<任务ID>"}` | `{"status": "subscribed", "task_id": "<任务ID>"}` | `{"status": "error", "message": "需要提供task_id"}` |
|
||
|
||
#### 订阅事件处理示例
|
||
|
||
```python
|
||
@socketio.on('subscribe_task')
|
||
def handle_subscribe_task(data):
|
||
"""订阅特定任务的WebSocket消息"""
|
||
task_id = data.get('task_id')
|
||
if task_id:
|
||
# 这里可以记录客户端订阅关系
|
||
logger.info(f"客户端 {request.sid} 订阅任务: {task_id}")
|
||
return {"status": "subscribed", "task_id": task_id}
|
||
|
||
return {"status": "error", "message": "需要提供task_id"}
|
||
```
|
||
|
||
### 3. 服务器推送事件
|
||
|
||
服务器会主动向客户端推送以下事件:
|
||
|
||
#### 任务状态更新事件
|
||
|
||
| 事件名 | 方向 | 功能描述 | 推送数据结构 |
|
||
|-------|-----|----------|-------------|
|
||
| `task_status_update` | 服务器 → 客户端 | 推送任务状态更新 | `{"task_id": "<任务ID>", "status": "<状态>", "stats": {...}, "models": [...]}` |
|
||
|
||
#### 推理结果事件
|
||
|
||
| 事件名 | 方向 | 功能描述 | 推送数据结构 |
|
||
|-------|-----|----------|-------------|
|
||
| `inference_result` | 服务器 → 客户端 | 推送实时推理结果 | `{"task_id": "<任务ID>", "model_id": "<模型ID>", "result": {...}, "timestamp": "<时间戳>"}` |
|
||
|
||
#### 系统通知事件
|
||
|
||
| 事件名 | 方向 | 功能描述 | 推送数据结构 |
|
||
|-------|-----|----------|-------------|
|
||
| `system_notification` | 服务器 → 客户端 | 推送系统通知 | `{"type": "<通知类型>", "message": "<通知消息>", "timestamp": "<时间戳>"}` |
|
||
|
||
## 客户端使用示例
|
||
|
||
### JavaScript 客户端
|
||
|
||
```javascript
|
||
// 引入SocketIO客户端
|
||
import io from 'socket.io-client';
|
||
|
||
// 连接到服务器
|
||
const socket = io('http://localhost:5000', {
|
||
transports: ['websocket'],
|
||
reconnection: true,
|
||
reconnectionAttempts: 5,
|
||
reconnectionDelay: 1000
|
||
});
|
||
|
||
// 连接事件
|
||
.socket.on('connect', () => {
|
||
console.log('Connected to server');
|
||
});
|
||
|
||
// 断开连接事件
|
||
.socket.on('disconnect', () => {
|
||
console.log('Disconnected from server');
|
||
});
|
||
|
||
// 订阅任务
|
||
function subscribeToTask(taskId) {
|
||
socket.emit('subscribe_task', { task_id: taskId }, (response) => {
|
||
if (response.status === 'subscribed') {
|
||
console.log(`Subscribed to task: ${taskId}`);
|
||
} else {
|
||
console.error('Failed to subscribe:', response.message);
|
||
}
|
||
});
|
||
}
|
||
|
||
// 监听任务状态更新
|
||
.socket.on('task_status_update', (data) => {
|
||
console.log('Task status update:', data);
|
||
// 处理任务状态更新
|
||
});
|
||
|
||
// 监听推理结果
|
||
.socket.on('inference_result', (data) => {
|
||
console.log('Inference result:', data);
|
||
// 处理推理结果
|
||
});
|
||
|
||
// 监听系统通知
|
||
.socket.on('system_notification', (data) => {
|
||
console.log('System notification:', data);
|
||
// 处理系统通知
|
||
});
|
||
```
|
||
|
||
### Python 客户端
|
||
|
||
```python
|
||
import socketio
|
||
|
||
# 创建SocketIO客户端
|
||
sio = socketio.Client()
|
||
|
||
# 连接事件
|
||
@sio.event
|
||
def connect():
|
||
print('Connected to server')
|
||
|
||
# 断开连接事件
|
||
@sio.event
|
||
def disconnect():
|
||
print('Disconnected from server')
|
||
|
||
# 监听任务状态更新
|
||
@sio.on('task_status_update')
|
||
def on_task_status_update(data):
|
||
print('Task status update:', data)
|
||
|
||
# 监听推理结果
|
||
@sio.on('inference_result')
|
||
def on_inference_result(data):
|
||
print('Inference result:', data)
|
||
|
||
# 连接到服务器
|
||
sio.connect('http://localhost:5000')
|
||
|
||
# 订阅任务
|
||
sio.emit('subscribe_task', {"task_id": "task_123"}, callback=lambda response: print('Subscription response:', response))
|
||
|
||
# 保持运行
|
||
while True:
|
||
pass
|
||
```
|
||
|
||
## 与 REST API 的集成
|
||
|
||
### 任务创建与 Socket 关联
|
||
|
||
当通过 REST API 创建任务时,SocketIO 实例会被传递给任务管理器:
|
||
|
||
```python
|
||
# 创建任务
|
||
task_id = task_manager.create_task(config, socketio)
|
||
|
||
# 启动任务
|
||
success = task_manager.start_task(task_id)
|
||
```
|
||
|
||
### 实时状态更新
|
||
|
||
任务管理器会通过 SocketIO 向客户端推送实时状态更新:
|
||
|
||
```python
|
||
# 推送任务状态更新
|
||
socketio.emit('task_status_update', {
|
||
'task_id': task_id,
|
||
'status': status,
|
||
'stats': stats,
|
||
'models': models_info
|
||
}, room=client_id) # 可选:指定房间
|
||
```
|
||
|
||
## 注意事项
|
||
|
||
1. **连接管理**
|
||
- 客户端应实现重连机制,确保网络波动时能够自动恢复连接
|
||
- 服务器会在客户端断开连接时记录日志,但不会主动清理订阅关系
|
||
|
||
2. **事件处理**
|
||
- 客户端应及时处理服务器推送的事件,避免事件堆积
|
||
- 对于大量数据的事件(如推理结果),应考虑分批处理
|
||
|
||
3. **性能优化**
|
||
- 对于高频更新的事件(如推理结果),客户端可以考虑使用防抖或节流机制
|
||
- 服务器端应限制推送频率,避免过度消耗带宽
|
||
|
||
4. **安全性**
|
||
- 生产环境中应配置具体的 `cors_allowed_origins`,而不是使用 "*"
|
||
- 应实现身份验证机制,确保只有授权客户端能够订阅任务
|
||
|
||
5. **错误处理**
|
||
- 客户端应处理可能的连接错误和事件处理错误
|
||
- 服务器端应捕获并记录事件处理过程中的异常
|
||
|
||
## 常见问题与解决方案
|
||
|
||
### 1. 客户端无法连接到服务器
|
||
|
||
**原因**:
|
||
- 服务器未运行
|
||
- 网络连接问题
|
||
- CORS 配置错误
|
||
|
||
**解决方案**:
|
||
- 确认服务器已启动并监听正确的端口
|
||
- 检查网络连接和防火墙设置
|
||
- 确认 `cors_allowed_origins` 配置正确
|
||
|
||
### 2. 事件推送延迟或丢失
|
||
|
||
**原因**:
|
||
- 网络带宽限制
|
||
- 服务器负载过高
|
||
- 客户端处理速度慢
|
||
|
||
**解决方案**:
|
||
- 优化网络连接
|
||
- 增加服务器资源
|
||
- 优化客户端事件处理逻辑
|
||
|
||
### 3. 订阅任务后未收到状态更新
|
||
|
||
**原因**:
|
||
- 任务ID不存在
|
||
- 订阅关系未正确建立
|
||
- 任务状态未发生变化
|
||
|
||
**解决方案**:
|
||
- 确认任务ID正确且任务存在
|
||
- 检查订阅回调是否成功
|
||
- 检查任务是否正在运行
|
||
|
||
## 版本历史
|
||
|
||
| 版本 | 日期 | 变更内容 |
|
||
|------|------|----------|
|
||
| 1.0 | 2026-02-07 | 初始版本,实现基本WebSocket通信 |
|
||
| 1.1 | 2026-02-08 | 添加任务订阅功能 |
|
||
| 1.2 | 2026-02-09 | 优化事件推送机制 |
|
||
|
||
## 联系与支持
|
||
|
||
如有任何问题或建议,请联系系统管理员或查看项目文档。
|
||
|
||
---
|
||
|
||
**文档更新时间**: 2026-02-07
|
||
**文档版本**: 1.0
|
||
**项目版本**: v1.0.0
|