Improve the demo #28

Open · wants to merge 6 commits into base: master
46 changes: 38 additions & 8 deletions README.rst
@@ -30,13 +30,6 @@ aiomsg

Pure-Python smart sockets (like ZMQ) for simpler networking

.. figure:: https://upload.wikimedia.org/wikipedia/commons/5/5e/NetworkDecentral.svg
:target: https://commons.wikimedia.org/wiki/File:NetworkDecentral.svg
:alt: Diagram of computers linked up in a network

:sub:`Attribution: And1mu [CC BY-SA 4.0 (https://creativecommons.org/licenses/by-sa/4.0)]`


Table of Contents
-----------------

@@ -46,6 +39,43 @@ Table of Contents
Demo
====

Put on your *Software Architect* hat and imagine the microservices layout
shown in this block diagram:

.. figure:: https://raw.githubusercontent.com/cjrh/aiomsg/master/images/microservices.svg?sanitize=true
   :alt: Layout of an example microservices architecture

- One or more features are exposed to the world via the *load balancer*, **N**.
  For example, this could be *nginx*.
- Imagine the load balancer proxies HTTP requests through to your backend
  webservers, **H**. Each webserver may well do some processing itself, but
  imagine further that it needs information from other microservices to
  service some requests.
- Both instances of **H** are identical; there are two of them purely for
  redundancy.
- One of these microservices is **A**. It's not important what it does, just
  that it does "something".
- It turns out that sometimes **A** needs information supplied by another
  microservice, **B**. Both **A** and **B** need to do real work, so it's
  important that they can both be scaled horizontally (ignore that
  "horizontal scaling" would actually run in a vertical direction in the
  diagram!).

The goal of **aiomsg** is to make it simple to construct these kinds of
arrangements of microservices.

We'll move through each of the services and look at their code:

.. literalinclude:: examples/demo/h.py
   :language: python3

- TODO: microservice A
- TODO: microservice P
- TODO: microservice B
- TODO: microservice M
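
Until those are filled in, here is a hypothetical sketch (not part of this PR)
of what the backend microservice **B** might look like: it connects to **H**,
does some "work" on each request, and replies with the same ``msg_id`` so that
``backend_receiver()`` in ``h.py`` can match the response to the pending
request. The exact ``connect()`` signature is assumed to mirror ``bind()``.

.. code-block:: python

    import asyncio
    import json

    from aiomsg import Søcket


    async def main():
        async with Søcket() as sock:
            # Assumed to mirror bind(): connect to H's socket from h.py.
            await sock.connect("127.0.0.1", 25000)
            async for msg in sock.messages():
                try:
                    data = json.loads(msg)
                except json.JSONDecodeError:
                    continue  # h.py also pushes plain-text timestamps; skip those
                data["resp"] = {"echo": data.get("req")}  # placeholder for real work
                await sock.send_json(data)


    asyncio.run(main())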

Simple Demo
===========

Let's make two microservices; one will send the current time to the other.
Here's the end that binds to a port (a.k.a., the "server"):
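
The code block for this end is not shown in the diff; a minimal sketch,
assuming the same ``Søcket`` bind/send pattern used in ``h.py`` above, might
look like this:

.. code-block:: python

    import asyncio
    import time

    from aiomsg import Søcket


    async def main():
        async with Søcket() as sock:
            await sock.bind("127.0.0.1", 25000)  # this end binds, i.e. the "server"
            while True:
                await sock.send(time.ctime().encode())
                await asyncio.sleep(1)


    asyncio.run(main())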

@@ -934,7 +964,7 @@ These are the rules:

#. **Every payload** in either direction shall be length-prefixed:

.. code-block::
.. code-block:: shell

message = [4-bytes big endian int32][payload]
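
As an illustration (a sketch using plain ``asyncio`` streams, not aiomsg's
internal implementation), writing and reading such frames might look like this:

.. code-block:: python

    import struct
    from asyncio import StreamReader, StreamWriter


    async def send_msg(writer: StreamWriter, payload: bytes) -> None:
        # Prefix the payload with its length as a 4-byte big-endian integer.
        writer.write(struct.pack(">I", len(payload)) + payload)
        await writer.drain()


    async def read_msg(reader: StreamReader) -> bytes:
        # Read the 4-byte prefix first, then exactly that many payload bytes.
        size, = struct.unpack(">I", await reader.readexactly(4))
        return await reader.readexactly(size)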

17 changes: 17 additions & 0 deletions docker/Dockerfile38.demo
@@ -0,0 +1,17 @@
FROM python:3.8-alpine3.10

RUN apk add --update \
openssl \
gcc \
python-dev \
musl-dev \
linux-headers \
libffi-dev \
libressl-dev \
&& rm -rf /var/cache/apk/*

COPY . /aiomsg

RUN pip install -e aiomsg[all]
WORKDIR /aiomsg
CMD ["python", "-m", "pytest", "--cov", "aiomsg", "--cov-report", "term-missing"]
13 changes: 13 additions & 0 deletions examples/demo/docker-compose.yml
@@ -0,0 +1,13 @@
version: '3'
services:
  h:
    build:
      context: .
      dockerfile: h.dockerfile
    ports:
      - "5000:5000"
    volumes:
      - .:/code
      - logvolume01:/var/log
    links:
      - redis
  redis:
    image: redis
volumes:
  logvolume01: {}
10 changes: 10 additions & 0 deletions examples/demo/h.dockerfile
@@ -0,0 +1,10 @@
FROM python:3

WORKDIR /usr/src/app

#COPY requirements.txt ./
RUN pip install --no-cache-dir aiomsg aiohttp

COPY . .

CMD [ "python", "./your-daemon-or-script.py" ]
66 changes: 66 additions & 0 deletions examples/demo/h.py
@@ -0,0 +1,66 @@
# microservice: H
import asyncio
import json
import time
from typing import Dict

from aiomsg import Søcket
from aiohttp import web
from dataclasses import asdict, dataclass


@dataclass
class Payload:
    msg_id: int
    req: Dict
    resp: Dict


REQ_COUNTER = 0
BACKEND_QUEUE = asyncio.Queue()  # TODO: must be inside async context
pending_backend_requests: Dict[int, asyncio.Future] = {}


async def backend_receiver(sock: Søcket):
    """Resolve the pending future that matches each reply from the backend."""
    async for msg in sock.messages():
        data = Payload(**json.loads(msg))
        f = pending_backend_requests.pop(data.msg_id)
        f.set_result(data.resp)


async def backend_loop(app):
    async with Søcket() as sock:
        await sock.bind("127.0.0.1", 25000)
        app["backend_sock"] = sock
        asyncio.create_task(backend_receiver(sock))
        while True:
            await sock.send(time.ctime().encode())
            await asyncio.sleep(1)


async def backend(app):
    # aiohttp runs on_startup handlers to completion before serving, so the
    # long-running socket loop is spawned as a background task rather than
    # run inline.
    app["backend_task"] = asyncio.create_task(backend_loop(app))


async def run_backend_job(sock: Søcket, data: Payload) -> Payload:
    global REQ_COUNTER
    REQ_COUNTER += 1
    data.msg_id = REQ_COUNTER
    f = asyncio.Future()
    pending_backend_requests[data.msg_id] = f
    await sock.send_json(asdict(data))
    data.resp = await f  # resolved by backend_receiver()
    return data


async def handle(request):
    # TODO: get post data from request to send to backend
    payload = Payload(msg_id=0, req={"name": request.match_info["name"]}, resp={})
    result = await run_backend_job(request.app["backend_sock"], payload)
    return web.json_response(dict(result=result.resp))


app = web.Application()
app.on_startup.append(backend)
app.add_routes([web.post("/process/{name}", handle)])

if __name__ == "__main__":
    web.run_app(app)
2 changes: 2 additions & 0 deletions examples/lb/README.rst
@@ -0,0 +1,2 @@
Load balancer in which each of the workers asks for work. This is much better
than naive round-robin message distribution.
22 changes: 22 additions & 0 deletions examples/lb/producer.py
@@ -0,0 +1,22 @@
import asyncio
import logging
import aiorun
import aiomsg
from random import randint


async def main():
async with aiomsg.Søcket() as sock:
await sock.bind()
async for id_, msg in sock.identity_messages():
payload = dict(n=randint(30, 40))
logging.info(f"Sending message: {payload}")
await sock.send_json(payload, identity=id_)
await asyncio.sleep(1.0)


logging.basicConfig(level="INFO")
try:
aiorun.run(main())
except KeyboardInterrupt:
pass
1 change: 1 addition & 0 deletions examples/lb/requirements.txt
@@ -0,0 +1 @@
aiorun
38 changes: 38 additions & 0 deletions examples/lb/run.py
@@ -0,0 +1,38 @@
# Example: load balancer

import os
import sys
import shlex
import signal
from pathlib import Path
import subprocess as sp

this_dir = Path(__file__).parent


def make_arg(cmdline):
return shlex.split(cmdline, posix=False)


# Start producer
s = f"{sys.executable} {this_dir / 'producer.py'}"
print(s)
producer = sp.Popen(make_arg(s))

# Start worker
s = f"{sys.executable} {this_dir / 'worker.py'}"
worker = sp.Popen(make_arg(s))

with producer, worker:
try:
producer.wait(20)
worker.wait(20)
except (KeyboardInterrupt, sp.TimeoutExpired):
producer.send_signal(signal.CTRL_C_EVENT)
worker.send_signal(signal.CTRL_C_EVENT)

# try:
# producer.wait(20)
# worker.wait(20)
# except KeyboardInterrupt:
# pass
53 changes: 53 additions & 0 deletions examples/lb/worker.py
@@ -0,0 +1,53 @@
import asyncio
import logging
import signal

import aiorun
import aiomsg
from concurrent.futures import ProcessPoolExecutor as Executor


def fib(n):
if n < 2:
return n
return fib(n - 2) + fib(n - 1)


async def fetch_work(sock: aiomsg.Søcket) -> dict:
await sock.send(b"1")
work = await sock.recv_json()
return work


async def main(executor):
async with aiomsg.Søcket() as sock:
await sock.connect()
while True:
# "Fetching" is actually a two-step process, a send followed
# by a receive, and we don't want to allow shutdown between
# those two operations. That's why there's a guard.
work = await fetch_work(sock)
logging.info(f"Worker received work: {work}")
# CPU-bound task MUST be run in an executor, otherwise
# heartbeats inside aiomsg will fail.
executor_job = asyncio.get_running_loop().run_in_executor(
executor, fib, work["n"]
)
result = await executor_job
logging.info(f"Job completed, the answer is: {result}")


def initializer():
"""Disable the handler for KeyboardInterrupt in the pool of
executors.

NOTE that the initializer must not be inside the __name__ guard,
since the child processes need to be able to execute it. """
signal.signal(signal.SIGINT, signal.SIG_IGN)


# This guard is SUPER NECESSARY if using ProcessPoolExecutor on Windows
if __name__ == "__main__":
logging.basicConfig(level="INFO")
executor = Executor(max_workers=4, initializer=initializer)
aiorun.run(main(executor))
9 changes: 9 additions & 0 deletions examples/overlapped/README.rst
@@ -0,0 +1,9 @@
This is a basic test of plain-socket connectivity and CTRL-C handling.
Executing the run.py launcher starts two subprocesses: a server and a
client. When you press CTRL-C, everything should shut down cleanly
without tracebacks.

If you execute run.py in a PyCharm run configuration, make sure to enable
the "Emulate terminal in output console" option. This will allow you
to press CTRL-C to trigger the KeyboardInterrupt.
27 changes: 27 additions & 0 deletions examples/overlapped/producer.py
@@ -0,0 +1,27 @@
import asyncio
from asyncio import StreamReader, StreamWriter


async def cb(reader: StreamReader, writer: StreamWriter):
    try:
        while True:
            data = await reader.read(100)
            if not data:  # EOF: the client closed its end of the connection
                break
            print(data)
    except asyncio.CancelledError:
        pass
    finally:
        writer.close()
        await writer.wait_closed()


async def main():
server = await asyncio.start_server(cb, host="127.0.0.1", port=12345)
async with server:
try:
await server.serve_forever()
except asyncio.CancelledError:
print("server cancelled")


try:
asyncio.run(main())
except KeyboardInterrupt:
pass
1 change: 1 addition & 0 deletions examples/overlapped/requirements.txt
@@ -0,0 +1 @@
aiorun
35 changes: 35 additions & 0 deletions examples/overlapped/run.py
@@ -0,0 +1,35 @@
# Example: overlapped (plain-socket connectivity and CTRL-C handling)

import os
import sys
import shlex
import signal
from pathlib import Path
import subprocess as sp

this_dir = Path(__file__).parent


def make_arg(cmdline):
return shlex.split(cmdline, posix=False)


# Start producer
s = f"{sys.executable} {this_dir / 'producer.py'}"
print(s)
producer = sp.Popen(make_arg(s))

# Start worker
s = f"{sys.executable} {this_dir / 'worker.py'}"
worker = sp.Popen(make_arg(s))

with producer, worker:
try:
producer.wait(100)
worker.wait(100)
except (KeyboardInterrupt, sp.TimeoutExpired):
producer.send_signal(signal.CTRL_C_EVENT)
worker.send_signal(signal.CTRL_C_EVENT)

producer.wait(100)
worker.wait(100)
22 changes: 22 additions & 0 deletions examples/overlapped/worker.py
@@ -0,0 +1,22 @@
import asyncio


async def main():
print("worker connecting")
reader, writer = await asyncio.open_connection("127.0.0.1", 12345)
print("worker connected")
try:
while True:
writer.write(b"blah")
await writer.drain()
print("sent data")
await asyncio.sleep(1.0)
except asyncio.CancelledError:
writer.close()
await writer.wait_closed()


try:
asyncio.run(main())
except KeyboardInterrupt:
pass