Skip to content

Commit

Permalink
split pinetry wrapper functions to a module
Browse files Browse the repository at this point in the history
  • Loading branch information
totaam committed Oct 10, 2021
1 parent 76a7bbd commit 874ddc9
Show file tree
Hide file tree
Showing 5 changed files with 161 additions and 135 deletions.
3 changes: 1 addition & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -1947,8 +1947,7 @@ def osx_pkgconfig(*pkgs_options, **ekw):
if client_ENABLED:
add_modules("xpra.client")
add_packages("xpra.client.mixins", "xpra.client.auth")
add_modules("xpra.scripts.gtk_info")
add_modules("xpra.scripts.show_webcam")
add_modules("xpra.scripts.gtk_info", "xpra.scripts.show_webcam", "xpra.scripts.pinentry_wrapper")
if gtk3_ENABLED:
add_modules("xpra.scripts.bug_report")
toggle_packages((client_ENABLED and gtk3_ENABLED) or sound_ENABLED or server_ENABLED, "xpra.gtk_common")
Expand Down
4 changes: 2 additions & 2 deletions xpra/client/gtk_base/gtk_client_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ def do_process_challenge_prompt(self, packet, prompt="password"):
authlog = Logger("auth")
self.show_progress(100, "authentication")
PINENTRY = os.environ.get("XPRA_PINENTRY", "")
from xpra.scripts.main import get_pinentry_command
from xpra.scripts.pinentry_wrapper import get_pinentry_command
pinentry_cmd = get_pinentry_command(PINENTRY)
authlog("do_process_challenge_prompt%s get_pinentry_command(%s)=%s",
(packet, prompt), PINENTRY, pinentry_cmd)
Expand Down Expand Up @@ -323,7 +323,7 @@ def got_pin(value):
self.idle_add(self.send_challenge_reply, packet, value)
def no_pin():
self.idle_add(self.quit, EXIT_PASSWORD_REQUIRED)
from xpra.scripts.main import pinentry_getpin
from xpra.scripts.pinentry_wrapper import pinentry_getpin
title = self.get_server_authentication_string()
pinentry_getpin(proc, title, q, got_pin, no_pin)
return True
Expand Down
10 changes: 7 additions & 3 deletions xpra/net/ssh.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
from xpra.scripts.main import (
InitException, InitExit,
shellquote, host_target_string,
)
from xpra.scripts.pinentry_wrapper import (
get_pinentry_command, run_pinentry_getpin, run_pinentry_confirm,
)
from xpra.platform.paths import get_ssh_known_hosts_files
Expand All @@ -34,6 +36,8 @@
from xpra.util import envint, envbool, envfloat, engs, csv
from xpra.log import Logger, is_debug_enabled

#pylint: disable=import-outside-toplevel

