// SPDX-License-Identifier: GPL-2.0-or-later
/* Management of Tx window, Tx resend, ACKs and out-of-sequence reception
 *
 * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved.
 * Written by David Howells (dhowells@redhat.com)
 */

#define pr_fmt(fmt) KBUILD_MODNAME ": " fmt

#include <linux/module.h>
#include <linux/circ_buf.h>
#include <linux/net.h>
#include <linux/skbuff.h>
#include <linux/slab.h>
#include <linux/udp.h>
#include <net/sock.h>
#include <net/af_rxrpc.h>
#include "ar-internal.h"

/*
 * Propose a PING ACK be sent.
 */
void rxrpc_propose_ping(struct rxrpc_call *call, u32 serial,
			enum rxrpc_propose_ack_trace why)
{
	unsigned long now = jiffies;
	unsigned long ping_at = now + rxrpc_idle_ack_delay;

	if (time_before(ping_at, call->ping_at)) {
		WRITE_ONCE(call->ping_at, ping_at);
		rxrpc_reduce_call_timer(call, ping_at, now,
					rxrpc_timer_set_for_ping);
		trace_rxrpc_propose_ack(call, why, RXRPC_ACK_PING, serial);
	}
}

/*
 * Propose a DELAY ACK be sent in the future.
 */
void rxrpc_propose_delay_ACK(struct rxrpc_call *call, rxrpc_serial_t serial,
			     enum rxrpc_propose_ack_trace why)
{
	unsigned long expiry = rxrpc_soft_ack_delay;
	unsigned long now = jiffies, ack_at;

	call->ackr_serial = serial;

	if (call->peer->srtt_us != 0)
		ack_at = usecs_to_jiffies(call->peer->srtt_us >> 3);
	else
		ack_at = expiry;

	ack_at += READ_ONCE(call->tx_backoff);
	ack_at += now;
	if (time_before(ack_at, call->delay_ack_at)) {
		WRITE_ONCE(call->delay_ack_at, ack_at);
		rxrpc_reduce_call_timer(call, ack_at, now,
					rxrpc_timer_set_for_ack);
	}

	trace_rxrpc_propose_ack(call, why, RXRPC_ACK_DELAY, serial);
}

/*
 * Queue an ACK for immediate transmission.
 */
void rxrpc_send_ACK(struct rxrpc_call *call, u8 ack_reason,
		    rxrpc_serial_t serial, enum rxrpc_propose_ack_trace why)
{
	struct rxrpc_txbuf *txb;

	if (test_bit(RXRPC_CALL_DISCONNECTED, &call->flags))
		return;

	rxrpc_inc_stat(call->rxnet, stat_tx_acks[ack_reason]);

	txb = rxrpc_alloc_txbuf(call, RXRPC_PACKET_TYPE_ACK,
				rcu_read_lock_held() ? GFP_ATOMIC | __GFP_NOWARN : GFP_NOFS);
	if (!txb) {
		kleave(" = -ENOMEM");
		return;
	}

	txb->ack_why = why;
	txb->wire.seq = 0;
	txb->wire.type = RXRPC_PACKET_TYPE_ACK;
	txb->wire.flags |= RXRPC_SLOW_START_OK;
	txb->ack.bufferSpace = 0;
	txb->ack.maxSkew = 0;
	txb->ack.firstPacket = 0;
	txb->ack.previousPacket = 0;
	txb->ack.serial = htonl(serial);
	txb->ack.reason = ack_reason;
	txb->ack.nAcks = 0;

	trace_rxrpc_send_ack(call, why, ack_reason, serial);
	rxrpc_send_ack_packet(call, txb);
	rxrpc_put_txbuf(txb, rxrpc_txbuf_put_ack_tx);
}

/*
 * Handle congestion being detected by the retransmit timeout.
 */
static void rxrpc_congestion_timeout(struct rxrpc_call *call)
{
	set_bit(RXRPC_CALL_RETRANS_TIMEOUT, &call->flags);
}
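
/*
 * Note: rxrpc_input_call_event() below passes a NULL ack_skb to
 * rxrpc_resend() when the resend timer trips; the ACK input path is
 * presumed to pass the received ACK packet instead, so that the soft-ACK
 * table scan in rxrpc_resend() can pick out explicitly NAK'd packets for
 * retransmission.
 */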

/*
 * Perform retransmission of NAK'd and unack'd packets.
 */
void rxrpc_resend(struct rxrpc_call *call, struct sk_buff *ack_skb)
{
	struct rxrpc_ackpacket *ack = NULL;
	struct rxrpc_txbuf *txb;
	unsigned long resend_at;
	rxrpc_seq_t transmitted = READ_ONCE(call->tx_transmitted);
	ktime_t now, max_age, oldest, ack_ts;
	bool unacked = false;
	unsigned int i;
	LIST_HEAD(retrans_queue);

	_enter("{%d,%d}", call->acks_hard_ack, call->tx_top);

	now = ktime_get_real();
	max_age = ktime_sub_us(now, jiffies_to_usecs(call->peer->rto_j));
	oldest = now;

	if (list_empty(&call->tx_buffer))
		goto no_resend;

	trace_rxrpc_resend(call, ack_skb);
	txb = list_first_entry(&call->tx_buffer, struct rxrpc_txbuf, call_link);

	/* Scan the soft ACK table without dropping the lock and resend any
	 * explicitly NAK'd packets.
	 */
	if (ack_skb) {
		ack = (void *)ack_skb->data + sizeof(struct rxrpc_wire_header);

		for (i = 0; i < ack->nAcks; i++) {
			rxrpc_seq_t seq;

			if (ack->acks[i] & 1)
				continue;
			seq = ntohl(ack->firstPacket) + i;
			if (after(txb->seq, transmitted))
				break;
			if (after(txb->seq, seq))
				continue; /* A new hard ACK probably came in */
			list_for_each_entry_from(txb, &call->tx_buffer, call_link) {
				if (txb->seq == seq)
					goto found_txb;
			}
			goto no_further_resend;

		found_txb:
			if (after(ntohl(txb->wire.serial), call->acks_highest_serial))
				continue; /* Ack point not yet reached */

			rxrpc_see_txbuf(txb, rxrpc_txbuf_see_unacked);

			if (list_empty(&txb->tx_link)) {
				list_add_tail(&txb->tx_link, &retrans_queue);
				set_bit(RXRPC_TXBUF_RESENT, &txb->flags);
			}

			trace_rxrpc_retransmit(call, txb->seq,
					       ktime_to_ns(ktime_sub(txb->last_sent,
								     max_age)));

			if (list_is_last(&txb->call_link, &call->tx_buffer))
				goto no_further_resend;
			txb = list_next_entry(txb, call_link);
		}
	}

	/* Fast-forward through the Tx queue to the point the peer says it has
	 * seen.  Anything between the soft-ACK table and that point will get
	 * ACK'd or NACK'd in due course, so don't worry about it here; here we
	 * need to consider retransmitting anything beyond that point.
	 *
	 * Note that ACK for a packet can beat the update of tx_transmitted.
	 */
	if (after_eq(READ_ONCE(call->acks_prev_seq), READ_ONCE(call->tx_transmitted)))
		goto no_further_resend;

	list_for_each_entry_from(txb, &call->tx_buffer, call_link) {
		if (before_eq(txb->seq, READ_ONCE(call->acks_prev_seq)))
			continue;
		if (after(txb->seq, READ_ONCE(call->tx_transmitted)))
			break; /* Not transmitted yet */

		if (ack && ack->reason == RXRPC_ACK_PING_RESPONSE &&
		    before(ntohl(txb->wire.serial), ntohl(ack->serial)))
			goto do_resend; /* Wasn't accounted for by a more recent ping. */
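
		/* A packet sent less than one RTO period ago can't be deemed
		 * lost yet, so skip it; but note the oldest such timestamp so
		 * that the resend timer can be recalculated from it below.
		 */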
		if (ktime_after(txb->last_sent, max_age)) {
			if (ktime_before(txb->last_sent, oldest))
				oldest = txb->last_sent;
			continue;
		}

	do_resend:
		unacked = true;
		if (list_empty(&txb->tx_link)) {
			list_add_tail(&txb->tx_link, &retrans_queue);
			set_bit(RXRPC_TXBUF_RESENT, &txb->flags);
			rxrpc_inc_stat(call->rxnet, stat_tx_data_retrans);
		}
	}

no_further_resend:
no_resend:
	resend_at = nsecs_to_jiffies(ktime_to_ns(ktime_sub(now, oldest)));
	resend_at += jiffies + rxrpc_get_rto_backoff(call->peer,
						     !list_empty(&retrans_queue));
	WRITE_ONCE(call->resend_at, resend_at);

	if (unacked)
		rxrpc_congestion_timeout(call);

	/* If there was nothing that needed retransmission then it's likely
	 * that an ACK got lost somewhere.  Send a ping to find out instead of
	 * retransmitting data.
	 */
	if (list_empty(&retrans_queue)) {
		rxrpc_reduce_call_timer(call, resend_at, jiffies,
					rxrpc_timer_set_for_resend);
		ack_ts = ktime_sub(now, call->acks_latest_ts);
		if (ktime_to_us(ack_ts) < (call->peer->srtt_us >> 3))
			goto out;
		rxrpc_send_ACK(call, RXRPC_ACK_PING, 0,
			       rxrpc_propose_ack_ping_for_lost_ack);
		goto out;
	}

	/* Retransmit the queue */
	while ((txb = list_first_entry_or_null(&retrans_queue,
					       struct rxrpc_txbuf, tx_link))) {
		list_del_init(&txb->tx_link);
		rxrpc_transmit_one(call, txb);
	}

out:
	_leave("");
}

/*
 * Start transmitting the reply to a service.  This cancels the need to ACK the
 * request if we haven't yet done so.
 */
static void rxrpc_begin_service_reply(struct rxrpc_call *call)
{
	unsigned long now = jiffies;

	rxrpc_set_call_state(call, RXRPC_CALL_SERVER_SEND_REPLY);
	WRITE_ONCE(call->delay_ack_at, now + MAX_JIFFY_OFFSET);
	if (call->ackr_reason == RXRPC_ACK_DELAY)
		call->ackr_reason = 0;
	trace_rxrpc_timer(call, rxrpc_timer_init_for_send_reply, now);
}

/*
 * Close the transmission phase.  After this point there is no more data to be
 * transmitted in the call.
 */
static void rxrpc_close_tx_phase(struct rxrpc_call *call)
{
	_debug("________awaiting reply/ACK__________");

	switch (__rxrpc_call_state(call)) {
	case RXRPC_CALL_CLIENT_SEND_REQUEST:
		rxrpc_set_call_state(call, RXRPC_CALL_CLIENT_AWAIT_REPLY);
		break;
	case RXRPC_CALL_SERVER_SEND_REPLY:
		rxrpc_set_call_state(call, RXRPC_CALL_SERVER_AWAIT_ACK);
		break;
	default:
		break;
	}
}

static bool rxrpc_tx_window_has_space(struct rxrpc_call *call)
{
	unsigned int winsize = min_t(unsigned int, call->tx_winsize,
				     call->cong_cwnd + call->cong_extra);
	rxrpc_seq_t window = call->acks_hard_ack, wtop = window + winsize;
	rxrpc_seq_t tx_top = call->tx_top;
	int space;

	space = wtop - tx_top;
	return space > 0;
}
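
/*
 * Worked example for the window calculation above (illustrative values
 * only): with acks_hard_ack = 10, tx_winsize = 8 and
 * cong_cwnd + cong_extra = 5, the usable window is min(8, 5) = 5 packets,
 * so wtop = 15; with tx_top = 13 there is room for two more DATA packets
 * before transmission must pause until further hard ACKs advance the
 * window base.
 */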

/*
 * Decant some of the sendmsg-prepared queue into the transmission buffer.
 */
static void rxrpc_decant_prepared_tx(struct rxrpc_call *call)
{
	struct rxrpc_txbuf *txb;

	if (!test_bit(RXRPC_CALL_EXPOSED, &call->flags)) {
		if (list_empty(&call->tx_sendmsg))
			return;
		rxrpc_expose_client_call(call);
	}

	while ((txb = list_first_entry_or_null(&call->tx_sendmsg,
					       struct rxrpc_txbuf, call_link))) {
		spin_lock(&call->tx_lock);
		list_del(&txb->call_link);
		spin_unlock(&call->tx_lock);

		call->tx_top = txb->seq;
		list_add_tail(&txb->call_link, &call->tx_buffer);

		if (txb->wire.flags & RXRPC_LAST_PACKET)
			rxrpc_close_tx_phase(call);

		rxrpc_transmit_one(call, txb);

		if (!rxrpc_tx_window_has_space(call))
			break;
	}
}

static void rxrpc_transmit_some_data(struct rxrpc_call *call)
{
	switch (__rxrpc_call_state(call)) {
	case RXRPC_CALL_SERVER_ACK_REQUEST:
		if (list_empty(&call->tx_sendmsg))
			return;
		rxrpc_begin_service_reply(call);
		fallthrough;

	case RXRPC_CALL_SERVER_SEND_REPLY:
	case RXRPC_CALL_CLIENT_SEND_REQUEST:
		if (!rxrpc_tx_window_has_space(call))
			return;
		if (list_empty(&call->tx_sendmsg)) {
			rxrpc_inc_stat(call->rxnet, stat_tx_data_underflow);
			return;
		}
		rxrpc_decant_prepared_tx(call);
		break;
	default:
		return;
	}
}

/*
 * Ping the other end to fill our RTT cache and to retrieve the rwind
 * and MTU parameters.
 */
static void rxrpc_send_initial_ping(struct rxrpc_call *call)
{
	if (call->peer->rtt_count < 3 ||
	    ktime_before(ktime_add_ms(call->peer->rtt_last_req, 1000),
			 ktime_get_real()))
		rxrpc_send_ACK(call, RXRPC_ACK_PING, 0,
			       rxrpc_propose_ack_ping_for_params);
}

/*
 * Handle retransmission and deferred ACK/abort generation.
 */
bool rxrpc_input_call_event(struct rxrpc_call *call, struct sk_buff *skb)
{
	unsigned long now, next, t;
	rxrpc_serial_t ackr_serial;
	bool resend = false, expired = false;
	s32 abort_code;

	rxrpc_see_call(call, rxrpc_call_see_input);

	_enter("{%d,%s,%lx}",
	       call->debug_id, rxrpc_call_states[__rxrpc_call_state(call)],
	       call->events);

	if (__rxrpc_call_is_complete(call))
		goto out;

	/* Handle abort request locklessly, vs rxrpc_propose_abort(). */
	abort_code = smp_load_acquire(&call->send_abort);
	if (abort_code) {
		rxrpc_abort_call(call, 0, call->send_abort, call->send_abort_err,
				 call->send_abort_why);
		goto out;
	}

	if (skb && skb->mark == RXRPC_SKB_MARK_ERROR)
		goto out;
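
	/* The expiry checks below compare each deadline with the current
	 * time.  Tripped event deadlines (delayed ACK, lost-ACK probe,
	 * keepalive, ping, resend) are parked at "never" (now +
	 * MAX_JIFFY_OFFSET) with cmpxchg() so that a shorter deadline set
	 * concurrently isn't clobbered; the call-expiry checks just flag the
	 * call as expired.
	 */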

	/* If we see our async-event poke, check for timeout trippage. */
	now = jiffies;
	t = READ_ONCE(call->expect_rx_by);
	if (time_after_eq(now, t)) {
		trace_rxrpc_timer(call, rxrpc_timer_exp_normal, now);
		expired = true;
	}

	t = READ_ONCE(call->expect_req_by);
	if (__rxrpc_call_state(call) == RXRPC_CALL_SERVER_RECV_REQUEST &&
	    time_after_eq(now, t)) {
		trace_rxrpc_timer(call, rxrpc_timer_exp_idle, now);
		expired = true;
	}

	t = READ_ONCE(call->expect_term_by);
	if (time_after_eq(now, t)) {
		trace_rxrpc_timer(call, rxrpc_timer_exp_hard, now);
		expired = true;
	}

	t = READ_ONCE(call->delay_ack_at);
	if (time_after_eq(now, t)) {
		trace_rxrpc_timer(call, rxrpc_timer_exp_ack, now);
		cmpxchg(&call->delay_ack_at, t, now + MAX_JIFFY_OFFSET);
		ackr_serial = xchg(&call->ackr_serial, 0);
		rxrpc_send_ACK(call, RXRPC_ACK_DELAY, ackr_serial,
			       rxrpc_propose_ack_ping_for_lost_ack);
	}

	t = READ_ONCE(call->ack_lost_at);
	if (time_after_eq(now, t)) {
		trace_rxrpc_timer(call, rxrpc_timer_exp_lost_ack, now);
		cmpxchg(&call->ack_lost_at, t, now + MAX_JIFFY_OFFSET);
		set_bit(RXRPC_CALL_EV_ACK_LOST, &call->events);
	}

	t = READ_ONCE(call->keepalive_at);
	if (time_after_eq(now, t)) {
		trace_rxrpc_timer(call, rxrpc_timer_exp_keepalive, now);
		cmpxchg(&call->keepalive_at, t, now + MAX_JIFFY_OFFSET);
		rxrpc_send_ACK(call, RXRPC_ACK_PING, 0,
			       rxrpc_propose_ack_ping_for_keepalive);
	}

	t = READ_ONCE(call->ping_at);
	if (time_after_eq(now, t)) {
		trace_rxrpc_timer(call, rxrpc_timer_exp_ping, now);
		cmpxchg(&call->ping_at, t, now + MAX_JIFFY_OFFSET);
		rxrpc_send_ACK(call, RXRPC_ACK_PING, 0,
			       rxrpc_propose_ack_ping_for_keepalive);
	}

	t = READ_ONCE(call->resend_at);
	if (time_after_eq(now, t)) {
		trace_rxrpc_timer(call, rxrpc_timer_exp_resend, now);
		cmpxchg(&call->resend_at, t, now + MAX_JIFFY_OFFSET);
		resend = true;
	}

	if (skb)
		rxrpc_input_call_packet(call, skb);

	rxrpc_transmit_some_data(call);

	if (skb) {
		struct rxrpc_skb_priv *sp = rxrpc_skb(skb);

		if (sp->hdr.type == RXRPC_PACKET_TYPE_ACK)
			rxrpc_congestion_degrade(call);
	}

	if (test_and_clear_bit(RXRPC_CALL_EV_INITIAL_PING, &call->events))
		rxrpc_send_initial_ping(call);

	/* Process events */
	if (expired) {
		if (test_bit(RXRPC_CALL_RX_HEARD, &call->flags) &&
		    (int)call->conn->hi_serial - (int)call->rx_serial > 0) {
			trace_rxrpc_call_reset(call);
			rxrpc_abort_call(call, 0, RX_CALL_DEAD, -ECONNRESET,
					 rxrpc_abort_call_reset);
		} else {
			rxrpc_abort_call(call, 0, RX_CALL_TIMEOUT, -ETIME,
					 rxrpc_abort_call_timeout);
		}
		goto out;
	}

	if (test_and_clear_bit(RXRPC_CALL_EV_ACK_LOST, &call->events))
		rxrpc_send_ACK(call, RXRPC_ACK_PING, 0,
			       rxrpc_propose_ack_ping_for_lost_ack);

	if (resend && __rxrpc_call_state(call) != RXRPC_CALL_CLIENT_RECV_REPLY)
		rxrpc_resend(call, NULL);

	if (test_and_clear_bit(RXRPC_CALL_RX_IS_IDLE, &call->flags))
		rxrpc_send_ACK(call, RXRPC_ACK_IDLE, 0,
			       rxrpc_propose_ack_rx_idle);
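
	/* If we've let more than two incoming packets go unACK'd, send an
	 * ACK now: a PING ACK if we still need RTT samples (fewer than three
	 * so far, or none taken within the last second), since a ping
	 * solicits a response, otherwise just an IDLE ACK.
	 */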
	if (atomic_read(&call->ackr_nr_unacked) > 2) {
		if (call->peer->rtt_count < 3)
			rxrpc_send_ACK(call, RXRPC_ACK_PING, 0,
				       rxrpc_propose_ack_ping_for_rtt);
		else if (ktime_before(ktime_add_ms(call->peer->rtt_last_req, 1000),
				      ktime_get_real()))
			rxrpc_send_ACK(call, RXRPC_ACK_PING, 0,
				       rxrpc_propose_ack_ping_for_old_rtt);
		else
			rxrpc_send_ACK(call, RXRPC_ACK_IDLE, 0,
				       rxrpc_propose_ack_input_data);
	}

	/* Make sure the timer is restarted */
	if (!__rxrpc_call_is_complete(call)) {
		next = call->expect_rx_by;

#define set(T) { t = READ_ONCE(T); if (time_before(t, next)) next = t; }

		set(call->expect_req_by);
		set(call->expect_term_by);
		set(call->delay_ack_at);
		set(call->ack_lost_at);
		set(call->resend_at);
		set(call->keepalive_at);
		set(call->ping_at);
#undef set

		now = jiffies;
		if (time_after_eq(now, next))
			rxrpc_poke_call(call, rxrpc_call_poke_timer_now);

		rxrpc_reduce_call_timer(call, next, now, rxrpc_timer_restart);
	}

out:
	if (__rxrpc_call_is_complete(call)) {
		del_timer_sync(&call->timer);
		if (!test_bit(RXRPC_CALL_DISCONNECTED, &call->flags))
			rxrpc_disconnect_call(call);
		if (call->security)
			call->security->free_call_crypto(call);
	}
	if (call->acks_hard_ack != call->tx_bottom)
		rxrpc_shrink_call_tx_buffer(call);
	_leave("");
	return true;
}