1 //
2 // Copyright 2019 Ettus Research, a National Instruments Brand
3 //
4 // SPDX-License-Identifier: GPL-3.0-or-later
5 //
6 
7 #include <uhd/exception.hpp>
8 #include <uhd/rfnoc/chdr_types.hpp>
9 #include <uhd/utils/log.hpp>
10 #include <uhdlib/rfnoc/chdr_packet_writer.hpp>
11 #include <uhdlib/rfnoc/ctrlport_endpoint.hpp>
12 #include <condition_variable>
13 #include <boost/format.hpp>
14 #include <deque>
15 #include <mutex>
16 #include <numeric>
17 #include <queue>
18 
19 
20 using namespace uhd;
21 using namespace uhd::rfnoc;
22 using namespace uhd::rfnoc::chdr;
23 
24 using namespace std::chrono;
25 using namespace std::chrono_literals;
26 
27 namespace {
28 //! Max async msg (CTRL_WRITE) size in 32-bit words (2 hdr, 2 TS, 1 op-word, 1 data)
29 constexpr size_t ASYNC_MESSAGE_SIZE = 6;
30 //! Default completion timeout for transactions
31 constexpr double DEFAULT_TIMEOUT = 1.0;
32 //! Long timeout for when we wait on a timed command
33 constexpr double MASSIVE_TIMEOUT = 10.0;
34 //! Default value for whether ACKs are always required
35 constexpr bool DEFAULT_FORCE_ACKS = false;
36 } // namespace
37 
38 ctrlport_endpoint::~ctrlport_endpoint() = default;
39 
40 class ctrlport_endpoint_impl : public ctrlport_endpoint
41 {
42 public:
ctrlport_endpoint_impl(const send_fn_t & send_fcn,sep_id_t my_epid,uint16_t local_port,size_t buff_capacity,size_t max_outstanding_async_msgs,const clock_iface & client_clk,const clock_iface & timebase_clk)43     ctrlport_endpoint_impl(const send_fn_t& send_fcn,
44         sep_id_t my_epid,
45         uint16_t local_port,
46         size_t buff_capacity,
47         size_t max_outstanding_async_msgs,
48         const clock_iface& client_clk,
49         const clock_iface& timebase_clk)
50         : _handle_send(send_fcn)
51         , _my_epid(my_epid)
52         , _local_port(local_port)
53         , _buff_capacity(buff_capacity)
54         , _max_outstanding_async_msgs(max_outstanding_async_msgs)
55         , _client_clk(client_clk)
56         , _timebase_clk(timebase_clk)
57     {
58     }
59 
60     virtual ~ctrlport_endpoint_impl() = default;
61 
poke32(uint32_t addr,uint32_t data,uhd::time_spec_t timestamp=uhd::time_spec_t::ASAP,bool ack=false)62     virtual void poke32(uint32_t addr,
63         uint32_t data,
64         uhd::time_spec_t timestamp = uhd::time_spec_t::ASAP,
65         bool ack                   = false)
66     {
67         // Compute transaction expiration time
68         auto timeout_time = start_timeout(_policy.timeout);
69         // Send request
70         auto request =
71             send_request_packet(OP_WRITE, addr, {data}, timestamp, timeout_time);
72         // Optionally wait for an ACK
73         if (ack || _policy.force_acks) {
74             wait_for_ack(request, timeout_time);
75         }
76     }
77 
multi_poke32(const std::vector<uint32_t> addrs,const std::vector<uint32_t> data,uhd::time_spec_t timestamp=uhd::time_spec_t::ASAP,bool ack=false)78     virtual void multi_poke32(const std::vector<uint32_t> addrs,
79         const std::vector<uint32_t> data,
80         uhd::time_spec_t timestamp = uhd::time_spec_t::ASAP,
81         bool ack                   = false)
82     {
83         if (addrs.size() != data.size()) {
84             throw uhd::value_error("addrs and data vectors must be of the same length");
85         }
86         for (size_t i = 0; i < data.size(); i++) {
87             poke32(addrs[i],
88                 data[i],
89                 (i == 0) ? timestamp : uhd::time_spec_t::ASAP,
90                 (i == data.size() - 1) ? ack : false);
91         }
92     }
93 
block_poke32(uint32_t first_addr,const std::vector<uint32_t> data,uhd::time_spec_t timestamp=uhd::time_spec_t::ASAP,bool ack=false)94     virtual void block_poke32(uint32_t first_addr,
95         const std::vector<uint32_t> data,
96         uhd::time_spec_t timestamp = uhd::time_spec_t::ASAP,
97         bool ack                   = false)
98     {
99         for (size_t i = 0; i < data.size(); i++) {
100             poke32(first_addr + (i * sizeof(uint32_t)),
101                 data[i],
102                 (i == 0) ? timestamp : uhd::time_spec_t::ASAP,
103                 (i == data.size() - 1) ? ack : false);
104         }
105 
106         /* TODO: Uncomment when the atomic block poke is implemented in the FPGA
107         // Compute transaction expiration time
108         auto timeout_time = start_timeout(_policy.timeout);
109         // Send request
110         auto request = send_request_packet(
111             OP_BLOCK_WRITE, first_addr, data, timestamp, timeout_time);
112         // Optionally wait for an ACK
113         if (ack || _policy.force_acks) {
114             wait_for_ack(request, timeout_time);
115         }
116         */
117     }
118 
peek32(uint32_t addr,uhd::time_spec_t timestamp=uhd::time_spec_t::ASAP)119     virtual uint32_t peek32(
120         uint32_t addr, uhd::time_spec_t timestamp = uhd::time_spec_t::ASAP)
121     {
122         // Compute transaction expiration time, use MASSIVE_TIMEOUT if a timed
123         // command is in the queue
124         auto timeout_time =
125             start_timeout(check_timed_in_queue() ? MASSIVE_TIMEOUT : _policy.timeout);
126 
127         // Send request
128         auto request =
129             send_request_packet(OP_READ, addr, {uint32_t(0)}, timestamp, timeout_time);
130         // Wait for an ACK
131         auto response = wait_for_ack(request, timeout_time);
132         return response.data_vtr[0];
133     }
134 
block_peek32(uint32_t first_addr,size_t length,uhd::time_spec_t timestamp=uhd::time_spec_t::ASAP)135     virtual std::vector<uint32_t> block_peek32(uint32_t first_addr,
136         size_t length,
137         uhd::time_spec_t timestamp = uhd::time_spec_t::ASAP)
138     {
139         std::vector<uint32_t> values;
140         for (size_t i = 0; i < length; i++) {
141             values.push_back(peek32(first_addr + (i * sizeof(uint32_t)),
142                 (i == 0) ? timestamp : uhd::time_spec_t::ASAP));
143         }
144         return values;
145 
146         /* TODO: Uncomment when the atomic block peek is implemented in the FPGA
147         // Compute transaction expiration time, use MASSIVE_TIMEOUT if a timed
148         // command is in the queue
149         auto timeout_time = start_timeout(
150             check_timed_in_queue() ? MASSIVE_TIMEOUT : _policy.timeout
151         );
152         // Send request
153         auto request = send_request_packet(OP_READ,
154             first_addr,
155             std::vector<uint32_t>(length, 0),
156             timestamp,
157             timeout_time);
158         // Wait for an ACK
159         auto response = wait_for_ack(request, timeout_time);
160         return response.data_vtr;
161         */
162     }
163 
poll32(uint32_t addr,uint32_t data,uint32_t mask,uhd::time_spec_t timeout,uhd::time_spec_t timestamp=uhd::time_spec_t::ASAP,bool ack=false)164     virtual void poll32(uint32_t addr,
165         uint32_t data,
166         uint32_t mask,
167         uhd::time_spec_t timeout,
168         uhd::time_spec_t timestamp = uhd::time_spec_t::ASAP,
169         bool ack                   = false)
170     {
171         // TODO: Uncomment when this is implemented in the FPGA
172         throw uhd::not_implemented_error("Control poll not implemented in the FPGA");
173 
174         // Compute transaction expiration time, use MASSIVE_TIMEOUT if a timed
175         // command is in the queue
176         auto timeout_time =
177             start_timeout(check_timed_in_queue() ? MASSIVE_TIMEOUT : _policy.timeout);
178 
179         // Send request
180         auto request = send_request_packet(OP_POLL,
181             addr,
182             {data, mask, static_cast<uint32_t>(timeout.to_ticks(_client_clk.get_freq()))},
183             timestamp,
184             timeout_time);
185         // Optionally wait for an ACK
186         if (ack || _policy.force_acks) {
187             wait_for_ack(request, timeout_time);
188         }
189     }
190 
sleep(uhd::time_spec_t duration,bool ack=false)191     virtual void sleep(uhd::time_spec_t duration, bool ack = false)
192     {
193         // Compute transaction expiration time, use MASSIVE_TIMEOUT if a timed
194         // command is in the queue
195         auto timeout_time =
196             start_timeout(check_timed_in_queue() ? MASSIVE_TIMEOUT : _policy.timeout);
197 
198         // Send request
199         auto request = send_request_packet(OP_SLEEP,
200             0,
201             {static_cast<uint32_t>(duration.to_ticks(_client_clk.get_freq()))},
202             uhd::time_spec_t::ASAP,
203             timeout_time);
204         // Optionally wait for an ACK
205         if (ack || _policy.force_acks) {
206             wait_for_ack(request, timeout_time);
207         }
208     }
209 
register_async_msg_validator(async_msg_validator_t callback_f)210     virtual void register_async_msg_validator(async_msg_validator_t callback_f)
211     {
212         std::unique_lock<std::mutex> lock(_mutex);
213         _validate_async_msg = callback_f;
214     }
215 
register_async_msg_handler(async_msg_callback_t callback_f)216     virtual void register_async_msg_handler(async_msg_callback_t callback_f)
217     {
218         std::unique_lock<std::mutex> lock(_mutex);
219         _handle_async_msg = callback_f;
220     }
221 
set_policy(const std::string & name,const uhd::device_addr_t & args)222     virtual void set_policy(const std::string& name, const uhd::device_addr_t& args)
223     {
224         std::unique_lock<std::mutex> lock(_mutex);
225         if (name == "default") {
226             _policy.timeout    = args.cast<double>("timeout", DEFAULT_TIMEOUT);
227             _policy.force_acks = DEFAULT_FORCE_ACKS;
228         } else {
229             // TODO: Uncomment when custom policies are implemented
230             throw uhd::not_implemented_error("Policy implemented in the FPGA");
231         }
232     }
233 
handle_recv(const ctrl_payload & rx_ctrl)234     virtual void handle_recv(const ctrl_payload& rx_ctrl)
235     {
236         if (rx_ctrl.is_ack) {
237             // Function to process a response with no sequence errors
238             auto process_correct_response = [this, rx_ctrl]() {
239                 std::unique_lock<std::mutex> lock(_mutex);
240                 response_status_t resp_status = RESP_VALID;
241                 // Grant flow control credits
242                 _buff_occupied -= get_payload_size(_req_queue.front());
243                 _buff_free_cond.notify_one();
244                 if (get_payload_size(_req_queue.front()) != get_payload_size(rx_ctrl)) {
245                     resp_status = RESP_SIZEERR;
246                 }
247                 // Pop the request from the queue
248                 _req_queue.pop_front();
249                 // Push the response into the response queue
250                 _resp_queue.push(std::make_tuple(rx_ctrl, resp_status));
251                 _resp_ready_cond.notify_one();
252             };
253             // Function to process a response with sequence errors
254             auto process_incorrect_response = [this]() {
255                 std::unique_lock<std::mutex> lock(_mutex);
256                 // Grant flow control credits
257                 _buff_occupied -= get_payload_size(_req_queue.front());
258                 _buff_free_cond.notify_one();
259                 // Push a fabricated response into the response queue
260                 ctrl_payload resp(_req_queue.front());
261                 resp.is_ack = true;
262                 _resp_queue.push(std::make_tuple(resp, RESP_DROPPED));
263                 _resp_ready_cond.notify_one();
264                 // Pop the request from the queue
265                 _req_queue.pop_front();
266             };
267 
268             // Peek at the request queue to check the expected sequence number
269             int8_t seq_num_diff = int8_t(rx_ctrl.seq_num - _req_queue.front().seq_num);
270             if (seq_num_diff == 0) { // No sequence error
271                 process_correct_response();
272             } else if (seq_num_diff > 0) { // Packet(s) dropped
273                 // Tag all dropped packets
274                 for (int8_t i = 0; i < seq_num_diff; i++) {
275                     process_incorrect_response();
276                 }
277                 // Process correct response
278                 process_correct_response();
279             } else { // Reordered packet(s)
280                 // Requests are processed in order. If seq_num_diff is negative then we
281                 // have either already seen this response or we have dropped >128
282                 // responses. Either way ignore this packet.
283             }
284         } else {
285             // Handle asynchronous message callback
286             ctrl_status_t status = CMD_CMDERR;
287             if (rx_ctrl.op_code != OP_WRITE && rx_ctrl.op_code != OP_BLOCK_WRITE) {
288                 UHD_LOG_ERROR(
289                     "CTRLEP", "Malformed async message request: Invalid opcode");
290             } else if (rx_ctrl.dst_port != _local_port) {
291                 UHD_LOG_ERROR("CTRLEP",
292                     "Malformed async message request: Invalid port "
293                         << rx_ctrl.dst_port << ", expected my local port "
294                         << _local_port);
295             } else if (rx_ctrl.data_vtr.empty()) {
296                 UHD_LOG_ERROR(
297                     "CTRLEP", "Malformed async message request: Invalid num_data");
298             } else {
299                 if (!_validate_async_msg(rx_ctrl.address, rx_ctrl.data_vtr)) {
300                     UHD_LOG_ERROR("CTRLEP",
301                         "Malformed async message request: Async message was not "
302                         "validated by block controller!");
303                 } else {
304                     status = CMD_OKAY;
305                 }
306             }
307             try {
308                 // Respond with an ACK packet
309                 // Flow control not needed because we have allocated special room in the
310                 // buffer for async message responses
311                 ctrl_payload tx_ctrl(rx_ctrl);
312                 tx_ctrl.is_ack     = true;
313                 tx_ctrl.src_epid   = _my_epid;
314                 tx_ctrl.status     = status;
315                 const auto timeout = [&]() {
316                     std::unique_lock<std::mutex> lock(_mutex);
317                     return _policy.timeout;
318                 }();
319                 _handle_send(tx_ctrl, timeout);
320             } catch (...) {
321                 UHD_LOG_ERROR("CTRLEP",
322                     "Encountered an error sending a response for an async message");
323                 return;
324             }
325             if (status == CMD_OKAY) {
326                 try {
327                     _handle_async_msg(
328                         rx_ctrl.address, rx_ctrl.data_vtr, rx_ctrl.timestamp);
329                 } catch (const std::exception& ex) {
330                     UHD_LOG_ERROR("CTRLEP",
331                         "Caught exception during async message handling: " << ex.what());
332                 } catch (...) {
333                     UHD_LOG_ERROR("CTRLEP",
334                         "Caught unknown exception during async message handling!");
335                 }
336             }
337         }
338     }
339 
get_src_epid() const340     virtual uint16_t get_src_epid() const
341     {
342         // Is const, does not require a mutex
343         return _my_epid;
344     }
345 
get_port_num() const346     virtual uint16_t get_port_num() const
347     {
348         // Is const, does not require a mutex
349         return _local_port;
350     }
351 
352 private:
353     //! Returns the length of the control payload in 32-bit words
get_payload_size(const ctrl_payload & payload)354     inline static size_t get_payload_size(const ctrl_payload& payload)
355     {
356         return 2 + (payload.timestamp.is_initialized() ? 2 : 0) + payload.data_vtr.size();
357     }
358 
359     //! Marks the start of a timeout for an operation and returns the expiration time
start_timeout(double duration)360     inline const steady_clock::time_point start_timeout(double duration)
361     {
362         return steady_clock::now() + (static_cast<int>(std::ceil(duration / 1e-6)) * 1us);
363     }
364 
365     //! Returns whether or not we have a timed command queued
check_timed_in_queue() const366     bool check_timed_in_queue() const
367     {
368         for (auto pyld : _req_queue) {
369             if (pyld.has_timestamp()) {
370                 return true;
371             }
372         }
373         return false;
374     }
375 
376     //! Sends a request control packet to a remote device
send_request_packet(ctrl_opcode_t op_code,uint32_t address,const std::vector<uint32_t> & data_vtr,const uhd::time_spec_t & time_spec,const steady_clock::time_point & timeout_time)377     const ctrl_payload send_request_packet(ctrl_opcode_t op_code,
378         uint32_t address,
379         const std::vector<uint32_t>& data_vtr,
380         const uhd::time_spec_t& time_spec,
381         const steady_clock::time_point& timeout_time)
382     {
383         if (!_client_clk.is_running()) {
384             throw uhd::system_error("Ctrlport client clock is not running");
385         }
386 
387         // Convert from uhd::time_spec to timestamp
388         boost::optional<uint64_t> timestamp;
389         if (time_spec != time_spec_t::ASAP) {
390             if (!_timebase_clk.is_running()) {
391                 throw uhd::system_error("Timebase clock is not running");
392             }
393             timestamp = time_spec.to_ticks(_timebase_clk.get_freq());
394         }
395 
396         // Assemble the control payload
397         ctrl_payload tx_ctrl;
398         tx_ctrl.dst_port    = _local_port;
399         tx_ctrl.src_port    = _local_port;
400         tx_ctrl.seq_num     = _tx_seq_num;
401         tx_ctrl.timestamp   = timestamp;
402         tx_ctrl.is_ack      = false;
403         tx_ctrl.src_epid    = _my_epid;
404         tx_ctrl.address     = address;
405         tx_ctrl.data_vtr    = data_vtr;
406         tx_ctrl.byte_enable = 0xF;
407         tx_ctrl.op_code     = op_code;
408         tx_ctrl.status      = CMD_OKAY;
409 
410         std::unique_lock<std::mutex> lock(_mutex);
411 
412         // Perform flow control
413         // If there is no room in the downstream buffer, then wait until the timeout
414         size_t pyld_size   = get_payload_size(tx_ctrl);
415         auto buff_not_full = [this, pyld_size]() -> bool {
416             // Allocate room in the queue for one async response packet
417             // If we can fit the current request in the queue then we can proceed
418             return (_buff_occupied + pyld_size)
419                    <= (_buff_capacity
420                        - (ASYNC_MESSAGE_SIZE * _max_outstanding_async_msgs));
421         };
422         if (!buff_not_full()) {
423             // If we're sending a timed command or if we have a timed command in the
424             // queue, use the MASSIVE_TIMEOUT instead
425             auto timed_timeout =
426                 (check_timed_in_queue() ? start_timeout(MASSIVE_TIMEOUT) : timeout_time);
427             if (not _buff_free_cond.wait_until(lock, timed_timeout, buff_not_full)) {
428                 throw uhd::op_timeout(
429                     "Control operation timed out waiting for space in command buffer");
430             }
431         }
432         _buff_occupied += pyld_size;
433         _req_queue.push_back(tx_ctrl);
434 
435         // Send the payload as soon as there is room in the buffer
436         _handle_send(tx_ctrl, _policy.timeout);
437         _tx_seq_num = (_tx_seq_num + 1) % 64;
438 
439         return tx_ctrl;
440     }
441 
442     //! Waits for and returns the ACK for the specified request
wait_for_ack(const ctrl_payload & request,const steady_clock::time_point & timeout_time)443     const ctrl_payload wait_for_ack(
444         const ctrl_payload& request, const steady_clock::time_point& timeout_time)
445     {
446         auto resp_ready = [this]() -> bool { return !_resp_queue.empty(); };
447         while (true) {
448             std::unique_lock<std::mutex> lock(_mutex);
449             // Wait until there is a response in the response queue
450             if (!resp_ready()) {
451                 if (not _resp_ready_cond.wait_until(lock, timeout_time, resp_ready)) {
452                     throw uhd::op_timeout("Control operation timed out waiting for ACK");
453                 }
454             }
455             // Extract the response packet
456             ctrl_payload rx_ctrl;
457             response_status_t resp_status;
458             std::tie(rx_ctrl, resp_status) = _resp_queue.front();
459             _resp_queue.pop();
460             // Check if this is the response meant for the request
461             // Filter by op_code, address and seq_num
462             if (rx_ctrl.seq_num == request.seq_num && rx_ctrl.op_code == request.op_code
463                 && rx_ctrl.address == request.address) {
464                 // Validate transaction status
465                 if (rx_ctrl.status == CMD_CMDERR) {
466                     throw uhd::op_failed("Control operation returned a failing status");
467                 } else if (rx_ctrl.status == CMD_TSERR) {
468                     throw uhd::op_timerr("Control operation returned a timestamp error");
469                 }
470                 // Check data vector size
471                 if (rx_ctrl.data_vtr.size() == 0) {
472                     throw uhd::op_failed(
473                         "Control operation returned a malformed response");
474                 }
475                 // Validate response status
476                 if (resp_status == RESP_DROPPED) {
477                     throw uhd::op_seqerr(
478                         "Response for a control transaction was dropped");
479                 } else if (resp_status == RESP_RTERR) {
480                     throw uhd::op_timerr("Control operation encountered a routing error");
481                 }
482                 return rx_ctrl;
483             } else {
484                 // This response does not belong to the request we passed in. Move on.
485                 continue;
486             }
487         }
488     }
489 
490 
491     //! The parameters associated with the policy that governs this object
492     struct policy_args
493     {
494         double timeout  = DEFAULT_TIMEOUT;
495         bool force_acks = DEFAULT_FORCE_ACKS;
496     };
497     //! The software status (different from the transaction status) of the response
498     enum response_status_t { RESP_VALID, RESP_DROPPED, RESP_RTERR, RESP_SIZEERR };
499 
500     //! Function to call to send a control packet
501     const send_fn_t _handle_send;
502     //! The endpoint ID of this software endpoint
503     const sep_id_t _my_epid;
504     //! The local port number on the control crossbar for this ctrlport endpoint
505     const uint16_t _local_port;
506     //! The downstream buffer capacity in 32-bit words (used for flow control)
507     const size_t _buff_capacity;
508     //! The max number of outstanding async messages that a block can have at any time
509     const size_t _max_outstanding_async_msgs;
510     //! The clock that drives the ctrlport endpoint
511     const clock_iface& _client_clk;
512     //! The clock that drives the timing logic for the ctrlport endpoint
513     const clock_iface& _timebase_clk;
514 
515     //! The function to call to validate an async message (by default, all async
516     // messages are considered valid)
517     async_msg_validator_t _validate_async_msg =
__anon2104bd8d0702(uint32_t, const std::vector<uint32_t>&) 518         [](uint32_t, const std::vector<uint32_t>&) { return true; };
519     //! The function to call to handle an async message
520     async_msg_callback_t _handle_async_msg = async_msg_callback_t();
521     //! The current control sequence number of outgoing packets
522     uint8_t _tx_seq_num = 0;
523     //! The number of occupied words in the downstream buffer
524     ssize_t _buff_occupied = 0;
525     //! The arguments for the policy that governs this register interface
526     policy_args _policy;
527     //! A condition variable that hold the "downstream buffer is free" condition
528     std::condition_variable _buff_free_cond;
529     //! A queue that holds all outstanding requests
530     std::deque<ctrl_payload> _req_queue;
531     //! A queue that holds all outstanding responses and their status
532     std::queue<std::tuple<ctrl_payload, response_status_t>> _resp_queue;
533     //! A condition variable that hold the "response is available" condition
534     std::condition_variable _resp_ready_cond;
535     //! A mutex to protect all state in this class
536     std::mutex _mutex;
537 };
538 
make(const send_fn_t & handle_send,sep_id_t this_epid,uint16_t local_port,size_t buff_capacity,size_t max_outstanding_async_msgs,const clock_iface & client_clk,const clock_iface & timebase_clk)539 ctrlport_endpoint::sptr ctrlport_endpoint::make(const send_fn_t& handle_send,
540     sep_id_t this_epid,
541     uint16_t local_port,
542     size_t buff_capacity,
543     size_t max_outstanding_async_msgs,
544     const clock_iface& client_clk,
545     const clock_iface& timebase_clk)
546 {
547     return std::make_shared<ctrlport_endpoint_impl>(handle_send,
548         this_epid,
549         local_port,
550         buff_capacity,
551         max_outstanding_async_msgs,
552         client_clk,
553         timebase_clk);
554 }
555