import re
import textwrap
from datetime import datetime
from typing import List, Optional, Union

from .config import logger, console

def ordinal(n):
    """Add ordinal suffix to a number"""
    return str(n) + ("th" if 4 <= n % 100 <= 20 else {1: "st", 2: "nd", 3: "rd"}.get(n % 10, "th"))
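
# A few sanity checks (doctest-style, illustrative):
#   >>> [ordinal(n) for n in (1, 2, 3, 4, 11, 12, 21, 103)]
#   ['1st', '2nd', '3rd', '4th', '11th', '12th', '21st', '103rd']
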
def time_of_day(hour):
    """Define time of day based on hour"""
    if 5 <= hour < 12:
        return "in the morning"
    elif 12 <= hour < 17:
        return "in the afternoon"
    elif 17 <= hour < 21:
        return "in the evening"
    else:
        return "at night"

def current_date_time_in_words():
    now = datetime.now()
    day_of_week = now.strftime('%A')
    month = now.strftime('%B')
    day = ordinal(now.day)
    year = now.year
    hour = now.hour
    minute = now.minute

    time_of_day_str = time_of_day(hour)

    if minute == 0:
        minute_str = ""
    elif minute == 1:
        minute_str = "1 minute past"
    elif minute == 15:
        minute_str = "quarter past"
    elif minute == 30:
        minute_str = "half past"
    elif minute == 45:
        minute_str = "quarter to"
        hour += 1
    elif minute < 30:
        minute_str = str(minute) + " minutes past"
    else:
        minute_str = str(60 - minute) + " minutes to"
        hour += 1

    # 12-hour clock; maps 0 (and 24, after a "to" adjustment) to 12
    hour_str = str(hour % 12 or 12)

    if minute_str:
        time_str = minute_str + " " + hour_str
    else:
        time_str = hour_str + " o'clock"

    # Prepare final output
    return f"{day_of_week}, {month} {day}, {year}, {time_str} {time_of_day_str}."
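
# Example output (illustrative; the actual value depends on the wall clock):
#   >>> current_date_time_in_words()   # at 15:15 on 2024-01-01
#   'Monday, January 1st, 2024, quarter past 3 in the afternoon.'
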
# Let's keep compatibility for now, in case people are used to this.
# Chunked generation originally from https://github.com/serp-ai/bark-with-voice-clone
def split_general_purpose(text, split_character_goal_length=150, split_character_max_length=200):
    # return nltk.sent_tokenize(text)
    # from https://github.com/neonbjb/tortoise-tts
    """Split text into chunks of a desired length, trying to keep sentences intact."""
    # normalize text, remove redundant whitespace and convert non-ascii quotes to ascii
    text = re.sub(r"\n\n+", "\n", text)
    text = re.sub(r"\s+", " ", text)
    text = re.sub(r"[“”]", '"', text)

    rv = []
    in_quote = False
    current = ""
    split_pos = []
    pos = -1
    end_pos = len(text) - 1

    def seek(delta):
        nonlocal pos, in_quote, current
        is_neg = delta < 0
        for _ in range(abs(delta)):
            if is_neg:
                pos -= 1
                current = current[:-1]
            else:
                pos += 1
                current += text[pos]
            if text[pos] == '"':
                in_quote = not in_quote
        return text[pos]

    def peek(delta):
        p = pos + delta
        return text[p] if p < end_pos and p >= 0 else ""

    def commit():
        nonlocal rv, current, split_pos
        rv.append(current)
        current = ""
        split_pos = []

    while pos < end_pos:
        c = seek(1)
        # do we need to force a split?
        if len(current) >= split_character_max_length:
            if len(split_pos) > 0 and len(current) > (split_character_goal_length / 2):
                # we have at least one sentence and we are over half the desired length, seek back to the last split
                d = pos - split_pos[-1]
                seek(-d)
            else:
                # should split on semicolon too
                # no full sentences, seek back until we are not in the middle of a word and split there
                while c not in ";!?.\n " and pos > 0 and len(current) > split_character_goal_length:
                    c = seek(-1)
            commit()
        # check for sentence boundaries
        elif not in_quote and (c in ";!?\n" or (c == "." and peek(1) in "\n ")):
            # seek forward if we have consecutive boundary markers but still within the max length
            while (
                pos < len(text) - 1 and len(current) < split_character_max_length and peek(1) in "!?."
            ):
                c = seek(1)
            split_pos.append(pos)
            if len(current) >= split_character_goal_length:
                commit()
        # treat end of quote as a boundary if it's followed by a space or newline
        elif in_quote and peek(1) == '"' and peek(2) in "\n ":
            seek(2)
            split_pos.append(pos)
    rv.append(current)

    # clean up, remove lines with only whitespace or punctuation
    rv = [s.strip() for s in rv]
    rv = [s for s in rv if len(s) > 0 and not re.match(r"^[\s\.,;:!?]*$", s)]

    return rv
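
# Illustrative usage (long_text is any string; chunk boundaries depend on the
# goal/max lengths and on where sentence-ending punctuation falls):
#
#   chunks = split_general_purpose(long_text,
#                                  split_character_goal_length=150,
#                                  split_character_max_length=200)
#   for i, chunk in enumerate(chunks):
#       print(f"chunk {i} ({len(chunk)} chars): {chunk[:40]}...")
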
def is_sentence_ending(s):
    return s in {"!", "?", ".", ";"}

def is_boundary_marker(s):
    return s in {"!", "?", ".", "\n"}

def split_general_purpose_hm(text, split_character_goal_length=110, split_character_max_length=160):
    def clean_text(text):
        text = re.sub(r"\n\n+", "\n", text)
        text = re.sub(r"\s+", " ", text)
        text = re.sub(r"[“”]", '"', text)
        return text

    def _split_text(text):
        sentences = []
        sentence = ""
        in_quote = False
        for i, c in enumerate(text):
            sentence += c
            if c == '"':
                in_quote = not in_quote
            elif not in_quote and (is_sentence_ending(c) or c == "\n"):
                # don't break inside a run of boundary markers like "?!" or "..."
                if i < len(text) - 1 and text[i + 1] in '!?.':
                    continue
                sentences.append(sentence.strip())
                sentence = ""
        if sentence.strip():
            sentences.append(sentence.strip())
        return sentences

    def recombine_chunks(chunks):
        combined_chunks = []
        current_chunk = ""
        for chunk in chunks:
            if len(current_chunk) + len(chunk) + 1 <= split_character_max_length:
                current_chunk += " " + chunk
            else:
                combined_chunks.append(current_chunk.strip())
                current_chunk = chunk
        if current_chunk.strip():
            combined_chunks.append(current_chunk.strip())
        return combined_chunks

    cleaned_text = clean_text(text)
    sentences = _split_text(cleaned_text)
    wrapped_sentences = [textwrap.fill(s, width=split_character_goal_length) for s in sentences]
    chunks = [chunk for s in wrapped_sentences for chunk in s.split('\n')]
    combined_chunks = recombine_chunks(chunks)
    return combined_chunks
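
# Illustrative usage (long_text is any string). Unlike split_general_purpose,
# this variant wraps long sentences with textwrap before recombining instead
# of seeking backwards from a hard limit; with the default lengths, each
# returned chunk stays within split_character_max_length:
#
#   for chunk in split_general_purpose_hm(long_text):
#       print(len(chunk), chunk)
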
def split_text(text: str, split_type: Optional[str] = None, split_type_quantity: int = 1, split_type_string: Optional[str] = None, split_type_value_type: Optional[str] = None) -> List[str]:
    if text == '':
        return [text]

    # The old syntax still works if you don't use this parameter, i.e.
    # split_type 'line' with split_type_value 4 splits into groups of 4 lines.
    if split_type_value_type == '':
        split_type_value_type = split_type

    """
    if split_type == 'phrase':
        # print(f"Loading spacy to split by phrase.")
        nlp = spacy.load('en_core_web_sm')
        chunks = split_by_phrase(text, nlp)
        # print(chunks)
        return chunks
    """

    if split_type == 'string' or split_type == 'regex':
        if split_type_string is None:
            logger.warning(
                f"Splitting by {split_type} requires a string to split by. Returning original text.")
            return [text]

    split_type_to_function = {
        'word': split_by_words,
        'line': split_by_lines,
        'sentence': split_by_sentence,
        'string': split_by_string,
        'char': split_by_char,
        # 'random': split_by_random,
        # 'rhyme': split_by_rhymes,
        # 'pos': split_by_part_of_speech,
        'regex': split_by_regex,
    }

    if split_type in split_type_to_function:
        # Split into groups of 1 by the desired type, then regroup below.
        # This is so terrible even I'm embarrassed; destroy all this code later,
        # but I guess it does something useful at the moment.
        segmented_text = split_type_to_function[split_type](text, split_type=split_type, split_type_quantity=1, split_type_string=split_type_string, split_type_value_type=split_type_value_type)

        final_segmented_text = []
        current_segment = ''
        split_type_quantity_found = 0

        if split_type_value_type is None:
            split_type_value_type = split_type

        # For each segment (a line, for example) count units of the value type
        # (words, for example) to decide when to close out the current group.
        for seg in segmented_text:
            current_segment += seg
            split_type_quantity_found = len(split_type_to_function[split_type_value_type](current_segment, split_type=split_type_value_type, split_type_quantity=1, split_type_string=split_type_string))
            if split_type_quantity_found >= int(split_type_quantity):
                final_segmented_text.append(current_segment)
                split_type_quantity_found = 0
                current_segment = ''

        # Don't drop a trailing partial group.
        if current_segment:
            final_segmented_text.append(current_segment)

        return final_segmented_text

    logger.warning(
        f"Splitting by {split_type} not a supported option. Returning original text.")
    return [text]
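
# Illustrative dispatch examples (segments keep their trailing separators):
#   >>> split_text("one two three four five", split_type='word', split_type_quantity=2)
#   ['one two ', 'three four ', 'five ']
#   >>> split_text("a\nb\nc", split_type='line', split_type_quantity=2)
#   ['a\nb\n', 'c\n']
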
def split_by_string(text: str, split_type: Optional[str] = None, split_type_quantity: int = 1, split_type_string: Optional[str] = None, split_type_value_type: Optional[str] = None) -> List[str]:
    if split_type_string is not None:
        # escape the separator so it is treated as a literal string, not a regex
        split_pattern = f"({re.escape(split_type_string)})"
        split_list = re.split(split_pattern, text)
        # re-attach each separator to the front of the piece that follows it
        result = [split_list[0]]
        for i in range(1, len(split_list), 2):
            result.append(split_list[i] + split_list[i + 1])
        return result
    else:
        return text.split()
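
# Illustrative: the separator is kept, prefixed to the following piece:
#   >>> split_by_string("a, b, c", split_type_string=", ")
#   ['a', ', b', ', c']
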
def split_by_regex(text: str, split_type: Optional[str] = None, split_type_quantity: int = 1, split_type_string: Optional[str] = None, split_type_value_type: Optional[str] = None) -> List[str]:
    if split_type_string is not None:
        # split *before* each match; the matched text begins the next chunk
        chunks = []
        start = 0
        for match in re.finditer(split_type_string, text):
            end = match.start()
            chunks.append(text[start:end].strip())
            start = end
        chunks.append(text[start:].strip())
        return chunks
    else:
        return text.split()
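
# Illustrative:
#   >>> split_by_regex("one. two. three.", split_type_string=r"\. ")
#   ['one', '. two', '. three.']
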
def split_by_char(text: str, split_type: Optional[str] = None, split_type_quantity: int = 1, split_type_string: Optional[str] = None, split_type_value_type: Optional[str] = None) -> List[str]:
    return list(text)

def split_by_words(text: str, split_type: Optional[str] = None, split_type_quantity: int = 1, split_type_string: Optional[str] = None, split_type_value_type: Optional[str] = None) -> List[str]:
    # keep a trailing space on each word so groups re-join cleanly
    return [word + ' ' for word in text.split()]
    # return [' '.join(words[i:i + split_type_quantity]) for i in range(0, len(words), split_type_quantity)]

def split_by_lines(text: str, split_type: Optional[str] = None, split_type_quantity: int = 1, split_type_string: Optional[str] = None, split_type_value_type: Optional[str] = None) -> List[str]:
    # keep the trailing newline on each line; drop whitespace-only lines
    return [line + '\n' for line in text.split('\n') if line.strip()]
    # return ['\n'.join(lines[i:i + split_type_quantity]) for i in range(0, len(lines), split_type_quantity)]

def split_by_sentence(text: str, split_type: Optional[str] = None, split_type_quantity: int = 1, split_type_string: Optional[str] = None, split_type_value_type: Optional[str] = None) -> List[str]:
    # imported lazily so the module loads without nltk installed
    import nltk
    text = text.replace("\n", " ").strip()
    sentences = nltk.sent_tokenize(text)
    return [sentence + ' ' for sentence in sentences]
    # return [' '.join(sentences[i:i + split_type_quantity]) for i in range(0, len(sentences), split_type_quantity)]
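
# nltk.sent_tokenize needs the 'punkt' tokenizer data; if it raises a
# LookupError, run nltk.download('punkt') once (or 'punkt_tab' on newer NLTK).
# Illustrative:
#   >>> split_by_sentence("Hello there. How are you?")
#   ['Hello there. ', 'How are you? ']
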
| """ | |
| def split_by_sentences(text: str, n: int, language="en") -> List[str]: | |
| seg = pysbd.Segmenter(language=language, clean=False) | |
| sentences = seg.segment(text) | |
| return [' '.join(sentences[i:i + n]) for i in range(0, len(sentences), n)] | |
| """ | |
def load_text(file_path: str) -> Union[str, None]:
    try:
        with open(file_path, "r", encoding="utf-8") as f:
            content = f.read()
        logger.info(f"Successfully loaded the file: {file_path}")
        return content
    except FileNotFoundError:
        logger.error(f"File not found: {file_path}")
    except PermissionError:
        logger.error(f"Permission denied to read the file: {file_path}")
    except Exception as e:
        logger.error(
            f"An unexpected error occurred while reading the file: {file_path}. Error: {e}")
    return None
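
# Illustrative (the path is hypothetical):
#   text = load_text("story.txt")
#   if text is not None:
#       chunks = split_general_purpose(text)
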
# Good for just exploring random voices
"""
def split_by_random(text: str, n: int) -> List[str]:
    words = text.split()
    chunks = []
    min_len = max(1, n - 2)
    max_len = n + 2
    while words:
        chunk_len = random.randint(min_len, max_len)
        chunk = ' '.join(words[:chunk_len])
        chunks.append(chunk)
        words = words[chunk_len:]
    return chunks
"""

# too many libraries, removing
"""
def split_by_phrase(text: str, nlp, min_duration=8, max_duration=18, words_per_second=2.3) -> list:
    if text is None:
        return ''
    doc = nlp(text)
    chunks = []
    min_words = int(min_duration * words_per_second)
    max_words = int(max_duration * words_per_second)
    current_chunk = ""
    current_word_count = 0
    for sent in doc.sents:
        word_count = len(sent.text.split())
        if current_word_count + word_count < min_words:
            current_chunk += " " + sent.text.strip()
            current_word_count += word_count
        elif current_word_count + word_count <= max_words:
            current_chunk += " " + sent.text.strip()
            chunks.append(current_chunk.strip())
            current_chunk = ""
            current_word_count = 0
        else:
            # Emergency cutoff
            words = sent.text.split()
            while words:
                chunk_len = max_words - current_word_count
                chunk = ' '.join(words[:chunk_len])
                current_chunk += " " + chunk
                chunks.append(current_chunk.strip())
                current_chunk = ""
                current_word_count = 0
                words = words[chunk_len:]
    if current_chunk:
        chunks.append(current_chunk.strip())
    return chunks
"""

"""
def split_by_rhymes(text: str, n: int) -> List[str]:
    words = text.split()
    chunks = []
    current_chunk = []
    rhyming_word_count = 0
    for word in words:
        current_chunk.append(word)
        if any(rhyme_word in words for rhyme_word in rhymes(word)):
            rhyming_word_count += 1
        if rhyming_word_count >= n:
            chunks.append(' '.join(current_chunk))
            current_chunk = []
            rhyming_word_count = 0
    if current_chunk:
        chunks.append(' '.join(current_chunk))
    return chunks
"""

# 'NN' for noun, 'VB' for verb, 'JJ' for adjective, 'RB' for adverb.
# NN-VB: noun followed by a verb
# JJR, JJS
# UH = interjection: Goodbye Goody Gosh Wow Jeepers Jee-sus Hubba Hey Kee-reist Oops amen huh howdy uh dammit whammo shucks heck anyways whodunnit honey golly man baby diddle hush sonuvabitch ...
"""
def split_by_part_of_speech(text: str, pos_pattern: str) -> List[str]:
    tokens = word_tokenize(text)
    tagged_tokens = pos_tag(tokens)
    pos_pattern = pos_pattern.split('-')
    original_pos_pattern = pos_pattern.copy()
    chunks = []
    current_chunk = []
    for word, pos in tagged_tokens:
        current_chunk.append(word)
        if pos in pos_pattern:
            pos_index = pos_pattern.index(pos)
            if pos_index == 0:
                pos_pattern.pop(0)
            else:
                current_chunk = current_chunk[:-1]
                pos_pattern = original_pos_pattern.copy()
        if not pos_pattern:
            chunks.append(' '.join(current_chunk))
            current_chunk = [word]
            pos_pattern = original_pos_pattern.copy()
    if current_chunk:
        chunks.append(' '.join(current_chunk))
    return chunks
"""