一个带进度条的 tar.gz 多核解压脚本

大文件解压这件事,平时看起来很小,真遇到几十 GB 的 tar.gz 包时就会变得很烦。

最常见的命令是:

tar -xzf archive.tar.gz -C output/

它能用,但有几个问题:

  • gzip 解压基本是单核,机器有很多核也用不上。
  • 没有进度条,不知道还要跑多久。
  • 目标目录已经存在时容易把新旧文件混在一起。
  • 脚本化重跑时,参数和目录约定容易写散。

所以我写了一个小脚本:extract.sh。它不是为了炫技,而是把一次大包解压里最容易踩坑的地方都收起来。

它解决什么问题

这个脚本做的是一件很具体的事:

.tar.gz.tgz 文件解压到普通目录,同时显示百分比、速度和 ETA;如果机器上有 pigz,自动使用多核解压。

典型用法:

bash extract.sh archive.tar.gz output/

也可以不传压缩包路径。脚本会在自己所在目录找唯一一个 .tar.gz.tgz

bash extract.sh

如果目标目录已经存在,默认会停下来,避免把新文件混进旧目录:

错误:目标目录已存在:output/20260502
为避免覆盖或混入旧文件,本次停止。

确认要续跑或覆盖时,再显式加环境变量:

ALLOW_EXISTING=1 bash extract.sh archive.tar.gz output/

这个默认行为非常重要。大文件解压一旦混入旧数据,后面排查通常比重新跑还贵。

核心思路

脚本的核心管线是:

python3 progress_reader.py archive.tar.gz \
  | pigz -dc \
  | tar -xf - -C output/

如果没有安装 pigz,就自动降级为:

python3 progress_reader.py archive.tar.gz \
  | gzip -dc \
  | tar -xf - -C output/

这里分成三段:

  • Python 负责按块读取压缩包,并在 stderr 上刷新进度。
  • pigzgzip 负责把 gzip 流解开。
  • tar 负责把 tar 内容写到目标目录。

进度条统计的是压缩包读取进度,而不是解压后的文件体积。这个口径足够稳定,因为读完整个压缩包就代表解压输入已经消费完成。

为什么用 pigz

pigz 可以理解成并行版 gzip。对 .tar.gz 这种格式来说,普通 gzip -dc 经常只能吃一个 CPU 核心;pigz -dc -p N 可以用多个线程做解压,速度通常会明显更好。

脚本里线程数默认取机器 CPU 数:

PIGZ_THREADS="${PIGZ_THREADS:-$(nproc 2>/dev/null || echo 4)}"

如果想限制线程数,可以这样跑:

PIGZ_THREADS=8 bash extract.sh archive.tar.gz output/

这在生产机上很有用。解压很吃 IO 和 CPU,不一定应该把所有核都打满。

为什么不用 pv

pv 当然也能做进度:

pv archive.tar.gz | pigz -dc | tar -xf - -C output/

pv 不是所有机器都有。这个脚本把进度读取器写成内嵌 Python,依赖更少,只要有 python3targzip 就能跑;有 pigz 时自动加速,没有也不影响功能。

Python 进度器做的事情很简单:

with open(path, "rb") as f:
    while True:
        chunk = f.read(4 * 1024 * 1024)
        if not chunk:
            break
        stdout.write(chunk)
        # 每秒刷新一次百分比、速度、ETA

它把压缩包原样写到标准输出,所以后面的解压器完全不用知道前面发生了什么。

目录命名的小细节

脚本会根据压缩包名字推断预期顶层目录。

例如:

20260502.tar.gz  ->  output/20260502
20260502_xxx.tgz ->  output/20260502
foo.tar.gz       ->  output/foo

这段逻辑的目的不是替代 tar 自己的目录结构,而是在解压前先做一层安全检查:如果这个预期目录已经存在,脚本就先停下来。

ARCHIVE_NAME="$(basename -- "$ARCHIVE")"
EXPECTED_TOP="${ARCHIVE_NAME%.tar.gz}"
EXPECTED_TOP="${EXPECTED_TOP%.tgz}"
if [[ "$ARCHIVE_NAME" =~ ^([0-9]{8}) ]]; then
  EXPECTED_TOP="${BASH_REMATCH[1]}"
fi
TARGET_PATH="$DEST_DIR/$EXPECTED_TOP"

