1 /*=========================================================================
2 
3   Program:   Visualization Toolkit
4   Module:    vtkPStreamTracer.cxx
5 
6   Copyright (c) Ken Martin, Will Schroeder, Bill Lorensen
7   All rights reserved.
8   See Copyright.txt or http://www.kitware.com/Copyright.htm for details.
9 
10      This software is distributed WITHOUT ANY WARRANTY; without even
11      the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
12      PURPOSE.  See the above copyright notice for more information.
13 
14 =========================================================================*/
15 #include "vtkPStreamTracer.h"
16 
17 #include "vtkAMRInterpolatedVelocityField.h"
18 #include "vtkAbstractInterpolatedVelocityField.h"
19 #include "vtkAppendDataSets.h"
20 #include "vtkAppendPolyData.h"
21 #include "vtkCellArray.h"
22 #include "vtkCellData.h"
23 #include "vtkCompositeDataIterator.h"
24 #include "vtkCompositeDataSet.h"
25 #include "vtkDoubleArray.h"
26 #include "vtkFloatArray.h"
27 #include "vtkIdList.h"
28 #include "vtkInformation.h"
29 #include "vtkInformationVector.h"
30 #include "vtkIntArray.h"
31 #include "vtkMPIController.h"
32 #include "vtkMath.h"
33 #include "vtkMultiProcessController.h"
34 #include "vtkMultiProcessStream.h"
35 #include "vtkNew.h"
36 #include "vtkObjectFactory.h"
37 #include "vtkOverlappingAMR.h"
38 #include "vtkParallelAMRUtilities.h"
39 #include "vtkPointData.h"
40 #include "vtkPoints.h"
41 #include "vtkPolyData.h"
42 #include "vtkRungeKutta2.h"
43 #include "vtkSMPTools.h"
44 #include "vtkStreamingDemandDrivenPipeline.h"
45 #include "vtkTimerLog.h"
46 #include "vtkUniformGrid.h"
47 
48 #include <algorithm>
49 #include <cassert>
50 #include <list>
51 #include <numeric>
52 #include <vector>
53 
54 #ifndef NDEBUG
55 // #define DEBUGTRACE
56 // #define LOGTRACE
57 #else
58 #endif
59 
60 #ifdef DEBUGTRACE
61 #define PRINT(x)                                                                                   \
62   {                                                                                                \
63     cout << this->Rank << ")" << x << endl;                                                        \
64   }
65 #define ALLPRINT(x)                                                                                \
66   for (int rank = 0; rank < NumProcs; rank++)                                                      \
67   {                                                                                                \
68     Controller->Barrier();                                                                         \
69     if (rank == Rank)                                                                              \
70       cout << "(" << this->Rank << ")" << x << endl;                                               \
71   }
72 #define Assert(a, msg)                                                                             \
73   {                                                                                                \
74     if (!a)                                                                                        \
75     {                                                                                              \
76       cerr << msg << endl;                                                                         \
77       assert(false);                                                                               \
78     }                                                                                              \
79   }
80 
81 #define AssertEq(a, b)                                                                             \
82   {                                                                                                \
83     if (a != b)                                                                                    \
84     {                                                                                              \
85       cerr << a << " != " << b << endl;                                                            \
86       assert(false);                                                                               \
87     }                                                                                              \
88   }
89 
90 #define AssertNe(a, b)                                                                             \
91   {                                                                                                \
92     if (a == b)                                                                                    \
93     {                                                                                              \
94       cerr << a << " == " << b << endl;                                                            \
95       assert(false);                                                                               \
96     }                                                                                              \
97   }
98 
99 #define AssertGe(a, b)                                                                             \
100   {                                                                                                \
101     if (a < b)                                                                                     \
102     {                                                                                              \
103       cerr << a << " < " << b << endl;                                                             \
104       assert(false);                                                                               \
105     }                                                                                              \
106   }
107 
108 #define AssertGt(a, b)                                                                             \
109   {                                                                                                \
110     if (a <= b)                                                                                    \
111     {                                                                                              \
112       cerr << a << " < " << b << endl;                                                             \
113       assert(false);                                                                               \
114     }                                                                                              \
115   }
116 //#define PRINT(id, x)
117 #else
118 #define PRINT(x)
119 #define ALLPRINT(x)
120 
121 #define AssertEq(a, b)
122 #define AssertGt(a, b)
123 #define AssertGe(a, b)
124 #define Assert(a, b)
125 #define AssertNe(a, b)
126 #endif
127 
128 namespace
129 {
CNext(int i,int n)130 inline int CNext(int i, int n)
131 {
132   return (i + 1) % n;
133 }
134 
135 class MyStream
136 {
137 public:
MyStream(int BufferSize)138   MyStream(int BufferSize)
139     : Size(BufferSize)
140   {
141     this->Data = new char[Size];
142     this->Head = Data;
143   }
~MyStream()144   ~MyStream() { delete[] this->Data; }
GetSize()145   int GetSize() { return Size; }
146 
147   template <class T>
operator <<(T t)148   MyStream& operator<<(T t)
149   {
150     unsigned int size = sizeof(T);
151     char* value = reinterpret_cast<char*>(&t);
152     for (unsigned int i = 0; i < size; i++)
153     {
154       AssertGe(Size, this->Head - this->Data);
155       *(this->Head++) = (*(value++));
156     }
157     return (*this);
158   }
159 
160   template <class T>
operator >>(T & t)161   MyStream& operator>>(T& t)
162   {
163     unsigned int size = sizeof(T);
164     AssertGe(Size, this->Head + size - this->Data);
165     t = *(reinterpret_cast<T*>(this->Head));
166     this->Head += size;
167     return (*this);
168   }
169 
GetRawData()170   char* GetRawData() { return this->Data; }
GetLength()171   int GetLength() { return this->Head - this->Data; }
172 
Reset()173   void Reset() { this->Head = this->Data; }
174 
175 private:
176   MyStream(const MyStream&) = delete;
177   void operator=(const MyStream&) = delete;
178 
179   char* Data;
180   char* Head;
181   int Size;
182 };
183 
InitBB(double * Bounds)184 inline void InitBB(double* Bounds)
185 {
186   Bounds[0] = DBL_MAX;
187   Bounds[1] = -DBL_MAX;
188   Bounds[2] = DBL_MAX;
189   Bounds[3] = -DBL_MAX;
190   Bounds[4] = DBL_MAX;
191   Bounds[5] = -DBL_MAX;
192 }
193 
InBB(const double * x,const double * bounds)194 inline bool InBB(const double* x, const double* bounds)
195 {
196   constexpr double delta[3] = { 1e-6, 1e-6, 1e-6 };
197   return vtkMath::PointIsWithinBounds(x, bounds, delta);
198 }
199 
UpdateBB(double * a,const double * b)200 inline void UpdateBB(double* a, const double* b)
201 {
202   for (int i = 0; i <= 4; i += 2)
203   {
204     if (b[i] < a[i])
205     {
206       a[i] = b[i];
207     }
208   }
209   for (int i = 1; i <= 5; i += 2)
210   {
211     if (b[i] > a[i])
212     {
213       a[i] = b[i];
214     }
215   }
216 }
217 }
218 
219 class PStreamTracerPoint : public vtkObject
220 {
221 public:
222   vtkTypeMacro(PStreamTracerPoint, vtkObject);
223 
224   static PStreamTracerPoint* New();
225 
226   vtkGetMacro(Id, int);
227   vtkGetVector3Macro(Seed, double);
228   vtkGetVector3Macro(Normal, double);
229   vtkGetMacro(Direction, int);
230   vtkGetMacro(NumSteps, int);
231   vtkGetMacro(Propagation, double);
232   vtkGetMacro(Rank, int);
233   vtkGetMacro(IntegrationTime, double);
234 
235   vtkSetMacro(Id, int);
236   vtkSetMacro(Direction, int);
237   vtkSetVector3Macro(Seed, double);
238   vtkSetMacro(NumSteps, int);
239   vtkSetMacro(Propagation, double);
240   vtkSetMacro(Rank, int);
241   vtkSetMacro(IntegrationTime, double);
242 
Reseed(double * seed,double * normal,vtkPolyData * poly,int id,double propagation,double integrationTime)243   void Reseed(double* seed, double* normal, vtkPolyData* poly, int id, double propagation,
244     double integrationTime)
245   {
246     memcpy(this->Seed, seed, 3 * sizeof(double));
247     memcpy(this->Normal, normal, 3 * sizeof(double));
248 
249     this->AllocateTail(poly->GetPointData());
250     double* x = poly->GetPoints()->GetPoint(id);
251     this->Tail->GetPoints()->SetPoint(0, x);
252     this->Tail->GetPointData()->CopyData(poly->GetPointData(), id, 0);
253     this->Rank = -1; // someone else figure this out
254     this->IntegrationTime = integrationTime;
255     this->Propagation = propagation;
256   }
257 
GetTail()258   vtkPolyData* GetTail() { return this->Tail; }
259 
CopyTail(PStreamTracerPoint * other)260   void CopyTail(PStreamTracerPoint* other)
261   {
262     if (other->Tail)
263     {
264       vtkPointData* pd = other->Tail->GetPointData();
265       if (!this->Tail)
266       {
267         AllocateTail(pd);
268       }
269       this->Tail->GetPointData()->DeepCopy(pd);
270     }
271     else
272     {
273       Tail = nullptr;
274     }
275   }
276 
277   // allocate a one point vtkPolyData whose PointData setup matches pd
AllocateTail(vtkPointData * pd)278   void AllocateTail(vtkPointData* pd)
279   {
280     if (!this->Tail)
281     {
282       Tail = vtkSmartPointer<vtkPolyData>::New();
283       vtkNew<vtkPoints> points;
284       {
285         points->SetNumberOfPoints(1);
286       }
287       Tail->SetPoints(points);
288     }
289 
290     this->Tail->GetPointData()->CopyAllocate(pd);
291   }
292 
GetSize()293   virtual int GetSize()
294   {
295     int size(0);
296     vtkPointData* data = this->GetTail()->GetPointData();
297     for (int i = 0; i < data->GetNumberOfArrays(); i++)
298     {
299       size += data->GetArray(i)->GetNumberOfComponents();
300     }
301     return size * sizeof(double) + sizeof(PStreamTracerPoint);
302   }
303 
Read(MyStream & stream)304   virtual void Read(MyStream& stream)
305   {
306     stream >> this->Id;
307     stream >> this->Seed[0];
308     stream >> this->Seed[1];
309     stream >> this->Seed[2];
310     stream >> this->Direction;
311     stream >> this->NumSteps;
312     stream >> this->Propagation;
313     stream >> this->IntegrationTime;
314 
315     char hasTail(0);
316     stream >> hasTail;
317     if (hasTail)
318     {
319       double x[3];
320       for (int i = 0; i < 3; i++)
321       {
322         stream >> x[i];
323       }
324       AssertNe(this->Tail, nullptr); // someone should have allocated it by prototype
325       this->Tail->SetPoints(vtkSmartPointer<vtkPoints>::New());
326       this->Tail->GetPoints()->InsertNextPoint(x);
327 
328       vtkPointData* pointData = this->Tail->GetPointData();
329       for (int i = 0; i < pointData->GetNumberOfArrays(); i++)
330       {
331         int numComponents = pointData->GetArray(i)->GetNumberOfComponents();
332         std::vector<double> xi(numComponents);
333         for (int j = 0; j < numComponents; j++)
334         {
335           double& xj(xi[j]);
336           stream >> xj;
337         }
338         pointData->GetArray(i)->InsertNextTuple(&xi[0]);
339       }
340     }
341     else
342     {
343       this->Tail = nullptr;
344     }
345   }
346 
Write(MyStream & stream)347   virtual void Write(MyStream& stream)
348   {
349     stream << this->Id << this->Seed[0] << this->Seed[1] << this->Seed[2] << this->Direction
350            << this->NumSteps << this->Propagation << this->IntegrationTime;
351 
352     stream << (char)(this->Tail != nullptr);
353 
354     if (this->Tail)
355     {
356       double* x = this->Tail->GetPoints()->GetPoint(0);
357       for (int i = 0; i < 3; i++)
358       {
359         stream << x[i];
360       }
361       vtkPointData* pData = this->Tail->GetPointData();
362       int numArrays(pData->GetNumberOfArrays());
363       for (int i = 0; i < numArrays; i++)
364       {
365         vtkDataArray* arr = pData->GetArray(i);
366         int numComponents = arr->GetNumberOfComponents();
367         double* y = arr->GetTuple(0);
368         for (int j = 0; j < numComponents; j++)
369           stream << y[j];
370       }
371     }
372   }
373 
374 private:
375   int Id;
376   double Seed[3];
377   double Normal[3];
378   int Direction;
379   int NumSteps;
380   double Propagation;
381   vtkSmartPointer<vtkPolyData> Tail;
382   int Rank;
383   double IntegrationTime;
384 
385 protected:
PStreamTracerPoint()386   PStreamTracerPoint()
387     : Id(-1)
388     , Direction(0)
389     , NumSteps(0)
390     , Propagation(0)
391     , Rank(-1)
392     , IntegrationTime(0)
393   {
394     this->Seed[0] = this->Seed[1] = this->Seed[2] = -999;
395   }
396 };
397 
398 vtkStandardNewMacro(PStreamTracerPoint);
399 
400 class AMRPStreamTracerPoint : public PStreamTracerPoint
401 {
402 public:
403   vtkTypeMacro(AMRPStreamTracerPoint, PStreamTracerPoint);
404 
405   static AMRPStreamTracerPoint* New();
406   vtkSetMacro(Level, int);
407   vtkGetMacro(Level, int);
408 
409   vtkSetMacro(GridId, int);
410   vtkGetMacro(GridId, int);
411 
GetSize()412   int GetSize() override { return PStreamTracerPoint::GetSize() + 2 * sizeof(int); }
413 
Read(MyStream & stream)414   void Read(MyStream& stream) override
415   {
416     PStreamTracerPoint::Read(stream);
417     stream >> Level;
418     stream >> GridId;
419   }
Write(MyStream & stream)420   void Write(MyStream& stream) override
421   {
422     PStreamTracerPoint::Write(stream);
423     stream << Level << GridId;
424   }
425 
426 private:
AMRPStreamTracerPoint()427   AMRPStreamTracerPoint()
428     : Level(-1)
429     , GridId(-1)
430   {
431   }
432   int Level;
433   int GridId;
434 };
435 
436 typedef std::vector<vtkSmartPointer<PStreamTracerPoint>> PStreamTracerPointArray;
437 
438 vtkStandardNewMacro(AMRPStreamTracerPoint);
439 
440 class ProcessLocator : public vtkObject
441 {
442 public:
443   vtkTypeMacro(ProcessLocator, vtkObject);
444   static ProcessLocator* New();
Initialize(vtkCompositeDataSet * data)445   void Initialize(vtkCompositeDataSet* data)
446   {
447     this->Controller = vtkMultiProcessController::GetGlobalController();
448     this->Rank = this->Controller->GetLocalProcessId();
449     this->NumProcs = this->Controller->GetNumberOfProcesses();
450     this->InitBoundingBoxes(this->NumProcs);
451 
452     double bb[6];
453     InitBB(bb);
454 
455     if (data)
456     {
457       vtkCompositeDataIterator* iter = data->NewIterator();
458       iter->InitTraversal();
459       while (!iter->IsDoneWithTraversal())
460       {
461         vtkDataSet* dataSet = vtkDataSet::SafeDownCast(iter->GetCurrentDataObject());
462         AssertNe(dataSet, nullptr);
463         UpdateBB(bb, dataSet->GetBounds());
464         iter->GoToNextItem();
465       }
466       iter->Delete();
467     }
468 
469     PRINT(bb[0] << " " << bb[1] << " " << bb[2] << " " << bb[3] << " " << bb[4] << " " << bb[5]);
470     this->Controller->AllGather(bb, &this->BoundingBoxes[0], 6);
471 
472 #ifdef DEBUGTRACE
473     cout << "(" << Rank << ") BoundingBoxes: ";
474     for (int i = 0; i < NumProcs; i++)
475     {
476       double* box = this->GetBoundingBox(i);
477       cout << box[0] << " " << box[1] << " " << box[2] << " " << box[3] << " " << box[4] << " "
478            << box[5] << ";  ";
479     }
480     cout << endl;
481 #endif
482   }
483 
InCurrentProcess(double * p)484   bool InCurrentProcess(double* p) { return InBB(p, GetBoundingBox(Rank)); }
FindNextProcess(double * p)485   int FindNextProcess(double* p)
486   {
487     for (int rank = CNext(this->Rank, this->NumProcs); rank != Rank;
488          rank = CNext(rank, this->NumProcs))
489     {
490       if (InBB(p, GetBoundingBox(rank)))
491       {
492         return rank;
493       }
494     }
495     return -1;
496   }
497 
498 private:
ProcessLocator()499   ProcessLocator()
500   {
501     this->Controller = nullptr;
502     this->NumProcs = 0;
503     this->Rank = 0;
504   }
505   vtkMultiProcessController* Controller;
506   int Rank;
507   int NumProcs;
508 
GetBoundingBox(int i)509   double* GetBoundingBox(int i) { return &BoundingBoxes[6 * i]; }
510 
InitBoundingBoxes(int num)511   void InitBoundingBoxes(int num)
512   {
513     for (int i = 0; i < 6 * num; i++)
514     {
515       this->BoundingBoxes.push_back(0);
516     }
517   }
518   std::vector<double> BoundingBoxes;
519 };
520 vtkStandardNewMacro(ProcessLocator);
521 
522 class AbstractPStreamTracerUtils : public vtkObject
523 {
524 public:
525   vtkTypeMacro(AbstractPStreamTracerUtils, vtkObject);
526 
527   vtkGetMacro(VecName, char*);
528   vtkGetMacro(VecType, int);
529   vtkGetMacro(Input0, vtkDataSet*);
530 
GetProcessLocator()531   virtual ProcessLocator* GetProcessLocator() { return nullptr; }
532 
GetProto()533   vtkSmartPointer<PStreamTracerPoint> GetProto() { return this->Proto; }
534 
InitializeVelocityFunction(PStreamTracerPoint *,vtkAbstractInterpolatedVelocityField *)535   virtual void InitializeVelocityFunction(
536     PStreamTracerPoint*, vtkAbstractInterpolatedVelocityField*)
537   {
538   }
539 
PreparePoint(PStreamTracerPoint *,vtkAbstractInterpolatedVelocityField *)540   virtual bool PreparePoint(PStreamTracerPoint*, vtkAbstractInterpolatedVelocityField*)
541   {
542     return true;
543   }
544 
ComputeSeeds(vtkDataSet * source,PStreamTracerPointArray & out,int & maxId)545   vtkSmartPointer<vtkIdList> ComputeSeeds(
546     vtkDataSet* source, PStreamTracerPointArray& out, int& maxId)
547   {
548     vtkDataArray* seeds;
549     vtkIdList* seedIds;
550     vtkIntArray* integrationDirections;
551     this->Tracer->InitializeSeeds(seeds, seedIds, integrationDirections, source);
552 
553     int numSeeds = seedIds->GetNumberOfIds();
554     for (int i = 0; i < numSeeds; i++)
555     {
556       double seed[3];
557       seeds->GetTuple(seedIds->GetId(i), seed);
558       vtkSmartPointer<PStreamTracerPoint> point =
559         NewPoint(i, seed, integrationDirections->GetValue(i));
560       if (this->InBound(point))
561       {
562         out.push_back(point);
563       }
564     }
565     if (seeds)
566     {
567       seeds->Delete();
568     }
569     if (integrationDirections)
570     {
571       integrationDirections->Delete();
572     }
573 
574     maxId = numSeeds - 1;
575     return vtkSmartPointer<vtkIdList>::Take(seedIds);
576   }
577 
Initialize(vtkPStreamTracer * tracer)578   virtual void Initialize(vtkPStreamTracer* tracer)
579   {
580     this->Tracer = tracer;
581     this->Controller = tracer->Controller;
582     this->Rank = tracer->Rank;
583     this->NumProcs = tracer->NumProcs;
584     this->InputData = tracer->InputData;
585     this->VecType = 0;
586     this->VecName = nullptr;
587     this->Input0 = nullptr;
588     if (!tracer->EmptyData)
589     {
590       vtkCompositeDataIterator* iter = tracer->InputData->NewIterator();
591       vtkSmartPointer<vtkCompositeDataIterator> iterP(iter);
592       iter->Delete();
593       iterP->GoToFirstItem();
594       if (!iterP->IsDoneWithTraversal())
595       {
596         Input0 = vtkDataSet::SafeDownCast(iterP->GetCurrentDataObject());
597         // iterP->GotoNextitem();
598       }
599       vtkDataArray* vectors = tracer->GetInputArrayToProcess(0, this->Input0, this->VecType);
600       this->VecName = vectors->GetName();
601     }
602 
603     if (!tracer->EmptyData)
604     {
605       this->CreatePrototype(this->Input0->GetPointData(), VecType, VecName);
606     }
607   }
608 
609 protected:
AbstractPStreamTracerUtils()610   AbstractPStreamTracerUtils()
611     : Tracer(nullptr)
612     , Controller(nullptr)
613     , VecName(nullptr)
614     , Input0(nullptr)
615     , InputData(nullptr)
616   {
617   }
618 
619   virtual vtkSmartPointer<PStreamTracerPoint> NewPoint(int id, double* x, int dir) = 0;
620   virtual bool InBound(PStreamTracerPoint* p) = 0;
621 
CreatePrototype(vtkPointData * pointData,int fieldType,const char * vecName)622   void CreatePrototype(vtkPointData* pointData, int fieldType, const char* vecName)
623   {
624     this->Proto = NewPoint(-1, nullptr, -1);
625 
626     vtkNew<vtkPointData> protoPD;
627     protoPD->InterpolateAllocate(pointData, 1);
628     vtkSmartPointer<vtkDoubleArray> time = vtkSmartPointer<vtkDoubleArray>::New();
629     time->SetName("IntegrationTime");
630     protoPD->AddArray(time);
631 
632     if (fieldType == vtkDataObject::FIELD_ASSOCIATION_CELLS)
633     {
634       vtkSmartPointer<vtkDoubleArray> velocityVectors = vtkSmartPointer<vtkDoubleArray>::New();
635       velocityVectors->SetName(vecName);
636       velocityVectors->SetNumberOfComponents(3);
637       protoPD->AddArray(velocityVectors);
638     }
639 
640     if (Tracer->GetComputeVorticity())
641     {
642       vtkSmartPointer<vtkDoubleArray> vorticity = vtkSmartPointer<vtkDoubleArray>::New();
643       vorticity->SetName("Vorticity");
644       vorticity->SetNumberOfComponents(3);
645       protoPD->AddArray(vorticity);
646 
647       vtkSmartPointer<vtkDoubleArray> rotation = vtkSmartPointer<vtkDoubleArray>::New();
648       rotation->SetName("Rotation");
649       protoPD->AddArray(rotation);
650 
651       vtkSmartPointer<vtkDoubleArray> angularVel = vtkSmartPointer<vtkDoubleArray>::New();
652       angularVel->SetName("AngularVelocity");
653       protoPD->AddArray(angularVel);
654     }
655 
656     if (Tracer->GenerateNormalsInIntegrate)
657     {
658       PRINT("Generate normals prototype");
659       vtkSmartPointer<vtkDoubleArray> normals = vtkSmartPointer<vtkDoubleArray>::New();
660       normals->SetName("Normals");
661       normals->SetNumberOfComponents(3);
662       protoPD->AddArray(normals);
663     }
664     AssertEq(this->Proto->GetTail(), nullptr);
665     this->Proto->AllocateTail(protoPD);
666   }
667 
668   vtkPStreamTracer* Tracer;
669   vtkMultiProcessController* Controller;
670   vtkSmartPointer<PStreamTracerPoint> Proto;
671   int VecType;
672   char* VecName;
673   vtkDataSet* Input0;
674   vtkCompositeDataSet* InputData;
675 
676   int Rank;
677   int NumProcs;
678 };
679 
680 class PStreamTracerUtils : public AbstractPStreamTracerUtils
681 {
682 public:
683   vtkTypeMacro(PStreamTracerUtils, AbstractPStreamTracerUtils);
684 
685   static PStreamTracerUtils* New();
686 
PStreamTracerUtils()687   PStreamTracerUtils() { this->Locator = nullptr; }
688 
Initialize(vtkPStreamTracer * tracer)689   void Initialize(vtkPStreamTracer* tracer) override
690   {
691     this->Superclass::Initialize(tracer);
692     this->Locator = vtkSmartPointer<ProcessLocator>::New();
693     this->Locator->Initialize(tracer->InputData);
694   }
695 
GetProcessLocator()696   ProcessLocator* GetProcessLocator() override { return this->Locator; }
697 
InBound(PStreamTracerPoint *)698   bool InBound(PStreamTracerPoint*) override { return true; }
699 
NewPoint(int id,double * x,int dir)700   vtkSmartPointer<PStreamTracerPoint> NewPoint(int id, double* x, int dir) override
701   {
702     vtkSmartPointer<PStreamTracerPoint> p = vtkSmartPointer<PStreamTracerPoint>::New();
703     p->SetId(id);
704     if (x)
705     {
706       p->SetSeed(x);
707     }
708     p->SetDirection(dir);
709     return p;
710   }
711 
712 private:
713   vtkSmartPointer<ProcessLocator> Locator;
714 };
715 
716 vtkStandardNewMacro(PStreamTracerUtils);
717 
718 class AMRPStreamTracerUtils : public AbstractPStreamTracerUtils
719 {
720 public:
721   vtkTypeMacro(AMRPStreamTracerUtils, AbstractPStreamTracerUtils);
722   static AMRPStreamTracerUtils* New();
723   vtkSetMacro(AMR, vtkOverlappingAMR*);
724 
InitializeVelocityFunction(PStreamTracerPoint * point,vtkAbstractInterpolatedVelocityField * func)725   void InitializeVelocityFunction(
726     PStreamTracerPoint* point, vtkAbstractInterpolatedVelocityField* func) override
727   {
728     AMRPStreamTracerPoint* amrPoint = AMRPStreamTracerPoint::SafeDownCast(point);
729     assert(amrPoint);
730     vtkAMRInterpolatedVelocityField* amrFunc = vtkAMRInterpolatedVelocityField::SafeDownCast(func);
731     assert(amrFunc);
732     if (amrPoint->GetLevel() >= 0)
733     {
734       amrFunc->SetLastDataSet(amrPoint->GetLevel(), amrPoint->GetGridId());
735 #ifdef DEBUGTRACE
736       vtkUniformGrid* grid = this->AMR->GetDataSet(amrPoint->GetLevel(), amrPoint->GetGridId());
737       if (!grid || !InBB(amrPoint->GetSeed(), grid->GetBounds()))
738       {
739         PRINT("WARNING: Bad AMR Point "
740           << (grid) << " " << amrPoint->GetSeed()[0] << " " << amrPoint->GetSeed()[1] << " "
741           << amrPoint->GetSeed()[2] << " " << amrPoint->GetLevel() << " " << amrPoint->GetGridId());
742       }
743 #endif
744     }
745   }
746 
PreparePoint(PStreamTracerPoint * point,vtkAbstractInterpolatedVelocityField * func)747   bool PreparePoint(PStreamTracerPoint* point, vtkAbstractInterpolatedVelocityField* func) override
748   {
749     AMRPStreamTracerPoint* amrPoint = AMRPStreamTracerPoint::SafeDownCast(point);
750     vtkAMRInterpolatedVelocityField* amrFunc = vtkAMRInterpolatedVelocityField::SafeDownCast(func);
751     unsigned int level, id;
752     if (amrFunc->GetLastDataSetLocation(level, id))
753     {
754       amrPoint->SetLevel(level);
755       amrPoint->SetId(id);
756       int blockIndex = this->AMR->GetCompositeIndex(level, id);
757       amrPoint->SetRank(this->BlockProcess[blockIndex]);
758       return true;
759     }
760     else
761     {
762       PRINT("Invalid AMR : " << point->GetSeed()[0] << " " << point->GetSeed()[1] << " "
763                              << point->GetSeed()[2] << " "
764                              << "Probably out of bound");
765       amrPoint->SetLevel(-1);
766       amrPoint->SetGridId(-1);
767       amrPoint->SetRank(-1);
768       return false;
769     }
770   }
771 
772   // this assume that p's AMR information has been set correctly
773   // it makes no attempt to look for it
InBound(PStreamTracerPoint * p)774   bool InBound(PStreamTracerPoint* p) override
775   {
776     AMRPStreamTracerPoint* amrp = AMRPStreamTracerPoint::SafeDownCast(p);
777     if (amrp->GetLevel() < 0)
778     {
779       return false;
780     }
781     AssertNe(amrp, nullptr);
782     vtkUniformGrid* grid = this->AMR->GetDataSet(amrp->GetLevel(), amrp->GetGridId());
783     return grid != nullptr;
784   }
785 
NewPoint(int id,double * x,int dir)786   vtkSmartPointer<PStreamTracerPoint> NewPoint(int id, double* x, int dir) override
787   {
788 
789     vtkSmartPointer<AMRPStreamTracerPoint> amrp = vtkSmartPointer<AMRPStreamTracerPoint>::New();
790     vtkSmartPointer<PStreamTracerPoint> p = amrp;
791     p->SetId(id);
792     if (x)
793     {
794       p->SetSeed(x);
795     }
796     p->SetDirection(dir);
797 
798     if (x)
799     {
800       unsigned int level, gridId;
801       if (vtkAMRInterpolatedVelocityField::FindGrid(x, this->AMR, level, gridId))
802       {
803         amrp->SetLevel((int)level);
804         amrp->SetGridId((int)gridId);
805         int blockIndex = this->AMR->GetCompositeIndex(level, gridId);
806         int process = this->BlockProcess[blockIndex];
807         AssertGe(process, 0);
808         amrp->SetRank(process);
809       }
810       else
811       {
812       }
813     }
814 
815     return p;
816   }
Initialize(vtkPStreamTracer * tracer)817   void Initialize(vtkPStreamTracer* tracer) override
818   {
819     this->Superclass::Initialize(tracer);
820     AssertNe(this->InputData, nullptr);
821     this->AMR = vtkOverlappingAMR::SafeDownCast(this->InputData);
822 
823     vtkParallelAMRUtilities::DistributeProcessInformation(
824       this->AMR, this->Controller, BlockProcess);
825     this->AMR->GenerateParentChildInformation();
826   }
827 
828 protected:
AMRPStreamTracerUtils()829   AMRPStreamTracerUtils() { this->AMR = nullptr; }
830   vtkOverlappingAMR* AMR;
831 
832   std::vector<int> BlockProcess; // stores block->process information
833 };
834 vtkStandardNewMacro(AMRPStreamTracerUtils);
835 
836 namespace
837 {
normvec3(double * x,double * y)838 inline double normvec3(double* x, double* y)
839 {
840   return sqrt(
841     (x[0] - y[0]) * (x[0] - y[0]) + (x[1] - y[1]) * (x[1] - y[1]) + (x[2] - y[2]) * (x[2] - y[2]));
842 }
843 
LastPointIndex(vtkPolyData * pathPoly)844 inline vtkIdType LastPointIndex(vtkPolyData* pathPoly)
845 {
846   vtkCellArray* pathCells = pathPoly->GetLines();
847   AssertGt(pathCells->GetNumberOfCells(), 0);
848   const vtkIdType* path(nullptr);
849   vtkIdType nPoints(0);
850   pathCells->InitTraversal();
851   pathCells->GetNextCell(nPoints, path);
852   int lastPointIndex = path[nPoints - 1];
853   return lastPointIndex;
854 }
855 
856 #ifdef DEBUGTRACE
ComputeLength(vtkIdList * poly,vtkPoints * pts)857 inline double ComputeLength(vtkIdList* poly, vtkPoints* pts)
858 {
859   int n = poly->GetNumberOfIds();
860   if (n == 0)
861     return 0;
862 
863   double s(0);
864   double p[3];
865   pts->GetPoint(poly->GetId(0), p);
866   for (int j = 1; j < n; j++)
867   {
868     int pIndex = poly->GetId(j);
869     double q[3];
870     pts->GetPoint(pIndex, q);
871     s += sqrt(vtkMath::Distance2BetweenPoints(p, q));
872     memcpy(p, q, 3 * sizeof(double));
873   }
874   return s;
875 }
876 
PrintNames(ostream & out,vtkPointData * a)877 inline void PrintNames(ostream& out, vtkPointData* a)
878 {
879   for (int i = 0; i < a->GetNumberOfArrays(); i++)
880   {
881     out << a->GetArray(i)->GetName() << " ";
882   }
883   out << endl;
884 }
885 
SameShape(vtkPointData * a,vtkPointData * b)886 inline bool SameShape(vtkPointData* a, vtkPointData* b)
887 {
888   if (!a || !b)
889   {
890     return false;
891   }
892 
893   if (a->GetNumberOfArrays() != b->GetNumberOfArrays())
894   {
895     PrintNames(cerr, a);
896     PrintNames(cerr, b);
897     return false;
898   }
899 
900   int numArrays(a->GetNumberOfArrays());
901   for (int i = 0; i < numArrays; i++)
902   {
903     if (a->GetArray(i)->GetNumberOfComponents() != b->GetArray(i)->GetNumberOfComponents())
904     {
905       return false;
906     }
907   }
908 
909   return true;
910 }
911 #endif
912 
913 class MessageBuffer
914 {
915 public:
MessageBuffer(int size)916   MessageBuffer(int size) { this->Stream = new MyStream(size); }
917 
~MessageBuffer()918   ~MessageBuffer() { delete this->Stream; }
919 
GetRequest()920   vtkMPICommunicator::Request& GetRequest() { return this->Request; }
GetStream()921   MyStream& GetStream() { return *(this->Stream); }
922 
923 private:
924   vtkMPICommunicator::Request Request;
925   MyStream* Stream;
926 
927   MessageBuffer(const MessageBuffer&) = delete;
928   void operator=(const MessageBuffer&) = delete;
929 };
930 
931 typedef MyStream MessageStream;
932 
933 class Task : public vtkObject
934 {
935 public:
936   static Task* New();
937 
GetId()938   int GetId() { return this->Point->GetId(); }
939   vtkGetMacro(TraceExtended, bool);
940   vtkGetMacro(TraceTerminated, bool);
941   vtkSetMacro(TraceExtended, bool);
942   vtkSetMacro(TraceTerminated, bool);
943 
GetPoint()944   PStreamTracerPoint* GetPoint() { return this->Point; }
IncHop()945   void IncHop() { this->NumHops++; }
946 
947 private:
948   vtkSmartPointer<PStreamTracerPoint> Point;
949 
950   int NumPeeks;
951   int NumHops;
952   bool TraceTerminated;
953   bool TraceExtended;
954 
Task()955   Task()
956     : NumPeeks(0)
957     , NumHops(0)
958     , TraceTerminated(false)
959     , TraceExtended(false)
960   {
961   }
962   friend class TaskManager;
963   friend MessageStream& operator<<(MessageStream& stream, const Task& task);
964 };
965 vtkStandardNewMacro(Task);
966 
operator <<(MessageStream & stream,const Task & t)967 MessageStream& operator<<(MessageStream& stream, const Task& t)
968 {
969   t.Point->Write(stream);
970   stream << t.NumPeeks;
971   stream << t.NumHops;
972   return stream;
973 }
974 
975 // Description:
976 // Manages the communication of traces between processes
977 class TaskManager
978 {
979 public:
980   enum Message
981   {
982     NewTask,
983     NoMoreTasks,
984     TaskFinished
985   };
986 
TaskManager(ProcessLocator * locator,PStreamTracerPoint * proto)987   TaskManager(ProcessLocator* locator, PStreamTracerPoint* proto)
988     : Locator(locator)
989     , Proto(proto)
990   {
991     this->Controller =
992       vtkMPIController::SafeDownCast(vtkMultiProcessController::GetGlobalController());
993     AssertNe(this->Controller, nullptr);
994     this->NumProcs = this->Controller->GetNumberOfProcesses();
995     this->Rank = this->Controller->GetLocalProcessId();
996 
997     int prototypeSize = Proto == nullptr ? 0 : Proto->GetSize();
998     this->MessageSize = prototypeSize + sizeof(Task);
999     this->ReceiveBuffer = nullptr;
1000 
1001     this->NumSends = 0;
1002     this->Timer = vtkSmartPointer<vtkTimerLog>::New();
1003     this->ReceiveTime = 0;
1004   }
1005 
Initialize(bool hasData,const PStreamTracerPointArray & seeds,int MaxId)1006   void Initialize(bool hasData, const PStreamTracerPointArray& seeds, int MaxId)
1007   {
1008     AssertGe(MaxId, 0);
1009     int numSeeds = static_cast<int>(seeds.size());
1010     this->HasData.resize(this->NumProcs);
1011     std::fill(this->HasData.begin(), this->HasData.end(), 0);
1012     {
1013       const int self_hasdata = hasData ? 1 : 0;
1014       this->Controller->AllGather(&self_hasdata, &this->HasData[0], 1);
1015     }
1016 
1017     for (int i = 0; i < NumProcs; i++)
1018     {
1019       if (this->HasData[i])
1020       {
1021         this->Leader = i;
1022         break;
1023       }
1024     }
1025 
1026     std::vector<int> processMap0(MaxId + 1, -1);
1027     for (int i = 0; i < numSeeds; i++)
1028     {
1029       int rank = seeds[i]->GetRank();
1030       int id = seeds[i]->GetId();
1031       if (rank < 0 && this->Locator)
1032       {
1033         rank = this->Locator->InCurrentProcess(seeds[i]->GetSeed()) ? this->Rank : -1;
1034       }
1035       processMap0[id] = rank;
1036     }
1037 
1038     std::vector<int> processMap(MaxId + 1);
1039     this->Controller->AllReduce(
1040       &processMap0[0], &processMap[0], MaxId + 1, vtkCommunicator::MAX_OP);
1041 
1042     int totalNumTasks = std::accumulate(processMap.begin(), processMap.end(), 0,
1043       [](int accumlatedSum, int b) { return accumlatedSum + (b >= 0 ? 1 : 0); });
1044 
1045     this->TotalNumTasks = Rank == this->Leader
1046       ? totalNumTasks
1047       : INT_MAX; // only the master process knows how many are left
1048 
1049     for (int i = 0; i < numSeeds; i++)
1050     {
1051       int id = seeds[i]->GetId();
1052       if (processMap[id] == Rank)
1053       {
1054         vtkNew<Task> task;
1055         task->Point = seeds[i];
1056         NTasks.emplace_back(task);
1057       }
1058     }
1059     ALLPRINT(NTasks.size() << " initial seeds out of " << totalNumTasks);
1060   }
1061 
NextTask()1062   Task* NextTask()
1063   {
1064     if (!this->HasData[Rank])
1065     {
1066       return nullptr;
1067     }
1068 
1069     //---------------------------------------------------------
1070     // Send messages
1071     //---------------------------------------------------------
1072 
1073     while (!this->PTasks.empty())
1074     {
1075       vtkSmartPointer<Task> task = PTasks.back();
1076       PTasks.pop_back();
1077 
1078       if (task->GetTraceTerminated())
1079       {
1080         // send to the master process
1081         this->Send(TaskFinished, this->Leader, task);
1082       }
1083       else
1084       {
1085         if (!task->GetTraceExtended())
1086         {
1087           // increment the peak
1088           task->NumPeeks++;
1089           PRINT("Skip " << task->GetId() << " with " << task->NumPeeks << " Peeks");
1090         }
1091         else
1092         {
1093           task->NumPeeks = 1;
1094         }
1095         int nextProcess = -1;
1096         if (task->NumPeeks < this->NumProcs)
1097         {
1098           nextProcess = NextProcess(task);
1099           if (nextProcess >= 0)
1100           {
1101             task->IncHop();
1102             // send it to the next guy
1103             this->Send(NewTask, NextProcess(task), task);
1104           }
1105         }
1106 
1107         if (nextProcess < 0)
1108         {
1109           this->Send(TaskFinished, this->Leader, task); // no one can do it, norminally finished
1110           PRINT("Bail on " << task->GetId());
1111         }
1112       }
1113     }
1114 
1115     //---------------------------------------------------------
1116     // Receive messages
1117     //---------------------------------------------------------
1118 
1119     do
1120     {
1121       this->Receive(this->TotalNumTasks != 0 && this->Msgs.empty() &&
1122         NTasks.empty()); // wait if there is nothing to do
1123       while (!this->Msgs.empty())
1124       {
1125         Message msg = this->Msgs.back();
1126         this->Msgs.pop_back();
1127         switch (msg)
1128         {
1129           case NewTask:
1130             break;
1131           case TaskFinished:
1132             AssertEq(Rank, this->Leader);
1133             this->TotalNumTasks--;
1134             PRINT(TotalNumTasks << " tasks left");
1135             break;
1136           case NoMoreTasks:
1137             AssertNe(Rank, this->Leader);
1138             this->TotalNumTasks = 0;
1139             break;
1140           default:
1141             assert(false);
1142         }
1143       }
1144     } while (this->TotalNumTasks != 0 && NTasks.empty());
1145 
1146     vtkSmartPointer<Task> nextTask;
1147     if (NTasks.empty())
1148     {
1149       AssertEq(this->TotalNumTasks, 0);
1150       if (this->Rank == this->Leader) // let everyeone know
1151       {
1152         for (int i = (this->Rank + 1) % NumProcs; i != this->Rank; i = (i + 1) % NumProcs)
1153         {
1154           if (this->HasData[i])
1155           {
1156             this->Send(NoMoreTasks, i, nullptr);
1157           }
1158         }
1159       }
1160     }
1161     else
1162     {
1163       nextTask = this->NTasks.back();
1164       this->NTasks.pop_back();
1165       this->PTasks.push_back(nextTask);
1166     }
1167 
1168     return nextTask;
1169   }
1170 
~TaskManager()1171   ~TaskManager()
1172   {
1173     for (BufferList::iterator itr = SendBuffers.begin(); itr != SendBuffers.end(); ++itr)
1174     {
1175       MessageBuffer* buf = *itr;
1176       AssertNe(buf->GetRequest().Test(), 0);
1177       delete buf;
1178     }
1179     if (this->ReceiveBuffer)
1180     {
1181       this->ReceiveBuffer->GetRequest().Cancel();
1182       delete ReceiveBuffer;
1183     }
1184   }
1185 
1186 private:
1187   ProcessLocator* Locator;
1188   vtkSmartPointer<PStreamTracerPoint> Proto;
1189   vtkMPIController* Controller;
1190   std::vector<vtkSmartPointer<Task>> NTasks;
1191   std::vector<vtkSmartPointer<Task>> PTasks;
1192   std::vector<Message> Msgs;
1193   int NumProcs;
1194   int Rank;
1195   int TotalNumTasks;
1196   int MessageSize;
1197   std::vector<int> HasData;
1198   int Leader;
1199   typedef std::list<MessageBuffer*> BufferList;
1200   BufferList SendBuffers;
1201   MessageBuffer* ReceiveBuffer;
1202 
Send(int msg,int rank,Task * task)1203   void Send(int msg, int rank, Task* task)
1204   {
1205     if (task && (msg == TaskFinished))
1206     {
1207       PRINT("Done in " << task->Point->GetNumSteps() << " steps " << task->NumHops << " hops");
1208     }
1209     if (rank == this->Rank)
1210     {
1211       switch (msg)
1212       {
1213         case TaskFinished:
1214           this->TotalNumTasks--;
1215           PRINT(TotalNumTasks << " tasks left");
1216           break;
1217         default:
1218           PRINT("Unhandled message " << msg);
1219           assert(false);
1220       }
1221     }
1222     else
1223     {
1224       MessageBuffer& buf = this->NewSendBuffer();
1225       MessageStream& outStream(buf.GetStream());
1226 
1227       outStream << msg << this->Rank;
1228       AssertNe(this->Rank, rank);
1229 
1230       if (task)
1231       {
1232         outStream << (*task);
1233       }
1234 
1235       AssertGe(this->MessageSize, outStream.GetLength());
1236       this->Controller->NoBlockSend(
1237         outStream.GetRawData(), outStream.GetLength(), rank, 561, buf.GetRequest());
1238 
1239       NumSends++;
1240       if (task)
1241       {
1242         PRINT("Send " << msg << "; task "
1243                       << task->GetId()); //<<" "<<task->Seed[0]<<" "<<task->Seed[1]<<"
1244                                          //"<<task->Seed[2]<<" to "<<rank);
1245       }
1246       else
1247       {
1248         PRINT("Send " << msg);
1249       }
1250     }
1251   }
NextProcess(Task * task)1252   int NextProcess(Task* task)
1253   {
1254     PStreamTracerPoint* p = task->GetPoint();
1255     int rank = p->GetRank();
1256     if (rank >= 0)
1257     {
1258       return rank;
1259     }
1260 
1261     if (this->Locator)
1262     {
1263       rank = this->Locator->FindNextProcess(p->GetSeed());
1264     }
1265     AssertNe(rank, Rank);
1266     return rank;
1267   }
1268 
NewTaskInstance()1269   vtkSmartPointer<Task> NewTaskInstance()
1270   {
1271     vtkSmartPointer<Task> task = vtkSmartPointer<Task>::New();
1272 
1273     task->Point = this->Proto->NewInstance();
1274     task->Point->CopyTail(this->Proto);
1275     task->Point->Delete();
1276     return task;
1277   }
1278 
Read(MessageStream & stream,Task & task)1279   void Read(MessageStream& stream, Task& task)
1280   {
1281     task.Point->Read(stream);
1282     stream >> task.NumPeeks;
1283     stream >> task.NumHops;
1284   }
1285 
NewSendBuffer()1286   MessageBuffer& NewSendBuffer()
1287   {
1288     // remove all empty buffers
1289     BufferList::iterator itr = SendBuffers.begin();
1290     while (itr != SendBuffers.end())
1291     {
1292       MessageBuffer* buf(*itr);
1293       BufferList::iterator next = itr;
1294       ++next;
1295       if (buf->GetRequest().Test())
1296       {
1297         delete buf;
1298         SendBuffers.erase(itr);
1299       }
1300       itr = next;
1301     }
1302 
1303     MessageBuffer* buf = new MessageBuffer(this->MessageSize);
1304     SendBuffers.push_back(buf);
1305     return *buf;
1306   }
1307 
Receive(bool wait=false)1308   void Receive(bool wait = false)
1309   {
1310     int msg = -1;
1311     int sender(0);
1312 
1313 #ifdef DEBUGTRACE
1314     this->StartTimer();
1315 #endif
1316     if (ReceiveBuffer && wait)
1317     {
1318       ReceiveBuffer->GetRequest().Wait();
1319     }
1320 
1321     if (ReceiveBuffer && ReceiveBuffer->GetRequest().Test())
1322     {
1323       MyStream& inStream(ReceiveBuffer->GetStream());
1324       inStream >> msg >> sender;
1325       this->Msgs.push_back((Message)msg);
1326       if (msg == NewTask)
1327       {
1328         PRINT("Received message " << msg << " from " << sender)
1329 
1330         vtkSmartPointer<Task> task = this->NewTaskInstance();
1331         this->Read(inStream, *task);
1332         PRINT("Received task "
1333           << task->GetId()); //<<" "<<task->Seed[0]<<" "<<task->Seed[1]<<" "<<task->Seed[2]);
1334         this->NTasks.push_back(task);
1335       }
1336       delete ReceiveBuffer;
1337       ReceiveBuffer = nullptr;
1338     }
1339     if (ReceiveBuffer == nullptr)
1340     {
1341       ReceiveBuffer = new MessageBuffer(this->MessageSize);
1342       MyStream& inStream(ReceiveBuffer->GetStream());
1343       this->Controller->NoBlockReceive(inStream.GetRawData(), inStream.GetSize(),
1344         vtkMultiProcessController::ANY_SOURCE, 561, ReceiveBuffer->GetRequest());
1345     }
1346 
1347 #ifdef DEBUGTRACE
1348     double time = this->StopTimer();
1349     if (msg >= 0)
1350     {
1351       this->ReceiveTime += time;
1352     }
1353 #endif
1354   }
1355 
1356   int NumSends;
1357   double ReceiveTime;
1358   vtkSmartPointer<vtkTimerLog> Timer;
1359 };
1360 
1361 };
1362 
1363 vtkCxxSetObjectMacro(vtkPStreamTracer, Controller, vtkMultiProcessController);
1364 vtkCxxSetObjectMacro(vtkPStreamTracer, Interpolator, vtkAbstractInterpolatedVelocityField);
1365 vtkStandardNewMacro(vtkPStreamTracer);
1366 
vtkPStreamTracer()1367 vtkPStreamTracer::vtkPStreamTracer()
1368 {
1369   this->Controller = vtkMultiProcessController::GetGlobalController();
1370   if (this->Controller)
1371   {
1372     this->Controller->Register(this);
1373   }
1374   this->Interpolator = nullptr;
1375   this->GenerateNormalsInIntegrate = false;
1376 
1377   this->EmptyData = 0;
1378 }
1379 
~vtkPStreamTracer()1380 vtkPStreamTracer::~vtkPStreamTracer()
1381 {
1382   if (this->Controller)
1383   {
1384     this->Controller->UnRegister(this);
1385     this->Controller = nullptr;
1386   }
1387   this->SetInterpolator(nullptr);
1388 }
1389 
RequestUpdateExtent(vtkInformation * vtkNotUsed (request),vtkInformationVector ** inputVector,vtkInformationVector * outputVector)1390 int vtkPStreamTracer::RequestUpdateExtent(vtkInformation* vtkNotUsed(request),
1391   vtkInformationVector** inputVector, vtkInformationVector* outputVector)
1392 {
1393   vtkInformation* outInfo = outputVector->GetInformationObject(0);
1394   int piece = outInfo->Get(vtkStreamingDemandDrivenPipeline::UPDATE_PIECE_NUMBER());
1395   int numPieces = outInfo->Get(vtkStreamingDemandDrivenPipeline::UPDATE_NUMBER_OF_PIECES());
1396   int ghostLevel = outInfo->Get(vtkStreamingDemandDrivenPipeline::UPDATE_NUMBER_OF_GHOST_LEVELS());
1397 
1398   int numInputs = this->GetNumberOfInputConnections(0);
1399   for (int idx = 0; idx < numInputs; ++idx)
1400   {
1401     vtkInformation* info = inputVector[0]->GetInformationObject(idx);
1402     if (info)
1403     {
1404       info->Set(vtkStreamingDemandDrivenPipeline::UPDATE_PIECE_NUMBER(), piece);
1405       info->Set(vtkStreamingDemandDrivenPipeline::UPDATE_NUMBER_OF_PIECES(), numPieces);
1406       info->Set(vtkStreamingDemandDrivenPipeline::UPDATE_NUMBER_OF_GHOST_LEVELS(), ghostLevel);
1407     }
1408   }
1409 
1410   vtkInformation* sourceInfo = inputVector[1]->GetInformationObject(0);
1411   if (sourceInfo)
1412   {
1413     sourceInfo->Set(vtkStreamingDemandDrivenPipeline::UPDATE_PIECE_NUMBER(), 0);
1414     sourceInfo->Set(vtkStreamingDemandDrivenPipeline::UPDATE_NUMBER_OF_PIECES(), 1);
1415     sourceInfo->Set(vtkStreamingDemandDrivenPipeline::UPDATE_NUMBER_OF_GHOST_LEVELS(), ghostLevel);
1416   }
1417 
1418   return 1;
1419 }
1420 
RequestData(vtkInformation * request,vtkInformationVector ** inputVector,vtkInformationVector * outputVector)1421 int vtkPStreamTracer::RequestData(
1422   vtkInformation* request, vtkInformationVector** inputVector, vtkInformationVector* outputVector)
1423 {
1424   if (!vtkMPIController::SafeDownCast(this->Controller) ||
1425     this->Controller->GetNumberOfProcesses() == 1)
1426   {
1427     this->GenerateNormalsInIntegrate = true;
1428     int result = vtkStreamTracer::RequestData(request, inputVector, outputVector);
1429     this->GenerateNormalsInIntegrate = false;
1430     return result;
1431   }
1432 
1433   this->Rank = this->Controller->GetLocalProcessId();
1434   this->NumProcs = this->Controller->GetNumberOfProcesses();
1435 
1436   vtkInformation* inInfo = inputVector[0]->GetInformationObject(0);
1437   vtkInformation* outInfo = outputVector->GetInformationObject(0);
1438   if (!this->SetupOutput(inInfo, outInfo))
1439   {
1440     return 0;
1441   }
1442 
1443   vtkPolyData* output = vtkPolyData::SafeDownCast(outInfo->Get(vtkDataObject::DATA_OBJECT()));
1444 
1445   vtkInformation* sourceInfo = inputVector[1]->GetInformationObject(0);
1446   vtkDataSet* localSource = vtkDataSet::SafeDownCast(sourceInfo->Get(vtkDataObject::DATA_OBJECT()));
1447   vtkDataSet* source = nullptr;
1448   vtkNew<vtkAppendDataSets> distantAppender;
1449   if (this->UseLocalSeedSource)
1450   {
1451     source = localSource;
1452   }
1453   else
1454   {
1455     std::vector<vtkSmartPointer<vtkDataObject>> allSources;
1456     if (this->Controller->AllGather(localSource, allSources) == 0)
1457     {
1458       vtkErrorMacro("Couldn't gather seed sources, aborting StreamTracer");
1459       return 0;
1460     }
1461     for (auto distantSource : allSources)
1462     {
1463       if (vtkDataSet* ds = vtkDataSet::SafeDownCast(distantSource))
1464       {
1465         distantAppender->AddInputData(ds);
1466       }
1467     }
1468     distantAppender->MergePointsOn();
1469     distantAppender->SetTolerance(0.0);
1470     distantAppender->Update();
1471     source = vtkDataSet::SafeDownCast(distantAppender->GetOutputDataObject(0));
1472   }
1473   if (!source)
1474   {
1475     vtkErrorMacro("Error while retrieving the source");
1476     return 0;
1477   }
1478 
1479   // init 'func' with nullptr such that we can check it later to determine
1480   // if we need to deallocate 'func' in case CheckInputs() fails (note
1481   // that a process may be assigned no any dataset when the number of
1482   // processes is greater than that of the blocks)
1483   vtkAbstractInterpolatedVelocityField* func = nullptr;
1484   int maxCellSize = 0;
1485   if (this->CheckInputs(func, &maxCellSize) != VTK_OK)
1486   {
1487     vtkDebugMacro("No appropriate inputs have been found..");
1488     this->EmptyData = 1;
1489     PRINT("Has Empty Data")
1490 
1491     // the if-statement below is a MUST since 'func' may be still nullptr
1492     // when this->InputData is nullptr ---- no any data has been assigned
1493     // to this process
1494     if (func)
1495     {
1496       func->Delete();
1497       func = nullptr;
1498     }
1499   }
1500   else
1501   {
1502     func->SetCaching(false);
1503     this->SetInterpolator(func);
1504     func->Delete();
1505   }
1506 
1507   if (vtkOverlappingAMR::SafeDownCast(this->InputData))
1508   {
1509     this->Utils = vtkSmartPointer<AMRPStreamTracerUtils>::New();
1510   }
1511   else
1512   {
1513     this->Utils = vtkSmartPointer<PStreamTracerUtils>::New();
1514   }
1515   this->Utils->Initialize(this);
1516   ALLPRINT("Vec Name: " << this->Utils->GetVecName());
1517   typedef std::vector<vtkSmartPointer<vtkPolyData>> traceOutputsType;
1518   traceOutputsType traceOutputs;
1519 
1520   TaskManager taskManager(this->Utils->GetProcessLocator(), this->Utils->GetProto());
1521   PStreamTracerPointArray seedPoints;
1522 
1523   int maxId;
1524   auto originalSeedIds = this->Utils->ComputeSeeds(source, seedPoints, maxId);
1525   taskManager.Initialize(this->EmptyData == 0, seedPoints, maxId);
1526 
1527   Task* task(nullptr);
1528   std::vector<int> traceIds;
1529   int iterations = 0;
1530   while ((task = taskManager.NextTask()))
1531   {
1532     iterations++;
1533     PStreamTracerPoint* point = task->GetPoint();
1534 
1535     vtkSmartPointer<vtkPolyData> traceOut;
1536     this->Trace(this->Utils->GetInput0(), this->Utils->GetVecType(), this->Utils->GetVecName(),
1537       point, traceOut, func, maxCellSize);
1538 
1539     task->SetTraceExtended(traceOut->GetNumberOfPoints() > 0);
1540 
1541     if (task->GetTraceExtended() && task->GetPoint()->GetTail())
1542     {
1543       // if we got this streamline from another process then this
1544       // process is responsible for filling in the gap over
1545       // the subdomain boundary
1546       this->Prepend(traceOut, task->GetPoint()->GetTail());
1547     }
1548 
1549     int resTerm = vtkStreamTracer::OUT_OF_DOMAIN;
1550     vtkIntArray* resTermArray =
1551       vtkArrayDownCast<vtkIntArray>(traceOut->GetCellData()->GetArray("ReasonForTermination"));
1552     if (resTermArray)
1553     {
1554       resTerm = resTermArray->GetValue(0);
1555     }
1556 
1557     // construct a new seed from the last point
1558     task->SetTraceTerminated(this->Controller->GetNumberOfProcesses() == 1 ||
1559       resTerm != vtkStreamTracer::OUT_OF_DOMAIN ||
1560       point->GetPropagation() > this->MaximumPropagation ||
1561       point->GetNumSteps() >= this->MaximumNumberOfSteps);
1562     if (task->GetTraceExtended() && !task->GetTraceTerminated())
1563     {
1564       task->SetTraceTerminated(
1565         !this->TraceOneStep(traceOut, func, point)); // we don't know where to go, just terminate it
1566     }
1567     if (!task->GetTraceTerminated())
1568     {
1569       task->SetTraceTerminated(!this->Utils->PreparePoint(point, func));
1570     }
1571 
1572     traceIds.push_back(task->GetId());
1573     traceOutputs.push_back(traceOut);
1574   }
1575 
1576   this->Controller->Barrier();
1577 
1578 #ifdef LOGTRACE
1579   double receiveTime = taskManager.ComputeReceiveTime();
1580   if (this->Rank == 0)
1581   {
1582     PRINT("Total receive time: " << receiveTime)
1583   }
1584   this->Controller->Barrier();
1585 #endif
1586 
1587   PRINT("Done");
1588 
1589   // The parallel integration adds all streamlines to traceOutputs
1590   // container. We append them all together here.
1591   vtkNew<vtkAppendPolyData> append;
1592   for (traceOutputsType::iterator it = traceOutputs.begin(); it != traceOutputs.end(); ++it)
1593   {
1594     vtkPolyData* inp = it->GetPointer();
1595     if (inp->GetNumberOfCells() > 0)
1596     {
1597       append->AddInputData(inp);
1598     }
1599   }
1600   if (append->GetNumberOfInputConnections(0) > 0)
1601   {
1602     append->Update();
1603     vtkPolyData* appoutput = append->GetOutput();
1604     output->CopyStructure(appoutput);
1605     output->GetPointData()->PassData(appoutput->GetPointData());
1606     output->GetCellData()->PassData(appoutput->GetCellData());
1607   }
1608 
1609   this->InputData->UnRegister(this);
1610 
1611   // Fix seed ids. The seed ids that the parallel algorithm uses are not really
1612   // seed ids but seed indices. We need to restore original seed ids so that
1613   // a full streamline gets the same seed id for forward and backward
1614   // directions.
1615   if (auto seedIds = vtkIntArray::SafeDownCast(output->GetCellData()->GetArray("SeedIds")))
1616   {
1617     vtkSMPTools::For(0, seedIds->GetNumberOfTuples(),
1618       [&originalSeedIds, &seedIds](vtkIdType start, vtkIdType end) {
1619         for (vtkIdType cc = start; cc < end; ++cc)
1620         {
1621           const auto seedIdx = seedIds->GetTypedComponent(cc, 0);
1622           assert(seedIdx < originalSeedIds->GetNumberOfIds());
1623           seedIds->SetTypedComponent(cc, 0, originalSeedIds->GetId(seedIdx));
1624         }
1625       });
1626   }
1627 
1628 #ifdef DEBUGTRACE
1629   int maxSeeds(maxId + 1);
1630   std::vector<double> lengths(maxSeeds);
1631   for (int i = 0; i < maxSeeds; i++)
1632   {
1633     lengths[i] = 0;
1634   }
1635 
1636   AssertEq(traceOutputs.size(), traceIds.size());
1637   for (unsigned int i = 0; i < traceOutputs.size(); i++)
1638   {
1639     vtkPolyData* poly = traceOutputs[i];
1640     int id = traceIds[i];
1641     double length(0);
1642     vtkCellArray* lines = poly->GetLines();
1643     if (lines)
1644     {
1645       lines->InitTraversal();
1646       vtkNew<vtkIdList> trace;
1647       lines->GetNextCell(trace);
1648       length = ComputeLength(trace, poly->GetPoints());
1649     }
1650     lengths[id] += length;
1651   }
1652   std::vector<double> totalLengths(maxSeeds);
1653   this->Controller->AllReduce(&lengths[0], &totalLengths[0], maxSeeds, vtkCommunicator::SUM_OP);
1654 
1655   int numNonZeros(0);
1656   double totalLength(0);
1657   for (int i = 0; i < maxSeeds; i++)
1658   {
1659     totalLength += totalLengths[i];
1660     if (totalLengths[i] > 0)
1661     {
1662       numNonZeros++;
1663     }
1664   }
1665 
1666   if (this->Rank == 0)
1667   {
1668     PRINT("Summary: " << maxSeeds << " seeds," << numNonZeros << " traces"
1669                       << " total length " << totalLength);
1670   }
1671 
1672 #endif
1673   PRINT("Done in " << iterations << " iterations");
1674 
1675   traceOutputs.erase(traceOutputs.begin(), traceOutputs.end());
1676   return 1;
1677 }
1678 
PrintSelf(ostream & os,vtkIndent indent)1679 void vtkPStreamTracer::PrintSelf(ostream& os, vtkIndent indent)
1680 {
1681   this->Superclass::PrintSelf(os, indent);
1682   os << indent << "Controller: " << this->Controller << endl;
1683 }
1684 
Trace(vtkDataSet * input,int vecType,const char * vecName,PStreamTracerPoint * point,vtkSmartPointer<vtkPolyData> & traceOut,vtkAbstractInterpolatedVelocityField * func,int maxCellSize)1685 void vtkPStreamTracer::Trace(vtkDataSet* input, int vecType, const char* vecName,
1686   PStreamTracerPoint* point, vtkSmartPointer<vtkPolyData>& traceOut,
1687   vtkAbstractInterpolatedVelocityField* func, int maxCellSize)
1688 {
1689   double* seedSource = point->GetSeed();
1690   int direction = point->GetDirection();
1691 
1692   this->Utils->InitializeVelocityFunction(point, func);
1693 
1694   double lastPoint[3];
1695   vtkSmartPointer<vtkFloatArray> seeds = vtkSmartPointer<vtkFloatArray>::New();
1696   seeds->SetNumberOfComponents(3);
1697   seeds->InsertNextTuple(seedSource);
1698 
1699   vtkNew<vtkIdList> seedIds;
1700   seedIds->InsertNextId(0);
1701 
1702   vtkNew<vtkIntArray> integrationDirections;
1703   integrationDirections->InsertNextValue(direction);
1704   traceOut = vtkSmartPointer<vtkPolyData>::New();
1705 
1706   double propagation = point->GetPropagation();
1707   vtkIdType numSteps = point->GetNumSteps();
1708   double integrationTime = point->GetIntegrationTime();
1709 
1710   vtkStreamTracer::Integrate(input->GetPointData(), traceOut, seeds, seedIds, integrationDirections,
1711     lastPoint, func, maxCellSize, vecType, vecName, propagation, numSteps, integrationTime);
1712   AssertGe(propagation, point->GetPropagation());
1713   AssertGe(numSteps, point->GetNumSteps());
1714 
1715   point->SetPropagation(propagation);
1716   point->SetNumSteps(numSteps);
1717   point->SetIntegrationTime(integrationTime);
1718 
1719   if (this->GenerateNormalsInIntegrate)
1720   {
1721     this->GenerateNormals(traceOut, point->GetNormal(), vecName);
1722   }
1723 
1724   if (traceOut->GetNumberOfPoints() > 0)
1725   {
1726     if (traceOut->GetLines()->GetNumberOfCells() == 0)
1727     {
1728       PRINT("Fix Single Point Path")
1729       AssertEq(traceOut->GetNumberOfPoints(), 1); // fix it
1730       vtkNew<vtkCellArray> newCells;
1731       vtkIdType cells[2] = { 0, 0 };
1732       newCells->InsertNextCell(2, cells);
1733       traceOut->SetLines(newCells);
1734 
1735       // Don't forget to add the ReasonForTermination cell array.
1736       vtkNew<vtkIntArray> retVals;
1737       retVals->SetName("ReasonForTermination");
1738       retVals->SetNumberOfTuples(1);
1739       retVals->SetValue(0, vtkStreamTracer::OUT_OF_DOMAIN);
1740       traceOut->GetCellData()->AddArray(retVals);
1741     }
1742 
1743     vtkNew<vtkIntArray> ids;
1744     ids->SetName("SeedIds");
1745     ids->SetNumberOfTuples(1);
1746     ids->SetValue(0, point->GetId());
1747     traceOut->GetCellData()->AddArray(ids);
1748   }
1749   Assert(SameShape(traceOut->GetPointData(), this->Utils->GetProto()->GetTail()->GetPointData()),
1750     "trace data does not match prototype");
1751 }
TraceOneStep(vtkPolyData * traceOut,vtkAbstractInterpolatedVelocityField * func,PStreamTracerPoint * point)1752 bool vtkPStreamTracer::TraceOneStep(
1753   vtkPolyData* traceOut, vtkAbstractInterpolatedVelocityField* func, PStreamTracerPoint* point)
1754 {
1755   double outPoint[3], outNormal[3];
1756 
1757   vtkIdType lastPointIndex = LastPointIndex(traceOut);
1758   double lastPoint[3];
1759   // Continue the integration a bit further to obtain a point
1760   // outside. The main integration step can not always be used
1761   // for this, specially if the integration is not 2nd order.
1762   traceOut->GetPoint(lastPointIndex, lastPoint);
1763 
1764   vtkInitialValueProblemSolver* ivp = this->Integrator;
1765   ivp->Register(this);
1766 
1767   vtkNew<vtkRungeKutta2> tmpSolver;
1768   this->SetIntegrator(tmpSolver);
1769 
1770   memcpy(outPoint, lastPoint, sizeof(double) * 3);
1771 
1772   double timeStepTaken = this->SimpleIntegrate(nullptr, outPoint, this->LastUsedStepSize, func);
1773   PRINT("Simple Integrate from :" << lastPoint[0] << " " << lastPoint[1] << " " << lastPoint[2]
1774                                   << " to " << outPoint[0] << " " << outPoint[1] << " "
1775                                   << outPoint[2]);
1776   double d = sqrt(vtkMath::Distance2BetweenPoints(lastPoint, outPoint));
1777 
1778   this->SetIntegrator(ivp);
1779   ivp->UnRegister(this);
1780 
1781   vtkDataArray* normals = traceOut->GetPointData()->GetArray("Normals");
1782   if (normals)
1783   {
1784     normals->GetTuple(lastPointIndex, outNormal);
1785   }
1786 
1787   bool res = d > 0;
1788   if (res)
1789   {
1790     Assert(SameShape(traceOut->GetPointData(), this->Utils->GetProto()->GetTail()->GetPointData()),
1791       "Point data mismatch");
1792     point->Reseed(outPoint, outNormal, traceOut, lastPointIndex, point->GetPropagation() + d,
1793       point->GetIntegrationTime() + timeStepTaken);
1794     AssertEq(point->GetTail()->GetPointData()->GetNumberOfTuples(), 1);
1795   }
1796 
1797   return res;
1798 }
1799 
Prepend(vtkPolyData * pathPoly,vtkPolyData * headPoly)1800 void vtkPStreamTracer::Prepend(vtkPolyData* pathPoly, vtkPolyData* headPoly)
1801 {
1802   vtkCellArray* pathCells = pathPoly->GetLines();
1803   AssertEq(pathCells->GetNumberOfCells(), 1);
1804   AssertEq(headPoly->GetNumberOfPoints(), 1);
1805 
1806   double* newPoint = headPoly->GetPoint(0);
1807   AssertEq(
1808     headPoly->GetPointData()->GetNumberOfArrays(), pathPoly->GetPointData()->GetNumberOfArrays());
1809 
1810   const vtkIdType* path(nullptr);
1811   vtkIdType nPoints(0);
1812   pathCells->InitTraversal();
1813   pathCells->GetNextCell(nPoints, path);
1814   AssertNe(path, nullptr);
1815   AssertEq(nPoints, pathPoly->GetNumberOfPoints());
1816 
1817   vtkIdType newPointId = pathPoly->GetPoints()->InsertNextPoint(newPoint);
1818 
1819   vtkPointData* headData = headPoly->GetPointData();
1820   vtkPointData* pathData = pathPoly->GetPointData();
1821   Assert(SameShape(headData, pathData), "Prepend failure");
1822   int numArrays(headData->GetNumberOfArrays());
1823   for (int i = 0; i < numArrays; i++)
1824   {
1825     pathData->CopyTuple(
1826       headData->GetAbstractArray(i), pathData->GetAbstractArray(i), 0, newPointId);
1827   }
1828 
1829   PRINT("Prepend Point " << newPointId << " " << newPoint[0] << " " << newPoint[1] << " "
1830                          << newPoint[2]);
1831   vtkNew<vtkIdList> newPath;
1832   newPath->InsertNextId(newPointId);
1833   for (int i = 0; i < nPoints; i++)
1834   {
1835     newPath->InsertNextId(path[i]);
1836   }
1837 
1838   pathCells->Reset();
1839   if (newPath->GetNumberOfIds() > 1)
1840   {
1841     pathCells->InsertNextCell(newPath);
1842   }
1843   AssertEq(pathCells->GetNumberOfCells(), 1);
1844   vtkIdType newNumPoints(0);
1845   pathCells->GetNextCell(newNumPoints, path);
1846   AssertEq(newNumPoints, nPoints + 1);
1847   AssertEq(newNumPoints, pathPoly->GetNumberOfPoints());
1848 }
1849