extras: add example for asyncio

This commit is contained in:
Zbigniew Jędrzejewski-Szmek 2024-08-29 19:21:59 +03:00
parent b52786f134
commit a71161f4c6
4 changed files with 292 additions and 0 deletions

View file

@ -0,0 +1,41 @@
This is an example of "parallel" processing using `asyncio`.
We have the following work plan to prepare a dako:
1. Fetch dako rusk
2. Fetch olive oil
3. Fetch tomato
4. Chop tomato
5. Fetch feta
6. Water rusk
7. Oil rusk
8. Put tomato and feta on rusk
9. Store ready dako on countertop
Note that certain steps depend on earlier steps,
e.g. 3→4, 1→6, 6→7, (7, 4, 2)→8, 8→9.
File `kitchen_serial.py` has a straighforward Python implementation.
Execute it as:
python kitchen_asyncio.py 5
(This creates 5 dakos, one after each other.)
File `kitchen_asyncio_naive.py` has a version which was converted
to use `asyncio` and the jobs will be executed via the `asynio`
task queue, but it will still run serially, because we wait for
each task to be finished before continuing.
Execute it as:
python kitchen_asyncio_naive.py 5
(This creates 5 dakos, one after each other, using `asyncio`.)
File `kitchen_asyncio_async.py` has a version adds effective
parallelization by starting independent tasks in parallel and only
awaiting when necessary for subsequent steps. Some steps were split
out of the big function `prepare_dako`, to allow them to be awaited
separately.
Execute it as:
python kitchen_asyncio_async.py 5
(This creates 5 dakos, one after each other, using `asyncio`.)

View file

@ -0,0 +1,93 @@
import argparse
import asyncio
import random
import sys
from kitchen_serial import parse_args
async def do_work(duration, result):
t = random.uniform(duration/5 * 0.5, duration/5 * 1.5)
# t = duration / 5
await asyncio.sleep(t)
return result
async def fetch_olive_oil():
print('Fetching olive oil…')
return await do_work(5, 'oil')
async def fetch_dako_rusk():
print('Fetching dako rusk…')
return await do_work(5, 'rusk')
async def fetch_tomato():
print('Fetching tomato…')
return await do_work(3, 'tomato')
async def fetch_feta():
print('Fetching feta…')
return await do_work(3, 'feta')
async def chop_tomato(tomato):
assert tomato == 'tomato'
print('Chopping tomato…')
return await do_work(1, f'chopped {tomato}')
async def water_rusk(rusk):
assert rusk == 'rusk'
print('Watering rusk…')
return await do_work(0.2, f'wet {rusk}')
async def oil_rusk(rusk, oil):
assert rusk == 'wet rusk'
assert oil == 'oil'
print(f'Pouring {oil} on {rusk}')
return await do_work(0.5, f'{rusk} with {oil}')
async def decorate_rusk(rusk, *toppings):
result = rusk
for topping in toppings:
print(f'Putting {topping} on {result}')
result = await do_work(1, f'{result} with {topping}')
return result
async def store_dako(dako):
print(f'Storing {dako}')
await do_work(1, None)
async def prepare_rusk():
rusk = await fetch_dako_rusk()
rusk = await water_rusk(rusk)
return rusk
async def prepare_tomato():
tomato = await fetch_tomato()
tomato = await chop_tomato(tomato)
return tomato
async def prepare_oiled_rusk():
rusk = prepare_rusk()
oil = fetch_olive_oil()
rusk, oil = await asyncio.gather(rusk, oil)
return await oil_rusk(rusk, oil)
async def prepare_dako():
print('Making dako…')
rusk = prepare_oiled_rusk()
tomato = prepare_tomato()
feta = fetch_feta()
parts = await asyncio.gather(rusk, tomato, feta)
dako = await decorate_rusk(*parts)
await store_dako(dako)
print(f'{dako} is ready!')
async def prepare_dakos(n_dakos):
tasks = [prepare_dako()
for n in range(args.n_dakos)]
await asyncio.gather(*tasks)
if __name__ == '__main__':
args = parse_args()
asyncio.run(
prepare_dakos(args.n_dakos)
)

View file

