|
|
|
|
|
from __future__ import annotations |
|
|
from typing import Any, Dict, Optional |
|
|
from .base import ILLMRunner |
|
|
from models import LLMServiceObj |
|
|
from function_tracker import FunctionCallTracker |
|
|
import logging |
|
|
|
|
|
class EchoRunner(ILLMRunner):
    """Minimal ILLMRunner that echoes the user's input back via the publisher.

    A smoke-test/stub runner: it performs no model work, ignores audio
    control tokens, and replies with the user's own text on the
    ``llmServiceMessage`` topic.
    """

    # NOTE(review): Type is "TurboLLM" even though this is an echo stub —
    # presumably so it can stand in for the real TurboLLM runner; confirm.
    Type = "TurboLLM"
    IsEnabled = True
    IsStateStarting = False
    IsStateFailed = False

    def __init__(self, publisher, settings) -> None:
        """Store collaborators; performs no I/O.

        Args:
            publisher: async pub/sub object exposing ``publish(topic, obj)``.
            settings: opaque configuration object (unused by the echo logic).
        """
        self._pub = publisher
        self._settings = settings
        self._tracker = FunctionCallTracker()
        self._log = logging.getLogger("EchoRunner")

    async def StartProcess(self, llmServiceObj: dict) -> None:
        """No-op start hook; the echo runner needs no per-session process."""
        # Lazy %-args avoid formatting the dict when DEBUG is disabled.
        self._log.debug("StartProcess called with: %s", llmServiceObj)

    async def RemoveProcess(self, sessionId: str) -> None:
        """No-op teardown hook; nothing is allocated per session."""
        self._log.debug("RemoveProcess called for session: %s", sessionId)

    async def StopRequest(self, sessionId: str) -> None:
        """No-op stop hook; there is no in-flight generation to cancel."""
        self._log.debug("StopRequest called for session: %s", sessionId)

    async def SendInputAndGetResponse(self, llmServiceObj: dict) -> None:
        """Echo the user's input back over the ``llmServiceMessage`` topic.

        Publishes three messages mirroring a chat transcript: the user
        line, the assistant echo, and an end-of-line terminator.

        Args:
            llmServiceObj: keyword payload for ``LLMServiceObj`` — assumed
                to carry at least ``UserInput``; TODO confirm full schema.
        """
        self._log.debug("SendInputAndGetResponse called with: %s", llmServiceObj)
        llm = LLMServiceObj(**llmServiceObj)
        # Audio control tokens carry no text worth echoing; drop them.
        # str.startswith accepts a tuple of prefixes — one call, not an
        # `or` chain.
        if llm.UserInput.startswith(("<|START_AUDIO|>", "<|STOP_AUDIO|>")):
            self._log.debug("Audio input detected, ignoring in echo.")
            return

        await self._pub.publish("llmServiceMessage", LLMServiceObj(LlmMessage=f"<User:> {llm.UserInput}\n\n"))
        await self._pub.publish("llmServiceMessage", LLMServiceObj(LlmMessage=f"<Assistant:> You said: {llm.UserInput}\n"))
        await self._pub.publish("llmServiceMessage", LLMServiceObj(LlmMessage="<end-of-line>"))
|
|
|