# -*- coding: utf-8 -*-
# vim:ts=4:sw=4:expandtab
""" BinGPG is a "gpg" or "gpg2" command line wrapper which
implements all operations we need for Autocrypt usage.
It is not meant as a general wrapper outside Autocrypt
contexts.
"""
from __future__ import print_function, unicode_literals
import logging
from distutils.version import LooseVersion as V
import six
import os
import sys
from subprocess import Popen, PIPE
from contextlib import contextmanager
import tempfile
import re
iswin32 = sys.platform == "win32" or (getattr(os, '_name', False) == 'nt')
def cached_property(f):
# returns a property definition which lazily computes and
# caches the result of calling f. The property also allows
# setting the value (before or after read).
def get(self):
propcache = self.__dict__.setdefault("_property_cache", {})
key = f.__name__
try:
return propcache[key]
except KeyError:
x = self._property_cache[key] = f(self)
return x
def set(self, val):
propcache = self.__dict__.setdefault("_property_cache", {})
propcache[f.__name__] = val
return property(get, set)
[docs]class InvocationFailure(Exception):
[docs] def __init__(self, ret, cmd, out, err, extrainfo=None):
self.ret = ret
self.cmd = cmd
self.out = out
self.err = err
self.extrainfo = extrainfo
def __str__(self):
lines = ["GPG Command '%s' retcode=%d" % (self.cmd, self.ret)]
for name, olines in [("stdout:", self.out.__str__()),
("stderr:", self.err)]:
lines.append(name)
for line in olines.splitlines():
lines.append(" " + line)
if self.extrainfo:
lines.append(self.extrainfo)
return "\n".join(lines)
[docs]class BinGPG(object):
""" basic wrapper for gpg command line invocations. """
InvocationFailure = InvocationFailure
[docs] def __init__(self, homedir=None, gpgpath="gpg"):
"""
:type homedir: unicode or None
:param homedir: gpg home directory, if None system gpg homedir is used.
:type gpgpath: unicode
:param gpgpath:
If the path contains path separators and points
to an existing file we use it directly.
If it contains no path separators, we lookup
the path to the binary under the system's PATH.
If we can not determine an eventual binary
we raise ValueError.
"""
self.homedir = homedir
p = find_executable(gpgpath)
if p is None:
raise ValueError("could not find binary for {!r}".format(gpgpath))
self.gpgpath = p
self._ensure_init()
def __str__(self):
return "BinGPG(gpgpath={gpgpath!r}, homedir={homedir!r})".format(
gpgpath=self.gpgpath, homedir=self.homedir)
@cached_property
def _version_info(self):
return self._gpg_out(['--version'])
@cached_property
def gpg_version(self):
vline = self._version_info.split('\n', 1)[0]
return V(vline.split(' ')[2])
@cached_property
def isgpg2(self, min_version=V("2.0")):
return self.gpg_version >= min_version
@cached_property
def isgpg21(self, min_version=V("2.1")):
return self.gpg_version >= min_version
def _ensure_init(self):
if self.homedir is None:
return
if not os.path.exists(self.homedir):
# we create the dir if the basedir exists, otherwise we fail
os.makedirs(self.homedir)
os.chmod(self.homedir, 0o700)
# fix bad defaults for certain gpg2 versions
if V("2.0") <= self.gpg_version < V("2.1.12"):
p = os.path.join(self.homedir, "gpg-agent.conf")
if not os.path.exists(p):
with open(p, "w") as f:
f.write("allow-loopback-pinentry\n")
def killagent(self):
if self.isgpg2:
args = [find_executable("gpg-connect-agent"), "--no-autostart"]
args += self._homedirflags + ["KILLAGENT"]
popen = Popen(args)
popen.wait()
@contextmanager
def temp_written_file(self, data):
with tempfile.NamedTemporaryFile(delete=False) as f:
f.write(data)
try:
yield f.name
finally:
os.remove(f.name)
@property
def _homedirflags(self):
return ["--homedir", self.homedir] if self.homedir else []
@cached_property
def _nopassphrase(self):
opts = ["--passphrase", "123"]
if self.isgpg21:
opts.append("--pinentry-mode=loopback")
return opts
def _gpg_out(self, argv, input=None, strict=False, encoding="utf8"):
return self._gpg_outerr(argv, input=input, strict=strict, encoding=encoding)[0]
def _gpg_outerr(self, argv, input=None, strict=False, encoding="utf8"):
""" return stdout and stderr output of invoking gpg with the
specified parameters.
If the invocation leads to a non-zero exit
status an InvocationFailure exception is thrown. It is also
thrown if strict is True and there was non-empty stderr output.
stderr output will always be returned as a text type (utf8-decoded)
while stdout output is returned decoded if encoding is set (default is "utf8").
If you want binary stdout output specify encoding=None.
"""
assert input is None or isinstance(input, bytes)
args = [self.gpgpath, "--batch"] + self._homedirflags
# make sure we use unicode for all provided arguments
def ensure_unicode(x):
return x.decode("utf8") if isinstance(x, bytes) else x
args.extend(map(ensure_unicode, argv))
# open the process with a C locale, pipe everything
env = os.environ.copy()
env["LANG"] = "C"
env["LANGUAGE"] = "C"
env["LC_ALL"] = "en_US.UTF-8"
popen = Popen(args, stdout=PIPE, stderr=PIPE, stdin=PIPE, env=env)
# some debugging info
G = os.environ.get("GNUPGHOME")
extra = "" if not G else ("GNUPGHOME=" + G + " ")
logging.debug("$ %s%s", extra, " ".join(args))
out, err = popen.communicate(input=input)
ret = popen.wait()
if ret == 130:
raise KeyboardInterrupt("detected in gpg invocation")
err = err.decode("utf-8")
if encoding:
out = out.decode(encoding)
if ret != 0 or (strict and err):
raise self.InvocationFailure(ret, " ".join(args),
out=out, err=err)
return out, err
def supports_eddsa(self):
for l in self._version_info.split('\n'):
if l.startswith('Pubkey:'):
return 'eddsa' in map(
lambda x: x.strip().lower(), l.split(':', 1)[1].split(','))
return False
def gen_secret_key(self, emailadr):
spec = "\n".join([
"Key-Type: RSA",
"Key-Length: 3072",
"Key-Usage: sign",
"Subkey-Type: RSA",
"Subkey-Length: 3072",
"Subkey-Usage: encrypt",
"Name-Email: " + emailadr,
"Expire-Date: 0",
"%commit"
]).encode("utf8")
with self.temp_written_file(spec) as fn:
try:
out, err = self._gpg_outerr(self._nopassphrase + ["--gen-key", fn])
except InvocationFailure as e:
e.extrainfo = open(fn).read()
raise
keyhandle = self._find_keyhandle(err)
logging.debug("created secret key: %s", keyhandle)
return keyhandle
def list_secret_keyinfos(self, keyhandle=None):
args = ["--skip-verify", "--with-colons", "--list-secret-keys"]
if keyhandle is not None:
args.append(keyhandle)
return self._parse_list(args, ("sec", "ssb"))
def get_secret_keyhandle(self, keyhandle):
assert isinstance(keyhandle, six.text_type)
for k in self.list_secret_keyinfos(keyhandle):
is_in_uids = any(keyhandle in uid for uid in k.uids)
if is_in_uids or k.match(keyhandle):
return k.id
return None
def list_public_keyinfos(self, keyhandle=None):
args = ["--skip-verify", "--with-colons", "--list-public-keys"]
if keyhandle is not None:
args.append(keyhandle)
return self._parse_list(args, ("pub", "sub"))
def _parse_list(self, args, types):
out = self._gpg_out(args)
keyinfos = []
last_main_type_keyinfo = None
for line in out.splitlines():
parts = line.split(":")
if parts[0] in types:
keyinfos.append(
KeyInfo(type=parts[3], bits=int(parts[2]), uid=parts[9],
id=parts[4], date_created=parts[5]))
if parts[0] == types[0]:
last_main_type_keyinfo = keyinfos[-1]
elif parts[0] == "uid":
last_main_type_keyinfo.uids.append(parts[9])
return keyinfos
def _find_keyhandle(self, string, _pattern=re.compile("key (?:ID )?([0-9A-F]+)")):
m = _pattern.search(string)
assert m and len(m.groups()) == 1, string
x = m.groups()[0]
# now search the fingerprint if we only have a shortid
if len(x) <= 8: # keyid has 8 hex bytes
keyinfos = self.list_public_keyinfos(x)
for k in keyinfos:
if k.match(x):
return k.id
raise ValueError("could not find fingerprint %r in %r" % (x, keyinfos))
# note that this might be a 16-char fingerprint or a 40-char one (gpg-2.1.18)
return x
def list_secret_key_packets(self, keyhandle):
return self.list_packets(self.get_secret_keydata(keyhandle))
def list_public_key_packets(self, keyhandle):
return self.list_packets(self.get_public_keydata(keyhandle))
def list_packets(self, keydata):
out = self._gpg_out(["--list-packets"], input=keydata)
# build up a list of (pkgname, pkgvalue, lines) tuples
packets = []
lines = []
last_package_type = None
for rawline in out.splitlines():
line = rawline.strip()
c = line[0:1]
if c == "#":
continue
if c == ":":
i = line[1:].find(c)
if i != -1:
ptype = line[1: i + 1]
pvalue = line[i + 2:].strip()
if last_package_type is not None:
packets.append(last_package_type + (lines,))
lines = []
last_package_type = (ptype, pvalue)
else:
assert last_package_type, line
lines.append(line)
else:
packets.append(last_package_type + (lines,))
return packets
def get_public_keydata(self, keyhandle, armor=False):
args = ["-a"] if armor else []
args.extend(["--export-options=export-minimal", "--export", str(keyhandle)])
out = self._gpg_out(args, strict=True, encoding=None)
return out
def get_secret_keydata(self, keyhandle, armor=False):
args = ["-a"] if armor else []
args.extend(self._nopassphrase + ["--export-options=export-minimal",
"--export-secret-key", keyhandle])
return self._gpg_out(args, strict=True, encoding=None)
def encrypt(self, data, recipients, signkey=None, text=False):
opts = self._nopassphrase + ["--encrypt", "--always-trust"]
for r in recipients:
opts.extend(["-r", r])
if signkey:
opts.extend(["--sign", "-u", signkey])
if text:
opts.extend(["--armor"])
# print(opts)
return self._gpg_out(opts, input=data, encoding=None)
def sign(self, data, keyhandle):
args = self._nopassphrase + ["--detach-sign", "-u", keyhandle]
return self._gpg_out(args, input=data, encoding=None)
def verify(self, data, signature):
with self.temp_written_file(signature) as sig_fn:
out, err = self._gpg_outerr(["--verify", sig_fn, "-"], input=data)
return self._find_keyhandle(err)
def decrypt(self, enc_data):
args = self._nopassphrase + ["--with-colons", "--decrypt"]
out, err = self._gpg_outerr(args, input=enc_data, encoding=None)
lines = err.splitlines()
keyinfos = []
while lines:
line1 = lines.pop(0)
m = re.match(r"gpg.*with (\d+)-bit (\w+).*"
r"ID (\w+).*created (.*)", line1)
if m:
bits, keytype, id, date = m.groups()
line2 = lines.pop(0)
if line2.startswith(" "):
uid = line2.strip().strip('"')
keyinfos.append(KeyInfo(keytype, bits, id, uid, date))
return out, keyinfos
def import_keydata(self, keydata, minimize=False):
out, err = self._gpg_outerr(["--skip-verify", "--import"], input=keydata)
kh = self._find_keyhandle(err)
if minimize:
# get_public_keydata gets us a minimized key
minimized_keydata = self.get_public_keydata(kh)
self._gpg_outerr(["--yes", "--delete-key", kh])
_, err = self._gpg_outerr(["--skip-verify", "--import"], input=minimized_keydata)
min_kh = self._find_keyhandle(err)
assert min_kh == kh
return kh
class KeyInfo:
def __init__(self, type, bits, id, uid, date_created):
self.type = type
self.bits = int(bits)
self.id = id
self.uids = [uid] if uid else []
self.date_created = date_created
def match(self, other_id):
i = min(len(other_id), len(self.id))
return self.id[-i:].lower() == other_id[-i:].lower()
def __str__(self):
return "KeyInfo(id={id!r}, uids={uids!r}, bits={bits}, type={type})".format(
**self.__dict__)
__repr__ = __str__
[docs]def find_executable(name):
""" return a path object found by looking at the systems
underlying PATH specification. If an executable
cannot be found, None is returned. copied and adapted
from py.path.local.sysfind.
"""
if os.path.isabs(name):
return name if os.path.isfile(name) else None
else:
if iswin32:
paths = os.environ['Path'].split(';')
if '' not in paths and '.' not in paths:
paths.append('.')
try:
systemroot = os.environ['SYSTEMROOT']
except KeyError:
pass
else:
paths = [re.sub('%SystemRoot%', systemroot, path)
for path in paths]
else:
paths = os.environ['PATH'].split(':')
tryadd = []
if iswin32:
tryadd += os.environ['PATHEXT'].split(os.pathsep)
tryadd.append("")
for x in paths:
for addext in tryadd:
p = os.path.join(x, name) + addext
try:
if os.path.isfile(p):
return p
except Exception:
pass
return None