Spaces:
Build error
Build error
| import gradio as gr | |
| import numpy as np | |
| import torch | |
| from transformers import pipeline | |
| from transformers.pipelines import PIPELINE_REGISTRY, FillMaskPipeline | |
| from transformers import AutoModelForMaskedLM | |
# Example patent-style passages offered in the "Multiple Edits" tab.
ex_str1 = (
    "A crustless sandwich made from two slices of baked bread. The sandwich includes first and second matching "
    "crustless bread pieces. The bread pieces have the same general outer shape defined by an outer periphery "
    "with central portions surrounded by an outer peripheral area, the bread pieces being at least partially "
    "crimped together at the outer peripheral area."
)
ex_str2 = (
    "The present disclosure provides a DNA-targeting RNA that comprises a targeting sequence and, together with"
    " a modifying polypeptide, provides for site-specific modification of a target DNA and/or a polypeptide"
    " associated with the target DNA. "
)
ex_str3 = (
    "The graphite plane is composed of a two-dimensional hexagonal lattice of carbon atoms and the plate has a "
    "length and a width parallel to the graphite plane and a thickness orthogonal to the graphite plane with at "
    "least one of the length, width, and thickness values being 100 nanometers or smaller. "
)

# Each row is [input text, temperature, number of edits], matching the
# positional arguments of `unmask`.
tab_two_examples = [
    [ex_str1, 1.2, 1],
    [ex_str2, 1.5, 10],
    [ex_str3, 1.4, 5],
]

# Each row is a single sentence in which '_' marks the word to predict.
tab_one_examples = [
    ['A crustless _ made from two slices of baked bread.'],
    ['The present disclosure provides a DNA-targeting RNA that comprises a targeting _.'],
    ['The _ plane is composed of a two-dimensional hexagonal lattice of carbon atoms.'],
]
def add_mask(text):
    """Replace one word of ``text`` with the ``[MASK]`` token.

    If the text already contains a user-supplied placeholder (a
    whitespace-separated token made only of underscores, optionally wrapped
    in punctuation such as ``_.`` or ``(_)``), the first such placeholder is
    turned into ``[MASK]`` and its surrounding punctuation is kept.
    Otherwise a random word is masked, preferring words that are not in a
    small stop-word list.

    Args:
        text: Input sentence(s); words are split on whitespace.

    Returns:
        A ``(masked_text, masked_string)`` tuple. ``masked_text`` is the
        words re-joined with single spaces and one of them replaced by
        ``[MASK]``. ``masked_string`` is the word that was replaced, or the
        literal ``'[MASK]'`` when the user supplied the placeholder.

    Raises:
        ValueError: If ``text`` contains no words.
    """
    split_text = text.split()
    if not split_text:
        # Without this guard np.random.randint(0) fails with an opaque message.
        raise ValueError("add_mask() needs text containing at least one word")

    # If the user supplies a mask, don't add more.  Punctuation glued to the
    # placeholder ("_." at the end of a sentence) must not hide it, and words
    # that merely contain an underscore ("snake_case") must not count as one.
    edge_punctuation = '.,;:!?"\'()[]{}'
    for pos, token in enumerate(split_text):
        core = token.strip(edge_punctuation)
        if core and set(core) == {'_'}:
            split_text[pos] = token.replace(core, '[MASK]', 1)
            return ' '.join(split_text), '[MASK]'

    # Don't mask certain words: draw only from the non-stop-word positions so
    # the choice never lands on a stop-word by running out of retries.
    stop_words = {'a', 'an', 'the', 'is', 'and', 'or'}
    candidates = [i for i, word in enumerate(split_text) if word.lower() not in stop_words]
    if not candidates:
        # Every word is a stop-word; masking one of them is the only option.
        candidates = list(range(len(split_text)))
    idx = candidates[np.random.randint(len(candidates))]

    masked_string = split_text[idx]
    split_text[idx] = '[MASK]'
    return ' '.join(split_text), masked_string
