import atexit
import base64
import copy
import datetime
import html
import json
import os
import re
import subprocess
import tempfile
import threading
import time
import urllib.parse
import uuid
import webbrowser
from typing import Dict, List, Optional
from urllib.parse import urlparse, urljoin

import cv2
import gradio as gr
import html2text
import numpy as np
import pytesseract
import requests
from bs4 import BeautifulSoup
from huggingface_hub import HfApi
from huggingface_hub import InferenceClient
from PIL import Image
# ---------------------------------------------------------------------------
# Video temp-file management (per-session tracking and cleanup)
# ---------------------------------------------------------------------------
# Directory under the system temp dir that holds generated video files.
VIDEO_TEMP_DIR = os.path.join(tempfile.gettempdir(), "anycoder_videos")
# Age in seconds after which a video file may be reaped (6 hours).
VIDEO_FILE_TTL_SECONDS = 6 * 3600
# Maps a session id to the video file paths registered for that session.
_SESSION_VIDEO_FILES: Dict[str, List[str]] = {}
# Held while _SESSION_VIDEO_FILES is read or modified.
_VIDEO_FILES_LOCK = threading.Lock()
def _ensure_video_dir_exists() -> None:
    """Create VIDEO_TEMP_DIR if it is missing; any failure is silently ignored."""
    try:
        os.makedirs(VIDEO_TEMP_DIR, exist_ok=True)
    except Exception:
        # Best effort: a failure here is deliberately not propagated.
        return None
def _register_video_for_session(session_id: Optional[str], file_path: str) -> None:
    """Record ``file_path`` under ``session_id`` in the video session registry.

    Nothing is recorded when either argument is empty or ``None``.
    """
    if not (session_id and file_path):
        return
    with _VIDEO_FILES_LOCK:
        # setdefault creates the per-session list on first use.
        _SESSION_VIDEO_FILES.setdefault(session_id, []).append(file_path)
def cleanup_session_videos(session_id: Optional[str]) -> None:
    """Drop the video registry entry for ``session_id`` and delete its files.

    An empty or ``None`` session id is a no-op. Deletion is best effort:
    a path that cannot be removed is skipped without raising.
    """
    if not session_id:
        return
    with _VIDEO_FILES_LOCK:
        tracked_paths = _SESSION_VIDEO_FILES.pop(session_id, [])
    for video_path in tracked_paths:
        try:
            if video_path and os.path.exists(video_path):
                os.unlink(video_path)
        except Exception:
            # Best-effort cleanup
            continue
def reap_old_videos(ttl_seconds: int = VIDEO_FILE_TTL_SECONDS) -> None:
    """Remove regular files in VIDEO_TEMP_DIR that are older than ``ttl_seconds``.

    Age is measured from each file's modification time. Every error, whether
    for a single file or for the directory as a whole, is swallowed.
    """
    try:
        _ensure_video_dir_exists()
        started_at = time.time()
        for entry in os.listdir(VIDEO_TEMP_DIR):
            candidate = os.path.join(VIDEO_TEMP_DIR, entry)
            try:
                if os.path.isfile(candidate):
                    age_seconds = started_at - os.path.getmtime(candidate)
                    if age_seconds > ttl_seconds:
                        os.unlink(candidate)
            except Exception:
                # Skip a file that cannot be inspected or removed.
                pass
    except Exception:
        # Temp dir might not exist or be accessible; ignore
        pass
# ---------------------------------------------------------------------------
# Audio temp-file management (per-session tracking and cleanup)
# ---------------------------------------------------------------------------
# Directory under the system temp dir that holds generated audio files.
AUDIO_TEMP_DIR = os.path.join(tempfile.gettempdir(), "anycoder_audio")
# Age in seconds after which an audio file may be reaped (6 hours).
AUDIO_FILE_TTL_SECONDS = 6 * 3600
# Maps a session id to the audio file paths registered for that session.
_SESSION_AUDIO_FILES: Dict[str, List[str]] = {}
# Held while _SESSION_AUDIO_FILES is read or modified.
_AUDIO_FILES_LOCK = threading.Lock()
def _ensure_audio_dir_exists() -> None:
    """Create AUDIO_TEMP_DIR if it is missing; any failure is silently ignored."""
    try:
        os.makedirs(AUDIO_TEMP_DIR, exist_ok=True)
    except Exception:
        # Best effort: a failure here is deliberately not propagated.
        return None
def _register_audio_for_session(session_id: Optional[str], file_path: str) -> None:
    """Record ``file_path`` under ``session_id`` in the audio session registry.

    Nothing is recorded when either argument is empty or ``None``.
    """
    if not (session_id and file_path):
        return
    with _AUDIO_FILES_LOCK:
        # setdefault creates the per-session list on first use.
        _SESSION_AUDIO_FILES.setdefault(session_id, []).append(file_path)
