12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017 |
- """ A wrapper for the 'gpg' command::
- Portions of this module are derived from A.M. Kuchling's well-designed
- GPG.py, using Richard Jones' updated version 1.3, which can be found
- in the pycrypto CVS repository on Sourceforge:
- http://pycrypto.cvs.sourceforge.net/viewvc/pycrypto/gpg/GPG.py
- This module is *not* forward-compatible with amk's; some of the
- old interface has changed. For instance, since I've added decrypt
- functionality, I elected to initialize with a 'gnupghome' argument
- instead of 'keyring', so that gpg can find both the public and secret
- keyrings. I've also altered some of the returned objects in order for
- the caller to not have to know as much about the internals of the
- result classes.
- While the rest of ISconf is released under the GPL, I am releasing
- this single file under the same terms that A.M. Kuchling used for
- pycrypto.
- Steve Traugott, stevegt@terraluna.org
- Thu Jun 23 21:27:20 PDT 2005
- This version of the module has been modified from Steve Traugott's version
- (see http://trac.t7a.org/isconf/browser/trunk/lib/python/isconf/GPG.py) by
- Vinay Sajip to make use of the subprocess module (Steve's version uses os.fork()
- and so does not work on Windows). Renamed to gnupg.py to avoid confusion with
- the previous versions.
- Modifications Copyright (C) 2008-2012 Vinay Sajip. All rights reserved.
- A unittest harness (test_gnupg.py) has also been added.
- """
- import locale
- __version__ = "0.2.9"
- __author__ = "Vinay Sajip"
- __date__ = "$29-Mar-2012 21:12:58$"
- try:
- from io import StringIO
- except ImportError:
- from cStringIO import StringIO
- import codecs
- import locale
- import logging
- import os
- import socket
- from subprocess import Popen
- from subprocess import PIPE
- import sys
- import threading
# NullHandler is a class inside the logging package, not a submodule, so it
# must be imported with "from ... import"; the fallback class covers old
# interpreters whose logging package does not provide it.
try:
    from logging import NullHandler
except ImportError:
    class NullHandler(logging.Handler):
        """Handler that silently discards every record."""

        def handle(self, record):
            pass

# The builtin name ``unicode`` only exists on Python 2; use its absence to
# detect Python 3.
try:
    unicode
    _py3k = False
except NameError:
    _py3k = True

logger = logging.getLogger(__name__)
# Install a do-nothing handler unless the application already attached one,
# so that importing this module never configures output by itself.
if not logger.handlers:
    logger.addHandler(NullHandler())
def _copy_data(instream, outstream):
    """Copy instream to outstream in 1024-unit chunks, then close outstream.

    instream  -- object with a read(size) method.
    outstream -- object with write(data) and close() methods.

    If a write raises UnicodeError the chunk is encoded (using the encoding
    of sys.stdin, or ASCII when that is unavailable) and written again. Any
    other write error is logged and ends the copy early. An IOError raised
    while closing outstream is logged and otherwise ignored.
    """
    sent = 0
    # sys.stdin can be missing or can report an encoding of None; in both
    # cases fall back to ASCII instead of failing later in encode().
    enc = getattr(sys.stdin, 'encoding', None) or 'ascii'
    while True:
        data = instream.read(1024)
        if not data:
            break
        sent += len(data)
        logger.debug("sending chunk (%d): %r", sent, data[:256])
        try:
            outstream.write(data)
        except UnicodeError:
            outstream.write(data.encode(enc))
        except Exception:
            # Can sometimes get 'broken pipe' errors even when the data has
            # all been sent. Only Exception is caught so that
            # KeyboardInterrupt and SystemExit still propagate.
            logger.exception('Error sending data')
            break
    try:
        outstream.close()
    except IOError:
        logger.warning('Exception occurred while closing: ignored', exc_info=1)
    logger.debug("closed output, %d bytes sent", sent)
def _threaded_copy_data(instream, outstream):
    """Start a daemon thread running _copy_data(instream, outstream).

    Returns the started threading.Thread so the caller can join() it.
    """
    wr = threading.Thread(target=_copy_data, args=(instream, outstream))
    # Use the attribute rather than the deprecated setDaemon() method.
    wr.daemon = True
    logger.debug('data copier: %r, %r, %r', wr, instream, outstream)
    wr.start()
    return wr
def _write_passphrase(stream, passphrase, encoding):
    """Write passphrase, followed by a newline, to stream as encoded bytes.

    stream     -- object with a write() method that accepts bytes.
    passphrase -- the passphrase; it is formatted with '%s'.
    encoding   -- name of the codec used to encode the line.
    """
    stream.write(('%s\n' % passphrase).encode(encoding))
    # The passphrase itself is deliberately kept out of the log.
    logger.debug("Wrote passphrase")
def _is_sequence(instance):
    """Return True if instance is a list or a tuple, False otherwise."""
    return isinstance(instance, (list, tuple))
def _make_binary_stream(s, encoding):
    """Return a file-like object from which the contents of s can be read.

    s        -- text or binary data. Text is encoded first; anything else
                is passed through unchanged.
    encoding -- name of the codec used to encode text.

    The result is an io.BytesIO, or the module-level StringIO class when the
    io module cannot be imported.
    """
    # Text has an encode() method and is not a bytes instance; this holds
    # for str on Python 3 and unicode on Python 2, and leaves byte strings
    # and objects such as bytearray untouched.
    if hasattr(s, 'encode') and not isinstance(s, bytes):
        s = s.encode(encoding)
    try:
        from io import BytesIO
    except ImportError:
        return StringIO(s)
    return BytesIO(s)
