import os
import base64
import cv2
import numpy as np
from PIL import Image
import pytesseract
import requests
from urllib.parse import urlparse, urljoin
from bs4 import BeautifulSoup
import html2text
import json
import time
import webbrowser
import urllib.parse
import copy
import html
import tempfile
import uuid
import datetime
import threading
import atexit
# FIX: Dict/List/Optional are used in annotations throughout this module but
# were never imported, which raises NameError the moment the module is loaded.
from typing import Dict, List, Optional
# FIX: InferenceClient is instantiated by the image/video generation helpers
# below, but only HfApi was imported from huggingface_hub.
from huggingface_hub import HfApi, InferenceClient
import gradio as gr
import subprocess
import re

# ---------------------------------------------------------------------------
# Video temp-file management (per-session tracking and cleanup)
# ---------------------------------------------------------------------------

VIDEO_TEMP_DIR = os.path.join(tempfile.gettempdir(), "anycoder_videos")
VIDEO_FILE_TTL_SECONDS = 6 * 60 * 60  # 6 hours

# Maps session id -> list of video file paths created by that session.
_SESSION_VIDEO_FILES: Dict[str, List[str]] = {}
_VIDEO_FILES_LOCK = threading.Lock()


def _ensure_video_dir_exists() -> None:
    """Create the video temp directory if missing (best-effort, never raises)."""
    try:
        os.makedirs(VIDEO_TEMP_DIR, exist_ok=True)
    except Exception:
        pass


def _register_video_for_session(session_id: Optional[str], file_path: str) -> None:
    """Remember *file_path* so it can be removed when the session is cleaned up."""
    if not session_id or not file_path:
        return
    with _VIDEO_FILES_LOCK:
        if session_id not in _SESSION_VIDEO_FILES:
            _SESSION_VIDEO_FILES[session_id] = []
        _SESSION_VIDEO_FILES[session_id].append(file_path)


def cleanup_session_videos(session_id: Optional[str]) -> None:
    """Delete every video file registered for *session_id* (best-effort)."""
    if not session_id:
        return
    # Pop under the lock, delete outside it to keep the critical section short.
    with _VIDEO_FILES_LOCK:
        file_list = _SESSION_VIDEO_FILES.pop(session_id, [])
    for path in file_list:
        try:
            if path and os.path.exists(path):
                os.unlink(path)
        except Exception:
            # Best-effort cleanup
            pass


def reap_old_videos(ttl_seconds: int = VIDEO_FILE_TTL_SECONDS) -> None:
    """Delete old video files in the temp directory based on modification time."""
    try:
        _ensure_video_dir_exists()
        now_ts = time.time()
        for name in os.listdir(VIDEO_TEMP_DIR):
            path = os.path.join(VIDEO_TEMP_DIR, name)
            try:
                if not os.path.isfile(path):
                    continue
                mtime = os.path.getmtime(path)
                if now_ts - mtime > ttl_seconds:
                    os.unlink(path)
            except Exception:
                pass
    except Exception:
        # Temp dir might not exist or be accessible; ignore
        pass


# ---------------------------------------------------------------------------
# Audio temp-file management (per-session tracking and cleanup)
# ---------------------------------------------------------------------------

AUDIO_TEMP_DIR = os.path.join(tempfile.gettempdir(), "anycoder_audio")
AUDIO_FILE_TTL_SECONDS = 6 * 60 * 60  # 6 hours

# Maps session id -> list of audio file paths created by that session.
_SESSION_AUDIO_FILES: Dict[str, List[str]] = {}
_AUDIO_FILES_LOCK = threading.Lock()


def _ensure_audio_dir_exists() -> None:
    """Create the audio temp directory if missing (best-effort, never raises)."""
    try:
        os.makedirs(AUDIO_TEMP_DIR, exist_ok=True)
    except Exception:
        pass


