Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow the user to configure a pinentry #202

Closed
wants to merge 14 commits into from
6 changes: 3 additions & 3 deletions libagent/device/fake_device.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,15 @@ def close(self):
"""Close connection."""
self.conn = None

def pubkey(self, identity, ecdh=False):
def pubkey(self, identity, ecdh=False, options=None):
"""Return public key."""
_verify_support(identity)
data = self.vk.to_string()
x, y = data[:32], data[32:]
prefix = bytearray([2 + (bytearray(y)[0] & 1)])
return bytes(prefix) + x

def sign(self, identity, blob):
def sign(self, identity, blob, options=None):
"""Sign given blob and return the signature (as bytes)."""
if identity.identity_dict['proto'] in {'ssh'}:
digest = hashlib.sha256(blob).digest()
Expand All @@ -60,7 +60,7 @@ def sign(self, identity, blob):
return self.sk.sign_digest_deterministic(digest=digest,
hashfunc=hashlib.sha256)

def ecdh(self, identity, pubkey):
def ecdh(self, identity, pubkey, options=None):
"""Get shared session key using Elliptic Curve Diffie-Hellman."""
assert pubkey[:1] == b'\x04'
peer = ecdsa.VerifyingKey.from_string(
Expand Down
8 changes: 4 additions & 4 deletions libagent/device/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ def get_curve_name(self, ecdh=False):
class Device(object):
"""Abstract cryptographic hardware device interface."""

def __init__(self):
def __init__(self, config=None):
"""C-tor."""
self.conn = None

Expand All @@ -126,15 +126,15 @@ def __exit__(self, *args):
log.exception('close failed: %s', e)
self.conn = None

def pubkey(self, identity, ecdh=False):
def pubkey(self, identity, ecdh=False, options=None):
"""Get public key (as bytes)."""
raise NotImplementedError()

def sign(self, identity, blob):
def sign(self, identity, blob, options=None):
"""Sign given blob and return the signature (as bytes)."""
raise NotImplementedError()

def ecdh(self, identity, pubkey):
def ecdh(self, identity, pubkey, options=None):
"""Get shared session key using Elliptic Curve Diffie-Hellman."""
raise NotImplementedError()

Expand Down
10 changes: 7 additions & 3 deletions libagent/device/ledger.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ def package_name(cls):
"""Python package name (at PyPI)."""
return 'ledger-agent'

def __init__(self, config=None):
self.config = config
super(LedgerNanoS, self).__init__(config=config)

def connect(self):
"""Enumerate and connect to the first USB HID interface."""
try:
Expand All @@ -49,7 +53,7 @@ def connect(self):
raise interface.NotFoundError(
'{} not connected: "{}"'.format(self, e))

def pubkey(self, identity, ecdh=False):
def pubkey(self, identity, ecdh=False, options=None):
"""Get PublicKey object for specified BIP32 address and elliptic curve."""
curve_name = identity.get_curve_name(ecdh)
path = _expand_path(identity.get_bip32_address(ecdh))
Expand All @@ -66,7 +70,7 @@ def pubkey(self, identity, ecdh=False):
log.debug('result: %r', result)
return _convert_public_key(curve_name, result[1:])

def sign(self, identity, blob):
def sign(self, identity, blob, options=None):
"""Sign given blob and return the signature (as bytes)."""
path = _expand_path(identity.get_bip32_address(ecdh=False))
if identity.identity_dict['proto'] == 'ssh':
Expand Down Expand Up @@ -103,7 +107,7 @@ def sign(self, identity, blob):
else:
return bytes(result[:64])

def ecdh(self, identity, pubkey):
def ecdh(self, identity, pubkey, options=None):
"""Get shared session key using Elliptic Curve Diffie-Hellman."""
path = _expand_path(identity.get_bip32_address(ecdh=True))
if identity.curve_name == 'nist256p1':
Expand Down
141 changes: 117 additions & 24 deletions libagent/device/trezor.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import os
import subprocess
import sys
import traceback

import mnemonic
import semver
Expand All @@ -14,6 +15,10 @@
log = logging.getLogger(__name__)


class UserCancelException(RuntimeError):
pass


def _message_box(label, sp=subprocess):
"""Launch an external process for PIN/passphrase entry GUI."""
cmd = ('import sys, pymsgbox; '
Expand All @@ -32,6 +37,61 @@ def _is_open_tty(stream):
return not stream.closed and os.isatty(stream.fileno())


def _pin_communicate(self, program, message, error=None, options={}):
args = [program]
if 'DISPLAY' in os.environ:
args.extend(['--display', os.environ['DISPLAY']])
try:
entry = subprocess.Popen(
args,
encoding='utf-8',
stdin=subprocess.PIPE, stdout=subprocess.PIPE,
)
except OSError as e:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is Py2 specific. How would you handle compatibility in this case? Catch both? Wrap subprocess?

if e.errno == os.errno.ENOENT:
return None
else:
raise

def expect(prefix=None):
line = entry.stdout.readline()
if line.endswith('\n'):
line = line[:-1]
log.debug('PINENTRY <- {}'.format(line))
if line.startswith('ERR '):
raise UserCancelException()
if prefix and not line.startswith(prefix):
raise RuntimeError('Received unexpected response from pinentry')
return line[len(prefix) if prefix else 0:]

def send(line):
log.debug('PINENTRY -> {}'.format(line))
entry.stdin.write('{}\n'.format(line))
entry.stdin.flush()

expect('OK')
for k, v in options.items():
if v is not None:
send('OPTION {}={}'.format(k, v))
else:
send('OPTION {}'.format(k))
expect('OK')
send('SETDESC {}'.format(message))
expect('OK')
if error:
send('SETERROR {}'.format(error))
expect('OK')
send('GETPIN')
result = expect('D ')
send('BYE')
entry.stdin.close()
try:
entry.communicate()
except:
pass
return result


class Trezor(interface.Device):
"""Connection to TREZOR device."""

Expand All @@ -40,6 +100,11 @@ def package_name(cls):
"""Python package name (at PyPI)."""
return 'trezor-agent'

def __init__(self, config=None):
self.config = config or {}
self.options = {}
super(Trezor, self).__init__(config=config)

@property
def _defs(self):
from . import trezor_defs
Expand All @@ -57,20 +122,33 @@ def _override_pin_handler(self, conn):
cli_handler = conn.callback_PinMatrixRequest

def new_handler(msg):
if _is_open_tty(sys.stdin):
result = cli_handler(msg) # CLI-based PIN handler
else:
scrambled_pin = _message_box(
'Use the numeric keypad to describe number positions.\n'
'The layout is:\n'
' 7 8 9\n'
' 4 5 6\n'
' 1 2 3\n'
'Please enter PIN:')
fallback_message = (
'Use the numeric keypad to describe number positions.\n'
'The layout is:\n'
' 7 8 9\n'
' 4 5 6\n'
' 1 2 3\n'
'Please enter PIN:')
result = None
pinentry_program = self.config.get('pinentry-program')
scrambled_pin = _pin_communicate(
self,
pinentry_program or 'pinentry',
'Please enter your Trezor PIN' if pinentry_program
else fallback_message,
options=self.options,
)
if not scrambled_pin:
if _is_open_tty(sys.stdin):
result = cli_handler(msg) # CLI-based PIN handler
else:
scrambled_pin = _message_box(fallback_message)
if not result:
if not set(scrambled_pin).issubset('123456789'):
raise self._defs.PinException(
None,
'Invalid scrambled PIN: {!r}'.format(scrambled_pin))
result = self._defs.PinMatrixAck(pin=scrambled_pin)
if not set(result.pin).issubset('123456789'):
raise self._defs.PinException(
None, 'Invalid scrambled PIN: {!r}'.format(result.pin))
return result

conn.callback_PinMatrixRequest = new_handler
Expand All @@ -85,14 +163,24 @@ def new_handler(msg):
log.debug('re-using cached %s passphrase', self)
return self.__class__.cached_passphrase_ack

if _is_open_tty(sys.stdin):
# use CLI-based PIN handler
ack = cli_handler(msg)
else:
passphrase = _message_box('Please enter passphrase:')
passphrase = mnemonic.Mnemonic.normalize_string(passphrase)
ack = self._defs.PassphraseAck(passphrase=passphrase)
ack = None
passphrase_program = self.config.get('passphrase-program')
passphrase = _pin_communicate(
self,
passphrase_program or 'pinentry',
'Please enter your passphrase',
options=self.options,
)
if not passphrase:
if _is_open_tty(sys.stdin):
# use CLI-based PIN handler
ack = cli_handler(msg)
else:
passphrase = _message_box('Please enter passphrase:')
passphrase = mnemonic.Mnemonic.normalize_string(passphrase)

if not ack:
ack = self._defs.PassphraseAck(passphrase=passphrase)
self.__class__.cached_passphrase_ack = ack
return ack

Expand Down Expand Up @@ -130,19 +218,22 @@ def connect(self):
except (self._defs.PinException, ValueError) as e:
log.error('Invalid PIN: %s, retrying...', e)
continue
except UserCancelException:
connection.cancel()
return None
except Exception as e:
log.exception('ping failed: %s', e)
connection.close() # so the next HID open() will succeed
raise

raise interface.NotFoundError('{} not connected'.format(self))

def close(self):
"""Close connection."""
self.conn.close()

def pubkey(self, identity, ecdh=False):
def pubkey(self, identity, ecdh=False, options=None):
"""Return public key."""
self.options = options
curve_name = identity.get_curve_name(ecdh=ecdh)
log.debug('"%s" getting public key (%s) from %s',
identity.to_string(), curve_name, self)
Expand All @@ -158,8 +249,9 @@ def _identity_proto(self, identity):
setattr(result, name, value)
return result

def sign(self, identity, blob):
def sign(self, identity, blob, options=None):
"""Sign given blob and return the signature (as bytes)."""
self.options = options
curve_name = identity.get_curve_name(ecdh=False)
log.debug('"%s" signing %r (%s) on %s',
identity.to_string(), blob, curve_name, self)
Expand All @@ -178,8 +270,9 @@ def sign(self, identity, blob):
log.debug(msg, exc_info=True)
raise interface.DeviceError(msg)

def ecdh(self, identity, pubkey):
def ecdh(self, identity, pubkey, options=None):
"""Get shared session key using Elliptic Curve Diffie-Hellman."""
self.options = options
curve_name = identity.get_curve_name(ecdh=True)
log.debug('"%s" shared session key (%s) for %r from %s',
identity.to_string(), curve_name, pubkey, self)
Expand Down
21 changes: 17 additions & 4 deletions libagent/gpg/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,12 @@ def export_public_key(device_type, args):
log.warning('NOTE: in order to re-generate the exact same GPG key later, '
'run this command with "--time=%d" commandline flag (to set '
'the timestamp of the GPG key manually).', args.time)
c = client.Client(device=device_type())
config = {}
if args.pinentry:
config['pinentry-program'] = args.pinentry
if args.passentry:
config['passphrase-program'] = args.passentry
c = client.Client(device=device_type(config=config))
identity = client.create_identity(user_id=args.user_id,
curve_name=args.ecdsa_curve)
verifying_key = c.pubkey(identity=identity, ecdh=False)
Expand Down Expand Up @@ -151,10 +156,15 @@ def run_init(device_type, args):

# Prepare GPG agent configuration file
with open(os.path.join(homedir, 'gpg-agent.conf'), 'w') as f:
f.write("""# Hardware-based GPG agent emulator
lines = """# Hardware-based GPG agent emulator
log-file {0}/gpg-agent.log
verbosity 2
""".format(homedir))
""".format(homedir)
if args.pinentry:
lines += 'pinentry-program {}\n'.format(args.pinentry)
if args.passentry:
lines += 'passentry-program {}\n'.format(args.passentry)
f.write(lines)

# Prepare a helper script for setting up the new identity
with open(os.path.join(homedir, 'env'), 'w') as f:
Expand Down Expand Up @@ -219,7 +229,8 @@ def run_agent(device_type):
env = {'GNUPGHOME': args.homedir}
sock_path = keyring.get_agent_sock_path(env=env)
pubkey_bytes = keyring.export_public_keys(env=env)
handler = agent.Handler(device=device_type(), pubkey_bytes=pubkey_bytes)
handler = agent.Handler(device=device_type(config=config),
pubkey_bytes=pubkey_bytes)
with server.unix_domain_socket_server(sock_path) as sock:
for conn in agent.yield_connections(sock):
with contextlib.closing(conn):
Expand Down Expand Up @@ -255,6 +266,8 @@ def main(device_type):
p.add_argument('-t', '--time', type=int, default=int(time.time()))
p.add_argument('-v', '--verbose', default=0, action='count')
p.add_argument('-s', '--subkey', default=False, action='store_true')
p.add_argument('-p', '--pinentry')
p.add_argument('-pa', '--passentry')
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe these would be better as long options only.

p.set_defaults(func=run_init)

p = subparsers.add_parser('unlock', help='unlock the hardware device')
Expand Down
Loading