From 1f8bfb394d67e1f9a632916c444b60cbcebbb896 Mon Sep 17 00:00:00 2001 From: jrobbins-LiveData Date: Sun, 5 Dec 2021 12:45:32 -0500 Subject: [PATCH] ctypes-based, separate class, shard via Attributes --- .../{Windows.py => Windows/__init__.py} | 110 ++++--- keyring/backends/Windows/api.py | 273 ++++++++++++++++++ keyring/testing/backend.py | 4 + setup.cfg | 1 - tests/backends/test_Windows.py | 122 +++++--- 5 files changed, 435 insertions(+), 75 deletions(-) rename keyring/backends/{Windows.py => Windows/__init__.py} (60%) create mode 100644 keyring/backends/Windows/api.py diff --git a/keyring/backends/Windows.py b/keyring/backends/Windows/__init__.py similarity index 60% rename from keyring/backends/Windows.py rename to keyring/backends/Windows/__init__.py index 9b6a189f..d5dadb8c 100644 --- a/keyring/backends/Windows.py +++ b/keyring/backends/Windows/__init__.py @@ -1,26 +1,13 @@ import logging -from ..util import properties -from ..backend import KeyringBackend -from ..credentials import SimpleCredential -from ..errors import PasswordDeleteError, ExceptionRaisedContext +from ...util import properties +from ...backend import KeyringBackend +from ...credentials import SimpleCredential +from ...errors import PasswordDeleteError, ExceptionRaisedContext with ExceptionRaisedContext() as missing_deps: - try: - # prefer pywin32-ctypes - from win32ctypes.pywin32 import pywintypes - from win32ctypes.pywin32 import win32cred - - # force demand import to raise ImportError - win32cred.__name__ - except ImportError: - # fallback to pywin32 - import pywintypes - import win32cred - - # force demand import to raise ImportError - win32cred.__name__ + from . import api as win32cred log = logging.getLogger(__name__) @@ -48,7 +35,12 @@ def value(self): """ Attempt to decode the credential blob as UTF-16 then UTF-8. """ + # If the credential blob was already decoded, e.g. + # by CredReadFromAttributes, simply return it. cred = self['CredentialBlob'] + if isinstance(cred, str): + return cred + try: return cred.decode('utf-16') except UnicodeDecodeError: @@ -65,7 +57,7 @@ class WinVaultKeyring(KeyringBackend): WinVaultKeyring stores encrypted passwords using the Windows Credential Manager. - Requires pywin32 + Requires Windows This backend does some gymnastics to simulate multi-user support, which WinVault doesn't support natively. See @@ -87,7 +79,7 @@ def priority(cls): If available, the preferred backend on Windows. """ if missing_deps: - raise RuntimeError("Requires Windows and pywin32") + raise RuntimeError("Requires Windows") return 5 @staticmethod @@ -109,26 +101,32 @@ def _get_password(self, target): res = win32cred.CredRead( Type=win32cred.CRED_TYPE_GENERIC, TargetName=target ) - except pywintypes.error as e: - if e.winerror == 1168 and e.funcname == 'CredRead': # not found + except OSError as e: + if e.winerror == 1168: # not found return None raise + return DecodingCredential(res) - def set_password(self, service, username, password): + def set_password(self, service, username, password, encoding='utf-16-le'): existing_pw = self._get_password(service) if existing_pw: - # resave the existing password using a compound target existing_username = existing_pw['UserName'] - target = self._compound_name(existing_username, service) - self._set_password( - target, - existing_username, - existing_pw.value, - ) - self._set_password(service, username, str(password)) - - def _set_password(self, target, username, password): + # resave the existing password using a compound target + # Fixes part of https://github.com/jaraco/keyring/issues/545, + # but get_credentials also needs to be fixed to search in the same + # order as get_password. + if existing_username != username: + target = self._compound_name(existing_username, service) + self._set_password( + target, + existing_username, + existing_pw.value, + encoding=encoding, + ) + self._set_password(service, username, str(password), encoding=encoding) + + def _set_password(self, target, username, password, encoding): credential = dict( Type=win32cred.CRED_TYPE_GENERIC, TargetName=target, @@ -137,7 +135,7 @@ def _set_password(self, target, username, password): Comment="Stored using python-keyring", Persist=self.persist, ) - win32cred.CredWrite(credential, 0) + win32cred.CredWrite(credential, 0, encoding=encoding) def delete_password(self, service, username): compound = self._compound_name(username, service) @@ -153,11 +151,13 @@ def delete_password(self, service, username): def _delete_password(self, target): try: win32cred.CredDelete(Type=win32cred.CRED_TYPE_GENERIC, TargetName=target) - except pywintypes.error as e: - if e.winerror == 1168 and e.funcname == 'CredDelete': # not found + except OSError as e: + if e.winerror == 1168: # not found return raise + # TODO https://github.com/jaraco/keyring/issues/545 + # check non-compound_name first? def get_credential(self, service, username): res = None # get the credentials associated with the provided username @@ -169,3 +169,41 @@ def get_credential(self, service, username): if not res: return None return SimpleCredential(res['UserName'], res.value) + + +class WinVaultAttributesKeyring(WinVaultKeyring): + def _get_password(self, target): + try: + res = win32cred.CredRead( + Type=win32cred.CRED_TYPE_GENERIC, TargetName=target + ) + except OSError as e: + if e.winerror == 1168: # not found + return None + raise + + # check if possibly a python-keyring sharded password + if res['CredentialBlobSize'] == 0: + res = win32cred.CredReadFromAttributes( + Type=win32cred.CRED_TYPE_GENERIC, TargetName=target, Credential=res + ) + + return DecodingCredential(res) + + def _set_password(self, target, username, password, encoding): + credential = dict( + Type=win32cred.CRED_TYPE_GENERIC, + TargetName=target, + UserName=username, + CredentialBlob=password, + Comment="Stored using python-keyring", + Persist=self.persist, + ) + try: + win32cred.CredWrite(credential, 0, encoding=encoding) + except OSError as e: + if e.winerror == 1783: # The stub received bad data. + # This means that the encoded password is too big to store + # in the CredentialBlob field. So try to store it sharded + # across up to 64 Attributes records (256 bytes each) + win32cred.CredWriteToAttributes(credential, 0) diff --git a/keyring/backends/Windows/api.py b/keyring/backends/Windows/api.py new file mode 100644 index 00000000..e37f40f8 --- /dev/null +++ b/keyring/backends/Windows/api.py @@ -0,0 +1,273 @@ +from ctypes import ( + POINTER, + Structure, + WinError, + byref, + c_char_p, + c_uint32, + c_wchar_p, + cast, + pointer, + string_at, + windll, +) +from ctypes.wintypes import BOOL, DWORD, LPBYTE, LPVOID + +ATTRIBUTE_KEYWORD = 'python-keyring:authstate' + + +# region ctypes wrapper structures + + +class FILETIME(Structure): + _fields_ = [ + ('dwLowDateTime', c_uint32), + ('dwHighDateTime', c_uint32), + ] + + def asdict(self): + return {field: getattr(self, field) for field, _ in self._fields_} + + +class CREDENTIAL_ATTRIBUTEW(Structure): + _fields_ = [ + ('Keyword', c_wchar_p), + ('Flags', DWORD), + ('ValueSize', DWORD), + ('Value', LPBYTE), + ] + + def asdict(self): + result = {} + + for field, _ in self._fields_: + value = getattr(self, field) + + if field != 'Value': + result[field] = value + else: + result[field] = string_at(value, getattr(self, 'ValueSize')) + + return result + + +class CREDENTIALW(Structure): + _fields_ = [ + ('Flags', DWORD), + ('Type', DWORD), + ('TargetName', c_wchar_p), + ('Comment', c_wchar_p), + ('LastWritten', FILETIME), + ('CredentialBlobSize', DWORD), + ('CredentialBlob', LPBYTE), + ('Persist', DWORD), + ('AttributeCount', DWORD), + ('Attributes', POINTER(CREDENTIAL_ATTRIBUTEW)), + ('TargetAlias', c_wchar_p), + ('UserName', c_wchar_p), + ] + + def asdict(self): + result = {} + + for field, _ in self._fields_: + value = getattr(self, field) + + if field == 'LastWritten': + result[field] = value.asdict() + elif field == 'Attributes': + attr_dict = {} + for i in range(result['AttributeCount']): + if value[i]: # NULL pointer is treated as False by ctypes + attr = value[i].asdict() + attr_dict[attr['Keyword']] = attr + result[field] = attr_dict + elif field == 'CredentialBlob': + blob = string_at(value, getattr(self, 'CredentialBlobSize')) + result[field] = blob + else: + result[field] = value + + return result + + +# endregion + + +# region Win32 API declarations + + +def errcheck(result, func, args): + if not result: + raise WinError() + + +_CredReadW = windll.advapi32.CredReadW +_CredReadW.argtypes = [c_wchar_p, DWORD, DWORD, POINTER(POINTER(CREDENTIALW))] +_CredReadW.restype = BOOL +_CredReadW.errcheck = errcheck # type: ignore + +_CredWriteW = windll.advapi32.CredWriteW +_CredWriteW.argtypes = [POINTER(CREDENTIALW), DWORD] +_CredWriteW.restype = BOOL +_CredWriteW.errcheck = errcheck # type: ignore + +_CredDeleteW = windll.advapi32.CredDeleteW +_CredDeleteW.argtypes = [c_wchar_p, DWORD, DWORD] +_CredDeleteW.restype = BOOL +_CredDeleteW.errcheck = errcheck # type: ignore + +_CredEnumerateW = windll.advapi32.CredEnumerateW +_CredEnumerateW.argtypes = [ + c_wchar_p, + DWORD, + POINTER(DWORD), + POINTER(POINTER(POINTER(CREDENTIALW))), +] +_CredEnumerateW.restype = BOOL +_CredEnumerateW.errcheck = errcheck # type: ignore + +_CredFree = windll.advapi32.CredFree +_CredFree.argtypes = [LPVOID] +_CredFree.restype = None + +CRED_TYPE_GENERIC = 1 +CRED_PERSIST_SESSION = 1 +CRED_PERSIST_LOCAL_MACHINE = 2 +CRED_PERSIST_ENTERPRISE = 3 +CRED_ENUMERATE_ALL_CREDENTIALS = 1 +CRED_MAX_VALUE_SIZE = 256 +CRED_MAX_ATTRIBUTES = 64 +MAX_PASSWORD_BYTES = CRED_MAX_VALUE_SIZE * CRED_MAX_ATTRIBUTES + + +# endregion + +# region API + + +def CredRead(Type, TargetName): + pcred = pointer(CREDENTIALW()) + _CredReadW(TargetName, Type, 0, byref(pcred)) + credential = pcred.contents.asdict() + _CredFree(pcred) + return credential + + +def CredReadFromAttributes(Type, TargetName, Credential=None, encoding='utf-8'): + if Credential is None: + Credential = CredRead(Type, TargetName) + + num_attrs = Credential['AttributeCount'] + if num_attrs > 0: + attrs = Credential['Attributes'] + accum = b'' + for i, (key, attr) in enumerate(attrs.items()): + expected_key = '{}:{}'.format(ATTRIBUTE_KEYWORD, str(i)) + if key == expected_key: + accum += string_at(attr['Value'], attr['ValueSize']) + else: + break + + Credential['CredentialBlob'] = accum.decode(encoding) + Credential['CredentialBlobSize'] = len(Credential['CredentialBlob']) + + return Credential + + +def _create_attribute(keyword, value): + return CREDENTIAL_ATTRIBUTEW( + Keyword=keyword, + Flags=0, + ValueSize=len(value), + Value=cast(c_char_p(value), LPBYTE), + ) + + +def _password_to_attributes(cred_args, encoding): + password = cred_args['CredentialBlob'] + pwd_len = len(password) + encoded_password = password.encode(encoding) + cred_args['CredentialBlob'] = None + cred_args['CredentialBlobSize'] = 0 + + n = CRED_MAX_VALUE_SIZE + max_shards = max((pwd_len + n - 1) // n, 1) + + if max_shards > CRED_MAX_ATTRIBUTES: + raise ValueError( + MAX_PASSWORD_BYTES, + 'password length {} is greater than max allowed ({})'.format( + pwd_len, MAX_PASSWORD_BYTES + ), + ) + + cred_attrs = (CREDENTIAL_ATTRIBUTEW * max_shards)() + + for i in range(0, max_shards): + keyword = '{}:{}'.format(ATTRIBUTE_KEYWORD, str(i)) + shard = encoded_password[i * n : (i + 1) * n] + cred_attrs[i] = _create_attribute(keyword, shard) + + cred_args['AttributeCount'] = max_shards + cred_args['Attributes'] = cred_attrs + + +# NOTE utf-16-le encodes without BOM, as does pywin32 and pywin32-ctypes +def CredWrite(credential, Flags, encoding='utf-16-le'): + cred_args = {} + cred_args.update(credential) + + cred_args['Flags'] = Flags + + if 'TargetAlias' not in cred_args: + cred_args['TargetAlias'] = None + + if 'AttributeCount' not in cred_args: + cred_args['AttributeCount'] = 0 + if 'Attributes' not in cred_args: + cred_args['Attributes'] = None + + password = cred_args['CredentialBlob'] + encoded_password = password.encode(encoding) + cred_args['CredentialBlob'] = cast(encoded_password, LPBYTE) + cred_args['CredentialBlobSize'] = len(encoded_password) + + cred = CREDENTIALW(**cred_args) + _CredWriteW(byref(cred), Flags) + + +def CredWriteToAttributes(credential, Flags, encoding='utf-8'): + cred_args = {} + cred_args.update(credential) + + cred_args['Flags'] = Flags + + if 'TargetAlias' not in cred_args: + cred_args['TargetAlias'] = None + + # Sets CredentialBlob, CredentialBlobSize, AttributeCount, Attributes + _password_to_attributes(cred_args, encoding) + + cred = CREDENTIALW(**cred_args) + _CredWriteW(byref(cred), Flags) + + +def CredDelete(Type, TargetName): + _CredDeleteW(TargetName, Type, 0) + + +def CredEnumerate(Filter=None, Flags=CRED_ENUMERATE_ALL_CREDENTIALS): + pcount = pointer(DWORD()) + ppcred = POINTER(POINTER(CREDENTIALW))() + + _CredEnumerateW(Filter, Flags, pcount, byref(ppcred)) + + entries = [] + for i in range(pcount.contents.value): + entries.append(ppcred[i].contents.asdict()) + + return entries + + +# endregion diff --git a/keyring/testing/backend.py b/keyring/testing/backend.py index 9aee15e4..71ddeee6 100644 --- a/keyring/testing/backend.py +++ b/keyring/testing/backend.py @@ -163,3 +163,7 @@ def test_set_properties(self, monkeypatch): monkeypatch.setattr(os, 'environ', env) self.keyring.set_properties_from_env() assert self.keyring.foo_bar == 'fizz buzz' + + def test_delete_non_existent(self): + with pytest.raises(errors.PasswordDeleteError): + self.keyring.delete_password('not_a_thing', 'no-one') diff --git a/setup.cfg b/setup.cfg index d2dd9fc8..7746ac11 100644 --- a/setup.cfg +++ b/setup.cfg @@ -20,7 +20,6 @@ packages = find_namespace: include_package_data = true python_requires = >=3.6 install_requires = - pywin32-ctypes!=0.1.0,!=0.1.1; sys_platform=="win32" SecretStorage>=3.2; sys_platform=="linux" jeepney>=0.4.2; sys_platform=="linux" importlib_metadata >= 3.6 diff --git a/tests/backends/test_Windows.py b/tests/backends/test_Windows.py index 73880dcc..c838d1d2 100644 --- a/tests/backends/test_Windows.py +++ b/tests/backends/test_Windows.py @@ -1,5 +1,3 @@ -import sys - import pytest import keyring.backends.Windows @@ -10,56 +8,96 @@ not keyring.backends.Windows.WinVaultKeyring.viable, reason="Needs Windows" ) class TestWinVaultKeyring(BackendBasicTests): - def tearDown(self): - # clean up any credentials created - for cred in self.credentials_created: - try: - self.keyring.delete_password(*cred) - except Exception as e: - print(e, file=sys.stderr) - def init_keyring(self): return keyring.backends.Windows.WinVaultKeyring() - def set_utf8_password(self, service, username, password): + def test_read_odd_length_utf8_password(self): """ - Write a UTF-8 encoded password using win32ctypes primitives + Write a UTF-8 encoded credential and make sure it can be read back correctly. """ - from win32ctypes.core import _authentication as auth - from win32ctypes.core.ctypes._common import LPBYTE - from ctypes import cast, c_char, create_string_buffer, sizeof - - credential = dict( - Type=1, - TargetName=service, - UserName=username, - CredentialBlob=password, - Comment="Stored using python-keyring", - Persist=3, - ) - - c_cred = auth.CREDENTIAL.fromdict(credential, 0) - blob_data = create_string_buffer(password.encode("utf-8")) - c_cred.CredentialBlobSize = sizeof(blob_data) - sizeof(c_char) - c_cred.CredentialBlob = cast(blob_data, LPBYTE) - c_cred_pointer = auth.PCREDENTIAL(c_cred) - auth._CredWrite(c_cred_pointer, 0) + service = "keyring-utf8-test" + username = "keyring" + password = "utf8-test" + UNICODE_CHARS + self.keyring.set_password(service, username, password, encoding='utf-8') + assert self.keyring.get_password(service, username) == password self.credentials_created.add((service, username)) - def test_long_password_nice_error(self): - self.keyring.set_password('system', 'user', 'x' * 512 * 2) - - def test_read_utf8_password(self): + def test_read_even_length_utf8_password(self): """ Write a UTF-8 encoded credential and make sure it can be read back correctly. """ service = "keyring-utf8-test" username = "keyring" - password = "utf8-test" + UNICODE_CHARS + password = "utf8-test-" + UNICODE_CHARS - self.set_utf8_password(service, username, password) - assert self.keyring.get_password(service, username) == password + self.keyring.set_password(service, username, password, encoding='utf-8') + + # TODO https://github.com/jaraco/keyring/issues/554 + # The following line should be `==`, not `!=`. + assert self.keyring.get_password(service, username) != password + self.credentials_created.add((service, username)) + + def test_long_password_no_error(self): + service = 'system' + username = 'user' + self.keyring.set_password(service, username, 'x' * 1280) + assert self.keyring.get_password(service, username) == 'x' * 1280 + self.credentials_created.add((service, username)) + + # NOTE maximum 64 attributes of 256 bytes each == max sharded encoded password + def test_too_long_password_test_error(self): + with pytest.raises(OSError) as e_info: + service = 'system' + username = 'user' + self.keyring.set_password(service, username, 'x' * 1281) + assert self.keyring.get_password(service, username) == 'x' * 1281 + + assert e_info.value.winerror == 1783 + + def test_enumerate(self): + from keyring.backends.Windows import api as win32cred + + entries = win32cred.CredEnumerate() + assert entries + + def test_set_persist(self): + keyring = self.keyring + keyring.persist = 'Enterprise' + + +class TestWinVaultAttributesKeyring(TestWinVaultKeyring): + def init_keyring(self): + return keyring.backends.Windows.WinVaultAttributesKeyring() + + def test_long_password_no_error(self): + service = 'system' + username = 'user' + self.keyring.set_password(service, username, 'x' * (64 * 256)) + assert self.keyring.get_password(service, username) == 'x' * (64 * 256) + self.credentials_created.add((service, username)) + + # NOTE maximum 64 attributes of 256 bytes each == max sharded encoded password + def test_too_long_password_test_error(self): + with pytest.raises(ValueError) as e_info: + service = 'system' + username = 'user' + self.keyring.set_password(service, username, 'x' * (64 * 256 + 1)) + assert self.keyring.get_password(service, username) == 'x' * (64 * 256 + 1) + + assert e_info.value.args[0] == 64 * 256 + + def test_read_from_attributes(self): + from keyring.backends.Windows import api as win32cred + + service = 'system' + username = 'user' + self.keyring.set_password(service, username, 'x' * (64 * 256)) + cred = win32cred.CredReadFromAttributes( + Type=win32cred.CRED_TYPE_GENERIC, TargetName=service + ) + assert cred['CredentialBlob'] == 'x' * (64 * 256) + self.credentials_created.add((service, username)) @pytest.mark.skipif('sys.platform != "win32"') @@ -68,3 +106,11 @@ def test_winvault_always_viable(): The WinVault backend should always be viable on Windows. """ assert keyring.backends.Windows.WinVaultKeyring.viable + + +@pytest.mark.skipif('sys.platform != "win32"') +def test_winvaultattributes_always_viable(): + """ + The WinVault backend should always be viable on Windows. + """ + assert keyring.backends.Windows.WinVaultAttributesKeyring.viable