File size: 2,826 Bytes
bf292d9 bbfbcdd a3a1b05 ba71442 ee5adc7 bf292d9 bbfbcdd a3a1b05 01785f3 a3a1b05 bf292d9 a3a1b05 0bf22fe bf292d9 0bf22fe bf292d9 05be9a1 a3a1b05 05be9a1 01785f3 05be9a1 01785f3 bf292d9 a3a1b05 0bf22fe bf292d9 0bf22fe bf292d9 0bf22fe bf292d9 0bf22fe a3a1b05 0bf22fe bf292d9 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 |
from typing import Callable, Dict, List, Optional
import aio_pika
from urllib.parse import urlsplit, unquote
from config import settings
import ssl
ExchangeResolver = Callable[[str], str] # exchangeName -> exchangeType
def _parse_amqp_url(url: str) -> dict:
parts = urlsplit(url)
return {
"host": parts.hostname or "localhost",
"port": parts.port or (5671 if parts.scheme == "amqps" else 5672),
"login": parts.username or "guest",
"password": parts.password or "guest",
"virtualhost": unquote(parts.path[1:] or "/"),
"ssl": parts.scheme == "amqps",
}
class RabbitBase:
def __init__(self, exchange_type_resolver: Optional[ExchangeResolver] = None):
self._conn: Optional[aio_pika.RobustConnection] = None
self._chan: Optional[aio_pika.RobustChannel] = None
self._exchanges: Dict[str, aio_pika.Exchange] = {}
self._exchange_type_resolver = exchange_type_resolver or (
lambda _: settings.RABBIT_EXCHANGE_TYPE
)
async def connect(self) -> None:
if self._conn and not self._conn.is_closed:
return
conn_kwargs = _parse_amqp_url(str(settings.AMQP_URL))
# Build an SSLContext that DISABLES verification
ssl_ctx = None
if conn_kwargs.get("ssl"):
ssl_ctx = ssl.create_default_context()
ssl_ctx.check_hostname = False
ssl_ctx.verify_mode = ssl.CERT_NONE
# Pass ssl_context explicitly – this is what aio-pika supports
self._conn = await aio_pika.connect_robust(
**conn_kwargs,
ssl_context=ssl_ctx # <- key bit
)
self._chan = await self._conn.channel()
await self._chan.set_qos(prefetch_count=settings.RABBIT_PREFETCH)
async def ensure_exchange(self, name: str) -> aio_pika.Exchange:
await self.connect()
if name in self._exchanges:
return self._exchanges[name]
ex_type = self._exchange_type_resolver(name)
ex = await self._chan.declare_exchange(
name, getattr(aio_pika.ExchangeType, ex_type), durable=True
)
self._exchanges[name] = ex
return ex
async def declare_queue_bind(
self,
exchange: str,
queue_name: str,
routing_keys: List[str],
ttl_ms: Optional[int],
):
await self.connect()
ex = await self.ensure_exchange(exchange)
args: Dict[str, int] = {}
if ttl_ms:
args["x-message-ttl"] = ttl_ms
q = await self._chan.declare_queue(
queue_name,
durable=True,
exclusive=False,
auto_delete=True,
arguments=args,
)
for rk in routing_keys or [""]:
await q.bind(ex, rk)
return q
|