从 Redis + Channel 到 Redis Stream:一次 Go 任务队列方案的重新理解

在讨论 PostgreSQL queue 方案之后,我又回头看了一下之前自己设想过的 Go + Redis 任务架构。

最早我脑子里的方案大概是:

POST 创建任务
Redis 存状态
Go goroutine 直接处理
进程内 channel 控制并发

这个方案很直观,也很符合 Go 的写法。

用户请求进来后,API 生成一个 task_id,把任务状态写进 Redis,然后把任务塞进 Go 的 channel,由后台 goroutine 消费执行。并发控制就靠一个固定大小的 channel 或 worker pool,比如最多 20 个 goroutine 同时执行任务。

这个设计在单进程里确实很舒服。

但当我开始思考多进程、多容器、worker 崩溃恢复、全局并发控制这些问题时,我发现它还不够完整。

更下一阶段的 Redis 方案应该是:

POST 创建任务
Redis Stream 入队
Worker 从队列消费
Redis Lua 控全局并发
XAUTOCLAIM 做崩溃恢复

这篇文章就是整理这个理解过程。


一、最初方案:Redis 存状态 + Go channel 控并发

最开始的方案大概是这样:

Gin / Go API
  ↓
POST /tasks
  ↓
生成 task_id
  ↓
Redis 记录任务状态
  ↓
taskChan <- task
  ↓
goroutine worker pool 执行任务

Redis 里负责存状态:

task:{task_id}

比如:

status=pending
request={...}
result={...}
error=
created_at=...
updated_at=...

Go 进程内部有一个 channel:

taskChan := make(chan Task, 1000)

再启动固定数量的 goroutine:

for i := 0; i < 20; i++ {
    go worker(taskChan)
}

这样就可以做到:

最多 20 个任务同时执行

这个模型非常容易理解。

API 负责接任务,Redis 负责存状态,goroutine 负责执行,channel 负责进程内排队和并发控制。

对于单进程服务来说,它甚至是一个很好的方案。


二、这个方案为什么看起来很舒服?

因为它充分利用了 Go 的优势。

Go 的 goroutine 很轻,channel 又天然适合做进程内任务分发。

整个模型看起来非常清爽:

请求进来
  ↓
写 Redis
  ↓
丢给 channel
  ↓
goroutine 执行

而且并发控制也很简单。

比如我希望最多同时执行 20 个任务,那我就启动 20 个 worker goroutine。

如果任务主要是 LLM 请求、HTTP 调用、图片下载这类 I/O 型任务,Go 的 goroutine 确实很适合。

单进程下,这个方案没有明显问题。


三、真正的问题:channel 是进程内的

问题出在这里:

channel 是进程内的。

这句话非常关键。

Go channel 只能管理当前这个 Go 进程里的任务。

如果我只有一个 API 进程:

api-1:20 个 goroutine

那确实最多 20 并发。

但如果我把 API 扩成 4 个容器:

api-1:20 个 goroutine
api-2:20 个 goroutine
api-3:20 个 goroutine
api-4:20 个 goroutine

真实并发就变成了:

20 × 4 = 80

这时候所谓的“最多 20 并发”就不是全局的,而只是单进程内的。

所以 channel 控制的是:

进程内并发

不是:

全局并发

这是旧方案最大的边界。


四、任务状态也会分裂

旧方案还有另一个问题:状态容易分裂。

任务提交后,状态在 Redis 里:

Redis:
task:{id} status=pending

但任务本身可能已经进入了 Go 进程内的 channel:

Go channel:
Task{id}

任务执行中,又存在于某个 goroutine 的内存里:

goroutine:
正在处理 task_id

于是系统里出现了三个地方:

Redis 里有任务状态
channel 里有排队任务
goroutine 里有执行中任务

如果进程不挂,这当然没问题。

但如果进程挂了,channel 里的任务就没了。

Redis 里可能还显示:

status=pending

或者:

status=running

这时候就必须额外设计恢复逻辑,比如:

服务启动时扫描 Redis 里的 pending 任务
running 超时后重置为 pending
重新塞回 channel

这当然可以做。

但做到这里就会发现:我其实已经在手写一个任务队列系统了。


