egisinsight / qwen_api.py
wxy01giser's picture
Update qwen_api.py
74d293b verified
# qwen_api.py
import requests
import json
from retry import retry # 导入重试库
# 1.
QWEN_API_KEY = "sk-6bb5d1eb5eab468ba4e0b38451526fae" # sk-6bb5d1eb5eab468ba4e0b38451526fae
QWEN_API_URL = "https://dashscope.aliyuncs.com/api/v1/services/aigc/text-generation/generation"
# 2. 国内代理(用 Cloudflare Workers 搭建的免费国内代理,或用公开国内代理)
# 推荐:自己用 Cloudflare Workers 搭一个代理
PROXY_URL = "https://proxy01.rosanneantheso.workers.dev/" # 替换成你的代理地址
# 3. 带重试+代理的通义千问调用函数
@retry(tries=3, delay=2, backoff=2) # 重试3次,每次间隔2秒,指数退避
def call_qwen(prompt: str) -> str:
headers = {
"Authorization": f"Bearer {QWEN_API_KEY}",
"Content-Type": "application/json"
}
payload = {
"model": "qwen-plus",
"input": {"prompt": prompt},
"parameters": {
"result_format": "text",
"temperature": 0.7,
"top_p": 0.8
}
}
try:
resp = requests.post(QWEN_API_URL, headers=headers, json=payload, timeout=30)
resp.raise_for_status() # 触发 HTTP 错误(如 401/500)
result = resp.json()
if "output" in result and "text" in result["output"]:
return result["output"]["text"]
else:
return f"【大模型返回格式异常】{str(result)}"
except requests.exceptions.Timeout:
return "【大模型调用超时】网络波动导致请求超时,已自动重试3次仍失败,请稍后再试"
except requests.exceptions.ConnectionError:
return "【大模型连接失败】无法连接到服务,请检查网络或代理配置"
except Exception as e:
# 捕获其他异常(如 API 密钥错误、模型不存在等)
return f"【大模型调用失败】{str(e)}"