1 /* -*-c++-*- */
2 /* osgEarth - Geospatial SDK for OpenSceneGraph
3 * Copyright 2019 Pelican Mapping
4 * http://osgearth.org
5 *
6 * osgEarth is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU Lesser General Public License as published by
8 * the Free Software Foundation; either version 2 of the License, or
9 * (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU Lesser General Public License for more details.
15 *
16 * You should have received a copy of the GNU Lesser General Public License
17 * along with this program. If not, see <http://www.gnu.org/licenses/>
18 */
19 #include <osgEarth/TileVisitor>
20 #include <osgEarth/CacheEstimator>
21 #include <osgEarth/FileUtils>
22
23 #if OSG_VERSION_GREATER_OR_EQUAL(3,5,10)
24 #include <osg/os_utils>
25 #define OS_SYSTEM osg_system
26 #else
27 #define OS_SYSTEM system
28 #endif
29
30 using namespace osgEarth;
31
TileVisitor()32 TileVisitor::TileVisitor():
33 _total(0),
34 _processed(0),
35 _minLevel(0),
36 _maxLevel(99)
37 {
38 }
39
40
TileVisitor(TileHandler * handler)41 TileVisitor::TileVisitor(TileHandler* handler):
42 _tileHandler( handler ),
43 _total(0),
44 _processed(0),
45 _minLevel(0),
46 _maxLevel(99)
47 {
48 }
49
resetProgress()50 void TileVisitor::resetProgress()
51 {
52 _total = 0;
53 _processed = 0;
54 }
55
addExtent(const GeoExtent & extent)56 void TileVisitor::addExtent( const GeoExtent& extent )
57 {
58 _extents.push_back( extent );
59 }
60
intersects(const GeoExtent & extent)61 bool TileVisitor::intersects( const GeoExtent& extent )
62 {
63 if ( _extents.empty()) return true;
64 else
65 {
66 for (unsigned int i = 0; i < _extents.size(); ++i)
67 {
68 if (_extents[i].intersects( extent ))
69 {
70 return true;
71 }
72
73 }
74 }
75 return false;
76 }
77
setTileHandler(TileHandler * handler)78 void TileVisitor::setTileHandler( TileHandler* handler )
79 {
80 _tileHandler = handler;
81 }
82
setProgressCallback(ProgressCallback * progress)83 void TileVisitor::setProgressCallback( ProgressCallback* progress )
84 {
85 _progress = progress;
86 }
87
run(const Profile * mapProfile)88 void TileVisitor::run( const Profile* mapProfile )
89 {
90 _profile = mapProfile;
91
92 // Reset the progress in case this visitor has been ran before.
93 resetProgress();
94
95 estimate();
96
97 // Get all the root keys and process them.
98 std::vector<TileKey> keys;
99 mapProfile->getRootKeys(keys);
100
101 for (unsigned int i = 0; i < keys.size(); ++i)
102 {
103 processKey( keys[i] );
104 }
105 }
106
estimate()107 void TileVisitor::estimate()
108 {
109 //Estimate the number of tiles
110 CacheEstimator est;
111 est.setMinLevel( _minLevel );
112 est.setMaxLevel( _maxLevel );
113 est.setProfile( _profile.get() );
114 for (unsigned int i = 0; i < _extents.size(); i++)
115 {
116 est.addExtent( _extents[ i ] );
117 }
118 _total = est.getNumTiles();
119 }
120
processKey(const TileKey & key)121 void TileVisitor::processKey( const TileKey& key )
122 {
123 // If we've been cancelled then just return.
124 if (_progress && _progress->isCanceled())
125 {
126 return;
127 }
128
129 unsigned int x, y, lod;
130 key.getTileXY(x, y);
131 lod = key.getLevelOfDetail();
132
133 // Only process this key if it has a chance of succeeding.
134 if (_tileHandler && !_tileHandler->hasData(key))
135 {
136 return;
137 }
138
139 bool traverseChildren = false;
140
141 // If the key intersects the extent attempt to traverse
142 if (intersects( key.getExtent() ))
143 {
144 // If the lod is less than the min level don't do anything but do traverse the children.
145 if (lod < _minLevel)
146 {
147 traverseChildren = true;
148 }
149 else
150 {
151 // Process the key
152 traverseChildren = handleTile( key );
153 }
154 }
155
156 // Traverse the children
157 if (traverseChildren && lod < _maxLevel)
158 {
159 for (unsigned int i = 0; i < 4; i++)
160 {
161 TileKey k = key.createChildKey(i);
162 processKey( k );
163 }
164 }
165 }
166
incrementProgress(unsigned int amount)167 void TileVisitor::incrementProgress(unsigned int amount)
168 {
169 {
170 OpenThreads::ScopedLock< OpenThreads::Mutex > lk(_progressMutex );
171 _processed += amount;
172 }
173 if (_progress.valid())
174 {
175 // If report progress returns true then mark the task as being cancelled.
176 if (_progress->reportProgress( _processed, _total ))
177 {
178 _progress->cancel();
179 }
180 }
181 }
182
handleTile(const TileKey & key)183 bool TileVisitor::handleTile( const TileKey& key )
184 {
185 bool result = false;
186 if (_tileHandler.valid() )
187 {
188 result = _tileHandler->handleTile( key, *this );
189 }
190
191 incrementProgress(1);
192
193 return result;
194 }
195
196
197
198 /*****************************************************************************************/
199 /**
200 * A TaskRequest that runs a TileHandler in a background thread.
201 */
202 class HandleTileTask : public TaskRequest
203 {
204 public:
HandleTileTask(TileHandler * handler,TileVisitor * visitor,const TileKey & key)205 HandleTileTask( TileHandler* handler, TileVisitor* visitor, const TileKey& key ):
206 _handler( handler ),
207 _visitor(visitor),
208 _key( key )
209 {
210
211 }
212
operator ()(ProgressCallback * progress)213 virtual void operator()(ProgressCallback* progress )
214 {
215 if (_handler.valid())
216 {
217 _handler->handleTile( _key, *_visitor.get() );
218 _visitor->incrementProgress(1);
219 }
220 }
221
222 osg::ref_ptr<TileHandler> _handler;
223 TileKey _key;
224 osg::ref_ptr<TileVisitor> _visitor;
225 };
226
MultithreadedTileVisitor()227 MultithreadedTileVisitor::MultithreadedTileVisitor():
228 _numThreads( OpenThreads::GetNumberOfProcessors() )
229 {
230 // We must do this to avoid an error message in OpenSceneGraph b/c the findWrapper method doesn't appear to be threadsafe.
231 // This really isn't a big deal b/c this only effects data that is already cached.
232 osgDB::ObjectWrapper* wrapper = osgDB::Registry::instance()->getObjectWrapperManager()->findWrapper( "osg::Image" );
233 }
234
MultithreadedTileVisitor(TileHandler * handler)235 MultithreadedTileVisitor::MultithreadedTileVisitor( TileHandler* handler ):
236 TileVisitor( handler ),
237 _numThreads( OpenThreads::GetNumberOfProcessors() )
238 {
239 }
240
getNumThreads() const241 unsigned int MultithreadedTileVisitor::getNumThreads() const
242 {
243 return _numThreads;
244 }
245
setNumThreads(unsigned int numThreads)246 void MultithreadedTileVisitor::setNumThreads( unsigned int numThreads)
247 {
248 _numThreads = numThreads;
249 }
250
run(const Profile * mapProfile)251 void MultithreadedTileVisitor::run(const Profile* mapProfile)
252 {
253 // Start up the task service
254 OE_INFO << "Starting " << _numThreads << std::endl;
255 _taskService = new TaskService( "MTTileHandler", _numThreads, 1000 );
256
257 // Produce the tiles
258 TileVisitor::run( mapProfile );
259
260 // Send a poison pill to kill all the threads
261 _taskService->add( new PoisonPill() );
262
263 OE_INFO << "Waiting on threads to complete" << _taskService->getNumRequests() << " tasks remaining" << std::endl;
264
265 // Wait for everything to finish, checking for cancellation while we wait so we can kill all the existing tasks.
266 while (_taskService->areThreadsRunning())
267 {
268 OpenThreads::Thread::microSleep(10000);
269 if (_progress && _progress->isCanceled())
270 {
271 _taskService->cancelAll();
272 }
273 }
274 OE_INFO << "All threads have completed" << std::endl;
275 }
276
handleTile(const TileKey & key)277 bool MultithreadedTileVisitor::handleTile( const TileKey& key )
278 {
279 // Add the tile to the task queue.
280 _taskService->add( new HandleTileTask(_tileHandler.get(), this, key ) );
281 return true;
282 }
283
284 /*****************************************************************************************/
285
TaskList(const Profile * profile)286 TaskList::TaskList(const Profile* profile):
287 _profile( profile )
288 {
289 }
290
load(const std::string & filename)291 bool TaskList::load( const std::string &filename)
292 {
293 std::ifstream in( filename.c_str(), std::ios::in );
294
295 std::string line;
296 while( getline(in, line) )
297 {
298 std::vector< std::string > parts;
299 StringTokenizer(line, parts, "," );
300
301 if (parts.size() >= 3)
302 {
303 _keys.push_back( TileKey(
304 as<unsigned int>(parts[0], 0u),
305 as<unsigned int>(parts[1], 0u),
306 as<unsigned int>(parts[2], 0u),
307 _profile.get() ) );
308 }
309 }
310
311
312 return true;
313 }
314
save(const std::string & filename)315 void TaskList::save( const std::string& filename )
316 {
317 std::ofstream out( filename.c_str() );
318 for (TileKeyList::iterator itr = _keys.begin(); itr != _keys.end(); ++itr)
319 {
320 out << (*itr).getLevelOfDetail() << ", " << (*itr).getTileX() << ", " << (*itr).getTileY() << std::endl;
321 }
322 }
323
getKeys()324 TileKeyList& TaskList::getKeys()
325 {
326 return _keys;
327 }
328
getKeys() const329 const TileKeyList& TaskList::getKeys() const
330 {
331 return _keys;
332 }
333
334 /*****************************************************************************************/
MultiprocessTileVisitor()335 MultiprocessTileVisitor::MultiprocessTileVisitor():
336 _numProcesses( OpenThreads::GetNumberOfProcessors() ),
337 _batchSize(100)
338 {
339 osgDB::ObjectWrapper* wrapper = osgDB::Registry::instance()->getObjectWrapperManager()->findWrapper( "osg::Image" );
340 }
341
MultiprocessTileVisitor(TileHandler * handler)342 MultiprocessTileVisitor::MultiprocessTileVisitor( TileHandler* handler ):
343 TileVisitor( handler ),
344 _numProcesses( OpenThreads::GetNumberOfProcessors() ),
345 _batchSize(100)
346 {
347 }
348
getNumProcesses() const349 unsigned int MultiprocessTileVisitor::getNumProcesses() const
350 {
351 return _numProcesses;
352 }
353
setNumProcesses(unsigned int numProcesses)354 void MultiprocessTileVisitor::setNumProcesses( unsigned int numProcesses)
355 {
356 _numProcesses = numProcesses;
357 }
358
getBatchSize() const359 unsigned int MultiprocessTileVisitor::getBatchSize() const
360 {
361 return _batchSize;
362 }
363
setBatchSize(unsigned int batchSize)364 void MultiprocessTileVisitor::setBatchSize( unsigned int batchSize )
365 {
366 _batchSize = batchSize;
367 }
368
369
run(const Profile * mapProfile)370 void MultiprocessTileVisitor::run(const Profile* mapProfile)
371 {
372 // Start up the task service
373 _taskService = new TaskService( "MPTileHandler", _numProcesses, 1000 );
374
375 // Produce the tiles
376 TileVisitor::run( mapProfile );
377
378 // Process any remaining tasks in the final batch
379 processBatch();
380
381 // Send a poison pill to kill all the threads
382 _taskService->add( new PoisonPill() );
383
384 OE_INFO << "Waiting on threads to complete" << _taskService->getNumRequests() << " tasks remaining" << std::endl;
385
386 // Wait for everything to finish, checking for cancellation while we wait so we can kill all the existing tasks.
387 while (_taskService->areThreadsRunning())
388 {
389 OpenThreads::Thread::microSleep(10000);
390 if (_progress && _progress->isCanceled())
391 {
392 _taskService->cancelAll();
393 }
394 }
395 OE_INFO << "All threads have completed" << std::endl;
396 }
397
handleTile(const TileKey & key)398 bool MultiprocessTileVisitor::handleTile( const TileKey& key )
399 {
400 _batch.push_back( key );
401
402 if (_batch.size() == _batchSize)
403 {
404 processBatch();
405 }
406 return true;
407 }
408
getEarthFile() const409 const std::string& MultiprocessTileVisitor::getEarthFile() const
410 {
411 return _earthFile;
412 }
413
setEarthFile(const std::string & earthFile)414 void MultiprocessTileVisitor::setEarthFile( const std::string& earthFile )
415 {
416 _earthFile = earthFile;
417 }
418
419 /**
420 * Executes a command in an external process
421 */
422 class ExecuteTask : public TaskRequest
423 {
424 public:
ExecuteTask(const std::string & command,TileVisitor * visitor,unsigned int count)425 ExecuteTask(const std::string& command, TileVisitor* visitor, unsigned int count):
426 _command( command ),
427 _visitor( visitor ),
428 _count( count )
429 {
430 }
431
operator ()(ProgressCallback * progress)432 virtual void operator()(ProgressCallback* progress )
433 {
434 OS_SYSTEM(_command.c_str());
435
436 // Cleanup the temp files and increment the progress on the visitor.
437 cleanupTempFiles();
438 _visitor->incrementProgress( _count );
439 }
440
addTempFile(const std::string & filename)441 void addTempFile( const std::string& filename )
442 {
443 _tempFiles.push_back(filename);
444 }
445
cleanupTempFiles()446 void cleanupTempFiles()
447 {
448 for (unsigned int i = 0; i < _tempFiles.size(); i++)
449 {
450 remove( _tempFiles[i].c_str() );
451 }
452 }
453
454
455 std::vector< std::string > _tempFiles;
456 std::string _command;
457 TileVisitor* _visitor;
458 unsigned int _count;
459 };
460
processBatch()461 void MultiprocessTileVisitor::processBatch()
462 {
463 TaskList tasks( 0 );
464 for (unsigned int i = 0; i < _batch.size(); i++)
465 {
466 tasks.getKeys().push_back( _batch[i] );
467 }
468 // Save the task file out.
469 std::string tmpPath = getTempPath();
470 std::string filename = getTempName(tmpPath, "batch.tiles");
471 tasks.save( filename );
472
473 std::stringstream command;
474 command << _tileHandler->getProcessString() << " --tiles " << filename << " " << _earthFile;
475 OE_INFO << "Running command " << command.str() << std::endl;
476 osg::ref_ptr< ExecuteTask > task = new ExecuteTask( command.str(), this, tasks.getKeys().size() );
477 // Add the task file as a temp file to the task to make sure it gets deleted
478 task->addTempFile( filename );
479
480 _taskService->add(task.get());
481 _batch.clear();
482 }
483
484
485 /*****************************************************************************************/
TileKeyListVisitor()486 TileKeyListVisitor::TileKeyListVisitor()
487 {
488 }
489
setKeys(const TileKeyList & keys)490 void TileKeyListVisitor::setKeys(const TileKeyList& keys)
491 {
492 _keys = keys;
493 }
494
run(const Profile * mapProfile)495 void TileKeyListVisitor::run(const Profile* mapProfile)
496 {
497 resetProgress();
498
499 for (TileKeyList::iterator itr = _keys.begin(); itr != _keys.end(); ++itr)
500 {
501 if (_tileHandler)
502 {
503 _tileHandler->handleTile( *itr, *this );
504 incrementProgress(1);
505 }
506 }
507 }
508