从 Redis 到 PostgreSQL:一次需求驱动的任务系统架构升级

最近我对一个 LLM 任务服务做了一次比较完整的架构升级。

这个服务本身并不复杂:上游提交任务,我返回一个 task_id,后续上游再通过 task_id 查询任务状态和结果。任务的执行内容主要是调用 LLM 或相关多模态接口,单个任务耗时通常在几十秒以内。

最开始的实现是比较常见的组合:

FastAPI + Redis + Docker

API 接到请求后生成任务 ID,把状态写入 Redis,然后由后台逻辑继续执行任务。这个方案在早期足够轻量,开发速度也很快。

但随着需求继续演进,我开始更明确地追求几个能力:

1. 上游可以瞬间提交大量任务
2. API 必须快速返回 task_id
3. 真正执行的任务数量必须全局可控
4. 任务状态要可查询、可恢复、可追踪
5. worker 崩溃后任务不能永久卡死
6. 未来希望减少组件数量,尽量 all in PostgreSQL

也就是说,这次不是简单地“把 Redis 换成 PostgreSQL”。

更准确地说,这是一次从:

轻量状态存储

升级到:

PostgreSQL 中心化任务队列 + 全局并发控制 + 独立 worker 执行

的架构演进。

最后形成的结构是:

API + PostgreSQL Queue + Worker

也就是:

FastAPI API 层:只负责接单,快速返回 task_id
PostgreSQL:负责任务表、状态、队列、锁、并发控制
Worker:独立进程/容器,从 PostgreSQL 领取任务并执行

这套结构跑起来以后,我感觉它非常清楚,也很适合 LLM 任务这类“提交快、执行慢、需要排队和状态追踪”的场景。


一、需求背景:API 要快,执行要可控

这个服务对外提供的接口保持不变:

POST /tasks
POST /tasks/veo
GET /tasks/{task_id}

上游提交任务时,服务要立刻返回:

task_id
status

后续上游通过:

GET /tasks/{task_id}

查询任务状态:

pending
running
completed
failed
not_found

这类系统最重要的边界是:

提交任务

和:

执行任务

不能绑死在一起。

因为提交任务是高频、短耗时操作;执行任务是低频、长耗时操作。

提交任务只需要:

接收请求
生成 task_id
写入任务记录
返回 task_id

而执行任务可能要:

下载图片
构造 prompt
调用 LLM
等待外部 API 返回
处理结果
写入数据库

这两部分的耗时完全不在一个量级。

因此这次改造的目标非常明确:

API 层只接单,不干活。
Worker 层负责真正执行任务。
PostgreSQL 负责在两者之间做任务队列和全局协调。

这样一来,上游瞬间提交大量任务时,API 仍然可以快速返回 task_id;真正向外部 LLM 打出去的请求数量,则由 worker 和 PostgreSQL 控制。


二、为什么主动选择 PostgreSQL

这次选择 PostgreSQL,不只是为了替代 Redis,而是看中了 PostgreSQL 的 all in one 能力。

在这个系统里,任务队列并不只是一个队列。

它同时需要承载:

任务状态
任务请求参数
任务执行结果
错误信息
尝试次数
锁定时间
超时回收
任务查询
历史记录
并发控制

如果继续把状态、队列、锁、结果分散到不同组件里,系统会变得越来越复杂。

而 PostgreSQL 可以把这些能力收敛到一张任务表和一组事务逻辑里:

JSONB:存请求和结果
普通索引:加速 pending/running 查询
事务:保证领取任务原子性
行锁:防止重复领取
advisory lock:保护全局并发计数
timestamp 字段:实现超时回收
SQL 查询:天然支持统计和排查

这就是我看中 PostgreSQL 的地方。

它不是单纯的关系型数据库,而是可以承担很多后端基础设施职责:

状态存储
任务队列
轻量文档存储
锁服务
审计日志
查询分析

对于这个阶段的项目来说,减少组件数量、增强可控性,比单纯追求某一个局部组件的极限性能更重要。

