用阿里云 text-embedding-v4 搭一个便宜好用的语义召回层

很多系统一开始都靠关键词匹配。

用户搜“车载腰靠”,数据库里有“汽车腰枕”“lumbar support pillow for car”,如果只做 LIKE 或倒排词,召回很容易断掉。Embedding 解决的是这个问题:把文本变成向量,让“意思接近”的内容在向量空间里靠近。

阿里云百炼里的 text-embedding-v4 很适合做这件事。它接入简单,兼容 OpenAI 风格接口,价格也低,适合拿来做搜索召回、RAG 知识库、商品词聚类、类目匹配、相似标题推荐。

本文只讲一件事:怎么把 text-embedding-v4 接进自己的系统。

一句话结论

如果你要给文本做语义召回,可以这样设计:

业务文本
  -> 清洗/去重
  -> text-embedding-v4
  -> 向量库:pgvector / Milvus / Elasticsearch vector / Hologres / Lindorm
  -> 查询文本也向量化
  -> cosine similarity 召回候选
  -> 再回业务库拿事实数据

关键原则:

  • 文档、商品、关键词这类固定语料,尽量离线批量向量化。
  • 用户 query 这类实时输入,再在线向量化。
  • Embedding 只负责“召回候选”,不要让它直接替代业务事实判断。
  • API Key 只放环境变量,永远不要写进代码、日志、博客或 Git。

也就是说,实际落地时有两条路:

实时接口:用户 query、小批量测试、调试验证
离线文件:大规模语料、全量重建、批量补向量

这两条路都走同一个模型,但工程姿态完全不同。实时接口追求低延迟;离线文件追求低成本、可重跑、可恢复。

为什么选 text-embedding-v4

阿里云官方文档推荐纯文本或代码场景使用 text-embedding-v4。它属于 Qwen3-Embedding 系列,支持多种维度,也支持中文、英文和多语言文本。

常用参数:

model: text-embedding-v4
dimensions: 1024
encoding_format: float

维度可以选:

2048, 1536, 1024, 768, 512, 256, 128, 64

实际项目里我一般先用 1024。原因很简单:质量和存储成本比较平衡,pgvector、Milvus、HNSW/IVFFlat 这些索引也都比较好处理。

如果你的场景是:

  • 商品标题、关键词、搜索词召回
  • 知识库片段召回
  • FAQ 相似问题
  • 用户意图近似匹配
  • 文本聚类和主题发现

1024 维通常是一个稳妥起点。

成本真的很低

按阿里云官方文档里中国大陆的当前价格口径,text-embedding-v4 是:

同步调用:0.0005 元 / 千输入 Token
Batch 调用:0.00025 元 / 千输入 Token

换算一下:

100 万 Token 同步向量化 ≈ 0.5 元
100 万 Token Batch 向量化 ≈ 0.25 元

这就是为什么 embedding 适合做大规模离线预处理。几十万条短文本、几百万条关键词,成本通常不是主要矛盾。真正要认真设计的是:

  • 文本怎么去重,避免重复花钱。
  • 向量怎么分版本,方便重建和回滚。
  • 查询时怎么缓存,避免同一个 query 反复调用。
  • 向量召回后怎么回业务库做二次校验。

价格以阿里云官方页面为准;本文写作时查询日期是 2026-06-14。

申请和保存 API Key

开通阿里云百炼后,在控制台创建 API Key。

本地和服务端都建议只用环境变量:

export DASHSCOPE_API_KEY="<your-dashscope-api-key>"

不要这样做:

api_key = "真实 key"

也不要把 key 写进:

  • .py
  • .ipynb
  • .env 公开样例
  • README
  • 博客截图
  • CI 日志
  • 异常日志

如果项目里需要给别人一个示例,用占位符:

DASHSCOPE_API_KEY=replace-me

最小 Python 示例

DashScope 提供 OpenAI-compatible 接口,所以可以直接用 OpenAI Python SDK。

安装:

pip install openai

代码:

import os
from openai import OpenAI


client = OpenAI(
    api_key=os.environ["DASHSCOPE_API_KEY"],
    base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
)

