From 299fe32198012b129eb5c94274cb4bcf359f8b78 Mon Sep 17 00:00:00 2001 From: Sean Ghaeli Date: Mon, 15 Jun 2026 17:39:12 +0000 Subject: [PATCH 1/5] UI: Add deadline callback log visibility Add a callback-logs API endpoint and UI viewer so users can see the logs produced during async (triggerer) and executor deadline callback execution, matching the task-log UX. Includes the callback log reader, the CallbackLogViewer component, and triggerer-side callback log capture/upload. --- .../core_api/datamodels/ui/deadline.py | 2 + .../core_api/openapi/_private_ui.yaml | 102 ++++++++++ .../core_api/routes/ui/deadlines.py | 86 +++++++- .../src/airflow/jobs/triggerer_job_runner.py | 41 +++- .../airflow/ui/openapi-gen/queries/common.ts | 8 + .../ui/openapi-gen/queries/ensureQueryData.ts | 18 ++ .../ui/openapi-gen/queries/prefetch.ts | 18 ++ .../airflow/ui/openapi-gen/queries/queries.ts | 18 ++ .../ui/openapi-gen/queries/suspense.ts | 18 ++ .../ui/openapi-gen/requests/schemas.gen.ts | 23 +++ .../ui/openapi-gen/requests/services.gen.ts | 31 ++- .../ui/openapi-gen/requests/types.gen.ts | 29 +++ .../ui/public/i18n/locales/en/dag.json | 5 + .../ui/src/pages/Run/CallbackLogViewer.tsx | 189 ++++++++++++++++++ .../ui/src/pages/Run/DeadlineStatus.tsx | 59 +----- .../ui/src/pages/Run/DeadlineStatusModal.tsx | 109 ++++------ .../airflow/utils/log/callback_log_reader.py | 182 +++++++++++++++++ .../tests/unit/jobs/test_triggerer_job.py | 94 +++++++++ .../utils/log/test_callback_log_reader.py | 122 +++++++++++ 19 files changed, 1027 insertions(+), 127 deletions(-) create mode 100644 airflow-core/src/airflow/ui/src/pages/Run/CallbackLogViewer.tsx create mode 100644 airflow-core/src/airflow/utils/log/callback_log_reader.py create mode 100644 airflow-core/tests/unit/utils/log/test_callback_log_reader.py diff --git a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/deadline.py b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/deadline.py index 6f9402f23603d..7d2e8360311a1 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/deadline.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/deadline.py @@ -37,6 +37,8 @@ class DeadlineResponse(BaseModel): dag_run_id: str = Field(validation_alias=AliasPath("dagrun", "run_id")) alert_id: UUID | None = Field(validation_alias="deadline_alert_id", default=None) alert_name: str | None = Field(validation_alias=AliasPath("deadline_alert", "name"), default=None) + callback_id: UUID | None = Field(validation_alias="callback_id", default=None) + callback_state: str | None = Field(validation_alias=AliasPath("callback", "state"), default=None) class DeadlineCollectionResponse(BaseModel): diff --git a/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml b/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml index d786e259b4e77..2247d4ea34263 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml +++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml @@ -920,6 +920,60 @@ paths: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' + /ui/dags/{dag_id}/dagRuns/{dag_run_id}/callbacks/{callback_id}/logs: + get: + tags: + - Deadlines + summary: Get Callback Logs + description: 'Get execution logs for a callback associated with a deadline. + + + Returns the logs produced during callback execution. These logs are uploaded + + to remote storage (or written locally) by the callback supervisor after execution.' + operationId: get_callback_logs + security: + - OAuth2PasswordBearer: [] + - HTTPBearer: [] + parameters: + - name: dag_id + in: path + required: true + schema: + type: string + title: Dag Id + - name: dag_run_id + in: path + required: true + schema: + type: string + title: Dag Run Id + - name: callback_id + in: path + required: true + schema: + type: string + format: uuid + title: Callback Id + responses: + '200': + description: Successful Response + content: + application/json: + schema: + $ref: '#/components/schemas/TaskInstancesLogResponse' + '404': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Not Found + '422': + description: Validation Error + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' /ui/structure/structure_data: get: tags: @@ -2580,6 +2634,17 @@ components: - type: string - type: 'null' title: Alert Name + callback_id: + anyOf: + - type: string + format: uuid + - type: 'null' + title: Callback Id + callback_state: + anyOf: + - type: string + - type: 'null' + title: Callback State type: object required: - id @@ -3568,6 +3633,21 @@ components: - nodes title: StructureDataResponse description: Structure Data serializer for responses. + StructuredLogMessage: + properties: + timestamp: + type: string + format: date-time + title: Timestamp + event: + type: string + title: Event + additionalProperties: true + type: object + required: + - event + title: StructuredLogMessage + description: An individual log message. TaskInstanceResponse: properties: id: @@ -3836,6 +3916,28 @@ components: - awaiting_input title: TaskInstanceStateCount description: TaskInstance serializer for responses. + TaskInstancesLogResponse: + properties: + content: + anyOf: + - items: + $ref: '#/components/schemas/StructuredLogMessage' + type: array + - items: + type: string + type: array + title: Content + continuation_token: + anyOf: + - type: string + - type: 'null' + title: Continuation Token + type: object + required: + - content + - continuation_token + title: TaskInstancesLogResponse + description: Log serializer for responses. TeamCollectionResponse: properties: teams: diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/deadlines.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/deadlines.py index d9cfea10d6d94..671375c12efa6 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/deadlines.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/deadlines.py @@ -17,11 +17,13 @@ from __future__ import annotations +import os from typing import Annotated +from uuid import UUID from fastapi import Depends, HTTPException, status from sqlalchemy import select -from sqlalchemy.orm import contains_eager, noload +from sqlalchemy.orm import contains_eager, joinedload, noload from airflow.api_fastapi.auth.managers.models.resource_details import DagAccessEntity from airflow.api_fastapi.common.db.common import SessionDep, paginated_select @@ -35,16 +37,19 @@ filter_param_factory, ) from airflow.api_fastapi.common.router import AirflowRouter +from airflow.api_fastapi.core_api.datamodels.log import TaskInstancesLogResponse from airflow.api_fastapi.core_api.datamodels.ui.deadline import ( DeadlineAlertCollectionResponse, DeadlineCollectionResponse, ) from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc from airflow.api_fastapi.core_api.security import ReadableDagRunsFilterDep, requires_access_dag +from airflow.models.callback import Callback from airflow.models.dagrun import DagRun from airflow.models.deadline import Deadline from airflow.models.deadline_alert import DeadlineAlert from airflow.models.serialized_dag import SerializedDagModel +from airflow.utils.log.callback_log_reader import read_callback_log deadlines_router = AirflowRouter(prefix="/dags/{dag_id}", tags=["Deadlines"]) @@ -106,7 +111,7 @@ def get_deadlines( .options( contains_eager(Deadline.dagrun).options(noload(DagRun.deadlines)), contains_eager(Deadline.deadline_alert), - noload(Deadline.callback), + joinedload(Deadline.callback), ) ) @@ -201,3 +206,80 @@ def get_dag_deadline_alerts( alerts = session.scalars(alerts_select) return DeadlineAlertCollectionResponse(deadline_alerts=alerts, total_entries=total_entries) + + +@deadlines_router.get( + "/dagRuns/{dag_run_id}/callbacks/{callback_id}/logs", + responses=create_openapi_http_exception_doc( + [ + status.HTTP_404_NOT_FOUND, + ] + ), + dependencies=[ + Depends( + requires_access_dag( + method="GET", + access_entity=DagAccessEntity.TASK_LOGS, + ) + ), + ], + response_model=TaskInstancesLogResponse, + response_model_exclude_unset=True, +) +def get_callback_logs( + dag_id: str, + dag_run_id: str, + callback_id: UUID, + session: SessionDep, +) -> TaskInstancesLogResponse: + """ + Get execution logs for a callback associated with a deadline. + + Returns the logs produced during callback execution. These logs are uploaded + to remote storage (or written locally) by the callback supervisor after execution. + """ + # Sanitize path components to prevent path traversal via URL parameters. + for param_name, param_value in [("dag_id", dag_id), ("dag_run_id", dag_run_id)]: + if os.sep in param_value or "\\" in param_value or ".." in param_value: + raise HTTPException( + status.HTTP_400_BAD_REQUEST, + f"Invalid characters in {param_name}", + ) + + # Verify the callback exists + callback = session.scalar(select(Callback).where(Callback.id == callback_id)) + if callback is None: + raise HTTPException( + status.HTTP_404_NOT_FOUND, + f"Callback with id `{callback_id}` was not found", + ) + + # Verify the dag_run exists with matching dag_id + dag_run = session.scalar(select(DagRun).where(DagRun.dag_id == dag_id, DagRun.run_id == dag_run_id)) + if dag_run is None: + raise HTTPException( + status.HTTP_404_NOT_FOUND, + f"DagRun with dag_id: `{dag_id}` and run_id: `{dag_run_id}` was not found", + ) + + # Verify the callback actually belongs to this dag_run (via the Deadline relationship) + deadline = session.scalar( + select(Deadline).where(Deadline.callback_id == callback_id, Deadline.dagrun_id == dag_run.id) + ) + if deadline is None: + raise HTTPException( + status.HTTP_404_NOT_FOUND, + f"Callback `{callback_id}` is not associated with DagRun `{dag_run_id}` of Dag `{dag_id}`", + ) + + try: + log_stream = read_callback_log( + dag_id=dag_id, + run_id=dag_run_id, + callback_id=str(callback_id), + ) + except ValueError: + raise HTTPException(status.HTTP_400_BAD_REQUEST, "Invalid callback log path") + + content = list(log_stream) + return TaskInstancesLogResponse.model_construct(content=content, continuation_token=None) diff --git a/airflow-core/src/airflow/jobs/triggerer_job_runner.py b/airflow-core/src/airflow/jobs/triggerer_job_runner.py index 97fd669204456..98bae2f72dec0 100644 --- a/airflow-core/src/airflow/jobs/triggerer_job_runner.py +++ b/airflow-core/src/airflow/jobs/triggerer_job_runner.py @@ -391,7 +391,7 @@ def from_api_response(cls, response: HITLDetailResponse) -> HITLDetailResponseRe class TriggerLoggingFactory: log_path: str - ti: RuntimeTI = attrs.field(repr=False) + ti: RuntimeTI | None = attrs.field(default=None, repr=False) bound_logger: WrappedLogger = attrs.field(init=False, repr=False) @@ -428,8 +428,35 @@ def upload_to_remote(self): # Never actually called, nothing to do return + if self.ti is None: + # Callback triggers have no task instance — upload using the path directly. + self._upload_callback_log_to_remote() + return + upload_to_remote(self.bound_logger, self.ti) + def _upload_callback_log_to_remote(self): + """Upload callback trigger logs to remote storage without a task instance.""" + from airflow.sdk.log import load_remote_log_handler, relative_path_from_logger + + handler = load_remote_log_handler() + if not handler: + return + + raw_logger = getattr(self.bound_logger, "_logger") + try: + relative_path = relative_path_from_logger(raw_logger) + except Exception: + return + if not relative_path: + return + + log_relative_path = relative_path.as_posix() + try: + handler.upload(log_relative_path, None) # type: ignore[arg-type] + except Exception: + log.warning("Failed to upload callback trigger logs to remote", log_path=log_relative_path) + def in_process_api_server() -> InProcessExecutionAPI: from airflow.api_fastapi.execution_api.app import InProcessExecutionAPI @@ -778,6 +805,18 @@ def _create_workload( session: Session, ) -> workloads.RunTrigger | None: if trigger.task_instance is None: + # Set up dedicated logging for callback triggers so their output is + # captured to a file that the UI log endpoint can later read. + if trigger.callback: + callback_data = trigger.callback.data or {} + dag_id = callback_data.get("dag_id", "unknown") + run_id = callback_data.get("run_id", "unknown") + callback_id = str(trigger.callback.id) + log_path = f"triggerer_callbacks/{dag_id}/{run_id}/{callback_id}" + self.logger_cache[trigger.id] = TriggerLoggingFactory( + log_path=log_path, + ti=None, + ) return workloads.RunTrigger( id=trigger.id, classpath=trigger.classpath, diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts index ff01113fe4b7a..b1806e34a5b3e 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts @@ -948,6 +948,14 @@ export const UseDeadlinesServiceGetDagDeadlineAlertsKeyFn = ({ dagId, limit, off offset?: number; orderBy?: string[]; }, queryKey?: Array) => [useDeadlinesServiceGetDagDeadlineAlertsKey, ...(queryKey ?? [{ dagId, limit, offset, orderBy }])]; +export type DeadlinesServiceGetCallbackLogsDefaultResponse = Awaited>; +export type DeadlinesServiceGetCallbackLogsQueryResult = UseQueryResult; +export const useDeadlinesServiceGetCallbackLogsKey = "DeadlinesServiceGetCallbackLogs"; +export const UseDeadlinesServiceGetCallbackLogsKeyFn = ({ callbackId, dagId, dagRunId }: { + callbackId: string; + dagId: string; + dagRunId: string; +}, queryKey?: Array) => [useDeadlinesServiceGetCallbackLogsKey, ...(queryKey ?? [{ callbackId, dagId, dagRunId }])]; export type StructureServiceStructureDataDefaultResponse = Awaited>; export type StructureServiceStructureDataQueryResult = UseQueryResult; export const useStructureServiceStructureDataKey = "StructureServiceStructureData"; diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts index 74a833b2ccfdc..8f79d7a39bb52 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts @@ -1907,6 +1907,24 @@ export const ensureUseDeadlinesServiceGetDagDeadlineAlertsData = (queryClient: Q orderBy?: string[]; }) => queryClient.ensureQueryData({ queryKey: Common.UseDeadlinesServiceGetDagDeadlineAlertsKeyFn({ dagId, limit, offset, orderBy }), queryFn: () => DeadlinesService.getDagDeadlineAlerts({ dagId, limit, offset, orderBy }) }); /** +* Get Callback Logs +* Get execution logs for a callback associated with a deadline. +* +* Returns the logs produced during callback execution. These logs are uploaded +* to remote storage (or written locally) by the callback supervisor after execution. +* @param data The data for the request. +* @param data.dagId +* @param data.dagRunId +* @param data.callbackId +* @returns TaskInstancesLogResponse Successful Response +* @throws ApiError +*/ +export const ensureUseDeadlinesServiceGetCallbackLogsData = (queryClient: QueryClient, { callbackId, dagId, dagRunId }: { + callbackId: string; + dagId: string; + dagRunId: string; +}) => queryClient.ensureQueryData({ queryKey: Common.UseDeadlinesServiceGetCallbackLogsKeyFn({ callbackId, dagId, dagRunId }), queryFn: () => DeadlinesService.getCallbackLogs({ callbackId, dagId, dagRunId }) }); +/** * Structure Data * Get Structure Data. * @param data The data for the request. diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts index 9db58a9676cc6..5a47555c79ff4 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts @@ -1907,6 +1907,24 @@ export const prefetchUseDeadlinesServiceGetDagDeadlineAlerts = (queryClient: Que orderBy?: string[]; }) => queryClient.prefetchQuery({ queryKey: Common.UseDeadlinesServiceGetDagDeadlineAlertsKeyFn({ dagId, limit, offset, orderBy }), queryFn: () => DeadlinesService.getDagDeadlineAlerts({ dagId, limit, offset, orderBy }) }); /** +* Get Callback Logs +* Get execution logs for a callback associated with a deadline. +* +* Returns the logs produced during callback execution. These logs are uploaded +* to remote storage (or written locally) by the callback supervisor after execution. +* @param data The data for the request. +* @param data.dagId +* @param data.dagRunId +* @param data.callbackId +* @returns TaskInstancesLogResponse Successful Response +* @throws ApiError +*/ +export const prefetchUseDeadlinesServiceGetCallbackLogs = (queryClient: QueryClient, { callbackId, dagId, dagRunId }: { + callbackId: string; + dagId: string; + dagRunId: string; +}) => queryClient.prefetchQuery({ queryKey: Common.UseDeadlinesServiceGetCallbackLogsKeyFn({ callbackId, dagId, dagRunId }), queryFn: () => DeadlinesService.getCallbackLogs({ callbackId, dagId, dagRunId }) }); +/** * Structure Data * Get Structure Data. * @param data The data for the request. diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts index 9a1dd570953b9..def98c9fc8fbc 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts @@ -1907,6 +1907,24 @@ export const useDeadlinesServiceGetDagDeadlineAlerts = , "queryKey" | "queryFn">) => useQuery({ queryKey: Common.UseDeadlinesServiceGetDagDeadlineAlertsKeyFn({ dagId, limit, offset, orderBy }, queryKey), queryFn: () => DeadlinesService.getDagDeadlineAlerts({ dagId, limit, offset, orderBy }) as TData, ...options }); /** +* Get Callback Logs +* Get execution logs for a callback associated with a deadline. +* +* Returns the logs produced during callback execution. These logs are uploaded +* to remote storage (or written locally) by the callback supervisor after execution. +* @param data The data for the request. +* @param data.dagId +* @param data.dagRunId +* @param data.callbackId +* @returns TaskInstancesLogResponse Successful Response +* @throws ApiError +*/ +export const useDeadlinesServiceGetCallbackLogs = = unknown[]>({ callbackId, dagId, dagRunId }: { + callbackId: string; + dagId: string; + dagRunId: string; +}, queryKey?: TQueryKey, options?: Omit, "queryKey" | "queryFn">) => useQuery({ queryKey: Common.UseDeadlinesServiceGetCallbackLogsKeyFn({ callbackId, dagId, dagRunId }, queryKey), queryFn: () => DeadlinesService.getCallbackLogs({ callbackId, dagId, dagRunId }) as TData, ...options }); +/** * Structure Data * Get Structure Data. * @param data The data for the request. diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts index 8c4f670717639..d963c79883259 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts @@ -1907,6 +1907,24 @@ export const useDeadlinesServiceGetDagDeadlineAlertsSuspense = , "queryKey" | "queryFn">) => useSuspenseQuery({ queryKey: Common.UseDeadlinesServiceGetDagDeadlineAlertsKeyFn({ dagId, limit, offset, orderBy }, queryKey), queryFn: () => DeadlinesService.getDagDeadlineAlerts({ dagId, limit, offset, orderBy }) as TData, ...options }); /** +* Get Callback Logs +* Get execution logs for a callback associated with a deadline. +* +* Returns the logs produced during callback execution. These logs are uploaded +* to remote storage (or written locally) by the callback supervisor after execution. +* @param data The data for the request. +* @param data.dagId +* @param data.dagRunId +* @param data.callbackId +* @returns TaskInstancesLogResponse Successful Response +* @throws ApiError +*/ +export const useDeadlinesServiceGetCallbackLogsSuspense = = unknown[]>({ callbackId, dagId, dagRunId }: { + callbackId: string; + dagId: string; + dagRunId: string; +}, queryKey?: TQueryKey, options?: Omit, "queryKey" | "queryFn">) => useSuspenseQuery({ queryKey: Common.UseDeadlinesServiceGetCallbackLogsKeyFn({ callbackId, dagId, dagRunId }, queryKey), queryFn: () => DeadlinesService.getCallbackLogs({ callbackId, dagId, dagRunId }) as TData, ...options }); +/** * Structure Data * Get Structure Data. * @param data The data for the request. diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts index 413dbef3d7e42..54df61ffd0401 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -9073,6 +9073,29 @@ export const $DeadlineResponse = { } ], title: 'Alert Name' + }, + callback_id: { + anyOf: [ + { + type: 'string', + format: 'uuid' + }, + { + type: 'null' + } + ], + title: 'Callback Id' + }, + callback_state: { + anyOf: [ + { + type: 'string' + }, + { + type: 'null' + } + ], + title: 'Callback State' } }, type: 'object', diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts index 63587d21d0e2a..84495e5038ec2 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts @@ -3,7 +3,7 @@ import type { CancelablePromise } from './core/CancelablePromise'; import { OpenAPI } from './core/OpenAPI'; import { request as __request } from './core/request'; -import type { GetAssetsData, GetAssetsResponse, GetAssetAliasesData, GetAssetAliasesResponse, GetAssetAliasData, GetAssetAliasResponse, GetAssetEventsData, GetAssetEventsResponse, CreateAssetEventData, CreateAssetEventResponse, MaterializeAssetData, MaterializeAssetResponse, GetAssetQueuedEventsData, GetAssetQueuedEventsResponse, DeleteAssetQueuedEventsData, DeleteAssetQueuedEventsResponse, GetAssetData, GetAssetResponse, GetDagAssetQueuedEventsData, GetDagAssetQueuedEventsResponse, DeleteDagAssetQueuedEventsData, DeleteDagAssetQueuedEventsResponse, GetDagAssetQueuedEventData, GetDagAssetQueuedEventResponse, DeleteDagAssetQueuedEventData, DeleteDagAssetQueuedEventResponse, NextRunAssetsData, NextRunAssetsResponse2, ListBackfillsData, ListBackfillsResponse, CreateBackfillData, CreateBackfillResponse, GetBackfillData, GetBackfillResponse, PauseBackfillData, PauseBackfillResponse, UnpauseBackfillData, UnpauseBackfillResponse, CancelBackfillData, CancelBackfillResponse, CreateBackfillDryRunData, CreateBackfillDryRunResponse, ListBackfillsUiData, ListBackfillsUiResponse, DeleteConnectionData, DeleteConnectionResponse, GetConnectionData, GetConnectionResponse, PatchConnectionData, PatchConnectionResponse, GetConnectionTestData, GetConnectionTestResponse, EnqueueConnectionTestData, EnqueueConnectionTestResponse, GetConnectionsData, GetConnectionsResponse, PostConnectionData, PostConnectionResponse, BulkConnectionsData, BulkConnectionsResponse, TestConnectionData, TestConnectionResponse, CreateDefaultConnectionsResponse, HookMetaDataResponse, GetDagRunData, GetDagRunResponse, DeleteDagRunData, DeleteDagRunResponse, PatchDagRunData, PatchDagRunResponse, BulkDagRunsData, BulkDagRunsResponse, GetDagRunsData, GetDagRunsResponse, TriggerDagRunData, TriggerDagRunResponse, GetUpstreamAssetEventsData, GetUpstreamAssetEventsResponse, ClearDagRunData, ClearDagRunResponse, WaitDagRunUntilFinishedData, WaitDagRunUntilFinishedResponse, GetListDagRunsBatchData, GetListDagRunsBatchResponse, ClearDagRunsData, ClearDagRunsResponse, GetDagRunStatsData, GetDagRunStatsResponse, GetDagSourceData, GetDagSourceResponse, GetDagStatsData, GetDagStatsResponse, GetConfigData, GetConfigResponse, GetConfigValueData, GetConfigValueResponse, GetConfigsResponse, ListDagWarningsData, ListDagWarningsResponse, GetDagsData, GetDagsResponse, PatchDagsData, PatchDagsResponse, GetDagData, GetDagResponse, PatchDagData, PatchDagResponse, DeleteDagData, DeleteDagResponse, GetDagDetailsData, GetDagDetailsResponse, FavoriteDagData, FavoriteDagResponse, UnfavoriteDagData, UnfavoriteDagResponse, GetDagTagsData, GetDagTagsResponse, GetDagsUiData, GetDagsUiResponse, GetLatestRunInfoData, GetLatestRunInfoResponse, GetEventLogData, GetEventLogResponse, GetEventLogsData, GetEventLogsResponse, GetExtraLinksData, GetExtraLinksResponse, GetTaskInstanceData, GetTaskInstanceResponse, PatchTaskInstanceData, PatchTaskInstanceResponse, DeleteTaskInstanceData, DeleteTaskInstanceResponse, GetMappedTaskInstancesData, GetMappedTaskInstancesResponse, GetTaskInstanceDependenciesByMapIndexData, GetTaskInstanceDependenciesByMapIndexResponse, GetTaskInstanceDependenciesData, GetTaskInstanceDependenciesResponse, GetTaskInstanceTriesData, GetTaskInstanceTriesResponse, GetMappedTaskInstanceTriesData, GetMappedTaskInstanceTriesResponse, GetMappedTaskInstanceData, GetMappedTaskInstanceResponse, PatchTaskInstanceByMapIndexData, PatchTaskInstanceByMapIndexResponse, GetTaskInstancesData, GetTaskInstancesResponse, BulkTaskInstancesData, BulkTaskInstancesResponse, GetTaskInstancesBatchData, GetTaskInstancesBatchResponse, GetTaskInstanceTryDetailsData, GetTaskInstanceTryDetailsResponse, GetMappedTaskInstanceTryDetailsData, GetMappedTaskInstanceTryDetailsResponse, PostClearTaskInstancesData, PostClearTaskInstancesResponse, PatchTaskGroupInstancesData, PatchTaskGroupInstancesResponse, PatchTaskGroupInstancesDryRunData, PatchTaskGroupInstancesDryRunResponse, PatchTaskInstanceDryRunByMapIndexData, PatchTaskInstanceDryRunByMapIndexResponse, PatchTaskInstanceDryRunData, PatchTaskInstanceDryRunResponse, GetLogData, GetLogResponse, GetExternalLogUrlData, GetExternalLogUrlResponse, UpdateHitlDetailData, UpdateHitlDetailResponse, GetHitlDetailData, GetHitlDetailResponse, GetHitlDetailTryDetailData, GetHitlDetailTryDetailResponse, GetHitlDetailsData, GetHitlDetailsResponse, GetImportErrorData, GetImportErrorResponse, GetImportErrorsData, GetImportErrorsResponse, GetJobsData, GetJobsResponse, GetPluginsData, GetPluginsResponse, ImportErrorsResponse, DeletePoolData, DeletePoolResponse, GetPoolData, GetPoolResponse, PatchPoolData, PatchPoolResponse, GetPoolsData, GetPoolsResponse, PostPoolData, PostPoolResponse, BulkPoolsData, BulkPoolsResponse, GetProvidersData, GetProvidersResponse, ListAssetStateStoreData, ListAssetStateStoreResponse, ClearAssetStateStoreData, ClearAssetStateStoreResponse, GetAssetStateStoreData, GetAssetStateStoreResponse, SetAssetStateStoreData, SetAssetStateStoreResponse, DeleteAssetStateStoreData, DeleteAssetStateStoreResponse, ListTaskStateStoreData, ListTaskStateStoreResponse, ClearTaskStateStoreData, ClearTaskStateStoreResponse, GetTaskStateStoreData, GetTaskStateStoreResponse, SetTaskStateStoreData, SetTaskStateStoreResponse, PatchTaskStateStoreData, PatchTaskStateStoreResponse, DeleteTaskStateStoreData, DeleteTaskStateStoreResponse, GetXcomEntryData, GetXcomEntryResponse, UpdateXcomEntryData, UpdateXcomEntryResponse, DeleteXcomEntryData, DeleteXcomEntryResponse, GetXcomEntriesData, GetXcomEntriesResponse, CreateXcomEntryData, CreateXcomEntryResponse, GetTasksData, GetTasksResponse, GetTaskData, GetTaskResponse, DeleteVariableData, DeleteVariableResponse, GetVariableData, GetVariableResponse, PatchVariableData, PatchVariableResponse, GetVariablesData, GetVariablesResponse, PostVariableData, PostVariableResponse, BulkVariablesData, BulkVariablesResponse, ReparseDagFileData, ReparseDagFileResponse, GetDagVersionData, GetDagVersionResponse, GetDagVersionsData, GetDagVersionsResponse, GetHealthResponse, GetVersionResponse, LoginData, LoginResponse, LogoutResponse, GetAuthMenusResponse, GetCurrentUserInfoResponse, GenerateTokenData, GenerateTokenResponse2, GetPartitionedDagRunsData, GetPartitionedDagRunsResponse, GetPendingPartitionedDagRunData, GetPendingPartitionedDagRunResponse, GetDependenciesData, GetDependenciesResponse, HistoricalMetricsData, HistoricalMetricsResponse, DagStatsResponse2, GetDeadlinesData, GetDeadlinesResponse, GetDagDeadlineAlertsData, GetDagDeadlineAlertsResponse, StructureDataData, StructureDataResponse2, GetDagStructureData, GetDagStructureResponse, GetGridRunsData, GetGridRunsResponse, GetGridTiSummariesStreamData, GetGridTiSummariesStreamResponse, GetGanttDataData, GetGanttDataResponse, GetCalendarData, GetCalendarResponse, ListTeamsData, ListTeamsResponse } from './types.gen'; +import type { GetAssetsData, GetAssetsResponse, GetAssetAliasesData, GetAssetAliasesResponse, GetAssetAliasData, GetAssetAliasResponse, GetAssetEventsData, GetAssetEventsResponse, CreateAssetEventData, CreateAssetEventResponse, MaterializeAssetData, MaterializeAssetResponse, GetAssetQueuedEventsData, GetAssetQueuedEventsResponse, DeleteAssetQueuedEventsData, DeleteAssetQueuedEventsResponse, GetAssetData, GetAssetResponse, GetDagAssetQueuedEventsData, GetDagAssetQueuedEventsResponse, DeleteDagAssetQueuedEventsData, DeleteDagAssetQueuedEventsResponse, GetDagAssetQueuedEventData, GetDagAssetQueuedEventResponse, DeleteDagAssetQueuedEventData, DeleteDagAssetQueuedEventResponse, NextRunAssetsData, NextRunAssetsResponse2, ListBackfillsData, ListBackfillsResponse, CreateBackfillData, CreateBackfillResponse, GetBackfillData, GetBackfillResponse, PauseBackfillData, PauseBackfillResponse, UnpauseBackfillData, UnpauseBackfillResponse, CancelBackfillData, CancelBackfillResponse, CreateBackfillDryRunData, CreateBackfillDryRunResponse, ListBackfillsUiData, ListBackfillsUiResponse, DeleteConnectionData, DeleteConnectionResponse, GetConnectionData, GetConnectionResponse, PatchConnectionData, PatchConnectionResponse, GetConnectionTestData, GetConnectionTestResponse, EnqueueConnectionTestData, EnqueueConnectionTestResponse, GetConnectionsData, GetConnectionsResponse, PostConnectionData, PostConnectionResponse, BulkConnectionsData, BulkConnectionsResponse, TestConnectionData, TestConnectionResponse, CreateDefaultConnectionsResponse, HookMetaDataResponse, GetDagRunData, GetDagRunResponse, DeleteDagRunData, DeleteDagRunResponse, PatchDagRunData, PatchDagRunResponse, BulkDagRunsData, BulkDagRunsResponse, GetDagRunsData, GetDagRunsResponse, TriggerDagRunData, TriggerDagRunResponse, GetUpstreamAssetEventsData, GetUpstreamAssetEventsResponse, ClearDagRunData, ClearDagRunResponse, WaitDagRunUntilFinishedData, WaitDagRunUntilFinishedResponse, GetListDagRunsBatchData, GetListDagRunsBatchResponse, ClearDagRunsData, ClearDagRunsResponse, GetDagRunStatsData, GetDagRunStatsResponse, GetDagSourceData, GetDagSourceResponse, GetDagStatsData, GetDagStatsResponse, GetConfigData, GetConfigResponse, GetConfigValueData, GetConfigValueResponse, GetConfigsResponse, ListDagWarningsData, ListDagWarningsResponse, GetDagsData, GetDagsResponse, PatchDagsData, PatchDagsResponse, GetDagData, GetDagResponse, PatchDagData, PatchDagResponse, DeleteDagData, DeleteDagResponse, GetDagDetailsData, GetDagDetailsResponse, FavoriteDagData, FavoriteDagResponse, UnfavoriteDagData, UnfavoriteDagResponse, GetDagTagsData, GetDagTagsResponse, GetDagsUiData, GetDagsUiResponse, GetLatestRunInfoData, GetLatestRunInfoResponse, GetEventLogData, GetEventLogResponse, GetEventLogsData, GetEventLogsResponse, GetExtraLinksData, GetExtraLinksResponse, GetTaskInstanceData, GetTaskInstanceResponse, PatchTaskInstanceData, PatchTaskInstanceResponse, DeleteTaskInstanceData, DeleteTaskInstanceResponse, GetMappedTaskInstancesData, GetMappedTaskInstancesResponse, GetTaskInstanceDependenciesByMapIndexData, GetTaskInstanceDependenciesByMapIndexResponse, GetTaskInstanceDependenciesData, GetTaskInstanceDependenciesResponse, GetTaskInstanceTriesData, GetTaskInstanceTriesResponse, GetMappedTaskInstanceTriesData, GetMappedTaskInstanceTriesResponse, GetMappedTaskInstanceData, GetMappedTaskInstanceResponse, PatchTaskInstanceByMapIndexData, PatchTaskInstanceByMapIndexResponse, GetTaskInstancesData, GetTaskInstancesResponse, BulkTaskInstancesData, BulkTaskInstancesResponse, GetTaskInstancesBatchData, GetTaskInstancesBatchResponse, GetTaskInstanceTryDetailsData, GetTaskInstanceTryDetailsResponse, GetMappedTaskInstanceTryDetailsData, GetMappedTaskInstanceTryDetailsResponse, PostClearTaskInstancesData, PostClearTaskInstancesResponse, PatchTaskGroupInstancesData, PatchTaskGroupInstancesResponse, PatchTaskGroupInstancesDryRunData, PatchTaskGroupInstancesDryRunResponse, PatchTaskInstanceDryRunByMapIndexData, PatchTaskInstanceDryRunByMapIndexResponse, PatchTaskInstanceDryRunData, PatchTaskInstanceDryRunResponse, GetLogData, GetLogResponse, GetExternalLogUrlData, GetExternalLogUrlResponse, UpdateHitlDetailData, UpdateHitlDetailResponse, GetHitlDetailData, GetHitlDetailResponse, GetHitlDetailTryDetailData, GetHitlDetailTryDetailResponse, GetHitlDetailsData, GetHitlDetailsResponse, GetImportErrorData, GetImportErrorResponse, GetImportErrorsData, GetImportErrorsResponse, GetJobsData, GetJobsResponse, GetPluginsData, GetPluginsResponse, ImportErrorsResponse, DeletePoolData, DeletePoolResponse, GetPoolData, GetPoolResponse, PatchPoolData, PatchPoolResponse, GetPoolsData, GetPoolsResponse, PostPoolData, PostPoolResponse, BulkPoolsData, BulkPoolsResponse, GetProvidersData, GetProvidersResponse, ListAssetStateStoreData, ListAssetStateStoreResponse, ClearAssetStateStoreData, ClearAssetStateStoreResponse, GetAssetStateStoreData, GetAssetStateStoreResponse, SetAssetStateStoreData, SetAssetStateStoreResponse, DeleteAssetStateStoreData, DeleteAssetStateStoreResponse, ListTaskStateStoreData, ListTaskStateStoreResponse, ClearTaskStateStoreData, ClearTaskStateStoreResponse, GetTaskStateStoreData, GetTaskStateStoreResponse, SetTaskStateStoreData, SetTaskStateStoreResponse, PatchTaskStateStoreData, PatchTaskStateStoreResponse, DeleteTaskStateStoreData, DeleteTaskStateStoreResponse, GetXcomEntryData, GetXcomEntryResponse, UpdateXcomEntryData, UpdateXcomEntryResponse, DeleteXcomEntryData, DeleteXcomEntryResponse, GetXcomEntriesData, GetXcomEntriesResponse, CreateXcomEntryData, CreateXcomEntryResponse, GetTasksData, GetTasksResponse, GetTaskData, GetTaskResponse, DeleteVariableData, DeleteVariableResponse, GetVariableData, GetVariableResponse, PatchVariableData, PatchVariableResponse, GetVariablesData, GetVariablesResponse, PostVariableData, PostVariableResponse, BulkVariablesData, BulkVariablesResponse, ReparseDagFileData, ReparseDagFileResponse, GetDagVersionData, GetDagVersionResponse, GetDagVersionsData, GetDagVersionsResponse, GetHealthResponse, GetVersionResponse, LoginData, LoginResponse, LogoutResponse, GetAuthMenusResponse, GetCurrentUserInfoResponse, GenerateTokenData, GenerateTokenResponse2, GetPartitionedDagRunsData, GetPartitionedDagRunsResponse, GetPendingPartitionedDagRunData, GetPendingPartitionedDagRunResponse, GetDependenciesData, GetDependenciesResponse, HistoricalMetricsData, HistoricalMetricsResponse, DagStatsResponse2, GetDeadlinesData, GetDeadlinesResponse, GetDagDeadlineAlertsData, GetDagDeadlineAlertsResponse, GetCallbackLogsData, GetCallbackLogsResponse, StructureDataData, StructureDataResponse2, GetDagStructureData, GetDagStructureResponse, GetGridRunsData, GetGridRunsResponse, GetGridTiSummariesStreamData, GetGridTiSummariesStreamResponse, GetGanttDataData, GetGanttDataResponse, GetCalendarData, GetCalendarResponse, ListTeamsData, ListTeamsResponse } from './types.gen'; export class AssetService { /** @@ -4831,6 +4831,35 @@ export class DeadlinesService { }); } + /** + * Get Callback Logs + * Get execution logs for a callback associated with a deadline. + * + * Returns the logs produced during callback execution. These logs are uploaded + * to remote storage (or written locally) by the callback supervisor after execution. + * @param data The data for the request. + * @param data.dagId + * @param data.dagRunId + * @param data.callbackId + * @returns TaskInstancesLogResponse Successful Response + * @throws ApiError + */ + public static getCallbackLogs(data: GetCallbackLogsData): CancelablePromise { + return __request(OpenAPI, { + method: 'GET', + url: '/ui/dags/{dag_id}/dagRuns/{dag_run_id}/callbacks/{callback_id}/logs', + path: { + dag_id: data.dagId, + dag_run_id: data.dagRunId, + callback_id: data.callbackId + }, + errors: { + 404: 'Not Found', + 422: 'Validation Error' + } + }); + } + } export class StructureService { diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts index 31f716d85f885..260cb6622d045 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts @@ -2286,6 +2286,8 @@ export type DeadlineResponse = { dag_run_id: string; alert_id?: string | null; alert_name?: string | null; + callback_id?: string | null; + callback_state?: string | null; }; /** @@ -4460,6 +4462,14 @@ export type GetDagDeadlineAlertsData = { export type GetDagDeadlineAlertsResponse = DeadlineAlertCollectionResponse; +export type GetCallbackLogsData = { + callbackId: string; + dagId: string; + dagRunId: string; +}; + +export type GetCallbackLogsResponse = TaskInstancesLogResponse; + export type StructureDataData = { dagId: string; depth?: number | null; @@ -8243,6 +8253,25 @@ export type $OpenApiTs = { }; }; }; + '/ui/dags/{dag_id}/dagRuns/{dag_run_id}/callbacks/{callback_id}/logs': { + get: { + req: GetCallbackLogsData; + res: { + /** + * Successful Response + */ + 200: TaskInstancesLogResponse; + /** + * Not Found + */ + 404: HTTPExceptionResponse; + /** + * Validation Error + */ + 422: HTTPValidationError; + }; + }; + }; '/ui/structure/structure_data': { get: { req: StructureDataData; diff --git a/airflow-core/src/airflow/ui/public/i18n/locales/en/dag.json b/airflow-core/src/airflow/ui/public/i18n/locales/en/dag.json index b51add8d5ec09..93f357ece9527 100644 --- a/airflow-core/src/airflow/ui/public/i18n/locales/en/dag.json +++ b/airflow-core/src/airflow/ui/public/i18n/locales/en/dag.json @@ -34,6 +34,11 @@ "wednesday": "Wed" } }, + "callbackLogs": { + "noLogs": "No logs available for this callback.", + "title": "Callback Logs", + "viewLogs": "Callback Logs" + }, "code": { "bundleUrl": "Bundle Url", "noCode": "No Code Found", diff --git a/airflow-core/src/airflow/ui/src/pages/Run/CallbackLogViewer.tsx b/airflow-core/src/airflow/ui/src/pages/Run/CallbackLogViewer.tsx new file mode 100644 index 0000000000000..33467bf58fbe4 --- /dev/null +++ b/airflow-core/src/airflow/ui/src/pages/Run/CallbackLogViewer.tsx @@ -0,0 +1,189 @@ +/*! + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import { Badge, Box, Button, Heading, HStack, Text } from "@chakra-ui/react"; +import type { TFunction } from "i18next"; +import { useMemo, useState } from "react"; +import { useTranslation } from "react-i18next"; +import { FiFileText } from "react-icons/fi"; +import innerText from "react-innertext"; + +import { useDeadlinesServiceGetCallbackLogs } from "openapi/queries"; +import type { TaskInstancesLogResponse } from "openapi/requests/types.gen"; +import { renderStructuredLog } from "src/components/renderStructuredLog"; +import { Dialog } from "src/components/ui"; +import { TaskLogContent } from "src/pages/TaskInstance/Logs/TaskLogContent"; +import type { ParsedLogEntry } from "src/queries/useLogs"; +import { parseStreamingLogContent } from "src/utils/logs"; + +type CallbackLogViewerProps = { + readonly callbackId: string; + readonly callbackState?: string | null; + readonly dagId: string; + readonly dagRunId: string; +}; + +const stateColorMap: Record = { + failed: "red", + running: "blue", + success: "green", +}; + +/** + * Parse callback log data using the same structured log rendering pipeline + * as the task instance logs, providing consistent formatting, grouping, and display. + */ +const parseCallbackLogs = ( + data: TaskInstancesLogResponse["content"], + translate: TFunction, +): Array => { + let lineNumber = 0; + const lineNumbers = data.map((datum) => { + const text = typeof datum === "string" ? datum : datum.event; + + if (text.includes("::group::") || text.includes("::endgroup::")) { + return undefined; + } + const current = lineNumber; + + lineNumber += 1; + + return current; + }); + + const parsedLines = data + .map((datum, index) => + renderStructuredLog({ + index: lineNumbers[index] ?? index, + logLink: "", + logMessage: datum, + renderingMode: "jsx", + showSource: false, + showTimestamp: true, + translate, + }), + ) + .filter((parsedLine) => parsedLine !== ""); + + // Process group markers (::group:: / ::endgroup::) into structured entries + type Group = { id: number; level: number; name: string }; + const groupStack: Array = []; + const result: Array = []; + let nextGroupId = 0; + + for (const line of parsedLines) { + const text = innerText(line); + + if (text.includes("::group::")) { + const groupName = text.split("::group::")[1] as string; + const id = nextGroupId; + + nextGroupId += 1; + const level = groupStack.length; + const parentGroup = groupStack[groupStack.length - 1]; + + groupStack.push({ id, level, name: groupName }); + result.push({ + element: groupName, + group: { id, level, parentId: parentGroup?.id, type: "header" }, + }); + } else if (text.includes("::endgroup::")) { + groupStack.pop(); + } else { + const currentGroup = groupStack[groupStack.length - 1]; + + if (groupStack.length > 0 && currentGroup) { + result.push({ + element: line, + group: { id: currentGroup.id, level: currentGroup.level, type: "line" }, + }); + } else { + result.push({ element: line }); + } + } + } + + return result; +}; + +export const CallbackLogViewer = ({ callbackId, callbackState, dagId, dagRunId }: CallbackLogViewerProps) => { + const { t: translate } = useTranslation(["dag", "common"]); + const [isOpen, setIsOpen] = useState(false); + + const { data, error, isLoading } = useDeadlinesServiceGetCallbackLogs( + { + callbackId, + dagId, + dagRunId, + }, + undefined, + { enabled: isOpen }, + ); + + const parsedLogs = useMemo(() => { + const content = parseStreamingLogContent(data); + + if (content.length === 0) { + return []; + } + + return parseCallbackLogs(content, translate); + }, [data, translate]); + + return ( + <> + + setIsOpen(false)} open={isOpen} scrollBehavior="inside" size="xl"> + + + + {translate("dag:callbackLogs.title")} + {callbackState !== undefined && callbackState !== null ? ( + + {callbackState} + + ) : undefined} + + + + + {!isLoading && parsedLogs.length === 0 && error === undefined ? ( + + {translate("dag:callbackLogs.noLogs")} + + ) : ( + + + + )} + + + + + ); +}; diff --git a/airflow-core/src/airflow/ui/src/pages/Run/DeadlineStatus.tsx b/airflow-core/src/airflow/ui/src/pages/Run/DeadlineStatus.tsx index eff38d4a78e71..ebaebb275c15c 100644 --- a/airflow-core/src/airflow/ui/src/pages/Run/DeadlineStatus.tsx +++ b/airflow-core/src/airflow/ui/src/pages/Run/DeadlineStatus.tsx @@ -25,10 +25,9 @@ import { useTranslation } from "react-i18next"; import { FiAlertTriangle, FiCheck, FiClock } from "react-icons/fi"; import { useDeadlinesServiceGetDagDeadlineAlerts, useDeadlinesServiceGetDeadlines } from "openapi/queries"; -import type { DeadlineAlertResponse } from "openapi/requests/types.gen"; import Time from "src/components/Time"; import { Tooltip } from "src/components/ui/Tooltip"; -import { renderDuration } from "src/utils/datetimeUtils"; +import { CallbackLogViewer } from "src/pages/Run/CallbackLogViewer"; import { DeadlineStatusModal } from "./DeadlineStatusModal"; @@ -58,12 +57,6 @@ export const DeadlineStatus = ({ dagId, dagRunId, endDate }: DeadlineStatusProps limit: 100, }); - const alertMap = new Map(); - - for (const deadlineAlert of alertData?.deadline_alerts ?? []) { - alertMap.set(deadlineAlert.id, deadlineAlert); - } - if (isLoadingDeadlines || isLoadingAlerts) { return undefined; } @@ -137,7 +130,6 @@ export const DeadlineStatus = ({ dagId, dagRunId, endDate }: DeadlineStatusProps setIsModalOpen(false)} @@ -148,53 +140,29 @@ export const DeadlineStatus = ({ dagId, dagRunId, endDate }: DeadlineStatusProps ); } - // Single deadline — show inline with Expected / Actual dates and precise duration. + // Single deadline — show inline with Expected / Actual times. const [dl] = deadlines; if (dl === undefined) { return undefined; } - const alert = dl.alert_id !== undefined && dl.alert_id !== null ? alertMap.get(dl.alert_id) : undefined; - const deadlineTime = dayjs(dl.deadline_time); - - let actualDurationLabel: string | undefined; - - if (dl.missed && runEndDate !== undefined) { - const diff = dayjs(runEndDate).diff(deadlineTime); - const dur = renderDuration(Math.abs(diff) / 1000, false); - - if (dur !== undefined) { - actualDurationLabel = - diff >= 0 - ? translate("deadlineStatus.finishedLate", { duration: dur }) - : translate("deadlineStatus.finishedEarly", { duration: dur }); - } - } - return ( - + {dl.missed ? : } {translate(dl.missed ? "deadlineStatus.missed" : "deadlineStatus.upcoming")} - {Boolean(dl.alert_name) && ( - - ({dl.alert_name}) - - )} + {dl.callback_id !== undefined && dl.callback_id !== null ? ( + + ) : undefined} - {alert === undefined ? undefined : ( - - {translate("deadlineAlerts.completionRule", { - interval: dayjs.duration(alert.interval, "seconds").humanize(), - reference: translate(`deadlineAlerts.referenceType.${alert.reference_type}`, { - defaultValue: alert.reference_type, - }), - })} - - )} {translate("deadlineStatus.expected")}: @@ -213,11 +181,6 @@ export const DeadlineStatus = ({ dagId, dagRunId, endDate }: DeadlineStatusProps - {actualDurationLabel === undefined ? undefined : ( - - {actualDurationLabel} - - )} ); }; diff --git a/airflow-core/src/airflow/ui/src/pages/Run/DeadlineStatusModal.tsx b/airflow-core/src/airflow/ui/src/pages/Run/DeadlineStatusModal.tsx index 968f1210d9e82..d5dfd386458de 100644 --- a/airflow-core/src/airflow/ui/src/pages/Run/DeadlineStatusModal.tsx +++ b/airflow-core/src/airflow/ui/src/pages/Run/DeadlineStatusModal.tsx @@ -17,28 +17,20 @@ * under the License. */ import { Badge, Heading, HStack, Separator, Skeleton, Text, VStack } from "@chakra-ui/react"; -import dayjs from "dayjs"; -import duration from "dayjs/plugin/duration"; -import relativeTime from "dayjs/plugin/relativeTime"; import { useState } from "react"; import { useTranslation } from "react-i18next"; import { FiAlertTriangle, FiClock } from "react-icons/fi"; import { useDeadlinesServiceGetDeadlines } from "openapi/queries"; -import type { DeadlineAlertResponse } from "openapi/requests/types.gen"; import { ErrorAlert } from "src/components/ErrorAlert"; import Time from "src/components/Time"; import { Dialog } from "src/components/ui"; import { Pagination } from "src/components/ui/Pagination"; -import { renderDuration } from "src/utils/datetimeUtils"; - -dayjs.extend(duration); -dayjs.extend(relativeTime); +import { CallbackLogViewer } from "src/pages/Run/CallbackLogViewer"; const PAGE_LIMIT = 10; type DeadlineStatusModalProps = { - readonly alertMap: Map; readonly dagId: string; readonly dagRunId: string; readonly onClose: () => void; @@ -47,7 +39,6 @@ type DeadlineStatusModalProps = { }; export const DeadlineStatusModal = ({ - alertMap, dagId, dagRunId, onClose, @@ -96,74 +87,42 @@ export const DeadlineStatusModal = ({ ) : ( }> - {deadlines.map((dl) => { - const alert = - dl.alert_id !== undefined && dl.alert_id !== null ? alertMap.get(dl.alert_id) : undefined; - const deadlineTime = dayjs(dl.deadline_time); - - let actualDurationLabel: string | undefined; - - if (dl.missed && runEndDate !== undefined) { - const diff = dayjs(runEndDate).diff(deadlineTime); - const dur = renderDuration(Math.abs(diff) / 1000, false); - - if (dur !== undefined) { - actualDurationLabel = - diff >= 0 - ? translate("deadlineStatus.finishedLate", { duration: dur }) - : translate("deadlineStatus.finishedEarly", { duration: dur }); - } - } - - return ( - - - - {dl.missed ? : } - {translate(dl.missed ? "deadlineStatus.missed" : "deadlineStatus.upcoming")} - - {Boolean(dl.alert_name) && ( - - {dl.alert_name} - - )} - - {alert === undefined ? undefined : ( - - {translate("deadlineAlerts.completionRule", { - interval: dayjs.duration(alert.interval, "seconds").humanize(), - reference: translate(`deadlineAlerts.referenceType.${alert.reference_type}`, { - defaultValue: alert.reference_type, - }), - })} - - )} - - - {translate("deadlineStatus.expected")}: - - - + {deadlines.map((dl) => ( + + + + {dl.missed ? : } + {translate(dl.missed ? "deadlineStatus.missed" : "deadlineStatus.upcoming")} + + {dl.callback_id !== undefined && dl.callback_id !== null ? ( + + ) : undefined} + + + + {translate("deadlineStatus.expected")}: + + + + + {translate("deadlineStatus.actual")}: + + {runEndDate === undefined ? ( - {translate("deadlineStatus.actual")}: - - {runEndDate === undefined ? ( - - {translate("deadlineStatus.stillRunning")} - - ) : ( - - {actualDurationLabel === undefined ? undefined : ( - - {actualDurationLabel} + {translate("deadlineStatus.stillRunning")} + ) : ( + - ); - })} + + + ))} )} diff --git a/airflow-core/src/airflow/utils/log/callback_log_reader.py b/airflow-core/src/airflow/utils/log/callback_log_reader.py new file mode 100644 index 0000000000000..7b7beaf3aeb2b --- /dev/null +++ b/airflow-core/src/airflow/utils/log/callback_log_reader.py @@ -0,0 +1,182 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +"""Reader for callback execution logs stored in remote or local storage.""" + +from __future__ import annotations + +import logging +import os +from collections.abc import Generator +from contextlib import suppress +from pathlib import Path +from typing import TYPE_CHECKING + +from airflow.configuration import conf +from airflow.utils.log.file_task_handler import ( + StructuredLogMessage, + _interleave_logs, + _stream_lines_by_chunk, +) + +if TYPE_CHECKING: + from airflow._shared.logging.remote import LogSourceInfo, RawLogStream + +logger = logging.getLogger(__name__) + + +def _get_callback_log_relative_paths(dag_id: str, run_id: str, callback_id: str) -> list[str]: + """ + Construct the relative log paths for a callback execution. + + Returns paths for both executor callbacks (sync) and triggerer callbacks (async). + The executor path matches the format used in ExecuteCallback.make(): + executor_callbacks/{dag_id}/{run_id}/{callback_id} + The triggerer path matches what TriggerLoggingFactory writes for callback triggers: + triggerer_callbacks/{dag_id}/{run_id}/{callback_id} + """ + return [ + f"executor_callbacks/{dag_id}/{run_id}/{callback_id}", + f"triggerer_callbacks/{dag_id}/{run_id}/{callback_id}", + ] + + +def read_callback_log( + dag_id: str, + run_id: str, + callback_id: str, +) -> Generator[StructuredLogMessage, None, None]: + """ + Read callback logs from remote and/or local storage. + + Tries both executor_callbacks and triggerer_callbacks paths. + For each path, tries remote storage first (if configured), then falls back to local filesystem. + Returns a generator of StructuredLogMessage objects suitable for the API response. + + :param dag_id: The Dag ID associated with the callback. + :param run_id: The Dag run ID associated with the callback. + :param callback_id: The unique callback identifier. + :return: Generator of StructuredLogMessage objects. + """ + relative_paths = _get_callback_log_relative_paths(dag_id, run_id, callback_id) + + sources: LogSourceInfo = [] + remote_logs: list[RawLogStream] = [] + local_logs: list[RawLogStream] = [] + + for relative_path in relative_paths: + # Try remote storage first + with suppress(Exception): + remote_sources, remote_log_streams = _read_callback_remote_logs(relative_path) + if remote_log_streams: + sources.extend(remote_sources) + remote_logs.extend(remote_log_streams) + + # Try local filesystem + if not remote_logs: + local_sources, local_log_streams = _read_callback_local_logs(relative_path) + if local_log_streams: + sources.extend(local_sources) + local_logs.extend(local_log_streams) + + # If we found logs at this path, no need to check the next path + if remote_logs or local_logs: + break + + if not remote_logs and not local_logs: + yield StructuredLogMessage(event="No callback logs found.", timestamp=None) + return + + # Emit source information header + yield StructuredLogMessage(event="::group::Log message source details", sources=sources) # type: ignore[call-arg] + yield StructuredLogMessage(event="::endgroup::") + + # Interleave and yield all log streams + log_stream = _interleave_logs(*remote_logs, *local_logs) + yield from log_stream + + +def _read_callback_remote_logs( + relative_path: str, +) -> tuple[list[str], list[RawLogStream]]: + """Read callback logs from the configured remote log storage.""" + from airflow.logging_config import get_remote_task_log + + remote_io = get_remote_task_log() + if remote_io is None: + return [], [] + + # RemoteLogIO.read() takes (relative_path, ti) -- for S3 the ti is not used, + # for CloudWatch it uses ti.end_date (with getattr fallback to None). + # We pass None since callbacks don't have a TaskInstance. + if stream_method := getattr(remote_io, "stream", None): + sources, logs = stream_method(relative_path, None) + return sources, logs or [] + + sources, logs = remote_io.read(relative_path, None) # type: ignore[arg-type] + if not logs: + return sources, [] + + # Convert legacy string logs to stream format + from airflow.utils.log.file_task_handler import _get_compatible_log_stream + + return sources, [_get_compatible_log_stream(logs)] + + +def _validate_path_component(component: str) -> str: + """Validate and return a path component, raising ValueError if unsafe.""" + import re + + if component in (".", "..") or not re.fullmatch(r"[A-Za-z0-9._:+\-~@]+", component): + raise ValueError(f"Invalid path component: {component!r}") + return component + + +def _read_callback_local_logs( + relative_path: str, +) -> tuple[list[str], list[RawLogStream]]: + """Read callback logs from the local filesystem.""" + base_log_folder = os.path.realpath(conf.get("logging", "base_log_folder")) + + parts = relative_path.split("/") + safe_parts = [_validate_path_component(p) for p in parts if p] + safe_relative = os.path.join(*safe_parts) + + log_path = Path(base_log_folder) / safe_relative + + sources: list[str] = [] + log_streams: list[RawLogStream] = [] + + paths = sorted(log_path.parent.glob(log_path.name + "*")) + if not paths: + return sources, log_streams + + for path in paths: + resolved_path = os.path.realpath(path) + try: + if os.path.commonpath([base_log_folder, resolved_path]) != base_log_folder: + continue + except ValueError: + continue + + try: + log_stream = _stream_lines_by_chunk(open(resolved_path, encoding="utf-8")) + except OSError: + continue + sources.append(os.fspath(path)) + log_streams.append(log_stream) + + return sources, log_streams diff --git a/airflow-core/tests/unit/jobs/test_triggerer_job.py b/airflow-core/tests/unit/jobs/test_triggerer_job.py index 923779df0034c..bf9379578dbd4 100644 --- a/airflow-core/tests/unit/jobs/test_triggerer_job.py +++ b/airflow-core/tests/unit/jobs/test_triggerer_job.py @@ -705,6 +705,100 @@ def test_trigger_logger_fd_closed_when_upload_to_remote_raises(jobless_superviso assert 42 not in jobless_supervisor.running_triggers +def test_trigger_logging_factory_accepts_none_ti(): + """TriggerLoggingFactory can be created with ti=None for callback triggers.""" + factory = TriggerLoggingFactory(log_path="/tmp/test_callback.log", ti=None) + assert factory.ti is None + assert factory.log_path == "/tmp/test_callback.log" + + +def test_trigger_logging_factory_upload_skips_when_no_bound_logger(): + """upload_to_remote is a no-op when bound_logger was never created (ti=None case).""" + factory = TriggerLoggingFactory(log_path="/tmp/test_callback.log", ti=None) + # Should not raise — early return because bound_logger is not set + factory.upload_to_remote() + + +def test_trigger_logging_factory_upload_callback_log_to_remote(): + """When ti=None and bound_logger exists, _upload_callback_log_to_remote is called.""" + factory = TriggerLoggingFactory(log_path="/tmp/test_callback.log", ti=None) + # Simulate that bound_logger has been created + mock_logger = MagicMock() + factory.bound_logger = mock_logger + + with patch( + "airflow.jobs.triggerer_job_runner.TriggerLoggingFactory._upload_callback_log_to_remote" + ) as mock_upload: + factory.upload_to_remote() + mock_upload.assert_called_once() + + +def test_create_workload_sets_up_logger_cache_for_callback_triggers(session): + """_create_workload populates logger_cache when trigger has a callback but no task_instance.""" + from airflow.jobs.job import Job + + trigger_orm = MagicMock() + trigger_orm.task_instance = None + trigger_orm.id = 99 + trigger_orm.classpath = "airflow.triggers.callback.CallbackTrigger" + trigger_orm.encrypted_kwargs = "{}" + + # Simulate a callback with data containing dag_id and run_id + mock_callback = MagicMock() + mock_callback.data = {"dag_id": "test_dag", "run_id": "manual__2024-01-01"} + mock_callback.id = uuid.UUID("12345678-1234-5678-1234-567812345678") + trigger_orm.callback = mock_callback + + supervisor = TriggerRunnerSupervisor.start(job=Job(id=123), capacity=10) + try: + workload = supervisor._create_workload( + trigger=trigger_orm, + dag_bag=MagicMock(), + render_log_fname=MagicMock(), + session=session, + ) + + # Workload should be returned + assert workload is not None + assert workload.id == 99 + + # Logger cache should be populated for the trigger + assert 99 in supervisor.logger_cache + factory = supervisor.logger_cache[99] + expected_path = "triggerer_callbacks/test_dag/manual__2024-01-01/12345678-1234-5678-1234-567812345678" + assert factory.log_path == expected_path + assert factory.ti is None + finally: + supervisor.kill(force=False) + + +def test_create_workload_no_logger_cache_for_non_callback_triggers(session): + """_create_workload does NOT populate logger_cache when trigger has no callback and no task_instance.""" + from airflow.jobs.job import Job + + trigger_orm = MagicMock() + trigger_orm.task_instance = None + trigger_orm.id = 100 + trigger_orm.classpath = "airflow.triggers.temporal.DateTimeTrigger" + trigger_orm.encrypted_kwargs = "{}" + trigger_orm.callback = None + + supervisor = TriggerRunnerSupervisor.start(job=Job(id=123), capacity=10) + try: + workload = supervisor._create_workload( + trigger=trigger_orm, + dag_bag=MagicMock(), + render_log_fname=MagicMock(), + session=session, + ) + + assert workload is not None + # No logger_cache entry for non-callback triggers without task_instance + assert 100 not in supervisor.logger_cache + finally: + supervisor.kill(force=False) + + class TestTriggerRunner: def test_blocked_main_thread_warning_threshold_decode(self) -> None: with conf_vars({("triggerer", "blocked_main_thread_warning_threshold"): "0.5"}): diff --git a/airflow-core/tests/unit/utils/log/test_callback_log_reader.py b/airflow-core/tests/unit/utils/log/test_callback_log_reader.py new file mode 100644 index 0000000000000..3f04b37466669 --- /dev/null +++ b/airflow-core/tests/unit/utils/log/test_callback_log_reader.py @@ -0,0 +1,122 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import os +import tempfile +from unittest.mock import patch + +from airflow.utils.log.callback_log_reader import ( + _get_callback_log_relative_paths, + read_callback_log, +) + + +class TestGetCallbackLogRelativePaths: + def test_returns_both_executor_and_triggerer_paths(self): + paths = _get_callback_log_relative_paths("my_dag", "run_123", "cb_456") + assert paths == [ + "executor_callbacks/my_dag/run_123/cb_456", + "triggerer_callbacks/my_dag/run_123/cb_456", + ] + + def test_path_components_preserved(self): + paths = _get_callback_log_relative_paths("dag-with-dashes", "manual__2024-01-01", "abc123") + assert "dag-with-dashes" in paths[0] + assert "manual__2024-01-01" in paths[0] + assert "abc123" in paths[0] + assert paths[1] == "triggerer_callbacks/dag-with-dashes/manual__2024-01-01/abc123" + + +class TestReadCallbackLog: + def test_no_logs_found_yields_message(self): + """When no logs exist at either path, a 'No callback logs found.' message is yielded.""" + with tempfile.TemporaryDirectory() as tmpdir: + with patch("airflow.utils.log.callback_log_reader.conf") as mock_conf: + mock_conf.get.return_value = tmpdir + msgs = list(read_callback_log("dag1", "run1", "cb1")) + assert len(msgs) == 1 + assert msgs[0].event == "No callback logs found." + + def test_reads_executor_callback_logs(self): + """Logs at the executor_callbacks path are found and returned.""" + with tempfile.TemporaryDirectory() as tmpdir: + # The local reader looks for files matching the last path component as a + # filename (with potential suffixes) in its parent directory. + # relative_path = "executor_callbacks/dag1/run1/cb1" + # It globs base_log_folder/executor_callbacks/dag1/run1/cb1* + log_dir = os.path.join(tmpdir, "executor_callbacks", "dag1", "run1") + os.makedirs(log_dir) + log_file = os.path.join(log_dir, "cb1") + with open(log_file, "w") as f: + f.write("executor log line 1\n") + + with patch("airflow.utils.log.callback_log_reader.conf") as mock_conf: + mock_conf.get.return_value = tmpdir + # Suppress remote log attempts + with patch( + "airflow.utils.log.callback_log_reader._read_callback_remote_logs", + return_value=([], []), + ): + msgs = list(read_callback_log("dag1", "run1", "cb1")) + + # Should have source header + log content (not "No callback logs found.") + assert not any(m.event == "No callback logs found." for m in msgs) + + def test_reads_triggerer_callback_logs(self): + """Logs at the triggerer_callbacks path are found when executor path is empty.""" + with tempfile.TemporaryDirectory() as tmpdir: + # Only create triggerer path, NOT executor path + log_dir = os.path.join(tmpdir, "triggerer_callbacks", "dag1", "run1") + os.makedirs(log_dir) + log_file = os.path.join(log_dir, "cb1") + with open(log_file, "w") as f: + f.write("triggerer log line 1\n") + + with patch("airflow.utils.log.callback_log_reader.conf") as mock_conf: + mock_conf.get.return_value = tmpdir + with patch( + "airflow.utils.log.callback_log_reader._read_callback_remote_logs", + return_value=([], []), + ): + msgs = list(read_callback_log("dag1", "run1", "cb1")) + + # Should have found logs (not the "no logs" message) + assert not any(m.event == "No callback logs found." for m in msgs) + + def test_executor_path_preferred_over_triggerer(self): + """When logs exist at both paths, executor_callbacks is returned (first match wins).""" + with tempfile.TemporaryDirectory() as tmpdir: + # Create both paths with log files + for prefix in ("executor_callbacks", "triggerer_callbacks"): + log_dir = os.path.join(tmpdir, prefix, "dag1", "run1") + os.makedirs(log_dir) + log_file = os.path.join(log_dir, "cb1") + with open(log_file, "w") as f: + f.write(f"{prefix} log line\n") + + with patch("airflow.utils.log.callback_log_reader.conf") as mock_conf: + mock_conf.get.return_value = tmpdir + with patch( + "airflow.utils.log.callback_log_reader._read_callback_remote_logs", + return_value=([], []), + ): + msgs = list(read_callback_log("dag1", "run1", "cb1")) + + # We can't easily check which path was used from the messages alone, but we know + # the function returns after the first successful path. No "No callback logs" message. + assert not any(m.event == "No callback logs found." for m in msgs) From 15fa4d4e679bbcffd4bf5c64aaa223783b04dbde Mon Sep 17 00:00:00 2001 From: Sean Ghaeli Date: Fri, 19 Jun 2026 21:38:44 +0000 Subject: [PATCH 2/5] UI: use StateBadge for callback state, type callbackState as TaskInstanceState Address review feedback on the deadline callback-log viewer: - Replace the hand-rolled stateColorMap + Badge with the shared StateBadge component (maps state to colorPalette and adds the state icon), matching the usage in Run/Details. - Type the callbackState prop as TaskInstanceState instead of a bare string; cast dl.callback_state at the two call sites since the generated API schema types it as string. --- .../ui/src/pages/Run/CallbackLogViewer.tsx | 17 ++++++----------- .../airflow/ui/src/pages/Run/DeadlineStatus.tsx | 3 ++- .../ui/src/pages/Run/DeadlineStatusModal.tsx | 3 ++- 3 files changed, 10 insertions(+), 13 deletions(-) diff --git a/airflow-core/src/airflow/ui/src/pages/Run/CallbackLogViewer.tsx b/airflow-core/src/airflow/ui/src/pages/Run/CallbackLogViewer.tsx index 33467bf58fbe4..ca1e682f3399c 100644 --- a/airflow-core/src/airflow/ui/src/pages/Run/CallbackLogViewer.tsx +++ b/airflow-core/src/airflow/ui/src/pages/Run/CallbackLogViewer.tsx @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -import { Badge, Box, Button, Heading, HStack, Text } from "@chakra-ui/react"; +import { Box, Button, Heading, HStack, Text } from "@chakra-ui/react"; import type { TFunction } from "i18next"; import { useMemo, useState } from "react"; import { useTranslation } from "react-i18next"; @@ -24,8 +24,9 @@ import { FiFileText } from "react-icons/fi"; import innerText from "react-innertext"; import { useDeadlinesServiceGetCallbackLogs } from "openapi/queries"; -import type { TaskInstancesLogResponse } from "openapi/requests/types.gen"; +import type { TaskInstanceState, TaskInstancesLogResponse } from "openapi/requests/types.gen"; import { renderStructuredLog } from "src/components/renderStructuredLog"; +import { StateBadge } from "src/components/StateBadge"; import { Dialog } from "src/components/ui"; import { TaskLogContent } from "src/pages/TaskInstance/Logs/TaskLogContent"; import type { ParsedLogEntry } from "src/queries/useLogs"; @@ -33,17 +34,11 @@ import { parseStreamingLogContent } from "src/utils/logs"; type CallbackLogViewerProps = { readonly callbackId: string; - readonly callbackState?: string | null; + readonly callbackState?: TaskInstanceState | null; readonly dagId: string; readonly dagRunId: string; }; -const stateColorMap: Record = { - failed: "red", - running: "blue", - success: "green", -}; - /** * Parse callback log data using the same structured log rendering pipeline * as the task instance logs, providing consistent formatting, grouping, and display. @@ -157,9 +152,9 @@ export const CallbackLogViewer = ({ callbackId, callbackState, dagId, dagRunId } {translate("dag:callbackLogs.title")} {callbackState !== undefined && callbackState !== null ? ( - + {callbackState} - + ) : undefined} diff --git a/airflow-core/src/airflow/ui/src/pages/Run/DeadlineStatus.tsx b/airflow-core/src/airflow/ui/src/pages/Run/DeadlineStatus.tsx index ebaebb275c15c..4a8ecb6c2ac5e 100644 --- a/airflow-core/src/airflow/ui/src/pages/Run/DeadlineStatus.tsx +++ b/airflow-core/src/airflow/ui/src/pages/Run/DeadlineStatus.tsx @@ -25,6 +25,7 @@ import { useTranslation } from "react-i18next"; import { FiAlertTriangle, FiCheck, FiClock } from "react-icons/fi"; import { useDeadlinesServiceGetDagDeadlineAlerts, useDeadlinesServiceGetDeadlines } from "openapi/queries"; +import type { TaskInstanceState } from "openapi/requests/types.gen"; import Time from "src/components/Time"; import { Tooltip } from "src/components/ui/Tooltip"; import { CallbackLogViewer } from "src/pages/Run/CallbackLogViewer"; @@ -157,7 +158,7 @@ export const DeadlineStatus = ({ dagId, dagRunId, endDate }: DeadlineStatusProps {dl.callback_id !== undefined && dl.callback_id !== null ? ( diff --git a/airflow-core/src/airflow/ui/src/pages/Run/DeadlineStatusModal.tsx b/airflow-core/src/airflow/ui/src/pages/Run/DeadlineStatusModal.tsx index d5dfd386458de..7007eb0f268b2 100644 --- a/airflow-core/src/airflow/ui/src/pages/Run/DeadlineStatusModal.tsx +++ b/airflow-core/src/airflow/ui/src/pages/Run/DeadlineStatusModal.tsx @@ -22,6 +22,7 @@ import { useTranslation } from "react-i18next"; import { FiAlertTriangle, FiClock } from "react-icons/fi"; import { useDeadlinesServiceGetDeadlines } from "openapi/queries"; +import type { TaskInstanceState } from "openapi/requests/types.gen"; import { ErrorAlert } from "src/components/ErrorAlert"; import Time from "src/components/Time"; import { Dialog } from "src/components/ui"; @@ -97,7 +98,7 @@ export const DeadlineStatusModal = ({ {dl.callback_id !== undefined && dl.callback_id !== null ? ( From f017ab068c99c1f7ae6030c104589a6ccc8fe4cd Mon Sep 17 00:00:00 2001 From: Sean Ghaeli Date: Fri, 19 Jun 2026 22:36:12 +0000 Subject: [PATCH 3/5] UI: fix import order in CallbackLogViewer (StateBadge before renderStructuredLog) The prettier import-sort is case-sensitive (uppercase before lowercase), so StateBadge must precede renderStructuredLog. Fixes the ts-compile-lint-ui static check. --- airflow-core/src/airflow/ui/src/pages/Run/CallbackLogViewer.tsx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow-core/src/airflow/ui/src/pages/Run/CallbackLogViewer.tsx b/airflow-core/src/airflow/ui/src/pages/Run/CallbackLogViewer.tsx index ca1e682f3399c..5e7792315f36b 100644 --- a/airflow-core/src/airflow/ui/src/pages/Run/CallbackLogViewer.tsx +++ b/airflow-core/src/airflow/ui/src/pages/Run/CallbackLogViewer.tsx @@ -25,8 +25,8 @@ import innerText from "react-innertext"; import { useDeadlinesServiceGetCallbackLogs } from "openapi/queries"; import type { TaskInstanceState, TaskInstancesLogResponse } from "openapi/requests/types.gen"; -import { renderStructuredLog } from "src/components/renderStructuredLog"; import { StateBadge } from "src/components/StateBadge"; +import { renderStructuredLog } from "src/components/renderStructuredLog"; import { Dialog } from "src/components/ui"; import { TaskLogContent } from "src/pages/TaskInstance/Logs/TaskLogContent"; import type { ParsedLogEntry } from "src/queries/useLogs"; From 27c0596cea63ce40f0b9d45a152f9f98ac164cd7 Mon Sep 17 00:00:00 2001 From: Sean Ghaeli Date: Mon, 22 Jun 2026 05:27:50 +0000 Subject: [PATCH 4/5] =?UTF-8?q?UI:=20address=20review=20=E2=80=94=20fix=20?= =?UTF-8?q?dead=20ValueError=20handling,=20type=20callback=5Fstate=20as=20?= =?UTF-8?q?CallbackState?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - read_callback_log is a generator, so path-validation ValueError fires during iteration; move list() inside the try so the 400 is actually raised (was dead code). - Annotate DeadlineResponse.callback_state as CallbackState instead of bare str. (Skipped the StreamingLogResponse alias suggestion — that alias isn't present on this branch's base; the explicit tuple type is correct here.) --- .../api_fastapi/core_api/datamodels/ui/deadline.py | 5 ++++- .../api_fastapi/core_api/routes/ui/deadlines.py | 13 ++++++++----- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/deadline.py b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/deadline.py index 7d2e8360311a1..20e549886704e 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/deadline.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/deadline.py @@ -24,6 +24,7 @@ from pydantic import AliasPath, Field from airflow.api_fastapi.core_api.base import BaseModel +from airflow.utils.state import CallbackState class DeadlineResponse(BaseModel): @@ -38,7 +39,9 @@ class DeadlineResponse(BaseModel): alert_id: UUID | None = Field(validation_alias="deadline_alert_id", default=None) alert_name: str | None = Field(validation_alias=AliasPath("deadline_alert", "name"), default=None) callback_id: UUID | None = Field(validation_alias="callback_id", default=None) - callback_state: str | None = Field(validation_alias=AliasPath("callback", "state"), default=None) + callback_state: CallbackState | None = Field( + validation_alias=AliasPath("callback", "state"), default=None + ) class DeadlineCollectionResponse(BaseModel): diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/deadlines.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/deadlines.py index 671375c12efa6..f4eb3ef526da9 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/deadlines.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/deadlines.py @@ -272,14 +272,17 @@ def get_callback_logs( f"Callback `{callback_id}` is not associated with DagRun `{dag_run_id}` of Dag `{dag_id}`", ) + # read_callback_log is a generator: path validation (which raises ValueError) runs lazily during + # iteration, not when the generator is created -- so list() must be inside the try for the 400 to fire. try: - log_stream = read_callback_log( - dag_id=dag_id, - run_id=dag_run_id, - callback_id=str(callback_id), + content = list( + read_callback_log( + dag_id=dag_id, + run_id=dag_run_id, + callback_id=str(callback_id), + ) ) except ValueError: raise HTTPException(status.HTTP_400_BAD_REQUEST, "Invalid callback log path") - content = list(log_stream) return TaskInstancesLogResponse.model_construct(content=content, continuation_token=None) From 5414a540260cacd7e59e027b3206494a16487334 Mon Sep 17 00:00:00 2001 From: Sean Ghaeli Date: Mon, 22 Jun 2026 05:50:30 +0000 Subject: [PATCH 5/5] UI: use StreamingLogResponse type alias for callback log readers Address review: annotate _read_callback_remote_logs and _read_callback_local_logs with the existing StreamingLogResponse alias (= tuple[LogSourceInfo, list[RawLogStream]]) from airflow._shared.logging.remote, instead of the inlined tuple type. Imported under TYPE_CHECKING (annotation-only, no runtime cost). --- airflow-core/src/airflow/utils/log/callback_log_reader.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/airflow-core/src/airflow/utils/log/callback_log_reader.py b/airflow-core/src/airflow/utils/log/callback_log_reader.py index 7b7beaf3aeb2b..5545d5832261e 100644 --- a/airflow-core/src/airflow/utils/log/callback_log_reader.py +++ b/airflow-core/src/airflow/utils/log/callback_log_reader.py @@ -33,7 +33,7 @@ ) if TYPE_CHECKING: - from airflow._shared.logging.remote import LogSourceInfo, RawLogStream + from airflow._shared.logging.remote import LogSourceInfo, RawLogStream, StreamingLogResponse logger = logging.getLogger(__name__) @@ -111,7 +111,7 @@ def read_callback_log( def _read_callback_remote_logs( relative_path: str, -) -> tuple[list[str], list[RawLogStream]]: +) -> StreamingLogResponse: """Read callback logs from the configured remote log storage.""" from airflow.logging_config import get_remote_task_log @@ -147,7 +147,7 @@ def _validate_path_component(component: str) -> str: def _read_callback_local_logs( relative_path: str, -) -> tuple[list[str], list[RawLogStream]]: +) -> StreamingLogResponse: """Read callback logs from the local filesystem.""" base_log_folder = os.path.realpath(conf.get("logging", "base_log_folder"))