1 /*=========================================================================
2 *
3 * Copyright Insight Software Consortium
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0.txt
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 *=========================================================================*/
18
19 #include "itkStreamingProcessObject.h"
20
21 namespace itk
22 {
23
24
GenerateData(void)25 void StreamingProcessObject::GenerateData( void )
26 {
27 // The m_Updating flag used in StreamingProcessObject::UpdateOutputData
28 // should prevent recursive execution.
29
30 this->BeforeStreamedGenerateData();
31
32
33 //
34 // Determine number of pieces to divide the input. This will be the
35 // minimum of what the user specified via SetNumberOfStreamDivisions()
36 // and what the Splitter thinks is a reasonable value.
37 //
38 unsigned int numberOfInputRequestRegion = this->GetNumberOfInputRequestedRegions();
39
40 //
41 // Loop over the number of pieces, execute the upstream pipeline on each
42 // piece, and execute StreamedGenerateData on each region
43 //
44 for (unsigned int piece = 0; piece < numberOfInputRequestRegion && !this->GetAbortGenerateData(); piece++)
45 {
46 this->m_CurrentRequestNumber = piece;
47
48 this->GenerateNthInputRequestedRegion( piece );
49
50 //
51 // Now that we know the input requested region, propagate this
52 // through all the inputs.
53 // ;
54 for (auto &inputName: this->GetInputNames())
55 {
56 if (this->GetInput(inputName))
57 {
58 this->GetInput(inputName)->PropagateRequestedRegion();
59 }
60 }
61
62 //
63 // Propagate the update call - make sure everything we
64 // might rely on is up-to-date
65 // Must call PropagateRequestedRegion before UpdateOutputData if multiple
66 // inputs since they may lead back to the same data object.
67 m_Updating = true;
68 for (auto &inputName: this->GetInputNames())
69 {
70 if (this->GetInput(inputName))
71 {
72 if ( inputName != this->GetPrimaryInputName() && this->GetNumberOfInputs() > 1)
73 {
74 this->GetInput(inputName)->PropagateRequestedRegion();
75 }
76 this->GetInput(inputName)->UpdateOutputData();
77 }
78 }
79
80 //
81 try
82 {
83 this->StreamedGenerateData( piece );
84 this->UpdateProgress( float( piece+1 ) / numberOfInputRequestRegion);
85 }
86 catch( ProcessAborted & )
87 {
88 this->InvokeEvent( AbortEvent() );
89 this->ResetPipeline();
90 this->RestoreInputReleaseDataFlags();
91 throw;
92 }
93 catch( ... )
94 {
95 this->ResetPipeline();
96 this->RestoreInputReleaseDataFlags();
97 throw;
98 }
99 }
100
101 m_CurrentRequestNumber = -1;
102
103 this->AfterStreamedGenerateData();
104 }
105
106
UpdateOutputData(DataObject * itkNotUsed (output))107 void StreamingProcessObject::UpdateOutputData(DataObject *itkNotUsed(output))
108 {
109
110 //
111 //prevent chasing our tail
112 //
113 if (this->m_Updating)
114 {
115 return;
116 }
117
118
119 //
120 // Prepare all the outputs. This may deallocate previous bulk data.
121 //
122 this->PrepareOutputs();
123
124 /*
125 * Cache the state of any ReleaseDataFlag's on the inputs. While the
126 * filter is executing, we need to set the ReleaseDataFlag's on the
127 * inputs to false in case the current filter is implemented using a
128 * mini-pipeline (which will try to release the inputs). After the
129 * filter finishes, we restore the state of the ReleaseDataFlag's
130 * before the call to ReleaseInputs().
131 */
132 this->CacheInputReleaseDataFlags();
133
134 /*
135 * Make sure we have the necessary inputs
136 */
137 unsigned int ninputs = this->GetNumberOfValidRequiredInputs();
138 if (ninputs < this->GetNumberOfRequiredInputs())
139 {
140 itkExceptionMacro(<< "At least " << static_cast<unsigned int>( this->GetNumberOfRequiredInputs() )
141 << " inputs are required but only " << ninputs << " are specified.");
142 }
143
144 this->SetAbortGenerateData( false );
145 this->UpdateProgress(0.0);
146 this->m_Updating = true;
147
148 /*
149 * Tell all Observers that the filter is starting
150 */
151 this->InvokeEvent( StartEvent() );
152
153 this->Self::GenerateData( );
154 /*
155 * If we ended due to aborting, push the progress up to 1.0 (since
156 * it probably didn't end there)
157 */
158 if ( this->GetAbortGenerateData() )
159 {
160 this->UpdateProgress(1.0);
161 }
162
163 // Notify end event observers
164 this->InvokeEvent( EndEvent() );
165
166
167 /*
168 * Now we have to mark the data as up to data.
169 */
170 for (auto &outputName : this->GetOutputNames())
171 {
172 if (this->GetOutput(outputName))
173 {
174 this->GetOutput(outputName)->DataHasBeenGenerated();
175 }
176 }
177
178 /* DO NOT Restore the state of any input ReleaseDataFlags
179 */
180 //this->RestoreInputReleaseDataFlags();
181
182 /**
183 * Release any inputs if marked for release
184 */
185 this->ReleaseInputs();
186
187 // Mark that we are no longer updating the data in this filter
188 this->m_Updating = false;
189
190 }
191
192
GetCurrentRequestNumber() const193 int StreamingProcessObject::GetCurrentRequestNumber( ) const
194 {
195 return m_CurrentRequestNumber;
196 }
197
198
ResetPipeline()199 void StreamingProcessObject::ResetPipeline()
200 {
201 Superclass::ResetPipeline();
202 m_CurrentRequestNumber = -1;
203 }
204
205
206 StreamingProcessObject::StreamingProcessObject( void ) = default;
207
208
209 StreamingProcessObject::~StreamingProcessObject( ) = default;
210
211
PrintSelf(std::ostream & os,Indent indent) const212 void StreamingProcessObject::PrintSelf(std::ostream& os, Indent indent) const
213 {
214 Superclass::PrintSelf(os,indent);
215
216 os << indent << "Current Request Number: " << this->m_CurrentRequestNumber << std::endl;
217 }
218
219
PropagateRequestedRegion(DataObject * output)220 void StreamingProcessObject::PropagateRequestedRegion(DataObject *output)
221 {
222
223 /**
224 * check flag to avoid executing forever if there is a loop
225 */
226 if (this->m_Updating)
227 {
228 return;
229 }
230
231 /**
232 * Give the subclass a chance to indicate that it will provide
233 * more data than required for the output. This can happen, for
234 * example, when a source can only produce the whole output.
235 * Although this is being called for a specific output, the source
236 * may need to enlarge all outputs.
237 */
238 this->EnlargeOutputRequestedRegion( output );
239
240
241 /**
242 * Give the subclass a chance to define how to set the requested
243 * regions for each of its outputs, given this output's requested
244 * region. The default implementation is to make all the output
245 * requested regions the same. A subclass may need to override this
246 * method if each output is a different resolution.
247 */
248 this->GenerateOutputRequestedRegion( output );
249
250 // we don't call GenerateInputRequestedRegion since the requested
251 // regions are managed when the pipeline is execute
252
253 // we don't call inputs PropagateRequestedRegion either
254 // because the pipeline managed later
255 }
256
257
BeforeStreamedGenerateData(void)258 void StreamingProcessObject::BeforeStreamedGenerateData( void )
259 {
260 }
261
AfterStreamedGenerateData(void)262 void StreamingProcessObject::AfterStreamedGenerateData( void )
263 {
264 }
265
266 } // end namespace itk
267