class Verify(object):
    "Handle status messages for --verify"

    # Status keywords that are accepted but carry nothing this class records.
    _IGNORED = ("TRUST_UNDEFINED", "TRUST_NEVER", "TRUST_MARGINAL",
                "TRUST_FULLY", "TRUST_ULTIMATE", "RSA_OR_IDEA", "NODATA",
                "IMPORT_RES", "PLAINTEXT", "PLAINTEXT_LENGTH",
                "POLICY_URL", "DECRYPTION_INFO", "DECRYPTION_OKAY",
                "IMPORTED")

    def __init__(self, gpg):
        """Create an empty result bound to the GPG instance gpg.

        Every attribute that handle_status() may set starts as None (valid
        starts as False), so all of them can be read even when the
        corresponding status line never arrives.
        """
        self.gpg = gpg
        self.valid = False
        self.status = None
        self.fingerprint = self.creation_date = self.timestamp = None
        self.signature_id = self.key_id = None
        self.username = None
        self.sig_timestamp = self.expire_timestamp = None
        self.pubkey_fingerprint = None

    def __nonzero__(self):
        """Truth value of the result: the value of the valid flag."""
        return self.valid

    __bool__ = __nonzero__

    def handle_status(self, key, value):
        """Record one status line.

        key   -- the status keyword.
        value -- the remainder of the status line ('' if there is none).

        Raises ValueError for a keyword this class does not know about.
        """
        if key in self._IGNORED:
            pass
        elif key == "BADSIG":
            self.valid = False
            self.status = 'signature bad'
            self.key_id, self.username = value.split(None, 1)
        elif key == "GOODSIG":
            self.valid = True
            self.status = 'signature good'
            self.key_id, self.username = value.split(None, 1)
        elif key == "VALIDSIG":
            fields = value.split()
            (self.fingerprint,
             self.creation_date,
             self.sig_timestamp,
             self.expire_timestamp) = fields[:4]
            # may be different if signature is made with a subkey
            self.pubkey_fingerprint = fields[-1]
            self.status = 'signature valid'
        elif key == "SIG_ID":
            (self.signature_id,
             self.creation_date, self.timestamp) = value.split()
        elif key == "ERRSIG":
            self.valid = False
            # Only the key id and the timestamp are kept; the three fields
            # in between are unpacked and discarded.
            (self.key_id,
             algo, hash_algo,
             cls,
             self.timestamp) = value.split()[:5]
            self.status = 'signature error'
        elif key == "DECRYPTION_FAILED":
            self.valid = False
            self.key_id = value
            self.status = 'decryption failed'
        elif key == "NO_PUBKEY":
            self.valid = False
            self.key_id = value
            self.status = 'no public key'
        elif key in ("KEYEXPIRED", "SIGEXPIRED"):
            # these are useless in verify, since they are spit out for any
            # pub/subkeys on the key, not just the one doing the signing.
            # if we want to check for signatures with expired key,
            # the relevant flag is EXPKEYSIG.
            pass
        elif key in ("EXPKEYSIG", "REVKEYSIG"):
            # signed with expired or revoked key
            self.valid = False
            self.key_id = value.split()[0]
            # "EXPKEYSIG" -> 'exp keysig', "REVKEYSIG" -> 'rev keysig'
            self.status = (('%s %s') % (key[:3], key[3:])).lower()
        else:
            raise ValueError("Unknown status message: %r" % key)
class ImportResult(object):
    "Handle status messages for --import"

    # Names of the counters filled in, in this order, from the fields of an
    # IMPORT_RES status line.
    counts = '''count no_user_id imported imported_rsa unchanged
            n_uids n_subk n_sigs n_revoc sec_read sec_imported
            sec_dups not_imported'''.split()

    def __init__(self, gpg):
        """Create an empty result bound to the GPG instance gpg.

        Every counter named in ``counts`` starts as None and stays None
        until an IMPORT_RES status line has been handled.
        """
        self.gpg = gpg
        self.results = []
        self.fingerprints = []
        for result in self.counts:
            setattr(self, result, None)

    def __nonzero__(self):
        """True if at least one fingerprint was recorded and the
        not_imported counter is unset or zero."""
        if self.not_imported: return False
        if not self.fingerprints: return False
        return True

    __bool__ = __nonzero__

    # Text for the reason code of IMPORT_OK. Codes other than '0' are bit
    # flags that may be combined in one reason value.
    ok_reason = {
        '0': 'Not actually changed',
        '1': 'Entirely new key',
        '2': 'New user IDs',
        '4': 'New signatures',
        '8': 'New subkeys',
        '16': 'Contains private key',
    }

    # Text for the reason code of IMPORT_PROBLEM.
    problem_reason = {
        '0': 'No specific reason given',
        '1': 'Invalid Certificate',
        '2': 'Issuer Certificate missing',
        '3': 'Certificate Chain too long',
        '4': 'Error storing certificate',
    }

    def handle_status(self, key, value):
        """Record one status line.

        key   -- the status keyword.
        value -- the remainder of the status line ('' if there is none).

        Raises ValueError for a keyword this class does not know about.
        """
        if key == "IMPORTED":
            # this duplicates info we already see in import_ok & import_problem
            pass
        elif key == "NODATA":
            self.results.append({'fingerprint': None,
                'problem': '0', 'text': 'No valid data found'})
        elif key == "IMPORT_OK":
            reason, fingerprint = value.split()
            flags = int(reason)
            if flags == 0:
                # Zero is a value of its own, not a flag: it must not be
                # reported together with any of the flag texts.
                reasons = [self.ok_reason['0']]
            else:
                # Walk the flags in ascending numeric order so that the
                # text does not depend on dictionary ordering.
                bits = sorted(int(code) for code in self.ok_reason
                              if code != '0')
                reasons = [self.ok_reason[str(bit)] for bit in bits
                           if flags & bit == bit]
            reasontext = '\n'.join(reasons) + "\n"
            self.results.append({'fingerprint': fingerprint,
                'ok': reason, 'text': reasontext})
            self.fingerprints.append(fingerprint)
        elif key == "IMPORT_PROBLEM":
            try:
                reason, fingerprint = value.split()
            except ValueError:
                # No fingerprint on the line (or an unexpected layout).
                reason = value
                fingerprint = '<unknown>'
            text = self.problem_reason.get(reason,
                                           "Unknown error: %r" % reason)
            self.results.append({'fingerprint': fingerprint,
                'problem': reason, 'text': text})
        elif key == "IMPORT_RES":
            # zip() stops at the shorter sequence: extra fields are ignored
            # and counters without a field keep their previous value.
            for name, count in zip(self.counts, value.split()):
                setattr(self, name, int(count))
        elif key == "KEYEXPIRED":
            self.results.append({'fingerprint': None,
                'problem': '0', 'text': 'Key expired'})
        elif key == "SIGEXPIRED":
            self.results.append({'fingerprint': None,
                'problem': '0', 'text': 'Signature expired'})
        else:
            raise ValueError("Unknown status message: %r" % key)

    def summary(self):
        """Return a string such as '1 imported, 2 not imported'.

        A counter that is still None is reported as 0.
        """
        l = ['%d imported' % (self.imported or 0)]
        if self.not_imported:
            l.append('%d not imported' % self.not_imported)
        return ', '.join(l)
