File size: 1,993 Bytes
bf292d9 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 |
import json
import logging
from typing import Awaitable, Callable, Dict, List, Optional

import aio_pika

from .config import settings
from .rabbit_base import RabbitBase
# Type of a single message handler: a coroutine function that receives the
# decoded "data" member of a message and returns nothing. Handlers are looked
# up by FuncName in the dict passed to RabbitListenerBase.
# The return type is a real typing.Awaitable rather than a string forward
# reference, so type checkers can resolve it.
Handler = Callable[[dict], Awaitable[None]]
class RabbitListenerBase(RabbitBase):
    """Consume RabbitMQ queues and dispatch message payloads to named handlers.

    Every declaration passed to :meth:`start` gets its own queue. For each
    incoming message the JSON body is decoded and its ``data`` member is
    passed to the handler registered under the declaration's ``FuncName``.
    """

    # Kept on the class so this block needs no module-level logger name.
    _listener_log = logging.getLogger(__name__)

    def __init__(self, service_id: str, handlers: Dict[str, Handler]):
        """Store the service id and the FuncName -> handler mapping.

        Args:
            service_id: Identifier of the owning service. It is only stored
                here; nothing in this class reads it back.
            handlers: Maps a declaration's ``FuncName`` to the coroutine
                function awaited with each message's ``data``.
        """
        super().__init__()
        self._service_id = service_id
        self._handlers = handlers
        # Queues on which a consumer has been started by start().
        self._consumers: List[aio_pika.abc.AbstractRobustQueue] = []

    def _qname(self, exchange: str, routing_keys: List[str]) -> str:
        """Return the queue name for an exchange and its routing keys.

        The name is ``<RABBIT_INSTANCE_NAME>-<exchange>`` followed by
        ``-<key>`` for every non-empty routing key. Keys are sorted so the
        name does not depend on the order in which they were declared.

        Args:
            exchange: Name of the exchange the queue is bound to.
            routing_keys: Routing keys of the binding; may be empty or None.

        Returns:
            The derived queue name.
        """
        # Falsy entries ("" or None) contribute nothing to the name.
        keys = sorted(rk for rk in (routing_keys or []) if rk)
        base = f"{settings.RABBIT_INSTANCE_NAME}-{exchange}"
        if not keys:
            return base
        return f"{base}-{'-'.join(keys)}"

    async def start(self, declarations: List[dict]):
        """Declare, bind and start consuming one queue per declaration.

        Args:
            declarations: list of
                {ExchangeName, FuncName, MessageTimeout, Type?, RoutingKeys?}.
                ``RoutingKeys`` falls back to ``settings.RABBIT_ROUTING_KEY``
                when missing or empty. A missing or zero ``MessageTimeout``
                is passed on as ``None``. ``Type`` is not read here.

        Raises:
            KeyError: if a declaration lacks ``ExchangeName`` or ``FuncName``.
        """
        for decl in declarations:
            # Read the required keys first so a malformed declaration fails
            # before anything is declared on the broker.
            exchange = decl["ExchangeName"]
            func_name = decl["FuncName"]
            routing_keys = decl.get("RoutingKeys") or [settings.RABBIT_ROUTING_KEY]
            # NOTE(review): passed as ttl_ms, so presumably milliseconds;
            # confirm against RabbitBase.declare_queue_bind.
            ttl_ms = decl.get("MessageTimeout") or None

            queue = await self.declare_queue_bind(
                exchange=exchange,
                queue_name=self._qname(exchange, routing_keys),
                routing_keys=routing_keys,
                ttl_ms=ttl_ms,
            )
            await queue.consume(self._make_consumer(func_name))
            self._consumers.append(queue)

    def _make_consumer(self, func_name: str):
        """Build the message callback that dispatches to ``func_name``'s handler.

        Args:
            func_name: Key into the handlers mapping given to ``__init__``.

        Returns:
            An ``async`` callback taking one incoming message. The callback
            never raises ``Exception``: decoding and handler errors are
            logged and suppressed.
        """
        handler = self._handlers.get(func_name)
        if not handler:
            # Still consume so the queue drains, but make the gap visible.
            self._listener_log.warning(
                "No handler registered for %r; its messages will be consumed and dropped",
                func_name,
            )

        async def _on_msg(msg: aio_pika.IncomingMessage):
            async with msg.process():
                try:
                    # Expect CloudEvent JSON
                    envelope = json.loads(msg.body.decode("utf-8"))
                    data = envelope.get("data")
                    if handler:
                        await handler(data)
                except Exception:
                    # Deliberately not re-raised, to avoid nack loops; the
                    # failure is logged with its traceback instead of vanishing.
                    self._listener_log.exception(
                        "Failed to process message for handler %r", func_name
                    )

        return _on_msg
|