def cleanup_session_audio(session_id: Optional[str]) -> None:
    """Drop the audio registry entry for ``session_id`` and delete its files.

    An empty or ``None`` session id is a no-op. Deletion is best effort:
    a path that cannot be removed is skipped without raising.
    """
    if not session_id:
        return
    with _AUDIO_FILES_LOCK:
        tracked_paths = _SESSION_AUDIO_FILES.pop(session_id, [])
    for audio_path in tracked_paths:
        try:
            if audio_path and os.path.exists(audio_path):
                os.unlink(audio_path)
        except Exception:
            # Best-effort cleanup: move on to the next file.
            continue
def reap_old_audio(ttl_seconds: int = AUDIO_FILE_TTL_SECONDS) -> None:
    """Remove regular files in AUDIO_TEMP_DIR that are older than ``ttl_seconds``.

    Age is measured from each file's modification time. Every error, whether
    for a single file or for the directory as a whole, is swallowed.
    """
    try:
        _ensure_audio_dir_exists()
        started_at = time.time()
        for entry in os.listdir(AUDIO_TEMP_DIR):
            candidate = os.path.join(AUDIO_TEMP_DIR, entry)
            try:
                if os.path.isfile(candidate):
                    age_seconds = started_at - os.path.getmtime(candidate)
                    if age_seconds > ttl_seconds:
                        os.unlink(candidate)
            except Exception:
                # Skip a file that cannot be inspected or removed.
                pass
    except Exception:
        # Listing the directory failed; nothing to reap.
        pass
# ---------------------------------------------------------------------------
# General temp media file management (per-session tracking and cleanup)
# ---------------------------------------------------------------------------
# Directory under the system temp dir that holds general temporary media files.
MEDIA_TEMP_DIR = os.path.join(tempfile.gettempdir(), "anycoder_media")
# Age in seconds after which a media file may be reaped (6 hours).
MEDIA_FILE_TTL_SECONDS = 6 * 3600
# Maps a session id to the media file paths tracked for that session.
_SESSION_MEDIA_FILES: Dict[str, List[str]] = {}
# Held while _SESSION_MEDIA_FILES is read or modified.
_MEDIA_FILES_LOCK = threading.Lock()
# Global dictionary to store temporary media files for the session
temp_media_files = {}
def _ensure_media_dir_exists() -> None:
    """Create MEDIA_TEMP_DIR if it is missing; any failure is silently ignored."""
    try:
        os.makedirs(MEDIA_TEMP_DIR, exist_ok=True)
    except Exception:
        # Best effort: a failure here is deliberately not propagated.
        return None
def track_session_media_file(session_id: Optional[str], file_path: str) -> None:
    """Record ``file_path`` under ``session_id`` in the media session registry.

    Nothing is recorded when either argument is empty or ``None``.
    """
    if not (session_id and file_path):
        return
    with _MEDIA_FILES_LOCK:
        # setdefault creates the per-session list on first use.
        _SESSION_MEDIA_FILES.setdefault(session_id, []).append(file_path)
def cleanup_session_media(session_id: Optional[str]) -> None:
    """Drop the media registry entry for ``session_id`` and delete its files.

    An empty or ``None`` session id is a no-op. Deletion is best effort:
    a path that cannot be removed is skipped without raising.
    """
    if not session_id:
        return
    with _MEDIA_FILES_LOCK:
        tracked_paths = _SESSION_MEDIA_FILES.pop(session_id, [])
    for media_path in tracked_paths:
        try:
            if media_path and os.path.exists(media_path):
                os.unlink(media_path)
        except Exception:
            # Best-effort cleanup
            continue
def reap_old_media(ttl_seconds: int = MEDIA_FILE_TTL_SECONDS) -> None:
    """Remove regular files in MEDIA_TEMP_DIR that are older than ``ttl_seconds``.

    Age is measured from each file's modification time. Every error, whether
    for a single file or for the directory as a whole, is swallowed.
    """
    try:
        _ensure_media_dir_exists()
        started_at = time.time()
        for entry in os.listdir(MEDIA_TEMP_DIR):
            candidate = os.path.join(MEDIA_TEMP_DIR, entry)
            if not os.path.isfile(candidate):
                continue
            try:
                age_seconds = started_at - os.path.getmtime(candidate)
                if age_seconds > ttl_seconds:
                    os.unlink(candidate)
            except Exception:
                # Skip a file that cannot be inspected or removed.
                pass
    except Exception:
        # Temp dir might not exist or be accessible; ignore
        pass
