11 format='%(asctime)s,%(msecs)d %(levelname)s: %(message)s',
17 instance_name = attr.ib()
18 message_id = attr.ib(repr=False)
19 hostname = attr.ib(repr=False, init=False)
21 def __attrs_post_init__(self):
22 self.hostname = f'{self.instance_name}.example.net'
25 # simulating an external publisher of events
27 async def publish(queue):
28 choices = string.ascii_lowercase + string.digits
31 msg_id = str(uuid.uuid4())
32 host_id = "".join(random.choices(choices, k=4))
33 instance_name = f"cattle-{host_id}"
34 msg = PubSubMessage(message_id=msg_id, instance_name=instance_name)
36 asyncio.create_task(queue.put(msg))
37 logging.info(f"Published message {msg}")
38 # simulate randomness of publishing messages
39 await asyncio.sleep(random.random())
42 async def consume(queue):
44 # wait for an item from the publisher
45 msg = await queue.get()
46 if msg is None: # publisher is done
50 logging.info(f'Consumed {msg}')
51 # unhelpful simulation of i/o work
52 await asyncio.sleep(random.random())
56 queue = asyncio.Queue()
57 loop = asyncio.get_event_loop()
60 loop.create_task(publish(queue))
61 loop.create_task(consume(queue))
63 except KeyboardInterrupt:
64 logging.info("Process interrupted")
67 logging.info("Successfully shutdown the Mayhem service.")
70 if __name__ == '__main__':