Skip to content

Commit

Permalink
DagFileProcessorManager: Start a new process group only if current pr…
Browse files Browse the repository at this point in the history
…ocess not a session leader (#23872)

(cherry picked from commit 9216489)
  • Loading branch information
Taragolis authored and ephraimbuddy committed May 27, 2022
1 parent 0682ac5 commit 71d5eba
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 4 deletions.
9 changes: 6 additions & 3 deletions airflow/dag_processing/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,11 @@
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.mixins import MultiprocessingStartMethodMixin
from airflow.utils.net import get_hostname
from airflow.utils.process_utils import kill_child_processes_by_pids, reap_process_group
from airflow.utils.process_utils import (
kill_child_processes_by_pids,
reap_process_group,
set_new_process_group,
)
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.utils.sqlalchemy import prohibit_commit, skip_locked, with_row_locks

Expand Down Expand Up @@ -471,8 +475,7 @@ def start(self):
"""
self.register_exit_signals()

# Start a new process group
os.setpgid(0, 0)
set_new_process_group()

self.log.info("Processing files using up to %s processes at a time ", self._parallelism)
self.log.info("Process each file at most once every %s seconds", self._file_process_interval)
Expand Down
15 changes: 15 additions & 0 deletions airflow/utils/process_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -301,3 +301,18 @@ def check_if_pidfile_process_is_running(pid_file: str, process_name: str):
except psutil.NoSuchProcess:
# If process is dead remove the pidfile
pid_lock_file.break_lock()


def set_new_process_group() -> None:
"""
Tries to set current process to a new process group
That makes it easy to kill all sub-process of this at the OS-level,
rather than having to iterate the child processes.
If current process spawn by system call ``exec()`` than keep current process group
"""

if os.getpid() == os.getsid(0):
# If PID = SID than process a session leader, and it is not possible to change process group
return

os.setpgid(0, 0)
24 changes: 23 additions & 1 deletion tests/utils/test_process_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,11 @@

from airflow.exceptions import AirflowException
from airflow.utils import process_utils
from airflow.utils.process_utils import check_if_pidfile_process_is_running, execute_in_subprocess
from airflow.utils.process_utils import (
check_if_pidfile_process_is_running,
execute_in_subprocess,
set_new_process_group,
)


class TestReapProcessGroup(unittest.TestCase):
Expand Down Expand Up @@ -212,3 +216,21 @@ def test_raise_error_if_process_is_running(self):
f.flush()
with pytest.raises(AirflowException, match="is already running under PID"):
check_if_pidfile_process_is_running(f.name, process_name="test")


class TestSetNewProcessGroup(unittest.TestCase):
@mock.patch("os.setpgid")
def test_not_session_leader(self, mock_set_pid):
pid = os.getpid()
with mock.patch('os.getsid', autospec=True) as mock_get_sid:
mock_get_sid.return_value = pid + 1
set_new_process_group()
assert mock_set_pid.call_count == 1

@mock.patch("os.setpgid")
def test_session_leader(self, mock_set_pid):
pid = os.getpid()
with mock.patch('os.getsid', autospec=True) as mock_get_sid:
mock_get_sid.return_value = pid
set_new_process_group()
assert mock_set_pid.call_count == 0

0 comments on commit 71d5eba

Please sign in to comment.