def cleanup_all_temp_media_on_startup() -> None:
    """Reset all temporary-media state when the app starts.

    Clears the ``temp_media_files`` registry, deletes every regular file in
    MEDIA_TEMP_DIR (all are treated as orphaned), and clears the per-session
    tracking table. A failure is printed rather than raised.
    """
    try:
        temp_media_files.clear()

        # Everything left on disk from a previous run is considered orphaned.
        _ensure_media_dir_exists()
        leftovers = (
            os.path.join(MEDIA_TEMP_DIR, entry)
            for entry in os.listdir(MEDIA_TEMP_DIR)
        )
        for leftover in leftovers:
            if not os.path.isfile(leftover):
                continue
            try:
                os.unlink(leftover)
            except Exception:
                pass

        with _MEDIA_FILES_LOCK:
            _SESSION_MEDIA_FILES.clear()
        print("[StartupCleanup] Cleaned up orphaned temporary media files")
    except Exception as e:
        print(f"[StartupCleanup] Error during media cleanup: {str(e)}")
def cleanup_all_temp_media_on_shutdown() -> None:
    """Delete every known temporary media file when the app shuts down.

    Removes the files listed in ``temp_media_files`` and in the per-session
    tracking table, then empties both. Individual deletion errors are
    ignored; any other failure is printed rather than raised.
    """
    try:
        print("[ShutdownCleanup] Cleaning up temporary media files...")

        # Files known through the temp_media_files registry.
        for entry in temp_media_files.values():
            try:
                registered_path = entry['path']
                if os.path.exists(registered_path):
                    os.unlink(registered_path)
            except Exception:
                pass
        temp_media_files.clear()

        # Files known through per-session tracking.
        with _MEDIA_FILES_LOCK:
            for tracked_paths in _SESSION_MEDIA_FILES.values():
                for tracked_path in tracked_paths:
                    try:
                        if tracked_path and os.path.exists(tracked_path):
                            os.unlink(tracked_path)
                    except Exception:
                        pass
            _SESSION_MEDIA_FILES.clear()

        print("[ShutdownCleanup] Temporary media cleanup completed")
    except Exception as e:
        print(f"[ShutdownCleanup] Error during cleanup: {str(e)}")
# Register the shutdown cleanup with atexit so it runs when the interpreter exits.
# This happens as a side effect of executing this module's top-level code.
atexit.register(cleanup_all_temp_media_on_shutdown)
def create_temp_media_url(media_bytes: bytes, filename: str, media_type: str = "image", session_id: Optional[str] = None) -> str:
    """Write media bytes to a uniquely named file in MEDIA_TEMP_DIR and return a ``file://`` URL.

    The file is also recorded in ``temp_media_files`` (together with the raw
    bytes) and, when ``session_id`` is given, tracked for that session.

    Args:
        media_bytes: Raw content to write.
        filename: Suggested file name. Only its final path component is used
            for the on-disk name; the value is stored unchanged in the registry.
        media_type: Label used as the file-name prefix and in the registry key.
        session_id: Optional session to track the file under.

    Returns:
        ``file://<path>`` on success, or a message starting with
        ``"Error creating temporary"`` on failure (no exception is raised).
    """
    try:
        # Create unique filename with timestamp and UUID
        timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        unique_id = str(uuid.uuid4())[:8]
        # Keep only the last path component: a directory part in `filename`
        # would otherwise point the write into a non-existent subdirectory
        # or outside MEDIA_TEMP_DIR.
        safe_name = os.path.basename(filename)
        base_name, ext = os.path.splitext(safe_name)
        unique_filename = f"{media_type}_{timestamp}_{unique_id}_{base_name}{ext}"

        # Create temporary file in the dedicated directory
        _ensure_media_dir_exists()
        temp_path = os.path.join(MEDIA_TEMP_DIR, unique_filename)
        with open(temp_path, 'wb') as f:
            f.write(media_bytes)

        # Track file for session-based cleanup
        if session_id:
            track_session_media_file(session_id, temp_path)

        # Store the file info (including the bytes) for later upload
        file_id = f"{media_type}_{unique_id}"
        temp_media_files[file_id] = {
            'path': temp_path,
            'filename': filename,
            'media_type': media_type,
            'media_bytes': media_bytes,
        }

        # Return file:// URL for preview
        file_url = f"file://{temp_path}"
        print(f"[TempMedia] Created temporary {media_type} file: {file_url}")
        return file_url
    except Exception as e:
        print(f"[TempMedia] Failed to create temporary file: {str(e)}")
        return f"Error creating temporary {media_type} file: {str(e)}"
