Commit my random junk
[sandbox] / async / test.py
1 import asyncio
2 import logging
3 import random
4 import string
5 import uuid
6
7 import attr
8
9 logging.basicConfig(
10     level=logging.INFO,
11     format='%(asctime)s,%(msecs)d %(levelname)s: %(message)s',
12     datefmt='%H:%M:%S',
13 )
14
15 @attr.s
16 class PubSubMessage:
17     instance_name = attr.ib()
18     message_id    = attr.ib(repr=False)
19     hostname      = attr.ib(repr=False, init=False)
20
21     def __attrs_post_init__(self):
22         self.hostname = f'{self.instance_name}.example.net'
23
24
25 # simulating an external publisher of events
26
27 async def publish(queue):
28     choices = string.ascii_lowercase + string.digits
29
30     while True:
31         msg_id = str(uuid.uuid4())
32         host_id = "".join(random.choices(choices, k=4))
33         instance_name = f"cattle-{host_id}"
34         msg = PubSubMessage(message_id=msg_id, instance_name=instance_name)
35         # publish an item
36         asyncio.create_task(queue.put(msg))
37         logging.info(f"Published message {msg}")
38         # simulate randomness of publishing messages
39         await asyncio.sleep(random.random())
40
41
42 async def consume(queue):
43     while True:
44         # wait for an item from the publisher
45         msg = await queue.get()
46         if msg is None:  # publisher is done
47             break
48
49         # process the msg
50         logging.info(f'Consumed {msg}')
51         # unhelpful simulation of i/o work
52         await asyncio.sleep(random.random())
53
54
55 def main():
56     queue = asyncio.Queue()
57     loop = asyncio.get_event_loop()
58
59     try:
60         loop.create_task(publish(queue))
61         loop.create_task(consume(queue))
62         loop.run_forever()
63     except KeyboardInterrupt:
64         logging.info("Process interrupted")
65     finally:
66         loop.close()
67         logging.info("Successfully shutdown the Mayhem service.")
68
69
70 if __name__ == '__main__':
71     main()