Concurrent processing
=====================

Rasterio affords concurrent processing of raster data. Python's global
interpreter lock (GIL) is released when calling GDAL's ``GDALRasterIO()``
function, which means that Python threads can read and write concurrently.

The NumPy library also often releases the GIL, e.g., when applying
universal functions to arrays, which makes it possible to distribute
processing of an array across the cores of a processor.

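For example, because NumPy's ufunc loops release the GIL, splitting an array
into blocks and mapping a ufunc over them with a thread pool can engage
multiple cores. A minimal sketch (the array shape, block count, and choice of
``np.sqrt`` are arbitrary illustrations):

.. code-block:: python

    import concurrent.futures

    import numpy as np

    data = np.random.rand(4096, 512)

    # Split along the first axis into one block per worker.
    blocks = np.array_split(data, 4)

    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        # Each np.sqrt call runs a GIL-releasing C loop on its block,
        # so the blocks can be processed truly in parallel.
        results = list(executor.map(np.sqrt, blocks))

    out = np.concatenate(results)
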
This means that tasks performed over a set of windows or pixels of a raster
can be parallelized. Reading, writing, and processing can always be done
concurrently, but how much of a speedup can be obtained depends on the
hardware and on where the bottlenecks are. If the processing function
releases the GIL, multiple threads processing simultaneously can lead to
further speedups.

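To make "a set of windows" concrete: a raster of a given width and height can
be tiled into block-sized windows. A minimal pure-Python sketch (the
791 x 718 shape and 128-pixel blocks are arbitrary choices; for a real
dataset, Rasterio's ``block_windows()`` yields these directly):

.. code-block:: python

    def tile_windows(width, height, blockxsize=128, blockysize=128):
        """Yield (row_off, col_off, height, width) tuples covering the raster."""
        for row_off in range(0, height, blockysize):
            for col_off in range(0, width, blockxsize):
                yield (
                    row_off,
                    col_off,
                    min(blockysize, height - row_off),  # clip at bottom edge
                    min(blockxsize, width - col_off),   # clip at right edge
                )

    windows = list(tile_windows(791, 718))

Each window can then be read, processed, and written independently of the
others.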
.. note::
    If you wish to do multiprocessing that is not trivially parallelizable,
    to work across very large images that do not fit in memory, or to
    distribute processing across multiple machines, you might want to have
    a look at `dask <https://dask.org/>`__ and in particular this
    `example <https://examples.dask.org/applications/satellite-imagery-geotiff.html>`__.

The Cython function below, included in Rasterio's ``_example`` module,
simulates a GIL-releasing, CPU-intensive raster processing function. You can
also easily create GIL-releasing functions using `numba <https://numba.pydata.org/>`__.

.. code-block:: python

    # cython: boundscheck=False

    import numpy as np


    def compute(unsigned char[:, :, :] input):
        """reverses bands inefficiently

        Given an input uint8 array, fakes a CPU-intensive
        computation.
        """
        cdef int I, J, K
        cdef int i, j, k, l
        cdef double val
        I = input.shape[0]
        J = input.shape[1]
        K = input.shape[2]
        output = np.empty((I, J, K), dtype='uint8')
        cdef unsigned char[:, :, :] output_view = output
        with nogil:
            for i in range(I):
                for j in range(J):
                    for k in range(K):
                        val = <double>input[i, j, k]
                        for l in range(2000):
                            val += 1.0
                        val -= 2000.0
                        # ~i == -i - 1, so band i lands in reversed order.
                        output_view[~i, j, k] = <unsigned char>val
        return output

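For reference, the band reversal itself (minus the simulated CPU load) is a
one-liner in NumPy: indexing with ``~i`` above writes band ``i`` to position
``-i - 1``, which is exactly what a negative step achieves:

.. code-block:: python

    import numpy as np

    # 3 bands of 2 x 4 pixels, filled with distinct values.
    bands = np.arange(24, dtype="uint8").reshape(3, 2, 4)

    # Reverse the band (first) axis; equivalent to writing band i at ~i.
    reversed_bands = bands[::-1].copy()
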
Here is the program in examples/thread_pool_executor.py. It is set up so
that at most one thread is reading and at most one thread is writing at any
given time. Processing is not protected by a lock and can be done by
multiple threads simultaneously.

.. code-block:: python

    """thread_pool_executor.py

    Operate on a raster dataset window-by-window using a ThreadPoolExecutor.

    Simulates a CPU-bound thread situation where multiple threads can improve
    performance.

    With -j 4, the program returns in about 1/4 the time as with -j 1.
    """

    import concurrent.futures
    import threading

    import rasterio
    from rasterio._example import compute


    def main(infile, outfile, num_workers=4):
        """Process infile block-by-block and write to a new file

        The output is the same as the input, but with band order
        reversed.
        """

        with rasterio.open(infile) as src:

            # Create a destination dataset based on source params. The
            # destination will be tiled, and we'll process the tiles
            # concurrently.
            profile = src.profile
            profile.update(blockxsize=128, blockysize=128, tiled=True)

            with rasterio.open(outfile, "w", **profile) as dst:
                windows = [window for ij, window in dst.block_windows()]

                # We cannot read from or write to the same file from
                # multiple threads without causing race conditions, so
                # we use locks to protect the DatasetReader and the
                # DatasetWriter.
                read_lock = threading.Lock()
                write_lock = threading.Lock()

                def process(window):
                    with read_lock:
                        src_array = src.read(window=window)

                    # The computation can be performed concurrently
                    result = compute(src_array)

                    with write_lock:
                        dst.write(result, window=window)

                # We map the process() function over the list of
                # windows.
                with concurrent.futures.ThreadPoolExecutor(
                    max_workers=num_workers
                ) as executor:
                    executor.map(process, windows)

The code above simulates a CPU-intensive calculation that runs faster when
spread over multiple cores using the ``ThreadPoolExecutor`` from Python 3's
``concurrent.futures`` module. Compared to the case of one concurrent job
(``-j 1``),

.. code-block:: console

   $ time python examples/thread_pool_executor.py tests/data/RGB.byte.tif /tmp/test.tif -j 1

   real    0m4.277s
   user    0m4.356s
   sys     0m0.184s

we get over a 3x speedup with four concurrent jobs.

.. code-block:: console

   $ time python examples/thread_pool_executor.py tests/data/RGB.byte.tif /tmp/test.tif -j 4

   real    0m1.251s
   user    0m4.402s
   sys     0m0.168s

If the function that you'd like to map over raster windows doesn't release
the GIL, you unfortunately cannot simply replace ``ThreadPoolExecutor`` with
``ProcessPoolExecutor``: the DatasetReader/Writer cannot be shared by
multiple processes, so each process needs to open the file separately, or
you can do all of the reading and writing from the main thread, as shown in
this next example. This is much less memory efficient, however.

.. code-block:: python

    arrays = [src.read(window=window) for window in windows]

    with concurrent.futures.ProcessPoolExecutor(
        max_workers=num_workers
    ) as executor:
        futures = executor.map(compute, arrays)
        for window, result in zip(windows, futures):
            dst.write(result, window=window)

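If holding every window's array in memory at once is not feasible, one
mitigation is to submit the work in batches, so that only a bounded number of
inputs and results are in flight at a time. A generic sketch (the stand-in
``compute``, the batch size, and the thread pool are illustrative choices;
the same batching logic carries over to ``ProcessPoolExecutor`` when the
function and its arguments are picklable):

.. code-block:: python

    import concurrent.futures
    import itertools


    def compute(array):
        """Stand-in for the real processing function."""
        return [value * 2 for value in array]


    def batched(iterable, n):
        """Yield successive lists of at most n items."""
        it = iter(iterable)
        while batch := list(itertools.islice(it, n)):
            yield batch


    # Stand-ins for the per-window arrays read from the source dataset.
    arrays = [[1, 2], [3, 4], [5, 6], [7, 8], [9, 10]]

    results = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        for batch in batched(arrays, 2):
            # Only one batch of inputs and outputs is in flight at a time;
            # in the raster case, reads and writes would happen here.
            results.extend(executor.map(compute, batch))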