6 from don import tags, _shared
def _integer_size_to_string_serializer(integer_size):
    """Build a serializer for signed integers of the given bit width.

    The returned callable renders an integer as ``'<value>i<integer_size>'``
    (for example ``'5i8'``) and rejects values outside the inclusive range
    ``-(2 ** (integer_size - 1))`` to ``2 ** (integer_size - 1) - 1``.

    Args:
        integer_size: Width of the integer type in bits.

    Returns:
        A one-argument callable mapping an integer to its textual form. The
        callable raises ``AssertionError`` when the integer is out of range.
    """
    minimum = -(2 ** (integer_size - 1))
    maximum = 2 ** (integer_size - 1) - 1

    def serializer(integer):
        # Raised explicitly instead of through an ``assert`` statement so the
        # range check still runs under ``python -O``; the exception type is
        # kept so existing handlers keep working.
        if not (minimum <= integer <= maximum):
            raise AssertionError(
                '{} does not fit in a signed {}-bit integer'.format(
                    integer, integer_size))
        return '{}i{}'.format(integer, integer_size)

    return serializer
def _serialize_binary(b):
    """Render a bytes-like object as ``"<hex digits>"b``.

    Args:
        b: The bytes-like object to render.

    Returns:
        The hexadecimal form of ``b`` wrapped in double quotes and followed
        by the ``b`` suffix, e.g. ``'"00ff"b'``.
    """
    hex_digits = binascii.hexlify(b).decode('ascii')
    return '"{}"b'.format(hex_digits)
def _utf_encoding_to_serializer(utf_encoding):
    """Build a serializer that renders text as ``"<text>"<utf_encoding>``.

    Args:
        utf_encoding: Suffix appended after the closing quote, e.g. ``'utf8'``.

    Returns:
        A one-argument callable mapping a string to its textual form.
    """
    def serializer(s):
        # The text is inserted verbatim: double quotes inside ``s`` are not
        # escaped.
        return '"{}"{}'.format(s, utf_encoding)

    return serializer
def _string_serialize_list(l):
    """Render a sequence as ``[a, b, ...]``.

    Each element is rendered with ``serialize`` and the results are joined
    with ``', '``; an empty sequence yields ``'[]'``.
    """
    return '[{}]'.format(', '.join(serialize(item) for item in l))
def _string_serialize_dictionary(d):
    """Render a mapping as ``{ key: value, ... }``.

    Keys and values are both rendered with ``serialize``; entries appear in
    the order produced by ``d.items()``.
    """
    rendered_pairs = (
        serialize(key) + ': ' + serialize(value)
        for key, value in d.items()
    )
    return '{ ' + ', '.join(rendered_pairs) + ' }'
# Dispatch table: maps a tag to the callable that renders a value carrying
# that tag as text.
_STRING_SERIALIZERS = {
    tags.VOID: lambda o: 'null',
    tags.TRUE: lambda o: 'true',
    tags.FALSE: lambda o: 'false',
    tags.INT8: _integer_size_to_string_serializer(8),
    tags.INT16: _integer_size_to_string_serializer(16),
    tags.INT32: _integer_size_to_string_serializer(32),
    tags.INT64: _integer_size_to_string_serializer(64),
    tags.BINARY: _serialize_binary,
    tags.UTF8: _utf_encoding_to_serializer('utf8'),
    tags.UTF16: _utf_encoding_to_serializer('utf16'),
    tags.UTF32: _utf_encoding_to_serializer('utf32'),
    tags.LIST: _string_serialize_list,
    tags.DICTIONARY: _string_serialize_dictionary,
}
54 return _STRING_SERIALIZERS[o.tag](o.value)
def _consume_leading_whitespace(wrapped_parser):
    """Decorate a parser so its input has leading whitespace removed first.

    Args:
        wrapped_parser: A one-argument parser taking the source string.

    Returns:
        A parser with the same metadata as ``wrapped_parser`` that strips
        leading whitespace from its argument before delegating.
    """
    @functools.wraps(wrapped_parser)
    def parser(s):
        # NOTE(review): stripping with ``lstrip()`` is inferred from the
        # decorator's name - confirm against the intended whitespace set.
        return wrapped_parser(s.lstrip())

    return parser
def _make_constant_parser(constant, value):
    """Build a parser that recognises a literal and yields a fixed value.

    Args:
        constant: The literal text the input must start with (after leading
            whitespace has been consumed).
        value: The value reported when the literal is found.

    Returns:
        A parser returning a successful ``_shared.ParseResult`` whose
        ``remaining`` is the input after the literal, or
        ``_shared._FAILED_PARSE_RESULT`` when the literal is absent.
    """
    @_consume_leading_whitespace
    def constant_parser(s):
        if not s.startswith(constant):
            return _shared._FAILED_PARSE_RESULT

        return _shared.ParseResult(
            success=True,
            value=value,
            remaining=s[len(constant):],
        )

    return constant_parser
