upload exercises
Exercise2c/OUTDATED_process_image.py (new executable file)
@@ -0,0 +1,27 @@
import os
import sys


# This module provides the magic_signature() function, safely and
# securely calculated to provide an invaluable signature of an image.


def magic(fname):
    n_threads = os.getenv('OMP_NUM_THREADS', '(unset)')
    print(f"Worker {fname=} OMP_NUM_THREADS={n_threads}")

    # We delay the import of numpy because we want to set OMP_NUM_THREADS.
    # We delay the import of PIL in case it uses numpy internally.

    import numpy as np
    from PIL import Image

    im = Image.open(fname)
    try:
        m = np.median(im, axis=2)
    except np.AxisError:
        return -1
    n = (m @ m.T).mean()
    return int(n)


if __name__ == '__main__':
    for fname in sys.argv[1:]:
        print(f'{fname} → {magic(fname)}')
Exercise2c/OUTDATED_process_many_times.py (new executable file)
@@ -0,0 +1,35 @@
import os
import sys
import time

from process_multiple_images import process

n_threads = ...
n_processes = ...


def measure_one(n_processes, n_threads, fnames):
    result_fname = f'{n_processes:02}_{n_threads:02}.txt'

    if os.path.exists(result_fname):
        print(f'Skipping job with {n_processes} processes and {n_threads} threads, results file exists')
        return

    t0 = time.time()
    process(n_processes, n_threads, fnames)
    dt = time.time() - t0

    print(f'Job with {n_processes} processes and {n_threads} threads/worker and {len(fnames)} jobs: {dt}')

    with open(result_fname, 'wt') as results:
        print(f'{n_processes:02} {n_threads:02} {dt}', file=results)


def measure(n_processes, n_threads, fnames):
    for n1 in n_processes:
        for n2 in n_threads:
            measure_one(n1, n2, fnames)


if __name__ == '__main__':
    measure(
        [1, 2, 3, 4, 5, 6, 7, 8, 9],
        [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11],
        sys.argv[1:])
Exercise2c/OUTDATED_process_multiple_images.py (new executable file)
@@ -0,0 +1,25 @@
import os
import sys
from multiprocessing import Pool as ProcessPool

import process_image


def process(n_processes, n_threads, fnames):
    print(f"Controller with {n_processes} processes and {n_threads} threads / worker")

    # The environment that is set in the parent is inherited by child workers,
    # but process_image imports numpy lazily, so we need to set the variable
    # before process_image imports it.
    os.environ['OMP_NUM_THREADS'] = str(n_threads)

    with ProcessPool(n_processes) as p:
        signatures = p.map(process_image.magic, fnames)
    for fname, signature in zip(fnames, signatures):
        print(f'{fname} → {signature}')


if __name__ == '__main__':
    n_processes = int(sys.argv[1])
    n_threads = int(sys.argv[2])
    fnames = sys.argv[3:]

    process(n_processes, n_threads, fnames)
Exercise2c/README.md (new executable file)
@@ -0,0 +1,20 @@
## Exercise 2c

Processing images with simultaneous multiprocessing and multithreading.

`process_image.py` processes a single image.
It can be used as a script: `process_image.py foo.jpg`
It is also an importable module: `process_image.magic("foo.jpg")`

`process_multiple_images` uses `process_image` to process a series of images.
It should be called as follows:
`process_multiple_images n_processes n_threads filename1.png filename2.png …`

1. Start a separate terminal window with `htop`: `gnome-terminal -- htop &`

2. Call `python process_multiple_images.py 2 2 images/*.png`

You should see `htop` showing 4 threads (2 worker processes × 2 numpy threads).

Play with different numbers of processes and threads.
Which combination is optimal (fastest)?
Exercise2c/processed_images/f32.png (new binary executable file, 422 KiB)
Exercise2c/processed_images/f33-01-dawn.png (new binary executable file, 289 KiB)
Exercise2c/processed_images/f33-02-day.png (new binary executable file, 280 KiB)
Exercise2c/processed_images/f33-03-dusk.png (new binary executable file, 246 KiB)
Exercise2c/processed_images/f33-04-night.png (new binary executable file, 262 KiB)
Exercise2c/processed_images/f33.png (new binary executable file, 280 KiB)
Exercise2c/processed_images/f34-01-day.png (new binary executable file, 493 KiB)
Exercise2c/processed_images/f34-02-night.png (new binary executable file, 453 KiB)
Exercise2c/processed_images/f34.png (new binary executable file, 493 KiB)
Exercise2c/processed_images/f35-01-day.png (new binary executable file, 515 KiB)
Exercise2c/processed_images/f35-02-night.png (new binary executable file, 468 KiB)
Exercise2c/processed_images/f35.png (new binary executable file, 515 KiB)
Exercise2c/processed_images/f36-01-day.png (new binary executable file, 348 KiB)
Exercise2c/processed_images/f36-02-night.png (new binary executable file, 357 KiB)
Exercise2c/processed_images/f36.png (new binary executable file, 348 KiB)
Exercise2c/processed_images/f37-01-day.png (new binary executable file, 222 KiB)
Exercise2c/processed_images/f37-01-night.png (new binary executable file, 221 KiB)
Exercise2c/processed_images/f38-01-day.png (new binary executable file, 666 KiB)
Exercise2c/processed_images/f38-01-night.png (new binary executable file, 607 KiB)
exercises/exerciseA/README.md (new executable file)
@@ -0,0 +1,53 @@
# Exercise A: multithreading with NumPy

Objective: investigate the speed-up of numpy code with multiple threads.

**HINT**: Use `htop` in your terminal to track what the CPUs are doing.

## First

The script `heavy_computation.py` performs some matrix calculations with numpy.

You can change the number of threads that numpy uses for the calculation
using the `OMP_NUM_THREADS` environment variable like this:
```
OMP_NUM_THREADS=7 python heavy_computation.py
```

The script will also measure the time to run the calculation and will save
the timing results into the `timings/` folder as a `.txt` file.

**TASK**: Execute the script `heavy_computation.py`, varying the number of threads.
You will plot the resulting calculation times in the second part below.

**QUESTION**
> What happens if `OMP_NUM_THREADS` is not set? How many threads are there? Why?
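For the task above: a minimal sketch, not part of the exercise files, of how the runs could be automated from Python. It assumes `heavy_computation.py` is in the current directory; the thread counts are just example values.

```python
# Hypothetical helper (not in the repository): run heavy_computation.py once per
# thread count by setting OMP_NUM_THREADS in the child process's environment.
import os
import subprocess
import sys

for n in [1, 2, 4, 8]:
    env = dict(os.environ, OMP_NUM_THREADS=str(n))
    subprocess.run([sys.executable, 'heavy_computation.py'], env=env, check=True)
```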
## Second

In `plot.py`, we have given code that will load all of the timing data in `timings/`.

**TASK**: Add code to plot the execution duration vs. the number of threads.

**TASK**: Open a Pull Request with your plotting code and post your plots in the
conversation. Don't upload binaries to the Git remote!

**OPTIONAL TASK**: Add code to calculate and plot the speed-up compared
to single-threaded execution. Include your code and plot in the PR.

**QUESTIONS**

> What does the result tell us about the optimum number of threads? Why?

> Does it take the same time to run for you as for your colleagues? Why?

## Optional tasks

Investigate the runtime variability. Systematically run multiple instances with the same number of threads by modifying `heavy_computation.py`.

How is the runtime affected when the problem becomes bigger? Is the optimum number of threads always the same?

How is the runtime affected when the memory is almost full? You can fill it up by creating a separate (unused) large numpy array.

How about running on battery vs. having your laptop plugged in?
exercises/exerciseA/heavy_computation.py (new executable file)
@@ -0,0 +1,31 @@
import os
import timeit
import numpy as np
from datetime import datetime
import time

# Timestamp that will be put in the file name
timestamp = datetime.now().strftime("%H%M%S%f")

# Get the environment variable for threads
threads = os.getenv('OMP_NUM_THREADS')

# A relatively large matrix to work on
n = 5_000
x = np.random.random(size=(n, n))

print(f"We are executed with OMP_NUM_THREADS={threads} for {n=}")

# Measure the time required for matrix multiplication
start_time = time.time()
y = x @ x  # The heavy compute
elapsed_time = time.time() - start_time

print(f'Time used for matrix multiplication: {elapsed_time:.2f} s')

# Check if timings folder exists
if not os.path.isdir('timings/'): os.mkdir('timings')

# IO: Save the timing to a unique txt file
with open(f'timings/{threads}_threads_t{timestamp}.txt', 'w') as file:
    file.write(f'{threads},{elapsed_time:.6f}')
exercises/exerciseA/plot.py (new executable file)
@@ -0,0 +1,23 @@
import os
import numpy as np
import matplotlib.pyplot as plt

# IO: This loads the timings for you
threads, timings = [], []
for file in os.listdir('timings'):
    with open(f'timings/{file}', 'r') as f:
        n, t = f.read().strip().split(',')
        threads.append(int(n))
        timings.append(float(t))
threads = np.array(threads)
timings = np.array(timings)

print('This is the data I loaded: threads =', threads, ', timings =', timings)

fig, axs = plt.subplots()

# CREATE YOUR PLOT HERE
# Remember to label your axes
# Feel free to make it pretty

plt.savefig('threads_v_timings.png', dpi=300)
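One possible way to fill in the "CREATE YOUR PLOT HERE" section above; this is a sketch rather than the reference solution, and it continues from the `threads`, `timings`, and `axs` variables defined in `plot.py`.

```python
# Hypothetical fill-in: one marker per measured run, sorted by thread count
# so the trend is easy to read.
order = np.argsort(threads)
axs.plot(threads[order], timings[order], 'o')
axs.set_xlabel('OMP_NUM_THREADS')
axs.set_ylabel('Computation time (s)')
axs.set_title('Matrix multiplication time vs. number of threads')
```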
exercises/exerciseB/README.md (new executable file)
@@ -0,0 +1,27 @@
# Exercise B: multiprocessing and map

Objective: introduce `map` and `Pool.map`.

In the `numerical_integration.py` file, we give Python code that calculates
the integral of a function in two different ways: numerically and analytically.

The given functions are `integrate` (numerical integration), `f` (the function
to integrate), and `F` (the analytical integral).

We want to check the precision of the numerical integration as a function of
the number of steps in the domain. To do this, we calculate and print the
relative differences between the analytical result and the numerical result
for different values of the number of steps.

**TASKS**:
0. Read `numerical_integration.py` and familiarize yourselves with the code.
1. Update the `main` function so that it calculates the numerical error without
   any parallelization. You can use a for loop or `map`.
2. Note the execution time for this serial implementation (see the timing sketch after this list).
3. Implement the parallel version using `multiprocessing.Pool`.
4. Compare the timing for the parallel version with the serial time.
   What speed-up did you get?

**BONUS TASKS (very optional)**:
5. Implement a parallel version with threads (using `multiprocessing.pool.ThreadPool`).
6. Time this version, and hypothesize about the result.
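A minimal timing sketch, not part of the exercise files; it assumes the serial `main()` in `numerical_integration.py` has already been filled in.

```python
# Hypothetical timing wrapper: measure one call to main() with time.perf_counter.
import time
from numerical_integration import main

start = time.perf_counter()
main()
print(f'main() took {time.perf_counter() - start:.3f} s')
```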
exercises/exerciseB/numerical_integration.py (new executable file)
@@ -0,0 +1,38 @@
"""Exercise 2b: multiprocessing
"""

def integrate(f, a, b, n):
    "Perform numerical integration of f in range [a, b], with n steps"
    s = []
    for i in range(n):
        dx = (b - a) / n
        x = a + (i + 0.5) * dx
        y = f(x)
        s = s + [y * dx]
    return sum(s)


def f(x):
    "A polynomial that we'll integrate"
    return x ** 4 - 3 * x


def F(x):
    "The analytic integral of f. (F' = f)"
    return 1 / 5 * x ** 5 - 3 / 2 * x ** 2


def compute_error(n):
    "Calculate the difference between the numerical and analytical integration results"
    a = -1.0
    b = +2.0
    F_analytical = F(b) - F(a)
    F_numerical = integrate(f, a, b, n)
    return abs((F_numerical - F_analytical) / F_analytical)


def main():
    ns = [10_000, 25_000, 50_000, 75_000]
    errors = ...  # TODO: write a for loop, serial map, and parallel map here

    for n, e in zip(ns, errors):
        print(f'{n} {e:.8%}')


if __name__ == '__main__':
    main()
exercises/exerciseB/numerical_integration_solution.py (new executable file)
@@ -0,0 +1,30 @@
import sys
from numerical_integration import compute_error


def main(arg):
    ns = [10_000, 25_000, 50_000, 75_000]

    match arg:
        case 'for':
            errors = []
            for n in ns:
                errors += [compute_error(n)]
        case 'lc':
            errors = [compute_error(n) for n in ns]
        case 'map':
            errors = list(map(compute_error, ns))
        case 'mp':
            from multiprocessing import Pool as ProcessPool
            with ProcessPool() as pool:
                errors = pool.map(compute_error, ns)
        case 'mt':
            from multiprocessing.pool import ThreadPool
            with ThreadPool(10) as pool:
                errors = pool.map(compute_error, ns)

    for n, e in zip(ns, errors):
        print(f'{n} {e:.8%}')


if __name__ == '__main__':
    arg = (sys.argv[1:] + ['for'])[0]
    main(arg)
exercises/exerciseC/README.md (new executable file)
@@ -0,0 +1,38 @@
# Exercise C: blending processes and threads

Objective: investigate how the number of processes and threads impacts the
speed-up of a computation.

## First

For each of the 19 images in the folder `images/`, the script `process_images.py`:
(1) decomposes the image using a singular-value decomposition (SVD), (2) removes the
largest singular value, and (3) returns the reconstructed image. The script also measures
the time for the computation and saves the result in `timings/`.

You can change the number of processes and threads used on a set of images by calling
the script as follows:
`python process_images.py 3 2 images/*`
The command above will use 3 processes and 2 threads to analyse everything in the folder `images/`.

**TASKS**:
0. Familiarize yourself with the code in `process_images.py`. Where is the number of
   threads set in the code? Why is it set there? (A short illustration follows this list.)
   Where is the number of processes set in the code?
1. Hypothesize what would be a good number of processes and threads for this exercise.
2. Try a couple of combinations of processes and threads, look at the saved timings, and see if
   the results match your expectations.
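For task 0, a minimal standalone illustration, not taken from the exercise files, of why `OMP_NUM_THREADS` has to be placed in the environment before numpy is imported:

```python
# Hypothetical example: the BLAS backend reads OMP_NUM_THREADS when numpy is
# first imported, so the variable has to be set before that import happens.
import os
os.environ['OMP_NUM_THREADS'] = '2'   # set before importing numpy

import numpy as np                    # the threading limit is picked up here
x = np.random.random((2000, 2000))
y = x @ x                             # the multiplication uses at most 2 threads
```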
## Second

This folder also includes a bash script called `run_with_all_configurations.sh`.

**TASKS**:
0. Open the bash script. What does it do?
1. Execute the bash script in the terminal:
   `bash run_with_all_configurations.sh`
   Observe what's printed to the screen. Does it match your expectations?
2. Open `plot.py` and see what it does. Run the script and view the results. Do they
   match your expectations?
3. Add the image as a comment to the Pull Request you opened in Exercise A (or make a
   new Pull Request if you need one).
exercises/exerciseC/images/f32.png (new binary executable file, 14 MiB)
exercises/exerciseC/images/f33-01-dawn.png (new binary executable file, 7.6 MiB)
exercises/exerciseC/images/f33-02-day.png (new binary executable file, 7.9 MiB)
exercises/exerciseC/images/f33-03-dusk.png (new binary executable file, 6.5 MiB)
exercises/exerciseC/images/f33-04-night.png (new binary executable file, 7.4 MiB)
exercises/exerciseC/images/f33.png (new binary executable file, 7.9 MiB)
exercises/exerciseC/images/f34-01-day.png (new binary executable file, 13 MiB)
exercises/exerciseC/images/f34-02-night.png (new binary executable file, 10 MiB)
exercises/exerciseC/images/f34.png (new binary executable file, 13 MiB)
exercises/exerciseC/images/f35-01-day.png (new binary executable file, 11 MiB)
exercises/exerciseC/images/f35-02-night.png (new binary executable file, 8.6 MiB)
exercises/exerciseC/images/f35.png (new binary executable file, 11 MiB)
exercises/exerciseC/images/f36-01-day.png (new binary executable file, 3.8 MiB)
exercises/exerciseC/images/f36-02-night.png (new binary executable file, 3.6 MiB)
exercises/exerciseC/images/f36.png (new binary executable file, 3.8 MiB)
exercises/exerciseC/images/f37-01-day.png (new binary executable file, 2.3 MiB)
exercises/exerciseC/images/f37-01-night.png (new binary executable file, 1.8 MiB)
exercises/exerciseC/images/f38-01-day.png (new binary executable file, 5.6 MiB)
exercises/exerciseC/images/f38-01-night.png (new binary executable file, 5.5 MiB)
exercises/exerciseC/plot.py (new executable file)
@@ -0,0 +1,53 @@
import os
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.patheffects as PathEffects

N_processes = 5
N_threads = 5

# Load measured timings
times = np.empty((N_processes, N_threads))
for fname in os.listdir('timings'):
    values = open(f'timings/{fname}').read().split()
    n_processes = int(values[0])
    n_threads = int(values[1])
    dt = float(values[2])
    times[n_processes-1][n_threads-1] = dt
print(times)

# Plot measured time
fig_time, axs_time = plt.subplots()
im = axs_time.imshow(times.T, origin='lower')
axs_time.set_title('Computation time')
fig_time.colorbar(im, ax=axs_time, label='Measured computation time (s)')

# Plot speedup
workers = np.arange(N_processes + 1)[:, None] * np.arange(N_threads + 1)
speedup = times[0, 0] / times

fig_speedup, axs_speedup = plt.subplots()
im = axs_speedup.imshow(speedup.T, origin='lower')
axs_speedup.set_title('Computation speed-up')
fig_speedup.colorbar(im, ax=axs_speedup, label='Speed-up')

# Set the same style for both plots
for axs, data in zip([axs_time, axs_speedup], [times, speedup]):
    axs.set_xlabel('# processes')
    axs.set_ylabel('# threads')
    axs.set_xticks(np.arange(N_processes))
    axs.set_xticklabels(np.arange(N_processes)+1)
    axs.set_yticks(np.arange(N_threads))
    axs.set_yticklabels(np.arange(N_threads)+1)

    for i in range(N_processes):
        for j in range(N_threads):
            txt = axs.text(i, j, f'{data[i, j]:.2f}', fontsize=10, color='w',
                           ha='center', va='center', fontweight='bold')
            txt.set_path_effects([PathEffects.withStroke(linewidth=0.5, foreground='k')])
    axs.spines[['right', 'top']].set_visible(False)

# Save plots
fig_time.savefig('time.png', dpi=300)
fig_speedup.savefig('speedup.png', dpi=300)
exercises/exerciseC/process_images.py (new executable file)
@@ -0,0 +1,72 @@
import os
import sys
from multiprocessing import Pool as ProcessPool
import time


def process_image(input_tuple):

    fname, A = input_tuple
    n_threads = os.getenv('OMP_NUM_THREADS', '(unset)')
    print(f"Worker {fname=} OMP_NUM_THREADS={n_threads}", flush=True)

    # Decompose image
    U, S, Vh = np.linalg.svd(A)

    # Remove first singular value
    S[0] = 0
    smat = np.zeros(A.shape, dtype=complex)
    smat[:min(A.shape), :min(A.shape)] = np.diag(S)

    # Re-compose image
    A = np.dot(U, np.dot(smat, Vh)).real
    A = (256*(A - A.min())/A.max()).astype('uint8')

    return A


if __name__ == '__main__':
    n_processes = int(sys.argv[1])
    n_threads = int(sys.argv[2])
    fnames = sys.argv[3:]

    # Check that the output folders exist, or create them if needed
    if not os.path.isdir('timings'): os.mkdir('timings')
    if not os.path.isdir('processed_images'): os.mkdir('processed_images')

    print(f"Controller with {n_processes} processes and {n_threads} threads / worker", flush=True)

    # The environment that is set in the parent is inherited by child workers,
    # so we need to set the variable before numpy is imported!
    os.environ['OMP_NUM_THREADS'] = str(n_threads)

    # We delay the import of numpy because we have to set OMP_NUM_THREADS before the import.
    # We delay the import of PIL in case it uses numpy internally.
    import numpy as np
    from PIL import Image

    # I/O: Load the images
    image_arrays = []
    for fname in fnames:
        im = Image.open(fname)
        A = np.array(im)
        image_arrays.append((fname, A))

    # Time the execution of the pool map
    start_time = time.time()
    with ProcessPool(n_processes) as p:
        new_images = p.map(process_image, image_arrays)
    elapsed_time = time.time() - start_time

    # I/O: save the processed images
    for im, fname in zip(new_images, fnames):
        im = Image.fromarray(im)
        im.save(fname.replace('images', 'processed_images'))

    print(f'{n_processes} processes and {n_threads} threads and {len(fnames)} jobs: {elapsed_time}\n',
          flush=True)

    # I/O: Save the timing to a unique txt file
    filename = f'timings/{n_processes:02}_processes_{n_threads:02}_threads.txt'
    with open(filename, 'w') as file:
        file.write(f'{n_processes} {n_threads} {elapsed_time:.6f}')
exercises/exerciseC/processed_images/f32.png (new binary executable file, 76 KiB)
exercises/exerciseC/processed_images/f33-01-dawn.png (new binary executable file, 53 KiB)
exercises/exerciseC/processed_images/f33-02-day.png (new binary executable file, 52 KiB)
exercises/exerciseC/processed_images/f33-03-dusk.png (new binary executable file, 46 KiB)
exercises/exerciseC/processed_images/f33-04-night.png (new binary executable file, 48 KiB)
exercises/exerciseC/processed_images/f33.png (new binary executable file, 52 KiB)
exercises/exerciseC/processed_images/f34-01-day.png (new binary executable file, 89 KiB)
exercises/exerciseC/processed_images/f34-02-night.png (new binary executable file, 83 KiB)
exercises/exerciseC/processed_images/f34.png (new binary executable file, 89 KiB)
exercises/exerciseC/processed_images/f35-01-day.png (new binary executable file, 95 KiB)
exercises/exerciseC/processed_images/f35-02-night.png (new binary executable file, 91 KiB)
exercises/exerciseC/processed_images/f35.png (new binary executable file, 95 KiB)
exercises/exerciseC/processed_images/f36-01-day.png (new binary executable file, 62 KiB)
exercises/exerciseC/processed_images/f36-02-night.png (new binary executable file, 63 KiB)
exercises/exerciseC/processed_images/f36.png (new binary executable file, 62 KiB)
exercises/exerciseC/processed_images/f37-01-day.png (new binary executable file, 50 KiB)
exercises/exerciseC/processed_images/f37-01-night.png (new binary executable file, 48 KiB)
exercises/exerciseC/processed_images/f38-01-day.png (new binary executable file, 112 KiB)
exercises/exerciseC/processed_images/f38-01-night.png (new binary executable file, 103 KiB)
exercises/exerciseC/run_with_all_configurations.sh (new executable file)
@@ -0,0 +1,9 @@
# This is bash.
# It runs the python script multiple times with different arguments.
for i in {1..5}  # Number of processes
do
    for j in {1..5}  # Number of threads
    do
        python process_images.py $i $j images/*
    done
done
extras/kitchen_asyncio/README.md (new executable file)
@@ -0,0 +1,41 @@
This is an example of "parallel" processing using `asyncio`.

We have the following work plan to prepare a dako:

1. Fetch dako rusk
2. Fetch olive oil
3. Fetch tomato
4. Chop tomato
5. Fetch feta
6. Water rusk
7. Oil rusk
8. Put tomato and feta on rusk
9. Store ready dako on countertop

Note that certain steps depend on earlier steps,
e.g. 3→4, 1→6, 6→7, (7, 4, 2)→8, 8→9.

The file `kitchen_serial.py` has a straightforward Python implementation.

Execute it as:
    python kitchen_serial.py 5
(This creates 5 dakos, one after the other.)

The file `kitchen_asyncio_naive.py` has a version which was converted
to use `asyncio`: the jobs are executed via the `asyncio`
task queue, but they still run serially, because we wait for
each task to finish before continuing.

Execute it as:
    python kitchen_asyncio_naive.py 5
(This creates 5 dakos, one after the other, using `asyncio`.)
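A minimal illustration, not taken from the exercise files, of the difference described above: awaiting each coroutine immediately serializes the waits, while `asyncio.gather` lets them overlap.

```python
import asyncio
import time

async def job(seconds):
    await asyncio.sleep(seconds)

async def serial():
    # Awaiting one job before starting the next: total ≈ 2 s
    await job(1)
    await job(1)

async def concurrent():
    # gather() starts both jobs before waiting: total ≈ 1 s
    await asyncio.gather(job(1), job(1))

for coro in (serial(), concurrent()):
    t0 = time.perf_counter()
    asyncio.run(coro)
    print(f'{time.perf_counter() - t0:.1f} s')
```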
The file `kitchen_asyncio_async.py` has a version that adds effective
parallelization by starting independent tasks in parallel and only
awaiting when necessary for subsequent steps. Some steps were split
out of the big function `prepare_dako` to allow them to be awaited
separately.

Execute it as:
    python kitchen_asyncio_async.py 5
(This creates 5 dakos concurrently, using `asyncio`.)
extras/kitchen_asyncio/kitchen_asyncio_async.py (new executable file)
@@ -0,0 +1,93 @@
import argparse
import asyncio
import random
import sys

from kitchen_serial import parse_args


async def do_work(duration, result):
    t = random.uniform(duration/5 * 0.5, duration/5 * 1.5)
    # t = duration / 5
    await asyncio.sleep(t)
    return result


async def fetch_olive_oil():
    print('Fetching olive oil…')
    return await do_work(5, 'oil')


async def fetch_dako_rusk():
    print('Fetching dako rusk…')
    return await do_work(5, 'rusk')


async def fetch_tomato():
    print('Fetching tomato…')
    return await do_work(3, 'tomato')


async def fetch_feta():
    print('Fetching feta…')
    return await do_work(3, 'feta')


async def chop_tomato(tomato):
    assert tomato == 'tomato'
    print('Chopping tomato…')
    return await do_work(1, f'chopped {tomato}')


async def water_rusk(rusk):
    assert rusk == 'rusk'
    print('Watering rusk…')
    return await do_work(0.2, f'wet {rusk}')


async def oil_rusk(rusk, oil):
    assert rusk == 'wet rusk'
    assert oil == 'oil'
    print(f'Pouring {oil} on {rusk}…')
    return await do_work(0.5, f'{rusk} with {oil}')


async def decorate_rusk(rusk, *toppings):
    result = rusk
    for topping in toppings:
        print(f'Putting {topping} on {result}…')
        result = await do_work(1, f'{result} with {topping}')
    return result


async def store_dako(dako):
    print(f'Storing {dako}')
    await do_work(1, None)


async def prepare_rusk():
    rusk = await fetch_dako_rusk()
    rusk = await water_rusk(rusk)
    return rusk


async def prepare_tomato():
    tomato = await fetch_tomato()
    tomato = await chop_tomato(tomato)
    return tomato


async def prepare_oiled_rusk():
    rusk = prepare_rusk()
    oil = fetch_olive_oil()
    rusk, oil = await asyncio.gather(rusk, oil)

    return await oil_rusk(rusk, oil)


async def prepare_dako():
    print('Making dako…')
    rusk = prepare_oiled_rusk()
    tomato = prepare_tomato()
    feta = fetch_feta()
    parts = await asyncio.gather(rusk, tomato, feta)
    dako = await decorate_rusk(*parts)
    await store_dako(dako)
    print(f'{dako} is ready!')


async def prepare_dakos(n_dakos):
    tasks = [prepare_dako()
             for n in range(n_dakos)]
    await asyncio.gather(*tasks)


if __name__ == '__main__':
    args = parse_args()
    asyncio.run(
        prepare_dakos(args.n_dakos)
    )
extras/kitchen_asyncio/kitchen_asyncio_naive.py (new executable file)
@@ -0,0 +1,75 @@
import asyncio
import random

from kitchen_serial import parse_args


async def do_work(duration, result):
    t = random.uniform(duration/5 * 0.5, duration/5 * 1.5)
    await asyncio.sleep(t)
    return result


async def fetch_olive_oil():
    print('Fetching olive oil…')
    return await do_work(5, 'oil')


async def fetch_dako_rusk():
    print('Fetching dako rusk…')
    return await do_work(5, 'rusk')


async def fetch_tomato():
    print('Fetching tomato…')
    return await do_work(3, 'tomato')


async def fetch_feta():
    print('Fetching feta…')
    return await do_work(3, 'feta')


async def chop_tomato(tomato):
    assert tomato == 'tomato'
    print('Chopping tomato…')
    return await do_work(1, f'chopped {tomato}')


async def water_rusk(rusk):
    assert rusk == 'rusk'
    print('Watering rusk…')
    return await do_work(0.2, f'wet {rusk}')


async def oil_rusk(rusk, oil):
    assert rusk == 'wet rusk'
    assert oil == 'oil'
    print(f'Pouring {oil} on {rusk}…')
    return await do_work(0.5, f'{rusk} with {oil}')


async def decorate_rusk(rusk, *toppings):
    result = rusk
    for topping in toppings:
        print(f'Putting {topping} on {result}…')
        result = await do_work(1, f'{result} with {topping}')
    return result


async def store_dako(dako):
    print(f'Storing {dako}')
    await do_work(1, None)


async def prepare_dako():
    print('Making dako…')
    oil = await fetch_olive_oil()
    rusk = await fetch_dako_rusk()
    tomato = await fetch_tomato()
    tomato = await chop_tomato(tomato)
    rusk = await water_rusk(rusk)
    rusk = await oil_rusk(rusk, oil)
    feta = await fetch_feta()
    dako = await decorate_rusk(rusk, tomato, feta)
    await store_dako(dako)
    print(f'{dako} is ready!')


async def prepare_dakos(n_dakos):
    for n in range(n_dakos):
        await prepare_dako()


if __name__ == '__main__':
    args = parse_args()
    asyncio.run(
        prepare_dakos(args.n_dakos)
    )
extras/kitchen_asyncio/kitchen_serial.py (new executable file)
@@ -0,0 +1,83 @@
import argparse
import random
import time


def do_work(duration, result):
    t = random.uniform(duration/5 * 0.5, duration/5 * 1.5)
    time.sleep(t)
    return result


def fetch_olive_oil():
    print('Fetching olive oil…')
    return do_work(5, 'oil')


def fetch_dako_rusk():
    print('Fetching dako rusk…')
    return do_work(5, 'rusk')


def fetch_tomato():
    print('Fetching tomato…')
    return do_work(3, 'tomato')


def fetch_feta():
    print('Fetching feta…')
    return do_work(3, 'feta')


def chop_tomato(tomato):
    assert tomato == 'tomato'
    print('Chopping tomato…')
    return do_work(1, f'chopped {tomato}')


def water_rusk(rusk):
    assert rusk == 'rusk'
    print('Watering rusk…')
    return do_work(0.2, f'wet {rusk}')


def oil_rusk(rusk, oil):
    assert rusk == 'wet rusk'
    assert oil == 'oil'
    print(f'Pouring {oil} on {rusk}…')
    return do_work(0.5, f'{rusk} with {oil}')


def decorate_rusk(rusk, *toppings):
    result = rusk
    for topping in toppings:
        print(f'Putting {topping} on {result}…')
        result = do_work(1, f'{result} with {topping}')
    return result


def store_dako(dako):
    print(f'Storing {dako}')
    do_work(1, None)


def prepare_dako():
    print('Making dako…')
    oil = fetch_olive_oil()
    rusk = fetch_dako_rusk()
    tomato = fetch_tomato()
    tomato = chop_tomato(tomato)
    rusk = water_rusk(rusk)
    rusk = oil_rusk(rusk, oil)
    feta = fetch_feta()
    dako = decorate_rusk(rusk, tomato, feta)
    store_dako(dako)
    print(f'{dako} is ready!')


def prepare_dakos(n_dakos):
    for n in range(n_dakos):
        prepare_dako()


def parse_args():
    p = argparse.ArgumentParser()
    p.add_argument(
        'n_dakos',
        type=int,
        nargs='?',
        default=1,
        help='How many dakos to make?',
    )
    return p.parse_args()


if __name__ == '__main__':
    args = parse_args()
    prepare_dakos(args.n_dakos)