Commit my random junk
[sandbox] / async / test.py
diff --git a/async/test.py b/async/test.py
new file mode 100644 (file)
index 0000000..1b1b014
--- /dev/null
@@ -0,0 +1,71 @@
+import asyncio
+import logging
+import random
+import string
+import uuid
+
+import attr
+
+logging.basicConfig(
+    level=logging.INFO,
+    format='%(asctime)s,%(msecs)d %(levelname)s: %(message)s',
+    datefmt='%H:%M:%S',
+)
+
+@attr.s
+class PubSubMessage:
+    instance_name = attr.ib()
+    message_id    = attr.ib(repr=False)
+    hostname      = attr.ib(repr=False, init=False)
+
+    def __attrs_post_init__(self):
+        self.hostname = f'{self.instance_name}.example.net'
+
+
+# simulating an external publisher of events
+
+async def publish(queue):
+    choices = string.ascii_lowercase + string.digits
+
+    while True:
+        msg_id = str(uuid.uuid4())
+        host_id = "".join(random.choices(choices, k=4))
+        instance_name = f"cattle-{host_id}"
+        msg = PubSubMessage(message_id=msg_id, instance_name=instance_name)
+        # publish an item
+        asyncio.create_task(queue.put(msg))
+        logging.info(f"Published message {msg}")
+        # simulate randomness of publishing messages
+        await asyncio.sleep(random.random())
+
+
+async def consume(queue):
+    while True:
+        # wait for an item from the publisher
+        msg = await queue.get()
+        if msg is None:  # publisher is done
+            break
+
+        # process the msg
+        logging.info(f'Consumed {msg}')
+        # unhelpful simulation of i/o work
+        await asyncio.sleep(random.random())
+
+
+def main():
+    queue = asyncio.Queue()
+    loop = asyncio.get_event_loop()
+
+    try:
+        loop.create_task(publish(queue))
+        loop.create_task(consume(queue))
+        loop.run_forever()
+    except KeyboardInterrupt:
+        logging.info("Process interrupted")
+    finally:
+        loop.close()
+        logging.info("Successfully shutdown the Mayhem service.")
+
+
+if __name__ == '__main__':
+    main()