File size: 9,707 Bytes
2c8368f
bf292d9
2c8368f
9ad2a81
bf292d9
8690dbe
2c8368f
 
8690dbe
8d27c84
2c8368f
 
 
 
 
 
 
bf292d9
2c8368f
 
 
 
9ad2a81
 
 
 
 
 
 
2c8368f
bf292d9
9ad2a81
 
bf292d9
9ad2a81
 
bf292d9
 
2c8368f
 
 
7255aeb
 
 
 
 
 
2c8368f
 
 
9ad2a81
2c8368f
 
 
 
 
9ad2a81
 
2c8368f
 
 
 
9ad2a81
 
 
 
2c8368f
8d27c84
2c8368f
9ad2a81
2c8368f
 
 
9ad2a81
2c8368f
 
 
 
 
 
9ad2a81
2c8368f
bf292d9
9ad2a81
 
 
 
 
 
 
 
 
2c8368f
 
bf292d9
2c8368f
bf292d9
 
 
2c8368f
 
 
bf292d9
 
2c8368f
9ad2a81
 
bf292d9
 
2c8368f
9ad2a81
bf292d9
9ad2a81
2c8368f
 
bf292d9
9ad2a81
7255aeb
 
 
 
 
bf292d9
2c8368f
 
 
 
 
 
bf292d9
 
2c8368f
 
 
bf292d9
98324a6
bf292d9
2c8368f
 
9ad2a81
 
2c8368f
 
 
 
 
 
 
 
9ad2a81
2c8368f
 
9ad2a81
2c8368f
 
9ad2a81
 
 
bf292d9
9ad2a81
 
bf292d9
9ad2a81
bf292d9
 
2c8368f
 
 
 
 
8d27c84
2c8368f
 
 
 
 
bf292d9
2c8368f
 
 
 
 
 
 
bf292d9
2c8368f
 
9ad2a81
2c8368f
 
 
 
 
bf292d9
2c8368f
 
 
 
 
 
 
 
 
 
9ad2a81
2c8368f
 
 
 
 
bf292d9
2c8368f
 
9ad2a81
2c8368f
bf292d9
9ad2a81
2c8368f
bf292d9
 
2c8368f
c476a18
2c8368f
9ad2a81
bf292d9
2c8368f
 
 
 
8d27c84
 
 
 
 
2c8368f
 
8d27c84
2c8368f
bf292d9
8d27c84
bf292d9
9ad2a81
2c8368f
 
 
 
 
 
 
 
9ad2a81
2c8368f
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
# service.py
import asyncio
from dataclasses import dataclass
from typing import Any, Dict, Optional, Callable, Awaitable

from config import settings
from models import LLMServiceObj, ResultObj
from rabbit_repo import RabbitRepo
from runners.base import ILLMRunner
from message_helper import success as _ok, error as _err

@dataclass
class _Session:
    """One live chat session: its runner (None once removed) and full id.

    Field names are PascalCase on purpose — they mirror the .NET LLMService.
    """

    Runner: ILLMRunner | None
    FullSessionId: str


