import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, Dict, Optional

from config import settings
from models import LLMServiceObj, ResultObj
from rabbit_repo import RabbitRepo
from runners.base import ILLMRunner
from message_helper import success as _ok, error as _err


@dataclass
class _Session:
    """Tracks the runner bound to one composite session id."""

    Runner: Optional[ILLMRunner]
    FullSessionId: str


class LLMService:
    """
    Python/Gradio equivalent of the .NET LLMService.

    Keeps identical field names and queue semantics when talking to RabbitMQ,
    so messages stay wire-compatible with the existing .NET consumers.
    """

    def __init__(
        self,
        publisher: RabbitRepo,
        runner_factory: Callable[[Dict[str, Any]], Awaitable[ILLMRunner]],
    ):
        self._pub: RabbitRepo = publisher
        self._runner_factory = runner_factory
        self._sessions: Dict[str, _Session] = {}
        # Ready gate: set by default so StartProcess is never blocked unless
        # init() deliberately clears it while preloading state.
        self._ready = asyncio.Event()
        self._ready.set()
        self._service_id_lc = settings.SERVICE_ID.lower()

    async def init(self) -> None:
        """
        Hook to preload history/sessions. Clear self._ready before any slow
        work and call self._ready.set() when finished so waiting StartProcess
        calls can proceed.
        """
        pass
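
    # A minimal override sketch (assumption: load_session_ids() is a
    # hypothetical loader, not part of this codebase):
    #
    #     class PersistedLLMService(LLMService):
    #         async def init(self) -> None:
    #             self._ready.clear()  # block StartProcess while loading
    #             for sid in await load_session_ids():
    #                 self._sessions[sid] = _Session(Runner=None, FullSessionId=sid)
    #             self._ready.set()    # release any waiters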

    def _to_model(self, data: Any) -> LLMServiceObj:
        # Normalise explicit nulls from the wire so model validation
        # receives empty dicts instead of None.
        if data.get("FunctionCallData") is None:
            data["FunctionCallData"] = {}
        if data.get("UserInfo") is None:
            data["UserInfo"] = {}
        return LLMServiceObj(**data)

    async def _emit_result(
        self,
        obj: LLMServiceObj | Dict[str, Any],
        message: str,
        success: bool,
        queue: str,
        *,
        check_system: bool = False,
        include_llm_message: bool = True,
    ) -> None:
        """
        Build a ResultObj-style message on the wire, mirroring your .NET usage.

        check_system=True -> don't publish if obj.IsSystemLlm is True
        (matches your rule).
        """
        llm = obj if isinstance(obj, LLMServiceObj) else LLMServiceObj(**obj)

        llm.ResultMessage = message
        llm.ResultSuccess = success
        if include_llm_message:
            llm.LlmMessage = _ok(message) if success else _err(message)

        if check_system and llm.IsSystemLlm:
            return

        await self._pub.publish(queue, llm)
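
    # For reference, a typical call:
    #
    #     await self._emit_result(llm, "Done", True, "llmServiceMessage")
    #
    # sets llm.ResultMessage = "Done", llm.ResultSuccess = True,
    # llm.LlmMessage = _ok("Done"), then publishes llm to "llmServiceMessage".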

    def _session_for(self, session_id: str) -> Optional[_Session]:
        return self._sessions.get(session_id)

    async def StartProcess(self, payload: Any) -> None:
        llm = self._to_model(payload)

        if not llm.RequestSessionId:
            await self._emit_result(llm, "Error: RequestSessionId is required.", False, "llmServiceMessage")
            return
        if not llm.LLMRunnerType:
            await self._emit_result(llm, "Error: LLMRunnerType is required.", False, "llmServiceMessage")
            return

        # Composite session id: one runner per (request session, runner type).
        session_id = f"{llm.RequestSessionId}_{llm.LLMRunnerType}"
        llm.SessionId = session_id

        try:
            await asyncio.wait_for(self._ready.wait(), timeout=120)
        except asyncio.TimeoutError:
            await self._emit_result(
                llm, "Timed out waiting for initialization.", False, "llmServiceMessage", check_system=True
            )
            return

        sess = self._session_for(session_id)
        runner = sess.Runner if sess else None
        create_new = (runner is None) or getattr(runner, "IsStateFailed", False)

        if create_new:
            # Tear down a failed runner before replacing it.
            if runner:
                try:
                    await runner.RemoveProcess(session_id)
                except Exception:
                    pass

            runner = await self._runner_factory({
                **llm.model_dump(by_alias=True),
                "_publisher": self._pub,
                "_settings": settings,
            })
            if not runner.IsEnabled:
                await self._emit_result(
                    llm,
                    f"{llm.LLMRunnerType} {settings.SERVICE_ID} not started as it is disabled.",
                    True,
                    "llmServiceMessage",
                )
                return

            await self._emit_result(
                llm, f"Starting {runner.Type} {settings.SERVICE_ID} Expert", True, "llmServiceMessage", check_system=True
            )

            await runner.StartProcess(llm.model_dump(by_alias=True))

            self._sessions[session_id] = _Session(Runner=runner, FullSessionId=session_id)

        if self._service_id_lc in {"monitor", "gradllm"}:
            await self._emit_result(
                llm,
                f"Hi, I'm {runner.Type}, your {settings.SERVICE_ID} Assistant. How can I help you?",
                True,
                "llmServiceMessage",
                check_system=True,
            )

        await self._pub.publish("llmServiceStarted", llm)
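
    # Note: the dict handed to runner_factory is the aliased LLMServiceObj
    # payload plus two private keys, "_publisher" (the RabbitRepo) and
    # "_settings" (the config module), so a factory can be as simple as
    # (MyRunner is a hypothetical ILLMRunner implementation):
    #
    #     async def make_runner(ctx: Dict[str, Any]) -> ILLMRunner:
    #         return MyRunner(publisher=ctx["_publisher"], settings=ctx["_settings"])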

    async def RemoveSession(self, payload: Any) -> None:
        llm = self._to_model(payload)
        # Everything before the first "_" is the base request session id
        # (this assumes the base id itself contains no underscores); remove
        # every runner-type session derived from it.
        base = (llm.SessionId or "").split("_")[0]
        if not base:
            await self._emit_result(llm, "Error: SessionId is required to remove sessions.", False, "llmServiceMessage")
            return

        targets = [k for k in list(self._sessions.keys()) if k.startswith(base + "_")]
        msgs: list[str] = []
        ok = True

        for sid in targets:
            s = self._sessions.get(sid)
            if not s or not s.Runner:
                continue
            try:
                await s.Runner.RemoveProcess(sid)
                s.Runner = None
                self._sessions.pop(sid, None)
                msgs.append(sid)
            except Exception as e:
                ok = False
                msgs.append(f"Error {sid}: {e}")

        if ok:
            await self._emit_result(
                llm,
                f"Success: Removed sessions for {' '.join(msgs) if msgs else '(none)'}",
                True,
                "llmSessionMessage",
                check_system=True,
            )
        else:
            await self._emit_result(llm, " ".join(msgs), False, "llmServiceMessage")

    async def StopRequest(self, payload: Any) -> None:
        llm = self._to_model(payload)
        sid = llm.SessionId or ""
        s = self._session_for(sid)
        if not s or not s.Runner:
            await self._emit_result(llm, f"Error: Runner missing for session {sid}.", False, "llmServiceMessage")
            return

        await s.Runner.StopRequest(sid)
        await self._emit_result(
            llm,
            f"Success: {s.Runner.Type} {settings.SERVICE_ID} Assistant output has been halted.",
            True,
            "llmServiceMessage",
            check_system=True,
        )

    async def UserInput(self, payload: Any) -> None:
        llm = self._to_model(payload)
        sid = llm.SessionId or ""
        s = self._session_for(sid)
        if not s or not s.Runner:
            await self._emit_result(llm, f"Error: SessionId {sid} has no running process.", False, "llmServiceMessage")
            return

        r: ILLMRunner = s.Runner
        # Reject input while the runner is still starting or after it failed.
        if getattr(r, "IsStateStarting", False):
            await self._emit_result(llm, "Please wait, the assistant is starting...", False, "llmServiceMessage")
            return
        if getattr(r, "IsStateFailed", False):
            await self._emit_result(llm, "The assistant is stopped. Try reloading.", False, "llmServiceMessage")
            return

        await r.SendInputAndGetResponse(llm.model_dump(by_alias=True))

    async def QueryIndexResult(self, payload: Any) -> None:
        try:
            data = payload if isinstance(payload, dict) else {}
            outputs = data.get("QueryResults") or []
            rag_data = "\n".join([x.get("Output", "") for x in outputs if isinstance(x, dict)])

            # Stream the RAG output back as a function-call response, then
            # signal completion to the consumer.
            await self._pub.publish("llmServiceMessage", LLMServiceObj(LlmMessage=f"<Function Response:> {rag_data}\n\n"))
            await self._pub.publish("llmServiceMessage", LLMServiceObj(LlmMessage="</functioncall-complete>"))

            await self._pub.publish(
                "llmServiceMessage",
                ResultObj(Message=data.get("Message", ""), Success=bool(data.get("Success", False)), Data=rag_data),
            )
        except Exception as e:
            await self._pub.publish("llmServiceMessage", ResultObj(Message=str(e), Success=False))
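
    # Expected payload shape, inferred from the parsing above (field values
    # are illustrative only):
    #
    #     {
    #         "QueryResults": [{"Output": "chunk 1"}, {"Output": "chunk 2"}],
    #         "Message": "query ok",
    #         "Success": True,
    #     }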

    async def GetFunctionRegistry(self, filtered: bool = False) -> None:
        """
        Wire up to your real registry when ready.
        For now, mimic your success message payload; `filtered` is accepted
        but not yet used.
        """
        catalog = "{}"
        msg = f"Success : Got GetFunctionCatalogJson : {catalog}"
        await self._pub.publish(
            "llmServiceMessage",
            ResultObj(Message=msg, Success=True),
        )
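

# Minimal wiring sketch (assumptions: RabbitRepo construction is
# project-specific, and EchoRunner / make_runner are hypothetical):
#
#     async def make_runner(ctx: Dict[str, Any]) -> ILLMRunner:
#         return EchoRunner(publisher=ctx["_publisher"], settings=ctx["_settings"])
#
#     async def main() -> None:
#         service = LLMService(publisher=RabbitRepo(...), runner_factory=make_runner)
#         await service.init()
#         await service.StartProcess({"RequestSessionId": "abc123", "LLMRunnerType": "TestLLM"})
#
#     asyncio.run(main())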