response = client.embeddings.create(
    model="text-embedding-v4",
    input=[
        "车载腰靠",
        "汽车腰枕",
        "lumbar support pillow for car",
    ],
    dimensions=1024,
    encoding_format="float",
)

for item in response.data:
    print(item.index, len(item.embedding), item.embedding[:5])

你会得到一个 1024 维的 float 数组。这个数组就是后续写入向量库的内容。

curl 示例

不想先装 SDK,也可以直接调 HTTP:

curl --location 'https://dashscope.aliyuncs.com/compatible-mode/v1/embeddings' \
  --header "Authorization: Bearer ${DASHSCOPE_API_KEY}" \
  --header 'Content-Type: application/json' \
  --data '{
    "model": "text-embedding-v4",
    "input": [
      "车载腰靠",
      "汽车腰枕",
      "lumbar support pillow for car"
    ],
    "dimensions": 1024,
    "encoding_format": "float"
  }'

注意这里没有把 key 写死在命令里,而是读取环境变量。

批量调用限制

text-embedding-v4 的常用限制:

单次最多 10 条文本
每条最长 8192 Token

所以批量处理时,不要把几万条文本一次塞进一个请求。更稳的做法是每 10 条一组:

from collections.abc import Iterable


def chunks(items: list[str], size: int) -> Iterable[list[str]]:
    for start in range(0, len(items), size):
        yield items[start : start + size]


def embed_texts(texts: list[str]) -> list[list[float]]:
    vectors: list[list[float]] = []
    for batch in chunks(texts, 10):
        response = client.embeddings.create(
            model="text-embedding-v4",
            input=batch,
            dimensions=1024,
            encoding_format="float",
        )
        ordered = sorted(response.data, key=lambda item: item.index)
        vectors.extend([item.embedding for item in ordered])
    return vectors

生产环境里再加上:

  • 超时
  • 重试
  • 限流
  • 失败队列
  • 请求日志脱敏
  • 返回向量维度校验

文本先去重,再花钱

Embedding 是按输入 Token 计费。哪怕很便宜,也没必要重复向量化同一段文本。

建议先做 canonical text:

import hashlib
import re


WHITESPACE_RE = re.compile(r"\s+")


def canonical_text(text: str) -> str:
    return WHITESPACE_RE.sub(" ", text.strip()).casefold()


def text_hash(text: str) -> str:
    return hashlib.sha256(canonical_text(text).encode("utf-8")).hexdigest()

然后用 text_hash 去重:

seen: set[str] = set()
deduped: list[dict[str, str]] = []

for row in rows:
    text = row["text"]
    digest = text_hash(text)
    if digest in seen:
        continue
    seen.add(digest)
    deduped.append({"text_hash": digest, "text": text})

这样做有三个好处:

  • 少花钱。
  • 少存向量。
  • 后续重建索引时可以用 text_hash 做稳定主键。

推荐架构:离线语料 + 在线 query

不要每次用户搜索时都把整个库重新向量化。

正确姿势是:

离线阶段:
  商品标题 / 关键词 / 文档片段
  -> 去重
  -> 批量 embedding
  -> 写向量库

在线阶段:
  用户 query
  -> embedding 一次
  -> 向量库 topK
  -> 回业务库拿真实指标
  -> 排序/过滤/解释

这套架构成本低、延迟可控,也方便回滚。

pgvector 落库示例

如果你用 Postgres,可以装 pgvector。

建表:

CREATE EXTENSION IF NOT EXISTS vector;

CREATE TABLE semantic_text_vectors (
    index_version text NOT NULL,
    text_hash text NOT NULL,
    text text NOT NULL,
    embedding vector(1024) NOT NULL,
    source text,
    payload_json jsonb NOT NULL DEFAULT '{}'::jsonb,
    created_at timestamptz NOT NULL DEFAULT now(),
    PRIMARY KEY (index_version, text_hash)
);

写入时保存三个东西:

  • text_hash:稳定去重主键。
  • embedding1024 维向量。
  • payload_json:业务侧需要回查的字段,比如商品 ID、类目、来源表、数据日期。

查询相似文本:

SELECT
    text_hash,
    text,
    1 - (embedding <=> $1::vector) AS score,
    payload_json
