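"""Tests for trlx's peft (parameter-efficient fine-tuning) integration.

Covers LoRA, prompt-tuning, and prefix-tuning adapters across the PPO, ILQL, and
SFT trainers, for both causal and seq2seq models: saving and loading, selective
backpropagation, adapter enabling/disabling, hydra reference logits, generation,
and 8-bit loading.
"""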
import copy
import gc
import importlib
import os
import sys
import tempfile
import unittest
import unittest.mock
from typing import Optional

import numpy as np
import torch
import transformers
from peft import get_peft_config, get_peft_model
from peft.utils.config import PeftType, TaskType
from transformers import AutoConfig, AutoModelForCausalLM
from trlx.data.configs import TokenizerConfig
from trlx.data.default_configs import (
    ModelConfig,
    default_ilql_config,
    default_ppo_config,
    default_sft_config,
)
from trlx.models.modeling_ilql import (
    AutoModelForCausalLMWithILQLHeads,
    AutoModelForSeq2SeqLMWithILQLHeads,
)
from trlx.models.modeling_ppo import (
    AutoModelForCausalLMWithHydraValueHead,
    AutoModelForCausalLMWithValueHead,
    AutoModelForSeq2SeqLMWithHydraValueHead,
)
from trlx.trainer.accelerate_ilql_trainer import AccelerateILQLTrainer
from trlx.trainer.accelerate_ppo_trainer import AcceleratePPOTrainer
from trlx.trainer.accelerate_sft_trainer import AccelerateSFTTrainer

PPO = "ppo"
ILQL = "ilql"
SFT = "sft"
TRAINING_TYPES = [PPO, ILQL, SFT]

CAUSAL = "causal"
SEQ2SEQ = "seq2seq"

MODEL_TASK_TYPE = {
    "gpt2": CAUSAL,
    "google/t5-efficient-tiny": SEQ2SEQ,
    # "EleutherAI/pythia-160m": CAUSAL,
    # "facebook/opt-125m": CAUSAL,
}
MODELS_TO_TEST = list(MODEL_TASK_TYPE.keys())

PEFT_CONFIGS_TO_TEST = [PeftType.LORA, PeftType.PROMPT_TUNING, PeftType.PREFIX_TUNING]

ALL_TEST_COMBINATIONS = [
    [training_type, model_path, peft_type]
    for training_type in TRAINING_TYPES
    for model_path in MODELS_TO_TEST
    for peft_type in PEFT_CONFIGS_TO_TEST
    if [training_type, MODEL_TASK_TYPE[model_path]] != [SFT, SEQ2SEQ]  # Seq2Seq SFT is not implemented
    and (MODEL_TASK_TYPE[model_path] != SEQ2SEQ or peft_type == PeftType.LORA)
    # Skip non-Lora Seq2Seq combinations due to implementation problems in peft 0.3.0
]
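
# Example combination: ["ppo", "gpt2", PeftType.LORA]. Seq2seq models are only
# paired with LoRA, and seq2seq SFT is excluded entirely, per the filters above.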


class TestPeft(unittest.TestCase):
    def setUp(self):
        np.random.seed(0)
        torch.manual_seed(0)
        torch.cuda.manual_seed_all(0)

    def tearDown(self):
        gc.collect()  # Try to free up memory

    def _create_model(
        self,
        training_type: str,
        model_path: str,
        task_type: str,
        peft_type: Optional[str],
        create_trainer: bool = False,
    ):
        self.peft_config = self._get_peft_config(peft_type, task_type) if peft_type else None
        if create_trainer:
            self.trainer = self._get_trainer(training_type, model_path, task_type, self.peft_config)
            self.model = self.trainer.model.to("cpu")
        else:
            # Should be a bit faster to execute than creating a trainer.
            if training_type == SFT:
                self.model = AutoModelForCausalLM.from_pretrained(model_path)
                if self.peft_config:
                    self.model = get_peft_model(self.model, self.peft_config)
            else:
                self.model = self._get_auto_model_type(training_type, task_type).from_pretrained(
                    model_path,
                    peft_config=self.peft_config,
                )

        self._create_inputs(model_path, task_type)
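
    # Note: _create_model (above) leaves its artifacts on the test instance:
    # self.model, self.tokenizer, self.inputs, and optionally self.trainer,
    # which the test methods and assertions below rely on.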

    def _create_inputs(self, tokenizer_path, task_type):
        self.tokenizer = transformers.AutoTokenizer.from_pretrained(tokenizer_path)

        if task_type == CAUSAL:
            self.inputs = self.tokenizer(
                "Once upon a time there was a happy goose named Louis. He liked to eat bananas and",
                return_tensors="pt",
            )
        elif task_type == SEQ2SEQ:
            self.encoder_text = "Translate this text to French: Hello, my dog is cute"
            self.decoder_text = "Bonjour, mon chien est mignon"
            encoder_inputs = self.tokenizer(self.encoder_text, return_tensors="pt")
            decoder_inputs = self.tokenizer(self.decoder_text, return_tensors="pt")
            self.inputs = {
                **encoder_inputs,
                "decoder_input_ids": decoder_inputs.input_ids,
                "decoder_attention_mask": decoder_inputs.attention_mask,
            }
        else:
            # Classification tasks are not implemented
            raise NotImplementedError

    def _get_trainer(
        self, training_type, model_path: str, task_type: str, peft_config, tokenizer_path: Optional[str] = None
    ):
        if training_type == PPO:
            config = default_ppo_config()
            trainer_type = AcceleratePPOTrainer
        elif training_type == ILQL:
            config = default_ilql_config()
            trainer_type = AccelerateILQLTrainer
        elif training_type == SFT:
            config = default_sft_config()
            trainer_type = AccelerateSFTTrainer
        else:
            raise ValueError(f"Training type {training_type} not recognized.")

        config.tokenizer = TokenizerConfig(tokenizer_path=tokenizer_path if tokenizer_path else model_path)
        config.model = ModelConfig(model_path=model_path, peft_config=peft_config, model_arch_type=task_type)
        config.train.tracker = None

        return trainer_type(config)
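
    # Note: constructing a trainer initializes the full training pipeline, not just
    # the model, which is why _create_model defaults to create_trainer=False when a
    # bare model suffices.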

    def _get_auto_model_type(self, training_type, task_type):
        if training_type == PPO:
            if task_type == CAUSAL:
                return AutoModelForCausalLMWithHydraValueHead
            elif task_type == SEQ2SEQ:
                return AutoModelForSeq2SeqLMWithHydraValueHead
        elif training_type == ILQL:
            if task_type == CAUSAL:
                return AutoModelForCausalLMWithILQLHeads
            elif task_type == SEQ2SEQ:
                return AutoModelForSeq2SeqLMWithILQLHeads
        elif training_type == SFT and task_type == CAUSAL:
            return AutoModelForCausalLM
        raise ValueError(f"Training type {training_type} for the task {task_type} not recognized.")

    def _get_peft_config(self, peft_type: str, task_type: str):
        assert task_type in [CAUSAL, SEQ2SEQ]
        task_type = TaskType.CAUSAL_LM if task_type == CAUSAL else TaskType.SEQ_2_SEQ_LM

        if peft_type == PeftType.LORA:
            return get_peft_config(
                {
                    "peft_type": peft_type,
                    "task_type": task_type,
                    "r": 8,
                    "lora_alpha": 32,
                    "lora_dropout": 0.0,
                }
            )
        elif peft_type == PeftType.PREFIX_TUNING:
            return get_peft_config(
                {
                    "peft_type": peft_type,
                    "task_type": task_type,
                    "num_virtual_tokens": 10,
                }
            )
        elif peft_type == PeftType.PROMPT_TUNING:
            return get_peft_config(
                {
                    "peft_type": peft_type,
                    "task_type": task_type,
                    "prompt_tuning_init": "RANDOM",
                    "num_virtual_tokens": 10,
                }
            )
        else:
            raise NotImplementedError
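
    # The plain dicts above are enough because peft's get_peft_config dispatches on
    # the "peft_type" key to build the matching config class (e.g. LoraConfig).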

    def _backprop(self, model):
        output = model(**self.inputs, return_dict=True)
        # Apply an arbitrary loss just to perturb the model's parameters.
        # The loss itself is meaningless, but it produces a gradient, which is all we need.
        loss = torch.nn.functional.binary_cross_entropy_with_logits(
            output.logits[0][-1][:1],
            torch.tensor([0.53]),
        )
        if hasattr(output, "value"):
            loss += torch.nn.functional.binary_cross_entropy_with_logits(
                output.value.squeeze()[-1:],
                torch.tensor([0.53]),
            )
        loss.backward()

        # A fresh SGD optimizer is created on every call, so each call applies
        # exactly one gradient step to the trainable parameters.
        optimizer = torch.optim.SGD(model.parameters(), lr=0.1)
        optimizer.step()
        return model

    def _check_that_models_are_equivalent(self, model1, model2, training_type, test_hydra=False):
        self.assertTrue(
            torch.equal(model1(**self.inputs, return_dict=True).logits, model2(**self.inputs, return_dict=True).logits)
        )

        state_dict1 = model1.state_dict()
        state_dict2 = model2.state_dict()
        self.assertEqual(state_dict1.keys(), state_dict2.keys())
        for name in state_dict1.keys():
            self.assertTrue(torch.equal(state_dict1[name], state_dict2[name]))

        if training_type != SFT:
            self.assertTrue(
                torch.equal(
                    model1(**self.inputs, return_dict=True).value,
                    model2(**self.inputs, return_dict=True).value,
                )
            )

        if training_type == PPO and test_hydra:
            self.assertTrue(
                torch.equal(
                    model1.forward_hydra(**self.inputs, return_dict=True).logits,
                    model2.forward_hydra(**self.inputs, return_dict=True).logits,
                )
            )

    def test_save_and_load(self):
        for training_type in [PPO, ILQL]:
            for model_path in MODELS_TO_TEST:
                peft_type = PeftType.LORA
                task_type = MODEL_TASK_TYPE[model_path]
                self._create_model(training_type, model_path, task_type, peft_type)
                self._backprop(self.model)

                with tempfile.TemporaryDirectory() as tmp_dir:
                    self.model.save_pretrained(tmp_dir)

                    self.assertTrue(os.path.isfile(f"{tmp_dir}/adapter_model.bin"))
                    self.assertTrue(os.path.isfile(f"{tmp_dir}/adapter_config.json"))
                    self.assertTrue(os.path.isfile(f"{tmp_dir}/pytorch_model.bin"))

                    # Check that it didn't save the whole model (which weighs around 500MB).
                    # pytorch_model.bin should only contain the other trained parts, like the value heads.
                    # ILQL heads are very big though (around 1.1GB for gpt2).
                    self.assertLess(
                        os.path.getsize(f"{tmp_dir}/pytorch_model.bin"),
                        1.3e9 if training_type == ILQL else 1e7,
                    )

                    auto_model_type = self._get_auto_model_type(training_type, task_type)
                    loaded_model = auto_model_type.from_pretrained(tmp_dir)
                    self._check_that_models_are_equivalent(loaded_model, self.model, training_type, True)

    def test_from_config(self):
        """Check that from_config will add a peft adapter if given the argument peft_config"""
        for training_type in TRAINING_TYPES:
            peft_config = self._get_peft_config(PeftType.LORA, CAUSAL)
            gpt2_config = AutoConfig.from_pretrained("gpt2")
            trainer = self._get_trainer(training_type, gpt2_config, CAUSAL, peft_config, tokenizer_path="gpt2")

            state_dict = trainer.model.state_dict()
            self.assertTrue(any("lora" in layer_name for layer_name in state_dict.keys()))

    def test_save_and_load_without_peft(self):
        """Similar to test_save_and_load, but with peft unavailable. Should not raise any error."""
        with unittest.mock.patch.dict(sys.modules, {"peft": None}):
            for training_type in [PPO, ILQL]:
                for model_path in MODELS_TO_TEST:
                    task_type = MODEL_TASK_TYPE[model_path]
                    self._create_model(training_type, model_path, task_type, peft_type=None)
                    self._backprop(self.model)

                    with tempfile.TemporaryDirectory() as tmp_dir:
                        self.model.save_pretrained(tmp_dir)
                        auto_model_type = self._get_auto_model_type(training_type, task_type)
                        loaded_model = auto_model_type.from_pretrained(tmp_dir)
                        self._check_that_models_are_equivalent(loaded_model, self.model, training_type)

    def test_backpropagation_and_disabling(self):
        for training_type, model_path, peft_type in ALL_TEST_COMBINATIONS:
            task_type = MODEL_TASK_TYPE[model_path]
            self._create_model(training_type, model_path, task_type, peft_type, create_trainer=True)
            old_logits = self.model(**self.inputs, return_dict=True).logits
            initial_model_state_dict = copy.deepcopy(self.model.state_dict())

            self._backprop(self.model)
            self._backprop(self.model)
            new_logits = self.model(**self.inputs, return_dict=True).logits
            new_model_state_dict = self.model.state_dict()

            # Check that the backpropagation affected the predictions
            self.assertFalse(torch.equal(old_logits, new_logits))

            # Check that only the peft adapter layers were modified by the backpropagation
            self.assertEqual(initial_model_state_dict.keys(), new_model_state_dict.keys())
            for name in initial_model_state_dict.keys():
                parameters_equal = torch.equal(initial_model_state_dict[name], new_model_state_dict[name])
                if "lora" in name or "prompt" in name or "v_head" in name:
                    self.assertFalse(parameters_equal)
                else:
                    self.assertTrue(parameters_equal)

            # Check Lora enabling and disabling
            if "LORA" in peft_type:
                # If disabling the Lora adapter restores the original logits,
                # the backpropagation can only have affected the Lora adapter.
                self.lora_model = self.model.base_model if training_type != SFT else self.model
                self.lora_model.disable_adapter_layers()
                new_logits = self.model(**self.inputs, return_dict=True).logits
                self.assertTrue(torch.equal(old_logits, new_logits))

                # Re-enabling the Lora adapter should make the 2 models different again
                self.lora_model.enable_adapter_layers()
                new_logits = self.model(**self.inputs, return_dict=True).logits
                self.assertFalse(torch.equal(old_logits, new_logits))

    def test_forward_hydra(self):
        """Test that PPO hydra heads work and give similar logits to the model without any fine-tuning."""
        for model_path in MODELS_TO_TEST:
            for peft_type in PEFT_CONFIGS_TO_TEST:
                task_type = MODEL_TASK_TYPE[model_path]
                if task_type == SEQ2SEQ and peft_type != PeftType.LORA:
                    continue  # TODO: skipped due to bugs in peft 0.3.0 with Seq2Seq

                self._create_model(PPO, model_path, task_type, peft_type)

                logits_without_peft = self.model.base_model.base_model(**self.inputs, return_dict=True).logits
                logits_before_backpropagation = self.model(**self.inputs, return_dict=True).logits

                self._backprop(self.model)

                # forward_hydra should return the same logits as the original model
                new_logits_from_hydra = self.model.forward_hydra(**self.inputs, return_dict=True).logits
                self.assertTrue(torch.equal(logits_without_peft, new_logits_from_hydra))

                if "LORA" in peft_type:
                    # True because the Lora adapter initially does not modify the output
                    self.assertTrue(torch.equal(logits_before_backpropagation, new_logits_from_hydra))
                else:
                    # False because the initial (untrained) prompt was used to calculate
                    # logits_before_backpropagation, but not new_logits_from_hydra.
                    self.assertFalse(torch.equal(logits_before_backpropagation, new_logits_from_hydra))

    def test_generate(self):
        """
        Check that generate works, and that it's deterministic when the temperature is very low.
        """
        temperature = 0.0
        for training_type, model_path, peft_type in ALL_TEST_COMBINATIONS:
            task_type = MODEL_TASK_TYPE[model_path]
            self._create_model(training_type, model_path, task_type, peft_type)
            self._backprop(self.model)

            with torch.no_grad():
                output1 = self.model.generate(
                    **self.inputs,
                    temperature=temperature,
                    pad_token_id=self.tokenizer.eos_token_id,
                    eos_token_id=self.tokenizer.eos_token_id,
                )
                output2 = self.model.generate(
                    **self.inputs,
                    temperature=temperature,
                    pad_token_id=self.tokenizer.eos_token_id,
                    eos_token_id=self.tokenizer.eos_token_id,
                )
            self.assertTrue(torch.equal(output1, output2))

    def test_peft_not_installed_error(self):
        """If the argument peft_config is used but peft is not installed, expect a ModuleNotFoundError"""
        with unittest.mock.patch.dict(sys.modules, {"peft": None}):
            peft_config = {"peft_type": "LORA"}

            with self.assertRaises(ModuleNotFoundError):
                self._get_trainer(PPO, "gpt2", CAUSAL, peft_config)

            with self.assertRaises(ModuleNotFoundError):
                AutoModelForCausalLMWithHydraValueHead.from_pretrained("gpt2", peft_config=peft_config)

    def test_lora_modules_to_save(self):
        """
        Test the special Lora config option 'modules_to_save', which also trains
        some non-Lora modules. Its implementation is a bit tricky.
        """
        for training_type in [PPO, ILQL]:
            trainable_layer_name = "base_model.model.transformer.h.3.mlp"
            peft_config = {
                "peft_type": PeftType.LORA,
                "task_type": CAUSAL,
                "r": 8,
                "lora_alpha": 32,
                "lora_dropout": 0.0,
                "modules_to_save": [trainable_layer_name],
            }
            model = self._get_auto_model_type(training_type, CAUSAL).from_pretrained("gpt2", peft_config=peft_config)
            initial_state_dict = copy.deepcopy(model.state_dict())
            self._create_inputs("gpt2", CAUSAL)
            # initial_logits = model(**self.inputs, return_dict=True).logits
            self._backprop(model)
            self._backprop(model)

            new_state_dict = model.state_dict()
            self.assertEqual(initial_state_dict.keys(), new_state_dict.keys())
            for name in initial_state_dict.keys():
                parameters_equal = torch.equal(initial_state_dict[name], new_state_dict[name])
                if trainable_layer_name + ".modules_to_save" in name or "lora" in name or "v_head" in name:
                    self.assertFalse(parameters_equal)
                else:
                    self.assertTrue(parameters_equal)

            # TODO: deactivated until the issue (https://github.com/huggingface/peft/issues/493) is fixed
            # if training_type == PPO:
            #     forward_hydra_logits = model.forward_hydra(**self.inputs, return_dict=True).logits
            #     self.assertTrue(torch.equal(initial_logits, forward_hydra_logits))

            trained_model_logits = model(**self.inputs, return_dict=True).logits
            with tempfile.TemporaryDirectory() as tmp_dir:
                model.save_pretrained(tmp_dir)
                loaded_model = self._get_auto_model_type(training_type, CAUSAL).from_pretrained(tmp_dir)
                loaded_model_logits = loaded_model(**self.inputs, return_dict=True).logits
                self.assertTrue(torch.equal(trained_model_logits, loaded_model_logits))

    @unittest.skipUnless(
        importlib.util.find_spec("bitsandbytes") and torch.cuda.is_available(),
        "bitsandbytes and GPU needed to execute test_8bits",
    )
    def test_8bits(self):
        """Test the behaviour of from_pretrained with 8-bit models"""
        from bitsandbytes.nn import Linear8bitLt

        # gpt2 uses Conv1D instead of Linear, so use pythia-160m instead.
        model_id = "EleutherAI/pythia-160m"
        peft_config = {
            "peft_type": PeftType.LORA,
            "task_type": TaskType.CAUSAL_LM,
            "lora_dropout": 0.0,
            "lora_alpha": 32,
        }

        reference_model = AutoModelForCausalLMWithValueHead.from_pretrained(
            model_id,
            peft_config=peft_config,
        )
        initial_nb_trainable_params = sum(p.numel() for p in reference_model.parameters() if p.requires_grad)

        model_8bit = AutoModelForCausalLMWithValueHead.from_pretrained(
            model_id,
            peft_config=peft_config,
            load_in_8bit=True,
            peft_int8_kwargs={"use_gradient_checkpointing": True},
            device_map="auto",
        )
        new_nb_trainable_params = sum(p.numel() for p in model_8bit.parameters() if p.requires_grad)
        self.assertEqual(new_nb_trainable_params, initial_nb_trainable_params)

        self.assertIsInstance(reference_model.base_model.model.gpt_neox.layers[0].mlp.dense_h_to_4h, torch.nn.Linear)
        self.assertIsInstance(model_8bit.base_model.model.gpt_neox.layers[0].mlp.dense_h_to_4h, Linear8bitLt)

        base_model = AutoModelForCausalLM.from_pretrained(model_id, load_in_8bit=True, device_map="auto")
        model_8bit = AutoModelForCausalLMWithValueHead.from_pretrained(
            base_model,
            peft_config=peft_config,
            load_in_8bit=True,
            peft_int8_kwargs={"use_gradient_checkpointing": False},
            device_map="auto",
        )
        new_nb_trainable_params = sum(p.numel() for p in model_8bit.parameters() if p.requires_grad)
        self.assertEqual(new_nb_trainable_params, initial_nb_trainable_params)
        self.assertIsInstance(model_8bit.base_model.model.gpt_neox.layers[0].mlp.dense_h_to_4h, Linear8bitLt)
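

# Standard unittest entry point so the suite can be run directly,
# e.g. "python test_peft.py" (the filename is an assumption).
if __name__ == "__main__":
    unittest.main()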