@ -0,0 +1,75 @@
import asyncio
import random
from kitchen_serial import parse_args
async def do_work(duration, result):
t = random.uniform(duration/5 * 0.5, duration/5 * 1.5)
await asyncio.sleep(t)
return result
async def fetch_olive_oil():
print('Fetching olive oil…')
return await do_work(5, 'oil')
async def fetch_dako_rusk():
print('Fetching dako rusk…')
return await do_work(5, 'rusk')
async def fetch_tomato():
print('Fetching tomato…')
return await do_work(3, 'tomato')
async def fetch_feta():
print('Fetching feta…')
return await do_work(3, 'feta')
async def chop_tomato(tomato):
assert tomato == 'tomato'
print('Chopping tomato…')
return await do_work(1, f'chopped {tomato}')
async def water_rusk(rusk):
assert rusk == 'rusk'
print('Watering rusk…')
return await do_work(0.2, f'wet {rusk}')
async def oil_rusk(rusk, oil):
assert rusk == 'wet rusk'
assert oil == 'oil'
print(f'Pouring {oil} on {rusk}')
return await do_work(0.5, f'{rusk} with {oil}')
async def decorate_rusk(rusk, *toppings):
result = rusk
for topping in toppings:
print(f'Putting {topping} on {result}')
result = await do_work(1, f'{result} with {topping}')
return result
async def store_dako(dako):
print(f'Storing {dako}')
await do_work(1, None)
async def prepare_dako():
print('Making dako…')
oil = await fetch_olive_oil()
rusk = await fetch_dako_rusk()
tomato = await fetch_tomato()
tomato = await chop_tomato(tomato)
rusk = await water_rusk(rusk)
rusk = await oil_rusk(rusk, oil)
feta = await fetch_feta()
dako = await decorate_rusk(rusk, tomato, feta)
await store_dako(dako)
print(f'{dako} is ready!')
async def prepare_dakos(n_dakos):
for n in range(args.n_dakos):
await prepare_dako()
if __name__ == '__main__':
args = parse_args()
asyncio.run(
prepare_dakos(args.n_dakos)
)

View file

@ -0,0 +1,83 @@
import argparse
import random
import time
def do_work(duration, result):
t = random.uniform(duration/5 * 0.5, duration/5 * 1.5)
time.sleep(t)
return result
def fetch_olive_oil():
print('Fetching olive oil…')
return do_work(5, 'oil')
def fetch_dako_rusk():
print('Fetching dako rusk…')
return do_work(5, 'rusk')
def fetch_tomato():
print('Fetching tomato…')
return do_work(3, 'tomato')
def fetch_feta():
print('Fetching feta…')
return do_work(3, 'feta')
def chop_tomato(tomato):
assert tomato == 'tomato'
print('Chopping tomato…')
return do_work(1, f'chopped {tomato}')
def water_rusk(rusk):
assert rusk == 'rusk'
print('Watering rusk…')
return do_work(0.2, f'wet {rusk}')
def oil_rusk(rusk, oil):
assert rusk == 'wet rusk'
assert oil == 'oil'
print(f'Pouring {oil} on {rusk}')
return do_work(0.5, f'{rusk} with {oil}')
def decorate_rusk(rusk, *toppings):
result = rusk
for topping in toppings:
print(f'Putting {topping} on {result}')
result = do_work(1, f'{result} with {topping}')
return result
def store_dako(dako):
print(f'Storing {dako}')
do_work(1, None)
def prepare_dako():
print('Making dako…')
oil = fetch_olive_oil()
rusk = fetch_dako_rusk()
tomato = fetch_tomato()
tomato = chop_tomato(tomato)
rusk = water_rusk(rusk)
rusk = oil_rusk(rusk, oil)
feta = fetch_feta()
dako = decorate_rusk(rusk, tomato, feta)
store_dako(dako)
print(f'{dako} is ready!')
def prepare_dakos(n_dakos):
for n in range(args.n_dakos):
prepare_dako()
def parse_args():
p = argparse.ArgumentParser()
p.add_argument(
'n_dakos',
type=int,
nargs='?',
default=1,
help='How many dakos to make?',
)
return p.parse_args()
if __name__ == '__main__':
args = parse_args()
prepare_dakos(args.n_dakos)