五、旧方案适合什么场景?

旧方案不是不好。

它适合这些场景:

单进程服务
任务不太重要
任务丢失可以接受
只需要保护当前进程不要打爆下游
状态只做短期展示
内部工具或一次性任务

比如临时爬虫、内部批处理、小型 webhook 处理、实验性服务,这个模型很爽。

但如果是一个正式的任务系统:

用户提交任务
必须返回 task_id
后续要查询状态
任务不能因为进程重启就丢
多 worker 要共同协作
全局并发必须受控
失败要可恢复

那么 Redis + channel + goroutine 就不够完整了。

它解决的是进程内并发,不是系统级任务调度。


六、下一阶段方案:Redis Stream 入队

如果继续用 Redis,但希望架构更完整,我会把任务队列从 Go channel 提升到 Redis Stream。

新的结构是:

POST 创建任务
  ↓
Redis Hash 存任务状态
  ↓
Redis Stream 入队
  ↓
Worker Consumer Group 消费
  ↓
Redis Lua 控制全局并发
  ↓
XAUTOCLAIM 做崩溃恢复

这时 channel 不再是系统级任务队列。

真正的任务队列变成:

Redis Stream

例如:

tasks:stream

提交任务时:

XADD tasks:stream * task_id abc123

Worker 使用 consumer group 消费:

XREADGROUP GROUP workers worker-1 COUNT 1 BLOCK 1000 STREAMS tasks:stream >

这时候任务不再只存在某个 Go 进程的内存里,而是进入了 Redis 的 Stream。

worker 可以是多个进程、多个容器,大家都从同一个 Stream 里取任务。

这就比 channel 更接近真正的分布式队列。


七、Redis Hash 存任务状态

任务状态仍然可以用 Redis Hash 存。

每个任务一个 key:

task:{task_id}

字段可以是:

status
task_type
request
result
error
attempts
created_at
updated_at
locked_until

例如:

HSET task:abc123 \
  status pending \
  task_type veo \
  request '{...}' \
  attempts 0 \
  created_at 1710000000

查询任务时:

HGETALL task:abc123

这样可以保持原来的接口体验:

GET /tasks/{task_id}

仍然返回:

task_id
status
result
error

也就是说,Redis Hash 负责状态,Redis Stream 负责队列。


八、Redis Stream 比 channel 强在哪里?

Redis Stream 的关键优势是:

队列不在进程内
支持多 worker 消费
支持 consumer group
支持 pending message
支持 ack
支持崩溃后的重新领取

这就比 channel 更适合跨进程任务队列。

在 Redis Stream 里,worker 取走任务后,如果没有确认完成,这条消息不会彻底消失,而是进入 Pending Entries List,也就是 PEL。

worker 完成任务后,需要:

XACK tasks:stream workers message_id

这表示:

这个任务已经处理完成。

如果 worker 崩溃,没有执行 XACK,这条消息还在 Redis 的 pending 列表里。

后续可以被其他 worker 重新 claim。

这就是它比进程内 channel 更可靠的地方。


九、全局并发控制不能靠 channel

如果使用 Redis Stream,只解决了“任务队列”的问题。

但还没解决:

全局最多同时执行 N 个任务

因为多个 worker 都可以从 Stream 里读取任务。

如果没有全局并发控制,worker 多了以后,仍然可能同时执行太多任务。

所以需要一个所有 worker 都能看到的全局 running 记录。

Redis 里可以用 Sorted Set:

tasks:running

score 放 locked_until 时间戳:

ZADD tasks:running 1710000300 abc123

这样可以做两件事:

1. 用 ZCARD 统计当前 running 数量
2. 用 ZRANGEBYSCORE 找出超时 running 任务

但这里有一个关键问题:

检查 running 数量 + 加入 running 集合

这两个动作必须是原子的。

否则多个 worker 同时执行时可能超额。


十、为什么需要 Redis Lua?

假设最大并发是 20。

如果用 Go 普通命令写:

ZCARD tasks:running
如果 count < 20:
    ZADD tasks:running locked_until task_id
    HSET task:{id} status running

就可能出现竞态。

例如:

