# Copyright (C) 2025, 2026 Embedl AB
import logging
import os
from contextlib import contextmanager
from datetime import datetime
from pathlib import Path
from typing import TYPE_CHECKING
from urllib.parse import urljoin
from embedl_hub._internal.compat import UTC
from embedl_hub._internal.core.types import RemotePath
if TYPE_CHECKING:
from embedl_hub._internal.core.context import HubContext
from embedl_hub._internal.core.config import load_ctx_config
from embedl_hub._internal.tracking.errors import (
ArtifactUploadError,
UnsupportedDeviceError,
raise_if_artifact_error,
raise_if_run_error,
)
from embedl_hub._internal.tracking.rest_api import (
ApiConfig,
ApiError,
ArtifactStatus,
CompletedRunStatus,
Device,
ExternalLink,
Metric,
Parameter,
Project,
Run,
RunStatus,
RunType,
Tag,
_RemoveParent,
coerce_run_type,
create_artifact,
create_artifact_upload_url,
create_project,
create_run,
get_devices,
get_project_by_name,
log_external_link,
log_metric,
log_param,
log_tag,
update_artifact,
update_run,
upload_file,
upload_file_to_gcs,
)
from embedl_hub._internal.tracking.run_log import (
LoggedArtifact,
LoggedExternalLink,
LoggedMetric,
LoggedParam,
RunLog,
RunLogHistory,
)
# Environment variable holding the Embedl Hub API key. When set (and
# non-empty) it takes precedence over a key stored in the context config.
API_KEY_ENV_VAR_NAME: str = "EMBEDL_HUB_API_KEY"
# Environment variable that overrides the Hub API base URL.
BASE_URL_ENV_VAR_NAME: str = "EMBEDL_HUB_API_BASE_URL"
# Base URL used when the override variable above is not set.
DEFAULT_API_BASE_URL: str = "https://hub.embedl.com/"

# Logger named after this module.
logger = logging.getLogger(__name__)
def _resolve_run_type(
    type: RunType | str,
    custom_type: str | None = None,
) -> tuple[RunType, str | None]:
    """Turn a run-type argument into a ``(RunType, custom label)`` pair.

    A string ``type`` is first passed to :func:`coerce_run_type`, which
    the original contract describes as a case-insensitive,
    whitespace-stripping match against :class:`RunType`. A string that
    coerces to an enum member is handled like that member. A string that
    does not coerce is returned unchanged as the custom label, paired
    with ``RunType.CUSTOM``; ``custom_type`` is ignored in that case.

    :param type: A :class:`RunType` member or a string label, e.g.
        ``"compile"`` for :attr:`RunType.COMPILE`.
    :param custom_type: Label used only when the resolved type is
        :attr:`RunType.CUSTOM`.
    :return: ``(run_type, label)``. ``label`` is ``None`` for every
        non-custom type.
    :raises ValueError: If the resolved type is :attr:`RunType.CUSTOM`
        but ``custom_type`` is empty or ``None``.
    """
    if isinstance(type, str):
        matched = coerce_run_type(type)
        if not isinstance(matched, RunType):
            # Unknown name: the raw string itself becomes the custom label.
            return RunType.CUSTOM, type
        type = matched

    if type is not RunType.CUSTOM:
        return type, None

    if not custom_type:
        raise ValueError(
            "A custom_type label is required when type=RunType.CUSTOM."
        )
    return type, custom_type
