用阿里云 text-embedding-v4 搭一个便宜好用的语义召回层
很多系统一开始都靠关键词匹配。
用户搜“车载腰靠”,数据库里有“汽车腰枕”“lumbar support pillow for car”,如果只做 LIKE 或倒排词,召回很容易断掉。Embedding 解决的是这个问题:把文本变成向量,让“意思接近”的内容在向量空间里靠近。
阿里云百炼里的 text-embedding-v4 很适合做这件事。它接入简单,兼容 OpenAI 风格接口,价格也低,适合拿来做搜索召回、RAG 知识库、商品词聚类、类目匹配、相似标题推荐。
本文只讲一件事:怎么把 text-embedding-v4 接进自己的系统。
一句话结论
如果你要给文本做语义召回,可以这样设计:
业务文本
-> 清洗/去重
-> text-embedding-v4
-> 向量库:pgvector / Milvus / Elasticsearch vector / Hologres / Lindorm
-> 查询文本也向量化
-> cosine similarity 召回候选
-> 再回业务库拿事实数据
关键原则:
- 文档、商品、关键词这类固定语料,尽量离线批量向量化。
- 用户 query 这类实时输入,再在线向量化。
- Embedding 只负责“召回候选”,不要让它直接替代业务事实判断。
- API Key 只放环境变量,永远不要写进代码、日志、博客或 Git。
也就是说,实际落地时有两条路:
实时接口:用户 query、小批量测试、调试验证
离线文件:大规模语料、全量重建、批量补向量
这两条路都走同一个模型,但工程姿态完全不同。实时接口追求低延迟;离线文件追求低成本、可重跑、可恢复。
为什么选 text-embedding-v4
阿里云官方文档推荐纯文本或代码场景使用 text-embedding-v4。它属于 Qwen3-Embedding 系列,支持多种维度,也支持中文、英文和多语言文本。
常用参数:
model: text-embedding-v4
dimensions: 1024
encoding_format: float
维度可以选:
2048, 1536, 1024, 768, 512, 256, 128, 64
实际项目里我一般先用 1024。原因很简单:质量和存储成本比较平衡,pgvector、Milvus、HNSW/IVFFlat 这些索引也都比较好处理。
如果你的场景是:
- 商品标题、关键词、搜索词召回
- 知识库片段召回
- FAQ 相似问题
- 用户意图近似匹配
- 文本聚类和主题发现
1024 维通常是一个稳妥起点。
成本真的很低
按阿里云官方文档里中国大陆的当前价格口径,text-embedding-v4 是:
同步调用:0.0005 元 / 千输入 Token
Batch 调用:0.00025 元 / 千输入 Token
换算一下:
100 万 Token 同步向量化 ≈ 0.5 元
100 万 Token Batch 向量化 ≈ 0.25 元
这就是为什么 embedding 适合做大规模离线预处理。几十万条短文本、几百万条关键词,成本通常不是主要矛盾。真正要认真设计的是:
- 文本怎么去重,避免重复花钱。
- 向量怎么分版本,方便重建和回滚。
- 查询时怎么缓存,避免同一个 query 反复调用。
- 向量召回后怎么回业务库做二次校验。
价格以阿里云官方页面为准;本文写作时查询日期是 2026-06-14。
申请和保存 API Key
开通阿里云百炼后,在控制台创建 API Key。
本地和服务端都建议只用环境变量:
export DASHSCOPE_API_KEY="<your-dashscope-api-key>"
不要这样做:
api_key = "真实 key"
也不要把 key 写进:
.py.ipynb.env公开样例- README
- 博客截图
- CI 日志
- 异常日志
如果项目里需要给别人一个示例,用占位符:
DASHSCOPE_API_KEY=replace-me
最小 Python 示例
DashScope 提供 OpenAI-compatible 接口,所以可以直接用 OpenAI Python SDK。
安装:
pip install openai
代码:
import os
from openai import OpenAI
client = OpenAI(
api_key=os.environ["DASHSCOPE_API_KEY"],
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
)
response = client.embeddings.create(
model="text-embedding-v4",
input=[
"车载腰靠",
"汽车腰枕",
"lumbar support pillow for car",
],
dimensions=1024,
encoding_format="float",
)
for item in response.data:
print(item.index, len(item.embedding), item.embedding[:5])
你会得到一个 1024 维的 float 数组。这个数组就是后续写入向量库的内容。
curl 示例
不想先装 SDK,也可以直接调 HTTP:
curl --location 'https://dashscope.aliyuncs.com/compatible-mode/v1/embeddings' \
--header "Authorization: Bearer ${DASHSCOPE_API_KEY}" \
--header 'Content-Type: application/json' \
--data '{
"model": "text-embedding-v4",
"input": [
"车载腰靠",
"汽车腰枕",
"lumbar support pillow for car"
],
"dimensions": 1024,
"encoding_format": "float"
}'
注意这里没有把 key 写死在命令里,而是读取环境变量。
批量调用限制
text-embedding-v4 的常用限制:
单次最多 10 条文本
每条最长 8192 Token
所以批量处理时,不要把几万条文本一次塞进一个请求。更稳的做法是每 10 条一组:
from collections.abc import Iterable
def chunks(items: list[str], size: int) -> Iterable[list[str]]:
for start in range(0, len(items), size):
yield items[start : start + size]
def embed_texts(texts: list[str]) -> list[list[float]]:
vectors: list[list[float]] = []
for batch in chunks(texts, 10):
response = client.embeddings.create(
model="text-embedding-v4",
input=batch,
dimensions=1024,
encoding_format="float",
)
ordered = sorted(response.data, key=lambda item: item.index)
vectors.extend([item.embedding for item in ordered])
return vectors
生产环境里再加上:
- 超时
- 重试
- 限流
- 失败队列
- 请求日志脱敏
- 返回向量维度校验
文本先去重,再花钱
Embedding 是按输入 Token 计费。哪怕很便宜,也没必要重复向量化同一段文本。
建议先做 canonical text:
import hashlib
import re
WHITESPACE_RE = re.compile(r"\s+")
def canonical_text(text: str) -> str:
return WHITESPACE_RE.sub(" ", text.strip()).casefold()
def text_hash(text: str) -> str:
return hashlib.sha256(canonical_text(text).encode("utf-8")).hexdigest()
然后用 text_hash 去重:
seen: set[str] = set()
deduped: list[dict[str, str]] = []
for row in rows:
text = row["text"]
digest = text_hash(text)
if digest in seen:
continue
seen.add(digest)
deduped.append({"text_hash": digest, "text": text})
这样做有三个好处:
- 少花钱。
- 少存向量。
- 后续重建索引时可以用
text_hash做稳定主键。
推荐架构:离线语料 + 在线 query
不要每次用户搜索时都把整个库重新向量化。
正确姿势是:
离线阶段:
商品标题 / 关键词 / 文档片段
-> 去重
-> 批量 embedding
-> 写向量库
在线阶段:
用户 query
-> embedding 一次
-> 向量库 topK
-> 回业务库拿真实指标
-> 排序/过滤/解释
这套架构成本低、延迟可控,也方便回滚。
pgvector 落库示例
如果你用 Postgres,可以装 pgvector。
建表:
CREATE EXTENSION IF NOT EXISTS vector;
CREATE TABLE semantic_text_vectors (
index_version text NOT NULL,
text_hash text NOT NULL,
text text NOT NULL,
embedding vector(1024) NOT NULL,
source text,
payload_json jsonb NOT NULL DEFAULT '{}'::jsonb,
created_at timestamptz NOT NULL DEFAULT now(),
PRIMARY KEY (index_version, text_hash)
);
写入时保存三个东西:
text_hash:稳定去重主键。embedding:1024维向量。payload_json:业务侧需要回查的字段,比如商品 ID、类目、来源表、数据日期。
查询相似文本:
SELECT
text_hash,
text,
1 - (embedding <=> $1::vector) AS score,
payload_json
FROM semantic_text_vectors
WHERE index_version = 'text_embedding_v4_1024'
ORDER BY embedding <=> $1::vector
LIMIT 20;
<=> 是 cosine distance。1 - distance 可以粗略当成相似分。
索引可以先用 IVFFlat:
CREATE INDEX idx_semantic_text_vectors_ivfflat
ON semantic_text_vectors
USING ivfflat (embedding vector_cosine_ops)
WITH (lists = 2000)
WHERE index_version = 'text_embedding_v4_1024';
查询前可以调 probes:
SET ivfflat.probes = 20;
数据量很小时,不建 ANN 索引也能跑;数据上百万以后,索引和分区策略就要认真设计。
离线 Batch 文件这条路
大语料不要靠实时接口一条条打。阿里云百炼支持 OpenAI-compatible Batch File API,流程是:
准备 JSONL 请求文件
-> 上传文件,拿 upload_file_id
-> 创建 Batch 任务,拿 batch_id
-> 轮询 batch_id 状态
-> completed 后拿 output_file_id
-> 下载 output_*.jsonl
-> 用 map 文件把结果映射回 text_hash
-> 写向量库
官方这条 Batch 路线的费用是实时调用的 50%,所以大规模语料基本应该走它。
我们真实跑过的一次结构大概是:
model: text-embedding-v4
dimensions: 1024
batch_count: 71
total_requests: 352,528
total_texts: 3,525,271
每个请求最多 10 条文本
每个请求文件最多 5,000 行请求
也就是一个 requests_000001.jsonl 通常装 5,000 个请求、约 50,000 条文本。每个请求文件旁边都有一个同编号的 map 文件。
这次真实耗时也要写清楚。下面时间按北京时间展示:
2026-06-05 15:29:51 第一批 batch 提交
2026-06-05 15:31:03 最后一批 batch 提交
2026-06-05 16:06:35 第一批云上完成
2026-06-05 16:51:04 最后一批云上完成
2026-06-05 16:07:35 第一份 output 文件下载完成
2026-06-05 17:01:45 最后一份 output 文件下载完成
换成更直观的口径:
71 个 batch 全部提交完成:约 1 分 12 秒
云上第一批跑完:从第一批提交算,约 36 分 44 秒
云上全部跑完:从第一批提交算,约 1 小时 21 分 13 秒
全部 output 下载完成:从第一批提交算,约 1 小时 31 分 54 秒
按单个 batch 看,云上运行时间大概是:
最快:36 分 44 秒
平均:59 分 37 秒
最慢:1 小时 20 分 01 秒
下载不是主要瓶颈。真实等待的大头是阿里云 Batch 在云上处理那一段,下载只是最后把已经生成好的 output_file_id 拉回本地。我们这次单个 batch 从云上完成到本地下载完成,最快约 40 秒,平均约 3 分钟,最慢约 11 分 11 秒。
1. 准备请求文件
先把业务文本导出成统一输入:
text_hash
text
text_char_len
first_source
first_entity_type
first_entity_key
再按 10 条文本一组生成 Batch 请求文件:
{"custom_id":"emb-000000001","method":"POST","url":"/v1/embeddings","body":{"model":"text-embedding-v4","input":["车载腰靠","汽车腰枕","lumbar support pillow for car"],"dimensions":1024,"encoding_format":"float"}}
注意几点:
url写/v1/embeddings。body.model写text-embedding-v4。dimensions和后面的向量库表结构保持一致。custom_id必须稳定,后面下载结果要靠它对齐。- 每行是一个完整 JSON,不是一个 JSON 数组。
同时生成一个 map 文件:
{"custom_id":"emb-000000001","request_index":1,"batch_index":1,"items":[{"response_index":0,"text_global_index":1,"text_hash":"<sha256>","text_char_len":4,"first_source":"keyword","first_entity_type":"keyword","first_entity_key":"车载腰靠"},{"response_index":1,"text_global_index":2,"text_hash":"<sha256>","text_char_len":4,"first_source":"keyword","first_entity_type":"keyword","first_entity_key":"汽车腰枕"}]}
这个 map 文件是离线 Batch 的保险绳。下载回来的结果只保证有 custom_id 和 data[].index,你需要用:
custom_id + response_index -> text_hash
把向量精确挂回自己的文本和业务主键。
2. 本地目录结构
推荐这样放:
/data/semantic_batches/text_embedding_v4_1024/
manifest.json
batch_state.json
requests_000001.jsonl
requests_000001.map.jsonl
requests_000002.jsonl
requests_000002.map.jsonl
/data/semantic_vectors/text_embedding_v4_1024/
output_000001.jsonl
output_000002.jsonl
error_000002.jsonl
manifest.json 记录本地生成物:
{
"provider": "dashscope",
"api_mode": "openai_compatible_batch",
"base_url": "https://dashscope.aliyuncs.com/compatible-mode/v1",
"endpoint": "/v1/embeddings",
"model": "text-embedding-v4",
"dimensions": 1024,
"batch_count": 71,
"total_requests": 352528,
"total_texts": 3525271
}
batch_state.json 记录远程任务状态。真实 batch_id、upload_file_id、output_file_id 不要写进博客或公开仓库,下面是脱敏结构:
{
"provider": "dashscope",
"api_mode": "openai_compatible_batch",
"base_url": "https://dashscope.aliyuncs.com/compatible-mode/v1",
"model": "text-embedding-v4",
"dimensions": 1024,
"batches": [
{
"batch_index": 1,
"request_file": "/data/semantic_batches/text_embedding_v4_1024/requests_000001.jsonl",
"map_file": "/data/semantic_batches/text_embedding_v4_1024/requests_000001.map.jsonl",
"request_count": 5000,
"text_count": 50000,
"upload_file_id": "<file-batch-redacted>",
"batch_id": "<batch-redacted>",
"status": "completed",
"output_file_id": "<file-output-redacted>",
"output_path": "/data/semantic_vectors/text_embedding_v4_1024/output_000001.jsonl"
}
]
}
3. 提交 Batch 任务
下面这个脚本做三件事:
- 读取
manifest.json。 - 逐个上传
requests_*.jsonl。 - 创建 Batch 任务,并把远程 ID 写入
batch_state.json。
#!/usr/bin/env python3
from __future__ import annotations
import argparse
import hashlib
import json
import os
import time
from pathlib import Path
from typing import Any
from openai import OpenAI
def sha256_path(path: Path) -> str:
digest = hashlib.sha256()
with path.open("rb") as handle:
for chunk in iter(lambda: handle.read(1024 * 1024), b""):
digest.update(chunk)
return digest.hexdigest()
def save_json(path: Path, payload: dict[str, Any]) -> None:
tmp = path.with_suffix(path.suffix + ".tmp")
tmp.write_text(json.dumps(payload, ensure_ascii=False, indent=2, sort_keys=True) + "\n", encoding="utf-8")
tmp.replace(path)
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser()
parser.add_argument("--batch-dir", type=Path, required=True)
parser.add_argument("--base-url", default="https://dashscope.aliyuncs.com/compatible-mode/v1")
parser.add_argument("--completion-window", default="24h")
parser.add_argument("--sleep-seconds", type=float, default=0.2)
parser.add_argument("--resume", action="store_true")
return parser.parse_args()
def main() -> None:
args = parse_args()
api_key = os.environ.get("DASHSCOPE_API_KEY")
if not api_key:
raise SystemExit("DASHSCOPE_API_KEY is required")
manifest_path = args.batch_dir / "manifest.json"
state_path = args.batch_dir / "batch_state.json"
manifest = json.loads(manifest_path.read_text(encoding="utf-8"))
if args.resume and state_path.exists():
state = json.loads(state_path.read_text(encoding="utf-8"))
done_indexes = {int(row["batch_index"]) for row in state.get("batches", []) if row.get("batch_id")}
else:
state = {
"version": 1,
"provider": "dashscope",
"api_mode": "openai_compatible_batch",
"base_url": args.base_url,
"model": manifest["model"],
"dimensions": manifest["dimensions"],
"created_at": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
"batches": [],
}
done_indexes = set()
client = OpenAI(api_key=api_key, base_url=args.base_url, timeout=300, max_retries=3)
for batch_meta in manifest["batches"]:
batch_index = int(batch_meta["batch_index"])
if batch_index in done_indexes:
continue
request_path = Path(batch_meta["request_file"]["path"])
map_path = Path(batch_meta["map_file"]["path"])
with request_path.open("rb") as handle:
uploaded = client.files.create(file=handle, purpose="batch")
batch = client.batches.create(
input_file_id=uploaded.id,
endpoint="/v1/embeddings",
completion_window=args.completion_window,
metadata={
"ds_name": f"text-embedding-v4-{batch_index:06d}",
"batch_index": str(batch_index),
},
)
entry = {
"batch_index": batch_index,
"request_file": str(request_path),
"map_file": str(map_path),
"request_sha256": sha256_path(request_path),
"request_count": batch_meta["request_count"],
"text_count": batch_meta["text_count"],
"upload_file_id": uploaded.id,
"uploaded_at": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
"batch_id": batch.id,
"submitted_at": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
"status": getattr(batch, "status", "submitted"),
"output_file_id": None,
"error_file_id": None,
"output_path": None,
}
state["batches"].append(entry)
save_json(state_path, state)
print(json.dumps({"batch_index": batch_index, "batch_id": batch.id, "status": entry["status"]}, ensure_ascii=False), flush=True)
time.sleep(args.sleep_seconds)
if __name__ == "__main__":
main()
运行:
export DASHSCOPE_API_KEY="<your-dashscope-api-key>"
python3 submit_dashscope_embedding_batches.py \
--batch-dir /data/semantic_batches/text_embedding_v4_1024 \
--completion-window 24h \
--resume
注意:batch_state.json 里会有远程任务 ID。它可以留在内网机器上,但不要提交到公开仓库。
4. 轮询等待并下载结果
提交完成后,不要人肉刷新控制台。写一个 poller:读取 batch_state.json,循环查每个 batch_id。
状态流转一般是:
validating -> in_progress -> finalizing -> completed
也可能失败:
failed / expired / cancelled
轮询逻辑:
from collections import Counter
from pathlib import Path
import hashlib
import json
import os
import time
from openai import OpenAI
TERMINAL_STATUSES = {"completed", "failed", "expired", "cancelled", "canceled"}
def sha256_path(path: Path) -> str:
digest = hashlib.sha256()
with path.open("rb") as handle:
for chunk in iter(lambda: handle.read(1024 * 1024), b""):
digest.update(chunk)
return digest.hexdigest()
def save_state(path: Path, state: dict) -> None:
tmp = path.with_suffix(".json.tmp")
tmp.write_text(json.dumps(state, ensure_ascii=False, indent=2, sort_keys=True) + "\n", encoding="utf-8")
tmp.replace(path)
def download_file(client: OpenAI, file_id: str, path: Path) -> None:
tmp = path.with_suffix(path.suffix + ".tmp")
content = client.files.content(file_id)
content.write_to_file(tmp)
tmp.replace(path)
def poll_once(client: OpenAI, state_path: Path, output_dir: Path) -> dict:
state = json.loads(state_path.read_text(encoding="utf-8"))
output_dir.mkdir(parents=True, exist_ok=True)
counts: Counter[str] = Counter()
now = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
for entry in state["batches"]:
batch_index = int(entry["batch_index"])
batch = client.batches.retrieve(entry["batch_id"])
status = getattr(batch, "status", None) or "unknown"
output_file_id = getattr(batch, "output_file_id", None)
error_file_id = getattr(batch, "error_file_id", None)
entry["status"] = status
entry["last_checked_at"] = now
entry["output_file_id"] = output_file_id
entry["error_file_id"] = error_file_id
counts[status] += 1
if status == "completed" and output_file_id and not entry.get("output_path"):
out_path = output_dir / f"output_{batch_index:06d}.jsonl"
download_file(client, output_file_id, out_path)
entry["output_path"] = str(out_path)
entry["output_bytes"] = out_path.stat().st_size
entry["output_sha256"] = sha256_path(out_path)
entry["output_downloaded_at"] = now
save_state(state_path, state)
if error_file_id and not entry.get("error_path"):
err_path = output_dir / f"error_{batch_index:06d}.jsonl"
download_file(client, error_file_id, err_path)
entry["error_path"] = str(err_path)
entry["error_bytes"] = err_path.stat().st_size
entry["error_sha256"] = sha256_path(err_path)
save_state(state_path, state)
save_state(state_path, state)
return {"counts": dict(counts)}
完整命令可以这样跑:
python3 poll_dashscope_embedding_batches.py \
--batch-dir /data/semantic_batches/text_embedding_v4_1024 \
--output-dir /data/semantic_vectors/text_embedding_v4_1024 \
--interval-seconds 60
我们本地真实跑法里,poller 每轮会打印类似:
{"checked_at":"2026-06-05T09:00:07Z","counts":{"completed":71},"downloaded":71,"newly_downloaded":0}
5. 下载结果长什么样
output_000001.jsonl 也是 JSONL。每行对应一个 custom_id:
{"custom_id":"emb-000000001","response":{"status_code":200,"body":{"data":[{"index":0,"embedding":[0.0123,-0.0456,0.0789]},{"index":1,"embedding":[0.0234,-0.0567,0.0891]}],"usage":{"prompt_tokens":67,"total_tokens":67},"model":"text-embedding-v4"}},"error":null}
真实向量会有 1024 个 float。博客里不要贴完整向量,没意义,也很占篇幅。
入库时按这个关系恢复:
output row.custom_id
-> requests_000001.map.jsonl 里同 custom_id
-> data[].index 对应 map.items[].response_index
-> 拿到 text_hash / entity_key / source
-> 校验 embedding 维度
-> upsert 到向量库
核心解析逻辑:
for row in read_jsonl(output_path):
custom_id = row["custom_id"]
body = row["response"]["body"]
data = body["data"]
map_items = mapping[custom_id]
items_by_index = {item["response_index"]: item for item in map_items}
for embedding_item in data:
response_index = embedding_item["index"]
map_item = items_by_index[response_index]
vector = embedding_item["embedding"]
if len(vector) != 1024:
raise ValueError(f"bad vector dimension: {len(vector)}")
upsert_vector(
text_hash=map_item["text_hash"],
vector=vector,
payload=map_item,
)
6. 失败和续跑
这条离线路径一定要支持续跑。
建议规则:
batch_state.json每提交成功一个 batch 就立刻落盘。- 下载 output 也要先写
.tmp,完成后再 rename。 failed的 batch 不要自动覆盖,先看错误文件。completed且已有output_path的 batch 跳过。- 入库时按
index_version + text_hash做 upsert。
这样即使中间断网、进程挂掉、机器重启,也不会从头再来。
在线 query 要缓存
用户搜索词会重复出现。比如:
car seat cushion
seat cushion for car
lumbar support pillow
这些 query 不应该每次都实时调用 embedding。
可以做一个简单缓存:
cache_key = sha256(model + dimensions + normalized_query)
ttl = 7 天或 30 天
value = query embedding / topK result
如果你的业务数据更新频繁,缓存 topK 结果时要把 index_version 放进 cache key:
text-embedding-v4:1024:semantic_index_20260614:<query_hash>
这样换索引版本时,旧缓存不会污染新结果。
版本管理
Embedding 一旦落库,就要有版本概念。
建议把版本写成:
text_embedding_v4_1024_20260614
至少包含:
- 模型名:
text-embedding-v4 - 维度:
1024 - 构建日期或构建号
为什么要这么做?
因为下面任何变化都会让向量不可混用:
- 模型从 v3 换到 v4。
- 维度从 1024 换到 512。
- 文本清洗规则变了。
- 语料范围变了。
向量库里保留 index_version,服务端通过环境变量切换当前版本,是最简单的回滚方案。
不要让 embedding 直接做最终判断
Embedding 召回的是“语义上可能接近”的候选,不是事实结论。
比如用户搜:
insulated cup
向量召回可能返回:
stanley cup
tumbler with lid
travel mug
coffee thermos
这些候选很有用,但最终还要回业务库查:
- 真实搜索量
- 价格
- 评论数
- 类目风险
- 品牌/IP 风险
- 库存或上架状态
一个稳的系统应该是:
embedding 负责召回
业务数据库负责判断
规则和证据负责解释
常见坑
1. 把 key 写进代码
不要。
只用:
export DASHSCOPE_API_KEY="<your-dashscope-api-key>"
代码里只读:
os.environ["DASHSCOPE_API_KEY"]
2. 忘记校验维度
如果表是 vector(1024),返回向量就必须是 1024 维。
if len(vector) != 1024:
raise ValueError(f"bad dimension: {len(vector)}")
3. 不做文本去重
相同文本重复向量化,浪费钱,也浪费存储。
4. 不保存原文和来源
只存向量不存 payload,后面没法解释结果。至少保存:
text
text_hash
source
entity_type
entity_key
as_of_date
5. 用在线 embedding 扛全量构建
在线接口适合 query,不适合几百万条语料一条条实时跑。大语料应该走离线批处理。
一个最小可用目录结构
semantic/
export_texts.py # 从业务库导出文本
prepare_embedding_batch.py # 去重并生成请求
poll_embedding_results.py # 下载结果
load_vectors.py # 写入 pgvector
search.py # 在线 query embedding + topK
数据目录:
/data/semantic_staging/current/text_embedding_input.jsonl
/data/semantic_batches/text_embedding_v4_1024/requests_000001.jsonl
/data/semantic_batches/text_embedding_v4_1024/requests_000001.map.jsonl
/data/semantic_vectors/text_embedding_v4_1024/output_000001.jsonl
这套结构的好处是每一步都能重跑:
- 导出错了,重导出。
- 请求文件错了,重生成。
- 远程结果没下完,继续 poll。
- 入库失败,按 batch 重放。
成本估算示例
假设你有 100 万条短文本,每条平均 8 个 Token:
总 Token ≈ 800 万
同步费用 ≈ 8000 × 0.0005 = 4 元
Batch 费用 ≈ 8000 × 0.00025 = 2 元
如果每条平均 20 个 Token:
总 Token ≈ 2000 万
同步费用 ≈ 20000 × 0.0005 = 10 元
Batch 费用 ≈ 20000 × 0.00025 = 5 元
这个量级已经很便宜了。很多时候,向量库机器、索引构建时间、数据清洗成本,会比 embedding 调用费更值得关注。
参考实现片段
下面是一个可以直接改造的最小封装:
import os
import time
from collections.abc import Iterable
from openai import OpenAI
MODEL = "text-embedding-v4"
DIMENSIONS = 1024
MAX_BATCH_SIZE = 10
def make_client() -> OpenAI:
return OpenAI(
api_key=os.environ["DASHSCOPE_API_KEY"],
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
timeout=30,
max_retries=2,
)
def chunks(items: list[str], size: int) -> Iterable[list[str]]:
for start in range(0, len(items), size):
yield items[start : start + size]
def embed_texts(texts: list[str]) -> list[list[float]]:
client = make_client()
vectors: list[list[float]] = []
for batch in chunks(texts, MAX_BATCH_SIZE):
response = client.embeddings.create(
model=MODEL,
input=batch,
dimensions=DIMENSIONS,
encoding_format="float",
)
ordered = sorted(response.data, key=lambda item: item.index)
for item in ordered:
vector = [float(value) for value in item.embedding]
if len(vector) != DIMENSIONS:
raise RuntimeError(f"embedding dimension mismatch: {len(vector)}")
vectors.append(vector)
time.sleep(0.05)
return vectors
这个封装没有做复杂限流,但足够表达接入方式。生产里可以把失败请求写入队列,再异步重试。
总结
text-embedding-v4 的接入成本很低,工程收益很高。
它最适合做:
- 搜索召回扩展
- 相似文本匹配
- RAG 片段召回
- 商品标题/关键词聚类
- 用户 query 到业务实体的语义桥接
我最推荐的落地方式是:
固定语料离线批量向量化
实时 query 单独向量化
向量只做召回
事实判断回业务库
API Key 永远不进代码
这样系统既便宜,又稳定,还方便解释和回滚。
参考资料
- 阿里云百炼:向量化模型说明
https://help.aliyun.com/zh/model-studio/embedding - 阿里云百炼:通用文本向量同步接口 API 详情
https://help.aliyun.com/zh/model-studio/text-embedding-synchronous-api - 阿里云百炼:OpenAI 兼容 Batch(文件输入)
https://help.aliyun.com/zh/model-studio/batch-interfaces-compatible-with-openai/ - 阿里云百炼:模型价格
https://help.aliyun.com/zh/model-studio/model-pricing
陕公网安备61011302002223号