# oa_server.py
from __future__ import annotations
import time, logging
from typing import Any, Dict, List, Optional
from backends_base import ChatBackend, ImagesBackend

from rabbit_repo import RabbitRepo

logger = logging.getLogger(__name__)

# ------------------ helpers ------------------
def _now() -> int: return int(time.time())
def _chunk_text(s: str, sz: int = 140) -> List[str]:
    return [s[i:i+sz] for i in range(0, len(s or ""), sz)] if s else []
def _last_user_text(messages: List[Dict[str, Any]]) -> str:
    for m in reversed(messages or []):
        if (m or {}).get("role") == "user":
            c = m.get("content", "")
            if isinstance(c, str):
                return c
            if isinstance(c, list):
                texts = [p.get("text","") for p in c if p.get("type") == "text"]
                return " ".join([t for t in texts if t])
    return ""


# ------------------ handler class ------------------
class OpenAIServers:
    """
    Handlers you can register in RabbitListenerBase:
      - 'oaChatCreate'       -> handle_chat_create
      - 'oaImagesGenerate'   -> handle_images_generate
    Uses RabbitRepo.publish(...) to emit CloudEvent-wrapped OpenAI JSON.
    """

    def __init__(self, publisher: RabbitRepo,
                 *, chat_backend: Optional[ChatBackend] = None,
                 images_backend: Optional[ImagesBackend] = None):
        self._pub = publisher
        self._chat = chat_backend
        self._img  = images_backend

    # -------- Chat Completions --------
    async def handle_chat_create(self, data: Dict[str, Any]) -> None:
        """
        data: OpenAI chat request + 'reply_key' (string)
        Server publishes to exchange 'oa.chat.reply' with routing_key = reply_key.
        """
        if not isinstance(data, dict):
            logger.warning("oaChatCreate: data is not a dict")
            return

        reply_key = data.get("reply_key")
        if not reply_key:
            logger.error("oaChatCreate: missing reply_key")
            return
        if self._chat is None:
            logger.error("oaChatCreate: no chat backend configured")
            return

        try:
            async for chunk in self._chat.stream(data):
                try:
                    await self._pub.publish("oa.chat.reply", chunk, routing_key=reply_key)
                except Exception:
                    logger.exception("oaChatCreate: publish failed")
                    break  # stop streaming on publish failure

            # Optional sentinel
            try:
                await self._pub.publish("oa.chat.reply", {"object": "stream.end"}, routing_key=reply_key)
            except Exception:
                logger.exception("oaChatCreate: publish sentinel failed")
        except Exception:
            logger.exception("oaChatCreate: streaming failed")

    # -------- Images (generations) --------
    async def handle_images_generate(self, data: Dict[str, Any]) -> None:
        """
        data: OpenAI images.generate request + 'reply_key' (string)
        """
        if not isinstance(data, dict):
            logger.warning("oaImagesGenerate: data is not a dict")
            return
        reply_key = data.get("reply_key")
        if not reply_key:
            logger.error("oaImagesGenerate: missing reply_key")
            return
        if self._img is None:
            logger.error("oaImagesGenerate: no images backend configured")
            return

        try:
            b64 = await self._img.generate_b64(data)
            resp = {"created": _now(), "data":[{"b64_json": b64}]}
            try:
                await self._pub.publish("oa.images.reply", resp, routing_key=reply_key)
            except Exception:
                logger.exception("oaImagesGenerate: publish failed")
        except Exception:
            logger.exception("oaImagesGenerate: generation failed")


# --- at the bottom of oa_server.py ---
# Fallback aliases expected by vllm_backend.py. ChatBackend / ImagesBackend are
# normally provided by the backends_base import above; these blocks only take
# effect if that import is changed and the backend classes use other names.
try:
    ChatBackend  # type: ignore[name-defined]
except NameError:
    try:
        # If your actual names differ, map them here:
        ChatBackend = ChatCompletionsBackend  # noqa: F821
    except NameError:
        pass

try:
    ImagesBackend  # type: ignore[name-defined]
except NameError:
    try:
        ImagesBackend = ImageGenerationsBackend  # noqa: F821
    except NameError:
        pass
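

# Wiring sketch (hypothetical, commented out). The registration API of
# RabbitListenerBase and the constructors of RabbitRepo and the concrete
# backends are not defined in this file, so every name below other than
# OpenAIServers and the handler keys from the class docstring is an assumption.
#
#   from rabbit_repo import RabbitRepo
#   from rabbit_listener import RabbitListenerBase    # assumed module path
#   from vllm_backend import VllmChatBackend          # assumed backend class
#
#   repo = RabbitRepo(...)                            # connection args omitted
#   servers = OpenAIServers(repo, chat_backend=VllmChatBackend(...))
#   listener = RabbitListenerBase(...)
#   listener.register("oaChatCreate", servers.handle_chat_create)         # assumed API
#   listener.register("oaImagesGenerate", servers.handle_images_generate)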