TAG_UTF8 = 0x21
TAG_UTF16 = 0x22
TAG_UTF32 = 0x23
+TAG_TUPLE = 0x30
TaggedObject = collections.namedtuple(
'TaggedObject',
return serializer
+def _serialize_tuple(to):
+ assert isinstance(to.instance, tuple)
+
+ payload = b''.join(serialize(item) for item in to.instance)
+
+ fmt = '!BI'
+
+ return struct.pack('!BI', TAG_TUPLE, len(payload)) + payload
+
+
_TAGS_TO_SERIALIZERS = {
TAG_NULL: _make_tag_only_serializer(TAG_NULL, None),
TAG_TRUE: _make_tag_only_serializer(TAG_TRUE, True),
TAG_UTF8: _make_string_serializer(lambda s: s.encode('utf-8')),
TAG_UTF16: _make_string_serializer(lambda s: s.encode('utf-16')),
TAG_UTF32: _make_string_serializer(lambda s: s.encode('utf-32')),
+ TAG_TUPLE: _serialize_tuple,
}
def serialize(to):
def _make_tag_only_parser(tag, value):
def parser(b):
- return TaggedObject(tag = tag, instance = value)
+ return 0, TaggedObject(tag = tag, instance = value)
return parser
def parser(b):
b = b.read(size)
assert len(b) == size
- return TaggedObject(tag = tag, instance = unpacker(b)[0])
+ return size, TaggedObject(tag = tag, instance = unpacker(b)[0])
return parser
+_LENGTH_FMT = '!I'
+_LENGTH_FMT_SIZE = struct.calcsize(_LENGTH_FMT)
+
+def _read_length_then_payload(b):
+ length_b = b.read(_LENGTH_FMT_SIZE)
+ assert len(length_b) == _LENGTH_FMT_SIZE
+ length = struct.unpack(_LENGTH_FMT, length_b)[0]
+
+ payload = b.read(length)
+ assert len(payload) == length
+ return _LENGTH_FMT_SIZE + length, payload
+
def _make_string_deserializer(tag, decoder):
fmt = '!I'
size = struct.calcsize(fmt)
unpacker = functools.partial(struct.unpack, fmt)
def parser(b):
- length_b = b.read(size)
- assert len(length_b) == size
- length = unpacker(length_b)[0]
- s = b.read(length)
- assert len(s) == length
- return TaggedObject(tag = tag, instance = decoder(s))
+ bytes_read, payload = _read_length_then_payload(b)
+ return bytes_read, TaggedObject(tag = tag, instance = decoder(payload))
return parser
+def _deserialize_tuple(b):
+ bytes_read, payload = _read_length_then_payload(b)
+
+ payload_stream = io.BytesIO(payload)
+
+ total_bytes_read = 0
+ instance = []
+
+ while total_bytes_read < len(payload):
+ partial_bytes_read, item = _deserialize_partial(payload_stream)
+ total_bytes_read += partial_bytes_read
+ instance.append(item)
+
+ return bytes_read, TaggedObject(tag = TAG_TUPLE, instance = tuple(instance))
+
_TAGS_TO_PARSERS = {
TAG_NULL: _make_tag_only_parser(TAG_NULL, None),
TAG_TRUE: _make_tag_only_parser(TAG_TRUE, True),
TAG_UTF8: _make_string_deserializer(TAG_UTF8, lambda b: b.decode('utf-8')),
TAG_UTF16: _make_string_deserializer(TAG_UTF16, lambda b: b.decode('utf-16')),
TAG_UTF32: _make_string_deserializer(TAG_UTF32, lambda b: b.decode('utf-32')),
+ TAG_TUPLE: _deserialize_tuple,
}
+def _deserialize_partial(b):
+ tag = b.read(1)
+ assert len(tag) == 1
+ bytes_read, to = _TAGS_TO_PARSERS[tag[0]](b)
+ return bytes_read + 1, to
+
def deserialize(b):
if isinstance(b, bytes):
b = io.BytesIO(b)
- tag = b.read(1)[0]
-
- result = _TAGS_TO_PARSERS[tag](b)
+ bytes_read, result = _deserialize_partial(b)
remainder = b.read()
(binary.TaggedObject(binary.TAG_UTF8, 'Lol!'), b'\x21\x00\x00\x00\x04Lol!'),
(binary.TaggedObject(binary.TAG_UTF16, 'かわ'), b'\x22\x00\x00\x00\x06\xff\xfeK0\x8f0'),
(binary.TaggedObject(binary.TAG_UTF32, '漢'), b'\x23\x00\x00\x00\x08\xff\xfe\x00\x00"o\x00\x00'),
+ (
+ binary.TaggedObject(
+ binary.TAG_TUPLE,
+ (
+ binary.TaggedObject(
+ binary.TAG_TRUE,
+ True,
+ ),
+ binary.TaggedObject(
+ binary.TAG_UINT8,
+ 7,
+ ),
+ ),
+ ),
+ b'\x30\x00\x00\x00\x03\x01\x03\x07'
+ ),
]
class SerializeTests(unittest.TestCase):
def test_serialize(self):
- for tagged_object, representation in EXAMPLE_REPRESENTATIONS:
- self.assertEqual(
- binary.serialize(tagged_object),
- representation,
- )
+ for tagged_object, expected in EXAMPLE_REPRESENTATIONS:
+ actual = binary.serialize(tagged_object)
+ self.assertEqual(expected, actual)
class DeserializeTests(unittest.TestCase):
def test_deserialize(self):
- for tagged_object, representation in EXAMPLE_REPRESENTATIONS:
- self.assertEqual(
- binary.deserialize(representation),
- tagged_object,
- )
+ for expected, representation in EXAMPLE_REPRESENTATIONS:
+ actual = binary.deserialize(representation)
+ self.assertEqual(expected, actual)
unittest.main()