Added serialization and deserialization for signed integers
[sandbox] / serial / binary.py
1 import collections
2 import functools
3 import io
4 import struct
5
# Wire-format tags: every serialized value begins with exactly one tag byte.

# Singleton values -- the tag byte alone is the complete encoding.
TAG_NULL, TAG_TRUE, TAG_FALSE = 0x00, 0x01, 0x02

# Unsigned integers of 8, 16, 32 and 64 bits.
TAG_UINT8, TAG_UINT16, TAG_UINT32, TAG_UINT64 = 0x03, 0x04, 0x05, 0x06

# Signed integers of 8, 16, 32 and 64 bits.
TAG_INT8, TAG_INT16, TAG_INT32, TAG_INT64 = 0x10, 0x11, 0x12, 0x13
17
# A value ("instance") paired with the tag that selects its wire encoding.
TaggedObject = collections.namedtuple('TaggedObject', ('tag', 'instance'))
25
def _make_tag_only_serializer(tag, expected_value):
    """Build a serializer for a value whose whole encoding is its tag byte.

    Args:
        tag: Integer in ``range(256)``; emitted as the single output byte.
        expected_value: The only ``instance`` value the serializer accepts.

    Returns:
        A function that takes a tagged object (anything with an ``instance``
        attribute) and returns the one-byte encoding.
    """
    encoded = bytes([tag])

    def serializer(to):
        # Raised explicitly instead of via ``assert`` so the check still runs
        # under ``python -O``. AssertionError is kept so the failure type seen
        # by existing callers does not change.
        if not (to.instance == expected_value):
            raise AssertionError(
                'Tag {} can only encode {!r}, got {!r}'.format(
                    tag, expected_value, to.instance
                )
            )
        return encoded

    return serializer
34
def _make_struct_serializer(fmt):
    """Build a serializer that emits a tag byte followed by one packed value.

    Args:
        fmt: ``struct`` format code for the payload (for example ``'H'``).
            It is prefixed with ``'!B'``: network byte order, with the tag
            packed first as an unsigned byte.

    Returns:
        A function that takes a tagged object (anything with ``tag`` and
        ``instance`` attributes) and returns its encoding as bytes.

    Raises:
        struct.error: Immediately, if ``fmt`` is not a valid format. The
            returned serializer raises it when the value does not fit.
    """
    # Compiling the format up front validates it here, matching the
    # deserializer factory, instead of on the first serialization.
    layout = struct.Struct('!B' + fmt)

    def serializer(to):
        return layout.pack(to.tag, to.instance)

    return serializer
43
# Dispatch table: tag -> function that encodes a tagged object with that tag.
_TAGS_TO_SERIALIZERS = {
    # Values encoded by the tag byte alone.
    **{
        tag: _make_tag_only_serializer(tag, value)
        for tag, value in (
            (TAG_NULL, None),
            (TAG_TRUE, True),
            (TAG_FALSE, False),
        )
    },
    # Fixed-width integers: the tag byte followed by a struct-packed payload.
    **{
        tag: _make_struct_serializer(code)
        for tag, code in (
            (TAG_UINT8, 'B'),
            (TAG_UINT16, 'H'),
            (TAG_UINT32, 'I'),
            (TAG_UINT64, 'Q'),
            (TAG_INT8, 'b'),
            (TAG_INT16, 'h'),
            (TAG_INT32, 'i'),
            (TAG_INT64, 'q'),
        )
    },
}
57
def serialize(to):
    """Encode a tagged object using the serializer registered for its tag.

    Args:
        to: Object exposing ``tag`` and ``instance`` attributes.

    Returns:
        Whatever the selected serializer returns for ``to``.

    Raises:
        KeyError: If ``to.tag`` has no entry in the serializer table.
    """
    encode = _TAGS_TO_SERIALIZERS[to.tag]
    return encode(to)
60
def _make_tag_only_parser(tag, value):
    """Build a parser for a value that has no payload after its tag byte.

    Args:
        tag: Tag stored on every object the parser returns.
        value: Instance stored on every object the parser returns.

    Returns:
        A one-argument function that ignores its argument and returns a new
        ``TaggedObject(tag, value)``.
    """
    def parser(_stream):
        # Nothing follows the tag, so the stream is left untouched.
        return TaggedObject(tag=tag, instance=value)

    return parser
66
def _make_struct_deserializer(tag, fmt):
    """Build a parser that reads one fixed-size, network-order value.

    Args:
        tag: Tag stored on every TaggedObject the parser returns.
        fmt: ``struct`` format code for the payload (for example ``'H'``).
            It is prefixed with ``'!'`` (network byte order).

    Returns:
        A function that takes an object with a ``read(n)`` method, consumes
        the payload bytes from it and returns the decoded TaggedObject.

    Raises:
        struct.error: Immediately, if ``fmt`` is not a valid format.
    """
    layout = struct.Struct('!' + fmt)

    def parser(b):
        payload = b.read(layout.size)
        # Raised explicitly instead of via ``assert`` so truncated input is
        # still rejected under ``python -O``. AssertionError is kept so the
        # failure type seen by existing callers does not change.
        if len(payload) != layout.size:
            raise AssertionError(
                'Truncated payload for tag {}: expected {} bytes, got {}'.format(
                    tag, layout.size, len(payload)
                )
            )
        return TaggedObject(tag=tag, instance=layout.unpack(payload)[0])

    return parser
78
# Dispatch table: tag -> function that decodes the bytes following that tag.
_TAGS_TO_PARSERS = {
    # Values with no payload after the tag byte.
    **{
        tag: _make_tag_only_parser(tag, value)
        for tag, value in (
            (TAG_NULL, None),
            (TAG_TRUE, True),
            (TAG_FALSE, False),
        )
    },
    # Fixed-width integers: a struct-packed payload follows the tag byte.
    **{
        tag: _make_struct_deserializer(tag, code)
        for tag, code in (
            (TAG_UINT8, 'B'),
            (TAG_UINT16, 'H'),
            (TAG_UINT32, 'I'),
            (TAG_UINT64, 'Q'),
            (TAG_INT8, 'b'),
            (TAG_INT16, 'h'),
            (TAG_INT32, 'i'),
            (TAG_INT64, 'q'),
        )
    },
}
92
def deserialize(b):
    """Decode exactly one tagged object from bytes or a binary stream.

    Args:
        b: A ``bytes``, ``bytearray`` or ``memoryview``, or an object with a
            ``read`` method that returns bytes. A stream is read to its end.

    Returns:
        The object produced by the parser registered for the leading tag byte.

    Raises:
        IndexError: If the input is empty, so there is no tag byte.
        KeyError: If the tag byte has no entry in the parser table.
        Exception: If bytes remain after the object has been parsed.
    """
    # Wrap every in-memory buffer type, not just ``bytes``, so they can be
    # consumed through the same ``read`` interface as a stream.
    if isinstance(b, (bytes, bytearray, memoryview)):
        b = io.BytesIO(b)

    head = b.read(1)
    if not head:
        # Same exception type that indexing the empty read would produce,
        # but with a message that names the actual problem.
        raise IndexError('Unable to parse empty input: missing tag byte')

    result = _TAGS_TO_PARSERS[head[0]](b)

    # Exactly one object is expected, so anything left over is an error.
    remainder = b.read()
    if remainder:
        raise Exception('Unable to parse remainder: {}'.format(remainder))

    return result