diff --git a/news/12653.feature.rst b/news/12653.feature.rst new file mode 100644 index 00000000000..83e1a46e6a8 --- /dev/null +++ b/news/12653.feature.rst @@ -0,0 +1,2 @@ +Detect recursively referencing requirements files and help users identify +the source. diff --git a/src/pip/_internal/req/req_file.py b/src/pip/_internal/req/req_file.py index 53ad8674cd8..55c0726f370 100644 --- a/src/pip/_internal/req/req_file.py +++ b/src/pip/_internal/req/req_file.py @@ -324,11 +324,15 @@ def __init__( ) -> None: self._session = session self._line_parser = line_parser + self._parsed_files: dict[str, Optional[str]] = {} def parse( self, filename: str, constraint: bool ) -> Generator[ParsedLine, None, None]: """Parse a given file, yielding parsed lines.""" + self._parsed_files[os.path.abspath(filename)] = ( + None # The primary requirements file passed + ) yield from self._parse_and_recurse(filename, constraint) def _parse_and_recurse( @@ -353,11 +357,25 @@ def _parse_and_recurse( # original file and nested file are paths elif not SCHEME_RE.search(req_path): # do a join so relative paths work - req_path = os.path.join( - os.path.dirname(filename), - req_path, + # and then abspath so that we can identify recursive references + req_path = os.path.abspath( + os.path.join( + os.path.dirname(filename), + req_path, + ) ) - + if req_path in self._parsed_files: + initial_file = self._parsed_files[req_path] + tail = ( + f" and again in {initial_file}" + if initial_file is not None + else "" + ) + raise RequirementsFileParseError( + f"{req_path} recursively references itself in {filename}{tail}" + ) + # Keeping a track where was each file first included in + self._parsed_files[req_path] = filename yield from self._parse_and_recurse(req_path, nested_constraint) else: yield line diff --git a/tests/unit/test_req_file.py b/tests/unit/test_req_file.py index 236b666fb34..1ddccdee58e 100644 --- a/tests/unit/test_req_file.py +++ b/tests/unit/test_req_file.py @@ -1,6 +1,7 @@ import collections import logging import os +import re import textwrap from optparse import Values from pathlib import Path @@ -345,6 +346,63 @@ def test_nested_constraints_file( assert reqs[0].name == req_name assert reqs[0].constraint + def test_recursive_requirements_file( + self, tmpdir: Path, session: PipSession + ) -> None: + req_files: list[Path] = [] + req_file_count = 4 + for i in range(req_file_count): + req_file = tmpdir / f"{i}.txt" + req_file.write_text(f"-r {(i+1) % req_file_count}.txt") + req_files.append(req_file) + + # When the passed requirements file recursively references itself + with pytest.raises( + RequirementsFileParseError, + match=( + f"{re.escape(str(req_files[0]))} recursively references itself" + f" in {re.escape(str(req_files[req_file_count - 1]))}" + ), + ): + list(parse_requirements(filename=str(req_files[0]), session=session)) + + # When one of other the requirements file recursively references itself + req_files[req_file_count - 1].write_text( + # Just name since they are in the same folder + f"-r {req_files[req_file_count - 2].name}" + ) + with pytest.raises( + RequirementsFileParseError, + match=( + f"{re.escape(str(req_files[req_file_count - 2]))} recursively" + " references itself in" + f" {re.escape(str(req_files[req_file_count - 1]))} and again in" + f" {re.escape(str(req_files[req_file_count - 3]))}" + ), + ): + list(parse_requirements(filename=str(req_files[0]), session=session)) + + def test_recursive_relative_requirements_file( + self, tmpdir: Path, session: PipSession + ) -> None: + root_req_file = tmpdir / "root.txt" + (tmpdir / "nest" / "nest").mkdir(parents=True) + level_1_req_file = tmpdir / "nest" / "level_1.txt" + level_2_req_file = tmpdir / "nest" / "nest" / "level_2.txt" + + root_req_file.write_text("-r nest/level_1.txt") + level_1_req_file.write_text("-r nest/level_2.txt") + level_2_req_file.write_text("-r ../../root.txt") + + with pytest.raises( + RequirementsFileParseError, + match=( + f"{re.escape(str(root_req_file))} recursively references itself in" + f" {re.escape(str(level_2_req_file))}" + ), + ): + list(parse_requirements(filename=str(root_req_file), session=session)) + def test_options_on_a_requirement_line(self, line_processor: LineProcessor) -> None: line = ( 'SomeProject --global-option="yo3" --global-option "yo4" '