123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782 |
- """ A wrapper for the 'gpg' command::
- Portions of this module are derived from A.M. Kuchling's well-designed
- GPG.py, using Richard Jones' updated version 1.3, which can be found
- in the pycrypto CVS repository on Sourceforge:
- http://pycrypto.cvs.sourceforge.net/viewvc/pycrypto/gpg/GPG.py
- This module is *not* forward-compatible with amk's; some of the
- old interface has changed. For instance, since I've added decrypt
- functionality, I elected to initialize with a 'gnupghome' argument
- instead of 'keyring', so that gpg can find both the public and secret
- keyrings. I've also altered some of the returned objects in order for
- the caller to not have to know as much about the internals of the
- result classes.
- While the rest of ISconf is released under the GPL, I am releasing
- this single file under the same terms that A.M. Kuchling used for
- pycrypto.
- Steve Traugott, stevegt@terraluna.org
- Thu Jun 23 21:27:20 PDT 2005
- This version of the module has been modified from Steve Traugott's version
- (see http://trac.t7a.org/isconf/browser/trunk/lib/python/isconf/GPG.py) by
- Vinay Sajip to make use of the subprocess module (Steve's version uses os.fork()
- and so does not work on Windows). Renamed to gnupg.py to avoid confusion with
- the previous versions.
- Modifications Copyright (C) 2008-2022 Vinay Sajip. All rights reserved.
- A unittest harness (test_gnupg.py) has also been added.
- """
- import codecs
- from io import StringIO
- import logging
- import os
- import re
- import socket
- from subprocess import Popen, PIPE
- import sys
- import threading
- __version__ = '0.4.9'
- __author__ = 'Vinay Sajip'
- __date__ = '$20-May-2022 09:01:43$'
- STARTUPINFO = None
- if os.name == 'nt': # pragma: no cover
- try:
- from subprocess import STARTUPINFO, STARTF_USESHOWWINDOW, SW_HIDE
- except ImportError:
- STARTUPINFO = None
- try:
- unicode
- _py3k = False
- string_types = basestring
- text_type = unicode
- except NameError:
- _py3k = True
- string_types = str
- text_type = str
- logger = logging.getLogger(__name__)
- if not logger.handlers:
- logger.addHandler(logging.NullHandler())
- # We use the test below because it works for Jython as well as CPython
- if os.path.__name__ == 'ntpath': # pragma: no cover
- # On Windows, we don't need shell quoting, other than worrying about
- # paths with spaces in them.
- def shell_quote(s):
- return '"%s"' % s
- else:
- # Section copied from sarge
- # This regex determines which shell input needs quoting
- # because it may be unsafe
- UNSAFE = re.compile(r'[^\w%+,./:=@-]')
- def shell_quote(s):
- """
- Quote text so that it is safe for Posix command shells.
- For example, "*.py" would be converted to "'*.py'". If the text is
- considered safe it is returned unquoted.
- :param s: The value to quote
- :type s: str (or unicode on 2.x)
- :return: A safe version of the input, from the point of view of Posix
- command shells
- :rtype: The passed-in type
- """
- if not isinstance(s, string_types): # pragma: no cover
- raise TypeError('Expected string type, got %s' % type(s))
- if not s:
- result = "''"
- elif not UNSAFE.search(s):
- result = s
- else:
- result = "'%s'" % s.replace("'", r"'\''")
- return result
- # end of sarge code
- # Now that we use shell=False, we shouldn't need to quote arguments.
- # Use no_quote instead of shell_quote to remind us of where quoting
- # was needed. However, note that we still need, on 2.x, to encode any
- # Unicode argument with the file system encoding - see Issue #41 and
- # Python issue #1759845 ("subprocess.call fails with unicode strings in
- # command line").
- # Allows the encoding used to be overridden in special cases by setting
- # this module attribute appropriately.
- fsencoding = sys.getfilesystemencoding()
- def no_quote(s):
- if not _py3k and isinstance(s, text_type):
- s = s.encode(fsencoding)
- return s
- def _copy_data(instream, outstream):
- # Copy one stream to another
- sent = 0
- if hasattr(sys.stdin, 'encoding'):
- enc = sys.stdin.encoding
- else: # pragma: no cover
- enc = 'ascii'
- while True:
- # See issue #39: read can fail when e.g. a text stream is provided
- # for what is actually a binary file
- try:
- data = instream.read(1024)
- except UnicodeError:
- logger.warning('Exception occurred while reading', exc_info=1)
- break
- if not data:
- break
- sent += len(data)
- # logger.debug('sending chunk (%d): %r', sent, data[:256])
- try:
- outstream.write(data)
- except UnicodeError: # pragma: no cover
- outstream.write(data.encode(enc))
- except Exception:
- # Can sometimes get 'broken pipe' errors even when the data has all
- # been sent
- logger.exception('Error sending data')
- break
- try:
- outstream.close()
- except IOError: # pragma: no cover
- logger.warning('Exception occurred while closing: ignored', exc_info=1)
- logger.debug('closed output, %d bytes sent', sent)
- def _threaded_copy_data(instream, outstream):
- wr = threading.Thread(target=_copy_data, args=(instream, outstream))
- wr.daemon = True
- logger.debug('data copier: %r, %r, %r', wr, instream, outstream)
- wr.start()
- return wr
- def _write_passphrase(stream, passphrase, encoding):
- passphrase = '%s\n' % passphrase
- passphrase = passphrase.encode(encoding)
- stream.write(passphrase)
- logger.debug('Wrote passphrase')
- def _is_sequence(instance):
- return isinstance(instance, (list, tuple, set, frozenset))
- def _make_memory_stream(s):
- try:
- from io import BytesIO
- rv = BytesIO(s)
- except ImportError: # pragma: no cover
- rv = StringIO(s)
- return rv
- def _make_binary_stream(s, encoding):
- if _py3k:
- if isinstance(s, str):
- s = s.encode(encoding)
- else:
- if type(s) is not str:
- s = s.encode(encoding)
- return _make_memory_stream(s)
- class Verify(object):
- "Handle status messages for --verify"
- TRUST_UNDEFINED = 0
- TRUST_NEVER = 1
- TRUST_MARGINAL = 2
- TRUST_FULLY = 3
- TRUST_ULTIMATE = 4
- TRUST_LEVELS = {
- 'TRUST_UNDEFINED': TRUST_UNDEFINED,
- 'TRUST_NEVER': TRUST_NEVER,
- 'TRUST_MARGINAL': TRUST_MARGINAL,
- 'TRUST_FULLY': TRUST_FULLY,
- 'TRUST_ULTIMATE': TRUST_ULTIMATE,
- }
- # for now, just the most common error codes. This can be expanded as and
- # when reports come in of other errors.
- GPG_SYSTEM_ERROR_CODES = {
- 1: 'permission denied',
- 35: 'file exists',
- 81: 'file not found',
- 97: 'not a directory',
- }
- GPG_ERROR_CODES = {
- 11: 'incorrect passphrase',
- }
- returncode = None
- def __init__(self, gpg):
- self.gpg = gpg
- self.valid = False
- self.fingerprint = self.creation_date = self.timestamp = None
- self.signature_id = self.key_id = None
- self.username = None
- self.key_id = None
- self.key_status = None
- self.status = None
- self.pubkey_fingerprint = None
- self.expire_timestamp = None
- self.sig_timestamp = None
- self.trust_text = None
- self.trust_level = None
- self.sig_info = {}
- def __nonzero__(self):
- return self.valid
- __bool__ = __nonzero__
- def handle_status(self, key, value):
- def update_sig_info(**kwargs):
- sig_id = self.signature_id
- if sig_id:
- info = self.sig_info[sig_id]
- info.update(kwargs)
- if key in self.TRUST_LEVELS:
- self.trust_text = key
- self.trust_level = self.TRUST_LEVELS[key]
- update_sig_info(trust_level=self.trust_level, trust_text=self.trust_text)
- elif key in ('WARNING', 'ERROR'):
- logger.warning('potential problem: %s: %s', key, value)
- elif key == 'BADSIG': # pragma: no cover
- self.valid = False
- self.status = 'signature bad'
- self.key_id, self.username = value.split(None, 1)
- update_sig_info(keyid=self.key_id, username=self.username, status=self.status)
- elif key == 'ERRSIG': # pragma: no cover
- self.valid = False
- parts = value.split()
- (self.key_id, algo, hash_algo, cls, self.timestamp) = parts[:5]
- # Since GnuPG 2.2.7, a fingerprint is tacked on
- if len(parts) >= 7:
- self.fingerprint = parts[6]
- self.status = 'signature error'
- update_sig_info(keyid=self.key_id,
- timestamp=self.timestamp,
- fingerprint=self.fingerprint,
- status=self.status)
- elif key == 'EXPSIG': # pragma: no cover
- self.valid = False
- self.status = 'signature expired'
- self.key_id, self.username = value.split(None, 1)
- update_sig_info(keyid=self.key_id, username=self.username, status=self.status)
- elif key == 'GOODSIG':
- self.valid = True
- self.status = 'signature good'
- self.key_id, self.username = value.split(None, 1)
- update_sig_info(keyid=self.key_id, username=self.username, status=self.status)
- elif key == 'VALIDSIG':
- fingerprint, creation_date, sig_ts, expire_ts = value.split()[:4]
- (self.fingerprint, self.creation_date, self.sig_timestamp,
- self.expire_timestamp) = (fingerprint, creation_date, sig_ts, expire_ts)
- # may be different if signature is made with a subkey
- self.pubkey_fingerprint = value.split()[-1]
- self.status = 'signature valid'
- update_sig_info(fingerprint=fingerprint,
- creation_date=creation_date,
- timestamp=sig_ts,
- expiry=expire_ts,
- pubkey_fingerprint=self.pubkey_fingerprint,
- status=self.status)
- elif key == 'SIG_ID':
- sig_id, creation_date, timestamp = value.split()
- self.sig_info[sig_id] = {'creation_date': creation_date, 'timestamp': timestamp}
- (self.signature_id, self.creation_date, self.timestamp) = (sig_id, creation_date,
- timestamp)
- elif key == 'DECRYPTION_FAILED': # pragma: no cover
- self.valid = False
- self.key_id = value
- self.status = 'decryption failed'
- elif key == 'NO_PUBKEY': # pragma: no cover
- self.valid = False
- self.key_id = value
- self.status = 'no public key'
- elif key == 'NO_SECKEY': # pragma: no cover
- self.valid = False
- self.key_id = value
- self.status = 'no secret key'
- elif key in ('EXPKEYSIG', 'REVKEYSIG'): # pragma: no cover
- # signed with expired or revoked key
- self.valid = False
- self.key_id = value.split()[0]
- if key == 'EXPKEYSIG':
- self.key_status = 'signing key has expired'
- else:
- self.key_status = 'signing key was revoked'
- self.status = self.key_status
- update_sig_info(status=self.status, keyid=self.key_id)
- elif key in ('UNEXPECTED', 'FAILURE'): # pragma: no cover
- self.valid = False
- self.key_id = value
- if key == 'UNEXPECTED':
- self.status = 'unexpected data'
- else:
- # N.B. there might be other reasons. For example, if an output
- # file can't be created - /dev/null/foo will lead to a
- # "not a directory" error, but which is not sent as a status
- # message with the [GNUPG:] prefix. Similarly if you try to
- # write to "/etc/foo" as a non-root user, a "permission denied"
- # error will be sent as a non-status message.
- message = 'error - %s' % value
- operation, code = value.rsplit(' ', 1)
- if code.isdigit():
- code = int(code) & 0xFFFFFF # lose the error source
- if self.gpg.error_map and code in self.gpg.error_map:
- message = '%s: %s' % (operation, self.gpg.error_map[code])
- else:
- system_error = bool(code & 0x8000)
- code = code & 0x7FFF
- if system_error:
- mapping = self.GPG_SYSTEM_ERROR_CODES
- else:
- mapping = self.GPG_ERROR_CODES
- if code in mapping:
- message = '%s: %s' % (operation, mapping[code])
- if not self.status:
- self.status = message
- elif key in ('DECRYPTION_INFO', 'PLAINTEXT', 'PLAINTEXT_LENGTH', 'BEGIN_SIGNING'):
- pass
- else: # pragma: no cover
- logger.debug('message ignored: %s, %s', key, value)
- class ImportResult(object):
- "Handle status messages for --import"
- counts = '''count no_user_id imported imported_rsa unchanged
- n_uids n_subk n_sigs n_revoc sec_read sec_imported
- sec_dups not_imported'''.split()
- returncode = None
- def __init__(self, gpg):
- self.gpg = gpg
- self.results = []
- self.fingerprints = []
- for result in self.counts:
- setattr(self, result, 0)
- def __nonzero__(self):
- if self.not_imported or not self.fingerprints:
- return False
- return True
- __bool__ = __nonzero__
- ok_reason = {
- '0': 'Not actually changed',
- '1': 'Entirely new key',
- '2': 'New user IDs',
- '4': 'New signatures',
- '8': 'New subkeys',
- '16': 'Contains private key',
- }
- problem_reason = {
- '0': 'No specific reason given',
- '1': 'Invalid Certificate',
- '2': 'Issuer Certificate missing',
- '3': 'Certificate Chain too long',
- '4': 'Error storing certificate',
- }
- def handle_status(self, key, value):
- if key in ('WARNING', 'ERROR'):
- logger.warning('potential problem: %s: %s', key, value)
- elif key in ('IMPORTED', 'KEY_CONSIDERED'):
- # this duplicates info we already see in import_ok & import_problem
- pass
- elif key == 'NODATA': # pragma: no cover
- self.results.append({
- 'fingerprint': None,
- 'problem': '0',
- 'text': 'No valid data found'
- })
- elif key == 'IMPORT_OK':
- reason, fingerprint = value.split()
- reasons = []
- for code, text in list(self.ok_reason.items()):
- if int(reason) | int(code) == int(reason):
- reasons.append(text)
- reasontext = '\n'.join(reasons) + '\n'
- self.results.append({'fingerprint': fingerprint, 'ok': reason, 'text': reasontext})
- self.fingerprints.append(fingerprint)
- elif key == 'IMPORT_PROBLEM': # pragma: no cover
- try:
- reason, fingerprint = value.split()
- except Exception:
- reason = value
- fingerprint = '<unknown>'
- self.results.append({
- 'fingerprint': fingerprint,
- 'problem': reason,
- 'text': self.problem_reason[reason]
- })
- elif key == 'IMPORT_RES':
- import_res = value.split()
- for i, count in enumerate(self.counts):
- setattr(self, count, int(import_res[i]))
- elif key == 'KEYEXPIRED': # pragma: no cover
- self.results.append({'fingerprint': None, 'problem': '0', 'text': 'Key expired'})
- elif key == 'SIGEXPIRED': # pragma: no cover
- self.results.append({'fingerprint': None, 'problem': '0', 'text': 'Signature expired'})
- elif key == 'FAILURE': # pragma: no cover
- self.results.append({'fingerprint': None, 'problem': '0', 'text': 'Other failure'})
- else: # pragma: no cover
- logger.debug('message ignored: %s, %s', key, value)
- def summary(self):
- result = []
- result.append('%d imported' % self.imported)
- if self.not_imported: # pragma: no cover
- result.append('%d not imported' % self.not_imported)
- return ', '.join(result)
- ESCAPE_PATTERN = re.compile(r'\\x([0-9a-f][0-9a-f])', re.I)
- BASIC_ESCAPES = {
- r'\n': '\n',
- r'\r': '\r',
- r'\f': '\f',
- r'\v': '\v',
- r'\b': '\b',
- r'\0': '\0',
- }
- class SendResult(object):
- returncode = None
- def __init__(self, gpg):
- self.gpg = gpg
- def handle_status(self, key, value):
- logger.debug('SendResult: %s: %s', key, value)
- def _set_fields(target, fieldnames, args):
- for i, var in enumerate(fieldnames):
- if i < len(args):
- target[var] = args[i]
- else:
- target[var] = 'unavailable'
- class SearchKeys(list):
- ''' Handle status messages for --search-keys.
- Handle pub and uid (relating the latter to the former).
- Don't care about the rest
- '''
- UID_INDEX = 1
- FIELDS = 'type keyid algo length date expires'.split()
- returncode = None
- def __init__(self, gpg):
- self.gpg = gpg
- self.curkey = None
- self.fingerprints = []
- self.uids = []
- def get_fields(self, args):
- result = {}
- _set_fields(result, self.FIELDS, args)
- result['uids'] = []
- result['sigs'] = []
- return result
- def pub(self, args):
- self.curkey = curkey = self.get_fields(args)
- self.append(curkey)
- def uid(self, args):
- uid = args[self.UID_INDEX]
- uid = ESCAPE_PATTERN.sub(lambda m: chr(int(m.group(1), 16)), uid)
- for k, v in BASIC_ESCAPES.items():
- uid = uid.replace(k, v)
- self.curkey['uids'].append(uid)
- self.uids.append(uid)
- def handle_status(self, key, value): # pragma: no cover
- pass
- class ListKeys(SearchKeys):
- ''' Handle status messages for --list-keys, --list-sigs.
- Handle pub and uid (relating the latter to the former).
- Don't care about (info from src/DETAILS):
- crt = X.509 certificate
- crs = X.509 certificate and private key available
- uat = user attribute (same as user id except for field 10).
- sig = signature
- rev = revocation signature
- pkd = public key data (special field format, see below)
- grp = reserved for gpgsm
- rvk = revocation key
- '''
- UID_INDEX = 9
- FIELDS = ('type trust length algo keyid date expires dummy ownertrust uid sig'
- ' cap issuer flag token hash curve compliance updated origin keygrip').split()
- def __init__(self, gpg):
- super(ListKeys, self).__init__(gpg)
- self.in_subkey = False
- self.key_map = {}
- def key(self, args):
- self.curkey = curkey = self.get_fields(args)
- if curkey['uid']:
- curkey['uids'].append(curkey['uid'])
- del curkey['uid']
- curkey['subkeys'] = []
- self.append(curkey)
- self.in_subkey = False
- pub = sec = key
- def fpr(self, args):
- fp = args[9]
- if fp in self.key_map and self.gpg.check_fingerprint_collisions: # pragma: no cover
- raise ValueError('Unexpected fingerprint collision: %s' % fp)
- if not self.in_subkey:
- self.curkey['fingerprint'] = fp
- self.fingerprints.append(fp)
- self.key_map[fp] = self.curkey
- else:
- self.curkey['subkeys'][-1][2] = fp
- self.key_map[fp] = self.curkey
- def grp(self, args):
- grp = args[9]
- if not self.in_subkey:
- self.curkey['keygrip'] = grp
- else:
- self.curkey['subkeys'][-1][3] = grp
- def _collect_subkey_info(self, curkey, args):
- info_map = curkey.setdefault('subkey_info', {})
- info = {}
- _set_fields(info, self.FIELDS, args)
- info_map[args[4]] = info
- def sub(self, args):
- # See issue #81. We create a dict with more information about
- # subkeys, but for backward compatibility reason, have to add it in
- # as a separate entry 'subkey_info'
- subkey = [args[4], args[11], None, None] # keyid, type, fp, grp
- self.curkey['subkeys'].append(subkey)
- self._collect_subkey_info(self.curkey, args)
- self.in_subkey = True
- def ssb(self, args):
- subkey = [args[4], None, None, None] # keyid, type, fp, grp
- self.curkey['subkeys'].append(subkey)
- self._collect_subkey_info(self.curkey, args)
- self.in_subkey = True
- def sig(self, args):
- # keyid, uid, sigclass
- self.curkey['sigs'].append((args[4], args[9], args[10]))
- class ScanKeys(ListKeys):
- ''' Handle status messages for --with-fingerprint.'''
- def sub(self, args):
- # --with-fingerprint --with-colons somehow outputs fewer colons,
- # use the last value args[-1] instead of args[11]
- subkey = [args[4], args[-1], None, None]
- self.curkey['subkeys'].append(subkey)
- self._collect_subkey_info(self.curkey, args)
- self.in_subkey = True
- class TextHandler(object):
- def _as_text(self):
- return self.data.decode(self.gpg.encoding, self.gpg.decode_errors)
- if _py3k:
- __str__ = _as_text
- else:
- __unicode__ = _as_text
- def __str__(self):
- return self.data
- class Crypt(Verify, TextHandler):
- "Handle status messages for --encrypt and --decrypt"
- def __init__(self, gpg):
- Verify.__init__(self, gpg)
- self.data = ''
- self.ok = False
- self.status = ''
- self.key_id = None
- def __nonzero__(self):
- return bool(self.ok)
- __bool__ = __nonzero__
- def handle_status(self, key, value):
- if key in ('WARNING', 'ERROR'):
- logger.warning('potential problem: %s: %s', key, value)
- elif key == 'NODATA':
- if self.status not in ('decryption failed', ):
- self.status = 'no data was provided'
- elif key in ('NEED_PASSPHRASE', 'BAD_PASSPHRASE', 'GOOD_PASSPHRASE', 'MISSING_PASSPHRASE',
- 'KEY_NOT_CREATED', 'NEED_PASSPHRASE_PIN'):
- self.status = key.replace('_', ' ').lower()
- elif key == 'DECRYPTION_FAILED':
- if self.status != 'no secret key': # don't overwrite more useful message
- self.status = 'decryption failed'
- elif key == 'NEED_PASSPHRASE_SYM':
- self.status = 'need symmetric passphrase'
- elif key == 'BEGIN_DECRYPTION':
- if self.status != 'no secret key': # don't overwrite more useful message
- self.status = 'decryption incomplete'
- elif key == 'BEGIN_ENCRYPTION':
- self.status = 'encryption incomplete'
- elif key == 'DECRYPTION_OKAY':
- self.status = 'decryption ok'
- self.ok = True
- elif key == 'END_ENCRYPTION':
- self.status = 'encryption ok'
- self.ok = True
- elif key == 'INV_RECP': # pragma: no cover
- self.status = 'invalid recipient'
- elif key == 'KEYEXPIRED': # pragma: no cover
- self.status = 'key expired'
- elif key == 'SIG_CREATED': # pragma: no cover
- self.status = 'sig created'
- elif key == 'SIGEXPIRED': # pragma: no cover
- self.status = 'sig expired'
- elif key == 'ENC_TO': # pragma: no cover
- # ENC_TO <long_keyid> <keytype> <keylength>
- self.key_id = value.split(' ', 1)[0]
- elif key in ('USERID_HINT', 'GOODMDC', 'END_DECRYPTION', 'CARDCTRL', 'BADMDC',
- 'SC_OP_FAILURE', 'SC_OP_SUCCESS', 'PINENTRY_LAUNCHED', 'KEY_CONSIDERED'):
- pass
- else:
- Verify.handle_status(self, key, value)
- class GenKey(object):
- "Handle status messages for --gen-key"
- returncode = None
- def __init__(self, gpg):
- self.gpg = gpg
- self.type = None
- self.fingerprint = ''
- self.status = None
- def __nonzero__(self):
- return bool(self.fingerprint)
- __bool__ = __nonzero__
- def __str__(self):
- return self.fingerprint
- def handle_status(self, key, value):
- if key in ('WARNING', 'ERROR'): # pragma: no cover
- logger.warning('potential problem: %s: %s', key, value)
- elif key == 'KEY_CREATED':
- (self.type, self.fingerprint) = value.split()
- self.status = 'ok'
- elif key == 'KEY_NOT_CREATED':
- self.status = key.replace('_', ' ').lower()
- elif key in ('PROGRESS', 'GOOD_PASSPHRASE'):
- pass
- else: # pragma: no cover
- logger.debug('message ignored: %s, %s', key, value)
- class AddSubkey(object):
- "Handle status messages for --quick-add-key"
- returncode = None
- def __init__(self, gpg):
- self.gpg = gpg
- self.type = None
- self.fingerprint = ''
- self.status = None
- def __nonzero__(self):
- return bool(self.fingerprint)
- __bool__ = __nonzero__
- def __str__(self):
- return self.fingerprint
- def handle_status(self, key, value):
- if key in ('WARNING', 'ERROR'): # pragma: no cover
- logger.warning('potential problem: %s: %s', key, value)
- elif key == 'KEY_CREATED':
- (self.type, self.fingerprint) = value.split()
- self.status = 'ok'
- else: # pragma: no cover
- logger.debug('message ignored: %s, %s', key, value)
- class ExportResult(GenKey):
- """Handle status messages for --export[-secret-key].
- For now, just use an existing class to base it on - if needed, we
- can override handle_status for more specific message handling.
- """
- def handle_status(self, key, value):
- if key in ('EXPORTED', 'EXPORT_RES'):
- pass
- else:
- super(ExportResult, self).handle_status(key, value)
- class DeleteResult(object):
- "Handle status messages for --delete-key and --delete-secret-key"
- returncode = None
- def __init__(self, gpg):
- self.gpg = gpg
- self.status = 'ok'
- def __str__(self):
- return self.status
- problem_reason = {
- '1': 'No such key',
- '2': 'Must delete secret key first',
- '3': 'Ambiguous specification',
- }
- def handle_status(self, key, value):
- if key == 'DELETE_PROBLEM': # pragma: no cover
- self.status = self.problem_reason.get(value, 'Unknown error: %r' % value)
- else: # pragma: no cover
- logger.debug('message ignored: %s, %s', key, value)
- def __nonzero__(self):
- return self.status == 'ok'
- __bool__ = __nonzero__
- class TrustResult(DeleteResult):
- pass
- class Sign(TextHandler):
- "Handle status messages for --sign"
- returncode = None
- def __init__(self, gpg):
- self.gpg = gpg
- self.type = None
- self.hash_algo = None
- self.fingerprint = None
- self.status = None
- self.key_id = None
- self.username = None
- def __nonzero__(self):
- return self.fingerprint is not None
- __bool__ = __nonzero__
- def handle_status(self, key, value):
- if key in ('WARNING', 'ERROR', 'FAILURE'): # pragma: no cover
- logger.warning('potential problem: %s: %s', key, value)
- elif key in ('KEYEXPIRED', 'SIGEXPIRED'): # pragma: no cover
- self.status = 'key expired'
- elif key == 'KEYREVOKED': # pragma: no cover
- self.status = 'key revoked'
- elif key == 'SIG_CREATED':
- (self.type, algo, self.hash_algo, cls, self.timestamp,
- self.fingerprint) = value.split()
- self.status = 'signature created'
- elif key == 'USERID_HINT': # pragma: no cover
- self.key_id, self.username = value.split(' ', 1)
- elif key == 'BAD_PASSPHRASE':
- self.status = 'bad passphrase'
- elif key in ('NEED_PASSPHRASE', 'GOOD_PASSPHRASE', 'BEGIN_SIGNING'):
- pass
- else: # pragma: no cover
- logger.debug('message ignored: %s, %s', key, value)
- VERSION_RE = re.compile(r'gpg \(GnuPG(?:/MacGPG2)?\) (\d+(\.\d+)*)'.encode('ascii'), re.I)
- HEX_DIGITS_RE = re.compile(r'[0-9a-f]+$', re.I)
- PUBLIC_KEY_RE = re.compile(r'gpg: public key is (\w+)')
- class GPG(object):
- error_map = None
- decode_errors = 'strict'
- result_map = {
- 'crypt': Crypt,
- 'delete': DeleteResult,
- 'generate': GenKey,
- 'addSubkey': AddSubkey,
- 'import': ImportResult,
- 'send': SendResult,
- 'list': ListKeys,
- 'scan': ScanKeys,
- 'search': SearchKeys,
- 'sign': Sign,
- 'trust': TrustResult,
- 'verify': Verify,
- 'export': ExportResult,
- }
- "Encapsulate access to the gpg executable"
- def __init__(self,
- gpgbinary='gpg',
- gnupghome=None,
- verbose=False,
- use_agent=False,
- keyring=None,
- options=None,
- secret_keyring=None):
- """Initialize a GPG process wrapper. Options are:
- gpgbinary -- full pathname for GPG binary.
- gnupghome -- full pathname to where we can find the public and
- private keyrings. Default is whatever gpg defaults to.
- keyring -- name of alternative keyring file to use, or list of such
- keyrings. If specified, the default keyring is not used.
- options =-- a list of additional options to pass to the GPG binary.
- secret_keyring -- name of alternative secret keyring file to use, or
- list of such keyrings.
- """
- self.gpgbinary = gpgbinary
- self.gnupghome = gnupghome
- # issue 112: fail if the specified value isn't a directory
- if gnupghome and not os.path.isdir(gnupghome):
- raise ValueError('gnupghome should be a directory (it isn\'t): %s' % gnupghome)
- if keyring:
- # Allow passing a string or another iterable. Make it uniformly
- # a list of keyring filenames
- if isinstance(keyring, string_types):
- keyring = [keyring]
- self.keyring = keyring
- if secret_keyring:
- # Allow passing a string or another iterable. Make it uniformly
- # a list of keyring filenames
- if isinstance(secret_keyring, string_types):
- secret_keyring = [secret_keyring]
- self.secret_keyring = secret_keyring
- self.verbose = verbose
- self.use_agent = use_agent
- if isinstance(options, str): # pragma: no cover
- options = [options]
- self.options = options
- self.on_data = None # or a callable - will be called with data chunks
- # Changed in 0.3.7 to use Latin-1 encoding rather than
- # locale.getpreferredencoding falling back to sys.stdin.encoding
- # falling back to utf-8, because gpg itself uses latin-1 as the default
- # encoding.
- self.encoding = 'latin-1'
- if gnupghome and not os.path.isdir(self.gnupghome):
- os.makedirs(self.gnupghome, 0o700)
- try:
- p = self._open_subprocess(['--version'])
- except OSError:
- msg = 'Unable to run gpg (%s) - it may not be available.' % self.gpgbinary
- logger.exception(msg)
- raise OSError(msg)
- result = self.result_map['verify'](self) # any result will do for this
- self._collect_output(p, result, stdin=p.stdin)
- if p.returncode != 0: # pragma: no cover
- raise ValueError('Error invoking gpg: %s: %s' % (p.returncode, result.stderr))
- m = VERSION_RE.match(result.data)
- if not m: # pragma: no cover
- self.version = None
- else:
- dot = '.'.encode('ascii')
- self.version = tuple([int(s) for s in m.groups()[0].split(dot)])
- # See issue #97. It seems gpg allow duplicate keys in keyrings, so we
- # can't be too strict.
- self.check_fingerprint_collisions = False
- def make_args(self, args, passphrase):
- """
- Make a list of command line elements for GPG. The value of ``args``
- will be appended. The ``passphrase`` argument needs to be True if
- a passphrase will be sent to GPG, else False.
- """
- cmd = [self.gpgbinary, '--status-fd', '2', '--no-tty', '--no-verbose']
- if 'DEBUG_IPC' in os.environ:
- cmd.extend(['--debug', 'ipc'])
- if passphrase and hasattr(self, 'version'):
- if self.version >= (2, 1):
- cmd[1:1] = ['--pinentry-mode', 'loopback']
- cmd.extend(['--fixed-list-mode', '--batch', '--with-colons'])
- if self.gnupghome:
- cmd.extend(['--homedir', no_quote(self.gnupghome)])
- if self.keyring:
- cmd.append('--no-default-keyring')
- for fn in self.keyring:
- cmd.extend(['--keyring', no_quote(fn)])
- if self.secret_keyring:
- for fn in self.secret_keyring:
- cmd.extend(['--secret-keyring', no_quote(fn)])
- if passphrase:
- cmd.extend(['--passphrase-fd', '0'])
- if self.use_agent: # pragma: no cover
- cmd.append('--use-agent')
- if self.options:
- cmd.extend(self.options)
- cmd.extend(args)
- return cmd
- def _open_subprocess(self, args, passphrase=False):
- # Internal method: open a pipe to a GPG subprocess and return
- # the file objects for communicating with it.
- from subprocess import list2cmdline as debug_print
- cmd = self.make_args(args, passphrase)
- if self.verbose: # pragma: no cover
- print(debug_print(cmd))
- if not STARTUPINFO:
- si = None
- else: # pragma: no cover
- si = STARTUPINFO()
- si.dwFlags = STARTF_USESHOWWINDOW
- si.wShowWindow = SW_HIDE
- result = Popen(cmd, shell=False, stdin=PIPE, stdout=PIPE, stderr=PIPE, startupinfo=si)
- logger.debug('%s: %s', result.pid, debug_print(cmd))
- return result
- def _read_response(self, stream, result):
- # Internal method: reads all the stderr output from GPG, taking notice
- # only of lines that begin with the magic [GNUPG:] prefix.
- #
- # Calls methods on the response object for each valid token found,
- # with the arg being the remainder of the status line.
- lines = []
- while True:
- line = stream.readline()
- if len(line) == 0:
- break
- lines.append(line)
- line = line.rstrip()
- if self.verbose: # pragma: no cover
- print(line)
- logger.debug('%s', line)
- if line[0:9] == '[GNUPG:] ':
- # Chop off the prefix
- line = line[9:]
- L = line.split(None, 1)
- keyword = L[0]
- if len(L) > 1:
- value = L[1]
- else:
- value = ''
- result.handle_status(keyword, value)
- result.stderr = ''.join(lines)
- def _read_data(self, stream, result, on_data=None):
- # Read the contents of the file from GPG's stdout
- chunks = []
- while True:
- data = stream.read(1024)
- if len(data) == 0:
- if on_data:
- on_data(data)
- break
- logger.debug('chunk: %r' % data[:256])
- append = True
- if on_data:
- append = on_data(data) is not False
- if append:
- chunks.append(data)
- if _py3k:
- # Join using b'' or '', as appropriate
- result.data = type(data)().join(chunks)
- else:
- result.data = ''.join(chunks)
- def _collect_output(self, process, result, writer=None, stdin=None):
- """
- Drain the subprocesses output streams, writing the collected output
- to the result. If a writer thread (writing to the subprocess) is given,
- make sure it's joined before returning. If a stdin stream is given,
- close it before returning.
- """
- stderr = codecs.getreader(self.encoding)(process.stderr)
- rr = threading.Thread(target=self._read_response, args=(stderr, result))
- rr.daemon = True
- logger.debug('stderr reader: %r', rr)
- rr.start()
- stdout = process.stdout
- dr = threading.Thread(target=self._read_data, args=(stdout, result, self.on_data))
- dr.daemon = True
- logger.debug('stdout reader: %r', dr)
- dr.start()
- dr.join()
- rr.join()
- if writer is not None:
- writer.join()
- process.wait()
- result.returncode = rc = process.returncode
- if rc != 0:
- logger.warning('gpg returned a non-zero error code: %d', rc)
- if stdin is not None:
- try:
- stdin.close()
- except IOError: # pragma: no cover
- pass
- stderr.close()
- stdout.close()
- return rc
- def is_valid_file(self, fileobj):
- """
- Simplistic check for a file object
- """
- return hasattr(fileobj, 'read')
- def _handle_io(self, args, fileobj, result, passphrase=None, binary=False):
- "Handle a call to GPG - pass input data, collect output data"
- # Handle a basic data call - pass data to GPG, handle the output
- # including status information. Garbage In, Garbage Out :)
- if not self.is_valid_file(fileobj):
- raise TypeError('Not a valid file: %s' % fileobj)
- p = self._open_subprocess(args, passphrase is not None)
- if not binary: # pragma: no cover
- stdin = codecs.getwriter(self.encoding)(p.stdin)
- else:
- stdin = p.stdin
- if passphrase:
- _write_passphrase(stdin, passphrase, self.encoding)
- writer = _threaded_copy_data(fileobj, stdin)
- self._collect_output(p, result, writer, stdin)
- return result
- #
- # SIGNATURE METHODS
- #
- def sign(self, message, **kwargs):
- """sign message"""
- f = _make_binary_stream(message, self.encoding)
- result = self.sign_file(f, **kwargs)
- f.close()
- return result
- def set_output_without_confirmation(self, args, output):
- "If writing to a file which exists, avoid a confirmation message."
- if os.path.exists(output):
- # We need to avoid an overwrite confirmation message
- args.extend(['--yes'])
- args.extend(['--output', no_quote(output)])
- def is_valid_passphrase(self, passphrase):
- """
- Confirm that the passphrase doesn't contain newline-type characters -
- it is passed in a pipe to gpg, and so not checking could lead to
- spoofing attacks by passing arbitrary text after passphrase and newline.
- """
- return ('\n' not in passphrase and '\r' not in passphrase and '\x00' not in passphrase)
- def sign_file(self,
- file,
- keyid=None,
- passphrase=None,
- clearsign=True,
- detach=False,
- binary=False,
- output=None,
- extra_args=None):
- """sign file"""
- if passphrase and not self.is_valid_passphrase(passphrase):
- raise ValueError('Invalid passphrase')
- logger.debug('sign_file: %s', file)
- if binary: # pragma: no cover
- args = ['-s']
- else:
- args = ['-sa']
- # You can't specify detach-sign and clearsign together: gpg ignores
- # the detach-sign in that case.
- if detach:
- args.append('--detach-sign')
- elif clearsign:
- args.append('--clearsign')
- if keyid:
- args.extend(['--default-key', no_quote(keyid)])
- if output: # write the output to a file with the specified name
- self.set_output_without_confirmation(args, output)
- if extra_args:
- args.extend(extra_args)
- result = self.result_map['sign'](self)
- # We could use _handle_io here except for the fact that if the
- # passphrase is bad, gpg bails and you can't write the message.
- p = self._open_subprocess(args, passphrase is not None)
- try:
- stdin = p.stdin
- if passphrase:
- _write_passphrase(stdin, passphrase, self.encoding)
- writer = _threaded_copy_data(file, stdin)
- except IOError: # pragma: no cover
- logging.exception('error writing message')
- writer = None
- self._collect_output(p, result, writer, stdin)
- return result
- def verify(self, data, **kwargs):
- """Verify the signature on the contents of the string 'data'
- >>> GPGBINARY = os.environ.get('GPGBINARY', 'gpg')
- >>> if not os.path.isdir('keys'): os.mkdir('keys')
- >>> gpg = GPG(gpgbinary=GPGBINARY, gnupghome='keys')
- >>> input = gpg.gen_key_input(passphrase='foo')
- >>> key = gpg.gen_key(input)
- >>> assert key
- >>> sig = gpg.sign('hello',keyid=key.fingerprint,passphrase='bar')
- >>> assert not sig
- >>> sig = gpg.sign('hello',keyid=key.fingerprint,passphrase='foo')
- >>> assert sig
- >>> verify = gpg.verify(sig.data)
- >>> assert verify
- """
- f = _make_binary_stream(data, self.encoding)
- result = self.verify_file(f, **kwargs)
- f.close()
- return result
- def verify_file(self, file, data_filename=None, close_file=True, extra_args=None):
- "Verify the signature on the contents of the file-like object 'file'"
- logger.debug('verify_file: %r, %r', file, data_filename)
- result = self.result_map['verify'](self)
- args = ['--verify']
- if extra_args:
- args.extend(extra_args)
- if data_filename is None:
- self._handle_io(args, file, result, binary=True)
- else:
- logger.debug('Handling detached verification')
- import tempfile
- fd, fn = tempfile.mkstemp(prefix='pygpg')
- s = file.read()
- if close_file:
- file.close()
- logger.debug('Wrote to temp file: %r', s)
- os.write(fd, s)
- os.close(fd)
- args.append(no_quote(fn))
- args.append(no_quote(data_filename))
- try:
- p = self._open_subprocess(args)
- self._collect_output(p, result, stdin=p.stdin)
- finally:
- os.unlink(fn)
- return result
- def verify_data(self, sig_filename, data, extra_args=None):
- "Verify the signature in sig_filename against data in memory"
- logger.debug('verify_data: %r, %r ...', sig_filename, data[:16])
- result = self.result_map['verify'](self)
- args = ['--verify']
- if extra_args:
- args.extend(extra_args)
- args.extend([no_quote(sig_filename), '-'])
- stream = _make_memory_stream(data)
- self._handle_io(args, stream, result, binary=True)
- return result
- #
- # KEY MANAGEMENT
- #
- def import_keys(self, key_data, extra_args=None, passphrase=None):
- """
- Import the key_data into our keyring.
- """
- result = self.result_map['import'](self)
- logger.debug('import_keys: %r', key_data[:256])
- data = _make_binary_stream(key_data, self.encoding)
- args = ['--import']
- if extra_args:
- args.extend(extra_args)
- self._handle_io(args, data, result, passphrase=passphrase, binary=True)
- logger.debug('import_keys result: %r', result.__dict__)
- data.close()
- return result
- def recv_keys(self, keyserver, *keyids, **kwargs):
- """Import a key from a keyserver
- >>> import shutil
- >>> shutil.rmtree("keys", ignore_errors=True)
- >>> GPGBINARY = os.environ.get('GPGBINARY', 'gpg')
- >>> if not os.path.isdir('keys'): os.mkdir('keys')
- >>> gpg = GPG(gpgbinary=GPGBINARY, gnupghome='keys')
- >>> os.chmod('keys', 0x1C0)
- >>> result = gpg.recv_keys('pgp.mit.edu', '92905378')
- >>> if 'NO_EXTERNAL_TESTS' not in os.environ: assert result
- """
- result = self.result_map['import'](self)
- logger.debug('recv_keys: %r', keyids)
- data = _make_binary_stream('', self.encoding)
- args = ['--keyserver', no_quote(keyserver)]
- if 'extra_args' in kwargs:
- args.extend(kwargs['extra_args'])
- args.append('--recv-keys')
- args.extend([no_quote(k) for k in keyids])
- self._handle_io(args, data, result, binary=True)
- logger.debug('recv_keys result: %r', result.__dict__)
- data.close()
- return result
- def send_keys(self, keyserver, *keyids, **kwargs):
- """Send a key to a keyserver.
- Note: it's not practical to test this function without sending
- arbitrary data to live keyservers.
- """
- result = self.result_map['send'](self)
- logger.debug('send_keys: %r', keyids)
- data = _make_binary_stream('', self.encoding)
- args = ['--keyserver', no_quote(keyserver)]
- if 'extra_args' in kwargs:
- args.extend(kwargs['extra_args'])
- args.append('--send-keys')
- args.extend([no_quote(k) for k in keyids])
- self._handle_io(args, data, result, binary=True)
- logger.debug('send_keys result: %r', result.__dict__)
- data.close()
- return result
- def delete_keys(self,
- fingerprints,
- secret=False,
- passphrase=None,
- expect_passphrase=True,
- exclamation_mode=False):
- """
- Delete the indicated keys.
- Since GnuPG 2.1, you can't delete secret keys without providing a
- passphrase. However, if you're expecting the passphrase to go to gpg
- via pinentry, you should specify expect_passphrase=False. (It's only
- checked for GnuPG >= 2.1).
- """
- if passphrase and not self.is_valid_passphrase(passphrase):
- raise ValueError('Invalid passphrase')
- which = 'key'
- if secret: # pragma: no cover
- if self.version >= (2, 1) and passphrase is None and expect_passphrase:
- raise ValueError('For GnuPG >= 2.1, deleting secret keys '
- 'needs a passphrase to be provided')
- which = 'secret-key'
- if _is_sequence(fingerprints): # pragma: no cover
- fingerprints = [no_quote(s) for s in fingerprints]
- else:
- fingerprints = [no_quote(fingerprints)]
- if exclamation_mode:
- fingerprints = [f + '!' for f in fingerprints]
- args = ['--delete-%s' % which]
- if secret and self.version >= (2, 1):
- args.insert(0, '--yes')
- args.extend(fingerprints)
- result = self.result_map['delete'](self)
- if not secret or self.version < (2, 1):
- p = self._open_subprocess(args)
- self._collect_output(p, result, stdin=p.stdin)
- else:
- # Need to send in a passphrase.
- f = _make_binary_stream('', self.encoding)
- try:
- self._handle_io(args, f, result, passphrase=passphrase, binary=True)
- finally:
- f.close()
- return result
- def export_keys(self,
- keyids,
- secret=False,
- armor=True,
- minimal=False,
- passphrase=None,
- expect_passphrase=True):
- """
- Export the indicated keys. A 'keyid' is anything gpg accepts.
- Since GnuPG 2.1, you can't export secret keys without providing a
- passphrase. However, if you're expecting the passphrase to go to gpg
- via pinentry, you should specify expect_passphrase=False. (It's only
- checked for GnuPG >= 2.1).
- """
- if passphrase and not self.is_valid_passphrase(passphrase):
- raise ValueError('Invalid passphrase')
- which = ''
- if secret:
- which = '-secret-key'
- if self.version >= (2, 1) and passphrase is None and expect_passphrase:
- raise ValueError('For GnuPG >= 2.1, exporting secret keys '
- 'needs a passphrase to be provided')
- if _is_sequence(keyids):
- keyids = [no_quote(k) for k in keyids]
- else:
- keyids = [no_quote(keyids)]
- args = ['--export%s' % which]
- if armor:
- args.append('--armor')
- if minimal: # pragma: no cover
- args.extend(['--export-options', 'export-minimal'])
- args.extend(keyids)
- # gpg --export produces no status-fd output; stdout will be
- # empty in case of failure
- result = self.result_map['export'](self)
- if not secret or self.version < (2, 1):
- p = self._open_subprocess(args)
- self._collect_output(p, result, stdin=p.stdin)
- else:
- # Need to send in a passphrase.
- f = _make_binary_stream('', self.encoding)
- try:
- self._handle_io(args, f, result, passphrase=passphrase, binary=True)
- finally:
- f.close()
- logger.debug('export_keys result[:100]: %r', result.data[:100])
- # Issue #49: Return bytes if armor not specified, else text
- result = result.data
- if armor:
- result = result.decode(self.encoding, self.decode_errors)
- return result
- def _get_list_output(self, p, kind):
- # Get the response information
- result = self.result_map[kind](self)
- self._collect_output(p, result, stdin=p.stdin)
- lines = result.data.decode(self.encoding, self.decode_errors).splitlines()
- valid_keywords = 'pub uid sec fpr sub ssb sig grp'.split()
- for line in lines:
- if self.verbose: # pragma: no cover
- print(line)
- logger.debug('line: %r', line.rstrip())
- if not line: # pragma: no cover
- break
- L = line.strip().split(':')
- if not L: # pragma: no cover
- continue
- keyword = L[0]
- if keyword in valid_keywords:
- getattr(result, keyword)(L)
- return result
- def list_keys(self, secret=False, keys=None, sigs=False):
- """ list the keys currently in the keyring
- >>> import shutil
- >>> shutil.rmtree("keys", ignore_errors=True)
- >>> GPGBINARY = os.environ.get('GPGBINARY', 'gpg')
- >>> if not os.path.isdir('keys'): os.mkdir('keys')
- >>> gpg = GPG(gpgbinary=GPGBINARY, gnupghome='keys')
- >>> input = gpg.gen_key_input(passphrase='foo')
- >>> result = gpg.gen_key(input)
- >>> fp1 = result.fingerprint
- >>> result = gpg.gen_key(input)
- >>> fp2 = result.fingerprint
- >>> pubkeys = gpg.list_keys()
- >>> assert fp1 in pubkeys.fingerprints
- >>> assert fp2 in pubkeys.fingerprints
- """
- if secret:
- which = 'secret-keys'
- else:
- which = 'sigs' if sigs else 'keys'
- args = ['--list-%s' % which, '--fingerprint', '--fingerprint'] # get subkey FPs, too
- if self.version >= (2, 1):
- args.append('--with-keygrip')
- if keys:
- if isinstance(keys, string_types):
- keys = [keys]
- args.extend(keys)
- p = self._open_subprocess(args)
- return self._get_list_output(p, 'list')
- def scan_keys(self, filename):
- """
- List details of an ascii armored or binary key file
- without first importing it to the local keyring.
- The function achieves this on modern GnuPG by running:
- $ gpg --dry-run --import-options import-show --import
- On older versions, it does the *much* riskier:
- $ gpg --with-fingerprint --with-colons filename
- """
- if self.version >= (2, 1):
- args = ['--dry-run', '--import-options', 'import-show', '--import']
- else:
- logger.warning('Trying to list packets, but if the file is not a '
- 'keyring, might accidentally decrypt')
- args = ['--with-fingerprint', '--with-colons', '--fixed-list-mode']
- args.append(no_quote(filename))
- p = self._open_subprocess(args)
- return self._get_list_output(p, 'scan')
- def search_keys(self, query, keyserver='pgp.mit.edu', extra_args=None):
- """ search keyserver by query (using --search-keys option)
- >>> import shutil
- >>> shutil.rmtree('keys', ignore_errors=True)
- >>> GPGBINARY = os.environ.get('GPGBINARY', 'gpg')
- >>> if not os.path.isdir('keys'): os.mkdir('keys')
- >>> gpg = GPG(gpgbinary=GPGBINARY, gnupghome='keys')
- >>> os.chmod('keys', 0x1C0)
- >>> result = gpg.search_keys('<vinay_sajip@hotmail.com>')
- >>> if 'NO_EXTERNAL_TESTS' not in os.environ: assert result, 'Failed using default keyserver'
- >>> #keyserver = 'keyserver.ubuntu.com'
- >>> #result = gpg.search_keys('<vinay_sajip@hotmail.com>', keyserver)
- >>> #assert result, 'Failed using keyserver.ubuntu.com'
- """
- query = query.strip()
- if HEX_DIGITS_RE.match(query):
- query = '0x' + query
- args = ['--fingerprint', '--keyserver', no_quote(keyserver)]
- if extra_args:
- args.extend(extra_args)
- args.extend(['--search-keys', no_quote(query)])
- p = self._open_subprocess(args)
- # Get the response information
- result = self.result_map['search'](self)
- self._collect_output(p, result, stdin=p.stdin)
- lines = result.data.decode(self.encoding, self.decode_errors).splitlines()
- valid_keywords = ['pub', 'uid']
- for line in lines:
- if self.verbose: # pragma: no cover
- print(line)
- logger.debug('line: %r', line.rstrip())
- if not line: # sometimes get blank lines on Windows
- continue
- L = line.strip().split(':')
- if not L: # pragma: no cover
- continue
- keyword = L[0]
- if keyword in valid_keywords:
- getattr(result, keyword)(L)
- return result
- def gen_key(self, input):
- """Generate a key; you might use gen_key_input() to create the
- control input.
- >>> GPGBINARY = os.environ.get('GPGBINARY', 'gpg')
- >>> if not os.path.isdir('keys'): os.mkdir('keys')
- >>> gpg = GPG(gpgbinary=GPGBINARY, gnupghome='keys')
- >>> input = gpg.gen_key_input(passphrase='foo')
- >>> result = gpg.gen_key(input)
- >>> assert result
- >>> result = gpg.gen_key('foo')
- >>> assert not result
- """
- args = ['--gen-key']
- result = self.result_map['generate'](self)
- f = _make_binary_stream(input, self.encoding)
- self._handle_io(args, f, result, binary=True)
- f.close()
- return result
- def gen_key_input(self, **kwargs):
- """
- Generate --gen-key input per gpg doc/DETAILS
- """
- parms = {}
- no_protection = kwargs.pop('no_protection', False)
- for key, val in list(kwargs.items()):
- key = key.replace('_', '-').title()
- if str(val).strip(): # skip empty strings
- parms[key] = val
- parms.setdefault('Key-Type', 'RSA')
- if 'key_curve' not in kwargs:
- parms.setdefault('Key-Length', 2048)
- parms.setdefault('Name-Real', 'Autogenerated Key')
- logname = (os.environ.get('LOGNAME') or os.environ.get('USERNAME') or 'unspecified')
- hostname = socket.gethostname()
- parms.setdefault('Name-Email', '%s@%s' % (logname.replace(' ', '_'), hostname))
- out = 'Key-Type: %s\n' % parms.pop('Key-Type')
- for key, val in list(parms.items()):
- out += '%s: %s\n' % (key, val)
- if no_protection:
- out += '%no-protection\n'
- out += '%commit\n'
- return out
- # Key-Type: RSA
- # Key-Length: 1024
- # Name-Real: ISdlink Server on %s
- # Name-Comment: Created by %s
- # Name-Email: isdlink@%s
- # Expire-Date: 0
- # %commit
- #
- #
- # Key-Type: DSA
- # Key-Length: 1024
- # Subkey-Type: ELG-E
- # Subkey-Length: 1024
- # Name-Real: Joe Tester
- # Name-Comment: with stupid passphrase
- # Name-Email: joe@foo.bar
- # Expire-Date: 0
- # Passphrase: abc
- # %pubring foo.pub
- # %secring foo.sec
- # %commit
- def add_subkey(self,
- master_key,
- master_passphrase=None,
- algorithm='rsa',
- usage='encrypt',
- expire='-'):
- """
- Add subkeys to a masterkey
- """
- if self.version[0] < 2:
- raise NotImplementedError('Not available in GnuPG 1.x')
- if not master_key:
- raise ValueError('No master key fingerprint specified')
- if master_passphrase and not self.is_valid_passphrase(master_passphrase):
- raise ValueError('Invalid passphrase')
- args = ['--quick-add-key', master_key, algorithm, usage, str(expire)]
- result = self.result_map['addSubkey'](self)
- f = _make_binary_stream('', self.encoding)
- self._handle_io(args, f, result, passphrase=master_passphrase, binary=True)
- return result
- #
- # ENCRYPTION
- #
- def encrypt_file(self,
- file,
- recipients,
- sign=None,
- always_trust=False,
- passphrase=None,
- armor=True,
- output=None,
- symmetric=False,
- extra_args=None):
- "Encrypt the message read from the file-like object 'file'"
- if passphrase and not self.is_valid_passphrase(passphrase):
- raise ValueError('Invalid passphrase')
- args = ['--encrypt']
- if symmetric:
- # can't be False or None - could be True or a cipher algo value
- # such as AES256
- args = ['--symmetric']
- if symmetric is not True:
- args.extend(['--cipher-algo', no_quote(symmetric)])
- # else use the default, currently CAST5
- else:
- if not recipients:
- raise ValueError('No recipients specified with asymmetric '
- 'encryption')
- if not _is_sequence(recipients):
- recipients = (recipients, )
- for recipient in recipients:
- args.extend(['--recipient', no_quote(recipient)])
- if armor: # create ascii-armored output - False for binary output
- args.append('--armor')
- if output: # write the output to a file with the specified name
- self.set_output_without_confirmation(args, output)
- if sign is True: # pragma: no cover
- args.append('--sign')
- elif sign: # pragma: no cover
- args.extend(['--sign', '--default-key', no_quote(sign)])
- if always_trust: # pragma: no cover
- args.append('--always-trust')
- if extra_args:
- args.extend(extra_args)
- result = self.result_map['crypt'](self)
- self._handle_io(args, file, result, passphrase=passphrase, binary=True)
- logger.debug('encrypt result[:100]: %r', result.data[:100])
- return result
- def encrypt(self, data, recipients, **kwargs):
- """Encrypt the message contained in the string 'data'
- >>> import shutil
- >>> if os.path.exists("keys"):
- ... shutil.rmtree("keys", ignore_errors=True)
- >>> GPGBINARY = os.environ.get('GPGBINARY', 'gpg')
- >>> if not os.path.isdir('keys'): os.mkdir('keys')
- >>> gpg = GPG(gpgbinary=GPGBINARY, gnupghome='keys')
- >>> input = gpg.gen_key_input(name_email='user1@test', passphrase='pp1')
- >>> result = gpg.gen_key(input)
- >>> fp1 = result.fingerprint
- >>> input = gpg.gen_key_input(name_email='user2@test', passphrase='pp2')
- >>> result = gpg.gen_key(input)
- >>> fp2 = result.fingerprint
- >>> result = gpg.encrypt("hello",fp2)
- >>> message = str(result)
- >>> assert message != 'hello'
- >>> result = gpg.decrypt(message, passphrase='pp2')
- >>> assert result
- >>> str(result)
- 'hello'
- >>> result = gpg.encrypt("hello again", fp1)
- >>> message = str(result)
- >>> result = gpg.decrypt(message, passphrase='bar')
- >>> result.status in ('decryption failed', 'bad passphrase')
- True
- >>> assert not result
- >>> result = gpg.decrypt(message, passphrase='pp1')
- >>> result.status == 'decryption ok'
- True
- >>> str(result)
- 'hello again'
- >>> result = gpg.encrypt("signed hello", fp2, sign=fp1, passphrase='pp1')
- >>> result.status == 'encryption ok'
- True
- >>> message = str(result)
- >>> result = gpg.decrypt(message, passphrase='pp2')
- >>> result.status == 'decryption ok'
- True
- >>> assert result.fingerprint == fp1
- """
- data = _make_binary_stream(data, self.encoding)
- result = self.encrypt_file(data, recipients, **kwargs)
- data.close()
- return result
- def decrypt(self, message, **kwargs):
- data = _make_binary_stream(message, self.encoding)
- result = self.decrypt_file(data, **kwargs)
- data.close()
- return result
- def decrypt_file(self,
- file,
- always_trust=False,
- passphrase=None,
- output=None,
- extra_args=None):
- if passphrase and not self.is_valid_passphrase(passphrase):
- raise ValueError('Invalid passphrase')
- args = ['--decrypt']
- if output: # write the output to a file with the specified name
- self.set_output_without_confirmation(args, output)
- if always_trust: # pragma: no cover
- args.append('--always-trust')
- if extra_args:
- args.extend(extra_args)
- result = self.result_map['crypt'](self)
- self._handle_io(args, file, result, passphrase, binary=True)
- logger.debug('decrypt result[:100]: %r', result.data[:100])
- return result
- def get_recipients(self, message, **kwargs):
- data = _make_binary_stream(message, self.encoding)
- result = self.get_recipients_file(data, **kwargs)
- data.close()
- return result
- def get_recipients_file(self, file, extra_args=None):
- args = ['--decrypt', '--list-only', '-v']
- if extra_args:
- args.extend(extra_args)
- result = self.result_map['crypt'](self)
- self._handle_io(args, file, result, binary=True)
- ids = []
- for m in PUBLIC_KEY_RE.finditer(result.stderr):
- ids.append(m.group(1))
- return ids
- def trust_keys(self, fingerprints, trustlevel):
- levels = Verify.TRUST_LEVELS
- if trustlevel not in levels:
- poss = ', '.join(sorted(levels))
- raise ValueError('Invalid trust level: "%s" (must be one of %s)' % (trustlevel, poss))
- trustlevel = levels[trustlevel] + 2
- import tempfile
- try:
- fd, fn = tempfile.mkstemp()
- lines = []
- if isinstance(fingerprints, string_types):
- fingerprints = [fingerprints]
- for f in fingerprints:
- lines.append('%s:%s:' % (f, trustlevel))
- # The trailing newline is required!
- s = os.linesep.join(lines) + os.linesep
- logger.debug('writing ownertrust info: %s', s)
- os.write(fd, s.encode(self.encoding))
- os.close(fd)
- result = self.result_map['trust'](self)
- p = self._open_subprocess(['--import-ownertrust', fn])
- self._collect_output(p, result, stdin=p.stdin)
- if p.returncode != 0:
- raise ValueError('gpg returned an error - return code %d' % p.returncode)
- finally:
- os.remove(fn)
- return result
|