123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121 |
- import logging
- import gnupg
- import os
- log = logging.getLogger(__name__)
- log.setLevel(logging.DEBUG)
- #handler = logging.FileHandler('gpg_handler.log')
- handler2 = logging.StreamHandler()
- formatter = logging.Formatter(
- '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
- # handler.setFormatter(formatter)
- handler2.setFormatter(formatter)
- # log.addHandler(handler)
- log.addHandler(handler2)
- class Gpg:
- def __init__(self, gnupghome=''):
- self.gnupghome = gnupghome
- log.info('New session.\n')
- if len(gnupghome) == 0:
- log.debug("Path not specified. Setting default gnupg directory.")
- log.debug('Creating gnupg instance in {}'.format(
- "default directory."))
- self.gpg = gnupg.GPG()
- return None
- else:
- log.debug("Setting {} as gnupg directory".format(gnupghome))
- if os.path.exists(gnupghome):
- log.debug('Creating gnupg instance in {}'.format(gnupghome))
- self.gpg = gnupg.GPG(gnupghome=gnupghome)
- return None
- else:
- log.warning(
- "path {} does not exist. Trying to create...".format(gnupghome))
- try:
- log.info("Creating dir in {}".format(gnupghome))
- os.mkdir(gnupghome)
- log.debug('Creating gnupg instance in {}'.format(gnupghome))
- self.gpg = gnupg.GPG(gnupghome=gnupghome)
- return None
- except Exception as e:
- log.error('Cant create dir {}'.format(e))
- return None
- def list_keys(self, **kwargs):
- #TODO: multisearch
- try:
- keys = self.gpg.list_keys()
- if len(keys) == 0:
- log.warning("gpg database is empty.")
- return [], 'Empty database'
- set1 = set(keys[1].keys())
- set2 = set(kwargs.keys())
- if set2-set1 != set():
- log.warning('Wrong keyword {}'.format(set2-set1))
- return [], "Wrong keywords {}".format(set2-set1)
- elif kwargs == {}:
- log.debug('Returning all keys.')
- return keys, None
- else:
- results = []
- log.debug('Searching {} in keys.'.format(kwargs))
- for _dict in keys:
- for keyword in kwargs:
- if type(_dict[keyword]) == type(list()):
- for value in _dict[keyword]:
- if kwargs[keyword] in value:
- log.debug(
- 'Match in {}.'.format(_dict[keyword]))
- if not _dict in results:
- results.append(_dict)
- else:
- if kwargs[keyword] in _dict[keyword]:
- log.debug(
- 'Match in {}.'.format(_dict[keyword]))
- if not _dict in results:
- results.append(_dict)
- log.debug("Returning {} matches.".format(len(results)))
- return results, None
- except Exception as e:
- log.error("Error in list_keys(self)", e)
- return [], 'Error in Gpg.list_keys(self, **kwargs)'
- def encrypt(self, message, sign=None, file=None, **kwargs):
- # TODO FILE ENCRYPT
- try:
- log.debug("autosearch enabled.")
- recipients = []
- encrypted = []
- errors = ''
- log.debug("Searching recipients.")
- keys, error = self.list_keys(**kwargs)
- if error != None:
- log.error(error)
- return [], error
- for key in keys:
- if key['ownertrust'] == '-':
- log.warning(
- "Key {} is untrusted. Forcing...".format(key['uids']))
- _encrypted = self.gpg.encrypt(
- str(message), key['keyid'], sign=sign, always_trust=True)
- encrypted.append(_encrypted.data.decode())
- if not _encrypted.ok:
- errors += _encrypted.stderr + '\n'
- return encrypted, errors
- except Exception as e:
- log.error("Error in self.encrypt()", e)
- return [], "Error in self.encrypt()"
- def decrypt(self):
- pass
- def sign(self):
- pass
- def symmetric_encrypt(self):
- pass
- def symmetric_decrypt(self):
- pass
|