123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755 |
- import collections
- import logging
- import re
- import urllib.parse as urlparse
- from collections import OrderedDict
- from django.urls import get_script_prefix
- from django.utils.functional import Promise
- from inflection import camelize
- from .utils import dict_has_ordered_keys, filter_none, force_real_str
- try:
- from collections import abc as collections_abc
- except ImportError:
- collections_abc = collections
- logger = logging.getLogger(__name__)
- TYPE_OBJECT = "object" #:
- TYPE_STRING = "string" #:
- TYPE_NUMBER = "number" #:
- TYPE_INTEGER = "integer" #:
- TYPE_BOOLEAN = "boolean" #:
- TYPE_ARRAY = "array" #:
- TYPE_FILE = "file" #:
- # officially supported by Swagger 2.0 spec
- FORMAT_DATE = "date" #:
- FORMAT_DATETIME = "date-time" #:
- FORMAT_PASSWORD = "password" #:
- FORMAT_BINARY = "binary" #:
- FORMAT_BASE64 = "bytes" #:
- FORMAT_FLOAT = "float" #:
- FORMAT_DOUBLE = "double" #:
- FORMAT_INT32 = "int32" #:
- FORMAT_INT64 = "int64" #:
- # defined in JSON-schema
- FORMAT_EMAIL = "email" #:
- FORMAT_IPV4 = "ipv4" #:
- FORMAT_IPV6 = "ipv6" #:
- FORMAT_URI = "uri" #:
- # pulled out of my ass
- FORMAT_UUID = "uuid" #:
- FORMAT_SLUG = "slug" #:
- FORMAT_DECIMAL = "decimal"
- IN_BODY = 'body' #:
- IN_PATH = 'path' #:
- IN_QUERY = 'query' #:
- IN_FORM = 'formData' #:
- IN_HEADER = 'header' #:
- SCHEMA_DEFINITIONS = 'definitions' #:
- def make_swagger_name(attribute_name):
- """
- Convert a python variable name into a Swagger spec attribute name.
- In particular,
- * if name starts with ``x_``, return ``x-{camelCase}``
- * if name is ``ref``, return ``$ref``
- * else return the name converted to camelCase, with trailing underscores stripped
- :param str attribute_name: python attribute name
- :return: swagger name
- """
- if attribute_name == 'ref':
- return "$ref"
- if attribute_name.startswith("x_"):
- return "x-" + camelize(attribute_name[2:], uppercase_first_letter=False)
- return camelize(attribute_name.rstrip('_'), uppercase_first_letter=False)
- def _bare_SwaggerDict(cls):
- assert issubclass(cls, SwaggerDict)
- result = cls.__new__(cls)
- OrderedDict.__init__(result) # no __init__ called for SwaggerDict subclasses!
- return result
- class SwaggerDict(OrderedDict):
- """A particular type of OrderedDict, which maps all attribute accesses to dict lookups using
- :func:`.make_swagger_name`. Attribute names starting with ``_`` are set on the object as-is and are not included
- in the specification output.
- Used as a base class for all Swagger helper models.
- """
- def __init__(self, **attrs):
- super(SwaggerDict, self).__init__()
- self._extras__ = attrs
- if type(self) == SwaggerDict:
- self._insert_extras__()
- def __setattr__(self, key, value):
- if key.startswith('_'):
- super(SwaggerDict, self).__setattr__(key, value)
- return
- if value is not None:
- self[make_swagger_name(key)] = value
- def __getattr__(self, item):
- if item.startswith('_'):
- raise AttributeError
- try:
- return self[make_swagger_name(item)]
- except KeyError:
- # raise_from is EXTREMELY slow, replaced with plain raise
- raise AttributeError("object of class " + type(self).__name__ + " has no attribute " + item)
- def __delattr__(self, item):
- if item.startswith('_'):
- super(SwaggerDict, self).__delattr__(item)
- return
- del self[make_swagger_name(item)]
- def _insert_extras__(self):
- """
- From an ordering perspective, it is desired that extra attributes such as vendor extensions stay at the
- bottom of the object. However, python2.7's OrderedDict craps out if you try to insert into it before calling
- init. This means that subclasses must call super().__init__ as the first statement of their own __init__,
- which would result in the extra attributes being added first. For this reason, we defer the insertion of the
- attributes and require that subclasses call ._insert_extras__ at the end of their __init__ method.
- """
- for attr, val in sorted(self._extras__.items()):
- setattr(self, attr, val)
- @staticmethod
- def _as_odict(obj, memo):
- """Implementation detail of :meth:`.as_odict`"""
- if id(obj) in memo:
- return memo[id(obj)]
- if isinstance(obj, Promise) and hasattr(obj, '_proxy____cast'):
- # handle __proxy__ objects from django.utils.functional.lazy
- obj = obj._proxy____cast()
- if isinstance(obj, collections_abc.Mapping):
- result = OrderedDict()
- memo[id(obj)] = result
- items = obj.items()
- if not dict_has_ordered_keys(obj):
- items = sorted(items)
- for attr, val in items:
- result[attr] = SwaggerDict._as_odict(val, memo)
- return result
- elif isinstance(obj, str):
- return force_real_str(obj)
- elif isinstance(obj, collections_abc.Iterable) and not isinstance(obj, collections_abc.Iterator):
- return type(obj)(SwaggerDict._as_odict(elem, memo) for elem in obj)
- return obj
- def as_odict(self):
- """Convert this object into an ``OrderedDict`` instance.
- :rtype: OrderedDict
- """
- return SwaggerDict._as_odict(self, {})
- def __reduce__(self):
- # for pickle support; this skips calls to all SwaggerDict __init__ methods and relies
- # on the already set attributes instead
- attrs = {k: v for k, v in vars(self).items() if not k.startswith('_NP_')}
- return _bare_SwaggerDict, (type(self),), attrs, None, iter(self.items())
- class Contact(SwaggerDict):
- def __init__(self, name=None, url=None, email=None, **extra):
- """Swagger Contact object
- At least one of the following fields is required:
- :param str name: contact name
- :param str url: contact url
- :param str email: contact e-mail
- """
- super(Contact, self).__init__(**extra)
- if name is None and url is None and email is None:
- raise AssertionError("one of name, url or email is requires for Swagger Contact object")
- self.name = name
- self.url = url
- self.email = email
- self._insert_extras__()
- class License(SwaggerDict):
- def __init__(self, name, url=None, **extra):
- """Swagger License object
- :param str name: Required. License name
- :param str url: link to detailed license information
- """
- super(License, self).__init__(**extra)
- if name is None:
- raise AssertionError("name is required for Swagger License object")
- self.name = name
- self.url = url
- self._insert_extras__()
- class Info(SwaggerDict):
- def __init__(self, title, default_version, description=None, terms_of_service=None, contact=None, license=None,
- **extra):
- """Swagger Info object
- :param str title: Required. API title.
- :param str default_version: Required. API version string (not to be confused with Swagger spec version)
- :param str description: API description; markdown supported
- :param str terms_of_service: API terms of service; should be a URL
- :param Contact contact: contact object
- :param License license: license object
- """
- super(Info, self).__init__(**extra)
- if title is None or default_version is None:
- raise AssertionError("title and version are required for Swagger info object")
- if contact is not None and not isinstance(contact, Contact):
- raise AssertionError("contact must be a Contact object")
- if license is not None and not isinstance(license, License):
- raise AssertionError("license must be a License object")
- self.title = title
- self._default_version = default_version
- self.description = description
- self.terms_of_service = terms_of_service
- self.contact = contact
- self.license = license
- self._insert_extras__()
- class Swagger(SwaggerDict):
- def __init__(self, info=None, _url=None, _prefix=None, _version=None, consumes=None, produces=None,
- security_definitions=None, security=None, paths=None, definitions=None, **extra):
- """Root Swagger object.
- :param .Info info: info object
- :param str _url: URL used for setting the API host and scheme
- :param str _prefix: api path prefix to use in setting basePath; this will be appended to the wsgi
- SCRIPT_NAME prefix or Django's FORCE_SCRIPT_NAME if applicable
- :param str _version: version string to override Info
- :param dict[str,dict] security_definitions: list of supported authentication mechanisms
- :param list[dict[str,list[str]]] security: authentication mechanisms accepted globally
- :param list[str] consumes: consumed MIME types; can be overridden in Operation
- :param list[str] produces: produced MIME types; can be overridden in Operation
- :param Paths paths: paths object
- :param dict[str,Schema] definitions: named models
- """
- super(Swagger, self).__init__(**extra)
- self.swagger = '2.0'
- self.info = info
- self.info.version = _version or info._default_version
- if _url:
- url = urlparse.urlparse(_url)
- assert url.netloc and url.scheme, "if given, url must have both schema and netloc"
- self.host = url.netloc
- self.schemes = [url.scheme]
- self.base_path = self.get_base_path(get_script_prefix(), _prefix)
- self.consumes = consumes
- self.produces = produces
- self.security_definitions = filter_none(security_definitions)
- self.security = filter_none(security)
- self.paths = paths
- self.definitions = filter_none(definitions)
- self._insert_extras__()
- @classmethod
- def get_base_path(cls, script_prefix, api_prefix):
- """Determine an appropriate value for ``basePath`` based on the SCRIPT_NAME and the api common prefix.
- :param str script_prefix: script prefix as defined by django ``get_script_prefix``
- :param str api_prefix: api common prefix
- :return: joined base path
- """
- # avoid double slash when joining script_name with api_prefix
- if script_prefix and script_prefix.endswith('/'):
- script_prefix = script_prefix[:-1]
- if not api_prefix.startswith('/'):
- api_prefix = '/' + api_prefix
- base_path = script_prefix + api_prefix
- # ensure that the base path has a leading slash and no trailing slash
- if base_path and base_path.endswith('/'):
- base_path = base_path[:-1]
- if not base_path.startswith('/'):
- base_path = '/' + base_path
- return base_path
- class Paths(SwaggerDict):
- def __init__(self, paths, **extra):
- """A listing of all the paths in the API.
- :param dict[str,PathItem] paths:
- """
- super(Paths, self).__init__(**extra)
- for path, path_obj in paths.items():
- assert path.startswith("/")
- if path_obj is not None: # pragma: no cover
- self[path] = path_obj
- self._insert_extras__()
- class PathItem(SwaggerDict):
- OPERATION_NAMES = ['get', 'put', 'post', 'delete', 'options', 'head', 'patch']
- def __init__(self, get=None, put=None, post=None, delete=None, options=None,
- head=None, patch=None, parameters=None, **extra):
- """Information about a single path
- :param Operation get: operation for GET
- :param Operation put: operation for PUT
- :param Operation post: operation for POST
- :param Operation delete: operation for DELETE
- :param Operation options: operation for OPTIONS
- :param Operation head: operation for HEAD
- :param Operation patch: operation for PATCH
- :param list[Parameter] parameters: parameters that apply to all operations
- """
- super(PathItem, self).__init__(**extra)
- self.get = get
- self.head = head
- self.post = post
- self.put = put
- self.patch = patch
- self.delete = delete
- self.options = options
- self.parameters = filter_none(parameters)
- self._insert_extras__()
- @property
- def operations(self):
- """A list of all standard Operations on this PathItem object. See :attr:`.OPERATION_NAMES`.
- :return: list of (method name, Operation) tuples
- :rtype: list[tuple[str,Operation]]
- """
- return [(k, v) for k, v in self.items() if k in PathItem.OPERATION_NAMES and v]
- class Operation(SwaggerDict):
- def __init__(self, operation_id, responses, parameters=None, consumes=None, produces=None, summary=None,
- description=None, tags=None, security=None, **extra):
- """Information about an API operation (path + http method combination)
- :param str operation_id: operation ID, should be unique across all operations
- :param Responses responses: responses returned
- :param list[Parameter] parameters: parameters accepted
- :param list[str] consumes: content types accepted
- :param list[str] produces: content types produced
- :param str summary: operation summary; should be < 120 characters
- :param str description: operation description; can be of any length and supports markdown
- :param list[str] tags: operation tags
- :param list[dict[str,list[str]]] security: list of security requirements
- """
- super(Operation, self).__init__(**extra)
- self.operation_id = operation_id
- self.summary = summary
- self.description = description
- self.parameters = filter_none(parameters)
- self.responses = responses
- self.consumes = filter_none(consumes)
- self.produces = filter_none(produces)
- self.tags = filter_none(tags)
- self.security = filter_none(security)
- self._insert_extras__()
- def _check_type(type, format, enum, pattern, items, _obj_type):
- if items and type != TYPE_ARRAY:
- raise AssertionError("items can only be used when type is array")
- if type == TYPE_ARRAY and not items:
- raise AssertionError("TYPE_ARRAY requires the items attribute")
- if pattern and type != TYPE_STRING:
- raise AssertionError("pattern can only be used when type is string")
- if (format or enum or pattern) and type in (TYPE_OBJECT, TYPE_ARRAY, None):
- raise AssertionError("[format, enum, pattern] can only be applied to primitive " + _obj_type)
- class Items(SwaggerDict):
- def __init__(self, type=None, format=None, enum=None, pattern=None, items=None, **extra):
- """Used when defining an array :class:`.Parameter` to describe the array elements.
- :param str type: type of the array elements; must not be ``object``
- :param str format: value format, see OpenAPI spec
- :param list enum: restrict possible values
- :param str pattern: pattern if type is ``string``
- :param .Items items: only valid if `type` is ``array``
- """
- super(Items, self).__init__(**extra)
- assert type is not None, "type is required!"
- self.type = type
- self.format = format
- self.enum = enum
- self.pattern = pattern
- self.items_ = items
- self._insert_extras__()
- _check_type(type, format, enum, pattern, items, self.__class__)
- class Parameter(SwaggerDict):
- def __init__(self, name, in_, description=None, required=None, schema=None,
- type=None, format=None, enum=None, pattern=None, items=None, default=None, **extra):
- """Describe parameters accepted by an :class:`.Operation`. Each parameter should be a unique combination of
- (`name`, `in_`). ``body`` and ``form`` parameters in the same operation are mutually exclusive.
- :param str name: parameter name
- :param str in_: parameter location
- :param str description: parameter description
- :param bool required: whether the parameter is required for the operation
- :param schema: required if `in_` is ``body``
- :type schema: Schema or SchemaRef
- :param str type: parameter type; required if `in_` is not ``body``; must not be ``object``
- :param str format: value format, see OpenAPI spec
- :param list enum: restrict possible values
- :param str pattern: pattern if type is ``string``
- :param .Items items: only valid if `type` is ``array``
- :param default: default value if the parameter is not provided; must conform to parameter type
- """
- super(Parameter, self).__init__(**extra)
- self.name = name
- self.in_ = in_
- self.description = description
- self.required = required
- self.schema = schema
- self.type = type
- self.format = format
- self.enum = enum
- self.pattern = pattern
- self.items_ = items
- self.default = default
- self._insert_extras__()
- if (not schema and not type) or (schema and type):
- raise AssertionError("either schema or type are required for Parameter object (not both)!")
- if schema and isinstance(schema, Schema):
- schema._remove_read_only()
- if self['in'] == IN_PATH:
- # path parameters must always be required
- assert required is not False, "path parameter cannot be optional"
- self.required = True
- if self['in'] != IN_BODY and schema is not None:
- raise AssertionError("schema can only be applied to a body Parameter, not %s" % type)
- if default and not type:
- raise AssertionError("default can only be applied to a non-body Parameter")
- _check_type(type, format, enum, pattern, items, self.__class__)
- class Schema(SwaggerDict):
- OR_REF = () #: useful for type-checking, e.g ``isinstance(obj, openapi.Schema.OR_REF)``
- def __init__(self, title=None, description=None, type=None, format=None, enum=None, pattern=None, properties=None,
- additional_properties=None, required=None, items=None, default=None, read_only=None, **extra):
- """Describes a complex object accepted as parameter or returned as a response.
- :param str title: schema title
- :param str description: schema description
- :param str type: value type; required
- :param str format: value format, see OpenAPI spec
- :param list enum: restrict possible values
- :param str pattern: pattern if type is ``string``
- :param properties: object properties; required if `type` is ``object``
- :type properties: dict[str,Schema or SchemaRef]
- :param additional_properties: allow wildcard properties not listed in `properties`
- :type additional_properties: bool or Schema or SchemaRef
- :param list[str] required: list of required property names
- :param items: type of array items, only valid if `type` is ``array``
- :type items: Schema or SchemaRef
- :param default: only valid when insider another ``Schema``\\ 's ``properties``;
- the default value of this property if it is not provided, must conform to the type of this Schema
- :param read_only: only valid when insider another ``Schema``\\ 's ``properties``;
- declares the property as read only - it must only be sent as part of responses, never in requests
- """
- super(Schema, self).__init__(**extra)
- if required is True or required is False:
- # common error
- raise AssertionError("the `required` attribute of schema must be an "
- "array of required property names, not a boolean!")
- assert type, "type is required!"
- self.title = title
- self.description = description
- self.required = filter_none(required)
- self.type = type
- self.properties = filter_none(properties)
- self.additional_properties = additional_properties
- self.format = format
- self.enum = enum
- self.pattern = pattern
- self.items_ = items
- self.read_only = read_only
- self.default = default
- self._insert_extras__()
- if (properties or (additional_properties is not None)) and type != TYPE_OBJECT:
- raise AssertionError("only object Schema can have properties")
- _check_type(type, format, enum, pattern, items, self.__class__)
- def _remove_read_only(self):
- # readOnly is only valid for Schemas inside another Schema's properties;
- # when placing Schema elsewhere we must take care to remove the readOnly flag
- self.pop('readOnly', '')
- class _Ref(SwaggerDict):
- ref_name_re = re.compile(r"#/(?P<scope>.+)/(?P<name>[^/]+)$")
- def __init__(self, resolver, name, scope, expected_type, ignore_unresolved=False):
- """Base class for all reference types. A reference object has only one property, ``$ref``, which must be a JSON
- reference to a valid object in the specification, e.g. ``#/definitions/Article`` to refer to an article model.
- :param .ReferenceResolver resolver: component resolver which must contain the referenced object
- :param str name: referenced object name, e.g. "Article"
- :param str scope: reference scope, e.g. "definitions"
- :param type[.SwaggerDict] expected_type: the expected type that will be asserted on the object found in resolver
- :param bool ignore_unresolved: do not throw if the referenced object does not exist
- """
- super(_Ref, self).__init__()
- assert not type(self) == _Ref, "do not instantiate _Ref directly"
- ref_name = "#/{scope}/{name}".format(scope=scope, name=name)
- if not ignore_unresolved:
- obj = resolver.get(name, scope)
- assert isinstance(obj, expected_type), ref_name + " is a {actual}, not a {expected}" \
- .format(actual=type(obj).__name__, expected=expected_type.__name__)
- self.ref = ref_name
- def resolve(self, resolver):
- """Get the object targeted by this reference from the given component resolver.
- :param .ReferenceResolver resolver: component resolver which must contain the referenced object
- :returns: the target object
- """
- ref_match = self.ref_name_re.match(self.ref)
- return resolver.getdefault(ref_match.group('name'), scope=ref_match.group('scope'))
- def __setitem__(self, key, value):
- if key == "$ref":
- return super(_Ref, self).__setitem__(key, value)
- raise NotImplementedError("only $ref can be set on Reference objects (not %s)" % key)
- def __delitem__(self, key):
- raise NotImplementedError("cannot delete property of Reference object")
- class SchemaRef(_Ref):
- def __init__(self, resolver, schema_name, ignore_unresolved=False):
- """Adds a reference to a named Schema defined in the ``#/definitions/`` object.
- :param .ReferenceResolver resolver: component resolver which must contain the definition
- :param str schema_name: schema name
- :param bool ignore_unresolved: do not throw if the referenced object does not exist
- """
- assert SCHEMA_DEFINITIONS in resolver.scopes
- super(SchemaRef, self).__init__(resolver, schema_name, SCHEMA_DEFINITIONS, Schema, ignore_unresolved)
- Schema.OR_REF = (Schema, SchemaRef)
- def resolve_ref(ref_or_obj, resolver):
- """Resolve `ref_or_obj` if it is a reference type. Return it unchanged if not.
- :param ref_or_obj: object to dereference
- :type ref_or_obj: SwaggerDict or _Ref
- :param resolver: component resolver which must contain the referenced object
- """
- if isinstance(ref_or_obj, _Ref):
- return ref_or_obj.resolve(resolver)
- return ref_or_obj
- class Responses(SwaggerDict):
- def __init__(self, responses, default=None, **extra):
- """Describes the expected responses of an :class:`.Operation`.
- :param responses: mapping of status code to response definition
- :type responses: dict[str or int,Response]
- :param Response default: description of the response structure to expect if another status code is returned
- """
- super(Responses, self).__init__(**extra)
- for status, response in responses.items():
- if response is not None: # pragma: no cover
- self[str(status)] = response
- self.default = default
- self._insert_extras__()
- class Response(SwaggerDict):
- def __init__(self, description, schema=None, examples=None, **extra):
- """Describes the structure of an operation's response.
- :param str description: response description
- :param schema: structure of the response body
- :type schema: Schema or SchemaRef or rest_framework.serializers.Serializer
- or type[rest_framework.serializers.Serializer]
- :param dict examples: example bodies mapped by mime type
- """
- super(Response, self).__init__(**extra)
- self.description = description
- self.schema = schema
- self.examples = examples
- self._insert_extras__()
- if schema and isinstance(schema, Schema):
- schema._remove_read_only()
- class ReferenceResolver(object):
- """A mapping type intended for storing objects pointed at by Swagger Refs.
- Provides support and checks for different reference scopes, e.g. 'definitions'.
- For example:
- ::
- > components = ReferenceResolver('definitions', 'parameters')
- > definitions = components.with_scope('definitions')
- > definitions.set('Article', Schema(...))
- > print(components)
- {'definitions': OrderedDict([('Article', Schema(...)]), 'parameters': OrderedDict()}
- """
- def __init__(self, *scopes, **kwargs):
- """
- :param str scopes: an enumeration of the valid scopes this resolver will contain
- """
- force_init = kwargs.pop('force_init', False)
- if not force_init:
- raise AssertionError(
- "Creating an instance of ReferenceResolver almost certainly won't do what you want it to do.\n"
- "See https://github.com/axnsan12/drf-yasg/issues/211, "
- "https://github.com/axnsan12/drf-yasg/issues/271, "
- "https://github.com/axnsan12/drf-yasg/issues/325.\n"
- "Pass `force_init=True` to override this."
- )
- self._objects = OrderedDict()
- self._force_scope = None
- for scope in scopes:
- assert isinstance(scope, str), "scope names must be strings"
- self._objects[scope] = OrderedDict()
- def with_scope(self, scope):
- """Return a view into this :class:`.ReferenceResolver` whose scope is defaulted and forced to `scope`.
- :param str scope: target scope, must be in this resolver's `scopes`
- :return: the bound resolver
- :rtype: .ReferenceResolver
- """
- assert scope in self.scopes, "unknown scope %s" % scope
- ret = ReferenceResolver(force_init=True)
- ret._objects = self._objects
- ret._force_scope = scope
- return ret
- def _check_scope(self, scope):
- real_scope = self._force_scope or scope
- if scope is not None:
- assert not self._force_scope or scope == self._force_scope, "cannot override forced scope"
- assert real_scope and real_scope in self._objects, "invalid scope %s" % scope
- return real_scope
- def set(self, name, obj, scope=None):
- """Set an object in the given scope, raise an error if it already exists.
- :param str name: reference name
- :param obj: referenced object
- :param str scope: reference scope
- """
- scope = self._check_scope(scope)
- assert obj is not None, "referenced objects cannot be None/null"
- assert name not in self._objects[scope], "#/%s/%s already exists" % (scope, name)
- self._objects[scope][name] = obj
- def setdefault(self, name, maker, scope=None):
- """Set an object in the given scope only if it does not exist.
- :param str name: reference name
- :param function maker: object factory, called only if necessary
- :param str scope: reference scope
- """
- scope = self._check_scope(scope)
- assert callable(maker), "setdefault expects a callable, not %s" % type(maker).__name__
- ret = self.getdefault(name, None, scope)
- if ret is None:
- ret = maker()
- value = self.getdefault(name, None, scope)
- assert ret is not None, "maker returned None; referenced objects cannot be None/null"
- if value is None:
- self.set(name, ret, scope)
- elif value != ret:
- logger.debug("during setdefault, maker for %s inserted a value and returned a different value", name)
- ret = value
- return ret
- def get(self, name, scope=None):
- """Get an object from the given scope, raise an error if it does not exist.
- :param str name: reference name
- :param str scope: reference scope
- :return: the object
- """
- scope = self._check_scope(scope)
- assert name in self._objects[scope], "#/%s/%s is not defined" % (scope, name)
- return self._objects[scope][name]
- def getdefault(self, name, default=None, scope=None):
- """Get an object from the given scope or a default value if it does not exist.
- :param str name: reference name
- :param default: the default value
- :param str scope: reference scope
- :return: the object or `default`
- """
- scope = self._check_scope(scope)
- return self._objects[scope].get(name, default)
- def has(self, name, scope=None):
- """Check if an object exists in the given scope.
- :param str name: reference name
- :param str scope: reference scope
- :return: True if the object exists
- :rtype: bool
- """
- scope = self._check_scope(scope)
- return name in self._objects[scope]
- def __iter__(self):
- if self._force_scope:
- return iter(self._objects[self._force_scope])
- return iter(self._objects)
- @property
- def scopes(self):
- if self._force_scope:
- return [self._force_scope]
- return list(self._objects.keys())
- # act as mapping
- def keys(self):
- if self._force_scope:
- return self._objects[self._force_scope].keys()
- return self._objects.keys()
- def __getitem__(self, item):
- if self._force_scope:
- return self._objects[self._force_scope][item]
- return self._objects[item]
- def __str__(self):
- return str(dict(self))
|