| | import hashlib |
| | import re |
| | import time |
| | import uuid |
| | from datetime import timedelta |
| | from pathlib import Path |
| | from typing import Callable, Optional |
| |
|
| |
|
| | def get_messages_content(messages: list[dict]) -> str: |
| | return "\n".join( |
| | [ |
| | f"{message['role'].upper()}: {get_content_from_message(message)}" |
| | for message in messages |
| | ] |
| | ) |
| |
|
| |
|
| | def get_last_user_message_item(messages: list[dict]) -> Optional[dict]: |
| | for message in reversed(messages): |
| | if message["role"] == "user": |
| | return message |
| | return None |
| |
|
| |
|
| | def get_content_from_message(message: dict) -> Optional[str]: |
| | if isinstance(message["content"], list): |
| | for item in message["content"]: |
| | if item["type"] == "text": |
| | return item["text"] |
| | else: |
| | return message["content"] |
| | return None |
| |
|
| |
|
| | def get_last_user_message(messages: list[dict]) -> Optional[str]: |
| | message = get_last_user_message_item(messages) |
| | if message is None: |
| | return None |
| | return get_content_from_message(message) |
| |
|
| |
|
| | def get_last_assistant_message(messages: list[dict]) -> Optional[str]: |
| | for message in reversed(messages): |
| | if message["role"] == "assistant": |
| | return get_content_from_message(message) |
| | return None |
| |
|
| |
|
| | def get_system_message(messages: list[dict]) -> Optional[dict]: |
| | for message in messages: |
| | if message["role"] == "system": |
| | return message |
| | return None |
| |
|
| |
|
| | def remove_system_message(messages: list[dict]) -> list[dict]: |
| | return [message for message in messages if message["role"] != "system"] |
| |
|
| |
|
| | def pop_system_message(messages: list[dict]) -> tuple[Optional[dict], list[dict]]: |
| | return get_system_message(messages), remove_system_message(messages) |
| |
|
| |
|
| | def prepend_to_first_user_message_content( |
| | content: str, messages: list[dict] |
| | ) -> list[dict]: |
| | for message in messages: |
| | if message["role"] == "user": |
| | if isinstance(message["content"], list): |
| | for item in message["content"]: |
| | if item["type"] == "text": |
| | item["text"] = f"{content}\n{item['text']}" |
| | else: |
| | message["content"] = f"{content}\n{message['content']}" |
| | break |
| | return messages |
| |
|
| |
|
| | def add_or_update_system_message(content: str, messages: list[dict]): |
| | """ |
| | Adds a new system message at the beginning of the messages list |
| | or updates the existing system message at the beginning. |
| | |
| | :param msg: The message to be added or appended. |
| | :param messages: The list of message dictionaries. |
| | :return: The updated list of message dictionaries. |
| | """ |
| |
|
| | if messages and messages[0].get("role") == "system": |
| | messages[0]["content"] = f"{content}\n{messages[0]['content']}" |
| | else: |
| | |
| | messages.insert(0, {"role": "system", "content": content}) |
| |
|
| | return messages |
| |
|
| |
|
| | def openai_chat_message_template(model: str): |
| | return { |
| | "id": f"{model}-{str(uuid.uuid4())}", |
| | "created": int(time.time()), |
| | "model": model, |
| | "choices": [{"index": 0, "logprobs": None, "finish_reason": None}], |
| | } |
| |
|
| |
|
| | def openai_chat_chunk_message_template( |
| | model: str, message: Optional[str] = None |
| | ) -> dict: |
| | template = openai_chat_message_template(model) |
| | template["object"] = "chat.completion.chunk" |
| | if message: |
| | template["choices"][0]["delta"] = {"content": message} |
| | else: |
| | template["choices"][0]["finish_reason"] = "stop" |
| | return template |
| |
|
| |
|
| | def openai_chat_completion_message_template( |
| | model: str, message: Optional[str] = None |
| | ) -> dict: |
| | template = openai_chat_message_template(model) |
| | template["object"] = "chat.completion" |
| | if message is not None: |
| | template["choices"][0]["message"] = {"content": message, "role": "assistant"} |
| | template["choices"][0]["finish_reason"] = "stop" |
| | return template |
| |
|
| |
|
| | def get_gravatar_url(email): |
| | |
| | |
| | |
| | address = str(email).strip().lower() |
| |
|
| | |
| | hash_object = hashlib.sha256(address.encode()) |
| | hash_hex = hash_object.hexdigest() |
| |
|
| | |
| | return f"https://www.gravatar.com/avatar/{hash_hex}?d=mp" |
| |
|
| |
|
| | def calculate_sha256(file): |
| | sha256 = hashlib.sha256() |
| | |
| | for chunk in iter(lambda: file.read(8192), b""): |
| | sha256.update(chunk) |
| | return sha256.hexdigest() |
| |
|
| |
|
| | def calculate_sha256_string(string): |
| | |
| | sha256_hash = hashlib.sha256() |
| | |
| | sha256_hash.update(string.encode("utf-8")) |
| | |
| | hashed_string = sha256_hash.hexdigest() |
| | return hashed_string |
| |
|
| |
|
| | def validate_email_format(email: str) -> bool: |
| | if email.endswith("@localhost"): |
| | return True |
| |
|
| | return bool(re.match(r"[^@]+@[^@]+\.[^@]+", email)) |
| |
|
| |
|
| | def sanitize_filename(file_name): |
| | |
| | lower_case_file_name = file_name.lower() |
| |
|
| | |
| | sanitized_file_name = re.sub(r"[^\w\s]", "", lower_case_file_name) |
| |
|
| | |
| | final_file_name = re.sub(r"\s+", "-", sanitized_file_name) |
| |
|
| | return final_file_name |
| |
|
| |
|
| | def extract_folders_after_data_docs(path): |
| | |
| | path = Path(path) |
| |
|
| | |
| | parts = path.parts |
| |
|
| | |
| | try: |
| | index_data_docs = parts.index("data") + 1 |
| | index_docs = parts.index("docs", index_data_docs) + 1 |
| | except ValueError: |
| | return [] |
| |
|
| | |
| | tags = [] |
| |
|
| | folders = parts[index_docs:-1] |
| | for idx, _ in enumerate(folders): |
| | tags.append("/".join(folders[: idx + 1])) |
| |
|
| | return tags |
| |
|
| |
|
| | def parse_duration(duration: str) -> Optional[timedelta]: |
| | if duration == "-1" or duration == "0": |
| | return None |
| |
|
| | |
| | pattern = r"(-?\d+(\.\d+)?)(ms|s|m|h|d|w)" |
| | matches = re.findall(pattern, duration) |
| |
|
| | if not matches: |
| | raise ValueError("Invalid duration string") |
| |
|
| | total_duration = timedelta() |
| |
|
| | for number, _, unit in matches: |
| | number = float(number) |
| | if unit == "ms": |
| | total_duration += timedelta(milliseconds=number) |
| | elif unit == "s": |
| | total_duration += timedelta(seconds=number) |
| | elif unit == "m": |
| | total_duration += timedelta(minutes=number) |
| | elif unit == "h": |
| | total_duration += timedelta(hours=number) |
| | elif unit == "d": |
| | total_duration += timedelta(days=number) |
| | elif unit == "w": |
| | total_duration += timedelta(weeks=number) |
| |
|
| | return total_duration |
| |
|
| |
|
| | def parse_ollama_modelfile(model_text): |
| | parameters_meta = { |
| | "mirostat": int, |
| | "mirostat_eta": float, |
| | "mirostat_tau": float, |
| | "num_ctx": int, |
| | "repeat_last_n": int, |
| | "repeat_penalty": float, |
| | "temperature": float, |
| | "seed": int, |
| | "tfs_z": float, |
| | "num_predict": int, |
| | "top_k": int, |
| | "top_p": float, |
| | "num_keep": int, |
| | "typical_p": float, |
| | "presence_penalty": float, |
| | "frequency_penalty": float, |
| | "penalize_newline": bool, |
| | "numa": bool, |
| | "num_batch": int, |
| | "num_gpu": int, |
| | "main_gpu": int, |
| | "low_vram": bool, |
| | "f16_kv": bool, |
| | "vocab_only": bool, |
| | "use_mmap": bool, |
| | "use_mlock": bool, |
| | "num_thread": int, |
| | } |
| |
|
| | data = {"base_model_id": None, "params": {}} |
| |
|
| | |
| | base_model_match = re.search( |
| | r"^FROM\s+(\w+)", model_text, re.MULTILINE | re.IGNORECASE |
| | ) |
| | if base_model_match: |
| | data["base_model_id"] = base_model_match.group(1) |
| |
|
| | |
| | template_match = re.search( |
| | r'TEMPLATE\s+"""(.+?)"""', model_text, re.DOTALL | re.IGNORECASE |
| | ) |
| | if template_match: |
| | data["params"] = {"template": template_match.group(1).strip()} |
| |
|
| | |
| | stops = re.findall(r'PARAMETER stop "(.*?)"', model_text, re.IGNORECASE) |
| | if stops: |
| | data["params"]["stop"] = stops |
| |
|
| | |
| | for param, param_type in parameters_meta.items(): |
| | param_match = re.search(rf"PARAMETER {param} (.+)", model_text, re.IGNORECASE) |
| | if param_match: |
| | value = param_match.group(1) |
| |
|
| | try: |
| | if param_type is int: |
| | value = int(value) |
| | elif param_type is float: |
| | value = float(value) |
| | elif param_type is bool: |
| | value = value.lower() == "true" |
| | except Exception as e: |
| | print(e) |
| | continue |
| |
|
| | data["params"][param] = value |
| |
|
| | |
| | adapter_match = re.search(r"ADAPTER (.+)", model_text, re.IGNORECASE) |
| | if adapter_match: |
| | data["params"]["adapter"] = adapter_match.group(1) |
| |
|
| | |
| | system_desc_match = re.search( |
| | r'SYSTEM\s+"""(.+?)"""', model_text, re.DOTALL | re.IGNORECASE |
| | ) |
| | system_desc_match_single = re.search( |
| | r"SYSTEM\s+([^\n]+)", model_text, re.IGNORECASE |
| | ) |
| |
|
| | if system_desc_match: |
| | data["params"]["system"] = system_desc_match.group(1).strip() |
| | elif system_desc_match_single: |
| | data["params"]["system"] = system_desc_match_single.group(1).strip() |
| |
|
| | |
| | messages = [] |
| | message_matches = re.findall(r"MESSAGE (\w+) (.+)", model_text, re.IGNORECASE) |
| | for role, content in message_matches: |
| | messages.append({"role": role, "content": content}) |
| |
|
| | if messages: |
| | data["params"]["messages"] = messages |
| |
|
| | return data |
| |
|