123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632 |
- # -*- coding: utf-8 -*-
- """
- sleekxmpp.jid
- ~~~~~~~~~~~~~~~~~~~~~~~
- This module allows for working with Jabber IDs (JIDs).
- Part of SleekXMPP: The Sleek XMPP Library
- :copyright: (c) 2011 Nathanael C. Fritz
- :license: MIT, see LICENSE for more details
- """
- from __future__ import unicode_literals
- import re
- import socket
- import stringprep
- import threading
- import encodings.idna
- from copy import deepcopy
- from sleekxmpp.util import stringprep_profiles
- from sleekxmpp.thirdparty import OrderedDict
- #: These characters are not allowed to appear in a JID.
- ILLEGAL_CHARS = '\x00\x01\x02\x03\x04\x05\x06\x07\x08\t\n\x0b\x0c\r' + \
- '\x0e\x0f\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19' + \
- '\x1a\x1b\x1c\x1d\x1e\x1f' + \
- ' !"#$%&\'()*+,./:;<=>?@[\\]^_`{|}~\x7f'
- #: The basic regex pattern that a JID must match in order to determine
- #: the local, domain, and resource parts. This regex does NOT do any
- #: validation, which requires application of nodeprep, resourceprep, etc.
- JID_PATTERN = re.compile(
- "^(?:([^\"&'/:<>@]{1,1023})@)?([^/@]{1,1023})(?:/(.{1,1023}))?$"
- )
- #: The set of escape sequences for the characters not allowed by nodeprep.
- JID_ESCAPE_SEQUENCES = set(['\\20', '\\22', '\\26', '\\27', '\\2f',
- '\\3a', '\\3c', '\\3e', '\\40', '\\5c'])
- #: A mapping of unallowed characters to their escape sequences. An escape
- #: sequence for '\' is also included since it must also be escaped in
- #: certain situations.
- JID_ESCAPE_TRANSFORMATIONS = {' ': '\\20',
- '"': '\\22',
- '&': '\\26',
- "'": '\\27',
- '/': '\\2f',
- ':': '\\3a',
- '<': '\\3c',
- '>': '\\3e',
- '@': '\\40',
- '\\': '\\5c'}
- #: The reverse mapping of escape sequences to their original forms.
- JID_UNESCAPE_TRANSFORMATIONS = {'\\20': ' ',
- '\\22': '"',
- '\\26': '&',
- '\\27': "'",
- '\\2f': '/',
- '\\3a': ':',
- '\\3c': '<',
- '\\3e': '>',
- '\\40': '@',
- '\\5c': '\\'}
- JID_CACHE = OrderedDict()
- JID_CACHE_LOCK = threading.Lock()
- JID_CACHE_MAX_SIZE = 1024
- def _cache(key, parts, locked):
- with JID_CACHE_LOCK:
- JID_CACHE[key] = (parts, locked)
- while len(JID_CACHE) > JID_CACHE_MAX_SIZE:
- found = None
- for key, item in JID_CACHE.items():
- if not item[1]: # if not locked
- found = key
- break
- if not found: # more than MAX_SIZE locked
- # warn?
- break
- del JID_CACHE[found]
- # pylint: disable=c0103
- #: The nodeprep profile of stringprep used to validate the local,
- #: or username, portion of a JID.
- nodeprep = stringprep_profiles.create(
- nfkc=True,
- bidi=True,
- mappings=[
- stringprep_profiles.b1_mapping,
- stringprep.map_table_b2],
- prohibited=[
- stringprep.in_table_c11,
- stringprep.in_table_c12,
- stringprep.in_table_c21,
- stringprep.in_table_c22,
- stringprep.in_table_c3,
- stringprep.in_table_c4,
- stringprep.in_table_c5,
- stringprep.in_table_c6,
- stringprep.in_table_c7,
- stringprep.in_table_c8,
- stringprep.in_table_c9,
- lambda c: c in ' \'"&/:<>@'],
- unassigned=[stringprep.in_table_a1])
- # pylint: disable=c0103
- #: The resourceprep profile of stringprep, which is used to validate
- #: the resource portion of a JID.
- resourceprep = stringprep_profiles.create(
- nfkc=True,
- bidi=True,
- mappings=[stringprep_profiles.b1_mapping],
- prohibited=[
- stringprep.in_table_c12,
- stringprep.in_table_c21,
- stringprep.in_table_c22,
- stringprep.in_table_c3,
- stringprep.in_table_c4,
- stringprep.in_table_c5,
- stringprep.in_table_c6,
- stringprep.in_table_c7,
- stringprep.in_table_c8,
- stringprep.in_table_c9],
- unassigned=[stringprep.in_table_a1])
- def _parse_jid(data):
- """
- Parse string data into the node, domain, and resource
- components of a JID, if possible.
- :param string data: A string that is potentially a JID.
- :raises InvalidJID:
- :returns: tuple of the validated local, domain, and resource strings
- """
- match = JID_PATTERN.match(data)
- if not match:
- raise InvalidJID('JID could not be parsed')
- (node, domain, resource) = match.groups()
- node = _validate_node(node)
- domain = _validate_domain(domain)
- resource = _validate_resource(resource)
- return node, domain, resource
- def _validate_node(node):
- """Validate the local, or username, portion of a JID.
- :raises InvalidJID:
- :returns: The local portion of a JID, as validated by nodeprep.
- """
- try:
- if node is not None:
- node = nodeprep(node)
- if not node:
- raise InvalidJID('Localpart must not be 0 bytes')
- if len(node) > 1023:
- raise InvalidJID('Localpart must be less than 1024 bytes')
- return node
- except stringprep_profiles.StringPrepError:
- raise InvalidJID('Invalid local part')
- def _validate_domain(domain):
- """Validate the domain portion of a JID.
- IP literal addresses are left as-is, if valid. Domain names
- are stripped of any trailing label separators (`.`), and are
- checked with the nameprep profile of stringprep. If the given
- domain is actually a punyencoded version of a domain name, it
- is converted back into its original Unicode form. Domains must
- also not start or end with a dash (`-`).
- :raises InvalidJID:
- :returns: The validated domain name
- """
- ip_addr = False
- # First, check if this is an IPv4 address
- try:
- socket.inet_aton(domain)
- ip_addr = True
- except socket.error:
- pass
- # Check if this is an IPv6 address
- if not ip_addr and hasattr(socket, 'inet_pton'):
- try:
- socket.inet_pton(socket.AF_INET6, domain.strip('[]'))
- domain = '[%s]' % domain.strip('[]')
- ip_addr = True
- except (socket.error, ValueError):
- pass
- if not ip_addr:
- # This is a domain name, which must be checked further
- if domain and domain[-1] == '.':
- domain = domain[:-1]
- domain_parts = []
- for label in domain.split('.'):
- try:
- label = encodings.idna.nameprep(label)
- encodings.idna.ToASCII(label)
- pass_nameprep = True
- except UnicodeError:
- pass_nameprep = False
- if not pass_nameprep:
- raise InvalidJID('Could not encode domain as ASCII')
- if label.startswith('xn--'):
- label = encodings.idna.ToUnicode(label)
- for char in label:
- if char in ILLEGAL_CHARS:
- raise InvalidJID('Domain contains illegal characters')
- if '-' in (label[0], label[-1]):
- raise InvalidJID('Domain started or ended with -')
- domain_parts.append(label)
- domain = '.'.join(domain_parts)
- if not domain:
- raise InvalidJID('Domain must not be 0 bytes')
- if len(domain) > 1023:
- raise InvalidJID('Domain must be less than 1024 bytes')
- return domain
- def _validate_resource(resource):
- """Validate the resource portion of a JID.
- :raises InvalidJID:
- :returns: The local portion of a JID, as validated by resourceprep.
- """
- try:
- if resource is not None:
- resource = resourceprep(resource)
- if not resource:
- raise InvalidJID('Resource must not be 0 bytes')
- if len(resource) > 1023:
- raise InvalidJID('Resource must be less than 1024 bytes')
- return resource
- except stringprep_profiles.StringPrepError:
- raise InvalidJID('Invalid resource')
- def _escape_node(node):
- """Escape the local portion of a JID."""
- result = []
- for i, char in enumerate(node):
- if char == '\\':
- if ''.join((node[i:i+3])) in JID_ESCAPE_SEQUENCES:
- result.append('\\5c')
- continue
- result.append(char)
- for i, char in enumerate(result):
- if char != '\\':
- result[i] = JID_ESCAPE_TRANSFORMATIONS.get(char, char)
- escaped = ''.join(result)
- if escaped.startswith('\\20') or escaped.endswith('\\20'):
- raise InvalidJID('Escaped local part starts or ends with "\\20"')
- _validate_node(escaped)
- return escaped
- def _unescape_node(node):
- """Unescape a local portion of a JID.
- .. note::
- The unescaped local portion is meant ONLY for presentation,
- and should not be used for other purposes.
- """
- unescaped = []
- seq = ''
- for i, char in enumerate(node):
- if char == '\\':
- seq = node[i:i+3]
- if seq not in JID_ESCAPE_SEQUENCES:
- seq = ''
- if seq:
- if len(seq) == 3:
- unescaped.append(JID_UNESCAPE_TRANSFORMATIONS.get(seq, char))
- # Pop character off the escape sequence, and ignore it
- seq = seq[1:]
- else:
- unescaped.append(char)
- unescaped = ''.join(unescaped)
- return unescaped
- def _format_jid(local=None, domain=None, resource=None):
- """Format the given JID components into a full or bare JID.
- :param string local: Optional. The local portion of the JID.
- :param string domain: Required. The domain name portion of the JID.
- :param strin resource: Optional. The resource portion of the JID.
- :return: A full or bare JID string.
- """
- result = []
- if local:
- result.append(local)
- result.append('@')
- if domain:
- result.append(domain)
- if resource:
- result.append('/')
- result.append(resource)
- return ''.join(result)
- class InvalidJID(ValueError):
- """
- Raised when attempting to create a JID that does not pass validation.
- It can also be raised if modifying an existing JID in such a way as
- to make it invalid, such trying to remove the domain from an existing
- full JID while the local and resource portions still exist.
- """
- # pylint: disable=R0903
- class UnescapedJID(object):
- """
- .. versionadded:: 1.1.10
- """
- def __init__(self, local, domain, resource):
- self._jid = (local, domain, resource)
- # pylint: disable=R0911
- def __getattr__(self, name):
- """Retrieve the given JID component.
- :param name: one of: user, server, domain, resource,
- full, or bare.
- """
- if name == 'resource':
- return self._jid[2] or ''
- elif name in ('user', 'username', 'local', 'node'):
- return self._jid[0] or ''
- elif name in ('server', 'domain', 'host'):
- return self._jid[1] or ''
- elif name in ('full', 'jid'):
- return _format_jid(*self._jid)
- elif name == 'bare':
- return _format_jid(self._jid[0], self._jid[1])
- elif name == '_jid':
- return getattr(super(JID, self), '_jid')
- else:
- return None
- def __str__(self):
- """Use the full JID as the string value."""
- return _format_jid(*self._jid)
- def __repr__(self):
- """Use the full JID as the representation."""
- return self.__str__()
- class JID(object):
- """
- A representation of a Jabber ID, or JID.
- Each JID may have three components: a user, a domain, and an optional
- resource. For example: user@domain/resource
- When a resource is not used, the JID is called a bare JID.
- The JID is a full JID otherwise.
- **JID Properties:**
- :jid: Alias for ``full``.
- :full: The string value of the full JID.
- :bare: The string value of the bare JID.
- :user: The username portion of the JID.
- :username: Alias for ``user``.
- :local: Alias for ``user``.
- :node: Alias for ``user``.
- :domain: The domain name portion of the JID.
- :server: Alias for ``domain``.
- :host: Alias for ``domain``.
- :resource: The resource portion of the JID.
- :param string jid:
- A string of the form ``'[user@]domain[/resource]'``.
- :param string local:
- Optional. Specify the local, or username, portion
- of the JID. If provided, it will override the local
- value provided by the `jid` parameter. The given
- local value will also be escaped if necessary.
- :param string domain:
- Optional. Specify the domain of the JID. If
- provided, it will override the domain given by
- the `jid` parameter.
- :param string resource:
- Optional. Specify the resource value of the JID.
- If provided, it will override the domain given
- by the `jid` parameter.
- :raises InvalidJID:
- """
- # pylint: disable=W0212
- def __init__(self, jid=None, **kwargs):
- locked = kwargs.get('cache_lock', False)
- in_local = kwargs.get('local', None)
- in_domain = kwargs.get('domain', None)
- in_resource = kwargs.get('resource', None)
- parts = None
- if in_local or in_domain or in_resource:
- parts = (in_local, in_domain, in_resource)
- # only check cache if there is a jid string, or parts, not if there
- # are both
- self._jid = None
- key = None
- if (jid is not None) and (parts is None):
- if isinstance(jid, JID):
- # it's already good to go, and there are no additions
- self._jid = jid._jid
- return
- key = jid
- self._jid, locked = JID_CACHE.get(jid, (None, locked))
- elif jid is None and parts is not None:
- key = parts
- self._jid, locked = JID_CACHE.get(parts, (None, locked))
- if not self._jid:
- if not jid:
- parsed_jid = (None, None, None)
- elif not isinstance(jid, JID):
- parsed_jid = _parse_jid(jid)
- else:
- parsed_jid = jid._jid
- local, domain, resource = parsed_jid
- if 'local' in kwargs:
- local = _escape_node(in_local)
- if 'domain' in kwargs:
- domain = _validate_domain(in_domain)
- if 'resource' in kwargs:
- resource = _validate_resource(in_resource)
- self._jid = (local, domain, resource)
- if key:
- _cache(key, self._jid, locked)
- def unescape(self):
- """Return an unescaped JID object.
- Using an unescaped JID is preferred for displaying JIDs
- to humans, and they should NOT be used for any other
- purposes than for presentation.
- :return: :class:`UnescapedJID`
- .. versionadded:: 1.1.10
- """
- return UnescapedJID(_unescape_node(self._jid[0]),
- self._jid[1],
- self._jid[2])
- def regenerate(self):
- """No-op
- .. deprecated:: 1.1.10
- """
- pass
- def reset(self, data):
- """Start fresh from a new JID string.
- :param string data: A string of the form ``'[user@]domain[/resource]'``.
- .. deprecated:: 1.1.10
- """
- self._jid = JID(data)._jid
- @property
- def resource(self):
- return self._jid[2] or ''
- @property
- def user(self):
- return self._jid[0] or ''
- @property
- def local(self):
- return self._jid[0] or ''
- @property
- def node(self):
- return self._jid[0] or ''
- @property
- def username(self):
- return self._jid[0] or ''
- @property
- def server(self):
- return self._jid[1] or ''
- @property
- def domain(self):
- return self._jid[1] or ''
- @property
- def host(self):
- return self._jid[1] or ''
- @property
- def full(self):
- return _format_jid(*self._jid)
- @property
- def jid(self):
- return _format_jid(*self._jid)
- @property
- def bare(self):
- return _format_jid(self._jid[0], self._jid[1])
- @resource.setter
- def resource(self, value):
- self._jid = JID(self, resource=value)._jid
- @user.setter
- def user(self, value):
- self._jid = JID(self, local=value)._jid
- @username.setter
- def username(self, value):
- self._jid = JID(self, local=value)._jid
- @local.setter
- def local(self, value):
- self._jid = JID(self, local=value)._jid
- @node.setter
- def node(self, value):
- self._jid = JID(self, local=value)._jid
- @server.setter
- def server(self, value):
- self._jid = JID(self, domain=value)._jid
- @domain.setter
- def domain(self, value):
- self._jid = JID(self, domain=value)._jid
- @host.setter
- def host(self, value):
- self._jid = JID(self, domain=value)._jid
- @full.setter
- def full(self, value):
- self._jid = JID(value)._jid
- @jid.setter
- def jid(self, value):
- self._jid = JID(value)._jid
- @bare.setter
- def bare(self, value):
- parsed = JID(value)._jid
- self._jid = (parsed[0], parsed[1], self._jid[2])
- def __str__(self):
- """Use the full JID as the string value."""
- return _format_jid(*self._jid)
- def __repr__(self):
- """Use the full JID as the representation."""
- return self.__str__()
- # pylint: disable=W0212
- def __eq__(self, other):
- """Two JIDs are equal if they have the same full JID value."""
- if isinstance(other, UnescapedJID):
- return False
- other = JID(other)
- return self._jid == other._jid
- # pylint: disable=W0212
- def __ne__(self, other):
- """Two JIDs are considered unequal if they are not equal."""
- return not self == other
- def __hash__(self):
- """Hash a JID based on the string version of its full JID."""
- return hash(self.__str__())
- def __copy__(self):
- """Generate a duplicate JID."""
- return JID(self)
- def __deepcopy__(self, memo):
- """Generate a duplicate JID."""
- return JID(deepcopy(str(self), memo))
|