gnupg.py 64 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782
  1. """ A wrapper for the 'gpg' command::
  2. Portions of this module are derived from A.M. Kuchling's well-designed
  3. GPG.py, using Richard Jones' updated version 1.3, which can be found
  4. in the pycrypto CVS repository on Sourceforge:
  5. http://pycrypto.cvs.sourceforge.net/viewvc/pycrypto/gpg/GPG.py
  6. This module is *not* forward-compatible with amk's; some of the
  7. old interface has changed. For instance, since I've added decrypt
  8. functionality, I elected to initialize with a 'gnupghome' argument
  9. instead of 'keyring', so that gpg can find both the public and secret
  10. keyrings. I've also altered some of the returned objects in order for
  11. the caller to not have to know as much about the internals of the
  12. result classes.
  13. While the rest of ISconf is released under the GPL, I am releasing
  14. this single file under the same terms that A.M. Kuchling used for
  15. pycrypto.
  16. Steve Traugott, stevegt@terraluna.org
  17. Thu Jun 23 21:27:20 PDT 2005
  18. This version of the module has been modified from Steve Traugott's version
  19. (see http://trac.t7a.org/isconf/browser/trunk/lib/python/isconf/GPG.py) by
  20. Vinay Sajip to make use of the subprocess module (Steve's version uses os.fork()
  21. and so does not work on Windows). Renamed to gnupg.py to avoid confusion with
  22. the previous versions.
  23. Modifications Copyright (C) 2008-2022 Vinay Sajip. All rights reserved.
  24. A unittest harness (test_gnupg.py) has also been added.
  25. """
  26. import codecs
  27. from io import StringIO
  28. import logging
  29. import os
  30. import re
  31. import socket
  32. from subprocess import Popen, PIPE
  33. import sys
  34. import threading
  35. __version__ = '0.4.9'
  36. __author__ = 'Vinay Sajip'
  37. __date__ = '$20-May-2022 09:01:43$'
  38. STARTUPINFO = None
  39. if os.name == 'nt': # pragma: no cover
  40. try:
  41. from subprocess import STARTUPINFO, STARTF_USESHOWWINDOW, SW_HIDE
  42. except ImportError:
  43. STARTUPINFO = None
  44. try:
  45. unicode
  46. _py3k = False
  47. string_types = basestring
  48. text_type = unicode
  49. except NameError:
  50. _py3k = True
  51. string_types = str
  52. text_type = str
  53. logger = logging.getLogger(__name__)
  54. if not logger.handlers:
  55. logger.addHandler(logging.NullHandler())
  56. # We use the test below because it works for Jython as well as CPython
  57. if os.path.__name__ == 'ntpath': # pragma: no cover
  58. # On Windows, we don't need shell quoting, other than worrying about
  59. # paths with spaces in them.
  60. def shell_quote(s):
  61. return '"%s"' % s
  62. else:
  63. # Section copied from sarge
  64. # This regex determines which shell input needs quoting
  65. # because it may be unsafe
  66. UNSAFE = re.compile(r'[^\w%+,./:=@-]')
  67. def shell_quote(s):
  68. """
  69. Quote text so that it is safe for Posix command shells.
  70. For example, "*.py" would be converted to "'*.py'". If the text is
  71. considered safe it is returned unquoted.
  72. :param s: The value to quote
  73. :type s: str (or unicode on 2.x)
  74. :return: A safe version of the input, from the point of view of Posix
  75. command shells
  76. :rtype: The passed-in type
  77. """
  78. if not isinstance(s, string_types): # pragma: no cover
  79. raise TypeError('Expected string type, got %s' % type(s))
  80. if not s:
  81. result = "''"
  82. elif not UNSAFE.search(s):
  83. result = s
  84. else:
  85. result = "'%s'" % s.replace("'", r"'\''")
  86. return result
  87. # end of sarge code
  88. # Now that we use shell=False, we shouldn't need to quote arguments.
  89. # Use no_quote instead of shell_quote to remind us of where quoting
  90. # was needed. However, note that we still need, on 2.x, to encode any
  91. # Unicode argument with the file system encoding - see Issue #41 and
  92. # Python issue #1759845 ("subprocess.call fails with unicode strings in
  93. # command line").
  94. # Allows the encoding used to be overridden in special cases by setting
  95. # this module attribute appropriately.
  96. fsencoding = sys.getfilesystemencoding()
  97. def no_quote(s):
  98. if not _py3k and isinstance(s, text_type):
  99. s = s.encode(fsencoding)
  100. return s
  101. def _copy_data(instream, outstream):
  102. # Copy one stream to another
  103. sent = 0
  104. if hasattr(sys.stdin, 'encoding'):
  105. enc = sys.stdin.encoding
  106. else: # pragma: no cover
  107. enc = 'ascii'
  108. while True:
  109. # See issue #39: read can fail when e.g. a text stream is provided
  110. # for what is actually a binary file
  111. try:
  112. data = instream.read(1024)
  113. except UnicodeError:
  114. logger.warning('Exception occurred while reading', exc_info=1)
  115. break
  116. if not data:
  117. break
  118. sent += len(data)
  119. # logger.debug('sending chunk (%d): %r', sent, data[:256])
  120. try:
  121. outstream.write(data)
  122. except UnicodeError: # pragma: no cover
  123. outstream.write(data.encode(enc))
  124. except Exception:
  125. # Can sometimes get 'broken pipe' errors even when the data has all
  126. # been sent
  127. logger.exception('Error sending data')
  128. break
  129. try:
  130. outstream.close()
  131. except IOError: # pragma: no cover
  132. logger.warning('Exception occurred while closing: ignored', exc_info=1)
  133. logger.debug('closed output, %d bytes sent', sent)
  134. def _threaded_copy_data(instream, outstream):
  135. wr = threading.Thread(target=_copy_data, args=(instream, outstream))
  136. wr.daemon = True
  137. logger.debug('data copier: %r, %r, %r', wr, instream, outstream)
  138. wr.start()
  139. return wr
  140. def _write_passphrase(stream, passphrase, encoding):
  141. passphrase = '%s\n' % passphrase
  142. passphrase = passphrase.encode(encoding)
  143. stream.write(passphrase)
  144. logger.debug('Wrote passphrase')
  145. def _is_sequence(instance):
  146. return isinstance(instance, (list, tuple, set, frozenset))
  147. def _make_memory_stream(s):
  148. try:
  149. from io import BytesIO
  150. rv = BytesIO(s)
  151. except ImportError: # pragma: no cover
  152. rv = StringIO(s)
  153. return rv
  154. def _make_binary_stream(s, encoding):
  155. if _py3k:
  156. if isinstance(s, str):
  157. s = s.encode(encoding)
  158. else:
  159. if type(s) is not str:
  160. s = s.encode(encoding)
  161. return _make_memory_stream(s)
  162. class Verify(object):
  163. "Handle status messages for --verify"
  164. TRUST_UNDEFINED = 0
  165. TRUST_NEVER = 1
  166. TRUST_MARGINAL = 2
  167. TRUST_FULLY = 3
  168. TRUST_ULTIMATE = 4
  169. TRUST_LEVELS = {
  170. 'TRUST_UNDEFINED': TRUST_UNDEFINED,
  171. 'TRUST_NEVER': TRUST_NEVER,
  172. 'TRUST_MARGINAL': TRUST_MARGINAL,
  173. 'TRUST_FULLY': TRUST_FULLY,
  174. 'TRUST_ULTIMATE': TRUST_ULTIMATE,
  175. }
  176. # for now, just the most common error codes. This can be expanded as and
  177. # when reports come in of other errors.
  178. GPG_SYSTEM_ERROR_CODES = {
  179. 1: 'permission denied',
  180. 35: 'file exists',
  181. 81: 'file not found',
  182. 97: 'not a directory',
  183. }
  184. GPG_ERROR_CODES = {
  185. 11: 'incorrect passphrase',
  186. }
  187. returncode = None
  188. def __init__(self, gpg):
  189. self.gpg = gpg
  190. self.valid = False
  191. self.fingerprint = self.creation_date = self.timestamp = None
  192. self.signature_id = self.key_id = None
  193. self.username = None
  194. self.key_id = None
  195. self.key_status = None
  196. self.status = None
  197. self.pubkey_fingerprint = None
  198. self.expire_timestamp = None
  199. self.sig_timestamp = None
  200. self.trust_text = None
  201. self.trust_level = None
  202. self.sig_info = {}
  203. def __nonzero__(self):
  204. return self.valid
  205. __bool__ = __nonzero__
  206. def handle_status(self, key, value):
  207. def update_sig_info(**kwargs):
  208. sig_id = self.signature_id
  209. if sig_id:
  210. info = self.sig_info[sig_id]
  211. info.update(kwargs)
  212. if key in self.TRUST_LEVELS:
  213. self.trust_text = key
  214. self.trust_level = self.TRUST_LEVELS[key]
  215. update_sig_info(trust_level=self.trust_level, trust_text=self.trust_text)
  216. elif key in ('WARNING', 'ERROR'):
  217. logger.warning('potential problem: %s: %s', key, value)
  218. elif key == 'BADSIG': # pragma: no cover
  219. self.valid = False
  220. self.status = 'signature bad'
  221. self.key_id, self.username = value.split(None, 1)
  222. update_sig_info(keyid=self.key_id, username=self.username, status=self.status)
  223. elif key == 'ERRSIG': # pragma: no cover
  224. self.valid = False
  225. parts = value.split()
  226. (self.key_id, algo, hash_algo, cls, self.timestamp) = parts[:5]
  227. # Since GnuPG 2.2.7, a fingerprint is tacked on
  228. if len(parts) >= 7:
  229. self.fingerprint = parts[6]
  230. self.status = 'signature error'
  231. update_sig_info(keyid=self.key_id,
  232. timestamp=self.timestamp,
  233. fingerprint=self.fingerprint,
  234. status=self.status)
  235. elif key == 'EXPSIG': # pragma: no cover
  236. self.valid = False
  237. self.status = 'signature expired'
  238. self.key_id, self.username = value.split(None, 1)
  239. update_sig_info(keyid=self.key_id, username=self.username, status=self.status)
  240. elif key == 'GOODSIG':
  241. self.valid = True
  242. self.status = 'signature good'
  243. self.key_id, self.username = value.split(None, 1)
  244. update_sig_info(keyid=self.key_id, username=self.username, status=self.status)
  245. elif key == 'VALIDSIG':
  246. fingerprint, creation_date, sig_ts, expire_ts = value.split()[:4]
  247. (self.fingerprint, self.creation_date, self.sig_timestamp,
  248. self.expire_timestamp) = (fingerprint, creation_date, sig_ts, expire_ts)
  249. # may be different if signature is made with a subkey
  250. self.pubkey_fingerprint = value.split()[-1]
  251. self.status = 'signature valid'
  252. update_sig_info(fingerprint=fingerprint,
  253. creation_date=creation_date,
  254. timestamp=sig_ts,
  255. expiry=expire_ts,
  256. pubkey_fingerprint=self.pubkey_fingerprint,
  257. status=self.status)
  258. elif key == 'SIG_ID':
  259. sig_id, creation_date, timestamp = value.split()
  260. self.sig_info[sig_id] = {'creation_date': creation_date, 'timestamp': timestamp}
  261. (self.signature_id, self.creation_date, self.timestamp) = (sig_id, creation_date,
  262. timestamp)
  263. elif key == 'DECRYPTION_FAILED': # pragma: no cover
  264. self.valid = False
  265. self.key_id = value
  266. self.status = 'decryption failed'
  267. elif key == 'NO_PUBKEY': # pragma: no cover
  268. self.valid = False
  269. self.key_id = value
  270. self.status = 'no public key'
  271. elif key == 'NO_SECKEY': # pragma: no cover
  272. self.valid = False
  273. self.key_id = value
  274. self.status = 'no secret key'
  275. elif key in ('EXPKEYSIG', 'REVKEYSIG'): # pragma: no cover
  276. # signed with expired or revoked key
  277. self.valid = False
  278. self.key_id = value.split()[0]
  279. if key == 'EXPKEYSIG':
  280. self.key_status = 'signing key has expired'
  281. else:
  282. self.key_status = 'signing key was revoked'
  283. self.status = self.key_status
  284. update_sig_info(status=self.status, keyid=self.key_id)
  285. elif key in ('UNEXPECTED', 'FAILURE'): # pragma: no cover
  286. self.valid = False
  287. self.key_id = value
  288. if key == 'UNEXPECTED':
  289. self.status = 'unexpected data'
  290. else:
  291. # N.B. there might be other reasons. For example, if an output
  292. # file can't be created - /dev/null/foo will lead to a
  293. # "not a directory" error, but which is not sent as a status
  294. # message with the [GNUPG:] prefix. Similarly if you try to
  295. # write to "/etc/foo" as a non-root user, a "permission denied"
  296. # error will be sent as a non-status message.
  297. message = 'error - %s' % value
  298. operation, code = value.rsplit(' ', 1)
  299. if code.isdigit():
  300. code = int(code) & 0xFFFFFF # lose the error source
  301. if self.gpg.error_map and code in self.gpg.error_map:
  302. message = '%s: %s' % (operation, self.gpg.error_map[code])
  303. else:
  304. system_error = bool(code & 0x8000)
  305. code = code & 0x7FFF
  306. if system_error:
  307. mapping = self.GPG_SYSTEM_ERROR_CODES
  308. else:
  309. mapping = self.GPG_ERROR_CODES
  310. if code in mapping:
  311. message = '%s: %s' % (operation, mapping[code])
  312. if not self.status:
  313. self.status = message
  314. elif key in ('DECRYPTION_INFO', 'PLAINTEXT', 'PLAINTEXT_LENGTH', 'BEGIN_SIGNING'):
  315. pass
  316. else: # pragma: no cover
  317. logger.debug('message ignored: %s, %s', key, value)
  318. class ImportResult(object):
  319. "Handle status messages for --import"
  320. counts = '''count no_user_id imported imported_rsa unchanged
  321. n_uids n_subk n_sigs n_revoc sec_read sec_imported
  322. sec_dups not_imported'''.split()
  323. returncode = None
  324. def __init__(self, gpg):
  325. self.gpg = gpg
  326. self.results = []
  327. self.fingerprints = []
  328. for result in self.counts:
  329. setattr(self, result, 0)
  330. def __nonzero__(self):
  331. if self.not_imported or not self.fingerprints:
  332. return False
  333. return True
  334. __bool__ = __nonzero__
  335. ok_reason = {
  336. '0': 'Not actually changed',
  337. '1': 'Entirely new key',
  338. '2': 'New user IDs',
  339. '4': 'New signatures',
  340. '8': 'New subkeys',
  341. '16': 'Contains private key',
  342. }
  343. problem_reason = {
  344. '0': 'No specific reason given',
  345. '1': 'Invalid Certificate',
  346. '2': 'Issuer Certificate missing',
  347. '3': 'Certificate Chain too long',
  348. '4': 'Error storing certificate',
  349. }
  350. def handle_status(self, key, value):
  351. if key in ('WARNING', 'ERROR'):
  352. logger.warning('potential problem: %s: %s', key, value)
  353. elif key in ('IMPORTED', 'KEY_CONSIDERED'):
  354. # this duplicates info we already see in import_ok & import_problem
  355. pass
  356. elif key == 'NODATA': # pragma: no cover
  357. self.results.append({
  358. 'fingerprint': None,
  359. 'problem': '0',
  360. 'text': 'No valid data found'
  361. })
  362. elif key == 'IMPORT_OK':
  363. reason, fingerprint = value.split()
  364. reasons = []
  365. for code, text in list(self.ok_reason.items()):
  366. if int(reason) | int(code) == int(reason):
  367. reasons.append(text)
  368. reasontext = '\n'.join(reasons) + '\n'
  369. self.results.append({'fingerprint': fingerprint, 'ok': reason, 'text': reasontext})
  370. self.fingerprints.append(fingerprint)
  371. elif key == 'IMPORT_PROBLEM': # pragma: no cover
  372. try:
  373. reason, fingerprint = value.split()
  374. except Exception:
  375. reason = value
  376. fingerprint = '<unknown>'
  377. self.results.append({
  378. 'fingerprint': fingerprint,
  379. 'problem': reason,
  380. 'text': self.problem_reason[reason]
  381. })
  382. elif key == 'IMPORT_RES':
  383. import_res = value.split()
  384. for i, count in enumerate(self.counts):
  385. setattr(self, count, int(import_res[i]))
  386. elif key == 'KEYEXPIRED': # pragma: no cover
  387. self.results.append({'fingerprint': None, 'problem': '0', 'text': 'Key expired'})
  388. elif key == 'SIGEXPIRED': # pragma: no cover
  389. self.results.append({'fingerprint': None, 'problem': '0', 'text': 'Signature expired'})
  390. elif key == 'FAILURE': # pragma: no cover
  391. self.results.append({'fingerprint': None, 'problem': '0', 'text': 'Other failure'})
  392. else: # pragma: no cover
  393. logger.debug('message ignored: %s, %s', key, value)
  394. def summary(self):
  395. result = []
  396. result.append('%d imported' % self.imported)
  397. if self.not_imported: # pragma: no cover
  398. result.append('%d not imported' % self.not_imported)
  399. return ', '.join(result)
  400. ESCAPE_PATTERN = re.compile(r'\\x([0-9a-f][0-9a-f])', re.I)
  401. BASIC_ESCAPES = {
  402. r'\n': '\n',
  403. r'\r': '\r',
  404. r'\f': '\f',
  405. r'\v': '\v',
  406. r'\b': '\b',
  407. r'\0': '\0',
  408. }
  409. class SendResult(object):
  410. returncode = None
  411. def __init__(self, gpg):
  412. self.gpg = gpg
  413. def handle_status(self, key, value):
  414. logger.debug('SendResult: %s: %s', key, value)
  415. def _set_fields(target, fieldnames, args):
  416. for i, var in enumerate(fieldnames):
  417. if i < len(args):
  418. target[var] = args[i]
  419. else:
  420. target[var] = 'unavailable'
  421. class SearchKeys(list):
  422. ''' Handle status messages for --search-keys.
  423. Handle pub and uid (relating the latter to the former).
  424. Don't care about the rest
  425. '''
  426. UID_INDEX = 1
  427. FIELDS = 'type keyid algo length date expires'.split()
  428. returncode = None
  429. def __init__(self, gpg):
  430. self.gpg = gpg
  431. self.curkey = None
  432. self.fingerprints = []
  433. self.uids = []
  434. def get_fields(self, args):
  435. result = {}
  436. _set_fields(result, self.FIELDS, args)
  437. result['uids'] = []
  438. result['sigs'] = []
  439. return result
  440. def pub(self, args):
  441. self.curkey = curkey = self.get_fields(args)
  442. self.append(curkey)
  443. def uid(self, args):
  444. uid = args[self.UID_INDEX]
  445. uid = ESCAPE_PATTERN.sub(lambda m: chr(int(m.group(1), 16)), uid)
  446. for k, v in BASIC_ESCAPES.items():
  447. uid = uid.replace(k, v)
  448. self.curkey['uids'].append(uid)
  449. self.uids.append(uid)
  450. def handle_status(self, key, value): # pragma: no cover
  451. pass
  452. class ListKeys(SearchKeys):
  453. ''' Handle status messages for --list-keys, --list-sigs.
  454. Handle pub and uid (relating the latter to the former).
  455. Don't care about (info from src/DETAILS):
  456. crt = X.509 certificate
  457. crs = X.509 certificate and private key available
  458. uat = user attribute (same as user id except for field 10).
  459. sig = signature
  460. rev = revocation signature
  461. pkd = public key data (special field format, see below)
  462. grp = reserved for gpgsm
  463. rvk = revocation key
  464. '''
  465. UID_INDEX = 9
  466. FIELDS = ('type trust length algo keyid date expires dummy ownertrust uid sig'
  467. ' cap issuer flag token hash curve compliance updated origin keygrip').split()
  468. def __init__(self, gpg):
  469. super(ListKeys, self).__init__(gpg)
  470. self.in_subkey = False
  471. self.key_map = {}
  472. def key(self, args):
  473. self.curkey = curkey = self.get_fields(args)
  474. if curkey['uid']:
  475. curkey['uids'].append(curkey['uid'])
  476. del curkey['uid']
  477. curkey['subkeys'] = []
  478. self.append(curkey)
  479. self.in_subkey = False
  480. pub = sec = key
  481. def fpr(self, args):
  482. fp = args[9]
  483. if fp in self.key_map and self.gpg.check_fingerprint_collisions: # pragma: no cover
  484. raise ValueError('Unexpected fingerprint collision: %s' % fp)
  485. if not self.in_subkey:
  486. self.curkey['fingerprint'] = fp
  487. self.fingerprints.append(fp)
  488. self.key_map[fp] = self.curkey
  489. else:
  490. self.curkey['subkeys'][-1][2] = fp
  491. self.key_map[fp] = self.curkey
  492. def grp(self, args):
  493. grp = args[9]
  494. if not self.in_subkey:
  495. self.curkey['keygrip'] = grp
  496. else:
  497. self.curkey['subkeys'][-1][3] = grp
  498. def _collect_subkey_info(self, curkey, args):
  499. info_map = curkey.setdefault('subkey_info', {})
  500. info = {}
  501. _set_fields(info, self.FIELDS, args)
  502. info_map[args[4]] = info
  503. def sub(self, args):
  504. # See issue #81. We create a dict with more information about
  505. # subkeys, but for backward compatibility reason, have to add it in
  506. # as a separate entry 'subkey_info'
  507. subkey = [args[4], args[11], None, None] # keyid, type, fp, grp
  508. self.curkey['subkeys'].append(subkey)
  509. self._collect_subkey_info(self.curkey, args)
  510. self.in_subkey = True
  511. def ssb(self, args):
  512. subkey = [args[4], None, None, None] # keyid, type, fp, grp
  513. self.curkey['subkeys'].append(subkey)
  514. self._collect_subkey_info(self.curkey, args)
  515. self.in_subkey = True
  516. def sig(self, args):
  517. # keyid, uid, sigclass
  518. self.curkey['sigs'].append((args[4], args[9], args[10]))
  519. class ScanKeys(ListKeys):
  520. ''' Handle status messages for --with-fingerprint.'''
  521. def sub(self, args):
  522. # --with-fingerprint --with-colons somehow outputs fewer colons,
  523. # use the last value args[-1] instead of args[11]
  524. subkey = [args[4], args[-1], None, None]
  525. self.curkey['subkeys'].append(subkey)
  526. self._collect_subkey_info(self.curkey, args)
  527. self.in_subkey = True
  528. class TextHandler(object):
  529. def _as_text(self):
  530. return self.data.decode(self.gpg.encoding, self.gpg.decode_errors)
  531. if _py3k:
  532. __str__ = _as_text
  533. else:
  534. __unicode__ = _as_text
  535. def __str__(self):
  536. return self.data
  537. class Crypt(Verify, TextHandler):
  538. "Handle status messages for --encrypt and --decrypt"
  539. def __init__(self, gpg):
  540. Verify.__init__(self, gpg)
  541. self.data = ''
  542. self.ok = False
  543. self.status = ''
  544. self.key_id = None
  545. def __nonzero__(self):
  546. return bool(self.ok)
  547. __bool__ = __nonzero__
  548. def handle_status(self, key, value):
  549. if key in ('WARNING', 'ERROR'):
  550. logger.warning('potential problem: %s: %s', key, value)
  551. elif key == 'NODATA':
  552. if self.status not in ('decryption failed', ):
  553. self.status = 'no data was provided'
  554. elif key in ('NEED_PASSPHRASE', 'BAD_PASSPHRASE', 'GOOD_PASSPHRASE', 'MISSING_PASSPHRASE',
  555. 'KEY_NOT_CREATED', 'NEED_PASSPHRASE_PIN'):
  556. self.status = key.replace('_', ' ').lower()
  557. elif key == 'DECRYPTION_FAILED':
  558. if self.status != 'no secret key': # don't overwrite more useful message
  559. self.status = 'decryption failed'
  560. elif key == 'NEED_PASSPHRASE_SYM':
  561. self.status = 'need symmetric passphrase'
  562. elif key == 'BEGIN_DECRYPTION':
  563. if self.status != 'no secret key': # don't overwrite more useful message
  564. self.status = 'decryption incomplete'
  565. elif key == 'BEGIN_ENCRYPTION':
  566. self.status = 'encryption incomplete'
  567. elif key == 'DECRYPTION_OKAY':
  568. self.status = 'decryption ok'
  569. self.ok = True
  570. elif key == 'END_ENCRYPTION':
  571. self.status = 'encryption ok'
  572. self.ok = True
  573. elif key == 'INV_RECP': # pragma: no cover
  574. self.status = 'invalid recipient'
  575. elif key == 'KEYEXPIRED': # pragma: no cover
  576. self.status = 'key expired'
  577. elif key == 'SIG_CREATED': # pragma: no cover
  578. self.status = 'sig created'
  579. elif key == 'SIGEXPIRED': # pragma: no cover
  580. self.status = 'sig expired'
  581. elif key == 'ENC_TO': # pragma: no cover
  582. # ENC_TO <long_keyid> <keytype> <keylength>
  583. self.key_id = value.split(' ', 1)[0]
  584. elif key in ('USERID_HINT', 'GOODMDC', 'END_DECRYPTION', 'CARDCTRL', 'BADMDC',
  585. 'SC_OP_FAILURE', 'SC_OP_SUCCESS', 'PINENTRY_LAUNCHED', 'KEY_CONSIDERED'):
  586. pass
  587. else:
  588. Verify.handle_status(self, key, value)
  589. class GenKey(object):
  590. "Handle status messages for --gen-key"
  591. returncode = None
  592. def __init__(self, gpg):
  593. self.gpg = gpg
  594. self.type = None
  595. self.fingerprint = ''
  596. self.status = None
  597. def __nonzero__(self):
  598. return bool(self.fingerprint)
  599. __bool__ = __nonzero__
  600. def __str__(self):
  601. return self.fingerprint
  602. def handle_status(self, key, value):
  603. if key in ('WARNING', 'ERROR'): # pragma: no cover
  604. logger.warning('potential problem: %s: %s', key, value)
  605. elif key == 'KEY_CREATED':
  606. (self.type, self.fingerprint) = value.split()
  607. self.status = 'ok'
  608. elif key == 'KEY_NOT_CREATED':
  609. self.status = key.replace('_', ' ').lower()
  610. elif key in ('PROGRESS', 'GOOD_PASSPHRASE'):
  611. pass
  612. else: # pragma: no cover
  613. logger.debug('message ignored: %s, %s', key, value)
  614. class AddSubkey(object):
  615. "Handle status messages for --quick-add-key"
  616. returncode = None
  617. def __init__(self, gpg):
  618. self.gpg = gpg
  619. self.type = None
  620. self.fingerprint = ''
  621. self.status = None
  622. def __nonzero__(self):
  623. return bool(self.fingerprint)
  624. __bool__ = __nonzero__
  625. def __str__(self):
  626. return self.fingerprint
  627. def handle_status(self, key, value):
  628. if key in ('WARNING', 'ERROR'): # pragma: no cover
  629. logger.warning('potential problem: %s: %s', key, value)
  630. elif key == 'KEY_CREATED':
  631. (self.type, self.fingerprint) = value.split()
  632. self.status = 'ok'
  633. else: # pragma: no cover
  634. logger.debug('message ignored: %s, %s', key, value)
  635. class ExportResult(GenKey):
  636. """Handle status messages for --export[-secret-key].
  637. For now, just use an existing class to base it on - if needed, we
  638. can override handle_status for more specific message handling.
  639. """
  640. def handle_status(self, key, value):
  641. if key in ('EXPORTED', 'EXPORT_RES'):
  642. pass
  643. else:
  644. super(ExportResult, self).handle_status(key, value)
  645. class DeleteResult(object):
  646. "Handle status messages for --delete-key and --delete-secret-key"
  647. returncode = None
  648. def __init__(self, gpg):
  649. self.gpg = gpg
  650. self.status = 'ok'
  651. def __str__(self):
  652. return self.status
  653. problem_reason = {
  654. '1': 'No such key',
  655. '2': 'Must delete secret key first',
  656. '3': 'Ambiguous specification',
  657. }
  658. def handle_status(self, key, value):
  659. if key == 'DELETE_PROBLEM': # pragma: no cover
  660. self.status = self.problem_reason.get(value, 'Unknown error: %r' % value)
  661. else: # pragma: no cover
  662. logger.debug('message ignored: %s, %s', key, value)
  663. def __nonzero__(self):
  664. return self.status == 'ok'
  665. __bool__ = __nonzero__
  666. class TrustResult(DeleteResult):
  667. pass
  668. class Sign(TextHandler):
  669. "Handle status messages for --sign"
  670. returncode = None
  671. def __init__(self, gpg):
  672. self.gpg = gpg
  673. self.type = None
  674. self.hash_algo = None
  675. self.fingerprint = None
  676. self.status = None
  677. self.key_id = None
  678. self.username = None
  679. def __nonzero__(self):
  680. return self.fingerprint is not None
  681. __bool__ = __nonzero__
  682. def handle_status(self, key, value):
  683. if key in ('WARNING', 'ERROR', 'FAILURE'): # pragma: no cover
  684. logger.warning('potential problem: %s: %s', key, value)
  685. elif key in ('KEYEXPIRED', 'SIGEXPIRED'): # pragma: no cover
  686. self.status = 'key expired'
  687. elif key == 'KEYREVOKED': # pragma: no cover
  688. self.status = 'key revoked'
  689. elif key == 'SIG_CREATED':
  690. (self.type, algo, self.hash_algo, cls, self.timestamp,
  691. self.fingerprint) = value.split()
  692. self.status = 'signature created'
  693. elif key == 'USERID_HINT': # pragma: no cover
  694. self.key_id, self.username = value.split(' ', 1)
  695. elif key == 'BAD_PASSPHRASE':
  696. self.status = 'bad passphrase'
  697. elif key in ('NEED_PASSPHRASE', 'GOOD_PASSPHRASE', 'BEGIN_SIGNING'):
  698. pass
  699. else: # pragma: no cover
  700. logger.debug('message ignored: %s, %s', key, value)
  701. VERSION_RE = re.compile(r'gpg \(GnuPG(?:/MacGPG2)?\) (\d+(\.\d+)*)'.encode('ascii'), re.I)
  702. HEX_DIGITS_RE = re.compile(r'[0-9a-f]+$', re.I)
  703. PUBLIC_KEY_RE = re.compile(r'gpg: public key is (\w+)')
  704. class GPG(object):
  705. error_map = None
  706. decode_errors = 'strict'
  707. result_map = {
  708. 'crypt': Crypt,
  709. 'delete': DeleteResult,
  710. 'generate': GenKey,
  711. 'addSubkey': AddSubkey,
  712. 'import': ImportResult,
  713. 'send': SendResult,
  714. 'list': ListKeys,
  715. 'scan': ScanKeys,
  716. 'search': SearchKeys,
  717. 'sign': Sign,
  718. 'trust': TrustResult,
  719. 'verify': Verify,
  720. 'export': ExportResult,
  721. }
  722. "Encapsulate access to the gpg executable"
  723. def __init__(self,
  724. gpgbinary='gpg',
  725. gnupghome=None,
  726. verbose=False,
  727. use_agent=False,
  728. keyring=None,
  729. options=None,
  730. secret_keyring=None):
  731. """Initialize a GPG process wrapper. Options are:
  732. gpgbinary -- full pathname for GPG binary.
  733. gnupghome -- full pathname to where we can find the public and
  734. private keyrings. Default is whatever gpg defaults to.
  735. keyring -- name of alternative keyring file to use, or list of such
  736. keyrings. If specified, the default keyring is not used.
  737. options =-- a list of additional options to pass to the GPG binary.
  738. secret_keyring -- name of alternative secret keyring file to use, or
  739. list of such keyrings.
  740. """
  741. self.gpgbinary = gpgbinary
  742. self.gnupghome = gnupghome
  743. # issue 112: fail if the specified value isn't a directory
  744. if gnupghome and not os.path.isdir(gnupghome):
  745. raise ValueError('gnupghome should be a directory (it isn\'t): %s' % gnupghome)
  746. if keyring:
  747. # Allow passing a string or another iterable. Make it uniformly
  748. # a list of keyring filenames
  749. if isinstance(keyring, string_types):
  750. keyring = [keyring]
  751. self.keyring = keyring
  752. if secret_keyring:
  753. # Allow passing a string or another iterable. Make it uniformly
  754. # a list of keyring filenames
  755. if isinstance(secret_keyring, string_types):
  756. secret_keyring = [secret_keyring]
  757. self.secret_keyring = secret_keyring
  758. self.verbose = verbose
  759. self.use_agent = use_agent
  760. if isinstance(options, str): # pragma: no cover
  761. options = [options]
  762. self.options = options
  763. self.on_data = None # or a callable - will be called with data chunks
  764. # Changed in 0.3.7 to use Latin-1 encoding rather than
  765. # locale.getpreferredencoding falling back to sys.stdin.encoding
  766. # falling back to utf-8, because gpg itself uses latin-1 as the default
  767. # encoding.
  768. self.encoding = 'latin-1'
  769. if gnupghome and not os.path.isdir(self.gnupghome):
  770. os.makedirs(self.gnupghome, 0o700)
  771. try:
  772. p = self._open_subprocess(['--version'])
  773. except OSError:
  774. msg = 'Unable to run gpg (%s) - it may not be available.' % self.gpgbinary
  775. logger.exception(msg)
  776. raise OSError(msg)
  777. result = self.result_map['verify'](self) # any result will do for this
  778. self._collect_output(p, result, stdin=p.stdin)
  779. if p.returncode != 0: # pragma: no cover
  780. raise ValueError('Error invoking gpg: %s: %s' % (p.returncode, result.stderr))
  781. m = VERSION_RE.match(result.data)
  782. if not m: # pragma: no cover
  783. self.version = None
  784. else:
  785. dot = '.'.encode('ascii')
  786. self.version = tuple([int(s) for s in m.groups()[0].split(dot)])
  787. # See issue #97. It seems gpg allow duplicate keys in keyrings, so we
  788. # can't be too strict.
  789. self.check_fingerprint_collisions = False
  790. def make_args(self, args, passphrase):
  791. """
  792. Make a list of command line elements for GPG. The value of ``args``
  793. will be appended. The ``passphrase`` argument needs to be True if
  794. a passphrase will be sent to GPG, else False.
  795. """
  796. cmd = [self.gpgbinary, '--status-fd', '2', '--no-tty', '--no-verbose']
  797. if 'DEBUG_IPC' in os.environ:
  798. cmd.extend(['--debug', 'ipc'])
  799. if passphrase and hasattr(self, 'version'):
  800. if self.version >= (2, 1):
  801. cmd[1:1] = ['--pinentry-mode', 'loopback']
  802. cmd.extend(['--fixed-list-mode', '--batch', '--with-colons'])
  803. if self.gnupghome:
  804. cmd.extend(['--homedir', no_quote(self.gnupghome)])
  805. if self.keyring:
  806. cmd.append('--no-default-keyring')
  807. for fn in self.keyring:
  808. cmd.extend(['--keyring', no_quote(fn)])
  809. if self.secret_keyring:
  810. for fn in self.secret_keyring:
  811. cmd.extend(['--secret-keyring', no_quote(fn)])
  812. if passphrase:
  813. cmd.extend(['--passphrase-fd', '0'])
  814. if self.use_agent: # pragma: no cover
  815. cmd.append('--use-agent')
  816. if self.options:
  817. cmd.extend(self.options)
  818. cmd.extend(args)
  819. return cmd
  820. def _open_subprocess(self, args, passphrase=False):
  821. # Internal method: open a pipe to a GPG subprocess and return
  822. # the file objects for communicating with it.
  823. from subprocess import list2cmdline as debug_print
  824. cmd = self.make_args(args, passphrase)
  825. if self.verbose: # pragma: no cover
  826. print(debug_print(cmd))
  827. if not STARTUPINFO:
  828. si = None
  829. else: # pragma: no cover
  830. si = STARTUPINFO()
  831. si.dwFlags = STARTF_USESHOWWINDOW
  832. si.wShowWindow = SW_HIDE
  833. result = Popen(cmd, shell=False, stdin=PIPE, stdout=PIPE, stderr=PIPE, startupinfo=si)
  834. logger.debug('%s: %s', result.pid, debug_print(cmd))
  835. return result
  836. def _read_response(self, stream, result):
  837. # Internal method: reads all the stderr output from GPG, taking notice
  838. # only of lines that begin with the magic [GNUPG:] prefix.
  839. #
  840. # Calls methods on the response object for each valid token found,
  841. # with the arg being the remainder of the status line.
  842. lines = []
  843. while True:
  844. line = stream.readline()
  845. if len(line) == 0:
  846. break
  847. lines.append(line)
  848. line = line.rstrip()
  849. if self.verbose: # pragma: no cover
  850. print(line)
  851. logger.debug('%s', line)
  852. if line[0:9] == '[GNUPG:] ':
  853. # Chop off the prefix
  854. line = line[9:]
  855. L = line.split(None, 1)
  856. keyword = L[0]
  857. if len(L) > 1:
  858. value = L[1]
  859. else:
  860. value = ''
  861. result.handle_status(keyword, value)
  862. result.stderr = ''.join(lines)
  863. def _read_data(self, stream, result, on_data=None):
  864. # Read the contents of the file from GPG's stdout
  865. chunks = []
  866. while True:
  867. data = stream.read(1024)
  868. if len(data) == 0:
  869. if on_data:
  870. on_data(data)
  871. break
  872. logger.debug('chunk: %r' % data[:256])
  873. append = True
  874. if on_data:
  875. append = on_data(data) is not False
  876. if append:
  877. chunks.append(data)
  878. if _py3k:
  879. # Join using b'' or '', as appropriate
  880. result.data = type(data)().join(chunks)
  881. else:
  882. result.data = ''.join(chunks)
  883. def _collect_output(self, process, result, writer=None, stdin=None):
  884. """
  885. Drain the subprocesses output streams, writing the collected output
  886. to the result. If a writer thread (writing to the subprocess) is given,
  887. make sure it's joined before returning. If a stdin stream is given,
  888. close it before returning.
  889. """
  890. stderr = codecs.getreader(self.encoding)(process.stderr)
  891. rr = threading.Thread(target=self._read_response, args=(stderr, result))
  892. rr.daemon = True
  893. logger.debug('stderr reader: %r', rr)
  894. rr.start()
  895. stdout = process.stdout
  896. dr = threading.Thread(target=self._read_data, args=(stdout, result, self.on_data))
  897. dr.daemon = True
  898. logger.debug('stdout reader: %r', dr)
  899. dr.start()
  900. dr.join()
  901. rr.join()
  902. if writer is not None:
  903. writer.join()
  904. process.wait()
  905. result.returncode = rc = process.returncode
  906. if rc != 0:
  907. logger.warning('gpg returned a non-zero error code: %d', rc)
  908. if stdin is not None:
  909. try:
  910. stdin.close()
  911. except IOError: # pragma: no cover
  912. pass
  913. stderr.close()
  914. stdout.close()
  915. return rc
  916. def is_valid_file(self, fileobj):
  917. """
  918. Simplistic check for a file object
  919. """
  920. return hasattr(fileobj, 'read')
  921. def _handle_io(self, args, fileobj, result, passphrase=None, binary=False):
  922. "Handle a call to GPG - pass input data, collect output data"
  923. # Handle a basic data call - pass data to GPG, handle the output
  924. # including status information. Garbage In, Garbage Out :)
  925. if not self.is_valid_file(fileobj):
  926. raise TypeError('Not a valid file: %s' % fileobj)
  927. p = self._open_subprocess(args, passphrase is not None)
  928. if not binary: # pragma: no cover
  929. stdin = codecs.getwriter(self.encoding)(p.stdin)
  930. else:
  931. stdin = p.stdin
  932. if passphrase:
  933. _write_passphrase(stdin, passphrase, self.encoding)
  934. writer = _threaded_copy_data(fileobj, stdin)
  935. self._collect_output(p, result, writer, stdin)
  936. return result
  937. #
  938. # SIGNATURE METHODS
  939. #
  940. def sign(self, message, **kwargs):
  941. """sign message"""
  942. f = _make_binary_stream(message, self.encoding)
  943. result = self.sign_file(f, **kwargs)
  944. f.close()
  945. return result
  946. def set_output_without_confirmation(self, args, output):
  947. "If writing to a file which exists, avoid a confirmation message."
  948. if os.path.exists(output):
  949. # We need to avoid an overwrite confirmation message
  950. args.extend(['--yes'])
  951. args.extend(['--output', no_quote(output)])
  952. def is_valid_passphrase(self, passphrase):
  953. """
  954. Confirm that the passphrase doesn't contain newline-type characters -
  955. it is passed in a pipe to gpg, and so not checking could lead to
  956. spoofing attacks by passing arbitrary text after passphrase and newline.
  957. """
  958. return ('\n' not in passphrase and '\r' not in passphrase and '\x00' not in passphrase)
  959. def sign_file(self,
  960. file,
  961. keyid=None,
  962. passphrase=None,
  963. clearsign=True,
  964. detach=False,
  965. binary=False,
  966. output=None,
  967. extra_args=None):
  968. """sign file"""
  969. if passphrase and not self.is_valid_passphrase(passphrase):
  970. raise ValueError('Invalid passphrase')
  971. logger.debug('sign_file: %s', file)
  972. if binary: # pragma: no cover
  973. args = ['-s']
  974. else:
  975. args = ['-sa']
  976. # You can't specify detach-sign and clearsign together: gpg ignores
  977. # the detach-sign in that case.
  978. if detach:
  979. args.append('--detach-sign')
  980. elif clearsign:
  981. args.append('--clearsign')
  982. if keyid:
  983. args.extend(['--default-key', no_quote(keyid)])
  984. if output: # write the output to a file with the specified name
  985. self.set_output_without_confirmation(args, output)
  986. if extra_args:
  987. args.extend(extra_args)
  988. result = self.result_map['sign'](self)
  989. # We could use _handle_io here except for the fact that if the
  990. # passphrase is bad, gpg bails and you can't write the message.
  991. p = self._open_subprocess(args, passphrase is not None)
  992. try:
  993. stdin = p.stdin
  994. if passphrase:
  995. _write_passphrase(stdin, passphrase, self.encoding)
  996. writer = _threaded_copy_data(file, stdin)
  997. except IOError: # pragma: no cover
  998. logging.exception('error writing message')
  999. writer = None
  1000. self._collect_output(p, result, writer, stdin)
  1001. return result
  1002. def verify(self, data, **kwargs):
  1003. """Verify the signature on the contents of the string 'data'
  1004. >>> GPGBINARY = os.environ.get('GPGBINARY', 'gpg')
  1005. >>> if not os.path.isdir('keys'): os.mkdir('keys')
  1006. >>> gpg = GPG(gpgbinary=GPGBINARY, gnupghome='keys')
  1007. >>> input = gpg.gen_key_input(passphrase='foo')
  1008. >>> key = gpg.gen_key(input)
  1009. >>> assert key
  1010. >>> sig = gpg.sign('hello',keyid=key.fingerprint,passphrase='bar')
  1011. >>> assert not sig
  1012. >>> sig = gpg.sign('hello',keyid=key.fingerprint,passphrase='foo')
  1013. >>> assert sig
  1014. >>> verify = gpg.verify(sig.data)
  1015. >>> assert verify
  1016. """
  1017. f = _make_binary_stream(data, self.encoding)
  1018. result = self.verify_file(f, **kwargs)
  1019. f.close()
  1020. return result
  1021. def verify_file(self, file, data_filename=None, close_file=True, extra_args=None):
  1022. "Verify the signature on the contents of the file-like object 'file'"
  1023. logger.debug('verify_file: %r, %r', file, data_filename)
  1024. result = self.result_map['verify'](self)
  1025. args = ['--verify']
  1026. if extra_args:
  1027. args.extend(extra_args)
  1028. if data_filename is None:
  1029. self._handle_io(args, file, result, binary=True)
  1030. else:
  1031. logger.debug('Handling detached verification')
  1032. import tempfile
  1033. fd, fn = tempfile.mkstemp(prefix='pygpg')
  1034. s = file.read()
  1035. if close_file:
  1036. file.close()
  1037. logger.debug('Wrote to temp file: %r', s)
  1038. os.write(fd, s)
  1039. os.close(fd)
  1040. args.append(no_quote(fn))
  1041. args.append(no_quote(data_filename))
  1042. try:
  1043. p = self._open_subprocess(args)
  1044. self._collect_output(p, result, stdin=p.stdin)
  1045. finally:
  1046. os.unlink(fn)
  1047. return result
  1048. def verify_data(self, sig_filename, data, extra_args=None):
  1049. "Verify the signature in sig_filename against data in memory"
  1050. logger.debug('verify_data: %r, %r ...', sig_filename, data[:16])
  1051. result = self.result_map['verify'](self)
  1052. args = ['--verify']
  1053. if extra_args:
  1054. args.extend(extra_args)
  1055. args.extend([no_quote(sig_filename), '-'])
  1056. stream = _make_memory_stream(data)
  1057. self._handle_io(args, stream, result, binary=True)
  1058. return result
  1059. #
  1060. # KEY MANAGEMENT
  1061. #
  1062. def import_keys(self, key_data, extra_args=None, passphrase=None):
  1063. """
  1064. Import the key_data into our keyring.
  1065. """
  1066. result = self.result_map['import'](self)
  1067. logger.debug('import_keys: %r', key_data[:256])
  1068. data = _make_binary_stream(key_data, self.encoding)
  1069. args = ['--import']
  1070. if extra_args:
  1071. args.extend(extra_args)
  1072. self._handle_io(args, data, result, passphrase=passphrase, binary=True)
  1073. logger.debug('import_keys result: %r', result.__dict__)
  1074. data.close()
  1075. return result
  1076. def recv_keys(self, keyserver, *keyids, **kwargs):
  1077. """Import a key from a keyserver
  1078. >>> import shutil
  1079. >>> shutil.rmtree("keys", ignore_errors=True)
  1080. >>> GPGBINARY = os.environ.get('GPGBINARY', 'gpg')
  1081. >>> if not os.path.isdir('keys'): os.mkdir('keys')
  1082. >>> gpg = GPG(gpgbinary=GPGBINARY, gnupghome='keys')
  1083. >>> os.chmod('keys', 0x1C0)
  1084. >>> result = gpg.recv_keys('pgp.mit.edu', '92905378')
  1085. >>> if 'NO_EXTERNAL_TESTS' not in os.environ: assert result
  1086. """
  1087. result = self.result_map['import'](self)
  1088. logger.debug('recv_keys: %r', keyids)
  1089. data = _make_binary_stream('', self.encoding)
  1090. args = ['--keyserver', no_quote(keyserver)]
  1091. if 'extra_args' in kwargs:
  1092. args.extend(kwargs['extra_args'])
  1093. args.append('--recv-keys')
  1094. args.extend([no_quote(k) for k in keyids])
  1095. self._handle_io(args, data, result, binary=True)
  1096. logger.debug('recv_keys result: %r', result.__dict__)
  1097. data.close()
  1098. return result
  1099. def send_keys(self, keyserver, *keyids, **kwargs):
  1100. """Send a key to a keyserver.
  1101. Note: it's not practical to test this function without sending
  1102. arbitrary data to live keyservers.
  1103. """
  1104. result = self.result_map['send'](self)
  1105. logger.debug('send_keys: %r', keyids)
  1106. data = _make_binary_stream('', self.encoding)
  1107. args = ['--keyserver', no_quote(keyserver)]
  1108. if 'extra_args' in kwargs:
  1109. args.extend(kwargs['extra_args'])
  1110. args.append('--send-keys')
  1111. args.extend([no_quote(k) for k in keyids])
  1112. self._handle_io(args, data, result, binary=True)
  1113. logger.debug('send_keys result: %r', result.__dict__)
  1114. data.close()
  1115. return result
  1116. def delete_keys(self,
  1117. fingerprints,
  1118. secret=False,
  1119. passphrase=None,
  1120. expect_passphrase=True,
  1121. exclamation_mode=False):
  1122. """
  1123. Delete the indicated keys.
  1124. Since GnuPG 2.1, you can't delete secret keys without providing a
  1125. passphrase. However, if you're expecting the passphrase to go to gpg
  1126. via pinentry, you should specify expect_passphrase=False. (It's only
  1127. checked for GnuPG >= 2.1).
  1128. """
  1129. if passphrase and not self.is_valid_passphrase(passphrase):
  1130. raise ValueError('Invalid passphrase')
  1131. which = 'key'
  1132. if secret: # pragma: no cover
  1133. if self.version >= (2, 1) and passphrase is None and expect_passphrase:
  1134. raise ValueError('For GnuPG >= 2.1, deleting secret keys '
  1135. 'needs a passphrase to be provided')
  1136. which = 'secret-key'
  1137. if _is_sequence(fingerprints): # pragma: no cover
  1138. fingerprints = [no_quote(s) for s in fingerprints]
  1139. else:
  1140. fingerprints = [no_quote(fingerprints)]
  1141. if exclamation_mode:
  1142. fingerprints = [f + '!' for f in fingerprints]
  1143. args = ['--delete-%s' % which]
  1144. if secret and self.version >= (2, 1):
  1145. args.insert(0, '--yes')
  1146. args.extend(fingerprints)
  1147. result = self.result_map['delete'](self)
  1148. if not secret or self.version < (2, 1):
  1149. p = self._open_subprocess(args)
  1150. self._collect_output(p, result, stdin=p.stdin)
  1151. else:
  1152. # Need to send in a passphrase.
  1153. f = _make_binary_stream('', self.encoding)
  1154. try:
  1155. self._handle_io(args, f, result, passphrase=passphrase, binary=True)
  1156. finally:
  1157. f.close()
  1158. return result
  1159. def export_keys(self,
  1160. keyids,
  1161. secret=False,
  1162. armor=True,
  1163. minimal=False,
  1164. passphrase=None,
  1165. expect_passphrase=True):
  1166. """
  1167. Export the indicated keys. A 'keyid' is anything gpg accepts.
  1168. Since GnuPG 2.1, you can't export secret keys without providing a
  1169. passphrase. However, if you're expecting the passphrase to go to gpg
  1170. via pinentry, you should specify expect_passphrase=False. (It's only
  1171. checked for GnuPG >= 2.1).
  1172. """
  1173. if passphrase and not self.is_valid_passphrase(passphrase):
  1174. raise ValueError('Invalid passphrase')
  1175. which = ''
  1176. if secret:
  1177. which = '-secret-key'
  1178. if self.version >= (2, 1) and passphrase is None and expect_passphrase:
  1179. raise ValueError('For GnuPG >= 2.1, exporting secret keys '
  1180. 'needs a passphrase to be provided')
  1181. if _is_sequence(keyids):
  1182. keyids = [no_quote(k) for k in keyids]
  1183. else:
  1184. keyids = [no_quote(keyids)]
  1185. args = ['--export%s' % which]
  1186. if armor:
  1187. args.append('--armor')
  1188. if minimal: # pragma: no cover
  1189. args.extend(['--export-options', 'export-minimal'])
  1190. args.extend(keyids)
  1191. # gpg --export produces no status-fd output; stdout will be
  1192. # empty in case of failure
  1193. result = self.result_map['export'](self)
  1194. if not secret or self.version < (2, 1):
  1195. p = self._open_subprocess(args)
  1196. self._collect_output(p, result, stdin=p.stdin)
  1197. else:
  1198. # Need to send in a passphrase.
  1199. f = _make_binary_stream('', self.encoding)
  1200. try:
  1201. self._handle_io(args, f, result, passphrase=passphrase, binary=True)
  1202. finally:
  1203. f.close()
  1204. logger.debug('export_keys result[:100]: %r', result.data[:100])
  1205. # Issue #49: Return bytes if armor not specified, else text
  1206. result = result.data
  1207. if armor:
  1208. result = result.decode(self.encoding, self.decode_errors)
  1209. return result
  1210. def _get_list_output(self, p, kind):
  1211. # Get the response information
  1212. result = self.result_map[kind](self)
  1213. self._collect_output(p, result, stdin=p.stdin)
  1214. lines = result.data.decode(self.encoding, self.decode_errors).splitlines()
  1215. valid_keywords = 'pub uid sec fpr sub ssb sig grp'.split()
  1216. for line in lines:
  1217. if self.verbose: # pragma: no cover
  1218. print(line)
  1219. logger.debug('line: %r', line.rstrip())
  1220. if not line: # pragma: no cover
  1221. break
  1222. L = line.strip().split(':')
  1223. if not L: # pragma: no cover
  1224. continue
  1225. keyword = L[0]
  1226. if keyword in valid_keywords:
  1227. getattr(result, keyword)(L)
  1228. return result
  1229. def list_keys(self, secret=False, keys=None, sigs=False):
  1230. """ list the keys currently in the keyring
  1231. >>> import shutil
  1232. >>> shutil.rmtree("keys", ignore_errors=True)
  1233. >>> GPGBINARY = os.environ.get('GPGBINARY', 'gpg')
  1234. >>> if not os.path.isdir('keys'): os.mkdir('keys')
  1235. >>> gpg = GPG(gpgbinary=GPGBINARY, gnupghome='keys')
  1236. >>> input = gpg.gen_key_input(passphrase='foo')
  1237. >>> result = gpg.gen_key(input)
  1238. >>> fp1 = result.fingerprint
  1239. >>> result = gpg.gen_key(input)
  1240. >>> fp2 = result.fingerprint
  1241. >>> pubkeys = gpg.list_keys()
  1242. >>> assert fp1 in pubkeys.fingerprints
  1243. >>> assert fp2 in pubkeys.fingerprints
  1244. """
  1245. if secret:
  1246. which = 'secret-keys'
  1247. else:
  1248. which = 'sigs' if sigs else 'keys'
  1249. args = ['--list-%s' % which, '--fingerprint', '--fingerprint'] # get subkey FPs, too
  1250. if self.version >= (2, 1):
  1251. args.append('--with-keygrip')
  1252. if keys:
  1253. if isinstance(keys, string_types):
  1254. keys = [keys]
  1255. args.extend(keys)
  1256. p = self._open_subprocess(args)
  1257. return self._get_list_output(p, 'list')
  1258. def scan_keys(self, filename):
  1259. """
  1260. List details of an ascii armored or binary key file
  1261. without first importing it to the local keyring.
  1262. The function achieves this on modern GnuPG by running:
  1263. $ gpg --dry-run --import-options import-show --import
  1264. On older versions, it does the *much* riskier:
  1265. $ gpg --with-fingerprint --with-colons filename
  1266. """
  1267. if self.version >= (2, 1):
  1268. args = ['--dry-run', '--import-options', 'import-show', '--import']
  1269. else:
  1270. logger.warning('Trying to list packets, but if the file is not a '
  1271. 'keyring, might accidentally decrypt')
  1272. args = ['--with-fingerprint', '--with-colons', '--fixed-list-mode']
  1273. args.append(no_quote(filename))
  1274. p = self._open_subprocess(args)
  1275. return self._get_list_output(p, 'scan')
  1276. def search_keys(self, query, keyserver='pgp.mit.edu', extra_args=None):
  1277. """ search keyserver by query (using --search-keys option)
  1278. >>> import shutil
  1279. >>> shutil.rmtree('keys', ignore_errors=True)
  1280. >>> GPGBINARY = os.environ.get('GPGBINARY', 'gpg')
  1281. >>> if not os.path.isdir('keys'): os.mkdir('keys')
  1282. >>> gpg = GPG(gpgbinary=GPGBINARY, gnupghome='keys')
  1283. >>> os.chmod('keys', 0x1C0)
  1284. >>> result = gpg.search_keys('<vinay_sajip@hotmail.com>')
  1285. >>> if 'NO_EXTERNAL_TESTS' not in os.environ: assert result, 'Failed using default keyserver'
  1286. >>> #keyserver = 'keyserver.ubuntu.com'
  1287. >>> #result = gpg.search_keys('<vinay_sajip@hotmail.com>', keyserver)
  1288. >>> #assert result, 'Failed using keyserver.ubuntu.com'
  1289. """
  1290. query = query.strip()
  1291. if HEX_DIGITS_RE.match(query):
  1292. query = '0x' + query
  1293. args = ['--fingerprint', '--keyserver', no_quote(keyserver)]
  1294. if extra_args:
  1295. args.extend(extra_args)
  1296. args.extend(['--search-keys', no_quote(query)])
  1297. p = self._open_subprocess(args)
  1298. # Get the response information
  1299. result = self.result_map['search'](self)
  1300. self._collect_output(p, result, stdin=p.stdin)
  1301. lines = result.data.decode(self.encoding, self.decode_errors).splitlines()
  1302. valid_keywords = ['pub', 'uid']
  1303. for line in lines:
  1304. if self.verbose: # pragma: no cover
  1305. print(line)
  1306. logger.debug('line: %r', line.rstrip())
  1307. if not line: # sometimes get blank lines on Windows
  1308. continue
  1309. L = line.strip().split(':')
  1310. if not L: # pragma: no cover
  1311. continue
  1312. keyword = L[0]
  1313. if keyword in valid_keywords:
  1314. getattr(result, keyword)(L)
  1315. return result
  1316. def gen_key(self, input):
  1317. """Generate a key; you might use gen_key_input() to create the
  1318. control input.
  1319. >>> GPGBINARY = os.environ.get('GPGBINARY', 'gpg')
  1320. >>> if not os.path.isdir('keys'): os.mkdir('keys')
  1321. >>> gpg = GPG(gpgbinary=GPGBINARY, gnupghome='keys')
  1322. >>> input = gpg.gen_key_input(passphrase='foo')
  1323. >>> result = gpg.gen_key(input)
  1324. >>> assert result
  1325. >>> result = gpg.gen_key('foo')
  1326. >>> assert not result
  1327. """
  1328. args = ['--gen-key']
  1329. result = self.result_map['generate'](self)
  1330. f = _make_binary_stream(input, self.encoding)
  1331. self._handle_io(args, f, result, binary=True)
  1332. f.close()
  1333. return result
  1334. def gen_key_input(self, **kwargs):
  1335. """
  1336. Generate --gen-key input per gpg doc/DETAILS
  1337. """
  1338. parms = {}
  1339. no_protection = kwargs.pop('no_protection', False)
  1340. for key, val in list(kwargs.items()):
  1341. key = key.replace('_', '-').title()
  1342. if str(val).strip(): # skip empty strings
  1343. parms[key] = val
  1344. parms.setdefault('Key-Type', 'RSA')
  1345. if 'key_curve' not in kwargs:
  1346. parms.setdefault('Key-Length', 2048)
  1347. parms.setdefault('Name-Real', 'Autogenerated Key')
  1348. logname = (os.environ.get('LOGNAME') or os.environ.get('USERNAME') or 'unspecified')
  1349. hostname = socket.gethostname()
  1350. parms.setdefault('Name-Email', '%s@%s' % (logname.replace(' ', '_'), hostname))
  1351. out = 'Key-Type: %s\n' % parms.pop('Key-Type')
  1352. for key, val in list(parms.items()):
  1353. out += '%s: %s\n' % (key, val)
  1354. if no_protection:
  1355. out += '%no-protection\n'
  1356. out += '%commit\n'
  1357. return out
  1358. # Key-Type: RSA
  1359. # Key-Length: 1024
  1360. # Name-Real: ISdlink Server on %s
  1361. # Name-Comment: Created by %s
  1362. # Name-Email: isdlink@%s
  1363. # Expire-Date: 0
  1364. # %commit
  1365. #
  1366. #
  1367. # Key-Type: DSA
  1368. # Key-Length: 1024
  1369. # Subkey-Type: ELG-E
  1370. # Subkey-Length: 1024
  1371. # Name-Real: Joe Tester
  1372. # Name-Comment: with stupid passphrase
  1373. # Name-Email: joe@foo.bar
  1374. # Expire-Date: 0
  1375. # Passphrase: abc
  1376. # %pubring foo.pub
  1377. # %secring foo.sec
  1378. # %commit
  1379. def add_subkey(self,
  1380. master_key,
  1381. master_passphrase=None,
  1382. algorithm='rsa',
  1383. usage='encrypt',
  1384. expire='-'):
  1385. """
  1386. Add subkeys to a masterkey
  1387. """
  1388. if self.version[0] < 2:
  1389. raise NotImplementedError('Not available in GnuPG 1.x')
  1390. if not master_key:
  1391. raise ValueError('No master key fingerprint specified')
  1392. if master_passphrase and not self.is_valid_passphrase(master_passphrase):
  1393. raise ValueError('Invalid passphrase')
  1394. args = ['--quick-add-key', master_key, algorithm, usage, str(expire)]
  1395. result = self.result_map['addSubkey'](self)
  1396. f = _make_binary_stream('', self.encoding)
  1397. self._handle_io(args, f, result, passphrase=master_passphrase, binary=True)
  1398. return result
  1399. #
  1400. # ENCRYPTION
  1401. #
  1402. def encrypt_file(self,
  1403. file,
  1404. recipients,
  1405. sign=None,
  1406. always_trust=False,
  1407. passphrase=None,
  1408. armor=True,
  1409. output=None,
  1410. symmetric=False,
  1411. extra_args=None):
  1412. "Encrypt the message read from the file-like object 'file'"
  1413. if passphrase and not self.is_valid_passphrase(passphrase):
  1414. raise ValueError('Invalid passphrase')
  1415. args = ['--encrypt']
  1416. if symmetric:
  1417. # can't be False or None - could be True or a cipher algo value
  1418. # such as AES256
  1419. args = ['--symmetric']
  1420. if symmetric is not True:
  1421. args.extend(['--cipher-algo', no_quote(symmetric)])
  1422. # else use the default, currently CAST5
  1423. else:
  1424. if not recipients:
  1425. raise ValueError('No recipients specified with asymmetric '
  1426. 'encryption')
  1427. if not _is_sequence(recipients):
  1428. recipients = (recipients, )
  1429. for recipient in recipients:
  1430. args.extend(['--recipient', no_quote(recipient)])
  1431. if armor: # create ascii-armored output - False for binary output
  1432. args.append('--armor')
  1433. if output: # write the output to a file with the specified name
  1434. self.set_output_without_confirmation(args, output)
  1435. if sign is True: # pragma: no cover
  1436. args.append('--sign')
  1437. elif sign: # pragma: no cover
  1438. args.extend(['--sign', '--default-key', no_quote(sign)])
  1439. if always_trust: # pragma: no cover
  1440. args.append('--always-trust')
  1441. if extra_args:
  1442. args.extend(extra_args)
  1443. result = self.result_map['crypt'](self)
  1444. self._handle_io(args, file, result, passphrase=passphrase, binary=True)
  1445. logger.debug('encrypt result[:100]: %r', result.data[:100])
  1446. return result
  1447. def encrypt(self, data, recipients, **kwargs):
  1448. """Encrypt the message contained in the string 'data'
  1449. >>> import shutil
  1450. >>> if os.path.exists("keys"):
  1451. ... shutil.rmtree("keys", ignore_errors=True)
  1452. >>> GPGBINARY = os.environ.get('GPGBINARY', 'gpg')
  1453. >>> if not os.path.isdir('keys'): os.mkdir('keys')
  1454. >>> gpg = GPG(gpgbinary=GPGBINARY, gnupghome='keys')
  1455. >>> input = gpg.gen_key_input(name_email='user1@test', passphrase='pp1')
  1456. >>> result = gpg.gen_key(input)
  1457. >>> fp1 = result.fingerprint
  1458. >>> input = gpg.gen_key_input(name_email='user2@test', passphrase='pp2')
  1459. >>> result = gpg.gen_key(input)
  1460. >>> fp2 = result.fingerprint
  1461. >>> result = gpg.encrypt("hello",fp2)
  1462. >>> message = str(result)
  1463. >>> assert message != 'hello'
  1464. >>> result = gpg.decrypt(message, passphrase='pp2')
  1465. >>> assert result
  1466. >>> str(result)
  1467. 'hello'
  1468. >>> result = gpg.encrypt("hello again", fp1)
  1469. >>> message = str(result)
  1470. >>> result = gpg.decrypt(message, passphrase='bar')
  1471. >>> result.status in ('decryption failed', 'bad passphrase')
  1472. True
  1473. >>> assert not result
  1474. >>> result = gpg.decrypt(message, passphrase='pp1')
  1475. >>> result.status == 'decryption ok'
  1476. True
  1477. >>> str(result)
  1478. 'hello again'
  1479. >>> result = gpg.encrypt("signed hello", fp2, sign=fp1, passphrase='pp1')
  1480. >>> result.status == 'encryption ok'
  1481. True
  1482. >>> message = str(result)
  1483. >>> result = gpg.decrypt(message, passphrase='pp2')
  1484. >>> result.status == 'decryption ok'
  1485. True
  1486. >>> assert result.fingerprint == fp1
  1487. """
  1488. data = _make_binary_stream(data, self.encoding)
  1489. result = self.encrypt_file(data, recipients, **kwargs)
  1490. data.close()
  1491. return result
  1492. def decrypt(self, message, **kwargs):
  1493. data = _make_binary_stream(message, self.encoding)
  1494. result = self.decrypt_file(data, **kwargs)
  1495. data.close()
  1496. return result
  1497. def decrypt_file(self,
  1498. file,
  1499. always_trust=False,
  1500. passphrase=None,
  1501. output=None,
  1502. extra_args=None):
  1503. if passphrase and not self.is_valid_passphrase(passphrase):
  1504. raise ValueError('Invalid passphrase')
  1505. args = ['--decrypt']
  1506. if output: # write the output to a file with the specified name
  1507. self.set_output_without_confirmation(args, output)
  1508. if always_trust: # pragma: no cover
  1509. args.append('--always-trust')
  1510. if extra_args:
  1511. args.extend(extra_args)
  1512. result = self.result_map['crypt'](self)
  1513. self._handle_io(args, file, result, passphrase, binary=True)
  1514. logger.debug('decrypt result[:100]: %r', result.data[:100])
  1515. return result
  1516. def get_recipients(self, message, **kwargs):
  1517. data = _make_binary_stream(message, self.encoding)
  1518. result = self.get_recipients_file(data, **kwargs)
  1519. data.close()
  1520. return result
  1521. def get_recipients_file(self, file, extra_args=None):
  1522. args = ['--decrypt', '--list-only', '-v']
  1523. if extra_args:
  1524. args.extend(extra_args)
  1525. result = self.result_map['crypt'](self)
  1526. self._handle_io(args, file, result, binary=True)
  1527. ids = []
  1528. for m in PUBLIC_KEY_RE.finditer(result.stderr):
  1529. ids.append(m.group(1))
  1530. return ids
  1531. def trust_keys(self, fingerprints, trustlevel):
  1532. levels = Verify.TRUST_LEVELS
  1533. if trustlevel not in levels:
  1534. poss = ', '.join(sorted(levels))
  1535. raise ValueError('Invalid trust level: "%s" (must be one of %s)' % (trustlevel, poss))
  1536. trustlevel = levels[trustlevel] + 2
  1537. import tempfile
  1538. try:
  1539. fd, fn = tempfile.mkstemp()
  1540. lines = []
  1541. if isinstance(fingerprints, string_types):
  1542. fingerprints = [fingerprints]
  1543. for f in fingerprints:
  1544. lines.append('%s:%s:' % (f, trustlevel))
  1545. # The trailing newline is required!
  1546. s = os.linesep.join(lines) + os.linesep
  1547. logger.debug('writing ownertrust info: %s', s)
  1548. os.write(fd, s.encode(self.encoding))
  1549. os.close(fd)
  1550. result = self.result_map['trust'](self)
  1551. p = self._open_subprocess(['--import-ownertrust', fn])
  1552. self._collect_output(p, result, stdin=p.stdin)
  1553. if p.returncode != 0:
  1554. raise ValueError('gpg returned an error - return code %d' % p.returncode)
  1555. finally:
  1556. os.remove(fn)
  1557. return result