Commit
·
a9000ae
1
Parent(s):
3b719b5
- rabbit_base.py +19 -1
rabbit_base.py
CHANGED
|
@@ -6,6 +6,22 @@ import ssl
|
|
| 6 |
|
| 7 |
ExchangeResolver = Callable[[str], str] # exchangeName -> exchangeType
|
| 8 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 9 |
|
| 10 |
def _parse_amqp_url(url: str) -> dict:
|
| 11 |
parts = urlsplit(url)
|
|
@@ -53,7 +69,9 @@ class RabbitBase:
|
|
| 53 |
await self.connect()
|
| 54 |
if name in self._exchanges:
|
| 55 |
return self._exchanges[name]
|
| 56 |
-
|
|
|
|
|
|
|
| 57 |
ex = await self._chan.declare_exchange(
|
| 58 |
name, getattr(aio_pika.ExchangeType, ex_type), durable=True
|
| 59 |
)
|
|
|
|
| 6 |
|
| 7 |
ExchangeResolver = Callable[[str], str] # exchangeName -> exchangeType
|
| 8 |
|
| 9 |
+
# rabbit_base.py
|
| 10 |
+
import aio_pika
|
| 11 |
+
|
| 12 |
+
def _normalize_exchange_type(val: str) -> aio_pika.ExchangeType:
|
| 13 |
+
# 1) Try attribute by NAME (DIRECT/FANOUT/TOPIC/HEADERS)
|
| 14 |
+
if isinstance(val, str):
|
| 15 |
+
name = val.upper()
|
| 16 |
+
if hasattr(aio_pika.ExchangeType, name):
|
| 17 |
+
return getattr(aio_pika.ExchangeType, name)
|
| 18 |
+
# 2) Try enum by VALUE ("direct"/"fanout"/"topic"/"headers")
|
| 19 |
+
try:
|
| 20 |
+
return aio_pika.ExchangeType(val.lower())
|
| 21 |
+
except Exception:
|
| 22 |
+
pass
|
| 23 |
+
# 3) Default
|
| 24 |
+
return aio_pika.ExchangeType.TOPIC
|
| 25 |
|
| 26 |
def _parse_amqp_url(url: str) -> dict:
|
| 27 |
parts = urlsplit(url)
|
|
|
|
| 69 |
await self.connect()
|
| 70 |
if name in self._exchanges:
|
| 71 |
return self._exchanges[name]
|
| 72 |
+
ex_type_str = self._exchange_type_resolver(name) # e.g. "direct"
|
| 73 |
+
ex_type = _normalize_exchange_type(ex_type_str)
|
| 74 |
+
|
| 75 |
ex = await self._chan.declare_exchange(
|
| 76 |
name, getattr(aio_pika.ExchangeType, ex_type), durable=True
|
| 77 |
)
|