File size: 3,533 Bytes
0bf22fe
bf292d9
66c4f69
 
2c8368f
0bf22fe
66c4f69
2c8368f
 
bbfbcdd
2c8368f
bbfbcdd
ba71442
66c4f69
32b704b
2c8368f
66c4f69
bbfbcdd
 
bf292d9
0bfda05
bf292d9
 
3692feb
66c4f69
3692feb
66c4f69
 
3692feb
 
 
bf292d9
66c4f69
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
0bf22fe
 
2c8368f
 
 
 
0bf22fe
 
bbfbcdd
66c4f69
2c8368f
0bf22fe
 
 
 
 
 
 
 
 
2c8368f
0bf22fe
bf292d9
2c8368f
 
 
 
0bf22fe
2c8368f
bbfbcdd
66c4f69
bf292d9
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
# rabbit_repo.py
import uuid
import asyncio
import logging
from typing import Any, Optional

import aiormq
import aio_pika

from config import settings
from models import CloudEvent
from rabbit_base import RabbitBase
from utils import to_json, json_compress_str

logger = logging.getLogger(__name__)


class RabbitRepo(RabbitBase):
    """Publish objects to RabbitMQ exchanges wrapped in a ``CloudEvent`` envelope.

    Connection and exchange management (``ensure_exchange``, ``close``) is
    not defined here; it is presumably inherited from ``RabbitBase``.
    """

    # Retry policy used by ``_publish_with_retry``.
    _PUBLISH_MAX_ATTEMPTS = 5
    _PUBLISH_INITIAL_DELAY = 0.5  # seconds
    _PUBLISH_MAX_DELAY = 5.0  # seconds

    def __init__(self, external_source: str):
        """Create a repo whose events carry ``external_source`` as their source."""
        super().__init__(exchange_type_resolver=self._resolve_type)
        self._source = external_source

    def _resolve_type(self, exch: str) -> str:
        """Return the exchange type to use for the exchange named ``exch``.

        Names starting with ``"oa."`` (case-insensitive) are ``"direct"``.
        Otherwise the longest key of ``settings.EXCHANGE_TYPES`` that is a
        case-insensitive prefix of ``exch`` selects the type; when the setting
        is missing, empty, or has no matching key the type is ``"fanout"``.
        """
        name = exch.lower()
        if name.startswith("oa."):
            return "direct"
        exchange_types = getattr(settings, "EXCHANGE_TYPES", None)
        if exchange_types:
            matches = [k for k in exchange_types if name.startswith(k.lower())]
            if matches:
                # Longest prefix wins so the most specific key is applied.
                return exchange_types[max(matches, key=len)]
        return "fanout"

    @staticmethod
    def _cancel_requested() -> bool:
        """Return True when the running task has a pending cancellation request.

        Relies on ``Task.cancelling()`` (Python 3.11+). On older interpreters,
        or outside a task, this returns False.
        """
        task = asyncio.current_task()
        cancelling = getattr(task, "cancelling", None)
        return bool(cancelling is not None and cancelling())

    async def _publish_with_retry(self, exchange: str, body: bytes, routing_key: str = "") -> None:
        """Publish ``body`` to ``exchange`` as a persistent message, retrying on failure.

        After each failed attempt the repo is closed (best effort) and the
        publish is retried after a delay that doubles from 0.5s up to 5s.

        Raises:
            The last caught exception once 5 attempts have failed.
            asyncio.CancelledError: immediately, when the current task itself
                has been asked to cancel.
        """
        attempts, delay = 0, self._PUBLISH_INITIAL_DELAY
        while True:
            try:
                ex = await self.ensure_exchange(exchange)
                msg = aio_pika.Message(
                    body=body,
                    delivery_mode=aio_pika.DeliveryMode.PERSISTENT,
                )
                await ex.publish(msg, routing_key=routing_key)
                return
            except (asyncio.CancelledError,
                    aiormq.exceptions.ChannelInvalidStateError,
                    aiormq.exceptions.ConnectionClosed,
                    aio_pika.exceptions.AMQPError,
                    RuntimeError) as e:
                # CancelledError stays in the retry list (it can surface when
                # an awaited future is cancelled rather than this task), but a
                # real cancellation of this task must propagate, not be retried.
                if isinstance(e, asyncio.CancelledError) and self._cancel_requested():
                    raise
                attempts += 1
                logger.warning("publish failed attempt=%d exchange=%s rk=%s err=%r",
                               attempts, exchange, routing_key, e)
                try:
                    await self.close()
                except Exception:
                    # Best-effort teardown: a failing close must not mask the
                    # publish error, but leave a trace for diagnosis.
                    logger.debug("close() failed after publish error", exc_info=True)
                if attempts >= self._PUBLISH_MAX_ATTEMPTS:
                    logger.exception("publish giving up after %d attempts", attempts)
                    raise
                await asyncio.sleep(delay)
                delay = min(delay * 2, self._PUBLISH_MAX_DELAY)

    @staticmethod
    def _payload_of(obj: Any) -> Any:
        """Return ``obj.model_dump(by_alias=True)`` when available, else ``obj`` itself."""
        if hasattr(obj, "model_dump"):
            return obj.model_dump(by_alias=True)
        return obj

    def _encode_event(self, obj: Any, data: Any) -> bytes:
        """Wrap ``data`` in a CloudEvent describing ``obj`` and return UTF-8 JSON bytes.

        The event gets a fresh UUID4 id, this repo's source, and ``obj``'s
        class name as its type (``"NullOrEmpty"`` when ``obj`` is None).
        None-valued fields are excluded from the JSON.
        """
        evt = CloudEvent.wrap(
            event_id=str(uuid.uuid4()),
            event_type=(obj.__class__.__name__ if obj is not None else "NullOrEmpty"),
            source=self._source,
            data=data,
        )
        return evt.model_dump_json(exclude_none=True).encode("utf-8")

    async def publish(self, exchange: str, obj: Any, routing_key: str = "") -> None:
        """Publish ``obj`` to ``exchange`` inside a CloudEvent envelope.

        Objects exposing ``model_dump`` are dumped with ``by_alias=True``;
        anything else is used as the event data unchanged.
        """
        body = self._encode_event(obj, self._payload_of(obj))
        await self._publish_with_retry(exchange, body, routing_key)

    async def publish_jsonz(
        self,
        exchange: str,
        obj: Any,
        routing_key: str = "",
        with_id: Optional[str] = None,
    ) -> str:
        """Publish ``obj`` as ``json_compress_str(to_json(payload))`` in a CloudEvent.

        When ``with_id`` is truthy the event data is the pair
        ``(compressed, with_id)``; otherwise it is the compressed value alone.

        Returns:
            The compressed value produced by ``json_compress_str``.
        """
        datajson = to_json(self._payload_of(obj))
        datajsonZ = json_compress_str(datajson)
        # NOTE(review): an empty-string ``with_id`` is treated the same as None.
        wrapped: Any = (datajsonZ, with_id) if with_id else datajsonZ

        body = self._encode_event(obj, wrapped)
        await self._publish_with_retry(exchange, body, routing_key)
        return datajsonZ