worker A 看到 running=19
worker B 也看到 running=19
worker A 加入一个任务
worker B 也加入一个任务
最后 running=21

这就突破了全局并发上限。

Redis 单条命令是原子的,但多条命令组合起来不是自动原子的。

这时候就需要 Lua。

Redis Lua 的意思是:

把一小段 Lua 脚本发给 Redis,让 Redis 在服务器内部一次性执行多条命令。

它不是用 Lua 写整个项目。

主语言还是 Go。

Lua 只负责关键的原子逻辑。

比如:

local running_key = KEYS[1]
local task_key = KEYS[2]

local task_id = ARGV[1]
local now = tonumber(ARGV[2])
local locked_until = tonumber(ARGV[3])
local max_running = tonumber(ARGV[4])

redis.call("ZREMRANGEBYSCORE", running_key, "-inf", now)

local running_count = redis.call("ZCARD", running_key)

if running_count >= max_running then
    return 0
end

redis.call("ZADD", running_key, locked_until, task_id)
redis.call("HSET", task_key,
    "status", "running",
    "locked_until", locked_until,
    "updated_at", now
)

return 1

这段脚本做了几件事:

1. 清理过期 running 任务
2. 查询当前 running 数量
3. 判断是否达到最大并发
4. 如果没满,把任务加入 running
5. 更新任务状态为 running
6. 返回是否成功拿到执行槽

Redis 执行 Lua 脚本时,中间不会插入其他客户端命令。

所以这整段逻辑是原子的。

这就相当于 Redis 版的:

事务 + 锁

十一、Worker 消费流程

比较完整的 worker 流程应该是:

1. 从 Redis Stream 读取一条任务消息
2. 调用 Lua 脚本尝试获取全局执行槽
3. 如果没拿到槽位,稍后再试
4. 如果拿到槽位,把任务状态标记为 running
5. 执行 LLM 调用
6. 成功后写 completed + result
7. 失败后写 failed + error
8. 从 tasks:running 移除 task_id
9. XACK Stream 消息

伪代码大概是:

for {
    msg := readFromStream()

    ok := acquireSlotByLua(taskID)
    if !ok {
        sleep()
        continue
    }

    result, err := processTask(taskID)

    if err != nil {
        markFailed(taskID, err)
    } else {
        markCompleted(taskID, result)
    }

    releaseSlot(taskID)
    ackStreamMessage(msg.ID)
}

这里的重点是:

Stream 负责队列
Lua + running zset 负责全局并发
Hash 负责任务状态

这三个职责不能混在一起。


十二、worker 崩溃恢复:XAUTOCLAIM

Redis Stream 有一个重要能力:

XAUTOCLAIM

它用来处理 worker 崩溃后的 pending message。

假设 worker-1 读走了一条任务:

XREADGROUP 读取成功

但 worker-1 执行过程中挂了,没有执行:

XACK

那么这条消息会停留在 consumer group 的 pending 列表里。

其他 worker 可以用:

XAUTOCLAIM tasks:stream workers worker-2 30000 0 COUNT 10

大概含义是:

把 idle 超过 30000ms 的 pending 消息,转移给 worker-2 继续处理。

这就解决了:

worker 读走任务后崩溃,消息没人处理

的问题。

不过 Redis Stream 的恢复比 PostgreSQL queue 稍微复杂,因为你还要同时维护:

Stream pending message
task:{id} Hash 状态
tasks:running ZSET

所以恢复逻辑一般要配套处理:

1. XAUTOCLAIM 抢回长时间未 ack 的消息
2. 清理 tasks:running 中 locked_until 过期的任务
3. 根据 attempts 判断重新执行还是标记 failed
4. 保持 task:{id} 的状态一致

可以做,但一定要设计清楚。


十三、新旧方案的核心差异

旧方案:

POST 创建任务
Redis 存状态
Go goroutine 直接处理
进程内 channel 控并发

优点:

简单
开发快
单进程很好用
Go 写起来很顺

缺点:

channel 只控制当前进程
多容器后并发会放大
任务可能藏在进程内存里
进程崩溃后需要额外恢复
Redis 状态和 channel 队列是两份东西

新方案:

