Source code for embedl_hub._internal.tracking.client

# Copyright (C) 2025, 2026 Embedl AB

import logging
import os
from contextlib import contextmanager
from datetime import datetime
from pathlib import Path
from typing import TYPE_CHECKING
from urllib.parse import urljoin

from embedl_hub._internal.compat import UTC
from embedl_hub._internal.core.types import RemotePath

if TYPE_CHECKING:
    from embedl_hub._internal.core.context import HubContext

from embedl_hub._internal.core.config import load_ctx_config
from embedl_hub._internal.tracking.errors import (
    ArtifactUploadError,
    UnsupportedDeviceError,
    raise_if_artifact_error,
    raise_if_run_error,
)
from embedl_hub._internal.tracking.rest_api import (
    ApiConfig,
    ApiError,
    ArtifactStatus,
    CompletedRunStatus,
    Device,
    ExternalLink,
    Metric,
    Parameter,
    Project,
    Run,
    RunStatus,
    RunType,
    Tag,
    _RemoveParent,
    coerce_run_type,
    create_artifact,
    create_artifact_upload_url,
    create_project,
    create_run,
    get_devices,
    get_project_by_name,
    log_external_link,
    log_metric,
    log_param,
    log_tag,
    update_artifact,
    update_run,
    upload_file,
    upload_file_to_gcs,
)
from embedl_hub._internal.tracking.run_log import (
    LoggedArtifact,
    LoggedExternalLink,
    LoggedMetric,
    LoggedParam,
    RunLog,
    RunLogHistory,
)

logger = logging.getLogger(__name__)

# Environment variables consulted when building the API configuration.
API_KEY_ENV_VAR_NAME = "EMBEDL_HUB_API_KEY"
BASE_URL_ENV_VAR_NAME = "EMBEDL_HUB_API_BASE_URL"

# Hub base URL used when BASE_URL_ENV_VAR_NAME is not set.
DEFAULT_API_BASE_URL = "https://hub.embedl.com/"


def _resolve_run_type(
    type: RunType | str,
    custom_type: str | None = None,
) -> tuple[RunType, str | None]:
    """Resolve a run type and optional custom type label.

    When ``type`` is a string it is first passed to
    :func:`coerce_run_type`, which matches it case-insensitively (after
    stripping whitespace) against the :class:`RunType` enum. Matching
    strings are coerced to the corresponding enum member. Strings that
    do not match any known member are treated as custom type labels and
    ``RunType.CUSTOM`` is returned together with the string itself.

    :param type: A ``RunType`` enum member or a string label.  Known
        type names are coerced to their enum member regardless of case
        (e.g. ``"compile"`` → :attr:`RunType.COMPILE`).  Unknown,
        non-blank strings become custom type labels.
    :param custom_type: An explicit custom type label (only used when
        ``type`` is :attr:`RunType.CUSTOM`).
    :return: A ``(RunType, custom_type_label)`` tuple. The label is
        ``None`` for every type other than :attr:`RunType.CUSTOM`.
    :raises ValueError: If ``type`` is a blank string, or if ``type``
        (or its coerced value) is :attr:`RunType.CUSTOM` but no label is
        supplied.
    """
    if isinstance(type, str):
        coerced = coerce_run_type(type)
        if not isinstance(coerced, RunType):
            # Unknown label: the string itself becomes the custom type.
            # A blank label is rejected for the same reason an empty
            # ``custom_type`` is rejected for ``RunType.CUSTOM`` below.
            if not type.strip():
                raise ValueError(
                    "A non-empty run type label is required; "
                    "got a blank string."
                )
            return RunType.CUSTOM, type
        type = coerced

    if type is RunType.CUSTOM and not custom_type:
        raise ValueError(
            "A custom_type label is required when type=RunType.CUSTOM."
        )

    return type, custom_type if type is RunType.CUSTOM else None


