File size: 3,533 Bytes
0bf22fe bf292d9 66c4f69 2c8368f 0bf22fe 66c4f69 2c8368f bbfbcdd 2c8368f bbfbcdd ba71442 66c4f69 32b704b 2c8368f 66c4f69 bbfbcdd bf292d9 0bfda05 bf292d9 3692feb 66c4f69 3692feb 66c4f69 3692feb bf292d9 66c4f69 0bf22fe 2c8368f 0bf22fe bbfbcdd 66c4f69 2c8368f 0bf22fe 2c8368f 0bf22fe bf292d9 2c8368f 0bf22fe 2c8368f bbfbcdd 66c4f69 bf292d9 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 |
# rabbit_repo.py
import uuid
import asyncio
import logging
from typing import Any, Optional
import aiormq
import aio_pika
from config import settings
from models import CloudEvent
from rabbit_base import RabbitBase
from utils import to_json, json_compress_str
logger = logging.getLogger(__name__)
class RabbitRepo(RabbitBase):
def __init__(self, external_source: str):
super().__init__(exchange_type_resolver=self._resolve_type)
self._source = external_source
def _resolve_type(self, exch: str) -> str:
if exch.lower().startswith("oa."):
return "direct"
if hasattr(settings, 'EXCHANGE_TYPES') and settings.EXCHANGE_TYPES:
matches = [k for k in settings.EXCHANGE_TYPES.keys()
if exch.lower().startswith(k.lower())]
if matches:
return settings.EXCHANGE_TYPES[max(matches, key=len)]
return "fanout"
async def _publish_with_retry(self, exchange: str, body: bytes, routing_key: str = "") -> None:
attempts, delay = 0, 0.5
while True:
try:
ex = await self.ensure_exchange(exchange)
msg = aio_pika.Message(
body=body,
delivery_mode=aio_pika.DeliveryMode.PERSISTENT,
)
await ex.publish(msg, routing_key=routing_key)
return
except (asyncio.CancelledError,
aiormq.exceptions.ChannelInvalidStateError,
aiormq.exceptions.ConnectionClosed,
aio_pika.exceptions.AMQPError,
RuntimeError) as e:
attempts += 1
logger.warning("publish failed attempt=%d exchange=%s rk=%s err=%r",
attempts, exchange, routing_key, e)
try:
await self.close()
except Exception:
pass
if attempts >= 5:
logger.exception("publish giving up after %d attempts", attempts)
raise
await asyncio.sleep(delay)
delay = min(delay * 2, 5.0)
async def publish(self, exchange: str, obj: Any, routing_key: str = "") -> None:
payload = obj if not hasattr(obj, "model_dump") else obj.model_dump(by_alias=True)
evt = CloudEvent.wrap(
event_id=str(uuid.uuid4()),
event_type=(obj.__class__.__name__ if obj is not None else "NullOrEmpty"),
source=self._source,
data=payload,
)
body = evt.model_dump_json(exclude_none=True).encode("utf-8")
await self._publish_with_retry(exchange, body, routing_key)
async def publish_jsonz(
self,
exchange: str,
obj: Any,
routing_key: str = "",
with_id: Optional[str] = None,
) -> str:
payload = obj if not hasattr(obj, "model_dump") else obj.model_dump(by_alias=True)
datajson = to_json(payload)
datajsonZ = json_compress_str(datajson)
wrapped: Any = (datajsonZ, with_id) if with_id else datajsonZ
evt = CloudEvent.wrap(
event_id=str(uuid.uuid4()),
event_type=(obj.__class__.__name__ if obj is not None else "NullOrEmpty"),
source=self._source,
data=wrapped,
)
body = evt.model_dump_json(exclude_none=True).encode("utf-8")
await self._publish_with_retry(exchange, body, routing_key)
return datajsonZ
|