13 format='%(asctime)s,%(msecs)d %(levelname)s: %(message)s',
19 instance_name = attr.ib()
20 message_id = attr.ib(repr=False)
21 hostname = attr.ib(repr=False, init=False)
22 restarted = attr.ib(repr=False, default=False)
23 saved = attr.ib(repr=False, default=False)
24 acked = attr.ib(repr=False, default=False)
25 extended_cnt = attr.ib(repr=False, default=0)
27 def __attrs_post_init__(self):
28 self.hostname = f"{self.instance_name}.example.net"
30 async def publish(queue):
31 choices = string.ascii_lowercase + string.digits
34 msg_id = str(uuid.uuid4())
35 host_id = "".join(random.choices(choices, k=4))
36 instance_name = f"cattle-{host_id}"
37 msg = PubSubMessage(message_id=msg_id, instance_name=instance_name)
39 asyncio.create_task(queue.put(msg))
40 logging.debug(f"Published message {msg}")
41 # simulate randomness of publishing messages
42 await asyncio.sleep(random.random())
44 async def restart_host(msg):
45 # unhelpful simulation of i/o work
46 await asyncio.sleep(random.random())
48 logging.info(f"Restarted {msg.hostname}")
51 # unhelpful simulation of i/o work
52 await asyncio.sleep(random.random())
54 logging.info(f"Saved {msg} into database")
56 async def cleanup(msg, event):
57 # this will block the rest of the coro until `event.set` is called
59 # unhelpful simulation of i/o work
60 await asyncio.sleep(random.random())
62 logging.info(f"Done. Acked {msg}")
64 async def extend(msg, event):
65 while not event.is_set():
67 logging.info(f"Extended deadline by 3 seconds for {msg}")
68 # want to sleep for less than the deadline amount
69 await asyncio.sleep(2)
71 async def handle_message(msg):
72 event = asyncio.Event()
73 asyncio.create_task(extend(msg, event))
74 asyncio.create_task(cleanup(msg, event))
76 await asyncio.gather(save(msg), restart_host(msg))
79 async def consume(queue):
81 msg = await queue.get()
82 logging.info(f"Consumed {msg}")
83 asyncio.create_task(handle_message(msg))
86 queue = asyncio.Queue()
87 loop = asyncio.get_event_loop()
90 loop.create_task(publish(queue))
91 loop.create_task(consume(queue))
93 except KeyboardInterrupt:
94 logging.info("Process interrupted")
97 logging.info("Successfully shutdown the Mayhem service.")
99 if __name__ == "__main__":