FROM semantic_text_vectors
WHERE index_version = 'text_embedding_v4_1024'
ORDER BY embedding <=> $1::vector
LIMIT 20;

<=> 是 cosine distance。1 - distance 可以粗略当成相似分。

索引可以先用 IVFFlat:

CREATE INDEX idx_semantic_text_vectors_ivfflat
ON semantic_text_vectors
USING ivfflat (embedding vector_cosine_ops)
WITH (lists = 2000)
WHERE index_version = 'text_embedding_v4_1024';

查询前可以调 probes:

SET ivfflat.probes = 20;

数据量很小时,不建 ANN 索引也能跑;数据上百万以后,索引和分区策略就要认真设计。

离线 Batch 文件这条路

大语料不要靠实时接口一条条打。阿里云百炼支持 OpenAI-compatible Batch File API,流程是:

准备 JSONL 请求文件
  -> 上传文件,拿 upload_file_id
  -> 创建 Batch 任务,拿 batch_id
  -> 轮询 batch_id 状态
  -> completed 后拿 output_file_id
  -> 下载 output_*.jsonl
  -> 用 map 文件把结果映射回 text_hash
  -> 写向量库

官方这条 Batch 路线的费用是实时调用的 50%,所以大规模语料基本应该走它。

我们真实跑过的一次结构大概是:

model: text-embedding-v4
dimensions: 1024
batch_count: 71
total_requests: 352,528
total_texts: 3,525,271
每个请求最多 10 条文本
每个请求文件最多 5,000 行请求

也就是一个 requests_000001.jsonl 通常装 5,000 个请求、约 50,000 条文本。每个请求文件旁边都有一个同编号的 map 文件。

这次真实耗时也要写清楚。下面时间按北京时间展示:

2026-06-05 15:29:51  第一批 batch 提交
2026-06-05 15:31:03  最后一批 batch 提交
2026-06-05 16:06:35  第一批云上完成
2026-06-05 16:51:04  最后一批云上完成
2026-06-05 16:07:35  第一份 output 文件下载完成
2026-06-05 17:01:45  最后一份 output 文件下载完成

换成更直观的口径:

71 个 batch 全部提交完成:约 1 分 12 秒
云上第一批跑完:从第一批提交算,约 36 分 44 秒
云上全部跑完:从第一批提交算,约 1 小时 21 分 13 秒
全部 output 下载完成:从第一批提交算,约 1 小时 31 分 54 秒

按单个 batch 看,云上运行时间大概是:

最快:36 分 44 秒
平均:59 分 37 秒
最慢:1 小时 20 分 01 秒

下载不是主要瓶颈。真实等待的大头是阿里云 Batch 在云上处理那一段,下载只是最后把已经生成好的 output_file_id 拉回本地。我们这次单个 batch 从云上完成到本地下载完成,最快约 40 秒,平均约 3 分钟,最慢约 11 分 11 秒。

1. 准备请求文件

先把业务文本导出成统一输入:

text_hash
text
text_char_len
first_source
first_entity_type
first_entity_key

再按 10 条文本一组生成 Batch 请求文件:

{"custom_id":"emb-000000001","method":"POST","url":"/v1/embeddings","body":{"model":"text-embedding-v4","input":["车载腰靠","汽车腰枕","lumbar support pillow for car"],"dimensions":1024,"encoding_format":"float"}}

注意几点:

  • url/v1/embeddings
  • body.modeltext-embedding-v4
  • dimensions 和后面的向量库表结构保持一致。
  • custom_id 必须稳定,后面下载结果要靠它对齐。
  • 每行是一个完整 JSON,不是一个 JSON 数组。

同时生成一个 map 文件:

{"custom_id":"emb-000000001","request_index":1,"batch_index":1,"items":[{"response_index":0,"text_global_index":1,"text_hash":"<sha256>","text_char_len":4,"first_source":"keyword","first_entity_type":"keyword","first_entity_key":"车载腰靠"},{"response_index":1,"text_global_index":2,"text_hash":"<sha256>","text_char_len":4,"first_source":"keyword","first_entity_type":"keyword","first_entity_key":"汽车腰枕"}]}

这个 map 文件是离线 Batch 的保险绳。下载回来的结果只保证有 custom_iddata[].index,你需要用:

