X-Git-Url: https://code.kerkeslager.com/?p=sandbox;a=blobdiff_plain;f=async%2Ftest1.py;fp=async%2Ftest1.py;h=7b640429bba00ba41f5fe403732c74f6b502bd43;hp=0000000000000000000000000000000000000000;hb=968af6bc53d70e889bae92c15606212c084e0168;hpb=45ec9c36ab7241cee93e615b3c901b5b80aa7aff diff --git a/async/test1.py b/async/test1.py new file mode 100644 index 0000000..7b64042 --- /dev/null +++ b/async/test1.py @@ -0,0 +1,100 @@ +import asyncio +import functools +import logging +import random +import signal +import string +import uuid + +import attr + +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s,%(msecs)d %(levelname)s: %(message)s', + datefmt='%H:%M:%S', +) + +@attr.s +class PubSubMessage: + instance_name = attr.ib() + message_id = attr.ib(repr=False) + hostname = attr.ib(repr=False, init=False) + restarted = attr.ib(repr=False, default=False) + saved = attr.ib(repr=False, default=False) + acked = attr.ib(repr=False, default=False) + extended_cnt = attr.ib(repr=False, default=0) + + def __attrs_post_init__(self): + self.hostname = f"{self.instance_name}.example.net" + +async def publish(queue): + choices = string.ascii_lowercase + string.digits + + while True: + msg_id = str(uuid.uuid4()) + host_id = "".join(random.choices(choices, k=4)) + instance_name = f"cattle-{host_id}" + msg = PubSubMessage(message_id=msg_id, instance_name=instance_name) + # publish an item + asyncio.create_task(queue.put(msg)) + logging.debug(f"Published message {msg}") + # simulate randomness of publishing messages + await asyncio.sleep(random.random()) + +async def restart_host(msg): + # unhelpful simulation of i/o work + await asyncio.sleep(random.random()) + msg.restart = True + logging.info(f"Restarted {msg.hostname}") + +async def save(msg): + # unhelpful simulation of i/o work + await asyncio.sleep(random.random()) + msg.save = True + logging.info(f"Saved {msg} into database") + +async def cleanup(msg, event): + # this will block the rest of the coro until `event.set` is called + await event.wait() + # unhelpful simulation of i/o work + await asyncio.sleep(random.random()) + msg.acked = True + logging.info(f"Done. Acked {msg}") + +async def extend(msg, event): + while not event.is_set(): + msg.extended_cnt += 1 + logging.info(f"Extended deadline by 3 seconds for {msg}") + # want to sleep for less than the deadline amount + await asyncio.sleep(2) + +async def handle_message(msg): + event = asyncio.Event() + asyncio.create_task(extend(msg, event)) + asyncio.create_task(cleanup(msg, event)) + + await asyncio.gather(save(msg), restart_host(msg)) + event.set() + +async def consume(queue): + while True: + msg = await queue.get() + logging.info(f"Consumed {msg}") + asyncio.create_task(handle_message(msg)) + +def main(): + queue = asyncio.Queue() + loop = asyncio.get_event_loop() + + try: + loop.create_task(publish(queue)) + loop.create_task(consume(queue)) + loop.run_forever() + except KeyboardInterrupt: + logging.info("Process interrupted") + finally: + loop.close() + logging.info("Successfully shutdown the Mayhem service.") + +if __name__ == "__main__": + main()