log = Logger("network", "ssh")
if log.is_debug_enabled():
import logging
Expand Down Expand Up @@ -135,13 +139,13 @@ def confirm_run():
def confirm_key(info=(), title="Confirm Key", prompt="Are you sure you want to continue connecting?") -> bool:
if SKIP_UI:
return False
from xpra.platform.paths import get_icon_filename
if PINENTRY:
pinentry_cmd = get_pinentry_command()
if pinentry_cmd:
messages = list(info)+["", prompt]
return run_pinentry_confirm(pinentry_cmd, title, "%0A".join(messages))
return run_pinentry_confirm(pinentry_cmd, title, "%0A".join(messages))=="OK"
if use_gui_prompt():
from xpra.platform.paths import get_icon_filename
icon = get_icon_filename("authentication", "png") or ""
code = dialog_confirm(title, prompt, info, icon, buttons=[("yes", 200), ("NO", 201)])
log("dialog return code=%s", code)
Expand All @@ -160,8 +164,8 @@ def confirm_key(info=(), title="Confirm Key", prompt="Are you sure you want to c
def input_pass(prompt) -> str:
if SKIP_UI:
return None
from xpra.platform.paths import get_icon_filename
if PINENTRY or use_gui_prompt():
from xpra.platform.paths import get_icon_filename
icon = get_icon_filename("authentication", "png") or ""
log("input_pass(%s) using dialog", prompt)
return dialog_pass("Password Input", prompt, icon)
Expand Down
131 changes: 3 additions & 128 deletions xpra/scripts/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from xpra import __version__ as XPRA_VERSION
from xpra.platform.dotxpra import DotXpra
from xpra.util import (
csv, envbool, envint, nonl, pver,
csv, envbool, envint, nonl, pver, std,
noerr, sorted_nicely, typedict,
DEFAULT_PORTS,
)
Expand Down Expand Up @@ -66,6 +66,7 @@
VERIFY_X11_SOCKET_TIMEOUT = envint("XPRA_VERIFY_X11_SOCKET_TIMEOUT", 1)
LIST_REPROBE_TIMEOUT = envint("XPRA_LIST_REPROBE_TIMEOUT", 10)

#pylint: disable=import-outside-toplevel

def nox():
DISPLAY = os.environ.get("DISPLAY")
Expand Down Expand Up @@ -529,6 +530,7 @@ def do_run_mode(script_file, cmdline, error_cb, options, args, mode, defaults):
return run_sound(mode, error_cb, options, args)
elif mode=="pinentry":
check_gtk()
from xpra.scripts.pinentry_wrapper import run_pinentry
return run_pinentry(args)
elif mode=="_dialog":
check_gtk()
Expand Down Expand Up @@ -1004,133 +1006,6 @@ def sockpathfail_cb(msg):
raise InitException("unsupported display type: %s" % dtype)


def get_pinentry_command(setting="yes"):
log = Logger("exec")
log("get_pinentry_command(%s)", setting)
if setting.lower() in FALSE_OPTIONS:
return None
from xpra.os_util import is_gnome, is_kde, which
def find_pinentry_bin():
if is_gnome():
return which("pinentry-gnome3")
if is_kde():
return which("pinentry-qt")
return None
if setting.lower() in TRUE_OPTIONS:
return find_pinentry_bin() or which("pinentry")
if setting=="" or setting.lower()=="auto":
#figure out if we should use it:
if WIN32 or OSX:
#not enabled by default on those platforms
return None
return find_pinentry_bin()
return setting

def popen_pinentry(pinentry_cmd):
try:
return Popen([pinentry_cmd], stdin=PIPE, stdout=PIPE, stderr=PIPE)
except OSError as e:
log = Logger("exec")
log("popen_pinentry(%s) failed", pinentry_cmd, exc_info=True)
log.error("Error: failed to run '%s'", pinentry_cmd)
log.error(" %s", e)
return None

def run_pinentry(extra_args):
messages = list(extra_args)
log = Logger("exec")
def get_input():
if not messages:
return None
return messages.pop(0)
def process_output(message, line):
if line.startswith(b"ERR "):
log.error("Error: pinentry responded to '%s' with:", message)
log.error(" %s", line.rstrip(b"\n\r").decode())
else:
log("pinentry sent %r", line)
pinentry_cmd = get_pinentry_command() or "pinentry"
proc = popen_pinentry(pinentry_cmd)
if not proc:
raise InitExit(EXIT_UNSUPPORTED, "cannot run pinentry")
return do_run_pinentry(proc, get_input, process_output)

def do_run_pinentry(proc, get_input, process_output):
log = Logger("exec")
message = "connection"
while proc.poll() is None:
try:
line = proc.stdout.readline()
process_output(message, line)
message = get_input()
if message is None:
break
log("sending %r", message)
r = proc.stdin.write(("%s\n" % message).encode())
proc.stdin.flush()
log("write returned: %s", r)
except OSError:
log("error running pinentry", exc_info=True)
break
if proc.poll() is None:
proc.terminate()
log("pinentry ended: %s" % proc.poll())

def pinentry_getpin(pinentry_proc, title, description, pin_cb, err_cb):
messages = [
"SETPROMPT %s" % title,
"SETDESC %s:" % description,
"GETPIN",
]
def get_input():
if not messages:
return None
return messages.pop(0)
def process_output(message, output):
if message=="GETPIN":
if output.startswith(b"D "):
pin_value = output[2:].rstrip(b"\n\r").decode()
pin_cb(pin_value)
else:
err_cb()
do_run_pinentry(pinentry_proc, get_input, process_output)
return True

def run_pinentry_getpin(pinentry_cmd, title, description):
proc = popen_pinentry(pinentry_cmd)
if proc is None:
return None
values = []
def rec(value=None):
values.append(value)
try:
pinentry_getpin(proc, title, description, rec, rec)
finally:
noerr(proc.terminate)
if not values:
return None
return values[0]

def run_pinentry_confirm(pinentry_cmd, title, prompt):
proc = popen_pinentry(pinentry_cmd)
if proc is None:
return None
messages = [
"SETPROMPT %s" % title,
"SETDESC %s:" % prompt,
"CONFIRM",
]
def get_input():
if not messages:
return None
return messages.pop(0)
confirm_values = []
def process_output(message, output):
if message=="CONFIRM":
confirm_values.append(output.strip(b"\n\r"))
do_run_pinentry(proc, get_input, process_output)
return len(confirm_values)==1 and confirm_values[0]==b"OK"


def run_dialog(extra_args):
from xpra.client.gtk_base.confirm_dialog import show_confirm_dialog
Expand Down
148 changes: 148 additions & 0 deletions xpra/scripts/pinentry_wrapper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
#!/usr/bin/env python3
# This file is part of Xpra.
# Copyright (C) 2021 Antoine Martin <antoine@xpra.org>
# Xpra is released under the terms of the GNU GPL v2, or, at your option, any
# later version. See the file COPYING for details.

from subprocess import Popen, PIPE

from xpra.util import noerr
from xpra.os_util import WIN32, OSX, is_gnome, is_kde, which, bytestostr
from xpra.scripts.config import FALSE_OPTIONS, TRUE_OPTIONS, InitExit
from xpra.exit_codes import EXIT_UNSUPPORTED
from xpra.log import Logger

log = Logger("exec")


def get_pinentry_command(setting="yes"):
log("get_pinentry_command(%s)", setting)
if setting.lower() in FALSE_OPTIONS:
return None
def find_pinentry_bin():
if is_gnome():
return which("pinentry-gnome3")
if is_kde():
return which("pinentry-qt")
return None
if setting.lower() in TRUE_OPTIONS:
return find_pinentry_bin() or which("pinentry")
if setting=="" or setting.lower()=="auto":
#figure out if we should use it:
if WIN32 or OSX:
#not enabled by default on those platforms
return None
return find_pinentry_bin()
return setting

def popen_pinentry(pinentry_cmd):
try:
cmd = [pinentry_cmd]
if log.is_debug_enabled():
cmd.append("--debug")
return Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE)
except OSError as e:
log("popen_pinentry(%s) failed", pinentry_cmd, exc_info=True)
log.error("Error: failed to run '%s'", pinentry_cmd)
log.error(" %s", e)
return None

