7ce58b708a0f255391a40e02cd163ecfa4d4af4a
[sandbox] / serial / binary.py
1 import collections
2 import functools
3 import io
4 import struct
5
6 TAG_NULL = 0x00
7 TAG_TRUE = 0x01
8 TAG_FALSE = 0x02
9 TAG_UINT8 = 0x03
10 TAG_UINT16 = 0x04
11 TAG_UINT32 = 0x05
12 TAG_UINT64 = 0x06
13 TAG_INT8 = 0x10
14 TAG_INT16 = 0x11
15 TAG_INT32 = 0x12
16 TAG_INT64 = 0x13
17 TAG_BINARY = 0x20
18 TAG_UTF8 = 0x21
19 TAG_UTF16 = 0x22
20 TAG_UTF32 = 0x23
21
22 TaggedObject = collections.namedtuple(
23     'TaggedObject',
24     [
25         'tag',
26         'instance',
27     ],
28 )
29
30 def _make_tag_only_serializer(tag, expected_value):
31     tag = bytes([tag])
32
33     def serializer(to):
34         assert to.instance == expected_value
35         return tag
36
37     return serializer
38
39 def _make_struct_serializer(fmt):
40     fmt = '!B' + fmt
41     packer = functools.partial(struct.pack, fmt)
42
43     def serializer(to):
44         return packer(to.tag, to.instance)
45
46     return serializer
47
48 def _make_string_serializer(encoder):
49     packer = functools.partial(struct.pack, '!BI')
50
51     def serializer(to):
52         encoded = encoder(to.instance)
53         return packer(to.tag, len(encoded)) + encoded
54
55     return serializer
56
57 _TAGS_TO_SERIALIZERS = {
58     TAG_NULL: _make_tag_only_serializer(TAG_NULL, None),
59     TAG_TRUE: _make_tag_only_serializer(TAG_TRUE, True),
60     TAG_FALSE: _make_tag_only_serializer(TAG_FALSE, False),
61     TAG_UINT8: _make_struct_serializer('B'),
62     TAG_UINT16: _make_struct_serializer('H'),
63     TAG_UINT32: _make_struct_serializer('I'),
64     TAG_UINT64: _make_struct_serializer('Q'),
65     TAG_INT8: _make_struct_serializer('b'),
66     TAG_INT16: _make_struct_serializer('h'),
67     TAG_INT32: _make_struct_serializer('i'),
68     TAG_INT64: _make_struct_serializer('q'),
69     TAG_BINARY: _make_string_serializer(lambda s: s),
70     TAG_UTF8: _make_string_serializer(lambda s: s.encode('utf-8')),
71     TAG_UTF16: _make_string_serializer(lambda s: s.encode('utf-16')),
72     TAG_UTF32: _make_string_serializer(lambda s: s.encode('utf-32')),
73 }
74
75 def serialize(to):
76     return _TAGS_TO_SERIALIZERS[to.tag](to)
77
78 def _make_tag_only_parser(tag, value):
79     def parser(b):
80         return TaggedObject(tag = tag, instance = value)
81
82     return parser
83
84 def _make_struct_deserializer(tag, fmt):
85     fmt = '!' + fmt
86     size = struct.calcsize(fmt)
87     unpacker = functools.partial(struct.unpack, fmt)
88
89     def parser(b):
90         b = b.read(size)
91         assert len(b) == size
92         return TaggedObject(tag = tag, instance = unpacker(b)[0])
93
94     return parser
95
96 def _make_string_deserializer(tag, decoder):
97     fmt = '!I'
98     size = struct.calcsize(fmt)
99     unpacker = functools.partial(struct.unpack, fmt)
100
101     def parser(b):
102         length_b = b.read(size)
103         assert len(length_b) == size
104         length = unpacker(length_b)[0]
105         s = b.read(length)
106         assert len(s) == length
107         return TaggedObject(tag = tag, instance = decoder(s))
108
109     return parser
110
111 _TAGS_TO_PARSERS = {
112     TAG_NULL: _make_tag_only_parser(TAG_NULL, None),
113     TAG_TRUE: _make_tag_only_parser(TAG_TRUE, True),
114     TAG_FALSE: _make_tag_only_parser(TAG_FALSE, False),
115     TAG_UINT8: _make_struct_deserializer(TAG_UINT8, 'B'),
116     TAG_UINT16: _make_struct_deserializer(TAG_UINT16, 'H'),
117     TAG_UINT32: _make_struct_deserializer(TAG_UINT32, 'I'),
118     TAG_UINT64: _make_struct_deserializer(TAG_UINT64, 'Q'),
119     TAG_INT8: _make_struct_deserializer(TAG_INT8, 'b'),
120     TAG_INT16: _make_struct_deserializer(TAG_INT16, 'h'),
121     TAG_INT32: _make_struct_deserializer(TAG_INT32, 'i'),
122     TAG_INT64: _make_struct_deserializer(TAG_INT64, 'q'),
123     TAG_BINARY: _make_string_deserializer(TAG_BINARY, lambda b: b),
124     TAG_UTF8: _make_string_deserializer(TAG_UTF8, lambda b: b.decode('utf-8')),
125     TAG_UTF16: _make_string_deserializer(TAG_UTF16, lambda b: b.decode('utf-16')),
126     TAG_UTF32: _make_string_deserializer(TAG_UTF32, lambda b: b.decode('utf-32')),
127 }
128
129 def deserialize(b):
130     if isinstance(b, bytes):
131         b = io.BytesIO(b)
132
133     tag = b.read(1)[0]
134
135     result = _TAGS_TO_PARSERS[tag](b)
136
137     remainder = b.read()
138
139     if len(remainder) == 0:
140         return result
141
142     raise Exception('Unable to parse remainder: {}'.format(remainder))