# service.py
import asyncio
import logging
from dataclasses import dataclass
from typing import Any, Dict, Optional, Callable, Awaitable

from config import settings
from models import LLMServiceObj, ResultObj
from rabbit_repo import RabbitRepo
from runners.base import ILLMRunner
from message_helper import success as _ok, error as _err

logger = logging.getLogger(__name__)


@dataclass
class _Session:
    Runner: Optional[ILLMRunner]
    FullSessionId: str


class LLMService:
"""
Python/Gradio equivalent of your .NET LLMService.
Keeps identical field names and queue semantics when talking to RabbitMQ.
"""
def __init__(
self,
publisher: RabbitRepo,
runner_factory: Callable[[Dict[str, Any]], Awaitable[ILLMRunner]],
):
self._pub: RabbitRepo = publisher
self._runner_factory = runner_factory # async factory: dict -> ILLMRunner
self._sessions: Dict[str, _Session] = {}
self._ready = asyncio.Event()
self._ready.set() # call clear()/set() if you preload history
self._service_id_lc = settings.SERVICE_ID.lower()
async def init(self) -> None:
"""Hook to preload history/sessions; call self._ready.set() when finished."""
pass
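
    # Sketch of the runner_factory contract (hedged: "EchoRunner" is a
    # hypothetical ILLMRunner implementation, not part of this repo):
    #
    #     async def runner_factory(ctx: Dict[str, Any]) -> ILLMRunner:
    #         # ctx is the serialized LLMServiceObj plus the "_publisher" and
    #         # "_settings" keys injected by StartProcess below.
    #         return EchoRunner(publisher=ctx["_publisher"], settings=ctx["_settings"])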

    # ---------------------------- helpers ----------------------------
    def _to_model(self, data: Any) -> LLMServiceObj:
        # Defensive: ensure required nested objects are dicts, not None
        if data.get("FunctionCallData") is None:
            data["FunctionCallData"] = {}
        if data.get("UserInfo") is None:
            data["UserInfo"] = {}
        return LLMServiceObj(**data)

    async def _emit_result(
        self,
        obj: LLMServiceObj | Dict[str, Any],
        message: str,
        success: bool,
        queue: str,
        *,
        check_system: bool = False,
        include_llm_message: bool = True,
    ) -> None:
        """
        Build a ResultObj-style message on the wire, mirroring your .NET usage.
        check_system=True -> don't publish if obj.IsSystemLlm is True (matches your rule).
        """
        llm = obj if isinstance(obj, LLMServiceObj) else LLMServiceObj(**obj)
        llm.ResultMessage = message
        llm.ResultSuccess = success
        if include_llm_message:
            llm.LlmMessage = _ok(message) if success else _err(message)
        if check_system and llm.IsSystemLlm:
            return
        # You publish LLMServiceObj on "llmServiceMessage"/"llmSessionMessage" in .NET
        await self._pub.publish(queue, llm)

    def _session_for(self, session_id: str) -> Optional[_Session]:
        return self._sessions.get(session_id)

    # ---------------------------- API methods ----------------------------
    async def StartProcess(self, payload: Any) -> None:
        llm = self._to_model(payload)
        # Validate critical fields
        if not llm.RequestSessionId:
            await self._emit_result(llm, "Error: RequestSessionId is required.", False, "llmServiceMessage")
            return
        if not llm.LLMRunnerType:
            await self._emit_result(llm, "Error: LLMRunnerType is required.", False, "llmServiceMessage")
            return
        # Construct session id like C#: RequestSessionId + "_" + LLMRunnerType
        session_id = f"{llm.RequestSessionId}_{llm.LLMRunnerType}"
        llm.SessionId = session_id
        # Wait until ready (max 120s), exactly like the C# logic
        try:
            await asyncio.wait_for(self._ready.wait(), timeout=120)
        except asyncio.TimeoutError:
            await self._emit_result(
                llm, "Timed out waiting for initialization.", False, "llmServiceMessage", check_system=True
            )
            return
        sess = self._session_for(session_id)
        runner = sess.Runner if sess else None
        create_new = (runner is None) or getattr(runner, "IsStateFailed", False)
        if create_new:
            # Remove the previous runner if one exists
            if runner:
                try:
                    await runner.RemoveProcess(session_id)
                except Exception:
                    pass
            # Create runner from factory (pass a plain dict for decoupling)
            runner = await self._runner_factory({
                **llm.model_dump(by_alias=True),
                "_publisher": self._pub,
                "_settings": settings,
            })
        if not runner.IsEnabled:
            await self._emit_result(
                llm,
                f"{llm.LLMRunnerType} {settings.SERVICE_ID} not started as it is disabled.",
                True,
                "llmServiceMessage",
            )
            return
        await self._emit_result(
            llm, f"Starting {runner.Type} {settings.SERVICE_ID} Expert", True, "llmServiceMessage", check_system=True
        )
        await runner.StartProcess(llm.model_dump(by_alias=True))
        self._sessions[session_id] = _Session(Runner=runner, FullSessionId=session_id)
        # Friendly greeting for your renamed service
        if self._service_id_lc in {"monitor", "gradllm"}:
            await self._emit_result(
                llm,
                f"Hi, I'm {runner.Type}, your {settings.SERVICE_ID} Assistant. How can I help you?",
                True,
                "llmServiceMessage",
                check_system=True,
            )
        # Notify "started" (full LLMServiceObj)
        await self._pub.publish("llmServiceStarted", llm)

    async def RemoveSession(self, payload: Any) -> None:
        llm = self._to_model(payload)
        base = (llm.SessionId or "").split("_")[0]
        if not base:
            await self._emit_result(llm, "Error: SessionId is required to remove sessions.", False, "llmServiceMessage")
            return
        targets = [k for k in list(self._sessions.keys()) if k.startswith(base + "_")]
        msgs: list[str] = []
        ok = True
        for sid in targets:
            s = self._sessions.get(sid)
            if not s or not s.Runner:
                continue
            try:
                await s.Runner.RemoveProcess(sid)
                s.Runner = None
                self._sessions.pop(sid, None)  # ← free the entry
                msgs.append(sid)
            except Exception as e:
                ok = False
                msgs.append(f"Error {sid}: {e}")
        if ok:
            await self._emit_result(
                llm,
                f"Success: Removed sessions for {' '.join(msgs) if msgs else '(none)'}",
                True,
                "llmSessionMessage",
                check_system=True,
            )
        else:
            await self._emit_result(llm, " ".join(msgs), False, "llmServiceMessage")

    async def StopRequest(self, payload: Any) -> None:
        llm = self._to_model(payload)
        sid = llm.SessionId or ""
        s = self._session_for(sid)
        if not s or not s.Runner:
            await self._emit_result(llm, f"Error: Runner missing for session {sid}.", False, "llmServiceMessage")
            return
        await s.Runner.StopRequest(sid)
        await self._emit_result(
            llm,
            f"Success: {s.Runner.Type} {settings.SERVICE_ID} Assistant output has been halted.",
            True,
            "llmServiceMessage",
            check_system=True,
        )

    async def UserInput(self, payload: Any) -> None:
        llm = self._to_model(payload)
        sid = llm.SessionId or ""
        s = self._session_for(sid)
        if not s or not s.Runner:
            await self._emit_result(llm, f"Error: SessionId {sid} has no running process.", False, "llmServiceMessage")
            return
        r: ILLMRunner = s.Runner
        if getattr(r, "IsStateStarting", False):
            await self._emit_result(llm, "Please wait, the assistant is starting...", False, "llmServiceMessage")
            return
        if getattr(r, "IsStateFailed", False):
            await self._emit_result(llm, "The Assistant is stopped. Try reloading.", False, "llmServiceMessage")
            return
        # Let runner push partials itself if desired; we still return a small ack
        await r.SendInputAndGetResponse(llm.model_dump(by_alias=True))

    async def QueryIndexResult(self, payload: Any) -> None:
        try:
            data = payload if isinstance(payload, dict) else {}
            outputs = data.get("QueryResults") or []
            rag_data = "\n".join([x.get("Output", "") for x in outputs if isinstance(x, dict)])
            # NEW: show RAG to the chat like tool output
            await self._pub.publish("llmServiceMessage", LLMServiceObj(LlmMessage=f"<Function Response:> {rag_data}\n\n"))
            await self._pub.publish("llmServiceMessage", LLMServiceObj(LlmMessage="</functioncall-complete>"))
            # keep your existing summary object (nice for observers/metrics)
            await self._pub.publish(
                "llmServiceMessage",
                ResultObj(Message=data.get("Message", ""), Success=bool(data.get("Success", False)), Data=rag_data),
            )
        except Exception as e:
            await self._pub.publish("llmServiceMessage", ResultObj(Message=str(e), Success=False))

    async def GetFunctionRegistry(self, filtered: bool = False) -> None:
        """
        Wire up to your real registry when ready.
        For now, mimic your success message payload.
        """
        catalog = "{}"  # replace with real JSON
        msg = f"Success : Got GetFunctionCatalogJson : {catalog}"
        await self._pub.publish(
            "llmServiceMessage",
            ResultObj(Message=msg, Success=True),
        )
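
# ------------------------------------------------------------------
# Minimal wiring sketch (hedged). Everything below is illustrative:
# _StubPublisher stands in for RabbitRepo, _StubRunner for a concrete
# ILLMRunner, and the payload field names are inferred from the accesses
# in this file (the authoritative schema is LLMServiceObj in models.py,
# which is assumed to default any fields not supplied here).
# ------------------------------------------------------------------
if __name__ == "__main__":

    class _StubPublisher:
        async def publish(self, queue: str, obj: Any) -> None:
            # Print instead of publishing to RabbitMQ.
            print(f"[{queue}] {obj}")

    class _StubRunner:
        # Attributes and methods mirror what LLMService calls on a runner.
        IsEnabled = True
        IsStateFailed = False
        IsStateStarting = False
        Type = "StubRunner"

        async def StartProcess(self, ctx: Dict[str, Any]) -> None:
            print("runner started for", ctx.get("SessionId"))

        async def SendInputAndGetResponse(self, ctx: Dict[str, Any]) -> None:
            print("runner got input for", ctx.get("SessionId"))

        async def StopRequest(self, session_id: str) -> None:
            print("runner stop requested for", session_id)

        async def RemoveProcess(self, session_id: str) -> None:
            print("runner removed for", session_id)

    async def _stub_factory(ctx: Dict[str, Any]) -> Any:
        # Duck-typed stand-in for an ILLMRunner built by the real factory.
        return _StubRunner()

    async def _demo() -> None:
        service = LLMService(publisher=_StubPublisher(), runner_factory=_stub_factory)
        await service.init()
        payload = {
            "RequestSessionId": "demo-1",
            "LLMRunnerType": "StubRunner",
            "FunctionCallData": {},
            "UserInfo": {},
        }
        await service.StartProcess(dict(payload))
        await service.UserInput({**payload, "SessionId": "demo-1_StubRunner"})
        await service.RemoveSession({**payload, "SessionId": "demo-1_StubRunner"})

    asyncio.run(_demo())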