File size: 2,826 Bytes
bf292d9
bbfbcdd
a3a1b05
ba71442
ee5adc7
bf292d9
 
 
bbfbcdd
a3a1b05
 
 
 
 
 
 
 
01785f3
a3a1b05
 
 
bf292d9
 
 
 
 
a3a1b05
 
0bf22fe
bf292d9
0bf22fe
bf292d9
 
05be9a1
a3a1b05
05be9a1
01785f3
 
05be9a1
01785f3
 
 
 
 
 
 
 
 
bf292d9
 
 
 
 
 
 
a3a1b05
0bf22fe
 
 
bf292d9
 
 
0bf22fe
 
 
 
 
 
 
bf292d9
 
0bf22fe
bf292d9
 
0bf22fe
a3a1b05
 
 
 
 
0bf22fe
bf292d9
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
from typing import Callable, Dict, List, Optional
import aio_pika
from urllib.parse import urlsplit, unquote
from config import settings
import ssl

# Callback that maps an exchange name to the string naming its exchange type;
# RabbitBase uses the result to select an ``aio_pika.ExchangeType`` member.
ExchangeResolver = Callable[[str], str]  # exchange name -> exchange type


def _parse_amqp_url(url: str) -> dict:
    parts = urlsplit(url)
    return {
        "host": parts.hostname or "localhost",
        "port": parts.port or (5671 if parts.scheme == "amqps" else 5672),
        "login": parts.username or "guest",
        "password": parts.password or "guest",
        "virtualhost": unquote(parts.path[1:] or "/"),
        "ssl": parts.scheme == "amqps",
    }


class RabbitBase:
    """Shared RabbitMQ plumbing: one robust connection, one channel, cached exchanges.

    The connection is opened lazily by :meth:`connect`; the other methods
    call it themselves, so callers do not have to connect explicitly.
    """

    def __init__(self, exchange_type_resolver: Optional[ExchangeResolver] = None):
        """Create an unconnected helper.

        Args:
            exchange_type_resolver: Maps an exchange name to its type. When
                omitted, every exchange uses ``settings.RABBIT_EXCHANGE_TYPE``.
        """
        self._conn: Optional[aio_pika.RobustConnection] = None
        self._chan: Optional[aio_pika.RobustChannel] = None
        # Exchanges already declared on the current channel, keyed by name.
        self._exchanges: Dict[str, aio_pika.Exchange] = {}
        self._exchange_type_resolver = exchange_type_resolver or (
            lambda _: settings.RABBIT_EXCHANGE_TYPE
        )

    @staticmethod
    def _to_exchange_type(ex_type):
        """Translate a resolver result into an ``aio_pika.ExchangeType`` member.

        Accepts an ``ExchangeType`` member, a member name (``"TOPIC"``) or a
        protocol value in any case (``"topic"``).

        Raises:
            AttributeError: If ``ex_type`` matches neither a name nor a value.
        """
        if isinstance(ex_type, aio_pika.ExchangeType):
            return ex_type
        try:
            # Member-name lookup is tried first to keep the historic behavior.
            return getattr(aio_pika.ExchangeType, ex_type)
        except AttributeError as name_error:
            try:
                return aio_pika.ExchangeType(str(ex_type).lower())
            except ValueError:
                # Keep the exception type callers have always seen.
                raise name_error from None

    async def connect(self) -> None:
        """Open the connection and channel unless a live connection exists.

        Connection parameters come from ``settings.AMQP_URL``; the channel's
        prefetch count comes from ``settings.RABBIT_PREFETCH``.
        """
        if self._conn and not self._conn.is_closed:
            return

        # Cached exchanges belong to the previous channel; drop them so that
        # ensure_exchange() re-declares them on the channel created below.
        self._exchanges.clear()

        conn_kwargs = _parse_amqp_url(str(settings.AMQP_URL))

        ssl_ctx = None
        if conn_kwargs.get("ssl"):
            # SECURITY NOTE(review): certificate and hostname verification are
            # deliberately switched off here, so the TLS peer is not
            # authenticated. Confirm this is acceptable for the deployment.
            ssl_ctx = ssl.create_default_context()
            ssl_ctx.check_hostname = False
            ssl_ctx.verify_mode = ssl.CERT_NONE

        self._conn = await aio_pika.connect_robust(
            **conn_kwargs,
            ssl_context=ssl_ctx,
        )
        self._chan = await self._conn.channel()
        await self._chan.set_qos(prefetch_count=settings.RABBIT_PREFETCH)

    async def ensure_exchange(self, name: str) -> aio_pika.Exchange:
        """Return the durable exchange ``name``, declaring it on first use.

        Args:
            name: Exchange name; also passed to the exchange-type resolver.

        Returns:
            The declared exchange, served from the cache on later calls.

        Raises:
            AttributeError: If the resolved exchange type is unknown.
        """
        await self.connect()
        cached = self._exchanges.get(name)
        if cached is not None:
            return cached

        ex_type = self._to_exchange_type(self._exchange_type_resolver(name))
        ex = await self._chan.declare_exchange(name, ex_type, durable=True)
        self._exchanges[name] = ex
        return ex

    async def declare_queue_bind(
        self,
        exchange: str,
        queue_name: str,
        routing_keys: List[str],
        ttl_ms: Optional[int],
    ):
        """Declare a queue and bind it to ``exchange``.

        Args:
            exchange: Name of the exchange to bind to (declared if needed).
            queue_name: Name of the queue to declare.
            routing_keys: Keys to bind with; an empty list binds once with
                the empty routing key.
            ttl_ms: Per-message TTL in milliseconds. ``None`` and ``0`` both
                leave ``x-message-ttl`` unset.

        Returns:
            The declared queue.
        """
        await self.connect()
        ex = await self.ensure_exchange(exchange)

        args: Dict[str, int] = {}
        if ttl_ms:
            args["x-message-ttl"] = ttl_ms

        q = await self._chan.declare_queue(
            queue_name,
            durable=True,
            exclusive=False,
            auto_delete=True,
            arguments=args,
        )
        for rk in routing_keys or [""]:
            await q.bind(ex, rk)
        return q