johnbridges commited on
Commit
0bf22fe
·
1 Parent(s): 98324a6
Files changed (3) hide show
  1. listener.py +15 -7
  2. rabbit_base.py +20 -8
  3. rabbit_repo.py +17 -9
listener.py CHANGED
@@ -1,11 +1,16 @@
 
1
  import json
 
 
2
  import aio_pika
3
- from typing import Callable, Awaitable, Dict, Any, List, Optional
 
 
4
 
5
  class RabbitListenerBase:
6
- def __init__(self, base, instance_name: str, handlers: Dict[str, Callable[[Any], Awaitable[None]]]):
7
  self._base = base
8
- self._instance_name = instance_name
9
  self._handlers = handlers
10
  self._consumers: List[aio_pika.abc.AbstractRobustQueue] = []
11
 
@@ -14,13 +19,15 @@ class RabbitListenerBase:
14
  suffix = f"-{rk_part}" if rk_part else ""
15
  return f"{self._instance_name}-{exchange}{suffix}"
16
 
17
- async def start(self, declarations: List[dict]):
18
  for d in declarations:
19
  exch = d["ExchangeName"]
20
  ttl = d.get("MessageTimeout") or None
21
  rks = d.get("RoutingKeys") or [""]
22
  qname = self._qname(exch, rks)
23
- q = await self._base.declare_queue_bind(exchange=exch, queue_name=qname, routing_keys=rks, ttl_ms=ttl)
 
 
24
  await q.consume(self._make_consumer(d["FuncName"]))
25
  self._consumers.append(q)
26
 
@@ -31,11 +38,12 @@ class RabbitListenerBase:
31
  async with msg.process():
32
  try:
33
  envelope = json.loads(msg.body.decode("utf-8"))
34
- data = envelope.get("data", None) # dict / list / str
 
35
  if handler:
36
  await handler(data)
37
  except Exception:
38
- # Avoid requeue storms; log if you add a logger
39
  pass
40
 
41
  return _on_msg
 
1
+ # listener.py
2
  import json
3
+ from typing import Callable, Awaitable, Dict, Any, List
4
+
5
  import aio_pika
6
+
7
+ Handler = Callable[[Any], Awaitable[None]] # payload is envelope["data"]
8
+
9
 
10
  class RabbitListenerBase:
11
+ def __init__(self, base, instance_name: str, handlers: Dict[str, Handler]):
12
  self._base = base
13
+ self._instance_name = instance_name # queue prefix (like your .NET instance name)
14
  self._handlers = handlers
15
  self._consumers: List[aio_pika.abc.AbstractRobustQueue] = []
16
 
 
19
  suffix = f"-{rk_part}" if rk_part else ""
20
  return f"{self._instance_name}-{exchange}{suffix}"
21
 
22
+ async def start(self, declarations: List[dict]) -> None:
23
  for d in declarations:
24
  exch = d["ExchangeName"]
25
  ttl = d.get("MessageTimeout") or None
26
  rks = d.get("RoutingKeys") or [""]
27
  qname = self._qname(exch, rks)
28
+ q = await self._base.declare_queue_bind(
29
+ exchange=exch, queue_name=qname, routing_keys=rks, ttl_ms=ttl
30
+ )
31
  await q.consume(self._make_consumer(d["FuncName"]))
32
  self._consumers.append(q)
33
 
 
38
  async with msg.process():
39
  try:
40
  envelope = json.loads(msg.body.decode("utf-8"))
41
+ # Expect CloudEvent-ish envelope; we only need the 'data' field
42
+ data = envelope.get("data", None)
43
  if handler:
44
  await handler(data)
45
  except Exception:
46
+ # Avoid requeue storms; add logging if you want
47
  pass
48
 
49
  return _on_msg
rabbit_base.py CHANGED
@@ -1,4 +1,4 @@
1
- import asyncio
2
  from typing import Callable, Dict, List, Optional
3
  import aio_pika
4
 
@@ -12,9 +12,11 @@ class RabbitBase:
12
  self._conn: Optional[aio_pika.RobustConnection] = None
13
  self._chan: Optional[aio_pika.RobustChannel] = None
14
  self._exchanges: Dict[str, aio_pika.Exchange] = {}
15
- self._exchange_type_resolver = exchange_type_resolver or (lambda _: settings.RABBIT_EXCHANGE_TYPE)
 
 
16
 
17
- async def connect(self):
18
  if self._conn and not self._conn.is_closed:
19
  return
20
  self._conn = await aio_pika.connect_robust(str(settings.AMQP_URL))
@@ -25,18 +27,28 @@ class RabbitBase:
25
  await self.connect()
26
  if name in self._exchanges:
27
  return self._exchanges[name]
28
- ex_type = self._exchange_type_resolver(name)
29
- ex = await self._chan.declare_exchange(name, getattr(aio_pika.ExchangeType, ex_type), durable=True)
 
 
30
  self._exchanges[name] = ex
31
  return ex
32
 
33
- async def declare_queue_bind(self, exchange: str, queue_name: str, routing_keys: List[str], ttl_ms: Optional[int]):
 
 
 
 
 
 
34
  await self.connect()
35
  ex = await self.ensure_exchange(exchange)
36
- args = {}
37
  if ttl_ms:
38
  args["x-message-ttl"] = ttl_ms
39
- q = await self._chan.declare_queue(queue_name, durable=True, exclusive=False, auto_delete=True, arguments=args)
 
 
40
  for rk in routing_keys or [""]:
41
  await q.bind(ex, rk)
42
  return q
 
1
+ # rabbit_base.py
2
  from typing import Callable, Dict, List, Optional
3
  import aio_pika
4
 
 
12
  self._conn: Optional[aio_pika.RobustConnection] = None
