1 /*=========================================================================
2  *
3  *  Copyright Insight Software Consortium
4  *
5  *  Licensed under the Apache License, Version 2.0 (the "License");
6  *  you may not use this file except in compliance with the License.
7  *  You may obtain a copy of the License at
8  *
9  *         http://www.apache.org/licenses/LICENSE-2.0.txt
10  *
11  *  Unless required by applicable law or agreed to in writing, software
12  *  distributed under the License is distributed on an "AS IS" BASIS,
13  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *  See the License for the specific language governing permissions and
15  *  limitations under the License.
16  *
17  *=========================================================================*/
18 
19 #include "itkStreamingProcessObject.h"
20 
21 namespace itk
22 {
23 
24 
GenerateData(void)25 void StreamingProcessObject::GenerateData( void )
26 {
27   // The m_Updating flag used in StreamingProcessObject::UpdateOutputData
28   // should prevent recursive execution.
29 
30   this->BeforeStreamedGenerateData();
31 
32 
33   //
34   // Determine number of pieces to divide the input.  This will be the
35   // minimum of what the user specified via SetNumberOfStreamDivisions()
36   // and what the Splitter thinks is a reasonable value.
37   //
38   unsigned int numberOfInputRequestRegion = this->GetNumberOfInputRequestedRegions();
39 
40   //
41   // Loop over the number of pieces, execute the upstream pipeline on each
42   // piece, and execute StreamedGenerateData on each region
43   //
44   for (unsigned int piece = 0; piece < numberOfInputRequestRegion  && !this->GetAbortGenerateData();  piece++)
45     {
46     this->m_CurrentRequestNumber = piece;
47 
48     this->GenerateNthInputRequestedRegion( piece );
49 
50     //
51     // Now that we know the input requested region, propagate this
52     // through all the inputs.
53     // ;
54     for (auto &inputName: this->GetInputNames())
55       {
56       if (this->GetInput(inputName))
57         {
58         this->GetInput(inputName)->PropagateRequestedRegion();
59         }
60       }
61 
62     //
63     // Propagate the update call - make sure everything we
64     // might rely on is up-to-date
65     // Must call PropagateRequestedRegion before UpdateOutputData if multiple
66     // inputs since they may lead back to the same data object.
67     m_Updating = true;
68     for (auto &inputName: this->GetInputNames())
69       {
70       if (this->GetInput(inputName))
71         {
72         if ( inputName != this->GetPrimaryInputName()  && this->GetNumberOfInputs() > 1)
73           {
74           this->GetInput(inputName)->PropagateRequestedRegion();
75           }
76         this->GetInput(inputName)->UpdateOutputData();
77         }
78       }
79 
80     //
81     try
82       {
83       this->StreamedGenerateData( piece );
84       this->UpdateProgress( float( piece+1 ) / numberOfInputRequestRegion);
85       }
86     catch( ProcessAborted & )
87       {
88       this->InvokeEvent( AbortEvent() );
89       this->ResetPipeline();
90       this->RestoreInputReleaseDataFlags();
91       throw;
92       }
93     catch( ... )
94       {
95       this->ResetPipeline();
96       this->RestoreInputReleaseDataFlags();
97       throw;
98       }
99     }
100 
101   m_CurrentRequestNumber = -1;
102 
103   this->AfterStreamedGenerateData();
104 }
105 
106 
UpdateOutputData(DataObject * itkNotUsed (output))107 void StreamingProcessObject::UpdateOutputData(DataObject *itkNotUsed(output))
108 {
109 
110   //
111   //prevent chasing our tail
112   //
113   if (this->m_Updating)
114     {
115     return;
116     }
117 
118 
119   //
120   // Prepare all the outputs. This may deallocate previous bulk data.
121   //
122   this->PrepareOutputs();
123 
124   /*
125    * Cache the state of any ReleaseDataFlag's on the inputs. While the
126    * filter is executing, we need to set the ReleaseDataFlag's on the
127    * inputs to false in case the current filter is implemented using a
128    * mini-pipeline (which will try to release the inputs).  After the
129    * filter finishes, we restore the state of the ReleaseDataFlag's
130    * before the call to ReleaseInputs().
131    */
132   this->CacheInputReleaseDataFlags();
133 
134   /*
135    * Make sure we have the necessary inputs
136    */
137   unsigned int ninputs = this->GetNumberOfValidRequiredInputs();
138   if (ninputs < this->GetNumberOfRequiredInputs())
139     {
140     itkExceptionMacro(<< "At least " << static_cast<unsigned int>( this->GetNumberOfRequiredInputs() )
141       << " inputs are required but only " << ninputs << " are specified.");
142     }
143 
144   this->SetAbortGenerateData( false );
145   this->UpdateProgress(0.0);
146   this->m_Updating = true;
147 
148   /*
149    * Tell all Observers that the filter is starting
150    */
151   this->InvokeEvent( StartEvent() );
152 
153   this->Self::GenerateData( );
154   /*
155    * If we ended due to aborting, push the progress up to 1.0 (since
156    * it probably didn't end there)
157    */
158   if ( this->GetAbortGenerateData() )
159     {
160     this->UpdateProgress(1.0);
161     }
162 
163   // Notify end event observers
164   this->InvokeEvent( EndEvent() );
165 
166 
167   /*
168    * Now we have to mark the data as up to data.
169    */
170   for (auto &outputName : this->GetOutputNames())
171     {
172     if (this->GetOutput(outputName))
173       {
174       this->GetOutput(outputName)->DataHasBeenGenerated();
175       }
176     }
177 
178   /* DO NOT Restore the state of any input ReleaseDataFlags
179    */
180   //this->RestoreInputReleaseDataFlags();
181 
182   /**
183    * Release any inputs if marked for release
184    */
185   this->ReleaseInputs();
186 
187   // Mark that we are no longer updating the data in this filter
188   this->m_Updating = false;
189 
190 }
191 
192 
GetCurrentRequestNumber() const193 int StreamingProcessObject::GetCurrentRequestNumber( ) const
194 {
195   return m_CurrentRequestNumber;
196 }
197 
198 
ResetPipeline()199 void StreamingProcessObject::ResetPipeline()
200 {
201   Superclass::ResetPipeline();
202   m_CurrentRequestNumber = -1;
203 }
204 
205 
206 StreamingProcessObject::StreamingProcessObject( void ) = default;
207 
208 
209 StreamingProcessObject::~StreamingProcessObject( ) = default;
210 
211 
PrintSelf(std::ostream & os,Indent indent) const212 void StreamingProcessObject::PrintSelf(std::ostream& os, Indent indent) const
213 {
214   Superclass::PrintSelf(os,indent);
215 
216   os << indent << "Current Request Number: " << this->m_CurrentRequestNumber << std::endl;
217 }
218 
219 
PropagateRequestedRegion(DataObject * output)220 void StreamingProcessObject::PropagateRequestedRegion(DataObject *output)
221 {
222 
223   /**
224    * check flag to avoid executing forever if there is a loop
225    */
226   if (this->m_Updating)
227     {
228     return;
229     }
230 
231   /**
232    * Give the subclass a chance to indicate that it will provide
233    * more data than required for the output. This can happen, for
234    * example, when a source can only produce the whole output.
235    * Although this is being called for a specific output, the source
236    * may need to enlarge all outputs.
237    */
238   this->EnlargeOutputRequestedRegion( output );
239 
240 
241   /**
242    * Give the subclass a chance to define how to set the requested
243    * regions for each of its outputs, given this output's requested
244    * region.  The default implementation is to make all the output
245    * requested regions the same.  A subclass may need to override this
246    * method if each output is a different resolution.
247    */
248   this->GenerateOutputRequestedRegion( output );
249 
250   // we don't call GenerateInputRequestedRegion since the requested
251   // regions are managed when the pipeline is execute
252 
253   // we don't call inputs PropagateRequestedRegion either
254   // because the pipeline managed later
255 }
256 
257 
BeforeStreamedGenerateData(void)258 void StreamingProcessObject::BeforeStreamedGenerateData( void )
259 {
260 }
261 
AfterStreamedGenerateData(void)262 void StreamingProcessObject::AfterStreamedGenerateData( void )
263 {
264 }
265 
266 } // end namespace itk
267