def upload_media_to_hf(media_bytes: bytes, filename: str, media_type: str = "image", token: gr.OAuthToken | None = None, use_temp: bool = True) -> str:
    """Store media as a local preview file or upload it to a Hugging Face dataset repo.

    With ``use_temp`` true the bytes go to ``create_temp_media_url`` and its
    result is returned unchanged. Otherwise the bytes are uploaded to the
    dataset repository ``<username>/anycoder-media`` and the ``resolve/main``
    URL of the uploaded file is returned.

    Args:
        media_bytes: Raw content of the media file.
        filename: Name whose stem and extension are reused in the stored name.
        media_type: Label used as the folder inside the repository.
        token: OAuth token; when missing or empty, the ``HF_TOKEN``
            environment variable is used instead.
        use_temp: Choose the local preview file instead of the upload.

    Returns:
        The resulting URL, or a message starting with ``"Error"`` on failure.
        Exceptions are caught and reported through the return value.
    """
    try:
        # Preview mode: no network access, just a local temp file.
        if use_temp:
            return create_temp_media_url(media_bytes, filename, media_type)

        # The OAuth token wins; the environment variable is the fallback.
        hf_token = token.token if token and token.token else os.getenv('HF_TOKEN')
        if not hf_token:
            return "Error: Please log in with your Hugging Face account to upload media, or set HF_TOKEN environment variable."

        hub = HfApi(token=hf_token)

        # The account name decides which namespace the media repo lives in.
        try:
            username = hub.whoami().get('name', 'unknown-user')
        except Exception as e:
            print(f"[HFUpload] Could not get user info: {e}")
            username = 'anycoder-user'

        repo_name = f"{username}/anycoder-media"
        try:
            hub.create_repo(
                repo_id=repo_name,
                repo_type="dataset",
                private=False,
                exist_ok=True,
            )
            print(f"[HFUpload] Repository {repo_name} ready")
        except Exception as e:
            # Continue anyway, repo might already exist
            print(f"[HFUpload] Repository creation/access issue: {e}")

        # Unique path inside the repo: <media_type>/<timestamp>_<id>_<name>
        timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        unique_id = str(uuid.uuid4())[:8]
        base_name, ext = os.path.splitext(filename)
        unique_filename = f"{media_type}/{timestamp}_{unique_id}_{base_name}{ext}"

        # Stage the bytes in a named temp file that upload_file can read.
        with tempfile.NamedTemporaryFile(delete=False, suffix=ext) as staged:
            staged.write(media_bytes)
            staged_path = staged.name

        try:
            hub.upload_file(
                path_or_fileobj=staged_path,
                path_in_repo=unique_filename,
                repo_id=repo_name,
                repo_type="dataset",
                commit_message=f"Upload {media_type} generated by AnyCoder",
            )
            permanent_url = f"https://huggingface.co/datasets/{repo_name}/resolve/main/{unique_filename}"
            print(f"[HFUpload] Successfully uploaded {media_type} to {permanent_url}")
            return permanent_url
        finally:
            # The staged copy is removed whether or not the upload worked.
            try:
                os.unlink(staged_path)
            except Exception:
                pass
    except Exception as e:
        print(f"[HFUpload] Upload failed: {str(e)}")
        return f"Error uploading {media_type} to Hugging Face: {str(e)}"
def upload_temp_files_to_hf_and_replace_urls(html_content: str, token: gr.OAuthToken | None = None) -> str:
    """Upload every registered temporary media file and swap its URL in the HTML.

    Each entry of ``temp_media_files`` is uploaded with
    ``upload_media_to_hf(..., use_temp=False)``. For a successful upload, every
    occurrence of the entry's ``file://`` URL in ``html_content`` is replaced
    by the returned URL. After the loop ``cleanup_temp_media_files`` runs,
    whether or not individual uploads succeeded.

    Args:
        html_content: HTML text that may reference temporary ``file://`` URLs.
        token: OAuth token forwarded to ``upload_media_to_hf``.

    Returns:
        The HTML with replaced URLs. The input is returned unchanged when
        there is nothing to upload or an unexpected error occurs.
    """
    try:
        if not temp_media_files:
            print("[DeployUpload] No temporary media files to upload")
            return html_content

        print(f"[DeployUpload] Uploading {len(temp_media_files)} temporary media files to HF")
        updated_content = html_content
        for file_id, file_info in temp_media_files.items():
            try:
                permanent_url = upload_media_to_hf(
                    file_info['media_bytes'],
                    file_info['filename'],
                    file_info['media_type'],
                    token,
                    use_temp=False,  # Force permanent upload
                )
                # upload_media_to_hf reports failure through an "Error..." string.
                if permanent_url.startswith("Error"):
                    print(f"[DeployUpload] Failed to upload {file_id}: {permanent_url}")
                    continue
                temp_url = f"file://{file_info['path']}"
                updated_content = updated_content.replace(temp_url, permanent_url)
                print(f"[DeployUpload] Replaced {temp_url} with {permanent_url}")
            except Exception as e:
                print(f"[DeployUpload] Error uploading {file_id}: {str(e)}")

        # Clean up temporary files after upload
        cleanup_temp_media_files()
        return updated_content
    except Exception as e:
        print(f"[DeployUpload] Failed to upload temporary files: {str(e)}")
        return html_content
