大模型批处理推理的拥塞控制:CONCUR 系统深度解析
一句话总结
通过借鉴分布式系统的拥塞控制思想,CONCUR 系统将大模型 Agent 批处理推理的吞吐量提升最高 4.09 倍,解决了 KV Cache 中期崩溃的核心问题。
为什么需要这个?
性能瓶颈在哪里?
在传统的 LLM 推理场景中,我们通常关注两个阶段:
- Prefill 阶段:处理输入 prompt,内存消耗激增
- Decode 阶段:逐 token 生成,内存消耗稳定
但在 Agent 批处理推理场景中,出现了一个新的性能杀手:
时间线:
T0: 启动 100 个 Agent,KV Cache 使用率 30% ✓ 正常
T1: Agent 运行 5 轮,KV Cache 使用率 60% ✓ 正常
T2: Agent 运行 10 轮,KV Cache 使用率 85% ✗ 开始变慢
T3: Agent 运行 15 轮,KV Cache 使用率 95% ✗✗ 严重抖动
实测数据(Qwen3-32B 模型):
- 初期吞吐量:1200 tokens/s
- 中期吞吐量:280 tokens/s(下降 76%)
- 内存使用率:仅 65%(还有 35% 空闲!)
硬件层面发生了什么?
KV Cache 不足时,系统开始频繁地:
- 驱逐旧的 KV 状态(eviction)
- 重新计算被驱逐的状态(recomputation)
- 内存拷贝在 CPU/GPU 之间搬运数据(swapping)
这就像一个停车场:容量还有 35%,但因为车辆频繁进出,反而造成了严重的交通堵塞。
现有方案的问题
传统的 KV Cache 管理策略都是被动的:
| 策略 | 问题 |
|---|---|
| 请求级驱逐(PagedAttention) | Agent 上下文被破坏,重算成本高 |
| 动态批处理(vLLM) | 无法预测中期崩溃,依然会触发抖动 |
| CPU 卸载(FlexGen) | 带宽瓶颈,延迟增加 10x+ |
核心原理
直觉:从 TCP 拥塞控制获得灵感
想象你在管理一个物流系统:
传统方案(被动):
货车不断进仓库,直到仓库满了再开始往外搬货
→ 结果:搬进搬出,效率崩溃
CONCUR(主动):
在仓库门口设卡,根据仓库使用率动态调整进入车辆数
→ 结果:仓库始终保持在最佳利用率
硬件层面:KV Cache 的生命周期
Agent 1: [Prompt] → [KV₁] → [KV₂] → [KV₃] → ... → [KV_n]
Agent 2: [Prompt] → [KV₁] → [KV₂] → [KV₃] → ... → [KV_n]
...
Agent N: [Prompt] → [KV₁] → [KV₂] → [KV₃] → ... → [KV_n]
KV Cache 总量 = Σ(所有 Agent 的累积 KV)
关键观察:
- Agent 寿命长:不像单次对话,Agent 可能运行几十轮
- KV 持续增长:每轮都会积累新的 KV 状态
- 驱逐代价高:Agent 的上下文被破坏后,重算成本是线性的
数学建模
定义拥塞信号 C(t):
控制算法(类似 TCP AIMD):
if C(t) < threshold:
# 加性增加(Additive Increase)
N_agents += α
else:
# 乘性减少(Multiplicative Decrease)
N_agents *= β # β < 1
代码实现
Baseline:朴素的批处理推理
class NaiveBatchInference:
def __init__(self, model, max_batch_size=128):
self.model = model
self.max_batch = max_batch_size
self.kv_cache = KVCache(capacity=80 * 1024**3) # 80GB
def run_agents(self, agents):
active = agents[:self.max_batch] # 直接取前 N 个
while active:
outputs = self.model.generate(
inputs=[a.get_prompt() for a in active],
kv_cache=self.kv_cache
)
# 处理每个 Agent 的输出
for agent, output in zip(active, outputs):
agent.step(output)
if agent.is_done():
active.remove(agent)
# 没有空间给后续 Agent 了,等待...
性能分析:
| 时间点 | Active Agents | KV Usage | 吞吐量 (tok/s) |
|---|---|---|---|
| 0-5 轮 | 128 | 40% | 1150 |
| 5-10 轮 | 128 | 75% | 680 |
| 10-15 轮 | 128 | 92% | 290 |
瓶颈:
- KV Cache 利用率超过 80% 后,驱逐开始频繁
model.generate()耗时从 12ms 飙升到 58ms- Nsight 显示 41% 的时间花在 KV 重计算上
优化版本:CONCUR 拥塞控制
class CONCURController:
def __init__(self, model, kv_capacity):
self.model = model
self.kv_cache = KVCache(capacity=kv_capacity)
# 控制参数
self.N_active = 32 # 初始活跃 Agent 数
self.alpha = 2 # 加性增加步长
self.beta = 0.8 # 乘性减少因子
self.threshold = 0.75 # 拥塞阈值
# 监控指标
self.eviction_rate = 0
self.request_rate = 0
def compute_congestion_signal(self):
"""计算拥塞信号 C(t)"""
kv_usage = self.kv_cache.usage_ratio()
# 驱逐率 = 最近 100 次请求中的驱逐次数
eviction_ratio = self.eviction_rate / max(self.request_rate, 1)
# 组合信号
C = kv_usage * (1 + eviction_ratio)
return C
def adjust_concurrency(self):
"""动态调整活跃 Agent 数"""
C = self.compute_congestion_signal()
if C < self.threshold:
# 网络畅通,增加并发
self.N_active = min(self.N_active + self.alpha, 256)
else:
# 出现拥塞,减少并发
self.N_active = max(int(self.N_active * self.beta), 8)
def run_agents(self, agents):
waiting_queue = agents.copy()
active_agents = []
while waiting_queue or active_agents:
# 补充活跃 Agent(如果有空位)
while len(active_agents) < self.N_active and waiting_queue:
active_agents.append(waiting_queue.pop(0))
# 批处理推理
prompts = [a.get_prompt() for a in active_agents]
outputs = self.model.generate(
prompts,
kv_cache=self.kv_cache
)
# 更新监控指标
self.request_rate = len(outputs)
self.eviction_rate = self.kv_cache.get_eviction_count()
# 处理输出
for agent, output in zip(active_agents, outputs):
agent.step(output)
if agent.is_done():
active_agents.remove(agent)
# 拥塞控制核心
self.adjust_concurrency()
为什么更快:
- 减少驱逐:通过主动控制并发,KV 使用率始终在 70-80% 区间
- 保持连续性:Agent 不会因为 KV 驱逐而被中断,避免重计算
- 自适应:根据实时信号调整,而不是固定批大小
关键优化:Cache-Aware Scoring
CONCUR 的进阶版本会根据 Agent 的 KV 成本排序:
def prioritize_agents(self, agents):
"""优先选择 KV 成本低的 Agent"""
scores = []
for agent in agents:
kv_size = agent.accumulated_kv_size()
progress = agent.completion_ratio() # 已完成百分比
# 评分:优先选择快完成且 KV 小的
score = progress / (kv_size + 1e-6)
scores.append((score, agent))
scores.sort(reverse=True)
return [agent for _, agent in scores[:self.N_active]]
性能对比数据:
| 实现版本 | 时间 (s) | 吞吐量 (tok/s) | 内存峰值 |
|---|---|---|---|
| Baseline | 180 | 290 | 78GB |
| CONCUR | 44 | 1186 | 62GB |
| 加速比 | 4.09x | 4.09x | -20% |
常见错误(重要!)
错误 1:阈值设置过高
# ❌ 错误:等到 KV 95% 才触发控制
self.threshold = 0.95
# 结果:驱逐已经开始,为时已晚
正确做法:
# ✓ 正确:在 75-80% 就开始预防
self.threshold = 0.75
# 原因:驱逐的开销是非线性的,超过 80% 后急剧恶化
错误 2:忽略 Agent 的优先级
# ❌ 错误:FIFO 调度,不考虑 KV 成本
active = waiting_queue[:N_active]
正确做法:
# ✓ 正确:优先调度低 KV 成本的 Agent
active = prioritize_agents(waiting_queue)[:N_active]
错误 3:更新频率过低
# ❌ 错误:每 100 个请求才调整一次
if self.iteration % 100 == 0:
self.adjust_concurrency()
正确做法:
# ✓ 正确:每轮都动态调整(延迟 < 1ms)
self.adjust_concurrency()
性能实测
测试环境:
- GPU:NVIDIA A100 80GB
- 模型:Qwen3-32B (bfloat16)
- 工作负载:100 个 Agent,平均 20 轮对话
吞吐量对比
| Agent 轮数 | Baseline | CONCUR | 提升 |
|---|---|---|---|
| 1-5 | 1150 | 1200 | +4% |
| 6-10 | 680 | 1180 | +73% |
| 11-15 | 290 | 1160 | +300% |
| 16-20 | 180 | 1150 | +538% |
KV Cache 利用率
Baseline:
[====================================] 95% ← 频繁驱逐
↓
[重计算占比 41%]
CONCUR:
[============================ ] 75% ← 稳定运行
↓
[重计算占比 8%]
不同模型的表现
| 模型 | Baseline | CONCUR | 加速比 |
|---|---|---|---|
| Qwen3-32B | 290 tok/s | 1186 tok/s | 4.09x |
| DeepSeek-V3 | 480 tok/s | 912 tok/s | 1.9x |
| Llama-3-70B | 210 tok/s | 756 tok/s | 3.6x |
什么时候用 / 不用?
适用场景
| 场景 | 原因 |
|---|---|
| 多轮 Agent 对话 | KV 持续累积,中期崩溃严重 |
| 代码生成任务 | 上下文长,驱逐代价高 |
| 批量推理服务 | 需要稳定的高吞吐量 |
不适用场景
| 场景 | 原因 |
|---|---|
| 单次问答 | 没有中期崩溃问题,额外开销不值得 |
| 内存充裕 | KV Cache 用不完,无需控制 |
| 延迟敏感 | 调度开销(~1ms)可能不可接受 |
调试技巧
1. 监控拥塞信号
import matplotlib.pyplot as plt
def visualize_congestion(controller):
history = controller.congestion_history
plt.plot(history['C'], label='Congestion Signal')
plt.axhline(y=controller.threshold, color='r',
linestyle='--', label='Threshold')
plt.plot(history['N_active'], label='Active Agents')
plt.legend()
plt.show()
2. Nsight 性能分析
关键指标:
- Kernel Replay Overhead:重计算占比(应 < 10%)
- Memory Throughput:驱逐导致的带宽浪费(应 < 20%)
- SM Utilization:计算单元利用率(应 > 80%)
3. 常见 Bug 排查
| 症状 | 可能原因 | 解决方案 |
|---|---|---|
| 吞吐量不稳定 | alpha/beta 设置不当 |
增大 alpha,减小 beta |
| 内存溢出 | threshold 过高 |
降低到 0.7-0.75 |
| 低利用率 | threshold 过低 |
提高到 0.8 |
延伸阅读
相关论文
- PagedAttention (vLLM):KV Cache 的分页管理
- FlexGen:CPU-GPU 混合推理策略
- ORCA:Iteration-level 调度器
官方文档重点
- CUDA Unified Memory:理解 KV 驱逐的底层机制
- Nsight Systems:定位性能瓶颈的完整流程(第 4 章”Memory Analysis”是关键)
进阶话题
- 多 GPU 拥塞控制:如何在分布式环境中协调 KV Cache?
- 模型感知调度:不同层的 KV 成本差异如何利用?
- 预测性控制:能否用 RL 学习最优的
alpha/beta参数?
论文链接:https://arxiv.org/abs/2601.22705
测试环境:本文所有性能数据基于 NVIDIA A100 80GB + CUDA 12.1 + PyTorch 2.3.0
Comments