custom_id + response_index -> text_hash

把向量精确挂回自己的文本和业务主键。

2. 本地目录结构

推荐这样放:

/data/semantic_batches/text_embedding_v4_1024/
  manifest.json
  batch_state.json
  requests_000001.jsonl
  requests_000001.map.jsonl
  requests_000002.jsonl
  requests_000002.map.jsonl

/data/semantic_vectors/text_embedding_v4_1024/
  output_000001.jsonl
  output_000002.jsonl
  error_000002.jsonl

manifest.json 记录本地生成物:

{
  "provider": "dashscope",
  "api_mode": "openai_compatible_batch",
  "base_url": "https://dashscope.aliyuncs.com/compatible-mode/v1",
  "endpoint": "/v1/embeddings",
  "model": "text-embedding-v4",
  "dimensions": 1024,
  "batch_count": 71,
  "total_requests": 352528,
  "total_texts": 3525271
}

batch_state.json 记录远程任务状态。真实 batch_idupload_file_idoutput_file_id 不要写进博客或公开仓库,下面是脱敏结构:

{
  "provider": "dashscope",
  "api_mode": "openai_compatible_batch",
  "base_url": "https://dashscope.aliyuncs.com/compatible-mode/v1",
  "model": "text-embedding-v4",
  "dimensions": 1024,
  "batches": [
    {
      "batch_index": 1,
      "request_file": "/data/semantic_batches/text_embedding_v4_1024/requests_000001.jsonl",
      "map_file": "/data/semantic_batches/text_embedding_v4_1024/requests_000001.map.jsonl",
      "request_count": 5000,
      "text_count": 50000,
      "upload_file_id": "<file-batch-redacted>",
      "batch_id": "<batch-redacted>",
      "status": "completed",
      "output_file_id": "<file-output-redacted>",
      "output_path": "/data/semantic_vectors/text_embedding_v4_1024/output_000001.jsonl"
    }
  ]
}

3. 提交 Batch 任务

下面这个脚本做三件事:

  1. 读取 manifest.json
  2. 逐个上传 requests_*.jsonl
  3. 创建 Batch 任务,并把远程 ID 写入 batch_state.json
#!/usr/bin/env python3
from __future__ import annotations

import argparse
import hashlib
import json
import os
import time
from pathlib import Path
from typing import Any

from openai import OpenAI


def sha256_path(path: Path) -> str:
    digest = hashlib.sha256()
    with path.open("rb") as handle:
        for chunk in iter(lambda: handle.read(1024 * 1024), b""):
            digest.update(chunk)
    return digest.hexdigest()


def save_json(path: Path, payload: dict[str, Any]) -> None:
    tmp = path.with_suffix(path.suffix + ".tmp")
    tmp.write_text(json.dumps(payload, ensure_ascii=False, indent=2, sort_keys=True) + "\n", encoding="utf-8")
    tmp.replace(path)


def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser()
    parser.add_argument("--batch-dir", type=Path, required=True)
    parser.add_argument("--base-url", default="https://dashscope.aliyuncs.com/compatible-mode/v1")
    parser.add_argument("--completion-window", default="24h")
    parser.add_argument("--sleep-seconds", type=float, default=0.2)
    parser.add_argument("--resume", action="store_true")
    return parser.parse_args()