def _register_audio_for_session(session_id: Optional[str], file_path: str) -> None:
    """Remember *file_path* so it can be removed when the session is cleaned up."""
    if not session_id or not file_path:
        return
    with _AUDIO_FILES_LOCK:
        if session_id not in _SESSION_AUDIO_FILES:
            _SESSION_AUDIO_FILES[session_id] = []
        _SESSION_AUDIO_FILES[session_id].append(file_path)


def cleanup_session_audio(session_id: Optional[str]) -> None:
    """Delete every audio file registered for *session_id* (best-effort)."""
    if not session_id:
        return
    with _AUDIO_FILES_LOCK:
        file_list = _SESSION_AUDIO_FILES.pop(session_id, [])
    for path in file_list:
        try:
            if path and os.path.exists(path):
                os.unlink(path)
        except Exception:
            pass


def reap_old_audio(ttl_seconds: int = AUDIO_FILE_TTL_SECONDS) -> None:
    """Delete old audio files in the temp directory based on modification time."""
    try:
        _ensure_audio_dir_exists()
        now_ts = time.time()
        for name in os.listdir(AUDIO_TEMP_DIR):
            path = os.path.join(AUDIO_TEMP_DIR, name)
            try:
                if not os.path.isfile(path):
                    continue
                mtime = os.path.getmtime(path)
                if now_ts - mtime > ttl_seconds:
                    os.unlink(path)
            except Exception:
                pass
    except Exception:
        pass


# ---------------------------------------------------------------------------
# General temp media file management (per-session tracking and cleanup)
# ---------------------------------------------------------------------------

MEDIA_TEMP_DIR = os.path.join(tempfile.gettempdir(), "anycoder_media")
MEDIA_FILE_TTL_SECONDS = 6 * 60 * 60  # 6 hours

# Maps session id -> list of media file paths created by that session.
_SESSION_MEDIA_FILES: Dict[str, List[str]] = {}
_MEDIA_FILES_LOCK = threading.Lock()

# Global dictionary to store temporary media files for the session
# Registry of staged temp media: file_id -> {path, filename, media_type, media_bytes}
temp_media_files = {}


def _ensure_media_dir_exists() -> None:
    """Ensure the media temp directory exists."""
    try:
        os.makedirs(MEDIA_TEMP_DIR, exist_ok=True)
    except Exception:
        pass


def track_session_media_file(session_id: Optional[str], file_path: str) -> None:
    """Track a media file for session-based cleanup."""
    if not session_id or not file_path:
        return
    with _MEDIA_FILES_LOCK:
        _SESSION_MEDIA_FILES.setdefault(session_id, []).append(file_path)


def cleanup_session_media(session_id: Optional[str]) -> None:
    """Clean up media files for a specific session."""
    if not session_id:
        return
    # Detach the session's file list under the lock, delete outside it.
    with _MEDIA_FILES_LOCK:
        doomed = _SESSION_MEDIA_FILES.pop(session_id, [])
    for candidate in doomed:
        try:
            if candidate and os.path.exists(candidate):
                os.unlink(candidate)
        except Exception:
            # Best-effort cleanup
            pass


def reap_old_media(ttl_seconds: int = MEDIA_FILE_TTL_SECONDS) -> None:
    """Delete old media files in the temp directory based on modification time."""
    try:
        _ensure_media_dir_exists()
        cutoff = time.time() - ttl_seconds
        for entry in os.listdir(MEDIA_TEMP_DIR):
            full_path = os.path.join(MEDIA_TEMP_DIR, entry)
            if not os.path.isfile(full_path):
                continue
            try:
                # Anything last modified before the cutoff has outlived its TTL.
                if os.path.getmtime(full_path) < cutoff:
                    os.unlink(full_path)
            except Exception:
                pass
    except Exception:
        # Temp dir might not exist or be accessible; ignore
        pass