[docs]
class Client:
    """Client that tracks projects and runs in the Embedl Hub web app.

    It keeps the current project, the run that is currently active and
    the local run logs recorded for runs started through this client.
    """

    _api_config: ApiConfig | None
    _project: Project | None
    _active_run: Run | None
    _current_run_log: RunLog | None
    _run_log_history: list[RunLog]
    _log_remote_artifacts: bool

    def __init__(
        self,
        api_config: ApiConfig | None = None,
        *,
        log_remote_artifacts: bool = True,
    ) -> None:
        """Initialise a client with no project and no active run.

        :param api_config: API configuration to use. If ``None``, the
            configuration is built from the environment / context config
            the first time it is needed.
        :param log_remote_artifacts: When ``True`` (default),
            :meth:`log_artifact` downloads remote artifacts (remote paths
            or remote
            :class:`~embedl_hub._internal.tracking.run_log.LoggedArtifact`
            objects) from their device and uploads them to the Hub. Pass
            ``False`` to skip that transfer and speed up execution.
        """
        self._api_config = api_config
        self._log_remote_artifacts = log_remote_artifacts
        # Nothing is selected or running yet.
        self._project = None
        self._active_run = None
        self._current_run_log = None
        # Run logs of completed runs, oldest first.
        self._run_log_history = []
@property
def log_remote_artifacts(self) -> bool:
    """Whether :meth:`log_artifact` transfers remote artifacts to the Hub.

    With the default ``True``, passing a remote path or a remote
    :class:`~embedl_hub._internal.tracking.run_log.LoggedArtifact` to
    :meth:`log_artifact` downloads the file from its device and uploads
    it to the Embedl Hub.

    With ``False``, that download-and-upload step is skipped to speed
    up execution, and the artifact's contents are not transferred.
    """
    return self._log_remote_artifacts

@log_remote_artifacts.setter
def log_remote_artifacts(self, value: bool) -> None:
    # Stored as given; no validation or conversion is applied.
    self._log_remote_artifacts = value
[docs]
def set_project(self, name: str) -> Project:
    """Select the project called ``name``, creating it if it is missing.

    :param name: Project name to look up on the Hub.
    :return: The existing or newly created :class:`Project`. It also
        becomes the client's current project.
    """
    config = self.api_config
    # A falsy lookup result means "not found", so create the project.
    self._project = get_project_by_name(config, name) or create_project(
        config, name
    )
    return self._project
[docs]
def create_run(
    self,
    type: RunType | str,
    name: str | None = None,
    parent_run_id: str | None = None,
    custom_type: str | None = None,
) -> Run:
    """Create a new run in the current project without activating it.

    :param type: Run type, given as a :class:`RunType` or a string label
        (unknown strings become custom type labels).
    :param name: Optional run name.
    :param parent_run_id: Optional ID of a parent run.
    :param custom_type: Custom type label, required for
        :attr:`RunType.CUSTOM`.
    :return: The :class:`Run` created on the Hub.
    :raises ValueError: If a custom run type has no label.
    :raises RuntimeError: If no project has been set.
    """
    run_type, custom_label = _resolve_run_type(type, custom_type)
    project_id = self.project.id
    try:
        return create_run(
            self.api_config,
            type=run_type,
            name=name,
            project_id=project_id,
            parent_run_id=parent_run_id,
            custom_type=custom_label,
        )
    except ApiError as err:
        # raise_if_run_error may raise a more specific error; if it does
        # not, the original ApiError propagates.
        raise_if_run_error(err)
        raise
[docs]
def update_active_run(
    self,
    status: CompletedRunStatus | None = None,
    ended_at: datetime | None = None,
    metrics: list[Metric] | None = None,
    params: list[Parameter] | None = None,
    external_links: list[ExternalLink] | None = None,
    parent_run_id: str | _RemoveParent | None = None,
) -> None:
    """Send an update for the active run to the Hub.

    Every argument is forwarded unchanged to ``update_run``. This covers
    the final status, end time, metrics, parameters, external links and
    parent run ID. ``_RemoveParent`` is presumably a sentinel for
    detaching the parent; confirm against ``rest_api``.

    :raises RuntimeError: If no project is set or no run is active.
    """
    project_id = self.project.id
    run_id = self.active_run.id
    try:
        update_run(
            self.api_config,
            status=status,
            ended_at=ended_at,
            project_id=project_id,
            run_id=run_id,
            metrics=metrics,
            params=params,
            external_links=external_links,
            parent_run_id=parent_run_id,
        )
    except ApiError as err:
        # Let raise_if_run_error map known failures; otherwise re-raise.
        raise_if_run_error(err)
        raise