def main() -> None:
    args = parse_args()
    api_key = os.environ.get("DASHSCOPE_API_KEY")
    if not api_key:
        raise SystemExit("DASHSCOPE_API_KEY is required")

    manifest_path = args.batch_dir / "manifest.json"
    state_path = args.batch_dir / "batch_state.json"
    manifest = json.loads(manifest_path.read_text(encoding="utf-8"))

    if args.resume and state_path.exists():
        state = json.loads(state_path.read_text(encoding="utf-8"))
        done_indexes = {int(row["batch_index"]) for row in state.get("batches", []) if row.get("batch_id")}
    else:
        state = {
            "version": 1,
            "provider": "dashscope",
            "api_mode": "openai_compatible_batch",
            "base_url": args.base_url,
            "model": manifest["model"],
            "dimensions": manifest["dimensions"],
            "created_at": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
            "batches": [],
        }
        done_indexes = set()

    client = OpenAI(api_key=api_key, base_url=args.base_url, timeout=300, max_retries=3)

    for batch_meta in manifest["batches"]:
        batch_index = int(batch_meta["batch_index"])
        if batch_index in done_indexes:
            continue

        request_path = Path(batch_meta["request_file"]["path"])
        map_path = Path(batch_meta["map_file"]["path"])

        with request_path.open("rb") as handle:
            uploaded = client.files.create(file=handle, purpose="batch")

        batch = client.batches.create(
            input_file_id=uploaded.id,
            endpoint="/v1/embeddings",
            completion_window=args.completion_window,
            metadata={
                "ds_name": f"text-embedding-v4-{batch_index:06d}",
                "batch_index": str(batch_index),
            },
        )

        entry = {
            "batch_index": batch_index,
            "request_file": str(request_path),
            "map_file": str(map_path),
            "request_sha256": sha256_path(request_path),
            "request_count": batch_meta["request_count"],
            "text_count": batch_meta["text_count"],
            "upload_file_id": uploaded.id,
            "uploaded_at": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
            "batch_id": batch.id,
            "submitted_at": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
            "status": getattr(batch, "status", "submitted"),
            "output_file_id": None,
            "error_file_id": None,
            "output_path": None,
        }
        state["batches"].append(entry)
        save_json(state_path, state)
        print(json.dumps({"batch_index": batch_index, "batch_id": batch.id, "status": entry["status"]}, ensure_ascii=False), flush=True)
        time.sleep(args.sleep_seconds)


if __name__ == "__main__":
    main()

运行:

export DASHSCOPE_API_KEY="<your-dashscope-api-key>"

python3 submit_dashscope_embedding_batches.py \
  --batch-dir /data/semantic_batches/text_embedding_v4_1024 \
  --completion-window 24h \
  --resume

注意:batch_state.json 里会有远程任务 ID。它可以留在内网机器上,但不要提交到公开仓库。

4. 轮询等待并下载结果

提交完成后,不要人肉刷新控制台。写一个 poller:读取 batch_state.json,循环查每个 batch_id

状态流转一般是:

validating -> in_progress -> finalizing -> completed

也可能失败:

failed / expired / cancelled

轮询逻辑:

from collections import Counter
from pathlib import Path
import hashlib
import json
import os
import time

from openai import OpenAI


TERMINAL_STATUSES = {"completed", "failed", "expired", "cancelled", "canceled"}


def sha256_path(path: Path) -> str:
    digest = hashlib.sha256()
    with path.open("rb") as handle:
        for chunk in iter(lambda: handle.read(1024 * 1024), b""):
            digest.update(chunk)
    return digest.hexdigest()


def save_state(path: Path, state: dict) -> None:
    tmp = path.with_suffix(".json.tmp")
    tmp.write_text(json.dumps(state, ensure_ascii=False, indent=2, sort_keys=True) + "\n", encoding="utf-8")
    tmp.replace(path)


def download_file(client: OpenAI, file_id: str, path: Path) -> None:
    tmp = path.with_suffix(path.suffix + ".tmp")
    content = client.files.content(file_id)
    content.write_to_file(tmp)
    tmp.replace(path)


def poll_once(client: OpenAI, state_path: Path, output_dir: Path) -> dict:
    state = json.loads(state_path.read_text(encoding="utf-8"))
    output_dir.mkdir(parents=True, exist_ok=True)

    counts: Counter[str] = Counter()
    now = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())

    for entry in state["batches"]:
        batch_index = int(entry["batch_index"])
        batch = client.batches.retrieve(entry["batch_id"])
        status = getattr(batch, "status", None) or "unknown"
        output_file_id = getattr(batch, "output_file_id", None)
        error_file_id = getattr(batch, "error_file_id", None)

        entry["status"] = status
        entry["last_checked_at"] = now
        entry["output_file_id"] = output_file_id
        entry["error_file_id"] = error_file_id
        counts[status] += 1

        if status == "completed" and output_file_id and not entry.get("output_path"):
            out_path = output_dir / f"output_{batch_index:06d}.jsonl"
            download_file(client, output_file_id, out_path)
            entry["output_path"] = str(out_path)
            entry["output_bytes"] = out_path.stat().st_size
            entry["output_sha256"] = sha256_path(out_path)
            entry["output_downloaded_at"] = now
            save_state(state_path, state)

        if error_file_id and not entry.get("error_path"):
            err_path = output_dir / f"error_{batch_index:06d}.jsonl"
            download_file(client, error_file_id, err_path)
            entry["error_path"] = str(err_path)
            entry["error_bytes"] = err_path.stat().st_size
            entry["error_sha256"] = sha256_path(err_path)
            save_state(state_path, state)

    save_state(state_path, state)
    return {"counts": dict(counts)}

