Serialization for unsigned integers
[sandbox] / serial / binary.py
1 import collections
2 import functools
3 import io
4 import struct
5
# One-byte type tags; each serialized value starts with one of these.
# Tags whose value is implied by the tag itself (no payload follows).
TAG_NULL = 0x00
TAG_TRUE = 0x01
TAG_FALSE = 0x02
# Unsigned integers; the tag selects the width of the payload that follows.
TAG_UINT8 = 0x03
TAG_UINT16 = 0x04
TAG_UINT32 = 0x05
TAG_UINT64 = 0x06

# A value ("instance") paired with the tag that selects its wire encoding.
TaggedObject = collections.namedtuple('TaggedObject', ('tag', 'instance'))
21
def _make_tag_only_serializer(tag, expected_value):
    """Build a serializer for a value that is fully described by its tag.

    Args:
        tag: Integer tag; ``bytes([tag])`` requires it to be in 0..255.
        expected_value: The only instance value the serializer accepts
            (compared with ``==``).

    Returns:
        A callable that takes a tagged object and returns the single tag
        byte.

    Raises:
        ValueError: From the returned callable, when the object's
            ``instance`` does not equal ``expected_value``.
    """
    encoded_tag = bytes([tag])

    def serializer(to):
        # Raise explicitly rather than ``assert``: assertions are removed
        # under ``python -O``, which would let a mismatched value be encoded
        # silently as this tag.
        if to.instance != expected_value:
            raise ValueError(
                'Tag 0x{:02x} expects instance {!r}, got {!r}'.format(
                    tag, expected_value, to.instance,
                )
            )
        return encoded_tag

    return serializer
30
def _make_struct_serializer(fmt):
    """Build a serializer that emits the tag byte followed by one packed value.

    ``fmt`` is a ``struct`` format for the payload; it is prefixed with
    ``'!B'`` so the output is network byte order with the tag first.
    """
    wire_format = '!B' + fmt
    pack = struct.pack

    def serializer(to):
        return pack(wire_format, to.tag, to.instance)

    return serializer
39
# Dispatch table: tag -> callable that turns a tagged object into bytes.
# Tag-only values first, then the fixed-width unsigned integers.
_TAGS_TO_SERIALIZERS = {
    tag: _make_tag_only_serializer(tag, value)
    for tag, value in (
        (TAG_NULL, None),
        (TAG_TRUE, True),
        (TAG_FALSE, False),
    )
}
_TAGS_TO_SERIALIZERS.update(
    (tag, _make_struct_serializer(fmt))
    for tag, fmt in (
        (TAG_UINT8, 'B'),
        (TAG_UINT16, 'H'),
        (TAG_UINT32, 'I'),
        (TAG_UINT64, 'Q'),
    )
)
49
def serialize(to):
    """Encode *to* with the serializer registered for ``to.tag``.

    Raises ``KeyError`` when the tag has no entry in
    ``_TAGS_TO_SERIALIZERS``.
    """
    encode = _TAGS_TO_SERIALIZERS[to.tag]
    return encode(to)
52
def _make_tag_only_parser(tag, value):
    """Return a parser for a tag that carries no payload bytes.

    The produced callable takes the input stream only so that it can be
    called the same way as any other parser; it reads nothing from it and
    always yields a ``TaggedObject`` holding ``tag`` and ``value``.
    """
    def parse_constant(_stream):
        return TaggedObject(tag, value)

    return parse_constant
58
def _make_struct_parser(tag, fmt):
    """Build a parser for the payload written by the matching struct serializer.

    The serializer packs ``'!B' + fmt``; by the time a parser runs the tag
    byte has already been consumed, so only ``'!' + fmt`` is left to read.

    Raises:
        ValueError: From the returned callable, when the stream ends before
            the whole payload has been read.
    """
    payload = struct.Struct('!' + fmt)

    def parser(b):
        raw = b.read(payload.size)
        if len(raw) != payload.size:
            raise ValueError(
                'Truncated payload for tag 0x{:02x}: expected {} bytes, '
                'got {}'.format(tag, payload.size, len(raw))
            )
        (value,) = payload.unpack(raw)
        return TaggedObject(tag=tag, instance=value)

    return parser


# Dispatch table: tag -> callable that reads that tag's payload (if any) from
# a stream and returns a TaggedObject.  Covers every tag that has a
# serializer, so serialized values can be read back.
_TAGS_TO_PARSERS = {
    TAG_NULL: _make_tag_only_parser(TAG_NULL, None),
    TAG_TRUE: _make_tag_only_parser(TAG_TRUE, True),
    TAG_FALSE: _make_tag_only_parser(TAG_FALSE, False),
    TAG_UINT8: _make_struct_parser(TAG_UINT8, 'B'),
    TAG_UINT16: _make_struct_parser(TAG_UINT16, 'H'),
    TAG_UINT32: _make_struct_parser(TAG_UINT32, 'I'),
    TAG_UINT64: _make_struct_parser(TAG_UINT64, 'Q'),
}
64
def deserialize(b):
    """Parse exactly one tagged value from *b*.

    Args:
        b: A bytes-like object (``bytes``, ``bytearray`` or ``memoryview``)
            or a binary stream exposing ``read()``.  A stream is read to its
            end.

    Returns:
        Whatever the parser registered for the leading tag byte returns.

    Raises:
        ValueError: If the input is empty, so there is no tag byte to read.
        KeyError: If the tag byte has no entry in ``_TAGS_TO_PARSERS``.
        Exception: If any bytes are left over after the value was parsed.
    """
    if isinstance(b, (bytes, bytearray, memoryview)):
        b = io.BytesIO(b)

    header = b.read(1)
    if not header:
        # Indexing an empty read would surface as a bare IndexError.
        raise ValueError('Unable to parse empty input')
    tag = header[0]

    result = _TAGS_TO_PARSERS[tag](b)

    # Exactly one value is expected; trailing bytes mean malformed input.
    remainder = b.read()
    if remainder:
        raise Exception('Unable to parse remainder: {}'.format(remainder))

    return result