所以这次我主动选择:

All in PostgreSQL

把任务系统的核心状态全部收敛进去。


三、任务表:系统的中心协议

这次改造后,任务表成为整个系统的中心。

表结构核心如下:

CREATE TABLE IF NOT EXISTS tasks (
    task_id TEXT PRIMARY KEY,
    status TEXT NOT NULL,
    task_type TEXT,
    request JSONB,
    result JSONB,
    error TEXT,
    attempts INTEGER NOT NULL DEFAULT 0,
    locked_until TIMESTAMPTZ,
    created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
    updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
    expires_at TIMESTAMPTZ
);

配套索引:

CREATE INDEX IF NOT EXISTS idx_tasks_pending_created_at
ON tasks (created_at)
WHERE status = 'pending'
  AND task_type IS NOT NULL
  AND request IS NOT NULL;

CREATE INDEX IF NOT EXISTS idx_tasks_running_locked_until
ON tasks (locked_until)
WHERE status = 'running'
  AND request IS NOT NULL;

这张表承担的不只是存储职责。

它实际上变成了 API 和 worker 之间的协议。

API 只需要写入:

task_id
status = pending
task_type
request

Worker 只需要读取 pending 任务,领取后改成:

running

执行完成后再改成:

completed

或者:

failed

查询接口只需要根据 task_id 从这张表里读状态和结果。

这样系统边界非常清楚:

API 不需要知道 worker 是 Python 还是 Go。
Worker 不需要知道上游是谁。
双方只需要遵守 tasks 表的状态协议。

这一点是这次架构升级里很关键的收获。


四、API 层:只接单,立刻返回

改造后的 API 层逻辑非常简单。

收到:

POST /tasks
POST /tasks/veo

以后,只做几件事:

1. 生成 task_id
2. 把任务写入 PostgreSQL
3. 返回 task_id 和 pending 状态

也就是:

enqueue_task(...)
return task_id

API 层不再负责真正执行任务。

这样做的好处非常直接:

上游提交速度和任务执行速度解耦

上游可以很快提交 100 个、1000 个甚至更多任务。

这些请求进入 API 后,只是变成 PostgreSQL 里的任务行。真正执行的节奏由 worker 控制。

这也是这个架构最核心的设计:

API 负责吞吐。
Worker 负责执行。
PostgreSQL 负责协调。

五、Worker:独立执行单元

任务执行被移动到了独立 worker。

Worker 的职责是:

1. 循环尝试领取任务
2. 把任务从 pending 改为 running
3. 执行任务逻辑
4. 成功后写 completed + result
5. 失败后写 failed + error
6. 定期处理超时 running 任务

最开始 worker 可以是 Python:

python -m app.worker

后来我进一步把 worker 改造成 Go。

因为 worker 这类服务本质上很适合 Go:

循环取任务
HTTP 调用
超时控制
数据库更新
日志
优雅退出

Go 在这类场景下内存占用低、启动快、部署简单。

而且由于 API 和 worker 之间通过 PostgreSQL tasks 表通信,所以 worker 的语言并不影响外部接口。

这也是架构边界清楚后的好处:

API 可以继续用 FastAPI。
Worker 可以独立换成 Go。
上游完全无感知。

六、任务领取:FOR UPDATE SKIP LOCKED

多个 worker 同时运行时,最重要的问题是:

不能让多个 worker 领取到同一个任务。

PostgreSQL 里可以用:

FOR UPDATE SKIP LOCKED

来解决这个问题。

领取任务的核心 SQL 是:

WITH next_task AS (
    SELECT task_id
    FROM tasks
    WHERE status = 'pending'
      AND task_type IS NOT NULL
      AND request IS NOT NULL
    ORDER BY created_at
    FOR UPDATE SKIP LOCKED
    LIMIT 1
)
UPDATE tasks AS t
SET status = 'running',
    attempts = t.attempts + 1,
    locked_until = now() + ($1::int * interval '1 second'),
    updated_at = now(),
    expires_at = NULL
