1 //
2 // Copyright 2011,2014 Ettus Research LLC
3 // Copyright 2018 Ettus Research, a National Instruments Company
4 //
5 // SPDX-License-Identifier: GPL-3.0-or-later
6 //
7 
8 #include <uhd/exception.hpp>
9 #include <uhd/utils/log.hpp>
10 #include <uhd/utils/msg_task.hpp>
11 #include <uhd/utils/tasks.hpp>
12 #include <uhd/utils/thread.hpp>
13 #include <boost/thread/barrier.hpp>
14 #include <boost/thread/thread.hpp>
15 #include <atomic>
16 #include <exception>
17 #include <functional>
18 #include <iostream>
19 #include <thread>
20 #include <vector>
21 
22 using namespace uhd;
23 
24 class task_impl : public task
25 {
26 public:
task_impl(const task_fcn_type & task_fcn,const std::string & name)27     task_impl(const task_fcn_type& task_fcn, const std::string& name) : _exit(false)
28     {
29         _task = std::thread([this, task_fcn]() { this->task_loop(task_fcn); });
30         if (not name.empty()) {
31             set_thread_name(&_task, name);
32         }
33     }
34 
~task_impl(void)35     ~task_impl(void)
36     {
37         _exit = true;
38         if (_task.joinable()) {
39             _task.join();
40         }
41     }
42 
43 private:
task_loop(const task_fcn_type & task_fcn)44     void task_loop(const task_fcn_type& task_fcn)
45     {
46         try {
47             while (!_exit) {
48                 task_fcn();
49             }
50         } catch (const std::exception& e) {
51             do_error_msg(e.what());
52         } catch (...) {
53             UHD_THROW_INVALID_CODE_PATH();
54         }
55     }
56 
do_error_msg(const std::string & msg)57     void do_error_msg(const std::string& msg)
58     {
59         UHD_LOGGER_ERROR("UHD")
60             << "An unexpected exception was caught in a task loop."
61             << "The task loop will now exit, things may not work." << msg;
62     }
63 
64     std::atomic<bool> _exit;
65     std::thread _task;
66 };
67 
make(const task_fcn_type & task_fcn,const std::string & name)68 task::sptr task::make(const task_fcn_type& task_fcn, const std::string& name)
69 {
70     return task::sptr(new task_impl(task_fcn, name));
71 }
72 
~msg_task(void)73 msg_task::~msg_task(void)
74 {
75     /* NOP */
76 }
77 
78 /*
79  * During shutdown pointers to queues for radio_ctrl_core might not be available anymore.
80  * msg_task_impl provides a dump_queue for such messages.
81  * ctrl_cores can check this queue for stranded messages.
82  */
83 
84 class msg_task_impl : public msg_task
85 {
86 public:
msg_task_impl(const task_fcn_type & task_fcn)87     msg_task_impl(const task_fcn_type& task_fcn) : _spawn_barrier(2)
88     {
89         (void)_thread_group.create_thread(
90             std::bind(&msg_task_impl::task_loop, this, task_fcn));
91         _spawn_barrier.wait();
92     }
93 
~msg_task_impl(void)94     ~msg_task_impl(void)
95     {
96         _running = false;
97         _thread_group.interrupt_all();
98         _thread_group.join_all();
99     }
100 
101     /*
102      * Returns the first message for the given SID.
103      * This way a radio_ctrl_core doesn't have to die in timeout but can check for
104      * stranded messages here. This might happen during shutdown when dtors are called.
105      * See also: comments in b200_io_impl->handle_async_task
106      */
get_msg_from_dump_queue(uint32_t sid)107     msg_payload_t get_msg_from_dump_queue(uint32_t sid)
108     {
109         boost::mutex::scoped_lock lock(_mutex);
110         msg_payload_t b;
111         for (size_t i = 0; i < _dump_queue.size(); i++) {
112             if (sid == _dump_queue[i].first) {
113                 b = _dump_queue[i].second;
114                 _dump_queue.erase(_dump_queue.begin() + i);
115                 break;
116             }
117         }
118         return b;
119     }
120 
121 private:
task_loop(const task_fcn_type & task_fcn)122     void task_loop(const task_fcn_type& task_fcn)
123     {
124         _running = true;
125         _spawn_barrier.wait();
126 
127         try {
128             while (_running) {
129                 boost::optional<msg_type_t> buff = task_fcn();
130                 if (buff != boost::none) {
131                     /*
132                      * If a message gets stranded it is returned by task_fcn and then
133                      * pushed to the dump_queue. This way ctrl_cores can check dump_queue
134                      * for missing messages.
135                      */
136                     boost::mutex::scoped_lock lock(_mutex);
137                     _dump_queue.push_back(buff.get());
138                 }
139             }
140         } catch (const boost::thread_interrupted&) {
141             // this is an ok way to exit the task loop
142         } catch (const std::exception& e) {
143             do_error_msg(e.what());
144         } catch (...) {
145             // FIXME
146             // Unfortunately, this is also an ok way to end a task,
147             // because on some systems boost throws uncatchables.
148         }
149     }
150 
do_error_msg(const std::string & msg)151     void do_error_msg(const std::string& msg)
152     {
153         UHD_LOGGER_ERROR("UHD")
154             << "An unexpected exception was caught in a task loop."
155             << "The task loop will now exit, things may not work." << msg;
156     }
157 
158     boost::mutex _mutex;
159     boost::thread_group _thread_group;
160     boost::barrier _spawn_barrier;
161     bool _running;
162 
163     /*
164      * This queue holds stranded messages until a radio_ctrl_core grabs them via
165      * 'get_msg_from_dump_queue'.
166      */
167     std::vector<msg_type_t> _dump_queue;
168 };
169 
make(const task_fcn_type & task_fcn)170 msg_task::sptr msg_task::make(const task_fcn_type& task_fcn)
171 {
172     return msg_task::sptr(new msg_task_impl(task_fcn));
173 }
174