johnbridges committed on
Commit 15d27ef · 1 Parent(s): 32b704b

trying a openai over rabbitmq test

Files changed (2)
  1. app.py +13 -83
  2. openai_server.py +186 -0
app.py CHANGED
@@ -1,103 +1,38 @@
- # app.py (Gradio-only, ZeroGPU-safe)
+ # app.py
  import asyncio
  import logging
- from typing import Any, Dict, List
  import gradio as gr
-
  from config import settings
- from rabbit_base import RabbitBase
- from listener import RabbitListenerBase
- from rabbit_repo import RabbitRepo
- from service import LLMService
- from runners.base import ILLMRunner
- from factory import default_runner_factory
+ from openai_server import ChatCompletionsServer, ImagesServer

- # ---------- logging ----------
- logging.basicConfig(
-     level=logging.INFO,
-     format="%(asctime)s [%(levelname)s] %(name)s: %(message)s"
- )
+ logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s")
  log = logging.getLogger("app")

- # ---------- @spaces.GPU entrypoint ----------
  try:
      import spaces
-
-     @spaces.GPU(duration=60)  # minimal GPU endpoint; no tensors allocated
+     @spaces.GPU(duration=60)
      def gpu_entrypoint() -> str:
          return "gpu: ready"
  except Exception:
      def gpu_entrypoint() -> str:
          return "gpu: not available (CPU only)"
-
- # ---------- Publisher & Service ----------
- publisher = RabbitRepo(external_source="https://space.external")
- service = LLMService(publisher, default_runner_factory)
-
- # ---------- Handlers (.NET FuncName -> service) ----------
- async def h_start(data): await service.StartProcess(data or {})
- async def h_user(data): await service.UserInput(data or {})
- async def h_remove(data): await service.RemoveSession(data or {})
- async def h_stop(data): await service.StopRequest(data or {})
- async def h_qir(data): await service.QueryIndexResult(data or {})
- async def h_getreg(_): await service.GetFunctionRegistry(False)
- async def h_getreg_f(_): await service.GetFunctionRegistry(True)
-
- handlers = {
-     "llmStartSession": h_start,
-     "llmUserInput": h_user,
-     "llmRemoveSession": h_remove,
-     "llmStopRequest": h_stop,
-     "queryIndexResult": h_qir,
-     "getFunctionRegistry": h_getreg,
-     "getFunctionRegistryFiltered": h_getreg_f,
- }
-
- # ---------- Listener wiring ----------
- base = RabbitBase()
- listener = RabbitListenerBase(
-     base,
-     instance_name=settings.RABBIT_INSTANCE_NAME,  # queue prefix like your .NET instance
-     handlers=handlers,
- )

- # Mirror your C# InitRabbitMQObjs()
- DECLS: List[Dict[str, Any]] = [
-     {"ExchangeName": f"llmStartSession{settings.SERVICE_ID}", "FuncName": "llmStartSession",
-      "MessageTimeout": 600_000, "RoutingKeys": [settings.RABBIT_ROUTING_KEY]},
-     {"ExchangeName": f"llmUserInput{settings.SERVICE_ID}", "FuncName": "llmUserInput",
-      "MessageTimeout": 600_000, "RoutingKeys": [settings.RABBIT_ROUTING_KEY]},
-     {"ExchangeName": f"llmRemoveSession{settings.SERVICE_ID}", "FuncName": "llmRemoveSession",
-      "MessageTimeout": 60_000, "RoutingKeys": [settings.RABBIT_ROUTING_KEY]},
-     {"ExchangeName": f"llmStopRequest{settings.SERVICE_ID}", "FuncName": "llmStopRequest",
-      "MessageTimeout": 60_000, "RoutingKeys": [settings.RABBIT_ROUTING_KEY]},
-     {"ExchangeName": f"queryIndexResult{settings.SERVICE_ID}", "FuncName": "queryIndexResult",
-      "MessageTimeout": 60_000, "RoutingKeys": [settings.RABBIT_ROUTING_KEY]},
-     {"ExchangeName": f"getFunctionRegistry{settings.SERVICE_ID}", "FuncName": "getFunctionRegistry",
-      "MessageTimeout": 60_000, "RoutingKeys": [settings.RABBIT_ROUTING_KEY]},
-     {"ExchangeName": f"getFunctionRegistryFiltered{settings.SERVICE_ID}", "FuncName": "getFunctionRegistryFiltered",
-      "MessageTimeout": 60_000, "RoutingKeys": [settings.RABBIT_ROUTING_KEY]},
- ]
+ chat_srv = ChatCompletionsServer(settings.AMQP_URL, exchange_name="oa.chat.create", routing_key="default")
+ img_srv = ImagesServer(settings.AMQP_URL, exchange_name="oa.images.generate", routing_key="default")

- # ---------- Gradio callbacks ----------
- async def ping() -> str:
-     return "ok"
-
- # Start RabbitMQ when the Gradio app loads. Return a short status string.
  async def _startup_init():
      try:
-         await publisher.connect()
-         await service.init()
-         await listener.start(DECLS)
-         return "listener: ready"
+         await asyncio.gather(chat_srv.start(), img_srv.start())
+         return "OpenAI MQ servers: ready"
      except Exception as e:
          log.exception("Startup init failed")
-         return f"listener: ERROR -> {e}"
+         return f"ERROR: {e}"

- # ---------- Build the actual page ----------
- with gr.Blocks(title="LLM Runner (ZeroGPU-ready)", theme=gr.themes.Soft()) as demo:
-     gr.Markdown("## LLM Runner — RabbitMQ listener (ZeroGPU-ready)")
+ async def ping() -> str:
+     return "ok"

+ with gr.Blocks(title="OpenAI over RabbitMQ", theme=gr.themes.Soft()) as demo:
+     gr.Markdown("## OpenAI-compatible server over RabbitMQ")
      with gr.Tabs():
          with gr.Tab("Service"):
              with gr.Row():
@@ -105,19 +40,14 @@ with gr.Blocks(title="LLM Runner (ZeroGPU-ready)", theme=gr.themes.Soft()) as de
                  out = gr.Textbox(label="Ping result")
              btn.click(ping, inputs=None, outputs=out)

-             # show init status when page loads
              init_status = gr.Textbox(label="Startup status", interactive=False)
              demo.load(fn=_startup_init, inputs=None, outputs=init_status)

          with gr.Tab("@spaces.GPU Probe"):
-             gr.Markdown("This button is a real `@spaces.GPU()` entrypoint so ZeroGPU keeps the Space alive.")
              with gr.Row():
                  gpu_btn = gr.Button("GPU Ready Probe", variant="primary")
                  gpu_out = gr.Textbox(label="GPU Probe Result", interactive=False)
-             # IMPORTANT: reference the decorated function DIRECTLY
              gpu_btn.click(gpu_entrypoint, inputs=None, outputs=gpu_out)

- # On HF Spaces, Gradio serves the app automatically if the variable is named `demo`.
  if __name__ == "__main__":
-     # Local testing only.
      demo.launch(server_name="0.0.0.0", server_port=7860, show_error=True, debug=True)
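
Note: the new app.py reads the broker URL from `settings.AMQP_URL`, but config.py is not touched in this commit, so that field has to exist already or be added separately. A minimal sketch of what the config module might expose (the field name's env-var fallback and default below are assumptions for illustration, not part of this commit):

# config.py (sketch, NOT part of this commit): assumes the broker URL comes from the environment
import os

class Settings:
    # app.py above only needs settings.AMQP_URL; the default shown is a local RabbitMQ URL
    AMQP_URL: str = os.getenv("AMQP_URL", "amqp://guest:guest@localhost:5672/")
    # existing fields such as RABBIT_INSTANCE_NAME, SERVICE_ID, RABBIT_ROUTING_KEY would stay as they are

settings = Settings()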
openai_server.py ADDED
@@ -0,0 +1,186 @@
+ # openai_server.py
+ from __future__ import annotations
+ import asyncio, json, time, uuid, math, logging
+ from typing import Any, AsyncIterable, Dict, List, Optional
+
+ import aio_pika
+
+ logger = logging.getLogger(__name__)
+
+ # --------------------------- Helpers ---------------------------
+
+ def _now() -> int:
+     return int(time.time())
+
+ def _chunk_text(s: str, sz: int = 120) -> List[str]:
+     if not s:
+         return []
+     return [s[i:i+sz] for i in range(0, len(s), sz)]
+
+ def _last_user_text(messages: List[Dict[str, Any]]) -> str:
+     # Accept either string or multimodal parts [{type:"text"/"image_url"/...}]
+     for m in reversed(messages or []):
+         if (m or {}).get("role") == "user":
+             c = m.get("content", "")
+             if isinstance(c, str):
+                 return c
+             if isinstance(c, list):
+                 texts = [p.get("text","") for p in c if p.get("type") == "text"]
+                 return " ".join([t for t in texts if t])
+     return ""
+
+ # --------------------------- Backends ---------------------------
+ # You can replace DummyChatBackend with a real LLM (OpenAI/HF/local).
+ class ChatBackend:
+     async def stream(self, request: Dict[str, Any]) -> AsyncIterable[Dict[str, Any]]:
+         raise NotImplementedError
+
+ class DummyChatBackend(ChatBackend):
+     async def stream(self, request: Dict[str, Any]) -> AsyncIterable[Dict[str, Any]]:
+         """
+         Emits OpenAI-shaped *streaming* chunks.
+         - No tool_calls for now (keeps server simple)
+         - Mimics delta frames + final finish_reason
+         """
+         rid = f"chatcmpl-{uuid.uuid4().hex[:12]}"
+         model = request.get("model", "gpt-4o-mini")
+         text = _last_user_text(request.get("messages", [])) or "(empty)"
+         answer = f"Echo (RabbitMQ): {text}"
+         now = _now()
+
+         # First delta sets the role per OpenAI stream shape
+         yield {
+             "id": rid, "object": "chat.completion.chunk", "created": now, "model": model,
+             "choices": [{"index": 0, "delta": {"role": "assistant"}, "finish_reason": None}]
+         }
+
+         # Stream content in small pieces
+         for piece in _chunk_text(answer, 140):
+             yield {
+                 "id": rid, "object": "chat.completion.chunk", "created": now, "model": model,
+                 "choices": [{"index": 0, "delta": {"content": piece}, "finish_reason": None}]
+             }
+
+         # Final delta with finish_reason
+         yield {
+             "id": rid, "object": "chat.completion.chunk", "created": now, "model": model,
+             "choices": [{"index": 0, "delta": {}, "finish_reason": "stop"}]
+         }
+
+ class ImagesBackend:
+     async def generate_b64(self, request: Dict[str, Any]) -> str:
+         """
+         Return base64 image string. This is a stub.
+         Replace with your image generator (e.g., SDXL, OpenAI gpt-image-1, etc.).
+         """
+         # For now, return a 1x1 transparent PNG
+         return "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAQAAAC1HAwCAAAAC0lEQVR4nGP4BwQACfsD/etCJH0AAAAASUVORK5CYII="
+
+ # --------------------------- Servers ---------------------------
+
+ class ChatCompletionsServer:
+     """
+     Consumes OpenAI Chat Completions requests from exchange `oa.chat.create`,
+     routing-key `default`, and streams OpenAI-shaped chunks back to `reply_to`.
+     """
+     def __init__(self, amqp_url: str, *, exchange_name: str = "oa.chat.create", routing_key: str = "default", backend: Optional[ChatBackend] = None):
+         self._amqp_url = amqp_url
+         self._exchange_name = exchange_name
+         self._routing_key = routing_key
+         self._backend = backend or DummyChatBackend()
+         self._conn: Optional[aio_pika.RobustConnection] = None
+         self._ch: Optional[aio_pika.RobustChannel] = None
+         self._ex: Optional[aio_pika.Exchange] = None
+         self._queue_name = f"{exchange_name}.{routing_key}"
+
+     async def start(self):
+         self._conn = await aio_pika.connect_robust(self._amqp_url)
+         self._ch = await self._conn.channel()
+         self._ex = await self._ch.declare_exchange(self._exchange_name, aio_pika.ExchangeType.DIRECT, durable=True)
+         q = await self._ch.declare_queue(self._queue_name, durable=True)
+         await q.bind(self._ex, routing_key=self._routing_key)
+         await q.consume(self._on_message)
+         logger.info("ChatCompletionsServer listening on %s/%s → %s", self._exchange_name, self._routing_key, self._queue_name)
+
+     async def _on_message(self, msg: aio_pika.IncomingMessage):
+         async with msg.process(ignore_processed=True):
+             try:
+                 req = json.loads(msg.body.decode("utf-8", errors="replace"))
+                 reply_to = msg.reply_to
+                 corr_id = msg.correlation_id
+                 if not reply_to or not corr_id:
+                     logger.warning("Missing reply_to/correlation_id; dropping.")
+                     return
+
+                 async for chunk in self._backend.stream(req):
+                     await self._ch.default_exchange.publish(
+                         aio_pika.Message(
+                             body=json.dumps(chunk).encode("utf-8"),
+                             correlation_id=corr_id,
+                             content_type="application/json",
+                             delivery_mode=aio_pika.DeliveryMode.NOT_PERSISTENT,
+                         ),
+                         routing_key=reply_to,
+                     )
+
+                 # Optional end sentinel
+                 await self._ch.default_exchange.publish(
+                     aio_pika.Message(
+                         body=b'{"object":"stream.end"}',
+                         correlation_id=corr_id,
+                         content_type="application/json",
+                     ),
+                     routing_key=reply_to,
+                 )
+
+             except Exception:
+                 logger.exception("ChatCompletionsServer: failed to process message")
+
+ class ImagesServer:
+     """
+     Consumes OpenAI Images API requests from exchange `oa.images.generate`,
+     routing-key `default`, and replies once with {data:[{b64_json:...}], created:...}.
+     """
+     def __init__(self, amqp_url: str, *, exchange_name: str = "oa.images.generate", routing_key: str = "default", backend: Optional[ImagesBackend] = None):
+         self._amqp_url = amqp_url
+         self._exchange_name = exchange_name
+         self._routing_key = routing_key
+         self._backend = backend or ImagesBackend()
+         self._conn: Optional[aio_pika.RobustConnection] = None
+         self._ch: Optional[aio_pika.RobustChannel] = None
+         self._ex: Optional[aio_pika.Exchange] = None
+         self._queue_name = f"{exchange_name}.{routing_key}"
+
+     async def start(self):
+         self._conn = await aio_pika.connect_robust(self._amqp_url)
+         self._ch = await self._conn.channel()
+         self._ex = await self._ch.declare_exchange(self._exchange_name, aio_pika.ExchangeType.DIRECT, durable=True)
+         q = await self._ch.declare_queue(self._queue_name, durable=True)
+         await q.bind(self._ex, routing_key=self._routing_key)
+         await q.consume(self._on_message)
+         logger.info("ImagesServer listening on %s/%s → %s", self._exchange_name, self._routing_key, self._queue_name)
+
+     async def _on_message(self, msg: aio_pika.IncomingMessage):
+         async with msg.process(ignore_processed=True):
+             try:
+                 req = json.loads(msg.body.decode("utf-8", errors="replace"))
+                 reply_to = msg.reply_to
+                 corr_id = msg.correlation_id
+                 if not reply_to or not corr_id:
+                     logger.warning("Missing reply_to/correlation_id; dropping.")
+                     return
+
+                 b64_img = await self._backend.generate_b64(req)
+                 resp = {"created": _now(), "data": [{"b64_json": b64_img}]}
+
+                 await self._ch.default_exchange.publish(
+                     aio_pika.Message(
+                         body=json.dumps(resp).encode("utf-8"),
+                         correlation_id=corr_id,
+                         content_type="application/json",
+                     ),
+                     routing_key=reply_to,
+                 )
+
+             except Exception:
+                 logger.exception("ImagesServer: failed to process message")
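
Both servers follow the same RabbitMQ request/reply pattern: a caller publishes the OpenAI-style request JSON to the direct exchange (`oa.chat.create` or `oa.images.generate`) with `reply_to` set to its own queue and a `correlation_id`, and the server publishes responses back to that queue via the default exchange; the chat server streams chunk messages and then a `{"object": "stream.end"}` sentinel. A minimal caller sketch with aio_pika follows (not part of this commit; the model name, default values, and unbounded consume loop are illustrative assumptions):

# chat_client.py (sketch, NOT part of this commit): exercises ChatCompletionsServer over RabbitMQ
import asyncio, json, uuid
import aio_pika

async def chat_once(amqp_url: str, prompt: str) -> str:
    conn = await aio_pika.connect_robust(amqp_url)
    async with conn:
        ch = await conn.channel()
        # Exclusive, auto-delete reply queue; its broker-assigned name goes into reply_to
        reply_q = await ch.declare_queue("", exclusive=True, auto_delete=True)
        ex = await ch.declare_exchange("oa.chat.create", aio_pika.ExchangeType.DIRECT, durable=True)

        corr_id = uuid.uuid4().hex
        request = {"model": "gpt-4o-mini", "messages": [{"role": "user", "content": prompt}], "stream": True}
        await ex.publish(
            aio_pika.Message(
                body=json.dumps(request).encode("utf-8"),
                correlation_id=corr_id,
                reply_to=reply_q.name,
                content_type="application/json",
            ),
            routing_key="default",
        )

        # Collect streamed deltas until the stream.end sentinel arrives
        parts = []
        async with reply_q.iterator() as messages:
            async for msg in messages:
                async with msg.process():
                    if msg.correlation_id != corr_id:
                        continue  # reply for a different call
                    payload = json.loads(msg.body)
                    if payload.get("object") == "stream.end":
                        break
                    for choice in payload.get("choices", []):
                        parts.append(choice.get("delta", {}).get("content") or "")
        return "".join(parts)

if __name__ == "__main__":
    print(asyncio.run(chat_once("amqp://guest:guest@localhost:5672/", "hello over rabbitmq")))

The per-call exclusive reply queue plus `correlation_id` is what lets one consumer process serve several callers concurrently, and the sentinel message matters because AMQP itself has no end-of-stream marker.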