diff --git a/tests/unit/test_network_auth.py b/tests/unit/test_network_auth.py index 03d39c452a2..a81404507d0 100644 --- a/tests/unit/test_network_auth.py +++ b/tests/unit/test_network_auth.py @@ -1,6 +1,6 @@ import functools import sys -from typing import Any, Iterable, List, Optional, Tuple +from typing import Any, Dict, Iterable, List, Optional, Tuple import pytest @@ -334,3 +334,140 @@ def test_broken_keyring_disables_keyring(monkeypatch: pytest.MonkeyPatch) -> Non url, allow_netrc=False, allow_keyring=True ) == (None, None) assert keyring_broken._call_count == 1 + + +class KeyringSubprocessResult(KeyringModuleV1): + """Represents the subprocess call to keyring""" + + returncode = 0 # Default to zero retcode + + def __call__( + self, + cmd: List[str], + *, + env: Dict[str, str], + stdin: Optional[Any] = None, + capture_output: Optional[bool] = None, + input: Optional[bytes] = None, + ) -> Any: + if cmd[1] == "get": + assert stdin == -3 # subprocess.DEVNULL + assert capture_output is True + assert env["PYTHONIOENCODING"] == "utf-8" + + password = self.get_password(*cmd[2:]) + if password is None: + self.returncode = 1 + else: + self.stdout = password.encode("utf-8") + b"\n" + + if cmd[1] == "set": + assert stdin is None + assert capture_output is None + assert env["PYTHONIOENCODING"] == "utf-8" + assert input is not None + + self.set_password(cmd[2], cmd[3], input.decode("utf-8").strip("\n")) + + return self + + def check_returncode(self) -> None: + if self.returncode: + raise Exception() + + +@pytest.mark.parametrize( + "url, expect", + ( + ("http://example.com/path1", (None, None)), + # path1 URLs will be resolved by netloc + ("http://user@example.com/path1", ("user", "user!netloc")), + ("http://user2@example.com/path1", ("user2", "user2!netloc")), + # path2 URLs will be resolved by index URL + ("http://example.com/path2/path3", (None, None)), + ("http://foo@example.com/path2/path3", ("foo", "foo!url")), + ), +) +def test_keyring_cli_get_password( + monkeypatch: pytest.MonkeyPatch, + url: str, + expect: Tuple[Optional[str], Optional[str]], +) -> None: + monkeypatch.setattr(pip._internal.network.auth.shutil, "which", lambda x: "keyring") + monkeypatch.setattr( + pip._internal.network.auth.subprocess, "run", KeyringSubprocessResult() + ) + auth = MultiDomainBasicAuth(index_urls=["http://example.com/path2"]) + + actual = auth._get_new_credentials(url, allow_netrc=False, allow_keyring=True) + assert actual == expect + + +@pytest.mark.parametrize( + "response_status, creds, expect_save", + ( + (403, ("user", "pass", True), False), + ( + 200, + ("user", "pass", True), + True, + ), + ( + 200, + ("user", "pass", False), + False, + ), + ), +) +def test_keyring_cli_set_password( + monkeypatch: pytest.MonkeyPatch, + response_status: int, + creds: Tuple[str, str, bool], + expect_save: bool, +) -> None: + monkeypatch.setattr(pip._internal.network.auth.shutil, "which", lambda x: "keyring") + keyring = KeyringSubprocessResult() + monkeypatch.setattr(pip._internal.network.auth.subprocess, "run", keyring) + auth = MultiDomainBasicAuth(prompting=True) + monkeypatch.setattr(auth, "_get_url_and_credentials", lambda u: (u, None, None)) + monkeypatch.setattr(auth, "_prompt_for_password", lambda *a: creds) + if creds[2]: + # when _prompt_for_password indicates to save, we should save + def should_save_password_to_keyring(*a: Any) -> bool: + return True + + else: + # when _prompt_for_password indicates not to save, we should + # never call this function + def should_save_password_to_keyring(*a: Any) -> bool: + assert False, "_should_save_password_to_keyring should not be called" + + monkeypatch.setattr( + auth, "_should_save_password_to_keyring", should_save_password_to_keyring + ) + + req = MockRequest("https://example.com") + resp = MockResponse(b"") + resp.url = req.url + connection = MockConnection() + + def _send(sent_req: MockRequest, **kwargs: Any) -> MockResponse: + assert sent_req is req + assert "Authorization" in sent_req.headers + r = MockResponse(b"") + r.status_code = response_status + return r + + # https://github.com/python/mypy/issues/2427 + connection._send = _send # type: ignore[assignment] + + resp.request = req + resp.status_code = 401 + resp.connection = connection + + auth.handle_401(resp) + + if expect_save: + assert keyring.saved_passwords == [("example.com", creds[0], creds[1])] + else: + assert keyring.saved_passwords == []