背景
最近在研究从视频中自动生成字幕,选中了阿里云的 FunASR(Fun Audio Speech Recognition)模型。FunASR 支持端到端的语音识别和时间戳提取,理论上非常适合用于视频字幕生成。
已知 Bug
FunASR v1.3.1(PyPI 最新版)配合 Fun-ASR-Nano 模型使用时存在两个官方未修复的 bug:
Bug 1: batch decoding is not implemented
VAD 切段后 ASR 模型收到 batch 输入会抛出 NotImplementedError。原因是 auto_model.py 默认将 batch_size 设为 300000,但模型不支持 batch 输入。
修复:调用 model.generate 时传入 batch_size_s=0
详见:#2273 - NotImplementedError: batch decoding is not implemented
Bug 2: KeyError: 0
逐段识别完成后,在对各段的时间戳做偏移时会报 KeyError: 0。原因是 Fun-ASR-Nano 返回的时间戳是 dict 格式 {"start_time": ..., "end_time": ...},而 inference_with_vad 中的代码假设它是 list 格式 [start, end]。
修复:在调用方做 dict/list 格式兼容(见下方 normalize_entries 函数)
详见:#2825 - KeyError: 0 in inference_with_vad when using MLT models with VAD enabled
执行脚本
执行过程:
flowchart TD
    A["输入视频文件"] --> B["提取音频<br/>ffmpeg转WAV 16kHz"]
    B --> C["加载FunASR模型"]
    C --> D["语音识别<br/>model.generate"]
    D --> E{"时间戳格式?"}
    E -->|"dict"| F["normalize_entries<br/>兼容#2825"]
    E -->|"list"| G["normalize_entries<br/>标准处理"]
    F --> H["build_srt分句"]
    G --> H
    H --> I["输出SRT文件"]
代码如下:
import subprocess
import tempfile
import torch
import soundfile as sf
from funasr import AutoModel
def ms_to_srt_time(ms: float) -> str:
    """Format a millisecond offset as an SRT timestamp ``HH:MM:SS,mmm``.

    The value is rounded to the nearest whole millisecond *before* it is
    split into fields, so a fraction that rounds up carries into the seconds
    field instead of yielding a four-digit millisecond field (1999.6 ms
    becomes ``00:00:02,000`` rather than ``00:00:01,1000``).

    Args:
        ms: Offset from the start of the media, in milliseconds. Negative
            values are clamped to zero.

    Returns:
        The zero-padded timestamp string.
    """
    total_ms = max(int(round(ms)), 0)
    total_sec, millis = divmod(total_ms, 1000)
    total_min, s = divmod(total_sec, 60)
    h, m = divmod(total_min, 60)
    return f"{h:02d}:{m:02d}:{s:02d},{millis:03d}"
def sec_to_srt_time(sec: float) -> str:
    """Format an offset given in seconds as an SRT timestamp ``HH:MM:SS,mmm``.

    The offset is converted to whole milliseconds first and only then split
    into fields, so rounding carries correctly (1.9996 s becomes
    ``00:00:02,000`` rather than ``00:00:01,1000``).

    Args:
        sec: Offset from the start of the media, in seconds. Negative values
            are clamped to zero.

    Returns:
        The zero-padded timestamp string.
    """
    total_ms = max(int(round(sec * 1000)), 0)
    total_sec, millis = divmod(total_ms, 1000)
    total_min, s = divmod(total_sec, 60)
    h, m = divmod(total_min, 60)
    return f"{h:02d}:{m:02d}:{s:02d},{millis:03d}"
