X-Git-Url: https://code.kerkeslager.com/?p=sandbox;a=blobdiff_plain;f=async%2Ftest.py;fp=async%2Ftest.py;h=1b1b01492b651d08c07abcdc7d467937dceeb59a;hp=0000000000000000000000000000000000000000;hb=968af6bc53d70e889bae92c15606212c084e0168;hpb=45ec9c36ab7241cee93e615b3c901b5b80aa7aff diff --git a/async/test.py b/async/test.py new file mode 100644 index 0000000..1b1b014 --- /dev/null +++ b/async/test.py @@ -0,0 +1,71 @@ +import asyncio +import logging +import random +import string +import uuid + +import attr + +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s,%(msecs)d %(levelname)s: %(message)s', + datefmt='%H:%M:%S', +) + +@attr.s +class PubSubMessage: + instance_name = attr.ib() + message_id = attr.ib(repr=False) + hostname = attr.ib(repr=False, init=False) + + def __attrs_post_init__(self): + self.hostname = f'{self.instance_name}.example.net' + + +# simulating an external publisher of events + +async def publish(queue): + choices = string.ascii_lowercase + string.digits + + while True: + msg_id = str(uuid.uuid4()) + host_id = "".join(random.choices(choices, k=4)) + instance_name = f"cattle-{host_id}" + msg = PubSubMessage(message_id=msg_id, instance_name=instance_name) + # publish an item + asyncio.create_task(queue.put(msg)) + logging.info(f"Published message {msg}") + # simulate randomness of publishing messages + await asyncio.sleep(random.random()) + + +async def consume(queue): + while True: + # wait for an item from the publisher + msg = await queue.get() + if msg is None: # publisher is done + break + + # process the msg + logging.info(f'Consumed {msg}') + # unhelpful simulation of i/o work + await asyncio.sleep(random.random()) + + +def main(): + queue = asyncio.Queue() + loop = asyncio.get_event_loop() + + try: + loop.create_task(publish(queue)) + loop.create_task(consume(queue)) + loop.run_forever() + except KeyboardInterrupt: + logging.info("Process interrupted") + finally: + loop.close() + logging.info("Successfully shutdown the Mayhem service.") + + +if __name__ == '__main__': + main()