|
|
import uuid |
|
|
from typing import Any |
|
|
from rabbit_base import RabbitBase |
|
|
from cloud_event import CloudEvent |
|
|
from config import settings |
|
|
from utils import to_json, json_compress_str |
|
|
|
|
|
class RabbitRepo(RabbitBase): |
|
|
def __init__(self, external_source: str): |
|
|
super().__init__(exchange_type_resolver=self._resolve_type) |
|
|
self._external_source = external_source |
|
|
|
|
|
def _resolve_type(self, exch: str) -> str: |
|
|
|
|
|
matches = [k for k in settings.EXCHANGE_TYPES.keys() if exch.lower().startswith(k.lower())] |
|
|
if matches: |
|
|
return settings.EXCHANGE_TYPES[max(matches, key=len)] |
|
|
return settings.RABBIT_EXCHANGE_TYPE |
|
|
|
|
|
async def publish(self, exchange: str, obj: Any, routing_key: str = ""): |
|
|
ex = await self.ensure_exchange(exchange) |
|
|
payload = CloudEvent.wrap(obj, event_type=(obj.__class__.__name__ if obj is not None else "NullOrEmpty"), |
|
|
source=self._external_source, id=str(uuid.uuid4())) |
|
|
await ex.publish(aio_pika.Message(body=payload), routing_key=routing_key) |
|
|
|
|
|
async def publish_jsonz(self, exchange: str, obj: Any, routing_key: str = "", with_id: str | None = None): |
|
|
ex = await self.ensure_exchange(exchange) |
|
|
json_str = to_json(obj) |
|
|
datajsonZ = json_compress_str(json_str) |
|
|
to_send = (datajsonZ, with_id) if with_id else datajsonZ |
|
|
payload = CloudEvent.wrap(to_send, event_type=(obj.__class__.__name__ if obj is not None else "NullOrEmpty"), |
|
|
source=self._external_source, id=str(uuid.uuid4())) |
|
|
await ex.publish(aio_pika.Message(body=payload), routing_key=routing_key) |
|
|
return datajsonZ |
|
|
|