13
  self._chan: Optional[aio_pika.RobustChannel] = None
14
  self._exchanges: Dict[str, aio_pika.Exchange] = {}
15
+ self._exchange_type_resolver = (
16
+ exchange_type_resolver or (lambda _: settings.RABBIT_EXCHANGE_TYPE)
17
+ )
18
 
19
+ async def connect(self) -> None:
20
  if self._conn and not self._conn.is_closed:
21
  return
22
  self._conn = await aio_pika.connect_robust(str(settings.AMQP_URL))
 
27
  await self.connect()
28
  if name in self._exchanges:
29
  return self._exchanges[name]
30
+ ex_type = self._exchange_type_resolver(name) # "fanout"|"topic"|"direct"|"headers"
31
+ ex = await self._chan.declare_exchange(
32
+ name, getattr(aio_pika.ExchangeType, ex_type), durable=True
33
+ )
34
  self._exchanges[name] = ex
35
  return ex
36
 
37
+ async def declare_queue_bind(
38
+ self,
39
+ exchange: str,
40
+ queue_name: str,
41
+ routing_keys: List[str],
42
+ ttl_ms: Optional[int],
43
+ ):
44
  await self.connect()
45
  ex = await self.ensure_exchange(exchange)
46
+ args: Dict[str, int] = {}
47
  if ttl_ms:
48
  args["x-message-ttl"] = ttl_ms
49
+ q = await self._chan.declare_queue(
50
+ queue_name, durable=True, exclusive=False, auto_delete=True, arguments=args
51
+ )
52
  for rk in routing_keys or [""]:
53
  await q.bind(ex, rk)
54
  return q
rabbit_repo.py CHANGED
@@ -1,5 +1,7 @@
 
1
  import uuid
2
  from typing import Any, Optional
 
3
  import aio_pika
4
 
5
  from config import settings
@@ -20,30 +22,36 @@ class RabbitRepo(RabbitBase):
20
  return settings.EXCHANGE_TYPES[max(matches, key=len)]
21
  return settings.RABBIT_EXCHANGE_TYPE
22
 
23
- async def publish(self, exchange: str, obj: Any, routing_key: str = ""):
24
  ex = await self.ensure_exchange(exchange)
25
- # rabbit_repo.py
26
  evt = CloudEvent.wrap(
27
  event_id=str(uuid.uuid4()),
28
  event_type=(obj.__class__.__name__ if obj is not None else "NullOrEmpty"),
29
  source=self._source,
30
- data=obj if not hasattr(obj, "model_dump") else obj.model_dump(by_alias=True),
31
- )
32
  body = evt.model_dump_json(exclude_none=True).encode("utf-8")
33
-
34
  await ex.publish(aio_pika.Message(body=body), routing_key=routing_key)
35
 
36
- async def publish_jsonz(self, exchange: str, obj: Any, routing_key: str = "", with_id: Optional[str] = None) -> str:
 
 
 
 
 
 
37
  ex = await self.ensure_exchange(exchange)
38
- datajson = to_json(obj if not hasattr(obj, "model_dump") else obj.model_dump(by_alias=True))
 
39
  datajsonZ = json_compress_str(datajson)
40
- payload: Any = (datajsonZ, with_id) if with_id else datajsonZ
41
 
42
  evt = CloudEvent.wrap(
43
  event_id=str(uuid.uuid4()),
44
  event_type=(obj.__class__.__name__ if obj is not None else "NullOrEmpty"),
45
  source=self._source,
46
- data=payload,
47
  )
48
  body = evt.model_dump_json(exclude_none=True).encode("utf-8")
49
  await ex.publish(aio_pika.Message(body=body), routing_key=routing_key)
 
1
+ # rabbit_repo.py
2
  import uuid
3
  from typing import Any, Optional
4
+
5
  import aio_pika
6
 
7
  from config import settings
 
22
  return settings.EXCHANGE_TYPES[max(matches, key=len)]
23
  return settings.RABBIT_EXCHANGE_TYPE
24
 
25
+ async def publish(self, exchange: str, obj: Any, routing_key: str = "") -> None:
26
  ex = await self.ensure_exchange(exchange)
27
+ payload = obj if not hasattr(obj, "model_dump") else obj.model_dump(by_alias=True)
28
  evt = CloudEvent.wrap(
29
  event_id=str(uuid.uuid4()),
30
  event_type=(obj.__class__.__name__ if obj is not None else "NullOrEmpty"),
31
  source=self._source,
32
+ data=payload,
33
+ )
34
  body = evt.model_dump_json(exclude_none=True).encode("utf-8")
 
35
  await ex.publish(aio_pika.Message(body=body), routing_key=routing_key)
36
 
37
+ async def publish_jsonz(
38
+ self,
39
+ exchange: str,
40
+ obj: Any,
41
+ routing_key: str = "",
42
+ with_id: Optional[str] = None,
43
+ ) -> str:
44
  ex = await self.ensure_exchange(exchange)
45
+ payload = obj if not hasattr(obj, "model_dump") else obj.model_dump(by_alias=True)
46
+ datajson = to_json(payload)
47
  datajsonZ = json_compress_str(datajson)
48
+ wrapped: Any = (datajsonZ, with_id) if with_id else datajsonZ
49
 
50
  evt = CloudEvent.wrap(
51
  event_id=str(uuid.uuid4()),
52
  event_type=(obj.__class__.__name__ if obj is not None else "NullOrEmpty"),
53
  source=self._source,
54
+ data=wrapped,
55
  )
56
  body = evt.model_dump_json(exclude_none=True).encode("utf-8")
57
  await ex.publish(aio_pika.Message(body=body), routing_key=routing_key)