POST 创建任务
Redis Stream 入队
Worker 从队列消费
Redis Lua 控全局并发
XAUTOCLAIM 做崩溃恢复

优点:

队列在 Redis,不在进程内
多个 worker 可以共同消费
全局并发可以通过 Lua 控制
worker 崩溃后消息可以重新 claim
更适合多进程、多容器

缺点:

实现复杂度更高
需要理解 Redis Stream / Consumer Group / PEL
需要写 Lua 保证原子性
状态一致性要自己维护
查询统计能力不如 PostgreSQL

十四、和 PostgreSQL queue 的对比

Redis Stream 方案和 PostgreSQL queue 都可以做任务系统。

但它们的气质不同。

PostgreSQL queue 更像:

任务账本

它擅长:

状态持久化
复杂查询
结果落库
失败排查
事务一致性
任务历史

Redis Stream 更像:

高速调度器

它擅长:

高吞吐
轻量队列
短期状态
快速分发
内存型消息处理

如果任务是 LLM 调用、视频生成、图片处理这种:

任务时间较长
需要 task_id 查询
需要保存 result/error
需要失败追踪
任务量不是每秒几十万

PostgreSQL queue 会更稳。

如果任务是:

极高吞吐
短任务
短期状态
对复杂查询要求不高
更偏消息流处理

Redis Stream 会很漂亮。


十五、这次我对 channel 的重新定位

这次讨论后,我对 Go channel 的定位更清楚了。

channel 很好,但它应该是:

进程内并发工具

而不是:

系统级任务队列

它适合:

当前进程内部任务分发
worker 内部 pipeline
goroutine 之间通信
局部背压

但如果要做跨进程、跨容器、可恢复的任务系统,队列最好放在所有 worker 都能访问的地方。

比如:

PostgreSQL
Redis Stream
RabbitMQ
Kafka
NATS

而不是某个进程里的 channel。

更准确的分层应该是:

channel:
    进程内协调工具

Redis Hash:
    短期任务状态

Redis Stream:
    分布式任务队列

Redis Lua:
    原子并发控制

XAUTOCLAIM:
    崩溃恢复

PostgreSQL:
    任务事实来源和长期账本

十六、如果真的用 Go + Redis,我会怎么设计

如果要做一个相对优雅的 Go + Redis 版本,我会这样拆:

cmd/api
cmd/worker

internal/taskstore
internal/queue
internal/limiter
internal/processor
internal/llm

各模块职责:

taskstore:
    读写 task:{id}
    markPending
    markRunning
    markCompleted
    markFailed

queue:
    XADD
    XREADGROUP
    XACK
    XAUTOCLAIM

limiter:
    Lua acquireSlot
    releaseSlot
    heartbeat
    cleanupExpiredRunning

processor:
    根据 task_type 调用不同任务逻辑

llm:
    封装外部 LLM 请求

Redis key 设计:

task:{task_id}      Hash,存任务状态
tasks:stream        Stream,存任务消息
tasks:running       ZSET,存 running 任务和过期时间

这套就比 Redis + channel 更像一个完整任务系统。


十七、最后的理解

我现在对这两套 Go + Redis 方案的理解是:

旧方案:

POST 创建任务
Redis 存状态
Go goroutine 直接处理
进程内 channel 控并发

它是一个很好的单进程异步模型。

新方案:

POST 创建任务
Redis Stream 入队
Worker 从队列消费
Redis Lua 控全局并发
XAUTOCLAIM 做崩溃恢复

它才更像一个可扩展的分布式任务队列模型。

最核心的区别不在于用了不用 Redis,而在于:

队列到底在哪里?
并发控制到底在哪里?
任务恢复到底靠什么?

旧方案的答案是:

队列在进程内 channel
并发控制在进程内 channel
恢复需要额外扫描 Redis

新方案的答案是:

队列在 Redis Stream
并发控制在 Redis Lua + running zset
恢复靠 Redis Stream pending + XAUTOCLAIM

这就是层级上的差别。


结语

Go 的 goroutine 和 channel 很强,但它们首先是进程内并发工具。

如果只是单进程、轻量任务、临时系统,用:

Redis 存状态 + channel 控并发 + goroutine 执行