def run_pinentry(extra_args):
messages = list(extra_args)
def get_input():
if not messages:
return None
return messages.pop(0)
def process_output(message, line):
if line.startswith(b"ERR "):
log.error("Error: pinentry responded to '%s' with:", message)
log.error(" %s", line.rstrip(b"\n\r").decode())
else:
log("pinentry sent %r", line)
pinentry_cmd = get_pinentry_command() or "pinentry"
proc = popen_pinentry(pinentry_cmd)
if not proc:
raise InitExit(EXIT_UNSUPPORTED, "cannot run pinentry")
return do_run_pinentry(proc, get_input, process_output)

def do_run_pinentry(proc, get_input, process_output):
message = "connection"
while proc.poll() is None:
try:
line = proc.stdout.readline()
process_output(message, line)
message = get_input()
if message is None:
break
log("sending %r", message)
r = proc.stdin.write(("%s\n" % message).encode())
proc.stdin.flush()
log("write returned: %s", r)
except OSError:
log("error running pinentry", exc_info=True)
break
if proc.poll() is None:
proc.terminate()
log("pinentry ended: %s" % proc.poll())

def pinentry_getpin(pinentry_proc, title, description, pin_cb, err_cb):
messages = [
"SETPROMPT %s" % title,
"SETDESC %s:" % description,
"GETPIN",
]
def get_input():
if not messages:
return None
return messages.pop(0)
def process_output(message, output):
if message=="GETPIN":
if output.startswith(b"D "):
pin_value = output[2:].rstrip(b"\n\r").decode()
pin_cb(pin_value)
else:
err_cb()
do_run_pinentry(pinentry_proc, get_input, process_output)
return True

def run_pinentry_getpin(pinentry_cmd, title, description):
proc = popen_pinentry(pinentry_cmd)
if proc is None:
return None
values = []
def rec(value=None):
values.append(value)
try:
pinentry_getpin(proc, title, description, rec, rec)
finally:
noerr(proc.terminate)
if not values:
return None
return values[0]

def run_pinentry_confirm(pinentry_cmd, title, prompt, notok=None):
proc = popen_pinentry(pinentry_cmd)
if proc is None:
return None
messages = []
if notok:
messages.append("SETNOTOK %s" % notok)
messages += [
"SETPROMPT %s" % title,
"SETDESC %s" % prompt,
]
messages.append("CONFIRM")
log("run_pinentry_confirm%s messages=%s", (pinentry_cmd, title, prompt, notok), messages)
def get_input():
if not messages:
return None
return messages.pop(0)
confirm_values = []
def process_output(message, output):
log("received %s for %s", output, message)
if message=="CONFIRM":
confirm_values.append(output.strip(b"\n\r"))
do_run_pinentry(proc, get_input, process_output)
if len(confirm_values)!=1:
return None
return bytestostr(confirm_values[0]) #ie: "OK"

0 comments on commit 874ddc9

Please sign in to comment.