johnbridges committed
Commit fa5f350 · 1 Parent(s): 15d27ef
Files changed (2):
  1. app.py +17 -19
  2. openai_server.py +57 -106
app.py CHANGED
@@ -1,8 +1,7 @@
 # app.py
-import asyncio
-import logging
-import gradio as gr
+import asyncio, logging, gradio as gr
 from config import settings
+from rabbit_base import RabbitBase
 from openai_server import ChatCompletionsServer, ImagesServer
 
 logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s")
@@ -11,42 +10,41 @@ log = logging.getLogger("app")
 try:
     import spaces
     @spaces.GPU(duration=60)
-    def gpu_entrypoint() -> str:
-        return "gpu: ready"
+    def gpu_entrypoint() -> str: return "gpu: ready"
 except Exception:
-    def gpu_entrypoint() -> str:
-        return "gpu: not available (CPU only)"
+    def gpu_entrypoint() -> str: return "gpu: not available (CPU only)"
 
-chat_srv = ChatCompletionsServer(settings.AMQP_URL, exchange_name="oa.chat.create", routing_key="default")
-img_srv = ImagesServer(settings.AMQP_URL, exchange_name="oa.images.generate", routing_key="default")
+# Ensure oa.* exchanges are DIRECT; fall back to your global default otherwise.
+resolver = lambda name: ("direct" if name.startswith("oa.") else settings.RABBIT_EXCHANGE_TYPE)
+base = RabbitBase(exchange_type_resolver=resolver)
+
+chat_srv = ChatCompletionsServer(base, exchange_name="oa.chat.create", routing_key="default")
+img_srv = ImagesServer(base, exchange_name="oa.images.generate", routing_key="default")
 
 async def _startup_init():
     try:
+        # This connect() uses your TLS settings (verify disabled if amqps)
+        await base.connect()
         await asyncio.gather(chat_srv.start(), img_srv.start())
         return "OpenAI MQ servers: ready"
     except Exception as e:
         log.exception("Startup init failed")
         return f"ERROR: {e}"
 
-async def ping() -> str:
-    return "ok"
+async def ping() -> str: return "ok"
 
 with gr.Blocks(title="OpenAI over RabbitMQ", theme=gr.themes.Soft()) as demo:
-    gr.Markdown("## OpenAI-compatible server over RabbitMQ")
+    gr.Markdown("## OpenAI-compatible server over RabbitMQ (using RabbitBase TLS settings)")
     with gr.Tabs():
         with gr.Tab("Service"):
-            with gr.Row():
-                btn = gr.Button("Ping")
-                out = gr.Textbox(label="Ping result")
+            btn = gr.Button("Ping"); out = gr.Textbox(label="Ping result")
             btn.click(ping, inputs=None, outputs=out)
-
             init_status = gr.Textbox(label="Startup status", interactive=False)
             demo.load(fn=_startup_init, inputs=None, outputs=init_status)
 
         with gr.Tab("@spaces.GPU Probe"):
-            with gr.Row():
-                gpu_btn = gr.Button("GPU Ready Probe", variant="primary")
-                gpu_out = gr.Textbox(label="GPU Probe Result", interactive=False)
+            gpu_btn = gr.Button("GPU Ready Probe", variant="primary")
+            gpu_out = gr.Textbox(label="GPU Probe Result", interactive=False)
             gpu_btn.click(gpu_entrypoint, inputs=None, outputs=gpu_out)
 
 if __name__ == "__main__":
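
Note: rabbit_base.py itself is not part of this commit. The sketch below is a guess at the RabbitBase interface both files now assume, reconstructed only from the call sites visible in this diff (the exchange_type_resolver constructor argument, connect, declare_queue_bind, publish_basic); the TLS handling and all implementation details are assumptions, not the module's actual code.

# rabbit_base.py — hypothetical sketch, NOT the real module from this repo
from __future__ import annotations
from typing import Callable, List, Optional
import aio_pika
from config import settings

class RabbitBase:
    def __init__(self, exchange_type_resolver: Optional[Callable[[str], str]] = None):
        # Resolver maps an exchange name to its type ("direct", "topic", ...).
        self._resolver = exchange_type_resolver or (lambda name: "direct")
        self._conn = None  # robust connection, set by connect()
        self._ch = None    # channel, set by connect()

    async def connect(self):
        # The real module reportedly applies TLS settings here (verify disabled for amqps).
        self._conn = await aio_pika.connect_robust(settings.AMQP_URL)
        self._ch = await self._conn.channel()

    async def declare_queue_bind(self, *, exchange: str, queue_name: str,
                                 routing_keys: List[str], ttl_ms: Optional[int] = None):
        # Declare the exchange with the resolved type, then a durable queue bound to it.
        ex = await self._ch.declare_exchange(
            exchange, aio_pika.ExchangeType(self._resolver(exchange)), durable=True)
        args = {"x-message-ttl": ttl_ms} if ttl_ms else None
        q = await self._ch.declare_queue(queue_name, durable=True, arguments=args)
        for rk in routing_keys:
            await q.bind(ex, routing_key=rk)
        return q

    async def publish_basic(self, *, routing_key: str, body: bytes, correlation_id: str):
        # Replies go through the default exchange straight to the reply_to queue.
        await self._ch.default_exchange.publish(
            aio_pika.Message(body=body, correlation_id=correlation_id,
                             content_type="application/json"),
            routing_key=routing_key,
        )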
openai_server.py CHANGED
@@ -1,24 +1,16 @@
 # openai_server.py
 from __future__ import annotations
-import asyncio, json, time, uuid, math, logging
-from typing import Any, AsyncIterable, Dict, List, Optional
-
+import json, time, uuid, logging
+from typing import Any, Dict, List, Optional, AsyncIterable
 import aio_pika
+from rabbit_base import RabbitBase
 
 logger = logging.getLogger(__name__)
 
-# --------------------------- Helpers ---------------------------
-
-def _now() -> int:
-    return int(time.time())
-
-def _chunk_text(s: str, sz: int = 120) -> List[str]:
-    if not s:
-        return []
-    return [s[i:i+sz] for i in range(0, len(s), sz)]
-
+def _now() -> int: return int(time.time())
+def _chunk_text(s: str, sz: int = 140) -> List[str]:
+    return [s[i:i+sz] for i in range(0, len(s or ""), sz)] if s else []
 def _last_user_text(messages: List[Dict[str, Any]]) -> str:
-    # Accept either string or multimodal parts [{type:"text"/"image_url"/...}]
     for m in reversed(messages or []):
         if (m or {}).get("role") == "user":
             c = m.get("content", "")
@@ -29,158 +21,117 @@ def _last_user_text(messages: List[Dict[str, Any]]) -> str:
             return " ".join([t for t in texts if t])
     return ""
 
-# --------------------------- Backends ---------------------------
-# You can replace DummyChatBackend with a real LLM (OpenAI/HF/local).
+# ------------ Backends ------------
 class ChatBackend:
     async def stream(self, request: Dict[str, Any]) -> AsyncIterable[Dict[str, Any]]:
         raise NotImplementedError
 
 class DummyChatBackend(ChatBackend):
     async def stream(self, request: Dict[str, Any]) -> AsyncIterable[Dict[str, Any]]:
-        """
-        Emits OpenAI-shaped *streaming* chunks.
-        - No tool_calls for now (keeps server simple)
-        - Mimics delta frames + final finish_reason
-        """
         rid = f"chatcmpl-{uuid.uuid4().hex[:12]}"
         model = request.get("model", "gpt-4o-mini")
         text = _last_user_text(request.get("messages", [])) or "(empty)"
         answer = f"Echo (RabbitMQ): {text}"
         now = _now()
-
-        # First delta sets the role per OpenAI stream shape
-        yield {
-            "id": rid, "object": "chat.completion.chunk", "created": now, "model": model,
-            "choices": [{"index": 0, "delta": {"role": "assistant"}, "finish_reason": None}]
-        }
-
-        # Stream content in small pieces
+        # role delta first
+        yield {"id": rid, "object": "chat.completion.chunk", "created": now, "model": model,
+               "choices": [{"index": 0, "delta": {"role": "assistant"}, "finish_reason": None}]}
+        # content deltas
         for piece in _chunk_text(answer, 140):
-            yield {
-                "id": rid, "object": "chat.completion.chunk", "created": now, "model": model,
-                "choices": [{"index": 0, "delta": {"content": piece}, "finish_reason": None}]
-            }
-
-        # Final delta with finish_reason
-        yield {
-            "id": rid, "object": "chat.completion.chunk", "created": now, "model": model,
-            "choices": [{"index": 0, "delta": {}, "finish_reason": "stop"}]
-        }
+            yield {"id": rid, "object": "chat.completion.chunk", "created": now, "model": model,
+                   "choices": [{"index": 0, "delta": {"content": piece}, "finish_reason": None}]}
+        # final
+        yield {"id": rid, "object": "chat.completion.chunk", "created": now, "model": model,
+               "choices": [{"index": 0, "delta": {}, "finish_reason": "stop"}]}
 
 class ImagesBackend:
     async def generate_b64(self, request: Dict[str, Any]) -> str:
-        """
-        Return base64 image string. This is a stub.
-        Replace with your image generator (e.g., SDXL, OpenAI gpt-image-1, etc.).
-        """
-        # For now, return a 1x1 transparent PNG
+        # 1x1 transparent PNG
        return "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAQAAAC1HAwCAAAAC0lEQVR4nGP4BwQACfsD/etCJH0AAAAASUVORK5CYII="
 
-# --------------------------- Servers ---------------------------
-
+# ------------ Servers using RabbitBase ------------
 class ChatCompletionsServer:
-    """
-    Consumes OpenAI Chat Completions requests from exchange `oa.chat.create`,
-    routing-key `default`, and streams OpenAI-shaped chunks back to `reply_to`.
-    """
-    def __init__(self, amqp_url: str, *, exchange_name: str = "oa.chat.create", routing_key: str = "default", backend: Optional[ChatBackend] = None):
-        self._amqp_url = amqp_url
-        self._exchange_name = exchange_name
+    def __init__(self, base: RabbitBase, *, exchange_name="oa.chat.create", routing_key="default",
+                 backend: Optional[ChatBackend] = None):
+        self._base = base
+        self._exchange = exchange_name
         self._routing_key = routing_key
         self._backend = backend or DummyChatBackend()
-        self._conn: Optional[aio_pika.RobustConnection] = None
-        self._ch: Optional[aio_pika.RobustChannel] = None
-        self._ex: Optional[aio_pika.Exchange] = None
         self._queue_name = f"{exchange_name}.{routing_key}"
 
     async def start(self):
-        self._conn = await aio_pika.connect_robust(self._amqp_url)
-        self._ch = await self._conn.channel()
-        self._ex = await self._ch.declare_exchange(self._exchange_name, aio_pika.ExchangeType.DIRECT, durable=True)
-        q = await self._ch.declare_queue(self._queue_name, durable=True)
-        await q.bind(self._ex, routing_key=self._routing_key)
+        # declare exchange + queue with *your* TLS/connection behavior
+        q = await self._base.declare_queue_bind(
+            exchange=self._exchange,
+            queue_name=self._queue_name,
+            routing_keys=[self._routing_key],
+            ttl_ms=None,
+        )
         await q.consume(self._on_message)
-        logger.info("ChatCompletionsServer listening on %s/%s → %s", self._exchange_name, self._routing_key, self._queue_name)
+        logger.info("ChatCompletionsServer listening on %s/%s → %s",
+                    self._exchange, self._routing_key, self._queue_name)
 
     async def _on_message(self, msg: aio_pika.IncomingMessage):
         async with msg.process(ignore_processed=True):
             try:
                 req = json.loads(msg.body.decode("utf-8", errors="replace"))
                 reply_to = msg.reply_to
                 corr_id = msg.correlation_id
                 if not reply_to or not corr_id:
                     logger.warning("Missing reply_to/correlation_id; dropping.")
                     return
 
                 async for chunk in self._backend.stream(req):
-                    await self._ch.default_exchange.publish(
-                        aio_pika.Message(
-                            body=json.dumps(chunk).encode("utf-8"),
-                            correlation_id=corr_id,
-                            content_type="application/json",
-                            delivery_mode=aio_pika.DeliveryMode.NOT_PERSISTENT,
-                        ),
+                    await self._base.publish_basic(
                         routing_key=reply_to,
+                        body=json.dumps(chunk).encode("utf-8"),
+                        correlation_id=corr_id,
                     )
-
-                # Optional end sentinel
-                await self._ch.default_exchange.publish(
-                    aio_pika.Message(
-                        body=b'{"object":"stream.end"}',
-                        correlation_id=corr_id,
-                        content_type="application/json",
-                    ),
+                # optional end sentinel
+                await self._base.publish_basic(
                     routing_key=reply_to,
+                    body=b'{"object":"stream.end"}',
+                    correlation_id=corr_id,
                 )
-
             except Exception:
                 logger.exception("ChatCompletionsServer: failed to process message")
 
 class ImagesServer:
-    """
-    Consumes OpenAI Images API requests from exchange `oa.images.generate`,
-    routing-key `default`, and replies once with {data:[{b64_json:...}], created:...}.
-    """
-    def __init__(self, amqp_url: str, *, exchange_name: str = "oa.images.generate", routing_key: str = "default", backend: Optional[ImagesBackend] = None):
-        self._amqp_url = amqp_url
-        self._exchange_name = exchange_name
+    def __init__(self, base: RabbitBase, *, exchange_name="oa.images.generate", routing_key="default",
+                 backend: Optional[ImagesBackend] = None):
+        self._base = base
+        self._exchange = exchange_name
         self._routing_key = routing_key
         self._backend = backend or ImagesBackend()
-        self._conn: Optional[aio_pika.RobustConnection] = None
-        self._ch: Optional[aio_pika.RobustChannel] = None
-        self._ex: Optional[aio_pika.Exchange] = None
         self._queue_name = f"{exchange_name}.{routing_key}"
 
     async def start(self):
-        self._conn = await aio_pika.connect_robust(self._amqp_url)
-        self._ch = await self._conn.channel()
-        self._ex = await self._ch.declare_exchange(self._exchange_name, aio_pika.ExchangeType.DIRECT, durable=True)
-        q = await self._ch.declare_queue(self._queue_name, durable=True)
-        await q.bind(self._ex, routing_key=self._routing_key)
+        q = await self._base.declare_queue_bind(
+            exchange=self._exchange,
+            queue_name=self._queue_name,
+            routing_keys=[self._routing_key],
+            ttl_ms=None,
+        )
         await q.consume(self._on_message)
-        logger.info("ImagesServer listening on %s/%s → %s", self._exchange_name, self._routing_key, self._queue_name)
+        logger.info("ImagesServer listening on %s/%s → %s",
+                    self._exchange, self._routing_key, self._queue_name)
 
     async def _on_message(self, msg: aio_pika.IncomingMessage):
         async with msg.process(ignore_processed=True):
             try:
                 req = json.loads(msg.body.decode("utf-8", errors="replace"))
                 reply_to = msg.reply_to
                 corr_id = msg.correlation_id
                 if not reply_to or not corr_id:
                     logger.warning("Missing reply_to/correlation_id; dropping.")
                     return
 
-                b64_img = await self._backend.generate_b64(req)
-                resp = {"created": _now(), "data": [{"b64_json": b64_img}]}
-
-                await self._ch.default_exchange.publish(
-                    aio_pika.Message(
-                        body=json.dumps(resp).encode("utf-8"),
-                        correlation_id=corr_id,
-                        content_type="application/json",
-                    ),
+                b64 = await self._backend.generate_b64(req)
+                resp = {"created": _now(), "data": [{"b64_json": b64}]}
+                await self._base.publish_basic(
                     routing_key=reply_to,
+                    body=json.dumps(resp).encode("utf-8"),
+                    correlation_id=corr_id,
                 )
-
             except Exception:
                 logger.exception("ImagesServer: failed to process message")
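
For completeness, a hypothetical client for the RPC protocol these servers implement, using only what the diff shows: publish a request to the oa.chat.create direct exchange with routing key "default", set reply_to and correlation_id, then consume OpenAI-shaped chunks from the reply queue until the {"object":"stream.end"} sentinel. File name, connection URL, and queue options are illustrative, not part of the commit.

# chat_client_sketch.py — hypothetical client; not part of this commit
import asyncio, json, uuid
import aio_pika

async def chat(amqp_url: str, prompt: str) -> str:
    conn = await aio_pika.connect_robust(amqp_url)
    async with conn:
        ch = await conn.channel()
        # Exclusive auto-named queue; its name travels as reply_to.
        reply_q = await ch.declare_queue(exclusive=True, auto_delete=True)
        # Same declaration as the server side (direct, durable), so redeclaring is safe.
        ex = await ch.declare_exchange("oa.chat.create", aio_pika.ExchangeType.DIRECT, durable=True)
        corr_id = uuid.uuid4().hex
        await ex.publish(
            aio_pika.Message(
                body=json.dumps({"model": "gpt-4o-mini",
                                 "messages": [{"role": "user", "content": prompt}]}).encode("utf-8"),
                reply_to=reply_q.name,
                correlation_id=corr_id,
                content_type="application/json",
            ),
            routing_key="default",
        )
        parts = []
        async with reply_q.iterator() as it:
            async for msg in it:
                async with msg.process():
                    if msg.correlation_id != corr_id:
                        continue  # not our reply
                    frame = json.loads(msg.body)
                    if frame.get("object") == "stream.end":
                        break  # sentinel published after the last chunk
                    for choice in frame.get("choices", []):
                        parts.append(choice.get("delta", {}).get("content", "") or "")
        return "".join(parts)

if __name__ == "__main__":
    print(asyncio.run(chat("amqp://guest:guest@localhost/", "hello")))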