def cleanup_all_temp_media_on_startup() -> None:
    """Clean up all temporary media files on app startup."""
    try:
        # Drop the in-memory registry first.
        temp_media_files.clear()
        # Every on-disk file is considered orphaned at startup.
        _ensure_media_dir_exists()
        for entry in os.listdir(MEDIA_TEMP_DIR):
            stale = os.path.join(MEDIA_TEMP_DIR, entry)
            if not os.path.isfile(stale):
                continue
            try:
                os.unlink(stale)
            except Exception:
                pass
        # Finally reset the per-session bookkeeping.
        with _MEDIA_FILES_LOCK:
            _SESSION_MEDIA_FILES.clear()
        print("[StartupCleanup] Cleaned up orphaned temporary media files")
    except Exception as e:
        print(f"[StartupCleanup] Error during media cleanup: {str(e)}")


def cleanup_all_temp_media_on_shutdown() -> None:
    """Clean up all temporary media files on app shutdown."""
    try:
        print("[ShutdownCleanup] Cleaning up temporary media files...")
        # Remove every file referenced by the registry, then drop the entries.
        for _file_id, info in temp_media_files.items():
            try:
                if os.path.exists(info['path']):
                    os.unlink(info['path'])
            except Exception:
                pass
        temp_media_files.clear()
        # Remove every session-tracked file as well.
        with _MEDIA_FILES_LOCK:
            for _session_id, tracked_paths in _SESSION_MEDIA_FILES.items():
                for tracked in tracked_paths:
                    try:
                        if tracked and os.path.exists(tracked):
                            os.unlink(tracked)
                    except Exception:
                        pass
            _SESSION_MEDIA_FILES.clear()
        print("[ShutdownCleanup] Temporary media cleanup completed")
    except Exception as e:
        print(f"[ShutdownCleanup] Error during cleanup: {str(e)}")


# Register shutdown cleanup handler
atexit.register(cleanup_all_temp_media_on_shutdown)


def create_temp_media_url(media_bytes: bytes, filename: str, media_type: str = "image", session_id: Optional[str] = None) -> str:
    """Create a temporary file and return a local URL for preview."""
    try:
        # Unique name layout: <type>_<timestamp>_<uuid8>_<original-stem><ext>
        timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        unique_id = str(uuid.uuid4())[:8]
        base_name, ext = os.path.splitext(filename)
        unique_filename = f"{media_type}_{timestamp}_{unique_id}_{base_name}{ext}"

        # Write the payload into the dedicated temp directory.
        _ensure_media_dir_exists()
        temp_path = os.path.join(MEDIA_TEMP_DIR, unique_filename)
        with open(temp_path, 'wb') as f:
            f.write(media_bytes)

        # Register for session-scoped cleanup when a session is known.
        if session_id:
            track_session_media_file(session_id, temp_path)

        # Keep the raw bytes so the file can be uploaded to HF at deploy time.
        file_id = f"{media_type}_{unique_id}"
        temp_media_files[file_id] = {
            'path': temp_path,
            'filename': filename,
            'media_type': media_type,
            'media_bytes': media_bytes
        }

        file_url = f"file://{temp_path}"
        print(f"[TempMedia] Created temporary {media_type} file: {file_url}")
        return file_url
    except Exception as e:
        print(f"[TempMedia] Failed to create temporary file: {str(e)}")
        return f"Error creating temporary {media_type} file: {str(e)}"
