diff --git a/tests/test_utils.py b/tests/test_utils.py index 4ff31392..6ec178b9 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -150,6 +150,31 @@ def test_get_repository_config_missing(config_file): assert utils.get_repository_from_config(config_file, "pypi") == exp +def test_get_repository_config_url_with_auth(config_file): + repository_url = "https://user:pass@notexisting.python.org/pypi" + exp = { + "repository": "https://notexisting.python.org/pypi", + "username": "user", + "password": "pass", + } + assert utils.get_repository_from_config(config_file, "foo", repository_url) == exp + assert utils.get_repository_from_config(config_file, "pypi", repository_url) == exp + + +@pytest.mark.parametrize( + "input_url, expected_url", + [ + ("https://upload.pypi.org/legacy/", "https://upload.pypi.org/legacy/"), + ( + "https://user:pass@upload.pypi.org/legacy/", + "https://********@upload.pypi.org/legacy/", + ), + ], +) +def test_sanitize_url(input_url: str, expected_url: str) -> None: + assert utils.sanitize_url(input_url) == expected_url + + @pytest.mark.parametrize( "repo_url, message", [ diff --git a/twine/commands/upload.py b/twine/commands/upload.py index 842ac859..0f4f3633 100644 --- a/twine/commands/upload.py +++ b/twine/commands/upload.py @@ -17,7 +17,6 @@ import fnmatch import logging import os.path -import re from typing import Dict, List, NamedTuple, cast import requests @@ -149,27 +148,6 @@ def _split_inputs( return Inputs(dists, signatures, attestations_by_dist) -def _sanitize_url(url: str) -> str: - """Sanitize a URL. - - Sanitize URLs, removing any user:password combinations and replacing them with - asterisks. Returns the original URL if the string is a non-matching pattern. - - :param url: - str containing a URL to sanitize. - - return: - str either sanitized or as entered depending on pattern match. - """ - pattern = r"(.*https?://)(\w+:\w+)@(\w+\..*)" - m = re.match(pattern, url) - if m: - newurl = f"{m.group(1)}*****:*****@{m.group(3)}" - return newurl - else: - return url - - def upload(upload_settings: settings.Settings, dists: List[str]) -> None: """Upload one or more distributions to a repository, and display the progress. @@ -211,7 +189,7 @@ def upload(upload_settings: settings.Settings, dists: List[str]) -> None: # Determine if the user has passed in pre-signed distributions or any attestations. uploads, signatures, attestations_by_dist = _split_inputs(dists) - print(f"Uploading distributions to {_sanitize_url(repository_url)}") + print(f"Uploading distributions to {utils.sanitize_url(repository_url)}") packages_to_upload = [ _make_package( @@ -272,8 +250,8 @@ def upload(upload_settings: settings.Settings, dists: List[str]) -> None: # redirects as well. if resp.is_redirect: raise exceptions.RedirectDetected.from_args( - repository_url, - resp.headers["location"], + utils.sanitize_url(repository_url), + utils.sanitize_url(resp.headers["location"]), ) if skip_upload(resp, upload_settings.skip_existing, package): diff --git a/twine/utils.py b/twine/utils.py index 484a0234..05739bc4 100644 --- a/twine/utils.py +++ b/twine/utils.py @@ -100,6 +100,24 @@ def get_config(path: str) -> Dict[str, RepositoryConfig]: return dict(config) +def sanitize_url(url: str) -> str: + """Sanitize a URL. + + Sanitize URLs, removing any user:password combinations and replacing them with + asterisks. Returns the original URL if the string is a non-matching pattern. + + :param url: + str containing a URL to sanitize. + + return: + str either sanitized or as entered depending on pattern match. + """ + uri = rfc3986.urlparse(url) + if uri.userinfo: + return cast(str, uri.copy_with(userinfo="*" * 8).unsplit()) + return url + + def _validate_repository_url(repository_url: str) -> None: """Validate the given url for allowed schemes and components.""" # Allowed schemes are http and https, based on whether the repository @@ -126,11 +144,7 @@ def get_repository_from_config( # Prefer CLI `repository_url` over `repository` or .pypirc if repository_url: _validate_repository_url(repository_url) - return { - "repository": repository_url, - "username": None, - "password": None, - } + return _config_from_repository_url(repository_url) try: config = get_config(config_file)[repository] @@ -154,6 +168,17 @@ def get_repository_from_config( } +def _config_from_repository_url(url: str) -> RepositoryConfig: + parsed = urlparse(url) + config = {"repository": url, "username": None, "password": None} + if parsed.username: + config["username"] = parsed.username + config["password"] = parsed.password + config["repository"] = urlunparse((parsed.scheme, parsed.hostname) + parsed[2:]) + config["repository"] = normalize_repository_url(cast(str, config["repository"])) + return config + + def normalize_repository_url(url: str) -> str: parsed = urlparse(url) if parsed.netloc in _HOSTNAMES: