1 //
2 // Copyright 2011,2014 Ettus Research LLC
3 // Copyright 2018 Ettus Research, a National Instruments Company
4 //
5 // SPDX-License-Identifier: GPL-3.0-or-later
6 //
7
8 #include <uhd/exception.hpp>
9 #include <uhd/utils/log.hpp>
10 #include <uhd/utils/msg_task.hpp>
11 #include <uhd/utils/tasks.hpp>
12 #include <uhd/utils/thread.hpp>
13 #include <boost/thread/barrier.hpp>
14 #include <boost/thread/thread.hpp>
15 #include <atomic>
16 #include <exception>
17 #include <functional>
18 #include <iostream>
19 #include <thread>
20 #include <vector>
21
22 using namespace uhd;
23
24 class task_impl : public task
25 {
26 public:
task_impl(const task_fcn_type & task_fcn,const std::string & name)27 task_impl(const task_fcn_type& task_fcn, const std::string& name) : _exit(false)
28 {
29 _task = std::thread([this, task_fcn]() { this->task_loop(task_fcn); });
30 if (not name.empty()) {
31 set_thread_name(&_task, name);
32 }
33 }
34
~task_impl(void)35 ~task_impl(void)
36 {
37 _exit = true;
38 if (_task.joinable()) {
39 _task.join();
40 }
41 }
42
43 private:
task_loop(const task_fcn_type & task_fcn)44 void task_loop(const task_fcn_type& task_fcn)
45 {
46 try {
47 while (!_exit) {
48 task_fcn();
49 }
50 } catch (const std::exception& e) {
51 do_error_msg(e.what());
52 } catch (...) {
53 UHD_THROW_INVALID_CODE_PATH();
54 }
55 }
56
do_error_msg(const std::string & msg)57 void do_error_msg(const std::string& msg)
58 {
59 UHD_LOGGER_ERROR("UHD")
60 << "An unexpected exception was caught in a task loop."
61 << "The task loop will now exit, things may not work." << msg;
62 }
63
64 std::atomic<bool> _exit;
65 std::thread _task;
66 };
67
make(const task_fcn_type & task_fcn,const std::string & name)68 task::sptr task::make(const task_fcn_type& task_fcn, const std::string& name)
69 {
70 return task::sptr(new task_impl(task_fcn, name));
71 }
72
~msg_task(void)73 msg_task::~msg_task(void)
74 {
75 /* NOP */
76 }
77
78 /*
79 * During shutdown pointers to queues for radio_ctrl_core might not be available anymore.
80 * msg_task_impl provides a dump_queue for such messages.
81 * ctrl_cores can check this queue for stranded messages.
82 */
83
84 class msg_task_impl : public msg_task
85 {
86 public:
msg_task_impl(const task_fcn_type & task_fcn)87 msg_task_impl(const task_fcn_type& task_fcn) : _spawn_barrier(2)
88 {
89 (void)_thread_group.create_thread(
90 std::bind(&msg_task_impl::task_loop, this, task_fcn));
91 _spawn_barrier.wait();
92 }
93
~msg_task_impl(void)94 ~msg_task_impl(void)
95 {
96 _running = false;
97 _thread_group.interrupt_all();
98 _thread_group.join_all();
99 }
100
101 /*
102 * Returns the first message for the given SID.
103 * This way a radio_ctrl_core doesn't have to die in timeout but can check for
104 * stranded messages here. This might happen during shutdown when dtors are called.
105 * See also: comments in b200_io_impl->handle_async_task
106 */
get_msg_from_dump_queue(uint32_t sid)107 msg_payload_t get_msg_from_dump_queue(uint32_t sid)
108 {
109 boost::mutex::scoped_lock lock(_mutex);
110 msg_payload_t b;
111 for (size_t i = 0; i < _dump_queue.size(); i++) {
112 if (sid == _dump_queue[i].first) {
113 b = _dump_queue[i].second;
114 _dump_queue.erase(_dump_queue.begin() + i);
115 break;
116 }
117 }
118 return b;
119 }
120
121 private:
task_loop(const task_fcn_type & task_fcn)122 void task_loop(const task_fcn_type& task_fcn)
123 {
124 _running = true;
125 _spawn_barrier.wait();
126
127 try {
128 while (_running) {
129 boost::optional<msg_type_t> buff = task_fcn();
130 if (buff != boost::none) {
131 /*
132 * If a message gets stranded it is returned by task_fcn and then
133 * pushed to the dump_queue. This way ctrl_cores can check dump_queue
134 * for missing messages.
135 */
136 boost::mutex::scoped_lock lock(_mutex);
137 _dump_queue.push_back(buff.get());
138 }
139 }
140 } catch (const boost::thread_interrupted&) {
141 // this is an ok way to exit the task loop
142 } catch (const std::exception& e) {
143 do_error_msg(e.what());
144 } catch (...) {
145 // FIXME
146 // Unfortunately, this is also an ok way to end a task,
147 // because on some systems boost throws uncatchables.
148 }
149 }
150
do_error_msg(const std::string & msg)151 void do_error_msg(const std::string& msg)
152 {
153 UHD_LOGGER_ERROR("UHD")
154 << "An unexpected exception was caught in a task loop."
155 << "The task loop will now exit, things may not work." << msg;
156 }
157
158 boost::mutex _mutex;
159 boost::thread_group _thread_group;
160 boost::barrier _spawn_barrier;
161 bool _running;
162
163 /*
164 * This queue holds stranded messages until a radio_ctrl_core grabs them via
165 * 'get_msg_from_dump_queue'.
166 */
167 std::vector<msg_type_t> _dump_queue;
168 };
169
make(const task_fcn_type & task_fcn)170 msg_task::sptr msg_task::make(const task_fcn_type& task_fcn)
171 {
172 return msg_task::sptr(new msg_task_impl(task_fcn));
173 }
174