1Concurrent processing 2===================== 3 4Rasterio affords concurrent processing of raster data. Python's global 5interpreter lock (GIL) is released when calling GDAL's ``GDALRasterIO()`` 6function, which means that Python threads can read and write concurrently. 7 8The Numpy library also often releases the GIL, e.g., in applying 9universal functions to arrays, and this makes it possible to distribute 10processing of an array across cores of a processor. 11 12This means that it is possible to parallelize tasks that need to be performed 13for a set of windows/pixels in the raster. Reading, writing and processing can 14always be done concurrently. But it depends on the hardware and where the 15bottlenecks are, how much of a speedup can be obtained. In the case that the 16processing function releases the GIL, multiple threads processing 17simultaneously can lead to further speedups. 18 19.. note:: 20 If you wish to do multiprocessing that is not trivially parallelizable 21 accross very large images that do not fit in memory, or if you wish to 22 do multiprocessing across multiple machines. You might want to have a 23 look at `dask <https://dask.org/>`__ and in particular this 24 `example <https://examples.dask.org/applications/satellite-imagery-geotiff.html>`__. 25 26The Cython function below, included in Rasterio's ``_example`` module, 27simulates a GIL-releasing CPU-intensive raster processing function. You can 28also easily create GIL-releasing functions by using `numba <https://numba.pydata.org/>`__ 29 30.. code-block:: python 31 32 # cython: boundscheck=False 33 34 import numpy as np 35 36 37 def compute(unsigned char[:, :, :] input): 38 """reverses bands inefficiently 39 40 Given input and output uint8 arrays, fakes an CPU-intensive 41 computation. 42 """ 43 cdef int I, J, K 44 cdef int i, j, k, l 45 cdef double val 46 I = input.shape[0] 47 J = input.shape[1] 48 K = input.shape[2] 49 output = np.empty((I, J, K), dtype='uint8') 50 cdef unsigned char[:, :, :] output_view = output 51 with nogil: 52 for i in range(I): 53 for j in range(J): 54 for k in range(K): 55 val = <double>input[i, j, k] 56 for l in range(2000): 57 val += 1.0 58 val -= 2000.0 59 output_view[~i, j, k] = <unsigned char>val 60 return output 61 62Here is the program in examples/thread_pool_executor.py. It is set up in such 63a way that at most 1 thread is reading and at most 1 thread is writing at the 64same time. Processing is not protected by a lock and can be done by multiple 65threads simultaneously. 66 67.. code-block:: python 68 69 """thread_pool_executor.py 70 71 Operate on a raster dataset window-by-window using a ThreadPoolExecutor. 72 73 Simulates a CPU-bound thread situation where multiple threads can improve 74 performance. 75 76 With -j 4, the program returns in about 1/4 the time as with -j 1. 77 """ 78 79 import concurrent.futures 80 import multiprocessing 81 import threading 82 83 import rasterio 84 from rasterio._example import compute 85 86 87 def main(infile, outfile, num_workers=4): 88 """Process infile block-by-block and write to a new file 89 90 The output is the same as the input, but with band order 91 reversed. 92 """ 93 94 with rasterio.open(infile) as src: 95 96 # Create a destination dataset based on source params. The 97 # destination will be tiled, and we'll process the tiles 98 # concurrently. 99 profile = src.profile 100 profile.update(blockxsize=128, blockysize=128, tiled=True) 101 102 with rasterio.open(outfile, "w", **src.profile) as dst: 103 windows = [window for ij, window in dst.block_windows()] 104 105 # We cannot write to the same file from multiple threads 106 # without causing race conditions. To safely read/write 107 # from multiple threads, we use a lock to protect the 108 # DatasetReader/Writer 109 read_lock = threading.Lock() 110 write_lock = threading.Lock() 111 112 def process(window): 113 with read_lock: 114 src_array = src.read(window=window) 115 116 # The computation can be performed concurrently 117 result = compute(src_array) 118 119 with write_lock: 120 dst.write(result, window=window) 121 122 # We map the process() function over the list of 123 # windows. 124 with concurrent.futures.ThreadPoolExecutor( 125 max_workers=num_workers 126 ) as executor: 127 executor.map(process, windows) 128 129The code above simulates a CPU-intensive calculation that runs faster when 130spread over multiple cores using the ``ThreadPoolExecutor`` from Python 3's 131``concurrent.futures`` module. Compared to the case of one concurrent job 132(``-j 1``), 133 134.. code-block:: console 135 136 $ time python examples/thread_pool_executor.py tests/data/RGB.byte.tif /tmp/test.tif -j 1 137 138 real 0m4.277s 139 user 0m4.356s 140 sys 0m0.184s 141 142we get over 3x speed up with four concurrent jobs. 143 144.. code-block:: console 145 146 $ time python examples/thread_pool_executor.py tests/data/RGB.byte.tif /tmp/test.tif -j 4 147 148 real 0m1.251s 149 user 0m4.402s 150 sys 0m0.168s 151 152If the function that you'd like to map over raster windows doesn't release the 153GIL, you unfortunately cannot simply replace ``ThreadPoolExecutor`` with 154``ProcessPoolExecutor``, the DatasetReader/Writer cannot be shared by multiple 155processes, which means that each process needs to open the file seperately, 156or you can do all the reading and writing from the main thread, as shown in 157this next example. This is much less efficient memory wise, however. 158 159.. code-block:: python 160 161 arrays = [src.read(window=window) for window in windows] 162 163 with concurrent.futures.ProcessPoolExecutor( 164 max_workers=num_workers 165 ) as executor: 166 futures = executor.map(compute, arrays) 167 for window, result in zip(windows, futures): 168 dst.write(result, window=window) 169