1 //
2 // Copyright 2019 Ettus Research, a National Instruments Brand
3 //
4 // SPDX-License-Identifier: GPL-3.0-or-later
5 //
6
7 #include <uhd/exception.hpp>
8 #include <uhd/rfnoc/chdr_types.hpp>
9 #include <uhd/utils/log.hpp>
10 #include <uhdlib/rfnoc/chdr_packet_writer.hpp>
11 #include <uhdlib/rfnoc/ctrlport_endpoint.hpp>
12 #include <condition_variable>
13 #include <boost/format.hpp>
14 #include <deque>
15 #include <mutex>
16 #include <numeric>
17 #include <queue>
18
19
20 using namespace uhd;
21 using namespace uhd::rfnoc;
22 using namespace uhd::rfnoc::chdr;
23
24 using namespace std::chrono;
25 using namespace std::chrono_literals;
26
namespace {

// Size budget, in 32-bit words, reserved for a single asynchronous message
// (CTRL_WRITE): 2 header words + 2 timestamp words + 1 op-word + 1 data word.
constexpr size_t ASYNC_MESSAGE_SIZE{6};

// Completion timeout applied to a transaction unless the policy overrides it.
// The value is in seconds (start_timeout() converts it to microseconds).
constexpr double DEFAULT_TIMEOUT{1.0};

// Extended timeout (same unit as DEFAULT_TIMEOUT) used while a timed command
// is waiting in the request queue.
constexpr double MASSIVE_TIMEOUT{10.0};

// Whether every transaction waits for its ACK when the policy does not say
// otherwise.
constexpr bool DEFAULT_FORCE_ACKS{false};

} // namespace
37
// Out-of-line definition of the ctrlport_endpoint base-class destructor. It is
// defaulted, i.e. it performs no work beyond the compiler-generated cleanup.
ctrlport_endpoint::~ctrlport_endpoint() = default;
39
40 class ctrlport_endpoint_impl : public ctrlport_endpoint
41 {
42 public:
ctrlport_endpoint_impl(const send_fn_t & send_fcn,sep_id_t my_epid,uint16_t local_port,size_t buff_capacity,size_t max_outstanding_async_msgs,const clock_iface & client_clk,const clock_iface & timebase_clk)43 ctrlport_endpoint_impl(const send_fn_t& send_fcn,
44 sep_id_t my_epid,
45 uint16_t local_port,
46 size_t buff_capacity,
47 size_t max_outstanding_async_msgs,
48 const clock_iface& client_clk,
49 const clock_iface& timebase_clk)
50 : _handle_send(send_fcn)
51 , _my_epid(my_epid)
52 , _local_port(local_port)
53 , _buff_capacity(buff_capacity)
54 , _max_outstanding_async_msgs(max_outstanding_async_msgs)
55 , _client_clk(client_clk)
56 , _timebase_clk(timebase_clk)
57 {
58 }
59
60 virtual ~ctrlport_endpoint_impl() = default;
61
poke32(uint32_t addr,uint32_t data,uhd::time_spec_t timestamp=uhd::time_spec_t::ASAP,bool ack=false)62 virtual void poke32(uint32_t addr,
63 uint32_t data,
64 uhd::time_spec_t timestamp = uhd::time_spec_t::ASAP,
65 bool ack = false)
66 {
67 // Compute transaction expiration time
68 auto timeout_time = start_timeout(_policy.timeout);
69 // Send request
70 auto request =
71 send_request_packet(OP_WRITE, addr, {data}, timestamp, timeout_time);
72 // Optionally wait for an ACK
73 if (ack || _policy.force_acks) {
74 wait_for_ack(request, timeout_time);
75 }
76 }
77
multi_poke32(const std::vector<uint32_t> addrs,const std::vector<uint32_t> data,uhd::time_spec_t timestamp=uhd::time_spec_t::ASAP,bool ack=false)78 virtual void multi_poke32(const std::vector<uint32_t> addrs,
79 const std::vector<uint32_t> data,
80 uhd::time_spec_t timestamp = uhd::time_spec_t::ASAP,
81 bool ack = false)
82 {
83 if (addrs.size() != data.size()) {
84 throw uhd::value_error("addrs and data vectors must be of the same length");
85 }
86 for (size_t i = 0; i < data.size(); i++) {
87 poke32(addrs[i],
88 data[i],
89 (i == 0) ? timestamp : uhd::time_spec_t::ASAP,
90 (i == data.size() - 1) ? ack : false);
91 }
92 }
93
block_poke32(uint32_t first_addr,const std::vector<uint32_t> data,uhd::time_spec_t timestamp=uhd::time_spec_t::ASAP,bool ack=false)94 virtual void block_poke32(uint32_t first_addr,
95 const std::vector<uint32_t> data,
96 uhd::time_spec_t timestamp = uhd::time_spec_t::ASAP,
97 bool ack = false)
98 {
99 for (size_t i = 0; i < data.size(); i++) {
100 poke32(first_addr + (i * sizeof(uint32_t)),
101 data[i],
102 (i == 0) ? timestamp : uhd::time_spec_t::ASAP,
103 (i == data.size() - 1) ? ack : false);
104 }
105
106 /* TODO: Uncomment when the atomic block poke is implemented in the FPGA
107 // Compute transaction expiration time
108 auto timeout_time = start_timeout(_policy.timeout);
109 // Send request
110 auto request = send_request_packet(
111 OP_BLOCK_WRITE, first_addr, data, timestamp, timeout_time);
112 // Optionally wait for an ACK
113 if (ack || _policy.force_acks) {
114 wait_for_ack(request, timeout_time);
115 }
116 */
117 }
118
peek32(uint32_t addr,uhd::time_spec_t timestamp=uhd::time_spec_t::ASAP)119 virtual uint32_t peek32(
120 uint32_t addr, uhd::time_spec_t timestamp = uhd::time_spec_t::ASAP)
121 {
122 // Compute transaction expiration time, use MASSIVE_TIMEOUT if a timed
123 // command is in the queue
124 auto timeout_time =
125 start_timeout(check_timed_in_queue() ? MASSIVE_TIMEOUT : _policy.timeout);
126
127 // Send request
128 auto request =
129 send_request_packet(OP_READ, addr, {uint32_t(0)}, timestamp, timeout_time);
130 // Wait for an ACK
131 auto response = wait_for_ack(request, timeout_time);
132 return response.data_vtr[0];
133 }
134
block_peek32(uint32_t first_addr,size_t length,uhd::time_spec_t timestamp=uhd::time_spec_t::ASAP)135 virtual std::vector<uint32_t> block_peek32(uint32_t first_addr,
136 size_t length,
137 uhd::time_spec_t timestamp = uhd::time_spec_t::ASAP)
138 {
139 std::vector<uint32_t> values;
140 for (size_t i = 0; i < length; i++) {
141 values.push_back(peek32(first_addr + (i * sizeof(uint32_t)),
142 (i == 0) ? timestamp : uhd::time_spec_t::ASAP));
143 }
144 return values;
145
146 /* TODO: Uncomment when the atomic block peek is implemented in the FPGA
147 // Compute transaction expiration time, use MASSIVE_TIMEOUT if a timed
148 // command is in the queue
149 auto timeout_time = start_timeout(
150 check_timed_in_queue() ? MASSIVE_TIMEOUT : _policy.timeout
151 );
152 // Send request
153 auto request = send_request_packet(OP_READ,
154 first_addr,
155 std::vector<uint32_t>(length, 0),
156 timestamp,
157 timeout_time);
158 // Wait for an ACK
159 auto response = wait_for_ack(request, timeout_time);
160 return response.data_vtr;
161 */
162 }
163
poll32(uint32_t addr,uint32_t data,uint32_t mask,uhd::time_spec_t timeout,uhd::time_spec_t timestamp=uhd::time_spec_t::ASAP,bool ack=false)164 virtual void poll32(uint32_t addr,
165 uint32_t data,
166 uint32_t mask,
167 uhd::time_spec_t timeout,
168 uhd::time_spec_t timestamp = uhd::time_spec_t::ASAP,
169 bool ack = false)
170 {
171 // TODO: Uncomment when this is implemented in the FPGA
172 throw uhd::not_implemented_error("Control poll not implemented in the FPGA");
173
174 // Compute transaction expiration time, use MASSIVE_TIMEOUT if a timed
175 // command is in the queue
176 auto timeout_time =
177 start_timeout(check_timed_in_queue() ? MASSIVE_TIMEOUT : _policy.timeout);
178
179 // Send request
180 auto request = send_request_packet(OP_POLL,
181 addr,
182 {data, mask, static_cast<uint32_t>(timeout.to_ticks(_client_clk.get_freq()))},
183 timestamp,
184 timeout_time);
185 // Optionally wait for an ACK
186 if (ack || _policy.force_acks) {
187 wait_for_ack(request, timeout_time);
188 }
189 }
190
sleep(uhd::time_spec_t duration,bool ack=false)191 virtual void sleep(uhd::time_spec_t duration, bool ack = false)
192 {
193 // Compute transaction expiration time, use MASSIVE_TIMEOUT if a timed
194 // command is in the queue
195 auto timeout_time =
196 start_timeout(check_timed_in_queue() ? MASSIVE_TIMEOUT : _policy.timeout);
197
198 // Send request
199 auto request = send_request_packet(OP_SLEEP,
200 0,
201 {static_cast<uint32_t>(duration.to_ticks(_client_clk.get_freq()))},
202 uhd::time_spec_t::ASAP,
203 timeout_time);
204 // Optionally wait for an ACK
205 if (ack || _policy.force_acks) {
206 wait_for_ack(request, timeout_time);
207 }
208 }
209
register_async_msg_validator(async_msg_validator_t callback_f)210 virtual void register_async_msg_validator(async_msg_validator_t callback_f)
211 {
212 std::unique_lock<std::mutex> lock(_mutex);
213 _validate_async_msg = callback_f;
214 }
215
register_async_msg_handler(async_msg_callback_t callback_f)216 virtual void register_async_msg_handler(async_msg_callback_t callback_f)
217 {
218 std::unique_lock<std::mutex> lock(_mutex);
219 _handle_async_msg = callback_f;
220 }
221
set_policy(const std::string & name,const uhd::device_addr_t & args)222 virtual void set_policy(const std::string& name, const uhd::device_addr_t& args)
223 {
224 std::unique_lock<std::mutex> lock(_mutex);
225 if (name == "default") {
226 _policy.timeout = args.cast<double>("timeout", DEFAULT_TIMEOUT);
227 _policy.force_acks = DEFAULT_FORCE_ACKS;
228 } else {
229 // TODO: Uncomment when custom policies are implemented
230 throw uhd::not_implemented_error("Policy implemented in the FPGA");
231 }
232 }
233
handle_recv(const ctrl_payload & rx_ctrl)234 virtual void handle_recv(const ctrl_payload& rx_ctrl)
235 {
236 if (rx_ctrl.is_ack) {
237 // Function to process a response with no sequence errors
238 auto process_correct_response = [this, rx_ctrl]() {
239 std::unique_lock<std::mutex> lock(_mutex);
240 response_status_t resp_status = RESP_VALID;
241 // Grant flow control credits
242 _buff_occupied -= get_payload_size(_req_queue.front());
243 _buff_free_cond.notify_one();
244 if (get_payload_size(_req_queue.front()) != get_payload_size(rx_ctrl)) {
245 resp_status = RESP_SIZEERR;
246 }
247 // Pop the request from the queue
248 _req_queue.pop_front();
249 // Push the response into the response queue
250 _resp_queue.push(std::make_tuple(rx_ctrl, resp_status));
251 _resp_ready_cond.notify_one();
252 };
253 // Function to process a response with sequence errors
254 auto process_incorrect_response = [this]() {
255 std::unique_lock<std::mutex> lock(_mutex);
256 // Grant flow control credits
257 _buff_occupied -= get_payload_size(_req_queue.front());
258 _buff_free_cond.notify_one();
259 // Push a fabricated response into the response queue
260 ctrl_payload resp(_req_queue.front());
261 resp.is_ack = true;
262 _resp_queue.push(std::make_tuple(resp, RESP_DROPPED));
263 _resp_ready_cond.notify_one();
264 // Pop the request from the queue
265 _req_queue.pop_front();
266 };
267
268 // Peek at the request queue to check the expected sequence number
269 int8_t seq_num_diff = int8_t(rx_ctrl.seq_num - _req_queue.front().seq_num);
270 if (seq_num_diff == 0) { // No sequence error
271 process_correct_response();
272 } else if (seq_num_diff > 0) { // Packet(s) dropped
273 // Tag all dropped packets
274 for (int8_t i = 0; i < seq_num_diff; i++) {
275 process_incorrect_response();
276 }
277 // Process correct response
278 process_correct_response();
279 } else { // Reordered packet(s)
280 // Requests are processed in order. If seq_num_diff is negative then we
281 // have either already seen this response or we have dropped >128
282 // responses. Either way ignore this packet.
283 }
284 } else {
285 // Handle asynchronous message callback
286 ctrl_status_t status = CMD_CMDERR;
287 if (rx_ctrl.op_code != OP_WRITE && rx_ctrl.op_code != OP_BLOCK_WRITE) {
288 UHD_LOG_ERROR(
289 "CTRLEP", "Malformed async message request: Invalid opcode");
290 } else if (rx_ctrl.dst_port != _local_port) {
291 UHD_LOG_ERROR("CTRLEP",
292 "Malformed async message request: Invalid port "
293 << rx_ctrl.dst_port << ", expected my local port "
294 << _local_port);
295 } else if (rx_ctrl.data_vtr.empty()) {
296 UHD_LOG_ERROR(
297 "CTRLEP", "Malformed async message request: Invalid num_data");
298 } else {
299 if (!_validate_async_msg(rx_ctrl.address, rx_ctrl.data_vtr)) {
300 UHD_LOG_ERROR("CTRLEP",
301 "Malformed async message request: Async message was not "
302 "validated by block controller!");
303 } else {
304 status = CMD_OKAY;
305 }
306 }
307 try {
308 // Respond with an ACK packet
309 // Flow control not needed because we have allocated special room in the
310 // buffer for async message responses
311 ctrl_payload tx_ctrl(rx_ctrl);
312 tx_ctrl.is_ack = true;
313 tx_ctrl.src_epid = _my_epid;
314 tx_ctrl.status = status;
315 const auto timeout = [&]() {
316 std::unique_lock<std::mutex> lock(_mutex);
317 return _policy.timeout;
318 }();
319 _handle_send(tx_ctrl, timeout);
320 } catch (...) {
321 UHD_LOG_ERROR("CTRLEP",
322 "Encountered an error sending a response for an async message");
323 return;
324 }
325 if (status == CMD_OKAY) {
326 try {
327 _handle_async_msg(
328 rx_ctrl.address, rx_ctrl.data_vtr, rx_ctrl.timestamp);
329 } catch (const std::exception& ex) {
330 UHD_LOG_ERROR("CTRLEP",
331 "Caught exception during async message handling: " << ex.what());
332 } catch (...) {
333 UHD_LOG_ERROR("CTRLEP",
334 "Caught unknown exception during async message handling!");
335 }
336 }
337 }
338 }
339
get_src_epid() const340 virtual uint16_t get_src_epid() const
341 {
342 // Is const, does not require a mutex
343 return _my_epid;
344 }
345
get_port_num() const346 virtual uint16_t get_port_num() const
347 {
348 // Is const, does not require a mutex
349 return _local_port;
350 }
351
352 private:
353 //! Returns the length of the control payload in 32-bit words
get_payload_size(const ctrl_payload & payload)354 inline static size_t get_payload_size(const ctrl_payload& payload)
355 {
356 return 2 + (payload.timestamp.is_initialized() ? 2 : 0) + payload.data_vtr.size();
357 }
358
359 //! Marks the start of a timeout for an operation and returns the expiration time
start_timeout(double duration)360 inline const steady_clock::time_point start_timeout(double duration)
361 {
362 return steady_clock::now() + (static_cast<int>(std::ceil(duration / 1e-6)) * 1us);
363 }
364
365 //! Returns whether or not we have a timed command queued
check_timed_in_queue() const366 bool check_timed_in_queue() const
367 {
368 for (auto pyld : _req_queue) {
369 if (pyld.has_timestamp()) {
370 return true;
371 }
372 }
373 return false;
374 }
375
376 //! Sends a request control packet to a remote device
send_request_packet(ctrl_opcode_t op_code,uint32_t address,const std::vector<uint32_t> & data_vtr,const uhd::time_spec_t & time_spec,const steady_clock::time_point & timeout_time)377 const ctrl_payload send_request_packet(ctrl_opcode_t op_code,
378 uint32_t address,
379 const std::vector<uint32_t>& data_vtr,
380 const uhd::time_spec_t& time_spec,
381 const steady_clock::time_point& timeout_time)
382 {
383 if (!_client_clk.is_running()) {
384 throw uhd::system_error("Ctrlport client clock is not running");
385 }
386
387 // Convert from uhd::time_spec to timestamp
388 boost::optional<uint64_t> timestamp;
389 if (time_spec != time_spec_t::ASAP) {
390 if (!_timebase_clk.is_running()) {
391 throw uhd::system_error("Timebase clock is not running");
392 }
393 timestamp = time_spec.to_ticks(_timebase_clk.get_freq());
394 }
395
396 // Assemble the control payload
397 ctrl_payload tx_ctrl;
398 tx_ctrl.dst_port = _local_port;
399 tx_ctrl.src_port = _local_port;
400 tx_ctrl.seq_num = _tx_seq_num;
401 tx_ctrl.timestamp = timestamp;
402 tx_ctrl.is_ack = false;
403 tx_ctrl.src_epid = _my_epid;
404 tx_ctrl.address = address;
405 tx_ctrl.data_vtr = data_vtr;
406 tx_ctrl.byte_enable = 0xF;
407 tx_ctrl.op_code = op_code;
408 tx_ctrl.status = CMD_OKAY;
409
410 std::unique_lock<std::mutex> lock(_mutex);
411
412 // Perform flow control
413 // If there is no room in the downstream buffer, then wait until the timeout
414 size_t pyld_size = get_payload_size(tx_ctrl);
415 auto buff_not_full = [this, pyld_size]() -> bool {
416 // Allocate room in the queue for one async response packet
417 // If we can fit the current request in the queue then we can proceed
418 return (_buff_occupied + pyld_size)
419 <= (_buff_capacity
420 - (ASYNC_MESSAGE_SIZE * _max_outstanding_async_msgs));
421 };
422 if (!buff_not_full()) {
423 // If we're sending a timed command or if we have a timed command in the
424 // queue, use the MASSIVE_TIMEOUT instead
425 auto timed_timeout =
426 (check_timed_in_queue() ? start_timeout(MASSIVE_TIMEOUT) : timeout_time);
427 if (not _buff_free_cond.wait_until(lock, timed_timeout, buff_not_full)) {
428 throw uhd::op_timeout(
429 "Control operation timed out waiting for space in command buffer");
430 }
431 }
432 _buff_occupied += pyld_size;
433 _req_queue.push_back(tx_ctrl);
434
435 // Send the payload as soon as there is room in the buffer
436 _handle_send(tx_ctrl, _policy.timeout);
437 _tx_seq_num = (_tx_seq_num + 1) % 64;
438
439 return tx_ctrl;
440 }
441
442 //! Waits for and returns the ACK for the specified request
wait_for_ack(const ctrl_payload & request,const steady_clock::time_point & timeout_time)443 const ctrl_payload wait_for_ack(
444 const ctrl_payload& request, const steady_clock::time_point& timeout_time)
445 {
446 auto resp_ready = [this]() -> bool { return !_resp_queue.empty(); };
447 while (true) {
448 std::unique_lock<std::mutex> lock(_mutex);
449 // Wait until there is a response in the response queue
450 if (!resp_ready()) {
451 if (not _resp_ready_cond.wait_until(lock, timeout_time, resp_ready)) {
452 throw uhd::op_timeout("Control operation timed out waiting for ACK");
453 }
454 }
455 // Extract the response packet
456 ctrl_payload rx_ctrl;
457 response_status_t resp_status;
458 std::tie(rx_ctrl, resp_status) = _resp_queue.front();
459 _resp_queue.pop();
460 // Check if this is the response meant for the request
461 // Filter by op_code, address and seq_num
462 if (rx_ctrl.seq_num == request.seq_num && rx_ctrl.op_code == request.op_code
463 && rx_ctrl.address == request.address) {
464 // Validate transaction status
465 if (rx_ctrl.status == CMD_CMDERR) {
466 throw uhd::op_failed("Control operation returned a failing status");
467 } else if (rx_ctrl.status == CMD_TSERR) {
468 throw uhd::op_timerr("Control operation returned a timestamp error");
469 }
470 // Check data vector size
471 if (rx_ctrl.data_vtr.size() == 0) {
472 throw uhd::op_failed(
473 "Control operation returned a malformed response");
474 }
475 // Validate response status
476 if (resp_status == RESP_DROPPED) {
477 throw uhd::op_seqerr(
478 "Response for a control transaction was dropped");
479 } else if (resp_status == RESP_RTERR) {
480 throw uhd::op_timerr("Control operation encountered a routing error");
481 }
482 return rx_ctrl;
483 } else {
484 // This response does not belong to the request we passed in. Move on.
485 continue;
486 }
487 }
488 }
489
490
491 //! The parameters associated with the policy that governs this object
492 struct policy_args
493 {
494 double timeout = DEFAULT_TIMEOUT;
495 bool force_acks = DEFAULT_FORCE_ACKS;
496 };
497 //! The software status (different from the transaction status) of the response
498 enum response_status_t { RESP_VALID, RESP_DROPPED, RESP_RTERR, RESP_SIZEERR };
499
500 //! Function to call to send a control packet
501 const send_fn_t _handle_send;
502 //! The endpoint ID of this software endpoint
503 const sep_id_t _my_epid;
504 //! The local port number on the control crossbar for this ctrlport endpoint
505 const uint16_t _local_port;
506 //! The downstream buffer capacity in 32-bit words (used for flow control)
507 const size_t _buff_capacity;
508 //! The max number of outstanding async messages that a block can have at any time
509 const size_t _max_outstanding_async_msgs;
510 //! The clock that drives the ctrlport endpoint
511 const clock_iface& _client_clk;
512 //! The clock that drives the timing logic for the ctrlport endpoint
513 const clock_iface& _timebase_clk;
514
515 //! The function to call to validate an async message (by default, all async
516 // messages are considered valid)
517 async_msg_validator_t _validate_async_msg =
__anon2104bd8d0702(uint32_t, const std::vector<uint32_t>&) 518 [](uint32_t, const std::vector<uint32_t>&) { return true; };
519 //! The function to call to handle an async message
520 async_msg_callback_t _handle_async_msg = async_msg_callback_t();
521 //! The current control sequence number of outgoing packets
522 uint8_t _tx_seq_num = 0;
523 //! The number of occupied words in the downstream buffer
524 ssize_t _buff_occupied = 0;
525 //! The arguments for the policy that governs this register interface
526 policy_args _policy;
527 //! A condition variable that hold the "downstream buffer is free" condition
528 std::condition_variable _buff_free_cond;
529 //! A queue that holds all outstanding requests
530 std::deque<ctrl_payload> _req_queue;
531 //! A queue that holds all outstanding responses and their status
532 std::queue<std::tuple<ctrl_payload, response_status_t>> _resp_queue;
533 //! A condition variable that hold the "response is available" condition
534 std::condition_variable _resp_ready_cond;
535 //! A mutex to protect all state in this class
536 std::mutex _mutex;
537 };
538
make(const send_fn_t & handle_send,sep_id_t this_epid,uint16_t local_port,size_t buff_capacity,size_t max_outstanding_async_msgs,const clock_iface & client_clk,const clock_iface & timebase_clk)539 ctrlport_endpoint::sptr ctrlport_endpoint::make(const send_fn_t& handle_send,
540 sep_id_t this_epid,
541 uint16_t local_port,
542 size_t buff_capacity,
543 size_t max_outstanding_async_msgs,
544 const clock_iface& client_clk,
545 const clock_iface& timebase_clk)
546 {
547 return std::make_shared<ctrlport_endpoint_impl>(handle_send,
548 this_epid,
549 local_port,
550 buff_capacity,
551 max_outstanding_async_msgs,
552 client_clk,
553 timebase_clk);
554 }
555