|
|
import json |
|
|
from dataclasses import dataclass, asdict |
|
|
from datetime import datetime, timezone |
|
|
from typing import Any |
|
|
|
|
|
@dataclass
class CloudEvent:
    """Event envelope pairing a payload with identifying metadata.

    Attributes mirror the keys of the JSON object produced by :meth:`wrap`.
    """

    id: str  # caller-supplied event identifier
    type: str  # event type name (explicit, or derived from the payload)
    source: str  # caller-supplied origin of the event
    time: str  # ISO-8601 timestamp string
    data: Any  # the wrapped payload

    @staticmethod
    def wrap(obj: Any, *, event_type: str = "", source: str, id: str) -> bytes:
        """Wrap *obj* in a ``CloudEvent`` and serialise it to UTF-8 JSON bytes.

        Args:
            obj: Payload stored under ``data``. It must be JSON-serialisable
                once ``dataclasses.asdict`` has converted any dataclass
                instances it contains into plain dicts.
            event_type: Value for ``type``. When omitted or falsy, the
                payload's class name is used, or ``"NullOrEmpty"`` if *obj*
                is ``None``.
            source: Value for ``source``.
            id: Value for ``id``.

        Returns:
            The JSON document with keys ``id``, ``type``, ``source``,
            ``time`` and ``data``, encoded as UTF-8.

        Raises:
            TypeError: If the payload cannot be serialised by ``json.dumps``.
        """
        # Resolve the type name: an explicit value wins, otherwise derive it
        # from the payload so the envelope always carries a usable type.
        if event_type:
            resolved_type = event_type
        elif obj is None:
            resolved_type = "NullOrEmpty"
        else:
            resolved_type = obj.__class__.__name__

        evt = CloudEvent(
            id=id,
            type=resolved_type,
            source=source,
            # Timezone-aware "now" in UTC, so the string carries an offset.
            time=datetime.now(timezone.utc).isoformat(),
            data=obj,
        )
        # ensure_ascii=False keeps non-ASCII text as-is instead of \u escapes;
        # the explicit encode then yields UTF-8 bytes.
        return json.dumps(asdict(evt), ensure_ascii=False).encode("utf-8")
|
|
|