Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 83 additions & 0 deletions cloud_pipelines_backend/instrumentation/execution_tracing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
"""Retroactive OTel trace emission for execution lifecycle.

When an ExecutionNode reaches a terminal status, emits a root ``execution``
span covering the full lifetime plus one ``execution.status`` child span per
status entry recorded in the status history. All timestamps are derived from
the history so span durations reflect actual time spent, not when this code
ran.
"""

import datetime
import logging

from opentelemetry import trace

from .. import backend_types_sql as bts

# Module-level logger; trace-emission failures are reported through it as warnings.
_logger = logging.getLogger(__name__)
# Tracer obtained from the OTel API under the "tangle.orchestrator" instrumentation name.
_tracer = trace.get_tracer("tangle.orchestrator")

# Key in ``ExecutionNode.extra_data`` under which the status history list is stored.
_HISTORY_KEY = bts.EXECUTION_NODE_EXTRA_DATA_STATUS_HISTORY_KEY
# The ``.value`` of every member of ``bts.CONTAINER_STATUSES_ENDED``; a history whose
# last entry carries one of these statuses is treated as terminal.
_TERMINAL_STATUSES = frozenset(s.value for s in bts.CONTAINER_STATUSES_ENDED)


_EPOCH = datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc)


def _ns(*, dt: datetime.datetime) -> int:
"""Return *dt* as nanoseconds since the Unix epoch (required by OTel SDK).

Uses integer arithmetic on timedelta components to avoid float64 precision
loss at current Unix timestamps (~1.75e18 ns), where the ULP is ~512 ns.
"""
if dt.tzinfo is None:
dt = dt.replace(tzinfo=datetime.timezone.utc)
delta = dt - _EPOCH
return (
delta.days * 86_400 + delta.seconds
) * 1_000_000_000 + delta.microseconds * 1_000


def try_emit_execution_trace(*, execution: bts.ExecutionNode) -> None:
    """Emit a complete execution trace when *execution* reaches a terminal status.

    Reads the status history stored under ``_HISTORY_KEY`` in
    ``execution.extra_data``. Each history entry is expected to be a mapping
    with a ``"status"`` value and an ISO-8601 ``"first_observed_at"``
    timestamp (naive timestamps are treated as UTC by ``_ns``).

    When the last entry's status is in ``_TERMINAL_STATUSES`` this emits:

    * one root ``execution`` span running from the first to the last
      ``first_observed_at`` timestamp, and
    * one ``execution.status <status>`` child span per history entry, ending
      when the next status was first observed. The final entry has no
      successor, so its span starts and ends at the same instant.

    No-op when the history is missing, empty, or not yet terminal.

    Every failure, including a malformed history entry, is caught and logged
    as a warning so that tracing problems never propagate into the caller's
    SQLAlchemy commit.

    Args:
        execution: The node whose ``id`` and ``extra_data`` are read.
    """
    try:
        # The guard lives inside the ``try`` so that a malformed history
        # (wrong container type, missing "status" key) is logged, not raised.
        history: list = (execution.extra_data or {}).get(_HISTORY_KEY, [])
        if not history or history[-1]["status"] not in _TERMINAL_STATUSES:
            return

        # Parse and validate every entry before any span is created, so a bad
        # entry cannot leave a half-emitted trace or an un-ended root span.
        statuses = [entry["status"] for entry in history]
        starts_ns = [
            _ns(dt=datetime.datetime.fromisoformat(entry["first_observed_at"]))
            for entry in history
        ]
        # Each status lasts until the next one was observed; the last status
        # ends at its own start time.
        ends_ns = starts_ns[1:] + starts_ns[-1:]

        root = _tracer.start_span(
            "execution",
            attributes={"execution.id": execution.id},
            start_time=starts_ns[0],
        )
        try:
            root_ctx = trace.set_span_in_context(root)
            for status, start_ns, end_ns in zip(statuses, starts_ns, ends_ns):
                attrs: dict[str, object] = {
                    "execution.id": execution.id,
                    "execution.status": status,
                }
                _tracer.start_span(
                    f"execution.status {status}",
                    context=root_ctx,
                    attributes=attrs,
                    start_time=start_ns,
                ).end(end_time=end_ns)
        finally:
            # Always close the root, even if emitting a child span failed.
            root.end(end_time=starts_ns[-1])
    except Exception:
        _logger.warning(
            f"Failed to emit execution trace for {execution.id!r}", exc_info=True
        )
