FastAPI 异步任务服务的并发设计演进:从单进程轮询到多 Worker 协程直处理

本文记录了一个 FastAPI 异步任务服务在并发架构上的思考和演进过程。这个服务的本质很简单:接收客户端请求,转发给下游 AI API,把结果存起来供客户端轮询。它不做复杂的业务计算,不做数据聚合,就是一个纯转发层——接活、派活、存结果。正因为场景足够简单,我们才有机会做一次化繁为简的架构妥协,把原本"看起来该用任务队列"的设计砍到只剩三行核心配置。

一、先说清楚场景:我们到底在干什么

这个服务做的事情可以用一句话概括:

客户端提交参数 → 服务转发给下游 AI API → 等结果 → 存 Redis → 客户端来取。

关键特征:

  • 纯 IO 转发:服务本身不做任何 CPU 密集计算,所有耗时都花在等下游 API 返回。一次调用几秒到几十秒不等,全是网络等待。
  • 异步模式:客户端提交任务后立即拿到 task_id,然后轮询结果。不是同步等响应。
  • 无状态转发:每个任务独立,互不依赖,没有顺序要求,没有事务性。
  • 容忍丢失:偶尔丢一个任务(比如进程挂了),客户端重新提交就行。不是支付、不是订单,丢了不会造成业务损失。

理解这个场景非常重要,因为后面所有的架构决策都建立在这个前提上。如果你的场景涉及 CPU 密集计算、任务间有依赖、或者不能容忍丢失,结论会完全不同。


二、最初的架构:单进程 + BackgroundWorker 轮询

项目最初的并发模型非常经典:

用户请求 → FastAPI 接口 → 创建任务写入 Redis(pending)→ 立即返回 task_id
                                        ↓
                            BackgroundWorker(进程内)
                            - 每秒轮询 Redis,找 pending 状态的任务
                            - 取出来丢给 asyncio.create_task() 处理
                            - max_concurrent_tasks = 5 控制并发上限
                                        ↓
                            调用下游 AI API(async/await,纯 IO 等待)
                                        ↓
                            任务标记 completed,结果写回 Redis
                                        ↓
用户轮询 → GET /tasks/{id} → 从 Redis 读结果

启动命令是:

CMD ["uvicorn", "src.main:app", "--host", "0.0.0.0", "--port", "8000"]

没有 --workers 参数,只有一个 Uvicorn 进程。BackgroundWorker 在 FastAPI 的 lifespan 里启动,和 API 服务绑在同一个进程里。

这个架构的问题

并发天花板很低。 只有一个进程,max_concurrent_tasks 默认是 5,意味着同一时刻最多只有 5 个下游 API 请求在飞。下游调用是纯网络 IO 等待,asyncio 事件循环完全有能力处理更多并发,但被这个数字卡死了。

轮询模式有固有开销。 不管有没有新任务,BackgroundWorker 每秒都要去 Redis 扫一遍 pending 任务。任务少的时候这是无意义的开销,任务多的时候又可能因为轮询间隔导致延迟。

水平扩展有坑。 如果简单地给 Uvicorn 加 --workers 4,会 fork 出 4 个进程,每个进程都会跑一个 BackgroundWorker,4 个 Worker 同时轮询 Redis 抢 pending 任务。没有分布式锁的话,同一个任务会被多个 Worker 重复处理。

回过头看,这个架构对于一个"纯转发"的服务来说,设计过重了。BackgroundWorker、轮询机制、容量控制——这些是为复杂任务队列场景准备的武器,但我们的场景根本用不上。


三、我们讨论过的几种并发方案

方案 A:调大 max_concurrent_tasks

最简单的做法——把 max_concurrent_tasks 从 5 调到 50 甚至 100。下游调用是纯 IO,asyncio 完全扛得住。

优点: 零代码改动,改个环境变量就行。

缺点: 单进程的天花板还在。Python 的 GIL 虽然对 IO 密集型影响不大,但单进程的事件循环、内存、连接数都有上限。而且 BackgroundWorker 的轮询模式本身就不够高效。