class TempScalePipe(FillMaskPipeline):
    """Fill-mask pipeline whose mask logits are divided by a temperature.

    The extra ``temp`` call argument is forwarded to :meth:`postprocess`,
    where the logits at the mask position(s) are divided by it before the
    softmax.  ``temp == 1`` leaves the distribution unchanged; larger values
    flatten it.
    """

    def _sanitize_parameters(self, top_k=None, targets=None, temp=None):
        """Sort call-time keyword arguments into per-stage parameter dicts.

        Args:
            top_k: Number of predictions to return, if given.
            targets: Optional target words; converted to ids with
                ``self.get_target_ids``.
            temp: Optional softmax temperature; must be strictly positive.

        Returns:
            A 3-tuple of dicts ``({}, {}, postprocess_params)``; only the
            last one is ever populated here.

        Raises:
            ValueError: If ``temp`` is given and is not strictly positive.
        """
        postprocess_params = {}
        if targets is not None:
            target_ids = self.get_target_ids(targets, top_k)
            postprocess_params["target_ids"] = target_ids
        if top_k is not None:
            postprocess_params["top_k"] = top_k
        if temp is not None:
            # Logits are divided by temp: zero produces NaN probabilities and
            # a negative value would invert the ranking.
            if temp <= 0:
                raise ValueError(f"temp must be strictly positive, got {temp!r}")
            postprocess_params["temp"] = temp
        return {}, {}, postprocess_params

    def __call__(self, inputs, *args, **kwargs):
        """
        Fill the masked token in the text(s) given as inputs.

        Args:
            inputs (`str` or `List[str]`):
                One or several texts (or one list of prompts) with masked tokens.
            targets (`str` or `List[str]`, *optional*):
                When passed, the model will limit the scores to the passed targets instead of looking up in the whole
                vocab. If the provided targets are not in the model vocab, they will be tokenized and the first
                resulting token will be used (with a warning, and that might be slower).
            top_k (`int`, *optional*):
                When passed, overrides the number of predictions to return.
            temp (`float`, *optional*):
                When passed, the mask logits are divided by this value before the softmax.

        Extra positional arguments are accepted but not forwarded.

        Return:
            A list or a list of list of `dict`: Each result comes as list of dictionaries with the following keys:

            - **sequence** (`str`) -- The corresponding input with the mask token prediction.
            - **score** (`float`) -- The corresponding probability.
            - **token** (`int`) -- The predicted token id (to replace the masked one).
            - **token_str** (`str`) -- The predicted token (to replace the masked one).
        """
        outputs = super().__call__(inputs, **kwargs)
        # A one-element list of inputs is unwrapped to that element's result.
        if isinstance(inputs, list) and len(inputs) == 1:
            return outputs[0]
        return outputs

    def postprocess(self, model_outputs, top_k=10, target_ids=None, temp=1):
        """Turn raw model outputs into ranked fill-in propositions.

        Args:
            model_outputs: Mapping providing ``"input_ids"`` and ``"logits"``.
            top_k: Number of candidates kept per mask position.
            target_ids: Optional token ids to which the scores are restricted.
            temp: Temperature by which the mask logits are divided.

        Returns:
            For a single mask, a list of dicts with keys ``score``,
            ``token``, ``token_str`` and ``sequence``; for several masks, a
            list with one such list per mask.
        """
        # Cap top_k if there are targets
        if target_ids is not None and target_ids.shape[0] < top_k:
            top_k = target_ids.shape[0]
        input_ids = model_outputs["input_ids"][0]
        outputs = model_outputs["logits"]

        masked_index = torch.nonzero(input_ids == self.tokenizer.mask_token_id, as_tuple=False).squeeze(-1)

        # Temperature scaling happens on the logits, before the softmax.
        logits = outputs[0, masked_index, :] / temp
        probs = logits.softmax(dim=-1)
        if target_ids is not None:
            probs = probs[..., target_ids]

        values, predictions = probs.topk(top_k)

        result = []
        single_mask = values.shape[0] == 1
        for i, (_values, _predictions) in enumerate(zip(values.tolist(), predictions.tolist())):
            row = []
            for v, p in zip(_values, _predictions):
                # Copy is important since we're going to modify this array in place
                tokens = input_ids.numpy().copy()
                if target_ids is not None:
                    # p indexes into target_ids here; map it back to a vocab id.
                    p = target_ids[p].tolist()

                tokens[masked_index[i]] = p
                # Filter padding out:
                tokens = tokens[np.where(tokens != self.tokenizer.pad_token_id)]
                # Originally we skip special tokens to give readable output.
                # For multi masks though, the other [MASK] would be removed otherwise
                # making the output look odd, so we add them back
                sequence = self.tokenizer.decode(tokens, skip_special_tokens=single_mask)
                proposition = {"score": v, "token": p, "token_str": self.tokenizer.decode([p]), "sequence": sequence}
                row.append(proposition)
            result.append(row)
        if single_mask:
            return result[0]
        return result