FROM next_task
WHERE t.task_id = next_task.task_id
RETURNING t.task_id, t.task_type, t.request::text AS request;

它的效果可以理解成:

多个 worker 同时来抢 pending 任务。
某个 worker 锁住一行后,其他 worker 会跳过这行。
每个 worker 最终拿到不同的任务。

这解决了任务重复领取问题。


七、全局并发控制:advisory lock + running count

FOR UPDATE SKIP LOCKED 解决的是“不要重复领取同一个任务”。

但我还需要另一个能力:

全局最多同时执行 N 个任务。

比如设置:

MAX_CONCURRENT_TASKS=20

那不管启动多少 worker,真正处于 running 的任务都不应该超过 20。

为了做到这一点,领取任务时使用了 PostgreSQL 的事务级 advisory lock:

SELECT pg_advisory_xact_lock(...);

完整逻辑是:

开启事务
  ↓
获取 pg_advisory_xact_lock
  ↓
回收超时 running 任务
  ↓
查询当前 running 数量
  ↓
如果 running >= MAX_CONCURRENT_TASKS,直接不领取
  ↓
如果 running < MAX_CONCURRENT_TASKS,领取一个 pending 任务
  ↓
同一个事务里更新成 running
  ↓
提交事务
  ↓
worker 开始调用 LLM

这里的设计重点是两个锁分工明确:

pg_advisory_xact_lock:
    负责把“统计 running 数 + 领取任务”这个临界区串行化

FOR UPDATE SKIP LOCKED:
    负责具体任务行的抢占

这样就能避免多个 worker 同时看到 running 数未满,然后一起领取导致短暂超额。

也就是说,PostgreSQL 不只是存任务,还承担了全局并发控制器的角色。


八、事务边界:不能拖着 LLM 调用

这个设计里还有一个非常重要的边界:

数据库事务只包住领取任务,不能包住 LLM 调用。

正确流程是:

开启事务
  ↓
领取任务
  ↓
标记 running
  ↓
提交事务
  ↓
调用 LLM
  ↓
写入 completed / failed

这样数据库锁只持有很短时间。

不能这样做:

开启事务
  ↓
领取任务
  ↓
调用 LLM 等几十秒
  ↓
写结果
  ↓
提交事务

因为这样会形成长事务,把外部 API 的不确定性带进数据库事务里。

最终我采用的是短事务领取,事务外执行任务。

这也是这类 PostgreSQL queue 设计里非常关键的一点。


九、worker 崩溃恢复:locked_until

如果 worker 领取任务后进程崩溃,任务会停留在:

status = running

为了避免任务永久卡住,任务表里设计了:

locked_until
attempts

worker 领取任务前,会先回收超时任务:

UPDATE tasks
SET status = CASE
        WHEN attempts >= $1::int THEN 'failed'
        ELSE 'pending'
    END,
    error = CASE
        WHEN attempts >= $1::int THEN 'worker timeout'
        ELSE error
    END,
    locked_until = NULL,
    updated_at = now(),
    expires_at = CASE
        WHEN attempts >= $1::int THEN now() + ($2::int * interval '1 second')
        ELSE NULL
    END
WHERE status = 'running'
  AND request IS NOT NULL
  AND locked_until IS NOT NULL
  AND locked_until < now();

这样 worker 崩溃后:

任务先保持 running
  ↓
locked_until 过期
  ↓
其他 worker 领取任务前发现它超时
  ↓
根据 attempts 决定重新 pending 或最终 failed

这让任务系统具备了自恢复能力。


十、Docker Compose:用 scale 控制 worker 数量

worker 独立以后,可以用 Docker Compose 很直观地控制执行单元数量。

启动 20 个 worker:

docker compose up -d --scale worker=20

停止整个项目:

docker compose down

把 worker 降到 0,但保留 API 和 PostgreSQL:

docker compose up -d --scale worker=0

恢复 worker:

docker compose up -d --scale worker=20

