""" 生成停用词和桑基图数据 """ import pandas as pd import jieba import re from collections import Counter import networkx as nx import matplotlib.pyplot as plt import numpy as np import warnings from matplotlib.font_manager import FontProperties import os warnings.filterwarnings('ignore') from collections import defaultdict from sklearn.feature_extraction.text import CountVectorizer # domain_vocab.py DOMAIN_VOCAB = [ # 中文通用领域词(去重 + 合并) "空间连接", "字段计算器", "建筑面积", "城市规划", "叠加分析", "空间连接功能", "数据表", "建筑层数", "地理处理", "相交功能", "现状地块", "相交叠加", "地块属性", "分地块", "容积率统计", "计算方法", "参数设置", "软件设置", "核密度分析", "热点分析", "带宽", "密度场", "焦点", "焦点统计", "地图代数", "条件分析", "差运算", "最大值", "交通", "像元大小", "参数", "凸包", "餐饮", "住宿", "搜索半径", "栅格计算器", "重分类", "Con函数", # 英文通用领域词(去重) "ArcGIS", "spatial join", "ArcMap", "Map algebra", "Kernel Density", "Con", "Getis - Ord Gi*", "NDVI", "Raster Calculator", "dwg", "catalog", "spatial join", "data manager", "POI", ] def generate_domain_stopwords(df, text_columns, domain_keywords): """结合领域关键词生成停用词表""" # 1. 基础停用词(通用功能词) common_stopwords = {"的", "了", "在", "是", "是否", "我", "有", "和", "就", "不", "人", "都", "一", "一个", "上", "也", "很", "到", "说", "要", "去", "你", "会", "着", "没有", "看", "好", "自己", "这", "那", "这个", "那个", "什么", "怎么", "哪里", "时候", "然后", "可能", "应该", "可以", "就是", "还是", "但是", "不过", "如果", "因为", "所以", "而且", "或者", "其实", "觉得", "认为", "希望", "能够", "需要", "知道", "表示", "这样", "那样", "这些", "那些", "有点", "一点", "一些","进一步", "具体", "问题", "疑惑", "讲解", "需求", "难点", "操作", "应用", "场景", "进行", "对于", "实际", "情况", "结合", "对于", "学生", "老师", "实验", "报告", "作业", "课程", "课堂", "学习", "理解", "掌握", "明白", "清楚", "建议", "希望", "请问", "想问", "不懂", "不会", "不知道", "不太会", "不太懂", "不太清楚", } # 2. 领域关键词(如果有) domain_words = set(domain_keywords) # if domain_keywords_file and os.path.exists(domain_keywords_file): # with open(domain_keywords_file, "r", encoding="utf-8") as f: # domain_words = set([line.strip() for line in f if line.strip()]) # 3. 合并文本并预处理 all_text = "" for col in text_columns: all_text += " ".join(df[col].fillna("").astype(str)) all_text = re.sub(r"[^\w\s]", "", all_text) all_text = re.sub(r"\d+", "", all_text) all_text = all_text.lower() # 统一英文为小写(新增) words = jieba.lcut(all_text) # 4. 统计词频 word_freq = Counter(words) word_freq = {word: freq for word, freq in word_freq.items() if len(word) > 1} # 5. 生成候选停用词 # - 通用功能词 # - 高频但非领域关键词的词汇(出现频率>50%且不在领域词表中) stopwords = common_stopwords.copy() for word, freq in word_freq.items(): if freq > len(df) * 0.5 and word not in domain_words: stopwords.add(word) # 6. 
def load_and_preprocess_data(df, stopwords, domain_words):
    """Pre-process the answer columns, protecting domain terms during segmentation."""
    stopwords = set(stopwords)
    # with open(stopwords_path, "r", encoding="utf-8") as f:
    #     for line in f:
    #         stopwords.add(line.strip())

    question_types = {
        "s1": "难点",        # difficulty
        "s2": "讲解需求",    # explanation need
        "s3": "操作疑惑",    # operational confusion
        "s4": "应用场景"     # application scenario
    }
    # Causal order of the questions (a smaller index comes earlier)
    question_hierarchy = ["s1", "s2", "s3", "s4"]

    # Professional-term dictionary (all domain words are registered up front)
    professional_dict = {}
    for word in domain_words:
        jieba.add_word(word, freq=10000)     # high frequency so jieba keeps the term intact
        professional_dict[word.lower()] = 1  # lower-cased keys, because the text is lower-cased

    # Regex matching any domain term; longer terms are sorted first so they win over substrings.
    # IGNORECASE is needed because the cleaned text is lower-cased while the vocabulary is not.
    sorted_domain = sorted(domain_words, key=len, reverse=True)
    pattern_str = "|".join(re.escape(word) for word in sorted_domain)
    professional_pattern = re.compile(f"({pattern_str})", re.IGNORECASE)

    def clean_text(text):
        if not isinstance(text, str):
            return []

        # Step 1: basic cleaning
        text_cleaned = re.sub(r"[^\w\s]", "", text)
        text_cleaned = re.sub(r"\d+", "", text_cleaned)
        text_cleaned = text_cleaned.lower()

        # Step 2: locate every occurrence of a domain term
        matches = []
        for match in professional_pattern.finditer(text_cleaned):
            start, end = match.span()
            matches.append((start, end, text_cleaned[start:end]))

        # Step 3: split the text while protecting the domain terms
        segments = []
        last_end = 0
        for start, end, word in matches:
            if start > last_end:
                segments.append(text_cleaned[last_end:start])  # plain text before the term
            segments.append(word)                              # the domain term as one unit
            last_end = end
        if last_end < len(text_cleaned):
            segments.append(text_cleaned[last_end:])           # trailing plain text

        # Step 4: segment only the non-domain parts with jieba
        final_words = []
        for segment in segments:
            if segment in professional_dict:
                final_words.append(segment)                    # keep the domain term as-is
            else:
                words = jieba.lcut(segment)
                words = [w for w in words if w not in stopwords and len(w) > 1]
                final_words.extend(words)
        return final_words

    for col in ["s1", "s2", "s3", "s4"]:
        df[col + "_words"] = df[col].apply(clean_text)

    return df, question_types, question_hierarchy
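
# --- Usage sketch (toy rows invented for illustration; not the real questionnaire) ---
# Shows that multi-character domain terms survive segmentation as single tokens
# instead of being split by jieba. The stopword set is hand-picked here to keep
# the example deterministic; _demo_preprocess is our helper name.
def _demo_preprocess():
    toy = pd.DataFrame({
        "s1": ["核密度分析的搜索半径怎么设置"],
        "s2": ["希望讲解栅格计算器"],
        "s3": [""],
        "s4": [""],
    })
    toy, _, _ = load_and_preprocess_data(toy, {"的", "怎么", "希望", "讲解"}, DOMAIN_VOCAB)
    # Expected (may vary slightly across jieba versions):
    # ['核密度分析', '搜索半径', '设置'] and ['栅格计算器']
    print(toy["s1_words"].iloc[0], toy["s2_words"].iloc[0])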
def build_sankey_data(df, question_columns, top_n=30):
    """
    Build the Sankey-diagram link data (DataFrame with source, target, value columns).
    Only the top_n globally most frequent keywords are kept.
    """
    question_labels = {
        "s1": "S1_难点",
        "s2": "S2_讲解需求",
        "s3": "S3_操作疑惑",
        "s4": "S4_应用场景"
    }

    # 1. Global keyword frequencies
    all_keywords = []
    for col in question_columns:
        all_keywords.extend([kw for kws in df[col + "_words"] for kw in kws])
    keyword_freq = Counter(all_keywords)
    core_keywords = set(kw for kw, _ in keyword_freq.most_common(top_n))

    # 2. Count question -> keyword links
    link_counter = Counter()
    for _, row in df.iterrows():
        for q in question_columns:
            q_label = question_labels[q]
            keywords = row[q + "_words"]
            for kw in keywords:
                if kw in core_keywords:
                    link_counter[(q_label, kw)] += 1

    # 3. Convert to a DataFrame
    sankey_data = pd.DataFrame([
        {"source": src, "target": tgt, "value": val}
        for (src, tgt), val in link_counter.items()
    ])
    # sankey_data.to_csv("E:\\data\\20250621Edu\\sankey_d\\sankey_data.csv", index=False, encoding='utf-8-sig')
    # sankey_data.to_csv("E:\\data\\20250621Edu\\sankey_d\\sankey_data2.csv", index=False, encoding='utf-8-sig')
    print("Sankey link data built.")
    return sankey_data


def generate_sankey_df(file_path, text_columns, domain_words, top_n=30):
    df = pd.read_excel(file_path)
    stopwords = generate_domain_stopwords(df, text_columns, domain_words)
    df, question_types, question_hierarchy = load_and_preprocess_data(df, stopwords, domain_words)
    sankey_data = build_sankey_data(df, text_columns, top_n)
    return sankey_data


if __name__ == '__main__':
    # Main entry point: run the full analysis pipeline
    file_path = "E:\\data\\20250621Edu\\ex02.xlsx"
    stopwords_path = "E:\\data\\20250621Edu\\stop\\stop2.txt"  # reserved for the disabled file-based stopword export
    text_columns = ["s1", "s2", "s3", "s4"]
    top_n = 30  # keep the 30 most frequent keywords
    sankey_data = generate_sankey_df(file_path, text_columns, DOMAIN_VOCAB, top_n)
    # A rendering sketch for this DataFrame follows below.
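
# --- Rendering sketch (not part of the original pipeline) ---
# The script only builds the link table; one way to actually draw the Sankey
# diagram is plotly's graph_objects API. This assumes plotly is installed;
# render_sankey and the output name "sankey.html" are illustrative choices.
def render_sankey(sankey_data, out_html="sankey.html"):
    import plotly.graph_objects as go

    # Plotly expects integer node indices, so map every label (questions and
    # keywords alike) to an index first.
    labels = list(pd.unique(sankey_data["source"].tolist() + sankey_data["target"].tolist()))
    index = {label: i for i, label in enumerate(labels)}

    fig = go.Figure(go.Sankey(
        node=dict(label=labels, pad=15, thickness=15),
        link=dict(
            source=[index[s] for s in sankey_data["source"]],
            target=[index[t] for t in sankey_data["target"]],
            value=sankey_data["value"].tolist(),
        ),
    ))
    fig.write_html(out_html)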