[docs]
@contextmanager
def start_run(
    self,
    type: RunType | str,
    name: str | None = None,
    parent_run_id: str | None = None,
    custom_type: str | None = None,
):
    """Context manager that creates a run, makes it active and finishes it.

    On entry the run is created with :meth:`create_run`. It becomes the
    active run and gets a fresh :class:`RunLog`.

    On exit, the final status and the end time are sent to the Hub
    through :meth:`update_active_run`. The status is
    ``RunStatus.FINISHED`` on a normal exit, ``RunStatus.KILLED`` on
    ``KeyboardInterrupt``, and ``RunStatus.FAILED`` on any other
    ``Exception``; exceptions are always re-raised. The run log is then
    stamped with the same end time and appended to the run history, even
    if the remote update fails.

    The run and run log that were active before entering are restored on
    exit. A nested ``start_run()`` therefore leaves the enclosing run
    active and able to finish normally.

    :param type: Run type or string label (see :meth:`create_run`).
    :param name: Optional run name.
    :param parent_run_id: Optional ID of the parent run.
    :param custom_type: Custom type label, used with
        :attr:`RunType.CUSTOM`.
    :yield: The created :class:`Run`.
    """
    run = self.create_run(type, name, parent_run_id, custom_type)
    # Build the run log before touching client state, so a failure here
    # does not leave a dangling active run behind.
    run_log = RunLog(
        run_id=run.id,
        run_name=run.name,
        started_at=run.started_at,
        project_id=self.project.id,
        hub_url=self.api_config.base_url,
    )
    # Remember the enclosing run's state (if any) to restore it on exit.
    outer_run = self._active_run
    outer_run_log = self._current_run_log
    self._active_run = run
    self._current_run_log = run_log

    status: CompletedRunStatus = RunStatus.FINISHED
    try:
        yield run
    except KeyboardInterrupt:
        status = RunStatus.KILLED
        raise
    except Exception:
        status = RunStatus.FAILED
        raise
    finally:
        ended_at = datetime.now(UTC)
        try:
            # Send the end time as well, so the Hub and the local run log
            # agree on when the run ended.
            self.update_active_run(status=status, ended_at=ended_at)
        finally:
            # Finalize local history even if the remote status update
            # fails, so the client state is not left active.
            if self._current_run_log is not None:
                self._current_run_log.ended_at = ended_at
                self._run_log_history.append(self._current_run_log)
            self._current_run_log = outer_run_log
            self._active_run = outer_run
[docs]
def log_param(
    self, name: str, value: str, *, save_to_run_log: bool = True
) -> LoggedParam:
    """Record a parameter on the active run.

    :param name: Parameter name.
    :param value: Parameter value.
    :param save_to_run_log: When ``True`` (default), also add the entry
        to the current :class:`RunLog`, if one is active.
    :return: The :class:`LoggedParam` describing what was logged.
    :raises RuntimeError: If no project is set or no run is active.
    """
    project_id = self.project.id
    run_id = self.active_run.id
    log_param(
        self.api_config,
        name=name,
        value=value,
        project_id=project_id,
        run_id=run_id,
    )
    entry = LoggedParam(name=name, value=value, logged_at=datetime.now(UTC))
    run_log = self._current_run_log
    if save_to_run_log and run_log is not None:
        run_log.add_param(entry)
    return entry
[docs]
def log_metric(
    self,
    name: str,
    value: float,
    step: int | None = None,
    *,
    save_to_run_log: bool = True,
) -> LoggedMetric:
    """Record a metric value on the active run.

    :param name: Metric name.
    :param value: Metric value.
    :param step: Optional step number; ``None`` for step-less metrics.
    :param save_to_run_log: When ``True`` (default), also add the entry
        to the current :class:`RunLog`, if one is active.
    :return: The :class:`LoggedMetric` describing what was logged.
    :raises RuntimeError: If no project is set or no run is active.
    """
    project_id = self.project.id
    run_id = self.active_run.id
    log_metric(
        self.api_config,
        name=name,
        value=value,
        step=step,
        project_id=project_id,
        run_id=run_id,
    )
    entry = LoggedMetric(
        name=name, value=value, step=step, logged_at=datetime.now(UTC)
    )
    run_log = self._current_run_log
    if save_to_run_log and run_log is not None:
        run_log.add_metric(entry)
    return entry
