1 /*=========================================================================
2
3 Program: Visualization Toolkit
4 Module: vtkPStreamTracer.cxx
5
6 Copyright (c) Ken Martin, Will Schroeder, Bill Lorensen
7 All rights reserved.
8 See Copyright.txt or http://www.kitware.com/Copyright.htm for details.
9
10 This software is distributed WITHOUT ANY WARRANTY; without even
11 the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
12 PURPOSE. See the above copyright notice for more information.
13
14 =========================================================================*/
15 #include "vtkPStreamTracer.h"
16
17 #include "vtkAMRInterpolatedVelocityField.h"
18 #include "vtkAbstractInterpolatedVelocityField.h"
19 #include "vtkAppendDataSets.h"
20 #include "vtkAppendPolyData.h"
21 #include "vtkCellArray.h"
22 #include "vtkCellData.h"
23 #include "vtkCompositeDataIterator.h"
24 #include "vtkCompositeDataSet.h"
25 #include "vtkDoubleArray.h"
26 #include "vtkFloatArray.h"
27 #include "vtkIdList.h"
28 #include "vtkInformation.h"
29 #include "vtkInformationVector.h"
30 #include "vtkIntArray.h"
31 #include "vtkMPIController.h"
32 #include "vtkMath.h"
33 #include "vtkMultiProcessController.h"
34 #include "vtkMultiProcessStream.h"
35 #include "vtkNew.h"
36 #include "vtkObjectFactory.h"
37 #include "vtkOverlappingAMR.h"
38 #include "vtkParallelAMRUtilities.h"
39 #include "vtkPointData.h"
40 #include "vtkPoints.h"
41 #include "vtkPolyData.h"
42 #include "vtkRungeKutta2.h"
43 #include "vtkSMPTools.h"
44 #include "vtkStreamingDemandDrivenPipeline.h"
45 #include "vtkTimerLog.h"
46 #include "vtkUniformGrid.h"
47
48 #include <algorithm>
49 #include <cassert>
50 #include <list>
51 #include <numeric>
52 #include <vector>
53
54 #ifndef NDEBUG
55 // #define DEBUGTRACE
56 // #define LOGTRACE
57 #else
58 #endif
59
60 #ifdef DEBUGTRACE
61 #define PRINT(x) \
62 { \
63 cout << this->Rank << ")" << x << endl; \
64 }
65 #define ALLPRINT(x) \
66 for (int rank = 0; rank < NumProcs; rank++) \
67 { \
68 Controller->Barrier(); \
69 if (rank == Rank) \
70 cout << "(" << this->Rank << ")" << x << endl; \
71 }
72 #define Assert(a, msg) \
73 { \
74 if (!a) \
75 { \
76 cerr << msg << endl; \
77 assert(false); \
78 } \
79 }
80
81 #define AssertEq(a, b) \
82 { \
83 if (a != b) \
84 { \
85 cerr << a << " != " << b << endl; \
86 assert(false); \
87 } \
88 }
89
90 #define AssertNe(a, b) \
91 { \
92 if (a == b) \
93 { \
94 cerr << a << " == " << b << endl; \
95 assert(false); \
96 } \
97 }
98
99 #define AssertGe(a, b) \
100 { \
101 if (a < b) \
102 { \
103 cerr << a << " < " << b << endl; \
104 assert(false); \
105 } \
106 }
107
108 #define AssertGt(a, b) \
109 { \
110 if (a <= b) \
111 { \
112 cerr << a << " < " << b << endl; \
113 assert(false); \
114 } \
115 }
116 //#define PRINT(id, x)
117 #else
118 #define PRINT(x)
119 #define ALLPRINT(x)
120
121 #define AssertEq(a, b)
122 #define AssertGt(a, b)
123 #define AssertGe(a, b)
124 #define Assert(a, b)
125 #define AssertNe(a, b)
126 #endif
127
128 namespace
129 {
CNext(int i,int n)130 inline int CNext(int i, int n)
131 {
132 return (i + 1) % n;
133 }
134
135 class MyStream
136 {
137 public:
MyStream(int BufferSize)138 MyStream(int BufferSize)
139 : Size(BufferSize)
140 {
141 this->Data = new char[Size];
142 this->Head = Data;
143 }
~MyStream()144 ~MyStream() { delete[] this->Data; }
GetSize()145 int GetSize() { return Size; }
146
147 template <class T>
operator <<(T t)148 MyStream& operator<<(T t)
149 {
150 unsigned int size = sizeof(T);
151 char* value = reinterpret_cast<char*>(&t);
152 for (unsigned int i = 0; i < size; i++)
153 {
154 AssertGe(Size, this->Head - this->Data);
155 *(this->Head++) = (*(value++));
156 }
157 return (*this);
158 }
159
160 template <class T>
operator >>(T & t)161 MyStream& operator>>(T& t)
162 {
163 unsigned int size = sizeof(T);
164 AssertGe(Size, this->Head + size - this->Data);
165 t = *(reinterpret_cast<T*>(this->Head));
166 this->Head += size;
167 return (*this);
168 }
169
GetRawData()170 char* GetRawData() { return this->Data; }
GetLength()171 int GetLength() { return this->Head - this->Data; }
172
Reset()173 void Reset() { this->Head = this->Data; }
174
175 private:
176 MyStream(const MyStream&) = delete;
177 void operator=(const MyStream&) = delete;
178
179 char* Data;
180 char* Head;
181 int Size;
182 };
183
InitBB(double * Bounds)184 inline void InitBB(double* Bounds)
185 {
186 Bounds[0] = DBL_MAX;
187 Bounds[1] = -DBL_MAX;
188 Bounds[2] = DBL_MAX;
189 Bounds[3] = -DBL_MAX;
190 Bounds[4] = DBL_MAX;
191 Bounds[5] = -DBL_MAX;
192 }
193
InBB(const double * x,const double * bounds)194 inline bool InBB(const double* x, const double* bounds)
195 {
196 constexpr double delta[3] = { 1e-6, 1e-6, 1e-6 };
197 return vtkMath::PointIsWithinBounds(x, bounds, delta);
198 }
199
UpdateBB(double * a,const double * b)200 inline void UpdateBB(double* a, const double* b)
201 {
202 for (int i = 0; i <= 4; i += 2)
203 {
204 if (b[i] < a[i])
205 {
206 a[i] = b[i];
207 }
208 }
209 for (int i = 1; i <= 5; i += 2)
210 {
211 if (b[i] > a[i])
212 {
213 a[i] = b[i];
214 }
215 }
216 }
217 }
218
219 class PStreamTracerPoint : public vtkObject
220 {
221 public:
222 vtkTypeMacro(PStreamTracerPoint, vtkObject);
223
224 static PStreamTracerPoint* New();
225
226 vtkGetMacro(Id, int);
227 vtkGetVector3Macro(Seed, double);
228 vtkGetVector3Macro(Normal, double);
229 vtkGetMacro(Direction, int);
230 vtkGetMacro(NumSteps, int);
231 vtkGetMacro(Propagation, double);
232 vtkGetMacro(Rank, int);
233 vtkGetMacro(IntegrationTime, double);
234
235 vtkSetMacro(Id, int);
236 vtkSetMacro(Direction, int);
237 vtkSetVector3Macro(Seed, double);
238 vtkSetMacro(NumSteps, int);
239 vtkSetMacro(Propagation, double);
240 vtkSetMacro(Rank, int);
241 vtkSetMacro(IntegrationTime, double);
242
Reseed(double * seed,double * normal,vtkPolyData * poly,int id,double propagation,double integrationTime)243 void Reseed(double* seed, double* normal, vtkPolyData* poly, int id, double propagation,
244 double integrationTime)
245 {
246 memcpy(this->Seed, seed, 3 * sizeof(double));
247 memcpy(this->Normal, normal, 3 * sizeof(double));
248
249 this->AllocateTail(poly->GetPointData());
250 double* x = poly->GetPoints()->GetPoint(id);
251 this->Tail->GetPoints()->SetPoint(0, x);
252 this->Tail->GetPointData()->CopyData(poly->GetPointData(), id, 0);
253 this->Rank = -1; // someone else figure this out
254 this->IntegrationTime = integrationTime;
255 this->Propagation = propagation;
256 }
257
GetTail()258 vtkPolyData* GetTail() { return this->Tail; }
259
CopyTail(PStreamTracerPoint * other)260 void CopyTail(PStreamTracerPoint* other)
261 {
262 if (other->Tail)
263 {
264 vtkPointData* pd = other->Tail->GetPointData();
265 if (!this->Tail)
266 {
267 AllocateTail(pd);
268 }
269 this->Tail->GetPointData()->DeepCopy(pd);
270 }
271 else
272 {
273 Tail = nullptr;
274 }
275 }
276
277 // allocate a one point vtkPolyData whose PointData setup matches pd
AllocateTail(vtkPointData * pd)278 void AllocateTail(vtkPointData* pd)
279 {
280 if (!this->Tail)
281 {
282 Tail = vtkSmartPointer<vtkPolyData>::New();
283 vtkNew<vtkPoints> points;
284 {
285 points->SetNumberOfPoints(1);
286 }
287 Tail->SetPoints(points);
288 }
289
290 this->Tail->GetPointData()->CopyAllocate(pd);
291 }
292
GetSize()293 virtual int GetSize()
294 {
295 int size(0);
296 vtkPointData* data = this->GetTail()->GetPointData();
297 for (int i = 0; i < data->GetNumberOfArrays(); i++)
298 {
299 size += data->GetArray(i)->GetNumberOfComponents();
300 }
301 return size * sizeof(double) + sizeof(PStreamTracerPoint);
302 }
303
Read(MyStream & stream)304 virtual void Read(MyStream& stream)
305 {
306 stream >> this->Id;
307 stream >> this->Seed[0];
308 stream >> this->Seed[1];
309 stream >> this->Seed[2];
310 stream >> this->Direction;
311 stream >> this->NumSteps;
312 stream >> this->Propagation;
313 stream >> this->IntegrationTime;
314
315 char hasTail(0);
316 stream >> hasTail;
317 if (hasTail)
318 {
319 double x[3];
320 for (int i = 0; i < 3; i++)
321 {
322 stream >> x[i];
323 }
324 AssertNe(this->Tail, nullptr); // someone should have allocated it by prototype
325 this->Tail->SetPoints(vtkSmartPointer<vtkPoints>::New());
326 this->Tail->GetPoints()->InsertNextPoint(x);
327
328 vtkPointData* pointData = this->Tail->GetPointData();
329 for (int i = 0; i < pointData->GetNumberOfArrays(); i++)
330 {
331 int numComponents = pointData->GetArray(i)->GetNumberOfComponents();
332 std::vector<double> xi(numComponents);
333 for (int j = 0; j < numComponents; j++)
334 {
335 double& xj(xi[j]);
336 stream >> xj;
337 }
338 pointData->GetArray(i)->InsertNextTuple(&xi[0]);
339 }
340 }
341 else
342 {
343 this->Tail = nullptr;
344 }
345 }
346
Write(MyStream & stream)347 virtual void Write(MyStream& stream)
348 {
349 stream << this->Id << this->Seed[0] << this->Seed[1] << this->Seed[2] << this->Direction
350 << this->NumSteps << this->Propagation << this->IntegrationTime;
351
352 stream << (char)(this->Tail != nullptr);
353
354 if (this->Tail)
355 {
356 double* x = this->Tail->GetPoints()->GetPoint(0);
357 for (int i = 0; i < 3; i++)
358 {
359 stream << x[i];
360 }
361 vtkPointData* pData = this->Tail->GetPointData();
362 int numArrays(pData->GetNumberOfArrays());
363 for (int i = 0; i < numArrays; i++)
364 {
365 vtkDataArray* arr = pData->GetArray(i);
366 int numComponents = arr->GetNumberOfComponents();
367 double* y = arr->GetTuple(0);
368 for (int j = 0; j < numComponents; j++)
369 stream << y[j];
370 }
371 }
372 }
373
374 private:
375 int Id;
376 double Seed[3];
377 double Normal[3];
378 int Direction;
379 int NumSteps;
380 double Propagation;
381 vtkSmartPointer<vtkPolyData> Tail;
382 int Rank;
383 double IntegrationTime;
384
385 protected:
PStreamTracerPoint()386 PStreamTracerPoint()
387 : Id(-1)
388 , Direction(0)
389 , NumSteps(0)
390 , Propagation(0)
391 , Rank(-1)
392 , IntegrationTime(0)
393 {
394 this->Seed[0] = this->Seed[1] = this->Seed[2] = -999;
395 }
396 };
397
398 vtkStandardNewMacro(PStreamTracerPoint);
399
400 class AMRPStreamTracerPoint : public PStreamTracerPoint
401 {
402 public:
403 vtkTypeMacro(AMRPStreamTracerPoint, PStreamTracerPoint);
404
405 static AMRPStreamTracerPoint* New();
406 vtkSetMacro(Level, int);
407 vtkGetMacro(Level, int);
408
409 vtkSetMacro(GridId, int);
410 vtkGetMacro(GridId, int);
411
GetSize()412 int GetSize() override { return PStreamTracerPoint::GetSize() + 2 * sizeof(int); }
413
Read(MyStream & stream)414 void Read(MyStream& stream) override
415 {
416 PStreamTracerPoint::Read(stream);
417 stream >> Level;
418 stream >> GridId;
419 }
Write(MyStream & stream)420 void Write(MyStream& stream) override
421 {
422 PStreamTracerPoint::Write(stream);
423 stream << Level << GridId;
424 }
425
426 private:
AMRPStreamTracerPoint()427 AMRPStreamTracerPoint()
428 : Level(-1)
429 , GridId(-1)
430 {
431 }
432 int Level;
433 int GridId;
434 };
435
436 typedef std::vector<vtkSmartPointer<PStreamTracerPoint>> PStreamTracerPointArray;
437
438 vtkStandardNewMacro(AMRPStreamTracerPoint);
439
440 class ProcessLocator : public vtkObject
441 {
442 public:
443 vtkTypeMacro(ProcessLocator, vtkObject);
444 static ProcessLocator* New();
Initialize(vtkCompositeDataSet * data)445 void Initialize(vtkCompositeDataSet* data)
446 {
447 this->Controller = vtkMultiProcessController::GetGlobalController();
448 this->Rank = this->Controller->GetLocalProcessId();
449 this->NumProcs = this->Controller->GetNumberOfProcesses();
450 this->InitBoundingBoxes(this->NumProcs);
451
452 double bb[6];
453 InitBB(bb);
454
455 if (data)
456 {
457 vtkCompositeDataIterator* iter = data->NewIterator();
458 iter->InitTraversal();
459 while (!iter->IsDoneWithTraversal())
460 {
461 vtkDataSet* dataSet = vtkDataSet::SafeDownCast(iter->GetCurrentDataObject());
462 AssertNe(dataSet, nullptr);
463 UpdateBB(bb, dataSet->GetBounds());
464 iter->GoToNextItem();
465 }
466 iter->Delete();
467 }
468
469 PRINT(bb[0] << " " << bb[1] << " " << bb[2] << " " << bb[3] << " " << bb[4] << " " << bb[5]);
470 this->Controller->AllGather(bb, &this->BoundingBoxes[0], 6);
471
472 #ifdef DEBUGTRACE
473 cout << "(" << Rank << ") BoundingBoxes: ";
474 for (int i = 0; i < NumProcs; i++)
475 {
476 double* box = this->GetBoundingBox(i);
477 cout << box[0] << " " << box[1] << " " << box[2] << " " << box[3] << " " << box[4] << " "
478 << box[5] << "; ";
479 }
480 cout << endl;
481 #endif
482 }
483
InCurrentProcess(double * p)484 bool InCurrentProcess(double* p) { return InBB(p, GetBoundingBox(Rank)); }
FindNextProcess(double * p)485 int FindNextProcess(double* p)
486 {
487 for (int rank = CNext(this->Rank, this->NumProcs); rank != Rank;
488 rank = CNext(rank, this->NumProcs))
489 {
490 if (InBB(p, GetBoundingBox(rank)))
491 {
492 return rank;
493 }
494 }
495 return -1;
496 }
497
498 private:
ProcessLocator()499 ProcessLocator()
500 {
501 this->Controller = nullptr;
502 this->NumProcs = 0;
503 this->Rank = 0;
504 }
505 vtkMultiProcessController* Controller;
506 int Rank;
507 int NumProcs;
508
GetBoundingBox(int i)509 double* GetBoundingBox(int i) { return &BoundingBoxes[6 * i]; }
510
InitBoundingBoxes(int num)511 void InitBoundingBoxes(int num)
512 {
513 for (int i = 0; i < 6 * num; i++)
514 {
515 this->BoundingBoxes.push_back(0);
516 }
517 }
518 std::vector<double> BoundingBoxes;
519 };
520 vtkStandardNewMacro(ProcessLocator);
521
522 class AbstractPStreamTracerUtils : public vtkObject
523 {
524 public:
525 vtkTypeMacro(AbstractPStreamTracerUtils, vtkObject);
526
527 vtkGetMacro(VecName, char*);
528 vtkGetMacro(VecType, int);
529 vtkGetMacro(Input0, vtkDataSet*);
530
GetProcessLocator()531 virtual ProcessLocator* GetProcessLocator() { return nullptr; }
532
GetProto()533 vtkSmartPointer<PStreamTracerPoint> GetProto() { return this->Proto; }
534
InitializeVelocityFunction(PStreamTracerPoint *,vtkAbstractInterpolatedVelocityField *)535 virtual void InitializeVelocityFunction(
536 PStreamTracerPoint*, vtkAbstractInterpolatedVelocityField*)
537 {
538 }
539
PreparePoint(PStreamTracerPoint *,vtkAbstractInterpolatedVelocityField *)540 virtual bool PreparePoint(PStreamTracerPoint*, vtkAbstractInterpolatedVelocityField*)
541 {
542 return true;
543 }
544
ComputeSeeds(vtkDataSet * source,PStreamTracerPointArray & out,int & maxId)545 vtkSmartPointer<vtkIdList> ComputeSeeds(
546 vtkDataSet* source, PStreamTracerPointArray& out, int& maxId)
547 {
548 vtkDataArray* seeds;
549 vtkIdList* seedIds;
550 vtkIntArray* integrationDirections;
551 this->Tracer->InitializeSeeds(seeds, seedIds, integrationDirections, source);
552
553 int numSeeds = seedIds->GetNumberOfIds();
554 for (int i = 0; i < numSeeds; i++)
555 {
556 double seed[3];
557 seeds->GetTuple(seedIds->GetId(i), seed);
558 vtkSmartPointer<PStreamTracerPoint> point =
559 NewPoint(i, seed, integrationDirections->GetValue(i));
560 if (this->InBound(point))
561 {
562 out.push_back(point);
563 }
564 }
565 if (seeds)
566 {
567 seeds->Delete();
568 }
569 if (integrationDirections)
570 {
571 integrationDirections->Delete();
572 }
573
574 maxId = numSeeds - 1;
575 return vtkSmartPointer<vtkIdList>::Take(seedIds);
576 }
577
Initialize(vtkPStreamTracer * tracer)578 virtual void Initialize(vtkPStreamTracer* tracer)
579 {
580 this->Tracer = tracer;
581 this->Controller = tracer->Controller;
582 this->Rank = tracer->Rank;
583 this->NumProcs = tracer->NumProcs;
584 this->InputData = tracer->InputData;
585 this->VecType = 0;
586 this->VecName = nullptr;
587 this->Input0 = nullptr;
588 if (!tracer->EmptyData)
589 {
590 vtkCompositeDataIterator* iter = tracer->InputData->NewIterator();
591 vtkSmartPointer<vtkCompositeDataIterator> iterP(iter);
592 iter->Delete();
593 iterP->GoToFirstItem();
594 if (!iterP->IsDoneWithTraversal())
595 {
596 Input0 = vtkDataSet::SafeDownCast(iterP->GetCurrentDataObject());
597 // iterP->GotoNextitem();
598 }
599 vtkDataArray* vectors = tracer->GetInputArrayToProcess(0, this->Input0, this->VecType);
600 this->VecName = vectors->GetName();
601 }
602
603 if (!tracer->EmptyData)
604 {
605 this->CreatePrototype(this->Input0->GetPointData(), VecType, VecName);
606 }
607 }
608
609 protected:
AbstractPStreamTracerUtils()610 AbstractPStreamTracerUtils()
611 : Tracer(nullptr)
612 , Controller(nullptr)
613 , VecName(nullptr)
614 , Input0(nullptr)
615 , InputData(nullptr)
616 {
617 }
618
619 virtual vtkSmartPointer<PStreamTracerPoint> NewPoint(int id, double* x, int dir) = 0;
620 virtual bool InBound(PStreamTracerPoint* p) = 0;
621
CreatePrototype(vtkPointData * pointData,int fieldType,const char * vecName)622 void CreatePrototype(vtkPointData* pointData, int fieldType, const char* vecName)
623 {
624 this->Proto = NewPoint(-1, nullptr, -1);
625
626 vtkNew<vtkPointData> protoPD;
627 protoPD->InterpolateAllocate(pointData, 1);
628 vtkSmartPointer<vtkDoubleArray> time = vtkSmartPointer<vtkDoubleArray>::New();
629 time->SetName("IntegrationTime");
630 protoPD->AddArray(time);
631
632 if (fieldType == vtkDataObject::FIELD_ASSOCIATION_CELLS)
633 {
634 vtkSmartPointer<vtkDoubleArray> velocityVectors = vtkSmartPointer<vtkDoubleArray>::New();
635 velocityVectors->SetName(vecName);
636 velocityVectors->SetNumberOfComponents(3);
637 protoPD->AddArray(velocityVectors);
638 }
639
640 if (Tracer->GetComputeVorticity())
641 {
642 vtkSmartPointer<vtkDoubleArray> vorticity = vtkSmartPointer<vtkDoubleArray>::New();
643 vorticity->SetName("Vorticity");
644 vorticity->SetNumberOfComponents(3);
645 protoPD->AddArray(vorticity);
646
647 vtkSmartPointer<vtkDoubleArray> rotation = vtkSmartPointer<vtkDoubleArray>::New();
648 rotation->SetName("Rotation");
649 protoPD->AddArray(rotation);
650
651 vtkSmartPointer<vtkDoubleArray> angularVel = vtkSmartPointer<vtkDoubleArray>::New();
652 angularVel->SetName("AngularVelocity");
653 protoPD->AddArray(angularVel);
654 }
655
656 if (Tracer->GenerateNormalsInIntegrate)
657 {
658 PRINT("Generate normals prototype");
659 vtkSmartPointer<vtkDoubleArray> normals = vtkSmartPointer<vtkDoubleArray>::New();
660 normals->SetName("Normals");
661 normals->SetNumberOfComponents(3);
662 protoPD->AddArray(normals);
663 }
664 AssertEq(this->Proto->GetTail(), nullptr);
665 this->Proto->AllocateTail(protoPD);
666 }
667
668 vtkPStreamTracer* Tracer;
669 vtkMultiProcessController* Controller;
670 vtkSmartPointer<PStreamTracerPoint> Proto;
671 int VecType;
672 char* VecName;
673 vtkDataSet* Input0;
674 vtkCompositeDataSet* InputData;
675
676 int Rank;
677 int NumProcs;
678 };
679
680 class PStreamTracerUtils : public AbstractPStreamTracerUtils
681 {
682 public:
683 vtkTypeMacro(PStreamTracerUtils, AbstractPStreamTracerUtils);
684
685 static PStreamTracerUtils* New();
686
PStreamTracerUtils()687 PStreamTracerUtils() { this->Locator = nullptr; }
688
Initialize(vtkPStreamTracer * tracer)689 void Initialize(vtkPStreamTracer* tracer) override
690 {
691 this->Superclass::Initialize(tracer);
692 this->Locator = vtkSmartPointer<ProcessLocator>::New();
693 this->Locator->Initialize(tracer->InputData);
694 }
695
GetProcessLocator()696 ProcessLocator* GetProcessLocator() override { return this->Locator; }
697
InBound(PStreamTracerPoint *)698 bool InBound(PStreamTracerPoint*) override { return true; }
699
NewPoint(int id,double * x,int dir)700 vtkSmartPointer<PStreamTracerPoint> NewPoint(int id, double* x, int dir) override
701 {
702 vtkSmartPointer<PStreamTracerPoint> p = vtkSmartPointer<PStreamTracerPoint>::New();
703 p->SetId(id);
704 if (x)
705 {
706 p->SetSeed(x);
707 }
708 p->SetDirection(dir);
709 return p;
710 }
711
712 private:
713 vtkSmartPointer<ProcessLocator> Locator;
714 };
715
716 vtkStandardNewMacro(PStreamTracerUtils);
717
718 class AMRPStreamTracerUtils : public AbstractPStreamTracerUtils
719 {
720 public:
721 vtkTypeMacro(AMRPStreamTracerUtils, AbstractPStreamTracerUtils);
722 static AMRPStreamTracerUtils* New();
723 vtkSetMacro(AMR, vtkOverlappingAMR*);
724
InitializeVelocityFunction(PStreamTracerPoint * point,vtkAbstractInterpolatedVelocityField * func)725 void InitializeVelocityFunction(
726 PStreamTracerPoint* point, vtkAbstractInterpolatedVelocityField* func) override
727 {
728 AMRPStreamTracerPoint* amrPoint = AMRPStreamTracerPoint::SafeDownCast(point);
729 assert(amrPoint);
730 vtkAMRInterpolatedVelocityField* amrFunc = vtkAMRInterpolatedVelocityField::SafeDownCast(func);
731 assert(amrFunc);
732 if (amrPoint->GetLevel() >= 0)
733 {
734 amrFunc->SetLastDataSet(amrPoint->GetLevel(), amrPoint->GetGridId());
735 #ifdef DEBUGTRACE
736 vtkUniformGrid* grid = this->AMR->GetDataSet(amrPoint->GetLevel(), amrPoint->GetGridId());
737 if (!grid || !InBB(amrPoint->GetSeed(), grid->GetBounds()))
738 {
739 PRINT("WARNING: Bad AMR Point "
740 << (grid) << " " << amrPoint->GetSeed()[0] << " " << amrPoint->GetSeed()[1] << " "
741 << amrPoint->GetSeed()[2] << " " << amrPoint->GetLevel() << " " << amrPoint->GetGridId());
742 }
743 #endif
744 }
745 }
746
PreparePoint(PStreamTracerPoint * point,vtkAbstractInterpolatedVelocityField * func)747 bool PreparePoint(PStreamTracerPoint* point, vtkAbstractInterpolatedVelocityField* func) override
748 {
749 AMRPStreamTracerPoint* amrPoint = AMRPStreamTracerPoint::SafeDownCast(point);
750 vtkAMRInterpolatedVelocityField* amrFunc = vtkAMRInterpolatedVelocityField::SafeDownCast(func);
751 unsigned int level, id;
752 if (amrFunc->GetLastDataSetLocation(level, id))
753 {
754 amrPoint->SetLevel(level);
755 amrPoint->SetId(id);
756 int blockIndex = this->AMR->GetCompositeIndex(level, id);
757 amrPoint->SetRank(this->BlockProcess[blockIndex]);
758 return true;
759 }
760 else
761 {
762 PRINT("Invalid AMR : " << point->GetSeed()[0] << " " << point->GetSeed()[1] << " "
763 << point->GetSeed()[2] << " "
764 << "Probably out of bound");
765 amrPoint->SetLevel(-1);
766 amrPoint->SetGridId(-1);
767 amrPoint->SetRank(-1);
768 return false;
769 }
770 }
771
772 // this assume that p's AMR information has been set correctly
773 // it makes no attempt to look for it
InBound(PStreamTracerPoint * p)774 bool InBound(PStreamTracerPoint* p) override
775 {
776 AMRPStreamTracerPoint* amrp = AMRPStreamTracerPoint::SafeDownCast(p);
777 if (amrp->GetLevel() < 0)
778 {
779 return false;
780 }
781 AssertNe(amrp, nullptr);
782 vtkUniformGrid* grid = this->AMR->GetDataSet(amrp->GetLevel(), amrp->GetGridId());
783 return grid != nullptr;
784 }
785
NewPoint(int id,double * x,int dir)786 vtkSmartPointer<PStreamTracerPoint> NewPoint(int id, double* x, int dir) override
787 {
788
789 vtkSmartPointer<AMRPStreamTracerPoint> amrp = vtkSmartPointer<AMRPStreamTracerPoint>::New();
790 vtkSmartPointer<PStreamTracerPoint> p = amrp;
791 p->SetId(id);
792 if (x)
793 {
794 p->SetSeed(x);
795 }
796 p->SetDirection(dir);
797
798 if (x)
799 {
800 unsigned int level, gridId;
801 if (vtkAMRInterpolatedVelocityField::FindGrid(x, this->AMR, level, gridId))
802 {
803 amrp->SetLevel((int)level);
804 amrp->SetGridId((int)gridId);
805 int blockIndex = this->AMR->GetCompositeIndex(level, gridId);
806 int process = this->BlockProcess[blockIndex];
807 AssertGe(process, 0);
808 amrp->SetRank(process);
809 }
810 else
811 {
812 }
813 }
814
815 return p;
816 }
Initialize(vtkPStreamTracer * tracer)817 void Initialize(vtkPStreamTracer* tracer) override
818 {
819 this->Superclass::Initialize(tracer);
820 AssertNe(this->InputData, nullptr);
821 this->AMR = vtkOverlappingAMR::SafeDownCast(this->InputData);
822
823 vtkParallelAMRUtilities::DistributeProcessInformation(
824 this->AMR, this->Controller, BlockProcess);
825 this->AMR->GenerateParentChildInformation();
826 }
827
828 protected:
AMRPStreamTracerUtils()829 AMRPStreamTracerUtils() { this->AMR = nullptr; }
830 vtkOverlappingAMR* AMR;
831
832 std::vector<int> BlockProcess; // stores block->process information
833 };
834 vtkStandardNewMacro(AMRPStreamTracerUtils);
835
836 namespace
837 {
normvec3(double * x,double * y)838 inline double normvec3(double* x, double* y)
839 {
840 return sqrt(
841 (x[0] - y[0]) * (x[0] - y[0]) + (x[1] - y[1]) * (x[1] - y[1]) + (x[2] - y[2]) * (x[2] - y[2]));
842 }
843
LastPointIndex(vtkPolyData * pathPoly)844 inline vtkIdType LastPointIndex(vtkPolyData* pathPoly)
845 {
846 vtkCellArray* pathCells = pathPoly->GetLines();
847 AssertGt(pathCells->GetNumberOfCells(), 0);
848 const vtkIdType* path(nullptr);
849 vtkIdType nPoints(0);
850 pathCells->InitTraversal();
851 pathCells->GetNextCell(nPoints, path);
852 int lastPointIndex = path[nPoints - 1];
853 return lastPointIndex;
854 }
855
856 #ifdef DEBUGTRACE
ComputeLength(vtkIdList * poly,vtkPoints * pts)857 inline double ComputeLength(vtkIdList* poly, vtkPoints* pts)
858 {
859 int n = poly->GetNumberOfIds();
860 if (n == 0)
861 return 0;
862
863 double s(0);
864 double p[3];
865 pts->GetPoint(poly->GetId(0), p);
866 for (int j = 1; j < n; j++)
867 {
868 int pIndex = poly->GetId(j);
869 double q[3];
870 pts->GetPoint(pIndex, q);
871 s += sqrt(vtkMath::Distance2BetweenPoints(p, q));
872 memcpy(p, q, 3 * sizeof(double));
873 }
874 return s;
875 }
876
PrintNames(ostream & out,vtkPointData * a)877 inline void PrintNames(ostream& out, vtkPointData* a)
878 {
879 for (int i = 0; i < a->GetNumberOfArrays(); i++)
880 {
881 out << a->GetArray(i)->GetName() << " ";
882 }
883 out << endl;
884 }
885
SameShape(vtkPointData * a,vtkPointData * b)886 inline bool SameShape(vtkPointData* a, vtkPointData* b)
887 {
888 if (!a || !b)
889 {
890 return false;
891 }
892
893 if (a->GetNumberOfArrays() != b->GetNumberOfArrays())
894 {
895 PrintNames(cerr, a);
896 PrintNames(cerr, b);
897 return false;
898 }
899
900 int numArrays(a->GetNumberOfArrays());
901 for (int i = 0; i < numArrays; i++)
902 {
903 if (a->GetArray(i)->GetNumberOfComponents() != b->GetArray(i)->GetNumberOfComponents())
904 {
905 return false;
906 }
907 }
908
909 return true;
910 }
911 #endif
912
913 class MessageBuffer
914 {
915 public:
MessageBuffer(int size)916 MessageBuffer(int size) { this->Stream = new MyStream(size); }
917
~MessageBuffer()918 ~MessageBuffer() { delete this->Stream; }
919
GetRequest()920 vtkMPICommunicator::Request& GetRequest() { return this->Request; }
GetStream()921 MyStream& GetStream() { return *(this->Stream); }
922
923 private:
924 vtkMPICommunicator::Request Request;
925 MyStream* Stream;
926
927 MessageBuffer(const MessageBuffer&) = delete;
928 void operator=(const MessageBuffer&) = delete;
929 };
930
931 typedef MyStream MessageStream;
932
933 class Task : public vtkObject
934 {
935 public:
936 static Task* New();
937
GetId()938 int GetId() { return this->Point->GetId(); }
939 vtkGetMacro(TraceExtended, bool);
940 vtkGetMacro(TraceTerminated, bool);
941 vtkSetMacro(TraceExtended, bool);
942 vtkSetMacro(TraceTerminated, bool);
943
GetPoint()944 PStreamTracerPoint* GetPoint() { return this->Point; }
IncHop()945 void IncHop() { this->NumHops++; }
946
947 private:
948 vtkSmartPointer<PStreamTracerPoint> Point;
949
950 int NumPeeks;
951 int NumHops;
952 bool TraceTerminated;
953 bool TraceExtended;
954
Task()955 Task()
956 : NumPeeks(0)
957 , NumHops(0)
958 , TraceTerminated(false)
959 , TraceExtended(false)
960 {
961 }
962 friend class TaskManager;
963 friend MessageStream& operator<<(MessageStream& stream, const Task& task);
964 };
965 vtkStandardNewMacro(Task);
966
operator <<(MessageStream & stream,const Task & t)967 MessageStream& operator<<(MessageStream& stream, const Task& t)
968 {
969 t.Point->Write(stream);
970 stream << t.NumPeeks;
971 stream << t.NumHops;
972 return stream;
973 }
974
975 // Description:
976 // Manages the communication of traces between processes
977 class TaskManager
978 {
979 public:
980 enum Message
981 {
982 NewTask,
983 NoMoreTasks,
984 TaskFinished
985 };
986
TaskManager(ProcessLocator * locator,PStreamTracerPoint * proto)987 TaskManager(ProcessLocator* locator, PStreamTracerPoint* proto)
988 : Locator(locator)
989 , Proto(proto)
990 {
991 this->Controller =
992 vtkMPIController::SafeDownCast(vtkMultiProcessController::GetGlobalController());
993 AssertNe(this->Controller, nullptr);
994 this->NumProcs = this->Controller->GetNumberOfProcesses();
995 this->Rank = this->Controller->GetLocalProcessId();
996
997 int prototypeSize = Proto == nullptr ? 0 : Proto->GetSize();
998 this->MessageSize = prototypeSize + sizeof(Task);
999 this->ReceiveBuffer = nullptr;
1000
1001 this->NumSends = 0;
1002 this->Timer = vtkSmartPointer<vtkTimerLog>::New();
1003 this->ReceiveTime = 0;
1004 }
1005
Initialize(bool hasData,const PStreamTracerPointArray & seeds,int MaxId)1006 void Initialize(bool hasData, const PStreamTracerPointArray& seeds, int MaxId)
1007 {
1008 AssertGe(MaxId, 0);
1009 int numSeeds = static_cast<int>(seeds.size());
1010 this->HasData.resize(this->NumProcs);
1011 std::fill(this->HasData.begin(), this->HasData.end(), 0);
1012 {
1013 const int self_hasdata = hasData ? 1 : 0;
1014 this->Controller->AllGather(&self_hasdata, &this->HasData[0], 1);
1015 }
1016
1017 for (int i = 0; i < NumProcs; i++)
1018 {
1019 if (this->HasData[i])
1020 {
1021 this->Leader = i;
1022 break;
1023 }
1024 }
1025
1026 std::vector<int> processMap0(MaxId + 1, -1);
1027 for (int i = 0; i < numSeeds; i++)
1028 {
1029 int rank = seeds[i]->GetRank();
1030 int id = seeds[i]->GetId();
1031 if (rank < 0 && this->Locator)
1032 {
1033 rank = this->Locator->InCurrentProcess(seeds[i]->GetSeed()) ? this->Rank : -1;
1034 }
1035 processMap0[id] = rank;
1036 }
1037
1038 std::vector<int> processMap(MaxId + 1);
1039 this->Controller->AllReduce(
1040 &processMap0[0], &processMap[0], MaxId + 1, vtkCommunicator::MAX_OP);
1041
1042 int totalNumTasks = std::accumulate(processMap.begin(), processMap.end(), 0,
1043 [](int accumlatedSum, int b) { return accumlatedSum + (b >= 0 ? 1 : 0); });
1044
1045 this->TotalNumTasks = Rank == this->Leader
1046 ? totalNumTasks
1047 : INT_MAX; // only the master process knows how many are left
1048
1049 for (int i = 0; i < numSeeds; i++)
1050 {
1051 int id = seeds[i]->GetId();
1052 if (processMap[id] == Rank)
1053 {
1054 vtkNew<Task> task;
1055 task->Point = seeds[i];
1056 NTasks.emplace_back(task);
1057 }
1058 }
1059 ALLPRINT(NTasks.size() << " initial seeds out of " << totalNumTasks);
1060 }
1061
NextTask()1062 Task* NextTask()
1063 {
1064 if (!this->HasData[Rank])
1065 {
1066 return nullptr;
1067 }
1068
1069 //---------------------------------------------------------
1070 // Send messages
1071 //---------------------------------------------------------
1072
1073 while (!this->PTasks.empty())
1074 {
1075 vtkSmartPointer<Task> task = PTasks.back();
1076 PTasks.pop_back();
1077
1078 if (task->GetTraceTerminated())
1079 {
1080 // send to the master process
1081 this->Send(TaskFinished, this->Leader, task);
1082 }
1083 else
1084 {
1085 if (!task->GetTraceExtended())
1086 {
1087 // increment the peak
1088 task->NumPeeks++;
1089 PRINT("Skip " << task->GetId() << " with " << task->NumPeeks << " Peeks");
1090 }
1091 else
1092 {
1093 task->NumPeeks = 1;
1094 }
1095 int nextProcess = -1;
1096 if (task->NumPeeks < this->NumProcs)
1097 {
1098 nextProcess = NextProcess(task);
1099 if (nextProcess >= 0)
1100 {
1101 task->IncHop();
1102 // send it to the next guy
1103 this->Send(NewTask, NextProcess(task), task);
1104 }
1105 }
1106
1107 if (nextProcess < 0)
1108 {
1109 this->Send(TaskFinished, this->Leader, task); // no one can do it, norminally finished
1110 PRINT("Bail on " << task->GetId());
1111 }
1112 }
1113 }
1114
1115 //---------------------------------------------------------
1116 // Receive messages
1117 //---------------------------------------------------------
1118
1119 do
1120 {
1121 this->Receive(this->TotalNumTasks != 0 && this->Msgs.empty() &&
1122 NTasks.empty()); // wait if there is nothing to do
1123 while (!this->Msgs.empty())
1124 {
1125 Message msg = this->Msgs.back();
1126 this->Msgs.pop_back();
1127 switch (msg)
1128 {
1129 case NewTask:
1130 break;
1131 case TaskFinished:
1132 AssertEq(Rank, this->Leader);
1133 this->TotalNumTasks--;
1134 PRINT(TotalNumTasks << " tasks left");
1135 break;
1136 case NoMoreTasks:
1137 AssertNe(Rank, this->Leader);
1138 this->TotalNumTasks = 0;
1139 break;
1140 default:
1141 assert(false);
1142 }
1143 }
1144 } while (this->TotalNumTasks != 0 && NTasks.empty());
1145
1146 vtkSmartPointer<Task> nextTask;
1147 if (NTasks.empty())
1148 {
1149 AssertEq(this->TotalNumTasks, 0);
1150 if (this->Rank == this->Leader) // let everyeone know
1151 {
1152 for (int i = (this->Rank + 1) % NumProcs; i != this->Rank; i = (i + 1) % NumProcs)
1153 {
1154 if (this->HasData[i])
1155 {
1156 this->Send(NoMoreTasks, i, nullptr);
1157 }
1158 }
1159 }
1160 }
1161 else
1162 {
1163 nextTask = this->NTasks.back();
1164 this->NTasks.pop_back();
1165 this->PTasks.push_back(nextTask);
1166 }
1167
1168 return nextTask;
1169 }
1170
~TaskManager()1171 ~TaskManager()
1172 {
1173 for (BufferList::iterator itr = SendBuffers.begin(); itr != SendBuffers.end(); ++itr)
1174 {
1175 MessageBuffer* buf = *itr;
1176 AssertNe(buf->GetRequest().Test(), 0);
1177 delete buf;
1178 }
1179 if (this->ReceiveBuffer)
1180 {
1181 this->ReceiveBuffer->GetRequest().Cancel();
1182 delete ReceiveBuffer;
1183 }
1184 }
1185
1186 private:
1187 ProcessLocator* Locator;
1188 vtkSmartPointer<PStreamTracerPoint> Proto;
1189 vtkMPIController* Controller;
1190 std::vector<vtkSmartPointer<Task>> NTasks;
1191 std::vector<vtkSmartPointer<Task>> PTasks;
1192 std::vector<Message> Msgs;
1193 int NumProcs;
1194 int Rank;
1195 int TotalNumTasks;
1196 int MessageSize;
1197 std::vector<int> HasData;
1198 int Leader;
1199 typedef std::list<MessageBuffer*> BufferList;
1200 BufferList SendBuffers;
1201 MessageBuffer* ReceiveBuffer;
1202
Send(int msg,int rank,Task * task)1203 void Send(int msg, int rank, Task* task)
1204 {
1205 if (task && (msg == TaskFinished))
1206 {
1207 PRINT("Done in " << task->Point->GetNumSteps() << " steps " << task->NumHops << " hops");
1208 }
1209 if (rank == this->Rank)
1210 {
1211 switch (msg)
1212 {
1213 case TaskFinished:
1214 this->TotalNumTasks--;
1215 PRINT(TotalNumTasks << " tasks left");
1216 break;
1217 default:
1218 PRINT("Unhandled message " << msg);
1219 assert(false);
1220 }
1221 }
1222 else
1223 {
1224 MessageBuffer& buf = this->NewSendBuffer();
1225 MessageStream& outStream(buf.GetStream());
1226
1227 outStream << msg << this->Rank;
1228 AssertNe(this->Rank, rank);
1229
1230 if (task)
1231 {
1232 outStream << (*task);
1233 }
1234
1235 AssertGe(this->MessageSize, outStream.GetLength());
1236 this->Controller->NoBlockSend(
1237 outStream.GetRawData(), outStream.GetLength(), rank, 561, buf.GetRequest());
1238
1239 NumSends++;
1240 if (task)
1241 {
1242 PRINT("Send " << msg << "; task "
1243 << task->GetId()); //<<" "<<task->Seed[0]<<" "<<task->Seed[1]<<"
1244 //"<<task->Seed[2]<<" to "<<rank);
1245 }
1246 else
1247 {
1248 PRINT("Send " << msg);
1249 }
1250 }
1251 }
NextProcess(Task * task)1252 int NextProcess(Task* task)
1253 {
1254 PStreamTracerPoint* p = task->GetPoint();
1255 int rank = p->GetRank();
1256 if (rank >= 0)
1257 {
1258 return rank;
1259 }
1260
1261 if (this->Locator)
1262 {
1263 rank = this->Locator->FindNextProcess(p->GetSeed());
1264 }
1265 AssertNe(rank, Rank);
1266 return rank;
1267 }
1268
NewTaskInstance()1269 vtkSmartPointer<Task> NewTaskInstance()
1270 {
1271 vtkSmartPointer<Task> task = vtkSmartPointer<Task>::New();
1272
1273 task->Point = this->Proto->NewInstance();
1274 task->Point->CopyTail(this->Proto);
1275 task->Point->Delete();
1276 return task;
1277 }
1278
Read(MessageStream & stream,Task & task)1279 void Read(MessageStream& stream, Task& task)
1280 {
1281 task.Point->Read(stream);
1282 stream >> task.NumPeeks;
1283 stream >> task.NumHops;
1284 }
1285
NewSendBuffer()1286 MessageBuffer& NewSendBuffer()
1287 {
1288 // remove all empty buffers
1289 BufferList::iterator itr = SendBuffers.begin();
1290 while (itr != SendBuffers.end())
1291 {
1292 MessageBuffer* buf(*itr);
1293 BufferList::iterator next = itr;
1294 ++next;
1295 if (buf->GetRequest().Test())
1296 {
1297 delete buf;
1298 SendBuffers.erase(itr);
1299 }
1300 itr = next;
1301 }
1302
1303 MessageBuffer* buf = new MessageBuffer(this->MessageSize);
1304 SendBuffers.push_back(buf);
1305 return *buf;
1306 }
1307
Receive(bool wait=false)1308 void Receive(bool wait = false)
1309 {
1310 int msg = -1;
1311 int sender(0);
1312
1313 #ifdef DEBUGTRACE
1314 this->StartTimer();
1315 #endif
1316 if (ReceiveBuffer && wait)
1317 {
1318 ReceiveBuffer->GetRequest().Wait();
1319 }
1320
1321 if (ReceiveBuffer && ReceiveBuffer->GetRequest().Test())
1322 {
1323 MyStream& inStream(ReceiveBuffer->GetStream());
1324 inStream >> msg >> sender;
1325 this->Msgs.push_back((Message)msg);
1326 if (msg == NewTask)
1327 {
1328 PRINT("Received message " << msg << " from " << sender)
1329
1330 vtkSmartPointer<Task> task = this->NewTaskInstance();
1331 this->Read(inStream, *task);
1332 PRINT("Received task "
1333 << task->GetId()); //<<" "<<task->Seed[0]<<" "<<task->Seed[1]<<" "<<task->Seed[2]);
1334 this->NTasks.push_back(task);
1335 }
1336 delete ReceiveBuffer;
1337 ReceiveBuffer = nullptr;
1338 }
1339 if (ReceiveBuffer == nullptr)
1340 {
1341 ReceiveBuffer = new MessageBuffer(this->MessageSize);
1342 MyStream& inStream(ReceiveBuffer->GetStream());
1343 this->Controller->NoBlockReceive(inStream.GetRawData(), inStream.GetSize(),
1344 vtkMultiProcessController::ANY_SOURCE, 561, ReceiveBuffer->GetRequest());
1345 }
1346
1347 #ifdef DEBUGTRACE
1348 double time = this->StopTimer();
1349 if (msg >= 0)
1350 {
1351 this->ReceiveTime += time;
1352 }
1353 #endif
1354 }
1355
1356 int NumSends;
1357 double ReceiveTime;
1358 vtkSmartPointer<vtkTimerLog> Timer;
1359 };
1360
1361 };
1362
1363 vtkCxxSetObjectMacro(vtkPStreamTracer, Controller, vtkMultiProcessController);
1364 vtkCxxSetObjectMacro(vtkPStreamTracer, Interpolator, vtkAbstractInterpolatedVelocityField);
1365 vtkStandardNewMacro(vtkPStreamTracer);
1366
vtkPStreamTracer()1367 vtkPStreamTracer::vtkPStreamTracer()
1368 {
1369 this->Controller = vtkMultiProcessController::GetGlobalController();
1370 if (this->Controller)
1371 {
1372 this->Controller->Register(this);
1373 }
1374 this->Interpolator = nullptr;
1375 this->GenerateNormalsInIntegrate = false;
1376
1377 this->EmptyData = 0;
1378 }
1379
~vtkPStreamTracer()1380 vtkPStreamTracer::~vtkPStreamTracer()
1381 {
1382 if (this->Controller)
1383 {
1384 this->Controller->UnRegister(this);
1385 this->Controller = nullptr;
1386 }
1387 this->SetInterpolator(nullptr);
1388 }
1389
RequestUpdateExtent(vtkInformation * vtkNotUsed (request),vtkInformationVector ** inputVector,vtkInformationVector * outputVector)1390 int vtkPStreamTracer::RequestUpdateExtent(vtkInformation* vtkNotUsed(request),
1391 vtkInformationVector** inputVector, vtkInformationVector* outputVector)
1392 {
1393 vtkInformation* outInfo = outputVector->GetInformationObject(0);
1394 int piece = outInfo->Get(vtkStreamingDemandDrivenPipeline::UPDATE_PIECE_NUMBER());
1395 int numPieces = outInfo->Get(vtkStreamingDemandDrivenPipeline::UPDATE_NUMBER_OF_PIECES());
1396 int ghostLevel = outInfo->Get(vtkStreamingDemandDrivenPipeline::UPDATE_NUMBER_OF_GHOST_LEVELS());
1397
1398 int numInputs = this->GetNumberOfInputConnections(0);
1399 for (int idx = 0; idx < numInputs; ++idx)
1400 {
1401 vtkInformation* info = inputVector[0]->GetInformationObject(idx);
1402 if (info)
1403 {
1404 info->Set(vtkStreamingDemandDrivenPipeline::UPDATE_PIECE_NUMBER(), piece);
1405 info->Set(vtkStreamingDemandDrivenPipeline::UPDATE_NUMBER_OF_PIECES(), numPieces);
1406 info->Set(vtkStreamingDemandDrivenPipeline::UPDATE_NUMBER_OF_GHOST_LEVELS(), ghostLevel);
1407 }
1408 }
1409
1410 vtkInformation* sourceInfo = inputVector[1]->GetInformationObject(0);
1411 if (sourceInfo)
1412 {
1413 sourceInfo->Set(vtkStreamingDemandDrivenPipeline::UPDATE_PIECE_NUMBER(), 0);
1414 sourceInfo->Set(vtkStreamingDemandDrivenPipeline::UPDATE_NUMBER_OF_PIECES(), 1);
1415 sourceInfo->Set(vtkStreamingDemandDrivenPipeline::UPDATE_NUMBER_OF_GHOST_LEVELS(), ghostLevel);
1416 }
1417
1418 return 1;
1419 }
1420
RequestData(vtkInformation * request,vtkInformationVector ** inputVector,vtkInformationVector * outputVector)1421 int vtkPStreamTracer::RequestData(
1422 vtkInformation* request, vtkInformationVector** inputVector, vtkInformationVector* outputVector)
1423 {
1424 if (!vtkMPIController::SafeDownCast(this->Controller) ||
1425 this->Controller->GetNumberOfProcesses() == 1)
1426 {
1427 this->GenerateNormalsInIntegrate = true;
1428 int result = vtkStreamTracer::RequestData(request, inputVector, outputVector);
1429 this->GenerateNormalsInIntegrate = false;
1430 return result;
1431 }
1432
1433 this->Rank = this->Controller->GetLocalProcessId();
1434 this->NumProcs = this->Controller->GetNumberOfProcesses();
1435
1436 vtkInformation* inInfo = inputVector[0]->GetInformationObject(0);
1437 vtkInformation* outInfo = outputVector->GetInformationObject(0);
1438 if (!this->SetupOutput(inInfo, outInfo))
1439 {
1440 return 0;
1441 }
1442
1443 vtkPolyData* output = vtkPolyData::SafeDownCast(outInfo->Get(vtkDataObject::DATA_OBJECT()));
1444
1445 vtkInformation* sourceInfo = inputVector[1]->GetInformationObject(0);
1446 vtkDataSet* localSource = vtkDataSet::SafeDownCast(sourceInfo->Get(vtkDataObject::DATA_OBJECT()));
1447 vtkDataSet* source = nullptr;
1448 vtkNew<vtkAppendDataSets> distantAppender;
1449 if (this->UseLocalSeedSource)
1450 {
1451 source = localSource;
1452 }
1453 else
1454 {
1455 std::vector<vtkSmartPointer<vtkDataObject>> allSources;
1456 if (this->Controller->AllGather(localSource, allSources) == 0)
1457 {
1458 vtkErrorMacro("Couldn't gather seed sources, aborting StreamTracer");
1459 return 0;
1460 }
1461 for (auto distantSource : allSources)
1462 {
1463 if (vtkDataSet* ds = vtkDataSet::SafeDownCast(distantSource))
1464 {
1465 distantAppender->AddInputData(ds);
1466 }
1467 }
1468 distantAppender->MergePointsOn();
1469 distantAppender->SetTolerance(0.0);
1470 distantAppender->Update();
1471 source = vtkDataSet::SafeDownCast(distantAppender->GetOutputDataObject(0));
1472 }
1473 if (!source)
1474 {
1475 vtkErrorMacro("Error while retrieving the source");
1476 return 0;
1477 }
1478
1479 // init 'func' with nullptr such that we can check it later to determine
1480 // if we need to deallocate 'func' in case CheckInputs() fails (note
1481 // that a process may be assigned no any dataset when the number of
1482 // processes is greater than that of the blocks)
1483 vtkAbstractInterpolatedVelocityField* func = nullptr;
1484 int maxCellSize = 0;
1485 if (this->CheckInputs(func, &maxCellSize) != VTK_OK)
1486 {
1487 vtkDebugMacro("No appropriate inputs have been found..");
1488 this->EmptyData = 1;
1489 PRINT("Has Empty Data")
1490
1491 // the if-statement below is a MUST since 'func' may be still nullptr
1492 // when this->InputData is nullptr ---- no any data has been assigned
1493 // to this process
1494 if (func)
1495 {
1496 func->Delete();
1497 func = nullptr;
1498 }
1499 }
1500 else
1501 {
1502 func->SetCaching(false);
1503 this->SetInterpolator(func);
1504 func->Delete();
1505 }
1506
1507 if (vtkOverlappingAMR::SafeDownCast(this->InputData))
1508 {
1509 this->Utils = vtkSmartPointer<AMRPStreamTracerUtils>::New();
1510 }
1511 else
1512 {
1513 this->Utils = vtkSmartPointer<PStreamTracerUtils>::New();
1514 }
1515 this->Utils->Initialize(this);
1516 ALLPRINT("Vec Name: " << this->Utils->GetVecName());
1517 typedef std::vector<vtkSmartPointer<vtkPolyData>> traceOutputsType;
1518 traceOutputsType traceOutputs;
1519
1520 TaskManager taskManager(this->Utils->GetProcessLocator(), this->Utils->GetProto());
1521 PStreamTracerPointArray seedPoints;
1522
1523 int maxId;
1524 auto originalSeedIds = this->Utils->ComputeSeeds(source, seedPoints, maxId);
1525 taskManager.Initialize(this->EmptyData == 0, seedPoints, maxId);
1526
1527 Task* task(nullptr);
1528 std::vector<int> traceIds;
1529 int iterations = 0;
1530 while ((task = taskManager.NextTask()))
1531 {
1532 iterations++;
1533 PStreamTracerPoint* point = task->GetPoint();
1534
1535 vtkSmartPointer<vtkPolyData> traceOut;
1536 this->Trace(this->Utils->GetInput0(), this->Utils->GetVecType(), this->Utils->GetVecName(),
1537 point, traceOut, func, maxCellSize);
1538
1539 task->SetTraceExtended(traceOut->GetNumberOfPoints() > 0);
1540
1541 if (task->GetTraceExtended() && task->GetPoint()->GetTail())
1542 {
1543 // if we got this streamline from another process then this
1544 // process is responsible for filling in the gap over
1545 // the subdomain boundary
1546 this->Prepend(traceOut, task->GetPoint()->GetTail());
1547 }
1548
1549 int resTerm = vtkStreamTracer::OUT_OF_DOMAIN;
1550 vtkIntArray* resTermArray =
1551 vtkArrayDownCast<vtkIntArray>(traceOut->GetCellData()->GetArray("ReasonForTermination"));
1552 if (resTermArray)
1553 {
1554 resTerm = resTermArray->GetValue(0);
1555 }
1556
1557 // construct a new seed from the last point
1558 task->SetTraceTerminated(this->Controller->GetNumberOfProcesses() == 1 ||
1559 resTerm != vtkStreamTracer::OUT_OF_DOMAIN ||
1560 point->GetPropagation() > this->MaximumPropagation ||
1561 point->GetNumSteps() >= this->MaximumNumberOfSteps);
1562 if (task->GetTraceExtended() && !task->GetTraceTerminated())
1563 {
1564 task->SetTraceTerminated(
1565 !this->TraceOneStep(traceOut, func, point)); // we don't know where to go, just terminate it
1566 }
1567 if (!task->GetTraceTerminated())
1568 {
1569 task->SetTraceTerminated(!this->Utils->PreparePoint(point, func));
1570 }
1571
1572 traceIds.push_back(task->GetId());
1573 traceOutputs.push_back(traceOut);
1574 }
1575
1576 this->Controller->Barrier();
1577
1578 #ifdef LOGTRACE
1579 double receiveTime = taskManager.ComputeReceiveTime();
1580 if (this->Rank == 0)
1581 {
1582 PRINT("Total receive time: " << receiveTime)
1583 }
1584 this->Controller->Barrier();
1585 #endif
1586
1587 PRINT("Done");
1588
1589 // The parallel integration adds all streamlines to traceOutputs
1590 // container. We append them all together here.
1591 vtkNew<vtkAppendPolyData> append;
1592 for (traceOutputsType::iterator it = traceOutputs.begin(); it != traceOutputs.end(); ++it)
1593 {
1594 vtkPolyData* inp = it->GetPointer();
1595 if (inp->GetNumberOfCells() > 0)
1596 {
1597 append->AddInputData(inp);
1598 }
1599 }
1600 if (append->GetNumberOfInputConnections(0) > 0)
1601 {
1602 append->Update();
1603 vtkPolyData* appoutput = append->GetOutput();
1604 output->CopyStructure(appoutput);
1605 output->GetPointData()->PassData(appoutput->GetPointData());
1606 output->GetCellData()->PassData(appoutput->GetCellData());
1607 }
1608
1609 this->InputData->UnRegister(this);
1610
1611 // Fix seed ids. The seed ids that the parallel algorithm uses are not really
1612 // seed ids but seed indices. We need to restore original seed ids so that
1613 // a full streamline gets the same seed id for forward and backward
1614 // directions.
1615 if (auto seedIds = vtkIntArray::SafeDownCast(output->GetCellData()->GetArray("SeedIds")))
1616 {
1617 vtkSMPTools::For(0, seedIds->GetNumberOfTuples(),
1618 [&originalSeedIds, &seedIds](vtkIdType start, vtkIdType end) {
1619 for (vtkIdType cc = start; cc < end; ++cc)
1620 {
1621 const auto seedIdx = seedIds->GetTypedComponent(cc, 0);
1622 assert(seedIdx < originalSeedIds->GetNumberOfIds());
1623 seedIds->SetTypedComponent(cc, 0, originalSeedIds->GetId(seedIdx));
1624 }
1625 });
1626 }
1627
1628 #ifdef DEBUGTRACE
1629 int maxSeeds(maxId + 1);
1630 std::vector<double> lengths(maxSeeds);
1631 for (int i = 0; i < maxSeeds; i++)
1632 {
1633 lengths[i] = 0;
1634 }
1635
1636 AssertEq(traceOutputs.size(), traceIds.size());
1637 for (unsigned int i = 0; i < traceOutputs.size(); i++)
1638 {
1639 vtkPolyData* poly = traceOutputs[i];
1640 int id = traceIds[i];
1641 double length(0);
1642 vtkCellArray* lines = poly->GetLines();
1643 if (lines)
1644 {
1645 lines->InitTraversal();
1646 vtkNew<vtkIdList> trace;
1647 lines->GetNextCell(trace);
1648 length = ComputeLength(trace, poly->GetPoints());
1649 }
1650 lengths[id] += length;
1651 }
1652 std::vector<double> totalLengths(maxSeeds);
1653 this->Controller->AllReduce(&lengths[0], &totalLengths[0], maxSeeds, vtkCommunicator::SUM_OP);
1654
1655 int numNonZeros(0);
1656 double totalLength(0);
1657 for (int i = 0; i < maxSeeds; i++)
1658 {
1659 totalLength += totalLengths[i];
1660 if (totalLengths[i] > 0)
1661 {
1662 numNonZeros++;
1663 }
1664 }
1665
1666 if (this->Rank == 0)
1667 {
1668 PRINT("Summary: " << maxSeeds << " seeds," << numNonZeros << " traces"
1669 << " total length " << totalLength);
1670 }
1671
1672 #endif
1673 PRINT("Done in " << iterations << " iterations");
1674
1675 traceOutputs.erase(traceOutputs.begin(), traceOutputs.end());
1676 return 1;
1677 }
1678
PrintSelf(ostream & os,vtkIndent indent)1679 void vtkPStreamTracer::PrintSelf(ostream& os, vtkIndent indent)
1680 {
1681 this->Superclass::PrintSelf(os, indent);
1682 os << indent << "Controller: " << this->Controller << endl;
1683 }
1684
Trace(vtkDataSet * input,int vecType,const char * vecName,PStreamTracerPoint * point,vtkSmartPointer<vtkPolyData> & traceOut,vtkAbstractInterpolatedVelocityField * func,int maxCellSize)1685 void vtkPStreamTracer::Trace(vtkDataSet* input, int vecType, const char* vecName,
1686 PStreamTracerPoint* point, vtkSmartPointer<vtkPolyData>& traceOut,
1687 vtkAbstractInterpolatedVelocityField* func, int maxCellSize)
1688 {
1689 double* seedSource = point->GetSeed();
1690 int direction = point->GetDirection();
1691
1692 this->Utils->InitializeVelocityFunction(point, func);
1693
1694 double lastPoint[3];
1695 vtkSmartPointer<vtkFloatArray> seeds = vtkSmartPointer<vtkFloatArray>::New();
1696 seeds->SetNumberOfComponents(3);
1697 seeds->InsertNextTuple(seedSource);
1698
1699 vtkNew<vtkIdList> seedIds;
1700 seedIds->InsertNextId(0);
1701
1702 vtkNew<vtkIntArray> integrationDirections;
1703 integrationDirections->InsertNextValue(direction);
1704 traceOut = vtkSmartPointer<vtkPolyData>::New();
1705
1706 double propagation = point->GetPropagation();
1707 vtkIdType numSteps = point->GetNumSteps();
1708 double integrationTime = point->GetIntegrationTime();
1709
1710 vtkStreamTracer::Integrate(input->GetPointData(), traceOut, seeds, seedIds, integrationDirections,
1711 lastPoint, func, maxCellSize, vecType, vecName, propagation, numSteps, integrationTime);
1712 AssertGe(propagation, point->GetPropagation());
1713 AssertGe(numSteps, point->GetNumSteps());
1714
1715 point->SetPropagation(propagation);
1716 point->SetNumSteps(numSteps);
1717 point->SetIntegrationTime(integrationTime);
1718
1719 if (this->GenerateNormalsInIntegrate)
1720 {
1721 this->GenerateNormals(traceOut, point->GetNormal(), vecName);
1722 }
1723
1724 if (traceOut->GetNumberOfPoints() > 0)
1725 {
1726 if (traceOut->GetLines()->GetNumberOfCells() == 0)
1727 {
1728 PRINT("Fix Single Point Path")
1729 AssertEq(traceOut->GetNumberOfPoints(), 1); // fix it
1730 vtkNew<vtkCellArray> newCells;
1731 vtkIdType cells[2] = { 0, 0 };
1732 newCells->InsertNextCell(2, cells);
1733 traceOut->SetLines(newCells);
1734
1735 // Don't forget to add the ReasonForTermination cell array.
1736 vtkNew<vtkIntArray> retVals;
1737 retVals->SetName("ReasonForTermination");
1738 retVals->SetNumberOfTuples(1);
1739 retVals->SetValue(0, vtkStreamTracer::OUT_OF_DOMAIN);
1740 traceOut->GetCellData()->AddArray(retVals);
1741 }
1742
1743 vtkNew<vtkIntArray> ids;
1744 ids->SetName("SeedIds");
1745 ids->SetNumberOfTuples(1);
1746 ids->SetValue(0, point->GetId());
1747 traceOut->GetCellData()->AddArray(ids);
1748 }
1749 Assert(SameShape(traceOut->GetPointData(), this->Utils->GetProto()->GetTail()->GetPointData()),
1750 "trace data does not match prototype");
1751 }
TraceOneStep(vtkPolyData * traceOut,vtkAbstractInterpolatedVelocityField * func,PStreamTracerPoint * point)1752 bool vtkPStreamTracer::TraceOneStep(
1753 vtkPolyData* traceOut, vtkAbstractInterpolatedVelocityField* func, PStreamTracerPoint* point)
1754 {
1755 double outPoint[3], outNormal[3];
1756
1757 vtkIdType lastPointIndex = LastPointIndex(traceOut);
1758 double lastPoint[3];
1759 // Continue the integration a bit further to obtain a point
1760 // outside. The main integration step can not always be used
1761 // for this, specially if the integration is not 2nd order.
1762 traceOut->GetPoint(lastPointIndex, lastPoint);
1763
1764 vtkInitialValueProblemSolver* ivp = this->Integrator;
1765 ivp->Register(this);
1766
1767 vtkNew<vtkRungeKutta2> tmpSolver;
1768 this->SetIntegrator(tmpSolver);
1769
1770 memcpy(outPoint, lastPoint, sizeof(double) * 3);
1771
1772 double timeStepTaken = this->SimpleIntegrate(nullptr, outPoint, this->LastUsedStepSize, func);
1773 PRINT("Simple Integrate from :" << lastPoint[0] << " " << lastPoint[1] << " " << lastPoint[2]
1774 << " to " << outPoint[0] << " " << outPoint[1] << " "
1775 << outPoint[2]);
1776 double d = sqrt(vtkMath::Distance2BetweenPoints(lastPoint, outPoint));
1777
1778 this->SetIntegrator(ivp);
1779 ivp->UnRegister(this);
1780
1781 vtkDataArray* normals = traceOut->GetPointData()->GetArray("Normals");
1782 if (normals)
1783 {
1784 normals->GetTuple(lastPointIndex, outNormal);
1785 }
1786
1787 bool res = d > 0;
1788 if (res)
1789 {
1790 Assert(SameShape(traceOut->GetPointData(), this->Utils->GetProto()->GetTail()->GetPointData()),
1791 "Point data mismatch");
1792 point->Reseed(outPoint, outNormal, traceOut, lastPointIndex, point->GetPropagation() + d,
1793 point->GetIntegrationTime() + timeStepTaken);
1794 AssertEq(point->GetTail()->GetPointData()->GetNumberOfTuples(), 1);
1795 }
1796
1797 return res;
1798 }
1799
Prepend(vtkPolyData * pathPoly,vtkPolyData * headPoly)1800 void vtkPStreamTracer::Prepend(vtkPolyData* pathPoly, vtkPolyData* headPoly)
1801 {
1802 vtkCellArray* pathCells = pathPoly->GetLines();
1803 AssertEq(pathCells->GetNumberOfCells(), 1);
1804 AssertEq(headPoly->GetNumberOfPoints(), 1);
1805
1806 double* newPoint = headPoly->GetPoint(0);
1807 AssertEq(
1808 headPoly->GetPointData()->GetNumberOfArrays(), pathPoly->GetPointData()->GetNumberOfArrays());
1809
1810 const vtkIdType* path(nullptr);
1811 vtkIdType nPoints(0);
1812 pathCells->InitTraversal();
1813 pathCells->GetNextCell(nPoints, path);
1814 AssertNe(path, nullptr);
1815 AssertEq(nPoints, pathPoly->GetNumberOfPoints());
1816
1817 vtkIdType newPointId = pathPoly->GetPoints()->InsertNextPoint(newPoint);
1818
1819 vtkPointData* headData = headPoly->GetPointData();
1820 vtkPointData* pathData = pathPoly->GetPointData();
1821 Assert(SameShape(headData, pathData), "Prepend failure");
1822 int numArrays(headData->GetNumberOfArrays());
1823 for (int i = 0; i < numArrays; i++)
1824 {
1825 pathData->CopyTuple(
1826 headData->GetAbstractArray(i), pathData->GetAbstractArray(i), 0, newPointId);
1827 }
1828
1829 PRINT("Prepend Point " << newPointId << " " << newPoint[0] << " " << newPoint[1] << " "
1830 << newPoint[2]);
1831 vtkNew<vtkIdList> newPath;
1832 newPath->InsertNextId(newPointId);
1833 for (int i = 0; i < nPoints; i++)
1834 {
1835 newPath->InsertNextId(path[i]);
1836 }
1837
1838 pathCells->Reset();
1839 if (newPath->GetNumberOfIds() > 1)
1840 {
1841 pathCells->InsertNextCell(newPath);
1842 }
1843 AssertEq(pathCells->GetNumberOfCells(), 1);
1844 vtkIdType newNumPoints(0);
1845 pathCells->GetNextCell(newNumPoints, path);
1846 AssertEq(newNumPoints, nPoints + 1);
1847 AssertEq(newNumPoints, pathPoly->GetNumberOfPoints());
1848 }
1849