完全可以,而且写起来很爽。

但如果目标是多 worker、多容器、全局并发、崩溃恢复,那么更完整的 Redis 方案应该升级为:

Redis Stream + Consumer Group + Lua + XAUTOCLAIM

也就是:

POST 创建任务
Redis Stream 入队
Worker 从队列消费
Redis Lua 控全局并发
XAUTOCLAIM 做崩溃恢复

这套方案比旧方案复杂,但边界更清楚。

而这次真正的收获是:

channel 不是不能用,而是不能把它当成分布式任务队列。它应该留在进程内部;系统级队列和全局并发控制,应该放在所有 worker 都能共同访问的地方。

Read more

传统 SaaS 转向 AI 时代,我目前的一点理解:先把数据能力变成 Agent 可调用的基础设施

最近我一直在思考一个问题:传统 SaaS 到底应该怎么转向 AI? 一开始很容易想到的方向是:给原来的系统加一个 AI 助手。 比如在页面右下角放一个聊天框,让用户可以问数据、生成报告、总结内容、解释指标。这个当然有价值,但我现在越来越觉得,这只是比较表层的一种转型。 真正的变化,可能不是“在 SaaS 里面加 AI”,而是 SaaS 本身的能力形态发生变化。 过去的 SaaS,核心是给人使用。 人登录系统,看页面、点按钮、筛选数据、导出报表、判断问题,然后再去做决策。数据库是给 Web 页面供数的,后端 API 是给前端页面服务的,整个产品的中心是“人如何操作软件”。 但 AI 时代,尤其是 Agent 逐渐发展之后,

By ladydd

对 Python 应用场景的一次重新思考:FastAPI、协程、线程、数据库与任务系统边界

最近在重新设计一个任务系统时,我顺便把自己对 Python,尤其是 CPython 应用场景的理解重新梳理了一遍。 这次讨论的背景是一个典型的异步任务服务: 上游提交任务 API 立即返回 task_id 后台 worker 慢慢执行 用户通过 task_id 查询任务状态 任务主要是 LLM 调用、图片下载、外部 HTTP 请求这类 I/O 型工作。 一开始关注的是队列、Redis、PostgreSQL、worker 并发控制这些问题。但聊到后面,其实更核心的问题变成了: Python 到底应该放在什么位置? 哪些并发适合 Python? 哪些并发不要硬塞给 Python? FastAPI、协程、线程、数据库之间应该怎么分工? 这篇文章就是这次思考的整理。 一、我不想抛弃 Python,

By ladydd

Go 和 Python 的并发模型对比:进程、线程、协程、并发和并行到底怎么理解?

最近我在写 worker 任务系统的时候,重新理解了一遍 Python 和 Go 的并发差异。 以前写 Python,多 worker 经常要考虑: 多进程怎么管理? 日志会不会串? 一个 worker 崩了怎么办? 怎么吃满多核心? 后来换成 Go,发现一个进程里开多个 goroutine worker 就很自然: go worker(1) go worker(2) go worker(3) go worker(4) 日志也好管,状态也好管,而且单进程还能利用多个 CPU 核心。 一开始很容易误会成: Python 不行,Go 行 但更准确的理解应该是: Python 和

By ladydd

Python 进程和 Go 进程的区别:为什么 Go 单进程多 worker 用起来更爽?

最近我在做 worker 任务系统的时候,突然意识到一个很关键的问题: 以前写 Python,多 worker 的时候经常要小心日志串、文件切割乱、时间不好管理。 但是换成 Go 以后,一个进程里开多个 goroutine worker,反而可以比较自然地写到同一个日志文件里。 一开始我以为这是“Python 和 Go 写日志能力不一样”,后来想明白了,核心不是日志本身,而是: Python 常见 worker 模型:多进程 Go 常见 worker 模型:单进程 + 多 goroutine 这背后其实是两个语言在并发模型上的巨大差异。 一、进程、线程、goroutine 先分清楚 先把几个概念捋一下。 进程:操作系统分配资源的单位 线程:CPU 调度执行的基本单位

By ladydd
陕公网安备61011302002223号 | 陕ICP备2025083092号