# Make the custom pipeline class available under the "temp-scale" task name.
PIPELINE_REGISTRY.register_pipeline(
    "temp-scale", pipeline_class=TempScalePipe, pt_model=AutoModelForMaskedLM
)

# Shared pipeline instance used by the functions that fill in masks.
scrambler = pipeline(
    "temp-scale",
    model="anferico/bert-for-patents",
)
def sample_output(out, sampling):
    """Pick one candidate token from a ``{token: score}`` mapping.

    Args:
        out: Mapping from candidate token string to its score.
        sampling: ``'multi'`` draws a token with probability proportional to
            its score; any other value draws uniformly among the tokens.

    Returns:
        The selected token string (one of the keys of ``out``).

    Raises:
        ValueError: If ``out`` is empty.
    """
    # Index by token, not by score: two tokens sharing a score must both
    # stay selectable.
    tokens = list(out)
    if not tokens:
        raise ValueError("sample_output() needs at least one candidate token")

    if sampling == 'multi':
        weights = np.asarray([out[token] for token in tokens], dtype=float)
        total = weights.sum()
        if np.isfinite(total) and total > 0:
            # Top-k scores generally do not sum to 1.  Renormalise so the
            # leftover probability mass is spread proportionally instead of
            # being handed to the last entry, as np.random.multinomial does
            # with un-normalised pvals.
            idx = np.random.choice(len(tokens), p=weights / total)
        else:
            # No usable weights; fall back to a uniform draw.
            idx = np.random.randint(0, len(tokens))
    else:
        idx = np.random.randint(0, len(tokens))
    return tokens[idx]
def unmask_single(text, temp=1):
    """Mask one word of ``text`` and return the model's candidate fills.

    Args:
        text: Input sentence; passed to ``add_mask`` to obtain the masked text.
        temp: Temperature forwarded to the ``scrambler`` pipeline.

    Returns:
        A dict mapping each candidate's ``token_str`` to its ``score`` for
        the 10 predictions requested from the pipeline.
    """
    masked_sentence = add_mask(text)[0]
    predictions = scrambler(masked_sentence, temp=temp, top_k=10)

    token_scores = {}
    for candidate in predictions:
        token_scores[candidate["token_str"]] = candidate["score"]
    return token_scores
