Commit my random junk
[sandbox] / async / test1.py
1 import asyncio
2 import functools
3 import logging
4 import random
5 import signal
6 import string
7 import uuid
8
9 import attr
10
11 logging.basicConfig(
12     level=logging.INFO,
13     format='%(asctime)s,%(msecs)d %(levelname)s: %(message)s',
14     datefmt='%H:%M:%S',
15 )
16
17 @attr.s
18 class PubSubMessage:
19     instance_name = attr.ib()
20     message_id    = attr.ib(repr=False)
21     hostname      = attr.ib(repr=False, init=False)
22     restarted     = attr.ib(repr=False, default=False)
23     saved         = attr.ib(repr=False, default=False)
24     acked         = attr.ib(repr=False, default=False)
25     extended_cnt  = attr.ib(repr=False, default=0)
26
27     def __attrs_post_init__(self):
28         self.hostname = f"{self.instance_name}.example.net"
29
30 async def publish(queue):
31     choices = string.ascii_lowercase + string.digits
32
33     while True:
34         msg_id = str(uuid.uuid4())
35         host_id = "".join(random.choices(choices, k=4))
36         instance_name = f"cattle-{host_id}"
37         msg = PubSubMessage(message_id=msg_id, instance_name=instance_name)
38         # publish an item
39         asyncio.create_task(queue.put(msg))
40         logging.debug(f"Published message {msg}")
41         # simulate randomness of publishing messages
42         await asyncio.sleep(random.random())
43
44 async def restart_host(msg):
45     # unhelpful simulation of i/o work
46     await asyncio.sleep(random.random())
47     msg.restart = True
48     logging.info(f"Restarted {msg.hostname}")
49
50 async def save(msg):
51     # unhelpful simulation of i/o work
52     await asyncio.sleep(random.random())
53     msg.save = True
54     logging.info(f"Saved {msg} into database")
55
56 async def cleanup(msg, event):
57     # this will block the rest of the coro until `event.set` is called
58     await event.wait()
59     # unhelpful simulation of i/o work
60     await asyncio.sleep(random.random())
61     msg.acked = True
62     logging.info(f"Done. Acked {msg}")
63
64 async def extend(msg, event):
65     while not event.is_set():
66         msg.extended_cnt += 1
67         logging.info(f"Extended deadline by 3 seconds for {msg}")
68         # want to sleep for less than the deadline amount
69         await asyncio.sleep(2)
70
71 async def handle_message(msg):
72     event = asyncio.Event()
73     asyncio.create_task(extend(msg, event))
74     asyncio.create_task(cleanup(msg, event))
75
76     await asyncio.gather(save(msg), restart_host(msg))
77     event.set()
78
79 async def consume(queue):
80     while True:
81         msg = await queue.get()
82         logging.info(f"Consumed {msg}")
83         asyncio.create_task(handle_message(msg))
84
85 def main():
86     queue = asyncio.Queue()
87     loop = asyncio.get_event_loop()
88
89     try:
90         loop.create_task(publish(queue))
91         loop.create_task(consume(queue))
92         loop.run_forever()
93     except KeyboardInterrupt:
94         logging.info("Process interrupted")
95     finally:
96         loop.close()
97         logging.info("Successfully shutdown the Mayhem service.")
98
99 if __name__ == "__main__":
100     main()