'''
Visualization module based on sentence-vector clustering.
'''
# cluster_insight.py
import pandas as pd
import numpy as np
from sklearn.cluster import KMeans
from sklearn.metrics import silhouette_score
from sklearn.manifold import TSNE
import plotly.graph_objects as go
import matplotlib.cm as cm
import matplotlib.colors as mcolors
# from wordcloud import WordCloud
import matplotlib.pyplot as plt
import base64
from io import BytesIO
from sentence_transformers import SentenceTransformer, util
import os
import pickle
# cluster_insight.py → added helper function
from sklearn.feature_extraction.text import TfidfVectorizer
import jieba
# Step 1: import plotly.io at the top of the file
import plotly.io as pio
# Step 2: set kaleido's default export format (also supported by older Plotly versions)
pio.kaleido.scope.default_format = "png"
print("✅ kaleido configured as the default image export backend")
# ========== Core configuration (same as before) ==========
FONT_FILE_PATH = "./SourceHanSansCN-Light.otf"  # font file lives in the project root
# CHINESE_FONT = "Source Han Sans CN Light"
CHINESE_FONT = "SimHei, Microsoft YaHei, Arial Unicode MS, sans-serif"
# ========== Key step: set an environment variable so Plotly/Kaleido can find the font ==========
os.environ["KALEIDO_FONT_SEARCH_PATH"] = os.getcwd()  # font search path = current directory
print(f"🔧 Font search path: {os.getcwd()}")
print(f"🔧 Font file exists: {os.path.exists(FONT_FILE_PATH)}")
# CHINESE_FONT = "Noto Sans SC"  # Source Han Sans style font (cross-platform, bundled with Plotly)
def extract_cluster_keywords_auto(sentences, labels, cluster_id, top_n=3):
"""
自动提取聚类关键词
:param sentences: 所有句子
:param labels: 聚类标签
:param cluster_id: 当前聚类
:param top_n: 提取前 n 个词
:return: 关键词字符串
"""
# 1. 提取该聚类所有句子
cluster_texts = [sentences[i] for i in range(len(sentences)) if labels[i] == cluster_id]
if not cluster_texts:
return "无数据"
    # 2. Tokenize (protect domain terms from being split by jieba)
    DOMAIN_SET = {
        # Chinese domain terms (deduplicated and merged)
"空间连接", "字段计算器", "建筑面积", "城市规划", "叠加分析", "空间连接功能",
"数据表", "建筑层数", "地理处理", "相交功能", "现状地块", "相交叠加",
"地块属性", "分地块", "容积率统计", "计算方法", "参数设置", "软件设置",
"核密度分析", "热点分析", "带宽", "密度场", "焦点", "焦点统计",
"地图代数", "条件分析", "差运算", "最大值", "交通", "像元大小",
"参数", "凸包", "餐饮", "住宿", "搜索半径", "栅格计算器", "重分类", "Con函数",
        # English domain terms (deduplicated)
        "ArcGIS", "spatial join", "ArcMap", "Map algebra", "Kernel Density",
        "Con", "Getis-Ord Gi*", "NDVI", "Raster Calculator", "dwg", "catalog",
        "data manager", "POI",
}
for word in DOMAIN_SET:
jieba.add_word(word, freq=10000)
    # Basic stopwords (generic function words plus classroom vocabulary)
STOPWORDS = {"的", "了", "在", "是", "是否", "我", "有", "和", "就", "不", "人", "都", "一", "一个", "上", "也", "很", "到", "说", "要", "去", "你", "会", "着", "没有", "看", "好", "自己", "这", "那", "这个", "那个", "什么",
"怎么", "哪里", "时候", "然后", "可能", "应该", "可以", "就是", "还是", "但是", "不过", "如果", "因为", "所以", "而且", "或者", "其实", "觉得", "认为", "希望", "能够", "需要", "知道", "表示", "这样", "那样", "这些", "那些", "有点",
"一点", "一些", "进一步", "具体", "问题", "疑惑", "讲解", "需求", "难点", "操作", "应用", "场景", "对于", "进行", "实际", "情况", "结合",
"学生", "老师", "实验", "报告", "作业", "课程", "课堂", "学习", "理解", "掌握", "明白", "清楚",
"建议", "希望", "请问", "想问", "不懂", "不会", "不知道", "不太会", "不太懂", "不太清楚",
}
def tokenize(text):
words = jieba.lcut(text)
return [
w for w in words
if len(w) > 1 and w not in STOPWORDS and not w.isdigit()
]
tokenized = [" ".join(tokenize(text)) for text in cluster_texts]
    # 3. Extract keywords by mean TF-IDF over the cluster
vectorizer = TfidfVectorizer(max_features=100, ngram_range=(1, 2))
try:
tfidf_matrix = vectorizer.fit_transform(tokenized)
feature_names = vectorizer.get_feature_names_out()
        # Take the terms with the highest mean TF-IDF across the cluster
        mean_tfidf = tfidf_matrix.mean(axis=0).A1
        top_indices = mean_tfidf.argsort()[-top_n:][::-1]
        keywords = [feature_names[i] for i in top_indices]
        return " | ".join(keywords)
    except Exception:
        return "关键词提取失败"  # "keyword extraction failed" (shown as the cluster label)
# model_path = r'.\sbert\models--shibing624--text2vec-base-chinese\snapshots\183bb99aa7af74355fb58d16edf8c13ae7c5433e'
# Load the sentence-transformer model from the local snapshot directory
model_dir = os.path.join("sbert", "models--shibing624--text2vec-base-chinese", "snapshots", "183bb99aa7af74355fb58d16edf8c13ae7c5433e")
# model = SentenceTransformer(model_dir)
MODEL = SentenceTransformer(model_dir)
def encode_sentences_with_cache(sentences, model):  # , cache_path='sentence_vectors.pkl'
    """
    Encode sentences with the sentence-transformer model.
    (The local pickle cache is currently disabled; see the commented-out code below.)
    """
    # if os.path.exists(cache_path):
    #     print(f"Found cache file: {cache_path}, loading vectors...")
    #     with open(cache_path, 'rb') as f:
    #         sentence_vectors = pickle.load(f)
    # else:
    #     print("No cache found, starting to encode...")
    sentence_vectors = model.encode(
        sentences,
        batch_size=16,
        show_progress_bar=True,
        convert_to_tensor=False
    )
    # print(f"Encoding finished, saving to: {cache_path}")
    # with open(cache_path, 'wb') as f:
    #     pickle.dump(sentence_vectors, f)
    return sentence_vectors
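# A minimal sketch of how the disabled cache could be restored, following the
# commented-out pickle logic above. `encode_sentences_cached` and its cache_path
# default are hypothetical additions; note the cache does not detect changes to
# the sentence list, matching the original disabled code.
def encode_sentences_cached(sentences, model, cache_path='sentence_vectors.pkl'):
    """Encode sentences and persist the vectors to a local pickle file (sketch)."""
    if os.path.exists(cache_path):
        with open(cache_path, 'rb') as f:
            return pickle.load(f)
    vectors = model.encode(
        sentences,
        batch_size=16,
        show_progress_bar=True,
        convert_to_tensor=False
    )
    with open(cache_path, 'wb') as f:
        pickle.dump(vectors, f)
    return vectors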
def auto_select_k(embeddings, max_k=10):
    """Automatically choose the number of clusters (highest silhouette score)."""
    sil_scores = []
    k_range = range(2, min(max_k + 1, len(embeddings) // 2))
    if len(k_range) == 0:
        # Too few samples to search over k; fall back to the smallest sensible value
        return 2
    for k in k_range:
        kmeans = KMeans(n_clusters=k, random_state=42, n_init=10)
        labels = kmeans.fit_predict(embeddings)
        sil_scores.append(silhouette_score(embeddings, labels))
    best_k = k_range[np.argmax(sil_scores)]
    print(f"Auto-selected number of clusters: k = {best_k} (silhouette score: {max(sil_scores):.3f})")
    return best_k
def cluster_and_visualize(
excel_path: str,
questions=['s1', 's2', 's3', 's4'],
max_k=15
):
"""
输入:Excel + 句向量 pkl
输出:(聚类图 base64, 统计信息 dict)
"""
# 1. 加载数据
df = pd.read_excel(excel_path)
sentences = []
meta = []
    for idx, row in df.iterrows():
        for q in questions:
            # Skip missing answers; str(NaN) would otherwise slip through as "nan"
            if pd.isna(row[q]):
                continue
            text = str(row[q]).strip()
            if text:
                sentences.append(text)
                meta.append((row['no'], q))
emb = encode_sentences_with_cache(sentences, MODEL)
# with open(pkl_path, 'rb') as f:
# emb = pickle.load(f)
    # 2. Automatically choose k
n_clusters = auto_select_k(emb, max_k=max_k)
# n_clusters = 8
    # 3. KMeans clustering
kmeans = KMeans(n_clusters=n_clusters, random_state=42, n_init=10).fit(emb)
labels = kmeans.labels_
    # Index of the sentence closest to each cluster centre (used as the representative)
    closest = np.argmin(np.linalg.norm(emb - kmeans.cluster_centers_[:, np.newaxis], axis=2), axis=1)
    # After clustering, derive keywords automatically (replaces the manual labels)
cluster_keywords_auto = []
for i in range(n_clusters):
kw = extract_cluster_keywords_auto(sentences, labels, i, top_n=5)
cluster_keywords_auto.append(kw)
    # 4. Per-cluster statistics
stats = []
total = len(sentences)
for i in range(n_clusters):
cluster_sents = [s for s, l in zip(sentences, labels) if l == i]
size = len(cluster_sents)
rep_sent = sentences[closest[i]]
stats.append({
'cluster_id': i,
'size': size,
'ratio': size / total,
'rep_sentence': rep_sent,
'keyword': cluster_keywords_auto[i] if cluster_keywords_auto else f"聚类 {i}"
})
    # 5. Visualization: project to 2D with t-SNE, then draw a Plotly scatter
    # Perplexity must stay below the sample count, so cap it for small datasets
    tsne = TSNE(n_components=2, random_state=42, perplexity=min(30, len(sentences) - 1))
    emb_2d = tsne.fit_transform(emb)
    try:
        cmap = cm.get_cmap('rainbow', n_clusters)
    except AttributeError:
        # matplotlib >= 3.9 removed cm.get_cmap; fall back to the pyplot helper
        cmap = plt.get_cmap('rainbow', n_clusters)
    cluster_colors = [mcolors.rgb2hex(cmap(i)[:3]) for i in range(n_clusters)]
    point_colors = [cluster_colors[l] for l in labels]
fig = go.Figure()
    # Data points
fig.add_trace(go.Scatter(
x=emb_2d[:, 0], y=emb_2d[:, 1],
mode='markers',
marker=dict(size=10, color=point_colors, opacity=0.7),
text=[f"聚类 {l}" for l in labels],
hoverinfo='text',
hoverlabel=dict(
            font=dict(family=CHINESE_FONT, size=16),  # hover font that supports Chinese
bgcolor='white'
),
showlegend=False
))
    # Cluster centres (plotted at their representative sentences)
center_x = emb_2d[closest, 0]
center_y = emb_2d[closest, 1]
for i, (x, y) in enumerate(zip(center_x, center_y)):
keyword = cluster_keywords_auto[i] if cluster_keywords_auto else f"聚类 {i}"
fig.add_trace(go.Scatter(
x=[x], y=[y],
mode='markers+text',
marker=dict(size=30, color=cluster_colors[i], line=dict(width=2, color='black')),
text=[keyword],
textposition="top center",
textfont=dict(family=CHINESE_FONT, size=20, color='black'),
showlegend=False
))
    fig.update_layout(
        # Title text is intentionally Chinese ("EGISInsight: student feedback clustering insight")
        title=dict(
            text="EGISInsight:学生反馈聚类洞察",
            font=dict(family=CHINESE_FONT, size=22)  # `titlefont` is deprecated in recent Plotly
        ),
        font=dict(family=CHINESE_FONT, size=18),
        width=900, height=600,
        plot_bgcolor='#F5F5F5',
        autosize=False,  # disable auto-resize
        margin=dict(l=50, r=50, t=80, b=50)  # keep the plot centred inside the canvas
    )
    # img_bytes = fig.to_image(format="png", width=900, height=600, scale=2)  # previous approach
    # Current approach: export through plotly.io
img_bytes = pio.to_image(
fig,
format="png",
width=900,
height=600,
scale=2
)
b64 = base64.b64encode(img_bytes).decode('utf-8')
# print(f"{b64}解析成功!")
# return b64, stats
return fig, b64, stats
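

# Minimal usage sketch (the file name below is a placeholder; it assumes an Excel
# sheet with a 'no' column plus the answer columns s1–s4 used above):
if __name__ == "__main__":
    fig, chart_b64, cluster_stats = cluster_and_visualize(
        "student_feedback.xlsx",  # placeholder path, replace with the real file
        questions=['s1', 's2', 's3', 's4'],
        max_k=15
    )
    for item in cluster_stats:
        print(f"Cluster {item['cluster_id']}: {item['size']} answers "
              f"({item['ratio']:.1%}) | keywords: {item['keyword']}")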