基本流程
1、文档处理
使用文档解析工具,对现有的 PDF、Word、PPT 等文档进行解析,提取其中的纯文本内容。
2、文本清洗
获取到纯文本后,需要先清洗文本,常见操作有去除多余空格、统一中英文标点、修复 PDF 断行、去除页眉页脚、去除无意义字符等等。
3、切分
有很多种切分方式,如下:
1、按固定长度切分
这是最简单的方式,比如每 500 个 token 切一块。优点是简单,但是可能把一句话、一个段落、一个表格切断,影响检索和回答效果。
2、带重叠切分
为了避免上下文断裂,通常会设置 overlap,也就是 chunk 之间会有一部分重复内容,例如
chunk 1:第 1 ~ 500 token
chunk 2:第 401 ~ 900 token
chunk 3:第 801 ~ 1300 token
这样即使关键信息刚好在边界附近,也不容易被切断。
3、按段落切分
按照文本自然段落切分更加合理。若某个段落太长,再按长度继续切分。
4、按标题层级切分
这是企业文档中很常见的切分方式。 Markdown、Word、制度文档一般都有标题结构,可以按大小标题切成不同的 chunk,优点是语义完整而且回答时也可以引用来源。
5、递归切分
递归切分是比较常用的方式,它会按优先级尝试不同的切分方法,先按大标题切,切不开再按小标题切,再切不开按段落切,再切不开按句子切,最后才按固定长度切。这样切分更加自然。
6、按语义切分
语义切分不是只看长度,而是判断内容意思是否发生变化。例如一段文档中前半部分讲员工请假制度,后半部分讲差旅报销制度,应该被分为两个 chunk。语义切分通常会用以下方法
- Embedding 相似度
- 句子之间的语义变化
- 主题变化检测
- LLM 辅助判断
7、按特殊结构切分
企业文档里经常有表格、代码、FAQ、合同条款等,这些都不能随便切分。
表格最好整表保留,或者按行组切分。
FAQ 最好按一问一答切分。
合同、制度类文档适合按条款切分。
技术文档中的代码不要从函数中间切开,而是一个函数、一个类、一个配置文件片段作为一个 chunk。
8、父子切分
企业 RAG 中还有一种常见方式,叫做 Parent-Child Chunking。分为大小(父子)chunk,小 chunk 用来检索,大 chunk 用来回答。用户提问时,系统先用小 chunk 精准匹配问题,找到后再把它对应的父 chunk 交给大模型回答。这样做会让检索更精准、回答上下文更完整且不容易断章取义。
4、元数据绑定
每个 chunk 不应该只保存正文,还要保存 metadata,例如:
1 2 3 4 5 6 7 8 9 10
| { "content": "一线城市住宿费每日最高报销 600 元……", "doc_name": "员工报销制度.pdf", "page": 3, "section": "第二章 差旅报销", "chunk_id": "doc001_chunk003", "department": "HR", "permission": ["HR", "ADMIN"], "updated_at": "2026-06-01" }
|
这些 metadata 可以用于引用来源、权限过滤、按文档/部门检索、按更新时间过滤、排查回答来源。
动手做
我们以比亚迪 2025 年度报告(PDF 文件)为例,来处理文档。
初次尝试,我们先把 PDF 文件处理成纯文字,清洗后使用带重叠切分的方法切分文本,具体代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57
| import fitz import json import re from pathlib import Path
PDF_PATH = "data/byd2025.pdf" OUTPUT_PATH = "output/byd2025_chunks.jsonl"
def clean_text(text: str) -> str: text = re.sub(r"[ \t]+", " ", text) text = re.sub(r"\n{3,}", "\n\n", text) lines = [line.strip() for line in text.splitlines()] return "\n".join(lines).strip()
def split_text(text: str, chunk_size: int = 800, overlap: int = 100): chunks = [] start = 0
while start < len(text): end = start + chunk_size chunk = text[start:end]
if chunk.strip(): chunks.append(chunk.strip())
start = end - overlap
return chunks
def main(): Path("output").mkdir(exist_ok=True)
doc = fitz.open(PDF_PATH)
with open(OUTPUT_PATH, "w", encoding="utf-8") as f: for page_index, page in enumerate(doc, start=1): raw_text = page.get_text("text") text = clean_text(raw_text)
if not text: continue
chunks = split_text(text)
for chunk_index, chunk in enumerate(chunks, start=1): item = { "doc_name": Path(PDF_PATH).name, "page": page_index, "chunk_id": f"page_{page_index}_chunk_{chunk_index}", "content": chunk } f.write(json.dumps(item, ensure_ascii=False) + "\n")
print(f"处理完成:{OUTPUT_PATH}")
if __name__ == "__main__": main()
|
该程序整体执行流程如下:
1. 导入库
2. 设置 PDF_PATH 和 OUTPUT_PATH
3. 定义 clean_text 函数
4. 定义 split_text 函数
5. 定义 main 函数
6. 判断当前文件是否直接运行
7. 调用 main()
8. 创建 output 文件夹
9. 打开 PDF
10. 遍历每一页
11. 提取当前页文本
12. 清洗文本
13. 如果为空就跳过
14. 把当前页文本切成多个 chunk
15. 给每个 chunk 绑定 doc_name、page、chunk_id、content
16. 写入 jsonl 文件
17. 打印处理完成
但是这样做,按字符数量硬切太过粗糙,效果很差。有没有更好的方案呢?
我决定先将 PDF 文件转成 markdown 文档,然后优先按照标题、段落、句号切分。完整代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85
| import json import re from pathlib import Path
import pymupdf4llm from langchain_text_splitters import RecursiveCharacterTextSplitter
PDF_PATH = "data/byd2025.pdf" OUTPUT_PATH = "output/annual_report_chunks.jsonl"
def clean_markdown(text: str) -> str: text = re.sub(r"[ \t]+", " ", text)
text = re.sub(r"\n{3,}", "\n\n", text)
lines = [line.strip() for line in text.splitlines()]
return "\n".join(lines).strip()
def main(): Path("output").mkdir(exist_ok=True)
pages = pymupdf4llm.to_markdown( PDF_PATH, page_chunks=True, header=False, footer=False, )
splitter = RecursiveCharacterTextSplitter( chunk_size=1000, chunk_overlap=150, separators=[ "\n# ", "\n## ", "\n### ", "\n\n", "\n", "。", ";", ",", " ", "", ], keep_separator=True, )
with open(OUTPUT_PATH, "w", encoding="utf-8") as f: for page in pages: page_text = clean_markdown(page["text"])
if not page_text: continue
metadata = page.get("metadata", {}) page_number = metadata.get("page_number")
chunks = splitter.split_text(page_text)
for chunk_index, chunk in enumerate(chunks, start=1): item = { "doc_name": Path(PDF_PATH).name, "page": page_number, "chunk_id": f"page_{page_number}_chunk_{chunk_index}", "content": chunk, "metadata": { "source": str(PDF_PATH), "page": page_number, } }
f.write(json.dumps(item, ensure_ascii=False) + "\n")
print(f"处理完成:{OUTPUT_PATH}")
if __name__ == "__main__": main()
|
这一版本极大减少了页脚页眉的影响,并且每一个 chunk 的连续性和完整性都得到了提升。
针对该文件的文本切分已经完成,下下一步准备尝试将这些 chunk 存入向量库。