''' Visualization module based on sentence-vector clustering '''
# cluster_insight.py
import pandas as pd
import numpy as np
from sklearn.cluster import KMeans
from sklearn.metrics import silhouette_score
from sklearn.manifold import TSNE
import plotly.graph_objects as go
import matplotlib.cm as cm
import matplotlib.colors as mcolors
# from wordcloud import WordCloud
import matplotlib.pyplot as plt
import base64
from io import BytesIO
from sentence_transformers import SentenceTransformer, util
import os
import pickle
# cluster_insight.py → added for automatic keyword extraction
from sklearn.feature_extraction.text import TfidfVectorizer
import jieba

# ========== Core configuration (same as before) ==========
import plotly.io as pio

pio.kaleido.scope.default_font = "Noto Sans CJK SC"
FONT_FILE_PATH = "./SourceHanSansCN-Light.otf"  # font file kept in the project root
# CHINESE_FONT = "Source Han Sans CN Light"
CHINESE_FONT = "Noto Sans CJK SC"
# CHINESE_FONT = "Noto Sans SC"  # Source Han Sans (cross-platform, bundled with Plotly)


def extract_cluster_keywords_auto(sentences, labels, cluster_id, top_n=3):
    """
    Automatically extract keywords for one cluster.

    :param sentences: all sentences
    :param labels: cluster label of each sentence
    :param cluster_id: cluster to summarize
    :param top_n: number of keywords to return
    :return: keyword string joined by " | "
    """
    # 1. Collect all sentences belonging to this cluster
    cluster_texts = [sentences[i] for i in range(len(sentences)) if labels[i] == cluster_id]
    if not cluster_texts:
        return "无数据"

    # 2. Tokenize (protect domain terms so jieba keeps them intact)
    DOMAIN_SET = {
        # Chinese domain terms (deduplicated and merged)
        "空间连接", "字段计算器", "建筑面积", "城市规划", "叠加分析", "空间连接功能", "数据表",
        "建筑层数", "地理处理", "相交功能", "现状地块", "相交叠加", "地块属性", "分地块",
        "容积率统计", "计算方法", "参数设置", "软件设置", "核密度分析", "热点分析", "带宽",
        "密度场", "焦点", "焦点统计", "地图代数", "条件分析", "差运算", "最大值", "交通",
        "像元大小", "参数", "凸包", "餐饮", "住宿", "搜索半径", "栅格计算器", "重分类", "Con函数",
        # English domain terms (deduplicated)
        "ArcGIS", "spatial join", "ArcMap", "Map algebra", "Kernel Density", "Con",
        "Getis - Ord Gi*", "NDVI", "Raster Calculator", "dwg", "catalog",
        "data manager", "POI",
    }
    for word in DOMAIN_SET:
        jieba.add_word(word, freq=10000)

    # Basic stopwords (generic function words and classroom phrases)
    STOPWORDS = {
        "的", "了", "在", "是", "是否", "我", "有", "和", "就", "不", "人", "都", "一", "一个",
        "上", "也", "很", "到", "说", "要", "去", "你", "会", "着", "没有", "看", "好", "自己",
        "这", "那", "这个", "那个", "什么", "怎么", "哪里", "时候", "然后", "可能", "应该",
        "可以", "就是", "还是", "但是", "不过", "如果", "因为", "所以", "而且", "或者", "其实",
        "觉得", "认为", "希望", "能够", "需要", "知道", "表示", "这样", "那样", "这些", "那些",
        "有点", "一点", "一些", "进一步", "具体", "问题", "疑惑", "讲解", "需求", "难点",
        "操作", "应用", "场景", "对于", "进行", "实际", "情况", "结合", "学生", "老师",
        "实验", "报告", "作业", "课程", "课堂", "学习", "理解", "掌握", "明白", "清楚",
        "建议", "请问", "想问", "不懂", "不会", "不知道", "不太会", "不太懂", "不太清楚",
    }

    def tokenize(text):
        words = jieba.lcut(text)
        return [
            w for w in words
            if len(w) > 1 and w not in STOPWORDS and not w.isdigit()
        ]

    tokenized = [" ".join(tokenize(text)) for text in cluster_texts]

    # 3. Extract keywords with TF-IDF
    vectorizer = TfidfVectorizer(max_features=100, ngram_range=(1, 2))
    try:
        tfidf_matrix = vectorizer.fit_transform(tokenized)
        feature_names = vectorizer.get_feature_names_out()
        # Pick the terms with the highest mean TF-IDF across the cluster
        mean_tfidf = tfidf_matrix.mean(axis=0).A1
        top_indices = mean_tfidf.argsort()[-top_n:][::-1]
        keywords = [feature_names[i] for i in top_indices]
        return " | ".join(keywords)
    except Exception:
        return "关键词提取失败"
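# --- Illustrative usage (hedged sketch, not called anywhere in the pipeline) ---
# A minimal example of how extract_cluster_keywords_auto is expected to be called.
# The toy sentences and labels below are invented for illustration only; they are
# not taken from the real questionnaire data.
def _demo_extract_cluster_keywords():
    """Minimal sketch: extract keywords for toy cluster 0."""
    toy_sentences = [
        "核密度分析的搜索半径应该怎么设置",      # hypothetical student feedback
        "核密度分析带宽参数不太清楚",
        "空间连接之后字段计算器怎么算容积率",
    ]
    toy_labels = [0, 0, 1]  # cluster id per sentence, aligned by index
    # For cluster 0 this should surface terms such as "核密度分析" / "搜索半径"
    return extract_cluster_keywords_auto(toy_sentences, toy_labels, cluster_id=0, top_n=2)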
# model_path = r'.\sbert\models--shibing624--text2vec-base-chinese\snapshots\183bb99aa7af74355fb58d16edf8c13ae7c5433e'
# Load the sentence-embedding model from the local cache directory
model_dir = os.path.join(
    "sbert",
    "models--shibing624--text2vec-base-chinese",
    "snapshots",
    "183bb99aa7af74355fb58d16edf8c13ae7c5433e",
)
# model = SentenceTransformer(model_dir)
MODEL = SentenceTransformer(model_dir)


def encode_sentences_with_cache(sentences, model):  # , cache_path='sentence_vectors.pkl'
    """
    Encode sentences with the SBERT model (optional on-disk caching is currently disabled).
    """
    # if os.path.exists(cache_path):
    #     print(f"Found cache file {cache_path}, loading vectors...")
    #     with open(cache_path, 'rb') as f:
    #         sentence_vectors = pickle.load(f)
    # else:
    #     print("No cache found, encoding...")
    sentence_vectors = model.encode(
        sentences,
        batch_size=32,
        show_progress_bar=False,
        convert_to_tensor=False,
        normalize_embeddings=True,  # L2-normalize the embeddings directly
        device="cpu"                # explicitly run on CPU
    )
    # print(f"Encoding done, saving to {cache_path}")
    # with open(cache_path, 'wb') as f:
    #     pickle.dump(sentence_vectors, f)
    return sentence_vectors


def auto_select_k(embeddings, max_k=10):
    """Automatically choose the number of clusters (highest silhouette score)."""
    sil_scores = []
    k_range = range(2, min(max_k + 1, len(embeddings) // 2))
    for k in k_range:
        kmeans = KMeans(n_clusters=k, random_state=42, n_init=10)
        labels = kmeans.fit_predict(embeddings)
        sil_scores.append(silhouette_score(embeddings, labels))
    best_k = k_range[np.argmax(sil_scores)]
    print(f"自动选择最佳聚类数: k = {best_k} (轮廓系数: {max(sil_scores):.3f})")
    return best_k


def fig_to_base64(fig):
    # Key fix: before exporting, force the registered CJK font onto every piece of
    # text in the figure so Chinese labels render correctly in the PNG.
    fig.update_layout(
        # global font (title, axes, legend, ...)
        font=dict(family=CHINESE_FONT, size=18),
        # title font (set separately so it is not overridden)
        title=dict(font=dict(family=CHINESE_FONT, size=22)),
        # axis title / tick fonts (must be explicit when axes exist)
        xaxis=dict(title=dict(font=dict(family=CHINESE_FONT)), tickfont=dict(family=CHINESE_FONT)),
        yaxis=dict(title=dict(font=dict(family=CHINESE_FONT)), tickfont=dict(family=CHINESE_FONT)),
        # legend font
        legend=dict(font=dict(family=CHINESE_FONT)),
    )
    # Handle every trace (Sankey node labels, scatter text labels, hover labels, ...)
    for trace in fig.data:
        # Sankey node text
        if hasattr(trace, "node") and hasattr(trace.node, "font"):
            trace.node.font.family = CHINESE_FONT
        # text labels on scatter/line traces (e.g. cluster-center keywords)
        if hasattr(trace, "textfont"):
            trace.textfont.family = CHINESE_FONT
        # hover label text
        if hasattr(trace, "hoverlabel") and hasattr(trace.hoverlabel, "font"):
            trace.hoverlabel.font.family = CHINESE_FONT

    # Export to PNG and return as Base64 (unchanged logic)
    buffer = BytesIO()
    fig.write_image(
        buffer,
        format="png",
        engine="kaleido",
        width=900,
        height=600,
        scale=2,        # higher resolution; also makes font rendering more stable
        validate=False,
    )
    buffer.seek(0)
    return base64.b64encode(buffer.read()).decode("ascii")
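# --- Illustrative usage (hedged sketch, not called anywhere in the pipeline) ---
# Shows how auto_select_k behaves on synthetic, well-separated embeddings. The blob
# data is generated on the fly purely for illustration (make_blobs comes from the
# same scikit-learn dependency already used above).
def _demo_auto_select_k():
    """Minimal sketch: auto_select_k should recover roughly the number of generated blobs."""
    from sklearn.datasets import make_blobs
    toy_emb, _ = make_blobs(n_samples=60, centers=4, n_features=8, random_state=42)
    return auto_select_k(toy_emb, max_k=8)  # expected to land at or near k = 4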
def cluster_and_visualize(
        excel_path: str,
        questions=['s1', 's2', 's3', 's4'],
        max_k=15
):
    """
    Input: Excel file with the questionnaire answers.
    Output: (Plotly figure, cluster plot as Base64 PNG, per-cluster statistics list)
    """
    # 1. Load data
    df = pd.read_excel(excel_path)
    sentences = []
    meta = []
    for idx, row in df.iterrows():
        for q in questions:
            text = str(row[q]).strip()
            if text:
                sentences.append(text)
                meta.append((row['no'], q))

    emb = encode_sentences_with_cache(sentences, MODEL)
    # with open(pkl_path, 'rb') as f:
    #     emb = pickle.load(f)

    # 2. Automatically choose k
    n_clusters = auto_select_k(emb, max_k=max_k)
    # n_clusters = 8

    # 3. Cluster
    kmeans = KMeans(n_clusters=n_clusters, random_state=42, n_init=10).fit(emb)
    labels = kmeans.labels_
    # Index of the sentence closest to each cluster center (used as the representative sentence)
    closest = np.argmin(
        np.linalg.norm(emb - kmeans.cluster_centers_[:, np.newaxis], axis=2),
        axis=1
    )

    # After clustering, replace manual keywords with automatically extracted ones
    cluster_keywords_auto = []
    for i in range(n_clusters):
        kw = extract_cluster_keywords_auto(sentences, labels, i, top_n=5)
        cluster_keywords_auto.append(kw)

    # 4. Statistics
    stats = []
    total = len(sentences)
    for i in range(n_clusters):
        cluster_sents = [s for s, l in zip(sentences, labels) if l == i]
        size = len(cluster_sents)
        rep_sent = sentences[closest[i]]
        stats.append({
            'cluster_id': i,
            'size': size,
            'ratio': size / total,
            'rep_sentence': rep_sent,
            'keyword': cluster_keywords_auto[i] if cluster_keywords_auto else f"聚类 {i}"
        })

    # 5. Visualization
    tsne = TSNE(n_components=2, random_state=42)
    emb_2d = tsne.fit_transform(emb)
    cmap = cm.get_cmap('rainbow', n_clusters)
    cluster_colors = [mcolors.rgb2hex(cmap(i)[:3]) for i in range(n_clusters)]
    point_colors = [cluster_colors[l] for l in labels]

    fig = go.Figure()
    # Data points
    fig.add_trace(go.Scatter(
        x=emb_2d[:, 0], y=emb_2d[:, 1],
        mode='markers',
        marker=dict(size=10, color=point_colors, opacity=0.7),
        text=[f"聚类 {l}" for l in labels],
        hoverinfo='text',
        hoverlabel=dict(
            font=dict(family="Noto Sans CJK SC", size=16),  # CJK-capable hover font
            bgcolor='white'
        ),
        textfont=dict(family="Noto Sans CJK SC"),
        showlegend=False
    ))

    # Cluster centers
    center_x = emb_2d[closest, 0]
    center_y = emb_2d[closest, 1]
    for i, (x, y) in enumerate(zip(center_x, center_y)):
        keyword = cluster_keywords_auto[i] if cluster_keywords_auto else f"聚类 {i}"
        fig.add_trace(go.Scatter(
            x=[x], y=[y],
            mode='markers+text',
            marker=dict(size=30, color=cluster_colors[i], line=dict(width=2, color='black')),
            text=[keyword],
            textposition="top center",
            textfont=dict(family="Noto Sans CJK SC", size=20, color='black'),
            showlegend=False
        ))

    fig.update_layout(
        title=dict(
            text="EGISInsight:学生反馈聚类洞察",
            font=dict(family="Noto Sans CJK SC", size=22)
        ),
        font=dict(family="Noto Sans CJK SC", size=18),
        width=900,
        height=600,
        plot_bgcolor='#F5F5F5',
        autosize=False,  # disable auto-resizing
        margin=dict(l=50, r=50, t=80, b=50)  # keep the plot centred inside Plotly as well
    )

    img_bytes = fig.to_image(
        format="png",
        width=900,
        height=600,
        scale=2,
        engine='kaleido'
    )
    b64 = base64.b64encode(img_bytes).decode('utf-8')
    # Alternative export paths:
    # img_bytes = pio.to_image(fig, format="png", width=900, height=600, scale=2)
    # b64 = base64.b64encode(img_bytes).decode('utf-8')
    # b64 = fig_to_base64(fig)
    # return b64, stats
    return fig, b64, stats
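# --- Illustrative entry point (hedged sketch) ---
# A minimal example of how cluster_and_visualize might be invoked. The Excel path and
# output file name below are hypothetical; the sheet is assumed to contain a 'no'
# column plus the answer columns 's1'..'s4' expected by the function above.
if __name__ == "__main__":
    fig, b64_png, cluster_stats = cluster_and_visualize(
        "student_feedback.xlsx",  # hypothetical input file
        questions=['s1', 's2', 's3', 's4'],
        max_k=15,
    )
    for item in cluster_stats:
        print(item['cluster_id'], item['size'], item['keyword'])
    with open("cluster_insight.png", "wb") as f:  # hypothetical output file
        f.write(base64.b64decode(b64_png))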