0162d42ed844b1c9400d7bc21365dc79c1b16903
[ton] / don / string.py
1 import binascii
2 import collections
3 import functools
4 import re
5
6 from don import tags, _shared
7
8 def _integer_size_to_string_serializer(integer_size):
9     minimum = -(2 ** (integer_size - 1))
10     maximum = 2 ** (integer_size - 1) - 1
11     
12     def serializer(integer):
13         assert minimum <= integer and integer <= maximum
14         return '{}i{}'.format(integer, integer_size)
15
16     return serializer
17
18 def _serialize_float(f):
19     return '{}f'.format(f)
20
21 def _serialize_double(d):
22     return '{}d'.format(d)
23
24 def _serialize_binary(b):
25     return '"{}"b'.format(binascii.hexlify(b).decode('ascii'))
26
27 def _utf_encoding_to_serializer(utf_encoding):
28     def serializer(s):
29         return '"{}"{}'.format(s, utf_encoding)
30
31     return serializer
32
33 def _string_serialize_list(l):
34     return '[{}]'.format(', '.join(map(serialize, l)))
35
36 def _string_serialize_dictionary(d):
37     def serialize_kvp(kvp):
38         return serialize(kvp[0]) + ': ' + serialize(kvp[1])
39     return '{ ' + ', '.join(map(serialize_kvp, d.items())) + ' }'
40
41 _STRING_SERIALIZERS = {
42     tags.VOID: lambda o: 'null',
43     tags.TRUE: lambda o: 'true',
44     tags.FALSE: lambda o: 'false',
45     tags.INT8: _integer_size_to_string_serializer(8),
46     tags.INT16: _integer_size_to_string_serializer(16),
47     tags.INT32: _integer_size_to_string_serializer(32),
48     tags.INT64: _integer_size_to_string_serializer(64),
49     tags.FLOAT: _serialize_float,
50     tags.DOUBLE: _serialize_double,
51     tags.BINARY: _serialize_binary,
52     tags.UTF8: _utf_encoding_to_serializer('utf8'),
53     tags.UTF16: _utf_encoding_to_serializer('utf16'),
54     tags.UTF32: _utf_encoding_to_serializer('utf32'),
55     tags.LIST: _string_serialize_list,
56     tags.DICTIONARY: _string_serialize_dictionary,
57 }
58
59 def serialize(o):
60     o = tags._tag(o)
61     
62     return _STRING_SERIALIZERS[o.tag](o.value)
63
64 def _consume_leading_whitespace(wrapped_parser):
65     @functools.wraps(wrapped_parser)
66     def parser(s):
67         s = s.lstrip()
68         return wrapped_parser(s)
69
70     return parser
71
72 def _make_constant_parser(constant, value):
73     @_consume_leading_whitespace
74     def constant_parser(s):
75         if s.startswith(constant):
76             result = _shared.ParseResult(
77                 success = True,
78                 value = value,
79                 remaining = s[len(constant):],
80             )
81             return result
82
83         return _shared._FAILED_PARSE_RESULT
84
85     return constant_parser
86
87 def _make_integer_parser(width):
88     matcher = re.compile(r'(-?\d+)i' + str(width))
89
90     @_consume_leading_whitespace
91     def integer_parser(s):
92         match = matcher.match(s)
93
94         if match:
95             # TODO Validate that the integer is in range
96             return _shared.ParseResult(
97                 success = True,
98                 value = int(match.group(1)),
99                 remaining = s[match.end():],
100             )
101
102         return _shared._FAILED_PARSE_RESULT
103
104     return integer_parser
105
106 _BINARY32_MATCHER = re.compile(r'(-?\d+\.\d+)f')
107 _BINARY64_MATCHER = re.compile(r'(-?\d+\.\d+)d')
108
109 @_consume_leading_whitespace
110 def _binary32_parser(s):
111     match = _BINARY32_MATCHER.match(s)
112
113     if match:
114         # TODO Validate that the float is in range
115         return _shared.ParseResult(
116             success = True,
117             value = float(match.group(1)),
118             remaining = s[match.end():],
119         )
120
121     return _shared._FAILED_PARSE_RESULT
122
123 @_consume_leading_whitespace
124 def _binary64_parser(s):
125     match = _BINARY64_MATCHER.match(s)
126
127     if match:
128         # TODO Validate that the double is in range
129         return _shared.ParseResult(
130             success = True,
131             value = float(match.group(1)),
132             remaining = s[match.end():],
133         )
134
135     return _shared._FAILED_PARSE_RESULT
136
137 _BINARY_MATCHER = re.compile(r'"([\da-f]*)"b')
138
139 @_consume_leading_whitespace
140 def _binary_parser(s):
141     match = _BINARY_MATCHER.match(s)
142
143     if match:
144         return _shared.ParseResult(
145             success = True,
146             value = binascii.unhexlify(match.group(1)),
147             remaining = s[match.end():],
148         )
149
150     return _shared._FAILED_PARSE_RESULT
151
152 def _make_utf_parser(encoding):
153     matcher = re.compile(r'"(.*?)"' + encoding)
154
155     @_consume_leading_whitespace
156     def utf_parser(s):
157         match = matcher.match(s)
158
159         if match:
160             return _shared.ParseResult(
161                 success = True,
162                 value = match.group(1),
163                 remaining = s[match.end():],
164             )
165
166         return _shared._FAILED_PARSE_RESULT
167
168     return utf_parser
169
170 def _make_consume_constant_parser(constant):
171     @_consume_leading_whitespace
172     def consume_character_parser(s):
173         if s.startswith(constant):
174             return _shared.ParseResult(
175                 success = True,
176                 value = None,
177                 remaining = s[len(constant):],
178             )
179         return _shared._FAILED_PARSE_RESULT
180
181     return consume_character_parser
182
183 _consume_comma_parser = _make_consume_constant_parser(',')
184
185 def _prefix_with_comma(parser):
186     def wrapped(s):
187         result = _consume_comma_parser(s)
188         if result.success:
189             s = result.remaining
190         else:
191             return _shared._FAILED_PARSE_RESULT
192
193         result = parser(s)
194         if not result.success:
195             raise Exception('Trailing comma before "{}"'.format(s))
196
197         return result
198
199     return wrapped
200
201 def _comma_separate_and_wrap(wrapped_parser, start_wrap, end_wrap, typecaster):
202     parser_prefixed_with_comma = _prefix_with_comma(wrapped_parser)
203     start_wrap_parser = _make_consume_constant_parser(start_wrap)
204     end_wrap_parser = _make_consume_constant_parser(end_wrap)
205
206     def parser(s):
207         result = start_wrap_parser(s)
208         if result.success:
209             s = result.remaining
210         else:
211             return _shared._FAILED_PARSE_RESULT
212
213         value = []
214         first = True
215
216         parse_result = wrapped_parser(s)
217
218         while parse_result.success:
219             value.append(parse_result.value)
220             s = parse_result.remaining
221             parse_result = parser_prefixed_with_comma(s)
222
223         result = end_wrap_parser(s)
224         if result.success:
225             s = result.remaining
226         else:
227             return _shared._FAILED_PARSE_RESULT
228
229         return _shared.ParseResult(
230             success = True,
231             value = typecaster(value),
232             remaining = s,
233         )
234
235     return parser
236
237 # This uses _PARSERS which has not been defined yet, but is defined here so it can be used in
238 # the definition of _list_parser
239 def _object_parser(source):
240     for parser in _PARSERS:
241         result = parser(source)
242
243         if result.success:
244             return result
245
246     return _shared._FAILED_PARSE_RESULT
247
248 _list_parser = _comma_separate_and_wrap(_object_parser, '[', ']', list)
249
250 _consume_colon_parser = _make_consume_constant_parser(':')
251
252 def _kvp_parser(s):
253     key_parse_result = _object_parser(s)
254     if key_parse_result.success:
255         s = key_parse_result.remaining
256     else:
257         return _shared._FAILED_PARSE_RESULT
258
259     result = _consume_colon_parser(s)
260     if result.success:
261         s = result.remaining
262     else:
263         return _shared._FAILED_PARSE_RESULT
264
265     value_parse_result = _object_parser(s)
266     if value_parse_result.success:
267         s = value_parse_result.remaining
268     else:
269         return _shared._FAILED_PARSE_RESULT
270
271     return _shared.ParseResult(
272         success = True,
273         value = (key_parse_result.value, value_parse_result.value),
274         remaining = s,
275     )
276
277 _dictionary_parser = _comma_separate_and_wrap(_kvp_parser, '{', '}', collections.OrderedDict)
278
279
280 _PARSERS = [
281     _make_constant_parser('null', None),
282     _make_constant_parser('true', True),
283     _make_constant_parser('false', False),
284     _make_integer_parser(8),
285     _make_integer_parser(16),
286     _make_integer_parser(32),
287     _make_integer_parser(64),
288     _binary32_parser,
289     _binary64_parser,
290     _binary_parser,
291     _make_utf_parser('utf8'),
292     _make_utf_parser('utf16'),
293     _make_utf_parser('utf32'),
294     _list_parser,
295     _dictionary_parser,
296 ]
297
298 def _parse(parser, source):
299     result = parser(source)
300
301     if result.success:
302         if result.remaining.strip() == '':
303             return result.value
304
305         raise Exception('Unparsed trailing characters: "{}"'.format(result.remaining))
306
307     raise Exception('Unable to parse: "{}"'.format(source))
308
309 def deserialize(s):
310     return _parse(_object_parser, s)