class ListKeys(list):
    ''' Collect the records produced by --list-keys.

    The object is a list of dictionaries, one per 'pub' or 'sec' record;
    'uid' and 'fpr' records are attached to the most recent key.

    Record types that are not handled (info from src/DETAILS):

        crt = X.509 certificate
        crs = X.509 certificate and private key available
        sub = subkey (secondary key)
        ssb = secret subkey (secondary key)
        uat = user attribute (same as user id except for field 10).
        sig = signature
        rev = revocation signature
        pkd = public key data (special field format, see below)
        grp = reserved for gpgsm
        rvk = revocation key
    '''

    def __init__(self, gpg):
        # No key record has been seen yet.
        self.curkey = None
        self.gpg = gpg
        self.uids = []
        self.fingerprints = []

    def key(self, args):
        """Start a new key from the fields of a 'pub' or 'sec' record."""
        field_names = ('type', 'trust', 'length', 'algo', 'keyid', 'date',
                       'expires', 'dummy', 'ownertrust', 'uid')
        self.curkey = record = {}
        for position, name in enumerate(field_names):
            record[name] = args[position]
        # The user id found on the key record itself, if any, becomes the
        # first entry of the key's 'uids' list.
        primary_uid = record.pop('uid')
        record['uids'] = [primary_uid] if primary_uid else []
        self.append(record)

    pub = sec = key

    def fpr(self, args):
        """Store the fingerprint (field 9) on the current key."""
        fingerprint = args[9]
        self.curkey['fingerprint'] = fingerprint
        self.fingerprints.append(fingerprint)

    def uid(self, args):
        """Add a user id (field 9) to the current key."""
        user_id = args[9]
        self.curkey['uids'].append(user_id)
        self.uids.append(user_id)

    def handle_status(self, key, value):
        """Status lines are ignored for key listings."""
        pass
class Crypt(Verify):
    "Handle status messages for --encrypt and --decrypt"

    # Keywords accepted without recording anything. In the case of ERROR,
    # this is because a more specific error message will have come first.
    _SILENT = ("ENC_TO", "USERID_HINT", "GOODMDC", "END_DECRYPTION",
               "BEGIN_SIGNING", "NO_SECKEY", "ERROR", "NODATA",
               "CARDCTRL")

    # Keywords whose own name, lower-cased and with underscores turned into
    # spaces, becomes the status text.
    _NAME_IS_STATUS = ("NEED_PASSPHRASE", "BAD_PASSPHRASE", "GOOD_PASSPHRASE",
                       "MISSING_PASSPHRASE", "DECRYPTION_FAILED",
                       "KEY_NOT_CREATED")

    # Keywords mapped to a fixed status text.
    _FIXED_STATUS = {
        "NEED_PASSPHRASE_SYM": 'need symmetric passphrase',
        "BEGIN_DECRYPTION": 'decryption incomplete',
        "BEGIN_ENCRYPTION": 'encryption incomplete',
        "INV_RECP": 'invalid recipient',
        "KEYEXPIRED": 'key expired',
        "SIG_CREATED": 'sig created',
        "SIGEXPIRED": 'sig expired',
    }

    # Keywords that set a fixed status text and also mark the result as ok.
    _SUCCESS_STATUS = {
        "DECRYPTION_OKAY": 'decryption ok',
        "END_ENCRYPTION": 'encryption ok',
    }

    def __init__(self, gpg):
        Verify.__init__(self, gpg)
        self.data = ''
        self.ok = False
        self.status = ''

    def __nonzero__(self):
        """Truth value of the result: whether the ok flag is set."""
        return bool(self.ok)

    __bool__ = __nonzero__

    def __str__(self):
        """Return the collected data decoded with the GPG object's encoding
        and error mode."""
        gpg = self.gpg
        return self.data.decode(gpg.encoding, gpg.decode_errors)

    def handle_status(self, key, value):
        """Record one status line; keywords not handled here are passed on
        to Verify.handle_status()."""
        if key in self._SILENT:
            return
        if key in self._NAME_IS_STATUS:
            self.status = key.replace("_", " ").lower()
            return
        if key in self._FIXED_STATUS:
            self.status = self._FIXED_STATUS[key]
            return
        if key in self._SUCCESS_STATUS:
            self.status = self._SUCCESS_STATUS[key]
            self.ok = True
            return
        Verify.handle_status(self, key, value)
