diff --git a/news/11589.feature.rst b/news/11589.feature.rst new file mode 100644 index 00000000000..d01a564b631 --- /dev/null +++ b/news/11589.feature.rst @@ -0,0 +1,2 @@ +Enable the use of ``keyring`` found on ``PATH``. This allows ``keyring`` +installed using ``pipx`` to be used by ``pip``. diff --git a/src/pip/_internal/network/auth.py b/src/pip/_internal/network/auth.py index ca42798bd95..241ddc53a9c 100644 --- a/src/pip/_internal/network/auth.py +++ b/src/pip/_internal/network/auth.py @@ -4,8 +4,12 @@ providing credentials in the context of network requests. """ +import os +import shutil +import subprocess import urllib.parse -from typing import Any, Dict, List, Optional, Tuple +from abc import ABC, abstractmethod +from typing import Any, Dict, List, NamedTuple, Optional, Tuple from pip._vendor.requests.auth import AuthBase, HTTPBasicAuth from pip._vendor.requests.models import Request, Response @@ -23,51 +27,165 @@ logger = getLogger(__name__) -Credentials = Tuple[str, str, str] +KEYRING_DISABLED = False -try: - import keyring -except ImportError: - keyring = None # type: ignore[assignment] -except Exception as exc: - logger.warning( - "Keyring is skipped due to an exception: %s", - str(exc), - ) - keyring = None # type: ignore[assignment] +class Credentials(NamedTuple): + url: str + username: str + password: str -def get_keyring_auth(url: Optional[str], username: Optional[str]) -> Optional[AuthInfo]: - """Return the tuple auth for a given url from keyring.""" - global keyring - if not url or not keyring: + +class KeyRingBaseProvider(ABC): + """Keyring base provider interface""" + + @abstractmethod + def get_auth_info(self, url: str, username: Optional[str]) -> Optional[AuthInfo]: + ... + + @abstractmethod + def save_auth_info(self, url: str, username: str, password: str) -> None: + ... + + +class KeyRingNullProvider(KeyRingBaseProvider): + """Keyring null provider""" + + def get_auth_info(self, url: str, username: Optional[str]) -> Optional[AuthInfo]: return None - try: - try: - get_credential = keyring.get_credential - except AttributeError: - pass - else: + def save_auth_info(self, url: str, username: str, password: str) -> None: + return None + + +class KeyRingPythonProvider(KeyRingBaseProvider): + """Keyring interface which uses locally imported `keyring`""" + + def __init__(self) -> None: + import keyring + + self.keyring = keyring + + def get_auth_info(self, url: str, username: Optional[str]) -> Optional[AuthInfo]: + # Support keyring's get_credential interface which supports getting + # credentials without a username. This is only available for + # keyring>=15.2.0. + if hasattr(self.keyring, "get_credential"): logger.debug("Getting credentials from keyring for %s", url) - cred = get_credential(url, username) + cred = self.keyring.get_credential(url, username) if cred is not None: return cred.username, cred.password return None - if username: + if username is not None: logger.debug("Getting password from keyring for %s", url) - password = keyring.get_password(url, username) + password = self.keyring.get_password(url, username) if password: return username, password + return None + + def save_auth_info(self, url: str, username: str, password: str) -> None: + self.keyring.set_password(url, username, password) + + +class KeyRingCliProvider(KeyRingBaseProvider): + """Provider which uses `keyring` cli + + Instead of calling the keyring package installed alongside pip + we call keyring on the command line which will enable pip to + use which ever installation of keyring is available first in + PATH. + """ + + def __init__(self, cmd: str) -> None: + self.keyring = cmd + + def get_auth_info(self, url: str, username: Optional[str]) -> Optional[AuthInfo]: + # This is the default implementation of keyring.get_credential + # https://github.com/jaraco/keyring/blob/97689324abcf01bd1793d49063e7ca01e03d7d07/keyring/backend.py#L134-L139 + if username is not None: + password = self._get_password(url, username) + if password is not None: + return username, password + return None + def save_auth_info(self, url: str, username: str, password: str) -> None: + return self._set_password(url, username, password) + + def _get_password(self, service_name: str, username: str) -> Optional[str]: + """Mirror the implemenation of keyring.get_password using cli""" + if self.keyring is None: + return None + + cmd = [self.keyring, "get", service_name, username] + env = os.environ.copy() + env["PYTHONIOENCODING"] = "utf-8" + res = subprocess.run( + cmd, + stdin=subprocess.DEVNULL, + capture_output=True, + env=env, + ) + if res.returncode: + return None + return res.stdout.decode("utf-8").strip("\n") + + def _set_password(self, service_name: str, username: str, password: str) -> None: + """Mirror the implemenation of keyring.set_password using cli""" + if self.keyring is None: + return None + + cmd = [self.keyring, "set", service_name, username] + input_ = password.encode("utf-8") + b"\n" + env = os.environ.copy() + env["PYTHONIOENCODING"] = "utf-8" + res = subprocess.run(cmd, input=input_, env=env) + res.check_returncode() + return None + + +def get_keyring_provider() -> KeyRingBaseProvider: + # keyring has previously failed and been disabled + if not KEYRING_DISABLED: + # Default to trying to use Python provider + try: + return KeyRingPythonProvider() + except ImportError: + pass + except Exception as exc: + # In the event of an unexpected exception + # we should warn the user + logger.warning( + "Installed copy of keyring fails with exception %s, " + "trying to find a keyring executable as a fallback", + str(exc), + ) + + # Fallback to Cli Provider if `keyring` isn't installed + cli = shutil.which("keyring") + if cli: + return KeyRingCliProvider(cli) + + return KeyRingNullProvider() + + +def get_keyring_auth(url: Optional[str], username: Optional[str]) -> Optional[AuthInfo]: + """Return the tuple auth for a given url from keyring.""" + # Do nothing if no url was provided + if not url: + return None + + keyring = get_keyring_provider() + try: + return keyring.get_auth_info(url, username) except Exception as exc: logger.warning( "Keyring is skipped due to an exception: %s", str(exc), ) - keyring = None # type: ignore[assignment] - return None + global KEYRING_DISABLED + KEYRING_DISABLED = True + return None class MultiDomainBasicAuth(AuthBase): @@ -241,7 +359,7 @@ def _prompt_for_password( # Factored out to allow for easy patching in tests def _should_save_password_to_keyring(self) -> bool: - if not keyring: + if get_keyring_provider() is None: return False return ask("Save credentials to keyring [y/N]: ", ["y", "n"]) == "y" @@ -276,7 +394,11 @@ def handle_401(self, resp: Response, **kwargs: Any) -> Response: # Prompt to save the password to keyring if save and self._should_save_password_to_keyring(): - self._credentials_to_save = (parsed.netloc, username, password) + self._credentials_to_save = Credentials( + url=parsed.netloc, + username=username, + password=password, + ) # Consume content and release the original connection to allow our new # request to reuse the same one. @@ -309,15 +431,16 @@ def warn_on_401(self, resp: Response, **kwargs: Any) -> None: def save_credentials(self, resp: Response, **kwargs: Any) -> None: """Response callback to save credentials on success.""" - assert keyring is not None, "should never reach here without keyring" - if not keyring: - return + keyring = get_keyring_provider() + assert not isinstance( + keyring, KeyRingNullProvider + ), "should never reach here without keyring" creds = self._credentials_to_save self._credentials_to_save = None if creds and resp.status_code < 400: try: logger.info("Saving credentials to keyring") - keyring.set_password(*creds) + keyring.save_auth_info(creds.url, creds.username, creds.password) except Exception: logger.exception("Failed to save credentials") diff --git a/tests/unit/test_network_auth.py b/tests/unit/test_network_auth.py index 5c0e5746281..625a20a48f5 100644 --- a/tests/unit/test_network_auth.py +++ b/tests/unit/test_network_auth.py @@ -1,5 +1,6 @@ import functools -from typing import Any, List, Optional, Tuple +import sys +from typing import Any, Dict, Iterable, List, Optional, Tuple import pytest @@ -8,6 +9,13 @@ from tests.lib.requests_mocks import MockConnection, MockRequest, MockResponse +@pytest.fixture(scope="function", autouse=True) +def reset_keyring() -> Iterable[None]: + yield None + # Reset the state of the module between tests + pip._internal.network.auth.KEYRING_DISABLED = False + + @pytest.mark.parametrize( ["input_url", "url", "username", "password"], [ @@ -138,7 +146,7 @@ def test_keyring_get_password( expect: Tuple[Optional[str], Optional[str]], ) -> None: keyring = KeyringModuleV1() - monkeypatch.setattr("pip._internal.network.auth.keyring", keyring) + monkeypatch.setitem(sys.modules, "keyring", keyring) # type: ignore[misc] auth = MultiDomainBasicAuth(index_urls=["http://example.com/path2"]) actual = auth._get_new_credentials(url, allow_netrc=False, allow_keyring=True) @@ -147,7 +155,7 @@ def test_keyring_get_password( def test_keyring_get_password_after_prompt(monkeypatch: pytest.MonkeyPatch) -> None: keyring = KeyringModuleV1() - monkeypatch.setattr("pip._internal.network.auth.keyring", keyring) + monkeypatch.setitem(sys.modules, "keyring", keyring) # type: ignore[misc] auth = MultiDomainBasicAuth() def ask_input(prompt: str) -> str: @@ -163,7 +171,7 @@ def test_keyring_get_password_after_prompt_when_none( monkeypatch: pytest.MonkeyPatch, ) -> None: keyring = KeyringModuleV1() - monkeypatch.setattr("pip._internal.network.auth.keyring", keyring) + monkeypatch.setitem(sys.modules, "keyring", keyring) # type: ignore[misc] auth = MultiDomainBasicAuth() def ask_input(prompt: str) -> str: @@ -184,7 +192,7 @@ def test_keyring_get_password_username_in_index( monkeypatch: pytest.MonkeyPatch, ) -> None: keyring = KeyringModuleV1() - monkeypatch.setattr("pip._internal.network.auth.keyring", keyring) + monkeypatch.setitem(sys.modules, "keyring", keyring) # type: ignore[misc] auth = MultiDomainBasicAuth(index_urls=["http://user@example.com/path2"]) get = functools.partial( auth._get_new_credentials, allow_netrc=False, allow_keyring=True @@ -217,7 +225,7 @@ def test_keyring_set_password( expect_save: bool, ) -> None: keyring = KeyringModuleV1() - monkeypatch.setattr("pip._internal.network.auth.keyring", keyring) + monkeypatch.setitem(sys.modules, "keyring", keyring) # type: ignore[misc] auth = MultiDomainBasicAuth(prompting=True) monkeypatch.setattr(auth, "_get_url_and_credentials", lambda u: (u, None, None)) monkeypatch.setattr(auth, "_prompt_for_password", lambda *a: creds) @@ -293,7 +301,7 @@ def get_credential(self, system: str, username: str) -> Optional[Credential]: def test_keyring_get_credential( monkeypatch: pytest.MonkeyPatch, url: str, expect: str ) -> None: - monkeypatch.setattr(pip._internal.network.auth, "keyring", KeyringModuleV2()) + monkeypatch.setitem(sys.modules, "keyring", KeyringModuleV2()) # type: ignore[misc] auth = MultiDomainBasicAuth(index_urls=["http://example.com/path2"]) assert ( @@ -314,7 +322,7 @@ def get_credential(self, system: str, username: str) -> None: def test_broken_keyring_disables_keyring(monkeypatch: pytest.MonkeyPatch) -> None: keyring_broken = KeyringModuleBroken() - monkeypatch.setattr(pip._internal.network.auth, "keyring", keyring_broken) + monkeypatch.setitem(sys.modules, "keyring", keyring_broken) # type: ignore[misc] auth = MultiDomainBasicAuth(index_urls=["http://example.com/"]) @@ -325,3 +333,143 @@ def test_broken_keyring_disables_keyring(monkeypatch: pytest.MonkeyPatch) -> Non url, allow_netrc=False, allow_keyring=True ) == (None, None) assert keyring_broken._call_count == 1 + + +class KeyringSubprocessResult(KeyringModuleV1): + """Represents the subprocess call to keyring""" + + returncode = 0 # Default to zero retcode + + def __call__( + self, + cmd: List[str], + *, + env: Dict[str, str], + stdin: Optional[Any] = None, + capture_output: Optional[bool] = None, + input: Optional[bytes] = None, + ) -> Any: + if cmd[1] == "get": + assert stdin == -3 # subprocess.DEVNULL + assert capture_output is True + assert env["PYTHONIOENCODING"] == "utf-8" + + password = self.get_password(*cmd[2:]) + if password is None: + # Expect non-zero returncode if no password present + self.returncode = 1 + else: + # Passwords are returned encoded with a newline appended + self.stdout = password.encode("utf-8") + b"\n" + + if cmd[1] == "set": + assert stdin is None + assert capture_output is None + assert env["PYTHONIOENCODING"] == "utf-8" + assert input is not None + + # Input from stdin is encoded + self.set_password(cmd[2], cmd[3], input.decode("utf-8").strip("\n")) + + return self + + def check_returncode(self) -> None: + if self.returncode: + raise Exception() + + +@pytest.mark.parametrize( + "url, expect", + ( + ("http://example.com/path1", (None, None)), + # path1 URLs will be resolved by netloc + ("http://user@example.com/path1", ("user", "user!netloc")), + ("http://user2@example.com/path1", ("user2", "user2!netloc")), + # path2 URLs will be resolved by index URL + ("http://example.com/path2/path3", (None, None)), + ("http://foo@example.com/path2/path3", ("foo", "foo!url")), + ), +) +def test_keyring_cli_get_password( + monkeypatch: pytest.MonkeyPatch, + url: str, + expect: Tuple[Optional[str], Optional[str]], +) -> None: + monkeypatch.setattr(pip._internal.network.auth.shutil, "which", lambda x: "keyring") + monkeypatch.setattr( + pip._internal.network.auth.subprocess, "run", KeyringSubprocessResult() + ) + auth = MultiDomainBasicAuth(index_urls=["http://example.com/path2"]) + + actual = auth._get_new_credentials(url, allow_netrc=False, allow_keyring=True) + assert actual == expect + + +@pytest.mark.parametrize( + "response_status, creds, expect_save", + ( + (403, ("user", "pass", True), False), + ( + 200, + ("user", "pass", True), + True, + ), + ( + 200, + ("user", "pass", False), + False, + ), + ), +) +def test_keyring_cli_set_password( + monkeypatch: pytest.MonkeyPatch, + response_status: int, + creds: Tuple[str, str, bool], + expect_save: bool, +) -> None: + monkeypatch.setattr(pip._internal.network.auth.shutil, "which", lambda x: "keyring") + keyring = KeyringSubprocessResult() + monkeypatch.setattr(pip._internal.network.auth.subprocess, "run", keyring) + auth = MultiDomainBasicAuth(prompting=True) + monkeypatch.setattr(auth, "_get_url_and_credentials", lambda u: (u, None, None)) + monkeypatch.setattr(auth, "_prompt_for_password", lambda *a: creds) + if creds[2]: + # when _prompt_for_password indicates to save, we should save + def should_save_password_to_keyring(*a: Any) -> bool: + return True + + else: + # when _prompt_for_password indicates not to save, we should + # never call this function + def should_save_password_to_keyring(*a: Any) -> bool: + assert False, "_should_save_password_to_keyring should not be called" + + monkeypatch.setattr( + auth, "_should_save_password_to_keyring", should_save_password_to_keyring + ) + + req = MockRequest("https://example.com") + resp = MockResponse(b"") + resp.url = req.url + connection = MockConnection() + + def _send(sent_req: MockRequest, **kwargs: Any) -> MockResponse: + assert sent_req is req + assert "Authorization" in sent_req.headers + r = MockResponse(b"") + r.status_code = response_status + return r + + # https://github.com/python/mypy/issues/2427 + connection._send = _send # type: ignore[assignment] + + resp.request = req + resp.status_code = 401 + resp.connection = connection + + auth.handle_401(resp) + + if expect_save: + assert keyring.saved_passwords == [("example.com", creds[0], creds[1])] + else: + assert keyring.saved_passwords == []