1"""
Demo: Parallel processing across images
3
4Multithreading can be used to run transforms on a set of images in parallel.
5This will give a net performance benefit if the images to be transformed are
6sufficiently large.
7
8This demo runs a multilevel wavelet decomposition on a list of 32 images,
9each of size (512, 512).  Computations are repeated sequentially and in
10parallel and the runtimes compared.
11
12In general, multithreading will be more beneficial for larger images and for
13wavelets with a larger filter size.
14
15One can also change ``ndim`` to 3 in the code below to use a set of 3D volumes
16instead.
17"""
18
19import time
20from functools import partial
21from multiprocessing import cpu_count
22
# concurrent.futures is in the standard library on Python 3; on Python 2.x it
# was only available through the ``futures`` backport package.
try:
    from concurrent import futures
except ImportError as err:
    raise ImportError(
        "This demo requires concurrent.futures.  It can be installed for "
        "python 2.x via:  pip install futures") from err
29
30import numpy as np
31from numpy.testing import assert_array_equal
32
33import pywt
34
# The test image, converted to float for the transform.
cam = pywt.data.camera().astype(float)

ndim = 2                   # dimension of images to transform (2 or 3)
num_images = 32            # number of images to transform
max_workers = cpu_count()  # number of worker threads (one per available CPU)
nrepeat = 5                # repetitions averaged in each benchmark

# Create a list of num_images images.  List multiplication stores
# num_images references to the same array rather than copies.
if ndim == 2:
    imgs = [cam] * num_images
    wavelet = 'db8'
elif ndim == 3:
    # Stack the image along a new 3rd axis to create a
    # [512 x 512 x 16] 3D volume.
    im3 = np.concatenate([cam[:, :, np.newaxis]] * 16, axis=-1)
    # Create multiple references to the same volume.
    imgs = [im3] * num_images
    wavelet = 'db1'
else:
    # The error must actually be raised; merely constructing it would let an
    # unsupported ndim fall through to a NameError on `wavelet` below.
    raise ValueError("Only 2D and 3D test cases implemented")

# Function applied to each image: a 3-level n-dimensional wavelet
# decomposition using periodization boundary handling.
wavedecn_func = partial(pywt.wavedecn, wavelet=wavelet, mode='periodization',
                        level=3)
59
60
def concurrent_transforms(func, imgs, max_workers=None):
    """Call func on each img in imgs using a ThreadPoolExecutor.

    Parameters
    ----------
    func : callable
        Function applied to each element of ``imgs``.
    imgs : iterable
        Inputs to transform.
    max_workers : int, optional
        Number of worker threads.  Defaults to ``cpu_count()``.

    Returns
    -------
    results : list
        ``func(img)`` for each ``img`` in ``imgs``, in the same order as
        ``imgs``.
    """
    if max_workers is None:
        # default to as many workers as available cpus
        max_workers = cpu_count()
    with futures.ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Executor.map yields results in input order; an exception raised by
        # func is re-raised here when its result is retrieved.  list()
        # consumes every result before the pool is shut down.
        return list(pool.map(func, imgs))
72
73
print("Processing {} images of shape {}".format(len(imgs), imgs[0].shape))

# Sequential computation via a list comprehension.  time.perf_counter() is a
# monotonic, high-resolution clock intended for timing intervals, unlike
# time.time(), which can jump if the system clock is adjusted.
tstart = time.perf_counter()
for _ in range(nrepeat):
    results = [wavedecn_func(img) for img in imgs]
t = (time.perf_counter() - tstart) / nrepeat  # mean seconds per repetition
print("\nSequential Case")
print("\tElapsed time: {:0.2f} ms".format(1000*t))
83
84
# Concurrent computation via concurrent.futures, timed with the monotonic
# high-resolution time.perf_counter() clock.
tstart = time.perf_counter()
for _ in range(nrepeat):
    results_concurrent = concurrent_transforms(wavedecn_func, imgs,
                                               max_workers=max_workers)
t2 = (time.perf_counter() - tstart) / nrepeat  # mean seconds per repetition
print("\nMultithreaded Case")
print("\tNumber of concurrent workers: {}".format(max_workers))
print("\tElapsed time: {:0.2f} ms".format(1000*t2))
print("\nRelative speedup with concurrent = {}".format(t/t2))
95
# Verify that the sequential and multithreaded runs agree by comparing two
# coefficient arrays for the last image: entry 0 of the wavedecn output (the
# approximation coefficients) and one array from the detail dict at entry 1,
# whose key is 'd' followed by ndim-1 'a's (e.g. 'da' for 2D).
detail_key = 'd' + 'a' * (ndim - 1)
seq_last, par_last = results[-1], results_concurrent[-1]
assert_array_equal(seq_last[0], par_last[0])
assert_array_equal(seq_last[1][detail_key], par_last[1][detail_key])
102