# app.py - JOINED VIDEO SENTENCE ANALYZER
# Analyzes ONE long video with multiple signs and builds a sentence
import torch
import torch.nn as nn
from transformers import XCLIPProcessor, XCLIPModel
import gradio as gr
import cv2
import numpy as np
from PIL import Image
import pandas as pd
from datetime import datetime
import os
import tempfile

print("Loading Ugandan Sign Language Model...")
# ============================================================================
# MODEL SETUP - MINIMALCLASSIFIER
# ============================================================================
class MinimalClassifier(nn.Module):
    """SIMPLE classifier - matches your training notebook exactly"""
    def __init__(self, input_dim=512, num_classes=85, dropout=0.5):
        super().__init__()
        self.classifier = nn.Sequential(
            nn.Dropout(dropout),
            nn.Linear(input_dim, num_classes)
        )

    def forward(self, x):
        return self.classifier(x)
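
# Sketch of how this head is used (shapes are illustrative, not taken from the
# training notebook): it maps X-CLIP 512-dim video embeddings to sign logits.
#   embeds = torch.randn(4, 512)        # 4 videos, 512-dim X-CLIP video_embeds
#   head = MinimalClassifier(512, 85)
#   logits = head(embeds)               # -> shape (4, 85), one score per sign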
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
processor = XCLIPProcessor.from_pretrained("microsoft/xclip-base-patch32")
xclip_model = XCLIPModel.from_pretrained("microsoft/xclip-base-patch32").to(device)
xclip_model.eval()

# Load your trained model
try:
    checkpoint = torch.load("finetuned_xclip_model.pth", map_location=device, weights_only=False)

    if 'num_classes' in checkpoint:
        num_classes = checkpoint['num_classes']
    elif 'id_to_label' in checkpoint:
        num_classes = len(checkpoint['id_to_label'])
    else:
        num_classes = 85

    model = MinimalClassifier(
        input_dim=512,
        num_classes=num_classes,
        dropout=0.5
    ).to(device)

    if 'model_state_dict' in checkpoint:
        model.load_state_dict(checkpoint['model_state_dict'])
    else:
        model.load_state_dict(checkpoint)

    if 'id_to_label' in checkpoint:
        id_to_label = checkpoint['id_to_label']
    else:
        id_to_label = {i: f"class_{i}" for i in range(num_classes)}

    label_to_id = {v: k for k, v in id_to_label.items()}

    model.eval()
    print(f"Model loaded! Can recognize {len(id_to_label)} signs")
except Exception as e:
    print(f"Error loading model: {e}")
    exit(1)
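
# Assumed checkpoint layout (inferred from the key lookups above, not verified
# against the training notebook): either a dict such as
#   {"model_state_dict": <classifier weights>, "num_classes": 85,
#    "id_to_label": {0: "hello", 1: "good", ...}}
# or a plain state_dict, which the fallback branches above also accept.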
# ============================================================================
# CORE FUNCTIONS - VIDEO SPLITTING & ANALYSIS WITH MOTION DETECTION
# ============================================================================
def detect_motion_changes(video_path, threshold=25):
    """
    Detect motion changes in a video to find sign boundaries.

    Args:
        video_path: Path to video
        threshold: Pixel-difference threshold for the binary motion mask
                   (higher = less sensitive)

    Returns:
        List of frame indices where significant motion changes occur
    """
    try:
        cap = cv2.VideoCapture(video_path)

        # Read first frame
        ret, prev_frame = cap.read()
        if not ret:
            cap.release()
            return []

        prev_gray = cv2.cvtColor(prev_frame, cv2.COLOR_BGR2GRAY)
        prev_gray = cv2.GaussianBlur(prev_gray, (21, 21), 0)

        motion_scores = []
        frame_idx = 0

        while True:
            ret, frame = cap.read()
            if not ret:
                break

            # Convert to grayscale and blur
            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            gray = cv2.GaussianBlur(gray, (21, 21), 0)

            # Calculate difference between consecutive frames
            frame_delta = cv2.absdiff(prev_gray, gray)
            thresh = cv2.threshold(frame_delta, threshold, 255, cv2.THRESH_BINARY)[1]

            # Motion score: mean intensity of the binary motion mask
            motion_score = np.sum(thresh) / (thresh.shape[0] * thresh.shape[1])
            motion_scores.append((frame_idx, motion_score))

            prev_gray = gray
            frame_idx += 1

        cap.release()

        # Find troughs in motion (where movement drops sharply) -
        # these indicate pauses between signs.
        boundaries = [0]  # Start with first frame

        if len(motion_scores) > 10:
            # Smooth motion scores with a moving average
            window_size = 5
            smoothed = []
            for i in range(len(motion_scores)):
                start = max(0, i - window_size)
                end = min(len(motion_scores), i + window_size + 1)
                avg_score = np.mean([s[1] for s in motion_scores[start:end]])
                smoothed.append((motion_scores[i][0], avg_score))

            # Find local minima (pauses between signs)
            for i in range(10, len(smoothed) - 10):
                current_score = smoothed[i][1]
                prev_scores = [smoothed[j][1] for j in range(i - 10, i)]
                next_scores = [smoothed[j][1] for j in range(i + 1, i + 11)]
                if current_score < np.mean(prev_scores) * 0.3 and current_score < np.mean(next_scores) * 0.3:
                    # Significant pause detected
                    boundaries.append(smoothed[i][0])

        return boundaries
    except Exception as e:
        print(f"Motion detection error: {e}")
        return [0]
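
# Example of the returned boundary list (frame numbers are illustrative only):
#   detect_motion_changes("joined.mp4")  ->  [0, 54, 118]
# i.e. candidate sign transitions at frames 54 and 118; the caller appends the
# final frame count before cutting segments.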
def split_video_smart(video_path, num_signs=None, use_motion_detection=True):
    """
    Smart video splitting using motion detection OR equal segments.

    Args:
        video_path: Path to the joined video
        num_signs: Expected number of signs (optional if using motion detection)
        use_motion_detection: Whether to use automatic boundary detection

    Returns:
        List of segment video paths
    """
    try:
        cap = cv2.VideoCapture(video_path)

        # Get video properties
        fps = int(cap.get(cv2.CAP_PROP_FPS))
        if fps <= 0:
            fps = 30  # some containers report 0 fps; assume a sensible default
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        if total_frames == 0:
            cap.release()
            return []

        # Determine split points
        if use_motion_detection:
            print("Using motion detection to find sign boundaries...")
            boundaries = detect_motion_changes(video_path)

            # Filter boundaries to get approximately num_signs segments
            if num_signs and len(boundaries) > num_signs + 1:
                # Too many boundaries detected - subsample to keep roughly
                # evenly spaced ones
                step = len(boundaries) // (num_signs + 1)
                boundaries = [boundaries[i * step] for i in range(num_signs + 1)]

            boundaries.append(total_frames)  # Add end frame
            boundaries = sorted(set(boundaries))  # Remove duplicates
            print(f"Found {len(boundaries) - 1} sign segments at frames: {boundaries}")
        else:
            # Fall back to equal segments
            print(f"Splitting into {num_signs} equal segments...")
            frames_per_segment = total_frames // num_signs
            boundaries = [i * frames_per_segment for i in range(num_signs + 1)]
            boundaries[-1] = total_frames

        segment_paths = []
        temp_dir = tempfile.mkdtemp()

        # Create segments based on boundaries
        for segment_idx in range(len(boundaries) - 1):
            start_frame = boundaries[segment_idx]
            end_frame = boundaries[segment_idx + 1]

            # Skip very short segments (less than 5 frames)
            if end_frame - start_frame < 5:
                continue

            segment_path = os.path.join(temp_dir, f"segment_{segment_idx}.mp4")
            fourcc = cv2.VideoWriter_fourcc(*'mp4v')
            out = cv2.VideoWriter(segment_path, fourcc, fps, (width, height))

            # Write frames for this segment
            cap.set(cv2.CAP_PROP_POS_FRAMES, start_frame)
            for frame_idx in range(start_frame, end_frame):
                ret, frame = cap.read()
                if not ret:
                    break
                out.write(frame)
            out.release()

            # Only add if file was created successfully
            if os.path.exists(segment_path) and os.path.getsize(segment_path) > 0:
                segment_paths.append(segment_path)

        cap.release()
        return segment_paths
    except Exception as e:
        print(f"Error splitting video: {e}")
        import traceback
        traceback.print_exc()
        return []
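
# Illustrative call (file name hypothetical): a ~6 s joined clip with three
# signs and auto-detection enabled is split along the detected motion troughs:
#   split_video_smart("hello_how_good.mp4", num_signs=3, use_motion_detection=True)
#   -> ["/tmp/.../segment_0.mp4", "/tmp/.../segment_1.mp4", "/tmp/.../segment_2.mp4"]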
def extract_frames(video_path, num_frames=8):
    """Extract frames from a video"""
    try:
        cap = cv2.VideoCapture(video_path)
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

        if total_frames == 0:
            cap.release()
            return [Image.new('RGB', (224, 224), (0, 0, 0)) for _ in range(num_frames)]

        if total_frames <= num_frames:
            # Use every frame and pad with the last one
            indices = list(range(total_frames)) + [total_frames - 1] * (num_frames - total_frames)
        else:
            # Sample evenly from the middle two-thirds of the clip
            start = total_frames // 6
            end = 5 * total_frames // 6
            indices = np.linspace(start, end, num_frames, dtype=int)

        frames = []
        for idx in indices:
            cap.set(cv2.CAP_PROP_POS_FRAMES, int(idx))
            ret, frame = cap.read()
            if ret:
                frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                frame = cv2.resize(frame, (224, 224))
                frames.append(Image.fromarray(frame))
            else:
                frames.append(Image.new('RGB', (224, 224), (0, 0, 0)))

        cap.release()
        return frames
    except Exception:
        # On any read failure, fall back to black frames
        return [Image.new('RGB', (224, 224), (0, 0, 0)) for _ in range(num_frames)]
def predict_single_sign(video_path):
    """Predict the sign shown in a single video segment"""
    try:
        frames = extract_frames(video_path)

        # Preprocess the frames and the text prompt in a single processor call
        inputs = processor(
            text=["a person performing sign language"],
            videos=[frames],
            return_tensors="pt",
            padding=True
        )
        pixel_values = inputs['pixel_values'].to(device)
        input_ids = inputs['input_ids'].to(device)
        attention_mask = inputs['attention_mask'].to(device)

        with torch.no_grad():
            outputs = xclip_model(
                input_ids=input_ids,
                attention_mask=attention_mask,
                pixel_values=pixel_values,
                return_dict=True
            )
            video_embeds = outputs.video_embeds
            logits = model(video_embeds)
            probs = torch.softmax(logits, dim=1)
            confidence, pred_class = torch.max(probs, 1)

        predicted_label = id_to_label[pred_class.item()]
        return predicted_label  # Only return the label
    except Exception as e:
        print(f"Prediction error: {e}")
        return "Unknown"
def analyze_joined_video(video_path, num_signs, use_auto_detect):
    """
    MAIN FUNCTION: Analyze a JOINED video containing multiple signs.

    Args:
        video_path: Path to the joined video from CapCut
        num_signs: How many signs are in the video (used as a hint)
        use_auto_detect: Whether to use automatic motion detection

    Returns:
        Complete sentence, individual predictions, detailed results
    """
    try:
        if video_path is None:
            return "Please upload a video.", "", []

        if num_signs is None or num_signs <= 0:
            num_signs = 3  # Default
        num_signs = int(num_signs)  # Gradio sliders may pass floats

        # STEP 1: Split the joined video into segments
        if use_auto_detect:
            print(f"Using AUTOMATIC motion detection (expected ~{num_signs} signs)...")
            segment_paths = split_video_smart(video_path, num_signs, use_motion_detection=True)
        else:
            print(f"Using MANUAL equal split ({num_signs} segments)...")
            segment_paths = split_video_smart(video_path, num_signs, use_motion_detection=False)

        if len(segment_paths) == 0:
            return "Failed to split video. Please check your video file.", "", []

        actual_segments = len(segment_paths)
        print(f"Created {actual_segments} segments")

        # STEP 2: Analyze each segment separately
        predictions = []
        detailed_results = []
        for i, segment_path in enumerate(segment_paths, 1):
            print(f"Analyzing segment {i}/{actual_segments}...")
            sign = predict_single_sign(segment_path)
            predictions.append(sign)
            detailed_results.append({
                'video_num': i,
                'sign': sign
            })

        # STEP 3: Build sentence
        sentence = " ".join(predictions)

        # Format detailed results
        details_md = "### Individual Sign Analysis (In Order)\n\n"
        for result in detailed_results:
            details_md += f"**Position {result['video_num']}:** {result['sign']}\n\n"

        # Describe the split method that was used
        split_method = "Automatic Motion Detection" if use_auto_detect else "Equal Time Segments"
        segments_info = f"Detected {actual_segments} segments" if use_auto_detect else f"Split into {num_signs} equal segments"

        # Final output
        final_result = f"""
## Complete Sentence Translation

### Detected Sentence:
**"{sentence}"**

{details_md}
---
**Split Method:** {split_method}
**Segments:** {segments_info}
**Model:** X-CLIP Fine-tuned on Ugandan Sign Language

*{'Signs were automatically detected by analyzing motion patterns' if use_auto_detect else 'Each sign was analyzed from equal time segments'}*
"""

        # Clean up temporary segment files
        try:
            for segment_path in segment_paths:
                if os.path.exists(segment_path):
                    os.remove(segment_path)
        except OSError:
            pass

        return final_result, sentence, detailed_results
    except Exception as e:
        import traceback
        error_details = traceback.format_exc()
        print(f"Error: {error_details}")
        return (
            f"**Error analyzing video:** {str(e)}\n\n"
            "Please try:\n- Using a different video\n- Toggling automatic detection\n- Adjusting the number of signs",
            "",
            []
        )
# ============================================================================
# FEEDBACK SYSTEM
# ============================================================================
FEEDBACK_FILE = "user_feedback.csv"

if not os.path.exists(FEEDBACK_FILE):
    pd.DataFrame(columns=['timestamp', 'predicted_sentence', 'correct_sentence', 'num_videos']).to_csv(FEEDBACK_FILE, index=False)

def save_sentence_feedback(predicted_sentence, correct_sentence, num_videos):
    """Save user feedback for a sentence"""
    try:
        feedback_data = {
            'timestamp': datetime.now().isoformat(),
            'predicted_sentence': predicted_sentence,
            'correct_sentence': correct_sentence,
            'num_videos': num_videos
        }
        df = pd.read_csv(FEEDBACK_FILE)
        df = pd.concat([df, pd.DataFrame([feedback_data])], ignore_index=True)
        df.to_csv(FEEDBACK_FILE, index=False)
        return "Thank you! Your feedback helps improve the model."
    except Exception as e:
        return f"Error saving feedback: {str(e)}"
# ============================================================================
# GRADIO INTERFACE - MULTI-VIDEO SENTENCE BUILDER
# ============================================================================
custom_css = """
.gradio-container {
    background: linear-gradient(135deg, #1a1a1a 0%, #2d2d2d 100%);
    font-family: 'Arial', sans-serif;
    max-width: 1200px !important;
    margin: 0 auto !important;
}
h1 {
    color: #ff6b35 !important;
    text-align: center;
    margin-bottom: 10px !important;
}
.primary {
    background: #ff6b35 !important;
    border: none !important;
    color: white !important;
    font-weight: bold !important;
}
.primary:hover {
    background: #e55a2b !important;
}
.secondary {
    background: #444444 !important;
    border: 1px solid #ff6b35 !important;
    color: white !important;
}
"""
with gr.Blocks(css=custom_css, title="Sign Language Sentence Builder") as demo:
    gr.Markdown("""
    # Ugandan Sign Language Sentence Analyzer
    *Upload ONE joined video with multiple signs - we'll automatically detect and translate them!*

    **Two Detection Modes:**
    1. **Automatic (Recommended):** AI detects where each sign starts/ends (works with unequal durations!)
    2. **Manual:** Split the video into equal time segments (use if all signs have equal duration)
    """)

    with gr.Row():
        # Left side - Video upload
        with gr.Column(scale=1):
            gr.Markdown("### Upload Your Joined Video")
            joined_video = gr.Video(
                label="Joined Video (from CapCut or any editor)",
                sources=["upload", "webcam"]
            )

            gr.Markdown("### Detection Settings")
            auto_detect = gr.Checkbox(
                label="Use Automatic Motion Detection",
                value=True,
                info="AI automatically finds sign boundaries (recommended!)"
            )
            num_signs_input = gr.Slider(
                minimum=1,
                maximum=10,
                value=3,
                step=1,
                label="Expected number of signs (approximate)",
                info="Helps guide the detection algorithm"
            )

            with gr.Accordion("How It Works", open=False):
                gr.Markdown("""
                **Automatic Mode:**
                - Analyzes motion patterns in your video
                - Detects pauses/transitions between signs
                - Works even if signs have different durations!
                - Example: 1s + 3s + 2s signs → correctly detected

                **Manual Mode:**
                - Splits the video into equal time segments
                - Works best when all signs take equal time
                - Example: 2s + 2s + 2s signs → perfect split

                **Tips:**
                - Pause briefly between signs for best detection
                - Keep the camera angle consistent
                - Good lighting helps accuracy
                """)

            with gr.Row():
                analyze_btn = gr.Button("Analyze Sentence", variant="primary", scale=2)
                clear_btn = gr.Button("Clear", variant="secondary", scale=1)

        # Right side - Results
        with gr.Column(scale=1):
            gr.Markdown("### Translation Results")
            results_output = gr.Markdown(
                value="**Upload your video, choose a detection mode, and click 'Analyze Sentence'.**"
            )

            gr.Markdown("### Feedback")
            gr.Markdown("*Help improve accuracy by providing corrections:*")
            correct_sentence_input = gr.Textbox(
                label="Correct Sentence (if the prediction was wrong)",
                placeholder="e.g., Hello how are you"
            )
            feedback_btn = gr.Button("Submit Feedback", variant="secondary")
            feedback_output = gr.Markdown()

    # Hidden states
    current_sentence = gr.State()
    current_details = gr.State()

    # Analyze sentence logic
    analyze_btn.click(
        fn=analyze_joined_video,
        inputs=[joined_video, num_signs_input, auto_detect],
        outputs=[results_output, current_sentence, current_details]
    )

    # Feedback logic
    def submit_feedback_wrapper(predicted, corrected, details):
        if not corrected or corrected.strip() == "":
            return "Please enter the correct sentence."
        num_videos = len(details) if details else 0
        return save_sentence_feedback(predicted, corrected, num_videos)

    feedback_btn.click(
        fn=submit_feedback_wrapper,
        inputs=[current_sentence, correct_sentence_input, current_details],
        outputs=[feedback_output]
    )

    # Clear button
    def clear_all():
        return None, True, 3, "**Upload your video and click 'Analyze Sentence'.**", "", [], ""

    clear_btn.click(
        fn=clear_all,
        outputs=[joined_video, auto_detect, num_signs_input, results_output, current_sentence, current_details, feedback_output]
    )
    # Example section
    gr.Markdown("""
    ---
    ### Complete Example Workflow

    **Goal:** Translate "Hello how good" in sign language

    **Step 1: Record Your Signs**
    - Sign 1: "Hello" (performer holds the sign for 2 seconds)
    - Sign 2: "How" (performer holds the sign for 1 second)
    - Sign 3: "Good" (performer holds the sign for 3 seconds)

    **Step 2: Join in CapCut**
    - Import all 3 videos
    - Arrange in order: Hello → How → Good
    - Export as ONE video (6 seconds total)

    **Step 3: Upload & Analyze**
    - Upload the 6-second video here
    - Enable "Automatic Detection"
    - Set "Expected signs" to 3
    - Click "Analyze Sentence"

    **Step 4: Result**
    - AI detects 3 segments automatically:
      - Position 1: "Hello"
      - Position 2: "How"
      - Position 3: "Good"
    - **Final Sentence:** "Hello How Good"

    ---
    ### When to Use Each Mode

    | Scenario | Recommended Mode | Why |
    |----------|------------------|-----|
    | Signs have different lengths | Automatic | Detects boundaries precisely |
    | You pause between signs | Automatic | Pauses help detection |
    | All signs have exactly the same duration | Manual | A simple equal split works |
    | Fast, continuous signing | Manual | Motion detection may struggle |
    | Professional recording | Automatic | Better accuracy |
    | Quick test/prototype | Manual | Faster processing |
    """)
# Launch
if __name__ == "__main__":
    demo.launch(share=True)