def _make_integer_parser(width):
    """Build a parser for signed integer literals of the form ``<digits>i<width>``.

    Args:
        width: Bit width of the integer type; it is both the literal's suffix
            and the bound on the accepted range.

    Returns:
        A parser returning a successful ``_shared.ParseResult`` holding the
        integer, or ``_shared._FAILED_PARSE_RESULT`` when the input does not
        start with such a literal or the number does not fit in ``width`` bits.
    """
    matcher = re.compile(r'(-?\d+)i' + str(width))
    # Same bounds the serializer enforces for this width.
    minimum = -(2 ** (width - 1))
    maximum = 2 ** (width - 1) - 1

    @_consume_leading_whitespace
    def integer_parser(s):
        match = matcher.match(s)
        if not match:
            return _shared._FAILED_PARSE_RESULT

        value = int(match.group(1))
        if not (minimum <= value <= maximum):
            # Well-formed literal whose value cannot be represented in
            # ``width`` bits: reject it rather than accept it silently.
            return _shared._FAILED_PARSE_RESULT

        return _shared.ParseResult(
            success=True,
            value=value,
            remaining=s[match.end():],
        )

    return integer_parser
# Matches ``"<hex>"b`` where <hex> is zero or more whole bytes written as
# pairs of lowercase ASCII hex digits. Requiring pairs (and ASCII digits
# rather than ``\d``) guarantees the captured text is valid input for
# ``binascii.unhexlify``, which raises on odd-length or non-hex text.
_BINARY_MATCHER = re.compile(r'"((?:[0-9a-f]{2})*)"b')


@_consume_leading_whitespace
def _binary_parser(s):
    """Parse a binary literal of the form ``"<hex digits>"b``.

    Returns:
        A successful ``_shared.ParseResult`` whose value is the decoded
        bytes, or ``_shared._FAILED_PARSE_RESULT`` when the input does not
        start with a well-formed binary literal.
    """
    match = _BINARY_MATCHER.match(s)
    if not match:
        return _shared._FAILED_PARSE_RESULT

    return _shared.ParseResult(
        success=True,
        value=binascii.unhexlify(match.group(1)),
        remaining=s[match.end():],
    )
def _make_utf_parser(encoding):
    """Build a parser for string literals of the form ``"<text>"<encoding>``.

    Args:
        encoding: The suffix expected after the closing quote, e.g. ``'utf8'``.

    Returns:
        A parser returning a successful ``_shared.ParseResult`` whose value
        is the text between the quotes, or ``_shared._FAILED_PARSE_RESULT``
        when the input does not start with such a literal.
    """
    # Non-greedy and without escape handling: the literal ends at the first
    # ``"<encoding>`` that follows the opening quote on the same line.
    matcher = re.compile(r'"(.*?)"' + encoding)

    @_consume_leading_whitespace
    def utf_parser(s):
        match = matcher.match(s)
        if not match:
            return _shared._FAILED_PARSE_RESULT

        return _shared.ParseResult(
            success=True,
            value=match.group(1),
            remaining=s[match.end():],
        )

    return utf_parser
def _make_consume_constant_parser(constant):
    """Build a parser that consumes a piece of punctuation.

    Args:
        constant: The literal text the input must start with (after leading
            whitespace has been consumed).

    Returns:
        A parser returning a successful ``_shared.ParseResult`` whose
        ``remaining`` is the input after the literal, or
        ``_shared._FAILED_PARSE_RESULT`` when the literal is absent.
    """
    @_consume_leading_whitespace
    def consume_character_parser(s):
        if not s.startswith(constant):
            return _shared._FAILED_PARSE_RESULT

        return _shared.ParseResult(
            success=True,
            # NOTE(review): callers in this module only read ``success`` and
            # ``remaining`` from this result; ``None`` is assumed for the
            # value - confirm.
            value=None,
            remaining=s[len(constant):],
        )

    return consume_character_parser
_consume_comma_parser = _make_consume_constant_parser(',')


def _prefix_with_comma(parser):
    """Build a parser that requires a comma before whatever ``parser`` accepts.

    Args:
        parser: The parser to run on the text following the comma.

    Returns:
        A parser that returns ``_shared._FAILED_PARSE_RESULT`` when no comma
        is present, and otherwise the result of ``parser`` on the text after
        the comma. It raises ``Exception`` when a comma is present but
        ``parser`` fails on what follows (a trailing comma).
    """
    def comma_prefixed_parser(s):
        comma_result = _consume_comma_parser(s)
        if not comma_result.success:
            return _shared._FAILED_PARSE_RESULT
        s = comma_result.remaining

        result = parser(s)
        if not result.success:
            raise Exception('Trailing comma before "{}"'.format(s))

        return result

    return comma_prefixed_parser
