codecs.py 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240
  1. import copy
  2. import json
  3. import logging
  4. from collections import OrderedDict
  5. from django.utils.encoding import force_bytes
  6. from ruamel import yaml
  7. from . import openapi
  8. from .errors import SwaggerValidationError
  9. logger = logging.getLogger(__name__)
  10. def _validate_flex(spec):
  11. try:
  12. from flex.core import parse as validate_flex
  13. from flex.exceptions import ValidationError
  14. except ImportError:
  15. return
  16. try:
  17. validate_flex(spec)
  18. except ValidationError as ex:
  19. raise SwaggerValidationError(str(ex)) from ex
  20. def _validate_swagger_spec_validator(spec):
  21. from swagger_spec_validator.common import SwaggerValidationError as SSVErr
  22. from swagger_spec_validator.validator20 import validate_spec as validate_ssv
  23. try:
  24. validate_ssv(spec)
  25. except SSVErr as ex:
  26. raise SwaggerValidationError(str(ex)) from ex
  27. #:
  28. VALIDATORS = {
  29. 'flex': _validate_flex,
  30. 'ssv': _validate_swagger_spec_validator,
  31. }
  32. class _OpenAPICodec(object):
  33. media_type = None
  34. def __init__(self, validators):
  35. self._validators = validators
  36. @property
  37. def validators(self):
  38. """List of validator names to apply"""
  39. return self._validators
  40. def encode(self, document):
  41. """Transform an :class:`.Swagger` object to a sequence of bytes.
  42. Also performs validation and applies settings.
  43. :param openapi.Swagger document: Swagger spec object as generated by :class:`.OpenAPISchemaGenerator`
  44. :return: binary encoding of ``document``
  45. :rtype: bytes
  46. """
  47. if not isinstance(document, openapi.Swagger):
  48. raise TypeError('Expected a `openapi.Swagger` instance')
  49. spec = self.generate_swagger_object(document)
  50. errors = {}
  51. for validator in self.validators:
  52. try:
  53. # validate a deepcopy of the spec to prevent the validator from messing with it
  54. # for example, swagger_spec_validator adds an x-scope property to all references
  55. VALIDATORS[validator](copy.deepcopy(spec))
  56. except SwaggerValidationError as e:
  57. errors[validator] = str(e)
  58. if errors:
  59. exc = SwaggerValidationError("spec validation failed: {}".format(errors), errors, spec, self)
  60. logger.warning(str(exc))
  61. raise exc
  62. return force_bytes(self._dump_dict(spec))
  63. def encode_error(self, err):
  64. """Dump an error message into an encoding-appropriate sequence of bytes"""
  65. return force_bytes(self._dump_dict(err))
  66. def _dump_dict(self, spec):
  67. """Dump the given dictionary into its string representation.
  68. :param dict spec: a python dict
  69. :return: string representation of ``spec``
  70. :rtype: str or bytes
  71. """
  72. raise NotImplementedError("override this method")
  73. def generate_swagger_object(self, swagger):
  74. """Generates the root Swagger object.
  75. :param openapi.Swagger swagger: Swagger spec object as generated by :class:`.OpenAPISchemaGenerator`
  76. :return: swagger spec as dict
  77. :rtype: OrderedDict
  78. """
  79. return swagger.as_odict()
  80. class OpenAPICodecJson(_OpenAPICodec):
  81. media_type = 'application/json'
  82. def __init__(self, validators, pretty=False, media_type='application/json'):
  83. super(OpenAPICodecJson, self).__init__(validators)
  84. self.pretty = pretty
  85. self.media_type = media_type
  86. def _dump_dict(self, spec):
  87. """Dump ``spec`` into JSON.
  88. :rtype: str"""
  89. if self.pretty:
  90. out = json.dumps(spec, indent=4, separators=(',', ': '), ensure_ascii=False)
  91. if out[-1] != '\n':
  92. out += '\n'
  93. return out
  94. else:
  95. return json.dumps(spec, ensure_ascii=False)
  96. YAML_MAP_TAG = u'tag:yaml.org,2002:map'
  97. class SaneYamlDumper(yaml.SafeDumper):
  98. """YamlDumper class usable for dumping ``OrderedDict`` and list instances in a standard way."""
  99. def ignore_aliases(self, data):
  100. """Disable YAML references."""
  101. return True
  102. def increase_indent(self, flow=False, indentless=False, **kwargs):
  103. """https://stackoverflow.com/a/39681672
  104. Indent list elements.
  105. """
  106. return super(SaneYamlDumper, self).increase_indent(flow=flow, indentless=False, **kwargs)
  107. def represent_odict(self, mapping, flow_style=None): # pragma: no cover
  108. """https://gist.github.com/miracle2k/3184458
  109. Make PyYAML output an OrderedDict.
  110. It will do so fine if you use yaml.dump(), but that generates ugly, non-standard YAML code.
  111. To use yaml.safe_dump(), you need the following.
  112. """
  113. tag = YAML_MAP_TAG
  114. value = []
  115. node = yaml.MappingNode(tag, value, flow_style=flow_style)
  116. if self.alias_key is not None:
  117. self.represented_objects[self.alias_key] = node
  118. best_style = True
  119. if hasattr(mapping, 'items'):
  120. mapping = mapping.items()
  121. for item_key, item_value in mapping:
  122. node_key = self.represent_data(item_key)
  123. node_value = self.represent_data(item_value)
  124. if not (isinstance(node_key, yaml.ScalarNode) and not node_key.style):
  125. best_style = False
  126. if not (isinstance(node_value, yaml.ScalarNode) and not node_value.style):
  127. best_style = False
  128. value.append((node_key, node_value))
  129. if flow_style is None:
  130. if self.default_flow_style is not None:
  131. node.flow_style = self.default_flow_style
  132. else:
  133. node.flow_style = best_style
  134. return node
  135. def represent_text(self, text):
  136. if "\n" in text:
  137. return self.represent_scalar('tag:yaml.org,2002:str', text, style='|')
  138. return self.represent_scalar('tag:yaml.org,2002:str', text)
  139. SaneYamlDumper.add_representer(bytes, SaneYamlDumper.represent_text)
  140. SaneYamlDumper.add_representer(str, SaneYamlDumper.represent_text)
  141. SaneYamlDumper.add_representer(OrderedDict, SaneYamlDumper.represent_odict)
  142. SaneYamlDumper.add_multi_representer(OrderedDict, SaneYamlDumper.represent_odict)
  143. def yaml_sane_dump(data, binary):
  144. """Dump the given data dictionary into a sane format:
  145. * OrderedDicts are dumped as regular mappings instead of non-standard !!odict
  146. * multi-line mapping style instead of json-like inline style
  147. * list elements are indented into their parents
  148. * YAML references/aliases are disabled
  149. :param dict data: the data to be dumped
  150. :param bool binary: True to return a utf-8 encoded binary object, False to return a string
  151. :return: the serialized YAML
  152. :rtype: str or bytes
  153. """
  154. return yaml.dump(
  155. data,
  156. Dumper=SaneYamlDumper,
  157. default_flow_style=False,
  158. encoding='utf-8' if binary else None,
  159. allow_unicode=binary
  160. )
  161. class SaneYamlLoader(yaml.SafeLoader):
  162. def construct_odict(self, node, deep=False):
  163. self.flatten_mapping(node)
  164. return OrderedDict(self.construct_pairs(node))
  165. SaneYamlLoader.add_constructor(YAML_MAP_TAG, SaneYamlLoader.construct_odict)
  166. def yaml_sane_load(stream):
  167. """Load the given YAML stream while preserving the input order for mapping items.
  168. :param stream: YAML stream (can be a string or a file-like object)
  169. :rtype: OrderedDict
  170. """
  171. return yaml.load(stream, Loader=SaneYamlLoader)
  172. class OpenAPICodecYaml(_OpenAPICodec):
  173. media_type = 'application/yaml'
  174. def __init__(self, validators, media_type='application/yaml'):
  175. super(OpenAPICodecYaml, self).__init__(validators)
  176. self.media_type = media_type
  177. def _dump_dict(self, spec):
  178. """Dump ``spec`` into YAML.
  179. :rtype: bytes"""
  180. return yaml_sane_dump(spec, binary=True)