| """ | |
| 生成停用词和桑基图数据 | |
| """ | |
| import pandas as pd | |
| import jieba | |
| import re | |
| from collections import Counter | |
| import networkx as nx | |
| import matplotlib.pyplot as plt | |
| import numpy as np | |
| import warnings | |
| from matplotlib.font_manager import FontProperties | |
| import os | |
| warnings.filterwarnings('ignore') | |
| from collections import defaultdict | |
| from sklearn.feature_extraction.text import CountVectorizer | |
# domain_vocab.py
DOMAIN_VOCAB = [
    # Chinese domain terms (deduplicated and merged)
    "空间连接", "字段计算器", "建筑面积", "城市规划", "叠加分析", "空间连接功能",
    "数据表", "建筑层数", "地理处理", "相交功能", "现状地块", "相交叠加",
    "地块属性", "分地块", "容积率统计", "计算方法", "参数设置", "软件设置",
    "核密度分析", "热点分析", "带宽", "密度场", "焦点", "焦点统计",
    "地图代数", "条件分析", "差运算", "最大值", "交通", "像元大小",
    "参数", "凸包", "餐饮", "住宿", "搜索半径", "栅格计算器", "重分类", "Con函数",
    # English domain terms (deduplicated)
    "ArcGIS", "spatial join", "ArcMap", "Map algebra", "Kernel Density",
    "Con", "Getis - Ord Gi*", "NDVI", "Raster Calculator", "dwg", "catalog",
    "data manager", "POI",
]
def generate_domain_stopwords(df, text_columns, domain_keywords):
    """Build a stopword list by combining generic stopwords with corpus statistics,
    while protecting domain keywords."""
    # 1. Base stopwords (generic function words)
    common_stopwords = {
        "的", "了", "在", "是", "是否", "我", "有", "和", "就", "不", "人", "都", "一", "一个", "上", "也",
        "很", "到", "说", "要", "去", "你", "会", "着", "没有", "看", "好", "自己", "这", "那", "这个",
        "那个", "什么", "怎么", "哪里", "时候", "然后", "可能", "应该", "可以", "就是", "还是", "但是",
        "不过", "如果", "因为", "所以", "而且", "或者", "其实", "觉得", "认为", "希望", "能够", "需要",
        "知道", "表示", "这样", "那样", "这些", "那些", "有点", "一点", "一些", "进一步", "具体", "问题",
        "疑惑", "讲解", "需求", "难点", "操作", "应用", "场景", "进行", "对于", "实际", "情况", "结合",
        "学生", "老师", "实验", "报告", "作业", "课程", "课堂", "学习", "理解", "掌握", "明白", "清楚",
        "建议", "希望", "请问", "想问", "不懂", "不会", "不知道", "不太会", "不太懂", "不太清楚",
    }

    # 2. Domain keywords (protected from being treated as stopwords)
    domain_words = set(domain_keywords)
    # Alternative: load the domain keywords from a file instead of a Python list.
    # if domain_keywords_file and os.path.exists(domain_keywords_file):
    #     with open(domain_keywords_file, "r", encoding="utf-8") as f:
    #         domain_words = set([line.strip() for line in f if line.strip()])

    # 3. Merge the text columns and preprocess
    all_text = ""
    for col in text_columns:
        all_text += " ".join(df[col].fillna("").astype(str)) + " "
    all_text = re.sub(r"[^\w\s]", "", all_text)
    all_text = re.sub(r"\d+", "", all_text)
    all_text = all_text.lower()  # normalize English to lowercase
    words = jieba.lcut(all_text)

    # 4. Count word frequencies, keeping only words longer than one character
    word_freq = Counter(words)
    word_freq = {word: freq for word, freq in word_freq.items() if len(word) > 1}

    # 5. Build the candidate stopword set:
    #    - generic function words
    #    - high-frequency non-domain words (total count above 50% of the number of responses)
    stopwords = common_stopwords.copy()
    for word, freq in word_freq.items():
        if freq > len(df) * 0.5 and word not in domain_words:
            stopwords.add(word)

    # 6. Optionally persist the stopword list.
    # with open(output_path, "w", encoding="utf-8") as f:
    #     for word in stopwords:
    #         f.write(word + "\n")
    # print(f"Domain-specific stopword list generated: {len(stopwords)} words, saved to {output_path}")
    return stopwords
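
# Illustrative sketch (not part of the original pipeline): a quick way to sanity-check
# generate_domain_stopwords on a tiny, made-up DataFrame. The column names "s1"/"s2"
# and the sample sentences are assumptions used only for demonstration.
def _demo_generate_domain_stopwords():
    demo_df = pd.DataFrame({
        "s1": ["空间连接的参数设置不太清楚", "核密度分析的带宽怎么选"],
        "s2": ["希望讲解栅格计算器的用法", "希望讲解重分类的操作"],
    })
    stopwords = generate_domain_stopwords(demo_df, ["s1", "s2"], DOMAIN_VOCAB)
    # Generic words such as "希望" end up in the stopword set, while domain terms such as
    # "核密度分析" are excluded because they appear in DOMAIN_VOCAB.
    print(len(stopwords), "希望" in stopwords, "核密度分析" in stopwords)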
def load_and_preprocess_data(df, stopwords, domain_words):
    """Preprocess the survey data, with improved recognition of domain terminology."""
    stopwords = set(stopwords)
    # Alternative: read extra stopwords from a file.
    # with open(stopwords_path, "r", encoding="utf-8") as f:
    #     for line in f:
    #         stopwords.add(line.strip())

    question_types = {
        "s1": "难点",
        "s2": "讲解需求",
        "s3": "操作疑惑",
        "s4": "应用场景"
    }
    # Causal order of the questions (smaller index comes first)
    question_hierarchy = ["s1", "s2", "s3", "s4"]

    # Register every domain term with jieba up front
    professional_dict = {}
    for word in domain_words:
        jieba.add_word(word, freq=10000)  # high frequency so jieba keeps the term intact
        professional_dict[word.lower()] = 1  # lowercase key, because the cleaned text is lowercased

    # Build a case-insensitive regex over the domain terms, longest first so longer
    # phrases match before their substrings
    sorted_domain = sorted(domain_words, key=len, reverse=True)
    pattern_str = "|".join(re.escape(word) for word in sorted_domain)
    professional_pattern = re.compile(f"({pattern_str})", re.IGNORECASE)

    def clean_text(text):
        if not isinstance(text, str):
            return []
        # Step 1: basic cleaning
        text_cleaned = re.sub(r"[^\w\s]", "", text)
        text_cleaned = re.sub(r"\d+", "", text_cleaned)
        text_cleaned = text_cleaned.lower()

        # Step 2: locate every occurrence of a domain term
        matches = []
        for match in professional_pattern.finditer(text_cleaned):
            start, end = match.span()
            matches.append((start, end, text_cleaned[start:end]))

        # Step 3: split the text around the matches so domain terms stay intact
        segments = []
        last_end = 0
        for start, end, word in matches:
            if start > last_end:
                segments.append(text_cleaned[last_end:start])  # ordinary text before the match
            segments.append(word)                               # the domain term as a whole
            last_end = end
        if last_end < len(text_cleaned):
            segments.append(text_cleaned[last_end:])            # trailing ordinary text

        # Step 4: run jieba only on the non-domain segments
        final_words = []
        for segment in segments:
            if segment in professional_dict:
                final_words.append(segment)  # keep domain terms as single tokens
            else:
                words = jieba.lcut(segment)
                words = [w for w in words if w not in stopwords and len(w) > 1]
                final_words.extend(words)
        return final_words

    for col in ["s1", "s2", "s3", "s4"]:
        df[col + "_words"] = df[col].apply(clean_text)
    return df, question_types, question_hierarchy
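
# Illustrative sketch (not part of the original pipeline): run the preprocessing on a
# tiny, made-up DataFrame to check that multi-word domain phrases survive segmentation.
# The sample sentences and the empty stopword set are assumptions used only for demonstration.
def _demo_preprocess():
    demo_df = pd.DataFrame({
        "s1": ["不太理解核密度分析的搜索半径"],
        "s2": ["希望讲解Kernel Density和栅格计算器"],
        "s3": [""],
        "s4": [""],
    })
    demo_df, _, _ = load_and_preprocess_data(demo_df, stopwords=set(), domain_words=DOMAIN_VOCAB)
    # Expected: phrases such as "核密度分析", "搜索半径", "kernel density" and "栅格计算器"
    # come out as single tokens instead of being split by jieba.
    print(demo_df["s1_words"].iloc[0], demo_df["s2_words"].iloc[0])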
def build_sankey_data(df, question_columns, top_n=30):
    """
    Build the Sankey-diagram data (DataFrame with columns: source, target, value),
    keeping only the top_n globally most frequent keywords.
    """
    question_labels = {
        "s1": "S1_难点",
        "s2": "S2_讲解需求",
        "s3": "S3_操作疑惑",
        "s4": "S4_应用场景"
    }
    # 1. Global keyword frequencies
    all_keywords = []
    for col in question_columns:
        all_keywords.extend([kw for kws in df[col + "_words"] for kw in kws])
    keyword_freq = Counter(all_keywords)
    core_keywords = set(kw for kw, _ in keyword_freq.most_common(top_n))

    # 2. Count question -> keyword links
    link_counter = Counter()
    for _, row in df.iterrows():
        for q in question_columns:
            q_label = question_labels[q]
            for kw in row[q + "_words"]:
                if kw in core_keywords:
                    link_counter[(q_label, kw)] += 1

    # 3. Convert to a DataFrame
    sankey_data = pd.DataFrame([
        {"source": src, "target": tgt, "value": val}
        for (src, tgt), val in link_counter.items()
    ])
    # Optionally persist the result, e.g.:
    # sankey_data.to_csv("E:\\data\\20250621Edu\\sankey_d\\sankey_data.csv", index=False, encoding='utf-8-sig')
    print("Sankey data built successfully!")
    return sankey_data
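
# Illustrative sketch (not part of the original pipeline): one way to render the
# (source, target, value) table as an actual Sankey diagram. plotly is an assumed
# extra dependency here; the original script only builds the data.
def _plot_sankey(sankey_data):
    import plotly.graph_objects as go  # assumed extra dependency
    # Map every source/target label to a node index
    nodes = pd.unique(sankey_data[["source", "target"]].values.ravel()).tolist()
    index = {name: i for i, name in enumerate(nodes)}
    fig = go.Figure(go.Sankey(
        node=dict(label=nodes),
        link=dict(
            source=[index[s] for s in sankey_data["source"]],
            target=[index[t] for t in sankey_data["target"]],
            value=sankey_data["value"].tolist(),
        ),
    ))
    fig.show()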
def generate_sankey_df(file_path, text_columns, domain_words, top_n=30):
    """End-to-end helper: read the Excel file, build stopwords, preprocess the text,
    and return the Sankey data."""
    df = pd.read_excel(file_path)
    stopwords = generate_domain_stopwords(df, text_columns, domain_words)
    df, question_types, question_hierarchy = load_and_preprocess_data(df, stopwords, domain_words)
    sankey_data = build_sankey_data(df, text_columns, top_n)
    return sankey_data
if __name__ == '__main__':
    # Main entry point: run the full analysis pipeline
    file_path = "E:\\data\\20250621Edu\\ex02.xlsx"
    stopwords_path = "E:\\data\\20250621Edu\\stop\\stop2.txt"  # unused here; see the commented-out file-based stopword loading above
    text_columns = ["s1", "s2", "s3", "s4"]
    top_n = 30  # keep the 30 most frequent keywords
    sankey_data = generate_sankey_df(file_path, text_columns, DOMAIN_VOCAB, top_n)
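    # Illustrative follow-up (assumption, not part of the original workflow): persist the
    # table next to the script; the filename is hypothetical. To render it, the
    # _plot_sankey sketch above can be called with the same DataFrame.
    sankey_data.to_csv("sankey_data.csv", index=False, encoding="utf-8-sig")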