|
|
import json |
|
|
from typing import Callable, Dict, List, Optional |
|
|
import aio_pika |
|
|
from rabbit_base import RabbitBase |
|
|
from config import settings |
|
|
|
|
|
|
|
|
Handler = Callable[[dict], "awaitable[None]"] |
|
|
|
|
|
class RabbitListenerBase(RabbitBase): |
|
|
def __init__(self, service_id: str, handlers: Dict[str, Handler]): |
|
|
super().__init__() |
|
|
self._service_id = service_id |
|
|
self._handlers = handlers |
|
|
self._consumers: List[aio_pika.abc.AbstractRobustQueue] = [] |
|
|
|
|
|
def _qname(self, exchange: str, routing_keys: List[str]) -> str: |
|
|
rk_part = "-".join(sorted([rk for rk in (routing_keys or [""]) if rk != ""])) or "" |
|
|
suffix = f"-{rk_part}" if rk_part else "" |
|
|
return f"{settings.RABBIT_INSTANCE_NAME}-{exchange}{suffix}" |
|
|
|
|
|
async def start(self, declarations: List[dict]): |
|
|
""" |
|
|
declarations: list of {ExchangeName, FuncName, MessageTimeout, Type?, RoutingKeys?} |
|
|
""" |
|
|
for d in declarations: |
|
|
exch = d["ExchangeName"] |
|
|
rks = d.get("RoutingKeys") or [settings.RABBIT_ROUTING_KEY] |
|
|
ttl = d.get("MessageTimeout") or None |
|
|
q = await self.declare_queue_bind(exchange=exch, queue_name=self._qname(exch, rks), routing_keys=rks, ttl_ms=ttl) |
|
|
await q.consume(self._make_consumer(d["FuncName"])) |
|
|
self._consumers.append(q) |
|
|
|
|
|
def _make_consumer(self, func_name: str): |
|
|
handler = self._handlers.get(func_name) |
|
|
async def _on_msg(msg: aio_pika.IncomingMessage): |
|
|
async with msg.process(): |
|
|
try: |
|
|
|
|
|
envelope = json.loads(msg.body.decode("utf-8")) |
|
|
data = envelope.get("data") |
|
|
if handler: |
|
|
await handler(data) |
|
|
except Exception as e: |
|
|
|
|
|
pass |
|
|
return _on_msg |
|
|
|