class Client:
    """Tracks projects and runs for the Embedl Hub web app."""

    _api_config: ApiConfig | None
    _project: Project | None
    _active_run: Run | None
    _current_run_log: RunLog | None
    _run_log_history: list[RunLog]
    _log_remote_artifacts: bool

    def __init__(
        self,
        api_config: ApiConfig | None = None,
        *,
        log_remote_artifacts: bool = True,
    ) -> None:
        """Create a new tracking client.

        :param api_config: API configuration. When ``None``, the
            configuration is loaded from the environment on first use.
        :param log_remote_artifacts: Whether to fetch remote artifacts
            from devices and upload them to the Hub when
            :meth:`log_artifact` is called with a remote path or a
            remote :class:`~embedl_hub._internal.tracking.run_log.LoggedArtifact`.
            Defaults to ``True``. Set to ``False`` to skip the
            download-and-upload step and speed up execution.
        """
        self._api_config = api_config
        self._project = None
        self._active_run = None
        self._current_run_log = None
        self._run_log_history = []
        self._log_remote_artifacts = log_remote_artifacts

    @property
    def log_remote_artifacts(self) -> bool:
        """Whether remote artifacts are fetched and uploaded to the Hub.

        When ``True`` (the default), calling :meth:`log_artifact` with a
        remote path or a remote
        :class:`~embedl_hub._internal.tracking.run_log.LoggedArtifact`
        will download the file from the device and upload it to the
        Embedl Hub.

        Set to ``False`` to skip the download-and-upload step and speed
        up execution. The artifact is still recorded in the run log
        (unless ``save_to_run_log=False`` is passed), but its contents
        are not transferred.
        """
        return self._log_remote_artifacts

    @log_remote_artifacts.setter
    def log_remote_artifacts(self, value: bool) -> None:
        self._log_remote_artifacts = value

    def set_project(self, name: str) -> Project:
        """Set the current project by name, creating it if it does not exist.

        :param name: The project name.
        :return: The :class:`Project` that is now current.
        """
        project = get_project_by_name(self.api_config, name)
        if not project:
            project = create_project(self.api_config, name)
        self._project = project
        return project

    def create_run(
        self,
        type: RunType | str,
        name: str | None = None,
        parent_run_id: str | None = None,
        custom_type: str | None = None,
    ) -> Run:
        """Create a new run for the current project.

        This only registers the run with the Hub; it does not make it the
        active run (use :meth:`start_run` for that).

        :param type: A :class:`RunType` or a string label. Unknown strings
            become custom type labels.
        :param name: Optional run name.
        :param parent_run_id: Optional ID of the parent run.
        :param custom_type: Custom type label, required when ``type`` is
            :attr:`RunType.CUSTOM`.
        :return: The created :class:`Run`.
        :raises ValueError: If the run type cannot be resolved.
        :raises RuntimeError: If no project has been set.
        """
        resolved_type, resolved_custom_type = _resolve_run_type(
            type, custom_type
        )
        project = self.project
        try:
            run = create_run(
                self.api_config,
                type=resolved_type,
                name=name,
                project_id=project.id,
                parent_run_id=parent_run_id,
                custom_type=resolved_custom_type,
            )
        except ApiError as exc:
            # May raise a more specific error; otherwise re-raise as-is.
            raise_if_run_error(exc)
            raise
        return run

    def update_active_run(
        self,
        status: CompletedRunStatus | None = None,
        ended_at: datetime | None = None,
        metrics: list[Metric] | None = None,
        params: list[Parameter] | None = None,
        external_links: list[ExternalLink] | None = None,
        parent_run_id: str | _RemoveParent | None = None,
    ) -> None:
        """Update the active run on the Hub.

        All arguments are forwarded to the ``update_run`` REST call for
        the current project and active run.

        :param status: Optional completed status to set.
        :param ended_at: Optional end time to set.
        :param metrics: Optional metrics to attach.
        :param params: Optional parameters to attach.
        :param external_links: Optional external links to attach.
        :param parent_run_id: Optional new parent run ID, or a
            ``_RemoveParent`` marker.
        :raises RuntimeError: If no project is set or no run is active.
        """
        project = self.project
        run = self.active_run
        try:
            update_run(
                self.api_config,
                status=status,
                ended_at=ended_at,
                project_id=project.id,
                run_id=run.id,
                metrics=metrics,
                params=params,
                external_links=external_links,
                parent_run_id=parent_run_id,
            )
        except ApiError as exc:
            raise_if_run_error(exc)
            raise

    @contextmanager
    def start_run(
        self,
        type: RunType | str,
        name: str | None = None,
        parent_run_id: str | None = None,
        custom_type: str | None = None,
    ):
        """Context manager that starts a run and finishes it on exit.

        The run is created with :meth:`create_run`, made the active run,
        and given a fresh :class:`RunLog`. When the ``with`` block exits,
        the run is reported to the Hub as ``FINISHED``, ``KILLED`` (on
        :class:`KeyboardInterrupt`) or ``FAILED`` (on any other
        :class:`Exception`) together with its end time, and the run log
        is appended to :attr:`run_history`.

        :param type: Run type, see :meth:`create_run`.
        :param name: Optional run name.
        :param parent_run_id: Optional ID of the parent run.
        :param custom_type: Custom type label, see :meth:`create_run`.
        :yield: The created :class:`Run`.
        """
        run = self.create_run(type, name, parent_run_id, custom_type)
        # Build the run log before touching client state so a failure here
        # does not leave the client with a dangling active run.
        run_log = RunLog(
            run_id=run.id,
            run_name=run.name,
            started_at=run.started_at,
            project_id=self.project.id,
            hub_url=self.api_config.base_url,
        )
        self._active_run = run
        self._current_run_log = run_log

        status: CompletedRunStatus = RunStatus.FINISHED
        try:
            yield run
        except KeyboardInterrupt:
            status = RunStatus.KILLED
            raise
        except Exception:
            status = RunStatus.FAILED
            raise
        finally:
            ended_at = datetime.now(UTC)
            try:
                # Report the same end time that is stored in the local run
                # log so the Hub and the run history agree.
                self.update_active_run(status=status, ended_at=ended_at)
            finally:
                # Finalize local history even if the remote status update
                # fails, so the client state is not left active.
                if self._current_run_log is not None:
                    self._current_run_log.ended_at = ended_at
                    self._run_log_history.append(self._current_run_log)
                self._current_run_log = None
                self._active_run = None

    def log_param(
        self, name: str, value: str, *, save_to_run_log: bool = True
    ) -> LoggedParam:
        """Log a parameter for the current run.

        :param name: The parameter name.
        :param value: The parameter value.
        :param save_to_run_log: Whether to also record this entry in the
            current :class:`RunLog`. Defaults to ``True``.
        :return: The :class:`LoggedParam` that was recorded.
        """
        project = self.project
        active_run = self.active_run
        log_param(
            self.api_config,
            name=name,
            value=value,
            project_id=project.id,
            run_id=active_run.id,
        )
        logged_param = LoggedParam(
            name=name,
            value=value,
            logged_at=datetime.now(UTC),
        )
        # Log to the current run log
        if save_to_run_log and self._current_run_log is not None:
            self._current_run_log.add_param(logged_param)
        return logged_param

    def log_metric(
        self,
        name: str,
        value: float,
        step: int | None = None,
        *,
        save_to_run_log: bool = True,
    ) -> LoggedMetric:
        """Log a metric for the current run.

        :param name: The metric name.
        :param value: The metric value.
        :param step: Optional step number.
        :param save_to_run_log: Whether to also record this entry in the
            current :class:`RunLog`. Defaults to ``True``.
        :return: The :class:`LoggedMetric` that was recorded.
        """
        project = self.project
        active_run = self.active_run
        log_metric(
            self.api_config,
            name=name,
            value=value,
            step=step,
            project_id=project.id,
            run_id=active_run.id,
        )
        logged_metric = LoggedMetric(
            name=name,
            value=value,
            step=step,
            logged_at=datetime.now(UTC),
        )
        # Log to the current run log
        if save_to_run_log and self._current_run_log is not None:
            self._current_run_log.add_metric(logged_metric)
        return logged_metric

    def log_batch(
        self,
        *,
        metrics: list[tuple[str, float, int | None]] | None = None,
        params: list[tuple[str, str]] | None = None,
        external_links: list[tuple[str, str]] | None = None,
        save_to_run_log: bool = True,
    ) -> None:
        """Log multiple metrics, params and external links in one API call.

        This is significantly more efficient than calling
        :meth:`log_metric` and :meth:`log_param` in a loop, as it sends
        all data in one PATCH request instead of one POST per item. No
        request is made when every input is empty.

        Each input is consumed exactly once, so one-shot iterables such
        as generators are handled correctly as well as lists.

        :param metrics: List of ``(name, value, step)`` tuples.
            `step` may be ``None`` for metrics without a step.
        :param params: List of ``(name, value)`` tuples.
        :param external_links: List of ``(label, url)`` tuples.
        :param save_to_run_log: Whether to also record entries in the
            current :class:`RunLog`. Defaults to ``True``.
        """
        # Materialize once: the inputs are used both for the API payload
        # and for the local run-log mirror below.
        metric_rows = list(metrics or ())
        param_rows = list(params or ())
        link_rows = list(external_links or ())

        api_metrics = [
            Metric(name=m_name, value=m_value, step=m_step)
            for m_name, m_value, m_step in metric_rows
        ]
        api_params = [
            Parameter(name=p_name, value=p_value)
            for p_name, p_value in param_rows
        ]
        api_external_links = [
            ExternalLink(label=label, url=url) for label, url in link_rows
        ]

        if api_metrics or api_params or api_external_links:
            self.update_active_run(
                metrics=api_metrics or None,
                params=api_params or None,
                external_links=api_external_links or None,
            )

        # Mirror into the local run log
        if save_to_run_log and self._current_run_log is not None:
            now = datetime.now(UTC)
            for m_name, m_value, m_step in metric_rows:
                self._current_run_log.add_metric(
                    LoggedMetric(
                        name=m_name, value=m_value, step=m_step, logged_at=now
                    )
                )
            for p_name, p_value in param_rows:
                self._current_run_log.add_param(
                    LoggedParam(name=p_name, value=p_value, logged_at=now)
                )
            for label, url in link_rows:
                self._current_run_log.add_external_link(
                    LoggedExternalLink(
                        label=label,
                        url=url,
                        logged_at=now,
                    )
                )

    def log_tag(self, name: str, value: str) -> Tag:
        """Log a tag for the current run.

        Tags are sent to the Hub only; they are not recorded in the
        current :class:`RunLog`.

        :param name: The tag name.
        :param value: The tag value.
        :return: The :class:`Tag` returned by the Hub.
        """
        project = self.project
        run_id = self.active_run.id
        tag = log_tag(
            self.api_config,
            name=name,
            value=value,
            project_id=project.id,
            run_id=run_id,
        )
        return tag

    def log_artifact(
        self,
        file_path: Path | str | LoggedArtifact,
        name: str | None = None,
        file_name: str | None = None,
        run_id: str | None = None,
        *,
        ctx: "HubContext | None" = None,
        device_name: str | None = None,
        save_to_run_log: bool = True,
    ) -> LoggedArtifact:
        """
        Log an artifact file for a run and upload it to artifact storage.

        Accepts a local path, a remote path, or an existing
        :class:`LoggedArtifact`. Remote artifacts are fetched from the
        device and uploaded to the Hub when :attr:`log_remote_artifacts`
        is ``True`` (the default). Set it to ``False`` to skip remote
        uploads and speed up execution; the artifact is then recorded
        in the run log without being transferred.

        If no `name` is given, the artifact is logged as unnamed.
        If no `file_name` is given, the name from the file path is used.
        If no `run_id` is given, the current active run is used.

        :param file_path: Local file path, remote path, or a
            :class:`LoggedArtifact`.
        :param name: Optional artifact name.
        :param file_name: Optional file name override.
        :param run_id: Optional run ID (defaults to the active run).
        :param ctx: Hub context, required when `file_path` is remote or
            a :class:`LoggedArtifact`.
        :param device_name: Name of the device that owns the remote
            artifact. Required for remote paths; ignored for
            :class:`LoggedArtifact` (uses its own device info).
        :param save_to_run_log: Whether to also record the artifact in
            the current :class:`RunLog`.
        :return: The :class:`LoggedArtifact` that was recorded.
        :raises RuntimeError: If a remote artifact is received without a
            `ctx`, or a remote path is received without `device_name`.
        :raises FileTooLargeError: If the file exceeds the maximum allowed
            upload size.
        :raises StorageQuotaExceededError: If the storage quota has been
            exceeded.
        :raises ArtifactUploadError: If the file upload fails.
        """
        # -- Handle LoggedArtifact input --------------------------------
        if isinstance(file_path, LoggedArtifact):
            return self._log_logged_artifact(
                file_path,
                name=name,
                file_name=file_name,
                run_id=run_id,
                ctx=ctx,
                save_to_run_log=save_to_run_log,
            )

        # -- Handle remote path -----------------------------------------
        # RemotePath is PurePosixPath; on Linux, Path is also a
        # PurePosixPath subclass. Check for concrete Path first so
        # that local paths are not misidentified as remote.
        if isinstance(file_path, RemotePath) and not isinstance(
            file_path, Path
        ):
            return self._log_remote_path(
                file_path,
                name=name,
                file_name=file_name,
                run_id=run_id,
                ctx=ctx,
                device_name=device_name,
                save_to_run_log=save_to_run_log,
            )

        # -- Local path (original behaviour) ----------------------------
        return self._upload_and_record(
            Path(file_path),
            name=name,
            file_name=file_name,
            run_id=run_id,
            save_to_run_log=save_to_run_log,
        )

    # -- log_artifact private helpers -----------------------------------

    def _record_in_run_log(
        self, artifact: LoggedArtifact, save_to_run_log: bool
    ) -> None:
        """Add ``artifact`` to the current run log if requested and active."""
        if save_to_run_log and self._current_run_log is not None:
            self._current_run_log.add_artifact(artifact)

    @staticmethod
    def _prefixed_names(
        prefix: str, name: str | None, file_name: str
    ) -> tuple[str | None, str]:
        """Apply ``prefix`` to an optional artifact name and a file name."""
        prefixed_name = f"{prefix}{name}" if name is not None else None
        return prefixed_name, f"{prefix}{file_name}"

    def _log_logged_artifact(
        self,
        artifact: LoggedArtifact,
        *,
        name: str | None,
        file_name: str | None,
        run_id: str | None,
        ctx: "HubContext | None",
        save_to_run_log: bool,
    ) -> LoggedArtifact:
        """Handle ``log_artifact`` when the input is a :class:`LoggedArtifact`.

        Local artifacts are uploaded directly. Remote artifacts are either
        recorded without transfer (when remote artifact logging is
        disabled) or fetched via :meth:`LoggedArtifact.to_local` and
        uploaded under a ``remote_<device>__``-prefixed name.
        """
        if ctx is None:
            raise RuntimeError(
                "A 'ctx' (HubContext) is required when logging a "
                "LoggedArtifact. Pass the current context so the "
                "artifact's device information can be resolved."
            )

        effective_name = name if name is not None else artifact.name
        effective_file_name = (
            file_name if file_name is not None else artifact.file_name
        )

        if artifact.is_local:
            return self._upload_and_record(
                artifact.local(),
                name=effective_name,
                file_name=effective_file_name,
                run_id=run_id,
                save_to_run_log=save_to_run_log,
            )

        # Remote LoggedArtifact
        if not self._log_remote_artifacts:
            logger.debug(
                "Skipping remote artifact '%s' (log_remote_artifacts=False).",
                effective_name or effective_file_name,
            )
            # Contents are not transferred, but the artifact is still
            # recorded, as documented for ``log_remote_artifacts``.
            self._record_in_run_log(artifact, save_to_run_log)
            return artifact

        device_log = artifact.device
        prefix = (
            f"remote_{device_log.name}__"
            if device_log is not None
            else "remote__"
        )
        prefixed_name, prefixed_file_name = self._prefixed_names(
            prefix, effective_name, effective_file_name
        )

        local_artifact = artifact.to_local(ctx)
        return self._upload_and_record(
            local_artifact.local(),
            name=prefixed_name,
            file_name=prefixed_file_name,
            run_id=run_id,
            save_to_run_log=save_to_run_log,
        )

    def _log_remote_path(
        self,
        file_path: RemotePath,
        *,
        name: str | None,
        file_name: str | None,
        run_id: str | None,
        ctx: "HubContext | None",
        device_name: str | None,
        save_to_run_log: bool,
    ) -> LoggedArtifact:
        """Handle ``log_artifact`` when the input is a :class:`RemotePath`.

        Validates ``ctx`` and ``device_name``, then either records the
        remote path without transfer (when remote artifact logging is
        disabled) or fetches the file from the device and uploads it under
        a ``remote_<device_name>__``-prefixed name.
        """
        if ctx is None:
            raise RuntimeError(
                "A 'ctx' (HubContext) is required when logging a "
                "remote artifact path. Pass the current context so "
                "the file can be fetched from the device."
            )
        if device_name is None:
            raise RuntimeError(
                "A 'device_name' is required when logging a remote "
                "artifact path so the correct device can be resolved."
            )

        # Validate device exists in context
        if device_name not in ctx.devices:
            available = ", ".join(sorted(ctx.devices.keys())) or "(none)"
            raise RuntimeError(
                f"Device '{device_name}' not found in context. "
                f"Available devices: {available}."
            )

        effective_file_name = (
            file_name if file_name is not None else file_path.name
        )

        if not self._log_remote_artifacts:
            logger.debug(
                "Skipping remote artifact '%s' (log_remote_artifacts=False).",
                name or effective_file_name or str(file_path),
            )
            skipped = LoggedArtifact(
                id="",
                file_name=effective_file_name,
                file_size=0,
                logged_at=datetime.now(UTC),
                file_path=file_path,
                name=name,
            )
            # Contents are not transferred, but the artifact is still
            # recorded, as documented for ``log_remote_artifacts``.
            self._record_in_run_log(skipped, save_to_run_log)
            return skipped

        prefixed_name, prefixed_file_name = self._prefixed_names(
            f"remote_{device_name}__", name, effective_file_name
        )

        # Build a temporary LoggedArtifact to leverage to_local()
        from embedl_hub._internal.tracking.run_log import (
            _resolve_device_by_name,
        )

        device_obj, _ = _resolve_device_by_name(ctx, device_name)
        temp_artifact = LoggedArtifact(
            id="",
            file_name=effective_file_name,
            file_size=0,
            logged_at=datetime.now(UTC),
            file_path=file_path,
            device=device_obj.create_device_log(device_name),
        )
        local_artifact = temp_artifact.to_local(ctx)
        return self._upload_and_record(
            local_artifact.local(),
            name=prefixed_name,
            file_name=prefixed_file_name,
            run_id=run_id,
            save_to_run_log=save_to_run_log,
        )

    def _mark_artifact_failed(self, artifact_id: str) -> None:
        """Best-effort: flag an artifact as ``FAILED`` on the Hub.

        Errors are logged instead of raised so they cannot mask the
        original upload failure.
        """
        try:
            update_artifact(
                self.api_config, artifact_id, ArtifactStatus.FAILED
            )
        except Exception:
            logger.warning(
                "Could not mark artifact '%s' as failed.",
                artifact_id,
                exc_info=True,
            )

    def _upload_and_record(
        self,
        file_path: Path,
        *,
        name: str | None,
        file_name: str | None,
        run_id: str | None,
        save_to_run_log: bool,
    ) -> LoggedArtifact:
        """Upload a local file to the hub and record it in the run log.

        Creates the artifact entry, obtains an upload URL, uploads either
        through the Hub API (``upload_mode == "API"``) or directly to
        cloud storage, and marks the artifact ``UPLOADED``. On upload
        failure the artifact is marked ``FAILED`` (best-effort) and
        :class:`ArtifactUploadError` is raised.
        """
        if file_name is None:
            file_name = file_path.name
        file_size = file_path.stat().st_size
        run_id = run_id or self.active_run.id

        try:
            artifact = create_artifact(
                self.api_config, run_id, file_name, file_size
            )
        except ApiError as exc:
            raise_if_artifact_error(exc, file_path=file_path)
            raise

        try:
            upload_response = create_artifact_upload_url(
                self.api_config, artifact.id
            )
            if upload_response.upload_mode == "API":
                upload_file(
                    file_path,
                    urljoin(self.api_config.base_url, upload_response.url),
                    headers={
                        "Authorization": f"Bearer {self.api_config.api_key}",
                        "Content-Type": "application/octet-stream",
                    },
                )
            else:
                upload_file_to_gcs(file_path, upload_response.url, file_size)
        except Exception as exc:
            self._mark_artifact_failed(artifact.id)
            raise ArtifactUploadError(file_path=file_path) from exc

        update_artifact(self.api_config, artifact.id, ArtifactStatus.UPLOADED)

        logged_artifact = LoggedArtifact(
            id=artifact.id,
            file_name=file_name,
            file_size=file_size,
            logged_at=datetime.now(UTC),
            file_path=file_path,
            name=name,
            artifact_dir=(
                self._current_run_log.artifact_dir
                if self._current_run_log is not None
                else None
            ),
        )
        # Log to the current run log
        self._record_in_run_log(logged_artifact, save_to_run_log)
        return logged_artifact

    def get_devices(self) -> list[Device]:
        """Get the list of supported devices in the Embedl device cloud."""
        devices = get_devices(self.api_config)
        return devices

    def validate_device(self, device: str) -> Device:
        """Check if the specified device is supported in the Embedl device cloud.

        :param device: The device name to check.
        :return: The first :class:`Device` whose name equals `device`.
        :raises UnsupportedDeviceError: If `device` is not in the
            supported device list.
        """
        found_device = next(
            (d for d in self.get_devices() if d.name == device), None
        )
        if found_device is None:
            raise UnsupportedDeviceError(device)
        return found_device

    @property
    def api_config(self) -> ApiConfig:
        """Get or lazily create the API config.

        The API key is read from the ``EMBEDL_HUB_API_KEY`` environment
        variable, falling back to the stored context config. The base URL
        is read from ``EMBEDL_HUB_API_BASE_URL``; an unset or empty value
        falls back to the default Hub URL.

        :raises RuntimeError: If no API key can be found.
        """
        if self._api_config is None:
            # Environment variable takes precedence over context; an empty
            # value counts as unset.
            # TODO: receive api key from CLI context instead of reading from file here?
            api_key = os.getenv(API_KEY_ENV_VAR_NAME) or load_ctx_config().get(
                "api_key"
            )
            if not api_key:
                raise RuntimeError(
                    "No API key found. "
                    f"{API_KEY_ENV_VAR_NAME} must be set as an environment variable or stored in context."
                )
            # Treat an empty base-URL variable like an unset one, otherwise
            # every request URL would be built against "".
            api_base_url = (
                os.getenv(BASE_URL_ENV_VAR_NAME) or DEFAULT_API_BASE_URL
            )
            self._api_config = ApiConfig(
                api_key=api_key, base_url=api_base_url
            )
        return self._api_config

    @property
    def project(self) -> Project:
        """The current project.

        :raises RuntimeError: If :meth:`set_project` has not been called.
        """
        if self._project is None:
            raise RuntimeError("Project is not set. Use set_project() first.")
        return self._project

    @property
    def active_run(self) -> Run:
        """The run started by the enclosing :meth:`start_run` block.

        :raises RuntimeError: If no run is currently active.
        """
        if self._active_run is None:
            raise RuntimeError(
                "There is no active run. Use start_run() as a context manager first."
            )
        return self._active_run

    @property
    def run_history(self) -> RunLogHistory:
        """Get the run history (read-only).

        :return: A :class:`RunLogHistory` containing all completed
            :class:`RunLog` entries. The history cannot be modified.
        """
        return RunLogHistory(self._run_log_history)

    @property
    def latest_run_log(self) -> RunLog | None:
        """Get the most recent completed :class:`RunLog`, or ``None``
        if no runs have completed.
        """
        if not self._run_log_history:
            return None
        return self._run_log_history[-1]

    @property
    def current_run_log(self) -> RunLog | None:
        """Get the current in-progress :class:`RunLog`, or ``None``
        if no run is active.
        """
        return self._current_run_log