johnbridges committed on
Commit 9f3b48c · 1 Parent(s): fa5f350
Files changed (3)
  1. app.py +31 -11
  2. oa_server.py +111 -0
  3. openai_server.py +0 -137
app.py CHANGED
@@ -2,7 +2,9 @@
 import asyncio, logging, gradio as gr
 from config import settings
 from rabbit_base import RabbitBase
-from openai_server import ChatCompletionsServer, ImagesServer
+from listener import RabbitListenerBase
+from rabbit_repo import RabbitRepo
+from oa_server import OpenAIServers

 logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s")
 log = logging.getLogger("app")
@@ -14,27 +16,45 @@ try:
 except Exception:
     def gpu_entrypoint() -> str: return "gpu: not available (CPU only)"

-# Ensure oa.* exchanges are DIRECT; fall back to your global default otherwise.
-resolver = lambda name: ("direct" if name.startswith("oa.") else settings.RABBIT_EXCHANGE_TYPE)
+# publisher + servers
+publisher = RabbitRepo(external_source="openai.mq.server")
+# Force oa.* exchanges to DIRECT using the built-in resolver hook (no raw API added)
+resolver = (lambda name: "direct" if name.startswith("oa.") else settings.RABBIT_EXCHANGE_TYPE)
 base = RabbitBase(exchange_type_resolver=resolver)

-chat_srv = ChatCompletionsServer(base, exchange_name="oa.chat.create", routing_key="default")
-img_srv = ImagesServer(base, exchange_name="oa.images.generate", routing_key="default")
+servers = OpenAIServers(publisher)
+
+# Existing handlers can stay; add our two:
+handlers = {
+    "oaChatCreate": servers.handle_chat_create,
+    "oaImagesGenerate": servers.handle_images_generate,
+}
+
+# Declare listener queues using your proven pattern
+DECLS = [
+    # Chat Completions
+    {"ExchangeName": "oa.chat.create", "FuncName": "oaChatCreate",
+     "MessageTimeout": 600_000, "RoutingKeys": [settings.RABBIT_ROUTING_KEY]},
+    # Images (generations)
+    {"ExchangeName": "oa.images.generate", "FuncName": "oaImagesGenerate",
+     "MessageTimeout": 600_000, "RoutingKeys": [settings.RABBIT_ROUTING_KEY]},
+]
+
+listener = RabbitListenerBase(base, instance_name=settings.RABBIT_INSTANCE_NAME, handlers=handlers)

 async def _startup_init():
     try:
-        # This connect() uses your TLS settings (verify disabled if amqps)
-        await base.connect()
-        await asyncio.gather(chat_srv.start(), img_srv.start())
+        await base.connect()         # your TLS settings apply here
+        await listener.start(DECLS)  # same listener pattern as before
         return "OpenAI MQ servers: ready"
     except Exception as e:
         log.exception("Startup init failed")
         return f"ERROR: {e}"

-async def ping() -> str: return "ok"
+async def ping(): return "ok"

-with gr.Blocks(title="OpenAI over RabbitMQ", theme=gr.themes.Soft()) as demo:
-    gr.Markdown("## OpenAI-compatible server over RabbitMQ (using RabbitBase TLS settings)")
+with gr.Blocks(title="OpenAI over RabbitMQ (proven API)", theme=gr.themes.Soft()) as demo:
+    gr.Markdown("## OpenAI-compatible over RabbitMQ using existing Rabbit API (CloudEvent envelopes)")
     with gr.Tabs():
         with gr.Tab("Service"):
             btn = gr.Button("Ping"); out = gr.Textbox(label="Ping result")
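Below is a hypothetical client-side sketch of the request/reply contract wired up above; it is not part of this commit. It assumes a placeholder broker URL, plain-JSON message bodies (RabbitRepo actually wraps payloads in CloudEvent envelopes, whose exact shape this commit does not show), settings.RABBIT_ROUTING_KEY equal to "default", and exchange declaration flags that match whatever RabbitBase declares on the server side.

# Hypothetical client for the oa.chat.create -> oa.chat.reply round trip.
# Assumptions (not in this commit): broker URL, routing key "default",
# plain-JSON bodies (the real stack uses CloudEvent envelopes), and
# direct exchanges declared with the same flags as the server's.
import asyncio, json, uuid
import aio_pika

async def chat_once(prompt: str) -> str:
    conn = await aio_pika.connect_robust("amqp://guest:guest@localhost/")
    async with conn:
        ch = await conn.channel()

        # Private reply queue, bound to the reply exchange by a fresh reply_key.
        reply_key = uuid.uuid4().hex
        reply_ex = await ch.declare_exchange("oa.chat.reply", aio_pika.ExchangeType.DIRECT)
        q = await ch.declare_queue(exclusive=True, auto_delete=True)
        await q.bind(reply_ex, routing_key=reply_key)

        # OpenAI chat request plus the transport-level reply_key field.
        req_ex = await ch.declare_exchange("oa.chat.create", aio_pika.ExchangeType.DIRECT)
        request = {"model": "gpt-4o-mini",
                   "messages": [{"role": "user", "content": prompt}],
                   "reply_key": reply_key}
        await req_ex.publish(aio_pika.Message(json.dumps(request).encode()),
                             routing_key="default")

        # Collect streamed chunks until the stream.end sentinel.
        parts = []
        async with q.iterator() as it:
            async for msg in it:
                async with msg.process():
                    chunk = json.loads(msg.body)
                    if chunk.get("object") == "stream.end":
                        break
                    delta = chunk["choices"][0]["delta"]
                    parts.append(delta.get("content", ""))
        return "".join(parts)

print(asyncio.run(chat_once("hello")))

The reply_key doubles as the direct-exchange binding key, so each caller gets its own reply stream without the server creating per-request queues.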
oa_server.py ADDED
@@ -0,0 +1,111 @@
+# oa_server.py
+from __future__ import annotations
+import json, time, uuid, logging
+from typing import Any, Dict, List, AsyncIterable, Optional
+
+from rabbit_repo import RabbitRepo
+
+logger = logging.getLogger(__name__)
+
+# ------------------ helpers ------------------
+def _now() -> int: return int(time.time())
+def _chunk_text(s: str, sz: int = 140) -> List[str]:
+    return [s[i:i+sz] for i in range(0, len(s or ""), sz)] if s else []
+def _last_user_text(messages: List[Dict[str, Any]]) -> str:
+    for m in reversed(messages or []):
+        if (m or {}).get("role") == "user":
+            c = m.get("content", "")
+            if isinstance(c, str):
+                return c
+            if isinstance(c, list):
+                texts = [p.get("text","") for p in c if p.get("type") == "text"]
+                return " ".join([t for t in texts if t])
+    return ""
+
+# ------------------ backends (replace later) ------------------
+class ChatBackend:
+    async def stream(self, request: Dict[str, Any]) -> AsyncIterable[Dict[str, Any]]:
+        raise NotImplementedError
+
+class DummyChatBackend(ChatBackend):
+    async def stream(self, request: Dict[str, Any]) -> AsyncIterable[Dict[str, Any]]:
+        rid = f"chatcmpl-{uuid.uuid4().hex[:12]}"
+        model = request.get("model", "gpt-4o-mini")
+        text = _last_user_text(request.get("messages", [])) or "(empty)"
+        out = f"Echo (Rabbit): {text}"
+        now = _now()
+
+        # role delta
+        yield {"id": rid, "object":"chat.completion.chunk", "created": now, "model": model,
+               "choices":[{"index":0,"delta":{"role":"assistant"},"finish_reason":None}]}
+        # content deltas
+        for piece in _chunk_text(out, 140):
+            yield {"id": rid, "object":"chat.completion.chunk", "created": now, "model": model,
+                   "choices":[{"index":0,"delta":{"content":piece},"finish_reason":None}]}
+        # final delta
+        yield {"id": rid, "object":"chat.completion.chunk", "created": now, "model": model,
+               "choices":[{"index":0,"delta":{},"finish_reason":"stop"}]}
+
+class ImagesBackend:
+    async def generate_b64(self, request: Dict[str, Any]) -> str:
+        # 1x1 transparent PNG (stub)
+        return "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAQAAAC1HAwCAAAAC0lEQVR4nGP4BwQACfsD/etCJH0AAAAASUVORK5CYII="
+
+# ------------------ handler class ------------------
+class OpenAIServers:
+    """
+    Handlers you can register in RabbitListenerBase:
+      - 'oaChatCreate'     -> handle_chat_create
+      - 'oaImagesGenerate' -> handle_images_generate
+    Uses RabbitRepo.publish(...) to emit CloudEvent-wrapped OpenAI JSON.
+    """
+    def __init__(self, publisher: RabbitRepo,
+                 *, chat_backend: Optional[ChatBackend] = None,
+                 images_backend: Optional[ImagesBackend] = None):
+        self._pub = publisher
+        self._chat = chat_backend or DummyChatBackend()
+        self._img = images_backend or ImagesBackend()
+
+    # -------- Chat Completions --------
+    async def handle_chat_create(self, data: Dict[str, Any]) -> None:
+        """
+        data: OpenAI chat request + 'reply_key' (string)
+        Server publishes to exchange 'oa.chat.reply' with routing_key = reply_key.
+        """
+        if not isinstance(data, dict):
+            logger.warning("oaChatCreate: data is not a dict")
+            return
+
+        reply_key = data.get("reply_key")
+        if not reply_key:
+            logger.error("oaChatCreate: missing reply_key")
+            return
+
+        try:
+            async for chunk in self._chat.stream(data):
+                # CloudEvent-wrapped OpenAI chunk to oa.chat.reply
+                await self._pub.publish("oa.chat.reply", chunk, routing_key=reply_key)
+            # Optional sentinel
+            await self._pub.publish("oa.chat.reply", {"object": "stream.end"}, routing_key=reply_key)
+        except Exception:
+            logger.exception("oaChatCreate: streaming failed")
+
+    # -------- Images (generations) --------
+    async def handle_images_generate(self, data: Dict[str, Any]) -> None:
+        """
+        data: OpenAI images.generate request + 'reply_key' (string)
+        """
+        if not isinstance(data, dict):
+            logger.warning("oaImagesGenerate: data is not a dict")
+            return
+        reply_key = data.get("reply_key")
+        if not reply_key:
+            logger.error("oaImagesGenerate: missing reply_key")
+            return
+
+        try:
+            b64 = await self._img.generate_b64(data)
+            resp = {"created": _now(), "data":[{"b64_json": b64}]}
+            await self._pub.publish("oa.images.reply", resp, routing_key=reply_key)
+        except Exception:
+            logger.exception("oaImagesGenerate: generation failed")
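The ChatBackend / ImagesBackend classes above are stubs, explicitly marked "replace later". As one hedged sketch of how a real backend could slot in (not part of this commit), here is a hypothetical proxy to any OpenAI-compatible endpoint using the openai v1 Python SDK; the base_url and api_key values are placeholders.

# Hypothetical drop-in backend; assumes the `openai` v1 SDK is installed.
from typing import Any, AsyncIterable, Dict
from openai import AsyncOpenAI

from oa_server import ChatBackend, OpenAIServers

class ProxyChatBackend(ChatBackend):
    def __init__(self, base_url: str, api_key: str):
        self._client = AsyncOpenAI(base_url=base_url, api_key=api_key)

    async def stream(self, request: Dict[str, Any]) -> AsyncIterable[Dict[str, Any]]:
        # Drop transport-only fields before forwarding; force streaming.
        params = {k: v for k, v in request.items() if k not in ("reply_key", "stream")}
        stream = await self._client.chat.completions.create(stream=True, **params)
        async for chunk in stream:
            # Each chunk is already an OpenAI chat.completion.chunk; re-emit as a dict.
            yield chunk.model_dump()

# servers = OpenAIServers(publisher,
#                         chat_backend=ProxyChatBackend("http://localhost:8000/v1", "sk-..."))

Because handle_chat_create only iterates whatever stream() yields, swapping backends changes neither the handlers nor the wire contract.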
openai_server.py DELETED
@@ -1,137 +0,0 @@
-# openai_server.py
-from __future__ import annotations
-import json, time, uuid, logging
-from typing import Any, Dict, List, Optional, AsyncIterable
-import aio_pika
-from rabbit_base import RabbitBase
-
-logger = logging.getLogger(__name__)
-
-def _now() -> int: return int(time.time())
-def _chunk_text(s: str, sz: int = 140) -> List[str]:
-    return [s[i:i+sz] for i in range(0, len(s or ""), sz)] if s else []
-def _last_user_text(messages: List[Dict[str, Any]]) -> str:
-    for m in reversed(messages or []):
-        if (m or {}).get("role") == "user":
-            c = m.get("content", "")
-            if isinstance(c, str):
-                return c
-            if isinstance(c, list):
-                texts = [p.get("text","") for p in c if p.get("type") == "text"]
-                return " ".join([t for t in texts if t])
-    return ""
-
-# ------------ Backends ------------
-class ChatBackend:
-    async def stream(self, request: Dict[str, Any]) -> AsyncIterable[Dict[str, Any]]:
-        raise NotImplementedError
-
-class DummyChatBackend(ChatBackend):
-    async def stream(self, request: Dict[str, Any]) -> AsyncIterable[Dict[str, Any]]:
-        rid = f"chatcmpl-{uuid.uuid4().hex[:12]}"
-        model = request.get("model", "gpt-4o-mini")
-        text = _last_user_text(request.get("messages", [])) or "(empty)"
-        answer = f"Echo (RabbitMQ): {text}"
-        now = _now()
-        # role delta first
-        yield {"id": rid,"object":"chat.completion.chunk","created":now,"model":model,
-               "choices":[{"index":0,"delta":{"role":"assistant"},"finish_reason":None}]}
-        # content deltas
-        for piece in _chunk_text(answer, 140):
-            yield {"id": rid,"object":"chat.completion.chunk","created":now,"model":model,
-                   "choices":[{"index":0,"delta":{"content":piece},"finish_reason":None}]}
-        # final
-        yield {"id": rid,"object":"chat.completion.chunk","created":now,"model":model,
-               "choices":[{"index":0,"delta":{},"finish_reason":"stop"}]}
-
-class ImagesBackend:
-    async def generate_b64(self, request: Dict[str, Any]) -> str:
-        # 1x1 transparent PNG
-        return "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAQAAAC1HAwCAAAAC0lEQVR4nGP4BwQACfsD/etCJH0AAAAASUVORK5CYII="
-
-# ------------ Servers using RabbitBase ------------
-class ChatCompletionsServer:
-    def __init__(self, base: RabbitBase, *, exchange_name="oa.chat.create", routing_key="default",
-                 backend: Optional[ChatBackend] = None):
-        self._base = base
-        self._exchange = exchange_name
-        self._routing_key = routing_key
-        self._backend = backend or DummyChatBackend()
-        self._queue_name = f"{exchange_name}.{routing_key}"
-
-    async def start(self):
-        # declare exchange + queue with *your* TLS/connection behavior
-        q = await self._base.declare_queue_bind(
-            exchange=self._exchange,
-            queue_name=self._queue_name,
-            routing_keys=[self._routing_key],
-            ttl_ms=None,
-        )
-        await q.consume(self._on_message)
-        logger.info("ChatCompletionsServer listening on %s/%s → %s",
-                    self._exchange, self._routing_key, self._queue_name)
-
-    async def _on_message(self, msg: aio_pika.IncomingMessage):
-        async with msg.process(ignore_processed=True):
-            try:
-                req = json.loads(msg.body.decode("utf-8", errors="replace"))
-                reply_to = msg.reply_to
-                corr_id = msg.correlation_id
-                if not reply_to or not corr_id:
-                    logger.warning("Missing reply_to/correlation_id; dropping.")
-                    return
-
-                async for chunk in self._backend.stream(req):
-                    await self._base.publish_basic(
-                        routing_key=reply_to,
-                        body=json.dumps(chunk).encode("utf-8"),
-                        correlation_id=corr_id,
-                    )
-                # optional end sentinel
-                await self._base.publish_basic(
-                    routing_key=reply_to,
-                    body=b'{"object":"stream.end"}',
-                    correlation_id=corr_id,
-                )
-            except Exception:
-                logger.exception("ChatCompletionsServer: failed to process message")
-
-class ImagesServer:
-    def __init__(self, base: RabbitBase, *, exchange_name="oa.images.generate", routing_key="default",
-                 backend: Optional[ImagesBackend] = None):
-        self._base = base
-        self._exchange = exchange_name
-        self._routing_key = routing_key
-        self._backend = backend or ImagesBackend()
-        self._queue_name = f"{exchange_name}.{routing_key}"
-
-    async def start(self):
-        q = await self._base.declare_queue_bind(
-            exchange=self._exchange,
-            queue_name=self._queue_name,
-            routing_keys=[self._routing_key],
-            ttl_ms=None,
-        )
-        await q.consume(self._on_message)
-        logger.info("ImagesServer listening on %s/%s → %s",
-                    self._exchange, self._routing_key, self._queue_name)
-
-    async def _on_message(self, msg: aio_pika.IncomingMessage):
-        async with msg.process(ignore_processed=True):
-            try:
-                req = json.loads(msg.body.decode("utf-8", errors="replace"))
-                reply_to = msg.reply_to
-                corr_id = msg.correlation_id
-                if not reply_to or not corr_id:
-                    logger.warning("Missing reply_to/correlation_id; dropping.")
-                    return
-
-                b64 = await self._backend.generate_b64(req)
-                resp = {"created": _now(), "data":[{"b64_json": b64}]}
-                await self._base.publish_basic(
-                    routing_key=reply_to,
-                    body=json.dumps(resp).encode("utf-8"),
-                    correlation_id=corr_id,
-                )
-            except Exception:
-                logger.exception("ImagesServer: failed to process message")
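For contrast with the deleted servers above, which carried the reply address in the AMQP reply_to/correlation_id properties: the replacement moves it into a reply_key field inside the payload and answers on named oa.*.reply exchanges. Under the same assumptions as the chat sketch earlier (placeholder broker URL, plain-JSON bodies rather than the real CloudEvent envelopes, routing key "default"), a hypothetical images round trip under the new contract looks like this.

# Hypothetical client for the new oa.images.generate -> oa.images.reply contract.
import asyncio, base64, json, uuid
import aio_pika

async def generate_image(prompt: str) -> bytes:
    conn = await aio_pika.connect_robust("amqp://guest:guest@localhost/")
    async with conn:
        ch = await conn.channel()

        # Reply queue bound by a fresh reply_key, as in the chat sketch.
        reply_key = uuid.uuid4().hex
        reply_ex = await ch.declare_exchange("oa.images.reply", aio_pika.ExchangeType.DIRECT)
        q = await ch.declare_queue(exclusive=True, auto_delete=True)
        await q.bind(reply_ex, routing_key=reply_key)

        # OpenAI images.generate request plus the reply_key field.
        req_ex = await ch.declare_exchange("oa.images.generate", aio_pika.ExchangeType.DIRECT)
        req = {"prompt": prompt, "n": 1, "size": "1024x1024", "reply_key": reply_key}
        await req_ex.publish(aio_pika.Message(json.dumps(req).encode()),
                             routing_key="default")

        # Single response: {"created": ..., "data": [{"b64_json": ...}]}
        async with q.iterator() as it:
            async for msg in it:
                async with msg.process():
                    resp = json.loads(msg.body)
                    return base64.b64decode(resp["data"][0]["b64_json"])

png_bytes = asyncio.run(generate_image("a red square"))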