123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680 |
- # Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
- # Copyright (C) 2003-2017 Nominum, Inc.
- #
- # Permission to use, copy, modify, and distribute this software and its
- # documentation for any purpose with or without fee is hereby granted,
- # provided that the above copyright notice and this permission notice
- # appear in all copies.
- #
- # THE SOFTWARE IS PROVIDED "AS IS" AND NOMINUM DISCLAIMS ALL WARRANTIES
- # WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
- # MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NOMINUM BE LIABLE FOR
- # ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
- # WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
- # ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
- # OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
- """Tokenize DNS zone file format"""
- import io
- import sys
- import dns.exception
- import dns.name
- import dns.ttl
# Characters that end an unquoted token.
_DELIMITERS = set(' \t\n;()"')
# While reading a quoted string, only the closing quote ends the token.
_QUOTING_DELIMITERS = set('"')

# Token type codes, numbered consecutively from zero.
(EOF,
 EOL,
 WHITESPACE,
 IDENTIFIER,
 QUOTED_STRING,
 COMMENT,
 DELIMITER) = range(7)
class UngetBufferFull(dns.exception.DNSException):
    """Raised when something is ungotten while the unget buffer is
    already occupied."""
class Token:
    """A DNS zone file format token.

    ttype: The token type
    value: The token value
    has_escape: Does the token value contain escapes?
    comment: The comment text associated with the token, or None.
    """

    def __init__(self, ttype, value='', has_escape=False, comment=None):
        """Initialize a token instance."""
        self.ttype = ttype
        self.value = value
        self.has_escape = has_escape
        self.comment = comment

    def is_eof(self):
        return self.ttype == EOF

    def is_eol(self):
        return self.ttype == EOL

    def is_whitespace(self):
        return self.ttype == WHITESPACE

    def is_identifier(self):
        return self.ttype == IDENTIFIER

    def is_quoted_string(self):
        return self.ttype == QUOTED_STRING

    def is_comment(self):
        return self.ttype == COMMENT

    def is_delimiter(self):  # pragma: no cover (we don't return delimiters yet)
        return self.ttype == DELIMITER

    def is_eol_or_eof(self):
        return self.ttype == EOL or self.ttype == EOF

    def __eq__(self, other):
        """Tokens are equal when both type and value match; the comment
        and has_escape attributes are not compared."""
        if not isinstance(other, Token):
            return False
        return (self.ttype == other.ttype and
                self.value == other.value)

    def __ne__(self, other):
        if not isinstance(other, Token):
            return True
        return (self.ttype != other.ttype or
                self.value != other.value)

    def __str__(self):
        return '%d "%s"' % (self.ttype, self.value)

    def _iter_unescaped(self):
        """Walk the token value, resolving backslash escapes.

        Yields ``(is_codepoint, item)`` pairs.  For a ``\\DDD`` escape
        *is_codepoint* is True and *item* is the int value (0..255).
        Otherwise *is_codepoint* is False and *item* is a single
        character taken literally (either an ordinary character or the
        character following a backslash).

        Raises dns.exception.UnexpectedEnd: the value ends in the middle
        of an escape.
        Raises dns.exception.SyntaxError: a ``\\DDD`` escape is malformed
        or its value exceeds 255.
        """
        text = self.value
        end = len(text)
        pos = 0
        while pos < end:
            ch = text[pos]
            pos += 1
            if ch != '\\':
                yield False, ch
                continue
            if pos >= end:  # pragma: no cover (can't happen via get())
                raise dns.exception.UnexpectedEnd
            ch = text[pos]
            pos += 1
            # Only ASCII digits introduce a decimal escape.  str.isdigit()
            # also accepts characters such as superscript two, which
            # int() rejects with a bare ValueError.
            if not '0' <= ch <= '9':
                yield False, ch
                continue
            # Two more characters are required to complete \DDD.
            if pos + 2 > end:
                raise dns.exception.UnexpectedEnd
            tail = text[pos:pos + 2]
            pos += 2
            if not all('0' <= d <= '9' for d in tail):
                raise dns.exception.SyntaxError
            codepoint = int(ch + tail)
            if codepoint > 255:
                raise dns.exception.SyntaxError
            yield True, codepoint

    def unescape(self):
        """Return a token whose value has its escapes resolved to text.

        A ``\\DDD`` escape becomes the character with that code point and
        ``\\X`` becomes ``X``.  If the token has no escapes, the token
        itself is returned; otherwise a new Token of the same type is
        returned.

        Raises dns.exception.UnexpectedEnd or dns.exception.SyntaxError
        if an escape is truncated or malformed.
        """
        if not self.has_escape:
            return self
        unescaped = ''.join(
            chr(item) if is_codepoint else item
            for is_codepoint, item in self._iter_unescaped())
        return Token(self.ttype, unescaped)

    def unescape_to_bytes(self):
        """Return a new token whose value is the unescaped value as bytes.

        Raises dns.exception.UnexpectedEnd or dns.exception.SyntaxError
        if an escape is truncated or malformed.
        """
        # We used to use unescape() for TXT-like records, but this
        # caused problems as we'd process DNS escapes into Unicode code
        # points instead of byte values, and then a to_text() of the
        # processed data would not equal the original input.  For
        # example, \226 in the TXT record would have a to_text() of
        # \195\162 because we applied UTF-8 encoding to Unicode code
        # point 226.
        #
        # We now apply escapes while converting directly to bytes,
        # avoiding this double encoding.
        #
        # This code also handles cases where the unicode input has
        # non-ASCII code-points in it by converting it to UTF-8.  TXT
        # records aren't defined for Unicode, but this is the best we
        # can do to preserve meaning.  For example,
        #
        #     foo\u200bbar
        #
        # (where \u200b is Unicode code point 0x200b) will be treated
        # as if the input had been the UTF-8 encoding of that string,
        # namely:
        #
        #     foo\226\128\139bar
        #
        unescaped = b''.join(
            # A decimal escape is exactly one byte; any other character
            # is appended as its (possibly multi-byte) UTF-8 encoding.
            bytes((item,)) if is_codepoint else item.encode()
            for is_codepoint, item in self._iter_unescaped())
        return Token(self.ttype, unescaped)