2 changes: 2 additions & 0 deletions cloud_pipelines_backend/instrumentation/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from sqlalchemy import orm

from .. import backend_types_sql
from . import execution_tracing

_logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -102,3 +103,4 @@ def _handle_before_commit(session: orm.Session) -> None:
exc_info=True,
)
obj._status_changed = False
execution_tracing.try_emit_execution_trace(execution=obj)
4 changes: 4 additions & 0 deletions orchestrator_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

from cloud_pipelines_backend import orchestrator_sql
from cloud_pipelines_backend.instrumentation import bugsnag_instrumentation
from cloud_pipelines_backend.instrumentation import opentelemetry as otel
from cloud_pipelines_backend.launchers import kubernetes_launchers
from cloud_pipelines.orchestration.storage_providers import local_storage

Expand All @@ -26,6 +27,7 @@ def _build_launcher():
from cloud_pipelines_backend.launchers.skypilot_launchers import (
SkyPilotKubernetesLauncher,
)

return SkyPilotKubernetesLauncher(
infra=os.environ.get("SKYPILOT_INFRA", "kubernetes"),
pool=os.environ.get("SKYPILOT_POOL"),
Expand All @@ -36,6 +38,7 @@ def _build_launcher():

from kubernetes import config as k8s_config_lib
from kubernetes import client as k8s_client_lib

try:
k8s_config_lib.load_incluster_config()
except Exception:
Expand Down Expand Up @@ -75,6 +78,7 @@ def main():

logger.info("Starting the orchestrator")
bugsnag_instrumentation.setup(service_name="tangle-orchestrator")
otel.setup_providers()

DEFAULT_DATABASE_URI = "sqlite:///db.sqlite"
database_uri = os.environ.get("DATABASE_URI", DEFAULT_DATABASE_URI)
Expand Down
146 changes: 146 additions & 0 deletions tests/instrumentation/test_execution_tracing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
"""Tests for execution lifecycle OTel trace emission."""

import datetime

import pytest
from opentelemetry import trace
from opentelemetry.sdk import trace as otel_sdk_trace
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter

from cloud_pipelines_backend import backend_types_sql as bts
from cloud_pipelines_backend.instrumentation import execution_tracing


@pytest.fixture()
def span_exporter(monkeypatch: pytest.MonkeyPatch) -> InMemorySpanExporter:
    """Provide a fresh in-memory exporter wired to the module under test.

    A dedicated ``TracerProvider`` is built for each test and its tracer is
    swapped in for ``execution_tracing._tracer`` via ``monkeypatch``. The
    module-level tracer is replaced directly so that captured spans do not
    depend on whatever global OTel provider happens to be installed.
    """
    memory_exporter = InMemorySpanExporter()
    isolated_provider = otel_sdk_trace.TracerProvider()
    isolated_provider.add_span_processor(SimpleSpanProcessor(memory_exporter))
    test_tracer = isolated_provider.get_tracer("tangle.orchestrator")
    monkeypatch.setattr(execution_tracing, "_tracer", test_tracer)
    return memory_exporter


def _make_execution(
    *, statuses: list[str], extra: dict | None = None
) -> bts.ExecutionNode:
    """Create an ExecutionNode whose status history lists *statuses* in order.

    Entry ``i`` is stamped 2024-01-01T00:00:00Z plus ``5 * i`` minutes, so
    consecutive statuses are exactly five minutes apart. Any *extra* items are
    merged into ``extra_data`` after the history key.

    The node is given a fixed ID explicitly: OTel drops None-valued
    attributes, and in production ``execution.id`` is only populated by the
    DB ``insert_default``.
    """
    base_time = datetime.datetime(2024, 1, 1, tzinfo=datetime.timezone.utc)
    step = datetime.timedelta(minutes=5)

    history = []
    for index, status in enumerate(statuses):
        observed_at = base_time + index * step
        history.append(
            {"status": status, "first_observed_at": observed_at.isoformat()}
        )

    extra_data = {bts.EXECUTION_NODE_EXTRA_DATA_STATUS_HISTORY_KEY: history}
    extra_data.update(extra or {})

    node = bts.ExecutionNode(task_spec={})
    node.id = "test-execution-id"
    node.extra_data = extra_data
    return node


class TestTryEmitExecutionTrace:
    """Behavioral tests for ``execution_tracing.try_emit_execution_trace``.

    Every test builds an execution via ``_make_execution`` (statuses spaced
    five minutes apart) and inspects the spans captured by the
    ``span_exporter`` fixture.
    """

    def test_no_spans_for_non_terminal_execution(
        self, span_exporter: InMemorySpanExporter
    ) -> None:
        """A history that has not reached a terminal status emits nothing."""
        execution = _make_execution(statuses=["QUEUED", "RUNNING"])
        execution_tracing.try_emit_execution_trace(execution=execution)
        assert span_exporter.get_finished_spans() == ()

    def test_no_spans_for_empty_history(
        self, span_exporter: InMemorySpanExporter
    ) -> None:
        """An empty history emits nothing."""
        execution = _make_execution(statuses=[])
        execution_tracing.try_emit_execution_trace(execution=execution)
        assert span_exporter.get_finished_spans() == ()

    def test_emits_root_and_child_spans_on_terminal(
        self, span_exporter: InMemorySpanExporter
    ) -> None:
        """A terminal history yields the root span and status child spans."""
        execution = _make_execution(statuses=["QUEUED", "RUNNING", "SUCCEEDED"])
        execution_tracing.try_emit_execution_trace(execution=execution)

        names = {s.name for s in span_exporter.get_finished_spans()}
        assert "execution" in names
        assert any(n.startswith("execution.status ") for n in names)

    def test_child_span_count_matches_history(
        self, span_exporter: InMemorySpanExporter
    ) -> None:
        """Exactly one status span is emitted per history entry."""
        execution = _make_execution(statuses=["QUEUED", "RUNNING", "SUCCEEDED"])
        execution_tracing.try_emit_execution_trace(execution=execution)

        status_spans = [
            s
            for s in span_exporter.get_finished_spans()
            if s.name.startswith("execution.status ")
        ]
        assert len(status_spans) == 3

    def test_root_span_has_execution_id_attribute(
        self, span_exporter: InMemorySpanExporter
    ) -> None:
        """The root span carries the execution's ID as ``execution.id``."""
        execution = _make_execution(statuses=["QUEUED", "SUCCEEDED"])
        execution_tracing.try_emit_execution_trace(execution=execution)

        root = next(
            s for s in span_exporter.get_finished_spans() if s.name == "execution"
        )
        assert root.attributes["execution.id"] == execution.id

    def test_child_spans_share_trace_id_with_root(
        self, span_exporter: InMemorySpanExporter
    ) -> None:
        """Root and status spans all belong to a single trace."""
        execution = _make_execution(statuses=["QUEUED", "RUNNING", "SUCCEEDED"])
        execution_tracing.try_emit_execution_trace(execution=execution)

        finished = span_exporter.get_finished_spans()
        trace_ids = {s.context.trace_id for s in finished}
        assert len(trace_ids) == 1

    def test_root_span_duration_matches_history(
        self, span_exporter: InMemorySpanExporter
    ) -> None:
        """Root duration equals last minus first history timestamp (10 min)."""
        execution = _make_execution(statuses=["QUEUED", "RUNNING", "SUCCEEDED"])
        execution_tracing.try_emit_execution_trace(execution=execution)

        root = next(
            s for s in span_exporter.get_finished_spans() if s.name == "execution"
        )
        duration_ns = root.end_time - root.start_time
        assert duration_ns == int(
            datetime.timedelta(minutes=10).total_seconds() * 1_000_000_000
        )

    def test_child_span_status_attribute(
        self, span_exporter: InMemorySpanExporter
    ) -> None:
        """Each status span is named for, and tagged with, its status."""
        execution = _make_execution(statuses=["QUEUED", "SUCCEEDED"])
        execution_tracing.try_emit_execution_trace(execution=execution)

        status_spans = [
            s
            for s in span_exporter.get_finished_spans()
            if s.name.startswith("execution.status ")
        ]
        assert {s.name for s in status_spans} == {
            "execution.status QUEUED",
            "execution.status SUCCEEDED",
        }
        # Verify the attribute itself, not just the span name, and that the
        # two agree on every span.
        assert {s.attributes["execution.status"] for s in status_spans} == {
            "QUEUED",
            "SUCCEEDED",
        }
        for span in status_spans:
            assert span.name == (
                f"execution.status {span.attributes['execution.status']}"
            )
Loading