[docs]
def log_link(
    self, label: str, url: str, *, save_to_run_log: bool = True
) -> LoggedExternalLink:
    """Attach an external link to the active run.

    :param label: Human-readable link label.
    :param url: Link target.
    :param save_to_run_log: When ``True`` (default), also add the entry
        to the current :class:`RunLog`, if one is active.
    :return: The :class:`LoggedExternalLink` describing what was logged.
    :raises RuntimeError: If no project is set or no run is active.
    """
    project_id = self.project.id
    run_id = self.active_run.id
    log_external_link(
        self.api_config,
        label=label,
        url=url,
        project_id=project_id,
        run_id=run_id,
    )
    entry = LoggedExternalLink(
        label=label, url=url, logged_at=datetime.now(UTC)
    )
    run_log = self._current_run_log
    if save_to_run_log and run_log is not None:
        run_log.add_external_link(entry)
    return entry
[docs]
def log_batch(
    self,
    *,
    metrics: list[tuple[str, float, int | None]] | None = None,
    params: list[tuple[str, str]] | None = None,
    external_links: list[tuple[str, str]] | None = None,
    save_to_run_log: bool = True,
) -> None:
    """Log metrics, parameters and external links in one update.

    Everything is sent through a single :meth:`update_active_run` call,
    rather than one request per item as with :meth:`log_metric`,
    :meth:`log_param` and :meth:`log_link`. No request is made when all
    three inputs are empty.

    :param metrics: ``(name, value, step)`` tuples. ``step`` may be
        ``None`` for metrics without a step.
    :param params: ``(name, value)`` tuples.
    :param external_links: ``(label, url)`` tuples.
    :param save_to_run_log: When ``True`` (default), also add every
        entry to the current :class:`RunLog`, if one is active. All
        entries share one timestamp.
    """
    metric_rows = metrics or []
    param_rows = params or []
    link_rows = external_links or []

    api_metrics = [
        Metric(name=m_name, value=m_value, step=m_step)
        for m_name, m_value, m_step in metric_rows
    ]
    api_params = [
        Parameter(name=p_name, value=p_value) for p_name, p_value in param_rows
    ]
    api_links = [
        ExternalLink(label=lbl, url=link_url) for lbl, link_url in link_rows
    ]

    if any((api_metrics, api_params, api_links)):
        # Empty categories are sent as None rather than as empty lists.
        self.update_active_run(
            metrics=api_metrics or None,
            params=api_params or None,
            external_links=api_links or None,
        )

    run_log = self._current_run_log
    if not save_to_run_log or run_log is None:
        return

    # Mirror everything into the local run log with a shared timestamp.
    now = datetime.now(UTC)
    for m_name, m_value, m_step in metric_rows:
        run_log.add_metric(
            LoggedMetric(name=m_name, value=m_value, step=m_step, logged_at=now)
        )
    for p_name, p_value in param_rows:
        run_log.add_param(
            LoggedParam(name=p_name, value=p_value, logged_at=now)
        )
    for lbl, link_url in link_rows:
        run_log.add_external_link(
            LoggedExternalLink(label=lbl, url=link_url, logged_at=now)
        )
[docs]
def log_tag(self, name: str, value: str) -> Tag:
    """Attach a tag to the active run.

    Unlike the other ``log_*`` methods, tags are not written to the
    local run log.

    :param name: Tag name.
    :param value: Tag value.
    :return: The :class:`Tag` returned by the Hub.
    :raises RuntimeError: If no project is set or no run is active.
    """
    project_id = self.project.id
    run_id = self.active_run.id
    return log_tag(
        self.api_config,
        name=name,
        value=value,
        project_id=project_id,
        run_id=run_id,
    )
