FastAPI 异步任务服务的并发设计演进:从单进程轮询到多 Worker 协程直处理
本文记录了一个 FastAPI 异步任务服务在并发架构上的思考和演进过程。这个服务的本质很简单:接收客户端请求,转发给下游 AI API,把结果存起来供客户端轮询。它不做复杂的业务计算,不做数据聚合,就是一个纯转发层——接活、派活、存结果。正因为场景足够简单,我们才有机会做一次化繁为简的架构妥协,把原本"看起来该用任务队列"的设计砍到只剩三行核心配置。
一、先说清楚场景:我们到底在干什么
这个服务做的事情可以用一句话概括:
客户端提交参数 → 服务转发给下游 AI API → 等结果 → 存 Redis → 客户端来取。
关键特征:
- 纯 IO 转发:服务本身不做任何 CPU 密集计算,所有耗时都花在等下游 API 返回。一次调用几秒到几十秒不等,全是网络等待。
- 异步模式:客户端提交任务后立即拿到 task_id,然后轮询结果。不是同步等响应。
- 无状态转发:每个任务独立,互不依赖,没有顺序要求,没有事务性。
- 容忍丢失:偶尔丢一个任务(比如进程挂了),客户端重新提交就行。不是支付、不是订单,丢了不会造成业务损失。
理解这个场景非常重要,因为后面所有的架构决策都建立在这个前提上。如果你的场景涉及 CPU 密集计算、任务间有依赖、或者不能容忍丢失,结论会完全不同。
二、最初的架构:单进程 + BackgroundWorker 轮询
项目最初的并发模型非常经典:
用户请求 → FastAPI 接口 → 创建任务写入 Redis(pending)→ 立即返回 task_id
↓
BackgroundWorker(进程内)
- 每秒轮询 Redis,找 pending 状态的任务
- 取出来丢给 asyncio.create_task() 处理
- max_concurrent_tasks = 5 控制并发上限
↓
调用下游 AI API(async/await,纯 IO 等待)
↓
任务标记 completed,结果写回 Redis
↓
用户轮询 → GET /tasks/{id} → 从 Redis 读结果
启动命令是:
CMD ["uvicorn", "src.main:app", "--host", "0.0.0.0", "--port", "8000"]
没有 --workers 参数,只有一个 Uvicorn 进程。BackgroundWorker 在 FastAPI 的 lifespan 里启动,和 API 服务绑在同一个进程里。
这个架构的问题
并发天花板很低。 只有一个进程,max_concurrent_tasks 默认是 5,意味着同一时刻最多只有 5 个下游 API 请求在飞。下游调用是纯网络 IO 等待,asyncio 事件循环完全有能力处理更多并发,但被这个数字卡死了。
轮询模式有固有开销。 不管有没有新任务,BackgroundWorker 每秒都要去 Redis 扫一遍 pending 任务。任务少的时候这是无意义的开销,任务多的时候又可能因为轮询间隔导致延迟。
水平扩展有坑。 如果简单地给 Uvicorn 加 --workers 4,会 fork 出 4 个进程,每个进程都会跑一个 BackgroundWorker,4 个 Worker 同时轮询 Redis 抢 pending 任务。没有分布式锁的话,同一个任务会被多个 Worker 重复处理。
回过头看,这个架构对于一个"纯转发"的服务来说,设计过重了。BackgroundWorker、轮询机制、容量控制——这些是为复杂任务队列场景准备的武器,但我们的场景根本用不上。
三、我们讨论过的几种并发方案
方案 A:调大 max_concurrent_tasks
最简单的做法——把 max_concurrent_tasks 从 5 调到 50 甚至 100。下游调用是纯 IO,asyncio 完全扛得住。
优点: 零代码改动,改个环境变量就行。
缺点: 单进程的天花板还在。Python 的 GIL 虽然对 IO 密集型影响不大,但单进程的事件循环、内存、连接数都有上限。而且 BackgroundWorker 的轮询模式本身就不够高效。
结论: 适合任务量不大、单机够用的过渡方案,但不是长期解。
方案 B:拆独立 Worker 进程 + Redis 原子抢任务
把 BackgroundWorker 从 FastAPI 进程里拆出来,做成独立的 Worker 进程。API 进程只负责接收请求和写 Redis,Worker 进程只负责消费任务。
API 进程 → 写任务到 Redis
Worker 进程 1 ─┐
Worker 进程 2 ─┤── 轮询 Redis,原子抢任务(SETNX / Lua 脚本)
Worker 进程 3 ─┘
优点: API 和 Worker 解耦,可以独立扩缩容。docker-compose scale worker=N 一行命令水平扩。
缺点: 需要实现分布式锁来防止重复处理。多了一个进程类型要维护。轮询模式的固有开销还在。
结论: 比方案 A 好很多,但对于一个纯转发服务来说,引入分布式锁的复杂度不值得。
方案 C:Redis 队列替代轮询
不再轮询 Redis 扫 pending 任务,改用 Redis List(BRPOP)或 Stream 做消息队列。创建任务时 LPUSH 到队列,Worker 用 BRPOP 阻塞等待。
优点: 天然支持多消费者,不需要额外加锁。没有轮询开销,任务来了立刻消费。
缺点: 需要改造任务分发逻辑。Redis Stream 的消费者组管理有一定学习成本。
结论: 如果要走独立 Worker 路线,这是比方案 B 更优的选择。但我们的场景真的需要消息队列吗?
方案 D:引入专业任务队列(Celery / arq)
用 Celery + Redis/RabbitMQ,或者 arq(轻量级 asyncio 任务队列)。自带重试、超时、优先级、监控、结果后端。
优点: 生产级方案,功能全面,社区成熟。
缺点: 重依赖。Celery 本身的配置和运维成本不低。对于一个纯转发服务来说,杀鸡用牛刀。
结论: 如果项目规模继续增长,这是终极方案。但当前阶段过度设计了。
方案 E:去掉 BackgroundWorker,请求直接起协程(最终采用)
回到场景本身思考:我们做的是纯转发,每个请求进来就是"调一次下游 API,存个结果"。那为什么要绕一圈——先写 Redis,再轮询取出来,再处理?
直接在收到请求的时候起协程处理不就完了?
客户端 → Uvicorn (worker 2 接到请求)
│
├─ 创建任务写 Redis(pending)
├─ asyncio.create_task() ← 本进程内直接起协程
└─ return task_id ← 立即响应
协程在 worker 2 内执行:
├─ 更新状态为 processing
├─ 调用下游 AI API(async/await,纯 IO)
└─ 更新状态为 completed / failed
客户端 → GET /tasks/{id}(可能落到任意 worker)
└─ 从 Redis 读结果
配合 Uvicorn --workers 4,就有了 4 个进程级并发。每个进程内 asyncio 再提供协程级并发。
这就是我们最终采用的方案。
四、为什么选方案 E:一次化繁为简的妥协
说"妥协",是因为方案 E 确实有明显的缺陷:
- 进程挂了任务就丢了。 正在处理的任务会永远卡在 processing 状态,不会被重试。
- 没有持久化的任务队列。 进程重启后,所有正在处理的任务全部丢失。
- 没有任务优先级。 所有任务先来先服务,无法插队。
但回到我们的场景:
- 纯转发,不怕丢。 丢了一个任务,客户端重新提交就行。不是支付,不是订单。
- 任务有 TTL。 卡在 processing 的僵尸任务,2 小时后 Redis 自动清理。查询接口还有 10 分钟超时检测,主动标记 failed。
- 不需要优先级。 所有请求平等,没有 VIP 通道的需求。
所以那些"缺陷",在我们的场景下根本不是问题。
而方案 E 带来的好处是实实在在的:
- 删掉了 BackgroundWorker 整个模块。 轮询逻辑、容量控制、活跃任务追踪、优雅关闭——全部不需要了。
- 删掉了
get_pending_tasks方法。 没人轮询了,不需要按状态查 pending 任务。 - 不需要分布式锁。 每个任务只在收到请求的那个 Worker 里处理,天然不冲突。
- 不需要额外的进程类型。 没有独立的 Worker 进程要部署和维护。
- 水平扩展极其简单。 改
--workers数字就行。
核心逻辑从"写 Redis → 轮询 → 取任务 → 处理"变成了"收到请求 → 起协程 → 处理"。 中间环节全部砍掉。
这不是偷懒,这是对场景的准确判断。一个纯转发服务,不需要任务队列的可靠性保证,那就不要引入任务队列的复杂度。
五、最终方案的实现细节
5.1 整体架构
┌─────────────────────────────────┐
│ Docker Compose │
│ │
│ ┌───────────┐ ┌─────────────┐ │
客户端 ──9527:8000──│──│ Uvicorn │ │ Redis │ │
│ │ 4 workers │──│ (结果存储) │ │
│ └───────────┘ └─────────────┘ │
└─────────────────────────────────┘
每个 Worker 进程内部:
┌──────────────────────────────────────────────┐
│ asyncio 事件循环 │
│ │
│ POST /tasks 请求进来 │
│ ├─ 写 Redis │
│ ├─ asyncio.create_task(_process_task) │
│ └─ return task_id │
│ │
│ _process_task 协程 │
│ ├─ async with semaphore: ← 下游并发限流 │
│ │ └─ await call_downstream_api() │
│ └─ 写结果到 Redis │
│ │
│ GET /tasks/{id} 请求进来 │
│ └─ 从 Redis 读结果 │
└──────────────────────────────────────────────┘
5.2 并发控制:Semaphore 限流
去掉 BackgroundWorker 后,没有了 max_concurrent_tasks 的概念。如果短时间涌入大量请求,每个都直接起协程调下游 API,可能把下游打挂。
解决方案是用 asyncio.Semaphore 限制每个 Worker 进程内的下游 API 并发数:
# dependencies.py
vlm_semaphore: Optional[asyncio.Semaphore] = None
# main.py lifespan
dependencies.vlm_semaphore = asyncio.Semaphore(settings.max_concurrent_vlm)
# routes.py _process_task
async with semaphore:
result = await client.call_downstream_api(...)
默认每个 Worker 最多 10 个并发下游调用,4 个 Worker 就是最多 40 个。通过环境变量 MAX_CONCURRENT_VLM 可调。
Semaphore 只限制下游调用这一步,不限制任务创建。 100 个请求同时进来,100 个都会立即返回 task_id(对调用方完全无感),然后 100 个协程在后台排队等 Semaphore。排在后面的任务只是完成得慢一些,不会报错,不会拒绝。
这对调用方来说是透明的:提交任务秒回,轮询结果的时候排在后面的任务等久一点而已。
5.3 processing 超时检测
Worker 进程挂了,正在处理的任务会卡在 processing 状态。我们在查询接口里做了被动检测:
PROCESSING_TIMEOUT_MINUTES = 10
if task.status == TaskStatus.PROCESSING:
elapsed = datetime.now(UTC) - task.updated_at.replace(tzinfo=UTC)
if elapsed > timedelta(minutes=PROCESSING_TIMEOUT_MINUTES):
task.mark_failed("Processing timed out after 10 minutes")
await task_manager.update_task(task)
不需要额外的定时任务。调用方轮询结果的时候,如果发现 processing 超过 10 分钟,直接标记 failed 返回。调用方看到 failed 可以选择重新提交。
这是一个"懒检测"策略——只有被查询到的任务才会触发超时判断。没人查的任务会一直卡着,直到 Redis TTL 过期自动清除。对我们的场景来说完全够用,因为调用方一定会轮询结果。
5.4 Uvicorn 多 Worker 模式
CMD ["uvicorn", "src.main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]
Uvicorn 的多 Worker 模式基于 multiprocessing,会 fork 出 4 个子进程。每个子进程独立运行一个 FastAPI 应用实例,各自有自己的:
- asyncio 事件循环
- Redis 连接(在 lifespan 里各自初始化,Redis 连接不能跨进程共享)
- Semaphore 实例(进程级隔离,互不影响)
请求由 Uvicorn 主进程分发到子进程,天然负载均衡。
一个关键点: 每个 Worker 的 Semaphore 是独立的。设置 MAX_CONCURRENT_VLM=10,4 个 Worker 的实际上限是 4×10=40,不是 10。
5.5 任务 TTL
从 24 小时缩短到 2 小时。理由:
- 这是一个即时性服务,2 小时足够调用方拿到结果
- 缩短 TTL 意味着卡在 processing 的僵尸任务更快被清理
- Redis 内存占用更可控
TTL 通过环境变量 TASK_TTL_HOURS 可调。Redis key 的 TTL 和应用层的 expires_at 字段双重保障。
六、方案对比总结
| 维度 | A:调大并发数 | B:独立 Worker + 锁 | C:Redis 队列 | D:Celery/arq | E:请求直接起协程 |
|---|---|---|---|---|---|
| 改动量 | 零 | 大 | 大 | 大 | 中 |
| 架构复杂度 | 低 | 高 | 中 | 高 | 最低 |
| 水平扩展 | 不支持 | 支持 | 支持 | 支持 | 支持(改 workers 数) |
| 任务可靠性 | 低 | 高 | 高 | 最高 | 中(进程挂了任务丢) |
| 运维成本 | 低 | 中 | 中 | 高 | 最低 |
| 适用场景 | 临时过渡 | 需要可靠投递 | 需要可靠投递 | 大规模生产 | 纯转发、容忍偶尔丢任务 |
七、什么时候需要升级
当前方案的适用边界:
- 场景是纯转发:服务本身不做重计算,只是把请求转给下游、等结果、存起来
- 可以容忍偶尔丢任务:进程挂了正在处理的任务会丢,调用方重试即可
- 不需要任务优先级:所有任务先来先服务
- 单机部署:多 Worker 在同一台机器上
- 任务量在几百 QPS 以内:4 Worker × 10 并发 = 40 个下游请求同时飞,加上排队机制,几百 QPS 没问题
如果未来出现以下情况,就该考虑升级到方案 C 或 D:
- 需要跨机器部署多个实例
- 需要任务持久化和可靠投递(不能丢任务)
- 需要任务优先级、延迟执行、定时任务
- 需要完善的任务监控和管理后台
- 服务不再是纯转发,开始有 CPU 密集的处理逻辑
但在那之前,当前方案就是最好的方案——因为它最简单。
八、关键代码片段
创建任务 + 起协程
@router.post("/tasks", response_model=CreateTaskResponse)
async def create_task(body: CreateTaskRequest):
# ... 参数校验 ...
task = await task_manager.create_task(...)
# 直接在当前 worker 进程内起协程处理任务
asyncio.create_task(_process_task(task))
return CreateTaskResponse(id=task.id, status=task.status)
后台处理协程 + Semaphore 限流
async def _process_task(task):
task_manager = dependencies.task_manager
semaphore = dependencies.vlm_semaphore
try:
task.mark_processing()
await task_manager.update_task(task)
async with semaphore: # 限制下游 API 并发
result = await client.call_downstream_api(...)
task.mark_completed(image_prompt=result["image_prompt"], ...)
await task_manager.update_task(task)
except Exception as exc:
task.mark_failed(f"Processing error: {exc}")
await task_manager.update_task(task)
processing 超时检测
@router.get("/tasks/{task_id}", response_model=TaskResponse)
async def get_task(task_id: str):
task = await task_manager.get_task(task_id)
if task.status == TaskStatus.PROCESSING:
elapsed = datetime.now(UTC) - task.updated_at.replace(tzinfo=UTC)
if elapsed > timedelta(minutes=10):
task.mark_failed("Processing timed out after 10 minutes")
await task_manager.update_task(task)
return TaskResponse(...)
Dockerfile
CMD ["uvicorn", "src.main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]
环境变量
| 变量 | 默认值 | 说明 |
|---|---|---|
MAX_CONCURRENT_VLM | 10 | 每个 Worker 进程的下游 API 最大并发数 |
TASK_TTL_HOURS | 2 | 任务过期时间(小时) |
REDIS_URL | redis://localhost:6379/0 | Redis 连接地址 |
九、写在最后
架构设计最容易犯的错误是过度设计。
看到"异步任务"就想上 Celery,看到"多进程"就想上分布式锁,看到"消息传递"就想上 RabbitMQ——这些都是好工具,但不是每个场景都需要。
我们的服务就是一个转发层。请求进来,转给下游,等结果,存起来。没有复杂的业务逻辑,没有事务性要求,没有不能丢的数据。
对于这样的场景,asyncio.create_task() + asyncio.Semaphore + Uvicorn --workers 4 就是最优解。不是因为它功能最强,而是因为它刚好够用,同时复杂度最低。
少一个组件,就少一个故障点。少一层抽象,就少一个需要理解的概念。
这是一次化繁为简的妥协。我们清楚地知道放弃了什么(任务可靠性),也清楚地知道换来了什么(架构简单性)。在当前场景下,这个交换是值得的。
最好的架构不是功能最全的,而是对当前场景做出了正确取舍的。
陕公网安备61011302002223号