/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*
 * Copyright (c) 2010, Oracle and/or its affiliates. All rights reserved.
 */

/*
 * Copyright (c) 2006 Oracle.  All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses.  You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the
 * OpenIB.org BSD license below:
 *
 *     Redistribution and use in source and binary forms, with or
 *     without modification, are permitted provided that the following
 *     conditions are met:
 *
 *      - Redistributions of source code must retain the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer.
 *
 *      - Redistributions in binary form must reproduce the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer in the documentation and/or other materials
 *        provided with the distribution.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 *
 */
#include <sys/stropts.h>
#include <sys/systm.h>

#include <sys/rds.h>
#include <sys/socket.h>
#include <sys/socketvar.h>

#include <sys/ib/clients/rdsv3/rdsv3.h>
#include <sys/ib/clients/rdsv3/rdma.h>
#include <sys/ib/clients/rdsv3/rdsv3_debug.h>

/*
 * When transmitting messages in rdsv3_send_xmit, we need to emerge from
 * time to time and briefly release the CPU. Otherwise the softlock watchdog
 * will kick our shin.
 * Also, it seems fairer to not let one busy connection stall all the
 * others.
 *
 * send_batch_count is the number of times we'll loop in send_xmit. Setting
 * it to 0 will restore the old behavior (where we looped until we had
 * drained the queue).
 */
static int send_batch_count = 64;

extern void rdsv3_ib_send_unmap_rdma(void *ic, struct rdsv3_rdma_op *op);
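/*
 * Note: when the quota runs out with work still pending, rdsv3_send_xmit
 * returns -EAGAIN (see below) so the send worker reschedules itself and
 * other connections get a turn.
 */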

/*
 * Reset the send state.  Caller must hold c_send_lock when calling here.
 */
void
rdsv3_send_reset(struct rdsv3_connection *conn)
{
        struct rdsv3_message *rm, *tmp;
        struct rdsv3_rdma_op *ro;

        RDSV3_DPRINTF4("rdsv3_send_reset", "Enter(conn: %p)", conn);

        ASSERT(MUTEX_HELD(&conn->c_send_lock));

        if (conn->c_xmit_rm) {
                rm = conn->c_xmit_rm;
                ro = rm->m_rdma_op;
                if (ro && ro->r_mapped) {
                        RDSV3_DPRINTF2("rdsv3_send_reset",
                            "rm %p mflg 0x%x map %d mihdl %p sgl %p",
                            rm, rm->m_flags, ro->r_mapped,
                            ro->r_rdma_sg[0].mihdl,
                            ro->r_rdma_sg[0].swr.wr_sgl);
                        rdsv3_ib_send_unmap_rdma(conn->c_transport_data, ro);
                }
                /*
                 * Tell the user the RDMA op is no longer mapped by the
                 * transport. This isn't entirely true (it's flushed out
                 * independently) but as the connection is down, there's
                 * no ongoing RDMA to/from that memory.
                 */
                rdsv3_message_unmapped(conn->c_xmit_rm);
                rdsv3_message_put(conn->c_xmit_rm);
                conn->c_xmit_rm = NULL;
        }

        conn->c_xmit_sg = 0;
        conn->c_xmit_hdr_off = 0;
        conn->c_xmit_data_off = 0;
        conn->c_xmit_rdma_sent = 0;
        conn->c_map_queued = 0;

        conn->c_unacked_packets = rdsv3_sysctl_max_unacked_packets;
        conn->c_unacked_bytes = rdsv3_sysctl_max_unacked_bytes;

        /* Mark messages as retransmissions, and move them to the send q */
        mutex_enter(&conn->c_lock);
        RDSV3_FOR_EACH_LIST_NODE_SAFE(rm, tmp, &conn->c_retrans, m_conn_item) {
                set_bit(RDSV3_MSG_ACK_REQUIRED, &rm->m_flags);
                set_bit(RDSV3_MSG_RETRANSMITTED, &rm->m_flags);
                if (rm->m_rdma_op && rm->m_rdma_op->r_mapped) {
                        RDSV3_DPRINTF4("_send_reset",
                            "RT rm %p mflg 0x%x sgl %p",
                            rm, rm->m_flags,
                            rm->m_rdma_op->r_rdma_sg[0].swr.wr_sgl);
                }
        }
        list_move_tail(&conn->c_send_queue, &conn->c_retrans);
        mutex_exit(&conn->c_lock);

        RDSV3_DPRINTF4("rdsv3_send_reset", "Return(conn: %p)", conn);
}
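
/*
 * Example (sketch): a transport tearing a connection down would
 * serialize with any in-flight sender before resetting send state:
 *
 *      mutex_enter(&conn->c_send_lock);
 *      rdsv3_send_reset(conn);
 *      mutex_exit(&conn->c_send_lock);
 */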

/*
 * We're making the conscious trade-off here to only send one message
 * down the connection at a time.
 * Pro:
 * - tx queueing is a simple fifo list
 * - reassembly is optional and easily done by transports per conn
 * - no per flow rx lookup at all, straight to the socket
 * - less per-frag memory and wire overhead
 * Con:
 * - queued acks can be delayed behind large messages
 * Depends:
 * - small message latency is higher behind queued large messages
 * - large message latency isn't starved by intervening small sends
 */
int
rdsv3_send_xmit(struct rdsv3_connection *conn)
{
        struct rdsv3_message *rm;
        unsigned int tmp;
        unsigned int send_quota = send_batch_count;
        struct rdsv3_scatterlist *sg;
        int ret = 0;
        int was_empty = 0;
        list_t to_be_dropped;

restart:
        if (!rdsv3_conn_up(conn))
                goto out;

        RDSV3_DPRINTF4("rdsv3_send_xmit", "Enter(conn: %p)", conn);

        list_create(&to_be_dropped, sizeof (struct rdsv3_message),
            offsetof(struct rdsv3_message, m_conn_item));

        /*
         * sendmsg calls here after having queued its message on the send
         * queue.  We only have one task feeding the connection at a time.
         * If another thread is already feeding the queue then we back off.
         * This avoids blocking the caller and trading per-connection data
         * between caches per message.
         */
        if (!mutex_tryenter(&conn->c_send_lock)) {
                RDSV3_DPRINTF4("rdsv3_send_xmit",
                    "Another thread running(conn: %p)", conn);
                rdsv3_stats_inc(s_send_sem_contention);
                ret = -ENOMEM;
                goto out;
        }
        atomic_add_32(&conn->c_senders, 1);
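
        /*
         * Note: the -ENOMEM above is only a "somebody else is sending"
         * indicator to the caller, not an allocation failure; the thread
         * that does hold c_send_lock will drain the queue (or requeue
         * the send worker).
         */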

        if (conn->c_trans->xmit_prepare)
                conn->c_trans->xmit_prepare(conn);

        /*
         * spin trying to push headers and data down the connection until
         * the connection doesn't make forward progress.
         */
        while (--send_quota) {
                /*
                 * See if we need to send a congestion map update if we're
                 * between sending messages.  The send_sem protects our sole
                 * use of c_map_offset and _bytes.
                 * Note this is used only by transports that define a special
                 * xmit_cong_map function.  For all others, we allocate
                 * a cong_map message and treat it just like any other send.
                 */
                if (conn->c_map_bytes) {
                        ret = conn->c_trans->xmit_cong_map(conn, conn->c_lcong,
                            conn->c_map_offset);
                        if (ret <= 0)
                                break;

                        conn->c_map_offset += ret;
                        conn->c_map_bytes -= ret;
                        if (conn->c_map_bytes)
                                continue;
                }

                /*
                 * If we're done sending the current message, clear the
                 * offset and S/G temporaries.
                 */
                rm = conn->c_xmit_rm;
                if (rm != NULL &&
                    conn->c_xmit_hdr_off == sizeof (struct rdsv3_header) &&
                    conn->c_xmit_sg == rm->m_nents) {
                        conn->c_xmit_rm = NULL;
                        conn->c_xmit_sg = 0;
                        conn->c_xmit_hdr_off = 0;
                        conn->c_xmit_data_off = 0;
                        conn->c_xmit_rdma_sent = 0;

                        /* Release the reference to the previous message. */
                        rdsv3_message_put(rm);
                        rm = NULL;
                }

                /* If we're asked to send a cong map update, do so. */
                if (rm == NULL && test_and_clear_bit(0, &conn->c_map_queued)) {
                        if (conn->c_trans->xmit_cong_map != NULL) {
                                conn->c_map_offset = 0;
                                conn->c_map_bytes =
                                    sizeof (struct rdsv3_header) +
                                    RDSV3_CONG_MAP_BYTES;
                                continue;
                        }

                        rm = rdsv3_cong_update_alloc(conn);
                        if (IS_ERR(rm)) {
                                ret = PTR_ERR(rm);
                                break;
                        }

                        conn->c_xmit_rm = rm;
                }

                /*
                 * Grab the next message from the send queue, if there is one.
                 *
                 * c_xmit_rm holds a ref while we're sending this message down
                 * the connection.  We can use this ref while holding the
                 * send_sem; rdsv3_send_reset() is serialized with it.
                 */
                if (rm == NULL) {
                        unsigned int len;

                        mutex_enter(&conn->c_lock);

                        if (!list_is_empty(&conn->c_send_queue)) {
                                rm = list_remove_head(&conn->c_send_queue);
                                rdsv3_message_addref(rm);

                                /*
                                 * Move the message from the send queue to the
                                 * retransmit list right away.
                                 */
                                list_insert_tail(&conn->c_retrans, rm);
                        }

                        mutex_exit(&conn->c_lock);

                        if (rm == NULL) {
                                was_empty = 1;
                                break;
                        }

                        /*
                         * Unfortunately, the way InfiniBand deals with
                         * RDMA to a bad MR key is by moving the entire
                         * queue pair to error state.  We could possibly
                         * recover from that, but right now we drop the
                         * connection.  Therefore, we never retransmit
                         * messages with RDMA ops.
                         */
                        if (rm->m_rdma_op &&
                            test_bit(RDSV3_MSG_RETRANSMITTED, &rm->m_flags)) {
                                mutex_enter(&conn->c_lock);
                                if (test_and_clear_bit(RDSV3_MSG_ON_CONN,
                                    &rm->m_flags))
                                        list_remove_node(&rm->m_conn_item);
                                list_insert_tail(&to_be_dropped, rm);
                                mutex_exit(&conn->c_lock);
                                rdsv3_message_put(rm);
                                continue;
                        }

                        /* Require an ACK every once in a while */
                        len = ntohl(rm->m_inc.i_hdr.h_len);
                        if (conn->c_unacked_packets == 0 ||
                            conn->c_unacked_bytes < len) {
                                set_bit(RDSV3_MSG_ACK_REQUIRED, &rm->m_flags);

                                conn->c_unacked_packets =
                                    rdsv3_sysctl_max_unacked_packets;
                                conn->c_unacked_bytes =
                                    rdsv3_sysctl_max_unacked_bytes;
                                rdsv3_stats_inc(s_send_ack_required);
                        } else {
                                conn->c_unacked_bytes -= len;
                                conn->c_unacked_packets--;
                        }

                        conn->c_xmit_rm = rm;
                }

                /*
                 * Try and send an rdma message.  Let's see if we can
                 * keep this simple and require that the transport either
                 * send the whole rdma or none of it.
                 */
                if (rm->m_rdma_op && !conn->c_xmit_rdma_sent) {
                        ret = conn->c_trans->xmit_rdma(conn, rm->m_rdma_op);
                        if (ret)
                                break;
                        conn->c_xmit_rdma_sent = 1;
                        /*
                         * The transport owns the mapped memory for now.
                         * You can't unmap it while it's on the send queue.
                         */
                        set_bit(RDSV3_MSG_MAPPED, &rm->m_flags);
                }

                if (conn->c_xmit_hdr_off < sizeof (struct rdsv3_header) ||
                    conn->c_xmit_sg < rm->m_nents) {
                        ret = conn->c_trans->xmit(conn, rm,
                            conn->c_xmit_hdr_off,
                            conn->c_xmit_sg,
                            conn->c_xmit_data_off);
                        if (ret <= 0)
                                break;

                        if (conn->c_xmit_hdr_off <
                            sizeof (struct rdsv3_header)) {
                                tmp = min(ret,
                                    sizeof (struct rdsv3_header) -
                                    conn->c_xmit_hdr_off);
                                conn->c_xmit_hdr_off += tmp;
                                ret -= tmp;
                        }

                        sg = &rm->m_sg[conn->c_xmit_sg];
                        while (ret) {
                                tmp = min(ret, rdsv3_sg_len(sg) -
                                    conn->c_xmit_data_off);
                                conn->c_xmit_data_off += tmp;
                                ret -= tmp;
                                if (conn->c_xmit_data_off ==
                                    rdsv3_sg_len(sg)) {
                                        conn->c_xmit_data_off = 0;
                                        sg++;
                                        conn->c_xmit_sg++;
                                        ASSERT(!(ret != 0 &&
                                            conn->c_xmit_sg == rm->m_nents));
                                }
                        }
                }
        }
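
        /*
         * A worked example of the bookkeeping in the loop above: if the
         * transport reports, say, 300 bytes sent with the header not yet
         * fully transmitted, the first sizeof (struct rdsv3_header) bytes
         * are charged against c_xmit_hdr_off and the remainder walks m_sg
         * via c_xmit_sg/c_xmit_data_off until it is used up.
         */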

        /* Nuke any messages we decided not to retransmit. */
        if (!list_is_empty(&to_be_dropped))
                rdsv3_send_remove_from_sock(&to_be_dropped,
                    RDSV3_RDMA_DROPPED);

        if (conn->c_trans->xmit_complete)
                conn->c_trans->xmit_complete(conn);

        /*
         * We might be racing with another sender who queued a message but
         * backed off on noticing that we held the c_send_lock.  If we check
         * for queued messages after dropping the sem then either we'll
         * see the queued message or the queuer will get the sem.  If we
         * notice the queued message then we trigger an immediate retry.
         *
         * We need to be careful only to do this when we stopped processing
         * the send queue because it was empty.  It's the only way we
         * stop processing the loop when the transport hasn't taken
         * responsibility for forward progress.
         */
        mutex_exit(&conn->c_send_lock);

        if (conn->c_map_bytes || (send_quota == 0 && !was_empty)) {
                /*
                 * We exhausted the send quota, but there's work left to
                 * do.  Return and (re-)schedule the send worker.
                 */
                ret = -EAGAIN;
        }

        atomic_dec_32(&conn->c_senders);

        if (ret == 0 && was_empty) {
                /*
                 * A simple bit test would be way faster than taking the
                 * spin lock.
                 */
                mutex_enter(&conn->c_lock);
                if (!list_is_empty(&conn->c_send_queue)) {
                        rdsv3_stats_inc(s_send_sem_queue_raced);
                        ret = -EAGAIN;
                }
                mutex_exit(&conn->c_lock);
        }

out:
        RDSV3_DPRINTF4("rdsv3_send_xmit", "Return(conn: %p, ret: %d)",
            conn, ret);
        return (ret);
}

static void
rdsv3_send_sndbuf_remove(struct rdsv3_sock *rs, struct rdsv3_message *rm)
{
        uint32_t len = ntohl(rm->m_inc.i_hdr.h_len);

        ASSERT(mutex_owned(&rs->rs_lock));

        ASSERT(rs->rs_snd_bytes >= len);
        rs->rs_snd_bytes -= len;

        if (rs->rs_snd_bytes == 0)
                rdsv3_stats_inc(s_send_queue_empty);
}

static inline int
rdsv3_send_is_acked(struct rdsv3_message *rm, uint64_t ack,
    is_acked_func is_acked)
{
        if (is_acked)
                return (is_acked(rm, ack));
        return (ntohll(rm->m_inc.i_hdr.h_sequence) <= ack);
}
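
/*
 * Note: transports that assign sequence numbers late (see the TCP note
 * before rdsv3_send_drop_acked below) supply their own is_acked callback;
 * the default is a simple comparison against h_sequence.
 */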

/*
 * Returns true if there are no messages on the send and retransmit queues
 * which have a sequence number greater than or equal to the given sequence
 * number.
 */
int
rdsv3_send_acked_before(struct rdsv3_connection *conn, uint64_t seq)
{
        struct rdsv3_message *rm;
        int ret = 1;

        RDSV3_DPRINTF4("rdsv3_send_acked_before", "Enter(conn: %p)", conn);

        mutex_enter(&conn->c_lock);

        /* XXX - original code spits out warning */
        rm = list_head(&conn->c_retrans);
        if (rm && ntohll(rm->m_inc.i_hdr.h_sequence) < seq)
                ret = 0;

        /* XXX - original code spits out warning */
        rm = list_head(&conn->c_send_queue);
        if (rm && ntohll(rm->m_inc.i_hdr.h_sequence) < seq)
                ret = 0;

        mutex_exit(&conn->c_lock);

        RDSV3_DPRINTF4("rdsv3_send_acked_before", "Return(conn: %p)", conn);

        return (ret);
}

/*
 * This is pretty similar to what happens below in the ACK
 * handling code - except that we call here as soon as we get
 * the IB send completion on the RDMA op and the accompanying
 * message.
 */
void
rdsv3_rdma_send_complete(struct rdsv3_message *rm, int status)
{
        struct rdsv3_sock *rs = NULL;
        struct rdsv3_rdma_op *ro;
        struct rdsv3_notifier *notifier;

        RDSV3_DPRINTF4("rdsv3_rdma_send_complete", "Enter(rm: %p)", rm);

        mutex_enter(&rm->m_rs_lock);

        ro = rm->m_rdma_op;
        if (test_bit(RDSV3_MSG_ON_SOCK, &rm->m_flags) &&
            ro && ro->r_notify && ro->r_notifier) {
                notifier = ro->r_notifier;
                rs = rm->m_rs;
                rdsv3_sk_sock_hold(rdsv3_rs_to_sk(rs));

                notifier->n_status = status;
                mutex_enter(&rs->rs_lock);
                list_insert_tail(&rs->rs_notify_queue, notifier);
                mutex_exit(&rs->rs_lock);
                ro->r_notifier = NULL;
        }

        mutex_exit(&rm->m_rs_lock);

        if (rs) {
                rdsv3_wake_sk_sleep(rs);
                rdsv3_sk_sock_put(rdsv3_rs_to_sk(rs));
        }

        RDSV3_DPRINTF4("rdsv3_rdma_send_complete", "Return(rm: %p)", rm);
}

/*
 * This is the same as rdsv3_rdma_send_complete except we
 * don't do any locking - we have all the ingredients (message,
 * socket, socket lock) and can just move the notifier.
 */
static inline void
__rdsv3_rdma_send_complete(struct rdsv3_sock *rs, struct rdsv3_message *rm,
    int status)
{
        struct rdsv3_rdma_op *ro;

        RDSV3_DPRINTF4("__rdsv3_rdma_send_complete",
            "Enter(rs: %p, rm: %p)", rs, rm);

        ro = rm->m_rdma_op;
        if (ro && ro->r_notify && ro->r_notifier) {
                ro->r_notifier->n_status = status;
                list_insert_tail(&rs->rs_notify_queue, ro->r_notifier);
                ro->r_notifier = NULL;
        }

        /* No need to wake the app - caller does this */
}

/*
 * This is called from the IB send completion when we detect
 * an RDMA operation that failed with remote access error.
 * So speed is not an issue here.
 */
struct rdsv3_message *
rdsv3_send_get_message(struct rdsv3_connection *conn,
    struct rdsv3_rdma_op *op)
{
        struct rdsv3_message *rm, *tmp, *found = NULL;

        RDSV3_DPRINTF4("rdsv3_send_get_message", "Enter(conn: %p)", conn);

        mutex_enter(&conn->c_lock);

        RDSV3_FOR_EACH_LIST_NODE_SAFE(rm, tmp, &conn->c_retrans,
            m_conn_item) {
                if (rm->m_rdma_op == op) {
                        atomic_add_32(&rm->m_refcount, 1);
                        found = rm;
                        goto out;
                }
        }

        RDSV3_FOR_EACH_LIST_NODE_SAFE(rm, tmp, &conn->c_send_queue,
            m_conn_item) {
                if (rm->m_rdma_op == op) {
                        atomic_add_32(&rm->m_refcount, 1);
                        found = rm;
                        break;
                }
        }

out:
        mutex_exit(&conn->c_lock);

        return (found);
}
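
/*
 * Note: on success the message is returned with an extra reference
 * (m_refcount was bumped above); the completion path that looked it up
 * is expected to drop it with rdsv3_message_put() when done.
 */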

/*
 * This removes messages from the socket's list if they're on it.  The list
 * argument must be private to the caller, we must be able to modify it
 * without locks.  The messages must have a reference held for their
 * position on the list.  This function will drop that reference after
 * removing the messages from the 'messages' list regardless of whether it
 * found the messages on the socket list or not.
 */
void
rdsv3_send_remove_from_sock(struct list *messages, int status)
{
        struct rdsv3_sock *rs = NULL;
        struct rdsv3_message *rm;

        RDSV3_DPRINTF4("rdsv3_send_remove_from_sock", "Enter");

        while (!list_is_empty(messages)) {
                int was_on_sock = 0;

                rm = list_remove_head(messages);

                /*
                 * If we see this flag cleared then we're *sure* that someone
                 * else beat us to removing it from the sock.  If we race
                 * with their flag update we'll get the lock and then really
                 * see that the flag has been cleared.
                 *
                 * The message spinlock makes sure nobody clears rm->m_rs
                 * while we're messing with it.  It does not prevent the
                 * message from being removed from the socket, though.
                 */
                mutex_enter(&rm->m_rs_lock);
                if (!test_bit(RDSV3_MSG_ON_SOCK, &rm->m_flags))
                        goto unlock_and_drop;

                if (rs != rm->m_rs) {
                        if (rs) {
                                rdsv3_wake_sk_sleep(rs);
                                rdsv3_sk_sock_put(rdsv3_rs_to_sk(rs));
                        }
                        rs = rm->m_rs;
                        rdsv3_sk_sock_hold(rdsv3_rs_to_sk(rs));
                }

                mutex_enter(&rs->rs_lock);
                if (test_and_clear_bit(RDSV3_MSG_ON_SOCK, &rm->m_flags)) {
                        struct rdsv3_rdma_op *ro = rm->m_rdma_op;
                        struct rdsv3_notifier *notifier;

                        list_remove_node(&rm->m_sock_item);
                        rdsv3_send_sndbuf_remove(rs, rm);
                        if (ro && ro->r_notifier &&
                            (status || ro->r_notify)) {
                                notifier = ro->r_notifier;
                                list_insert_tail(&rs->rs_notify_queue,
                                    notifier);
                                if (!notifier->n_status)
                                        notifier->n_status = status;
                                rm->m_rdma_op->r_notifier = NULL;
                        }
                        was_on_sock = 1;
                        rm->m_rs = NULL;
                }
                mutex_exit(&rs->rs_lock);

unlock_and_drop:
                mutex_exit(&rm->m_rs_lock);
                rdsv3_message_put(rm);
                if (was_on_sock)
                        rdsv3_message_put(rm);
        }

        if (rs) {
                rdsv3_wake_sk_sleep(rs);
                rdsv3_sk_sock_put(rdsv3_rs_to_sk(rs));
        }

        RDSV3_DPRINTF4("rdsv3_send_remove_from_sock", "Return");
}
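
/*
 * Note: the two rdsv3_message_put() calls above are deliberate - the
 * first drops the caller's list-position reference (see the function
 * comment), the second drops the reference the socket queue held once
 * RDSV3_MSG_ON_SOCK has been cleared.
 */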

/*
 * Transports call here when they've determined that the receiver queued
 * messages up to, and including, the given sequence number.  Messages are
 * moved to the retrans queue when rdsv3_send_xmit picks them off the send
 * queue.  This means that in the TCP case, the message may not have been
 * assigned the m_ack_seq yet - but that's fine as long as tcp_is_acked
 * checks the RDSV3_MSG_HAS_ACK_SEQ bit.
 *
 * XXX It's not clear to me how this is safely serialized with socket
 * destruction.  Maybe it should bail if it sees SOCK_DEAD.
 */
void
rdsv3_send_drop_acked(struct rdsv3_connection *conn, uint64_t ack,
    is_acked_func is_acked)
{
        struct rdsv3_message *rm, *tmp;
        list_t list;

        RDSV3_DPRINTF4("rdsv3_send_drop_acked", "Enter(conn: %p)", conn);

        list_create(&list, sizeof (struct rdsv3_message),
            offsetof(struct rdsv3_message, m_conn_item));

        mutex_enter(&conn->c_lock);

        RDSV3_FOR_EACH_LIST_NODE_SAFE(rm, tmp, &conn->c_retrans,
            m_conn_item) {
                if (!rdsv3_send_is_acked(rm, ack, is_acked))
                        break;

                list_remove_node(&rm->m_conn_item);
                list_insert_tail(&list, rm);
                clear_bit(RDSV3_MSG_ON_CONN, &rm->m_flags);
        }

#if 0
XXX
        /* order flag updates with spin locks */
        if (!list_is_empty(&list))
                smp_mb__after_clear_bit();
#endif

        mutex_exit(&conn->c_lock);

        /* now remove the messages from the sock list as needed */
        rdsv3_send_remove_from_sock(&list, RDSV3_RDMA_SUCCESS);

        RDSV3_DPRINTF4("rdsv3_send_drop_acked", "Return(conn: %p)", conn);
}

void
rdsv3_send_drop_to(struct rdsv3_sock *rs, struct sockaddr_in *dest)
{
        struct rdsv3_message *rm, *tmp;
        struct rdsv3_connection *conn;
        list_t list;
        int wake = 0;

        RDSV3_DPRINTF4("rdsv3_send_drop_to", "Enter(rs: %p)", rs);

        list_create(&list, sizeof (struct rdsv3_message),
            offsetof(struct rdsv3_message, m_sock_item));

        /* get all the messages we're dropping under the rs lock */
        mutex_enter(&rs->rs_lock);

        RDSV3_FOR_EACH_LIST_NODE_SAFE(rm, tmp, &rs->rs_send_queue,
            m_sock_item) {
                if (dest && (dest->sin_addr.s_addr != rm->m_daddr ||
                    dest->sin_port != rm->m_inc.i_hdr.h_dport))
                        continue;
                wake = 1;
                list_remove(&rs->rs_send_queue, rm);
                list_insert_tail(&list, rm);
                rdsv3_send_sndbuf_remove(rs, rm);
                clear_bit(RDSV3_MSG_ON_SOCK, &rm->m_flags);
        }

        mutex_exit(&rs->rs_lock);

        conn = NULL;

        /* now remove the messages from the conn list as needed */
        RDSV3_FOR_EACH_LIST_NODE(rm, &list, m_sock_item) {
                /*
                 * We do this here rather than in the loop above, so that
                 * we don't have to nest m_rs_lock under rs->rs_lock.
                 */
                mutex_enter(&rm->m_rs_lock);
                /* If this is an RDMA operation, notify the app. */
                __rdsv3_rdma_send_complete(rs, rm, RDSV3_RDMA_CANCELED);
                rm->m_rs = NULL;
                mutex_exit(&rm->m_rs_lock);

                /*
                 * If we see this flag cleared then we're *sure* that someone
                 * else beat us to removing it from the conn.  If we race
                 * with their flag update we'll get the lock and then really
                 * see that the flag has been cleared.
                 */
                if (!test_bit(RDSV3_MSG_ON_CONN, &rm->m_flags))
                        continue;

                if (conn != rm->m_inc.i_conn) {
                        if (conn)
                                mutex_exit(&conn->c_lock);
                        conn = rm->m_inc.i_conn;
                        mutex_enter(&conn->c_lock);
                }

                if (test_and_clear_bit(RDSV3_MSG_ON_CONN, &rm->m_flags)) {
                        list_remove_node(&rm->m_conn_item);
                        rdsv3_message_put(rm);
                }
        }

        if (conn)
                mutex_exit(&conn->c_lock);

        if (wake)
                rdsv3_wake_sk_sleep(rs);

        while (!list_is_empty(&list)) {
                rm = list_remove_head(&list);

                rdsv3_message_wait(rm);
                rdsv3_message_put(rm);
        }

        RDSV3_DPRINTF4("rdsv3_send_drop_to", "Return(rs: %p)", rs);
}
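
/*
 * Example (sketch): a transport's ACK handler would typically call
 *
 *      rdsv3_send_drop_acked(conn, ack_seq, NULL);
 *
 * passing its own is_acked callback instead of NULL when sequence numbers
 * are assigned late (see the TCP note above rdsv3_send_drop_acked).
 */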

/*
 * we only want this to fire once so we use the caller's 'queued'.  It's
 * possible that another thread can race with us and remove the
 * message from the flow with RDSV3_CANCEL_SENT_TO.
 */
static int
rdsv3_send_queue_rm(struct rdsv3_sock *rs, struct rdsv3_connection *conn,
    struct rdsv3_message *rm, uint16_be_t sport,
    uint16_be_t dport, int *queued)
{
        uint32_t len;

        RDSV3_DPRINTF4("rdsv3_send_queue_rm", "Enter(rs: %p, rm: %p)",
            rs, rm);

        if (*queued)
                goto out;

        len = ntohl(rm->m_inc.i_hdr.h_len);

        /*
         * this is the only place which holds both the socket's rs_lock
         * and the connection's c_lock
         */
        mutex_enter(&rs->rs_lock);

        /*
         * If there is a little space in sndbuf, we don't queue anything,
         * and userspace gets -EAGAIN.  But poll() indicates there's send
         * room.  This can lead to bad behavior (spinning) if snd_bytes
         * isn't freed up by incoming acks.  So we check the *old* value of
         * rs_snd_bytes here to allow the last msg to exceed the buffer,
         * and poll() now knows no more data can be sent.
         */
        if (rs->rs_snd_bytes < rdsv3_sk_sndbuf(rs)) {
                rs->rs_snd_bytes += len;

                /*
                 * let recv side know we are close to send space exhaustion.
                 * This is probably not the optimal way to do it, as this
                 * means we set the flag on *all* messages as soon as our
                 * throughput hits a certain threshold.
                 */
                if (rs->rs_snd_bytes >= rdsv3_sk_sndbuf(rs) / 2)
                        set_bit(RDSV3_MSG_ACK_REQUIRED, &rm->m_flags);

                list_insert_tail(&rs->rs_send_queue, rm);
                set_bit(RDSV3_MSG_ON_SOCK, &rm->m_flags);

                rdsv3_message_addref(rm);
                rm->m_rs = rs;

                /*
                 * The code ordering is a little weird, but we're
                 * trying to minimize the time we hold c_lock.
                 */
                rdsv3_message_populate_header(&rm->m_inc.i_hdr, sport,
                    dport, 0);
                rm->m_inc.i_conn = conn;
                rdsv3_message_addref(rm);       /* XXX - called twice */

                mutex_enter(&conn->c_lock);
                rm->m_inc.i_hdr.h_sequence = htonll(conn->c_next_tx_seq++);
                list_insert_tail(&conn->c_send_queue, rm);
                set_bit(RDSV3_MSG_ON_CONN, &rm->m_flags);
                mutex_exit(&conn->c_lock);

                RDSV3_DPRINTF5("rdsv3_send_queue_rm",
                    "queued msg %p len %d, rs %p bytes %d seq %llu",
                    rm, len, rs, rs->rs_snd_bytes,
                    (unsigned long long)ntohll(
                    rm->m_inc.i_hdr.h_sequence));

                *queued = 1;
        }

        mutex_exit(&rs->rs_lock);

        RDSV3_DPRINTF4("rdsv3_send_queue_rm", "Return(rs: %p)", rs);
out:
        return (*queued);
}
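
/*
 * Worked example of the sndbuf check above: with a 64KB sndbuf and
 * rs_snd_bytes at 60KB, a 16KB message still queues because the *old*
 * value is tested (60KB < 64KB); rs_snd_bytes then sits at 76KB, so
 * poll() reports no send room until acks drain it back down.
 */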

static int
rdsv3_cmsg_send(struct rdsv3_sock *rs, struct rdsv3_message *rm,
    struct msghdr *msg, int *allocated_mr)
{
        struct cmsghdr *cmsg;
        int ret = 0;

        RDSV3_DPRINTF4("rdsv3_cmsg_send", "Enter(rs: %p)", rs);

        for (cmsg = CMSG_FIRSTHDR(msg); cmsg;
            cmsg = CMSG_NXTHDR(msg, cmsg)) {

                if (cmsg->cmsg_level != SOL_RDS)
                        continue;

                RDSV3_DPRINTF4("rdsv3_cmsg_send", "cmsg(%p, %p) type %d",
                    cmsg, rm, cmsg->cmsg_type);
                /*
                 * As a side effect, RDMA_DEST and RDMA_MAP will set
                 * rm->m_rdma_cookie and rm->m_rdma_mr.
                 */
                switch (cmsg->cmsg_type) {
                case RDSV3_CMSG_RDMA_ARGS:
                        ret = rdsv3_cmsg_rdma_args(rs, rm, cmsg);
                        break;

                case RDSV3_CMSG_RDMA_DEST:
                        ret = rdsv3_cmsg_rdma_dest(rs, rm, cmsg);
                        break;

                case RDSV3_CMSG_RDMA_MAP:
                        ret = rdsv3_cmsg_rdma_map(rs, rm, cmsg);
                        if (!ret)
                                *allocated_mr = 1;
                        break;

                default:
                        return (-EINVAL);
                }

                if (ret)
                        break;
        }

        RDSV3_DPRINTF4("rdsv3_cmsg_send", "Return(rs: %p)", rs);

        return (ret);
}
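
/*
 * Note: *allocated_mr is how rdsv3_sendmsg() learns that RDMA_MAP created
 * an MR on the fly; its error path uses that flag to tear the mapping
 * down again (see the out: label in rdsv3_sendmsg below).
 */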

int
rdsv3_sendmsg(struct rdsv3_sock *rs, uio_t *uio, struct nmsghdr *msg,
    size_t payload_len)
{
        struct rsock *sk = rdsv3_rs_to_sk(rs);
        struct sockaddr_in *usin = (struct sockaddr_in *)msg->msg_name;
        uint32_be_t daddr;
        uint16_be_t dport;
        struct rdsv3_message *rm = NULL;
        struct rdsv3_connection *conn;
        int ret = 0;
        int queued = 0, allocated_mr = 0;
        int nonblock = msg->msg_flags & MSG_DONTWAIT;
        long timeo = rdsv3_sndtimeo(sk, nonblock);

        RDSV3_DPRINTF4("rdsv3_sendmsg", "Enter(rs: %p)", rs);

        if (msg->msg_namelen) {
                /* XXX fail non-unicast destination IPs? */
                if (msg->msg_namelen < sizeof (*usin) ||
                    usin->sin_family != AF_INET_OFFLOAD) {
                        ret = -EINVAL;
                        RDSV3_DPRINTF2("rdsv3_sendmsg", "returning: %d",
                            -ret);
                        goto out;
                }
                daddr = usin->sin_addr.s_addr;
                dport = usin->sin_port;
        } else {
                /* We only care about consistency with ->connect() */
                mutex_enter(&sk->sk_lock);
                daddr = rs->rs_conn_addr;
                dport = rs->rs_conn_port;
                mutex_exit(&sk->sk_lock);
        }

        /* racing with another thread binding seems ok here */
        if (daddr == 0 || rs->rs_bound_addr == 0) {
                ret = -ENOTCONN;        /* XXX not a great errno */
                RDSV3_DPRINTF2("rdsv3_sendmsg", "returning: %d", -ret);
                goto out;
        }

        rm = rdsv3_message_copy_from_user(uio, payload_len);
        if (IS_ERR(rm)) {
                ret = PTR_ERR(rm);
                RDSV3_DPRINTF2("rdsv3_sendmsg",
                    "rdsv3_message_copy_from_user failed %d", -ret);
                rm = NULL;
                goto out;
        }

        rm->m_daddr = daddr;

        /* Parse any control messages the user may have included. */
        ret = rdsv3_cmsg_send(rs, rm, msg, &allocated_mr);
        if (ret) {
                RDSV3_DPRINTF2("rdsv3_sendmsg",
                    "rdsv3_cmsg_send(rs: %p rm: %p msg: %p) returned: %d",
                    rs, rm, msg, ret);
                goto out;
        }

        /*
         * rdsv3_conn_create has a spinlock that runs with IRQ off.
         * Caching the conn in the socket helps a lot.
         */
        mutex_enter(&rs->rs_conn_lock);
        if (rs->rs_conn && rs->rs_conn->c_faddr == daddr) {
                conn = rs->rs_conn;
        } else {
                conn = rdsv3_conn_create_outgoing(rs->rs_bound_addr,
                    daddr, rs->rs_transport, KM_NOSLEEP);
                if (IS_ERR(conn)) {
                        mutex_exit(&rs->rs_conn_lock);
                        ret = PTR_ERR(conn);
                        RDSV3_DPRINTF2("rdsv3_sendmsg",
                            "rdsv3_conn_create_outgoing failed %d",
                            -ret);
                        goto out;
                }
                rs->rs_conn = conn;
        }
        mutex_exit(&rs->rs_conn_lock);

        if ((rm->m_rdma_cookie || rm->m_rdma_op) &&
            conn->c_trans->xmit_rdma == NULL) {
                RDSV3_DPRINTF2("rdsv3_sendmsg",
                    "rdma_op %p conn xmit_rdma %p",
                    rm->m_rdma_op, conn->c_trans->xmit_rdma);
                ret = -EOPNOTSUPP;
                goto out;
        }

        /*
         * If the connection is down, trigger a connect.  We may
         * have scheduled a delayed reconnect however - in this case
         * we should not interfere.
         */
        if (rdsv3_conn_state(conn) == RDSV3_CONN_DOWN &&
            !test_and_set_bit(RDSV3_RECONNECT_PENDING, &conn->c_flags))
                rdsv3_queue_delayed_work(rdsv3_wq, &conn->c_conn_w, 0);

        ret = rdsv3_cong_wait(conn->c_fcong, dport, nonblock, rs);
        if (ret) {
                mutex_enter(&rs->rs_congested_lock);
                rs->rs_seen_congestion = 1;
                cv_signal(&rs->rs_congested_cv);
                mutex_exit(&rs->rs_congested_lock);

                RDSV3_DPRINTF2("rdsv3_sendmsg",
                    "rdsv3_cong_wait (dport: %d) returned: %d", dport, ret);
                goto out;
        }

        (void) rdsv3_send_queue_rm(rs, conn, rm, rs->rs_bound_port, dport,
            &queued);
        if (!queued) {
                /* rdsv3_stats_inc(s_send_queue_full); */
                /* XXX make sure this is reasonable */
                if (payload_len > rdsv3_sk_sndbuf(rs)) {
                        ret = -EMSGSIZE;
                        RDSV3_DPRINTF2("rdsv3_sendmsg",
                            "msgsize(%d) too big, returning: %d",
                            payload_len, -ret);
                        goto out;
                }
                if (nonblock) {
                        ret = -EAGAIN;
                        RDSV3_DPRINTF3("rdsv3_sendmsg",
                            "send queue full (%d), returning: %d",
                            payload_len, -ret);
                        goto out;
                }

#if 0
                ret = rdsv3_wait_sig(sk->sk_sleep,
                    (rdsv3_send_queue_rm(rs, conn, rm, rs->rs_bound_port,
                    dport, &queued)));
                if (ret == 0) {
                        /* signal/timeout pending */
                        RDSV3_DPRINTF2("rdsv3_sendmsg",
                            "woke due to signal: %d", ret);
                        ret = -ERESTART;
                        goto out;
                }
#else
                mutex_enter(&sk->sk_sleep->waitq_mutex);
                sk->sk_sleep->waitq_waiters++;
                while (!rdsv3_send_queue_rm(rs, conn, rm, rs->rs_bound_port,
                    dport, &queued)) {
                        ret = cv_wait_sig(&sk->sk_sleep->waitq_cv,
                            &sk->sk_sleep->waitq_mutex);
                        if (ret == 0) {
                                /* signal/timeout pending */
                                RDSV3_DPRINTF2("rdsv3_sendmsg",
                                    "woke due to signal: %d", ret);
                                ret = -ERESTART;
                                sk->sk_sleep->waitq_waiters--;
                                mutex_exit(&sk->sk_sleep->waitq_mutex);
                                goto out;
                        }
                }
                sk->sk_sleep->waitq_waiters--;
                mutex_exit(&sk->sk_sleep->waitq_mutex);
#endif

                RDSV3_DPRINTF5("rdsv3_sendmsg", "sendmsg woke queued %d",
                    queued);

                ASSERT(queued);
                ret = 0;
        }

        /*
         * By now we've committed to the send.  We reuse rdsv3_send_worker()
         * to retry sends in the rds thread if the transport asks us to.
         */
        rdsv3_stats_inc(s_send_queued);

        if (!test_bit(RDSV3_LL_SEND_FULL, &conn->c_flags))
                (void) rdsv3_send_xmit(conn);

        rdsv3_message_put(rm);
        RDSV3_DPRINTF4("rdsv3_sendmsg", "Return(rs: %p, len: %d)",
            rs, payload_len);
        return (payload_len);

out:
        /*
         * If the user included a RDMA_MAP cmsg, we allocated an MR on the
         * fly.  If the sendmsg goes through, we keep the MR.  If it fails
         * with EAGAIN or in any other way, we need to destroy the MR again.
         */
        if (allocated_mr)
                rdsv3_rdma_unuse(rs, rdsv3_rdma_cookie_key(rm->m_rdma_cookie),
                    1);

        if (rm)
                rdsv3_message_put(rm);
        return (ret);
}
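
/*
 * Note: the blocking path above waits on sk_sleep until
 * rdsv3_send_queue_rm() succeeds; a signal aborts the wait with -ERESTART
 * before the message was committed to the send queue, so the out: path
 * drops the only remaining reference (and any on-the-fly MR).
 */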

/*
 * Reply to a ping packet.
 */
int
rdsv3_send_pong(struct rdsv3_connection *conn, uint16_be_t dport)
{
        struct rdsv3_message *rm;
        int ret = 0;

        RDSV3_DPRINTF4("rdsv3_send_pong", "Enter(conn: %p)", conn);

        rm = rdsv3_message_alloc(0, KM_NOSLEEP);
        if (!rm) {
                ret = -ENOMEM;
                goto out;
        }

        rm->m_daddr = conn->c_faddr;

        /*
         * If the connection is down, trigger a connect.  We may
         * have scheduled a delayed reconnect however - in this case
         * we should not interfere.
         */
        if (rdsv3_conn_state(conn) == RDSV3_CONN_DOWN &&
            !test_and_set_bit(RDSV3_RECONNECT_PENDING, &conn->c_flags))
                rdsv3_queue_delayed_work(rdsv3_wq, &conn->c_conn_w, 0);

        ret = rdsv3_cong_wait(conn->c_fcong, dport, 1, NULL);
        if (ret)
                goto out;

        mutex_enter(&conn->c_lock);
        list_insert_tail(&conn->c_send_queue, rm);
        set_bit(RDSV3_MSG_ON_CONN, &rm->m_flags);
        rdsv3_message_addref(rm);
        rm->m_inc.i_conn = conn;

        rdsv3_message_populate_header(&rm->m_inc.i_hdr, 0, dport,
            conn->c_next_tx_seq);
        conn->c_next_tx_seq++;
        mutex_exit(&conn->c_lock);

        rdsv3_stats_inc(s_send_queued);
        rdsv3_stats_inc(s_send_pong);

        if (!test_bit(RDSV3_LL_SEND_FULL, &conn->c_flags))
                (void) rdsv3_send_xmit(conn);

        rdsv3_message_put(rm);

        RDSV3_DPRINTF4("rdsv3_send_pong", "Return(conn: %p)", conn);
        return (0);

out:
        if (rm)
                rdsv3_message_put(rm);
        return (ret);
}