"""
Demo: Parallel processing across images

Multithreading can be used to run transforms on a set of images in parallel.
This will give a net performance benefit if the images to be transformed are
sufficiently large.

This demo runs a multilevel wavelet decomposition on a list of 32 images,
each of size (512, 512).  Computations are repeated sequentially and in
parallel and the runtimes compared.

In general, multithreading will be more beneficial for larger images and for
wavelets with a larger filter size.

One can also change ``ndim`` to 3 in the code below to use a set of 3D volumes
instead.
"""

import time
from functools import partial
from multiprocessing import cpu_count

try:
    from concurrent import futures
except ImportError:
    raise ImportError(
        "This demo requires concurrent.futures. It can be installed "
        "for python 2.x via: pip install futures")

import numpy as np
from numpy.testing import assert_array_equal

import pywt

# the test image
cam = pywt.data.camera().astype(float)

ndim = 2                   # dimension of images to transform (2 or 3)
num_images = 32            # number of images to transform
max_workers = cpu_count()  # max number of available threads
nrepeat = 5                # averages used in the benchmark

# create a list of num_images images
if ndim == 2:
    imgs = [cam, ] * num_images
    wavelet = 'db8'
elif ndim == 3:
    # stack image along 3rd dimension to create a 3D volume with 16 slices
    im3 = np.concatenate([cam[:, :, np.newaxis], ]*16, axis=-1)
    # create multiple copies of the volume
    imgs = [im3, ] * num_images
    wavelet = 'db1'
else:
    # Fail immediately with a clear message; without the ``raise`` the script
    # would continue and die later with a NameError on ``wavelet``.
    raise ValueError("Only 2D and 3D test cases implemented")

# define a function to apply to each image
wavedecn_func = partial(pywt.wavedecn, wavelet=wavelet, mode='periodization',
                        level=3)


def concurrent_transforms(func, imgs, max_workers=None):
    """Call func on each img in imgs using a ThreadPoolExecutor.

    Parameters
    ----------
    func : callable
        Function taking a single element of ``imgs`` as its only argument.
    imgs : iterable
        The inputs to process.
    max_workers : int, optional
        Number of worker threads.  If None, ``cpu_count()`` is used.

    Returns
    -------
    results : list
        ``func(img)`` for each ``img`` in ``imgs``, in input order.
    """
    if max_workers is None:
        # default to as many workers as available cpus
        max_workers = cpu_count()
    with futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Executor.map yields results in the order of the inputs, so the
        # output lines up with the sequential list comprehension below.
        return list(executor.map(func, imgs))


print("Processing {} images of shape {}".format(len(imgs), imgs[0].shape))

# Sequential computation via a list comprehension
tstart = time.time()
for n in range(nrepeat):
    results = [wavedecn_func(img) for img in imgs]
t = (time.time()-tstart)/nrepeat
print("\nSequential Case")
print("\tElapsed time: {:0.2f} ms".format(1000*t))


# Concurrent computation via concurrent.futures
tstart = time.time()
for n in range(nrepeat):
    results_concurrent = concurrent_transforms(wavedecn_func, imgs,
                                               max_workers=max_workers)
t2 = (time.time()-tstart)/nrepeat
print("\nMultithreaded Case")
print("\tNumber of concurrent workers: {}".format(max_workers))
print("\tElapsed time: {:0.2f} ms".format(1000*t2))
print("\nRelative speedup with concurrent = {}".format(t/t2))

# check a couple of the coefficient arrays to verify matching results for
# sequential and multithreaded computation
assert_array_equal(results[-1][0],
                   results_concurrent[-1][0])
assert_array_equal(results[-1][1]['d' + 'a'*(ndim-1)],
                   results_concurrent[-1][1]['d' + 'a'*(ndim-1)])