这个模型非常清楚:

worker=0:系统可以接单,但暂停执行
worker=3:低并发执行
worker=20:高并发执行

这比把并发藏在代码内部更直观。

当然,真正的全局并发仍然由 PostgreSQL 的 MAX_CONCURRENT_TASKS 兜底控制。

所以即使 worker 数量多于上限,也不会无限制向外打请求。


十一、Python worker 与 Go worker

一开始 worker 是 Python 版本。

测试下来,Python worker 空闲大概 30MiB,执行任务时大概 40MiB。

这个内存其实并不夸张:

40MiB × 20 = 800MiB

完全可接受。

后来我尝试把 worker 改成 Go,效果更轻:

Go worker 待机:3MiB ~ 6MiB

如果启动 20 个 Go worker:

6MiB × 20 = 120MiB

这个资源占用非常漂亮。

这也让我更加确认:

FastAPI 做 API
Go 做 worker
PostgreSQL 做中心任务系统

是一个非常舒服的组合。

FastAPI 保留了 Python 在接口开发上的效率,Go worker 则提供了更轻的运行时和更稳定的执行层。


十二、为什么没有使用 Celery

这次没有引入 Celery。

不是 Celery 不好,而是这次需求更适合一个直接、透明、PostgreSQL 中心化的方案。

Celery 通常会带来:

broker
result backend
worker
task decorator
routing
retry
beat
monitoring

它适合更复杂的任务生态。

而这个系统当前的核心诉求很明确:

任务入库
快速返回 task_id
worker 领取任务
控制全局执行并发
写入结果
支持查询
支持超时恢复

这些能力用 PostgreSQL tasks 表 + worker 就可以覆盖。

更重要的是,当前系统的 GET /tasks/{task_id} 本来就是核心接口。

任务状态天然就应该落在业务数据库里。

如果额外引入 Celery,很可能会出现:

Celery 管一份任务状态
PostgreSQL 管一份业务状态
API 查询时还要做映射

这会让系统边界变复杂。

现在的设计更直接:

tasks 表就是事实来源。

十三、关于 goroutine 的思考

Go worker 跑起来后,我也考虑过是否要用 goroutine 进一步优化。

现在的模式是:

20 个 worker 容器
每个容器 1 个执行槽

Go 当然可以改成:

1 个 worker 容器
内部 20 个 goroutine

或者折中:

4 个 worker 容器
每个容器 5 个 goroutine

goroutine 的优势是空闲成本很低,一个 Go 进程里开多个 goroutine,比启动多个进程和多个容器更省资源。

但我最后暂时没有这么做。

原因是当前 Go worker 已经足够轻:

单个 worker 只有 3MiB ~ 6MiB

20 个容器也只是 60MiB ~ 120MiB 的量级。

而一个容器一个执行槽的好处也很明显:

结构直观
故障隔离好
排查简单
一个 worker 挂了只损失一个执行槽
不会过早引入进程内并发复杂度

所以当前阶段,我选择保持:

一个 worker 容器 = 一个执行单元

后续如果确实需要优化,可以再演进到:

少量容器 × 每个容器多个 goroutine

但不是现在。


十四、资源占用情况

实际观察下来,资源占用比预期更好。

大概数据是:

FastAPI API:约 160MiB
PostgreSQL:约 127MiB
Go worker:单个 3MiB ~ 6MiB

如果启动 20 个 Go worker:

Go worker 总内存:约 60MiB ~ 120MiB

整体结构非常轻:

FastAPI:160MiB
PostgreSQL:127MiB
20 个 Go worker:60~120MiB

即使加上运行时波动,总体也很可控。

PostgreSQL 肯定不会像 Redis 那样极致省内存,但它提供的是完全不同的能力:

任务状态
事务
锁
JSONB
查询
索引
恢复
统计

用 100 多 MiB 的常驻内存换来这些能力,在这个系统里非常值。


十五、提交压测结果

我跑了一次提交压测:

python scripts/load_submit.py --total 100 --concurrency 50 --docker-stats