结论: 适合任务量不大、单机够用的过渡方案,但不是长期解。

方案 B:拆独立 Worker 进程 + Redis 原子抢任务

把 BackgroundWorker 从 FastAPI 进程里拆出来,做成独立的 Worker 进程。API 进程只负责接收请求和写 Redis,Worker 进程只负责消费任务。

API 进程 → 写任务到 Redis
Worker 进程 1 ─┐
Worker 进程 2 ─┤── 轮询 Redis,原子抢任务(SETNX / Lua 脚本)
Worker 进程 3 ─┘

优点: API 和 Worker 解耦,可以独立扩缩容。docker-compose scale worker=N 一行命令水平扩。

缺点: 需要实现分布式锁来防止重复处理。多了一个进程类型要维护。轮询模式的固有开销还在。

结论: 比方案 A 好很多,但对于一个纯转发服务来说,引入分布式锁的复杂度不值得。

方案 C:Redis 队列替代轮询

不再轮询 Redis 扫 pending 任务,改用 Redis List(BRPOP)或 Stream 做消息队列。创建任务时 LPUSH 到队列,Worker 用 BRPOP 阻塞等待。

优点: 天然支持多消费者,不需要额外加锁。没有轮询开销,任务来了立刻消费。

缺点: 需要改造任务分发逻辑。Redis Stream 的消费者组管理有一定学习成本。

结论: 如果要走独立 Worker 路线,这是比方案 B 更优的选择。但我们的场景真的需要消息队列吗?

方案 D:引入专业任务队列(Celery / arq)

用 Celery + Redis/RabbitMQ,或者 arq(轻量级 asyncio 任务队列)。自带重试、超时、优先级、监控、结果后端。

优点: 生产级方案,功能全面,社区成熟。

缺点: 重依赖。Celery 本身的配置和运维成本不低。对于一个纯转发服务来说,杀鸡用牛刀。

结论: 如果项目规模继续增长,这是终极方案。但当前阶段过度设计了。

方案 E:去掉 BackgroundWorker,请求直接起协程(最终采用)

回到场景本身思考:我们做的是纯转发,每个请求进来就是"调一次下游 API,存个结果"。那为什么要绕一圈——先写 Redis,再轮询取出来,再处理?

直接在收到请求的时候起协程处理不就完了?

客户端 → Uvicorn (worker 2 接到请求)
            │
            ├─ 创建任务写 Redis(pending)
            ├─ asyncio.create_task()  ← 本进程内直接起协程
            └─ return task_id         ← 立即响应

            协程在 worker 2 内执行:
            ├─ 更新状态为 processing
            ├─ 调用下游 AI API(async/await,纯 IO)
            └─ 更新状态为 completed / failed

客户端 → GET /tasks/{id}(可能落到任意 worker)
            └─ 从 Redis 读结果

配合 Uvicorn --workers 4,就有了 4 个进程级并发。每个进程内 asyncio 再提供协程级并发。

这就是我们最终采用的方案。


四、为什么选方案 E:一次化繁为简的妥协

说"妥协",是因为方案 E 确实有明显的缺陷:

  • 进程挂了任务就丢了。 正在处理的任务会永远卡在 processing 状态,不会被重试。
  • 没有持久化的任务队列。 进程重启后,所有正在处理的任务全部丢失。
  • 没有任务优先级。 所有任务先来先服务,无法插队。

但回到我们的场景:

  • 纯转发,不怕丢。 丢了一个任务,客户端重新提交就行。不是支付,不是订单。
  • 任务有 TTL。 卡在 processing 的僵尸任务,2 小时后 Redis 自动清理。查询接口还有 10 分钟超时检测,主动标记 failed。
  • 不需要优先级。 所有请求平等,没有 VIP 通道的需求。

所以那些"缺陷",在我们的场景下根本不是问题。