完整命令可以这样跑:

python3 poll_dashscope_embedding_batches.py \
  --batch-dir /data/semantic_batches/text_embedding_v4_1024 \
  --output-dir /data/semantic_vectors/text_embedding_v4_1024 \
  --interval-seconds 60

我们本地真实跑法里,poller 每轮会打印类似:

{"checked_at":"2026-06-05T09:00:07Z","counts":{"completed":71},"downloaded":71,"newly_downloaded":0}

5. 下载结果长什么样

output_000001.jsonl 也是 JSONL。每行对应一个 custom_id

{"custom_id":"emb-000000001","response":{"status_code":200,"body":{"data":[{"index":0,"embedding":[0.0123,-0.0456,0.0789]},{"index":1,"embedding":[0.0234,-0.0567,0.0891]}],"usage":{"prompt_tokens":67,"total_tokens":67},"model":"text-embedding-v4"}},"error":null}

真实向量会有 1024 个 float。博客里不要贴完整向量,没意义,也很占篇幅。

入库时按这个关系恢复:

output row.custom_id
  -> requests_000001.map.jsonl 里同 custom_id
  -> data[].index 对应 map.items[].response_index
  -> 拿到 text_hash / entity_key / source
  -> 校验 embedding 维度
  -> upsert 到向量库

核心解析逻辑:

for row in read_jsonl(output_path):
    custom_id = row["custom_id"]
    body = row["response"]["body"]
    data = body["data"]
    map_items = mapping[custom_id]
    items_by_index = {item["response_index"]: item for item in map_items}

    for embedding_item in data:
        response_index = embedding_item["index"]
        map_item = items_by_index[response_index]
        vector = embedding_item["embedding"]
        if len(vector) != 1024:
            raise ValueError(f"bad vector dimension: {len(vector)}")
        upsert_vector(
            text_hash=map_item["text_hash"],
            vector=vector,
            payload=map_item,
        )

6. 失败和续跑

这条离线路径一定要支持续跑。

建议规则:

  • batch_state.json 每提交成功一个 batch 就立刻落盘。
  • 下载 output 也要先写 .tmp,完成后再 rename。
  • failed 的 batch 不要自动覆盖,先看错误文件。
  • completed 且已有 output_path 的 batch 跳过。
  • 入库时按 index_version + text_hash 做 upsert。

这样即使中间断网、进程挂掉、机器重启,也不会从头再来。

在线 query 要缓存

用户搜索词会重复出现。比如:

car seat cushion
seat cushion for car
lumbar support pillow

这些 query 不应该每次都实时调用 embedding。

可以做一个简单缓存:

cache_key = sha256(model + dimensions + normalized_query)
ttl = 7 天或 30 天
value = query embedding / topK result

如果你的业务数据更新频繁,缓存 topK 结果时要把 index_version 放进 cache key:

text-embedding-v4:1024:semantic_index_20260614:<query_hash>

这样换索引版本时,旧缓存不会污染新结果。

版本管理

Embedding 一旦落库,就要有版本概念。

建议把版本写成:

text_embedding_v4_1024_20260614

至少包含:

  • 模型名:text-embedding-v4
  • 维度:1024
  • 构建日期或构建号

为什么要这么做?

因为下面任何变化都会让向量不可混用:

  • 模型从 v3 换到 v4。
  • 维度从 1024 换到 512。
  • 文本清洗规则变了。
  • 语料范围变了。

向量库里保留 index_version,服务端通过环境变量切换当前版本,是最简单的回滚方案。

不要让 embedding 直接做最终判断

