--- /dev/null
+import asyncio
+import functools
+import logging
+import random
+import signal
+import string
+import uuid
+
+import attr
+
+logging.basicConfig(
+ level=logging.INFO,
+ format='%(asctime)s,%(msecs)d %(levelname)s: %(message)s',
+ datefmt='%H:%M:%S',
+)
+
+@attr.s
+class PubSubMessage:
+ instance_name = attr.ib()
+ message_id = attr.ib(repr=False)
+ hostname = attr.ib(repr=False, init=False)
+ restarted = attr.ib(repr=False, default=False)
+ saved = attr.ib(repr=False, default=False)
+ acked = attr.ib(repr=False, default=False)
+ extended_cnt = attr.ib(repr=False, default=0)
+
+ def __attrs_post_init__(self):
+ self.hostname = f"{self.instance_name}.example.net"
+
+async def publish(queue):
+ choices = string.ascii_lowercase + string.digits
+
+ while True:
+ msg_id = str(uuid.uuid4())
+ host_id = "".join(random.choices(choices, k=4))
+ instance_name = f"cattle-{host_id}"
+ msg = PubSubMessage(message_id=msg_id, instance_name=instance_name)
+ # publish an item
+ asyncio.create_task(queue.put(msg))
+ logging.debug(f"Published message {msg}")
+ # simulate randomness of publishing messages
+ await asyncio.sleep(random.random())
+
+async def restart_host(msg):
+ # unhelpful simulation of i/o work
+ await asyncio.sleep(random.random())
+ msg.restart = True
+ logging.info(f"Restarted {msg.hostname}")
+
+async def save(msg):
+ # unhelpful simulation of i/o work
+ await asyncio.sleep(random.random())
+ msg.save = True
+ logging.info(f"Saved {msg} into database")
+
+async def cleanup(msg, event):
+ # this will block the rest of the coro until `event.set` is called
+ await event.wait()
+ # unhelpful simulation of i/o work
+ await asyncio.sleep(random.random())
+ msg.acked = True
+ logging.info(f"Done. Acked {msg}")
+
+async def extend(msg, event):
+ while not event.is_set():
+ msg.extended_cnt += 1
+ logging.info(f"Extended deadline by 3 seconds for {msg}")
+ # want to sleep for less than the deadline amount
+ await asyncio.sleep(2)
+
+async def handle_message(msg):
+ event = asyncio.Event()
+ asyncio.create_task(extend(msg, event))
+ asyncio.create_task(cleanup(msg, event))
+
+ await asyncio.gather(save(msg), restart_host(msg))
+ event.set()
+
+async def consume(queue):
+ while True:
+ msg = await queue.get()
+ logging.info(f"Consumed {msg}")
+ asyncio.create_task(handle_message(msg))
+
+def main():
+ queue = asyncio.Queue()
+ loop = asyncio.get_event_loop()
+
+ try:
+ loop.create_task(publish(queue))
+ loop.create_task(consume(queue))
+ loop.run_forever()
+ except KeyboardInterrupt:
+ logging.info("Process interrupted")
+ finally:
+ loop.close()
+ logging.info("Successfully shutdown the Mayhem service.")
+
+if __name__ == "__main__":
+ main()