对按日期归档的数据包来说,这个约定很实用。

失败处理

脚本开头用了:

set -euo pipefail

这几个选项组合起来,能让脚本在出错时尽早失败:

  • -e:命令失败就退出。
  • -u:引用未定义变量就报错。
  • pipefail:管线中任意一段失败,整个管线都失败。

这对解压管线尤其关键。否则可能出现 pigz 失败了,但最后一段命令退出码看起来正常的情况。

完整使用示例

指定压缩包和目标目录:

bash extract.sh 20260502.tar.gz ./data/

使用 8 个线程:

PIGZ_THREADS=8 bash extract.sh 20260502.tar.gz ./data/

允许目标目录已存在:

ALLOW_EXISTING=1 bash extract.sh 20260502.tar.gz ./data/

把脚本放到数据包同目录,并自动选择唯一压缩包:

bash extract.sh

运行时会输出类似这样的信息:

归档: 20260502.tar.gz
大小: 55.00 GiB
目标: ./data/
预期顶层目录: ./data/20260502
解压器: pigz (32 threads)

解压过程中会刷新:

[###############---------------]  50.12%  27.56 GiB / 55.00 GiB  180.00 MiB/s  ETA 2m35s

结束后会打印耗时和输出目录大小。

完整脚本

下面是完整版本,可以保存为 extract.sh 后直接使用:

#!/usr/bin/env bash
# 解压 tar.gz 到普通文件夹,带进度显示(百分比 / 速度 / ETA)。
# 用 pigz 多核解压加速,用 python 封装读取做进度条。
#
# 用法:
#   ./extract.sh                          # 默认解压脚本同目录下唯一的 *.tar.gz
#   ./extract.sh /path/to/archive.tar.gz  # 指定压缩包
#   ./extract.sh archive.tar.gz DEST_DIR  # 指定压缩包 & 解压目标目录

set -euo pipefail

SCRIPT_DIR="$(cd -- "$(dirname -- "${BASH_SOURCE[0]}")" &>/dev/null && pwd)"

# ---------- 解析参数 ----------
ARCHIVE="${1:-}"
DEST_DIR="${2:-$SCRIPT_DIR}"

if [[ -z "$ARCHIVE" ]]; then
  # 自动挑选同目录下唯一的 .tar.gz
  mapfile -t CANDIDATES < <(find "$SCRIPT_DIR" -maxdepth 1 -type f \( -name '*.tar.gz' -o -name '*.tgz' \) | sort)
  if [[ ${#CANDIDATES[@]} -eq 0 ]]; then
    echo "错误:未找到 *.tar.gz,请显式传入路径。" >&2
    exit 1
  elif [[ ${#CANDIDATES[@]} -gt 1 ]]; then
    echo "同目录下存在多个归档,请指定一个:" >&2
    printf '  %s\n' "${CANDIDATES[@]}" >&2
    exit 1
  fi
  ARCHIVE="${CANDIDATES[0]}"
fi

if [[ ! -f "$ARCHIVE" ]]; then
  echo "错误:找不到文件 $ARCHIVE" >&2
  exit 1
fi
mkdir -p "$DEST_DIR"

ARCHIVE_NAME="$(basename -- "$ARCHIVE")"
EXPECTED_TOP="${ARCHIVE_NAME%.tar.gz}"
EXPECTED_TOP="${EXPECTED_TOP%.tgz}"
if [[ "$ARCHIVE_NAME" =~ ^([0-9]{8}) ]]; then
  EXPECTED_TOP="${BASH_REMATCH[1]}"
fi
TARGET_PATH="$DEST_DIR/$EXPECTED_TOP"

if [[ -e "$TARGET_PATH" && "${ALLOW_EXISTING:-0}" != "1" ]]; then
  echo "错误:目标目录已存在:$TARGET_PATH" >&2
  echo "为避免覆盖或混入旧文件,本次停止。" >&2
  echo "如确认要续跑/覆盖,可执行:ALLOW_EXISTING=1 $0 \"$ARCHIVE\" \"$DEST_DIR\"" >&2
  exit 1
fi

TOTAL_BYTES=$(stat -c%s "$ARCHIVE")
TOTAL_GB=$(awk -v b="$TOTAL_BYTES" 'BEGIN{printf "%.2f", b/1073741824}')
PIGZ_THREADS="${PIGZ_THREADS:-$(nproc 2>/dev/null || echo 4)}"

if command -v pigz >/dev/null; then
  DECOMP_LABEL="pigz (${PIGZ_THREADS} threads)"
else
  DECOMP_LABEL="gzip"
fi

echo "=============================================="
echo " 归档: $ARCHIVE"
echo " 大小: ${TOTAL_GB} GiB  (${TOTAL_BYTES} bytes)"
echo " 目标: $DEST_DIR"
echo " 预期顶层目录: $TARGET_PATH"
echo " 解压器: $DECOMP_LABEL"
echo " 开始时间: $(date '+%F %T')"
echo "=============================================="

# ---------- 选择解压工具 ----------
if command -v pigz >/dev/null; then
  DECOMP=(pigz -dc -p "$PIGZ_THREADS")
else
  DECOMP=(gzip -dc)
fi

# ---------- 进度器(内嵌 python)----------
# 从参数里的文件按 4 MiB 块读 -> stdout(管道给 pigz),
# 同时在 stderr 上每秒刷新一次进度。
PROGRESS_PY='
import os, sys, time
path = sys.argv[1]
total = os.path.getsize(path)
BUF = 4 * 1024 * 1024
read = 0
start = time.time()
last = 0.0
out = sys.stdout.buffer
err = sys.stderr
def fmt_size(n):
    for u in ("B","KiB","MiB","GiB","TiB"):
        if n < 1024: return f"{n:6.2f} {u}"
        n /= 1024
    return f"{n:.2f} PiB"
def fmt_eta(s):
    s = int(s)
    h, s = divmod(s, 3600)
    m, s = divmod(s, 60)
    if h: return f"{h:d}h{m:02d}m{s:02d}s"
    if m: return f"{m:d}m{s:02d}s"
    return f"{s:d}s"
with open(path, "rb") as f:
    while True:
        chunk = f.read(BUF)
        if not chunk:
            break
        out.write(chunk)
        read += len(chunk)
        now = time.time()
        if now - last >= 1.0 or read == total:
            elapsed = now - start
            speed = read / elapsed if elapsed > 0 else 0
            pct = read * 100.0 / total if total else 100.0
            eta = (total - read) / speed if speed > 0 else 0
            bar_w = 30
            filled = int(bar_w * pct / 100)
            bar = "#" * filled + "-" * (bar_w - filled)
            err.write(
                f"\r[{bar}] {pct:6.2f}%  "
                f"{fmt_size(read)} / {fmt_size(total)}  "
                f"{fmt_size(speed)}/s  ETA {fmt_eta(eta)}   "
            )
            err.flush()
            last = now
err.write("\n")
'

# ---------- 执行管线 ----------
# python 读压缩包 -> pigz 解压 -> tar 写入目标目录
# 用 pipefail 确保任意一环失败都能被捕获
START_TS=$(date +%s)

python3 -c "$PROGRESS_PY" "$ARCHIVE" \
  | "${DECOMP[@]}" \
  | tar -xf - -C "$DEST_DIR"

END_TS=$(date +%s)
ELAPSED=$((END_TS - START_TS))
H=$((ELAPSED/3600)); M=$(((ELAPSED%3600)/60)); S=$((ELAPSED%60))

echo "=============================================="
echo " 完成!耗时 ${H}h${M}m${S}s"
echo " 输出目录大小:"
du -sh "$DEST_DIR"/*/ 2>/dev/null | sed 's/^/   /' || true
echo "=============================================="

适合放在哪

这个脚本适合放在数据目录旁边,也适合放到项目的 scripts/ 目录里。

我的建议是:如果它服务于某类固定数据包,就放到这类数据的目录下;如果多个数据源都会用它,就放到项目根目录的 scripts/extract.sh,并在 README 里写一行标准命令。

还能继续增强什么

目前这个脚本已经覆盖了大文件解压里最关键的几个点。后续如果要继续增强,可以考虑:

  • 解压前校验 sha256sum
  • 支持 .tar.zst,用 zstd -dc 替代 gzip/pigz
  • 解压失败时自动提示是否清理半成品目录。
  • 增加 --strip-components 参数,处理不规范的包结构。
  • 解压前用 tar -tf 快速检查顶层目录是否符合预期。

但这些都不是第一优先级。对大多数场景来说,一个稳定、可重跑、有进度、有保护的解压脚本,已经能省掉很多等待和误操作。

总结

这个脚本的价值不在于某一行命令有多复杂,而在于它把大文件解压这件事变成了一个可预期的流程:

  • 有进度,不用盲等。
  • 有多核,速度更好。
  • 有默认保护,不轻易污染旧目录。
  • 有明确参数,适合写进数据导入流程。

很多工程效率不是靠大系统提升的,而是靠这种小工具把重复操作里的不确定性一点点拿掉。

Read more

用阿里云 text-embedding-v4 搭一个便宜好用的语义召回层

很多系统一开始都靠关键词匹配。 用户搜“车载腰靠”,数据库里有“汽车腰枕”“lumbar support pillow for car”,如果只做 LIKE 或倒排词,召回很容易断掉。Embedding 解决的是这个问题:把文本变成向量,让“意思接近”的内容在向量空间里靠近。 阿里云百炼里的 text-embedding-v4 很适合做这件事。它接入简单,兼容 OpenAI 风格接口,价格也低,适合拿来做搜索召回、RAG 知识库、商品词聚类、类目匹配、相似标题推荐。 本文只讲一件事:怎么把 text-embedding-v4 接进自己的系统。 一句话结论 如果你要给文本做语义召回,可以这样设计: 业务文本 -> 清洗/去重 -> text-embedding-v4 -&

By ladydd

三台机器部署 ClickHouse 高可用集群实战记录

本文是一份可发布版部署记录。真实 IP、域名、账号、密码、下载链接、业务目录名、机器唯一标识等敏感信息已经替换为占位符。命令中的 <...> 需要按自己的环境替换。 目标与拓扑 这次目标是用三台数据节点部署一套 ClickHouse 高可用集群,拓扑采用: 1 shard x 3 replicas 含义是:集群只有一个逻辑分片,三台机器都保存同一份数据的完整副本。任意一台数据节点宕机时,只要 ClickHouse Keeper 仍然有多数派,剩余节点仍可继续提供读写服务。 规划节点如下: 主机名示例地址角色ch-01<ch-01-ip>ClickHouse Server + ClickHouse Keeperch-02<ch-02-ip>ClickHouse Server + ClickHouse Keeperch-03<ch-03-ip&

By ladydd

折腾记(二):接入火山引擎实时语音 API,家庭语音助手体验直接拉满

接上篇 上一篇用全开源组件(Whisper + Hermes + Edge-TTS)搭了个语音助手,能跑,但体验就是"能用"二字: * 中文识别只有 70 分,方言基本歇菜 * 英文唤醒词"Alexa"喊着别扭 * 说完到回复要等 4-8 秒 * 它说话的时候你插不了嘴 这些问题靠堆开源组件很难根治。于是我去试了火山引擎(字节跳动)的语音服务,结果直接换了条路。 这篇分两段:先讲怎么用火山引擎的 ASR/TTS 替换掉开源组件(小改),再讲怎么上端到端实时语音模型(大改)。 第一段:先把 ASR 和 TTS 换成火山引擎 为什么换 我用豆包输入法的时候发现它语音识别准得离谱。一查,豆包用的就是字节自家的火山引擎 Seed-ASR。开通后有免费额度(

By ladydd

折腾记(一):用全开源组件给家里搭一个语音助手,对接自己的 Hermes Agent

起因 事情是从一块 ESP32-S3 开发板开始的。 我手上有一块 Seeed Studio XIAO ESP32-S3 Sense,带摄像头和麦克风。最初的想法很美好:用这块板子做一个无线语音终端,对着它说话,连到我服务器上跑的 Hermes Agent(一个自托管的 AI agent),让它回答我。 但折腾到一半我突然意识到一件事:我的麦克风、音响、服务器全在家里,为什么要绕一圈用 ESP32?直接把麦克风和音响插到服务器上不就行了? ESP32 那条路(做无线拾音终端)当然也有价值,但那是"为了学嵌入式而学",不是解决问题的最短路径。于是这个项目就从"嵌入式项目"变成了"在服务器上拼一个语音助手"。这篇就记录后者。 教训零:先想清楚你要解决的是什么问题。很多时候最优解比你最初设想的简单得多。 目标

By ladydd
陕公网安备61011302002223号 | 陕ICP备2025083092号