Embedding 召回的是“语义上可能接近”的候选,不是事实结论。

比如用户搜:

insulated cup

向量召回可能返回:

stanley cup
tumbler with lid
travel mug
coffee thermos

这些候选很有用,但最终还要回业务库查:

  • 真实搜索量
  • 价格
  • 评论数
  • 类目风险
  • 品牌/IP 风险
  • 库存或上架状态

一个稳的系统应该是:

embedding 负责召回
业务数据库负责判断
规则和证据负责解释

常见坑

1. 把 key 写进代码

不要。

只用:

export DASHSCOPE_API_KEY="<your-dashscope-api-key>"

代码里只读:

os.environ["DASHSCOPE_API_KEY"]

2. 忘记校验维度

如果表是 vector(1024),返回向量就必须是 1024 维。

if len(vector) != 1024:
    raise ValueError(f"bad dimension: {len(vector)}")

3. 不做文本去重

相同文本重复向量化,浪费钱,也浪费存储。

4. 不保存原文和来源

只存向量不存 payload,后面没法解释结果。至少保存:

text
text_hash
source
entity_type
entity_key
as_of_date

5. 用在线 embedding 扛全量构建

在线接口适合 query,不适合几百万条语料一条条实时跑。大语料应该走离线批处理。

一个最小可用目录结构

semantic/
  export_texts.py              # 从业务库导出文本
  prepare_embedding_batch.py    # 去重并生成请求
  poll_embedding_results.py     # 下载结果
  load_vectors.py               # 写入 pgvector
  search.py                     # 在线 query embedding + topK

数据目录:

/data/semantic_staging/current/text_embedding_input.jsonl
/data/semantic_batches/text_embedding_v4_1024/requests_000001.jsonl
/data/semantic_batches/text_embedding_v4_1024/requests_000001.map.jsonl
/data/semantic_vectors/text_embedding_v4_1024/output_000001.jsonl

这套结构的好处是每一步都能重跑:

  • 导出错了,重导出。
  • 请求文件错了,重生成。
  • 远程结果没下完,继续 poll。
  • 入库失败,按 batch 重放。

成本估算示例

假设你有 100 万条短文本,每条平均 8 个 Token:

总 Token ≈ 800 万
同步费用 ≈ 8000 × 0.0005 = 4 元
Batch 费用 ≈ 8000 × 0.00025 = 2 元

如果每条平均 20 个 Token:

总 Token ≈ 2000 万
同步费用 ≈ 20000 × 0.0005 = 10 元
Batch 费用 ≈ 20000 × 0.00025 = 5 元

这个量级已经很便宜了。很多时候,向量库机器、索引构建时间、数据清洗成本,会比 embedding 调用费更值得关注。

参考实现片段

下面是一个可以直接改造的最小封装:

import os
import time
from collections.abc import Iterable
from openai import OpenAI


MODEL = "text-embedding-v4"
DIMENSIONS = 1024
MAX_BATCH_SIZE = 10


def make_client() -> OpenAI:
    return OpenAI(
        api_key=os.environ["DASHSCOPE_API_KEY"],
        base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
        timeout=30,
        max_retries=2,
    )


def chunks(items: list[str], size: int) -> Iterable[list[str]]:
    for start in range(0, len(items), size):
        yield items[start : start + size]


def embed_texts(texts: list[str]) -> list[list[float]]:
    client = make_client()
    vectors: list[list[float]] = []

    for batch in chunks(texts, MAX_BATCH_SIZE):
        response = client.embeddings.create(
            model=MODEL,
            input=batch,
            dimensions=DIMENSIONS,
            encoding_format="float",
        )
        ordered = sorted(response.data, key=lambda item: item.index)
        for item in ordered:
            vector = [float(value) for value in item.embedding]
            if len(vector) != DIMENSIONS:
                raise RuntimeError(f"embedding dimension mismatch: {len(vector)}")
            vectors.append(vector)
        time.sleep(0.05)

    return vectors

这个封装没有做复杂限流,但足够表达接入方式。生产里可以把失败请求写入队列,再异步重试。

总结

text-embedding-v4 的接入成本很低,工程收益很高。

