diff --git a/airflow-core/NOTICE b/airflow-core/NOTICE index b103c97d19a99..02667b73f51fc 100644 --- a/airflow-core/NOTICE +++ b/airflow-core/NOTICE @@ -6,17 +6,37 @@ Foundation (http://www.apache.org/). ======================================================================= -hue: ------ +hue +--- + This product contains a modified portion of 'Hue' developed by Cloudera, Inc. (https://github.com/cloudera/hue/). * Copyright 2009-2017 Cloudera Inc. -Chakra UI: ------ +Chakra UI +--------- + This product contains a modified portion of 'Chakra UI' developed by Segun Adebayo. (https://github.com/chakra-ui/chakra-ui). * Copyright 2019, Segun Adebayo + + +pylockfile +---------- + +This product contains a modified portion of 'pylockfile' developed by Skip Montanaro. +(https://opendev.org/openstack/pylockfile). + +* Copyright 2007 Skip Montanaro + + +python-daemon +------------- + +This product contains a modified portion of 'python-daemon' developed by Ben Finney. +(https://pagure.io/python-daemon). + +* Copyright 2008–2025 Ben Finney and others diff --git a/airflow-core/newsfragments/65513.significant.rst b/airflow-core/newsfragments/65513.significant.rst new file mode 100644 index 0000000000000..ecbace617b892 --- /dev/null +++ b/airflow-core/newsfragments/65513.significant.rst @@ -0,0 +1,9 @@ +Libraries ``lockfile`` and ``python-daemon`` have been removed as dependencies. + +The relevant code has been vendored-in so that the dependencies are not needed. +If you plan to use an older Celery or Edge provider from older than May 2026, you will need +to install ``lockfile`` and ``python-daemon`` separately. +These packages are not required for Airflow itself, but are required for the Celery and Edge +providers in older versions. + +TODO(jscheffl): Add a reference to min provider versions post next provider wave after this was merged prior Airflow 3.3.0. diff --git a/airflow-core/pyproject.toml b/airflow-core/pyproject.toml index a875d9e9c77e7..f7c273711464a 100644 --- a/airflow-core/pyproject.toml +++ b/airflow-core/pyproject.toml @@ -108,7 +108,6 @@ dependencies = [ 'libcst >=1.8.2; python_version < "3.14"', 'libcst >=1.8.6; python_version >= "3.14"', "linkify-it-py>=2.0.0", - "lockfile>=0.12.2", "methodtools>=0.4.7", "natsort>=8.4.0", "opentelemetry-api>=1.27.0", @@ -133,7 +132,6 @@ dependencies = [ # See https://github.com/pygments/pygments/issues/2834 "pygments>=2.0.1,!=2.19.0", "pyjwt>=2.11.0", - "python-daemon>=3.0.0", "python-dateutil>=2.7.0", "python-slugify>=5.0", # Requests 3 if it will be released, will be heavily breaking. diff --git a/airflow-core/src/airflow/cli/commands/daemon_utils.py b/airflow-core/src/airflow/cli/commands/daemon_utils.py index 5aae0dc04278b..692783116aace 100644 --- a/airflow-core/src/airflow/cli/commands/daemon_utils.py +++ b/airflow-core/src/airflow/cli/commands/daemon_utils.py @@ -20,11 +20,10 @@ from argparse import Namespace from collections.abc import Callable -from daemon import daemon -from daemon.pidfile import TimeoutPIDLockFile - from airflow import settings from airflow.utils.cli import setup_locations, setup_logging, sigint_handler, sigquit_handler +from airflow.utils.daemon import DaemonContext +from airflow.utils.pidfile import TimeoutPIDLockFile from airflow.utils.process_utils import check_if_pidfile_process_is_running @@ -65,7 +64,7 @@ def run_command_with_daemon_option( stdout_handle.truncate(0) stderr_handle.truncate(0) - ctx = daemon.DaemonContext( + ctx = DaemonContext( pidfile=TimeoutPIDLockFile(pid, -1), files_preserve=files_preserve, stdout=stdout_handle, diff --git a/airflow-core/src/airflow/utils/daemon.py b/airflow-core/src/airflow/utils/daemon.py new file mode 100644 index 0000000000000..be53e599186fb --- /dev/null +++ b/airflow-core/src/airflow/utils/daemon.py @@ -0,0 +1,338 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +""" +Minimal daemon context replacing the ``python-daemon`` distribution. + +Implements the subset of :pep:`3143` that Airflow actually uses: +double-fork detach, file-descriptor cleanup, stream redirection, +signal-handler installation, umask/chroot/cwd changes, UID/GID +switching, PID-file management and core-dump prevention. +""" + +from __future__ import annotations + +import atexit +import os +import signal +import socket +import sys + +try: + import pwd +except ModuleNotFoundError: + pwd = None # type: ignore[assignment] + +try: + import resource +except ModuleNotFoundError: + resource = None # type: ignore[assignment] + + +class DaemonContext: + """Context manager that turns the current process into a Unix daemon.""" + + def __init__( + self, + *, + chroot_directory: str | None = None, + working_directory: str = "/", + umask: int = 0, + uid: int | None = None, + gid: int | None = None, + initgroups: bool = False, + prevent_core: bool = True, + detach_process: bool | None = None, + files_preserve: list | None = None, + pidfile=None, + stdin=None, + stdout=None, + stderr=None, + signal_map: dict | None = None, + ): + self.chroot_directory = chroot_directory + self.working_directory = working_directory + self.umask = umask + self.prevent_core = prevent_core + self.files_preserve = files_preserve + self.pidfile = pidfile + self.stdin = stdin + self.stdout = stdout + self.stderr = stderr + + self.uid = uid if uid is not None else os.getuid() + self.gid = gid if gid is not None else os.getgid() + self.initgroups = initgroups + + if detach_process is None: + detach_process = _is_detach_process_context_required() + self.detach_process = detach_process + + if signal_map is None: + signal_map = _make_default_signal_map() + self.signal_map = signal_map + + self._is_open = False + + @property + def is_open(self) -> bool: + return self._is_open + + # -- context-manager protocol ------------------------------------------ + + def open(self) -> None: + if self.is_open: + return + + if self.chroot_directory is not None: + _change_root_directory(self.chroot_directory) + + if self.prevent_core: + _prevent_core_dump() + + _change_file_creation_mask(self.umask) + _change_working_directory(self.working_directory) + _change_process_owner(self.uid, self.gid, self.initgroups) + + if self.detach_process: + _detach_process_context() + + signal_handler_map = self._make_signal_handler_map() + _set_signal_handlers(signal_handler_map) + + exclude_fds = self._get_exclude_file_descriptors() + _close_all_open_files(exclude=exclude_fds) + + _redirect_stream(sys.stdin, self.stdin) + _redirect_stream(sys.stdout, self.stdout) + _redirect_stream(sys.stderr, self.stderr) + + if self.pidfile is not None: + self.pidfile.__enter__() + + self._is_open = True + atexit.register(self.close) + + def close(self) -> None: + if not self.is_open: + return + if self.pidfile is not None: + self.pidfile.__exit__(None, None, None) + self._is_open = False + + def __enter__(self): + self.open() + return self + + def __exit__(self, exc_type, exc_value, traceback): + self.close() + + # -- signal handling --------------------------------------------------- + + def terminate(self, signal_number, stack_frame): + raise SystemExit(f"Terminating on signal {signal_number!r}") + + def _make_signal_handler(self, target): + if target is None: + return signal.SIG_IGN + if isinstance(target, str): + return getattr(self, target) + return target + + def _make_signal_handler_map(self) -> dict: + return { + signal_number: self._make_signal_handler(target) + for signal_number, target in self.signal_map.items() + } + + # -- file-descriptor helpers ------------------------------------------- + + def _get_exclude_file_descriptors(self) -> set[int]: + files_preserve = list(self.files_preserve or []) + files_preserve.extend( + item for item in (self.stdin, self.stdout, self.stderr) if hasattr(item, "fileno") + ) + + exclude: set[int] = set() + for item in files_preserve: + if item is None: + continue + fd = _get_file_descriptor(item) + if fd is not None: + exclude.add(fd) + else: + exclude.add(item) + return exclude + + +# --------------------------------------------------------------------------- +# Helper functions +# --------------------------------------------------------------------------- + + +def _get_file_descriptor(obj) -> int | None: + if hasattr(obj, "fileno"): + try: + return obj.fileno() + except ValueError: + pass + return None + + +def _change_working_directory(directory: str) -> None: + try: + os.chdir(directory) + except Exception as exc: + raise OSError(f"Unable to change working directory ({exc})") from exc + + +def _change_root_directory(directory: str) -> None: + try: + os.chdir(directory) + os.chroot(directory) + except Exception as exc: + raise OSError(f"Unable to change root directory ({exc})") from exc + + +def _change_file_creation_mask(mask: int) -> None: + try: + os.umask(mask) + except Exception as exc: + raise OSError(f"Unable to change file creation mask ({exc})") from exc + + +def _change_process_owner(uid: int, gid: int, initgroups: bool = False) -> None: + if pwd is None: + raise OSError("Unable to change process owner (pwd module is not available on this platform)") + + try: + username = pwd.getpwuid(uid).pw_name + except KeyError: + initgroups = False + username = None + + try: + if initgroups and username is not None: + os.initgroups(username, gid) + else: + os.setgid(gid) + os.setuid(uid) + except Exception as exc: + raise OSError(f"Unable to change process owner ({exc})") from exc + + +def _prevent_core_dump() -> None: + if resource is None: + raise OSError("System does not support RLIMIT_CORE resource limit (resource module unavailable)") + + core_resource = resource.RLIMIT_CORE + try: + resource.getrlimit(core_resource) + except ValueError as exc: + raise OSError(f"System does not support RLIMIT_CORE resource limit ({exc})") from exc + resource.setrlimit(core_resource, (0, 0)) + + +def _detach_process_context() -> None: + def _fork_then_exit_parent(error_message: str) -> None: + try: + pid = os.fork() + if pid > 0: + os._exit(0) + except OSError as exc: + raise OSError(f"{error_message}: [{exc.errno}] {exc.strerror}") from exc + + _fork_then_exit_parent("Failed first fork") + os.setsid() + _fork_then_exit_parent("Failed second fork") + + +def _is_detach_process_context_required() -> bool: + if os.getppid() == 1: + return False + try: + if sys.__stdin__ is not None and hasattr(sys.__stdin__, "fileno"): + fd = sys.__stdin__.fileno() + with socket.fromfd(fd, socket.AF_INET, socket.SOCK_RAW) as sock: + sock.getsockopt(socket.SOL_SOCKET, socket.SO_TYPE) + return False + except (OSError, ValueError): + pass + return True + + +_MAXFD = 2048 + + +def _get_maximum_file_descriptors() -> int: + if resource is None: + return _MAXFD + + __, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE) + if hard_limit == resource.RLIM_INFINITY: + return _MAXFD + return hard_limit + + +def _close_all_open_files(exclude: set[int] | None = None) -> None: + if exclude is None: + exclude = set() + maxfd = _get_maximum_file_descriptors() + ranges: list[tuple[int, int]] = [] + remaining = range(0, maxfd) + for fd in sorted(exclude): + if fd > remaining.stop: + break + if fd < remaining.start: + continue + if fd != remaining.start: + ranges.append((remaining.start, fd)) + remaining = range(fd + 1, remaining.stop) + if remaining.start < remaining.stop: + ranges.append((remaining.start, remaining.stop)) + for low, high in ranges: + os.closerange(low, high) + + +def _redirect_stream(system_stream, target_stream) -> None: + system_fd = system_stream.fileno() + if target_stream is None: + target_fd = os.open(os.devnull, os.O_RDWR) + try: + if target_fd != system_fd: + os.dup2(target_fd, system_fd) + finally: + if target_fd != system_fd: + os.close(target_fd) + return + + os.dup2(target_stream.fileno(), system_fd) + + +def _make_default_signal_map() -> dict: + name_map = { + "SIGTSTP": None, + "SIGTTIN": None, + "SIGTTOU": None, + "SIGTERM": "terminate", + } + return {getattr(signal, name): target for name, target in name_map.items() if hasattr(signal, name)} + + +def _set_signal_handlers(signal_handler_map: dict) -> None: + for signal_number, handler in signal_handler_map.items(): + signal.signal(signal_number, handler) diff --git a/airflow-core/src/airflow/utils/pidfile.py b/airflow-core/src/airflow/utils/pidfile.py new file mode 100644 index 0000000000000..37452fb099019 --- /dev/null +++ b/airflow-core/src/airflow/utils/pidfile.py @@ -0,0 +1,155 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +"""PID file utilities replacing the deprecated ``lockfile`` library.""" + +from __future__ import annotations + +import errno +import os +import time + + +class PIDLockFile: + """ + Lockfile implemented as a Unix PID file. + + The lock file is a normal file named by the attribute ``path``. + A lock's PID file contains a single line of text, containing + the process ID (PID) of the process that acquired the lock. + + This is a minimal replacement for ``lockfile.pidlockfile.PIDLockFile``. + """ + + def __init__(self, path: str): + self.path = path + + def read_pid(self) -> int | None: + """Read the PID recorded in the PID file.""" + return read_pid_from_pidfile(self.path) + + def is_locked(self) -> bool: + """Return ``True`` if the PID file exists.""" + return os.path.exists(self.path) + + def i_am_locking(self) -> bool: + """Return ``True`` if the current process ID matches the PID in the file.""" + return self.is_locked() and os.getpid() == self.read_pid() + + def break_lock(self) -> None: + """Remove the PID file if it exists.""" + remove_existing_pidfile(self.path) + + def __enter__(self): + self.acquire() + return self + + def __exit__(self, *_exc): + self.release() + + def acquire(self, timeout=None): + """ + Acquire the lock by creating the PID file. + + If *timeout* > 0 wait up to that many seconds; if 0 or negative + raise immediately on conflict; if ``None`` wait forever. + """ + end_time = time.monotonic() + if timeout is not None and timeout > 0: + end_time += timeout + + while True: + try: + write_pid_to_pidfile(self.path) + except OSError as exc: + if exc.errno == errno.EEXIST: + if timeout is not None and timeout > 0 and time.monotonic() > end_time: + raise TimeoutError(f"Timeout waiting to acquire lock for {self.path}") from exc + if timeout is not None and timeout <= 0: + raise FileExistsError(f"{self.path} is already locked") from exc + time.sleep(timeout / 10 if timeout else 0.1) + else: + raise + else: + return + + def release(self): + """Release the lock by removing the PID file.""" + if not self.is_locked(): + return + if self.i_am_locking(): + remove_existing_pidfile(self.path) + + +class TimeoutPIDLockFile(PIDLockFile): + """ + PIDLockFile with a default acquire timeout. + + Drop-in replacement for ``daemon.pidfile.TimeoutPIDLockFile``. + """ + + def __init__(self, path: str, acquire_timeout: float | None = None): + super().__init__(path) + self.acquire_timeout = acquire_timeout + + def acquire(self, timeout=None): + if timeout is None: + timeout = self.acquire_timeout + super().acquire(timeout) + + +def read_pid_from_pidfile(pidfile_path: str) -> int | None: + """ + Read and return the numeric PID recorded in the named PID file. + + If the PID file cannot be read, or if the content is not a valid PID, + return ``None``. + """ + try: + with open(pidfile_path) as pidfile: + line = pidfile.readline().strip() + return int(line) + except (OSError, ValueError): + return None + + +def write_pid_to_pidfile(pidfile_path: str) -> None: + """ + Write the current process PID to the named PID file. + + Uses ``O_CREAT | O_EXCL`` to fail atomically if the file already exists. + """ + open_flags = os.O_CREAT | os.O_EXCL | os.O_WRONLY + open_mode = 0o644 + pidfile_fd = os.open(pidfile_path, open_flags, open_mode) + try: + os.write(pidfile_fd, f"{os.getpid()}\n".encode()) + finally: + os.close(pidfile_fd) + + +def remove_existing_pidfile(pidfile_path: str) -> None: + """ + Remove the named PID file if it exists. + + Silently ignores the case where the file does not exist. + """ + try: + os.remove(pidfile_path) + except OSError as exc: + if exc.errno != errno.ENOENT: + raise diff --git a/airflow-core/src/airflow/utils/process_utils.py b/airflow-core/src/airflow/utils/process_utils.py index af887f1b8aa57..113c7735018e3 100644 --- a/airflow-core/src/airflow/utils/process_utils.py +++ b/airflow-core/src/airflow/utils/process_utils.py @@ -42,10 +42,10 @@ from contextlib import contextmanager import psutil -from lockfile.pidlockfile import PIDLockFile from airflow.configuration import conf from airflow.exceptions import AirflowException +from airflow.utils.pidfile import PIDLockFile log = logging.getLogger(__name__) diff --git a/airflow-core/tests/unit/cli/commands/test_api_server_command.py b/airflow-core/tests/unit/cli/commands/test_api_server_command.py index 93c2c43e27b39..e6910daabea92 100644 --- a/airflow-core/tests/unit/cli/commands/test_api_server_command.py +++ b/airflow-core/tests/unit/cli/commands/test_api_server_command.py @@ -207,11 +207,11 @@ def test_args_to_uvicorn(self, ssl_cert_and_key, cli_args, expected_additional_k ) @mock.patch("airflow.cli.commands.daemon_utils.TimeoutPIDLockFile") @mock.patch("airflow.cli.commands.daemon_utils.setup_locations") - @mock.patch("airflow.cli.commands.daemon_utils.daemon") + @mock.patch("airflow.cli.commands.daemon_utils.DaemonContext") @mock.patch("airflow.cli.commands.daemon_utils.check_if_pidfile_process_is_running") @mock.patch("airflow.cli.commands.api_server_command.uvicorn") def test_run_command_daemon( - self, mock_uvicorn, _, mock_daemon, mock_setup_locations, mock_pid_file, demonize + self, mock_uvicorn, _, mock_daemon_context, mock_setup_locations, mock_pid_file, demonize ): mock_setup_locations.return_value = ( mock.MagicMock(name="pidfile"), @@ -254,16 +254,16 @@ def test_run_command_daemon( ) if demonize: - assert mock_daemon.mock_calls[:3] == [ - mock.call.DaemonContext( + assert mock_daemon_context.mock_calls[:3] == [ + mock.call( pidfile=mock_pid_file.return_value, files_preserve=None, stdout=mock_open.return_value, stderr=mock_open.return_value, umask=0o077, ), - mock.call.DaemonContext().__enter__(), - mock.call.DaemonContext().__exit__(None, None, None), + mock.call().__enter__(), + mock.call().__exit__(None, None, None), ] assert mock_setup_locations.mock_calls == [ mock.call( @@ -301,7 +301,7 @@ def test_run_command_daemon( mock.call().__exit__(None, None, None), ] else: - assert mock_daemon.mock_calls == [] + assert mock_daemon_context.mock_calls == [] mock_setup_locations.mock_calls == [] mock_pid_file.assert_not_called() mock_open.assert_not_called() diff --git a/airflow-core/tests/unit/cli/commands/test_kerberos_command.py b/airflow-core/tests/unit/cli/commands/test_kerberos_command.py index ac8ba911ddab7..d321957973f87 100644 --- a/airflow-core/tests/unit/cli/commands/test_kerberos_command.py +++ b/airflow-core/tests/unit/cli/commands/test_kerberos_command.py @@ -49,10 +49,10 @@ def test_run_command(self, mock_krb): @mock.patch("airflow.cli.commands.daemon_utils.TimeoutPIDLockFile") @mock.patch("airflow.cli.commands.daemon_utils.setup_locations") - @mock.patch("airflow.cli.commands.daemon_utils.daemon") + @mock.patch("airflow.cli.commands.daemon_utils.DaemonContext") @mock.patch("airflow.cli.commands.kerberos_command.krb") @conf_vars({("core", "executor"): "CeleryExecutor"}) - def test_run_command_daemon(self, mock_krb, mock_daemon, mock_setup_locations, mock_pid_file): + def test_run_command_daemon(self, mock_krb, mock_daemon_context, mock_setup_locations, mock_pid_file): mock_setup_locations.return_value = ( mock.MagicMock(name="pidfile"), mock.MagicMock(name="stdout"), @@ -83,16 +83,16 @@ def test_run_command_daemon(self, mock_krb, mock_daemon, mock_setup_locations, m mock_krb.run.assert_called_once_with( keytab="/tmp/airflow.keytab", principal="PRINCIPAL", mode=KerberosMode.STANDARD ) - assert mock_daemon.mock_calls[:3] == [ - mock.call.DaemonContext( + assert mock_daemon_context.mock_calls[:3] == [ + mock.call( pidfile=mock_pid_file.return_value, files_preserve=None, stderr=mock_open.return_value, stdout=mock_open.return_value, umask=0o077, ), - mock.call.DaemonContext().__enter__(), - mock.call.DaemonContext().__exit__(None, None, None), + mock.call().__enter__(), + mock.call().__exit__(None, None, None), ] assert mock_setup_locations.mock_calls[0] == mock.call( diff --git a/airflow-core/tests/unit/utils/test_daemon.py b/airflow-core/tests/unit/utils/test_daemon.py new file mode 100644 index 0000000000000..21b5f056550f6 --- /dev/null +++ b/airflow-core/tests/unit/utils/test_daemon.py @@ -0,0 +1,373 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import contextlib +import os +import resource +import signal +from unittest import mock + +import pytest + +from airflow.utils.daemon import ( + DaemonContext, + _change_file_creation_mask, + _change_working_directory, + _close_all_open_files, + _get_file_descriptor, + _get_maximum_file_descriptors, + _is_detach_process_context_required, + _make_default_signal_map, + _prevent_core_dump, + _redirect_stream, + _set_signal_handlers, +) + + +class TestGetFileDescriptor: + def test_returns_fd_for_file_like_object(self, tmp_path): + path = tmp_path / "test.txt" + path.write_text("hello") + with open(str(path)) as f: + assert _get_file_descriptor(f) == f.fileno() + + def test_returns_none_for_non_file_object(self): + assert _get_file_descriptor("not-a-file") is None + + def test_returns_none_for_closed_file(self, tmp_path): + path = tmp_path / "test.txt" + path.write_text("hello") + f = open(str(path)) + f.close() + assert _get_file_descriptor(f) is None + + +class TestChangeWorkingDirectory: + def test_changes_cwd(self, tmp_path): + original = os.getcwd() + try: + _change_working_directory(str(tmp_path)) + assert os.getcwd() == str(tmp_path) + finally: + os.chdir(original) + + def test_raises_on_invalid_directory(self): + with pytest.raises(OSError, match="Unable to change working directory"): + _change_working_directory("/nonexistent_dir_abc123") + + +class TestChangeFileCreationMask: + def test_sets_umask(self): + old = os.umask(0o022) + os.umask(old) + try: + _change_file_creation_mask(0o077) + assert os.umask(0o077) == 0o077 + finally: + os.umask(old) + + +class TestPreventCoreDump: + def test_sets_core_limit_to_zero(self): + original = resource.getrlimit(resource.RLIMIT_CORE) + try: + _prevent_core_dump() + soft, hard = resource.getrlimit(resource.RLIMIT_CORE) + assert soft == 0 + assert hard == 0 + finally: + with contextlib.suppress(ValueError): + resource.setrlimit(resource.RLIMIT_CORE, original) + + +class TestGetMaximumFileDescriptors: + def test_returns_hard_limit(self): + __, hard = resource.getrlimit(resource.RLIMIT_NOFILE) + result = _get_maximum_file_descriptors() + if hard == resource.RLIM_INFINITY: + assert result == 2048 + else: + assert result == hard + + +class TestCloseAllOpenFiles: + @mock.patch("airflow.utils.daemon._get_maximum_file_descriptors", return_value=8) + @mock.patch("airflow.utils.daemon.os.closerange") + def test_closes_file_descriptors_except_excluded(self, mock_closerange, _mock_maxfd): + _close_all_open_files(exclude={0, 1, 2, 5}) + + assert mock_closerange.call_args_list == [mock.call(3, 5), mock.call(6, 8)] + + @mock.patch("airflow.utils.daemon._get_maximum_file_descriptors", return_value=3) + @mock.patch("airflow.utils.daemon.os.closerange") + def test_empty_exclude_does_not_raise(self, mock_closerange, _mock_maxfd): + _close_all_open_files(exclude={0, 1, 2}) + mock_closerange.assert_not_called() + + +class TestRedirectStream: + @mock.patch("airflow.utils.daemon.os.dup2") + def test_redirects_to_target(self, mock_dup2): + system_stream = mock.Mock() + target_stream = mock.Mock() + system_stream.fileno.return_value = 1 + target_stream.fileno.return_value = 12 + + _redirect_stream(system_stream, target_stream) + + mock_dup2.assert_called_once_with(12, 1) + + @mock.patch("airflow.utils.daemon.os.close") + @mock.patch("airflow.utils.daemon.os.dup2") + @mock.patch("airflow.utils.daemon.os.open", return_value=11) + def test_redirects_to_devnull_when_none(self, mock_open, mock_dup2, mock_close): + system_stream = mock.Mock() + system_stream.fileno.return_value = 1 + + _redirect_stream(system_stream, None) + + mock_open.assert_called_once_with(os.devnull, os.O_RDWR) + mock_dup2.assert_called_once_with(11, 1) + mock_close.assert_called_once_with(11) + + +class TestMakeDefaultSignalMap: + def test_contains_sigterm(self): + signal_map = _make_default_signal_map() + assert signal.SIGTERM in signal_map + assert signal_map[signal.SIGTERM] == "terminate" + + def test_contains_sigtstp_as_none(self): + signal_map = _make_default_signal_map() + if hasattr(signal, "SIGTSTP"): + assert signal_map[signal.SIGTSTP] is None + + +class TestSetSignalHandlers: + @mock.patch("signal.signal") + def test_installs_handlers(self, mock_signal): + handler = mock.MagicMock() + _set_signal_handlers({signal.SIGUSR1: handler}) + mock_signal.assert_called_once_with(signal.SIGUSR1, handler) + + +class TestIsDetachProcessContextRequired: + def test_returns_true_in_normal_process(self): + assert _is_detach_process_context_required() is True + + @mock.patch("os.getppid", return_value=1) + def test_returns_false_when_parent_is_init(self, _mock_getppid): + assert _is_detach_process_context_required() is False + + +class TestDaemonContext: + def test_defaults(self): + ctx = DaemonContext(detach_process=False) + assert ctx.working_directory == "/" + assert ctx.umask == 0 + assert ctx.prevent_core is True + assert ctx.pidfile is None + assert ctx.stdin is None + assert ctx.stdout is None + assert ctx.stderr is None + assert ctx.uid == os.getuid() + assert ctx.gid == os.getgid() + assert not ctx.is_open + + def test_is_open_initially_false(self): + ctx = DaemonContext(detach_process=False) + assert ctx.is_open is False + + def test_close_when_not_open_is_noop(self): + ctx = DaemonContext(detach_process=False) + ctx.close() + assert ctx.is_open is False + + def test_open_twice_is_idempotent(self): + with mock.patch.multiple( + "airflow.utils.daemon", + _prevent_core_dump=mock.DEFAULT, + _change_file_creation_mask=mock.DEFAULT, + _change_working_directory=mock.DEFAULT, + _change_process_owner=mock.DEFAULT, + _close_all_open_files=mock.DEFAULT, + _redirect_stream=mock.DEFAULT, + _set_signal_handlers=mock.DEFAULT, + ) as mocks: + ctx = DaemonContext(detach_process=False) + ctx.open() + call_count = mocks["_prevent_core_dump"].call_count + ctx.open() + assert mocks["_prevent_core_dump"].call_count == call_count + ctx.close() + + def test_context_manager_opens_and_closes(self): + with mock.patch.multiple( + "airflow.utils.daemon", + _prevent_core_dump=mock.DEFAULT, + _change_file_creation_mask=mock.DEFAULT, + _change_working_directory=mock.DEFAULT, + _change_process_owner=mock.DEFAULT, + _close_all_open_files=mock.DEFAULT, + _redirect_stream=mock.DEFAULT, + _set_signal_handlers=mock.DEFAULT, + ): + ctx = DaemonContext(detach_process=False) + with ctx: + assert ctx.is_open is True + assert ctx.is_open is False + + def test_pidfile_entered_and_exited(self): + mock_pidfile = mock.MagicMock() + with mock.patch.multiple( + "airflow.utils.daemon", + _prevent_core_dump=mock.DEFAULT, + _change_file_creation_mask=mock.DEFAULT, + _change_working_directory=mock.DEFAULT, + _change_process_owner=mock.DEFAULT, + _close_all_open_files=mock.DEFAULT, + _redirect_stream=mock.DEFAULT, + _set_signal_handlers=mock.DEFAULT, + ): + ctx = DaemonContext(detach_process=False, pidfile=mock_pidfile) + with ctx: + mock_pidfile.__enter__.assert_called_once() + mock_pidfile.__exit__.assert_called_once_with(None, None, None) + + def test_terminate_raises_system_exit(self): + ctx = DaemonContext(detach_process=False) + with pytest.raises(SystemExit, match="Terminating on signal"): + ctx.terminate(signal.SIGTERM, None) + + def test_signal_map_none_becomes_sig_ign(self): + ctx = DaemonContext( + detach_process=False, + signal_map={signal.SIGUSR1: None}, + ) + handler_map = ctx._make_signal_handler_map() + assert handler_map[signal.SIGUSR1] is signal.SIG_IGN + + def test_signal_map_string_becomes_method(self): + ctx = DaemonContext( + detach_process=False, + signal_map={signal.SIGTERM: "terminate"}, + ) + handler_map = ctx._make_signal_handler_map() + assert handler_map[signal.SIGTERM] == ctx.terminate + + def test_signal_map_callable_passed_through(self): + handler = mock.MagicMock() + ctx = DaemonContext( + detach_process=False, + signal_map={signal.SIGUSR1: handler}, + ) + handler_map = ctx._make_signal_handler_map() + assert handler_map[signal.SIGUSR1] is handler + + def test_exclude_fds_includes_stdio_streams(self): + stdout_mock = mock.MagicMock() + stdout_mock.fileno.return_value = 42 + stderr_mock = mock.MagicMock() + stderr_mock.fileno.return_value = 43 + ctx = DaemonContext( + detach_process=False, + stdout=stdout_mock, + stderr=stderr_mock, + ) + fds = ctx._get_exclude_file_descriptors() + assert 42 in fds + assert 43 in fds + + def test_exclude_fds_includes_files_preserve(self): + f = mock.MagicMock() + f.fileno.return_value = 10 + ctx = DaemonContext( + detach_process=False, + files_preserve=[f], + ) + fds = ctx._get_exclude_file_descriptors() + assert 10 in fds + + def test_exclude_fds_handles_raw_int_items(self): + ctx = DaemonContext( + detach_process=False, + files_preserve=[7], + ) + fds = ctx._get_exclude_file_descriptors() + assert 7 in fds + + def test_open_calls_expected_steps(self): + with mock.patch.multiple( + "airflow.utils.daemon", + _prevent_core_dump=mock.DEFAULT, + _change_file_creation_mask=mock.DEFAULT, + _change_working_directory=mock.DEFAULT, + _change_process_owner=mock.DEFAULT, + _close_all_open_files=mock.DEFAULT, + _redirect_stream=mock.DEFAULT, + _set_signal_handlers=mock.DEFAULT, + ) as mocks: + ctx = DaemonContext(detach_process=False, umask=0o077, working_directory="/tmp") + ctx.open() + try: + mocks["_prevent_core_dump"].assert_called_once() + mocks["_change_file_creation_mask"].assert_called_once_with(0o077) + mocks["_change_working_directory"].assert_called_once_with("/tmp") + mocks["_change_process_owner"].assert_called_once() + mocks["_close_all_open_files"].assert_called_once() + assert mocks["_redirect_stream"].call_count == 3 + mocks["_set_signal_handlers"].assert_called_once() + finally: + ctx.close() + + def test_detach_process_called_when_enabled(self): + with mock.patch.multiple( + "airflow.utils.daemon", + _prevent_core_dump=mock.DEFAULT, + _change_file_creation_mask=mock.DEFAULT, + _change_working_directory=mock.DEFAULT, + _change_process_owner=mock.DEFAULT, + _detach_process_context=mock.DEFAULT, + _close_all_open_files=mock.DEFAULT, + _redirect_stream=mock.DEFAULT, + _set_signal_handlers=mock.DEFAULT, + ) as mocks: + ctx = DaemonContext(detach_process=True) + ctx.open() + try: + mocks["_detach_process_context"].assert_called_once() + finally: + ctx.close() + + def test_prevent_core_skipped_when_disabled(self): + with mock.patch.multiple( + "airflow.utils.daemon", + _prevent_core_dump=mock.DEFAULT, + _change_file_creation_mask=mock.DEFAULT, + _change_working_directory=mock.DEFAULT, + _change_process_owner=mock.DEFAULT, + _close_all_open_files=mock.DEFAULT, + _redirect_stream=mock.DEFAULT, + _set_signal_handlers=mock.DEFAULT, + ) as mocks: + ctx = DaemonContext(detach_process=False, prevent_core=False) + ctx.open() + try: + mocks["_prevent_core_dump"].assert_not_called() + finally: + ctx.close() diff --git a/airflow-core/tests/unit/utils/test_pidfile.py b/airflow-core/tests/unit/utils/test_pidfile.py new file mode 100644 index 0000000000000..19ad77f80ca60 --- /dev/null +++ b/airflow-core/tests/unit/utils/test_pidfile.py @@ -0,0 +1,198 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import os + +import pytest + +from airflow.utils.pidfile import ( + PIDLockFile, + TimeoutPIDLockFile, + read_pid_from_pidfile, + remove_existing_pidfile, + write_pid_to_pidfile, +) + + +class TestReadPidFromPidfile: + def test_reads_valid_pid(self, tmp_path): + pidfile = tmp_path / "test.pid" + pidfile.write_text("12345\n") + assert read_pid_from_pidfile(str(pidfile)) == 12345 + + def test_reads_pid_without_trailing_newline(self, tmp_path): + pidfile = tmp_path / "test.pid" + pidfile.write_text("99") + assert read_pid_from_pidfile(str(pidfile)) == 99 + + def test_ignores_leading_whitespace(self, tmp_path): + pidfile = tmp_path / "test.pid" + pidfile.write_text(" 42 \n") + assert read_pid_from_pidfile(str(pidfile)) == 42 + + def test_returns_none_for_missing_file(self, tmp_path): + assert read_pid_from_pidfile(str(tmp_path / "nonexistent.pid")) is None + + def test_returns_none_for_non_numeric_content(self, tmp_path): + pidfile = tmp_path / "test.pid" + pidfile.write_text("not-a-number\n") + assert read_pid_from_pidfile(str(pidfile)) is None + + def test_returns_none_for_empty_file(self, tmp_path): + pidfile = tmp_path / "test.pid" + pidfile.write_text("") + assert read_pid_from_pidfile(str(pidfile)) is None + + +class TestWritePidToPidfile: + def test_writes_current_pid(self, tmp_path): + pidfile = str(tmp_path / "test.pid") + write_pid_to_pidfile(pidfile) + with open(pidfile) as file: + content = file.read() + assert content.strip() == str(os.getpid()) + + def test_raises_on_existing_file(self, tmp_path): + pidfile = str(tmp_path / "test.pid") + write_pid_to_pidfile(pidfile) + with pytest.raises(OSError, match="File exists"): + write_pid_to_pidfile(pidfile) + + def test_creates_file_with_correct_permissions(self, tmp_path): + pidfile = str(tmp_path / "test.pid") + previous_umask = os.umask(0o022) + try: + write_pid_to_pidfile(pidfile) + finally: + os.umask(previous_umask) + mode = os.stat(pidfile).st_mode & 0o777 + assert mode == 0o644 + + +class TestRemoveExistingPidfile: + def test_removes_existing_file(self, tmp_path): + pidfile = tmp_path / "test.pid" + pidfile.write_text("123\n") + remove_existing_pidfile(str(pidfile)) + assert not pidfile.exists() + + def test_no_error_on_missing_file(self, tmp_path): + remove_existing_pidfile(str(tmp_path / "nonexistent.pid")) + + +class TestPIDLockFile: + def test_not_locked_initially(self, tmp_path): + lock = PIDLockFile(str(tmp_path / "test.pid")) + assert not lock.is_locked() + + def test_read_pid_returns_none_when_not_locked(self, tmp_path): + lock = PIDLockFile(str(tmp_path / "test.pid")) + assert lock.read_pid() is None + + def test_is_locked_after_acquire(self, tmp_path): + lock = PIDLockFile(str(tmp_path / "test.pid")) + lock.acquire() + try: + assert lock.is_locked() + assert lock.read_pid() == os.getpid() + finally: + lock.release() + + def test_i_am_locking(self, tmp_path): + lock = PIDLockFile(str(tmp_path / "test.pid")) + lock.acquire() + try: + assert lock.i_am_locking() + finally: + lock.release() + + def test_i_am_not_locking_other_pid(self, tmp_path): + pidfile = tmp_path / "test.pid" + pidfile.write_text("99999\n") + lock = PIDLockFile(str(pidfile)) + assert lock.is_locked() + assert not lock.i_am_locking() + + def test_release_removes_file(self, tmp_path): + pidfile_path = str(tmp_path / "test.pid") + lock = PIDLockFile(pidfile_path) + lock.acquire() + lock.release() + assert not os.path.exists(pidfile_path) + + def test_release_noop_when_not_locked(self, tmp_path): + lock = PIDLockFile(str(tmp_path / "test.pid")) + lock.release() + + def test_break_lock(self, tmp_path): + pidfile = tmp_path / "test.pid" + pidfile.write_text("99999\n") + lock = PIDLockFile(str(pidfile)) + lock.break_lock() + assert not pidfile.exists() + + def test_context_manager(self, tmp_path): + pidfile_path = str(tmp_path / "test.pid") + with PIDLockFile(pidfile_path) as lock: + assert lock.is_locked() + assert lock.read_pid() == os.getpid() + assert not os.path.exists(pidfile_path) + + def test_acquire_raises_immediately_with_zero_timeout(self, tmp_path): + pidfile = tmp_path / "test.pid" + pidfile.write_text("99999\n") + lock = PIDLockFile(str(pidfile)) + with pytest.raises(FileExistsError): + lock.acquire(timeout=0) + + def test_acquire_raises_immediately_with_negative_timeout(self, tmp_path): + pidfile = tmp_path / "test.pid" + pidfile.write_text("99999\n") + lock = PIDLockFile(str(pidfile)) + with pytest.raises(FileExistsError): + lock.acquire(timeout=-1) + + +class TestTimeoutPIDLockFile: + def test_uses_acquire_timeout_as_default(self, tmp_path): + pidfile = tmp_path / "test.pid" + pidfile.write_text("99999\n") + lock = TimeoutPIDLockFile(str(pidfile), acquire_timeout=-1) + with pytest.raises(FileExistsError): + lock.acquire() + + def test_explicit_timeout_overrides_default(self, tmp_path): + pidfile = tmp_path / "test.pid" + pidfile.write_text("99999\n") + lock = TimeoutPIDLockFile(str(pidfile), acquire_timeout=None) + with pytest.raises(FileExistsError): + lock.acquire(timeout=0) + + def test_context_manager_acquires_and_releases(self, tmp_path): + pidfile_path = str(tmp_path / "test.pid") + lock = TimeoutPIDLockFile(pidfile_path, acquire_timeout=None) + with lock: + assert os.path.exists(pidfile_path) + assert not os.path.exists(pidfile_path) + + def test_timeout_expires_raises(self, tmp_path): + pidfile = tmp_path / "test.pid" + pidfile.write_text("99999\n") + lock = TimeoutPIDLockFile(str(pidfile), acquire_timeout=0.01) + with pytest.raises(TimeoutError): + lock.acquire() diff --git a/providers/celery/src/airflow/providers/celery/cli/celery_command.py b/providers/celery/src/airflow/providers/celery/cli/celery_command.py index 0e2b66aabf10c..dcd5b3e384ce1 100644 --- a/providers/celery/src/airflow/providers/celery/cli/celery_command.py +++ b/providers/celery/src/airflow/providers/celery/cli/celery_command.py @@ -32,16 +32,26 @@ from celery.app.defaults import DEFAULT_TASK_LOG_FMT from celery.app.log import TaskFormatter from celery.signals import after_setup_logger -from lockfile.pidlockfile import read_pid_from_pidfile, remove_existing_pidfile from airflow import settings from airflow.cli.simple_table import AirflowConsole from airflow.exceptions import AirflowConfigException -from airflow.providers.celery.version_compat import AIRFLOW_V_3_0_PLUS, AIRFLOW_V_3_2_PLUS +from airflow.providers.celery.version_compat import ( + AIRFLOW_V_3_0_PLUS, + AIRFLOW_V_3_2_PLUS, + AIRFLOW_V_3_3_PLUS, +) from airflow.providers.common.compat.sdk import conf from airflow.utils import cli as cli_utils from airflow.utils.cli import setup_locations +if AIRFLOW_V_3_3_PLUS: + # Airflow 3.3.0 has dropped dependency to lockfile and vendored-in the package + from airflow.utils.pidfile import read_pid_from_pidfile, remove_existing_pidfile +else: + # For Airflow versions < 3.3.0, we need to import from lockfile package + from lockfile.pidlockfile import read_pid_from_pidfile, remove_existing_pidfile # type: ignore + WORKER_PROCESS_NAME = "worker" log = logging.getLogger(__name__) diff --git a/providers/celery/tests/unit/celery/cli/test_celery_command.py b/providers/celery/tests/unit/celery/cli/test_celery_command.py index 99a9506f3d625..1786796931bbd 100644 --- a/providers/celery/tests/unit/celery/cli/test_celery_command.py +++ b/providers/celery/tests/unit/celery/cli/test_celery_command.py @@ -430,7 +430,9 @@ def test_run_command(self, mock_celery_app): ] ) - def _test_run_command_daemon(self, mock_celery_app, mock_daemon, mock_setup_locations, mock_pid_file): + def _test_run_command_daemon( + self, mock_celery_app, mock_daemon_context, mock_setup_locations, mock_pid_file + ): mock_setup_locations.return_value = ( mock.MagicMock(name="pidfile"), mock.MagicMock(name="stdout"), @@ -480,16 +482,16 @@ def _test_run_command_daemon(self, mock_celery_app, mock_daemon, mock_setup_loca "--conf=flower_config", ] ) - assert mock_daemon.mock_calls[:3] == [ - mock.call.DaemonContext( + assert mock_daemon_context.mock_calls[:3] == [ + mock.call( pidfile=mock_pid_file.return_value, files_preserve=None, stdout=mock_open.return_value, stderr=mock_open.return_value, umask=0o077, ), - mock.call.DaemonContext().__enter__(), - mock.call.DaemonContext().__exit__(None, None, None), + mock.call().__enter__(), + mock.call().__exit__(None, None, None), ] assert mock_setup_locations.mock_calls == [ @@ -531,22 +533,26 @@ def _test_run_command_daemon(self, mock_celery_app, mock_daemon, mock_setup_loca @pytest.mark.skipif(AIRFLOW_V_3_0_PLUS, reason="Test requires Airflow 3.0-") @mock.patch("airflow.cli.commands.daemon_utils.TimeoutPIDLockFile") @mock.patch("airflow.cli.commands.daemon_utils.setup_locations") - @mock.patch("airflow.cli.commands.daemon_utils.daemon") + @mock.patch("airflow.cli.commands.daemon_utils.DaemonContext") @mock.patch("airflow.providers.celery.executors.celery_executor.app") def test_run_command_daemon_v_3_below( - self, mock_celery_app, mock_daemon, mock_setup_locations, mock_pid_file + self, mock_celery_app, mock_daemon_context, mock_setup_locations, mock_pid_file ): - self._test_run_command_daemon(mock_celery_app, mock_daemon, mock_setup_locations, mock_pid_file) + self._test_run_command_daemon( + mock_celery_app, mock_daemon_context, mock_setup_locations, mock_pid_file + ) @pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="Test requires Airflow 3.0+") @mock.patch("airflow.cli.commands.daemon_utils.TimeoutPIDLockFile") @mock.patch("airflow.cli.commands.daemon_utils.setup_locations") - @mock.patch("airflow.cli.commands.daemon_utils.daemon") + @mock.patch("airflow.cli.commands.daemon_utils.DaemonContext") @mock.patch("airflow.providers.celery.executors.celery_executor.app") def test_run_command_daemon_v3_above( - self, mock_celery_app, mock_daemon, mock_setup_locations, mock_pid_file + self, mock_celery_app, mock_daemon_context, mock_setup_locations, mock_pid_file ): - self._test_run_command_daemon(mock_celery_app, mock_daemon, mock_setup_locations, mock_pid_file) + self._test_run_command_daemon( + mock_celery_app, mock_daemon_context, mock_setup_locations, mock_pid_file + ) class TestRemoteCeleryControlCommands: diff --git a/providers/edge3/src/airflow/providers/edge3/cli/signalling.py b/providers/edge3/src/airflow/providers/edge3/cli/signalling.py index 9cdccfb53c1e3..08b16a07ab45d 100644 --- a/providers/edge3/src/airflow/providers/edge3/cli/signalling.py +++ b/providers/edge3/src/airflow/providers/edge3/cli/signalling.py @@ -23,15 +23,26 @@ from pathlib import Path import psutil -from lockfile.pidlockfile import ( - read_pid_from_pidfile, - remove_existing_pidfile, - write_pid_to_pidfile as write_pid, -) +from airflow.providers.edge3.version_compat import AIRFLOW_V_3_3_PLUS from airflow.utils import cli as cli_utils from airflow.utils.platform import IS_WINDOWS +if AIRFLOW_V_3_3_PLUS: + # Airflow 3.3.0 has dropped dependency to lockfile and vendored-in the package + from airflow.utils.pidfile import ( + read_pid_from_pidfile, + remove_existing_pidfile, + write_pid_to_pidfile as write_pid, + ) +else: + # For Airflow versions < 3.3.0, we need to import from lockfile package + from lockfile.pidlockfile import ( # type: ignore + read_pid_from_pidfile, + remove_existing_pidfile, + write_pid_to_pidfile as write_pid, + ) + EDGE_WORKER_PROCESS_NAME = "edge-worker" logger = logging.getLogger(__name__) diff --git a/providers/edge3/src/airflow/providers/edge3/cli/worker.py b/providers/edge3/src/airflow/providers/edge3/cli/worker.py index 5eff1a9cac850..5960d1764ecf0 100644 --- a/providers/edge3/src/airflow/providers/edge3/cli/worker.py +++ b/providers/edge3/src/airflow/providers/edge3/cli/worker.py @@ -35,7 +35,6 @@ import anyio from aiofiles import open as aio_open from aiohttp import ClientResponseError -from lockfile.pidlockfile import remove_existing_pidfile from airflow import __version__ as airflow_version from airflow.providers.common.compat.sdk import conf, timezone @@ -59,10 +58,18 @@ EdgeWorkerState, EdgeWorkerVersionException, ) -from airflow.providers.edge3.version_compat import AIRFLOW_V_3_2_PLUS +from airflow.providers.edge3.version_compat import AIRFLOW_V_3_2_PLUS, AIRFLOW_V_3_3_PLUS from airflow.utils.net import getfqdn from airflow.utils.state import TaskInstanceState +if AIRFLOW_V_3_3_PLUS: + # Airflow 3.3.0 has dropped dependency to lockfile and vendored-in the package + from airflow.utils.pidfile import remove_existing_pidfile +else: + # For Airflow versions < 3.3.0, we need to import from lockfile package + from lockfile.pidlockfile import remove_existing_pidfile # type: ignore + + if TYPE_CHECKING: from airflow.configuration import AirflowConfigParser from airflow.executors.workloads import ExecuteTask diff --git a/providers/edge3/src/airflow/providers/edge3/version_compat.py b/providers/edge3/src/airflow/providers/edge3/version_compat.py index 61b31ae45a22d..0f3b2b445c15a 100644 --- a/providers/edge3/src/airflow/providers/edge3/version_compat.py +++ b/providers/edge3/src/airflow/providers/edge3/version_compat.py @@ -34,8 +34,10 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]: AIRFLOW_V_3_1_PLUS = get_base_airflow_version_tuple() >= (3, 1, 0) AIRFLOW_V_3_2_PLUS = get_base_airflow_version_tuple() >= (3, 2, 0) +AIRFLOW_V_3_3_PLUS = get_base_airflow_version_tuple() >= (3, 3, 0) __all__ = [ "AIRFLOW_V_3_1_PLUS", "AIRFLOW_V_3_2_PLUS", + "AIRFLOW_V_3_3_PLUS", ] diff --git a/scripts/ci/prek/known_airflow_core_utils_modules.txt b/scripts/ci/prek/known_airflow_core_utils_modules.txt index bdd3a2aa51fa5..13a982516b770 100644 --- a/scripts/ci/prek/known_airflow_core_utils_modules.txt +++ b/scripts/ci/prek/known_airflow_core_utils_modules.txt @@ -2,6 +2,7 @@ cli cli_action_loggers code_utils context +daemon dag_cycle_tester dag_edges dag_version_inflation_checker @@ -24,6 +25,7 @@ memray_utils net operator_helpers orm_event_handlers +pidfile platform process_utils providers_configuration_loader diff --git a/uv.lock b/uv.lock index 8a0ba2fa35452..a9641a99cc5fd 100644 --- a/uv.lock +++ b/uv.lock @@ -1825,7 +1825,6 @@ dependencies = [ { name = "lazy-object-proxy" }, { name = "libcst" }, { name = "linkify-it-py" }, - { name = "lockfile" }, { name = "methodtools" }, { name = "msgspec" }, { name = "natsort" }, @@ -1841,7 +1840,6 @@ dependencies = [ { name = "pygments" }, { name = "pygtrie" }, { name = "pyjwt" }, - { name = "python-daemon" }, { name = "python-dateutil" }, { name = "python-slugify" }, { name = "pyyaml" }, @@ -1962,7 +1960,6 @@ requires-dist = [ { name = "libcst", marker = "python_full_version < '3.14'", specifier = ">=1.8.2" }, { name = "libcst", marker = "python_full_version >= '3.14'", specifier = ">=1.8.6" }, { name = "linkify-it-py", specifier = ">=2.0.0" }, - { name = "lockfile", specifier = ">=0.12.2" }, { name = "memray", marker = "extra == 'memray'", specifier = ">=1.19.0" }, { name = "methodtools", specifier = ">=0.4.7" }, { name = "msgspec", specifier = ">=0.19.0" }, @@ -1981,7 +1978,6 @@ requires-dist = [ { name = "pygtrie", specifier = ">=2.5.0" }, { name = "pyjwt", specifier = ">=2.11.0" }, { name = "pykerberos", marker = "extra == 'kerberos'", specifier = ">=1.1.13" }, - { name = "python-daemon", specifier = ">=3.0.0" }, { name = "python-dateutil", specifier = ">=2.7.0" }, { name = "python-slugify", specifier = ">=5.0" }, { name = "pyyaml", specifier = ">=6.0.3" }, @@ -14690,15 +14686,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/02/6c/5327667e6dbe9e98cbfbd4261c8e91386a52e38f41419575854248bbab6a/litellm-1.82.6-py3-none-any.whl", hash = "sha256:164a3ef3e19f309e3cabc199bef3d2045212712fefdfa25fc7f75884a5b5b205", size = 15591595, upload-time = "2026-03-22T06:35:56.795Z" }, ] -[[package]] -name = "lockfile" -version = "0.12.2" -source = { registry = "https://pypi.org/simple" } -sdist = { url = "https://files.pythonhosted.org/packages/17/47/72cb04a58a35ec495f96984dddb48232b551aafb95bde614605b754fe6f7/lockfile-0.12.2.tar.gz", hash = "sha256:6aed02de03cba24efabcd600b30540140634fc06cfa603822d508d5361e9f799", size = 20874, upload-time = "2015-11-25T18:29:58.279Z" } -wheels = [ - { url = "https://files.pythonhosted.org/packages/c8/22/9460e311f340cb62d26a38c419b1381b8593b0bb6b5d1f056938b086d362/lockfile-0.12.2-py2.py3-none-any.whl", hash = "sha256:6c3cb24f344923d30b2785d5ad75182c8ea7ac1b6171b08657258ec7429d50fa", size = 13564, upload-time = "2015-11-25T18:29:51.462Z" }, -] - [[package]] name = "logfire-api" version = "4.32.1" @@ -18884,18 +18871,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/cc/95/ddc25f7f3d8b6c9bfa615807e9cd2241148bcd18b53c6de465ed0e29426e/python_arango-8.3.2-py3-none-any.whl", hash = "sha256:0c42913b79928cf9a1941815c8c9169c74959cd1dac60395dde589b21e38cddf", size = 116291, upload-time = "2026-04-13T10:28:12.874Z" }, ] -[[package]] -name = "python-daemon" -version = "3.1.2" -source = { registry = "https://pypi.org/simple" } -dependencies = [ - { name = "lockfile" }, -] -sdist = { url = "https://files.pythonhosted.org/packages/3d/37/4f10e37bdabc058a32989da2daf29e57dc59dbc5395497f3d36d5f5e2694/python_daemon-3.1.2.tar.gz", hash = "sha256:f7b04335adc473de877f5117e26d5f1142f4c9f7cd765408f0877757be5afbf4", size = 71576, upload-time = "2024-12-03T08:41:07.843Z" } -wheels = [ - { url = "https://files.pythonhosted.org/packages/45/3c/b88167e2d6785c0e781ee5d498b07472aeb9b6765da3b19e7cc9e0813841/python_daemon-3.1.2-py3-none-any.whl", hash = "sha256:b906833cef63502994ad48e2eab213259ed9bb18d54fa8774dcba2ff7864cec6", size = 30872, upload-time = "2024-12-03T08:41:03.322Z" }, -] - [[package]] name = "python-dateutil" version = "2.9.0.post0"