Skip to content

任务中间件与数据工具

系统在处理高并发仿真请求时,依赖 Celery 作为任务总线,并使用 Numpy 驱动的 helper 模块在数据落库前进行“瘦身”处理,以平衡计算精度与存储压力。


🛰️ Celery 生产级配置 (celery_app.py)

Section titled “🛰️ Celery 生产级配置 (celery_app.py)”

配置采用了动态环境感知设计,能够无缝切换本地调试模式与 Docker 容器化集群模式。

预取限制 (Prefetch)

worker_prefetch_multiplier=1:强制 Worker 每次仅获取一个任务。由于仿真计算耗时极长,此设置可防止任务在某个 Worker 堆积导致其他 Worker 空闲。

生命周期重启

worker_max_tasks_per_child=50:Worker 执行 50 个任务后自动重启,有效防止科学计算库(如 Numpy/Scipy)可能存在的内存泄露。

安全确认机制

task_acks_late=True:任务执行完毕且成功写入数据库后才发送确认。若 Worker 在计算中途意外崩溃,任务会自动重新入队。


科学计算产生的数据量级通常在 $10^5$ 至 $10^7$ 行之间。helper 模块负责在数据返回前端前进行关键的降采样 (Downsampling) 预处理。

1. 动态降采样逻辑 (optimize_calculation_data)

Section titled “1. 动态降采样逻辑 (optimize_calculation_data)”

为了保证 ECharts 渲染不卡顿,系统会根据配置中的 DOWNSAMPLE_THRESHOLD 动态调整步长。

def optimize_calculation_data(data_rows):
threshold = current_app.config['DOWNSAMPLE_THRESHOLD'] # 默认建议 400-800
row_count = len(data_rows)
if row_count > threshold:
step = row_count // threshold
# 使用 Numpy 切片实现高效等步长采样
data_rows = data_rows[::step]
# 将 float64 转为 float32 或处理 NaN/Inf,确保 JSON 序列化合法
return np.nan_to_num(data_rows.astype(np.float64))

2. 参数类型归一化 (preprocess_params)

Section titled “2. 参数类型归一化 (preprocess_params)”

该函数确保从 API 层传入的表单数据(通常是字符串)在进入计算引擎前被正确转换为浮点数,同时保护复杂的 table_data 结构不被破坏。


配置项推荐值物理/工程意义
task_time_limit180s硬件保护,防止计算超时导致 CPU 长时间占用
DOWNSAMPLE_THRESHOLD500前端渲染的最佳平衡点,兼顾曲线平滑度与加载速度
worker_concurrency1/Core科学计算任务推荐一个核心只跑一个 Worker