[docs]
def log_artifact(
    self,
    file_path: Path | str | LoggedArtifact,
    name: str | None = None,
    file_name: str | None = None,
    run_id: str | None = None,
    *,
    ctx: "HubContext | None" = None,
    device_name: str | None = None,
    save_to_run_log: bool = True,
) -> LoggedArtifact:
    """
    Log an artifact for a run and upload it to artifact storage.

    The input may be a local path, a remote path or an existing
    :class:`LoggedArtifact`. Remote artifacts are fetched from their
    device and uploaded to the Hub while :attr:`log_remote_artifacts`
    is ``True`` (the default). Setting that attribute to ``False`` skips
    the remote transfer to speed up execution.

    Defaults: without `name` the artifact is unnamed; without
    `file_name` the name is taken from the file path; without `run_id`
    the active run is used.

    :param file_path: Local file path, remote path, or a
        :class:`LoggedArtifact`.
    :param name: Optional artifact name.
    :param file_name: Optional file name override.
    :param run_id: Optional run ID (defaults to the active run).
    :param ctx: Hub context. Required when `file_path` is remote or a
        :class:`LoggedArtifact`.
    :param device_name: Device that owns a remote path. Required for
        remote paths; ignored for :class:`LoggedArtifact` input, which
        carries its own device info.
    :param save_to_run_log: Whether to also record the artifact in the
        current :class:`RunLog`.
    :return: The :class:`LoggedArtifact` that was recorded.
    :raises RuntimeError: If remote input arrives without `ctx`, or a
        remote path arrives without `device_name`.
    :raises FileTooLargeError: If the file exceeds the maximum allowed
        upload size.
    :raises StorageQuotaExceededError: If the storage quota has been
        exceeded.
    :raises ArtifactUploadError: If the file upload fails.
    """
    if isinstance(file_path, LoggedArtifact):
        return self._log_logged_artifact(
            file_path,
            name=name,
            file_name=file_name,
            run_id=run_id,
            ctx=ctx,
            save_to_run_log=save_to_run_log,
        )

    # RemotePath is a PurePosixPath, and so is Path on POSIX systems.
    # Excluding concrete Path keeps local paths from being treated as
    # remote.
    is_remote = isinstance(file_path, RemotePath) and not isinstance(
        file_path, Path
    )
    if is_remote:
        return self._log_remote_path(
            file_path,
            name=name,
            file_name=file_name,
            run_id=run_id,
            ctx=ctx,
            device_name=device_name,
            save_to_run_log=save_to_run_log,
        )

    # Anything else (str or Path) is a local file.
    return self._upload_and_record(
        Path(file_path),
        name=name,
        file_name=file_name,
        run_id=run_id,
        save_to_run_log=save_to_run_log,
    )
