gpg_holder.py 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121
  1. import logging
  2. import gnupg
  3. import os
  4. log = logging.getLogger(__name__)
  5. log.setLevel(logging.DEBUG)
  6. #handler = logging.FileHandler('gpg_handler.log')
  7. handler2 = logging.StreamHandler()
  8. formatter = logging.Formatter(
  9. '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
  10. # handler.setFormatter(formatter)
  11. handler2.setFormatter(formatter)
  12. # log.addHandler(handler)
  13. log.addHandler(handler2)
  14. class Gpg:
  15. def __init__(self, gnupghome=''):
  16. self.gnupghome = gnupghome
  17. log.info('New session.\n')
  18. if len(gnupghome) == 0:
  19. log.debug("Path not specified. Setting default gnupg directory.")
  20. log.debug('Creating gnupg instance in {}'.format(
  21. "default directory."))
  22. self.gpg = gnupg.GPG()
  23. return None
  24. else:
  25. log.debug("Setting {} as gnupg directory".format(gnupghome))
  26. if os.path.exists(gnupghome):
  27. log.debug('Creating gnupg instance in {}'.format(gnupghome))
  28. self.gpg = gnupg.GPG(gnupghome=gnupghome)
  29. return None
  30. else:
  31. log.warning(
  32. "path {} does not exist. Trying to create...".format(gnupghome))
  33. try:
  34. log.info("Creating dir in {}".format(gnupghome))
  35. os.mkdir(gnupghome)
  36. log.debug('Creating gnupg instance in {}'.format(gnupghome))
  37. self.gpg = gnupg.GPG(gnupghome=gnupghome)
  38. return None
  39. except Exception as e:
  40. log.error('Cant create dir {}'.format(e))
  41. return None
  42. def list_keys(self, **kwargs):
  43. #TODO: multisearch
  44. try:
  45. keys = self.gpg.list_keys()
  46. if len(keys) == 0:
  47. log.warning("gpg database is empty.")
  48. return [], 'Empty database'
  49. set1 = set(keys[1].keys())
  50. set2 = set(kwargs.keys())
  51. if set2-set1 != set():
  52. log.warning('Wrong keyword {}'.format(set2-set1))
  53. return [], "Wrong keywords {}".format(set2-set1)
  54. elif kwargs == {}:
  55. log.debug('Returning all keys.')
  56. return keys, None
  57. else:
  58. results = []
  59. log.debug('Searching {} in keys.'.format(kwargs))
  60. for _dict in keys:
  61. for keyword in kwargs:
  62. if type(_dict[keyword]) == type(list()):
  63. for value in _dict[keyword]:
  64. if kwargs[keyword] in value:
  65. log.debug(
  66. 'Match in {}.'.format(_dict[keyword]))
  67. if not _dict in results:
  68. results.append(_dict)
  69. else:
  70. if kwargs[keyword] in _dict[keyword]:
  71. log.debug(
  72. 'Match in {}.'.format(_dict[keyword]))
  73. if not _dict in results:
  74. results.append(_dict)
  75. log.debug("Returning {} matches.".format(len(results)))
  76. return results, None
  77. except Exception as e:
  78. log.error("Error in list_keys(self)", e)
  79. return [], 'Error in Gpg.list_keys(self, **kwargs)'
  80. def encrypt(self, message, sign=None, file=None, **kwargs):
  81. # TODO FILE ENCRYPT
  82. try:
  83. log.debug("autosearch enabled.")
  84. recipients = []
  85. encrypted = []
  86. errors = ''
  87. log.debug("Searching recipients.")
  88. keys, error = self.list_keys(**kwargs)
  89. if error != None:
  90. log.error(error)
  91. return [], error
  92. for key in keys:
  93. if key['ownertrust'] == '-':
  94. log.warning(
  95. "Key {} is untrusted. Forcing...".format(key['uids']))
  96. _encrypted = self.gpg.encrypt(
  97. str(message), key['keyid'], sign=sign, always_trust=True)
  98. encrypted.append(_encrypted.data.decode())
  99. if not _encrypted.ok:
  100. errors += _encrypted.stderr + '\n'
  101. return encrypted, errors
  102. except Exception as e:
  103. log.error("Error in self.encrypt()", e)
  104. return [], "Error in self.encrypt()"
  105. def decrypt(self):
  106. pass
  107. def sign(self):
  108. pass
  109. def symmetric_encrypt(self):
  110. pass
  111. def symmetric_decrypt(self):
  112. pass