Skip to content

Commit

Permalink
More demo code to show off different things
Browse files Browse the repository at this point in the history
  • Loading branch information
cjrh committed Feb 10, 2020
1 parent 9ed5f29 commit 6c5e288
Show file tree
Hide file tree
Showing 10 changed files with 210 additions and 0 deletions.
2 changes: 2 additions & 0 deletions examples/lb/README.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Load balancer in which each of the workers asks for work. This is much better
than naive round-robin message distribution.
22 changes: 22 additions & 0 deletions examples/lb/producer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import asyncio
import logging
import aiorun
import aiomsg
from random import randint


async def main():
async with aiomsg.Søcket() as sock:
await sock.bind()
async for id_, msg in sock.identity_messages():
payload = dict(n=randint(30, 40))
logging.info(f"Sending message: {payload}")
await sock.send_json(payload, identity=id_)
await asyncio.sleep(1.0)


logging.basicConfig(level="INFO")
try:
aiorun.run(main())
except KeyboardInterrupt:
pass
1 change: 1 addition & 0 deletions examples/lb/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
aiorun
38 changes: 38 additions & 0 deletions examples/lb/run.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# Example: load balancer

import os
import sys
import shlex
import signal
from pathlib import Path
import subprocess as sp

this_dir = Path(__file__).parent


def make_arg(cmdline):
return shlex.split(cmdline, posix=False)


# Start producer
s = f"{sys.executable} {this_dir / 'producer.py'}"
print(s)
producer = sp.Popen(make_arg(s))

# Start worker
s = f"{sys.executable} {this_dir / 'worker.py'}"
worker = sp.Popen(make_arg(s))

with producer, worker:
try:
producer.wait(20)
worker.wait(20)
except (KeyboardInterrupt, sp.TimeoutExpired):
producer.send_signal(signal.CTRL_C_EVENT)
worker.send_signal(signal.CTRL_C_EVENT)

# try:
# producer.wait(20)
# worker.wait(20)
# except KeyboardInterrupt:
# pass
53 changes: 53 additions & 0 deletions examples/lb/worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import asyncio
import logging
import signal

import aiorun
import aiomsg
from concurrent.futures import ProcessPoolExecutor as Executor


def fib(n):
if n < 2:
return n
return fib(n - 2) + fib(n - 1)


async def fetch_work(sock: aiomsg.Søcket) -> dict:
await sock.send(b"1")
work = await sock.recv_json()
return work


async def main(executor):
async with aiomsg.Søcket() as sock:
await sock.connect()
while True:
# "Fetching" is actually a two-step process, a send followed
# by a receive, and we don't want to allow shutdown between
# those two operations. That's why there's a guard.
work = await fetch_work(sock)
logging.info(f"Worker received work: {work}")
# CPU-bound task MUST be run in an executor, otherwise
# heartbeats inside aiomsg will fail.
executor_job = asyncio.get_running_loop().run_in_executor(
executor, fib, work["n"]
)
result = await executor_job
logging.info(f"Job completed, the answer is: {result}")


def initializer():
"""Disable the handler for KeyboardInterrupt in the pool of
executors.
NOTE that the initializer must not be inside the __name__ guard,
since the child processes need to be able to execute it. """
signal.signal(signal.SIGINT, signal.SIG_IGN)


# This guard is SUPER NECESSARY if using ProcessPoolExecutor on Windows
if __name__ == "__main__":
logging.basicConfig(level="INFO")
executor = Executor(max_workers=4, initializer=initializer)
aiorun.run(main(executor))
9 changes: 9 additions & 0 deletions examples/overlapped/README.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
This is a basic test of plain socket connectivity and CTRL-C handling.
It should be possible to execute the run.py launcher which will start up
two subprocesses, one of which is a server and one is a client. And
when you press CTRL-C, everything should shut down cleanly without
tracebacks.

If you execute run.py in a PyCharm run configuration, make sure to enable
the "Emulate terminal in output console" option. This will allow you
to press CTRL-C to trigger the KeyboardInterrupt.
27 changes: 27 additions & 0 deletions examples/overlapped/producer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import asyncio
from asyncio import StreamReader, StreamWriter


async def cb(reader: StreamReader, writer: StreamWriter):
try:
while True:
data = await reader.read(100)
print(data)
except asyncio.CancelledError:
writer.close()
await writer.wait_closed()


async def main():
server = await asyncio.start_server(cb, host="127.0.0.1", port=12345)
async with server:
try:
await server.serve_forever()
except asyncio.CancelledError:
print("server cancelled")


try:
asyncio.run(main())
except KeyboardInterrupt:
pass
1 change: 1 addition & 0 deletions examples/overlapped/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
aiorun
35 changes: 35 additions & 0 deletions examples/overlapped/run.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Example: load balancer

import os
import sys
import shlex
import signal
from pathlib import Path
import subprocess as sp

this_dir = Path(__file__).parent


def make_arg(cmdline):
return shlex.split(cmdline, posix=False)


# Start producer
s = f"{sys.executable} {this_dir / 'producer.py'}"
print(s)
producer = sp.Popen(make_arg(s))

# Start worker
s = f"{sys.executable} {this_dir / 'worker.py'}"
worker = sp.Popen(make_arg(s))

with producer, worker:
try:
producer.wait(100)
worker.wait(100)
except (KeyboardInterrupt, sp.TimeoutExpired):
producer.send_signal(signal.CTRL_C_EVENT)
worker.send_signal(signal.CTRL_C_EVENT)

producer.wait(100)
worker.wait(100)
22 changes: 22 additions & 0 deletions examples/overlapped/worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import asyncio


async def main():
print("worker connecting")
reader, writer = await asyncio.open_connection("127.0.0.1", 12345)
print("worker connected")
try:
while True:
writer.write(b"blah")
await writer.drain()
print("sent data")
await asyncio.sleep(1.0)
except asyncio.CancelledError:
writer.close()
await writer.wait_closed()


try:
asyncio.run(main())
except KeyboardInterrupt:
pass

0 comments on commit 6c5e288

Please sign in to comment.