import binascii
+import collections
+import functools
import re
from don import tags, _shared
return serializer
-def _serialize_float(f):
- return '{}f'.format(f)
-
-def _serialize_double(d):
- return '{}d'.format(d)
-
def _serialize_binary(b):
return '"{}"b'.format(binascii.hexlify(b).decode('ascii'))
tags.INT16: _integer_size_to_string_serializer(16),
tags.INT32: _integer_size_to_string_serializer(32),
tags.INT64: _integer_size_to_string_serializer(64),
- tags.FLOAT: _serialize_float,
- tags.DOUBLE: _serialize_double,
tags.BINARY: _serialize_binary,
tags.UTF8: _utf_encoding_to_serializer('utf8'),
tags.UTF16: _utf_encoding_to_serializer('utf16'),
}
def serialize(o):
- o = tags._tag(o)
+ o = tags.autotag(o)
return _STRING_SERIALIZERS[o.tag](o.value)
-def _make_constant_parser(constant, value):
+def _consume_leading_whitespace(wrapped_parser):
+ @functools.wraps(wrapped_parser)
def parser(s):
+ s = s.lstrip()
+ return wrapped_parser(s)
+
+ return parser
+
+def _make_constant_parser(constant, value):
+ @_consume_leading_whitespace
+ def constant_parser(s):
if s.startswith(constant):
result = _shared.ParseResult(
success = True,
return _shared._FAILED_PARSE_RESULT
- return parser
+ return constant_parser
def _make_integer_parser(width):
matcher = re.compile(r'(-?\d+)i' + str(width))
- def parser(s):
+ @_consume_leading_whitespace
+ def integer_parser(s):
match = matcher.match(s)
if match:
return _shared._FAILED_PARSE_RESULT
- return parser
-
-_BINARY32_MATCHER = re.compile(r'(-?\d+\.\d+)f')
-_BINARY64_MATCHER = re.compile(r'(-?\d+\.\d+)d')
-
-def _binary32_parser(s):
- match = _BINARY32_MATCHER.match(s)
-
- if match:
- # TODO Validate that the float is in range
- return _shared.ParseResult(
- success = True,
- value = float(match.group(1)),
- remaining = s[match.end():],
- )
-
- return _shared._FAILED_PARSE_RESULT
-
-def _binary64_parser(s):
- match = _BINARY64_MATCHER.match(s)
-
- if match:
- # TODO Validate that the double is in range
- return _shared.ParseResult(
- success = True,
- value = float(match.group(1)),
- remaining = s[match.end():],
- )
-
- return _shared._FAILED_PARSE_RESULT
+ return integer_parser
_BINARY_MATCHER = re.compile(r'"([\da-f]*)"b')
+@_consume_leading_whitespace
def _binary_parser(s):
match = _BINARY_MATCHER.match(s)
def _make_utf_parser(encoding):
matcher = re.compile(r'"(.*?)"' + encoding)
- def parser(s):
+ @_consume_leading_whitespace
+ def utf_parser(s):
match = matcher.match(s)
if match:
return _shared._FAILED_PARSE_RESULT
- return parser
+ return utf_parser
+
+def _make_consume_constant_parser(constant):
+ @_consume_leading_whitespace
+ def consume_character_parser(s):
+ if s.startswith(constant):
+ return _shared.ParseResult(
+ success = True,
+ value = None,
+ remaining = s[len(constant):],
+ )
+ return _shared._FAILED_PARSE_RESULT
+
+ return consume_character_parser
+
+_consume_comma_parser = _make_consume_constant_parser(',')
def _prefix_with_comma(parser):
def wrapped(s):
- if s.startswith(','):
- s = s[1:]
-
- result = parser(s)
- if not result.success:
- raise Exception('Trailing comma before "{}"'.format(s))
+ result = _consume_comma_parser(s)
+ if result.success:
+ s = result.remaining
+ else:
+ return _shared._FAILED_PARSE_RESULT
- return result
+ result = parser(s)
+ if not result.success:
+ raise Exception('Trailing comma before "{}"'.format(s))
- return _shared._FAILED_PARSE_RESULT
+ return result
return wrapped
-def _list_parser(s):
- # TODO Assert they are all the same type
- if not s.startswith('['):
- return _shared._FAILED_PARSE_RESULT
- s = s[1:]
+def _comma_separate_and_wrap(wrapped_parser, start_wrap, end_wrap, typecaster):
+ parser_prefixed_with_comma = _prefix_with_comma(wrapped_parser)
+ start_wrap_parser = _make_consume_constant_parser(start_wrap)
+ end_wrap_parser = _make_consume_constant_parser(end_wrap)
+
+ def parser(s):
+ result = start_wrap_parser(s)
+ if result.success:
+ s = result.remaining
+ else:
+ return _shared._FAILED_PARSE_RESULT
+
+ value = []
+ first = True
+
+ parse_result = wrapped_parser(s)
+
+ while parse_result.success:
+ value.append(parse_result.value)
+ s = parse_result.remaining
+ parse_result = parser_prefixed_with_comma(s)
+
+ result = end_wrap_parser(s)
+ if result.success:
+ s = result.remaining
+ else:
+ return _shared._FAILED_PARSE_RESULT
+
+ return _shared.ParseResult(
+ success = True,
+ value = typecaster(value),
+ remaining = s,
+ )
+
+ return parser
+
+# This uses _PARSERS which has not been defined yet, but is defined here so it can be used in
+# the definition of _list_parser
+def _object_parser(source):
+ for parser in _PARSERS:
+ result = parser(source)
- value = []
+ if result.success:
+ return result
+
+ return _shared._FAILED_PARSE_RESULT
- first = True
- parse_result = _object_parser(s)
+_list_parser = _comma_separate_and_wrap(_object_parser, '[', ']', list)
- while parse_result.success:
- value.append(parse_result.value)
- s = parse_result.remaining
- parse_result = _prefix_with_comma(_object_parser)(s)
+_consume_colon_parser = _make_consume_constant_parser(':')
- if not s.startswith(']'):
+def _kvp_parser(s):
+ key_parse_result = _object_parser(s)
+ if key_parse_result.success:
+ s = key_parse_result.remaining
+ else:
+ return _shared._FAILED_PARSE_RESULT
+
+ result = _consume_colon_parser(s)
+ if result.success:
+ s = result.remaining
+ else:
+ return _shared._FAILED_PARSE_RESULT
+
+ value_parse_result = _object_parser(s)
+ if value_parse_result.success:
+ s = value_parse_result.remaining
+ else:
return _shared._FAILED_PARSE_RESULT
return _shared.ParseResult(
success = True,
- value = value,
- remaining = s[1:],
+ value = (key_parse_result.value, value_parse_result.value),
+ remaining = s,
)
-
-
-def _dictionary_parser(s):
- return _shared._FAILED_PARSE_RESULT
+_dictionary_parser = _comma_separate_and_wrap(_kvp_parser, '{', '}', collections.OrderedDict)
_PARSERS = [
_make_integer_parser(16),
_make_integer_parser(32),
_make_integer_parser(64),
- _binary32_parser,
- _binary64_parser,
_binary_parser,
_make_utf_parser('utf8'),
_make_utf_parser('utf16'),
_dictionary_parser,
]
-def _object_parser(source):
- for parser in _PARSERS:
- result = parser(source)
-
- if result.success:
- return result
-
- return _shared._FAILED_PARSE_RESULT
-
def _parse(parser, source):
result = parser(source)