而方案 E 带来的好处是实实在在的:

  • 删掉了 BackgroundWorker 整个模块。 轮询逻辑、容量控制、活跃任务追踪、优雅关闭——全部不需要了。
  • 删掉了 get_pending_tasks 方法。 没人轮询了,不需要按状态查 pending 任务。
  • 不需要分布式锁。 每个任务只在收到请求的那个 Worker 里处理,天然不冲突。
  • 不需要额外的进程类型。 没有独立的 Worker 进程要部署和维护。
  • 水平扩展极其简单。 改 --workers 数字就行。

核心逻辑从"写 Redis → 轮询 → 取任务 → 处理"变成了"收到请求 → 起协程 → 处理"。 中间环节全部砍掉。

这不是偷懒,这是对场景的准确判断。一个纯转发服务,不需要任务队列的可靠性保证,那就不要引入任务队列的复杂度。


五、最终方案的实现细节

5.1 整体架构

                    ┌─────────────────────────────────┐
                    │         Docker Compose           │
                    │                                  │
                    │  ┌───────────┐  ┌─────────────┐ │
客户端 ──9527:8000──│──│  Uvicorn  │  │    Redis     │ │
                    │  │ 4 workers │──│  (结果存储)   │ │
                    │  └───────────┘  └─────────────┘ │
                    └─────────────────────────────────┘

每个 Worker 进程内部:
┌──────────────────────────────────────────────┐
│  asyncio 事件循环                              │
│                                                │
│  POST /tasks 请求进来                          │
│    ├─ 写 Redis                                 │
│    ├─ asyncio.create_task(_process_task)       │
│    └─ return task_id                           │
│                                                │
│  _process_task 协程                            │
│    ├─ async with semaphore:  ← 下游并发限流    │
│    │     └─ await call_downstream_api()        │
│    └─ 写结果到 Redis                           │
│                                                │
│  GET /tasks/{id} 请求进来                      │
│    └─ 从 Redis 读结果                          │
└──────────────────────────────────────────────┘

5.2 并发控制:Semaphore 限流

去掉 BackgroundWorker 后,没有了 max_concurrent_tasks 的概念。如果短时间涌入大量请求,每个都直接起协程调下游 API,可能把下游打挂。

解决方案是用 asyncio.Semaphore 限制每个 Worker 进程内的下游 API 并发数:

# dependencies.py
vlm_semaphore: Optional[asyncio.Semaphore] = None

# main.py lifespan
dependencies.vlm_semaphore = asyncio.Semaphore(settings.max_concurrent_vlm)

# routes.py _process_task
async with semaphore:
    result = await client.call_downstream_api(...)

默认每个 Worker 最多 10 个并发下游调用,4 个 Worker 就是最多 40 个。通过环境变量 MAX_CONCURRENT_VLM 可调。

Semaphore 只限制下游调用这一步,不限制任务创建。 100 个请求同时进来,100 个都会立即返回 task_id(对调用方完全无感),然后 100 个协程在后台排队等 Semaphore。排在后面的任务只是完成得慢一些,不会报错,不会拒绝。

这对调用方来说是透明的:提交任务秒回,轮询结果的时候排在后面的任务等久一点而已。

5.3 processing 超时检测

Worker 进程挂了,正在处理的任务会卡在 processing 状态。我们在查询接口里做了被动检测:

PROCESSING_TIMEOUT_MINUTES = 10

if task.status == TaskStatus.PROCESSING:
    elapsed = datetime.now(UTC) - task.updated_at.replace(tzinfo=UTC)
    if elapsed > timedelta(minutes=PROCESSING_TIMEOUT_MINUTES):
        task.mark_failed("Processing timed out after 10 minutes")
        await task_manager.update_task(task)

不需要额外的定时任务。调用方轮询结果的时候,如果发现 processing 超过 10 分钟,直接标记 failed 返回。调用方看到 failed 可以选择重新提交。

