背景 链接到标题

最近在研究从视频中自动生成字幕,选中了阿里云的 FunASR(Fun Audio Speech Recognition)模型。FunASR 支持端到端的语音识别和时间戳提取,理论上非常适合用于视频字幕生成。

已知 Bug 链接到标题

FunASR v1.3.1(PyPI 最新版)配合 Fun-ASR-Nano 模型使用时存在两个官方未修复的 bug:

Bug 1: batch decoding is not implemented 链接到标题

VAD 切段后 ASR 模型收到 batch 输入会抛出 NotImplementedError。原因是 auto_model.py 默认将 batch_size 设为 300000,但模型不支持 batch 输入。

修复:传入 batch_size_s=0

详见:#2273 - NotImplementedError: batch decoding is not implemented

Bug 2: KeyError: 0 链接到标题

逐段处理后时间戳偏移时报 KeyError: 0。原因是 Fun-ASR-Nano 返回的时间戳是 dict 格式 {"start_time": ..., "end_time": ...},而代码假设是 list 格式 [start, end]

修复:在调用方做 dict/list 格式兼容(见下方 normalize_entries 函数)

详见:#2825 - KeyError: 0 in inference_with_vad when using MLT models with VAD enabled

执行脚本 链接到标题

执行过程:

flowchart TD A["输入视频文件"] --> B["提取音频
ffmpeg转WAV 16kHz"] B --> C["加载FunASR模型"] C --> D["语音识别
model.generate"] D --> E{"时间戳格式?"} E -->|"dict"| F["normalize_entries
兼容#2825"] E -->|"list"| G["normalize_entries
标准处理"] F --> H["build_srt分句"] G --> H H --> I["输出SRT文件"]

代码如下:

import subprocess
import tempfile
import torch
import soundfile as sf
from funasr import AutoModel


def ms_to_srt_time(ms: float) -> str:
    sec = ms / 1000
    h = int(sec // 3600)
    m = int((sec % 3600) // 60)
    s = int(sec % 60)
    cs = int(round((sec - int(sec)) * 1000))
    return f"{h:02d}:{m:02d}:{s:02d},{cs:03d}"


def sec_to_srt_time(sec: float) -> str:
    h = int(sec // 3600)
    m = int((sec % 3600) // 60)
    s = int(sec % 60)
    ms = int(round((sec - int(sec)) * 1000))
    return f"{h:02d}:{m:02d}:{s:02d},{ms:03d}"


def normalize_entries(r0, is_dict, timestamps_raw):
    # 借鉴 #2825 修复思路:Fun-ASR-Nano 返回 dict 格式时间戳,需兼容 list 格式
    # https://github.com/modelscope/FunASR/issues/2825
    if is_dict:
        return [
            {"token": t["token"], "start": t["start_time"] * 1000, "end": t["end_time"] * 1000}
            for t in timestamps_raw
        ]
    else:
        words = r0.get("text", "").split()
        return [
            {"token": w, "start": float(ts[0]), "end": float(ts[1])}
            for w, ts in zip(words, timestamps_raw)
        ] if words else [
            {"token": f"[{i}]", "start": float(ts[0]), "end": float(ts[1])}
            for i, ts in enumerate(timestamps_raw)
        ]


def build_srt(entries):
    if not entries:
        return ""
    PUNCT = set(",。!?;:、,.;:!?")
    gap_thresh = 500
    seg_buf = []
    cur = [entries[0]]
    for e in entries[1:]:
        gap = e["start"] - cur[-1]["end"]
        if gap > gap_thresh or len(cur) >= 12:
            seg_buf.append(cur)
            cur = [e]
        else:
            cur.append(e)
    if cur:
        seg_buf.append(cur)
    merged = [seg_buf[0]]
    for seg in seg_buf[1:]:
        all_punct = all(t["token"] in PUNCT for t in seg)
        if all_punct and merged:
            merged[-1].extend(seg)
        else:
            merged.append(seg)
    seg_buf = merged
    srt_lines = []
    for idx, seg in enumerate(seg_buf, 1):
        start_val = seg[0]["start"]
        end_val = seg[-1]["end"]
        seg_text = "".join(t["token"] for t in seg)
        srt_lines.append(f"{idx}")
        srt_lines.append(f"{ms_to_srt_time(start_val)} --> {ms_to_srt_time(end_val)}")
        srt_lines.append(seg_text)
        srt_lines.append("")
    return "\n".join(srt_lines)


def extract_audio(video_path: str, audio_path: str):
    subprocess.run(
        ["ffmpeg", "-i", video_path, "-vn",
         "-acodec", "pcm_s16le", "-ar", "16000", "-ac", "1",
         "-y", audio_path],
        check=True, capture_output=True, text=True)


def main(video_file: str, output_srt: str):
    with tempfile.TemporaryDirectory() as tmp:
        audio_file = f"{tmp}/audio.wav"
        extract_audio(video_file, audio_file)

        audio_np, sr = sf.read(audio_file)
        audio = torch.from_numpy(audio_np).float()

        print("加载模型...")
        model = AutoModel(
            model="FunAudioLLM/Fun-ASR-Nano-2512",
            vad_model="fsmn-vad",
            vad_kwargs={"max_single_segment_time": 30000},
            device="cuda:0" if torch.cuda.is_available() else "cpu",
            disable_update=True,
        )

        print("语音识别中...")
        res = model.generate(
            input=audio,
            cache={},
            language="auto",
            use_itn=True,
            disable_update=True,
            batch_size_s=0,
        )

        r0 = res[0]
        timestamps_raw = r0.get("timestamps") or r0.get("timestamp")
        is_dict = bool(timestamps_raw) and isinstance(timestamps_raw[0], dict)
        entries = normalize_entries(r0, is_dict, timestamps_raw) if timestamps_raw else []

        srt_content = build_srt(entries)
        with open(output_srt, "w", encoding="utf-8") as f:
            f.write(srt_content)

        print(f"字幕已生成: {output_srt}")


if __name__ == "__main__":
    import sys
    main(sys.argv[1], sys.argv[2])

Docker 镜像 链接到标题

为方便部署,基于 Ubuntu 24.04 创建了镜像:

FROM ubuntu:24.04

RUN apt-get update && apt-get install -y \
    ffmpeg python3-pip libstdc++6 curl && \
    rm -rf /var/lib/apt/lists/*

RUN pip3 install funasr torch torchaudio transformers soundfile boto3 --break-system-packages

WORKDIR /workspace
CMD ["python3"]

下一步 链接到标题

  • 集成到工作流