print(f"[TempMedia] Created temporary {media_type} file: {file_url}") return file_url except Exception as e: print(f"[TempMedia] Failed to create temporary file: {str(e)}") return f"Error creating temporary {media_type} file: {str(e)}" def upload_media_to_hf(media_bytes: bytes, filename: str, media_type: str = "image", token: gr.OAuthToken | None = None, use_temp: bool = True) -> str: """Upload media file to user's Hugging Face account or create temporary file.""" try: # If use_temp is True, create temporary file for preview if use_temp: return create_temp_media_url(media_bytes, filename, media_type) # Otherwise, upload to Hugging Face for permanent URL # Try to get token from OAuth first, then fall back to environment variable hf_token = None if token and token.token: hf_token = token.token else: hf_token = os.getenv('HF_TOKEN') if not hf_token: return "Error: Please log in with your Hugging Face account to upload media, or set HF_TOKEN environment variable." # Initialize HF API api = HfApi(token=hf_token) # Get current user info to determine username try: user_info = api.whoami() username = user_info.get('name', 'unknown-user') except Exception as e: print(f"[HFUpload] Could not get user info: {e}") username = 'anycoder-user' # Create repository name for media storage repo_name = f"{username}/anycoder-media" # Try to create the repository if it doesn't exist try: api.create_repo( repo_id=repo_name, repo_type="dataset", private=False, exist_ok=True ) print(f"[HFUpload] Repository {repo_name} ready") except Exception as e: print(f"[HFUpload] Repository creation/access issue: {e}") # Continue anyway, repo might already exist # Create unique filename with timestamp and UUID timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") unique_id = str(uuid.uuid4())[:8] base_name, ext = os.path.splitext(filename) unique_filename = f"{media_type}/{timestamp}_{unique_id}_{base_name}{ext}" # Create temporary file for upload with tempfile.NamedTemporaryFile(delete=False, 
suffix=ext) as temp_file: temp_file.write(media_bytes) temp_path = temp_file.name try: # Upload file to HF repository api.upload_file( path_or_fileobj=temp_path, path_in_repo=unique_filename, repo_id=repo_name, repo_type="dataset", commit_message=f"Upload {media_type} generated by AnyCoder" ) # Generate permanent URL permanent_url = f"https://huggingface.co/datasets/{repo_name}/resolve/main/{unique_filename}" print(f"[HFUpload] Successfully uploaded {media_type} to {permanent_url}") return permanent_url finally: # Clean up temporary file try: os.unlink(temp_path) except Exception: pass except Exception as e: print(f"[HFUpload] Upload failed: {str(e)}") return f"Error uploading {media_type} to Hugging Face: {str(e)}" def upload_temp_files_to_hf_and_replace_urls(html_content: str, token: gr.OAuthToken | None = None) -> str: """Upload all temporary media files to HF and replace their URLs in HTML content.""" try: if not temp_media_files: print("[DeployUpload] No temporary media files to upload") return html_content print(f"[DeployUpload] Uploading {len(temp_media_files)} temporary media files to HF") updated_content = html_content for file_id, file_info in temp_media_files.items(): try: # Upload to HF with permanent URL permanent_url = upload_media_to_hf( file_info['media_bytes'], file_info['filename'], file_info['media_type'], token, use_temp=False # Force permanent upload ) if not permanent_url.startswith("Error"): # Replace the temporary file URL with permanent URL temp_url = f"file://{file_info['path']}" updated_content = updated_content.replace(temp_url, permanent_url) print(f"[DeployUpload] Replaced {temp_url} with {permanent_url}") else: print(f"[DeployUpload] Failed to upload {file_id}: {permanent_url}") except Exception as e: print(f"[DeployUpload] Error uploading {file_id}: {str(e)}") continue # Clean up temporary files after upload cleanup_temp_media_files() return updated_content except Exception as e: print(f"[DeployUpload] Failed to upload temporary 
files: {str(e)}") return html_content def cleanup_temp_media_files(): """Clean up temporary media files from disk and memory.""" try: for file_id, file_info in temp_media_files.items(): try: if os.path.exists(file_info['path']): os.remove(file_info['path']) print(f"[TempCleanup] Removed {file_info['path']}") except Exception as e: print(f"[TempCleanup] Failed to remove {file_info['path']}: {str(e)}") # Clear the global dictionary temp_media_files.clear() print("[TempCleanup] Cleared temporary media files registry") except Exception as e: print(f"[TempCleanup] Error during cleanup: {str(e)}") def generate_image_with_qwen(prompt: str, image_index: int = 0, token: gr.OAuthToken | None = None) -> str: """Generate image using Qwen image model via Hugging Face InferenceClient and upload to HF for permanent URL""" try: # Check if HF_TOKEN is available if not os.getenv('HF_TOKEN'): return "Error: HF_TOKEN environment variable is not set. Please set it to your Hugging Face API token." # Create InferenceClient for Qwen image generation client = InferenceClient( provider="auto", api_key=os.getenv('HF_TOKEN'), bill_to="huggingface", ) # Generate image using Qwen/Qwen-Image model image = client.text_to_image( prompt, model="Qwen/Qwen-Image", ) # Resize image to reduce size while maintaining quality max_size = 1024 # Increased size since we're not using data URIs if image.width > max_size or image.height > max_size: image.thumbnail((max_size, max_size), Image.Resampling.LANCZOS) # Convert PIL Image to bytes for upload import io buffer = io.BytesIO() # Save as JPEG with good quality since we're not embedding image.convert('RGB').save(buffer, format='JPEG', quality=90, optimize=True) image_bytes = buffer.getvalue() # Create temporary URL for preview (will be uploaded to HF during deploy) filename = f"generated_image_{image_index}.jpg" temp_url = upload_media_to_hf(image_bytes, filename, "image", token, use_temp=True) # Check if creation was successful if 
temp_url.startswith("Error"): return temp_url # Return HTML img tag with temporary URL return f'{prompt}' except Exception as e: print(f"Image generation error: {str(e)}") return f"Error generating image: {str(e)}" def generate_image_to_image(input_image_data, prompt: str, token: gr.OAuthToken | None = None) -> str: """Generate an image using image-to-image with Qwen-Image-Edit via Hugging Face InferenceClient.""" try: # Check token if not os.getenv('HF_TOKEN'): return "Error: HF_TOKEN environment variable is not set. Please set it to your Hugging Face API token." # Prepare client client = InferenceClient( provider="auto", api_key=os.getenv('HF_TOKEN'), bill_to="huggingface", ) # Normalize input image to bytes import io from PIL import Image try: import numpy as np except Exception: np = None if hasattr(input_image_data, 'read'): # File-like object raw = input_image_data.read() pil_image = Image.open(io.BytesIO(raw)) elif hasattr(input_image_data, 'mode') and hasattr(input_image_data, 'size'): # PIL Image pil_image = input_image_data elif np is not None and isinstance(input_image_data, np.ndarray): pil_image = Image.fromarray(input_image_data) elif isinstance(input_image_data, (bytes, bytearray)): pil_image = Image.open(io.BytesIO(input_image_data)) else: # Fallback: try to convert via bytes pil_image = Image.open(io.BytesIO(bytes(input_image_data))) # Ensure RGB if pil_image.mode != 'RGB': pil_image = pil_image.convert('RGB') # Resize input image to avoid request body size limits max_input_size = 1024 if pil_image.width > max_input_size or pil_image.height > max_input_size: pil_image.thumbnail((max_input_size, max_input_size), Image.Resampling.LANCZOS) buf = io.BytesIO() pil_image.save(buf, format='JPEG', quality=85, optimize=True) input_bytes = buf.getvalue() # Call image-to-image image = client.image_to_image( input_bytes, prompt=prompt, model="Qwen/Qwen-Image-Edit", ) # Resize/optimize (larger since not using data URIs) max_size = 1024 if image.width > max_size 
def generate_video_from_image(input_image_data, prompt: str, session_id: Optional[str] = None, token: gr.OAuthToken | None = None) -> str:
    """Generate a video from an input image and prompt using Hugging Face InferenceClient."""
    # NOTE(review): session_id is accepted but never used for per-session file
    # tracking here — confirm whether the temp video should be registered.
    try:
        print("[Image2Video] Starting video generation")
        if not os.getenv('HF_TOKEN'):
            print("[Image2Video] Missing HF_TOKEN")
            return "Error: HF_TOKEN environment variable is not set. Please set it to your Hugging Face API token."
        # Prepare client
        client = InferenceClient(
            provider="auto",
            api_key=os.getenv('HF_TOKEN'),
            bill_to="huggingface",
        )
        print(f"[Image2Video] InferenceClient initialized (provider=auto)")
        # Normalize input image to bytes, with downscale/compress to cap request size
        import io
        from PIL import Image
        try:
            import numpy as np
        except Exception:
            np = None

        def _load_pil(img_like) -> Image.Image:
            # Accept file-like objects, PIL images, numpy arrays, or raw bytes.
            if hasattr(img_like, 'read'):
                return Image.open(io.BytesIO(img_like.read()))
            if hasattr(img_like, 'mode') and hasattr(img_like, 'size'):
                return img_like
            if np is not None and isinstance(img_like, np.ndarray):
                return Image.fromarray(img_like)
            if isinstance(img_like, (bytes, bytearray)):
                return Image.open(io.BytesIO(img_like))
            return Image.open(io.BytesIO(bytes(img_like)))

        pil_image = _load_pil(input_image_data)
        if pil_image.mode != 'RGB':
            pil_image = pil_image.convert('RGB')
        try:
            print(f"[Image2Video] Input PIL image size={pil_image.size} mode={pil_image.mode}")
        except Exception:
            pass
        # Progressive encode to keep payload under ~3.9MB (below 4MB limit)
        MAX_BYTES = 3_900_000
        max_dim = 1024  # initial cap on longest edge
        quality = 90

        def encode_current(pil: Image.Image, q: int) -> bytes:
            tmp = io.BytesIO()
            pil.save(tmp, format='JPEG', quality=q, optimize=True)
            return tmp.getvalue()

        # Downscale while the longest edge exceeds max_dim
        while max(pil_image.size) > max_dim:
            ratio = max_dim / float(max(pil_image.size))
            new_size = (max(1, int(pil_image.size[0] * ratio)), max(1, int(pil_image.size[1] * ratio)))
            pil_image = pil_image.resize(new_size, Image.Resampling.LANCZOS)
        encoded = encode_current(pil_image, quality)
        # If still too big, iteratively reduce quality, then dimensions
        while len(encoded) > MAX_BYTES and (quality > 40 or max(pil_image.size) > 640):
            if quality > 40:
                quality -= 10
            else:
                # reduce dims by 15% if already at low quality
                new_w = max(1, int(pil_image.size[0] * 0.85))
                new_h = max(1, int(pil_image.size[1] * 0.85))
                pil_image = pil_image.resize((new_w, new_h), Image.Resampling.LANCZOS)
            encoded = encode_current(pil_image, quality)
        input_bytes = encoded
        # Call image-to-video; require method support
        model_id = "Lightricks/LTX-Video-0.9.8-13B-distilled"
        image_to_video_method = getattr(client, "image_to_video", None)
        if not callable(image_to_video_method):
            print("[Image2Video] InferenceClient.image_to_video not available in this huggingface_hub version")
            return (
                "Error generating video (image-to-video): Your installed huggingface_hub version "
                "does not expose InferenceClient.image_to_video. Please upgrade with "
                "`pip install -U huggingface_hub` and try again."
            )
        print(f"[Image2Video] Calling image_to_video with model={model_id}, prompt length={len(prompt or '')}")
        video_bytes = image_to_video_method(
            input_bytes,
            prompt=prompt,
            model=model_id,
        )
        print(f"[Image2Video] Received video bytes: {len(video_bytes) if hasattr(video_bytes, '__len__') else 'unknown length'}")
        # Create temporary URL for preview (will be uploaded to HF during deploy)
        filename = "image_to_video_result.mp4"
        temp_url = upload_media_to_hf(video_bytes, filename, "video", token, use_temp=True)
        # Check if creation was successful
        if temp_url.startswith("Error"):
            return temp_url
        # FIX: video_html was assigned an empty f-string, which cannot pass
        # validate_video_html; build a real <video> tag referencing temp_url.
        # Reconstructed markup — confirm attributes against validate_video_html.
        video_html = (
            f'<video controls playsinline style="max-width: 100%; height: auto;">'
            f'<source src="{temp_url}" type="video/mp4">'
            f'Your browser does not support the video tag.'
            f'</video>'
        )
        print(f"[Image2Video] Successfully generated video HTML tag with temporary URL: {temp_url}")
        # Validate the generated video HTML
        if not validate_video_html(video_html):
            print("[Image2Video] Generated video HTML failed validation")
            return "Error: Generated video HTML is malformed"
        return video_html
    except Exception as e:
        import traceback
        print("[Image2Video] Exception during generation:")
        traceback.print_exc()
        print(f"Image-to-video generation error: {str(e)}")
        return f"Error generating video (image-to-video): {str(e)}"


def generate_video_from_text(prompt: str, session_id: Optional[str] = None, token: gr.OAuthToken | None = None) -> str:
    """Generate a video from a text prompt using Hugging Face InferenceClient."""
    # NOTE(review): session_id is accepted but unused here as well — verify intent.
    try:
        print("[Text2Video] Starting video generation from text")
        if not os.getenv('HF_TOKEN'):
            print("[Text2Video] Missing HF_TOKEN")
            return "Error: HF_TOKEN environment variable is not set. Please set it to your Hugging Face API token."
        client = InferenceClient(
            provider="auto",
            api_key=os.getenv('HF_TOKEN'),
            bill_to="huggingface",
        )
        print("[Text2Video] InferenceClient initialized (provider=auto)")
        # Ensure the client has text_to_video (newer huggingface_hub)
        text_to_video_method = getattr(client, "text_to_video", None)
        if not callable(text_to_video_method):
            print("[Text2Video] InferenceClient.text_to_video not available in this huggingface_hub version")
            return (
                "Error generating video (text-to-video): Your installed huggingface_hub version "
                "does not expose InferenceClient.text_to_video. Please upgrade with "
                "`pip install -U huggingface_hub` and try again."
            )
        model_id = "Wan-AI/Wan2.2-T2V-A14B"
        prompt_str = (prompt or "").strip()
        print(f"[Text2Video] Calling text_to_video with model={model_id}, prompt length={len(prompt_str)}")
        video_bytes = text_to_video_method(
            prompt_str,
            model=model_id,
        )
        print(f"[Text2Video] Received video bytes: {len(video_bytes) if hasattr(video_bytes, '__len__') else 'unknown length'}")
        # Create temporary URL for preview (will be uploaded to HF during deploy)
        filename = "text_to_video_result.mp4"
        temp_url = upload_media_to_hf(video_bytes, filename, "video", token, use_temp=True)
        # Check if creation was successful
        if temp_url.startswith("Error"):
            return temp_url
        # FIX: same empty-f-string defect as generate_video_from_image; emit a
        # real <video> tag so validation and rendering can succeed.
        video_html = (
            f'<video controls playsinline style="max-width: 100%; height: auto;">'
            f'<source src="{temp_url}" type="video/mp4">'
            f'Your browser does not support the video tag.'
            f'</video>'
        )
        print(f"[Text2Video] Successfully generated video HTML tag with temporary URL: {temp_url}")
        # Validate the generated video HTML
        if not validate_video_html(video_html):
            print("[Text2Video] Generated video HTML failed validation")
            return "Error: Generated video HTML is malformed"
        return video_html
    except Exception as e:
        import traceback
        print("[Text2Video] Exception during generation:")
        traceback.print_exc()
        print(f"Text-to-video generation error: {str(e)}")
        return f"Error generating video (text-to-video): {str(e)}"
{str(e)}" def generate_music_from_text(prompt: str, music_length_ms: int = 30000, session_id: Optional[str] = None, token: gr.OAuthToken | None = None) -> str: """Generate music from a text prompt using ElevenLabs Music API and return an HTML