| """ | |
| Enhanced Viral Content Agent | |
| - Deterministic, testable, dependency-light | |
| - Action loop with tool allow‑list and guarded parsing | |
| - Pluggable LLM backends (Hugging Face Inference API, OpenAI, generic HTTP JSON API) with graceful fallback | |
| - Research tool with real HTTP search (DuckDuckGo HTML) + Wikipedia summary fallback; offline synthetic fallback retained | |
| - JSONL logging and reproducible runs via seed | |
| Runtime targets: Python 3.9+ | |
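
Example (a minimal sketch; with no API keys or LLM_ENDPOINT set, build_llm()
falls back to the offline RuleBasedLLM, so this runs without network access):

    agent = ViralAgent()
    result = agent.run("Write a blog about AI side-hustles")
    print(result["status"], len(result["history"]))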
| """ | |
| from __future__ import annotations | |
| import os | |
| import re | |
| import io | |
| import json | |
| import time | |
| import uuid | |
| import math | |
| import random | |
| import logging | |
| import contextlib | |
| from dataclasses import dataclass, field | |
| from datetime import datetime, timezone | |
| from typing import Any, Dict, List, Optional, Tuple, Iterable | |
| import requests | |
| # --------------------------- | |
| # Logging | |
| # --------------------------- | |
| LOGGER_NAME = "viral_agent" | |
| logger = logging.getLogger(LOGGER_NAME) | |
| if not logger.handlers: | |
| level = os.getenv("AGENT_LOG_LEVEL", "INFO").upper() | |
| logging.basicConfig(level=getattr(logging, level, logging.INFO), format="%(asctime)s %(levelname)s | %(message)s") | |
| # --------------------------- | |
| # Prompts (kept concise; multi‑line strings) | |
| # --------------------------- | |
| PREFIX = ( | |
| "You are an Advanced Viral Content Generator with self‑research and self‑improvement capabilities.\n" | |
| "Tools: GENERATE_IDEA, RESEARCH, GENERATE_CONTENT, SELF_EVALUATE, IMPROVE_CONTENT, FORMAT_CONTENT, PUBLISH, COMPLETE.\n" | |
| "Trigger using lines: action: <TOOL> and action_input=<TEXT>.\n" | |
| ) | |
IDEA_GENERATOR_PROMPT = (
    "Generate one viral content idea. Consider trending topics, underserved niches, controversy, practical value, and emotion.\n"
    "Return a single concise title. Topic: {topic}. History: {history}"
)
RESEARCH_PROMPT = (
    "You are researching: {topic}. Summarize key facts with bullet points. Include stats with sources when available."
)
CONTENT_PROMPT = (
    "Create {format_type} content about: {topic}. Use the following research notes: {research}.\n"
    "Hook, sections with headings, and a clear wrap-up. Keep it factual and concise."
)
EVALUATE_PROMPT = (
    "Evaluate content quality and viral potential from 1-10 for engagement, accuracy, originality, emotion, readability, and headline strength.\n"
    "Return compact JSON with fields per_criterion and overall plus three specific improvements. Content: {content}"
)
IMPROVE_PROMPT = (
    "Improve the content using this feedback: {feedback}. Strengthen hook, structure, and specificity. Return the full revised content. Content: {content}"
)
FORMAT_PROMPT = (
    "Format the content for publication. Add an SEO title (<70 chars), meta description (<160 chars), h2/h3 where useful, and a short CTA. Content: {content}"
)
PUBLISH_PROMPT = (
    "Prepare publication package fields: title, summary, tags[], canonical, published_at (ISO8601 UTC), body. Content: {content}"
)

# ---------------------------
# Utilities
# ---------------------------
def utc_now_iso() -> str:
    return datetime.now(timezone.utc).replace(microsecond=0).isoformat()


def json_dumps(obj: Any) -> str:
    return json.dumps(obj, ensure_ascii=False, separators=(",", ":"))


def clamp_text(s: str, max_len: int = 6000) -> str:
    if len(s) <= max_len:
        return s
    return s[: max(0, max_len - 3)] + "..."

# ---------------------------
# LLM backends
# ---------------------------
class LLM:
    def complete(self, prompt: str, max_tokens: int = 800) -> str:
        raise NotImplementedError


class HFInferenceLLM(LLM):
    """Hugging Face text-generation inference. Requires HUGGINGFACE_API_TOKEN; HUGGINGFACE_MODEL is optional (defaults to gpt2)."""

    def __init__(self, model: Optional[str] = None, timeout: int = 60):
        self.token = os.getenv("HUGGINGFACE_API_TOKEN")
        self.model = model or os.getenv("HUGGINGFACE_MODEL", "gpt2")
        self.timeout = timeout
        self.endpoint = f"https://api-inference.huggingface.co/models/{self.model}"

    def complete(self, prompt: str, max_tokens: int = 800) -> str:
        if not self.token:
            raise RuntimeError("HUGGINGFACE_API_TOKEN not set")
        headers = {"Authorization": f"Bearer {self.token}", "Accept": "application/json"}
        payload = {"inputs": prompt, "parameters": {"max_new_tokens": max_tokens, "return_full_text": False}}
        r = requests.post(self.endpoint, headers=headers, json=payload, timeout=self.timeout)
        r.raise_for_status()
        data = r.json()
        # Response shape can vary; normalize
        if isinstance(data, list) and data and "generated_text" in data[0]:
            return str(data[0]["generated_text"]).strip()
        if isinstance(data, dict) and "generated_text" in data:
            return str(data["generated_text"]).strip()
        # Fallback: return the raw JSON so the caller still gets something inspectable
        return json_dumps(data)


class OpenAILLM(LLM):
    """OpenAI chat completions via /v1/chat/completions. Requires OPENAI_API_KEY; OPENAI_MODEL is optional (defaults to gpt-4o-mini)."""

    def __init__(self, model: Optional[str] = None, timeout: int = 60):
        self.key = os.getenv("OPENAI_API_KEY")
        self.model = model or os.getenv("OPENAI_MODEL", "gpt-4o-mini")
        self.timeout = timeout
        self.url = os.getenv("OPENAI_BASE_URL", "https://api.openai.com/v1/chat/completions")

    def complete(self, prompt: str, max_tokens: int = 800) -> str:
        if not self.key:
            raise RuntimeError("OPENAI_API_KEY not set")
        headers = {"Authorization": f"Bearer {self.key}", "Content-Type": "application/json"}
        payload = {
            "model": self.model,
            "messages": [{"role": "user", "content": prompt}],
            "temperature": 0.2,
            "max_tokens": max_tokens,
        }
        r = requests.post(self.url, headers=headers, json=payload, timeout=self.timeout)
        r.raise_for_status()
        data = r.json()
        return data["choices"][0]["message"]["content"].strip()


class GenericHTTPJSONLLM(LLM):
    """POSTs to LLM_ENDPOINT with JSON {prompt, max_tokens}. Expects plain text in the response body or JSON {"text": ...}."""

    def __init__(self, endpoint: Optional[str] = None, timeout: int = 60):
        self.endpoint = endpoint or os.getenv("LLM_ENDPOINT")
        self.timeout = timeout

    def complete(self, prompt: str, max_tokens: int = 800) -> str:
        if not self.endpoint:
            raise RuntimeError("LLM_ENDPOINT not set")
        r = requests.post(self.endpoint, json={"prompt": prompt, "max_tokens": max_tokens}, timeout=self.timeout)
        r.raise_for_status()
        try:
            data = r.json()
            return str(data.get("text") or data.get("output") or data).strip()
        except Exception:
            return r.text.strip()


class RuleBasedLLM(LLM):
    """Offline, deterministic fallback. Produces concise templates to keep the pipeline functional without keys."""

    def complete(self, prompt: str, max_tokens: int = 800) -> str:
        # Very small heuristics to keep output useful and testable
        if "Generate one viral content idea" in prompt:
            return "AI Side-Hustles in 2025: 11 Practical Plays That Actually Work"
        if "You are researching" in prompt:
            topic = re.search(r"researching:\s*(.+?)\.\s*Summarize", prompt)
            t = topic.group(1) if topic else "the topic"
            return (
                f"- Definition and scope of {t}\n"
                f"- 2024–2025 trendline and adoption\n"
                f"- 3 data points with sources\n"
                f"- Risks, regulation, and future outlook"
            )
        if "Evaluate content quality" in prompt:
            return json_dumps({
                "per_criterion": {
                    "engagement": 8, "accuracy": 7, "originality": 7,
                    "emotion": 7, "readability": 8, "headline": 7,
                },
                "overall": 7.5,
                "improvements": [
                    "Tighten hook with concrete stat",
                    "Add one contrarian insight",
                    "Replace generic CTA with a next-step checklist",
                ],
            })
        if "Improve the content" in prompt:
            return "[Improved] " + clamp_text(prompt.split("Content:", 1)[-1].strip())
        if "Format the content for publication" in prompt:
            return (
                "SEO Title: Practical AI Side-Hustles for 2025\n"
                "Meta: A concise guide with data, risks, and an action checklist.\n"
                "\n## Introduction\n...\n\n## CTA\nGrab the checklist."
            )
        if "Prepare publication package" in prompt:
            now = utc_now_iso()
            return json_dumps({
                "title": "Practical AI Side-Hustles for 2025",
                "summary": "Concise, data-guided ideas with risks and a checklist.",
                "tags": ["AI", "side-hustle", "2025"],
                "canonical": "",
                "published_at": now,
                "body": "...",
            })
        # Default short echo
        return clamp_text("[draft] " + prompt[-max_tokens:])


def build_llm() -> LLM:
    # Order of preference: explicit endpoint, OpenAI, HF, fallback
    try:
        if os.getenv("LLM_ENDPOINT"):
            logger.info("Using GenericHTTPJSONLLM")
            return GenericHTTPJSONLLM()
        if os.getenv("OPENAI_API_KEY"):
            logger.info("Using OpenAILLM")
            return OpenAILLM()
        if os.getenv("HUGGINGFACE_API_TOKEN"):
            logger.info("Using HFInferenceLLM")
            return HFInferenceLLM()
    except Exception as e:
        logger.warning("LLM backend init failed, falling back: %s", e)
    logger.info("Using RuleBasedLLM fallback")
    return RuleBasedLLM()
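
# Backend selection at a glance (first matching environment variable wins):
#   LLM_ENDPOINT          -> GenericHTTPJSONLLM (generic JSON-over-HTTP server)
#   OPENAI_API_KEY        -> OpenAILLM (chat completions endpoint)
#   HUGGINGFACE_API_TOKEN -> HFInferenceLLM (HF Inference API)
#   none of the above     -> RuleBasedLLM (offline, deterministic)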

# ---------------------------
# Research utilities
# ---------------------------
def ddg_search_snippets(query: str, limit: int = 5, timeout: int = 15) -> List[Dict[str, str]]:
    """Very light HTML scrape of DuckDuckGo HTML to avoid heavy APIs. Returns [{title, url, snippet}]."""
    try:
        url = "https://html.duckduckgo.com/html/"
        r = requests.post(url, data={"q": query}, timeout=timeout, headers={"User-Agent": "agent/1.0"})
        r.raise_for_status()
        html = r.text
        # naive parsing
        results = []
        for m in re.finditer(r'<a[^>]+class="result__a"[^>]*href="([^"]+)"[^>]*>(.*?)</a>', html):
            link = m.group(1)
            title = re.sub("<.*?>", "", m.group(2))
            results.append({"title": title, "url": link, "snippet": ""})
            if len(results) >= limit:
                break
        return results
    except Exception as e:
        logger.warning("ddg_search_snippets failed: %s", e)
        return []


def wikipedia_summary(topic: str, timeout: int = 15) -> Optional[str]:
    try:
        api = "https://en.wikipedia.org/api/rest_v1/page/summary/" + requests.utils.quote(topic)
        r = requests.get(api, timeout=timeout, headers={"User-Agent": "agent/1.0"})
        if r.status_code == 200:
            data = r.json()
            return data.get("extract")
    except Exception as e:
        logger.warning("wikipedia_summary failed: %s", e)
    return None

# ---------------------------
# Tools
# ---------------------------
@dataclass
class AgentHistory:
    items: List[str] = field(default_factory=list)

    def add(self, line: str) -> None:
        self.items.append(line)

    def render(self, max_len: int = 4000) -> str:
        text = "\n".join(self.items)
        return clamp_text(text, max_len)


@dataclass
class AgentConfig:
    seed: int = 42
    max_loops: int = 8
    max_tokens: int = 800
    # read the env var at instantiation time, not at import time
    log_jsonl: Optional[str] = field(default_factory=lambda: os.getenv("AGENT_LOG_JSONL"))


class ViralAgent:
    ALLOWED_TOOLS = {
        "GENERATE_IDEA",
        "RESEARCH",
        "GENERATE_CONTENT",
        "SELF_EVALUATE",
        "IMPROVE_CONTENT",
        "FORMAT_CONTENT",
        "PUBLISH",
        "COMPLETE",
    }

    def __init__(self, llm: Optional[LLM] = None, cfg: Optional[AgentConfig] = None):
        self.llm = llm or build_llm()
        self.cfg = cfg or AgentConfig()
        random.seed(self.cfg.seed)
        self.history = AgentHistory()
        self.session_id = uuid.uuid4().hex[:8]
        logger.info("session=%s seed=%s", self.session_id, self.cfg.seed)

    # -------- action loop --------
    # re.M lets the block match even when the model emits preamble text before it
    ACTION_RE = re.compile(r"^\s*action:\s*([A-Z_]+)\s*\naction_input=(.*)", re.S | re.M)
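    # A well-formed action block, as requested from the LLM, looks like:
    #   action: RESEARCH
    #   action_input=AI side-hustles in 2025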

    def run(self, task: str, purpose: str = "Generate viral content") -> Dict[str, Any]:
        self.history.add(f"task: {task}")
        context = PREFIX + f"Current Date/Time: {utc_now_iso()}\nPurpose: {purpose}\n"
        for step in range(1, self.cfg.max_loops + 1):
            prompt = (
                f"{context}\nHistory:\n{self.history.render()}\n\n"
                "Decide next step. Output exactly two lines:\n"
                "action: <TOOL>\n"
                "action_input=<TEXT>\n"
            )
            raw = self.llm.complete(prompt, max_tokens=self.cfg.max_tokens)
            tool, payload = self._parse_action(raw)
            logger.info("step=%s tool=%s", step, tool)
            obs = self._dispatch(tool, payload, task)
            self.history.add(f"observation: {clamp_text(obs, 800)}")
            if tool == "COMPLETE":
                return {"status": "ok", "session": self.session_id, "history": self.history.items}
        return {"status": "max_loops", "session": self.session_id, "history": self.history.items}

    # -------- parsing and dispatch --------
    def _parse_action(self, text: str) -> Tuple[str, str]:
        m = self.ACTION_RE.search(text or "")
        if not m:
            logger.warning("action parse failed; default to GENERATE_IDEA")
            return "GENERATE_IDEA", "general tech trends 2025"
        tool = m.group(1).strip().upper()
        payload = m.group(2).strip()
        if tool not in self.ALLOWED_TOOLS:
            logger.warning("tool not allowed: %s", tool)
            tool = "GENERATE_IDEA"
        # guard payload
        payload = clamp_text(payload, 4000)
        return tool, payload

    def _dispatch(self, tool: str, payload: str, task: str) -> str:
        if tool == "GENERATE_IDEA":
            idea = self.generate_idea(task, payload)
            self.history.add(f"thought: generated idea -> {idea}")
            return idea
        if tool == "RESEARCH":
            notes = self.research(payload or task)
            self.history.add("thought: researched topic")
            return notes
        if tool == "GENERATE_CONTENT":
            fmt = self._guess_format(payload)
            notes = self._latest_research() or "key facts unavailable"
            content = self.generate_content(task, fmt, notes)
            self.history.add("thought: drafted content")
            return content
        if tool == "SELF_EVALUATE":
            content = self._latest_content() or payload
            return self.evaluate(content)
        if tool == "IMPROVE_CONTENT":
            content, feedback = self._split_two(payload)
            improved = self.improve(content, feedback)
            self.history.add("thought: improved content")
            return improved
        if tool == "FORMAT_CONTENT":
            return self.format_content(payload)
        if tool == "PUBLISH":
            return self.publish(payload)
        if tool == "COMPLETE":
            return "done"
        return "noop"

    # -------- tool implementations --------
    def generate_idea(self, topic: str, description: str) -> str:
        p = IDEA_GENERATOR_PROMPT.format(topic=topic or description, history=self.history.render())
        return self.llm.complete(p, max_tokens=120)

    def research(self, topic: str) -> str:
        topic = topic or "general topic"
        bullets = []
        # Try Wikipedia summary
        s = wikipedia_summary(topic)
        if s:
            bullets.append("Wikipedia summary: " + s)
        # Try DDG snippets
        for r in ddg_search_snippets(topic, limit=5):
            bullets.append(f"- {r['title']} — {r['url']}")
        # LLM consolidation
        prompt = RESEARCH_PROMPT.format(topic=topic)
        llm_notes = self.llm.complete(prompt, max_tokens=200)
        bullets.append(llm_notes)
        notes = "\n".join(bullets)
        # persist short log row
        self._log_jsonl({"t": utc_now_iso(), "event": "research", "topic": topic, "notes": clamp_text(notes, 2000)})
        return notes

    def _guess_format(self, s: str) -> str:
        s = s.lower()
        for key in ["blog", "book", "review", "paper", "newsletter", "social"]:
            if key in s:
                return {
                    "blog": "blog_article",
                    "book": "book_chapter",
                    "review": "review_article",
                    "paper": "academic_paper",
                    "newsletter": "newsletter",
                    "social": "social_media_post",
                }[key]
        return "blog_article"

    def generate_content(self, topic: str, format_type: str, research: str) -> str:
        p = CONTENT_PROMPT.format(topic=topic, format_type=format_type, research=clamp_text(research, 2000))
        content = self.llm.complete(p, max_tokens=700)
        self._log_jsonl({"t": utc_now_iso(), "event": "content", "format": format_type, "len": len(content)})
        return content

    def evaluate(self, content: str) -> str:
        p = EVALUATE_PROMPT.format(content=clamp_text(content, 2500))
        out = self.llm.complete(p, max_tokens=220)
        # validate JSON when possible
        try:
            obj = json.loads(out)
            if isinstance(obj, dict):
                out = json_dumps(obj)
        except Exception:
            pass
        self._log_jsonl({"t": utc_now_iso(), "event": "evaluate"})
        return out

    def improve(self, content: str, feedback: str) -> str:
        p = IMPROVE_PROMPT.format(content=clamp_text(content, 2500), feedback=clamp_text(feedback, 800))
        out = self.llm.complete(p, max_tokens=700)
        self._log_jsonl({"t": utc_now_iso(), "event": "improve"})
        return out

    def format_content(self, content: str) -> str:
        p = FORMAT_PROMPT.format(content=clamp_text(content, 2500))
        out = self.llm.complete(p, max_tokens=300)
        self._log_jsonl({"t": utc_now_iso(), "event": "format"})
        return out

    def publish(self, content: str) -> str:
        p = PUBLISH_PROMPT.format(content=clamp_text(content, 2000))
        out = self.llm.complete(p, max_tokens=220)
        # ensure minimal JSON shape
        try:
            obj = json.loads(out)
            if not isinstance(obj, dict):
                raise ValueError("expected a JSON object")
            if "published_at" not in obj:
                obj["published_at"] = utc_now_iso()
            out = json_dumps(obj)
        except Exception:
            # wrap as minimal manifest
            out = json_dumps({"title": "Untitled", "summary": "", "tags": [], "canonical": "", "published_at": utc_now_iso(), "body": out})
        self._log_jsonl({"t": utc_now_iso(), "event": "publish"})
        return out

    # -------- helpers --------
    def _split_two(self, block: str) -> Tuple[str, str]:
        parts = block.split("\n\n", 1)
        if len(parts) == 2:
            return parts[0].strip(), parts[1].strip()
        return block, ""
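
    # IMPROVE_CONTENT payloads follow the convention "<content>\n\n<feedback>":
    # everything before the first blank line is the content, the rest is feedback.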

    def _latest_research(self) -> Optional[str]:
        for line in reversed(self.history.items):
            if line.startswith("observation:"):
                body = line.split("observation:", 1)[-1].strip()
                # research observations carry a Wikipedia summary or a bullet list
                if "Wikipedia summary:" in body or body.startswith("- "):
                    return body
        return None

    def _latest_content(self) -> Optional[str]:
        for line in reversed(self.history.items):
            if line.startswith("observation:") and len(line) > 30 and ("##" in line or "#" in line or "\n" in line):
                return line.split("observation:", 1)[-1].strip()
        return None

    def _log_jsonl(self, row: Dict[str, Any]) -> None:
        path = self.cfg.log_jsonl
        if not path:
            return
        try:
            with open(path, "a", encoding="utf-8") as f:
                f.write(json_dumps(row) + "\n")
        except Exception as e:
            logger.warning("jsonl log failed: %s", e)


# ---------------------------
# CLI
# ---------------------------
def run_cli() -> None:
    import argparse

    parser = argparse.ArgumentParser(description="Viral content agent")
    parser.add_argument("task", help="Task to execute, e.g., 'Write a blog about X'")
    parser.add_argument("--purpose", default="Generate viral content")
    parser.add_argument("--seed", type=int, default=int(os.getenv("AGENT_SEED", "42")))
    parser.add_argument("--max-loops", type=int, default=int(os.getenv("AGENT_MAX_LOOPS", "6")))
    parser.add_argument("--log-jsonl", default=os.getenv("AGENT_LOG_JSONL"))
    args = parser.parse_args()
    cfg = AgentConfig(seed=args.seed, max_loops=args.max_loops, log_jsonl=args.log_jsonl)
    agent = ViralAgent(cfg=cfg)
    result = agent.run(task=args.task, purpose=args.purpose)
    print(json_dumps(result))


if __name__ == "__main__":
    run_cli()
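
# Example invocation (assuming the file is saved as viral_agent.py; the name is illustrative):
#   python viral_agent.py "Write a blog about AI side-hustles" --max-loops 4 --log-jsonl runs.jsonl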