class Tokenizer:
    """A DNS zone file format tokenizer.

    A token object is basically a (type, value) tuple.  The valid
    types are EOF, EOL, WHITESPACE, IDENTIFIER, QUOTED_STRING,
    COMMENT, and DELIMITER.

    file: The file to tokenize

    ungotten_char: The most recently ungotten character, or None.

    ungotten_token: The most recently ungotten token, or None.

    multiline: The current multiline level.  This value is increased
    by one every time a '(' delimiter is read, and decreased by one every time
    a ')' delimiter is read.

    quoting: This variable is true if the tokenizer is currently
    reading a quoted string.

    eof: This variable is true if the tokenizer has encountered EOF.

    delimiters: The current set of delimiter characters.

    line_number: The current line number

    filename: A filename that will be returned by the where() method.

    idna_codec: A dns.name.IDNACodec, specifies the IDNA
    encoder/decoder.  If None, the default IDNA 2003
    encoder/decoder is used.
    """

    def __init__(self, f=sys.stdin, filename=None, idna_codec=None):
        """Initialize a tokenizer instance.

        f: The file to tokenize.  The default is sys.stdin.
        This parameter may also be a string (or bytes), in which case the
        tokenizer will take its input from the contents of the string.

        filename: the name of the filename that the where() method
        will return.

        idna_codec: A dns.name.IDNACodec, specifies the IDNA
        encoder/decoder.  If None, the default IDNA 2003
        encoder/decoder is used.
        """
        if isinstance(f, str):
            f = io.StringIO(f)
            if filename is None:
                filename = '<string>'
        elif isinstance(f, bytes):
            f = io.StringIO(f.decode())
            if filename is None:
                filename = '<string>'
        else:
            if filename is None:
                if f is sys.stdin:
                    filename = '<stdin>'
                else:
                    filename = '<file>'
        self.file = f
        self.ungotten_char = None
        self.ungotten_token = None
        self.multiline = 0
        self.quoting = False
        self.eof = False
        self.delimiters = _DELIMITERS
        self.line_number = 1
        self.filename = filename
        if idna_codec is None:
            idna_codec = dns.name.IDNA_2003
        self.idna_codec = idna_codec

    def _get_char(self):
        """Read a character from input.

        Returns the ungotten character if there is one, otherwise the
        next character of the file, or '' once EOF has been reached.
        """
        if self.ungotten_char is None:
            if self.eof:
                c = ''
            else:
                c = self.file.read(1)
                if c == '':
                    self.eof = True
                elif c == '\n':
                    self.line_number += 1
        else:
            c = self.ungotten_char
            self.ungotten_char = None
        return c

    def where(self):
        """Return the current location in the input.

        Returns a (string, int) tuple.  The first item is the filename of
        the input, the second is the current line number.
        """
        return (self.filename, self.line_number)

    def _unget_char(self, c):
        """Unget a character.

        The unget buffer for characters is only one character large; it is
        an error to try to unget a character when the unget buffer is not
        empty.

        c: the character to unget

        raises UngetBufferFull: there is already an ungotten char
        """
        if self.ungotten_char is not None:
            # this should never happen!
            raise UngetBufferFull  # pragma: no cover
        self.ungotten_char = c

    def skip_whitespace(self):
        """Consume input until a non-whitespace character is encountered.

        The non-whitespace character is then ungotten, and the number of
        whitespace characters consumed is returned.

        If the tokenizer is in multiline mode, then newlines are whitespace.

        Returns the number of characters skipped.
        """
        skipped = 0
        while True:
            c = self._get_char()
            if c != ' ' and c != '\t':
                if (c != '\n') or not self.multiline:
                    self._unget_char(c)
                    return skipped
            skipped += 1

    def get(self, want_leading=False, want_comment=False):
        """Get the next token.

        want_leading: If True, return a WHITESPACE token if the
        first character read is whitespace.  The default is False.

        want_comment: If True, return a COMMENT token if the
        first token read is a comment.  The default is False.  When
        False, the comment text is attached as the ``comment`` attribute
        of the EOL or EOF token that ends the line.

        Raises dns.exception.UnexpectedEnd: input ended prematurely

        Raises dns.exception.SyntaxError: input was badly formed

        Returns a Token.
        """
        if self.ungotten_token is not None:
            token = self.ungotten_token
            self.ungotten_token = None
            # An ungotten whitespace or comment token is only returned if
            # the caller asked for that kind; otherwise it is dropped and
            # tokenizing continues from the input.
            if token.is_whitespace():
                if want_leading:
                    return token
            elif token.is_comment():
                if want_comment:
                    return token
            else:
                return token
        skipped = self.skip_whitespace()
        if want_leading and skipped > 0:
            return Token(WHITESPACE, ' ')
        token = ''
        ttype = IDENTIFIER
        has_escape = False
        while True:
            c = self._get_char()
            if c == '' or c in self.delimiters:
                if c == '' and self.quoting:
                    raise dns.exception.UnexpectedEnd
                if token == '' and ttype != QUOTED_STRING:
                    if c == '(':
                        self.multiline += 1
                        self.skip_whitespace()
                        continue
                    elif c == ')':
                        if self.multiline <= 0:
                            raise dns.exception.SyntaxError
                        self.multiline -= 1
                        self.skip_whitespace()
                        continue
                    elif c == '"':
                        if not self.quoting:
                            self.quoting = True
                            self.delimiters = _QUOTING_DELIMITERS
                            ttype = QUOTED_STRING
                            continue
                        else:
                            self.quoting = False
                            self.delimiters = _DELIMITERS
                            self.skip_whitespace()
                            continue
                    elif c == '\n':
                        return Token(EOL, '\n')
                    elif c == ';':
                        # Collect the comment text up to (not including)
                        # the end of the line or the end of input.
                        while True:
                            c = self._get_char()
                            if c == '\n' or c == '':
                                break
                            token += c
                        if want_comment:
                            self._unget_char(c)
                            return Token(COMMENT, token)
                        elif c == '':
                            if self.multiline:
                                raise dns.exception.SyntaxError(
                                    'unbalanced parentheses')
                            return Token(EOF, comment=token)
                        elif self.multiline:
                            # Inside parentheses the newline after a
                            # comment is just whitespace; discard the
                            # comment and keep going.
                            self.skip_whitespace()
                            token = ''
                            continue
                        else:
                            return Token(EOL, '\n', comment=token)
                    else:
                        # This code exists in case we ever want a
                        # delimiter to be returned.  It never produces
                        # a token currently.
                        token = c
                        ttype = DELIMITER
                else:
                    # The delimiter ends the current token; leave it for
                    # the next call.
                    self._unget_char(c)
                break
            elif self.quoting and c == '\n':
                raise dns.exception.SyntaxError('newline in quoted string')
            elif c == '\\':
                #
                # It's an escape.  Put it and the next character into
                # the token; it will be checked later for goodness.
                #
                token += c
                has_escape = True
                c = self._get_char()
                if c == '' or (c == '\n' and not self.quoting):
                    raise dns.exception.UnexpectedEnd
            token += c
        if token == '' and ttype != QUOTED_STRING:
            if self.multiline:
                raise dns.exception.SyntaxError('unbalanced parentheses')
            ttype = EOF
        return Token(ttype, token, has_escape)

    def unget(self, token):
        """Unget a token.

        The unget buffer for tokens is only one token large; it is
        an error to try to unget a token when the unget buffer is not
        empty.

        token: the token to unget

        Raises UngetBufferFull: there is already an ungotten token
        """
        if self.ungotten_token is not None:
            raise UngetBufferFull
        self.ungotten_token = token

    def next(self):
        """Return the next item in an iteration.

        Raises StopIteration when the EOF token is reached.

        Returns a Token.
        """
        token = self.get()
        if token.is_eof():
            raise StopIteration
        return token

    __next__ = next

    def __iter__(self):
        return self

    # Helpers

    def get_int(self, base=10):
        """Read the next token and interpret it as an unsigned integer.

        base: the numeric base used to interpret the digits.  The
        default is 10.

        Raises dns.exception.SyntaxError if not an unsigned integer.

        Returns an int.
        """
        token = self.get().unescape()
        if not token.is_identifier():
            raise dns.exception.SyntaxError('expecting an identifier')
        if not token.value.isdigit():
            raise dns.exception.SyntaxError('expecting an integer')
        try:
            return int(token.value, base)
        except ValueError as e:
            # isdigit() does not guarantee int() succeeds: the digits may
            # be out of range for the base (e.g. '8' in base 8) or be
            # non-decimal digit characters such as superscripts.
            raise dns.exception.SyntaxError(
                'expecting a base %d integer' % base) from e

    def get_uint8(self):
        """Read the next token and interpret it as an 8-bit unsigned
        integer.

        Raises dns.exception.SyntaxError if not an 8-bit unsigned integer.

        Returns an int.
        """
        value = self.get_int()
        if value < 0 or value > 255:
            raise dns.exception.SyntaxError(
                '%d is not an unsigned 8-bit integer' % value)
        return value

    def get_uint16(self, base=10):
        """Read the next token and interpret it as a 16-bit unsigned
        integer.

        base: the numeric base used to interpret the digits.

        Raises dns.exception.SyntaxError if not a 16-bit unsigned integer.

        Returns an int.
        """
        value = self.get_int(base=base)
        if value < 0 or value > 65535:
            if base == 8:
                raise dns.exception.SyntaxError(
                    '%o is not an octal unsigned 16-bit integer' % value)
            else:
                raise dns.exception.SyntaxError(
                    '%d is not an unsigned 16-bit integer' % value)
        return value

    def get_uint32(self, base=10):
        """Read the next token and interpret it as a 32-bit unsigned
        integer.

        base: the numeric base used to interpret the digits.

        Raises dns.exception.SyntaxError if not a 32-bit unsigned integer.

        Returns an int.
        """
        value = self.get_int(base=base)
        if value < 0 or value > 4294967295:
            raise dns.exception.SyntaxError(
                '%d is not an unsigned 32-bit integer' % value)
        return value

    def get_uint48(self, base=10):
        """Read the next token and interpret it as a 48-bit unsigned
        integer.

        base: the numeric base used to interpret the digits.

        Raises dns.exception.SyntaxError if not a 48-bit unsigned integer.

        Returns an int.
        """
        value = self.get_int(base=base)
        if value < 0 or value > 281474976710655:
            raise dns.exception.SyntaxError(
                '%d is not an unsigned 48-bit integer' % value)
        return value

    def get_string(self, max_length=None):
        """Read the next token and interpret it as a string.

        max_length: the maximum permitted length of the unescaped
        value.  No limit is enforced when this is None (or 0).

        Raises dns.exception.SyntaxError if not a string.

        Raises dns.exception.SyntaxError if token value length
        exceeds max_length (if specified).

        Returns a string.
        """
        token = self.get().unescape()
        if not (token.is_identifier() or token.is_quoted_string()):
            raise dns.exception.SyntaxError('expecting a string')
        if max_length and len(token.value) > max_length:
            raise dns.exception.SyntaxError("string too long")
        return token.value

    def get_identifier(self):
        """Read the next token, which should be an identifier.

        Raises dns.exception.SyntaxError if not an identifier.

        Returns a string.
        """
        token = self.get().unescape()
        if not token.is_identifier():
            raise dns.exception.SyntaxError('expecting an identifier')
        return token.value

    def get_remaining(self, max_tokens=None):
        """Return the remaining tokens on the line, until an EOL or EOF is seen.

        The EOL or EOF token itself is ungotten, not returned.

        max_tokens: If not None, stop after this number of tokens.

        Returns a list of tokens.
        """
        tokens = []
        while True:
            token = self.get()
            if token.is_eol_or_eof():
                self.unget(token)
                break
            tokens.append(token)
            if len(tokens) == max_tokens:
                break
        return tokens

    def concatenate_remaining_identifiers(self, allow_empty=False):
        """Read the remaining tokens on the line, which should be identifiers.

        The EOL or EOF token that ends the line is ungotten.

        Raises dns.exception.SyntaxError if there are no remaining tokens,
        unless `allow_empty=True` is given.

        Raises dns.exception.SyntaxError if a token is seen that is not an
        identifier.

        Returns a string containing a concatenation of the remaining
        identifiers.
        """
        pieces = []
        while True:
            token = self.get().unescape()
            if token.is_eol_or_eof():
                self.unget(token)
                break
            if not token.is_identifier():
                raise dns.exception.SyntaxError
            pieces.append(token.value)
        s = ''.join(pieces)
        if not (allow_empty or s):
            raise dns.exception.SyntaxError('expecting another identifier')
        return s

    def as_name(self, token, origin=None, relativize=False, relativize_to=None):
        """Try to interpret the token as a DNS name.

        The name is parsed with dns.name.from_text() using *origin* and
        the tokenizer's IDNA codec, then passed through
        choose_relativity() with *relativize_to* (or *origin* if that is
        not given) and *relativize*.

        Raises dns.exception.SyntaxError if not a name.

        Returns a dns.name.Name.
        """
        if not token.is_identifier():
            raise dns.exception.SyntaxError('expecting an identifier')
        name = dns.name.from_text(token.value, origin, self.idna_codec)
        return name.choose_relativity(relativize_to or origin, relativize)

    def get_name(self, origin=None, relativize=False, relativize_to=None):
        """Read the next token and interpret it as a DNS name.

        Raises dns.exception.SyntaxError if not a name.

        Returns a dns.name.Name.
        """
        token = self.get()
        return self.as_name(token, origin, relativize, relativize_to)

    def get_eol_as_token(self):
        """Read the next token and raise an exception if it isn't EOL or
        EOF.

        Raises dns.exception.SyntaxError if the token is not EOL or EOF.

        Returns a Token.
        """
        token = self.get()
        if not token.is_eol_or_eof():
            raise dns.exception.SyntaxError(
                'expected EOL or EOF, got %d "%s"' % (token.ttype,
                                                      token.value))
        return token

    def get_eol(self):
        """Read the next token, which must be EOL or EOF, and return its
        value.

        Raises dns.exception.SyntaxError if the token is not EOL or EOF.

        Returns a string.
        """
        return self.get_eol_as_token().value

    def get_ttl(self):
        """Read the next token and interpret it as a DNS TTL.

        Raises dns.exception.SyntaxError or dns.ttl.BadTTL if not an
        identifier or badly formed.

        Returns an int.
        """
        token = self.get().unescape()
        if not token.is_identifier():
            raise dns.exception.SyntaxError('expecting an identifier')
        return dns.ttl.from_text(token.value)
|