Spaces:
Sleeping
Sleeping
| """ | |
| core/runner_env.py | |
| Minimal HTTP-based environment client. | |
| - Talks to a single env worker exposing: POST /reset, POST /step, GET /state | |
| Future hooks (commented below) for: | |
| - episode_id, seed on reset | |
| - request_id on step | |
| - custom headers (auth/trace) | |
| """ | |
| from __future__ import annotations | |
| from abc import ABC, abstractmethod | |
| from typing import Any, Dict, Generic, Optional, Type, TYPE_CHECKING, TypeVar | |
| import requests | |
| from .client_types import StepResult | |
| from .containers.runtime.uv_provider import UVProvider | |
| from .containers.runtime import LocalDockerProvider | |
| if TYPE_CHECKING: | |
| from .containers.runtime import ContainerProvider | |
ActT = TypeVar("ActT")  # action type accepted by ``step``
ObsT = TypeVar("ObsT")  # observation type carried inside ``StepResult``
EnvClientT = TypeVar("EnvClientT", bound="HTTPEnvClient")


class HTTPEnvClient(ABC, Generic[ActT, ObsT]):
    """HTTP client for a single environment server.

    The client issues ``POST /reset``, ``POST /step`` and ``GET /state``
    against ``base_url``. Subclasses supply the (de)serialisation hooks
    ``_step_payload``, ``_parse_result`` and ``_parse_state``; the base
    implementations raise ``NotImplementedError``.

    When a container provider is attached (directly or through one of the
    ``from_*`` constructors), ``close()`` asks it to stop the container.
    """

    def __init__(
        self,
        base_url: str,
        request_timeout_s: float = 15.0,
        default_headers: Optional[Dict[str, str]] = None,
        provider: Optional["ContainerProvider"] = None,
    ):
        """Store connection settings and open an HTTP session.

        Args:
            base_url: Root URL of the environment server; trailing slashes
                are stripped.
            request_timeout_s: Timeout, in seconds, applied to every request.
            default_headers: Headers sent with every request.
            provider: Optional container provider that ``close()`` should
                stop.
        """
        self._base = base_url.rstrip("/")
        self._timeout = float(request_timeout_s)
        self._http = requests.Session()
        # Copy so that later mutation of the caller's dict cannot silently
        # change the headers this client sends.
        self._headers = dict(default_headers) if default_headers else {}
        self._provider = provider

    @staticmethod
    def _wait_or_stop(provider: Any, base_url: str, **wait_kwargs: Any) -> None:
        """Wait for ``base_url`` to be ready; stop the container on failure.

        Any exception from ``provider.wait_for_ready`` is re-raised after
        ``provider.stop_container()`` has been called, so a server that never
        becomes ready does not leave its container running.
        """
        try:
            provider.wait_for_ready(base_url, **wait_kwargs)
        except Exception:
            provider.stop_container()
            raise

    @classmethod
    def from_docker_image(
        cls: Type[EnvClientT],
        image: str,
        provider: Optional["ContainerProvider"] = None,
        **kwargs: Any,
    ) -> EnvClientT:
        """
        Create an environment client by spinning up a Docker container locally.

        This is a development utility that:

        1. Starts a Docker container from the specified image
        2. Waits for the server to be ready
        3. Creates and returns a client instance connected to the container

        Note:
            The caller or a higher-level orchestrator manages the container
            lifecycle. The container continues running until it is stopped
            (for example via ``close()``). If the server never becomes ready,
            the container is stopped before the error propagates.

        Args:
            image: Docker image name to run (e.g., "echo-env:latest")
            provider: Container provider to use (defaults to
                ``LocalDockerProvider``)
            **kwargs: Additional arguments passed to
                ``provider.start_container()`` (e.g., env_vars, port)

        Returns:
            An instance of the client class connected to the running container

        Example:
            >>> from envs.coding_env.client import CodingEnv
            >>> from envs.coding_env.models import CodeAction
            >>>
            >>> # Create environment from image
            >>> env = CodingEnv.from_docker_image("coding-env:latest")
            >>>
            >>> # Create environment with custom env vars
            >>> env = CodingEnv.from_docker_image(
            ...     "coding-env:latest",
            ...     env_vars={"MY_VAR": "value"}
            ... )
            >>>
            >>> # Use the environment
            >>> result = env.reset()
            >>> print(result.observation)
            >>>
            >>> step_result = env.step(CodeAction(code="print('hello')"))
            >>> print(step_result.observation.stdout)
            >>>
            >>> # Cleanup (optional)
            >>> env.close()
        """
        # Use default provider if none provided
        if provider is None:
            provider = LocalDockerProvider()

        # 1. Start container with optional kwargs (e.g., env_vars, port)
        base_url = provider.start_container(image, **kwargs)

        # 2. Wait for server to be ready (stops the container on failure)
        cls._wait_or_stop(provider, base_url)

        # 3. Create and return client instance with provider reference
        return cls(base_url=base_url, provider=provider)

    @classmethod
    def from_hub(
        cls: Type[EnvClientT],
        space_id: str,
        *,
        use_docker: bool = False,
        provider: Optional["ContainerProvider"] = None,
        host: str = "0.0.0.0",
        port: Optional[int] = None,
        reload: bool = False,
        timeout_s: float = 60.0,
        runner: Optional["UVProvider"] = None,
        project_url: Optional[str] = None,
        connect_host: Optional[str] = None,
        extra_env: Optional[Dict[str, str]] = None,
        **provider_kwargs: Any,
    ) -> EnvClientT:
        """Create a client from a Hugging Face Space.

        Two launch modes are supported:

        * ``use_docker=True``: start a container through ``provider``
          (default ``LocalDockerProvider``). The image defaults to
          ``registry.hf.space/<space_id with '/' replaced by '-'>:<tag>``;
          ``tag`` (default ``"latest"``) and ``image`` may be supplied in
          ``provider_kwargs`` and are removed before the remaining kwargs
          are forwarded to ``provider.start_container()``.
        * ``use_docker=False``: start the Space through ``runner``, or a new
          ``UVProvider`` built from ``space_id``, ``host``, ``port``,
          ``reload``, ``project_url``, ``connect_host`` and ``extra_env``.
          ``provider`` is not used in this mode. ``env_vars`` is taken from
          ``provider_kwargs`` and passed to ``start_container()`` together
          with ``port`` and the remaining kwargs.

        In both modes the launched server is stopped again if it does not
        become ready, and the original exception is re-raised.

        Args:
            space_id: Identifier of the Space (e.g. ``"org/name"``).
            use_docker: Select the Docker launch mode.
            provider: Container provider for the Docker mode.
            host: Passed to ``UVProvider`` when one is constructed.
            port: Passed to ``UVProvider`` and to ``start_container()`` in
                the non-Docker mode.
            reload: Passed to ``UVProvider`` when one is constructed.
            timeout_s: Passed to ``wait_for_ready()`` as ``timeout_s``.
            runner: Pre-built ``UVProvider`` to use instead of creating one.
            project_url: Passed to ``UVProvider`` when one is constructed.
            connect_host: Passed to ``UVProvider`` when one is constructed.
            extra_env: Passed to ``UVProvider`` when one is constructed.
            **provider_kwargs: Extra arguments for ``start_container()``.

        Returns:
            An instance of the client class connected to the started server.
        """
        if use_docker:
            if provider is None:
                provider = LocalDockerProvider()
            tag = provider_kwargs.pop("tag", "latest")
            image = provider_kwargs.pop(
                "image",
                f"registry.hf.space/{space_id.replace('/', '-')}:{tag}",
            )
            base_url = provider.start_container(image, **provider_kwargs)
            cls._wait_or_stop(provider, base_url, timeout_s=timeout_s)
            return cls(base_url=base_url, provider=provider)

        uv_runner = runner or UVProvider(
            space_id=space_id,
            host=host,
            port=port,
            reload=reload,
            project_url=project_url,
            connect_host=connect_host,
            extra_env=extra_env,
        )
        # Work on a copy so "env_vars" can be passed as an explicit keyword
        # without also appearing in the forwarded **kwargs.
        non_docker_kwargs = dict(provider_kwargs)
        env_vars = non_docker_kwargs.pop("env_vars", None)
        base_url = uv_runner.start_container(
            space_id,
            port=port,
            env_vars=env_vars,
            **non_docker_kwargs,
        )
        cls._wait_or_stop(uv_runner, base_url, timeout_s=timeout_s)
        return cls(base_url=base_url, provider=uv_runner)

    def _step_payload(self, action: ActT) -> dict:
        """Convert an action to the JSON payload expected by the server."""
        raise NotImplementedError

    def _parse_result(self, payload: dict) -> "StepResult[ObsT]":
        """Convert a JSON response into :class:`StepResult`."""
        raise NotImplementedError

    def _parse_state(self, payload: dict) -> Any:
        """Convert state JSON into a :class:`State` object."""
        raise NotImplementedError

    # ---------- Environment Server Interface Methods ----------
    def reset(self) -> "StepResult[ObsT]":
        """POST an empty JSON body to ``/reset`` and parse the response.

        Returns:
            The result of ``_parse_result`` applied to the response JSON.

        Raises:
            Whatever ``Response.raise_for_status()`` raises for an error
            status, plus any transport error from the HTTP session.
        """
        body: Dict[str, Any] = {}
        # TODO: later:
        # body["seed"] = seed
        # body["episode_id"] = episode_id
        r = self._http.post(
            f"{self._base}/reset",
            json=body,
            headers=self._headers,
            timeout=self._timeout,
        )
        r.raise_for_status()
        return self._parse_result(r.json())

    def step(self, action: ActT) -> "StepResult[ObsT]":
        """POST ``action`` to ``/step`` and parse the response.

        The request body carries the serialised action under ``"action"``
        and the client timeout, truncated to an integer, under
        ``"timeout_s"``.

        Args:
            action: Action to execute; serialised with ``_step_payload``.

        Returns:
            The result of ``_parse_result`` applied to the response JSON.

        Raises:
            Whatever ``Response.raise_for_status()`` raises for an error
            status, plus any transport error from the HTTP session.
        """
        body: Dict[str, Any] = {
            "action": self._step_payload(action),
            "timeout_s": int(self._timeout),
        }
        # TODO: later:
        # body["request_id"] = str(uuid.uuid4())
        # body["episode_id"] = current_episode_id
        r = self._http.post(
            f"{self._base}/step",
            json=body,
            headers=self._headers,
            timeout=self._timeout,
        )
        r.raise_for_status()
        return self._parse_result(r.json())

    def state(self) -> Any:
        """
        Get the current environment state from the server.

        Issues ``GET /state`` and passes the response JSON to
        ``_parse_state``.

        Returns:
            State object with environment state information (e.g.,
            episode_id, step_count)

        Example:
            >>> client = EchoEnv.from_docker_image("echo-env:latest")
            >>> result = client.reset()
            >>> state = client.state()
            >>> print(state.episode_id)
            >>> print(state.step_count)
        """
        r = self._http.get(
            f"{self._base}/state",
            headers=self._headers,
            timeout=self._timeout,
        )
        r.raise_for_status()
        return self._parse_state(r.json())

    def close(self) -> None:
        """
        Close the environment and clean up resources.

        If a provider is attached (for example because the client was
        created via ``from_docker_image()`` or ``from_hub()``), its
        ``stop_container()`` is called. The HTTP session is closed
        afterwards, even if stopping the container raises.
        """
        try:
            if self._provider is not None:
                self._provider.stop_container()
        finally:
            # Release pooled connections held by the session.
            self._http.close()