# -- log_artifact private helpers -----------------------------------
def _log_logged_artifact(
    self,
    artifact: LoggedArtifact,
    *,
    name: str | None,
    file_name: str | None,
    run_id: str | None,
    ctx: "HubContext | None",
    save_to_run_log: bool,
) -> LoggedArtifact:
    """Handle ``log_artifact`` when the input is a :class:`LoggedArtifact`.

    Local artifacts are uploaded as they are. Remote artifacts are first
    copied locally with ``artifact.to_local(ctx)``. They are then uploaded
    under a name and file name prefixed with ``remote_<device>__``, or
    ``remote__`` when the artifact carries no device info.

    When :attr:`log_remote_artifacts` is ``False``, a remote artifact is
    not transferred. It is only recorded in the current run log (if
    ``save_to_run_log``) and returned unchanged.

    :param artifact: The artifact to log.
    :param name: Override for ``artifact.name``.
    :param file_name: Override for ``artifact.file_name``.
    :param run_id: Target run ID; defaults to the active run.
    :param ctx: Hub context. Required and checked before anything else.
    :param save_to_run_log: Whether to record the result in the current
        :class:`RunLog`.
    :return: The uploaded :class:`LoggedArtifact`, or the input artifact
        when a remote transfer was skipped.
    :raises RuntimeError: If ``ctx`` is ``None``.
    """
    if ctx is None:
        raise RuntimeError(
            "A 'ctx' (HubContext) is required when logging a "
            "LoggedArtifact. Pass the current context so the "
            "artifact's device information can be resolved."
        )
    effective_name = name if name is not None else artifact.name
    effective_file_name = (
        file_name if file_name is not None else artifact.file_name
    )
    if artifact.is_local:
        return self._upload_and_record(
            artifact.local(),
            name=effective_name,
            file_name=effective_file_name,
            run_id=run_id,
            save_to_run_log=save_to_run_log,
        )
    # Remote LoggedArtifact
    if not self._log_remote_artifacts:
        logger.debug(
            "Skipping remote artifact '%s' (log_remote_artifacts=False).",
            effective_name or effective_file_name,
        )
        # Per the log_remote_artifacts contract, the artifact is still
        # recorded in the run log; only its contents are not transferred.
        if save_to_run_log and self._current_run_log is not None:
            self._current_run_log.add_artifact(artifact)
        return artifact
    device_log = artifact.device
    prefix = (
        f"remote_{device_log.name}__"
        if device_log is not None
        else "remote__"
    )
    prefixed_name = (
        f"{prefix}{effective_name}" if effective_name is not None else None
    )
    prefixed_file_name = f"{prefix}{effective_file_name}"
    local_artifact = artifact.to_local(ctx)
    return self._upload_and_record(
        local_artifact.local(),
        name=prefixed_name,
        file_name=prefixed_file_name,
        run_id=run_id,
        save_to_run_log=save_to_run_log,
    )
def _log_remote_path(
    self,
    file_path: RemotePath,
    *,
    name: str | None,
    file_name: str | None,
    run_id: str | None,
    ctx: "HubContext | None",
    device_name: str | None,
    save_to_run_log: bool,
) -> LoggedArtifact:
    """Handle ``log_artifact`` when the input is a :class:`RemotePath`.

    The file is fetched from device ``device_name`` in ``ctx`` and
    uploaded under a name and file name prefixed with
    ``remote_<device_name>__``.

    When :attr:`log_remote_artifacts` is ``False``, nothing is
    transferred. A placeholder :class:`LoggedArtifact` (empty ``id``,
    ``file_size=0``) pointing at the remote path is returned instead, and
    it is recorded in the current run log if ``save_to_run_log``.

    :param file_path: Path of the file on the device.
    :param name: Optional artifact name (prefixed when uploaded).
    :param file_name: File name override; defaults to ``file_path.name``.
    :param run_id: Target run ID; defaults to the active run.
    :param ctx: Hub context holding the device.
    :param device_name: Key of the device in ``ctx.devices``.
    :param save_to_run_log: Whether to record the result in the current
        :class:`RunLog`.
    :return: The uploaded :class:`LoggedArtifact`, or the placeholder
        when the transfer was skipped.
    :raises RuntimeError: If ``ctx`` or ``device_name`` is missing, or the
        device is not in ``ctx.devices``.
    """
    if ctx is None:
        raise RuntimeError(
            "A 'ctx' (HubContext) is required when logging a "
            "remote artifact path. Pass the current context so "
            "the file can be fetched from the device."
        )
    if device_name is None:
        raise RuntimeError(
            "A 'device_name' is required when logging a remote "
            "artifact path so the correct device can be resolved."
        )
    # Validate device exists in context
    if device_name not in ctx.devices:
        available = ", ".join(sorted(ctx.devices.keys())) or "(none)"
        raise RuntimeError(
            f"Device '{device_name}' not found in context. "
            f"Available devices: {available}."
        )
    effective_file_name = (
        file_name if file_name is not None else file_path.name
    )
    if not self._log_remote_artifacts:
        logger.debug(
            "Skipping remote artifact '%s' (log_remote_artifacts=False).",
            name or effective_file_name or str(file_path),
        )
        skipped = LoggedArtifact(
            id="",
            file_name=effective_file_name,
            file_size=0,
            logged_at=datetime.now(UTC),
            file_path=file_path,
            name=name,
        )
        # Per the log_remote_artifacts contract, the artifact is still
        # recorded in the run log; only its contents are not transferred.
        if save_to_run_log and self._current_run_log is not None:
            self._current_run_log.add_artifact(skipped)
        return skipped
    prefix = f"remote_{device_name}__"
    prefixed_name = f"{prefix}{name}" if name is not None else None
    prefixed_file_name = f"{prefix}{effective_file_name}"
    # Build a temporary LoggedArtifact to leverage to_local()
    from embedl_hub._internal.tracking.run_log import (
        _resolve_device_by_name,
    )

    device_obj, _ = _resolve_device_by_name(ctx, device_name)
    temp_artifact = LoggedArtifact(
        id="",
        file_name=effective_file_name,
        file_size=0,
        logged_at=datetime.now(UTC),
        file_path=file_path,
        device=device_obj.create_device_log(device_name),
    )
    local_artifact = temp_artifact.to_local(ctx)
    return self._upload_and_record(
        local_artifact.local(),
        name=prefixed_name,
        file_name=prefixed_file_name,
        run_id=run_id,
        save_to_run_log=save_to_run_log,
    )
