Skip to content

Commit

Permalink
#3299 prompt the user to accept certificates / hostname mismatch
Browse files Browse the repository at this point in the history
  • Loading branch information
totaam committed Oct 11, 2021
1 parent 874ddc9 commit dc7c8df
Show file tree
Hide file tree
Showing 3 changed files with 187 additions and 24 deletions.
177 changes: 162 additions & 15 deletions xpra/net/socket_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
parse_encoded_bin_data,
)
from xpra.util import (
envint, envbool, csv, parse_simple_dict, print_nested_dict,
envint, envbool, csv, parse_simple_dict, print_nested_dict, std,
ellipsizer, noerr,
DEFAULT_PORT,
)
Expand Down Expand Up @@ -840,39 +840,65 @@ def ssl_wrap_socket(sock, **kwargs):
def log_ssl_info(ssl_sock):
from xpra.log import Logger
ssllog = Logger("ssl")
ssllog.info("SSL handshake complete, %s", ssl_sock.version())
ssllog("server_hostname=%s", ssl_sock.server_hostname)
cipher = ssl_sock.cipher()
ssllog.info(" %s, %s bits", cipher[0], cipher[2])
cert = ssl_sock.getpeercert()
if cert:
ssllog.info("certificate:")
print_nested_dict(ssl_sock.getpeercert(), prefix=" ", print_fn=ssllog.info)
if cipher:
ssllog.info(" %s, %s bits", cipher[0], cipher[2])
try:
cert = ssl_sock.getpeercert()
except ValueError:
pass
else:
if cert:
ssllog.info("certificate:")
print_nested_dict(ssl_sock.getpeercert(), prefix=" ", print_fn=ssllog.info)

SSL_VERIFY_EXPIRED = 10
SSL_VERIFY_WRONG_HOST = 20
SSL_VERIFY_SELF_SIGNED = 18
SSL_VERIFY_UNTRUSTED_ROOT = 19
SSL_VERIFY_IP_MISMATCH = 64
SSL_VERIFY_CODES = {
SSL_VERIFY_EXPIRED : "expired", #also revoked!
SSL_VERIFY_WRONG_HOST : "wrong host",
SSL_VERIFY_SELF_SIGNED : "self-signed",
SSL_VERIFY_UNTRUSTED_ROOT : "untrusted-root",
SSL_VERIFY_IP_MISMATCH : "ip-mismatch",
}

class SSLVerifyFailure(InitExit):
def __init__(self, status, msg, verify_code, ssl_sock):
super().__init__(status, msg)
self.verify_code = verify_code
self.ssl_sock = ssl_sock

def ssl_handshake(ssl_sock):
from xpra.log import Logger
ssllog = Logger("ssl")
try:
ssl_sock.do_handshake(True)
ssllog.info("SSL handshake complete, %s", ssl_sock.version())
log_ssl_info(ssl_sock)
except Exception as e:
from xpra.log import Logger
ssllog = Logger("ssl")
ssllog("do_handshake", exc_info=True)
ssllog("server_hostname=%s", ssl_sock.server_hostname)
log_ssl_info(ssl_sock)
import ssl
SSLEOFError = getattr(ssl, "SSLEOFError", None)
if SSLEOFError and isinstance(e, SSLEOFError):
return None
status = EXIT_SSL_FAILURE
SSLCertVerificationError = getattr(ssl, "SSLCertVerificationError", None)
if SSLCertVerificationError and isinstance(e, SSLCertVerificationError):
verify_code = getattr(e, "verify_code", 0)
ssllog("verify_code=%s", SSL_VERIFY_CODES.get(verify_code, verify_code))
try:
msg = getattr(e, "verify_message") or (e.args[1].split(":", 2)[2])
except (ValueError, IndexError):
msg = str(e)
status = EXIT_SSL_CERTIFICATE_VERIFY_FAILURE
ssllog("host failed SSL verification: %s", msg)
else:
msg = str(e)
raise InitExit(status, "SSL handshake failed: %s" % msg) from None
raise SSLVerifyFailure(status, msg, verify_code, ssl_sock) from None
raise InitExit(status, "SSL handshake failed: %s" % str(e)) from None
return ssl_sock

