From: David Kerkeslager Date: Thu, 29 Sep 2016 21:51:37 +0000 (-0400) Subject: Merge branch 'master' of github.com:kerkeslager/sandbox X-Git-Url: https://code.kerkeslager.com/?p=sandbox;a=commitdiff_plain;h=38da3d335b2118ed985fc6e6c3c1163972c6b840;hp=fb55aa39701df8a28bea106ea7df199c3cb3af1b Merge branch 'master' of github.com:kerkeslager/sandbox --- diff --git a/serial/binary.py b/serial/binary.py new file mode 100644 index 0000000..7ce58b7 --- /dev/null +++ b/serial/binary.py @@ -0,0 +1,142 @@ +import collections +import functools +import io +import struct + +TAG_NULL = 0x00 +TAG_TRUE = 0x01 +TAG_FALSE = 0x02 +TAG_UINT8 = 0x03 +TAG_UINT16 = 0x04 +TAG_UINT32 = 0x05 +TAG_UINT64 = 0x06 +TAG_INT8 = 0x10 +TAG_INT16 = 0x11 +TAG_INT32 = 0x12 +TAG_INT64 = 0x13 +TAG_BINARY = 0x20 +TAG_UTF8 = 0x21 +TAG_UTF16 = 0x22 +TAG_UTF32 = 0x23 + +TaggedObject = collections.namedtuple( + 'TaggedObject', + [ + 'tag', + 'instance', + ], +) + +def _make_tag_only_serializer(tag, expected_value): + tag = bytes([tag]) + + def serializer(to): + assert to.instance == expected_value + return tag + + return serializer + +def _make_struct_serializer(fmt): + fmt = '!B' + fmt + packer = functools.partial(struct.pack, fmt) + + def serializer(to): + return packer(to.tag, to.instance) + + return serializer + +def _make_string_serializer(encoder): + packer = functools.partial(struct.pack, '!BI') + + def serializer(to): + encoded = encoder(to.instance) + return packer(to.tag, len(encoded)) + encoded + + return serializer + +_TAGS_TO_SERIALIZERS = { + TAG_NULL: _make_tag_only_serializer(TAG_NULL, None), + TAG_TRUE: _make_tag_only_serializer(TAG_TRUE, True), + TAG_FALSE: _make_tag_only_serializer(TAG_FALSE, False), + TAG_UINT8: _make_struct_serializer('B'), + TAG_UINT16: _make_struct_serializer('H'), + TAG_UINT32: _make_struct_serializer('I'), + TAG_UINT64: _make_struct_serializer('Q'), + TAG_INT8: _make_struct_serializer('b'), + TAG_INT16: _make_struct_serializer('h'), + TAG_INT32: _make_struct_serializer('i'), + TAG_INT64: _make_struct_serializer('q'), + TAG_BINARY: _make_string_serializer(lambda s: s), + TAG_UTF8: _make_string_serializer(lambda s: s.encode('utf-8')), + TAG_UTF16: _make_string_serializer(lambda s: s.encode('utf-16')), + TAG_UTF32: _make_string_serializer(lambda s: s.encode('utf-32')), +} + +def serialize(to): + return _TAGS_TO_SERIALIZERS[to.tag](to) + +def _make_tag_only_parser(tag, value): + def parser(b): + return TaggedObject(tag = tag, instance = value) + + return parser + +def _make_struct_deserializer(tag, fmt): + fmt = '!' + fmt + size = struct.calcsize(fmt) + unpacker = functools.partial(struct.unpack, fmt) + + def parser(b): + b = b.read(size) + assert len(b) == size + return TaggedObject(tag = tag, instance = unpacker(b)[0]) + + return parser + +def _make_string_deserializer(tag, decoder): + fmt = '!I' + size = struct.calcsize(fmt) + unpacker = functools.partial(struct.unpack, fmt) + + def parser(b): + length_b = b.read(size) + assert len(length_b) == size + length = unpacker(length_b)[0] + s = b.read(length) + assert len(s) == length + return TaggedObject(tag = tag, instance = decoder(s)) + + return parser + +_TAGS_TO_PARSERS = { + TAG_NULL: _make_tag_only_parser(TAG_NULL, None), + TAG_TRUE: _make_tag_only_parser(TAG_TRUE, True), + TAG_FALSE: _make_tag_only_parser(TAG_FALSE, False), + TAG_UINT8: _make_struct_deserializer(TAG_UINT8, 'B'), + TAG_UINT16: _make_struct_deserializer(TAG_UINT16, 'H'), + TAG_UINT32: _make_struct_deserializer(TAG_UINT32, 'I'), + TAG_UINT64: _make_struct_deserializer(TAG_UINT64, 'Q'), + TAG_INT8: _make_struct_deserializer(TAG_INT8, 'b'), + TAG_INT16: _make_struct_deserializer(TAG_INT16, 'h'), + TAG_INT32: _make_struct_deserializer(TAG_INT32, 'i'), + TAG_INT64: _make_struct_deserializer(TAG_INT64, 'q'), + TAG_BINARY: _make_string_deserializer(TAG_BINARY, lambda b: b), + TAG_UTF8: _make_string_deserializer(TAG_UTF8, lambda b: b.decode('utf-8')), + TAG_UTF16: _make_string_deserializer(TAG_UTF16, lambda b: b.decode('utf-16')), + TAG_UTF32: _make_string_deserializer(TAG_UTF32, lambda b: b.decode('utf-32')), +} + +def deserialize(b): + if isinstance(b, bytes): + b = io.BytesIO(b) + + tag = b.read(1)[0] + + result = _TAGS_TO_PARSERS[tag](b) + + remainder = b.read() + + if len(remainder) == 0: + return result + + raise Exception('Unable to parse remainder: {}'.format(remainder)) diff --git a/serial/test_binary.py b/serial/test_binary.py new file mode 100644 index 0000000..8bc3811 --- /dev/null +++ b/serial/test_binary.py @@ -0,0 +1,53 @@ +import unittest + +import binary + +EXAMPLE_REPRESENTATIONS = [ + (binary.TAG_NULL, None, b'\x00'), + (binary.TAG_TRUE, True, b'\x01'), + (binary.TAG_FALSE, False, b'\x02'), + (binary.TAG_UINT8, 7, b'\x03\x07'), + (binary.TAG_UINT16, 7, b'\x04\x00\x07'), + (binary.TAG_UINT32, 7, b'\x05\x00\x00\x00\x07'), + (binary.TAG_UINT64, 7, b'\x06\x00\x00\x00\x00\x00\x00\x00\x07'), + (binary.TAG_INT8, 7, b'\x10\x07'), + (binary.TAG_INT16, 7, b'\x11\x00\x07'), + (binary.TAG_INT32, 7, b'\x12\x00\x00\x00\x07'), + (binary.TAG_INT64, 7, b'\x13\x00\x00\x00\x00\x00\x00\x00\x07'), + (binary.TAG_UINT8, 254, b'\x03\xfe'), + (binary.TAG_UINT16, 65534, b'\x04\xff\xfe'), + (binary.TAG_UINT32, 4294967294, b'\x05\xff\xff\xff\xfe'), + (binary.TAG_UINT64, 18446744073709551614, b'\x06\xff\xff\xff\xff\xff\xff\xff\xfe'), + (binary.TAG_INT8, -2, b'\x10\xfe'), + (binary.TAG_INT16, -2, b'\x11\xff\xfe'), + (binary.TAG_INT32, -2, b'\x12\xff\xff\xff\xfe'), + (binary.TAG_INT64, -2, b'\x13\xff\xff\xff\xff\xff\xff\xff\xfe'), + (binary.TAG_BINARY, b'\xde\xad\xbe\xef', b'\x20\x00\x00\x00\x04\xde\xad\xbe\xef'), + (binary.TAG_UTF8, 'Lol!', b'\x21\x00\x00\x00\x04Lol!'), + (binary.TAG_UTF16, 'かわ', b'\x22\x00\x00\x00\x06\xff\xfeK0\x8f0'), + (binary.TAG_UTF32, '漢', b'\x23\x00\x00\x00\x08\xff\xfe\x00\x00"o\x00\x00'), +] + +class SerializeTests(unittest.TestCase): + def test_serialize(self): + for tag, instance, representation in EXAMPLE_REPRESENTATIONS: + self.assertEqual( + binary.serialize(binary.TaggedObject( + tag = tag, + instance = instance, + )), + representation, + ) + +class DeserializeTests(unittest.TestCase): + def test_deserialize(self): + for tag, instance, representation in EXAMPLE_REPRESENTATIONS: + self.assertEqual( + binary.deserialize(representation), + binary.TaggedObject( + tag = tag, + instance = instance, + ), + ) + +unittest.main()