X-Git-Url: https://code.kerkeslager.com/?p=ton;a=blobdiff_plain;f=don%2Fstring.py;fp=don%2Fstring.py;h=0000000000000000000000000000000000000000;hp=a65a260242be044aedfdd18aed80fe7856d5de1a;hb=6102d28307a3efe75f1d9cb6b317c37eaa0d0907;hpb=d5664ddbb2c2a6bf98b354ea68b03c80464e6511 diff --git a/don/string.py b/don/string.py deleted file mode 100644 index a65a260..0000000 --- a/don/string.py +++ /dev/null @@ -1,269 +0,0 @@ -import binascii -import collections -import functools -import re - -from don import tags, _shared - -def _integer_size_to_string_serializer(integer_size): - minimum = -(2 ** (integer_size - 1)) - maximum = 2 ** (integer_size - 1) - 1 - - def serializer(integer): - assert minimum <= integer and integer <= maximum - return '{}i{}'.format(integer, integer_size) - - return serializer - -def _serialize_binary(b): - return '"{}"b'.format(binascii.hexlify(b).decode('ascii')) - -def _utf_encoding_to_serializer(utf_encoding): - def serializer(s): - return '"{}"{}'.format(s, utf_encoding) - - return serializer - -def _string_serialize_list(l): - return '[{}]'.format(', '.join(map(serialize, l))) - -def _string_serialize_dictionary(d): - def serialize_kvp(kvp): - return serialize(kvp[0]) + ': ' + serialize(kvp[1]) - return '{ ' + ', '.join(map(serialize_kvp, d.items())) + ' }' - -_STRING_SERIALIZERS = { - tags.VOID: lambda o: 'null', - tags.TRUE: lambda o: 'true', - tags.FALSE: lambda o: 'false', - tags.INT8: _integer_size_to_string_serializer(8), - tags.INT16: _integer_size_to_string_serializer(16), - tags.INT32: _integer_size_to_string_serializer(32), - tags.INT64: _integer_size_to_string_serializer(64), - tags.BINARY: _serialize_binary, - tags.UTF8: _utf_encoding_to_serializer('utf8'), - tags.UTF16: _utf_encoding_to_serializer('utf16'), - tags.UTF32: _utf_encoding_to_serializer('utf32'), - tags.LIST: _string_serialize_list, - tags.DICTIONARY: _string_serialize_dictionary, -} - -def serialize(o): - o = tags.autotag(o) - - return _STRING_SERIALIZERS[o.tag](o.value) - -def _consume_leading_whitespace(wrapped_parser): - @functools.wraps(wrapped_parser) - def parser(s): - s = s.lstrip() - return wrapped_parser(s) - - return parser - -def _make_constant_parser(constant, value): - @_consume_leading_whitespace - def constant_parser(s): - if s.startswith(constant): - result = _shared.ParseResult( - success = True, - value = value, - remaining = s[len(constant):], - ) - return result - - return _shared._FAILED_PARSE_RESULT - - return constant_parser - -def _make_integer_parser(width): - matcher = re.compile(r'(-?\d+)i' + str(width)) - - @_consume_leading_whitespace - def integer_parser(s): - match = matcher.match(s) - - if match: - # TODO Validate that the integer is in range - return _shared.ParseResult( - success = True, - value = int(match.group(1)), - remaining = s[match.end():], - ) - - return _shared._FAILED_PARSE_RESULT - - return integer_parser - -_BINARY_MATCHER = re.compile(r'"([\da-f]*)"b') - -@_consume_leading_whitespace -def _binary_parser(s): - match = _BINARY_MATCHER.match(s) - - if match: - return _shared.ParseResult( - success = True, - value = binascii.unhexlify(match.group(1)), - remaining = s[match.end():], - ) - - return _shared._FAILED_PARSE_RESULT - -def _make_utf_parser(encoding): - matcher = re.compile(r'"(.*?)"' + encoding) - - @_consume_leading_whitespace - def utf_parser(s): - match = matcher.match(s) - - if match: - return _shared.ParseResult( - success = True, - value = match.group(1), - remaining = s[match.end():], - ) - - return _shared._FAILED_PARSE_RESULT - - return utf_parser - -def _make_consume_constant_parser(constant): - @_consume_leading_whitespace - def consume_character_parser(s): - if s.startswith(constant): - return _shared.ParseResult( - success = True, - value = None, - remaining = s[len(constant):], - ) - return _shared._FAILED_PARSE_RESULT - - return consume_character_parser - -_consume_comma_parser = _make_consume_constant_parser(',') - -def _prefix_with_comma(parser): - def wrapped(s): - result = _consume_comma_parser(s) - if result.success: - s = result.remaining - else: - return _shared._FAILED_PARSE_RESULT - - result = parser(s) - if not result.success: - raise Exception('Trailing comma before "{}"'.format(s)) - - return result - - return wrapped - -def _comma_separate_and_wrap(wrapped_parser, start_wrap, end_wrap, typecaster): - parser_prefixed_with_comma = _prefix_with_comma(wrapped_parser) - start_wrap_parser = _make_consume_constant_parser(start_wrap) - end_wrap_parser = _make_consume_constant_parser(end_wrap) - - def parser(s): - result = start_wrap_parser(s) - if result.success: - s = result.remaining - else: - return _shared._FAILED_PARSE_RESULT - - value = [] - first = True - - parse_result = wrapped_parser(s) - - while parse_result.success: - value.append(parse_result.value) - s = parse_result.remaining - parse_result = parser_prefixed_with_comma(s) - - result = end_wrap_parser(s) - if result.success: - s = result.remaining - else: - return _shared._FAILED_PARSE_RESULT - - return _shared.ParseResult( - success = True, - value = typecaster(value), - remaining = s, - ) - - return parser - -# This uses _PARSERS which has not been defined yet, but is defined here so it can be used in -# the definition of _list_parser -def _object_parser(source): - for parser in _PARSERS: - result = parser(source) - - if result.success: - return result - - return _shared._FAILED_PARSE_RESULT - -_list_parser = _comma_separate_and_wrap(_object_parser, '[', ']', list) - -_consume_colon_parser = _make_consume_constant_parser(':') - -def _kvp_parser(s): - key_parse_result = _object_parser(s) - if key_parse_result.success: - s = key_parse_result.remaining - else: - return _shared._FAILED_PARSE_RESULT - - result = _consume_colon_parser(s) - if result.success: - s = result.remaining - else: - return _shared._FAILED_PARSE_RESULT - - value_parse_result = _object_parser(s) - if value_parse_result.success: - s = value_parse_result.remaining - else: - return _shared._FAILED_PARSE_RESULT - - return _shared.ParseResult( - success = True, - value = (key_parse_result.value, value_parse_result.value), - remaining = s, - ) - -_dictionary_parser = _comma_separate_and_wrap(_kvp_parser, '{', '}', collections.OrderedDict) - - -_PARSERS = [ - _make_constant_parser('null', None), - _make_constant_parser('true', True), - _make_constant_parser('false', False), - _make_integer_parser(8), - _make_integer_parser(16), - _make_integer_parser(32), - _make_integer_parser(64), - _binary_parser, - _make_utf_parser('utf8'), - _make_utf_parser('utf16'), - _make_utf_parser('utf32'), - _list_parser, - _dictionary_parser, -] - -def _parse(parser, source): - result = parser(source) - - if result.success: - if result.remaining.strip() == '': - return result.value - - raise Exception('Unparsed trailing characters: "{}"'.format(result.remaining)) - - raise Exception('Unable to parse: "{}"'.format(source)) - -def deserialize(s): - return _parse(_object_parser, s)