结果:

submitted=100/100
ok=100
failed=0
elapsed=0.17s
avg_submit_rate=594.4/s
latency_seconds min=0.004 p50=0.008 p95=0.012 p99=0.015 max=0.015

这说明 API 入队链路很轻:

HTTP 请求
FastAPI 接收
写 PostgreSQL
返回 task_id

在这个测试下,p95 只有 12ms 左右。

这正是“API 只接单,不执行任务”的效果。

后续还可以继续扩大测试:

python scripts/load_submit.py --total 1000 --concurrency 100 --docker-stats
python scripts/load_submit.py --total 5000 --concurrency 200 --docker-stats
python scripts/load_submit.py --total 10000 --concurrency 200 --docker-stats

主要观察:

ok 是否等于 total
failed 是否为 0
p95 / p99 是否稳定
PostgreSQL CPU / IO 是否稳定
API 内存是否稳定
running 是否始终 <= MAX_CONCURRENT_TASKS

提交速度只是第一部分。

更重要的是确认:

任务可以大量提交
但执行并发始终受控

十六、这套架构的优点

1. API 完全兼容

对外接口不变:

POST /tasks
POST /tasks/veo
GET /tasks/{task_id}

返回结构不变:

task_id
status
result
error

状态值不变:

pending
running
completed
failed
not_found

这意味着上游调用方不需要改。

2. 提交和执行解耦

API 负责快速入队,worker 负责慢速执行。

上游提交压力不会直接变成 LLM 请求压力。

3. 全局并发可控

真正执行任务的数量由:

MAX_CONCURRENT_TASKS
worker 数量
PostgreSQL 锁

共同控制。

即使上游瞬间提交很多任务,也只会按设定并发慢慢执行。

4. 状态统一

任务状态全部在 PostgreSQL 中。

查询、排查、统计都很直接。

5. 崩溃可恢复

通过:

locked_until
attempts
timeout recycle

worker 挂掉后任务可以重新被处理或最终失败。

6. worker 可替换

Python worker 可以换成 Go worker。

未来 API 也可以从 FastAPI 换成 Gin。

核心协议是 PostgreSQL tasks 表。


十七、当前方案的取舍

这套架构也有取舍。

1. worker 是常驻的

启动 20 个 worker 后,即使没有任务,它们也会占用基础内存。

不过 Go worker 单个只有 3MiB ~ 6MiB,目前这个成本可以接受。

2. Docker Compose 不自动扩缩容

普通 Docker Compose 不会根据 pending 任务数量自动扩缩 worker。

如果未来需要,可以写一个轻量 autoscaler:

pending > 100:scale worker=20
pending > 20:scale worker=10
pending = 0 持续一段时间:scale worker=1

但当前阶段没必要过早引入。

3. PostgreSQL 成为核心依赖

PostgreSQL 现在是任务系统中心。

未来生产化需要关注:

volume
备份
连接数
索引
vacuum
慢查询
历史任务清理

这是 all in PostgreSQL 必须接受的责任。


十八、最终理解

这次架构升级让我进一步确认了一件事:

任务系统的核心不是“异步执行”,而是“边界清晰”。

一个健康的任务系统应该把这几件事拆开:

谁接收请求?
谁记录任务?
谁控制并发?
谁执行任务?
谁负责失败恢复?
谁提供查询?

在这次方案里,答案很清楚:

FastAPI 接收请求
PostgreSQL 记录任务、控制并发、提供状态查询
Go worker 执行任务
Docker Compose 管理 worker 数量

这套结构不是最复杂的,也不是最重的,但它非常清楚。

我最满意的是:

tasks 表成为了系统协议。

API、worker、未来其他服务,都围绕这张表协作。

这让系统从一个普通的异步接口,升级成了一个可控、可恢复、可扩展的任务系统。


结语

这次从 Redis 到 PostgreSQL 的迁移,本质上不是“换一个存储组件”。

它是一次需求驱动的架构升级:

从轻量状态存储
升级到
PostgreSQL 中心化任务系统

最终架构是:

Python FastAPI:负责接单
PostgreSQL:负责任务队列、状态、锁、并发控制
Go worker:负责执行任务
Docker Compose:负责 worker scale

对我来说,这个方案最优雅的地方在于:

上游可以快速提交任务,
API 可以立即返回 task_id,
worker 可以按固定并发慢慢执行,
所有状态都能在 PostgreSQL 中被查询和恢复。

这就是我这次想要的效果。

Read more

传统 SaaS 转向 AI 时代,我目前的一点理解:先把数据能力变成 Agent 可调用的基础设施

最近我一直在思考一个问题:传统 SaaS 到底应该怎么转向 AI? 一开始很容易想到的方向是:给原来的系统加一个 AI 助手。 比如在页面右下角放一个聊天框,让用户可以问数据、生成报告、总结内容、解释指标。这个当然有价值,但我现在越来越觉得,这只是比较表层的一种转型。 真正的变化,可能不是“在 SaaS 里面加 AI”,而是 SaaS 本身的能力形态发生变化。 过去的 SaaS,核心是给人使用。 人登录系统,看页面、点按钮、筛选数据、导出报表、判断问题,然后再去做决策。数据库是给 Web 页面供数的,后端 API 是给前端页面服务的,整个产品的中心是“人如何操作软件”。 但 AI 时代,尤其是 Agent 逐渐发展之后,

By ladydd

对 Python 应用场景的一次重新思考:FastAPI、协程、线程、数据库与任务系统边界

最近在重新设计一个任务系统时,我顺便把自己对 Python,尤其是 CPython 应用场景的理解重新梳理了一遍。 这次讨论的背景是一个典型的异步任务服务: 上游提交任务 API 立即返回 task_id 后台 worker 慢慢执行 用户通过 task_id 查询任务状态 任务主要是 LLM 调用、图片下载、外部 HTTP 请求这类 I/O 型工作。 一开始关注的是队列、Redis、PostgreSQL、worker 并发控制这些问题。但聊到后面,其实更核心的问题变成了: Python 到底应该放在什么位置? 哪些并发适合 Python? 哪些并发不要硬塞给 Python? FastAPI、协程、线程、数据库之间应该怎么分工? 这篇文章就是这次思考的整理。 一、我不想抛弃 Python,

By ladydd

Go 和 Python 的并发模型对比:进程、线程、协程、并发和并行到底怎么理解?

最近我在写 worker 任务系统的时候,重新理解了一遍 Python 和 Go 的并发差异。 以前写 Python,多 worker 经常要考虑: 多进程怎么管理? 日志会不会串? 一个 worker 崩了怎么办? 怎么吃满多核心? 后来换成 Go,发现一个进程里开多个 goroutine worker 就很自然: go worker(1) go worker(2) go worker(3) go worker(4) 日志也好管,状态也好管,而且单进程还能利用多个 CPU 核心。 一开始很容易误会成: Python 不行,Go 行 但更准确的理解应该是: Python 和

By ladydd

Python 进程和 Go 进程的区别:为什么 Go 单进程多 worker 用起来更爽?

最近我在做 worker 任务系统的时候,突然意识到一个很关键的问题: 以前写 Python,多 worker 的时候经常要小心日志串、文件切割乱、时间不好管理。 但是换成 Go 以后,一个进程里开多个 goroutine worker,反而可以比较自然地写到同一个日志文件里。 一开始我以为这是“Python 和 Go 写日志能力不一样”,后来想明白了,核心不是日志本身,而是: Python 常见 worker 模型:多进程 Go 常见 worker 模型:单进程 + 多 goroutine 这背后其实是两个语言在并发模型上的巨大差异。 一、进程、线程、goroutine 先分清楚 先把几个概念捋一下。 进程:操作系统分配资源的单位 线程:CPU 调度执行的基本单位

By ladydd
陕公网安备61011302002223号 | 陕ICP备2025083092号