def _upload_and_record(
    self,
    file_path: Path,
    *,
    name: str | None,
    file_name: str | None,
    run_id: str | None,
    save_to_run_log: bool,
) -> LoggedArtifact:
    """Upload a local file to the hub and record it in the run log.

    The steps are:

    1. Create the artifact entry on the Hub.
    2. Request an upload URL.
    3. Upload the file, either through the Hub API (``upload_mode ==
       "API"``, authenticated with the API key) or directly to the
       returned URL via ``upload_file_to_gcs``.
    4. Mark the artifact as uploaded.

    :param file_path: Local file to upload.
    :param name: Optional artifact name.
    :param file_name: Stored file name; defaults to ``file_path.name``.
    :param run_id: Target run ID; defaults to the active run.
    :param save_to_run_log: Whether to add the result to the current
        :class:`RunLog`.
    :return: The :class:`LoggedArtifact` describing the upload.
    :raises ArtifactUploadError: If requesting the upload URL or the
        upload itself fails. The artifact is marked FAILED on a
        best-effort basis first.
    """
    if file_name is None:
        file_name = file_path.name
    file_size = file_path.stat().st_size
    run_id = run_id or self.active_run.id
    try:
        artifact = create_artifact(
            self.api_config, run_id, file_name, file_size
        )
    except ApiError as exc:
        # May raise a more specific error (e.g. size/quota limits);
        # otherwise the ApiError propagates.
        raise_if_artifact_error(exc, file_path=file_path)
        raise
    try:
        upload_response = create_artifact_upload_url(
            self.api_config, artifact.id
        )
        if upload_response.upload_mode == "API":
            upload_file(
                file_path,
                urljoin(self.api_config.base_url, upload_response.url),
                headers={
                    "Authorization": f"Bearer {self.api_config.api_key}",
                    "Content-Type": "application/octet-stream",
                },
            )
        else:
            upload_file_to_gcs(file_path, upload_response.url, file_size)
    except Exception as exc:
        try:
            update_artifact(
                self.api_config, artifact.id, ArtifactStatus.FAILED
            )
        except Exception:
            # Best effort: a failing status update must not hide the
            # upload error reported below.
            logger.warning(
                "Could not mark artifact %s as failed.",
                artifact.id,
                exc_info=True,
            )
        raise ArtifactUploadError(file_path=file_path) from exc
    update_artifact(self.api_config, artifact.id, ArtifactStatus.UPLOADED)
    logged_artifact = LoggedArtifact(
        id=artifact.id,
        file_name=file_name,
        file_size=file_size,
        logged_at=datetime.now(UTC),
        file_path=file_path,
        name=name,
        artifact_dir=(
            self._current_run_log.artifact_dir
            if self._current_run_log is not None
            else None
        ),
    )
    # Log to the current run log
    if save_to_run_log and self._current_run_log is not None:
        self._current_run_log.add_artifact(logged_artifact)
    return logged_artifact
