# qwen_api.py import requests import json from retry import retry # 导入重试库 # 1. QWEN_API_KEY = "sk-6bb5d1eb5eab468ba4e0b38451526fae" # sk-6bb5d1eb5eab468ba4e0b38451526fae QWEN_API_URL = "https://dashscope.aliyuncs.com/api/v1/services/aigc/text-generation/generation" # 2. 国内代理(用 Cloudflare Workers 搭建的免费国内代理,或用公开国内代理) # 推荐:自己用 Cloudflare Workers 搭一个代理 PROXY_URL = "https://proxy01.rosanneantheso.workers.dev/" # 替换成你的代理地址 # 3. 带重试+代理的通义千问调用函数 @retry(tries=3, delay=2, backoff=2) # 重试3次,每次间隔2秒,指数退避 def call_qwen(prompt: str) -> str: headers = { "Authorization": f"Bearer {QWEN_API_KEY}", "Content-Type": "application/json" } payload = { "model": "qwen-plus", "input": {"prompt": prompt}, "parameters": { "result_format": "text", "temperature": 0.7, "top_p": 0.8 } } try: resp = requests.post(QWEN_API_URL, headers=headers, json=payload, timeout=30) resp.raise_for_status() # 触发 HTTP 错误(如 401/500) result = resp.json() if "output" in result and "text" in result["output"]: return result["output"]["text"] else: return f"【大模型返回格式异常】{str(result)}" except requests.exceptions.Timeout: return "【大模型调用超时】网络波动导致请求超时,已自动重试3次仍失败,请稍后再试" except requests.exceptions.ConnectionError: return "【大模型连接失败】无法连接到服务,请检查网络或代理配置" except Exception as e: # 捕获其他异常(如 API 密钥错误、模型不存在等) return f"【大模型调用失败】{str(e)}"