def get_ssl_wrap_socket_context(cert=None, key=None, ca_certs=None, ca_data=None,
Expand Down Expand Up @@ -958,10 +984,14 @@ def get_ssl_wrap_socket_context(cert=None, key=None, ca_certs=None, ca_data=None
#try to locate the cert file from known locations
cert = find_ssl_cert()
if not cert:
raise InitException("failed to locate an ssl certificate to use")
raise InitException("failed to automatically locate an SSL certificate to use")
SSL_KEY_PASSWORD = os.environ.get("XPRA_SSL_KEY_PASSWORD")
ssllog("context.load_cert_chain%s", (cert or None, key or None, SSL_KEY_PASSWORD))
context.load_cert_chain(certfile=cert or None, keyfile=key or None, password=SSL_KEY_PASSWORD)
try:
context.load_cert_chain(certfile=cert or None, keyfile=key or None, password=SSL_KEY_PASSWORD)
except ssl.SSLError as e:
ssllog("load_cert_chain", exc_info=True)
raise InitException("SSL error, failed to load certificate chain: %s" % e) from e
if ssl_cert_reqs!=ssl.CERT_NONE:
if server_side:
purpose = ssl.Purpose.CLIENT_AUTH #@UndefinedVariable
Expand Down Expand Up @@ -1028,6 +1058,123 @@ def do_wrap_socket(tcp_socket, context, **kwargs):
return ssl_sock


def ssl_retry(e, ssl_ca_certs):
SSL_RETRY = envbool("XPRA_SSL_RETRY", True)
if not SSL_RETRY:
return None
if not isinstance(e, SSLVerifyFailure):
return None
#we may be able to ask the user if we wants to accept this certificate
from xpra.log import Logger
ssllog = Logger("ssl")
verify_code = e.verify_code
ssl_sock = e.ssl_sock
msg = str(e)
del e
server_hostname = ssl_sock.server_hostname
ssllog("ssl_retry: server_hostname=%s, ssl verify_code=%s",
server_hostname, SSL_VERIFY_CODES.get(verify_code, verify_code))
if verify_code not in (SSL_VERIFY_SELF_SIGNED, SSL_VERIFY_WRONG_HOST, SSL_VERIFY_IP_MISMATCH):
return None
if not server_hostname:
return None
from xpra.platform.paths import get_ssl_hosts_config_dirs
from xpra.scripts.pinentry_wrapper import get_pinentry_command, run_pinentry_confirm
host_dirname = std(server_hostname, extras="-.:#_")
#self-signed cert:
if verify_code==SSL_VERIFY_SELF_SIGNED:
if ssl_ca_certs not in ("", "default"):
ssllog("self-signed cert does not match %r", ssl_ca_certs)
return None
#perhaps we already have the certificate for this hostname
dirs = get_ssl_hosts_config_dirs()
host_dirs = [os.path.join(osexpand(d), host_dirname) for d in dirs]
cert_filename = "cert.pem"
for d in host_dirs:
f = os.path.join(d, cert_filename)
if os.path.exists(f):
ssllog("found certificate for %s: %s", server_hostname, f)
ssllog("retrying")
return {"ssl_ca_certs" : f}
#ask the user if he wants to accept this certificate:
pinentry_cmd = get_pinentry_command()
if not pinentry_cmd:
return None
#download the certificate data
import ssl
addr = ssl_sock.getpeername()
#addr = (server_hostname, port)
try:
cert_data = ssl.get_server_certificate(addr)
except ssl.SSLError:
cert_data = None
if not cert_data:
ssllog("failed to get server certificate from %s", addr)
return None
ssllog("ssl cert data for %s: %s", addr, ellipsizer(cert_data))
#ask the user if he wants to accept this certificate:
title = "SSL Certificate Verification Failure"
prompt = "%0A".join((
msg,
"",
"Do you want to accept this certificate?",
))
r = run_pinentry_confirm(pinentry_cmd, title, prompt) #, "save it")
ssllog("run_pinentry_confirm(..) returned %r", r)
if r!="OK":
return None
#if there is an existing host config dir, try to use it:
for d in [x for x in host_dirs if os.path.exists(x)]:
try:
filename = os.path.join(d, cert_filename)
with open(filename, "wb") as f:
f.write(cert_data.encode("latin1"))
return {"ssl_ca_certs" : f}
except OSError:
ssllog("failed to save cert data to %r", filename, exc_info=True)
#try to create a host config dir:
for d in host_dirs:
folders = os.path.normpath(d).split(os.sep)
#we have to be careful and create the 'ssl' dir with 0o700 permissions
#but any directory above that can use 0o755
try:
ssl_dir_index = len(folders)-1
while ssl_dir_index>0 and folders[ssl_dir_index]!="ssl":
ssl_dir_index -= 1
if ssl_dir_index>1:
parent = os.path.join(*folders[:ssl_dir_index-1])
ssl_dir = os.path.join(*folders[:ssl_dir_index])
os.makedirs(parent, exist_ok=True)
os.makedirs(ssl_dir, mode=0o700, exist_ok=True)
os.makedirs(d, mode=0o700)
filename = os.path.join(d, cert_filename)
with open(filename, "wb") as f:
f.write(cert_data.encode("latin1"))
ssllog("saving certificate to %r", f)
ssllog("retrying")
return {"ssl_ca_certs" : f}
except OSError:
ssllog("failed to save cert data to %r", d, exc_info=True)
ssllog.warn("Warning: failed to save certificate data")
return None
if verify_code in (SSL_VERIFY_WRONG_HOST, SSL_VERIFY_IP_MISMATCH):
#ask the user if he wants to skip verifying the host
pinentry_cmd = get_pinentry_command()
if not pinentry_cmd:
return None
title = "SSL Certificate Verification Failure"
prompt = "%0A".join((
msg,
"",
"Do you want to connect anyway?",
))
r = run_pinentry_confirm(pinentry_cmd, title, prompt)
ssllog("run_pinentry_confirm(..) returned %r", r)
if r=="OK":
return {"ssl_check_hostname" : False}
return None


def socket_connect(host, port, timeout=SOCKET_TIMEOUT):
socktype = socket.SOCK_STREAM
family = 0 #0 means any
Expand Down
21 changes: 15 additions & 6 deletions xpra/platform/paths.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,19 @@ def do_get_system_conf_dirs():
def get_ssl_cert_dirs():
return envaslist_or_delegate("XPRA_SSL_CERT_PATHS", do_get_ssl_cert_dirs)
def do_get_ssl_cert_dirs():
if os.name=="posix" and os.getuid()==0:
d = ["/etc/xpra/", "/usr/local/etc/xpra"]
else:
d = ["~/.config/xpra/", "~/.xpra/", "/etc/xpra/", "/usr/local/etc/xpra", "./"]
return d
dirs = ["/etc/xpra/", "/etc/xpra/ssl", "/usr/local/etc/xpra", "/usr/local/etc/xpra/ssl"]
if os.name!="posix" or os.getuid()!=0:
dirs = ["~/.config/xpra/ssl", "~/.xpra/ssl"] + dirs + ["./"]
return dirs

def get_ssl_hosts_config_dirs():
return envaslist_or_delegate("XPRA_SSL_HOSTS_CONFIG_DIRS", do_get_ssl_hosts_config_dirs)
def do_get_ssl_hosts_config_dirs():
dirs = []
for d in get_ssl_cert_dirs():
if d.rstrip("/\\").endswith("ssl"):
dirs.append(os.path.join(d, "hosts"))
return dirs

def get_ssh_conf_dirs():
return envaslist_or_delegate("XPRA_SSH_CONF_DIRS", do_get_ssh_conf_dirs)
Expand Down Expand Up @@ -326,7 +334,8 @@ def get_info():
"install" : {"prefix" : get_install_prefix()},
"default_conf" : {"dirs" : get_default_conf_dirs()},
"system_conf" : {"dirs" : get_system_conf_dirs()},
"ssl_cert" : {"dirs" : get_ssl_cert_dirs()},
"ssl-cert" : {"dirs" : get_ssl_cert_dirs()},
"ssl-hosts-config" : {"dirs" : get_ssl_hosts_config_dirs()},
"ssh_conf" : {"dirs" : get_ssh_conf_dirs()},
"user_conf" : {"dirs" : get_user_conf_dirs()},
"sessions" : {"dir" : do_get_sessions_dir()},
Expand Down
13 changes: 10 additions & 3 deletions xpra/scripts/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from xpra.platform.dotxpra import DotXpra
from xpra.util import (
csv, envbool, envint, nonl, pver, std,
noerr, sorted_nicely, typedict,
noerr, sorted_nicely, typedict, ellipsizer,
DEFAULT_PORTS,
)
from xpra.exit_codes import (
Expand All @@ -35,7 +35,7 @@
)
from xpra.os_util import (
get_util_logger, getuid, getgid, get_username_for_uid,
bytestostr, use_tty,
bytestostr, use_tty, osexpand,
set_proc_title,
is_systemd_pid1,
WIN32, OSX, POSIX, SIGNAMES, is_Ubuntu,
Expand Down Expand Up @@ -838,7 +838,7 @@ def connect_or_fail(display_desc, opts):
except InitInfo:
raise
except Exception as e:
get_util_logger().debug("failed to connect", exc_info=True)
Logger("network").debug("failed to connect", exc_info=True)
raise InitException("connection failed: %s" % e) from None


Expand Down Expand Up @@ -1197,6 +1197,13 @@ def do_setup_connection():
werr("failed to connect:", " %s" % e)
GLib.idle_add(app.quit, EXIT_OK)
except InitExit as e:
from xpra.net.socket_util import ssl_retry
mods = ssl_retry(e, opts.ssl_ca_certs)
if mods:
for k,v in mods.items():
setattr(opts, k, v)
do_setup_connection()
return
log("do_setup_connection() display_desc=%s", display_desc, exc_info=True)
werr("Warning: failed to connect:", " %s" % e)
GLib.idle_add(app.quit, e.status)
Expand Down

0 comments on commit dc7c8df

Please sign in to comment.