def unmask(text, temp, rounds):
    """Rewrite ``text`` by repeatedly masking a word and sampling a new one.

    In each round one word is masked via ``add_mask``, the ``scrambler``
    pipeline proposes 15 candidates, and one is drawn with ``sample_output``.
    A draw that contains the masked word is re-drawn a limited number of
    times.  A substituted word is wrapped in asterisks; if every draw still
    contained the masked word, the token is inserted without asterisks.

    Args:
        text: Sentence(s) to edit.
        temp: Temperature forwarded to the pipeline.
        rounds: Number of edit rounds; truncated to an integer.

    Returns:
        The edited text with its first character upper-cased (the empty
        string is returned unchanged).
    """
    sampling = 'multi'
    # `rounds` may arrive as a float (the "Number of edits" slider is declared
    # with value=1.0); range() only accepts integers.
    for _ in range(int(rounds)):
        masked_text, masked = add_mask(text)
        split_text = masked_text.split()
        res = scrambler(masked_text, temp=temp, top_k=15)
        # Look for the actual mask token, not any word containing "MASK".
        mask_pos = next(i for i, t in enumerate(split_text) if '[MASK]' in t)
        out = {item["token_str"]: item["score"] for item in res}

        new_token = sample_output(out, sampling)
        unsuccessful_iters = 0
        # Re-draw while the proposal still contains the word that was masked,
        # giving up after a handful of attempts.
        while masked in new_token and unsuccessful_iters <= 5:
            print('skipped', new_token)
            new_token = sample_output(out, sampling=sampling)
            unsuccessful_iters += 1

        if masked in new_token:
            replacement = new_token
        else:
            replacement = '*' + new_token + '*'
        # Substitute inside the token so punctuation attached to the mask
        # (e.g. "[MASK].") is kept.
        split_text[mask_pos] = split_text[mask_pos].replace('[MASK]', replacement, 1)
        text = ' '.join(split_text)

    if not text:
        return text
    return text[0].upper() + text[1:]
# Widgets for the single-edit tab.
textbox1 = gr.Textbox(lines=5, label="Input Sentence")
output_textbox1 = gr.Textbox(lines=4, placeholder="Output will appear here")

# Widgets for the multiple-edits tab.
textbox2 = gr.Textbox(lines=5, label="Input Sentences")
output_textbox2 = gr.Textbox(lines=4, placeholder="Output will appear here")
temp_slider2 = gr.Slider(minimum=1.0, maximum=3.0, value=1.0, label='Creativity')
edit_slider2 = gr.Slider(minimum=1, maximum=20, step=1, value=1.0, label='Number of edits')
# Title and HTML description shown on the single-edit tab.
title1 = "Patent-BERT Sentence Remix-er: Single Edit"
description1 = """<p>
This is a model based on
<a href= "https://github.com/google/patents-public-data/blob/master/models/BERT%20for%20Patents.md">Patent BERT</a> created by Google.
<br/>
Try inserting a '_' where you want the model to generate a list of likely words.
<strong>Note:</strong> You can only add one '_' per submission.
<br/>
</p>"""

# Title and HTML description shown on the multiple-edits tab.
title2 = "Patent-BERT Sentence Remix-er: Multiple Edits"
description2 = """<p>
Try typing in a sentence for the model to remix. Adjust the 'creativity' scale bar to change
the model's confidence in its likely substitutions and the 'number of edits' for the number of edits you want
the model to attempt to make. The words substituted in the output sentence will be enclosed in asterisks (e.g., *word*).
<br/> </p> """
# Tab 1: the user marks one word with '_' and gets a label widget of
# candidate words with their scores.
demo1 = gr.Interface(
    title=title1,
    description=description1,
    fn=unmask_single,
    inputs=[textbox1],
    outputs='label',
    examples=tab_one_examples,
    allow_flagging='never',
)

# Tab 2: the model performs several random edits on the input text.
demo2 = gr.Interface(
    title=title2,
    description=description2,
    fn=unmask,
    inputs=[textbox2, temp_slider2, edit_slider2],
    outputs=[output_textbox2],
    examples=tab_two_examples,
    allow_flagging='never',
)

# Combine both demos into one tabbed app and start serving it.
gr.TabbedInterface(
    interface_list=[demo1, demo2],
    tab_names=["Single edit", "Multiple Edits"],
).launch()