Initialize repository

高手 2025-10-11 13:36:48 +08:00
commit f8b39b149c
8 changed files with 1256 additions and 0 deletions

7
.gitignore vendored Normal file

@@ -0,0 +1,7 @@
*.pdf
*.png
.DS_Store
extracted_images/
.vscode/
output/
input/

40
pdf/extract_images.py Normal file

@@ -0,0 +1,40 @@
import fitz  # PyMuPDF
import os
from PIL import Image
import io


def extract_images_from_pdf(pdf_path, output_folder):
    # Open the PDF file
    document = fitz.open(pdf_path)
    # Make sure the output folder exists
    os.makedirs(output_folder, exist_ok=True)
    # Iterate over every page
    for page_number in range(len(document)):
        page = document.load_page(page_number)
        # Collect the images on this page
        images = page.get_images(full=True)
        for image_index, img in enumerate(images):
            xref = img[0]
            # Extract the raw image
            base_image = document.extract_image(xref)
            image_bytes = base_image["image"]
            image_ext = base_image["ext"]
            # Load the bytes with Pillow
            image = Image.open(io.BytesIO(image_bytes))
            # Build the image file name
            image_filename = f"page_{page_number + 1}_img_{image_index + 1}.{image_ext}"
            image_path = os.path.join(output_folder, image_filename)
            # Save the image
            image.save(image_path)
            print(f"Saved image: {image_path}")
    # Close the document
    document.close()


# Usage example
pdf_path = "/Users/xiangyu/Documents/余氏宗谱(新洲区等支族)/第一册/5余氏彩页 P17-40.pdf"  # replace with your PDF path
output_folder = "extracted_images"  # replace with the folder where images should be saved
extract_images_from_pdf(pdf_path, output_folder)
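
If a whole folder of PDFs needs the same treatment, a thin wrapper can loop over the directory. A minimal sketch, assuming the source files sit in a hypothetical `pdfs/` folder:

```python
import os

# Hypothetical batch wrapper around extract_images_from_pdf:
# walk a folder of PDFs and write each file's images to its own subfolder.
pdf_dir = "pdfs"  # assumed input folder
for name in os.listdir(pdf_dir):
    if name.lower().endswith(".pdf"):
        stem = os.path.splitext(name)[0]
        extract_images_from_pdf(
            os.path.join(pdf_dir, name),
            os.path.join("extracted_images", stem),  # one subfolder per PDF
        )
```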

486
pdf/pdf_processor.py Normal file

@@ -0,0 +1,486 @@
import cv2
import numpy as np
import fitz  # PyMuPDF
import os
import tkinter as tk
from tkinter import ttk
from PIL import Image, ImageTk
import tkinter.messagebox as messagebox
import datetime


class PDFProcessor:
    def __init__(self):
        self.image = None
        self.regions = []

    def convert_pdf_to_image(self, pdf_path, page_num=0):
        # Open the PDF file
        doc = fitz.open(pdf_path)
        page = doc[page_num]
        # Render the PDF page to an image
        pix = page.get_pixmap(matrix=fitz.Matrix(2, 2))  # 2x scaling for better quality
        img = Image.frombytes("RGB", [pix.width, pix.height], pix.samples)
        doc.close()
        # Convert to OpenCV format
        self.image = cv2.cvtColor(np.array(img), cv2.COLOR_RGB2BGR)
        return self.image

    def detect_regions(self):
        if self.image is None:
            return []
        # Convert to grayscale
        gray = cv2.cvtColor(self.image, cv2.COLOR_BGR2GRAY)
        # Gaussian blur to reduce noise
        blurred = cv2.GaussianBlur(gray, (5, 5), 0)
        # Adaptive thresholding
        binary = cv2.adaptiveThreshold(
            blurred,
            255,
            cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
            cv2.THRESH_BINARY_INV,
            11,
            2
        )
        # Dilate to close small gaps
        kernel = np.ones((3, 3), np.uint8)
        dilated = cv2.dilate(binary, kernel, iterations=1)
        # Find all contours
        contours, hierarchy = cv2.findContours(
            dilated,
            cv2.RETR_TREE,
            cv2.CHAIN_APPROX_SIMPLE
        )
        # Collect all candidate rectangular regions
        candidates = []
        min_area = 1000
        for cnt in contours:
            peri = cv2.arcLength(cnt, True)
            approx = cv2.approxPolyDP(cnt, 0.02 * peri, True)
            x, y, w, h = cv2.boundingRect(cnt)
            area = w * h
            aspect_ratio = w / float(h) if h != 0 else 0
            if (area > min_area and
                    w > 30 and h > 30 and
                    0.1 < aspect_ratio < 10):
                if len(approx) >= 4:
                    candidates.append((x, y, w, h))
        # Sort by width, widest first
        candidates.sort(key=lambda r: r[2], reverse=True)
        # Look for five regions with matching width and height
        target_regions = []
        for i, (x1, y1, w1, h1) in enumerate(candidates):
            similar_regions = [(x1, y1, w1, h1)]
            # Search the remaining candidates for regions of similar size
            for x2, y2, w2, h2 in candidates[i + 1:]:
                # Width and height must match within a 2% tolerance
                if (abs(w2 - w1) / w1 < 0.02 and
                        abs(h2 - h1) / h1 < 0.02):
                    similar_regions.append((x2, y2, w2, h2))
            # If at least five similar regions were found
            if len(similar_regions) >= 5:
                # Sort by y coordinate, top to bottom
                similar_regions.sort(key=lambda r: r[1])
                target_regions = similar_regions[:5]
                break
        self.regions = target_regions
        return self.regions

class PDFViewer:
    def __init__(self, root):
        self.root = root
        self.root.title("PDF Region Detector")
        # Set the initial window size
        self.root.geometry("1024x768")
        # Create the main frame
        self.main_frame = ttk.Frame(self.root)
        self.main_frame.pack(fill=tk.BOTH, expand=True)
        # Create the toolbar
        self.toolbar = ttk.Frame(self.main_frame)
        self.toolbar.pack(side=tk.TOP, fill=tk.X)
        # Zoom controls
        self.zoom_label = ttk.Label(self.toolbar, text="Zoom: ")
        self.zoom_label.pack(side=tk.LEFT, padx=5)
        self.zoom_scale = ttk.Scale(
            self.toolbar,
            from_=10,
            to=200,
            orient=tk.HORIZONTAL,
            length=200,
            command=self._on_zoom_change
        )
        self.zoom_scale.set(30)  # initial zoom of 30%
        self.zoom_scale.pack(side=tk.LEFT, padx=5)
        # Frame to hold the canvas and scrollbars
        self.frame = ttk.Frame(self.main_frame)
        self.frame.pack(fill=tk.BOTH, expand=True)
        # Horizontal and vertical scrollbars
        self.v_scrollbar = ttk.Scrollbar(self.frame, orient="vertical")
        self.h_scrollbar = ttk.Scrollbar(self.frame, orient="horizontal")
        self.v_scrollbar.pack(side=tk.RIGHT, fill=tk.Y)
        self.h_scrollbar.pack(side=tk.BOTTOM, fill=tk.X)
        # Create the canvas and hook up scrolling
        self.canvas = tk.Canvas(
            self.frame,
            yscrollcommand=self.v_scrollbar.set,
            xscrollcommand=self.h_scrollbar.set,
            bg='gray90'  # background color to make the page stand out
        )
        self.canvas.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
        # Configure the scrollbars
        self.v_scrollbar.config(command=self.canvas.yview)
        self.h_scrollbar.config(command=self.canvas.xview)
        # Bind mouse events
        self.canvas.bind('<MouseWheel>', self._on_mousewheel_y)
        self.canvas.bind('<Shift-MouseWheel>', self._on_mousewheel_x)
        self.canvas.bind('<Button-4>', self._on_mousewheel_y)
        self.canvas.bind('<Button-5>', self._on_mousewheel_y)
        self.canvas.bind('<Shift-Button-4>', self._on_mousewheel_x)
        self.canvas.bind('<Shift-Button-5>', self._on_mousewheel_x)
        self.canvas.bind('<ButtonPress-1>', self._start_drag)
        self.canvas.bind('<B1-Motion>', self._drag)
        # Ctrl + mouse wheel zooming
        self.canvas.bind('<Control-MouseWheel>', self._on_zoom_wheel)
        self.processor = PDFProcessor()
        self.current_image = None
        self.current_regions = None
        self.zoom_factor = 0.3  # initial zoom of 30%
        # Fit-to-window button
        self.fit_button = ttk.Button(
            self.toolbar,
            text="Fit to Window",
            command=self._fit_to_window
        )
        self.fit_button.pack(side=tk.LEFT, padx=5)
        # React to window resizing
        self.root.bind('<Configure>', self._on_window_resize)
        # Page controls
        self.page_frame = ttk.Frame(self.toolbar)
        self.page_frame.pack(side=tk.LEFT, padx=5)
        self.page_label = ttk.Label(self.page_frame, text="Page:")
        self.page_label.pack(side=tk.LEFT)
        self.current_page = tk.StringVar(value="1")
        self.total_pages = 1
        self.page_entry = ttk.Entry(self.page_frame, textvariable=self.current_page, width=5)
        self.page_entry.pack(side=tk.LEFT, padx=2)
        self.total_pages_label = ttk.Label(self.page_frame, text="/1")
        self.total_pages_label.pack(side=tk.LEFT)
        self.prev_button = ttk.Button(self.page_frame, text="Previous", command=self._prev_page)
        self.prev_button.pack(side=tk.LEFT, padx=2)
        self.next_button = ttk.Button(self.page_frame, text="Next", command=self._next_page)
        self.next_button.pack(side=tk.LEFT, padx=2)
        # Button to extract text from every page
        self.confirm_button = ttk.Button(
            self.toolbar,
            text="Extract Text from All Pages",
            command=self._extract_text
        )
        self.confirm_button.pack(side=tk.LEFT, padx=5)
        self.pdf_path = None
        self.doc = None
        # Load the default PDF file
        default_pdf = "test.pdf"
        try:
            self.load_pdf(default_pdf)
        except Exception as e:
            messagebox.showerror("Error", f"Could not load the default PDF file: {e}")

    def load_pdf(self, pdf_path):
        """Load a PDF and display it."""
        self.pdf_path = pdf_path
        self.doc = fitz.open(pdf_path)
        self.total_pages = len(self.doc)
        self.total_pages_label.configure(text=f"/{self.total_pages}")
        self._load_current_page()

    def _load_current_page(self):
        """Load the current page."""
        try:
            page_num = int(self.current_page.get()) - 1
            if 0 <= page_num < self.total_pages:
                image = self.processor.convert_pdf_to_image(self.pdf_path, page_num)
                regions = self.processor.detect_regions()
                self.current_image = image
                self.current_regions = regions
                self._update_display()
        except ValueError:
            messagebox.showerror("Error", "Please enter a valid page number")

    def _prev_page(self):
        """Show the previous page."""
        try:
            current = int(self.current_page.get())
            if current > 1:
                self.current_page.set(str(current - 1))
                self._load_current_page()
        except ValueError:
            pass

    def _next_page(self):
        """Show the next page."""
        try:
            current = int(self.current_page.get())
            if current < self.total_pages:
                self.current_page.set(str(current + 1))
                self._load_current_page()
        except ValueError:
            pass

    def _update_display(self):
        """Refresh the canvas."""
        if self.current_image is None:
            return
        # Original dimensions
        height, width = self.current_image.shape[:2]
        # Dimensions after zooming
        new_width = int(width * self.zoom_factor)
        new_height = int(height * self.zoom_factor)
        # Resize the image
        resized_image = cv2.resize(self.current_image, (new_width, new_height))
        # Convert and display the image
        image = cv2.cvtColor(resized_image, cv2.COLOR_BGR2RGB)
        image_pil = Image.fromarray(image)
        self.photo = ImageTk.PhotoImage(image=image_pil)
        # Clear the canvas
        self.canvas.delete("all")
        # Draw the image
        self.canvas.create_image(0, 0, image=self.photo, anchor=tk.NW)
        # Draw the detected regions
        if self.current_regions:
            for i, (x, y, w, h) in enumerate(self.current_regions):
                # Scale the coordinates and dimensions
                scaled_x = int(x * self.zoom_factor)
                scaled_y = int(y * self.zoom_factor)
                scaled_w = int(w * self.zoom_factor)
                scaled_h = int(h * self.zoom_factor)
                rect_id = self.canvas.create_rectangle(
                    scaled_x, scaled_y,
                    scaled_x + scaled_w,
                    scaled_y + scaled_h,
                    outline="red",
                    width=max(1, int(2 * self.zoom_factor)),
                    tags=f"region_{i+1}"
                )
                self.canvas.create_text(
                    scaled_x + scaled_w // 2,
                    scaled_y - 10 * self.zoom_factor,
                    text=f"Region {i+1} ({w}x{h})",
                    fill="red",
                    tags=f"region_{i+1}"
                )
                # Hover highlighting
                self.canvas.tag_bind(
                    f"region_{i+1}",
                    '<Enter>',
                    lambda e, rid=rect_id: self._highlight_region(rid)
                )
                self.canvas.tag_bind(
                    f"region_{i+1}",
                    '<Leave>',
                    lambda e, rid=rect_id: self._unhighlight_region(rid)
                )
        # Update the canvas scroll region
        self.canvas.config(scrollregion=self.canvas.bbox(tk.ALL))

    def _on_zoom_wheel(self, event):
        """Handle Ctrl + wheel zooming."""
        if event.delta > 0:
            self.zoom_scale.set(min(200, self.zoom_scale.get() + 10))
        else:
            self.zoom_scale.set(max(10, self.zoom_scale.get() - 10))

    def _on_zoom_change(self, value):
        """Handle zoom slider changes."""
        self.zoom_factor = float(value) / 100
        self._update_display()

    def _on_mousewheel_y(self, event):
        """Handle vertical mouse-wheel scrolling."""
        if event.num == 4 or event.delta > 0:
            self.canvas.yview_scroll(-1, "units")
        elif event.num == 5 or event.delta < 0:
            self.canvas.yview_scroll(1, "units")

    def _on_mousewheel_x(self, event):
        """Handle horizontal mouse-wheel scrolling."""
        if event.num == 4 or event.delta > 0:
            self.canvas.xview_scroll(-1, "units")
        elif event.num == 5 or event.delta < 0:
            self.canvas.xview_scroll(1, "units")

    def _start_drag(self, event):
        """Start dragging."""
        self.canvas.scan_mark(event.x, event.y)

    def _drag(self, event):
        """Drag the canvas."""
        self.canvas.scan_dragto(event.x, event.y, gain=1)

    def _highlight_region(self, region_id):
        """Highlight a region."""
        self.canvas.itemconfig(region_id, width=max(2, int(3 * self.zoom_factor)), outline="yellow")

    def _unhighlight_region(self, region_id):
        """Remove the highlight from a region."""
        self.canvas.itemconfig(region_id, width=max(1, int(2 * self.zoom_factor)), outline="red")

    def _fit_to_window(self):
        """Adjust the zoom so the page fits the window."""
        if self.current_image is None:
            return
        # Window and image dimensions
        window_width = self.canvas.winfo_width()
        window_height = self.canvas.winfo_height()
        image_height, image_width = self.current_image.shape[:2]
        # Candidate scale factors
        width_ratio = window_width / image_width
        height_ratio = window_height / image_height
        # Use the smaller ratio so the whole page stays visible
        new_zoom = min(width_ratio, height_ratio) * 0.9  # leave a small margin
        # Apply the new zoom
        self.zoom_scale.set(new_zoom * 100)

    def _on_window_resize(self, event):
        """Handle window resize events."""
        # Only react to events from the main window
        if event.widget == self.root:
            # Update the canvas scroll region
            self.canvas.config(scrollregion=self.canvas.bbox(tk.ALL))

    def _extract_text(self):
        """Extract text from every page and save it to five region files."""
        if not self.doc:
            messagebox.showwarning("Warning", "Load a PDF file first")
            return
        output_files = []
        try:
            # Create the output directory if it does not exist
            output_dir = "output"
            if not os.path.exists(output_dir):
                os.makedirs(output_dir)
            # Open the five output files (append mode)
            base_name = os.path.basename(self.pdf_path).rsplit('.', 1)[0]
            output_files = [open(os.path.join(output_dir, f"{base_name}_region{i+1}.txt"), 'a', encoding='utf-8') for i in range(5)]
            # Write a timestamped separator
            timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            for f in output_files:
                f.write(f"\n\n=== New extraction run {timestamp} ===\n\n")
            # Process every page
            for page_num in range(self.total_pages):
                # Switch to this page to refresh the detected regions
                self.current_page.set(str(page_num + 1))
                self._load_current_page()
                if not self.current_regions or len(self.current_regions) < 5:
                    messagebox.showwarning("Warning", f"Page {page_num + 1}: not enough regions detected, skipping")
                    continue
                page = self.doc[page_num]
                # Process the regions sorted by y coordinate
                sorted_regions = sorted(self.current_regions, key=lambda r: r[1])
                for region_idx, region in enumerate(sorted_regions[:5]):  # only the first five regions
                    x, y, w, h = region
                    # Convert OpenCV coordinates to PDF coordinates
                    # (divide by 2 because the page was rendered at 2x)
                    pdf_x = x / 2
                    pdf_y = y / 2
                    pdf_w = w / 2
                    pdf_h = h / 2
                    rect = fitz.Rect(pdf_x, pdf_y, pdf_x + pdf_w, pdf_y + pdf_h)
                    text = page.get_text(clip=rect, sort=True)
                    if text.strip():
                        # Collapse multi-line text into one space-separated line
                        single_line_text = ' '.join(text.split())
                        output_files[region_idx].write(
                            f"=== Page {page_num + 1} ===\n"
                            f"Region at ({x}, {y}), size: {w}x{h}\n"
                            f"{single_line_text}\n"
                            "-------------------\n\n"
                        )
            # Close all files
            for f in output_files:
                f.close()
            messagebox.showinfo("Success", "Text saved to the output folder as:\n"
                                f"{base_name}_region1.txt\n"
                                f"{base_name}_region2.txt\n"
                                f"{base_name}_region3.txt\n"
                                f"{base_name}_region4.txt\n"
                                f"{base_name}_region5.txt")
        except Exception as e:
            messagebox.showerror("Error", f"Error while extracting text: {e}")
        finally:
            # Make sure every file is closed
            for f in output_files:
                if not f.closed:
                    f.close()

def main():
    root = tk.Tk()
    app = PDFViewer(root)
    root.mainloop()


if __name__ == "__main__":
    main()
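
For batch work without the Tk window, the detection pipeline can also be driven directly. A minimal headless sketch (the input file name is an assumption):

```python
# Hypothetical headless use of PDFProcessor: render one page,
# detect the matching regions, and print their pixel boxes.
proc = PDFProcessor()
proc.convert_pdf_to_image("test.pdf", page_num=0)  # assumed input file
for x, y, w, h in proc.detect_regions():
    # Coordinates are in the 2x-scaled raster; divide by 2 for PDF points.
    print(f"region at ({x}, {y}), size {w}x{h}")
```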


@@ -0,0 +1,39 @@
import fitz  # PyMuPDF


def extract_vertical_text(pdf_path):
    """
    Extract vertical text in right-to-left reading order, with page markers.
    """
    doc = fitz.open(pdf_path)
    full_text = []
    for page in doc:
        # Add a page marker
        full_text.append(f"=== Page {page.number + 1} ===")
        # Rotate the page to match the reading direction of vertical text
        page.set_rotation(270)
        # Fetch the text blocks and sort them
        blocks = page.get_text("blocks", flags=fitz.TEXT_PRESERVE_LIGATURES)
        # Rightmost column first, then top to bottom within a column
        blocks.sort(key=lambda b: (-b[2], b[1]))
        # Collect this page's text
        page_text = []
        for b in blocks:
            text = b[4].strip()
            if text:
                page_text.append(text)
        full_text.append('\n'.join(page_text))
    return '\n\n'.join(full_text)


# Usage example
if __name__ == "__main__":
    text = extract_vertical_text('origin_second.pdf')
    # Write the result to a file
    with open('output/output.txt', 'w', encoding='utf-8') as f:
        f.write(text)
    print("Text saved to output/output.txt")
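
The sort key carries the right-to-left logic: blocks are ordered by (-x1, y0), so the rightmost column comes first and blocks within a column run top to bottom. A toy check with hand-made block tuples (values are illustrative):

```python
# Toy blocks shaped like PyMuPDF's (x0, y0, x1, y1, text, block_no, block_type)
blocks = [
    (10, 0, 60, 300, "left column", 0, 0),
    (130, 0, 180, 300, "right column", 2, 0),
    (70, 0, 120, 300, "middle column", 1, 0),
]
blocks.sort(key=lambda b: (-b[2], b[1]))
print([b[4] for b in blocks])  # ['right column', 'middle column', 'left column']
```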

145
sync_tags.py Normal file

@@ -0,0 +1,145 @@
#!/usr/bin/python3
# -*- coding: utf-8 -*-
# Sync macOS Finder tags to NFO files.
import sys
import os
import xml.etree.ElementTree as ET
import xattr
import plistlib

# --- CONFIGURATION ---
# Add any tags you want to exclude from syncing here.
# For example: EXCLUDED_TAGS = {'TagToExclude1', 'TagToExclude2'}
EXCLUDED_TAGS = {'绿色'}
# Video file extensions to look for.
VIDEO_EXTENSIONS = {'.mp4', '.mkv', '.avi', '.mov', '.wmv', '.flv'}
# --- END CONFIGURATION ---


def get_finder_tags(filepath):
    tags = get_macos_tags(filepath)
    return parse_tags(tags)


def get_macos_tags(file_path):
    try:
        # Read the extended attribute (a byte string)
        attrs = xattr.xattr(file_path)
        tag_data = attrs.get('com.apple.metadata:_kMDItemUserTags')
        # Decode the binary plist
        plist = plistlib.loads(tag_data)
        # The plist is a list of tag strings
        return plist
    except Exception:
        return []


def parse_tags(tags):
    # Finder stores some tags as "name\ncolor"; keep only the name.
    parsed = []
    for tag in tags:
        if '\n' in tag:
            name, color = tag.split('\n', 1)
            parsed.append(name)
        else:
            parsed.append(tag)
    return parsed


def sync_tags_to_nfo(video_path, excluded_tags):
    """
    Syncs Finder tags to a .nfo file, creating or overwriting it.
    All existing tags in the .nfo file will be replaced.
    """
    # 1. Get Finder tags and filter out the excluded ones.
    finder_tags = set(get_finder_tags(video_path))
    tags_to_sync = finder_tags - excluded_tags
    print(f"Found Finder tags: {list(finder_tags) or 'None'}")
    if not tags_to_sync:
        print("No tags to sync after exclusion. Skipping NFO update.")
        return
    print(f"Tags to be written to NFO: {list(tags_to_sync)}")
    # 2. Determine the .nfo file path.
    base_name = os.path.splitext(video_path)[0]
    nfo_path = base_name + '.nfo'
    root = None
    tree = None
    # 3. Read existing NFO or create a new XML root.
    if os.path.exists(nfo_path):
        try:
            tree = ET.parse(nfo_path)
            root = tree.getroot()
            # Remove all existing 'tag' elements to ensure a clean sync.
            for tag_elem in root.findall('tag'):
                root.remove(tag_elem)
        except ET.ParseError:
            print(f"Warning: Could not parse '{os.path.basename(nfo_path)}'. A new file will be created.")
            root = ET.Element('movie')
    else:
        print(f"NFO file '{os.path.basename(nfo_path)}' not found. Creating a new one.")
        root = ET.Element('movie')
    # 4. Add the filtered Finder tags to the XML structure.
    for tag_text in sorted(tags_to_sync):  # sort for consistent order
        new_tag_element = ET.Element('tag')
        new_tag_element.text = tag_text
        root.append(new_tag_element)
    # 5. Write the updated XML structure back to the .nfo file.
    try:
        if tree is None:
            tree = ET.ElementTree(root)
        # Indent the XML for better readability (requires Python 3.9+)
        try:
            ET.indent(tree, space=" ", level=0)
        except AttributeError:
            # ET.indent is not available in Python < 3.9; the file will not be pretty-printed.
            pass
        tree.write(nfo_path, encoding='utf-8', xml_declaration=True)
        print(f"Successfully synced tags to '{os.path.basename(nfo_path)}'")
    except Exception as e:
        print(f"Error: Failed to write to NFO file '{nfo_path}': {e}")


def main():
    """
    Main function to scan a directory for videos and sync their tags.
    """
    if len(sys.argv) < 2:
        print(f"Usage: {sys.argv[0]} <directory_path>")
        sys.exit(1)
    target_directory = sys.argv[1]
    if not os.path.isdir(target_directory):
        print(f"Error: Provided path '{target_directory}' is not a valid directory.")
        sys.exit(1)
    print(f"Starting scan in directory: '{target_directory}'")
    print(f"Excluded tags: {list(EXCLUDED_TAGS) or 'None'}")
    print("-" * 40)
    for dirpath, _, filenames in os.walk(target_directory):
        for filename in filenames:
            file_ext = os.path.splitext(filename)[1].lower()
            if file_ext in VIDEO_EXTENSIONS:
                video_file_path = os.path.join(dirpath, filename)
                print(f"\nProcessing: '{os.path.basename(video_file_path)}'")
                sync_tags_to_nfo(video_file_path, EXCLUDED_TAGS)
    print("\n" + "-" * 40)
    print("Script finished.")


if __name__ == "__main__":
    main()
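
To preview what would change without writing any NFO files, the tag-reading helpers can be run on their own. A minimal dry-run sketch (the directory path is an assumption):

```python
# Hypothetical dry run: list the Finder tags that would be synced for each
# video under a directory, without touching any NFO files.
target = "/path/to/videos"  # assumed directory
for dirpath, _, filenames in os.walk(target):
    for filename in filenames:
        if os.path.splitext(filename)[1].lower() in VIDEO_EXTENSIONS:
            path = os.path.join(dirpath, filename)
            tags = set(get_finder_tags(path)) - EXCLUDED_TAGS
            print(f"{filename}: {sorted(tags) or 'no tags'}")
```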

99
word/README.md Normal file

@@ -0,0 +1,99 @@
# Word Document Word-Frequency Analyzer

A Python tool for analyzing word frequency in Word documents, with support for Chinese word segmentation, frequency statistics, visualization, and report export.

## Features

- 📖 Reads Word documents (.docx format)
- 🔤 Chinese word segmentation (based on jieba)
- 📊 Word-frequency statistics
- 📈 Generates word clouds and frequency bar charts
- 📋 Exports an Excel analysis report
- 🚫 Filters out common stop words

## Installing Dependencies

Before running the script, install the required Python packages:

```bash
pip install -r requirements.txt
```

## Usage

### 1. Basic usage

Place the Word documents in the `input` folder next to the script (it is created on first run), then run:

```bash
python word_frequency_analyzer.py
```

### 2. Custom directory

```python
from word_frequency_analyzer import WordFrequencyAnalyzer

# Point the analyzer at the directory containing the Word documents
analyzer = WordFrequencyAnalyzer('/path/to/your/word/documents')
results = analyzer.analyze_all_documents()
analyzer.print_results(results)
```

### 3. Analyzing a single document

```python
analyzer = WordFrequencyAnalyzer()
result = analyzer.analyze_single_document('your_document.docx')
```

## Output Files

Running the script produces the following files:

- `词频分析结果.xlsx` - the detailed analysis report in Excel format
- `词云图_[document name].png` - a word cloud for each document
- `词频图_[document name].png` - a frequency bar chart for each document

## What the Analysis Includes

- Total word count
- Unique word count
- The top 20 most frequent words
- Word-cloud visualization
- Frequency bar chart
- Detailed Excel report

## Supported Document Formats

- Microsoft Word (.docx)

## Notes

1. Documents must be in .docx format; .doc is not supported
2. Common stop words and punctuation are filtered out automatically
3. Word clouds require a Chinese font installed on the system
4. Python 3.7 or newer is recommended

## Customization

### Modifying stop words

Add or remove stop words in the `_load_stop_words` method of the `WordFrequencyAnalyzer` class. For quick experiments the set can also be extended from outside the class, as sketched below.
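
A minimal sketch (the extra stop words are placeholders):

```python
from word_frequency_analyzer import WordFrequencyAnalyzer

analyzer = WordFrequencyAnalyzer()
# Hypothetical extra stop words to suppress in the statistics
analyzer.stop_words.update({'例如', '某个'})
results = analyzer.analyze_all_documents()
```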
### Tuning analysis parameters

- Change the `top_n` parameter of `get_top_words` to adjust how many high-frequency words are reported
- Adjust the styling parameters of the word cloud and bar chart

## Troubleshooting

If you run into font-related errors, make sure the system has a Chinese font installed, or change the font path used in the script (see the sketch after the font lists below).

macOS users can use:

- `/System/Library/Fonts/PingFang.ttc`
- `/System/Library/Fonts/STHeiti Light.ttc`

Windows users can use:

- `C:/Windows/Fonts/simhei.ttf`
- `C:/Windows/Fonts/msyh.ttc`
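
One way to apply a specific font file is to register it with matplotlib before plotting. A minimal sketch, assuming the PingFang path above (adjust for your system):

```python
import matplotlib.pyplot as plt
from matplotlib import font_manager

# Register a concrete font file and make it the default sans-serif font.
font_path = '/System/Library/Fonts/PingFang.ttc'  # assumed macOS font path
font_manager.fontManager.addfont(font_path)
plt.rcParams['font.sans-serif'] = [font_manager.FontProperties(fname=font_path).get_name()]
plt.rcParams['axes.unicode_minus'] = False
```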

6
word/requirements.txt Normal file

@@ -0,0 +1,6 @@
python-docx==0.8.11
jieba==0.42.1
matplotlib==3.7.2
wordcloud==1.9.2
pandas==2.0.3
openpyxl==3.1.2

433
word/word_frequency_analyzer.py Normal file

@@ -0,0 +1,433 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Word-document word-frequency analyzer.
Reads Word documents, performs Chinese word segmentation, and computes word-frequency statistics.
"""
import os
import re
from collections import Counter
from docx import Document
import jieba
import matplotlib.pyplot as plt
from wordcloud import WordCloud
import pandas as pd
from typing import List, Dict, Tuple

# Configure matplotlib to use fonts with Chinese support
plt.rcParams['font.sans-serif'] = ['SimHei', 'Arial Unicode MS', 'DejaVu Sans']
plt.rcParams['axes.unicode_minus'] = False


class WordFrequencyAnalyzer:
    def __init__(self, input_dir: str = None, output_dir: str = None):
        """
        Initialize the analyzer.
        Args:
            input_dir: directory containing the Word documents; defaults to the 'input' folder next to the script
            output_dir: directory for the results; defaults to the 'output' folder next to the script
        """
        current_dir = os.path.dirname(os.path.abspath(__file__))
        self.input_dir = input_dir or os.path.join(current_dir, 'input')
        self.output_dir = output_dir or os.path.join(current_dir, 'output')
        # Make sure the input and output directories exist
        os.makedirs(self.input_dir, exist_ok=True)
        os.makedirs(self.output_dir, exist_ok=True)
        self.stop_words = self._load_stop_words()

    def _load_stop_words(self) -> set:
        """Load the stop-word list."""
        # Common Chinese stop words, punctuation, digits, and Latin letters
        stop_words = {
            '', '', '', '', '', '', '', '', '', '', '', '', '一个',
            '', '', '', '', '', '', '', '', '', '', '没有', '', '',
            '自己', '', '', '', '就是', '', '', '', '或者', '因为', '所以',
            '但是', '如果', '这样', '那样', '可以', '能够', '应该', '必须', '已经',
            '正在', '将要', '可能', '也许', '当然', '确实', '真的', '非常', '特别',
            '尤其', '特殊', '一般', '通常', '经常', '总是', '从来', '永远', '马上',
            '立即', '现在', '以前', '以后', '今天', '明天', '昨天', '这里', '那里',
            '哪里', '什么', '怎么', '为什么', '多少', '几个', '一些', '许多', '大量',
            '少量', '全部', '部分', '每个', '各种', '不同', '相同', '类似', '差不多',
            '大概', '左右', '之间', '以上', '以下', '包括', '除了', '关于', '对于',
            '根据', '按照', '通过', '由于', '为了', '虽然', '尽管', '然而', '不过',
            '而且', '另外', '此外', '同时', '首先', '其次', '最后', '总之', '因此',
            '所以', '于是', '然后', '接着', '随后', '后来', '最终', '结果', '这种',
            '那种', '各种', '某种', '任何', '所有', '整个', '全部', '完全', '十分',
            '相当', '比较', '更加', '', '极其', '非常', '', '', '', '',
            '还是', '或者', '要么', '不是', '', '', '不要', '不能', '不会',
            '不用', '无法', '', '无论', '不管', '无论如何', '总之', '反正',
            '', '', '', '', '', '', '', '“', '”', '‘', '’', '', '',
            '', '', '', '', '', '', '', '', '', '', '', '',
            '', '', '', '·', '', '', '', '', '', '', '', '',
            '', '', '', '', '', '', '$', '¥', '£', '¢', '', '',
            '0', '1', '2', '3', '4', '5', '6', '7', '8', '9',
            'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm',
            'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z',
            'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M',
            'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z'
        }
        return stop_words

    def read_docx(self, file_path: str) -> str:
        """
        Read the contents of a Word document.
        Args:
            file_path: path to the Word document
        Returns:
            The document's text content.
        """
        try:
            doc = Document(file_path)
            text = ""
            for paragraph in doc.paragraphs:
                text += paragraph.text + "\n"
            return text
        except Exception as e:
            print(f"Error reading document {file_path}: {e}")
            return ""

    def clean_text(self, text: str) -> str:
        """
        Clean the text: strip special characters and extra whitespace.
        Args:
            text: raw text
        Returns:
            The cleaned text.
        """
        # Remove special characters, keeping Chinese, Latin letters, and digits
        text = re.sub(r'[^\u4e00-\u9fa5a-zA-Z0-9\s]', '', text)
        # Collapse extra whitespace
        text = re.sub(r'\s+', ' ', text).strip()
        return text

    def segment_text(self, text: str) -> List[str]:
        """
        Segment Chinese text into words.
        Args:
            text: text to segment
        Returns:
            The list of segmented words.
        """
        # Segment with jieba
        words = jieba.lcut(text)
        # Filter out stop words and words shorter than 2 characters
        filtered_words = []
        for word in words:
            word = word.strip()
            if (len(word) >= 2 and
                    word not in self.stop_words and
                    not word.isdigit() and
                    not word.isspace()):
                filtered_words.append(word)
        return filtered_words

    def calculate_frequency(self, words: List[str]) -> Dict[str, int]:
        """
        Count word frequencies.
        Args:
            words: list of words
        Returns:
            A word-to-frequency dictionary.
        """
        return dict(Counter(words))

    def get_top_words(self, word_freq: Dict[str, int], top_n: int = 20) -> List[Tuple[str, int]]:
        """
        Get the most frequent words.
        Args:
            word_freq: word-frequency dictionary
            top_n: how many of the top words to return
        Returns:
            A list of (word, frequency) tuples.
        """
        return sorted(word_freq.items(), key=lambda x: x[1], reverse=True)[:top_n]

    def analyze_single_document(self, file_path: str) -> Dict:
        """
        Analyze a single document.
        Args:
            file_path: path to the document
        Returns:
            A dictionary with the analysis results.
        """
        print(f"Analyzing document: {os.path.basename(file_path)}")
        # Read the document
        text = self.read_docx(file_path)
        if not text:
            return {}
        # Clean the text
        cleaned_text = self.clean_text(text)
        # Segment it
        words = self.segment_text(cleaned_text)
        # Count frequencies
        word_freq = self.calculate_frequency(words)
        # Pick the top words
        top_words = self.get_top_words(word_freq, 30)
        return {
            'file_name': os.path.basename(file_path),
            'total_words': len(words),
            'unique_words': len(word_freq),
            'word_frequency': word_freq,
            'top_words': top_words,
            'text_length': len(text),
            'cleaned_text_length': len(cleaned_text)
        }

    def analyze_all_documents(self) -> Dict:
        """
        Analyze every Word document in the input directory.
        Returns:
            The analysis results for all documents.
        """
        results = {}
        docx_files = [f for f in os.listdir(self.input_dir) if f.endswith('.docx')]
        if not docx_files:
            print(f"No Word documents found in input directory {self.input_dir}")
            return results
        print(f"Found {len(docx_files)} Word document(s) in the input directory")
        for file_name in docx_files:
            file_path = os.path.join(self.input_dir, file_name)
            result = self.analyze_single_document(file_path)
            if result:
                results[file_name] = result
        return results

    def create_word_cloud(self, word_freq: Dict[str, int], title: str = "词云图",
                          save_path: str = None) -> None:
        """
        Generate a word cloud.
        Args:
            word_freq: word-frequency dictionary
            title: chart title
            save_path: where to save the image
        """
        if not word_freq:
            print("Word-frequency data is empty; cannot generate a word cloud")
            return
        # Try several font paths in turn
        font_paths = [
            '/System/Library/Fonts/PingFang.ttc',
            '/System/Library/Fonts/STHeiti Light.ttc',
            '/System/Library/Fonts/Helvetica.ttc',
            '/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf',
            None  # fall back to the default font
        ]
        font_path = None
        for path in font_paths:
            if path is None or os.path.exists(path):
                font_path = path
                break
        # Word-cloud settings
        wordcloud_params = {
            'width': 800,
            'height': 600,
            'background_color': 'white',
            'max_words': 100,
            'colormap': 'viridis'
        }
        if font_path:
            wordcloud_params['font_path'] = font_path
        try:
            wordcloud = WordCloud(**wordcloud_params).generate_from_frequencies(word_freq)
        except Exception as e:
            print(f"Error generating word cloud: {e}")
            print("Falling back to the default font...")
            wordcloud_params.pop('font_path', None)
            wordcloud = WordCloud(**wordcloud_params).generate_from_frequencies(word_freq)
        # Draw the image
        plt.figure(figsize=(10, 8))
        plt.imshow(wordcloud, interpolation='bilinear')
        plt.axis('off')
        plt.title(title, fontsize=16, pad=20)
        if save_path:
            plt.savefig(save_path, dpi=300, bbox_inches='tight')
            print(f"Word cloud saved to: {save_path}")
        # Close the figure instead of showing it
        plt.close()

    def create_frequency_chart(self, top_words: List[Tuple[str, int]],
                               title: str = "词频统计", save_path: str = None) -> None:
        """
        Generate a word-frequency bar chart.
        Args:
            top_words: list of (word, frequency) tuples
            title: chart title
            save_path: where to save the image
        """
        if not top_words:
            print("No high-frequency words; cannot generate the chart")
            return
        words, frequencies = zip(*top_words)
        plt.figure(figsize=(12, 8))
        bars = plt.bar(range(len(words)), frequencies, color='skyblue', alpha=0.8)
        # Label the x axis with the words
        plt.xticks(range(len(words)), words, rotation=45, ha='right')
        plt.xlabel('词语')
        plt.ylabel('频次')
        plt.title(title, fontsize=16, pad=20)
        # Show the count above each bar
        for bar, freq in zip(bars, frequencies):
            plt.text(bar.get_x() + bar.get_width() / 2, bar.get_height() + 0.1,
                     str(freq), ha='center', va='bottom')
        plt.tight_layout()
        if save_path:
            plt.savefig(save_path, dpi=300, bbox_inches='tight')
            print(f"Frequency chart saved to: {save_path}")
        # Close the figure instead of showing it
        plt.close()

    def export_to_excel(self, results: Dict, save_path: str = None) -> None:
        """
        Export the analysis results to Excel.
        Args:
            results: analysis results
            save_path: where to save the workbook
        """
        if not results:
            print("No analysis results to export")
            return
        if not save_path:
            save_path = os.path.join(self.output_dir, "词频分析结果.xlsx")
        with pd.ExcelWriter(save_path, engine='openpyxl') as writer:
            # Summary sheet
            summary_data = []
            for file_name, result in results.items():
                summary_data.append({
                    '文档名称': result['file_name'],
                    '总词数': result['total_words'],
                    '不重复词数': result['unique_words'],
                    '文本长度': result['text_length'],
                    '清理后文本长度': result['cleaned_text_length']
                })
            summary_df = pd.DataFrame(summary_data)
            summary_df.to_excel(writer, sheet_name='汇总统计', index=False)
            # A detailed frequency sheet for each document
            for file_name, result in results.items():
                if result['top_words']:
                    df_data = []
                    for word, freq in result['top_words']:
                        df_data.append({'词语': word, '频次': freq})
                    df = pd.DataFrame(df_data)
                    sheet_name = result['file_name'][:20]  # Excel limits sheet-name length
                    df.to_excel(writer, sheet_name=sheet_name, index=False)
        print(f"Results exported to: {save_path}")

    def print_results(self, results: Dict) -> None:
        """
        Print the analysis results.
        Args:
            results: analysis results
        """
        if not results:
            print("No analysis results")
            return
        print("\n" + "=" * 60)
        print("Word document word-frequency analysis results")
        print("=" * 60)
        for file_name, result in results.items():
            print(f"\nDocument: {result['file_name']}")
            print(f"Total words: {result['total_words']}")
            print(f"Unique words: {result['unique_words']}")
            print(f"Text length: {result['text_length']} characters")
            print("\nTop 20 words:")
            for i, (word, freq) in enumerate(result['top_words'][:20], 1):
                print(f"{i:2d}. {word:<10} {freq:>4}")
        print("\n" + "=" * 60)

def main():
    """Entry point."""
    # Create the analyzer
    analyzer = WordFrequencyAnalyzer()
    print("Analyzing Word documents...")
    # Analyze every document
    results = analyzer.analyze_all_documents()
    if not results:
        print("No documents to analyze")
        return
    # Print the results
    analyzer.print_results(results)
    # Export to Excel
    analyzer.export_to_excel(results)
    # Generate the charts for each document
    for file_name, result in results.items():
        if result['word_frequency'] and result['top_words']:
            # Word cloud
            cloud_title = f"词云图 - {result['file_name']}"
            cloud_path = os.path.join(analyzer.output_dir, f"词云图_{result['file_name']}.png")
            analyzer.create_word_cloud(result['word_frequency'], cloud_title, cloud_path)
            # Frequency bar chart
            chart_title = f"词频统计 - {result['file_name']}"
            chart_path = os.path.join(analyzer.output_dir, f"词频图_{result['file_name']}.png")
            analyzer.create_frequency_chart(result['top_words'][:15], chart_title, chart_path)
    print("\nAnalysis complete!")


if __name__ == "__main__":
    main()