class GenKey(object):
    "Handle status messages for --gen-key"

    def __init__(self, gpg):
        self.fingerprint = None
        self.type = None
        self.gpg = gpg

    def __nonzero__(self):
        """Truth value of the result: whether a fingerprint was recorded."""
        return bool(self.fingerprint)

    __bool__ = __nonzero__

    def __str__(self):
        """Return the fingerprint, or '' if there is none."""
        if self.fingerprint:
            return self.fingerprint
        return ''

    def handle_status(self, key, value):
        """Record one status line.

        KEY_CREATED stores the two whitespace-separated fields of value as
        type and fingerprint; PROGRESS, GOOD_PASSPHRASE and NODATA are
        ignored; any other keyword raises ValueError.
        """
        if key == "KEY_CREATED":
            key_type, fingerprint = value.split()
            self.type = key_type
            self.fingerprint = fingerprint
        elif key not in ("PROGRESS", "GOOD_PASSPHRASE", "NODATA"):
            raise ValueError("Unknown status message: %r" % key)
class DeleteResult(object):
    "Handle status messages for --delete-key and --delete-secret-key"

    # Text for the reason code carried by a DELETE_PROBLEM status line.
    problem_reason = {
        '1': 'No such key',
        '2': 'Must delete secret key first',
        '3': 'Ambigious specification',
    }

    def __init__(self, gpg):
        # The status stays 'ok' unless a DELETE_PROBLEM line is handled.
        self.status = 'ok'
        self.gpg = gpg

    def __str__(self):
        """Return the current status text."""
        return self.status

    def handle_status(self, key, value):
        """Turn a DELETE_PROBLEM line into a status text.

        Any other keyword raises ValueError.
        """
        if key != "DELETE_PROBLEM":
            raise ValueError("Unknown status message: %r" % key)
        fallback = "Unknown error: %r" % value
        self.status = self.problem_reason.get(value, fallback)
class Sign(object):
    "Handle status messages for --sign"

    def __init__(self, gpg):
        """Create an empty result bound to the GPG instance gpg.

        All attributes exist from the start, so a result for which no
        signature was created can still be inspected and converted to a
        string.
        """
        self.gpg = gpg
        self.type = None
        self.fingerprint = None
        self.timestamp = None
        # Filled in by the caller with the output read from gpg.
        self.data = ''

    def __nonzero__(self):
        """Truth value of the result: whether a fingerprint was recorded."""
        return self.fingerprint is not None

    __bool__ = __nonzero__

    def __str__(self):
        """Return the collected data decoded with the GPG object's encoding
        and error mode, or '' if there is no data."""
        if not self.data:
            return ''
        return self.data.decode(self.gpg.encoding, self.gpg.decode_errors)

    def handle_status(self, key, value):
        """Record one status line.

        SIG_CREATED must carry exactly six whitespace-separated fields; the
        first, fifth and sixth are stored as type, timestamp and
        fingerprint. Any keyword not listed below raises ValueError.
        """
        if key in ("USERID_HINT", "NEED_PASSPHRASE", "BAD_PASSPHRASE",
                   "GOOD_PASSPHRASE", "BEGIN_SIGNING", "CARDCTRL"):
            pass
        elif key == "SIG_CREATED":
            (self.type,
             algo, hashalgo, cls,
             self.timestamp, self.fingerprint
             ) = value.split()
        else:
            raise ValueError("Unknown status message: %r" % key)
