123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174 |
- # -*- coding: utf-8 -*-
- """
- sleekxmpp.util.sasl.client
- ~~~~~~~~~~~~~~~~~~~~~~~~~~
- This module was originally based on Dave Cridland's Suelta library.
- Part of SleekXMPP: The Sleek XMPP Library
- :copryight: (c) 2004-2013 David Alan Cridland
- :copyright: (c) 2013 Nathanael C. Fritz, Lance J.T. Stout
- :license: MIT, see LICENSE for more details
- """
- import logging
- import stringprep
- from sleekxmpp.util import hashes, bytes, stringprep_profiles
- log = logging.getLogger(__name__)
- #: Global registry mapping mechanism names to implementation classes.
- MECHANISMS = {}
- #: Global registry mapping mechanism names to security scores.
- MECH_SEC_SCORES = {}
- #: The SASLprep profile of stringprep used to validate simple username
- #: and password credentials.
- saslprep = stringprep_profiles.create(
- nfkc=True,
- bidi=True,
- mappings=[
- stringprep_profiles.b1_mapping,
- stringprep_profiles.c12_mapping],
- prohibited=[
- stringprep.in_table_c12,
- stringprep.in_table_c21,
- stringprep.in_table_c22,
- stringprep.in_table_c3,
- stringprep.in_table_c4,
- stringprep.in_table_c5,
- stringprep.in_table_c6,
- stringprep.in_table_c7,
- stringprep.in_table_c8,
- stringprep.in_table_c9],
- unassigned=[stringprep.in_table_a1])
- def sasl_mech(score):
- sec_score = score
- def register(mech):
- n = 0
- mech.score = sec_score
- if mech.use_hashes:
- for hashing_alg in hashes():
- n += 1
- score = mech.score + n
- name = '%s-%s' % (mech.name, hashing_alg)
- MECHANISMS[name] = mech
- MECH_SEC_SCORES[name] = score
- if mech.channel_binding:
- name += '-PLUS'
- score += 10
- MECHANISMS[name] = mech
- MECH_SEC_SCORES[name] = score
- else:
- MECHANISMS[mech.name] = mech
- MECH_SEC_SCORES[mech.name] = mech.score
- if mech.channel_binding:
- MECHANISMS[mech.name + '-PLUS'] = mech
- MECH_SEC_SCORES[name] = mech.score + 10
- return mech
- return register
- class SASLNoAppropriateMechanism(Exception):
- def __init__(self, value=''):
- self.message = value
- class SASLCancelled(Exception):
- def __init__(self, value=''):
- self.message = value
- class SASLFailed(Exception):
- def __init__(self, value=''):
- self.message = value
- class SASLMutualAuthFailed(SASLFailed):
- def __init__(self, value=''):
- self.message = value
- class Mech(object):
- name = 'GENERIC'
- score = -1
- use_hashes = False
- channel_binding = False
- required_credentials = set()
- optional_credentials = set()
- security = set()
- def __init__(self, name, credentials, security_settings):
- self.credentials = credentials
- self.security_settings = security_settings
- self.values = {}
- self.base_name = self.name
- self.name = name
- self.setup(name)
- def setup(self, name):
- pass
- def process(self, challenge=b''):
- return b''
- def choose(mech_list, credentials, security_settings, limit=None, min_mech=None):
- available_mechs = set(MECHANISMS.keys())
- if limit is None:
- limit = set(mech_list)
- if not isinstance(limit, set):
- limit = set(limit)
- if not isinstance(mech_list, set):
- mech_list = set(mech_list)
- mech_list = mech_list.intersection(limit)
- available_mechs = available_mechs.intersection(mech_list)
- best_score = MECH_SEC_SCORES.get(min_mech, -1)
- best_mech = None
- for name in available_mechs:
- if name in MECH_SEC_SCORES:
- if MECH_SEC_SCORES[name] > best_score:
- best_score = MECH_SEC_SCORES[name]
- best_mech = name
- if best_mech is None:
- raise SASLNoAppropriateMechanism()
- mech_class = MECHANISMS[best_mech]
- try:
- creds = credentials(mech_class.required_credentials,
- mech_class.optional_credentials)
- for req in mech_class.required_credentials:
- if req not in creds:
- raise SASLCancelled('Missing credential: %s' % req)
- for opt in mech_class.optional_credentials:
- if opt not in creds:
- creds[opt] = b''
- for cred in creds:
- if cred in ('username', 'password', 'authzid'):
- creds[cred] = bytes(saslprep(creds[cred]))
- else:
- creds[cred] = bytes(creds[cred])
- security_opts = security_settings(mech_class.security)
- return mech_class(best_mech, creds, security_opts)
- except SASLCancelled as e:
- log.info('SASL: %s: %s', best_mech, e.message)
- mech_list.remove(best_mech)
- return choose(mech_list, credentials, security_settings,
- limit=limit,
- min_mech=min_mech)
|