File size: 2,289 Bytes
bf292d9 7630510 0bf22fe bf292d9 0bf22fe 7630510 bf292d9 2c8368f 0bf22fe 2c8368f 0bf22fe bf292d9 2c8368f bf292d9 2c8368f bf292d9 0bf22fe bf292d9 2c8368f 0bf22fe bf292d9 2c8368f bf292d9 7630510 0bf22fe bf292d9 7630510 2c8368f bf292d9 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 |
import json
import logging
from typing import Callable, Awaitable, Dict, Any, List
import aio_pika
# Signature every message handler must satisfy: an async callable that is
# handed the "data" member of the decoded message envelope.
Handler = Callable[[Any], Awaitable[None]]

# Default logging setup. basicConfig() does nothing when the root logger
# already has handlers, so an application's own configuration wins.
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    level=logging.INFO,
)

# Module-scoped logger used by the listener below.
logger = logging.getLogger(__name__)
class RabbitListenerBase:
    """Attach named async handlers to RabbitMQ queues built from declarations.

    Every declaration passed to :meth:`start` yields one queue, declared and
    bound through the injected ``base`` helper. Each message on that queue is
    decoded as a JSON envelope and the envelope's ``"data"`` member is passed
    to the handler registered under the declaration's ``FuncName``.
    """

    def __init__(self, base, instance_name: str, handlers: "Dict[str, Handler]"):
        """Remember the collaborators; nothing is declared on the broker yet.

        Args:
            base: Object providing an awaitable
                ``declare_queue_bind(exchange=, queue_name=, routing_keys=,
                ttl_ms=)`` that returns the queue to consume from.
            instance_name: Prefix of every queue name this listener builds.
            handlers: Maps a declaration's ``FuncName`` to its async handler.
        """
        self._base = base
        self._instance_name = instance_name  # queue prefix (like your .NET instance name)
        self._handlers = handlers
        # Queues that have a consumer attached, in the order start() made them.
        self._consumers: List[aio_pika.abc.AbstractRobustQueue] = []

    def _qname(self, exchange: str, routing_keys: List[str]) -> str:
        """Build the queue name for an exchange and its routing keys.

        Empty routing keys are ignored and the remaining ones are sorted, so
        the name does not depend on the order in which keys were declared.

        Args:
            exchange: Exchange the queue is bound to.
            routing_keys: Routing keys of the binding; may be empty or ``None``.

        Returns:
            ``"<instance>-<exchange>"`` followed by ``"-<key>"`` for every
            non-empty routing key in sorted order.
        """
        keys = sorted(rk for rk in routing_keys or () if rk)
        if not keys:
            return f"{self._instance_name}-{exchange}"
        return f"{self._instance_name}-{exchange}-{'-'.join(keys)}"

    async def start(self, declarations: List[dict]) -> None:
        """Declare, bind and start consuming one queue per declaration.

        Args:
            declarations: Dicts with required keys ``ExchangeName`` and
                ``FuncName`` and optional keys ``RoutingKeys`` and
                ``MessageTimeout``.

        Raises:
            KeyError: If a declaration lacks ``ExchangeName`` or ``FuncName``.
        """
        for decl in declarations:
            # Read both required keys first so a malformed declaration fails
            # before a queue has been created on the broker for it.
            exchange = decl["ExchangeName"]
            func_name = decl["FuncName"]
            # A missing or empty key list becomes a single empty routing key.
            routing_keys = decl.get("RoutingKeys") or [""]
            # A missing or zero timeout is passed on as "no TTL".
            ttl_ms = decl.get("MessageTimeout") or None

            queue = await self._base.declare_queue_bind(
                exchange=exchange,
                queue_name=self._qname(exchange, routing_keys),
                routing_keys=routing_keys,
                ttl_ms=ttl_ms,
            )
            await queue.consume(self._make_consumer(func_name))
            self._consumers.append(queue)

    def _make_consumer(self, func_name: str):
        """Create the message callback that dispatches to ``func_name``'s handler.

        The callback decodes the body as UTF-8 (invalid bytes are replaced),
        logs it, parses it as JSON and awaits the handler with the envelope's
        ``"data"`` value (``None`` when absent).

        Failures are logged and never propagate out of the ``process()``
        context. With aio-pika's default ``process()`` settings this means a
        failed message is acknowledged rather than requeued.

        Args:
            func_name: Key into the handler mapping given to the constructor.

        Returns:
            An async callable taking the incoming message.
        """
        handler = self._handlers.get(func_name)
        if not handler:
            # Surface the misconfiguration once, up front, instead of letting
            # every message vanish without a trace.
            logger.warning(
                "No handler registered for '%s'; its messages will be consumed and discarded",
                func_name,
            )

        async def _on_msg(msg: "aio_pika.IncomingMessage") -> None:
            async with msg.process():
                try:
                    raw_body = msg.body.decode("utf-8", errors="replace")
                    logger.info(
                        "Received message for handler '%s': %s",
                        func_name,
                        raw_body,
                    )
                    envelope = json.loads(raw_body)
                    if not isinstance(envelope, dict):
                        # Valid JSON that is not an object has no "data"
                        # member; say so instead of raising AttributeError.
                        logger.error(
                            "Discarding message for '%s': envelope is a JSON %s, expected an object",
                            func_name,
                            type(envelope).__name__,
                        )
                        return
                    if handler:
                        await handler(envelope.get("data"))
                except Exception:
                    # Boundary of the consumer: record the traceback and carry
                    # on so one bad message cannot stop the consumer.
                    logger.exception("Error processing message for '%s'", func_name)

        return _on_msg
|