123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506 |
- from collections import namedtuple
- from coreschema.compat import text_types, numeric_types
- from coreschema.formats import validate_format
- from coreschema.utils import uniq
- import re
- Error = namedtuple('Error', ['text', 'index'])
- def push_index(errors, key):
- return [
- Error(error.text, [key] + error.index)
- for error in errors
- ]
- # TODO: Properties as OrderedDict if from list of tuples.
- # TODO: null keyword / Nullable
- # TODO: dependancies
- # TODO: remote ref
- # TODO: remaining formats
- # LATER: Enum display values
- # LATER: File
- # LATER: strict, coerce float etc...
- # LATER: decimals
- # LATER: override errors
- class Schema(object):
- errors = {}
- def __init__(self, title='', description='', default=None):
- self.title = title
- self.description = description
- self.default = default
- def make_error(self, code):
- error_string = self.errors[code]
- params = self.__dict__
- return Error(error_string.format(**params), [])
- def __or__(self, other):
- if isinstance(self, Union):
- self_children = self.children
- else:
- self_children = [self]
- if isinstance(other, Union):
- other_children = other.children
- else:
- other_children = [other]
- return Union(self_children + other_children)
- def __and__(self, other):
- if isinstance(self, Intersection):
- self_children = self.children
- else:
- self_children = [self]
- if isinstance(other, Intersection):
- other_children = other.children
- else:
- other_children = [other]
- return Intersection(self_children + other_children)
- def __xor__(self, other):
- return ExclusiveUnion([self, other])
- def __invert__(self):
- return Not(self)
- def __eq__(self, other):
- return (
- self.__class__ == other.__class__ and
- self.__dict__ == other.__dict__
- )
- class Object(Schema):
- errors = {
- 'type': 'Must be an object.',
- 'invalid_key': 'Object keys must be strings.',
- 'empty': 'Must not be empty.',
- 'required': 'This field is required.',
- 'max_properties': 'Must have no more than {max_properties} properties.',
- 'min_properties': 'Must have at least {min_properties} properties.',
- 'invalid_property': 'Invalid property.'
- }
- def __init__(self, properties=None, required=None, max_properties=None, min_properties=None, pattern_properties=None, additional_properties=True, **kwargs):
- super(Object, self).__init__(**kwargs)
- if isinstance(additional_properties, bool):
- # Handle `additional_properties` set to a boolean.
- self.additional_properties_schema = Anything()
- else:
- # Handle `additional_properties` set to a schema.
- self.additional_properties_schema = additional_properties
- additional_properties = True
- self.properties = properties
- self.required = required or []
- self.max_properties = max_properties
- self.min_properties = min_properties
- self.pattern_properties = pattern_properties
- self.additional_properties = additional_properties
- # Compile pattern regexes.
- self.pattern_properties_regex = None
- if pattern_properties is not None:
- self.pattern_properties_regex = {
- re.compile(key): value
- for key, value
- in pattern_properties.items()
- }
- def validate(self, value, context=None):
- if not isinstance(value, dict):
- return [self.make_error('type')]
- errors = []
- if any(not isinstance(key, text_types) for key in value.keys()):
- errors += [self.make_error('invalid_key')]
- if self.required is not None:
- for key in self.required:
- if key not in value:
- error_items = [self.make_error('required')]
- errors += push_index(error_items, key)
- if self.min_properties is not None:
- if len(value) < self.min_properties:
- if self.min_properties == 1:
- errors += [self.make_error('empty')]
- else:
- errors += [self.make_error('min_properties')]
- if self.max_properties is not None:
- if len(value) > self.max_properties:
- errors += [self.make_error('max_properties')]
- # Properties
- remaining_keys = set(value.keys())
- if self.properties is not None:
- remaining_keys -= set(self.properties.keys())
- for key, property_item in self.properties.items():
- if key not in value:
- continue
- error_items = property_item.validate(value[key], context)
- errors += push_index(error_items, key)
- # Pattern properties
- if self.pattern_properties is not None:
- for key in list(remaining_keys):
- for pattern, schema in self.pattern_properties_regex.items():
- if re.search(pattern, key):
- error_items = schema.validate(value[key], context)
- errors += push_index(error_items, key)
- remaining_keys.discard(key)
- # Additional properties
- if self.additional_properties:
- for key in remaining_keys:
- error_items = self.additional_properties_schema.validate(value[key], context)
- errors += push_index(error_items, key)
- else:
- for key in remaining_keys:
- error_items = [self.make_error('invalid_property')]
- errors += push_index(error_items, key)
- return errors
- class Array(Schema):
- errors = {
- 'type': 'Must be an array.',
- 'empty': 'Must not be empty.',
- 'max_items': 'Must have no more than {max_items} items.',
- 'min_items': 'Must have at least {min_items} items.',
- 'unique': 'Must not contain duplicate items.'
- }
- def __init__(self, items=None, max_items=None, min_items=None, unique_items=False, additional_items=True, **kwargs):
- super(Array, self).__init__(**kwargs)
- if items is None:
- items = Anything()
- if isinstance(items, list) and additional_items is False:
- # Setting additional_items==False implies a value for max_items.
- if max_items is None or max_items > len(items):
- max_items = len(items)
- self.items = items
- self.max_items = max_items
- self.min_items = min_items
- self.unique_items = unique_items
- self.additional_items = additional_items
- def validate(self, value, context=None):
- if not isinstance(value, list):
- return [self.make_error('type')]
- errors = []
- if self.items is not None:
- child_schema = self.items
- is_list = isinstance(self.items, list)
- for idx, item in enumerate(value):
- if is_list:
- # Case where `items` is a list of schemas.
- if idx < len(self.items):
- # Handle each item in the list.
- child_schema = self.items[idx]
- else:
- # Handle any additional items.
- if isinstance(self.additional_items, bool):
- break
- else:
- child_schema = self.additional_items
- error_items = child_schema.validate(item, context)
- errors += push_index(error_items, idx)
- if self.min_items is not None:
- if len(value) < self.min_items:
- if self.min_items == 1:
- errors += [self.make_error('empty')]
- else:
- errors += [self.make_error('min_items')]
- if self.max_items is not None:
- if len(value) > self.max_items:
- errors += [self.make_error('max_items')]
- if self.unique_items:
- if not(uniq(value)):
- errors += [self.make_error('unique')]
- return errors
- class Number(Schema):
- integer_only = False
- errors = {
- 'type': 'Must be a number.',
- 'minimum': 'Must be greater than or equal to {minimum}.',
- 'exclusive_minimum': 'Must be greater than {minimum}.',
- 'maximum': 'Must be less than or equal to {maximum}.',
- 'exclusive_maximum': 'Must be less than {maximum}.',
- 'multiple_of': 'Must be a multiple of {multiple_of}.',
- }
- def __init__(self, minimum=None, maximum=None, exclusive_minimum=False, exclusive_maximum=False, multiple_of=None, **kwargs):
- super(Number, self).__init__(**kwargs)
- self.minimum = minimum
- self.maximum = maximum
- self.exclusive_minimum = exclusive_minimum
- self.exclusive_maximum = exclusive_maximum
- self.multiple_of = multiple_of
- def validate(self, value, context=None):
- if isinstance(value, bool):
- # In Python `bool` subclasses `int`, so handle that case explicitly.
- return [self.make_error('type')]
- if not isinstance(value, numeric_types):
- return [self.make_error('type')]
- if self.integer_only and isinstance(value, float) and not value.is_integer():
- return [self.make_error('type')]
- errors = []
- if self.minimum is not None:
- if self.exclusive_minimum:
- if value <= self.minimum:
- errors += [self.make_error('exclusive_minimum')]
- else:
- if value < self.minimum:
- errors += [self.make_error('minimum')]
- if self.maximum is not None:
- if self.exclusive_maximum:
- if value >= self.maximum:
- errors += [self.make_error('exclusive_maximum')]
- else:
- if value > self.maximum:
- errors += [self.make_error('maximum')]
- if self.multiple_of is not None:
- if isinstance(self.multiple_of, float):
- failed = not (float(value) / self.multiple_of).is_integer()
- else:
- failed = value % self.multiple_of
- if failed:
- errors += [self.make_error('multiple_of')]
- return errors
- class Integer(Number):
- errors = {
- 'type': 'Must be an integer.',
- 'minimum': 'Must be greater than or equal to {minimum}.',
- 'exclusive_minimum': 'Must be greater than {minimum}.',
- 'maximum': 'Must be less than or equal to {maximum}.',
- 'exclusive_maximum': 'Must be less than {maximum}.',
- 'multiple_of': 'Must be a multiple of {multiple_of}.',
- }
- integer_only = True
- class String(Schema):
- errors = {
- 'type': 'Must be a string.',
- 'blank': 'Must not be blank.',
- 'max_length': 'Must have no more than {max_length} characters.',
- 'min_length': 'Must have at least {min_length} characters.',
- 'pattern': 'Must match the pattern /{pattern}/.',
- 'format': 'Must be a valid {format}.',
- }
- def __init__(self, max_length=None, min_length=None, pattern=None, format=None, **kwargs):
- super(String, self).__init__(**kwargs)
- self.max_length = max_length
- self.min_length = min_length
- self.pattern = pattern
- self.format = format
- self.pattern_regex = None
- if self.pattern is not None:
- self.pattern_regex = re.compile(pattern)
- def validate(self, value, context=None):
- if not isinstance(value, text_types):
- return [self.make_error('type')]
- errors = []
- if self.min_length is not None:
- if len(value) < self.min_length:
- if self.min_length == 1:
- errors += [self.make_error('blank')]
- else:
- errors += [self.make_error('min_length')]
- if self.max_length is not None:
- if len(value) > self.max_length:
- errors += [self.make_error('max_length')]
- if self.pattern is not None:
- if not re.search(self.pattern_regex, value):
- errors += [self.make_error('pattern')]
- if self.format is not None:
- if not validate_format(value, self.format):
- errors += [self.make_error('format')]
- return errors
- class Boolean(Schema):
- errors = {
- 'type': 'Must be a boolean.'
- }
- def validate(self, value, context=None):
- if not isinstance(value, bool):
- return [self.make_error('type')]
- return []
- class Null(Schema):
- errors = {
- 'type': 'Must be null.'
- }
- def validate(self, value, context=None):
- if value is not None:
- return [self.make_error('type')]
- return []
- class Enum(Schema):
- errors = {
- 'enum': 'Must be one of {enum}.',
- 'exact': 'Must be {exact}.',
- }
- def __init__(self, enum, **kwargs):
- super(Enum, self).__init__(**kwargs)
- self.enum = enum
- if len(enum) == 1:
- self.exact = repr(enum[0])
- def validate(self, value, context=None):
- if value not in self.enum:
- if len(self.enum) == 1:
- return [self.make_error('exact')]
- return [self.make_error('enum')]
- return []
- class Anything(Schema):
- errors = {
- 'type': 'Must be a valid primitive type.'
- }
- types = text_types + (dict, list, int, float, bool, type(None))
- def validate(self, value, context=None):
- if not isinstance(value, self.types):
- return [self.make_error('type')]
- errors = []
- if isinstance(value, list):
- schema = Array()
- errors += schema.validate(value, context)
- elif isinstance(value, dict):
- schema = Object()
- errors += schema.validate(value, context)
- return errors
- # Composites
- class Union(Schema):
- errors = {
- 'match': 'Must match one of the options.'
- }
- def __init__(self, children, **kwargs):
- super(Union, self).__init__(**kwargs)
- self.children = children
- def validate(self, value, context=None):
- for child in self.children:
- if child.validate(value, context) == []:
- return []
- return [self.make_error('match')]
- class Intersection(Schema):
- def __init__(self, children, **kwargs):
- super(Intersection, self).__init__(**kwargs)
- self.children = children
- def validate(self, value, context=None):
- errors = []
- for child in self.children:
- errors.extend(child.validate(value, context))
- return errors
- class ExclusiveUnion(Schema):
- errors = {
- 'match': 'Must match one of the options.',
- 'match_only_one': 'Must match only one of the options.'
- }
- def __init__(self, children, **kwargs):
- super(ExclusiveUnion, self).__init__(**kwargs)
- self.children = children
- def validate(self, value, context=None):
- matches = 0
- for child in self.children:
- if child.validate(value, context) == []:
- matches += 1
- if not matches:
- return [self.make_error('match')]
- elif matches > 1:
- return [self.make_error('match_only_one')]
- return []
- class Not(Schema):
- errors = {
- 'must_not_match': 'Must not match the option.'
- }
- def __init__(self, child, **kwargs):
- super(Not, self).__init__(**kwargs)
- self.child = child
- def validate(self, value, context=None):
- errors = []
- if self.child.validate(value, context):
- return []
- return [self.make_error('must_not_match')]
- # References
- class Ref(Schema):
- def __init__(self, ref_name):
- self.ref_name = ref_name
- def dereference(self, context):
- assert isinstance(context, dict)
- assert 'refs' in context
- assert self.ref_name in context['refs']
- return context['refs'][self.ref_name]
- def validate(self, value, context=None):
- schema = self.dereference(context)
- return schema.validate(value, context)
- class RefSpace(Schema):
- def __init__(self, refs, root):
- assert root in refs
- self.refs = refs
- self.root = root
- self.root_validator = refs[root]
- def validate(self, value):
- context = {'refs': self.refs}
- return self.root_validator.validate(value, context)
|