[docs]
def get_devices(self) -> list[Device]:
    """Return the devices the Hub reports as supported in the device cloud.

    :return: List of supported :class:`Device` objects.
    """
    # Inside a method, ``get_devices`` resolves to the module-level REST
    # helper, not to this method.
    return get_devices(self.api_config)
[docs]
def validate_device(self, device: str) -> Device:
    """Look up ``device`` among the devices supported in the device cloud.

    :param device: Exact device name to look for.
    :return: The first supported :class:`Device` whose name equals
        ``device``.
    :raises UnsupportedDeviceError: If no supported device has that name.
    """
    supported = next(
        (candidate for candidate in self.get_devices()
         if candidate.name == device),
        None,
    )
    if supported is None:
        raise UnsupportedDeviceError(device)
    return supported
@property
def api_config(self) -> ApiConfig:
    """Return the API config, building it on first access if needed.

    When no config was passed to the constructor, one is created and
    cached:

    * API key: taken from the ``EMBEDL_HUB_API_KEY`` environment variable
      when it is non-empty. Otherwise it is taken from ``"api_key"`` in
      the context config (``load_ctx_config()``).
    * Base URL: taken from ``EMBEDL_HUB_API_BASE_URL`` when it is
      non-empty, otherwise ``DEFAULT_API_BASE_URL``.

    :return: The cached :class:`ApiConfig`.
    :raises RuntimeError: If no API key can be found.
    """
    if self._api_config is not None:
        return self._api_config
    # The environment variable takes precedence over the stored context.
    # The context config is only read when the variable is unset/empty.
    # TODO: receive api key from CLI context instead of reading from file here?
    api_key = os.getenv(API_KEY_ENV_VAR_NAME) or load_ctx_config().get(
        "api_key"
    )
    if not api_key:
        raise RuntimeError(
            "No API key found. "
            f"{API_KEY_ENV_VAR_NAME} must be set as an environment variable or stored in context."
        )
    # Treat an empty base-URL variable like an unset one (the API key is
    # handled the same way). An empty base URL would break every request.
    api_base_url = os.getenv(BASE_URL_ENV_VAR_NAME) or DEFAULT_API_BASE_URL
    self._api_config = ApiConfig(api_key=api_key, base_url=api_base_url)
    return self._api_config
@property
def project(self) -> Project:
    """The current project selected with :meth:`set_project`.

    :raises RuntimeError: If :meth:`set_project` has not been called.
    """
    current = self._project
    if current is None:
        raise RuntimeError("Project is not set. Use set_project() first.")
    return current
@property
def active_run(self) -> Run:
    """The run currently active inside a :meth:`start_run` block.

    :raises RuntimeError: If no run is active.
    """
    current = self._active_run
    if current is not None:
        return current
    raise RuntimeError(
        "There is no active run. Use start_run() as a context manager first."
    )
@property
def run_history(self) -> RunLogHistory:
    """Completed run logs, wrapped in a read-only :class:`RunLogHistory`.

    :return: A :class:`RunLogHistory` over every :class:`RunLog` whose
        run has finished, oldest first.
    """
    completed = self._run_log_history
    return RunLogHistory(completed)
@property
def latest_run_log(self) -> RunLog | None:
    """The most recently completed :class:`RunLog`.

    :return: The newest entry of the run history, or ``None`` if no run
        has completed yet.
    """
    history = self._run_log_history
    return history[-1] if history else None
@property
def current_run_log(self) -> RunLog | None:
    """The :class:`RunLog` of the run in progress.

    :return: The in-progress run log, or ``None`` when no run is active.
    """
    in_progress = self._current_run_log
    return in_progress