#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Word document word-frequency analysis tool.

Reads Word (.docx) documents and performs Chinese word segmentation and
word-frequency statistics.
"""

import os
import re
from collections import Counter
from typing import List, Dict, Tuple

import jieba
import matplotlib.pyplot as plt
import pandas as pd
from docx import Document
from wordcloud import WordCloud

# Configure matplotlib to render Chinese characters
plt.rcParams['font.sans-serif'] = ['SimHei', 'Arial Unicode MS', 'DejaVu Sans']
plt.rcParams['axes.unicode_minus'] = False


class WordFrequencyAnalyzer:
    def __init__(self, input_dir: str = None, output_dir: str = None):
        """
        Initialize the word-frequency analyzer.

        Args:
            input_dir: directory containing the Word documents; defaults to the
                'input' folder next to this script
            output_dir: directory for the results; defaults to the 'output'
                folder next to this script
        """
        current_dir = os.path.dirname(os.path.abspath(__file__))
        self.input_dir = input_dir or os.path.join(current_dir, 'input')
        self.output_dir = output_dir or os.path.join(current_dir, 'output')

        # Make sure the input and output directories exist
        os.makedirs(self.input_dir, exist_ok=True)
        os.makedirs(self.output_dir, exist_ok=True)

        self.stop_words = self._load_stop_words()

    def _load_stop_words(self) -> set:
        """Load the stop-word list."""
        # Common Chinese stop words, plus punctuation, digits and single letters
        stop_words = {
            '的', '了', '在', '是', '我', '有', '和', '就', '不', '人', '都', '一', '一个',
            '上', '也', '很', '到', '说', '要', '去', '你', '会', '着', '没有', '看', '好',
            '自己', '这', '那', '里', '就是', '还', '把', '比', '或者', '因为', '所以',
            '但是', '如果', '这样', '那样', '可以', '能够', '应该', '必须', '已经',
            '正在', '将要', '可能', '也许', '当然', '确实', '真的', '非常', '特别',
            '尤其', '特殊', '一般', '通常', '经常', '总是', '从来', '永远', '马上',
            '立即', '现在', '以前', '以后', '今天', '明天', '昨天', '这里', '那里',
            '哪里', '什么', '怎么', '为什么', '多少', '几个', '一些', '许多', '大量',
            '少量', '全部', '部分', '每个', '各种', '不同', '相同', '类似', '差不多',
            '大概', '左右', '之间', '以上', '以下', '包括', '除了', '关于', '对于',
            '根据', '按照', '通过', '由于', '为了', '虽然', '尽管', '然而', '不过',
            '而且', '另外', '此外', '同时', '首先', '其次', '最后', '总之', '因此',
            '所以', '于是', '然后', '接着', '随后', '后来', '最终', '结果', '这种',
            '那种', '各种', '某种', '任何', '所有', '整个', '全部', '完全', '十分',
            '相当', '比较', '更加', '最', '极其', '非常', '很', '太', '挺', '蛮',
            '还是', '或者', '要么', '不是', '没', '别', '不要', '不能', '不会',
            '不用', '无法', '无', '无论', '不管', '无论如何', '总之', '反正',
            '?', '!', '。', ',', '、', ';', ':', '“', '”', '‘', '’', '(', ')',
            '【', '】', '《', '》', '〈', '〉', '「', '」', '『', '』', '〔', '〕',
            '…', '—', '-', '·', '~', '#', '&', '*', '+', '=', '<', '>',
            '%', '@', '¥', '£', '¢', '€', '$', '¥', '£', '¢', '₹', '₽',
            '0', '1', '2', '3', '4', '5', '6', '7', '8', '9',
            'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm',
            'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z',
            'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M',
            'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z'
        }
        return stop_words

    def read_docx(self, file_path: str) -> str:
        """
        Read the text content of a Word document.

        Args:
            file_path: path to the .docx file

        Returns:
            The document text, or an empty string if reading fails.
        """
        try:
            doc = Document(file_path)
            text = ""
            for paragraph in doc.paragraphs:
                text += paragraph.text + "\n"
            return text
        except Exception as e:
            print(f"读取文档 {file_path} 时出错: {e}")
            return ""
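
    # Note: doc.paragraphs only covers body paragraphs. If the documents also
    # contain tables whose text should be counted, a sketch along these lines
    # (using the python-docx table API) could be appended inside read_docx:
    #
    #     for table in doc.tables:
    #         for row in table.rows:
    #             for cell in row.cells:
    #                 text += cell.text + "\n"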

    def clean_text(self, text: str) -> str:
        """
        Clean the text by removing special characters and extra whitespace.

        Args:
            text: raw text

        Returns:
            The cleaned text.
        """
        # Keep only Chinese characters, letters, digits and whitespace
        text = re.sub(r'[^\u4e00-\u9fa5a-zA-Z0-9\s]', '', text)
        # Collapse runs of whitespace
        text = re.sub(r'\s+', ' ', text).strip()
        return text
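
    # Illustrative example (not executed): the two substitutions above would turn
    # "价格:¥120,约 $18!" into roughly "价格120约 18" -- punctuation and currency
    # symbols are stripped, while Chinese characters, letters, digits and single
    # spaces are kept.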

    def segment_text(self, text: str) -> List[str]:
        """
        Segment Chinese text into words.

        Args:
            text: text to segment

        Returns:
            The list of segmented words after filtering.
        """
        # Segment with jieba
        words = jieba.lcut(text)

        # Drop stop words and words shorter than two characters
        filtered_words = []
        for word in words:
            word = word.strip()
            if (len(word) >= 2 and
                    word not in self.stop_words and
                    not word.isdigit() and
                    not word.isspace()):
                filtered_words.append(word)

        return filtered_words
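
    # Illustrative example (not executed): with a default jieba dictionary,
    # segment_text("因为正在分析中文文档的词频") would yield something like
    # ['分析', '中文', '文档', '词频'] -- single-character tokens and stop words
    # such as '因为' and '正在' are filtered out. The exact segmentation depends
    # on the jieba version and dictionary in use.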

    def calculate_frequency(self, words: List[str]) -> Dict[str, int]:
        """
        Count word frequencies.

        Args:
            words: list of words

        Returns:
            A dictionary mapping each word to its count.
        """
        return dict(Counter(words))
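
    # For example, ['数据', '分析', '数据'] maps to {'数据': 2, '分析': 1}.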

    def get_top_words(self, word_freq: Dict[str, int], top_n: int = 20) -> List[Tuple[str, int]]:
        """
        Get the most frequent words.

        Args:
            word_freq: word-frequency dictionary
            top_n: number of top words to return

        Returns:
            A list of (word, count) pairs sorted by descending count.
        """
        return sorted(word_freq.items(), key=lambda x: x[1], reverse=True)[:top_n]
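
    # For example, get_top_words({'数据': 5, '模型': 3, '训练': 8}, top_n=2)
    # returns [('训练', 8), ('数据', 5)].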

    def analyze_single_document(self, file_path: str) -> Dict:
        """
        Analyze a single document.

        Args:
            file_path: path to the document

        Returns:
            A dictionary with the analysis results, or an empty dict on failure.
        """
        print(f"正在分析文档: {os.path.basename(file_path)}")

        # Read the document
        text = self.read_docx(file_path)
        if not text:
            return {}

        # Clean the text
        cleaned_text = self.clean_text(text)

        # Segment into words
        words = self.segment_text(cleaned_text)

        # Count word frequencies
        word_freq = self.calculate_frequency(words)

        # Top words
        top_words = self.get_top_words(word_freq, 30)

        return {
            'file_name': os.path.basename(file_path),
            'total_words': len(words),
            'unique_words': len(word_freq),
            'word_frequency': word_freq,
            'top_words': top_words,
            'text_length': len(text),
            'cleaned_text_length': len(cleaned_text)
        }

    def analyze_all_documents(self) -> Dict:
        """
        Analyze every Word document in the input directory.

        Returns:
            Analysis results keyed by file name.
        """
        results = {}
        docx_files = [f for f in os.listdir(self.input_dir) if f.endswith('.docx')]

        if not docx_files:
            print(f"在输入目录 {self.input_dir} 中未找到Word文档文件")
            return results

        print(f"在输入目录中找到 {len(docx_files)} 个Word文档")

        for file_name in docx_files:
            file_path = os.path.join(self.input_dir, file_name)
            result = self.analyze_single_document(file_path)
            if result:
                results[file_name] = result

        return results

    def create_word_cloud(self, word_freq: Dict[str, int], title: str = "词云图",
                          save_path: str = None) -> None:
        """
        Generate a word cloud image.

        Args:
            word_freq: word-frequency dictionary
            title: chart title
            save_path: path to save the image; if omitted, nothing is written
        """
        if not word_freq:
            print("词频数据为空,无法生成词云图")
            return

        # Try several font paths until one exists
        font_paths = [
            '/System/Library/Fonts/PingFang.ttc',
            '/System/Library/Fonts/STHeiti Light.ttc',
            '/System/Library/Fonts/Helvetica.ttc',
            '/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf',
            None  # fall back to the WordCloud default font
        ]

        font_path = None
        for path in font_paths:
            if path is None or os.path.exists(path):
                font_path = path
                break

        # Build the word cloud
        wordcloud_params = {
            'width': 800,
            'height': 600,
            'background_color': 'white',
            'max_words': 100,
            'colormap': 'viridis'
        }

        if font_path:
            wordcloud_params['font_path'] = font_path

        try:
            wordcloud = WordCloud(**wordcloud_params).generate_from_frequencies(word_freq)
        except Exception as e:
            print(f"生成词云图时出错: {e}")
            print("尝试使用默认字体...")
            wordcloud_params.pop('font_path', None)
            wordcloud = WordCloud(**wordcloud_params).generate_from_frequencies(word_freq)

        # Draw the image
        plt.figure(figsize=(10, 8))
        plt.imshow(wordcloud, interpolation='bilinear')
        plt.axis('off')
        plt.title(title, fontsize=16, pad=20)

        if save_path:
            plt.savefig(save_path, dpi=300, bbox_inches='tight')
            print(f"词云图已保存到: {save_path}")
        # Close the figure instead of showing it interactively
        plt.close()
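
    # Note: the font paths above are macOS/Linux locations. On Windows a CJK font
    # such as C:\Windows\Fonts\simhei.ttf would typically be added to font_paths
    # so that Chinese words render correctly in the cloud.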

    def create_frequency_chart(self, top_words: List[Tuple[str, int]],
                               title: str = "词频统计", save_path: str = None) -> None:
        """
        Generate a bar chart of word frequencies.

        Args:
            top_words: list of (word, count) pairs
            title: chart title
            save_path: path to save the image; if omitted, nothing is written
        """
        if not top_words:
            print("高频词数据为空,无法生成图表")
            return

        words, frequencies = zip(*top_words)

        plt.figure(figsize=(12, 8))
        bars = plt.bar(range(len(words)), frequencies, color='skyblue', alpha=0.8)

        # X-axis labels
        plt.xticks(range(len(words)), words, rotation=45, ha='right')
        plt.xlabel('词语')
        plt.ylabel('频次')
        plt.title(title, fontsize=16, pad=20)

        # Annotate each bar with its count
        for bar, freq in zip(bars, frequencies):
            plt.text(bar.get_x() + bar.get_width() / 2, bar.get_height() + 0.1,
                     str(freq), ha='center', va='bottom')

        plt.tight_layout()

        if save_path:
            plt.savefig(save_path, dpi=300, bbox_inches='tight')
            print(f"词频图表已保存到: {save_path}")
        # Close the figure instead of showing it interactively
        plt.close()

    def export_to_excel(self, results: Dict, save_path: str = None) -> None:
        """
        Export the analysis results to an Excel workbook.

        Args:
            results: analysis results
            save_path: output path; defaults to 词频分析结果.xlsx in the output directory
        """
        if not results:
            print("没有分析结果可导出")
            return

        if not save_path:
            save_path = os.path.join(self.output_dir, "词频分析结果.xlsx")

        with pd.ExcelWriter(save_path, engine='openpyxl') as writer:
            # Summary sheet
            summary_data = []
            for file_name, result in results.items():
                summary_data.append({
                    '文档名称': result['file_name'],
                    '总词数': result['total_words'],
                    '不重复词数': result['unique_words'],
                    '文本长度': result['text_length'],
                    '清理后文本长度': result['cleaned_text_length']
                })

            summary_df = pd.DataFrame(summary_data)
            summary_df.to_excel(writer, sheet_name='汇总统计', index=False)

            # One detailed frequency sheet per document
            for file_name, result in results.items():
                if result['top_words']:
                    df_data = []
                    for word, freq in result['top_words']:
                        df_data.append({'词语': word, '频次': freq})

                    df = pd.DataFrame(df_data)
                    sheet_name = result['file_name'][:20]  # Excel limits sheet names to 31 characters
                    df.to_excel(writer, sheet_name=sheet_name, index=False)

        print(f"分析结果已导出到: {save_path}")

    def print_results(self, results: Dict) -> None:
        """
        Print the analysis results.

        Args:
            results: analysis results
        """
        if not results:
            print("没有分析结果")
            return

        print("\n" + "=" * 60)
        print("Word文档词频分析结果")
        print("=" * 60)

        for file_name, result in results.items():
            print(f"\n文档: {result['file_name']}")
            print(f"总词数: {result['total_words']}")
            print(f"不重复词数: {result['unique_words']}")
            print(f"文本长度: {result['text_length']} 字符")

            print("\n前20个高频词:")
            for i, (word, freq) in enumerate(result['top_words'][:20], 1):
                print(f"{i:2d}. {word:<10} {freq:>4} 次")

        print("\n" + "=" * 60)


def main():
    """Main entry point."""
    # Create the analyzer (uses the default input/output folders next to this script)
    analyzer = WordFrequencyAnalyzer()

    print("开始分析Word文档...")

    # Analyze every document
    results = analyzer.analyze_all_documents()

    if not results:
        print("没有找到可分析的文档")
        return

    # Print the results
    analyzer.print_results(results)

    # Export to Excel
    analyzer.export_to_excel(results)

    # Generate charts for each document
    for file_name, result in results.items():
        if result['word_frequency'] and result['top_words']:
            # Word cloud
            cloud_title = f"词云图 - {result['file_name']}"
            cloud_path = os.path.join(analyzer.output_dir, f"词云图_{result['file_name']}.png")
            analyzer.create_word_cloud(result['word_frequency'], cloud_title, cloud_path)

            # Frequency bar chart
            chart_title = f"词频统计 - {result['file_name']}"
            chart_path = os.path.join(analyzer.output_dir, f"词频图_{result['file_name']}.png")
            analyzer.create_frequency_chart(result['top_words'][:15], chart_title, chart_path)

    print("\n分析完成!")


if __name__ == "__main__":
    main()