gnupg.py 35 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017
  1. """ A wrapper for the 'gpg' command::
  2. Portions of this module are derived from A.M. Kuchling's well-designed
  3. GPG.py, using Richard Jones' updated version 1.3, which can be found
  4. in the pycrypto CVS repository on Sourceforge:
  5. http://pycrypto.cvs.sourceforge.net/viewvc/pycrypto/gpg/GPG.py
  6. This module is *not* forward-compatible with amk's; some of the
  7. old interface has changed. For instance, since I've added decrypt
  8. functionality, I elected to initialize with a 'gnupghome' argument
  9. instead of 'keyring', so that gpg can find both the public and secret
  10. keyrings. I've also altered some of the returned objects in order for
  11. the caller to not have to know as much about the internals of the
  12. result classes.
  13. While the rest of ISconf is released under the GPL, I am releasing
  14. this single file under the same terms that A.M. Kuchling used for
  15. pycrypto.
  16. Steve Traugott, stevegt@terraluna.org
  17. Thu Jun 23 21:27:20 PDT 2005
  18. This version of the module has been modified from Steve Traugott's version
  19. (see http://trac.t7a.org/isconf/browser/trunk/lib/python/isconf/GPG.py) by
  20. Vinay Sajip to make use of the subprocess module (Steve's version uses os.fork()
  21. and so does not work on Windows). Renamed to gnupg.py to avoid confusion with
  22. the previous versions.
  23. Modifications Copyright (C) 2008-2012 Vinay Sajip. All rights reserved.
  24. A unittest harness (test_gnupg.py) has also been added.
  25. """
  26. import locale
  27. __version__ = "0.2.9"
  28. __author__ = "Vinay Sajip"
  29. __date__ = "$29-Mar-2012 21:12:58$"
  30. try:
  31. from io import StringIO
  32. except ImportError:
  33. from cStringIO import StringIO
  34. import codecs
  35. import locale
  36. import logging
  37. import os
  38. import socket
  39. from subprocess import Popen
  40. from subprocess import PIPE
  41. import sys
  42. import threading
  # logging.NullHandler is a class inside the logging module, not a submodule,
  # so it must be imported with ``from ... import``; an ``import a.b as c``
  # form always raises ImportError and silently forces the fallback below.
  try:
      from logging import NullHandler
  except ImportError:
      class NullHandler(logging.Handler):
          """Do-nothing handler for interpreters lacking logging.NullHandler."""

          def handle(self, record):
              pass

  # ``unicode`` only exists as a builtin on Python 2; its absence marks Python 3.
  try:
      unicode
      _py3k = False
  except NameError:
      _py3k = True

  # Library-style logging: attach a NullHandler so that applications which have
  # not configured logging do not get "no handlers could be found" warnings.
  logger = logging.getLogger(__name__)
  if not logger.handlers:
      logger.addHandler(NullHandler())
  def _copy_data(instream, outstream):
      """Copy all data from `instream` to `outstream`, then close `outstream`.

      Data is read in 1024-byte chunks. If writing a chunk raises a
      UnicodeError, the chunk is encoded (using the encoding of sys.stdin,
      or 'ascii' when that is unavailable) and written again. Any other
      write failure is logged and ends the copy; it is not propagated,
      because this function normally runs in a background thread.
      """
      sent = 0
      # sys.stdin may lack an ``encoding`` attribute, or have it set to None
      # (e.g. when stdin is not a terminal); fall back to ASCII in both cases.
      enc = getattr(sys.stdin, 'encoding', None) or 'ascii'
      while True:
          data = instream.read(1024)
          if len(data) == 0:
              break
          sent += len(data)
          logger.debug("sending chunk (%d): %r", sent, data[:256])
          try:
              try:
                  outstream.write(data)
              except UnicodeError:
                  outstream.write(data.encode(enc))
          except Exception:
              # Can sometimes get 'broken pipe' errors even when the data has
              # all been sent. The re-encoded write above is covered too, so a
              # failure there no longer escapes and kills the copier thread.
              logger.exception('Error sending data')
              break
      try:
          outstream.close()
      except IOError:
          logger.warning('Exception occurred while closing: ignored', exc_info=1)
      logger.debug("closed output, %d bytes sent", sent)
  def _threaded_copy_data(instream, outstream):
      """Run _copy_data(instream, outstream) on a daemon thread.

      The thread is started before returning; the Thread object is returned
      so that the caller can join it.
      """
      copier = threading.Thread(target=_copy_data, args=(instream, outstream))
      copier.daemon = True
      logger.debug('data copier: %r, %r, %r', copier, instream, outstream)
      copier.start()
      return copier
  def _write_passphrase(stream, passphrase, encoding):
      """Write `passphrase`, followed by a newline, to `stream`.

      The text is encoded with `encoding` before being written.
      """
      passphrase = '%s\n' % passphrase
      passphrase = passphrase.encode(encoding)
      stream.write(passphrase)
      # Deliberately do not include the passphrase in the log record: debug
      # logs are routinely written to disk and shared.
      logger.debug("Wrote passphrase")
  def _is_sequence(instance):
      """Return True when `instance` is a list or a tuple (or a subclass)."""
      return isinstance(instance, (list, tuple))
  def _make_binary_stream(s, encoding):
      """Wrap `s` in an in-memory stream, encoding text with `encoding` first.

      On Python 3 a str is encoded; on Python 2 anything whose type is not
      exactly str is encoded. The result is an io.BytesIO, or a StringIO when
      io.BytesIO cannot be imported.
      """
      try:
          if _py3k:
              must_encode = isinstance(s, str)
          else:
              must_encode = type(s) is not str
          if must_encode:
              s = s.encode(encoding)
          from io import BytesIO
          stream = BytesIO(s)
      except ImportError:
          stream = StringIO(s)
      return stream
  class Verify(object):
      "Handle status messages for --verify"

      def __init__(self, gpg):
          """Create an empty result bound to the `gpg` wrapper.

          Every attribute that handle_status() may set is initialised here,
          so callers can read e.g. ``status`` or ``pubkey_fingerprint`` even
          when gpg never emitted the corresponding status line.
          """
          self.gpg = gpg
          self.valid = False
          self.fingerprint = self.creation_date = self.timestamp = None
          self.signature_id = self.key_id = None
          self.username = None
          self.status = None
          self.pubkey_fingerprint = None
          self.sig_timestamp = self.expire_timestamp = None

      def __nonzero__(self):
          # Truthiness of the result is the validity of the signature.
          return self.valid

      __bool__ = __nonzero__

      def handle_status(self, key, value):
          """Record one status line.

          `key` is the status keyword and `value` the remainder of the line.
          Raises ValueError for a keyword this class does not know.
          """
          if key in ("TRUST_UNDEFINED", "TRUST_NEVER", "TRUST_MARGINAL",
                     "TRUST_FULLY", "TRUST_ULTIMATE", "RSA_OR_IDEA", "NODATA",
                     "IMPORT_RES", "PLAINTEXT", "PLAINTEXT_LENGTH",
                     "POLICY_URL", "DECRYPTION_INFO", "DECRYPTION_OKAY",
                     "IMPORTED"):
              pass
          elif key == "BADSIG":
              self.valid = False
              self.status = 'signature bad'
              self.key_id, self.username = value.split(None, 1)
          elif key == "GOODSIG":
              self.valid = True
              self.status = 'signature good'
              self.key_id, self.username = value.split(None, 1)
          elif key == "VALIDSIG":
              fields = value.split()
              (self.fingerprint,
               self.creation_date,
               self.sig_timestamp,
               self.expire_timestamp) = fields[:4]
              # may be different if signature is made with a subkey
              self.pubkey_fingerprint = fields[-1]
              self.status = 'signature valid'
          elif key == "SIG_ID":
              (self.signature_id,
               self.creation_date, self.timestamp) = value.split()
          elif key == "ERRSIG":
              self.valid = False
              # Fields 2-4 (pubkey algo, hash algo, signature class) are unused.
              (self.key_id,
               algo, hash_algo,
               cls,
               self.timestamp) = value.split()[:5]
              self.status = 'signature error'
          elif key == "DECRYPTION_FAILED":
              self.valid = False
              self.key_id = value
              self.status = 'decryption failed'
          elif key == "NO_PUBKEY":
              self.valid = False
              self.key_id = value
              self.status = 'no public key'
          elif key in ("KEYEXPIRED", "SIGEXPIRED"):
              # these are useless in verify, since they are spit out for any
              # pub/subkeys on the key, not just the one doing the signing.
              # if we want to check for signatures with expired key,
              # the relevant flag is EXPKEYSIG.
              pass
          elif key in ("EXPKEYSIG", "REVKEYSIG"):
              # signed with expired or revoked key
              self.valid = False
              self.key_id = value.split()[0]
              # e.g. "EXPKEYSIG" -> "exp keysig"
              self.status = (('%s %s') % (key[:3], key[3:])).lower()
          else:
              raise ValueError("Unknown status message: %r" % key)
  class ImportResult(object):
      "Handle status messages for --import"

      # Names given to the leading numeric fields of an IMPORT_RES line.
      counts = '''count no_user_id imported imported_rsa unchanged
              n_uids n_subk n_sigs n_revoc sec_read sec_imported
              sec_dups not_imported'''.split()

      def __init__(self, gpg):
          """Create an empty result; every counter in `counts` starts as None."""
          self.gpg = gpg
          self.results = []
          self.fingerprints = []
          for result in self.counts:
              setattr(self, result, None)

      def __nonzero__(self):
          # Success means: nothing was rejected and at least one key was seen.
          if self.not_imported:
              return False
          if not self.fingerprints:
              return False
          return True

      __bool__ = __nonzero__

      # IMPORT_OK reason codes are bit flags; '0' means no flag is set.
      ok_reason = {
          '0': 'Not actually changed',
          '1': 'Entirely new key',
          '2': 'New user IDs',
          '4': 'New signatures',
          '8': 'New subkeys',
          '16': 'Contains private key',
      }

      problem_reason = {
          '0': 'No specific reason given',
          '1': 'Invalid Certificate',
          '2': 'Issuer Certificate missing',
          '3': 'Certificate Chain too long',
          '4': 'Error storing certificate',
      }

      def handle_status(self, key, value):
          """Record one status line.

          `key` is the status keyword and `value` the remainder of the line.
          Raises ValueError for a keyword this class does not know.
          """
          if key == "IMPORTED":
              # this duplicates info we already see in import_ok & import_problem
              pass
          elif key == "NODATA":
              self.results.append({'fingerprint': None,
                                   'problem': '0',
                                   'text': 'No valid data found'})
          elif key == "IMPORT_OK":
              reason, fingerprint = value.split()
              flags = int(reason)
              if flags == 0:
                  reasons = [self.ok_reason['0']]
              else:
                  # Report each flag that is set. The zero entry is skipped:
                  # it would otherwise match every code and contradict the
                  # other reasons.
                  ordered = sorted(self.ok_reason.items(),
                                   key=lambda item: int(item[0]))
                  reasons = [text for code, text in ordered
                             if int(code) and (flags & int(code)) == int(code)]
              reasontext = '\n'.join(reasons) + "\n"
              self.results.append({'fingerprint': fingerprint,
                                   'ok': reason, 'text': reasontext})
              self.fingerprints.append(fingerprint)
          elif key == "IMPORT_PROBLEM":
              try:
                  reason, fingerprint = value.split()
              except ValueError:
                  # The fingerprint is optional on this status line.
                  reason = value
                  fingerprint = '<unknown>'
              text = self.problem_reason.get(reason,
                                             'Unknown problem: %r' % reason)
              self.results.append({'fingerprint': fingerprint,
                                   'problem': reason, 'text': text})
          elif key == "IMPORT_RES":
              # zip() tolerates a line with more or fewer fields than `counts`.
              for name, number in zip(self.counts, value.split()):
                  setattr(self, name, int(number))
          elif key == "KEYEXPIRED":
              self.results.append({'fingerprint': None,
                                   'problem': '0', 'text': 'Key expired'})
          elif key == "SIGEXPIRED":
              self.results.append({'fingerprint': None,
                                   'problem': '0', 'text': 'Signature expired'})
          else:
              raise ValueError("Unknown status message: %r" % key)

      def summary(self):
          """Return a short text such as '2 imported, 1 not imported'."""
          # The counters stay None until an IMPORT_RES line has been seen.
          parts = ['%d imported' % (self.imported or 0)]
          if self.not_imported:
              parts.append('%d not imported' % self.not_imported)
          return ', '.join(parts)
  class ListKeys(list):
      ''' Collect the records produced by --list-keys.

      Each ``pub``/``sec`` record becomes a dict appended to this list; the
      ``fpr`` and ``uid`` records that follow are attached to the most recent
      key. All fingerprints and user ids are also gathered in the
      ``fingerprints`` and ``uids`` attributes.

      Record types that are ignored (info from src/DETAILS):

        crt = X.509 certificate
        crs = X.509 certificate and private key available
        sub = subkey (secondary key)
        ssb = secret subkey (secondary key)
        uat = user attribute (same as user id except for field 10).
        sig = signature
        rev = revocation signature
        pkd = public key data (special field format, see below)
        grp = reserved for gpgsm
        rvk = revocation key
      '''

      def __init__(self, gpg):
          self.gpg = gpg
          self.curkey = None
          self.fingerprints = []
          self.uids = []

      def key(self, args):
          # Names for the first ten colon-separated fields of a key record.
          field_names = ('type', 'trust', 'length', 'algo', 'keyid', 'date',
                         'expires', 'dummy', 'ownertrust', 'uid')
          record = self.curkey = {}
          for position, name in enumerate(field_names):
              record[name] = args[position]
          record['uids'] = []
          # The user id embedded in the key record, when present, moves into
          # the 'uids' list.
          embedded_uid = record['uid']
          if embedded_uid:
              record['uids'].append(embedded_uid)
          del record['uid']
          self.append(record)

      # Public and secret key records are handled identically.
      pub = sec = key

      def fpr(self, args):
          fingerprint = args[9]
          self.curkey['fingerprint'] = fingerprint
          self.fingerprints.append(fingerprint)

      def uid(self, args):
          user_id = args[9]
          self.curkey['uids'].append(user_id)
          self.uids.append(user_id)

      def handle_status(self, key, value):
          # Status lines carry nothing of interest for key listings.
          pass
  class Crypt(Verify):
      "Handle status messages for --encrypt and --decrypt"

      # Keywords that need no action. In the case of ERROR, this is because a
      # more specific error message will have come first.
      _SILENT = ("ENC_TO", "USERID_HINT", "GOODMDC", "END_DECRYPTION",
                 "BEGIN_SIGNING", "NO_SECKEY", "ERROR", "NODATA",
                 "CARDCTRL")

      # Keywords whose status text is the keyword itself, lower-cased and with
      # underscores turned into spaces.
      _VERBATIM = ("NEED_PASSPHRASE", "BAD_PASSPHRASE", "GOOD_PASSPHRASE",
                   "MISSING_PASSPHRASE", "DECRYPTION_FAILED",
                   "KEY_NOT_CREATED")

      # Keywords mapped to a fixed status text.
      _STATUS_TEXT = {
          "NEED_PASSPHRASE_SYM": 'need symmetric passphrase',
          "BEGIN_DECRYPTION": 'decryption incomplete',
          "BEGIN_ENCRYPTION": 'encryption incomplete',
          "INV_RECP": 'invalid recipient',
          "KEYEXPIRED": 'key expired',
          "SIG_CREATED": 'sig created',
          "SIGEXPIRED": 'sig expired',
      }

      # Keywords that additionally mark the operation as successful.
      _SUCCESS_TEXT = {
          "DECRYPTION_OKAY": 'decryption ok',
          "END_ENCRYPTION": 'encryption ok',
      }

      def __init__(self, gpg):
          Verify.__init__(self, gpg)
          self.data = ''
          self.ok = False
          self.status = ''

      def __nonzero__(self):
          return bool(self.ok)

      __bool__ = __nonzero__

      def __str__(self):
          return self.data.decode(self.gpg.encoding, self.gpg.decode_errors)

      def handle_status(self, key, value):
          # Anything not recognised here is passed on to Verify.
          if key in self._SILENT:
              return
          if key in self._VERBATIM:
              self.status = key.replace("_", " ").lower()
          elif key in self._STATUS_TEXT:
              self.status = self._STATUS_TEXT[key]
          elif key in self._SUCCESS_TEXT:
              self.status = self._SUCCESS_TEXT[key]
              self.ok = True
          else:
              Verify.handle_status(self, key, value)
  class GenKey(object):
      "Handle status messages for --gen-key"

      def __init__(self, gpg):
          self.gpg = gpg
          self.type = None
          self.fingerprint = None

      def __nonzero__(self):
          # A result is true once a fingerprint has been recorded.
          return bool(self.fingerprint)

      __bool__ = __nonzero__

      def __str__(self):
          if self.fingerprint:
              return self.fingerprint
          return ''

      def handle_status(self, key, value):
          # KEY_CREATED carries "<type> <fingerprint>"; a few keywords are
          # accepted and ignored; everything else is an error.
          if key == "KEY_CREATED":
              self.type, self.fingerprint = value.split()
          elif key not in ("PROGRESS", "GOOD_PASSPHRASE", "NODATA"):
              raise ValueError("Unknown status message: %r" % key)
  class DeleteResult(object):
      "Handle status messages for --delete-key and --delete-secret-key"

      # Text for the reason codes carried by a DELETE_PROBLEM status line.
      problem_reason = {
          '1': 'No such key',
          '2': 'Must delete secret key first',
          '3': 'Ambigious specification',
      }

      def __init__(self, gpg):
          self.gpg = gpg
          # Stays 'ok' unless a DELETE_PROBLEM line is received.
          self.status = 'ok'

      def __str__(self):
          return self.status

      def handle_status(self, key, value):
          if key != "DELETE_PROBLEM":
              raise ValueError("Unknown status message: %r" % key)
          fallback = "Unknown error: %r" % value
          self.status = self.problem_reason.get(value, fallback)
  class Sign(object):
      "Handle status messages for --sign"

      def __init__(self, gpg):
          """Create an empty signing result bound to the `gpg` wrapper."""
          self.gpg = gpg
          self.type = None
          self.fingerprint = None
          # Initialised here so that a failed signing attempt can still be
          # inspected without an AttributeError.
          self.timestamp = None

      def __nonzero__(self):
          # The fingerprint is only set once gpg reports SIG_CREATED.
          return self.fingerprint is not None

      __bool__ = __nonzero__

      def __str__(self):
          # ``data`` is attached to the result by the code that reads gpg's
          # standard output.
          return self.data.decode(self.gpg.encoding, self.gpg.decode_errors)

      def handle_status(self, key, value):
          """Record one status line.

          `key` is the status keyword and `value` the remainder of the line.
          Raises ValueError for a keyword this class does not know.
          """
          if key in ("USERID_HINT", "NEED_PASSPHRASE", "BAD_PASSPHRASE",
                     "GOOD_PASSPHRASE", "BEGIN_SIGNING", "CARDCTRL"):
              pass
          elif key == "SIG_CREATED":
              # The three middle fields (algo, hashalgo, cls) are not kept.
              (self.type,
               algo, hashalgo, cls,
               self.timestamp, self.fingerprint
               ) = value.split()
          else:
              raise ValueError("Unknown status message: %r" % key)
  395. class GPG(object):
  396. decode_errors = 'strict'
  397. result_map = {
  398. 'crypt': Crypt,
  399. 'delete': DeleteResult,
  400. 'generate': GenKey,
  401. 'import': ImportResult,
  402. 'list': ListKeys,
  403. 'sign': Sign,
  404. 'verify': Verify,
  405. }
  406. "Encapsulate access to the gpg executable"
  def __init__(self, gpgbinary='gpg', gnupghome=None, verbose=False,
               use_agent=False, keyring=None):
      """Initialize a GPG process wrapper.  Options are:

      gpgbinary -- full pathname for GPG binary.

      gnupghome -- full pathname to where we can find the public and
      private keyrings.  Default is whatever gpg defaults to.

      keyring -- name of alternative keyring file to use. If specified,
      the default keyring is not used.

      Raises ValueError when running ``gpg --version`` exits with a
      non-zero return code.
      """
      self.gpgbinary = gpgbinary
      self.gnupghome = gnupghome
      self.keyring = keyring
      self.verbose = verbose
      self.use_agent = use_agent

      preferred = locale.getpreferredencoding()
      if preferred is None:  # This happens on Jython!
          preferred = sys.stdin.encoding
      self.encoding = preferred

      if gnupghome and not os.path.isdir(self.gnupghome):
          # 0x1C0 == octal 700: directory accessible by the owner only.
          os.makedirs(self.gnupghome, 0x1C0)

      # Run "gpg --version" once to check that the binary can be invoked.
      probe = self._open_subprocess(["--version"])
      outcome = self.result_map['verify'](self)  # any result will do for this
      self._collect_output(probe, outcome, stdin=probe.stdin)
      if probe.returncode != 0:
          raise ValueError("Error invoking gpg: %s: %s" % (probe.returncode,
                                                           outcome.stderr))
  def _open_subprocess(self, args, passphrase=False):
      """Start gpg with `args` and return the Popen object.

      `args` is a list of command-line fragments; each fragment may hold
      several space-separated options and double-quoted values. When
      `passphrase` is true gpg is told to read the passphrase from its
      standard input. stdin, stdout and stderr of the child are pipes, and
      status output is directed to stderr (``--status-fd 2``).
      """
      cmd = [self.gpgbinary, '--status-fd 2 --no-tty']
      if self.gnupghome:
          cmd.append('--homedir "%s" ' % self.gnupghome)
      if self.keyring:
          cmd.append('--no-default-keyring --keyring "%s" ' % self.keyring)
      if passphrase:
          cmd.append('--batch --passphrase-fd 0')
      if self.use_agent:
          cmd.append('--use-agent')
      cmd.extend(args)
      cmd = ' '.join(cmd)
      if self.verbose:
          print(cmd)
      logger.debug("%s", cmd)
      if os.name == 'posix':
          # Split the command line ourselves and run gpg directly, so that
          # shell metacharacters (``;``, ``$()``, backticks, ...) inside key
          # ids, recipients or paths are never interpreted by a shell.
          import shlex
          return Popen(shlex.split(cmd), stdin=PIPE, stdout=PIPE, stderr=PIPE)
      # NOTE(review): other platforms still go through the shell, because
      # POSIX-style splitting would mangle backslashes in Windows paths.
      return Popen(cmd, shell=True, stdin=PIPE, stdout=PIPE, stderr=PIPE)
  def _read_response(self, stream, result):
      """Read gpg's stderr from `stream` until end of file.

      Lines starting with the magic ``[GNUPG:] `` prefix are status lines:
      the first word after the prefix is the keyword and the rest of the
      line its value; both are passed to ``result.handle_status``. Every
      line read, status or not, is kept and the concatenation is stored in
      ``result.stderr``.
      """
      prefix = '[GNUPG:] '
      collected = []
      while True:
          raw = stream.readline()
          if len(raw) == 0:
              break
          collected.append(raw)
          text = raw.rstrip()
          if self.verbose:
              print(text)
          logger.debug("%s", text)
          if text[0:9] != prefix:
              continue
          # Chop off the prefix, then separate keyword and value.
          pieces = text[9:].split(None, 1)
          keyword = pieces[0]
          value = pieces[1] if len(pieces) > 1 else ""
          result.handle_status(keyword, value)
      result.stderr = ''.join(collected)
  def _read_data(self, stream, result):
      """Read gpg's stdout from `stream` to end of file into ``result.data``.

      The stream is consumed in 1024-byte chunks which are joined once the
      stream is exhausted.
      """
      pieces = []
      while True:
          piece = stream.read(1024)
          if len(piece) == 0:
              break
          logger.debug("chunk: %r" % piece[:256])
          pieces.append(piece)
      if _py3k:
          # The final (empty) read has the stream's own type, so an empty
          # instance of it is the right separator: b'' or ''.
          separator = type(piece)()
      else:
          separator = ''
      result.data = separator.join(pieces)
  def _collect_output(self, process, result, writer=None, stdin=None):
      """
      Drain both output streams of `process` into `result`.

      Two daemon threads are used: one parses stderr via _read_response and
      one collects stdout via _read_data. After both have finished, the
      optional `writer` thread (which feeds the subprocess) is joined, the
      process is waited for, and the optional `stdin` stream is closed; an
      IOError from that close is ignored.
      """
      err_reader = codecs.getreader(self.encoding)(process.stderr)
      status_thread = threading.Thread(target=self._read_response,
                                       args=(err_reader, result))
      status_thread.daemon = True
      logger.debug('stderr reader: %r', status_thread)
      status_thread.start()

      out_stream = process.stdout
      data_thread = threading.Thread(target=self._read_data,
                                     args=(out_stream, result))
      data_thread.daemon = True
      logger.debug('stdout reader: %r', data_thread)
      data_thread.start()

      # Wait for the readers first, then the writer, then the process.
      data_thread.join()
      status_thread.join()
      if writer is not None:
          writer.join()
      process.wait()

      if stdin is not None:
          try:
              stdin.close()
          except IOError:
              pass
      err_reader.close()
      out_stream.close()
  def _handle_io(self, args, file, result, passphrase=None, binary=False):
      "Run gpg with `args`, feed it `file`, and collect output into `result`"
      # A basic data call: the optional passphrase goes in first, then the
      # contents of `file` are copied to gpg on a background thread while
      # output and status information are collected.
      # Garbage In, Garbage Out :)
      proc = self._open_subprocess(args, passphrase is not None)
      if binary:
          sink = proc.stdin
      else:
          # Text mode: encode what is written using the wrapper's encoding.
          sink = codecs.getwriter(self.encoding)(proc.stdin)
      if passphrase:
          _write_passphrase(sink, passphrase, self.encoding)
      copier = _threaded_copy_data(file, sink)
      self._collect_output(proc, result, copier, sink)
      return result
  534. #
  535. # SIGNATURE METHODS
  536. #
  def sign(self, message, **kwargs):
      """Sign `message`; keyword arguments are passed on to sign_file()."""
      stream = _make_binary_stream(message, self.encoding)
      signed = self.sign_file(stream, **kwargs)
      stream.close()
      return signed
  def sign_file(self, file, keyid=None, passphrase=None, clearsign=True,
                detach=False, binary=False):
      """Sign the contents of the file-like object `file`.

      keyid -- key to sign with (passed to gpg as --default-key).
      passphrase -- passphrase for the key, written to gpg's stdin.
      clearsign -- produce a clear-text signature (ignored when `detach`).
      detach -- produce a detached signature.
      binary -- produce binary rather than ASCII-armored output.

      Returns the result object created from ``result_map['sign']``.
      """
      logger.debug("sign_file: %s", file)
      args = ['-s'] if binary else ['-sa']
      # You can't specify detach-sign and clearsign together: gpg ignores
      # the detach-sign in that case.
      if detach:
          args.append("--detach-sign")
      elif clearsign:
          args.append("--clearsign")
      if keyid:
          args.append('--default-key "%s"' % keyid)
      args.extend(['--no-version', "--comment ''"])

      result = self.result_map['sign'](self)
      # We could use _handle_io here except for the fact that if the
      # passphrase is bad, gpg bails and you can't write the message.
      p = self._open_subprocess(args, passphrase is not None)
      try:
          stdin = p.stdin
          if passphrase:
              _write_passphrase(stdin, passphrase, self.encoding)
          writer = _threaded_copy_data(file, stdin)
      except IOError:
          # Report through this module's logger, like every other message
          # in the file, rather than through the root logger.
          logger.exception("error writing message")
          writer = None
      self._collect_output(p, result, writer, stdin)
      return result
  def verify(self, data):
      """Verify the signature on the contents of the string 'data'

      The string is wrapped in an in-memory stream and handed to
      verify_file(), whose result is returned.

      >>> gpg = GPG(gnupghome="keys")
      >>> input = gpg.gen_key_input(Passphrase='foo')
      >>> key = gpg.gen_key(input)
      >>> assert key
      >>> sig = gpg.sign('hello',keyid=key.fingerprint,passphrase='bar')
      >>> assert not sig
      >>> sig = gpg.sign('hello',keyid=key.fingerprint,passphrase='foo')
      >>> assert sig
      >>> verify = gpg.verify(sig.data)
      >>> assert verify

      """
      stream = _make_binary_stream(data, self.encoding)
      outcome = self.verify_file(stream)
      stream.close()
      return outcome
  def verify_file(self, file, data_filename=None):
      """Verify the signature on the contents of the file-like object 'file'.

      When `data_filename` is None, `file` is fed to ``gpg --verify`` directly.
      Otherwise `file` is taken to hold a detached signature for the file
      named `data_filename`: its contents are written to a temporary file,
      which is passed to gpg together with `data_filename` and is removed
      again afterwards. `file` is closed in that case.
      """
      logger.debug('verify_file: %r, %r', file, data_filename)
      result = self.result_map['verify'](self)
      args = ['--verify']
      if data_filename is None:
          self._handle_io(args, file, result, binary=True)
      else:
          logger.debug('Handling detached verification')
          import tempfile
          fd, fn = tempfile.mkstemp(prefix='pygpg')
          try:
              # Close the descriptor and remove the temporary file whatever
              # happens, including a failure while reading or writing the
              # signature.
              try:
                  s = file.read()
                  file.close()
                  os.write(fd, s)
              finally:
                  os.close(fd)
              logger.debug('Wrote to temp file: %r', s)
              # Quoted like data_filename, so that a temporary directory
              # containing spaces does not split the argument.
              args.append('"%s"' % fn)
              args.append('"%s"' % data_filename)
              p = self._open_subprocess(args)
              self._collect_output(p, result, stdin=p.stdin)
          finally:
              os.unlink(fn)
      return result
  615. #
  616. # KEY MANAGEMENT
  617. #
  def import_keys(self, key_data):
      """ Import the keys contained in key_data into our keyring.

      Returns the result object created from ``result_map['import']``.

      >>> import shutil
      >>> shutil.rmtree("keys")
      >>> gpg = GPG(gnupghome="keys")
      >>> input = gpg.gen_key_input()
      >>> result = gpg.gen_key(input)
      >>> print1 = result.fingerprint
      >>> result = gpg.gen_key(input)
      >>> print2 = result.fingerprint
      >>> pubkey1 = gpg.export_keys(print1)
      >>> seckey1 = gpg.export_keys(print1,secret=True)
      >>> seckeys = gpg.list_keys(secret=True)
      >>> pubkeys = gpg.list_keys()
      >>> assert print1 in seckeys.fingerprints
      >>> assert print1 in pubkeys.fingerprints
      >>> str(gpg.delete_keys(print1))
      'Must delete secret key first'
      >>> str(gpg.delete_keys(print1,secret=True))
      'ok'
      >>> str(gpg.delete_keys(print1))
      'ok'
      >>> str(gpg.delete_keys("nosuchkey"))
      'No such key'
      >>> seckeys = gpg.list_keys(secret=True)
      >>> pubkeys = gpg.list_keys()
      >>> assert not print1 in seckeys.fingerprints
      >>> assert not print1 in pubkeys.fingerprints
      >>> result = gpg.import_keys('foo')
      >>> assert not result
      >>> result = gpg.import_keys(pubkey1)
      >>> pubkeys = gpg.list_keys()
      >>> seckeys = gpg.list_keys(secret=True)
      >>> assert not print1 in seckeys.fingerprints
      >>> assert print1 in pubkeys.fingerprints
      >>> result = gpg.import_keys(seckey1)
      >>> assert result
      >>> seckeys = gpg.list_keys(secret=True)
      >>> pubkeys = gpg.list_keys()
      >>> assert print1 in seckeys.fingerprints
      >>> assert print1 in pubkeys.fingerprints
      >>> assert print2 in pubkeys.fingerprints

      """
      outcome = self.result_map['import'](self)
      logger.debug('import_keys: %r', key_data[:256])
      stream = _make_binary_stream(key_data, self.encoding)
      self._handle_io(['--import'], stream, outcome, binary=True)
      logger.debug('import_keys result: %r', outcome.__dict__)
      stream.close()
      return outcome
  def recv_keys(self, keyserver, *keyids):
      """Import the keys with the given ids from a keyserver.

      Returns the result object created from ``result_map['import']``.

      >>> import shutil
      >>> shutil.rmtree("keys")
      >>> gpg = GPG(gnupghome="keys")
      >>> result = gpg.recv_keys('pgp.mit.edu', '3FF0DB166A7476EA')
      >>> assert result

      """
      outcome = self.result_map['import'](self)
      logger.debug('recv_keys: %r', keyids)
      # gpg needs no input for this operation, so it is fed an empty stream.
      no_input = _make_binary_stream("", self.encoding)
      args = ['--keyserver', keyserver, '--recv-keys'] + list(keyids)
      self._handle_io(args, no_input, outcome, binary=True)
      logger.debug('recv_keys result: %r', outcome.__dict__)
      no_input.close()
      return outcome
  def delete_keys(self, fingerprints, secret=False):
      """Delete the key(s) identified by `fingerprints` from the keyring.

      `fingerprints` is a single key specification or a list/tuple of them.
      With `secret` true the secret keys are deleted (--delete-secret-key),
      otherwise the public keys (--delete-key).

      Returns the result object created from ``result_map['delete']``.
      """
      which = 'secret-key' if secret else 'key'
      if _is_sequence(fingerprints):
          # Quote every fingerprint on its own; wrapping the joined list in
          # one pair of quotes would hand gpg a single, nonexistent key.
          quoted = ' '.join(['"%s"' % fp for fp in fingerprints])
      else:
          quoted = '"%s"' % fingerprints
      args = ['--batch --delete-%s %s' % (which, quoted)]
      result = self.result_map['delete'](self)
      p = self._open_subprocess(args)
      self._collect_output(p, result, stdin=p.stdin)
      return result
  def export_keys(self, keyids, secret=False):
      "Export the indicated keys as armored text. 'keyids' is anything gpg accepts"
      suffix = '-secret-key' if secret else ''
      if _is_sequence(keyids):
          # Each id of a list/tuple is quoted separately.
          keyids = ' '.join(['"%s"' % one for one in keyids])
      args = ["--armor --export%s %s" % (suffix, keyids)]
      proc = self._open_subprocess(args)
      # gpg --export produces no status-fd output; stdout will be
      # empty in case of failure
      outcome = self.result_map['delete'](self)  # any result will do
      self._collect_output(proc, outcome, stdin=proc.stdin)
      logger.debug('export_keys result: %r', outcome.data)
      return outcome.data.decode(self.encoding, self.decode_errors)
  def list_keys(self, secret=False):
      """ List the keys currently in the keyring.

      With `secret` true the secret keys are listed instead of the public
      ones. Returns the result object created from ``result_map['list']``.

      >>> import shutil
      >>> shutil.rmtree("keys")
      >>> gpg = GPG(gnupghome="keys")
      >>> input = gpg.gen_key_input()
      >>> result = gpg.gen_key(input)
      >>> print1 = result.fingerprint
      >>> result = gpg.gen_key(input)
      >>> print2 = result.fingerprint
      >>> pubkeys = gpg.list_keys()
      >>> assert print1 in pubkeys.fingerprints
      >>> assert print2 in pubkeys.fingerprints

      """
      which = 'secret-keys' if secret else 'keys'
      args = ["--list-%s --fixed-list-mode --fingerprint --with-colons"
              % (which,)]
      proc = self._open_subprocess(args)

      # there might be some status thingumy here I should handle... (amk)
      # ...nope, unless you care about expired sigs or keys (stevegt)

      # Get the response information
      listing = self.result_map['list'](self)
      self._collect_output(proc, listing, stdin=proc.stdin)
      text = listing.data.decode(self.encoding, self.decode_errors)

      # Record types for which the result object has a handler method.
      handled = ('pub', 'uid', 'sec', 'fpr')
      for row in text.splitlines():
          if self.verbose:
              print(row)
          logger.debug("line: %r", row.rstrip())
          if not row:
              # An empty line ends the processing.
              break
          fields = row.strip().split(':')
          record_type = fields[0]
          if record_type in handled:
              getattr(listing, record_type)(fields)
      return listing
  def gen_key(self, input):
      """Generate a key from the control text `input`; gen_key_input() can
      be used to create that text.

      Returns the result object created from ``result_map['generate']``.

      >>> gpg = GPG(gnupghome="keys")
      >>> input = gpg.gen_key_input()
      >>> result = gpg.gen_key(input)
      >>> assert result
      >>> result = gpg.gen_key('foo')
      >>> assert not result

      """
      outcome = self.result_map['generate'](self)
      control = _make_binary_stream(input, self.encoding)
      self._handle_io(["--gen-key --batch"], control, outcome, binary=True)
      control.close()
      return outcome
  def gen_key_input(self, **kwargs):
      """
      Generate --gen-key input per gpg doc/DETAILS

      Keyword names are turned into parameter names by replacing
      underscores with hyphens and title-casing (``name_email`` becomes
      ``Name-Email``). Defaults are supplied for Key-Type, Key-Length,
      Name-Real, Name-Comment and Name-Email. The returned text starts with
      the Key-Type line and ends with ``%commit``.
      """
      parms = {}
      for key, val in kwargs.items():
          parms[key.replace('_', '-').title()] = val
      parms.setdefault('Key-Type', 'RSA')
      parms.setdefault('Key-Length', 1024)
      parms.setdefault('Name-Real', "Autogenerated Key")
      parms.setdefault('Name-Comment', "Generated by gnupg.py")
      if 'Name-Email' not in parms:
          # Only consult the environment when a default address is needed,
          # and never fail just because neither variable is set.
          logname = (os.environ.get('LOGNAME') or
                     os.environ.get('USERNAME') or
                     'unspecified')
          parms['Name-Email'] = "%s@%s" % (logname.replace(' ', '_'),
                                           socket.gethostname())
      # Key-Type has to be the first parameter of the block.
      lines = ["Key-Type: %s\n" % parms.pop('Key-Type')]
      for key, val in parms.items():
          lines.append("%s: %s\n" % (key, val))
      lines.append("%commit\n")
      return ''.join(lines)
  # Key-Type: RSA
  # Key-Length: 1024
  # Name-Real: ISdlink Server on %s
  # Name-Comment: Created by %s
  # Name-Email: isdlink@%s
  # Expire-Date: 0
  # %commit
  #
  #
  # Key-Type: DSA
  # Key-Length: 1024
  # Subkey-Type: ELG-E
  # Subkey-Length: 1024
  # Name-Real: Joe Tester
  # Name-Comment: with stupid passphrase
  # Name-Email: joe@foo.bar
  # Expire-Date: 0
  # Passphrase: abc
  # %pubring foo.pub
  # %secring foo.sec
  # %commit

  #
  # ENCRYPTION
  #
  def encrypt_file(self, file, recipients, sign=None,
                   always_trust=False, passphrase=None,
                   armor=True, output=None, symmetric=False):
      """Encrypt the message read from the file-like object 'file'.

      Builds the argument list and passes it, with 'file' and
      'passphrase', to self._handle_io(). The returned object is the one
      created by self.result_map['crypt'].

      recipients   -- one recipient or a sequence of them; ignored when
                      'symmetric' is true.
      sign         -- if true, its value is used as the --default-key
                      for --sign.
      always_trust -- if true, adds --always-trust.
      armor        -- if true (the default), adds --armor for
                      ASCII-armored output; set to False for binary.
      output       -- if given, name of the file to write to; an existing
                      file of that name is removed first.
      symmetric    -- if true, uses --symmetric instead of --encrypt.
      """
      cmd = ['--no-version', "--comment ''"]
      if not symmetric:
          cmd.append('--encrypt')
          targets = recipients if _is_sequence(recipients) else (recipients,)
          cmd.extend(['--recipient "%s"' % target for target in targets])
      else:
          cmd.append('--symmetric')
      if armor:
          # ASCII-armored output; binary output when armor is False.
          cmd.append('--armor')
      if output:
          # Remove any stale file so no overwrite confirmation is triggered.
          if os.path.exists(output):
              os.remove(output)
          cmd.append('--output "%s"' % output)
      if sign:
          cmd.append('--sign --default-key "%s"' % sign)
      if always_trust:
          cmd.append("--always-trust")
      crypt_result = self.result_map['crypt'](self)
      self._handle_io(cmd, file, crypt_result, passphrase=passphrase,
                      binary=True)
      logger.debug('encrypt result: %r', crypt_result.data)
      return crypt_result
  def encrypt(self, data, recipients, **kwargs):
      """Encrypt the message contained in the string 'data'

      'data' is wrapped in a binary stream (encoded with self.encoding)
      and handed to encrypt_file() along with 'recipients' and any extra
      keyword arguments. Returns whatever encrypt_file() returns.

      >>> import shutil
      >>> if os.path.exists("keys"):
      ...     shutil.rmtree("keys")
      >>> gpg = GPG(gnupghome="keys")
      >>> input = gpg.gen_key_input(passphrase='foo')
      >>> result = gpg.gen_key(input)
      >>> print1 = result.fingerprint
      >>> input = gpg.gen_key_input()
      >>> result = gpg.gen_key(input)
      >>> print2 = result.fingerprint
      >>> result = gpg.encrypt("hello",print2)
      >>> message = str(result)
      >>> assert message != 'hello'
      >>> result = gpg.decrypt(message)
      >>> assert result
      >>> str(result)
      'hello'
      >>> result = gpg.encrypt("hello again",print1)
      >>> message = str(result)
      >>> result = gpg.decrypt(message)
      >>> result.status == 'need passphrase'
      True
      >>> result = gpg.decrypt(message,passphrase='bar')
      >>> result.status in ('decryption failed', 'bad passphrase')
      True
      >>> assert not result
      >>> result = gpg.decrypt(message,passphrase='foo')
      >>> result.status == 'decryption ok'
      True
      >>> str(result)
      'hello again'
      >>> result = gpg.encrypt("signed hello",print2,sign=print1)
      >>> result.status == 'need passphrase'
      True
      >>> result = gpg.encrypt("signed hello",print2,sign=print1,passphrase='foo')
      >>> result.status == 'encryption ok'
      True
      >>> message = str(result)
      >>> result = gpg.decrypt(message)
      >>> result.status == 'decryption ok'
      True
      >>> assert result.fingerprint == print1

      """
      stream = _make_binary_stream(data, self.encoding)
      try:
          return self.encrypt_file(stream, recipients, **kwargs)
      finally:
          # Always release the stream, even when encryption raises.
          stream.close()
  def decrypt(self, message, **kwargs):
      """Decrypt the message contained in 'message'.

      'message' is wrapped in a binary stream (encoded with self.encoding)
      and handed to decrypt_file(); keyword arguments are passed through
      unchanged. Returns whatever decrypt_file() returns.
      """
      stream = _make_binary_stream(message, self.encoding)
      try:
          return self.decrypt_file(stream, **kwargs)
      finally:
          # Always release the stream, even when decryption raises.
          stream.close()
  def decrypt_file(self, file, always_trust=False, passphrase=None,
                   output=None):
      """Decrypt the message read from the file-like object 'file'.

      Builds the "--decrypt" argument list and passes it, with 'file' and
      'passphrase', to self._handle_io(). The returned object is the one
      created by self.result_map['crypt'].

      always_trust -- if true, adds --always-trust.
      output       -- if given, name of the file to write to; an existing
                      file of that name is removed first.
      """
      cmd = ["--decrypt"]
      if output:
          # Remove any stale file so no overwrite confirmation is triggered.
          if os.path.exists(output):
              os.remove(output)
          cmd.append('--output "%s"' % output)
      if always_trust:
          cmd.append("--always-trust")
      crypt_result = self.result_map['crypt'](self)
      self._handle_io(cmd, file, crypt_result, passphrase, binary=True)
      logger.debug('decrypt result: %r', crypt_result.data)
      return crypt_result