--- /dev/null
+import asyncio
+import logging
+import random
+import string
+import uuid
+
+import attr
+
+logging.basicConfig(
+ level=logging.INFO,
+ format='%(asctime)s,%(msecs)d %(levelname)s: %(message)s',
+ datefmt='%H:%M:%S',
+)
+
+@attr.s
+class PubSubMessage:
+ instance_name = attr.ib()
+ message_id = attr.ib(repr=False)
+ hostname = attr.ib(repr=False, init=False)
+
+ def __attrs_post_init__(self):
+ self.hostname = f'{self.instance_name}.example.net'
+
+
+# simulating an external publisher of events
+
+async def publish(queue):
+ choices = string.ascii_lowercase + string.digits
+
+ while True:
+ msg_id = str(uuid.uuid4())
+ host_id = "".join(random.choices(choices, k=4))
+ instance_name = f"cattle-{host_id}"
+ msg = PubSubMessage(message_id=msg_id, instance_name=instance_name)
+ # publish an item
+ asyncio.create_task(queue.put(msg))
+ logging.info(f"Published message {msg}")
+ # simulate randomness of publishing messages
+ await asyncio.sleep(random.random())
+
+
+async def consume(queue):
+ while True:
+ # wait for an item from the publisher
+ msg = await queue.get()
+ if msg is None: # publisher is done
+ break
+
+ # process the msg
+ logging.info(f'Consumed {msg}')
+ # unhelpful simulation of i/o work
+ await asyncio.sleep(random.random())
+
+
+def main():
+ queue = asyncio.Queue()
+ loop = asyncio.get_event_loop()
+
+ try:
+ loop.create_task(publish(queue))
+ loop.create_task(consume(queue))
+ loop.run_forever()
+ except KeyboardInterrupt:
+ logging.info("Process interrupted")
+ finally:
+ loop.close()
+ logging.info("Successfully shutdown the Mayhem service.")
+
+
+if __name__ == '__main__':
+ main()