它最适合做:

  • 搜索召回扩展
  • 相似文本匹配
  • RAG 片段召回
  • 商品标题/关键词聚类
  • 用户 query 到业务实体的语义桥接

我最推荐的落地方式是:

固定语料离线批量向量化
实时 query 单独向量化
向量只做召回
事实判断回业务库
API Key 永远不进代码

这样系统既便宜,又稳定,还方便解释和回滚。

参考资料

Read more

一个带进度条的 tar.gz 多核解压脚本

大文件解压这件事,平时看起来很小,真遇到几十 GB 的 tar.gz 包时就会变得很烦。 最常见的命令是: tar -xzf archive.tar.gz -C output/ 它能用,但有几个问题: * gzip 解压基本是单核,机器有很多核也用不上。 * 没有进度条,不知道还要跑多久。 * 目标目录已经存在时容易把新旧文件混在一起。 * 脚本化重跑时,参数和目录约定容易写散。 所以我写了一个小脚本:extract.sh。它不是为了炫技,而是把一次大包解压里最容易踩坑的地方都收起来。 它解决什么问题 这个脚本做的是一件很具体的事: 把 .tar.gz 或 .tgz 文件解压到普通目录,同时显示百分比、速度和 ETA;如果机器上有 pigz,自动使用多核解压。 典型用法: bash extract.sh archive.

By ladydd

三台机器部署 ClickHouse 高可用集群实战记录

本文是一份可发布版部署记录。真实 IP、域名、账号、密码、下载链接、业务目录名、机器唯一标识等敏感信息已经替换为占位符。命令中的 <...> 需要按自己的环境替换。 目标与拓扑 这次目标是用三台数据节点部署一套 ClickHouse 高可用集群,拓扑采用: 1 shard x 3 replicas 含义是:集群只有一个逻辑分片,三台机器都保存同一份数据的完整副本。任意一台数据节点宕机时,只要 ClickHouse Keeper 仍然有多数派,剩余节点仍可继续提供读写服务。 规划节点如下: 主机名示例地址角色ch-01<ch-01-ip>ClickHouse Server + ClickHouse Keeperch-02<ch-02-ip>ClickHouse Server + ClickHouse Keeperch-03<ch-03-ip&

By ladydd

折腾记(二):接入火山引擎实时语音 API,家庭语音助手体验直接拉满

接上篇 上一篇用全开源组件(Whisper + Hermes + Edge-TTS)搭了个语音助手,能跑,但体验就是"能用"二字: * 中文识别只有 70 分,方言基本歇菜 * 英文唤醒词"Alexa"喊着别扭 * 说完到回复要等 4-8 秒 * 它说话的时候你插不了嘴 这些问题靠堆开源组件很难根治。于是我去试了火山引擎(字节跳动)的语音服务,结果直接换了条路。 这篇分两段:先讲怎么用火山引擎的 ASR/TTS 替换掉开源组件(小改),再讲怎么上端到端实时语音模型(大改)。 第一段:先把 ASR 和 TTS 换成火山引擎 为什么换 我用豆包输入法的时候发现它语音识别准得离谱。一查,豆包用的就是字节自家的火山引擎 Seed-ASR。开通后有免费额度(

By ladydd

折腾记(一):用全开源组件给家里搭一个语音助手,对接自己的 Hermes Agent

起因 事情是从一块 ESP32-S3 开发板开始的。 我手上有一块 Seeed Studio XIAO ESP32-S3 Sense,带摄像头和麦克风。最初的想法很美好:用这块板子做一个无线语音终端,对着它说话,连到我服务器上跑的 Hermes Agent(一个自托管的 AI agent),让它回答我。 但折腾到一半我突然意识到一件事:我的麦克风、音响、服务器全在家里,为什么要绕一圈用 ESP32?直接把麦克风和音响插到服务器上不就行了? ESP32 那条路(做无线拾音终端)当然也有价值,但那是"为了学嵌入式而学",不是解决问题的最短路径。于是这个项目就从"嵌入式项目"变成了"在服务器上拼一个语音助手"。这篇就记录后者。 教训零:先想清楚你要解决的是什么问题。很多时候最优解比你最初设想的简单得多。 目标

By ladydd
陕公网安备61011302002223号 | 陕ICP备2025083092号