-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpdf_preprocessor.py
More file actions
205 lines (162 loc) · 7.86 KB
/
pdf_preprocessor.py
File metadata and controls
205 lines (162 loc) · 7.86 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
#!/usr/bin/env python3
"""
Large PDF Preprocessor
Preprocess very large PDF files by splitting them into smaller, more manageable files.
"""
import os
import sys
from pathlib import Path
from llama_index.core import SimpleDirectoryReader, Document
from llama_index.core.node_parser import TokenTextSplitter
import math
def analyze_pdf_size(pdf_directory="./documents/"):
    """Load the documents in ``pdf_directory`` and print a size report.

    Each loaded document is measured (characters, whitespace-separated
    words, and a rough token estimate of ``chars // 4``) and labelled as
    OK, LARGE (> 100,000 chars) or VERY LARGE (> 500,000 chars).

    Args:
        pdf_directory: Directory passed to ``SimpleDirectoryReader``.

    Returns:
        ``(analysis, documents)`` on success. ``analysis`` is a dict with
        the keys ``document_count``, ``total_chars`` and ``documents`` (a
        list of per-document stat dicts); ``documents`` is the list returned
        by the reader. Returns ``None`` when the directory does not exist,
        when nothing was loaded, or when any exception is raised while
        loading or analyzing.
    """
    print("📊 Analyzing PDF files...")

    if not os.path.exists(pdf_directory):
        print(f"❌ Directory {pdf_directory} does not exist")
        return None

    try:
        reader = SimpleDirectoryReader(input_dir=pdf_directory)
        documents = reader.load_data()
        if not documents:
            print("❌ No documents found")
            return None

        # Collect the per-document statistics first; totals derive from them.
        per_document = []
        for position, document in enumerate(documents):
            text = document.text
            metadata = getattr(document, 'metadata', {})
            per_document.append({
                'index': position,
                'char_count': len(text),
                'word_count': len(text.split()),
                # Rough estimate: character count divided by four.
                'estimated_tokens': len(text) // 4,
                'source': metadata.get('file_name', f'Document {position+1}'),
            })

        analysis = {
            'document_count': len(documents),
            'total_chars': sum(entry['char_count'] for entry in per_document),
            'documents': per_document,
        }

        # Print the analysis results.
        average_chars = analysis['total_chars'] // analysis['document_count']
        print("📋 Analysis Results:")
        print(f" Total documents: {analysis['document_count']}")
        print(f" Total characters: {analysis['total_chars']:,}")
        print(f" Average per document: {average_chars:,}")

        for entry in per_document:
            size = entry['char_count']
            if size > 500000:
                status = "🔥 VERY LARGE"
            elif size > 100000:
                status = "⚠️ LARGE"
            else:
                status = "✅ OK"
            print(f" {entry['source']}: {size:,} chars {status}")

        return analysis, documents
    except Exception as e:
        # Best-effort CLI helper: report the failure and signal it with None.
        print(f"❌ Error analyzing PDFs: {e}")
        return None
def split_large_documents(documents, max_chars_per_chunk=50000):
    """Split oversized documents into fixed-size character chunks.

    Documents whose text has at most ``max_chars_per_chunk`` characters are
    passed through unchanged (the same object is reused). Longer documents
    are cut at fixed character offsets into consecutive, non-overlapping
    chunks. Each chunk becomes a new ``Document`` that carries the source
    document's metadata plus ``original_source``, ``chunk_index`` and
    ``total_chunks``.

    Args:
        documents: Sequence of objects exposing ``.text`` and, optionally,
            ``.metadata`` (a dict or ``None``).
        max_chars_per_chunk: Maximum number of characters per chunk. Must
            be positive.

    Returns:
        A new list of documents in the original order, with each oversized
        document replaced by its chunks.

    Raises:
        ValueError: If ``max_chars_per_chunk`` is not positive. A zero size
            would divide by zero, and a negative size would produce no
            chunks at all, silently dropping the document.
    """
    if max_chars_per_chunk <= 0:
        raise ValueError(
            f"max_chars_per_chunk must be positive, got {max_chars_per_chunk!r}"
        )

    print(f"\n🔪 Splitting large documents (max {max_chars_per_chunk:,} chars per chunk)...")
    processed_docs = []

    for i, doc in enumerate(documents):
        text = doc.text
        # Tolerate documents whose metadata is missing or explicitly None.
        base_metadata = getattr(doc, 'metadata', None) or {}
        source_name = base_metadata.get('file_name', f'Document {i+1}')

        if len(text) <= max_chars_per_chunk:
            print(f" ✅ {source_name}: Keeping as-is ({len(text):,} chars)")
            processed_docs.append(doc)
            continue

        # Number of chunks needed to cover the whole text.
        num_chunks = math.ceil(len(text) / max_chars_per_chunk)
        print(f" 🔪 {source_name}: Splitting into {num_chunks} chunks")

        for chunk_idx in range(num_chunks):
            start_pos = chunk_idx * max_chars_per_chunk
            # Slicing clamps at the end of the text, so the last chunk may be shorter.
            chunk_text = text[start_pos:start_pos + max_chars_per_chunk]

            # Build the new chunk document, keeping a link to its source.
            chunk_doc = Document(
                text=chunk_text,
                metadata={
                    **base_metadata,
                    'original_source': source_name,
                    'chunk_index': chunk_idx,
                    'total_chunks': num_chunks,
                },
            )
            processed_docs.append(chunk_doc)
            print(f" 📄 Chunk {chunk_idx + 1}/{num_chunks}: {len(chunk_text):,} chars")

    print(f"\n📊 Splitting complete: {len(documents)} → {len(processed_docs)} documents")
    return processed_docs
