1 /* Distributed under the OSI-approved BSD 3-Clause License. See accompanying
2 file Copyright.txt or https://cmake.org/licensing for details. */
3 #include "cmWorkerPool.h"
4
5 #include <algorithm>
6 #include <array>
7 #include <condition_variable>
8 #include <cstddef>
9 #include <deque>
10 #include <functional>
11 #include <mutex>
12 #include <thread>
13
14 #include <cm/memory>
15
16 #include <cm3p/uv.h>
17
18 #include "cmRange.h"
19 #include "cmStringAlgorithms.h"
20 #include "cmUVHandlePtr.h"
21 #include "cmUVSignalHackRAII.h" // IWYU pragma: keep
22
23 /**
24 * @brief libuv pipe buffer class
25 */
26 class cmUVPipeBuffer
27 {
28 public:
29 using DataRange = cmRange<const char*>;
30 using DataFunction = std::function<void(DataRange)>;
31 /// On error the ssize_t argument is a non zero libuv error code
32 using EndFunction = std::function<void(ssize_t)>;
33
34 /**
35 * Reset to construction state
36 */
37 void reset();
38
39 /**
40 * Initializes uv_pipe(), uv_stream() and uv_handle()
41 * @return true on success
42 */
43 bool init(uv_loop_t* uv_loop);
44
45 /**
46 * Start reading
47 * @return true on success
48 */
49 bool startRead(DataFunction dataFunction, EndFunction endFunction);
50
51 //! libuv pipe
uv_pipe() const52 uv_pipe_t* uv_pipe() const { return this->UVPipe_.get(); }
53 //! uv_pipe() casted to libuv stream
uv_stream() const54 uv_stream_t* uv_stream() const
55 {
56 return static_cast<uv_stream_t*>(this->UVPipe_);
57 }
58 //! uv_pipe() casted to libuv handle
uv_handle()59 uv_handle_t* uv_handle() { return static_cast<uv_handle_t*>(this->UVPipe_); }
60
61 private:
62 // -- Libuv callbacks
63 static void UVAlloc(uv_handle_t* handle, size_t suggestedSize,
64 uv_buf_t* buf);
65 static void UVData(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf);
66
67 cm::uv_pipe_ptr UVPipe_;
68 std::vector<char> Buffer_;
69 DataFunction DataFunction_;
70 EndFunction EndFunction_;
71 };
72
reset()73 void cmUVPipeBuffer::reset()
74 {
75 if (this->UVPipe_.get() != nullptr) {
76 this->EndFunction_ = nullptr;
77 this->DataFunction_ = nullptr;
78 this->Buffer_.clear();
79 this->Buffer_.shrink_to_fit();
80 this->UVPipe_.reset();
81 }
82 }
83
init(uv_loop_t * uv_loop)84 bool cmUVPipeBuffer::init(uv_loop_t* uv_loop)
85 {
86 this->reset();
87 if (uv_loop == nullptr) {
88 return false;
89 }
90 int ret = this->UVPipe_.init(*uv_loop, 0, this);
91 return (ret == 0);
92 }
93
startRead(DataFunction dataFunction,EndFunction endFunction)94 bool cmUVPipeBuffer::startRead(DataFunction dataFunction,
95 EndFunction endFunction)
96 {
97 if (this->UVPipe_.get() == nullptr) {
98 return false;
99 }
100 if (!dataFunction || !endFunction) {
101 return false;
102 }
103 this->DataFunction_ = std::move(dataFunction);
104 this->EndFunction_ = std::move(endFunction);
105 int ret = uv_read_start(this->uv_stream(), &cmUVPipeBuffer::UVAlloc,
106 &cmUVPipeBuffer::UVData);
107 return (ret == 0);
108 }
109
UVAlloc(uv_handle_t * handle,size_t suggestedSize,uv_buf_t * buf)110 void cmUVPipeBuffer::UVAlloc(uv_handle_t* handle, size_t suggestedSize,
111 uv_buf_t* buf)
112 {
113 auto& pipe = *reinterpret_cast<cmUVPipeBuffer*>(handle->data);
114 pipe.Buffer_.resize(suggestedSize);
115 buf->base = pipe.Buffer_.data();
116 buf->len = static_cast<unsigned long>(pipe.Buffer_.size());
117 }
118
UVData(uv_stream_t * stream,ssize_t nread,const uv_buf_t * buf)119 void cmUVPipeBuffer::UVData(uv_stream_t* stream, ssize_t nread,
120 const uv_buf_t* buf)
121 {
122 auto& pipe = *reinterpret_cast<cmUVPipeBuffer*>(stream->data);
123 if (nread > 0) {
124 if (buf->base != nullptr) {
125 // Call data function
126 pipe.DataFunction_(DataRange(buf->base, buf->base + nread));
127 }
128 } else if (nread < 0) {
129 // Save the end function on the stack before resetting the pipe
130 EndFunction efunc;
131 efunc.swap(pipe.EndFunction_);
132 // Reset pipe before calling the end function
133 pipe.reset();
134 // Call end function
135 efunc((nread == UV_EOF) ? 0 : nread);
136 }
137 }
138
139 /**
140 * @brief External process management class
141 */
142 class cmUVReadOnlyProcess
143 {
144 public:
145 // -- Types
146 //! @brief Process settings
147 struct SetupT
148 {
149 std::string WorkingDirectory;
150 std::vector<std::string> Command;
151 cmWorkerPool::ProcessResultT* Result = nullptr;
152 bool MergedOutput = false;
153 };
154
155 // -- Const accessors
Setup() const156 SetupT const& Setup() const { return this->Setup_; }
Result() const157 cmWorkerPool::ProcessResultT* Result() const { return this->Setup_.Result; }
IsStarted() const158 bool IsStarted() const { return this->IsStarted_; }
IsFinished() const159 bool IsFinished() const { return this->IsFinished_; }
160
161 // -- Runtime
162 void setup(cmWorkerPool::ProcessResultT* result, bool mergedOutput,
163 std::vector<std::string> const& command,
164 std::string const& workingDirectory = std::string());
165 bool start(uv_loop_t* uv_loop, std::function<void()> finishedCallback);
166
167 private:
168 // -- Libuv callbacks
169 static void UVExit(uv_process_t* handle, int64_t exitStatus, int termSignal);
170 void UVPipeOutData(cmUVPipeBuffer::DataRange data) const;
171 void UVPipeOutEnd(ssize_t error);
172 void UVPipeErrData(cmUVPipeBuffer::DataRange data) const;
173 void UVPipeErrEnd(ssize_t error);
174 void UVTryFinish();
175
176 // -- Setup
177 SetupT Setup_;
178 // -- Runtime
179 bool IsStarted_ = false;
180 bool IsFinished_ = false;
181 std::function<void()> FinishedCallback_;
182 std::vector<const char*> CommandPtr_;
183 std::array<uv_stdio_container_t, 3> UVOptionsStdIO_;
184 uv_process_options_t UVOptions_;
185 cm::uv_process_ptr UVProcess_;
186 cmUVPipeBuffer UVPipeOut_;
187 cmUVPipeBuffer UVPipeErr_;
188 };
189
setup(cmWorkerPool::ProcessResultT * result,bool mergedOutput,std::vector<std::string> const & command,std::string const & workingDirectory)190 void cmUVReadOnlyProcess::setup(cmWorkerPool::ProcessResultT* result,
191 bool mergedOutput,
192 std::vector<std::string> const& command,
193 std::string const& workingDirectory)
194 {
195 this->Setup_.WorkingDirectory = workingDirectory;
196 this->Setup_.Command = command;
197 this->Setup_.Result = result;
198 this->Setup_.MergedOutput = mergedOutput;
199 }
200
start(uv_loop_t * uv_loop,std::function<void ()> finishedCallback)201 bool cmUVReadOnlyProcess::start(uv_loop_t* uv_loop,
202 std::function<void()> finishedCallback)
203 {
204 if (this->IsStarted() || (this->Result() == nullptr)) {
205 return false;
206 }
207
208 // Reset result before the start
209 this->Result()->reset();
210
211 // Fill command string pointers
212 if (!this->Setup().Command.empty()) {
213 this->CommandPtr_.reserve(this->Setup().Command.size() + 1);
214 for (std::string const& arg : this->Setup().Command) {
215 this->CommandPtr_.push_back(arg.c_str());
216 }
217 this->CommandPtr_.push_back(nullptr);
218 } else {
219 this->Result()->ErrorMessage = "Empty command";
220 }
221
222 if (!this->Result()->error()) {
223 if (!this->UVPipeOut_.init(uv_loop)) {
224 this->Result()->ErrorMessage = "libuv stdout pipe initialization failed";
225 }
226 }
227 if (!this->Result()->error()) {
228 if (!this->UVPipeErr_.init(uv_loop)) {
229 this->Result()->ErrorMessage = "libuv stderr pipe initialization failed";
230 }
231 }
232 if (!this->Result()->error()) {
233 // -- Setup process stdio options
234 // stdin
235 this->UVOptionsStdIO_[0].flags = UV_IGNORE;
236 this->UVOptionsStdIO_[0].data.stream = nullptr;
237 // stdout
238 this->UVOptionsStdIO_[1].flags =
239 static_cast<uv_stdio_flags>(UV_CREATE_PIPE | UV_WRITABLE_PIPE);
240 this->UVOptionsStdIO_[1].data.stream = this->UVPipeOut_.uv_stream();
241 // stderr
242 this->UVOptionsStdIO_[2].flags =
243 static_cast<uv_stdio_flags>(UV_CREATE_PIPE | UV_WRITABLE_PIPE);
244 this->UVOptionsStdIO_[2].data.stream = this->UVPipeErr_.uv_stream();
245
246 // -- Setup process options
247 std::fill_n(reinterpret_cast<char*>(&this->UVOptions_),
248 sizeof(this->UVOptions_), 0);
249 this->UVOptions_.exit_cb = &cmUVReadOnlyProcess::UVExit;
250 this->UVOptions_.file = this->CommandPtr_[0];
251 this->UVOptions_.args = const_cast<char**>(this->CommandPtr_.data());
252 this->UVOptions_.cwd = this->Setup_.WorkingDirectory.c_str();
253 this->UVOptions_.flags = UV_PROCESS_WINDOWS_HIDE;
254 this->UVOptions_.stdio_count =
255 static_cast<int>(this->UVOptionsStdIO_.size());
256 this->UVOptions_.stdio = this->UVOptionsStdIO_.data();
257
258 // -- Spawn process
259 int uvErrorCode = this->UVProcess_.spawn(*uv_loop, this->UVOptions_, this);
260 if (uvErrorCode != 0) {
261 this->Result()->ErrorMessage = "libuv process spawn failed";
262 if (const char* uvErr = uv_strerror(uvErrorCode)) {
263 this->Result()->ErrorMessage += ": ";
264 this->Result()->ErrorMessage += uvErr;
265 }
266 }
267 }
268 // -- Start reading from stdio streams
269 if (!this->Result()->error()) {
270 if (!this->UVPipeOut_.startRead(
271 [this](cmUVPipeBuffer::DataRange range) {
272 this->UVPipeOutData(range);
273 },
274 [this](ssize_t error) { this->UVPipeOutEnd(error); })) {
275 this->Result()->ErrorMessage =
276 "libuv start reading from stdout pipe failed";
277 }
278 }
279 if (!this->Result()->error()) {
280 if (!this->UVPipeErr_.startRead(
281 [this](cmUVPipeBuffer::DataRange range) {
282 this->UVPipeErrData(range);
283 },
284 [this](ssize_t error) { this->UVPipeErrEnd(error); })) {
285 this->Result()->ErrorMessage =
286 "libuv start reading from stderr pipe failed";
287 }
288 }
289
290 if (!this->Result()->error()) {
291 this->IsStarted_ = true;
292 this->FinishedCallback_ = std::move(finishedCallback);
293 } else {
294 // Clear libuv handles and finish
295 this->UVProcess_.reset();
296 this->UVPipeOut_.reset();
297 this->UVPipeErr_.reset();
298 this->CommandPtr_.clear();
299 }
300
301 return this->IsStarted();
302 }
303
UVExit(uv_process_t * handle,int64_t exitStatus,int termSignal)304 void cmUVReadOnlyProcess::UVExit(uv_process_t* handle, int64_t exitStatus,
305 int termSignal)
306 {
307 auto& proc = *reinterpret_cast<cmUVReadOnlyProcess*>(handle->data);
308 if (proc.IsStarted() && !proc.IsFinished()) {
309 // Set error message on demand
310 proc.Result()->ExitStatus = exitStatus;
311 proc.Result()->TermSignal = termSignal;
312 if (!proc.Result()->error()) {
313 if (termSignal != 0) {
314 proc.Result()->ErrorMessage = cmStrCat(
315 "Process was terminated by signal ", proc.Result()->TermSignal);
316 } else if (exitStatus != 0) {
317 proc.Result()->ErrorMessage = cmStrCat(
318 "Process failed with return value ", proc.Result()->ExitStatus);
319 }
320 }
321
322 // Reset process handle
323 proc.UVProcess_.reset();
324 // Try finish
325 proc.UVTryFinish();
326 }
327 }
328
UVPipeOutData(cmUVPipeBuffer::DataRange data) const329 void cmUVReadOnlyProcess::UVPipeOutData(cmUVPipeBuffer::DataRange data) const
330 {
331 this->Result()->StdOut.append(data.begin(), data.end());
332 }
333
UVPipeOutEnd(ssize_t error)334 void cmUVReadOnlyProcess::UVPipeOutEnd(ssize_t error)
335 {
336 // Process pipe error
337 if ((error != 0) && !this->Result()->error()) {
338 this->Result()->ErrorMessage = cmStrCat(
339 "Reading from stdout pipe failed with libuv error code ", error);
340 }
341 // Try finish
342 this->UVTryFinish();
343 }
344
UVPipeErrData(cmUVPipeBuffer::DataRange data) const345 void cmUVReadOnlyProcess::UVPipeErrData(cmUVPipeBuffer::DataRange data) const
346 {
347 std::string* str = this->Setup_.MergedOutput ? &this->Result()->StdOut
348 : &this->Result()->StdErr;
349 str->append(data.begin(), data.end());
350 }
351
UVPipeErrEnd(ssize_t error)352 void cmUVReadOnlyProcess::UVPipeErrEnd(ssize_t error)
353 {
354 // Process pipe error
355 if ((error != 0) && !this->Result()->error()) {
356 this->Result()->ErrorMessage = cmStrCat(
357 "Reading from stderr pipe failed with libuv error code ", error);
358 }
359 // Try finish
360 this->UVTryFinish();
361 }
362
UVTryFinish()363 void cmUVReadOnlyProcess::UVTryFinish()
364 {
365 // There still might be data in the pipes after the process has finished.
366 // Therefore check if the process is finished AND all pipes are closed
367 // before signaling the worker thread to continue.
368 if ((this->UVProcess_.get() != nullptr) ||
369 (this->UVPipeOut_.uv_pipe() != nullptr) ||
370 (this->UVPipeErr_.uv_pipe() != nullptr)) {
371 return;
372 }
373 this->IsFinished_ = true;
374 this->FinishedCallback_();
375 }
376
377 /**
378 * @brief Worker pool worker thread
379 */
380 class cmWorkerPoolWorker
381 {
382 public:
383 cmWorkerPoolWorker(uv_loop_t& uvLoop);
384 ~cmWorkerPoolWorker();
385
386 cmWorkerPoolWorker(cmWorkerPoolWorker const&) = delete;
387 cmWorkerPoolWorker& operator=(cmWorkerPoolWorker const&) = delete;
388
389 /**
390 * Set the internal thread
391 */
SetThread(std::thread && aThread)392 void SetThread(std::thread&& aThread) { this->Thread_ = std::move(aThread); }
393
394 /**
395 * Run an external process
396 */
397 bool RunProcess(cmWorkerPool::ProcessResultT& result,
398 std::vector<std::string> const& command,
399 std::string const& workingDirectory);
400
401 private:
402 // -- Libuv callbacks
403 static void UVProcessStart(uv_async_t* handle);
404 void UVProcessFinished();
405
406 // -- Process management
407 struct
408 {
409 std::mutex Mutex;
410 cm::uv_async_ptr Request;
411 std::condition_variable Condition;
412 std::unique_ptr<cmUVReadOnlyProcess> ROP;
413 } Proc_;
414 // -- System thread
415 std::thread Thread_;
416 };
417
cmWorkerPoolWorker(uv_loop_t & uvLoop)418 cmWorkerPoolWorker::cmWorkerPoolWorker(uv_loop_t& uvLoop)
419 {
420 this->Proc_.Request.init(uvLoop, &cmWorkerPoolWorker::UVProcessStart, this);
421 }
422
~cmWorkerPoolWorker()423 cmWorkerPoolWorker::~cmWorkerPoolWorker()
424 {
425 if (this->Thread_.joinable()) {
426 this->Thread_.join();
427 }
428 }
429
RunProcess(cmWorkerPool::ProcessResultT & result,std::vector<std::string> const & command,std::string const & workingDirectory)430 bool cmWorkerPoolWorker::RunProcess(cmWorkerPool::ProcessResultT& result,
431 std::vector<std::string> const& command,
432 std::string const& workingDirectory)
433 {
434 if (command.empty()) {
435 return false;
436 }
437 // Create process instance
438 {
439 std::lock_guard<std::mutex> lock(this->Proc_.Mutex);
440 this->Proc_.ROP = cm::make_unique<cmUVReadOnlyProcess>();
441 this->Proc_.ROP->setup(&result, true, command, workingDirectory);
442 }
443 // Send asynchronous process start request to libuv loop
444 this->Proc_.Request.send();
445 // Wait until the process has been finished and destroyed
446 {
447 std::unique_lock<std::mutex> ulock(this->Proc_.Mutex);
448 while (this->Proc_.ROP) {
449 this->Proc_.Condition.wait(ulock);
450 }
451 }
452 return !result.error();
453 }
454
UVProcessStart(uv_async_t * handle)455 void cmWorkerPoolWorker::UVProcessStart(uv_async_t* handle)
456 {
457 auto* wrk = reinterpret_cast<cmWorkerPoolWorker*>(handle->data);
458 bool startFailed = false;
459 {
460 auto& Proc = wrk->Proc_;
461 std::lock_guard<std::mutex> lock(Proc.Mutex);
462 if (Proc.ROP && !Proc.ROP->IsStarted()) {
463 startFailed =
464 !Proc.ROP->start(handle->loop, [wrk] { wrk->UVProcessFinished(); });
465 }
466 }
467 // Clean up if starting of the process failed
468 if (startFailed) {
469 wrk->UVProcessFinished();
470 }
471 }
472
UVProcessFinished()473 void cmWorkerPoolWorker::UVProcessFinished()
474 {
475 std::lock_guard<std::mutex> lock(this->Proc_.Mutex);
476 if (this->Proc_.ROP &&
477 (this->Proc_.ROP->IsFinished() || !this->Proc_.ROP->IsStarted())) {
478 this->Proc_.ROP.reset();
479 }
480 // Notify idling thread
481 this->Proc_.Condition.notify_one();
482 }
483
484 /**
485 * @brief Private worker pool internals
486 */
487 class cmWorkerPoolInternal
488 {
489 public:
490 // -- Constructors
491 cmWorkerPoolInternal(cmWorkerPool* pool);
492 ~cmWorkerPoolInternal();
493
494 /**
495 * Runs the libuv loop.
496 */
497 bool Process();
498
499 /**
500 * Clear queue and abort threads.
501 */
502 void Abort();
503
504 /**
505 * Push a job to the queue and notify a worker.
506 */
507 bool PushJob(cmWorkerPool::JobHandleT&& jobHandle);
508
509 /**
510 * Worker thread main loop method.
511 */
512 void Work(unsigned int workerIndex);
513
514 // -- Request slots
515 static void UVSlotBegin(uv_async_t* handle);
516 static void UVSlotEnd(uv_async_t* handle);
517
518 // -- UV loop
519 #ifdef CMAKE_UV_SIGNAL_HACK
520 std::unique_ptr<cmUVSignalHackRAII> UVHackRAII;
521 #endif
522 std::unique_ptr<uv_loop_t> UVLoop;
523 cm::uv_async_ptr UVRequestBegin;
524 cm::uv_async_ptr UVRequestEnd;
525
526 // -- Thread pool and job queue
527 std::mutex Mutex;
528 bool Processing = false;
529 bool Aborting = false;
530 bool FenceProcessing = false;
531 unsigned int WorkersRunning = 0;
532 unsigned int WorkersIdle = 0;
533 unsigned int JobsProcessing = 0;
534 std::deque<cmWorkerPool::JobHandleT> Queue;
535 std::condition_variable Condition;
536 std::condition_variable ConditionFence;
537 std::vector<std::unique_ptr<cmWorkerPoolWorker>> Workers;
538
539 // -- References
540 cmWorkerPool* Pool = nullptr;
541 };
542
reset()543 void cmWorkerPool::ProcessResultT::reset()
544 {
545 this->ExitStatus = 0;
546 this->TermSignal = 0;
547 if (!this->StdOut.empty()) {
548 this->StdOut.clear();
549 this->StdOut.shrink_to_fit();
550 }
551 if (!this->StdErr.empty()) {
552 this->StdErr.clear();
553 this->StdErr.shrink_to_fit();
554 }
555 if (!this->ErrorMessage.empty()) {
556 this->ErrorMessage.clear();
557 this->ErrorMessage.shrink_to_fit();
558 }
559 }
560
cmWorkerPoolInternal(cmWorkerPool * pool)561 cmWorkerPoolInternal::cmWorkerPoolInternal(cmWorkerPool* pool)
562 : Pool(pool)
563 {
564 // Initialize libuv loop
565 uv_disable_stdio_inheritance();
566 #ifdef CMAKE_UV_SIGNAL_HACK
567 UVHackRAII = cm::make_unique<cmUVSignalHackRAII>();
568 #endif
569 this->UVLoop = cm::make_unique<uv_loop_t>();
570 uv_loop_init(this->UVLoop.get());
571 }
572
~cmWorkerPoolInternal()573 cmWorkerPoolInternal::~cmWorkerPoolInternal()
574 {
575 uv_loop_close(this->UVLoop.get());
576 }
577
Process()578 bool cmWorkerPoolInternal::Process()
579 {
580 // Reset state flags
581 this->Processing = true;
582 this->Aborting = false;
583 // Initialize libuv asynchronous request
584 this->UVRequestBegin.init(*this->UVLoop, &cmWorkerPoolInternal::UVSlotBegin,
585 this);
586 this->UVRequestEnd.init(*this->UVLoop, &cmWorkerPoolInternal::UVSlotEnd,
587 this);
588 // Send begin request
589 this->UVRequestBegin.send();
590 // Run libuv loop
591 bool success = (uv_run(this->UVLoop.get(), UV_RUN_DEFAULT) == 0);
592 // Update state flags
593 this->Processing = false;
594 this->Aborting = false;
595 return success;
596 }
597
Abort()598 void cmWorkerPoolInternal::Abort()
599 {
600 // Clear all jobs and set abort flag
601 std::lock_guard<std::mutex> guard(this->Mutex);
602 if (!this->Aborting) {
603 // Register abort and clear queue
604 this->Aborting = true;
605 this->Queue.clear();
606 this->Condition.notify_all();
607 }
608 }
609
PushJob(cmWorkerPool::JobHandleT && jobHandle)610 inline bool cmWorkerPoolInternal::PushJob(cmWorkerPool::JobHandleT&& jobHandle)
611 {
612 std::lock_guard<std::mutex> guard(this->Mutex);
613 if (this->Aborting) {
614 return false;
615 }
616 // Append the job to the queue
617 this->Queue.emplace_back(std::move(jobHandle));
618 // Notify an idle worker if there's one
619 if (this->WorkersIdle != 0) {
620 this->Condition.notify_one();
621 }
622 // Return success
623 return true;
624 }
625
UVSlotBegin(uv_async_t * handle)626 void cmWorkerPoolInternal::UVSlotBegin(uv_async_t* handle)
627 {
628 auto& gint = *reinterpret_cast<cmWorkerPoolInternal*>(handle->data);
629 // Create worker threads
630 {
631 unsigned int const num = gint.Pool->ThreadCount();
632 // Create workers
633 gint.Workers.reserve(num);
634 for (unsigned int ii = 0; ii != num; ++ii) {
635 gint.Workers.emplace_back(
636 cm::make_unique<cmWorkerPoolWorker>(*gint.UVLoop));
637 }
638 // Start worker threads
639 for (unsigned int ii = 0; ii != num; ++ii) {
640 gint.Workers[ii]->SetThread(
641 std::thread(&cmWorkerPoolInternal::Work, &gint, ii));
642 }
643 }
644 // Destroy begin request
645 gint.UVRequestBegin.reset();
646 }
647
UVSlotEnd(uv_async_t * handle)648 void cmWorkerPoolInternal::UVSlotEnd(uv_async_t* handle)
649 {
650 auto& gint = *reinterpret_cast<cmWorkerPoolInternal*>(handle->data);
651 // Join and destroy worker threads
652 gint.Workers.clear();
653 // Destroy end request
654 gint.UVRequestEnd.reset();
655 }
656
Work(unsigned int workerIndex)657 void cmWorkerPoolInternal::Work(unsigned int workerIndex)
658 {
659 cmWorkerPool::JobHandleT jobHandle;
660 std::unique_lock<std::mutex> uLock(this->Mutex);
661 // Increment running workers count
662 ++this->WorkersRunning;
663 // Enter worker main loop
664 while (true) {
665 // Abort on request
666 if (this->Aborting) {
667 break;
668 }
669 // Wait for new jobs on the main CV
670 if (this->Queue.empty()) {
671 ++this->WorkersIdle;
672 this->Condition.wait(uLock);
673 --this->WorkersIdle;
674 continue;
675 }
676
677 // If there is a fence currently active or waiting,
678 // sleep on the main CV and try again.
679 if (this->FenceProcessing) {
680 this->Condition.wait(uLock);
681 continue;
682 }
683
684 // Pop next job from queue
685 jobHandle = std::move(this->Queue.front());
686 this->Queue.pop_front();
687
688 // Check for fence jobs
689 bool raisedFence = false;
690 if (jobHandle->IsFence()) {
691 this->FenceProcessing = true;
692 raisedFence = true;
693 // Wait on the Fence CV until all pending jobs are done.
694 while (this->JobsProcessing != 0 && !this->Aborting) {
695 this->ConditionFence.wait(uLock);
696 }
697 // When aborting, explicitly kick all threads alive once more.
698 if (this->Aborting) {
699 this->FenceProcessing = false;
700 this->Condition.notify_all();
701 break;
702 }
703 }
704
705 // Unlocked scope for job processing
706 ++this->JobsProcessing;
707 {
708 uLock.unlock();
709 jobHandle->Work(this->Pool, workerIndex); // Process job
710 jobHandle.reset(); // Destroy job
711 uLock.lock();
712 }
713 --this->JobsProcessing;
714
715 // If this was the thread that entered fence processing
716 // originally, notify all idling workers that the fence
717 // is done.
718 if (raisedFence) {
719 this->FenceProcessing = false;
720 this->Condition.notify_all();
721 }
722 // If fence processing is still not done, notify the
723 // the fencing worker when all active jobs are done.
724 if (this->FenceProcessing && this->JobsProcessing == 0) {
725 this->ConditionFence.notify_all();
726 }
727 }
728
729 // Decrement running workers count
730 if (--this->WorkersRunning == 0) {
731 // Last worker thread about to finish. Send libuv event.
732 this->UVRequestEnd.send();
733 }
734 }
735
736 cmWorkerPool::JobT::~JobT() = default;
737
RunProcess(ProcessResultT & result,std::vector<std::string> const & command,std::string const & workingDirectory)738 bool cmWorkerPool::JobT::RunProcess(ProcessResultT& result,
739 std::vector<std::string> const& command,
740 std::string const& workingDirectory)
741 {
742 // Get worker by index
743 auto* wrk = this->Pool_->Int_->Workers.at(this->WorkerIndex_).get();
744 return wrk->RunProcess(result, command, workingDirectory);
745 }
746
cmWorkerPool()747 cmWorkerPool::cmWorkerPool()
748 : Int_(cm::make_unique<cmWorkerPoolInternal>(this))
749 {
750 }
751
752 cmWorkerPool::~cmWorkerPool() = default;
753
SetThreadCount(unsigned int threadCount)754 void cmWorkerPool::SetThreadCount(unsigned int threadCount)
755 {
756 if (!this->Int_->Processing) {
757 this->ThreadCount_ = (threadCount > 0) ? threadCount : 1u;
758 }
759 }
760
Process(void * userData)761 bool cmWorkerPool::Process(void* userData)
762 {
763 // Setup user data
764 this->UserData_ = userData;
765 // Run libuv loop
766 bool success = this->Int_->Process();
767 // Clear user data
768 this->UserData_ = nullptr;
769 // Return
770 return success;
771 }
772
PushJob(JobHandleT && jobHandle)773 bool cmWorkerPool::PushJob(JobHandleT&& jobHandle)
774 {
775 return this->Int_->PushJob(std::move(jobHandle));
776 }
777
Abort()778 void cmWorkerPool::Abort()
779 {
780 this->Int_->Abort();
781 }
782