//
// Copyright 2019 Ettus Research, a National Instruments Brand
//
// SPDX-License-Identifier: GPL-3.0-or-later
//

#include <uhd/exception.hpp>
#include <uhd/rfnoc/chdr_types.hpp>
#include <uhd/utils/log.hpp>
#include <uhd/utils/safe_call.hpp>
#include <uhd/utils/thread.hpp>
#include <uhdlib/rfnoc/chdr_ctrl_endpoint.hpp>
#include <uhdlib/rfnoc/chdr_packet_writer.hpp>
#include <boost/format.hpp>
#include <atomic>
#include <mutex>
#include <thread>

using namespace uhd;
using namespace uhd::rfnoc;
using namespace uhd::rfnoc::chdr;


chdr_ctrl_endpoint::~chdr_ctrl_endpoint() = default;

class chdr_ctrl_endpoint_impl : public chdr_ctrl_endpoint
{
public:
    chdr_ctrl_endpoint_impl(chdr_ctrl_xport::sptr xport,
        const chdr::chdr_packet_factory& pkt_factory,
        sep_id_t my_epid)
        : _my_epid(my_epid)
        , _xport(xport)
        , _send_pkt(pkt_factory.make_ctrl())
        , _recv_pkt(pkt_factory.make_ctrl())
        , _stop_recv_thread(false)
        , _recv_thread([this]() { recv_worker(); })
    {
        const std::string thread_name(str(boost::format("uhd_ctrl_ep%04x") % _my_epid));
        uhd::set_thread_name(&_recv_thread, thread_name);
        UHD_LOG_DEBUG("RFNOC",
            boost::format("Started thread %s to process control messages on EPID %d")
                % thread_name % _my_epid);
    }

    virtual ~chdr_ctrl_endpoint_impl()
    {
        UHD_SAFE_CALL(
            // Interrupt buffer updater loop
            _stop_recv_thread = true;
            // Wait for loop to finish
            // No timeout on join. The recv loop is guaranteed
            // to terminate in a reasonable amount of time because
            // there are no timed blocks on the underlying transport.
            _recv_thread.join();
            // Flush base transport
            while (true) {
                auto buff = _xport->get_recv_buff(100);
                if (buff) {
                    _xport->release_recv_buff(std::move(buff));
                } else {
                    break;
                }
            }
            // Release child endpoints
            _endpoint_map.clear(););
    }

    virtual ctrlport_endpoint::sptr get_ctrlport_ep(sep_id_t dst_epid,
        uint16_t dst_port,
        size_t buff_capacity,
        size_t max_outstanding_async_msgs,
        const clock_iface& client_clk,
        const clock_iface& timebase_clk)
    {
        std::lock_guard<std::mutex> lock(_mutex);

        ep_map_key_t key{dst_epid, dst_port};
        // Function to send a control payload
        auto send_fn = [this, dst_epid](const ctrl_payload& payload, double timeout) {
            // Build header
            chdr_header header;
            header.set_pkt_type(PKT_TYPE_CTRL);
            header.set_num_mdata(0);
            header.set_seq_num(_send_seqnum++);
            header.set_dst_epid(dst_epid);
            // Acquire send buffer and send the packet
            std::lock_guard<std::mutex> lock(_send_mutex);
            auto send_buff = _xport->get_send_buff(timeout * 1000);
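            // refresh() serializes the header and payload into the send buffer and
            // updates the length field of `header`, which is then used below to set
            // the packet size on the buffer.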
            _send_pkt->refresh(send_buff->data(), header, payload);
            send_buff->set_packet_size(header.get_length());
            _xport->release_send_buff(std::move(send_buff));
        };

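        // Create and cache a ctrlport endpoint for this (EPID, port) key if we
        // don't have one yet; otherwise hand back the existing one.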
        if (_endpoint_map.find(key) == _endpoint_map.end()) {
            ctrlport_endpoint::sptr ctrlport_ep = ctrlport_endpoint::make(send_fn,
                _my_epid,
                dst_port,
                buff_capacity,
                max_outstanding_async_msgs,
                client_clk,
                timebase_clk);
            _endpoint_map.insert(std::make_pair(key, ctrlport_ep));
            UHD_LOG_DEBUG("RFNOC",
                boost::format("Created ctrlport endpoint for port %d on EPID %d")
                    % dst_port % _my_epid);
            return ctrlport_ep;
        } else {
            return _endpoint_map.at(key);
        }
    }

    virtual size_t get_num_drops() const
    {
        return _num_drops;
    }

private:
    void recv_worker()
    {
        // Run forever:
        // - Pull packets from the base transport
        // - Route them based on the dst_port
        // - Pass them to the ctrlport_endpoint for additional processing
        while (not _stop_recv_thread) {
            // FIXME Move lock back once we have threaded_io_service
            std::unique_lock<std::mutex> lock(_mutex);
            auto buff = _xport->get_recv_buff(0);
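            // A zero timeout polls the transport without blocking; a null buffer is
            // returned if no packet is currently available.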
            if (buff) {
                // FIXME Move lock back to here once we have threaded_io_service
                // std::lock_guard<std::mutex> lock(_mutex);
                try {
                    _recv_pkt->refresh(buff->data());
                    const ctrl_payload payload = _recv_pkt->get_payload();
                    ep_map_key_t key{payload.src_epid, payload.dst_port};
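                    // Hand the payload to the ctrlport endpoint registered for this
                    // (EPID, port) key; packets without a matching endpoint are
                    // ignored.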
                    if (_endpoint_map.find(key) != _endpoint_map.end()) {
                        _endpoint_map.at(key)->handle_recv(payload);
                    }
                } catch (...) {
                    // Ignore all errors
                }
                _xport->release_recv_buff(std::move(buff));
            } else {
                // FIXME Move lock back to lock_guard once we have threaded_io_service
                lock.unlock();
                // Be a good citizen and yield if no packet is processed
                static const size_t MIN_DUR = 1;
                boost::this_thread::sleep_for(boost::chrono::nanoseconds(MIN_DUR));
                // We call sleep(MIN_DUR) above instead of yield() to ensure that we
                // relinquish the current scheduler time slot.
                // yield() is only a hint to the scheduler to end the time slice
                // early and schedule another thread that is ready to run. In most
                // situations, however, there will be no other thread and this
                // thread will continue to run, which will rail a CPU core.
                // We therefore call sleep(MIN_DUR=1), which sleeps for a minimum
                // amount of time. Ideally we would use
                // boost::chrono::.*seconds::min(), but that is bound to 0, which
                // makes the sleep_for call a no-op and thus useless for actually
                // forcing a sleep.
            }
        }
    }

    using ep_map_key_t = std::pair<sep_id_t, uint16_t>;

    // The endpoint ID of this software endpoint
    const sep_id_t _my_epid;
    // Send/recv transports
    chdr_ctrl_xport::sptr _xport;
    // The current sequence number for a send packet
    size_t _send_seqnum = 0;
    // The number of packets dropped
    size_t _num_drops = 0;
    // Packet containers
    chdr_ctrl_packet::uptr _send_pkt;
    chdr_ctrl_packet::cuptr _recv_pkt;
    // A collection of ctrlport endpoints (keyed by EPID and port number)
    std::map<ep_map_key_t, ctrlport_endpoint::sptr> _endpoint_map;
    // Mutex that protects all state in this class except for _send_pkt
    std::mutex _mutex;
    // Mutex that protects _send_pkt and _xport.send
    std::mutex _send_mutex;
    // Flag to tell the receive worker thread to exit
    std::atomic_bool _stop_recv_thread;
    // A thread that will handle all responses and async message requests.
    // It must be declared after the mutexes and the stop flag above: the thread
    // starts at construction and depends on them having been constructed.
    std::thread _recv_thread;
};

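// Minimal usage sketch (illustration only; the xport, pkt_factory, EPID, clock,
// and buffer arguments are assumed to come from the caller's link/graph setup):
//
//   chdr_ctrl_endpoint::uptr ctrl_ep =
//       chdr_ctrl_endpoint::make(xport, pkt_factory, my_epid);
//   ctrlport_endpoint::sptr ctrlport = ctrl_ep->get_ctrlport_ep(dst_epid, dst_port,
//       buff_capacity, max_outstanding_async_msgs, client_clk, timebase_clk);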
chdr_ctrl_endpoint::uptr chdr_ctrl_endpoint::make(chdr_ctrl_xport::sptr xport,
    const chdr::chdr_packet_factory& pkt_factory,
    sep_id_t my_epid)
{
    return std::make_unique<chdr_ctrl_endpoint_impl>(xport, pkt_factory, my_epid);
}