File size: 3,853 Bytes
bf292d9
 
 
 
b2c2f23
 
 
 
 
bf292d9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
b2c2f23
bf292d9
 
 
b2c2f23
bf292d9
 
 
 
 
b2c2f23
bf292d9
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
import asyncio
import gradio as gr
from fastapi import FastAPI

from config import settings
from listener import RabbitListenerBase
from rabbit_repo import RabbitRepo
from service import LLMService
from runners.base import ILLMRunner

# --- Runner factory (stub) ---
class EchoRunner(ILLMRunner):
    Type = "EchoRunner"
    async def StartProcess(self, llmServiceObj: dict): pass
    async def RemoveProcess(self, sessionId: str): pass
    async def StopRequest(self, sessionId: str): pass
    async def SendInputAndGetResponse(self, llmServiceObj: dict):
        # Emits a message back (you can choose queue names per your topology)
        pass

async def runner_factory(llmServiceObj: dict) -> ILLMRunner:
    # Use llmServiceObj["LLMRunnerType"] to instantiate different runners
    return EchoRunner()

# --- Publisher and Service ---
publisher = RabbitRepo(external_source="https://space.external")  # put your ExternalUrl if you have one
service = LLMService(publisher, runner_factory)

# --- Handlers mapping .NET FuncName -> service method ---
async def h_start(data): await service.StartProcess(data or {})
async def h_user(data):  await service.UserInput(data or {})
async def h_remove(data): await service.RemoveSession(data or {})
async def h_stop(data): await service.StopRequest(data or {})
async def h_qir(data): await service.QueryIndexResult(data or {})
async def h_getreg(data): await service.GetFunctionRegistry(False)
async def h_getreg_f(data): await service.GetFunctionRegistry(True)

handlers = {
    "llmStartSession": h_start,
    "llmUserInput": h_user,
    "llmRemoveSession": h_remove,
    "llmStopRequest": h_stop,
    "queryIndexResult": h_qir,
    "getFunctionRegistry": h_getreg,
    "getFunctionRegistryFiltered": h_getreg_f,
}

listener = RabbitListenerBase(service_id=settings.SERVICE_ID, handlers=handlers)

# Declarations mirror your C# InitRabbitMQObjs()
DECLS = [
    {"ExchangeName":"llmStartSession"+settings.SERVICE_ID, "FuncName":"llmStartSession", "MessageTimeout":600000, "RoutingKeys":[settings.RABBIT_ROUTING_KEY]},
    {"ExchangeName":"llmUserInput"+settings.SERVICE_ID, "FuncName":"llmUserInput", "MessageTimeout":600000, "RoutingKeys":[settings.RABBIT_ROUTING_KEY]},
    {"ExchangeName":"llmRemoveSession"+settings.SERVICE_ID, "FuncName":"llmRemoveSession", "MessageTimeout":60000, "RoutingKeys":[settings.RABBIT_ROUTING_KEY]},
    {"ExchangeName":"llmStopRequest"+settings.SERVICE_ID, "FuncName":"llmStopRequest", "MessageTimeout":60000, "RoutingKeys":[settings.RABBIT_ROUTING_KEY]},
    {"ExchangeName":"queryIndexResult"+settings.SERVICE_ID, "FuncName":"queryIndexResult", "MessageTimeout":60000, "RoutingKeys":[settings.RABBIT_ROUTING_KEY]},
    {"ExchangeName":"getFunctionRegistry"+settings.SERVICE_ID, "FuncName":"getFunctionRegistry", "MessageTimeout":60000, "RoutingKeys":[settings.RABBIT_ROUTING_KEY]},
    {"ExchangeName":"getFunctionRegistryFiltered"+settings.SERVICE_ID, "FuncName":"getFunctionRegistryFiltered", "MessageTimeout":60000, "RoutingKeys":[settings.RABBIT_ROUTING_KEY]},
]

# --- Gradio UI (for smoke test) ---
async def ping():
    return "ok"

with gr.Blocks() as demo:
    gr.Markdown("### LLM Runner (Python) listening on RabbitMQ")
    btn = gr.Button("Ping")
    out = gr.Textbox()
    btn.click(ping, inputs=None, outputs=out)

# --- FastAPI mount + lifecycle ---
app = FastAPI()
app = gr.mount_gradio_app(app, demo, path="/")

@get("/health")
async def health():
    return {"status":"ok"}

@on_event("startup")
async def on_start():
    await publisher.connect()
    await service.init()
    await listener.start(DECLS)

@on_event("shutdown")
async def on_stop():
    # aio-pika RobustConnection closes on GC; optionally add explicit closes if you add references
    pass

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=7860)