def cleanup_temp_media_files():
    """Delete every file listed in ``temp_media_files`` and empty the registry.

    A file that cannot be removed is reported with a print and skipped.
    Any other failure is printed as well; nothing is raised.
    """
    try:
        for file_info in temp_media_files.values():
            try:
                if not os.path.exists(file_info['path']):
                    continue
                os.remove(file_info['path'])
                print(f"[TempCleanup] Removed {file_info['path']}")
            except Exception as e:
                print(f"[TempCleanup] Failed to remove {file_info['path']}: {str(e)}")

        # Clear the global dictionary
        temp_media_files.clear()
        print("[TempCleanup] Cleared temporary media files registry")
    except Exception as e:
        print(f"[TempCleanup] Error during cleanup: {str(e)}")
def generate_image_with_qwen(prompt: str, image_index: int = 0, token: gr.OAuthToken | None = None) -> str:
    """Generate an image from a text prompt with Qwen/Qwen-Image and return an HTML ``<img>`` tag.

    The image is capped at 1024 px on its longest side, encoded as JPEG and
    stored as a temporary preview file via ``upload_media_to_hf(..., use_temp=True)``.

    Args:
        prompt: Text prompt; also used (HTML-escaped) as the tag's ``alt`` text.
        image_index: Number embedded in the generated file name.
        token: Forwarded to ``upload_media_to_hf``. The inference call itself
            authenticates with the ``HF_TOKEN`` environment variable.

    Returns:
        An ``<img>`` tag whose ``src`` is the temporary URL, or a message
        starting with ``"Error"`` on failure (no exception is raised).
    """
    try:
        import io
        from html import escape as escape_html

        # Check if HF_TOKEN is available
        hf_token = os.getenv('HF_TOKEN')
        if not hf_token:
            return "Error: HF_TOKEN environment variable is not set. Please set it to your Hugging Face API token."

        # Create InferenceClient for Qwen image generation
        client = InferenceClient(
            provider="auto",
            api_key=hf_token,
            bill_to="huggingface",
        )

        # Generate image using Qwen/Qwen-Image model
        image = client.text_to_image(
            prompt,
            model="Qwen/Qwen-Image",
        )

        # Resize image to reduce size while maintaining quality
        max_size = 1024
        if image.width > max_size or image.height > max_size:
            image.thumbnail((max_size, max_size), Image.Resampling.LANCZOS)

        # Save as JPEG with good quality since the image is not embedded inline
        buffer = io.BytesIO()
        image.convert('RGB').save(buffer, format='JPEG', quality=90, optimize=True)
        image_bytes = buffer.getvalue()

        # Create temporary URL for preview (will be uploaded to HF during deploy)
        filename = f"generated_image_{image_index}.jpg"
        temp_url = upload_media_to_hf(image_bytes, filename, "image", token, use_temp=True)

        # Check if creation was successful
        if temp_url.startswith("Error"):
            return temp_url

        # Return HTML img tag with temporary URL. The URL is embedded verbatim
        # so a later textual search-and-replace of the temporary URL still
        # matches; only the free-form prompt is escaped.
        alt_text = escape_html(prompt or "", quote=True)
        return (
            f'<img src="{temp_url}" alt="{alt_text}" '
            'style="max-width: 100%; height: auto; border-radius: 8px; margin: 10px 0;" '
            'loading="lazy" />'
        )
    except Exception as e:
        print(f"Image generation error: {str(e)}")
        return f"Error generating image: {str(e)}"
