From dc7c8df60fe43300a6b329e56b81cf51d569e13e Mon Sep 17 00:00:00 2001 From: totaam Date: Mon, 11 Oct 2021 12:52:32 +0700 Subject: [PATCH] #3299 prompt the user to accept certificates / hostname mismatch --- xpra/net/socket_util.py | 177 ++++++++++++++++++++++++++++++++++++---- xpra/platform/paths.py | 21 +++-- xpra/scripts/main.py | 13 ++- 3 files changed, 187 insertions(+), 24 deletions(-) diff --git a/xpra/net/socket_util.py b/xpra/net/socket_util.py index 0e5aa9f7c8..434b946d32 100644 --- a/xpra/net/socket_util.py +++ b/xpra/net/socket_util.py @@ -20,7 +20,7 @@ parse_encoded_bin_data, ) from xpra.util import ( - envint, envbool, csv, parse_simple_dict, print_nested_dict, + envint, envbool, csv, parse_simple_dict, print_nested_dict, std, ellipsizer, noerr, DEFAULT_PORT, ) @@ -840,23 +840,48 @@ def ssl_wrap_socket(sock, **kwargs): def log_ssl_info(ssl_sock): from xpra.log import Logger ssllog = Logger("ssl") - ssllog.info("SSL handshake complete, %s", ssl_sock.version()) + ssllog("server_hostname=%s", ssl_sock.server_hostname) cipher = ssl_sock.cipher() - ssllog.info(" %s, %s bits", cipher[0], cipher[2]) - cert = ssl_sock.getpeercert() - if cert: - ssllog.info("certificate:") - print_nested_dict(ssl_sock.getpeercert(), prefix=" ", print_fn=ssllog.info) + if cipher: + ssllog.info(" %s, %s bits", cipher[0], cipher[2]) + try: + cert = ssl_sock.getpeercert() + except ValueError: + pass + else: + if cert: + ssllog.info("certificate:") + print_nested_dict(ssl_sock.getpeercert(), prefix=" ", print_fn=ssllog.info) + +SSL_VERIFY_EXPIRED = 10 +SSL_VERIFY_WRONG_HOST = 20 +SSL_VERIFY_SELF_SIGNED = 18 +SSL_VERIFY_UNTRUSTED_ROOT = 19 +SSL_VERIFY_IP_MISMATCH = 64 +SSL_VERIFY_CODES = { + SSL_VERIFY_EXPIRED : "expired", #also revoked! + SSL_VERIFY_WRONG_HOST : "wrong host", + SSL_VERIFY_SELF_SIGNED : "self-signed", + SSL_VERIFY_UNTRUSTED_ROOT : "untrusted-root", + SSL_VERIFY_IP_MISMATCH : "ip-mismatch", + } + +class SSLVerifyFailure(InitExit): + def __init__(self, status, msg, verify_code, ssl_sock): + super().__init__(status, msg) + self.verify_code = verify_code + self.ssl_sock = ssl_sock def ssl_handshake(ssl_sock): + from xpra.log import Logger + ssllog = Logger("ssl") try: ssl_sock.do_handshake(True) + ssllog.info("SSL handshake complete, %s", ssl_sock.version()) log_ssl_info(ssl_sock) except Exception as e: - from xpra.log import Logger - ssllog = Logger("ssl") ssllog("do_handshake", exc_info=True) - ssllog("server_hostname=%s", ssl_sock.server_hostname) + log_ssl_info(ssl_sock) import ssl SSLEOFError = getattr(ssl, "SSLEOFError", None) if SSLEOFError and isinstance(e, SSLEOFError): @@ -864,15 +889,16 @@ def ssl_handshake(ssl_sock): status = EXIT_SSL_FAILURE SSLCertVerificationError = getattr(ssl, "SSLCertVerificationError", None) if SSLCertVerificationError and isinstance(e, SSLCertVerificationError): + verify_code = getattr(e, "verify_code", 0) + ssllog("verify_code=%s", SSL_VERIFY_CODES.get(verify_code, verify_code)) try: msg = getattr(e, "verify_message") or (e.args[1].split(":", 2)[2]) except (ValueError, IndexError): msg = str(e) status = EXIT_SSL_CERTIFICATE_VERIFY_FAILURE ssllog("host failed SSL verification: %s", msg) - else: - msg = str(e) - raise InitExit(status, "SSL handshake failed: %s" % msg) from None + raise SSLVerifyFailure(status, msg, verify_code, ssl_sock) from None + raise InitExit(status, "SSL handshake failed: %s" % str(e)) from None return ssl_sock def get_ssl_wrap_socket_context(cert=None, key=None, ca_certs=None, ca_data=None, @@ -958,10 +984,14 @@ def get_ssl_wrap_socket_context(cert=None, key=None, ca_certs=None, ca_data=None #try to locate the cert file from known locations cert = find_ssl_cert() if not cert: - raise InitException("failed to locate an ssl certificate to use") + raise InitException("failed to automatically locate an SSL certificate to use") SSL_KEY_PASSWORD = os.environ.get("XPRA_SSL_KEY_PASSWORD") ssllog("context.load_cert_chain%s", (cert or None, key or None, SSL_KEY_PASSWORD)) - context.load_cert_chain(certfile=cert or None, keyfile=key or None, password=SSL_KEY_PASSWORD) + try: + context.load_cert_chain(certfile=cert or None, keyfile=key or None, password=SSL_KEY_PASSWORD) + except ssl.SSLError as e: + ssllog("load_cert_chain", exc_info=True) + raise InitException("SSL error, failed to load certificate chain: %s" % e) from e if ssl_cert_reqs!=ssl.CERT_NONE: if server_side: purpose = ssl.Purpose.CLIENT_AUTH #@UndefinedVariable @@ -1028,6 +1058,123 @@ def do_wrap_socket(tcp_socket, context, **kwargs): return ssl_sock +def ssl_retry(e, ssl_ca_certs): + SSL_RETRY = envbool("XPRA_SSL_RETRY", True) + if not SSL_RETRY: + return None + if not isinstance(e, SSLVerifyFailure): + return None + #we may be able to ask the user if we wants to accept this certificate + from xpra.log import Logger + ssllog = Logger("ssl") + verify_code = e.verify_code + ssl_sock = e.ssl_sock + msg = str(e) + del e + server_hostname = ssl_sock.server_hostname + ssllog("ssl_retry: server_hostname=%s, ssl verify_code=%s", + server_hostname, SSL_VERIFY_CODES.get(verify_code, verify_code)) + if verify_code not in (SSL_VERIFY_SELF_SIGNED, SSL_VERIFY_WRONG_HOST, SSL_VERIFY_IP_MISMATCH): + return None + if not server_hostname: + return None + from xpra.platform.paths import get_ssl_hosts_config_dirs + from xpra.scripts.pinentry_wrapper import get_pinentry_command, run_pinentry_confirm + host_dirname = std(server_hostname, extras="-.:#_") + #self-signed cert: + if verify_code==SSL_VERIFY_SELF_SIGNED: + if ssl_ca_certs not in ("", "default"): + ssllog("self-signed cert does not match %r", ssl_ca_certs) + return None + #perhaps we already have the certificate for this hostname + dirs = get_ssl_hosts_config_dirs() + host_dirs = [os.path.join(osexpand(d), host_dirname) for d in dirs] + cert_filename = "cert.pem" + for d in host_dirs: + f = os.path.join(d, cert_filename) + if os.path.exists(f): + ssllog("found certificate for %s: %s", server_hostname, f) + ssllog("retrying") + return {"ssl_ca_certs" : f} + #ask the user if he wants to accept this certificate: + pinentry_cmd = get_pinentry_command() + if not pinentry_cmd: + return None + #download the certificate data + import ssl + addr = ssl_sock.getpeername() + #addr = (server_hostname, port) + try: + cert_data = ssl.get_server_certificate(addr) + except ssl.SSLError: + cert_data = None + if not cert_data: + ssllog("failed to get server certificate from %s", addr) + return None + ssllog("ssl cert data for %s: %s", addr, ellipsizer(cert_data)) + #ask the user if he wants to accept this certificate: + title = "SSL Certificate Verification Failure" + prompt = "%0A".join(( + msg, + "", + "Do you want to accept this certificate?", + )) + r = run_pinentry_confirm(pinentry_cmd, title, prompt) #, "save it") + ssllog("run_pinentry_confirm(..) returned %r", r) + if r!="OK": + return None + #if there is an existing host config dir, try to use it: + for d in [x for x in host_dirs if os.path.exists(x)]: + try: + filename = os.path.join(d, cert_filename) + with open(filename, "wb") as f: + f.write(cert_data.encode("latin1")) + return {"ssl_ca_certs" : f} + except OSError: + ssllog("failed to save cert data to %r", filename, exc_info=True) + #try to create a host config dir: + for d in host_dirs: + folders = os.path.normpath(d).split(os.sep) + #we have to be careful and create the 'ssl' dir with 0o700 permissions + #but any directory above that can use 0o755 + try: + ssl_dir_index = len(folders)-1 + while ssl_dir_index>0 and folders[ssl_dir_index]!="ssl": + ssl_dir_index -= 1 + if ssl_dir_index>1: + parent = os.path.join(*folders[:ssl_dir_index-1]) + ssl_dir = os.path.join(*folders[:ssl_dir_index]) + os.makedirs(parent, exist_ok=True) + os.makedirs(ssl_dir, mode=0o700, exist_ok=True) + os.makedirs(d, mode=0o700) + filename = os.path.join(d, cert_filename) + with open(filename, "wb") as f: + f.write(cert_data.encode("latin1")) + ssllog("saving certificate to %r", f) + ssllog("retrying") + return {"ssl_ca_certs" : f} + except OSError: + ssllog("failed to save cert data to %r", d, exc_info=True) + ssllog.warn("Warning: failed to save certificate data") + return None + if verify_code in (SSL_VERIFY_WRONG_HOST, SSL_VERIFY_IP_MISMATCH): + #ask the user if he wants to skip verifying the host + pinentry_cmd = get_pinentry_command() + if not pinentry_cmd: + return None + title = "SSL Certificate Verification Failure" + prompt = "%0A".join(( + msg, + "", + "Do you want to connect anyway?", + )) + r = run_pinentry_confirm(pinentry_cmd, title, prompt) + ssllog("run_pinentry_confirm(..) returned %r", r) + if r=="OK": + return {"ssl_check_hostname" : False} + return None + + def socket_connect(host, port, timeout=SOCKET_TIMEOUT): socktype = socket.SOCK_STREAM family = 0 #0 means any diff --git a/xpra/platform/paths.py b/xpra/platform/paths.py index 9827d31b56..d35d092686 100755 --- a/xpra/platform/paths.py +++ b/xpra/platform/paths.py @@ -47,11 +47,19 @@ def do_get_system_conf_dirs(): def get_ssl_cert_dirs(): return envaslist_or_delegate("XPRA_SSL_CERT_PATHS", do_get_ssl_cert_dirs) def do_get_ssl_cert_dirs(): - if os.name=="posix" and os.getuid()==0: - d = ["/etc/xpra/", "/usr/local/etc/xpra"] - else: - d = ["~/.config/xpra/", "~/.xpra/", "/etc/xpra/", "/usr/local/etc/xpra", "./"] - return d + dirs = ["/etc/xpra/", "/etc/xpra/ssl", "/usr/local/etc/xpra", "/usr/local/etc/xpra/ssl"] + if os.name!="posix" or os.getuid()!=0: + dirs = ["~/.config/xpra/ssl", "~/.xpra/ssl"] + dirs + ["./"] + return dirs + +def get_ssl_hosts_config_dirs(): + return envaslist_or_delegate("XPRA_SSL_HOSTS_CONFIG_DIRS", do_get_ssl_hosts_config_dirs) +def do_get_ssl_hosts_config_dirs(): + dirs = [] + for d in get_ssl_cert_dirs(): + if d.rstrip("/\\").endswith("ssl"): + dirs.append(os.path.join(d, "hosts")) + return dirs def get_ssh_conf_dirs(): return envaslist_or_delegate("XPRA_SSH_CONF_DIRS", do_get_ssh_conf_dirs) @@ -326,7 +334,8 @@ def get_info(): "install" : {"prefix" : get_install_prefix()}, "default_conf" : {"dirs" : get_default_conf_dirs()}, "system_conf" : {"dirs" : get_system_conf_dirs()}, - "ssl_cert" : {"dirs" : get_ssl_cert_dirs()}, + "ssl-cert" : {"dirs" : get_ssl_cert_dirs()}, + "ssl-hosts-config" : {"dirs" : get_ssl_hosts_config_dirs()}, "ssh_conf" : {"dirs" : get_ssh_conf_dirs()}, "user_conf" : {"dirs" : get_user_conf_dirs()}, "sessions" : {"dir" : do_get_sessions_dir()}, diff --git a/xpra/scripts/main.py b/xpra/scripts/main.py index f1444c163d..427a426746 100755 --- a/xpra/scripts/main.py +++ b/xpra/scripts/main.py @@ -22,7 +22,7 @@ from xpra.platform.dotxpra import DotXpra from xpra.util import ( csv, envbool, envint, nonl, pver, std, - noerr, sorted_nicely, typedict, + noerr, sorted_nicely, typedict, ellipsizer, DEFAULT_PORTS, ) from xpra.exit_codes import ( @@ -35,7 +35,7 @@ ) from xpra.os_util import ( get_util_logger, getuid, getgid, get_username_for_uid, - bytestostr, use_tty, + bytestostr, use_tty, osexpand, set_proc_title, is_systemd_pid1, WIN32, OSX, POSIX, SIGNAMES, is_Ubuntu, @@ -838,7 +838,7 @@ def connect_or_fail(display_desc, opts): except InitInfo: raise except Exception as e: - get_util_logger().debug("failed to connect", exc_info=True) + Logger("network").debug("failed to connect", exc_info=True) raise InitException("connection failed: %s" % e) from None @@ -1197,6 +1197,13 @@ def do_setup_connection(): werr("failed to connect:", " %s" % e) GLib.idle_add(app.quit, EXIT_OK) except InitExit as e: + from xpra.net.socket_util import ssl_retry + mods = ssl_retry(e, opts.ssl_ca_certs) + if mods: + for k,v in mods.items(): + setattr(opts, k, v) + do_setup_connection() + return log("do_setup_connection() display_desc=%s", display_desc, exc_info=True) werr("Warning: failed to connect:", " %s" % e) GLib.idle_add(app.quit, e.status)