这是一个"懒检测"策略——只有被查询到的任务才会触发超时判断。没人查的任务会一直卡着,直到 Redis TTL 过期自动清除。对我们的场景来说完全够用,因为调用方一定会轮询结果。

5.4 Uvicorn 多 Worker 模式

CMD ["uvicorn", "src.main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]

Uvicorn 的多 Worker 模式基于 multiprocessing,会 fork 出 4 个子进程。每个子进程独立运行一个 FastAPI 应用实例,各自有自己的:

  • asyncio 事件循环
  • Redis 连接(在 lifespan 里各自初始化,Redis 连接不能跨进程共享)
  • Semaphore 实例(进程级隔离,互不影响)

请求由 Uvicorn 主进程分发到子进程,天然负载均衡。

一个关键点: 每个 Worker 的 Semaphore 是独立的。设置 MAX_CONCURRENT_VLM=10,4 个 Worker 的实际上限是 4×10=40,不是 10。

5.5 任务 TTL

从 24 小时缩短到 2 小时。理由:

  • 这是一个即时性服务,2 小时足够调用方拿到结果
  • 缩短 TTL 意味着卡在 processing 的僵尸任务更快被清理
  • Redis 内存占用更可控

TTL 通过环境变量 TASK_TTL_HOURS 可调。Redis key 的 TTL 和应用层的 expires_at 字段双重保障。


六、方案对比总结

维度A:调大并发数B:独立 Worker + 锁C:Redis 队列D:Celery/arqE:请求直接起协程
改动量
架构复杂度最低
水平扩展不支持支持支持支持支持(改 workers 数)
任务可靠性最高中(进程挂了任务丢)
运维成本最低
适用场景临时过渡需要可靠投递需要可靠投递大规模生产纯转发、容忍偶尔丢任务

七、什么时候需要升级

当前方案的适用边界:

  • 场景是纯转发:服务本身不做重计算,只是把请求转给下游、等结果、存起来
  • 可以容忍偶尔丢任务:进程挂了正在处理的任务会丢,调用方重试即可
  • 不需要任务优先级:所有任务先来先服务
  • 单机部署:多 Worker 在同一台机器上
  • 任务量在几百 QPS 以内:4 Worker × 10 并发 = 40 个下游请求同时飞,加上排队机制,几百 QPS 没问题

如果未来出现以下情况,就该考虑升级到方案 C 或 D:

  • 需要跨机器部署多个实例
  • 需要任务持久化和可靠投递(不能丢任务)
  • 需要任务优先级、延迟执行、定时任务
  • 需要完善的任务监控和管理后台
  • 服务不再是纯转发,开始有 CPU 密集的处理逻辑

但在那之前,当前方案就是最好的方案——因为它最简单。


八、关键代码片段

创建任务 + 起协程

@router.post("/tasks", response_model=CreateTaskResponse)
async def create_task(body: CreateTaskRequest):
    # ... 参数校验 ...

    task = await task_manager.create_task(...)

    # 直接在当前 worker 进程内起协程处理任务
    asyncio.create_task(_process_task(task))

    return CreateTaskResponse(id=task.id, status=task.status)

后台处理协程 + Semaphore 限流

async def _process_task(task):
    task_manager = dependencies.task_manager
    semaphore = dependencies.vlm_semaphore

    try:
        task.mark_processing()
        await task_manager.update_task(task)

        async with semaphore:  # 限制下游 API 并发
            result = await client.call_downstream_api(...)

        task.mark_completed(image_prompt=result["image_prompt"], ...)
        await task_manager.update_task(task)
    except Exception as exc:
        task.mark_failed(f"Processing error: {exc}")
        await task_manager.update_task(task)

processing 超时检测

@router.get("/tasks/{task_id}", response_model=TaskResponse)
async def get_task(task_id: str):
    task = await task_manager.get_task(task_id)

    if task.status == TaskStatus.PROCESSING:
        elapsed = datetime.now(UTC) - task.updated_at.replace(tzinfo=UTC)
        if elapsed > timedelta(minutes=10):
            task.mark_failed("Processing timed out after 10 minutes")
            await task_manager.update_task(task)

    return TaskResponse(...)

Dockerfile

CMD ["uvicorn", "src.main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]

环境变量

变量默认值说明
MAX_CONCURRENT_VLM10每个 Worker 进程的下游 API 最大并发数
TASK_TTL_HOURS2任务过期时间(小时)
REDIS_URLredis://localhost:6379/0Redis 连接地址

九、写在最后

架构设计最容易犯的错误是过度设计。

看到"异步任务"就想上 Celery,看到"多进程"就想上分布式锁,看到"消息传递"就想上 RabbitMQ——这些都是好工具,但不是每个场景都需要。

我们的服务就是一个转发层。请求进来,转给下游,等结果,存起来。没有复杂的业务逻辑,没有事务性要求,没有不能丢的数据。

对于这样的场景,asyncio.create_task() + asyncio.Semaphore + Uvicorn --workers 4 就是最优解。不是因为它功能最强,而是因为它刚好够用,同时复杂度最低

少一个组件,就少一个故障点。少一层抽象,就少一个需要理解的概念。

这是一次化繁为简的妥协。我们清楚地知道放弃了什么(任务可靠性),也清楚地知道换来了什么(架构简单性)。在当前场景下,这个交换是值得的。

最好的架构不是功能最全的,而是对当前场景做出了正确取舍的。

Read more

MCP 服务端的隐藏设计:结论性数据如何改变

Agent 的工作方式 我们以为 MCP 服务只是查数据的管道,拆开一看,发现服务端已经把分析结论都算好了。这个发现改变了我对 Agent 架构的理解。 起因:一次对 MCP 服务的逆向探索 最近在研究 MCP(Model Context Protocol)的实际应用,我选了一个真实的商业 MCP 服务 —— 某电商卖家流量分析平台作为研究对象。该服务提供了 27 个工具,覆盖关键词分析、流量运营、广告洞察等领域。 最初的预期很简单:MCP 服务就是一个数据接口,Agent(LLM)调用它拿到原始数据,然后自己分析、得出结论、给用户建议。 实际拆开一看,完全不是这么回事。 第一个发现:返回数据里藏着完整的分析结论 我写了一个 Python 脚本,绕过所有 AI 客户端,直接用

By ladydd

从连上一个 MCP 服务到理解 AI 系统的工程本质

一次从"会用"到"理解原理"再到"能优化"的完整探索记录。 本文记录了我通过实际动手连接一个远程 MCP 服务(SIF —— 亚马逊卖家流量分析平台),一步步深入理解 MCP 协议机制、LLM 上下文管理、注意力资源分配、以及工具编排优化方案的全过程。 一、起点:连上一个真实的 MCP 服务 什么是 MCP? MCP(Model Context Protocol)是 Anthropic 主导设计的一个开放协议,目的是标准化 AI 应用与外部工具/数据源之间的通信方式。你可以把它理解为"AI 世界的 USB 接口"

By ladydd

OpenCLI 学习 08:现实约束与兼容层思路

1. 我当前面对的现实约束 虽然我现在越来越倾向于: * 上层做 Agent * 下层做 Harness 但现实里调用我的人,很多时候只会通过 API 的形式来调用能力。 这意味着: * 我未必能决定上层最终长成什么样 * 外部接入形式可能仍然是 HTTP、函数调用或者一次性接口 2. 我当前的重要判断 我现在认为,这并不和 Agent + Harness 的方向冲突。 更合理的理解是: * Agent + Harness 是内部核心结构 * API、函数调用、HTTP 等形式是外部兼容层 也就是说,我真正需要先做好的是: * Agent 的能力设计 * Harness 的抽象与落地 * Agent 和 Harness 之间的接口关系 而不是一开始就被外部接入形式绑死。 3. 一个重要认识:不是 API 和 Agent 二选一 我当前更认可的分层是:

By ladydd
陕公网安备61011302002223号 | 陕ICP备2025083092号