"""Google Colab notebook generator for model merging, quantization, and deployment.""" import json from typing import Optional from .config_generator import MergeConfig, generate_yaml, MERGE_METHODS def _cell(source: str, cell_type: str = "code") -> dict: """Create a notebook cell.""" return { "cell_type": cell_type, "metadata": {}, "source": source.split("\n"), "outputs": [] if cell_type == "code" else [], **({"execution_count": None} if cell_type == "code" else {}), } def _md(text: str) -> dict: return _cell(text, "markdown") def generate_merge_notebook( config: MergeConfig, output_model_name: str = "", hf_username: str = "", include_quantize: bool = True, include_deploy: bool = True, quant_types: Optional[list[str]] = None, ) -> dict: """Generate a complete Colab notebook for merging models. Args: config: MergeConfig with all merge parameters output_model_name: Name for the merged model (e.g., "My-Merged-7B") hf_username: HF username for upload (e.g., "AIencoder") include_quantize: Include GGUF quantization cells include_deploy: Include HF Space deployment cells quant_types: List of quantization types (default: ["Q5_K_M", "Q4_K_M"]) Returns: Complete notebook dict (nbformat v4) """ if quant_types is None: quant_types = ["Q5_K_M", "Q4_K_M"] if not output_model_name: output_model_name = "ForgeKit-Merged-Model" yaml_config = generate_yaml(config) method_info = MERGE_METHODS.get(config.method, {}) # Estimate RAM for Colab runtime recommendation ram_note = "" if config.models: n_models = len(config.models) # Rough heuristic if any("14b" in m.lower() or "13b" in m.lower() for m in config.models): ram_note = "⚠️ 14B models need **High-RAM runtime** (48GB). Go to Runtime → Change runtime → High-RAM." elif any("70b" in m.lower() for m in config.models): ram_note = "⚠️ 70B models need **A100 GPU** (Colab Pro+). This won't work on free tier." elif any("7b" in m.lower() or "8b" in m.lower() for m in config.models): ram_note = "💡 7-8B models work on **High-RAM CPU** runtime (free tier). No GPU needed." cells = [] # ===== HEADER ===== cells.append(_md(f"""# 🔥 ForgeKit — Model Merge Notebook **Generated by [ForgeKit](https://huggingface.co/spaces/AIencoder/ForgeKit)** This notebook will: 1. ✅ Install mergekit and dependencies 2. ✅ Merge your selected models using **{method_info.get('name', config.method)}** 3. {'✅' if include_quantize else '⬜'} Quantize to GGUF format 4. {'✅' if include_deploy else '⬜'} Upload to HuggingFace Hub **Models being merged:** {chr(10).join(f'- `{m}`' for m in config.models)} **Method:** {method_info.get('name', config.method)} — {method_info.get('description', '')} {ram_note} --- ⚡ **Quick Start:** Click **Runtime → Run all** to execute everything.""")) # ===== CELL 1: INSTALL ===== cells.append(_md("## 1️⃣ Install Dependencies")) cells.append(_cell("""# Install mergekit and dependencies !pip install -q mergekit[all] huggingface_hub transformers accelerate !pip install -q pyyaml sentencepiece protobuf print("✅ All dependencies installed!")""")) # ===== CELL 2: HF LOGIN ===== cells.append(_md("## 2️⃣ HuggingFace Login\nRequired for downloading gated models and uploading your merge.")) cells.append(_cell("""from huggingface_hub import notebook_login notebook_login()""")) # ===== CELL 3: CONFIG ===== cells.append(_md(f"""## 3️⃣ Merge Configuration Your merge config (auto-generated by ForgeKit). 
Edit the YAML below if you want to tweak weights or parameters.""")) escaped_yaml = yaml_config.replace('"', '\\"') cells.append(_cell(f"""# === CONFIGURATION === MODEL_NAME = "{output_model_name}" USERNAME = "{hf_username}" # Change to your HF username YAML_CONFIG = \"\"\" {yaml_config}\"\"\" # Display the config print("📋 Merge Configuration:") print("=" * 50) print(YAML_CONFIG) print("=" * 50) print(f"\\n📦 Output: {{USERNAME}}/{{MODEL_NAME}}" if USERNAME else f"\\n📦 Output: {{MODEL_NAME}}")""")) # ===== CELL 4: MERGE ===== cells.append(_md("""## 4️⃣ Execute Merge This is the main merge step. Time depends on model sizes: | Size | Estimated Time | |------|---------------| | 1-3B | 5-15 min | | 7B | 15-30 min | | 14B | 30-60 min |""")) cells.append(_cell("""import yaml import os import time # Write config to file with open("merge_config.yaml", "w") as f: f.write(YAML_CONFIG) # Create output directory os.makedirs("merged_model", exist_ok=True) print("🔥 Starting merge...") print(f" Method: {yaml.safe_load(YAML_CONFIG).get('merge_method', 'unknown')}") print(f" Models: {len(yaml.safe_load(YAML_CONFIG).get('models', []))}") print() start = time.time() # Run mergekit !mergekit-yaml merge_config.yaml merged_model --copy-tokenizer --allow-crimes --lazy-unpickle elapsed = time.time() - start print(f"\\n✅ Merge complete in {elapsed/60:.1f} minutes!") print(f"📁 Output: ./merged_model/") # Show output size total = sum( os.path.getsize(os.path.join("merged_model", f)) for f in os.listdir("merged_model") if os.path.isfile(os.path.join("merged_model", f)) ) print(f"💾 Total size: {total / (1024**3):.2f} GB")""")) # ===== CELL 5: TEST ===== cells.append(_md("## 5️⃣ Quick Test\nVerify the merged model loads and generates text.")) cells.append(_cell("""from transformers import AutoTokenizer, AutoModelForCausalLM import torch print("🧪 Loading merged model for testing...") tokenizer = AutoTokenizer.from_pretrained("merged_model", trust_remote_code=True) model = AutoModelForCausalLM.from_pretrained( "merged_model", torch_dtype=torch.bfloat16, device_map="auto", trust_remote_code=True, ) # Test prompts test_prompts = [ "Write a Python function to calculate fibonacci numbers:", "Explain what machine learning is in simple terms:", "What is 15 * 23 + 7?", ] print("\\n" + "=" * 60) for prompt in test_prompts: inputs = tokenizer(prompt, return_tensors="pt").to(model.device) with torch.no_grad(): output = model.generate( **inputs, max_new_tokens=100, do_sample=False, temperature=1.0, ) response = tokenizer.decode(output[0], skip_special_tokens=True) print(f"\\n📝 Prompt: {prompt}") print(f"🤖 Response: {response[len(prompt):].strip()[:200]}...") print("-" * 60) print("\\n✅ Model test complete!") # Clean up GPU memory del model torch.cuda.empty_cache() if torch.cuda.is_available() else None""")) # ===== CELL 6: UPLOAD ===== cells.append(_md("## 6️⃣ Upload to HuggingFace Hub")) model_card = _generate_model_card(config, output_model_name, hf_username) escaped_card = model_card.replace('"""', '\\"\\"\\"') cells.append(_cell(f"""from huggingface_hub import HfApi, create_repo REPO_ID = f"{{USERNAME}}/{{MODEL_NAME}}" if USERNAME else MODEL_NAME # Create repo try: create_repo(REPO_ID, exist_ok=True, repo_type="model") print(f"📦 Repo ready: https://huggingface.co/{{REPO_ID}}") except Exception as e: print(f"⚠️ Repo creation: {{e}}") # Write model card MODEL_CARD = \"\"\"{model_card}\"\"\" with open("merged_model/README.md", "w") as f: f.write(MODEL_CARD) # Upload api = HfApi() print("⬆️ Uploading merged model (this may take a 
while)...") api.upload_folder( repo_id=REPO_ID, folder_path="merged_model", commit_message=f"Upload {{MODEL_NAME}} merged with ForgeKit", ) print(f"\\n✅ Model uploaded!") print(f"🔗 https://huggingface.co/{{REPO_ID}}")""")) # ===== CELL 7: QUANTIZE (optional) ===== if include_quantize: cells.append(_md(f"""## 7️⃣ Quantize to GGUF Convert to GGUF format for use with llama.cpp, Ollama, LM Studio, etc. **Quantization types:** {', '.join(quant_types)}""")) quant_cmds = "\n".join( f' !./llama.cpp/llama-quantize model-f16.gguf {output_model_name}-{q}.gguf {q}\n' f' print(f"✅ {q} done: {output_model_name}-{q}.gguf")' for q in quant_types ) cells.append(_cell(f"""import os print("📦 Setting up llama.cpp for GGUF conversion...") # Clone and build llama.cpp if not os.path.exists("llama.cpp"): !git clone --depth 1 https://github.com/ggerganov/llama.cpp !cd llama.cpp && make -j$(nproc) llama-quantize # Install conversion deps !pip install -q gguf # Convert to f16 GGUF first print("\\n🔄 Converting to GGUF (f16)...") !python llama.cpp/convert_hf_to_gguf.py merged_model --outfile model-f16.gguf --outtype f16 # Quantize to each target print("\\n🗜️ Quantizing...") if os.path.exists("model-f16.gguf"): {quant_cmds} # Show file sizes print("\\n📊 Output sizes:") for f in os.listdir("."): if f.endswith(".gguf"): size_gb = os.path.getsize(f) / (1024**3) print(f" {{f}}: {{size_gb:.2f}} GB") else: print("❌ f16 conversion failed. Check errors above.")""")) # Upload GGUFs cells.append(_cell(f"""# Upload GGUF files to the same repo import os from huggingface_hub import HfApi api = HfApi() REPO_ID = f"{{USERNAME}}/{{MODEL_NAME}}" if USERNAME else MODEL_NAME gguf_files = [f for f in os.listdir(".") if f.endswith(".gguf") and f != "model-f16.gguf"] for gf in gguf_files: print(f"⬆️ Uploading {{gf}}...") api.upload_file( path_or_fileobj=gf, path_in_repo=gf, repo_id=REPO_ID, ) print(f" ✅ Done") print(f"\\n🎉 All GGUF files uploaded to https://huggingface.co/{{REPO_ID}}")""")) # ===== CELL 8: DEPLOY (optional) ===== if include_deploy: cells.append(_md("""## 8️⃣ Deploy to HuggingFace Space Create a Gradio chat Space running your merged model.""")) cells.append(_cell(f"""from huggingface_hub import HfApi, create_repo SPACE_ID = f"{{USERNAME}}/{{MODEL_NAME}}-chat" if USERNAME else f"{{MODEL_NAME}}-chat" REPO_ID = f"{{USERNAME}}/{{MODEL_NAME}}" if USERNAME else MODEL_NAME # Create Space try: create_repo(SPACE_ID, repo_type="space", space_sdk="gradio", exist_ok=True) print(f"🚀 Space created: https://huggingface.co/spaces/{{SPACE_ID}}") except Exception as e: print(f"⚠️ {{e}}") # Generate app.py APP_CODE = '''import gradio as gr from transformers import AutoTokenizer, AutoModelForCausalLM, TextIteratorStreamer import torch from threading import Thread MODEL_ID = "{hf_username}/{output_model_name}" if "{hf_username}" else "{output_model_name}" tokenizer = AutoTokenizer.from_pretrained(MODEL_ID, trust_remote_code=True) model = AutoModelForCausalLM.from_pretrained( MODEL_ID, torch_dtype=torch.bfloat16, device_map="auto", trust_remote_code=True ) def chat(message, history): messages = [] for h in history: messages.append({{"role": "user", "content": h[0]}}) if h[1]: messages.append({{"role": "assistant", "content": h[1]}}) messages.append({{"role": "user", "content": message}}) text = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True) inputs = tokenizer(text, return_tensors="pt").to(model.device) streamer = TextIteratorStreamer(tokenizer, skip_prompt=True, skip_special_tokens=True) thread = 
Thread(target=model.generate, kwargs={{ **inputs, "max_new_tokens": 512, "streamer": streamer, "do_sample": True, "temperature": 0.7 }}) thread.start() response = "" for token in streamer: response += token yield response demo = gr.ChatInterface(chat, title="🔥 {output_model_name}", description="Merged with ForgeKit") demo.launch() ''' api = HfApi() # Upload app.py api.upload_file( path_or_fileobj=APP_CODE.encode(), path_in_repo="app.py", repo_id=SPACE_ID, repo_type="space", ) # Upload requirements.txt reqs = "transformers\\ntorch\\naccelerate\\nsentencepiece\\nprotobuf" api.upload_file( path_or_fileobj=reqs.encode(), path_in_repo="requirements.txt", repo_id=SPACE_ID, repo_type="space", ) print(f"\\n🎉 Space deployed!") print(f"🔗 https://huggingface.co/spaces/{{SPACE_ID}}") print(f"\\n⏳ It may take a few minutes to build and start.")""")) # ===== DONE ===== cells.append(_md(f"""## 🎉 All Done! Your merged model **{output_model_name}** is ready. Here's what was created: | Output | Link | |--------|------| | Model | `https://huggingface.co/{hf_username or 'YOUR_USERNAME'}/{output_model_name}` | {'| GGUF Files | Same repo (quantized versions) |' if include_quantize else ''} {'| Chat Space | `https://huggingface.co/spaces/' + (hf_username or 'YOUR_USERNAME') + '/' + output_model_name + '-chat` |' if include_deploy else ''} --- **Made with [ForgeKit](https://huggingface.co/spaces/AIencoder/ForgeKit)** — Forge your perfect AI model 🔥""")) # ===== BUILD NOTEBOOK ===== notebook = { "nbformat": 4, "nbformat_minor": 5, "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3", }, "language_info": {"name": "python", "version": "3.10.0"}, "colab": { "provenance": [], "gpuType": "T4", }, "accelerator": "GPU", }, "cells": cells, } return notebook def _generate_model_card(config: MergeConfig, name: str, username: str) -> str: """Generate a model card README.md for the merged model.""" method_info = MERGE_METHODS.get(config.method, {}) models_list = "\n".join(f"- [{m}](https://huggingface.co/{m})" for m in config.models) base_link = f"[{config.base_model}](https://huggingface.co/{config.base_model})" if config.base_model else "N/A" return f"""--- tags: - merge - mergekit - forgekit base_model: {config.base_model or config.models[0] if config.models else ''} license: apache-2.0 --- # {name} This model was created using **[ForgeKit](https://huggingface.co/spaces/AIencoder/ForgeKit)** — an open-source model merging platform. ## Merge Details | Parameter | Value | |-----------|-------| | **Method** | {method_info.get('name', config.method)} | | **Base Model** | {base_link} | | **dtype** | {config.dtype} | ### Source Models {models_list} ### Configuration ```yaml {generate_yaml(config)} ``` ## Usage ```python from transformers import AutoTokenizer, AutoModelForCausalLM tokenizer = AutoTokenizer.from_pretrained("{username}/{name}" if "{username}" else "{name}") model = AutoModelForCausalLM.from_pretrained("{username}/{name}" if "{username}" else "{name}") ``` --- *Made with [ForgeKit](https://huggingface.co/spaces/AIencoder/ForgeKit)* 🔥 """ def notebook_to_json(notebook: dict) -> str: """Serialize notebook to JSON string.""" return json.dumps(notebook, indent=2, ensure_ascii=False) def save_notebook(notebook: dict, path: str): """Save notebook to .ipynb file.""" with open(path, "w", encoding="utf-8") as f: json.dump(notebook, f, indent=2, ensure_ascii=False)
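

# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative, not part of the public API): shows how a
# caller might wire MergeConfig -> generate_merge_notebook -> save_notebook.
# Assumptions: MergeConfig is constructed with the fields this module reads
# (method, models, base_model, dtype); "slerp" as a method key and the repo IDs
# below are hypothetical placeholders, not values confirmed by this codebase.
if __name__ == "__main__":
    example_config = MergeConfig(
        method="slerp",  # assumed to be a valid key in MERGE_METHODS
        models=[
            "example-org/model-a-7b",  # hypothetical source model repo IDs
            "example-org/model-b-7b",
        ],
        base_model="example-org/model-a-7b",
        dtype="bfloat16",
    )

    nb = generate_merge_notebook(
        example_config,
        output_model_name="Example-Merged-7B",
        hf_username="example-user",
        include_quantize=True,
        include_deploy=False,
    )

    # Write the generated notebook to disk so it can be uploaded to Colab.
    save_notebook(nb, "example_merge.ipynb")
    print(f"Wrote notebook with {len(nb['cells'])} cells to example_merge.ipynb")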