class LLMService:
    """
    Python/Gradio equivalent of your .NET LLMService.
    Keeps identical field names and queue semantics when talking to RabbitMQ.

    Sessions are keyed by "<RequestSessionId>_<LLMRunnerType>" (see
    StartProcess); each session holds at most one runner produced by the
    injected async factory.
    """
    def __init__(
        self,
        publisher: RabbitRepo,
        runner_factory: Callable[[Dict[str, Any]], Awaitable[ILLMRunner]],
    ):
        self._pub: RabbitRepo = publisher
        self._runner_factory = runner_factory  # async factory: dict -> ILLMRunner
        self._sessions: Dict[str, _Session] = {}
        self._ready = asyncio.Event()
        self._ready.set()  # call clear()/set() if you preload history
        self._service_id_lc = settings.SERVICE_ID.lower()

    async def init(self) -> None:
        """Hook to preload history/sessions; call self._ready.set() when finished."""
        pass

    # ---------------------------- helpers ----------------------------

    def _to_model(self, data: Any) -> LLMServiceObj:
        """Coerce an incoming payload into an LLMServiceObj.

        Accepts an already-built model unchanged.  For mapping payloads,
        works on a shallow copy (the caller's dict is never mutated) and
        replaces None in the nested FunctionCallData/UserInfo slots with
        empty dicts so model construction cannot fail on them.
        """
        if isinstance(data, LLMServiceObj):
            return data
        data = dict(data)  # defensive copy: don't mutate the caller's payload
        if data.get("FunctionCallData") is None:
            data["FunctionCallData"] = {}
        if data.get("UserInfo") is None:
            data["UserInfo"] = {}
        return LLMServiceObj(**data)

    async def _emit_result(
        self,
        obj: LLMServiceObj | Dict[str, Any],
        message: str,
        success: bool,
        queue: str,
        *,
        check_system: bool = False,
        include_llm_message: bool = True,
    ) -> None:
        """
        Build a ResultObj-style message on the wire, mirroring your .NET usage.
        check_system=True -> don't publish if obj.IsSystemLlm is True (matches your rule).

        NOTE: when ``obj`` is already an LLMServiceObj it is mutated in place
        (ResultMessage/ResultSuccess/LlmMessage) even when the system check
        suppresses publishing.
        """
        llm = obj if isinstance(obj, LLMServiceObj) else LLMServiceObj(**obj)

        llm.ResultMessage = message
        llm.ResultSuccess = success
        if include_llm_message:
            llm.LlmMessage = _ok(message) if success else _err(message)

        if check_system and llm.IsSystemLlm:
            return

        # You publish LLMServiceObj on "llmServiceMessage"/"llmSessionMessage" in .NET
        await self._pub.publish(queue, llm)

    def _session_for(self, session_id: str) -> Optional[_Session]:
        """Return the live _Session for ``session_id``, or None when unknown."""
        return self._sessions.get(session_id)

    # ---------------------------- API methods ----------------------------

    async def StartProcess(self, payload: Any) -> None:
        """Create (or reuse) the runner for this request and announce it.

        Builds the session id as "<RequestSessionId>_<LLMRunnerType>", waits
        up to 120s for initialization, then either reuses a healthy existing
        runner or replaces a missing/failed one via the factory.  Always ends
        by publishing the full object on "llmServiceStarted".
        """
        llm = self._to_model(payload)

        # Validate critical fields
        if not llm.RequestSessionId:
            await self._emit_result(llm, "Error: RequestSessionId is required.", False, "llmServiceMessage")
            return
        if not llm.LLMRunnerType:
            await self._emit_result(llm, "Error: LLMRunnerType is required.", False, "llmServiceMessage")
            return

        # Construct session id like C#: RequestSessionId + "_" + LLMRunnerType
        session_id = f"{llm.RequestSessionId}_{llm.LLMRunnerType}"
        llm.SessionId = session_id

        # Wait ready (max 120s) exactly like the C# logic
        try:
            await asyncio.wait_for(self._ready.wait(), timeout=120)
        except asyncio.TimeoutError:
            await self._emit_result(
                llm, "Timed out waiting for initialization.", False, "llmServiceMessage", check_system=True
            )
            return

        sess = self._session_for(session_id)
        runner = sess.Runner if sess else None
        create_new = (runner is None) or getattr(runner, "IsStateFailed", False)

        if create_new:
            # Remove previous runner if exists (best-effort: a failed runner
            # may not be able to clean up, and that must not block restart)
            if runner:
                try:
                    await runner.RemoveProcess(session_id)
                except Exception:
                    pass

            # Create runner from factory (pass a plain dict for decoupling)
            runner = await self._runner_factory({
                **llm.model_dump(by_alias=True),
                "_publisher": self._pub,
                "_settings": settings,
            })
            if not runner.IsEnabled:
                await self._emit_result(
                    llm,
                    f"{llm.LLMRunnerType} {settings.SERVICE_ID} not started as it is disabled.",
                    True,
                    "llmServiceMessage",
                )
                return

            await self._emit_result(
                llm, f"Starting {runner.Type} {settings.SERVICE_ID} Expert", True, "llmServiceMessage", check_system=True
            )

            await runner.StartProcess(llm.model_dump(by_alias=True))

            self._sessions[session_id] = _Session(Runner=runner, FullSessionId=session_id)

            # Friendly greeting for your renamed service
            if self._service_id_lc in {"monitor", "gradllm"}:
                await self._emit_result(
                    llm,
                    f"Hi i'm {runner.Type} your {settings.SERVICE_ID} Assistant. How can I help you.",
                    True,
                    "llmServiceMessage",
                    check_system=True,
                )

        # Notify "started" (full LLMServiceObj)
        await self._pub.publish("llmServiceStarted", llm)

    async def RemoveSession(self, payload: Any) -> None:
        """Tear down every runner whose session belongs to this request.

        Matches all stored sessions sharing the request's base id, removes
        each runner, and reports the combined outcome on the wire.
        """
        llm = self._to_model(payload)
        # Session ids are "<RequestSessionId>_<LLMRunnerType>" (StartProcess),
        # so strip only the trailing runner-type segment.  split("_")[0] would
        # truncate a RequestSessionId that itself contains underscores.
        # Assumes incoming SessionId always carries the runner-type suffix —
        # TODO confirm against publishers of this message.
        base = (llm.SessionId or "").rsplit("_", 1)[0]
        if not base:
            await self._emit_result(llm, "Error: SessionId is required to remove sessions.", False, "llmServiceMessage")
            return

        targets = [k for k in list(self._sessions.keys()) if k.startswith(base + "_")]
        msgs: list[str] = []
        ok = True

        for sid in targets:
            s = self._sessions.get(sid)
            if not s or not s.Runner:
                continue
            try:
                await s.Runner.RemoveProcess(sid)
                s.Runner = None
                self._sessions.pop(sid, None)  # ← free the entry
                msgs.append(sid)
            except Exception as e:
                # Entry is kept so a later RemoveSession can retry it.
                ok = False
                msgs.append(f"Error {sid}: {e}")

        if ok:
            await self._emit_result(
                llm,
                f"Success: Removed sessions for {' '.join(msgs) if msgs else '(none)'}",
                True,
                "llmSessionMessage",
                check_system=True,
            )
        else:
            await self._emit_result(llm, " ".join(msgs), False, "llmServiceMessage")

    async def StopRequest(self, payload: Any) -> None:
        """Ask the session's runner to halt its current output."""
        llm = self._to_model(payload)
        sid = llm.SessionId or ""
        s = self._session_for(sid)
        if not s or not s.Runner:
            await self._emit_result(llm, f"Error: Runner missing for session {sid}.", False, "llmServiceMessage")
            return

        await s.Runner.StopRequest(sid)
        await self._emit_result(
            llm,
            f"Success {s.Runner.Type} {settings.SERVICE_ID} Assistant output has been halted",
            True,
            "llmServiceMessage",
            check_system=True,
        )

    async def UserInput(self, payload: Any) -> None:
        """Forward user input to the session's runner, guarding runner state."""
        llm = self._to_model(payload)
        sid = llm.SessionId or ""
        s = self._session_for(sid)
        if not s or not s.Runner:
            await self._emit_result(llm, f"Error: SessionId {sid} has no running process.", False, "llmServiceMessage")
            return

        r: ILLMRunner = s.Runner
        if getattr(r, "IsStateStarting", False):
            await self._emit_result(llm, "Please wait, the assistant is starting...", False, "llmServiceMessage")
            return
        if getattr(r, "IsStateFailed", False):
            await self._emit_result(llm, "The Assistant is stopped. Try reloading.", False, "llmServiceMessage")
            return

        # Let runner push partials itself if desired; we still return a small ack
        await r.SendInputAndGetResponse(llm.model_dump(by_alias=True))

    async def QueryIndexResult(self, payload: Any) -> None:
        """Publish RAG query results to the chat as tool-style output.

        Best-effort by design: any failure is reported on the wire as a
        failed ResultObj rather than raised to the caller.
        """
        try:
            data = payload if isinstance(payload, dict) else {}
            outputs = data.get("QueryResults") or []
            rag_data = "\n".join([x.get("Output", "") for x in outputs if isinstance(x, dict)])

            # NEW: show RAG to the chat like tool output
            await self._pub.publish("llmServiceMessage", LLMServiceObj(LlmMessage=f"<Function Response:> {rag_data}\n\n"))
            await self._pub.publish("llmServiceMessage", LLMServiceObj(LlmMessage="</functioncall-complete>"))

            # keep your existing summary object (nice for observers/metrics)
            await self._pub.publish(
                "llmServiceMessage",
                ResultObj(Message=data.get("Message", ""), Success=bool(data.get("Success", False)), Data=rag_data),
            )
        except Exception as e:
            await self._pub.publish("llmServiceMessage", ResultObj(Message=str(e), Success=False))

    async def GetFunctionRegistry(self, filtered: bool = False) -> None:
        """
        Wire up to your real registry when ready.
        For now, mimic your success message payload.
        """
        catalog = "{}"  # replace with real JSON
        msg = f"Success : Got GetFunctionCatalogJson : {catalog}"
        await self._pub.publish(
            "llmServiceMessage",
            ResultObj(Message=msg, Success=True),
        )