def normalize_entries(r0, is_dict, timestamps_raw):
    """Convert raw recognizer timestamps into uniform token entries.

    Every returned entry is a dict with the keys ``token``, ``start`` and
    ``end``.

    Args:
        r0: Result dict; its ``"text"`` value is consulted only on the
            list-format path.
        is_dict: True when each item of ``timestamps_raw`` is a dict with
            ``token``, ``start_time`` and ``end_time`` keys.
        timestamps_raw: Iterable of timestamp items, either dicts (see
            ``is_dict``) or ``[start, end]`` pairs.

    Returns:
        A list of ``{"token", "start", "end"}`` dicts.
    """
    # Dict-format timestamps, as returned by Fun-ASR-Nano; handled here
    # following the fix idea from
    # https://github.com/modelscope/FunASR/issues/2825
    if is_dict:
        converted = []
        for item in timestamps_raw:
            converted.append({
                "token": item["token"],
                # Scaled by 1000 -- presumably seconds to milliseconds;
                # verify against the model output.
                "start": item["start_time"] * 1000,
                "end": item["end_time"] * 1000,
            })
        return converted

    # List-format timestamps: pair each whitespace-separated word of the
    # text with a [start, end] pair; zip() stops at the shorter sequence.
    tokens = r0.get("text", "").split()
    if tokens:
        labelled = zip(tokens, timestamps_raw)
    else:
        # No text available: label each pair with its index instead.
        labelled = (
            (f"[{pos}]", pair) for pos, pair in enumerate(timestamps_raw)
        )
    return [
        {"token": label, "start": float(pair[0]), "end": float(pair[1])}
        for label, pair in labelled
    ]
def build_srt(entries, *, gap_thresh=500, max_tokens=12):
    """Group token entries into subtitle cues and render them as SRT text.

    A new cue starts when the silence between two consecutive tokens exceeds
    ``gap_thresh`` or when the current cue already holds ``max_tokens``
    tokens. A cue made up solely of punctuation is appended to the previous
    cue so that it does not appear as a subtitle on its own.

    Args:
        entries: Sequence of dicts with ``token``, ``start`` and ``end``
            keys; ``start``/``end`` are passed to ``ms_to_srt_time``.
        gap_thresh: Largest gap (same unit as ``start``/``end``) allowed
            inside a single cue.
        max_tokens: Token count at which a cue is closed.

    Returns:
        The SRT document as a string, or ``""`` when ``entries`` is empty.
    """
    if not entries:
        return ""

    # The ASCII marks appear twice in the original literal, which suggests
    # the full-width forms were lost; they are added explicitly via escapes
    # (full-width comma, exclamation mark, question mark, semicolon, colon).
    punct = frozenset(",。!?;:、,.;:!?" "\uff0c\uff01\uff1f\uff1b\uff1a")

    # Pass 1: split on long gaps or when the cue is full.
    segments = []
    current = [entries[0]]
    for entry in entries[1:]:
        gap = entry["start"] - current[-1]["end"]
        if gap > gap_thresh or len(current) >= max_tokens:
            segments.append(current)
            current = [entry]
        else:
            current.append(entry)
    segments.append(current)

    # Pass 2: fold punctuation-only cues into the preceding cue.
    merged = [segments[0]]
    for seg in segments[1:]:
        if all(tok["token"] in punct for tok in seg):
            merged[-1].extend(seg)
        else:
            merged.append(seg)

    # Pass 3: render "index / time range / text / blank line" per cue.
    srt_lines = []
    for idx, seg in enumerate(merged, 1):
        start_stamp = ms_to_srt_time(seg[0]["start"])
        end_stamp = ms_to_srt_time(seg[-1]["end"])
        srt_lines.append(f"{idx}")
        srt_lines.append(f"{start_stamp} --> {end_stamp}")
        # Tokens are concatenated without a separator.
        srt_lines.append("".join(tok["token"] for tok in seg))
        srt_lines.append("")
    return "\n".join(srt_lines)
