Reactive stream in pure python
Recently i wanted to feel the hype about all the reactive stuff. So I've read the reactive manifesto and start wondering how i should design a component which would integrate such reactive infrastructure, and how to code this.
My goal was to create a component integrated in a message driven architecture. My component had to consume message from a message queue and process it, so fairly basic stuff. And the question was “How do i do this in a proper, neat, pythonic way ?”.
I started using RxPy which is super cool, but it didn’t felt very pythonic to me. Ok it allowed me to use lambda and define a processing pipeline very easily. But at the end, i had the feeling of coding more in a Javascript way but with Python syntax. So i started wondering, How could i do this in pure python ? And the answer was in front of me, since Python 3.5 (3.4 in fact, but far easier in 3.5) : https://docs.python.org/3/library/asyncio.html !
This article wont be focused on what is python async library or how to use it, you will find far better article on this like the excellent introduction on async : Asynchronous Python.
So, let’s start. What are the requirements of my component ?
- Be asynchronous
- Integrate a message driven architecture inside my component
- Handle back-pressure inside my component
- Be pure python
Let’s code !
So we will design a standard test scenario where a producer produce message faster than a consumer can consumer. The work of the consumer will be represented by a 0.1 seconds sleep. This will put in light the back-pressure feature. The message will be a 1kb random string. Note that the closing is not very clean and you will get warning it you run this code. This is due to the fact that the producer will produce for ever and we’re closing it’s event loop.
The key is the main_queue = Queue(50) here. It will block the event loop until free slot is available in queue.
If you run this, the consumer will consume 50 items (0.1 message/s * 5s), but the interesting part is that python will put the producer on hold to let time to message to be consumed. It’s not a thread sleep, only a small, lightweight async wait.
To compare, if we use main_queue = Queue(), no limit on queue, on my I7, 491 686 messages are stored in the queue after 5s. With 1kb per message, it represent 491Mb.
Python async and async queue are really great tools to do really great jobs, don’t hesitate to try them !