Skip to content

Commit

Permalink
Fix job control for callable aliases (xonsh#4901)
Browse files Browse the repository at this point in the history
* Make job control task queue thread-local

This allows callable aliases to maintain a separate task queue from the
main thread.

* Make job control dictionary thread-local

Use XSH.all_jobs for the main thread and a separate dictionary for other
threads. This allows callable aliases to keep track of their jobs
separate from the main thread.

* Implement use_main_jobs() context manager

This allows threaded commands like jobs, disown, and bg to handle the
main thread's job control.

* Run commands in the same process group if in a thread

* Fix tests that use jobs and tasks

* Add tests for subprocess call inside alias

* Add news

* Remove type declarations for _jobs_thread_local

Fixes the mypy error: "Type cannot be declared in assignment to non-self attribute"

Co-authored-by: Noorhteen Raja NJ <jnoortheen@gmail.com>
  • Loading branch information
yaxollum and jnoortheen committed Aug 3, 2022
1 parent 2d75729 commit 86e4f00
Show file tree
Hide file tree
Showing 7 changed files with 171 additions and 23 deletions.
24 changes: 24 additions & 0 deletions news/fix-callable-alias-subproc.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
**Added:**

* <news item>

**Changed:**

* <news item>

**Deprecated:**

* <news item>

**Removed:**

* <news item>

**Fixed:**

* A callable alias containing subprocess commands no longer freezes when piped to another command
* ``less`` no longer stops when a callable alias containing subprocess commands is piped into it

**Security:**

* <news item>
52 changes: 52 additions & 0 deletions tests/test_integrations.py
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,58 @@ def _echo():
""",
0,
),
# test $() inside piped callable alias
(
r"""
def _callme(args):
result = $(python -c 'print("tree");print("car")')
print(result[::-1])
print('one\ntwo\nthree')
aliases['callme'] = _callme
callme | grep t
""",
"""eert
two
three
""",
0,
),
# test ![] inside piped callable alias
(
r"""
def _callme(args):
python -c 'print("tree");print("car")'
print('one\ntwo\nthree')
aliases['callme'] = _callme
callme | grep t
""",
"""tree
two
three
""",
0,
),
# test $[] inside piped callable alias
pytest.param(
(
r"""
def _callme(args):
$[python -c 'print("tree");print("car")']
print('one\ntwo\nthree')
aliases['callme'] = _callme
callme | grep t
""",
"""tree
two
three
""",
0,
),
marks=pytest.mark.xfail(reason="$[] does not send stdout through the pipe"),
),
]

if not ON_WINDOWS:
Expand Down
4 changes: 2 additions & 2 deletions tests/test_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,6 @@ def test_disown_completion(
}
all_jobs = {1: job, 2: job}

monkeypatch.setattr(xsh_with_aliases, "all_jobs", all_jobs)
monkeypatch.setattr(jobs, "tasks", [2, 1])
monkeypatch.setattr(jobs._jobs_thread_local, "jobs", all_jobs, raising=False)
monkeypatch.setattr(jobs._jobs_thread_local, "tasks", [2, 1], raising=False)
assert check_completer(args, prefix=prefix) == exp
4 changes: 2 additions & 2 deletions tests/test_pipelines.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@

@pytest.fixture(autouse=True)
def patched_events(monkeypatch, xonsh_events, xonsh_session):
from xonsh.jobs import tasks
from xonsh.jobs import get_tasks

tasks.clear()
get_tasks().clear()
# needed for ci tests
monkeypatch.setitem(
xonsh_session.env, "RAISE_SUBPROC_ERROR", False
Expand Down
99 changes: 82 additions & 17 deletions xonsh/jobs.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
"""Job control for the xonsh shell."""
import collections
import contextlib
import ctypes
import os
import signal
import subprocess
import sys
import threading
import time
import typing as tp

Expand All @@ -13,14 +15,69 @@
from xonsh.completers.tools import RichCompletion
from xonsh.lazyasd import LazyObject
from xonsh.platform import FD_STDERR, LIBC, ON_CYGWIN, ON_DARWIN, ON_MSYS, ON_WINDOWS
from xonsh.tools import unthreadable
from xonsh.tools import on_main_thread, unthreadable

# there is not much cost initing deque
tasks: tp.Deque[int] = collections.deque()
# Track time stamp of last exit command, so that two consecutive attempts to
# exit can kill all jobs and exit.
_last_exit_time: tp.Optional[float] = None

# Thread-local data for job control. Allows threadable callable aliases
# (ProcProxyThread) to maintain job control information separate from the main
# thread.
#
# _jobs_thread_local.tasks is a queue used to keep track of the tasks. On
# the main thread, it is set to _tasks_main.
#
# _jobs_thread_local.jobs is a dictionary used to keep track of the jobs.
# On the main thread, it is set to XSH.all_jobs.
_jobs_thread_local = threading.local()

# Task queue for the main thread
# The use_main_jobs context manager uses this variable to access the tasks on
# the main thread.
_tasks_main: tp.Deque[int] = collections.deque()


@contextlib.contextmanager
def use_main_jobs():
    """Temporarily point this thread's job control at the main thread's.

    Swaps the thread-local task queue and job dictionary for the main
    thread's ``_tasks_main`` and ``XSH.all_jobs``, restoring the previous
    values on exit. Threaded commands (e.g. ``jobs``, ``disown``, ``bg``)
    use this so they operate on the main thread's job control.
    """
    saved_tasks = get_tasks()
    saved_jobs = get_jobs()
    try:
        _jobs_thread_local.tasks = _tasks_main
        _jobs_thread_local.jobs = XSH.all_jobs
        yield
    finally:
        _jobs_thread_local.tasks = saved_tasks
        _jobs_thread_local.jobs = saved_jobs


def get_tasks() -> tp.Deque[int]:
    """Return the task queue for the current thread, creating it on demand.

    The main thread shares the module-level ``_tasks_main`` deque; every
    other thread (e.g. a ProcProxyThread running a callable alias) gets
    its own empty deque so its job control stays separate.
    """
    tasks = getattr(_jobs_thread_local, "tasks", None)
    if tasks is None:
        tasks = _tasks_main if on_main_thread() else collections.deque()
        _jobs_thread_local.tasks = tasks
    return tasks


def get_jobs() -> tp.Dict[int, tp.Dict]:
    """Return the job dictionary for the current thread, creating it on demand.

    The main thread shares ``XSH.all_jobs``; every other thread gets its
    own dict so callable aliases track their jobs independently of the
    main thread.
    """
    jobs = getattr(_jobs_thread_local, "jobs", None)
    if jobs is None:
        jobs = XSH.all_jobs if on_main_thread() else {}
        _jobs_thread_local.jobs = jobs
    return jobs


if ON_DARWIN:

Expand Down Expand Up @@ -246,6 +303,7 @@ def _safe_wait_for_active_job(last_task=None, backgrounded=False):

def get_next_task():
"""Get the next active task and put it on top of the queue"""
tasks = get_tasks()
_clear_dead_jobs()
selected_task = None
for tid in tasks:
Expand All @@ -261,25 +319,27 @@ def get_next_task():


def get_task(tid):
return XSH.all_jobs[tid]
return get_jobs()[tid]


def _clear_dead_jobs():
to_remove = set()
tasks = get_tasks()
for tid in tasks:
obj = get_task(tid)["obj"]
if obj is None or obj.poll() is not None:
to_remove.add(tid)
for job in to_remove:
tasks.remove(job)
del XSH.all_jobs[job]
del get_jobs()[job]


def format_job_string(num: int) -> str:
try:
job = XSH.all_jobs[num]
job = get_jobs()[num]
except KeyError:
return ""
tasks = get_tasks()
pos = "+" if tasks[0] == num else "-" if tasks[1] == num else " "
status = job["status"]
cmd = " ".join([" ".join(i) if isinstance(i, list) else i for i in job["cmds"]])
Expand All @@ -299,7 +359,7 @@ def get_next_job_number():
"""Get the lowest available unique job number (for the next job created)."""
_clear_dead_jobs()
i = 1
while i in XSH.all_jobs:
while i in get_jobs():
i += 1
return i

Expand All @@ -309,8 +369,8 @@ def add_job(info):
num = get_next_job_number()
info["started"] = time.time()
info["status"] = "running"
tasks.appendleft(num)
XSH.all_jobs[num] = info
get_tasks().appendleft(num)
get_jobs()[num] = info
if info["bg"] and XSH.env.get("XONSH_INTERACTIVE"):
print_one_job(num)

Expand All @@ -327,7 +387,7 @@ def clean_jobs():
if XSH.env["XONSH_INTERACTIVE"]:
_clear_dead_jobs()

if XSH.all_jobs:
if get_jobs():
global _last_exit_time
hist = XSH.history
if hist is not None and len(hist.tss) > 0:
Expand All @@ -342,7 +402,7 @@ def clean_jobs():
# unfinished jobs in this case.
hup_all_jobs()
else:
if len(XSH.all_jobs) > 1:
if len(get_jobs()) > 1:
msg = "there are unfinished jobs"
else:
msg = "there is an unfinished job"
Expand Down Expand Up @@ -372,18 +432,19 @@ def hup_all_jobs():
Send SIGHUP to all child processes (called when exiting xonsh).
"""
_clear_dead_jobs()
for job in XSH.all_jobs.values():
for job in get_jobs().values():
_hup(job)


@use_main_jobs()
def jobs(args, stdin=None, stdout=sys.stdout, stderr=None):
"""
xonsh command: jobs
Display a list of all current jobs.
"""
_clear_dead_jobs()
for j in tasks:
for j in get_tasks():
print_one_job(j, outfile=stdout)
return None, None

Expand All @@ -393,6 +454,7 @@ def resume_job(args, wording: tp.Literal["fg", "bg"]):
used by fg and bg to resume a job either in the foreground or in the background.
"""
_clear_dead_jobs()
tasks = get_tasks()
if len(tasks) == 0:
return "", "There are currently no suspended jobs"

Expand All @@ -409,7 +471,7 @@ def resume_job(args, wording: tp.Literal["fg", "bg"]):
except (ValueError, IndexError):
return "", f"Invalid job: {args[0]}\n"

if tid not in XSH.all_jobs:
if tid not in get_jobs():
return "", f"Invalid job: {args[0]}\n"
else:
return "", f"{wording} expects 0 or 1 arguments, not {len(args)}\n"
Expand Down Expand Up @@ -441,6 +503,7 @@ def fg(args, stdin=None):
return resume_job(args, wording="fg")


@use_main_jobs()
def bg(args, stdin=None):
"""xonsh command: bg
Expand All @@ -449,7 +512,7 @@ def bg(args, stdin=None):
"""
res = resume_job(args, wording="bg")
if res is None:
curtask = get_task(tasks[0])
curtask = get_task(get_tasks()[0])
curtask["bg"] = True
_continue(curtask)
else:
Expand All @@ -458,10 +521,11 @@ def bg(args, stdin=None):

def job_id_completer(xsh, **_):
"""Return currently running jobs ids"""
for job_id in xsh.all_jobs:
for job_id in get_jobs():
yield RichCompletion(str(job_id), description=format_job_string(job_id))


@use_main_jobs()
def disown_fn(
job_ids: Annotated[
tp.Sequence[int], Arg(type=int, nargs="*", completer=job_id_completer)
Expand All @@ -484,6 +548,7 @@ def disown_fn(
Automatically continue stopped jobs when they are disowned, equivalent to setting $AUTO_CONTINUE=True
"""

tasks = get_tasks()
if len(tasks) == 0:
return "", "There are no active jobs"

Expand All @@ -507,7 +572,7 @@ def disown_fn(

# Stop tracking this task
tasks.remove(tid)
del XSH.all_jobs[tid]
del get_jobs()[tid]
messages.append(f"Removed job {tid} ({current_task['status']})")

if messages:
Expand Down
7 changes: 7 additions & 0 deletions xonsh/procs/pipelines.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,13 @@ def __init__(self, specs):

background = self.spec.background
pipeline_group = None
if xp.ON_POSIX and not xt.on_main_thread():
# If we are inside a ProcProxyThread, then run commands in the same
# process group as xonsh. This fixes case 2 of issue #4277, where
# the terminal is given to a command inside the ProcProxyThread,
# taking the terminal away from the `less` command, causing `less`
# to stop.
pipeline_group = os.getpgid(0)
for spec in specs:
if self.starttime is None:
self.starttime = time.time()
Expand Down
4 changes: 2 additions & 2 deletions xonsh/pytest/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from xonsh.completer import Completer
from xonsh.events import events
from xonsh.execer import Execer
from xonsh.jobs import tasks
from xonsh.jobs import get_tasks
from xonsh.main import setup
from xonsh.parsers.completion_context import CompletionContextParser

Expand Down Expand Up @@ -228,7 +228,7 @@ def xonsh_session(xonsh_events, session_execer, os_env, monkeypatch):
)
yield XSH
XSH.unload()
tasks.clear() # must to this to enable resetting all_jobs
get_tasks().clear() # must do this to enable resetting all_jobs


@pytest.fixture
Expand Down

0 comments on commit 86e4f00

Please sign in to comment.