GradLLM / cloud_event.py
johnbridges's picture
.
5de81bd
raw
history blame
642 Bytes
import json
from dataclasses import dataclass, asdict
from datetime import datetime, timezone
from typing import Any
@dataclass
class CloudEvent:
id: str
type: str
source: str
time: str
data: Any
@staticmethod
def wrap(obj: Any, *, event_type: str, source: str, id: str) -> bytes:
evt = CloudEvent(
id=id,
type=event_type or (obj.__class__.__name__ if obj is not None else "NullOrEmpty"),
source=source,
time=datetime.now(timezone.utc).isoformat(),
data=obj,
)
return json.dumps(asdict(evt), ensure_ascii=False).encode("utf-8")