def extract_audio(video_path: str, audio_path: str):
    """Extract the audio track of a video into a WAV file using ffmpeg.

    The output is 16-bit PCM, 16 kHz, mono; an existing ``audio_path`` is
    overwritten (``-y``).

    Args:
        video_path: Path of the input video file.
        audio_path: Path of the WAV file to write.

    Raises:
        subprocess.CalledProcessError: If ffmpeg exits with a non-zero
            status. Its captured stderr is printed before re-raising.
    """
    import sys  # local import so the block does not rely on file-level imports

    cmd = [
        # -nostdin stops ffmpeg from reading the parent's standard input.
        "ffmpeg", "-nostdin",
        "-i", video_path, "-vn",
        "-acodec", "pcm_s16le", "-ar", "16000", "-ac", "1",
        "-y", audio_path,
    ]
    try:
        subprocess.run(cmd, check=True, capture_output=True, text=True)
    except subprocess.CalledProcessError as err:
        # Output was captured, so without this the reason for the failure
        # would never be shown to the user.
        if err.stderr:
            print(err.stderr, file=sys.stderr)
        raise
def main(video_file: str, output_srt: str):
    """Transcribe a video with FunASR and write the result as an SRT file.

    Steps: extract the audio to a temporary WAV, load it into a tensor, run
    recognition, normalize the timestamps and write the subtitles.

    Args:
        video_file: Path of the input video.
        output_srt: Path of the SRT file to write (UTF-8).
    """
    # The waveform is read fully into memory, so the temporary directory can
    # be removed before the model is loaded.
    with tempfile.TemporaryDirectory() as tmp:
        audio_file = f"{tmp}/audio.wav"
        extract_audio(video_file, audio_file)
        audio_np, _sample_rate = sf.read(audio_file)
    audio = torch.from_numpy(audio_np).float()

    print("加载模型...")
    model = AutoModel(
        model="FunAudioLLM/Fun-ASR-Nano-2512",
        vad_model="fsmn-vad",
        vad_kwargs={"max_single_segment_time": 30000},
        device="cuda:0" if torch.cuda.is_available() else "cpu",
        disable_update=True,
    )

    print("语音识别中...")
    res = model.generate(
        input=audio,
        cache={},
        language="auto",
        use_itn=True,
        disable_update=True,
        # Workaround for "batch decoding is not implemented" (FunASR #2273).
        batch_size_s=0,
    )

    # Guard against an empty result list instead of raising IndexError.
    r0 = res[0] if res else {}
    timestamps_raw = r0.get("timestamps") or r0.get("timestamp")
    is_dict = bool(timestamps_raw) and isinstance(timestamps_raw[0], dict)
    entries = (
        normalize_entries(r0, is_dict, timestamps_raw) if timestamps_raw else []
    )
    if not entries:
        # Make the empty-output case visible rather than silent.
        print("警告: 未获取到时间戳,生成的字幕文件为空")

    srt_content = build_srt(entries)
    with open(output_srt, "w", encoding="utf-8") as f:
        f.write(srt_content)
    print(f"字幕已生成: {output_srt}")
if __name__ == "__main__":
    import sys

    # Exit with a usage hint instead of an IndexError when the two required
    # positional arguments are missing.
    if len(sys.argv) < 3:
        sys.exit(f"用法: python {sys.argv[0]} <视频文件> <输出SRT文件>")
    main(sys.argv[1], sys.argv[2])
Docker 镜像
为方便部署,基于 Ubuntu 24.04 创建了镜像:
# Base image: Ubuntu 24.04.
FROM ubuntu:24.04
# System packages: ffmpeg for audio extraction, pip for the Python packages.
# DEBIAN_FRONTEND=noninteractive keeps package configuration from prompting
# during the build; the apt lists are removed to keep the layer small.
RUN apt-get update && DEBIAN_FRONTEND=noninteractive apt-get install -y \
    ffmpeg python3-pip libstdc++6 curl && \
    rm -rf /var/lib/apt/lists/*
# --no-cache-dir keeps pip's download cache out of the image layer.
# --break-system-packages installs into the system Python environment.
RUN pip3 install --no-cache-dir funasr torch torchaudio transformers soundfile boto3 --break-system-packages
WORKDIR /workspace
CMD ["python3"]
下一步
- 集成到工作流