def _comma_separate_and_wrap(wrapped_parser, start_wrap, end_wrap, typecaster):
    """Build a parser for a delimited, comma-separated sequence of items.

    Args:
        wrapped_parser: Parser for a single item.
        start_wrap: Literal that opens the sequence, e.g. ``'['``.
        end_wrap: Literal that closes the sequence, e.g. ``']'``.
        typecaster: Callable applied to the list of parsed item values to
            produce the final value.

    Returns:
        A parser returning a successful ``_shared.ParseResult`` holding
        ``typecaster(items)``, or ``_shared._FAILED_PARSE_RESULT`` when the
        opening or closing literal is missing.
    """
    parser_prefixed_with_comma = _prefix_with_comma(wrapped_parser)
    start_wrap_parser = _make_consume_constant_parser(start_wrap)
    end_wrap_parser = _make_consume_constant_parser(end_wrap)

    def parser(s):
        result = start_wrap_parser(s)
        if not result.success:
            return _shared._FAILED_PARSE_RESULT
        s = result.remaining

        value = []

        # The first item has no leading comma; every later item must be
        # preceded by one.
        parse_result = wrapped_parser(s)
        while parse_result.success:
            value.append(parse_result.value)
            s = parse_result.remaining
            parse_result = parser_prefixed_with_comma(s)

        result = end_wrap_parser(s)
        if not result.success:
            return _shared._FAILED_PARSE_RESULT

        return _shared.ParseResult(
            success=True,
            value=typecaster(value),
            remaining=result.remaining,
        )

    return parser
# _PARSERS is defined further down; it is only looked up when this function
# is called, which lets _object_parser be defined early enough to be used in
# the definition of _list_parser.
def _object_parser(source):
    """Parse one value by trying each parser in ``_PARSERS`` in order.

    Returns:
        The result of the first parser that succeeds, or
        ``_shared._FAILED_PARSE_RESULT`` when none does.
    """
    for parser in _PARSERS:
        result = parser(source)
        if result.success:
            return result

    return _shared._FAILED_PARSE_RESULT
# Parses ``[item, item, ...]`` into a list.
_list_parser = _comma_separate_and_wrap(_object_parser, '[', ']', list)
_consume_colon_parser = _make_consume_constant_parser(':')


def _kvp_parser(s):
    """Parse a ``key: value`` pair.

    Returns:
        A successful ``_shared.ParseResult`` whose value is the
        ``(key, value)`` tuple, or ``_shared._FAILED_PARSE_RESULT`` when the
        key, the colon, or the value cannot be parsed.
    """
    key_parse_result = _object_parser(s)
    if not key_parse_result.success:
        return _shared._FAILED_PARSE_RESULT
    s = key_parse_result.remaining

    result = _consume_colon_parser(s)
    if not result.success:
        return _shared._FAILED_PARSE_RESULT
    s = result.remaining

    value_parse_result = _object_parser(s)
    if not value_parse_result.success:
        return _shared._FAILED_PARSE_RESULT
    s = value_parse_result.remaining

    return _shared.ParseResult(
        success=True,
        value=(key_parse_result.value, value_parse_result.value),
        remaining=s,
    )
# Parses ``{ key: value, ... }`` into an OrderedDict built from the parsed
# (key, value) pairs.
_dictionary_parser = _comma_separate_and_wrap(_kvp_parser, '{', '}', collections.OrderedDict)
# Parsers tried, in order, by _object_parser.
# NOTE(review): the _binary_parser, _list_parser and _dictionary_parser
# entries are assumed from the order of _STRING_SERIALIZERS - confirm.
_PARSERS = [
    _make_constant_parser('null', None),
    _make_constant_parser('true', True),
    _make_constant_parser('false', False),
    _make_integer_parser(8),
    _make_integer_parser(16),
    _make_integer_parser(32),
    _make_integer_parser(64),
    _binary_parser,
    _make_utf_parser('utf8'),
    _make_utf_parser('utf16'),
    _make_utf_parser('utf32'),
    _list_parser,
    _dictionary_parser,
]
def _parse(parser, source):
    """Run ``parser`` over ``source`` and return the parsed value.

    Args:
        parser: A one-argument parser returning an object with ``success``,
            ``value`` and ``remaining`` attributes.
        source: The text to parse.

    Returns:
        The ``value`` of the parse result.

    Raises:
        Exception: If the parser fails, or if anything other than whitespace
            is left after the parsed value.
    """
    result = parser(source)

    if not result.success:
        raise Exception('Unable to parse: "{}"'.format(source))

    if result.remaining.strip() != '':
        raise Exception('Unparsed trailing characters: "{}"'.format(result.remaining))

    return result.value
269 return _parse(_object_parser, s)