1 /* -*-c++-*- */
2 /* osgEarth - Geospatial SDK for OpenSceneGraph
3  * Copyright 2019 Pelican Mapping
4  * http://osgearth.org
5  *
6  * osgEarth is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU Lesser General Public License as published by
8  * the Free Software Foundation; either version 2 of the License, or
9  * (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  * GNU Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General Public License
17  * along with this program.  If not, see <http://www.gnu.org/licenses/>
18  */
19 #include <osgEarth/TileVisitor>
20 #include <osgEarth/CacheEstimator>
21 #include <osgEarth/FileUtils>
22 
23 #if OSG_VERSION_GREATER_OR_EQUAL(3,5,10)
24 #include <osg/os_utils>
25 #define OS_SYSTEM osg_system
26 #else
27 #define OS_SYSTEM system
28 #endif
29 
30 using namespace osgEarth;
31 
TileVisitor()32 TileVisitor::TileVisitor():
33 _total(0),
34 _processed(0),
35 _minLevel(0),
36 _maxLevel(99)
37 {
38 }
39 
40 
TileVisitor(TileHandler * handler)41 TileVisitor::TileVisitor(TileHandler* handler):
42 _tileHandler( handler ),
43 _total(0),
44 _processed(0),
45 _minLevel(0),
46 _maxLevel(99)
47 {
48 }
49 
resetProgress()50 void TileVisitor::resetProgress()
51 {
52     _total = 0;
53     _processed = 0;
54 }
55 
addExtent(const GeoExtent & extent)56 void TileVisitor::addExtent( const GeoExtent& extent )
57 {
58     _extents.push_back( extent );
59 }
60 
intersects(const GeoExtent & extent)61 bool TileVisitor::intersects( const GeoExtent& extent )
62 {
63     if ( _extents.empty()) return true;
64     else
65     {
66         for (unsigned int i = 0; i < _extents.size(); ++i)
67         {
68             if (_extents[i].intersects( extent ))
69             {
70                 return true;
71             }
72 
73         }
74     }
75     return false;
76 }
77 
setTileHandler(TileHandler * handler)78 void TileVisitor::setTileHandler( TileHandler* handler )
79 {
80     _tileHandler = handler;
81 }
82 
setProgressCallback(ProgressCallback * progress)83 void TileVisitor::setProgressCallback( ProgressCallback* progress )
84 {
85     _progress = progress;
86 }
87 
run(const Profile * mapProfile)88 void TileVisitor::run( const Profile* mapProfile )
89 {
90     _profile = mapProfile;
91 
92     // Reset the progress in case this visitor has been ran before.
93     resetProgress();
94 
95     estimate();
96 
97     // Get all the root keys and process them.
98     std::vector<TileKey> keys;
99     mapProfile->getRootKeys(keys);
100 
101     for (unsigned int i = 0; i < keys.size(); ++i)
102     {
103         processKey( keys[i] );
104     }
105 }
106 
estimate()107 void TileVisitor::estimate()
108 {
109     //Estimate the number of tiles
110     CacheEstimator est;
111     est.setMinLevel( _minLevel );
112     est.setMaxLevel( _maxLevel );
113     est.setProfile( _profile.get() );
114     for (unsigned int i = 0; i < _extents.size(); i++)
115     {
116         est.addExtent( _extents[ i ] );
117     }
118     _total = est.getNumTiles();
119 }
120 
processKey(const TileKey & key)121 void TileVisitor::processKey( const TileKey& key )
122 {
123     // If we've been cancelled then just return.
124     if (_progress && _progress->isCanceled())
125     {
126         return;
127     }
128 
129     unsigned int x, y, lod;
130     key.getTileXY(x, y);
131     lod = key.getLevelOfDetail();
132 
133     // Only process this key if it has a chance of succeeding.
134     if (_tileHandler && !_tileHandler->hasData(key))
135     {
136         return;
137     }
138 
139     bool traverseChildren = false;
140 
141     // If the key intersects the extent attempt to traverse
142     if (intersects( key.getExtent() ))
143     {
144         // If the lod is less than the min level don't do anything but do traverse the children.
145         if (lod < _minLevel)
146         {
147             traverseChildren = true;
148         }
149         else
150         {
151             // Process the key
152             traverseChildren = handleTile( key );
153         }
154     }
155 
156     // Traverse the children
157     if (traverseChildren && lod < _maxLevel)
158     {
159         for (unsigned int i = 0; i < 4; i++)
160         {
161             TileKey k = key.createChildKey(i);
162             processKey( k );
163         }
164     }
165 }
166 
incrementProgress(unsigned int amount)167 void TileVisitor::incrementProgress(unsigned int amount)
168 {
169     {
170         OpenThreads::ScopedLock< OpenThreads::Mutex > lk(_progressMutex );
171         _processed += amount;
172     }
173     if (_progress.valid())
174     {
175         // If report progress returns true then mark the task as being cancelled.
176         if (_progress->reportProgress( _processed, _total ))
177         {
178             _progress->cancel();
179         }
180     }
181 }
182 
handleTile(const TileKey & key)183 bool TileVisitor::handleTile( const TileKey& key )
184 {
185     bool result = false;
186     if (_tileHandler.valid() )
187     {
188         result = _tileHandler->handleTile( key, *this );
189     }
190 
191     incrementProgress(1);
192 
193     return result;
194 }
195 
196 
197 
198 /*****************************************************************************************/
199 /**
200  * A TaskRequest that runs a TileHandler in a background thread.
201  */
202 class HandleTileTask : public TaskRequest
203 {
204 public:
HandleTileTask(TileHandler * handler,TileVisitor * visitor,const TileKey & key)205     HandleTileTask( TileHandler* handler, TileVisitor* visitor, const TileKey& key ):
206       _handler( handler ),
207           _visitor(visitor),
208           _key( key )
209       {
210 
211       }
212 
operator ()(ProgressCallback * progress)213       virtual void operator()(ProgressCallback* progress )
214       {
215           if (_handler.valid())
216           {
217               _handler->handleTile( _key, *_visitor.get() );
218               _visitor->incrementProgress(1);
219           }
220       }
221 
222       osg::ref_ptr<TileHandler> _handler;
223       TileKey _key;
224       osg::ref_ptr<TileVisitor> _visitor;
225 };
226 
MultithreadedTileVisitor()227 MultithreadedTileVisitor::MultithreadedTileVisitor():
228 _numThreads( OpenThreads::GetNumberOfProcessors() )
229 {
230     // We must do this to avoid an error message in OpenSceneGraph b/c the findWrapper method doesn't appear to be threadsafe.
231     // This really isn't a big deal b/c this only effects data that is already cached.
232     osgDB::ObjectWrapper* wrapper = osgDB::Registry::instance()->getObjectWrapperManager()->findWrapper( "osg::Image" );
233 }
234 
MultithreadedTileVisitor(TileHandler * handler)235 MultithreadedTileVisitor::MultithreadedTileVisitor( TileHandler* handler ):
236 TileVisitor( handler ),
237     _numThreads( OpenThreads::GetNumberOfProcessors() )
238 {
239 }
240 
getNumThreads() const241 unsigned int MultithreadedTileVisitor::getNumThreads() const
242 {
243     return _numThreads;
244 }
245 
setNumThreads(unsigned int numThreads)246 void MultithreadedTileVisitor::setNumThreads( unsigned int numThreads)
247 {
248     _numThreads = numThreads;
249 }
250 
run(const Profile * mapProfile)251 void MultithreadedTileVisitor::run(const Profile* mapProfile)
252 {
253     // Start up the task service
254     OE_INFO << "Starting " << _numThreads << std::endl;
255     _taskService = new TaskService( "MTTileHandler", _numThreads, 1000 );
256 
257     // Produce the tiles
258     TileVisitor::run( mapProfile );
259 
260     // Send a poison pill to kill all the threads
261     _taskService->add( new PoisonPill() );
262 
263     OE_INFO << "Waiting on threads to complete" << _taskService->getNumRequests() << " tasks remaining" << std::endl;
264 
265     // Wait for everything to finish, checking for cancellation while we wait so we can kill all the existing tasks.
266     while (_taskService->areThreadsRunning())
267     {
268         OpenThreads::Thread::microSleep(10000);
269         if (_progress && _progress->isCanceled())
270         {
271             _taskService->cancelAll();
272         }
273     }
274     OE_INFO << "All threads have completed" << std::endl;
275 }
276 
handleTile(const TileKey & key)277 bool MultithreadedTileVisitor::handleTile( const TileKey& key )
278 {
279     // Add the tile to the task queue.
280     _taskService->add( new HandleTileTask(_tileHandler.get(), this, key ) );
281     return true;
282 }
283 
284 /*****************************************************************************************/
285 
TaskList(const Profile * profile)286 TaskList::TaskList(const Profile* profile):
287 _profile( profile )
288 {
289 }
290 
load(const std::string & filename)291 bool TaskList::load( const std::string &filename)
292 {
293     std::ifstream in( filename.c_str(), std::ios::in );
294 
295     std::string line;
296     while( getline(in, line) )
297     {
298         std::vector< std::string > parts;
299         StringTokenizer(line, parts, "," );
300 
301         if (parts.size() >= 3)
302         {
303             _keys.push_back( TileKey(
304                 as<unsigned int>(parts[0], 0u),
305                 as<unsigned int>(parts[1], 0u),
306                 as<unsigned int>(parts[2], 0u),
307                 _profile.get() ) );
308         }
309     }
310 
311 
312     return true;
313 }
314 
save(const std::string & filename)315 void TaskList::save( const std::string& filename )
316 {
317     std::ofstream out( filename.c_str() );
318     for (TileKeyList::iterator itr = _keys.begin(); itr != _keys.end(); ++itr)
319     {
320         out << (*itr).getLevelOfDetail() << ", " << (*itr).getTileX() << ", " << (*itr).getTileY() << std::endl;
321     }
322 }
323 
getKeys()324 TileKeyList& TaskList::getKeys()
325 {
326     return _keys;
327 }
328 
getKeys() const329 const TileKeyList& TaskList::getKeys() const
330 {
331     return _keys;
332 }
333 
334 /*****************************************************************************************/
MultiprocessTileVisitor()335 MultiprocessTileVisitor::MultiprocessTileVisitor():
336     _numProcesses( OpenThreads::GetNumberOfProcessors() ),
337     _batchSize(100)
338 {
339     osgDB::ObjectWrapper* wrapper = osgDB::Registry::instance()->getObjectWrapperManager()->findWrapper( "osg::Image" );
340 }
341 
MultiprocessTileVisitor(TileHandler * handler)342 MultiprocessTileVisitor::MultiprocessTileVisitor( TileHandler* handler ):
343 TileVisitor( handler ),
344     _numProcesses( OpenThreads::GetNumberOfProcessors() ),
345     _batchSize(100)
346 {
347 }
348 
getNumProcesses() const349 unsigned int MultiprocessTileVisitor::getNumProcesses() const
350 {
351     return _numProcesses;
352 }
353 
setNumProcesses(unsigned int numProcesses)354 void MultiprocessTileVisitor::setNumProcesses( unsigned int numProcesses)
355 {
356     _numProcesses = numProcesses;
357 }
358 
getBatchSize() const359 unsigned int MultiprocessTileVisitor::getBatchSize() const
360 {
361     return _batchSize;
362 }
363 
setBatchSize(unsigned int batchSize)364 void MultiprocessTileVisitor::setBatchSize( unsigned int batchSize )
365 {
366     _batchSize = batchSize;
367 }
368 
369 
run(const Profile * mapProfile)370 void MultiprocessTileVisitor::run(const Profile* mapProfile)
371 {
372     // Start up the task service
373     _taskService = new TaskService( "MPTileHandler", _numProcesses, 1000 );
374 
375     // Produce the tiles
376     TileVisitor::run( mapProfile );
377 
378     // Process any remaining tasks in the final batch
379     processBatch();
380 
381     // Send a poison pill to kill all the threads
382     _taskService->add( new PoisonPill() );
383 
384     OE_INFO << "Waiting on threads to complete" << _taskService->getNumRequests() << " tasks remaining" << std::endl;
385 
386     // Wait for everything to finish, checking for cancellation while we wait so we can kill all the existing tasks.
387     while (_taskService->areThreadsRunning())
388     {
389         OpenThreads::Thread::microSleep(10000);
390         if (_progress && _progress->isCanceled())
391         {
392             _taskService->cancelAll();
393         }
394     }
395     OE_INFO << "All threads have completed" << std::endl;
396 }
397 
handleTile(const TileKey & key)398 bool MultiprocessTileVisitor::handleTile( const TileKey& key )
399 {
400     _batch.push_back( key );
401 
402     if (_batch.size() == _batchSize)
403     {
404         processBatch();
405     }
406     return true;
407 }
408 
getEarthFile() const409 const std::string& MultiprocessTileVisitor::getEarthFile() const
410 {
411     return _earthFile;
412 }
413 
setEarthFile(const std::string & earthFile)414 void MultiprocessTileVisitor::setEarthFile( const std::string& earthFile )
415 {
416     _earthFile = earthFile;
417 }
418 
419 /**
420 * Executes a command in an external process
421 */
422 class ExecuteTask : public TaskRequest
423 {
424 public:
ExecuteTask(const std::string & command,TileVisitor * visitor,unsigned int count)425     ExecuteTask(const std::string& command, TileVisitor* visitor, unsigned int count):
426       _command( command ),
427       _visitor( visitor ),
428       _count( count )
429       {
430       }
431 
operator ()(ProgressCallback * progress)432       virtual void operator()(ProgressCallback* progress )
433       {
434           OS_SYSTEM(_command.c_str());
435 
436           // Cleanup the temp files and increment the progress on the visitor.
437           cleanupTempFiles();
438           _visitor->incrementProgress( _count );
439       }
440 
addTempFile(const std::string & filename)441       void addTempFile( const std::string& filename )
442       {
443           _tempFiles.push_back(filename);
444       }
445 
cleanupTempFiles()446       void cleanupTempFiles()
447       {
448           for (unsigned int i = 0; i < _tempFiles.size(); i++)
449           {
450               remove( _tempFiles[i].c_str() );
451           }
452       }
453 
454 
455       std::vector< std::string > _tempFiles;
456       std::string _command;
457       TileVisitor* _visitor;
458       unsigned int _count;
459 };
460 
processBatch()461 void MultiprocessTileVisitor::processBatch()
462 {
463     TaskList tasks( 0 );
464     for (unsigned int i = 0; i < _batch.size(); i++)
465     {
466         tasks.getKeys().push_back( _batch[i] );
467     }
468     // Save the task file out.
469     std::string tmpPath = getTempPath();
470     std::string filename = getTempName(tmpPath, "batch.tiles");
471     tasks.save( filename );
472 
473     std::stringstream command;
474     command << _tileHandler->getProcessString() << " --tiles " << filename << " " << _earthFile;
475     OE_INFO << "Running command " << command.str() << std::endl;
476     osg::ref_ptr< ExecuteTask > task = new ExecuteTask( command.str(), this, tasks.getKeys().size() );
477     // Add the task file as a temp file to the task to make sure it gets deleted
478     task->addTempFile( filename );
479 
480     _taskService->add(task.get());
481     _batch.clear();
482 }
483 
484 
485 /*****************************************************************************************/
TileKeyListVisitor()486 TileKeyListVisitor::TileKeyListVisitor()
487 {
488 }
489 
setKeys(const TileKeyList & keys)490 void TileKeyListVisitor::setKeys(const TileKeyList& keys)
491 {
492     _keys = keys;
493 }
494 
run(const Profile * mapProfile)495 void TileKeyListVisitor::run(const Profile* mapProfile)
496 {
497     resetProgress();
498 
499     for (TileKeyList::iterator itr = _keys.begin(); itr != _keys.end(); ++itr)
500     {
501         if (_tileHandler)
502         {
503             _tileHandler->handleTile( *itr, *this );
504             incrementProgress(1);
505         }
506     }
507 }
508