class GPG(object):
    "Encapsulate access to the gpg executable"

    # Error mode used whenever output read from gpg is decoded to text.
    decode_errors = 'strict'

    # Maps an operation name to the class collecting its status messages.
    result_map = {
        'crypt': Crypt,
        'delete': DeleteResult,
        'generate': GenKey,
        'import': ImportResult,
        'list': ListKeys,
        'sign': Sign,
        'verify': Verify,
    }

    def __init__(self, gpgbinary='gpg', gnupghome=None, verbose=False,
                 use_agent=False, keyring=None):
        """Initialize a GPG process wrapper.  Options are:

        gpgbinary -- full pathname for GPG binary.

        gnupghome -- full pathname to where we can find the public and
        private keyrings.  Default is whatever gpg defaults to. The
        directory is created if it does not exist.

        verbose -- if true, print each command and each line gpg writes
        to stderr.

        use_agent -- if true, pass --use-agent to gpg.

        keyring -- name of alternative keyring file to use. If specified,
        the default keyring is not used.

        Runs "gpg --version" once and raises ValueError if it exits with a
        non-zero return code.
        """
        self.gpgbinary = gpgbinary
        self.gnupghome = gnupghome
        self.keyring = keyring
        self.verbose = verbose
        self.use_agent = use_agent
        self.encoding = locale.getpreferredencoding()
        if self.encoding is None: # This happens on Jython!
            self.encoding = sys.stdin.encoding
        if gnupghome and not os.path.isdir(self.gnupghome):
            # 0x1C0 == 0700 octal: accessible by the owner only.
            os.makedirs(self.gnupghome, 0x1C0)
        p = self._open_subprocess(["--version"])
        result = self.result_map['verify'](self) # any result will do for this
        self._collect_output(p, result, stdin=p.stdin)
        if p.returncode != 0:
            raise ValueError("Error invoking gpg: %s: %s" % (p.returncode,
                                                             result.stderr))

    def _open_subprocess(self, args, passphrase=False):
        """Start gpg and return the subprocess.Popen object.

        args       -- list of command-line arguments, ONE ARGUMENT PER
                      ELEMENT, appended after the standard options.
        passphrase -- if true, gpg is told to read the passphrase from its
                      standard input.

        The command is run without a shell, so argument values (key ids,
        recipients, file names, ...) reach gpg verbatim and are never
        interpreted as shell syntax.
        """
        cmd = [self.gpgbinary, '--status-fd', '2', '--no-tty']
        if self.gnupghome:
            cmd.extend(['--homedir', self.gnupghome])
        if self.keyring:
            cmd.extend(['--no-default-keyring', '--keyring', self.keyring])
        if passphrase:
            cmd.extend(['--batch', '--passphrase-fd', '0'])
        if self.use_agent:
            cmd.append('--use-agent')
        cmd.extend(args)
        if self.verbose:
            print(' '.join(cmd))
        logger.debug("%s", cmd)
        return Popen(cmd, shell=False, stdin=PIPE, stdout=PIPE, stderr=PIPE)

    def _read_response(self, stream, result):
        """Read gpg's stderr from stream until end of file.

        Lines that begin with the magic '[GNUPG:] ' prefix are split into a
        keyword and the remainder of the line, and passed to
        result.handle_status(). All lines read, of either kind, are stored
        joined together as result.stderr.
        """
        lines = []
        while True:
            line = stream.readline()
            if len(line) == 0:
                break
            lines.append(line)
            line = line.rstrip()
            if self.verbose:
                print(line)
            logger.debug("%s", line)
            if line[0:9] == '[GNUPG:] ':
                # Chop off the prefix
                line = line[9:]
                L = line.split(None, 1)
                keyword = L[0]
                if len(L) > 1:
                    value = L[1]
                else:
                    value = ""
                result.handle_status(keyword, value)
        result.stderr = ''.join(lines)

    def _read_data(self, stream, result):
        """Read gpg's stdout from stream until end of file and store the
        concatenated chunks as result.data."""
        chunks = []
        while True:
            data = stream.read(1024)
            if len(data) == 0:
                break
            logger.debug("chunk: %r", data[:256])
            chunks.append(data)
        # 'data' is now the empty value that ended the loop; join with an
        # empty value of the same type (b'' or '', as appropriate).
        result.data = type(data)().join(chunks)

    def _collect_output(self, process, result, writer=None, stdin=None):
        """
        Drain the subprocesses output streams, writing the collected output
        to the result. If a writer thread (writing to the subprocess) is given,
        make sure it's joined before returning. If a stdin stream is given,
        close it before returning.
        """
        stderr = codecs.getreader(self.encoding)(process.stderr)
        rr = threading.Thread(target=self._read_response, args=(stderr, result))
        rr.daemon = True
        logger.debug('stderr reader: %r', rr)
        rr.start()

        stdout = process.stdout
        dr = threading.Thread(target=self._read_data, args=(stdout, result))
        dr.daemon = True
        logger.debug('stdout reader: %r', dr)
        dr.start()

        dr.join()
        rr.join()
        if writer is not None:
            writer.join()
        process.wait()
        if stdin is not None:
            try:
                stdin.close()
            except IOError:
                pass
        stderr.close()
        stdout.close()

    def _handle_io(self, args, file, result, passphrase=None, binary=False):
        """Handle a call to GPG - pass input data, collect output data.

        args       -- list of gpg arguments, one argument per element.
        file       -- file-like object whose contents are sent to gpg.
        result     -- result object that receives status and output.
        passphrase -- if not None, written to gpg before the data.
        binary     -- if false, the data is written through a codec writer
                      for self.encoding instead of the raw pipe.

        Returns result. Garbage In, Garbage Out :)
        """
        p = self._open_subprocess(args, passphrase is not None)
        if not binary:
            stdin = codecs.getwriter(self.encoding)(p.stdin)
        else:
            stdin = p.stdin
        if passphrase is not None:
            # gpg was started with --passphrase-fd 0 and therefore takes
            # the first input line as the passphrase - even an empty one.
            # The line is already encoded, so it goes to the raw pipe.
            _write_passphrase(p.stdin, passphrase, self.encoding)
        writer = _threaded_copy_data(file, stdin)
        self._collect_output(p, result, writer, stdin)
        return result

    #
    # SIGNATURE METHODS
    #
    def sign(self, message, **kwargs):
        """Sign the string 'message'; keyword arguments are passed on to
        sign_file()."""
        f = _make_binary_stream(message, self.encoding)
        try:
            return self.sign_file(f, **kwargs)
        finally:
            f.close()

    def sign_file(self, file, keyid=None, passphrase=None, clearsign=True,
                  detach=False, binary=False):
        """Sign the contents of the file-like object 'file'.

        keyid      -- key to sign with (passed as --default-key).
        passphrase -- passphrase for the key, if any.
        clearsign  -- make a clear-text signature (ignored if detach).
        detach     -- make a detached signature.
        binary     -- if true, do not ASCII-armor the output.

        Returns a result object of the class registered as
        result_map['sign'].
        """
        logger.debug("sign_file: %s", file)
        if binary:
            args = ['-s']
        else:
            args = ['-sa']
        # You can't specify detach-sign and clearsign together: gpg ignores
        # the detach-sign in that case.
        if detach:
            args.append("--detach-sign")
        elif clearsign:
            args.append("--clearsign")
        if keyid:
            args.extend(['--default-key', keyid])
        args.extend(['--no-version', '--comment', ''])

        result = self.result_map['sign'](self)
        #We could use _handle_io here except for the fact that if the
        #passphrase is bad, gpg bails and you can't write the message.
        p = self._open_subprocess(args, passphrase is not None)
        try:
            stdin = p.stdin
            if passphrase is not None:
                _write_passphrase(stdin, passphrase, self.encoding)
            writer = _threaded_copy_data(file, stdin)
        except IOError:
            logger.exception("error writing message")
            writer = None
        self._collect_output(p, result, writer, stdin)
        return result

    def verify(self, data):
        """Verify the signature on the contents of the string 'data'

        >>> gpg = GPG(gnupghome="keys")
        >>> input = gpg.gen_key_input(Passphrase='foo')
        >>> key = gpg.gen_key(input)
        >>> assert key
        >>> sig = gpg.sign('hello',keyid=key.fingerprint,passphrase='bar')
        >>> assert not sig
        >>> sig = gpg.sign('hello',keyid=key.fingerprint,passphrase='foo')
        >>> assert sig
        >>> verify = gpg.verify(sig.data)
        >>> assert verify

        """
        f = _make_binary_stream(data, self.encoding)
        try:
            return self.verify_file(f)
        finally:
            f.close()

    def verify_file(self, file, data_filename=None):
        """Verify the signature on the contents of the file-like object 'file'

        If data_filename is given, 'file' holds a detached signature for
        the data stored in the file of that name; 'file' is then read in
        full and closed.
        """
        logger.debug('verify_file: %r, %r', file, data_filename)
        result = self.result_map['verify'](self)
        args = ['--verify']
        if data_filename is None:
            self._handle_io(args, file, result, binary=True)
        else:
            logger.debug('Handling detached verification')
            import tempfile
            fd, fn = tempfile.mkstemp(prefix='pygpg')
            try:
                # Copy the signature into the temporary file, making sure
                # the descriptor is closed even if reading or writing fails.
                try:
                    s = file.read()
                    file.close()
                    os.write(fd, s)
                finally:
                    os.close(fd)
                logger.debug('Wrote to temp file: %r', s)
                args.append(fn)
                args.append(data_filename)
                p = self._open_subprocess(args)
                self._collect_output(p, result, stdin=p.stdin)
            finally:
                os.unlink(fn)
        return result

    #
    # KEY MANAGEMENT
    #

    def import_keys(self, key_data):
        """ import the key_data into our keyring

        >>> import shutil
        >>> shutil.rmtree("keys")
        >>> gpg = GPG(gnupghome="keys")
        >>> input = gpg.gen_key_input()
        >>> result = gpg.gen_key(input)
        >>> print1 = result.fingerprint
        >>> result = gpg.gen_key(input)
        >>> print2 = result.fingerprint
        >>> pubkey1 = gpg.export_keys(print1)
        >>> seckey1 = gpg.export_keys(print1,secret=True)
        >>> seckeys = gpg.list_keys(secret=True)
        >>> pubkeys = gpg.list_keys()
        >>> assert print1 in seckeys.fingerprints
        >>> assert print1 in pubkeys.fingerprints
        >>> str(gpg.delete_keys(print1))
        'Must delete secret key first'
        >>> str(gpg.delete_keys(print1,secret=True))
        'ok'
        >>> str(gpg.delete_keys(print1))
        'ok'
        >>> str(gpg.delete_keys("nosuchkey"))
        'No such key'
        >>> seckeys = gpg.list_keys(secret=True)
        >>> pubkeys = gpg.list_keys()
        >>> assert not print1 in seckeys.fingerprints
        >>> assert not print1 in pubkeys.fingerprints
        >>> result = gpg.import_keys('foo')
        >>> assert not result
        >>> result = gpg.import_keys(pubkey1)
        >>> pubkeys = gpg.list_keys()
        >>> seckeys = gpg.list_keys(secret=True)
        >>> assert not print1 in seckeys.fingerprints
        >>> assert print1 in pubkeys.fingerprints
        >>> result = gpg.import_keys(seckey1)
        >>> assert result
        >>> seckeys = gpg.list_keys(secret=True)
        >>> pubkeys = gpg.list_keys()
        >>> assert print1 in seckeys.fingerprints
        >>> assert print1 in pubkeys.fingerprints
        >>> assert print2 in pubkeys.fingerprints

        """
        result = self.result_map['import'](self)
        logger.debug('import_keys: %r', key_data[:256])
        data = _make_binary_stream(key_data, self.encoding)
        try:
            self._handle_io(['--import'], data, result, binary=True)
            logger.debug('import_keys result: %r', result.__dict__)
        finally:
            data.close()
        return result

    def recv_keys(self, keyserver, *keyids):
        """Import a key from a keyserver

        >>> import shutil
        >>> shutil.rmtree("keys")
        >>> gpg = GPG(gnupghome="keys")
        >>> result = gpg.recv_keys('pgp.mit.edu', '3FF0DB166A7476EA')
        >>> assert result

        """
        result = self.result_map['import'](self)
        logger.debug('recv_keys: %r', keyids)
        # gpg gets no input for this operation: send an empty stream.
        data = _make_binary_stream("", self.encoding)
        args = ['--keyserver', keyserver, '--recv-keys']
        args.extend(keyids)
        try:
            self._handle_io(args, data, result, binary=True)
            logger.debug('recv_keys result: %r', result.__dict__)
        finally:
            data.close()
        return result

    def delete_keys(self, fingerprints, secret=False):
        """Delete the indicated keys.

        fingerprints -- one key specification, or a list/tuple of them;
                        each one is passed to gpg as its own argument.
        secret       -- if true, delete secret keys instead of public keys.

        Returns a result object whose string value is 'ok' or a description
        of the problem.
        """
        which = 'key'
        if secret:
            which = 'secret-key'
        args = ['--batch', '--delete-%s' % which]
        if _is_sequence(fingerprints):
            args.extend(fingerprints)
        else:
            args.append(fingerprints)
        result = self.result_map['delete'](self)
        p = self._open_subprocess(args)
        self._collect_output(p, result, stdin=p.stdin)
        return result

    def export_keys(self, keyids, secret=False):
        """export the indicated keys. 'keyid' is anything gpg accepts

        keyids -- one key specification, or a list/tuple of them; each one
                  is passed to gpg as its own argument. An empty value adds
                  no argument at all.
        secret -- if true, export secret keys instead of public keys.

        Returns the ASCII-armored output of gpg as text.
        """
        which = ''
        if secret:
            which = '-secret-key'
        args = ['--armor', '--export%s' % which]
        if _is_sequence(keyids):
            args.extend(keyids)
        elif keyids:
            args.append(keyids)
        p = self._open_subprocess(args)
        # gpg --export produces no status-fd output; stdout will be
        # empty in case of failure
        result = self.result_map['delete'](self) # any result will do
        self._collect_output(p, result, stdin=p.stdin)
        logger.debug('export_keys result: %r', result.data)
        return result.data.decode(self.encoding, self.decode_errors)

    def list_keys(self, secret=False):
        """ list the keys currently in the keyring

        >>> import shutil
        >>> shutil.rmtree("keys")
        >>> gpg = GPG(gnupghome="keys")
        >>> input = gpg.gen_key_input()
        >>> result = gpg.gen_key(input)
        >>> print1 = result.fingerprint
        >>> result = gpg.gen_key(input)
        >>> print2 = result.fingerprint
        >>> pubkeys = gpg.list_keys()
        >>> assert print1 in pubkeys.fingerprints
        >>> assert print2 in pubkeys.fingerprints

        """
        which = 'keys'
        if secret:
            which = 'secret-keys'
        args = ['--list-%s' % which, '--fixed-list-mode', '--fingerprint',
                '--with-colons']
        p = self._open_subprocess(args)

        # there might be some status thingumy here I should handle... (amk)
        # ...nope, unless you care about expired sigs or keys (stevegt)

        # Get the response information
        result = self.result_map['list'](self)
        self._collect_output(p, result, stdin=p.stdin)
        lines = result.data.decode(self.encoding,
                                   self.decode_errors).splitlines()
        valid_keywords = 'pub uid sec fpr'.split()
        for line in lines:
            if self.verbose:
                print(line)
            logger.debug("line: %r", line.rstrip())
            if not line:
                break
            L = line.strip().split(':')
            if not L:
                continue
            keyword = L[0]
            if keyword in valid_keywords:
                # Dispatch to result.pub / result.uid / result.sec /
                # result.fpr with the colon-separated fields.
                getattr(result, keyword)(L)
        return result

    def gen_key(self, input):
        """Generate a key; you might use gen_key_input() to create the
        control input.

        >>> gpg = GPG(gnupghome="keys")
        >>> input = gpg.gen_key_input()
        >>> result = gpg.gen_key(input)
        >>> assert result
        >>> result = gpg.gen_key('foo')
        >>> assert not result

        """
        args = ['--gen-key', '--batch']
        result = self.result_map['generate'](self)
        f = _make_binary_stream(input, self.encoding)
        try:
            self._handle_io(args, f, result, binary=True)
        finally:
            f.close()
        return result

    def gen_key_input(self, **kwargs):
        """
        Generate --gen-key input per gpg doc/DETAILS

        Each keyword argument becomes one "Name: value" line; underscores
        in the argument name are replaced by dashes and the name is
        title-cased (name_email -> Name-Email). Defaults are supplied for
        Key-Type, Key-Length, Name-Real, Name-Comment and Name-Email.
        """
        parms = {}
        for key, val in list(kwargs.items()):
            key = key.replace('_','-').title()
            parms[key] = val
        parms.setdefault('Key-Type','RSA')
        parms.setdefault('Key-Length',1024)
        parms.setdefault('Name-Real', "Autogenerated Key")
        parms.setdefault('Name-Comment', "Generated by gnupg.py")
        # Neither variable is guaranteed to be set (e.g. in a stripped-down
        # service environment), so fall back to a fixed placeholder.
        logname = (os.environ.get('LOGNAME') or os.environ.get('USERNAME')
                   or 'unspecified')
        hostname = socket.gethostname()
        parms.setdefault('Name-Email', "%s@%s" % (logname.replace(' ', '_'),
                                                  hostname))
        # Key-Type is always emitted first.
        lines = ["Key-Type: %s" % parms.pop('Key-Type')]
        for key, val in list(parms.items()):
            lines.append("%s: %s" % (key, val))
        lines.append("%commit")
        return '\n'.join(lines) + '\n'

    # Examples of key-generation input:
    #
    # Key-Type: RSA
    # Key-Length: 1024
    # Name-Real: ISdlink Server on %s
    # Name-Comment: Created by %s
    # Name-Email: isdlink@%s
    # Expire-Date: 0
    # %commit
    #
    #
    # Key-Type: DSA
    # Key-Length: 1024
    # Subkey-Type: ELG-E
    # Subkey-Length: 1024
    # Name-Real: Joe Tester
    # Name-Comment: with stupid passphrase
    # Name-Email: joe@foo.bar
    # Expire-Date: 0
    # Passphrase: abc
    # %pubring foo.pub
    # %secring foo.sec
    # %commit

    #
    # ENCRYPTION
    #
    def encrypt_file(self, file, recipients, sign=None,
                     always_trust=False, passphrase=None,
                     armor=True, output=None, symmetric=False):
        """Encrypt the message read from the file-like object 'file'

        recipients   -- one recipient, or a list/tuple of them (not used
                        when symmetric is true).
        sign         -- if given, also sign, using this key as default key.
        always_trust -- if true, pass --always-trust to gpg.
        passphrase   -- passphrase to send to gpg, if any.
        armor        -- create ascii-armored output; set to False for
                        binary output.
        output       -- if given, name of the file gpg writes the output
                        to; an existing file of that name is removed first.
        symmetric    -- if true, use --symmetric instead of --encrypt.
        """
        args = ['--no-version', '--comment', '']
        if symmetric:
            args.append('--symmetric')
        else:
            args.append('--encrypt')
            if not _is_sequence(recipients):
                recipients = (recipients,)
            for recipient in recipients:
                args.extend(['--recipient', recipient])
        if armor:   # create ascii-armored output - set to False for binary output
            args.append('--armor')
        if output:  # write the output to a file with the specified name
            if os.path.exists(output):
                os.remove(output) # to avoid overwrite confirmation message
            args.extend(['--output', output])
        if sign:
            args.extend(['--sign', '--default-key', sign])
        if always_trust:
            args.append("--always-trust")
        result = self.result_map['crypt'](self)
        self._handle_io(args, file, result, passphrase=passphrase, binary=True)
        logger.debug('encrypt result: %r', result.data)
        return result

    def encrypt(self, data, recipients, **kwargs):
        """Encrypt the message contained in the string 'data'

        >>> import shutil
        >>> if os.path.exists("keys"):
        ...     shutil.rmtree("keys")
        >>> gpg = GPG(gnupghome="keys")
        >>> input = gpg.gen_key_input(passphrase='foo')
        >>> result = gpg.gen_key(input)
        >>> print1 = result.fingerprint
        >>> input = gpg.gen_key_input()
        >>> result = gpg.gen_key(input)
        >>> print2 = result.fingerprint
        >>> result = gpg.encrypt("hello",print2)
        >>> message = str(result)
        >>> assert message != 'hello'
        >>> result = gpg.decrypt(message)
        >>> assert result
        >>> str(result)
        'hello'
        >>> result = gpg.encrypt("hello again",print1)
        >>> message = str(result)
        >>> result = gpg.decrypt(message)
        >>> result.status == 'need passphrase'
        True
        >>> result = gpg.decrypt(message,passphrase='bar')
        >>> result.status in ('decryption failed', 'bad passphrase')
        True
        >>> assert not result
        >>> result = gpg.decrypt(message,passphrase='foo')
        >>> result.status == 'decryption ok'
        True
        >>> str(result)
        'hello again'
        >>> result = gpg.encrypt("signed hello",print2,sign=print1)
        >>> result.status == 'need passphrase'
        True
        >>> result = gpg.encrypt("signed hello",print2,sign=print1,passphrase='foo')
        >>> result.status == 'encryption ok'
        True
        >>> message = str(result)
        >>> result = gpg.decrypt(message)
        >>> result.status == 'decryption ok'
        True
        >>> assert result.fingerprint == print1

        """
        data = _make_binary_stream(data, self.encoding)
        try:
            return self.encrypt_file(data, recipients, **kwargs)
        finally:
            data.close()

    def decrypt(self, message, **kwargs):
        """Decrypt the string 'message'; keyword arguments are passed on to
        decrypt_file()."""
        data = _make_binary_stream(message, self.encoding)
        try:
            return self.decrypt_file(data, **kwargs)
        finally:
            data.close()

    def decrypt_file(self, file, always_trust=False, passphrase=None,
                     output=None):
        """Decrypt the message read from the file-like object 'file'

        always_trust -- if true, pass --always-trust to gpg.
        passphrase   -- passphrase to send to gpg, if any.
        output       -- if given, name of the file gpg writes the output
                        to; an existing file of that name is removed first.
        """
        args = ["--decrypt"]
        if output:  # write the output to a file with the specified name
            if os.path.exists(output):
                os.remove(output) # to avoid overwrite confirmation message
            args.extend(['--output', output])
        if always_trust:
            args.append("--always-trust")
        result = self.result_map['crypt'](self)
        self._handle_io(args, file, result, passphrase, binary=True)
        logger.debug('decrypt result: %r', result.data)
        return result
|