def generate_image_to_image(input_image_data, prompt: str, token: gr.OAuthToken | None = None) -> str:
    """Edit an input image with Qwen/Qwen-Image-Edit and return an HTML ``<img>`` tag.

    The input is normalised to an RGB PIL image, capped at 1024 px on its
    longest side, sent as JPEG to the image-to-image endpoint, and the result
    is stored as a temporary preview file via
    ``upload_media_to_hf(..., use_temp=True)``.

    Args:
        input_image_data: File-like object, PIL image, numpy array, bytes, or
            anything ``bytes()`` can convert. A PIL image passed in is not
            modified.
        prompt: Edit instruction; also used (HTML-escaped) as ``alt`` text.
        token: Forwarded to ``upload_media_to_hf``. The inference call itself
            authenticates with the ``HF_TOKEN`` environment variable.

    Returns:
        An ``<img>`` tag whose ``src`` is the temporary URL, or a message
        starting with ``"Error"`` on failure (no exception is raised).
    """
    try:
        # Check token
        hf_token = os.getenv('HF_TOKEN')
        if not hf_token:
            return "Error: HF_TOKEN environment variable is not set. Please set it to your Hugging Face API token."

        # Prepare client
        client = InferenceClient(
            provider="auto",
            api_key=hf_token,
            bill_to="huggingface",
        )

        # Normalize input image to bytes
        import io
        from html import escape as escape_html
        from PIL import Image
        try:
            import numpy as np
        except Exception:
            np = None

        if hasattr(input_image_data, 'read'):
            # File-like object
            raw = input_image_data.read()
            pil_image = Image.open(io.BytesIO(raw))
        elif hasattr(input_image_data, 'mode') and hasattr(input_image_data, 'size'):
            # PIL Image
            pil_image = input_image_data
        elif np is not None and isinstance(input_image_data, np.ndarray):
            pil_image = Image.fromarray(input_image_data)
        elif isinstance(input_image_data, (bytes, bytearray)):
            pil_image = Image.open(io.BytesIO(input_image_data))
        else:
            # Fallback: try to convert via bytes
            pil_image = Image.open(io.BytesIO(bytes(input_image_data)))

        # Ensure RGB
        if pil_image.mode != 'RGB':
            pil_image = pil_image.convert('RGB')

        # Resize input image to avoid request body size limits
        max_input_size = 1024
        if pil_image.width > max_input_size or pil_image.height > max_input_size:
            # thumbnail() resizes in place; work on a copy so an image object
            # supplied by the caller is left untouched.
            pil_image = pil_image.copy()
            pil_image.thumbnail((max_input_size, max_input_size), Image.Resampling.LANCZOS)

        buf = io.BytesIO()
        pil_image.save(buf, format='JPEG', quality=85, optimize=True)
        input_bytes = buf.getvalue()

        # Call image-to-image
        image = client.image_to_image(
            input_bytes,
            prompt=prompt,
            model="Qwen/Qwen-Image-Edit",
        )

        # Resize/optimize the result before storing it
        max_size = 1024
        if image.width > max_size or image.height > max_size:
            image.thumbnail((max_size, max_size), Image.Resampling.LANCZOS)
        out_buf = io.BytesIO()
        image.convert('RGB').save(out_buf, format='JPEG', quality=90, optimize=True)
        image_bytes = out_buf.getvalue()

        # Create temporary URL for preview (will be uploaded to HF during deploy)
        filename = "image_to_image_result.jpg"
        temp_url = upload_media_to_hf(image_bytes, filename, "image", token, use_temp=True)

        # Check if creation was successful
        if temp_url.startswith("Error"):
            return temp_url

        # The URL is embedded verbatim so a later textual search-and-replace
        # of the temporary URL still matches; only the prompt is escaped.
        alt_text = escape_html(prompt or "", quote=True)
        return (
            f'<img src="{temp_url}" alt="{alt_text}" '
            'style="max-width: 100%; height: auto; border-radius: 8px; margin: 10px 0;" '
            'loading="lazy" />'
        )
    except Exception as e:
        print(f"Image-to-image generation error: {str(e)}")
        return f"Error generating image (image-to-image): {str(e)}"