def estimate_processing_requirements(documents):
    """Estimate how many nodes indexing ``documents`` will produce.

    The first three documents are run through a ``TokenTextSplitter``
    (``chunk_size=256``, ``chunk_overlap=30``). When there are more than
    three documents, the average node count of that sample is scaled by the
    total number of documents. The estimate, a rough processing time and a
    risk level are printed.

    Args:
        documents: List of documents exposing ``.text``.

    Returns:
        The estimated total number of nodes as an ``int``.
    """
    total_chars = sum(len(doc.text) for doc in documents)

    # Split only a small sample to keep the estimate cheap.
    splitter = TokenTextSplitter(chunk_size=256, chunk_overlap=30)
    sample = documents[:3]
    sampled_nodes = 0
    for sample_doc in sample:
        sampled_nodes += len(splitter.get_nodes_from_documents([sample_doc]))

    # Extrapolate from the sample only when it does not cover every document.
    if len(documents) > 3:
        nodes_per_doc = sampled_nodes / len(sample)
        total_estimated_nodes = int(nodes_per_doc * len(documents))
    else:
        total_estimated_nodes = sampled_nodes

    estimated_minutes = total_estimated_nodes // 100 + 5
    print("\n🧮 Processing Estimates:")
    print(f" Total characters: {total_chars:,}")
    print(f" Estimated nodes: {total_estimated_nodes:,}")
    print(f" Estimated processing time: {estimated_minutes} minutes")

    # Risk assessment based on the estimated node count.
    if total_estimated_nodes > 2000:
        risk_lines = [
            " 🔥 HIGH RISK: Very large dataset may cause segment errors",
            " 📝 Recommendation: Reduce PDF size or use batch processing",
        ]
    elif total_estimated_nodes > 1000:
        risk_lines = [" ⚠️ MEDIUM RISK: Large dataset, use optimized settings"]
    else:
        risk_lines = [" ✅ LOW RISK: Dataset size is manageable"]
    for line in risk_lines:
        print(line)

    return total_estimated_nodes
def save_processed_documents(documents, output_dir="./processed_documents/"):
    """Write each document's text to a numbered UTF-8 ``.txt`` file.

    Files are named ``processed_doc_001.txt``, ``processed_doc_002.txt``,
    and so on, following the order of ``documents``. ``output_dir`` is
    created if it does not exist yet.

    Args:
        documents: Sequence of objects exposing ``.text``.
        output_dir: Directory that receives the text files.

    Returns:
        ``output_dir``, exactly as it was passed in.
    """
    print(f"\n💾 Saving processed documents to {output_dir}...")
    os.makedirs(output_dir, exist_ok=True)

    for number, document in enumerate(documents, start=1):
        text = document.text
        filename = f"processed_doc_{number:03d}.txt"
        target_path = os.path.join(output_dir, filename)
        with open(target_path, 'w', encoding='utf-8') as handle:
            handle.write(text)
        print(f" 💾 Saved: {filename} ({len(text):,} chars)")

    print(f"✅ Saved {len(documents)} processed documents")
    return output_dir
def main():
    """Run the interactive preprocessing workflow.

    Steps:
        1. Analyze the documents in the default input directory.
        2. Estimate how many nodes indexing them will produce.
        3. If the corpus exceeds 500,000 characters, ask whether to proceed
           as-is, split the documents and save the chunks, or cancel.
        4. Print indexing recommendations based on the node estimate of the
           original (unsplit) documents.

    Returns:
        ``None``. Returns early when analysis fails or the user cancels.
    """
    print("🚀 Large PDF Preprocessor Starting...")

    # 1. Analyze the original PDFs.
    result = analyze_pdf_size()
    if not result:
        return
    analysis, documents = result

    # 2. Estimate the processing requirements.
    estimated_nodes = estimate_processing_requirements(documents)

    # 3. Offer a splitting option when the corpus is too large.
    if analysis['total_chars'] > 500000:
        print("\n🤔 Large documents detected. Options:")
        print(" 1. Proceed with current size (risk of segment errors)")
        print(" 2. Split large documents into smaller chunks")
        print(" 3. Cancel and manually reduce PDF size")

        choice = input("Enter your choice (1/2/3): ").strip()

        if choice == '2':
            default_chunk_size = 50000
            raw_size = input("Max characters per chunk (default 50000): ").strip()
            # A typo here must not crash the tool after the expensive
            # analysis step, so invalid values fall back to the default.
            try:
                max_chunk_size = int(raw_size) if raw_size else default_chunk_size
            except ValueError:
                max_chunk_size = None
            if max_chunk_size is None or max_chunk_size <= 0:
                print(f"⚠️ Invalid chunk size {raw_size!r}; using default {default_chunk_size}")
                max_chunk_size = default_chunk_size

            processed_docs = split_large_documents(documents, max_chunk_size)

            # Save the processed documents.
            save_processed_documents(processed_docs)

            print("\n✅ Documents preprocessed successfully!")
            print("📝 Use the processed_documents/ directory for indexing")
        elif choice == '3':
            print("👋 Please reduce your PDF size and try again")
            return
        else:
            # Option 1, or any unrecognized answer, keeps the documents as-is.
            print("🚀 Proceeding with original documents...")

    print("\n🎯 Recommendations for rag_server.py:")
    if estimated_nodes > 1000:
        print(" - Use chunk_size=256 and batch_size=200")
        print(" - Enable batch processing mode")
        print(" - Use FLAT index for stability")
        print(" - Increase wait times")
    else:
        print(" - Standard settings should work fine")
        print(" - You can use default chunk_size=512")
# Run the interactive workflow only when executed as a script, not on import.
if __name__ == "__main__":
    main()