Forgekit / forgekit /notebook_generator.py
AIencoder's picture
Rename notebook_generator.py to forgekit/notebook_generator.py
816c4d5 verified
"""Google Colab notebook generator for model merging, quantization, and deployment."""
import json
from typing import Optional
from .config_generator import MergeConfig, generate_yaml, MERGE_METHODS
def _cell(source: str, cell_type: str = "code") -> dict:
"""Create a notebook cell."""
return {
"cell_type": cell_type,
"metadata": {},
"source": source.split("\n"),
"outputs": [] if cell_type == "code" else [],
**({"execution_count": None} if cell_type == "code" else {}),
}
def _md(text: str) -> dict:
return _cell(text, "markdown")
def generate_merge_notebook(
config: MergeConfig,
output_model_name: str = "",
hf_username: str = "",
include_quantize: bool = True,
include_deploy: bool = True,
quant_types: Optional[list[str]] = None,
) -> dict:
"""Generate a complete Colab notebook for merging models.
Args:
config: MergeConfig with all merge parameters
output_model_name: Name for the merged model (e.g., "My-Merged-7B")
hf_username: HF username for upload (e.g., "AIencoder")
include_quantize: Include GGUF quantization cells
include_deploy: Include HF Space deployment cells
quant_types: List of quantization types (default: ["Q5_K_M", "Q4_K_M"])
Returns:
Complete notebook dict (nbformat v4)
"""
if quant_types is None:
quant_types = ["Q5_K_M", "Q4_K_M"]
if not output_model_name:
output_model_name = "ForgeKit-Merged-Model"
yaml_config = generate_yaml(config)
method_info = MERGE_METHODS.get(config.method, {})
# Estimate RAM for Colab runtime recommendation
ram_note = ""
if config.models:
n_models = len(config.models)
# Rough heuristic
if any("14b" in m.lower() or "13b" in m.lower() for m in config.models):
ram_note = "⚠️ 14B models need **High-RAM runtime** (48GB). Go to Runtime β†’ Change runtime β†’ High-RAM."
elif any("70b" in m.lower() for m in config.models):
ram_note = "⚠️ 70B models need **A100 GPU** (Colab Pro+). This won't work on free tier."
elif any("7b" in m.lower() or "8b" in m.lower() for m in config.models):
ram_note = "πŸ’‘ 7-8B models work on **High-RAM CPU** runtime (free tier). No GPU needed."
cells = []
# ===== HEADER =====
cells.append(_md(f"""# πŸ”₯ ForgeKit β€” Model Merge Notebook
**Generated by [ForgeKit](https://huggingface.co/spaces/AIencoder/ForgeKit)**
This notebook will:
1. βœ… Install mergekit and dependencies
2. βœ… Merge your selected models using **{method_info.get('name', config.method)}**
3. {'βœ…' if include_quantize else '⬜'} Quantize to GGUF format
4. {'βœ…' if include_deploy else '⬜'} Upload to HuggingFace Hub
**Models being merged:**
{chr(10).join(f'- `{m}`' for m in config.models)}
**Method:** {method_info.get('name', config.method)} β€” {method_info.get('description', '')}
{ram_note}
---
⚑ **Quick Start:** Click **Runtime β†’ Run all** to execute everything."""))
# ===== CELL 1: INSTALL =====
cells.append(_md("## 1️⃣ Install Dependencies"))
cells.append(_cell("""# Install mergekit and dependencies
!pip install -q mergekit[all] huggingface_hub transformers accelerate
!pip install -q pyyaml sentencepiece protobuf
print("βœ… All dependencies installed!")"""))
# ===== CELL 2: HF LOGIN =====
cells.append(_md("## 2️⃣ HuggingFace Login\nRequired for downloading gated models and uploading your merge."))
cells.append(_cell("""from huggingface_hub import notebook_login
notebook_login()"""))
# ===== CELL 3: CONFIG =====
cells.append(_md(f"""## 3️⃣ Merge Configuration
Your merge config (auto-generated by ForgeKit). Edit the YAML below if you want to tweak weights or parameters."""))
escaped_yaml = yaml_config.replace('"', '\\"')
cells.append(_cell(f"""# === CONFIGURATION ===
MODEL_NAME = "{output_model_name}"
USERNAME = "{hf_username}" # Change to your HF username
YAML_CONFIG = \"\"\"
{yaml_config}\"\"\"
# Display the config
print("πŸ“‹ Merge Configuration:")
print("=" * 50)
print(YAML_CONFIG)
print("=" * 50)
print(f"\\nπŸ“¦ Output: {{USERNAME}}/{{MODEL_NAME}}" if USERNAME else f"\\nπŸ“¦ Output: {{MODEL_NAME}}")"""))
# ===== CELL 4: MERGE =====
cells.append(_md("""## 4️⃣ Execute Merge
This is the main merge step. Time depends on model sizes:
| Size | Estimated Time |
|------|---------------|
| 1-3B | 5-15 min |
| 7B | 15-30 min |
| 14B | 30-60 min |"""))
cells.append(_cell("""import yaml
import os
import time
# Write config to file
with open("merge_config.yaml", "w") as f:
f.write(YAML_CONFIG)
# Create output directory
os.makedirs("merged_model", exist_ok=True)
print("πŸ”₯ Starting merge...")
print(f" Method: {yaml.safe_load(YAML_CONFIG).get('merge_method', 'unknown')}")
print(f" Models: {len(yaml.safe_load(YAML_CONFIG).get('models', []))}")
print()
start = time.time()
# Run mergekit
!mergekit-yaml merge_config.yaml merged_model --copy-tokenizer --allow-crimes --lazy-unpickle
elapsed = time.time() - start
print(f"\\nβœ… Merge complete in {elapsed/60:.1f} minutes!")
print(f"πŸ“ Output: ./merged_model/")
# Show output size
total = sum(
os.path.getsize(os.path.join("merged_model", f))
for f in os.listdir("merged_model")
if os.path.isfile(os.path.join("merged_model", f))
)
print(f"πŸ’Ύ Total size: {total / (1024**3):.2f} GB")"""))
# ===== CELL 5: TEST =====
cells.append(_md("## 5️⃣ Quick Test\nVerify the merged model loads and generates text."))
cells.append(_cell("""from transformers import AutoTokenizer, AutoModelForCausalLM
import torch
print("πŸ§ͺ Loading merged model for testing...")
tokenizer = AutoTokenizer.from_pretrained("merged_model", trust_remote_code=True)
model = AutoModelForCausalLM.from_pretrained(
"merged_model",
torch_dtype=torch.bfloat16,
device_map="auto",
trust_remote_code=True,
)
# Test prompts
test_prompts = [
"Write a Python function to calculate fibonacci numbers:",
"Explain what machine learning is in simple terms:",
"What is 15 * 23 + 7?",
]
print("\\n" + "=" * 60)
for prompt in test_prompts:
inputs = tokenizer(prompt, return_tensors="pt").to(model.device)
with torch.no_grad():
output = model.generate(
**inputs,
max_new_tokens=100,
do_sample=False,
temperature=1.0,
)
response = tokenizer.decode(output[0], skip_special_tokens=True)
print(f"\\nπŸ“ Prompt: {prompt}")
print(f"πŸ€– Response: {response[len(prompt):].strip()[:200]}...")
print("-" * 60)
print("\\nβœ… Model test complete!")
# Clean up GPU memory
del model
torch.cuda.empty_cache() if torch.cuda.is_available() else None"""))
# ===== CELL 6: UPLOAD =====
cells.append(_md("## 6️⃣ Upload to HuggingFace Hub"))
model_card = _generate_model_card(config, output_model_name, hf_username)
escaped_card = model_card.replace('"""', '\\"\\"\\"')
cells.append(_cell(f"""from huggingface_hub import HfApi, create_repo
REPO_ID = f"{{USERNAME}}/{{MODEL_NAME}}" if USERNAME else MODEL_NAME
# Create repo
try:
create_repo(REPO_ID, exist_ok=True, repo_type="model")
print(f"πŸ“¦ Repo ready: https://huggingface.co/{{REPO_ID}}")
except Exception as e:
print(f"⚠️ Repo creation: {{e}}")
# Write model card
MODEL_CARD = \"\"\"{model_card}\"\"\"
with open("merged_model/README.md", "w") as f:
f.write(MODEL_CARD)
# Upload
api = HfApi()
print("⬆️ Uploading merged model (this may take a while)...")
api.upload_folder(
repo_id=REPO_ID,
folder_path="merged_model",
commit_message=f"Upload {{MODEL_NAME}} merged with ForgeKit",
)
print(f"\\nβœ… Model uploaded!")
print(f"πŸ”— https://huggingface.co/{{REPO_ID}}")"""))
# ===== CELL 7: QUANTIZE (optional) =====
if include_quantize:
cells.append(_md(f"""## 7️⃣ Quantize to GGUF
Convert to GGUF format for use with llama.cpp, Ollama, LM Studio, etc.
**Quantization types:** {', '.join(quant_types)}"""))
quant_cmds = "\n".join(
f' !./llama.cpp/llama-quantize model-f16.gguf {output_model_name}-{q}.gguf {q}\n'
f' print(f"βœ… {q} done: {output_model_name}-{q}.gguf")'
for q in quant_types
)
cells.append(_cell(f"""import os
print("πŸ“¦ Setting up llama.cpp for GGUF conversion...")
# Clone and build llama.cpp
if not os.path.exists("llama.cpp"):
!git clone --depth 1 https://github.com/ggerganov/llama.cpp
!cd llama.cpp && make -j$(nproc) llama-quantize
# Install conversion deps
!pip install -q gguf
# Convert to f16 GGUF first
print("\\nπŸ”„ Converting to GGUF (f16)...")
!python llama.cpp/convert_hf_to_gguf.py merged_model --outfile model-f16.gguf --outtype f16
# Quantize to each target
print("\\nπŸ—œοΈ Quantizing...")
if os.path.exists("model-f16.gguf"):
{quant_cmds}
# Show file sizes
print("\\nπŸ“Š Output sizes:")
for f in os.listdir("."):
if f.endswith(".gguf"):
size_gb = os.path.getsize(f) / (1024**3)
print(f" {{f}}: {{size_gb:.2f}} GB")
else:
print("❌ f16 conversion failed. Check errors above.")"""))
# Upload GGUFs
cells.append(_cell(f"""# Upload GGUF files to the same repo
import os
from huggingface_hub import HfApi
api = HfApi()
REPO_ID = f"{{USERNAME}}/{{MODEL_NAME}}" if USERNAME else MODEL_NAME
gguf_files = [f for f in os.listdir(".") if f.endswith(".gguf") and f != "model-f16.gguf"]
for gf in gguf_files:
print(f"⬆️ Uploading {{gf}}...")
api.upload_file(
path_or_fileobj=gf,
path_in_repo=gf,
repo_id=REPO_ID,
)
print(f" βœ… Done")
print(f"\\nπŸŽ‰ All GGUF files uploaded to https://huggingface.co/{{REPO_ID}}")"""))
# ===== CELL 8: DEPLOY (optional) =====
if include_deploy:
cells.append(_md("""## 8️⃣ Deploy to HuggingFace Space
Create a Gradio chat Space running your merged model."""))
cells.append(_cell(f"""from huggingface_hub import HfApi, create_repo
SPACE_ID = f"{{USERNAME}}/{{MODEL_NAME}}-chat" if USERNAME else f"{{MODEL_NAME}}-chat"
REPO_ID = f"{{USERNAME}}/{{MODEL_NAME}}" if USERNAME else MODEL_NAME
# Create Space
try:
create_repo(SPACE_ID, repo_type="space", space_sdk="gradio", exist_ok=True)
print(f"πŸš€ Space created: https://huggingface.co/spaces/{{SPACE_ID}}")
except Exception as e:
print(f"⚠️ {{e}}")
# Generate app.py
APP_CODE = '''import gradio as gr
from transformers import AutoTokenizer, AutoModelForCausalLM, TextIteratorStreamer
import torch
from threading import Thread
MODEL_ID = "{hf_username}/{output_model_name}" if "{hf_username}" else "{output_model_name}"
tokenizer = AutoTokenizer.from_pretrained(MODEL_ID, trust_remote_code=True)
model = AutoModelForCausalLM.from_pretrained(
MODEL_ID, torch_dtype=torch.bfloat16, device_map="auto", trust_remote_code=True
)
def chat(message, history):
messages = []
for h in history:
messages.append({{"role": "user", "content": h[0]}})
if h[1]:
messages.append({{"role": "assistant", "content": h[1]}})
messages.append({{"role": "user", "content": message}})
text = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
inputs = tokenizer(text, return_tensors="pt").to(model.device)
streamer = TextIteratorStreamer(tokenizer, skip_prompt=True, skip_special_tokens=True)
thread = Thread(target=model.generate, kwargs={{
**inputs, "max_new_tokens": 512, "streamer": streamer, "do_sample": True, "temperature": 0.7
}})
thread.start()
response = ""
for token in streamer:
response += token
yield response
demo = gr.ChatInterface(chat, title="πŸ”₯ {output_model_name}", description="Merged with ForgeKit")
demo.launch()
'''
api = HfApi()
# Upload app.py
api.upload_file(
path_or_fileobj=APP_CODE.encode(),
path_in_repo="app.py",
repo_id=SPACE_ID,
repo_type="space",
)
# Upload requirements.txt
reqs = "transformers\\ntorch\\naccelerate\\nsentencepiece\\nprotobuf"
api.upload_file(
path_or_fileobj=reqs.encode(),
path_in_repo="requirements.txt",
repo_id=SPACE_ID,
repo_type="space",
)
print(f"\\nπŸŽ‰ Space deployed!")
print(f"πŸ”— https://huggingface.co/spaces/{{SPACE_ID}}")
print(f"\\n⏳ It may take a few minutes to build and start.")"""))
# ===== DONE =====
cells.append(_md(f"""## πŸŽ‰ All Done!
Your merged model **{output_model_name}** is ready. Here's what was created:
| Output | Link |
|--------|------|
| Model | `https://huggingface.co/{hf_username or 'YOUR_USERNAME'}/{output_model_name}` |
{'| GGUF Files | Same repo (quantized versions) |' if include_quantize else ''}
{'| Chat Space | `https://huggingface.co/spaces/' + (hf_username or 'YOUR_USERNAME') + '/' + output_model_name + '-chat` |' if include_deploy else ''}
---
**Made with [ForgeKit](https://huggingface.co/spaces/AIencoder/ForgeKit)** β€” Forge your perfect AI model πŸ”₯"""))
# ===== BUILD NOTEBOOK =====
notebook = {
"nbformat": 4,
"nbformat_minor": 5,
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3",
},
"language_info": {"name": "python", "version": "3.10.0"},
"colab": {
"provenance": [],
"gpuType": "T4",
},
"accelerator": "GPU",
},
"cells": cells,
}
return notebook
def _generate_model_card(config: MergeConfig, name: str, username: str) -> str:
"""Generate a model card README.md for the merged model."""
method_info = MERGE_METHODS.get(config.method, {})
models_list = "\n".join(f"- [{m}](https://huggingface.co/{m})" for m in config.models)
base_link = f"[{config.base_model}](https://huggingface.co/{config.base_model})" if config.base_model else "N/A"
return f"""---
tags:
- merge
- mergekit
- forgekit
base_model: {config.base_model or config.models[0] if config.models else ''}
license: apache-2.0
---
# {name}
This model was created using **[ForgeKit](https://huggingface.co/spaces/AIencoder/ForgeKit)** β€” an open-source model merging platform.
## Merge Details
| Parameter | Value |
|-----------|-------|
| **Method** | {method_info.get('name', config.method)} |
| **Base Model** | {base_link} |
| **dtype** | {config.dtype} |
### Source Models
{models_list}
### Configuration
```yaml
{generate_yaml(config)}
```
## Usage
```python
from transformers import AutoTokenizer, AutoModelForCausalLM
tokenizer = AutoTokenizer.from_pretrained("{username}/{name}" if "{username}" else "{name}")
model = AutoModelForCausalLM.from_pretrained("{username}/{name}" if "{username}" else "{name}")
```
---
*Made with [ForgeKit](https://huggingface.co/spaces/AIencoder/ForgeKit)* πŸ”₯
"""
def notebook_to_json(notebook: dict) -> str:
"""Serialize notebook to JSON string."""
return json.dumps(notebook, indent=2, ensure_ascii=False)
def save_notebook(notebook: dict, path: str):
"""Save notebook to .ipynb file."""
with open(path, "w", encoding="utf-8") as f:
json.dump(notebook, f, indent=2, ensure_ascii=False)