def generate_video_from_image(input_image_data, prompt: str, session_id: Optional[str] = None, token: gr.OAuthToken | None = None) -> str:
    """Generate a video from an input image and prompt and return an HTML ``<video>`` tag.

    The input is normalised to an RGB PIL image, scaled down and re-encoded as
    JPEG until the payload is at most about 3.9 MB (or quality 40 and a
    640 px longest edge are reached), then sent to
    ``InferenceClient.image_to_video``. The resulting bytes are stored as a
    temporary preview file via ``upload_media_to_hf(..., use_temp=True)``.

    Args:
        input_image_data: File-like object, PIL image, numpy array, bytes, or
            anything ``bytes()`` can convert.
        prompt: Text prompt passed to the model.
        session_id: Accepted for interface compatibility; not used here.
        token: Forwarded to ``upload_media_to_hf``. The inference call itself
            authenticates with the ``HF_TOKEN`` environment variable.

    Returns:
        A ``<video>`` tag whose ``<source>`` is the temporary URL, or a
        message starting with ``"Error"`` on failure (no exception is raised).
    """
    try:
        print("[Image2Video] Starting video generation")
        hf_token = os.getenv('HF_TOKEN')
        if not hf_token:
            print("[Image2Video] Missing HF_TOKEN")
            return "Error: HF_TOKEN environment variable is not set. Please set it to your Hugging Face API token."

        # Prepare client
        client = InferenceClient(
            provider="auto",
            api_key=hf_token,
            bill_to="huggingface",
        )
        print("[Image2Video] InferenceClient initialized (provider=auto)")

        # Normalize input image to bytes, with downscale/compress to cap request size
        import io
        from PIL import Image
        try:
            import numpy as np
        except Exception:
            np = None

        def _load_pil(img_like) -> Image.Image:
            """Turn any supported input into a PIL image."""
            if hasattr(img_like, 'read'):
                return Image.open(io.BytesIO(img_like.read()))
            if hasattr(img_like, 'mode') and hasattr(img_like, 'size'):
                return img_like
            if np is not None and isinstance(img_like, np.ndarray):
                return Image.fromarray(img_like)
            if isinstance(img_like, (bytes, bytearray)):
                return Image.open(io.BytesIO(img_like))
            return Image.open(io.BytesIO(bytes(img_like)))

        pil_image = _load_pil(input_image_data)
        if pil_image.mode != 'RGB':
            pil_image = pil_image.convert('RGB')
        try:
            print(f"[Image2Video] Input PIL image size={pil_image.size} mode={pil_image.mode}")
        except Exception:
            pass

        # Progressive encode to keep payload under ~3.9MB (below 4MB limit)
        MAX_BYTES = 3_900_000
        max_dim = 1024  # initial cap on longest edge
        quality = 90

        def encode_current(pil: Image.Image, q: int) -> bytes:
            """Encode ``pil`` as JPEG at quality ``q``."""
            tmp = io.BytesIO()
            pil.save(tmp, format='JPEG', quality=q, optimize=True)
            return tmp.getvalue()

        # Downscale while the longest edge exceeds max_dim. resize() returns a
        # new image, so an image object supplied by the caller is not modified.
        while max(pil_image.size) > max_dim:
            ratio = max_dim / float(max(pil_image.size))
            new_size = (
                max(1, int(pil_image.size[0] * ratio)),
                max(1, int(pil_image.size[1] * ratio)),
            )
            pil_image = pil_image.resize(new_size, Image.Resampling.LANCZOS)

        encoded = encode_current(pil_image, quality)
        # If still too big, iteratively reduce quality, then dimensions
        while len(encoded) > MAX_BYTES and (quality > 40 or max(pil_image.size) > 640):
            if quality > 40:
                quality -= 10
            else:
                # reduce dims by 15% if already at low quality
                new_w = max(1, int(pil_image.size[0] * 0.85))
                new_h = max(1, int(pil_image.size[1] * 0.85))
                pil_image = pil_image.resize((new_w, new_h), Image.Resampling.LANCZOS)
            encoded = encode_current(pil_image, quality)
        input_bytes = encoded

        # Call image-to-video; require method support
        model_id = "Lightricks/LTX-Video-0.9.8-13B-distilled"
        image_to_video_method = getattr(client, "image_to_video", None)
        if not callable(image_to_video_method):
            print("[Image2Video] InferenceClient.image_to_video not available in this huggingface_hub version")
            return (
                "Error generating video (image-to-video): Your installed huggingface_hub version "
                "does not expose InferenceClient.image_to_video. Please upgrade with "
                "`pip install -U huggingface_hub` and try again."
            )
        print(f"[Image2Video] Calling image_to_video with model={model_id}, prompt length={len(prompt or '')}")
        video_bytes = image_to_video_method(
            input_bytes,
            prompt=prompt,
            model=model_id,
        )
        print(f"[Image2Video] Received video bytes: {len(video_bytes) if hasattr(video_bytes, '__len__') else 'unknown length'}")

        # Create temporary URL for preview (will be uploaded to HF during deploy)
        filename = "image_to_video_result.mp4"
        temp_url = upload_media_to_hf(video_bytes, filename, "video", token, use_temp=True)

        # Check if creation was successful
        if temp_url.startswith("Error"):
            return temp_url

        # Build the player markup around the temporary URL. The URL is embedded
        # verbatim so a later textual search-and-replace of it still matches.
        video_html = (
            '<video controls autoplay muted loop playsinline '
            'style="max-width: 100%; height: auto; border-radius: 8px; margin: 10px 0; display: block;">'
            f'<source src="{temp_url}" type="video/mp4" />'
            '<p style="text-align: center; color: #666;">Your browser does not support the video tag.</p>'
            '</video>'
        )
        print(f"[Image2Video] Successfully generated video HTML tag with temporary URL: {temp_url}")

        # Validate the generated video HTML.
        # NOTE(review): validate_video_html is defined outside this block; its
        # acceptance rules are not visible here - confirm this markup passes.
        if not validate_video_html(video_html):
            print("[Image2Video] Generated video HTML failed validation")
            return "Error: Generated video HTML is malformed"
        return video_html
    except Exception as e:
        import traceback
        print("[Image2Video] Exception during generation:")
        traceback.print_exc()
        print(f"Image-to-video generation error: {str(e)}")
        return f"Error generating video (image-to-video): {str(e)}"
