"""
Visualization module based on sentence-vector clustering.
"""
# cluster_insight.py
import pandas as pd
import numpy as np
from sklearn.cluster import KMeans
from sklearn.metrics import silhouette_score
from sklearn.manifold import TSNE
import plotly.graph_objects as go
import matplotlib.cm as cm
import matplotlib.colors as mcolors
# from wordcloud import WordCloud
import matplotlib.pyplot as plt
import base64
from io import BytesIO
from sentence_transformers import SentenceTransformer, util
import os
import pickle
# cluster_insight.py → newly added keyword-extraction helper
from sklearn.feature_extraction.text import TfidfVectorizer
import jieba
# Step 1: import plotly.io at the top of the file
import plotly.io as pio
# Step 2: make PNG the default Kaleido output format (also works on older Plotly versions)
pio.kaleido.scope.default_format = "png"
print("✅ 已设置 kaleido 为默认图片引擎")
# ========== Core configuration (unchanged from earlier versions) ==========
FONT_FILE_PATH = "./SourceHanSansCN-Light.otf"  # font file lives in the repository root
# CHINESE_FONT = "Source Han Sans CN Light"
CHINESE_FONT = "SimHei, Microsoft YaHei, Arial Unicode MS, sans-serif"
# ========== Key step: set an environment variable so Plotly/Kaleido can find the font ==========
os.environ["KALEIDO_FONT_SEARCH_PATH"] = os.getcwd()  # font search path = current directory
print(f"🔧 字体搜索路径:{os.getcwd()}")
print(f"🔧 字体文件是否存在:{os.path.exists(FONT_FILE_PATH)}")
# CHINESE_FONT = "Noto Sans SC"  # Noto Sans SC / Source Han Sans (cross-platform, bundled with Plotly)
def extract_cluster_keywords_auto(sentences, labels, cluster_id, top_n=3):
    """
    Automatically extract keywords for one cluster.
    :param sentences: all sentences
    :param labels: cluster labels
    :param cluster_id: the cluster to summarise
    :param top_n: number of keywords to return
    :return: keyword string
    """
    # 1. Collect every sentence that belongs to this cluster
    cluster_texts = [sentences[i] for i in range(len(sentences)) if labels[i] == cluster_id]
    if not cluster_texts:
        return "无数据"
    # 2. Tokenize (protect domain terms so jieba keeps them intact)
    DOMAIN_SET = {
        # Chinese domain terms (deduplicated and merged)
        "空间连接", "字段计算器", "建筑面积", "城市规划", "叠加分析", "空间连接功能",
        "数据表", "建筑层数", "地理处理", "相交功能", "现状地块", "相交叠加",
        "地块属性", "分地块", "容积率统计", "计算方法", "参数设置", "软件设置",
        "核密度分析", "热点分析", "带宽", "密度场", "焦点", "焦点统计",
        "地图代数", "条件分析", "差运算", "最大值", "交通", "像元大小",
        "参数", "凸包", "餐饮", "住宿", "搜索半径", "栅格计算器", "重分类", "Con函数",
        # English domain terms (deduplicated)
        "ArcGIS", "spatial join", "ArcMap", "Map algebra", "Kernel Density",
        "Con", "Getis - Ord Gi*", "NDVI", "Raster Calculator", "dwg", "catalog",
        "data manager", "POI",
    }
    for word in DOMAIN_SET:
        jieba.add_word(word, freq=10000)
    # Basic stopwords (generic function words)
    STOPWORDS = {"的", "了", "在", "是", "是否", "我", "有", "和", "就", "不", "人", "都", "一", "一个", "上", "也", "很", "到", "说", "要", "去", "你", "会", "着", "没有", "看", "好", "自己", "这", "那", "这个", "那个", "什么",
                 "怎么", "哪里", "时候", "然后", "可能", "应该", "可以", "就是", "还是", "但是", "不过", "如果", "因为", "所以", "而且", "或者", "其实", "觉得", "认为", "希望", "能够", "需要", "知道", "表示", "这样", "那样", "这些", "那些", "有点",
                 "一点", "一些", "进一步", "具体", "问题", "疑惑", "讲解", "需求", "难点", "操作", "应用", "场景", "对于", "进行", "实际", "情况", "结合",
                 "学生", "老师", "实验", "报告", "作业", "课程", "课堂", "学习", "理解", "掌握", "明白", "清楚",
                 "建议", "希望", "请问", "想问", "不懂", "不会", "不知道", "不太会", "不太懂", "不太清楚",
                 }

    def tokenize(text):
        words = jieba.lcut(text)
        return [
            w for w in words
            if len(w) > 1 and w not in STOPWORDS and not w.isdigit()
        ]

    tokenized = [" ".join(tokenize(text)) for text in cluster_texts]
    # 3. TF-IDF keyword extraction
    vectorizer = TfidfVectorizer(max_features=100, ngram_range=(1, 2))
    try:
        tfidf_matrix = vectorizer.fit_transform(tokenized)
        feature_names = vectorizer.get_feature_names_out()
        # Keep the terms with the highest mean TF-IDF across the cluster
        mean_tfidf = tfidf_matrix.mean(axis=0).A1
        top_indices = mean_tfidf.argsort()[-top_n:][::-1]
        keywords = [feature_names[i] for i in top_indices]
        return " | ".join(keywords)
    except Exception:
        return "关键词提取失败"
# model_path = r'.\sbert\models--shibing624--text2vec-base-chinese\snapshots\183bb99aa7af74355fb58d16edf8c13ae7c5433e'
# Load the sentence-embedding model from the local snapshot directory
model_dir = os.path.join("sbert", "models--shibing624--text2vec-base-chinese", "snapshots", "183bb99aa7af74355fb58d16edf8c13ae7c5433e")
# model = SentenceTransformer(model_dir)
MODEL = SentenceTransformer(model_dir)
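# Hedged alternative (assumption, not part of the original pipeline): the snapshot
# directory above corresponds to the upstream Hugging Face checkpoint
# "shibing624/text2vec-base-chinese", so if the local copy is missing, the model can
# usually be loaded by repo id instead (network access required):
# MODEL = SentenceTransformer("shibing624/text2vec-base-chinese")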
def encode_sentences_with_cache(sentences, model):  # , cache_path='sentence_vectors.pkl'
    """
    Encode sentences with the SBERT model; the local pickle cache is currently disabled
    (see the commented-out code below).
    """
    # if os.path.exists(cache_path):
    #     print(f"Found cache file {cache_path}, loading vectors...")
    #     with open(cache_path, 'rb') as f:
    #         sentence_vectors = pickle.load(f)
    # else:
    #     print("No cache found, encoding...")
    sentence_vectors = model.encode(
        sentences,
        batch_size=16,
        show_progress_bar=True,
        convert_to_tensor=False
    )
    # print(f"Encoding finished, saving to {cache_path}")
    # with open(cache_path, 'wb') as f:
    #     pickle.dump(sentence_vectors, f)
    return sentence_vectors
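# Optional sketch (not wired into the pipeline): a variant that re-enables the on-disk
# cache hinted at by the commented-out code above. The 'sentence_vectors.pkl' path is an
# assumption taken from that code; the cache is keyed only by file path, so the pickle
# must be deleted whenever the input sentences change.
def encode_sentences_cached_to_disk(sentences, model, cache_path="sentence_vectors.pkl"):
    if os.path.exists(cache_path):
        # Reuse previously computed vectors
        with open(cache_path, "rb") as f:
            return pickle.load(f)
    vectors = model.encode(sentences, batch_size=16, show_progress_bar=True, convert_to_tensor=False)
    with open(cache_path, "wb") as f:
        pickle.dump(vectors, f)
    return vectors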
def auto_select_k(embeddings, max_k=10):
    """Automatically pick the number of clusters with the highest silhouette score."""
    sil_scores = []
    k_range = range(2, min(max_k + 1, len(embeddings) // 2))
    for k in k_range:
        kmeans = KMeans(n_clusters=k, random_state=42, n_init=10)
        labels = kmeans.fit_predict(embeddings)
        sil_scores.append(silhouette_score(embeddings, labels))
    best_k = k_range[np.argmax(sil_scores)]
    print(f"自动选择最佳聚类数: k = {best_k} (轮廓系数: {max(sil_scores):.3f})")
    return best_k
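# Hedged usage note: auto_select_k scans k = 2 .. min(max_k, n // 2 - 1) and returns the
# k whose KMeans labelling maximises the silhouette score. The commented call below uses
# random vectors purely to illustrate the expected input shape, not real embeddings:
# best_k = auto_select_k(np.random.rand(40, 768), max_k=10)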
def cluster_and_visualize(
    excel_path: str,
    questions=['s1', 's2', 's3', 's4'],
    max_k=15
):
    """
    Input: Excel file with one answer column per question
    Output: (Plotly figure, cluster plot as base64 PNG, list of per-cluster statistics)
    """
    # 1. Load data
    df = pd.read_excel(excel_path)
    sentences = []
    meta = []
    for idx, row in df.iterrows():
        for q in questions:
            text = str(row[q]).strip()
            # Skip empty cells (pandas reads them as NaN, which str() turns into "nan")
            if text and text.lower() != "nan":
                sentences.append(text)
                meta.append((row['no'], q))
    emb = encode_sentences_with_cache(sentences, MODEL)
    # with open(pkl_path, 'rb') as f:
    #     emb = pickle.load(f)
    # 2. Automatically choose k
    n_clusters = auto_select_k(emb, max_k=max_k)
    # n_clusters = 8
    # 3. Cluster
    kmeans = KMeans(n_clusters=n_clusters, random_state=42, n_init=10).fit(emb)
    labels = kmeans.labels_
    # Index of the sentence closest to each cluster centre (used as its representative)
    closest = np.argmin(np.linalg.norm(emb - kmeans.cluster_centers_[:, np.newaxis], axis=2), axis=1)
    # After clustering, replace hand-written keywords with automatically extracted ones
    cluster_keywords_auto = []
    for i in range(n_clusters):
        kw = extract_cluster_keywords_auto(sentences, labels, i, top_n=5)
        cluster_keywords_auto.append(kw)
    # 4. Statistics
    stats = []
    total = len(sentences)
    for i in range(n_clusters):
        cluster_sents = [s for s, l in zip(sentences, labels) if l == i]
        size = len(cluster_sents)
        rep_sent = sentences[closest[i]]
        stats.append({
            'cluster_id': i,
            'size': size,
            'ratio': size / total,
            'rep_sentence': rep_sent,
            'keyword': cluster_keywords_auto[i] if cluster_keywords_auto else f"聚类 {i}"
        })
    # 5. Visualization
    tsne = TSNE(n_components=2, random_state=42)
    emb_2d = tsne.fit_transform(emb)
    cmap = cm.get_cmap('rainbow', n_clusters)
    cluster_colors = [mcolors.rgb2hex(cmap(i)[:3]) for i in range(n_clusters)]
    point_colors = [cluster_colors[l] for l in labels]
    fig = go.Figure()
    # Data points
    fig.add_trace(go.Scatter(
        x=emb_2d[:, 0], y=emb_2d[:, 1],
        mode='markers',
        marker=dict(size=10, color=point_colors, opacity=0.7),
        text=[f"聚类 {l}" for l in labels],
        hoverinfo='text',
        hoverlabel=dict(
            font=dict(family=CHINESE_FONT, size=16),  # hover font must support Chinese
            bgcolor='white'
        ),
        showlegend=False
    ))
    # Cluster centres, labelled with their extracted keywords
    center_x = emb_2d[closest, 0]
    center_y = emb_2d[closest, 1]
    for i, (x, y) in enumerate(zip(center_x, center_y)):
        keyword = cluster_keywords_auto[i] if cluster_keywords_auto else f"聚类 {i}"
        fig.add_trace(go.Scatter(
            x=[x], y=[y],
            mode='markers+text',
            marker=dict(size=30, color=cluster_colors[i], line=dict(width=2, color='black')),
            text=[keyword],
            textposition="top center",
            textfont=dict(family=CHINESE_FONT, size=20, color='black'),
            showlegend=False
        ))
    fig.update_layout(
        title=dict(
            text="EGISInsight:学生反馈聚类洞察",
            font=dict(family=CHINESE_FONT, size=22)
        ),
        font=dict(family=CHINESE_FONT, size=18),
        width=900, height=600,
        plot_bgcolor='#F5F5F5',
        autosize=False,  # disable automatic resizing
        margin=dict(l=50, r=50, t=80, b=50)  # keep the plot centred inside Plotly as well
    )
    # img_bytes = fig.to_image(format="png", width=900, height=600, scale=2)
    # Render the figure to PNG bytes via plotly.io / Kaleido
    img_bytes = pio.to_image(
        fig,
        format="png",
        width=900,
        height=600,
        scale=2
    )
    b64 = base64.b64encode(img_bytes).decode('utf-8')
    # print(f"{b64}解析成功!")
    # return b64, stats
    return fig, b64, stats
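# Hedged usage example: "student_feedback.xlsx" is a placeholder path, and the file is
# assumed to contain a 'no' column plus the answer columns 's1'-'s4' that the loader
# above expects. The block only runs when the module is executed directly.
if __name__ == "__main__":
    fig, b64_png, cluster_stats = cluster_and_visualize("student_feedback.xlsx", max_k=10)
    # Save the rendered PNG and print a per-cluster summary
    with open("cluster_insight.png", "wb") as f:
        f.write(base64.b64decode(b64_png))
    for item in cluster_stats:
        print(item['cluster_id'], item['size'], f"{item['ratio']:.1%}", item['keyword'])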