def generate_video_from_text(prompt: str, session_id: Optional[str] = None, token: gr.OAuthToken | None = None) -> str:
    """Generate a video from a text prompt and return an HTML ``<video>`` tag.

    The stripped prompt is sent to ``InferenceClient.text_to_video`` and the
    resulting bytes are stored as a temporary preview file via
    ``upload_media_to_hf(..., use_temp=True)``.

    Args:
        prompt: Text prompt; ``None`` is treated as an empty string.
        session_id: Accepted for interface compatibility; not used here.
        token: Forwarded to ``upload_media_to_hf``. The inference call itself
            authenticates with the ``HF_TOKEN`` environment variable.

    Returns:
        A ``<video>`` tag whose ``<source>`` is the temporary URL, or a
        message starting with ``"Error"`` on failure (no exception is raised).
    """
    try:
        print("[Text2Video] Starting video generation from text")
        hf_token = os.getenv('HF_TOKEN')
        if not hf_token:
            print("[Text2Video] Missing HF_TOKEN")
            return "Error: HF_TOKEN environment variable is not set. Please set it to your Hugging Face API token."

        client = InferenceClient(
            provider="auto",
            api_key=hf_token,
            bill_to="huggingface",
        )
        print("[Text2Video] InferenceClient initialized (provider=auto)")

        # Ensure the client has text_to_video (newer huggingface_hub)
        text_to_video_method = getattr(client, "text_to_video", None)
        if not callable(text_to_video_method):
            print("[Text2Video] InferenceClient.text_to_video not available in this huggingface_hub version")
            return (
                "Error generating video (text-to-video): Your installed huggingface_hub version "
                "does not expose InferenceClient.text_to_video. Please upgrade with "
                "`pip install -U huggingface_hub` and try again."
            )

        model_id = "Wan-AI/Wan2.2-T2V-A14B"
        prompt_str = (prompt or "").strip()
        print(f"[Text2Video] Calling text_to_video with model={model_id}, prompt length={len(prompt_str)}")
        video_bytes = text_to_video_method(
            prompt_str,
            model=model_id,
        )
        print(f"[Text2Video] Received video bytes: {len(video_bytes) if hasattr(video_bytes, '__len__') else 'unknown length'}")

        # Create temporary URL for preview (will be uploaded to HF during deploy)
        filename = "text_to_video_result.mp4"
        temp_url = upload_media_to_hf(video_bytes, filename, "video", token, use_temp=True)

        # Check if creation was successful
        if temp_url.startswith("Error"):
            return temp_url

        # Build the player markup around the temporary URL. The URL is embedded
        # verbatim so a later textual search-and-replace of it still matches.
        video_html = (
            '<video controls autoplay muted loop playsinline '
            'style="max-width: 100%; height: auto; border-radius: 8px; margin: 10px 0; display: block;">'
            f'<source src="{temp_url}" type="video/mp4" />'
            '<p style="text-align: center; color: #666;">Your browser does not support the video tag.</p>'
            '</video>'
        )
        print(f"[Text2Video] Successfully generated video HTML tag with temporary URL: {temp_url}")

        # Validate the generated video HTML.
        # NOTE(review): validate_video_html is defined outside this block; its
        # acceptance rules are not visible here - confirm this markup passes.
        if not validate_video_html(video_html):
            print("[Text2Video] Generated video HTML failed validation")
            return "Error: Generated video HTML is malformed"
        return video_html
    except Exception as e:
        import traceback
        print("[Text2Video] Exception during generation:")
        traceback.print_exc()
        print(f"Text-to-video generation error: {str(e)}")
        return f"Error generating video (text-to-video): {str(e)}"
def generate_music_from_text(prompt: str, music_length_ms: int = 30000, session_id: Optional[str] = None, token: gr.OAuthToken | None = None) -> str:
"""Generate music from a text prompt using ElevenLabs Music API and return an HTML