1 /* 2 * CDDL HEADER START 3 * 4 * The contents of this file are subject to the terms of the 5 * Common Development and Distribution License, Version 1.0 only 6 * (the "License"). You may not use this file except in compliance 7 * with the License. 8 * 9 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE 10 * or http://www.opensolaris.org/os/licensing. 11 * See the License for the specific language governing permissions 12 * and limitations under the License. 13 * 14 * When distributing Covered Code, include this CDDL HEADER in each 15 * file and include the License file at usr/src/OPENSOLARIS.LICENSE. 16 * If applicable, add the following below this CDDL HEADER, with the 17 * fields enclosed by brackets "[]" replaced with your own identifying 18 * information: Portions Copyright [yyyy] [name of copyright owner] 19 * 20 * CDDL HEADER END 21 */ 22 /* 23 * Copyright 2004 Sun Microsystems, Inc. All rights reserved. 24 * Use is subject to license terms. 25 * Copyright 2018 Joyent, Inc. 26 */ 27 28 /* 29 * STREAMS Buffering module 30 * 31 * This streams module collects incoming messages from modules below 32 * it on the stream and buffers them up into a smaller number of 33 * aggregated messages. Its main purpose is to reduce overhead by 34 * cutting down on the number of read (or getmsg) calls its client 35 * user process makes. 36 * - only M_DATA is buffered. 37 * - multithreading assumes configured as D_MTQPAIR 38 * - packets are lost only if flag SB_NO_HEADER is clear and buffer 39 * allocation fails. 40 * - in order message transmission. This is enforced for messages other 41 * than high priority messages. 42 * - zero length messages on the read side are not passed up the 43 * stream but used internally for synchronization. 44 * FLAGS: 45 * - SB_NO_PROTO_CVT - no conversion of M_PROTO messages to M_DATA. 46 * (conversion is the default for backwards compatibility 47 * hence the negative logic). 48 * - SB_NO_HEADER - no headers in buffered data. 
49 * (adding headers is the default for backwards compatibility 50 * hence the negative logic). 51 * - SB_DEFER_CHUNK - provides improved response time in question-answer 52 * applications. Buffering is not enabled until the second message 53 * is received on the read side within the sb_ticks interval. 54 * This option will often be used in combination with flag SB_SEND_ON_WRITE. 55 * - SB_SEND_ON_WRITE - a write message results in any pending buffered read 56 * data being immediately sent upstream. 57 * - SB_NO_DROPS - bufmod behaves transparently in flow control and propagates 58 * the blocked flow condition downstream. If this flag is clear (default) 59 * messages will be dropped if the upstream flow is blocked. 60 */ 61 62 63 #include <sys/types.h> 64 #include <sys/errno.h> 65 #include <sys/debug.h> 66 #include <sys/stropts.h> 67 #include <sys/time.h> 68 #include <sys/stream.h> 69 #include <sys/conf.h> 70 #include <sys/ddi.h> 71 #include <sys/sunddi.h> 72 #include <sys/kmem.h> 73 #include <sys/strsun.h> 74 #include <sys/bufmod.h> 75 #include <sys/modctl.h> 76 #include <sys/isa_defs.h> 77 78 /* 79 * Per-Stream state information. 80 * 81 * If sb_ticks is negative, we don't deliver chunks until they're 82 * full. If it's zero, we deliver every packet as it arrives. (In 83 * this case we force sb_chunk to zero, to make the implementation 84 * easier.) Otherwise, sb_ticks gives the number of ticks in a 85 * buffering interval. The interval begins when the a read side data 86 * message is received and a timeout is not active. If sb_snap is 87 * zero, no truncation of the msg is done. 
 */
struct sb {
	queue_t		*sb_rq;		/* our rq */
	mblk_t		*sb_mp;		/* partial chunk */
	mblk_t		*sb_head;	/* pre-allocated space for the next header */
	mblk_t		*sb_tail;	/* first mblk of last message appended */
	uint_t		sb_mlen;	/* sb_mp length */
	uint_t		sb_mcount;	/* input msg count in sb_mp */
	uint_t		sb_chunk;	/* max chunk size */
	clock_t		sb_ticks;	/* timeout interval */
	timeout_id_t	sb_timeoutid;	/* qtimeout() id */
	uint_t		sb_drops;	/* cumulative # discarded msgs */
	uint_t		sb_snap;	/* snapshot length */
	uint_t		sb_flags;	/* flags field (SB_* values) */
	uint_t		sb_state;	/* state variable (SB_FRCVD) */
};

/*
 * Function prototypes.
 */
static int sbopen(queue_t *, dev_t *, int, int, cred_t *);
static int sbclose(queue_t *, int, cred_t *);
static int sbwput(queue_t *, mblk_t *);
static int sbrput(queue_t *, mblk_t *);
static int sbrsrv(queue_t *);
static void sbioctl(queue_t *, mblk_t *);
static void sbaddmsg(queue_t *, mblk_t *);
static void sbtick(void *);
static void sbclosechunk(struct sb *);
static void sbsendit(queue_t *, mblk_t *);

static struct module_info sb_minfo = {
	21,		/* mi_idnum */
	"bufmod",	/* mi_idname */
	0,		/* mi_minpsz */
	INFPSZ,		/* mi_maxpsz */
	1,		/* mi_hiwat */
	0		/* mi_lowat */
};

static struct qinit sb_rinit = {
	sbrput,		/* qi_putp */
	sbrsrv,		/* qi_srvp */
	sbopen,		/* qi_qopen */
	sbclose,	/* qi_qclose */
	NULL,		/* qi_qadmin */
	&sb_minfo,	/* qi_minfo */
	NULL		/* qi_mstat */
};

static struct qinit sb_winit = {
	sbwput,		/* qi_putp */
	NULL,		/* qi_srvp */
	NULL,		/* qi_qopen */
	NULL,		/* qi_qclose */
	NULL,		/* qi_qadmin */
	&sb_minfo,	/* qi_minfo */
	NULL		/* qi_mstat */
};

static struct streamtab sb_info = {
	&sb_rinit,	/* st_rdinit */
	&sb_winit,	/* st_wrinit */
	NULL,		/* st_muxrinit */
	NULL		/* st_muxwinit */
};


/*
 * This is the loadable
 * module wrapper.
 */

static struct fmodsw fsw = {
	"bufmod",
	&sb_info,
	D_MTQPAIR | D_MP
};

/*
 * Module linkage information for the kernel.
 */

static struct modlstrmod modlstrmod = {
	&mod_strmodops, "streams buffer mod", &fsw
};

static struct modlinkage modlinkage = {
	MODREV_1, &modlstrmod, NULL
};


int
_init(void)
{
	return (mod_install(&modlinkage));
}

int
_fini(void)
{
	return (mod_remove(&modlinkage));
}

int
_info(struct modinfo *modinfop)
{
	return (mod_info(&modlinkage, modinfop));
}


/*
 * Module open routine: allocate and initialize the per-stream state.
 * Defaults: no timeout (sb_ticks == -1, i.e. chunks are delivered
 * only when full), default chunk size, no snapshot truncation,
 * no option flags set.
 */
/* ARGSUSED */
static int
sbopen(queue_t *rq, dev_t *dev, int oflag, int sflag, cred_t *crp)
{
	struct sb *sbp;
	ASSERT(rq);

	/* bufmod may only be pushed as a module, never opened as a driver */
	if (sflag != MODOPEN)
		return (EINVAL);

	/* Already open (q_ptr set): nothing to do */
	if (rq->q_ptr)
		return (0);

	/*
	 * Allocate and initialize per-Stream structure.
	 * KM_SLEEP: allocation may block but cannot fail.
	 */
	sbp = kmem_alloc(sizeof (struct sb), KM_SLEEP);
	sbp->sb_rq = rq;
	sbp->sb_ticks = -1;
	sbp->sb_chunk = SB_DFLT_CHUNK;
	sbp->sb_tail = sbp->sb_mp = sbp->sb_head = NULL;
	sbp->sb_mlen = 0;
	sbp->sb_mcount = 0;
	sbp->sb_timeoutid = 0;
	sbp->sb_drops = 0;
	sbp->sb_snap = 0;
	sbp->sb_flags = 0;
	sbp->sb_state = 0;

	/* Both sides of the queue pair share the same state */
	rq->q_ptr = WR(rq)->q_ptr = sbp;

	qprocson(rq);


	return (0);
}

/*
 * Module close routine: cancel any outstanding timeout, free any
 * partially accumulated chunk, and release the per-stream state.
 */
/* ARGSUSED1 */
static int
sbclose(queue_t *rq, int flag, cred_t *credp)
{
	struct sb *sbp = (struct sb *)rq->q_ptr;

	ASSERT(sbp);

	qprocsoff(rq);
	/*
	 * Cancel an outstanding timeout
	 */
	if (sbp->sb_timeoutid != 0) {
		(void) quntimeout(rq, sbp->sb_timeoutid);
		sbp->sb_timeoutid = 0;
	}
	/*
	 * Free the current chunk.
	 */
	if (sbp->sb_mp) {
		freemsg(sbp->sb_mp);
		sbp->sb_tail = sbp->sb_mp = sbp->sb_head = NULL;
		sbp->sb_mlen = 0;
	}

	/*
	 * Free the per-Stream structure.
	 */
	kmem_free((caddr_t)sbp, sizeof (struct sb));
	rq->q_ptr = WR(rq)->q_ptr = NULL;

	return (0);
}

/*
 * the correction factor is introduced to compensate for
 * whatever assumptions the modules below have made about
 * how much traffic is flowing through the stream and the fact
 * that bufmod may be snipping messages with the sb_snap length.
 */
#define	SNIT_HIWAT(msgsize, fudge)	((4 * msgsize * fudge) + 512)
#define	SNIT_LOWAT(msgsize, fudge)	((2 * msgsize * fudge) + 256)


/*
 * Second stage of a transparent ioctl: handle the M_IOCDATA message
 * carrying the value copied in from userland (mp->b_cont holds the
 * data, already validated by the stream head copy machinery).
 * The "get" commands were already answered by mcopyout()/qreply() in
 * sbioctl(), so here they only need the final acknowledgement.
 */
static void
sbioc(queue_t *wq, mblk_t *mp)
{
	struct iocblk *iocp;
	struct sb *sbp = (struct sb *)wq->q_ptr;
	clock_t	ticks;
	mblk_t *mop;

	iocp = (struct iocblk *)mp->b_rptr;

	switch (iocp->ioc_cmd) {
	case SBIOCGCHUNK:
	case SBIOCGSNAP:
	case SBIOCGFLAGS:
	case SBIOCGTIME:
		/* copyout already done; just complete the ioctl */
		miocack(wq, mp, 0, 0);
		return;

	case SBIOCSTIME:
#ifdef _SYSCALL32_IMPL
		if ((iocp->ioc_flag & IOC_MODELS) != IOC_NATIVE) {
			/* 32-bit caller on a 64-bit kernel */
			struct timeval32 *t32;

			t32 = (struct timeval32 *)mp->b_cont->b_rptr;
			if (t32->tv_sec < 0 || t32->tv_usec < 0) {
				miocnak(wq, mp, 0, EINVAL);
				break;
			}
			ticks = TIMEVAL_TO_TICK(t32);
		} else
#endif /* _SYSCALL32_IMPL */
		{
			struct timeval *tb;

			tb = (struct timeval *)mp->b_cont->b_rptr;

			if (tb->tv_sec < 0 || tb->tv_usec < 0) {
				miocnak(wq, mp, 0, EINVAL);
				break;
			}
			ticks = TIMEVAL_TO_TICK(tb);
		}
		sbp->sb_ticks = ticks;
		/* a zero interval means "deliver immediately": disable chunking */
		if (ticks == 0)
			sbp->sb_chunk = 0;
		miocack(wq, mp, 0, 0);
		sbclosechunk(sbp);
		return;

	case SBIOCSCHUNK:
		/*
		 * set up hi/lo water marks on stream head read queue.
		 * unlikely to run out of resources. Fix at later date.
		 */
		if ((mop = allocb(sizeof (struct stroptions),
		    BPRI_MED)) != NULL) {
			struct stroptions *sop;
			uint_t chunk;

			chunk = *(uint_t *)mp->b_cont->b_rptr;
			mop->b_datap->db_type = M_SETOPTS;
			mop->b_wptr += sizeof (struct stroptions);
			sop = (struct stroptions *)mop->b_rptr;
			sop->so_flags = SO_HIWAT | SO_LOWAT;
			sop->so_hiwat = SNIT_HIWAT(chunk, 1);
			sop->so_lowat = SNIT_LOWAT(chunk, 1);
			qreply(wq, mop);
		}

		sbp->sb_chunk = *(uint_t *)mp->b_cont->b_rptr;
		miocack(wq, mp, 0, 0);
		sbclosechunk(sbp);
		return;

	case SBIOCSFLAGS:
		sbp->sb_flags = *(uint_t *)mp->b_cont->b_rptr;
		miocack(wq, mp, 0, 0);
		return;

	case SBIOCSSNAP:
		/*
		 * if chunking don't worry about effects of
		 * snipping of message size on head flow control
		 * since it has a relatively small bearing on the
		 * data rate onto the stream head.
		 */
		if (!sbp->sb_chunk) {
			/*
			 * set up hi/lo water marks on stream head read queue.
			 * unlikely to run out of resources. Fix at later date.
			 */
			if ((mop = allocb(sizeof (struct stroptions),
			    BPRI_MED)) != NULL) {
				struct stroptions *sop;
				uint_t snap;
				int fudge;

				snap = *(uint_t *)mp->b_cont->b_rptr;
				mop->b_datap->db_type = M_SETOPTS;
				mop->b_wptr += sizeof (struct stroptions);
				sop = (struct stroptions *)mop->b_rptr;
				sop->so_flags = SO_HIWAT | SO_LOWAT;
				/* smaller snapshots get a bigger correction factor */
				fudge = snap <= 100 ? 4 :
				    snap <= 400 ? 2 :
				    1;
				sop->so_hiwat = SNIT_HIWAT(snap, fudge);
				sop->so_lowat = SNIT_LOWAT(snap, fudge);
				qreply(wq, mop);
			}
		}

		sbp->sb_snap = *(uint_t *)mp->b_cont->b_rptr;
		miocack(wq, mp, 0, 0);
		return;

	default:
		/*
		 * sbwput() only routes the SBIOC* commands here, so this
		 * is unreachable.  NOTE(review): on a non-DEBUG kernel
		 * ASSERT compiles away and mp would be neither acked nor
		 * freed here — relies on the caller's filtering.
		 */
		ASSERT(0);
		return;
	}
}

/*
 * Write-side put procedure. Its main task is to detect ioctls
 * for manipulating the buffering state and hand them to sbioctl.
 * Other message types are passed on through.
 */
static int
sbwput(queue_t *wq, mblk_t *mp)
{
	struct sb *sbp = (struct sb *)wq->q_ptr;
	struct copyresp *resp;

	/*
	 * SB_SEND_ON_WRITE: any write-side traffic first flushes the
	 * currently buffered read-side chunk upstream.
	 */
	if (sbp->sb_flags & SB_SEND_ON_WRITE)
		sbclosechunk(sbp);
	switch (mp->b_datap->db_type) {
	case M_IOCTL:
		sbioctl(wq, mp);
		break;

	case M_IOCDATA:
		resp = (struct copyresp *)mp->b_rptr;
		if (resp->cp_rval) {
			/*
			 * Just free message on failure.
			 * (The stream head's copyin/copyout failed;
			 * it has already given up on the ioctl.)
			 */
			freemsg(mp);
			break;
		}

		/* Only our own transparent ioctls go to sbioc() */
		switch (resp->cp_cmd) {
		case SBIOCSTIME:
		case SBIOCSCHUNK:
		case SBIOCSFLAGS:
		case SBIOCSSNAP:
		case SBIOCGTIME:
		case SBIOCGCHUNK:
		case SBIOCGSNAP:
		case SBIOCGFLAGS:
			sbioc(wq, mp);
			break;

		default:
			putnext(wq, mp);
			break;
		}
		break;

	default:
		putnext(wq, mp);
		break;
	}
	return (0);
}

/*
 * Read-side put procedure. It's responsible for buffering up incoming
 * messages and grouping them into aggregates according to the current
 * buffering parameters.
 */
static int
sbrput(queue_t *rq, mblk_t *mp)
{
	struct sb *sbp = (struct sb *)rq->q_ptr;

	ASSERT(sbp);

	switch (mp->b_datap->db_type) {
	case M_PROTO:
		if (sbp->sb_flags & SB_NO_PROTO_CVT) {
			/* pass M_PROTO through unbuffered */
			sbclosechunk(sbp);
			sbsendit(rq, mp);
			break;
		} else {
			/*
			 * Convert M_PROTO to M_DATA.
			 */
			mp->b_datap->db_type = M_DATA;
		}
		/* FALLTHRU */

	case M_DATA:
		if ((sbp->sb_flags & SB_DEFER_CHUNK) &&
		    !(sbp->sb_state & SB_FRCVD)) {
			/*
			 * Deferred chunking: the first message after a
			 * chunk is sent up immediately; buffering only
			 * starts with the second one.
			 */
			sbclosechunk(sbp);
			sbsendit(rq, mp);
			sbp->sb_state |= SB_FRCVD;
		} else
			sbaddmsg(rq, mp);

		/* Arm the chunk timer if an interval is set and none pending */
		if ((sbp->sb_ticks > 0) && !(sbp->sb_timeoutid))
			sbp->sb_timeoutid = qtimeout(sbp->sb_rq, sbtick,
			    sbp, sbp->sb_ticks);

		break;

	case M_FLUSH:
		if (*mp->b_rptr & FLUSHR) {
			/*
			 * Reset timeout, flush the chunk currently in
			 * progress, and start a new chunk.
			 */
			if (sbp->sb_timeoutid) {
				(void) quntimeout(sbp->sb_rq,
				    sbp->sb_timeoutid);
				sbp->sb_timeoutid = 0;
			}
			if (sbp->sb_mp) {
				freemsg(sbp->sb_mp);
				sbp->sb_tail = sbp->sb_mp = sbp->sb_head = NULL;
				sbp->sb_mlen = 0;
				sbp->sb_mcount = 0;
			}
			flushq(rq, FLUSHALL);
		}
		putnext(rq, mp);
		break;

	case M_CTL:
		/*
		 * Zero-length M_CTL means our timeout() popped.
		 * (sbtick() injects it via putctl() so chunk closing is
		 * serialized with the regular put/srv entry points.)
		 */
		if (MBLKL(mp) == 0) {
			freemsg(mp);
			sbclosechunk(sbp);
		} else {
			sbclosechunk(sbp);
			sbsendit(rq, mp);
		}
		break;

	default:
		if (mp->b_datap->db_type <= QPCTL) {
			/* ordinary message: preserve ordering with the chunk */
			sbclosechunk(sbp);
			sbsendit(rq, mp);
		} else {
			/* Note: out of band */
			putnext(rq, mp);
		}
		break;
	}
	return (0);
}

/*
 * read service procedure.
 * Only runs when sbsendit() queued messages because the stream head
 * was flow-controlled (SB_NO_DROPS); drains them in order.
 */
/* ARGSUSED */
static int
sbrsrv(queue_t *rq)
{
	mblk_t *mp;

	/*
	 * High priority messages shouldn't get here but if
	 * one does, jam it through to avoid infinite loop.
	 */
	while ((mp = getq(rq)) != NULL) {
		if (!canputnext(rq) && (mp->b_datap->db_type <= QPCTL)) {
			/* should only get here if SB_NO_SROPS */
			(void) putbq(rq, mp);
			return (0);
		}
		putnext(rq, mp);
	}
	return (0);
}

/*
 * Handle write-side M_IOCTL messages.
 */
static void
sbioctl(queue_t *wq, mblk_t *mp)
{
	struct sb *sbp = (struct sb *)wq->q_ptr;
	struct iocblk *iocp = (struct iocblk *)mp->b_rptr;
	struct timeval *t;
	clock_t	ticks;
	mblk_t *mop;
	/* TRANSPARENT ioctls need the copyin/copyout protocol via M_IOCDATA */
	int transparent = iocp->ioc_count;
	mblk_t *datamp;
	int error;

	switch (iocp->ioc_cmd) {
	case SBIOCSTIME:
		if (iocp->ioc_count == TRANSPARENT) {
			/* request a copyin; sbioc() finishes the job */
#ifdef _SYSCALL32_IMPL
			if ((iocp->ioc_flag & IOC_MODELS) != IOC_NATIVE) {
				mcopyin(mp, NULL, sizeof (struct timeval32),
				    NULL);
			} else
#endif /* _SYSCALL32_IMPL */
			{
				mcopyin(mp, NULL, sizeof (*t), NULL);
			}
			qreply(wq, mp);
		} else {
			/*
			 * Verify argument length.
			 */
#ifdef _SYSCALL32_IMPL
			if ((iocp->ioc_flag & IOC_MODELS) != IOC_NATIVE) {
				struct timeval32 *t32;

				error = miocpullup(mp,
				    sizeof (struct timeval32));
				if (error != 0) {
					miocnak(wq, mp, 0, error);
					break;
				}
				t32 = (struct timeval32 *)mp->b_cont->b_rptr;
				if (t32->tv_sec < 0 || t32->tv_usec < 0) {
					miocnak(wq, mp, 0, EINVAL);
					break;
				}
				ticks = TIMEVAL_TO_TICK(t32);
			} else
#endif /* _SYSCALL32_IMPL */
			{
				error = miocpullup(mp, sizeof (struct timeval));
				if (error != 0) {
					miocnak(wq, mp, 0, error);
					break;
				}

				t = (struct timeval *)mp->b_cont->b_rptr;
				if (t->tv_sec < 0 || t->tv_usec < 0) {
					miocnak(wq, mp, 0, EINVAL);
					break;
				}
				ticks = TIMEVAL_TO_TICK(t);
			}
			sbp->sb_ticks = ticks;
			/* zero interval: deliver each packet as it arrives */
			if (ticks == 0)
				sbp->sb_chunk = 0;
			miocack(wq, mp, 0, 0);
			sbclosechunk(sbp);
		}
		break;

	case SBIOCGTIME: {
		struct timeval *t;

		/*
		 * Verify argument length.
		 */
		if (transparent != TRANSPARENT) {
#ifdef _SYSCALL32_IMPL
			if ((iocp->ioc_flag & IOC_MODELS) != IOC_NATIVE) {
				error = miocpullup(mp,
				    sizeof (struct timeval32));
				if (error != 0) {
					miocnak(wq, mp, 0, error);
					break;
				}
			} else
#endif /* _SYSCALL32_IMPL */
			error = miocpullup(mp, sizeof (struct timeval));
			if (error != 0) {
				miocnak(wq, mp, 0, error);
				break;
			}
		}

		/*
		 * If infinite timeout, return range error
		 * for the ioctl.
		 */
		if (sbp->sb_ticks < 0) {
			miocnak(wq, mp, 0, ERANGE);
			break;
		}

#ifdef _SYSCALL32_IMPL
		if ((iocp->ioc_flag & IOC_MODELS) != IOC_NATIVE) {
			struct timeval32 *t32;

			if (transparent == TRANSPARENT) {
				datamp = allocb(sizeof (*t32), BPRI_MED);
				if (datamp == NULL) {
					miocnak(wq, mp, 0, EAGAIN);
					break;
				}
				mcopyout(mp, NULL, sizeof (*t32), NULL, datamp);
			}

			t32 = (struct timeval32 *)mp->b_cont->b_rptr;
			TICK_TO_TIMEVAL32(sbp->sb_ticks, t32);

			if (transparent == TRANSPARENT)
				qreply(wq, mp);
			else
				miocack(wq, mp, sizeof (*t32), 0);
		} else
#endif /* _SYSCALL32_IMPL */
		{
			if (transparent == TRANSPARENT) {
				datamp = allocb(sizeof (*t), BPRI_MED);
				if (datamp == NULL) {
					miocnak(wq, mp, 0, EAGAIN);
					break;
				}
				mcopyout(mp, NULL, sizeof (*t), NULL, datamp);
			}

			t = (struct timeval *)mp->b_cont->b_rptr;
			TICK_TO_TIMEVAL(sbp->sb_ticks, t);

			if (transparent == TRANSPARENT)
				qreply(wq, mp);
			else
				miocack(wq, mp, sizeof (*t), 0);
		}
		break;
	}

	case SBIOCCTIME:
		/* clear the timeout: revert to "deliver only full chunks" */
		sbp->sb_ticks = -1;
		miocack(wq, mp, 0, 0);
		break;

	case SBIOCSCHUNK:
		if (iocp->ioc_count == TRANSPARENT) {
			mcopyin(mp, NULL, sizeof (uint_t), NULL);
			qreply(wq, mp);
		} else {
			/*
			 * Verify argument length.
			 */
			error = miocpullup(mp, sizeof (uint_t));
			if (error != 0) {
				miocnak(wq, mp, 0, error);
				break;
			}

			/*
			 * set up hi/lo water marks on stream head read queue.
			 * unlikely to run out of resources. Fix at later date.
			 */
			if ((mop = allocb(sizeof (struct stroptions),
			    BPRI_MED)) != NULL) {
				struct stroptions *sop;
				uint_t chunk;

				chunk = *(uint_t *)mp->b_cont->b_rptr;
				mop->b_datap->db_type = M_SETOPTS;
				mop->b_wptr += sizeof (struct stroptions);
				sop = (struct stroptions *)mop->b_rptr;
				sop->so_flags = SO_HIWAT | SO_LOWAT;
				sop->so_hiwat = SNIT_HIWAT(chunk, 1);
				sop->so_lowat = SNIT_LOWAT(chunk, 1);
				qreply(wq, mop);
			}

			sbp->sb_chunk = *(uint_t *)mp->b_cont->b_rptr;
			miocack(wq, mp, 0, 0);
			sbclosechunk(sbp);
		}
		break;

	case SBIOCGCHUNK:
		/*
		 * Verify argument length.
		 */
		if (transparent != TRANSPARENT) {
			error = miocpullup(mp, sizeof (uint_t));
			if (error != 0) {
				miocnak(wq, mp, 0, error);
				break;
			}
		}

		if (transparent == TRANSPARENT) {
			datamp = allocb(sizeof (uint_t), BPRI_MED);
			if (datamp == NULL) {
				miocnak(wq, mp, 0, EAGAIN);
				break;
			}
			mcopyout(mp, NULL, sizeof (uint_t), NULL, datamp);
		}

		*(uint_t *)mp->b_cont->b_rptr = sbp->sb_chunk;

		if (transparent == TRANSPARENT)
			qreply(wq, mp);
		else
			miocack(wq, mp, sizeof (uint_t), 0);
		break;

	case SBIOCSSNAP:
		if (iocp->ioc_count == TRANSPARENT) {
			mcopyin(mp, NULL, sizeof (uint_t), NULL);
			qreply(wq, mp);
		} else {
			/*
			 * Verify argument length.
			 */
			error = miocpullup(mp, sizeof (uint_t));
			if (error != 0) {
				miocnak(wq, mp, 0, error);
				break;
			}

			/*
			 * if chunking don't worry about effects of
			 * snipping of message size on head flow control
			 * since it has a relatively small bearing on the
			 * data rate onto the stream head.
			 */
			if (!sbp->sb_chunk) {
				/*
				 * set up hi/lo water marks on stream
				 * head read queue. unlikely to run out
				 * of resources. Fix at later date.
				 */
				if ((mop = allocb(sizeof (struct stroptions),
				    BPRI_MED)) != NULL) {
					struct stroptions *sop;
					uint_t snap;
					int fudge;

					snap = *(uint_t *)mp->b_cont->b_rptr;
					mop->b_datap->db_type = M_SETOPTS;
					mop->b_wptr += sizeof (*sop);
					sop = (struct stroptions *)mop->b_rptr;
					sop->so_flags = SO_HIWAT | SO_LOWAT;
					fudge = (snap <= 100) ? 4 :
					    (snap <= 400) ? 2 : 1;
					sop->so_hiwat = SNIT_HIWAT(snap, fudge);
					sop->so_lowat = SNIT_LOWAT(snap, fudge);
					qreply(wq, mop);
				}
			}

			sbp->sb_snap = *(uint_t *)mp->b_cont->b_rptr;

			miocack(wq, mp, 0, 0);
		}
		break;

	case SBIOCGSNAP:
		/*
		 * Verify argument length
		 */
		if (transparent != TRANSPARENT) {
			error = miocpullup(mp, sizeof (uint_t));
			if (error != 0) {
				miocnak(wq, mp, 0, error);
				break;
			}
		}

		if (transparent == TRANSPARENT) {
			datamp = allocb(sizeof (uint_t), BPRI_MED);
			if (datamp == NULL) {
				miocnak(wq, mp, 0, EAGAIN);
				break;
			}
			mcopyout(mp, NULL, sizeof (uint_t), NULL, datamp);
		}

		*(uint_t *)mp->b_cont->b_rptr = sbp->sb_snap;

		if (transparent == TRANSPARENT)
			qreply(wq, mp);
		else
			miocack(wq, mp, sizeof (uint_t), 0);
		break;

	case SBIOCSFLAGS:
		/*
		 * set the flags.
		 */
		if (iocp->ioc_count == TRANSPARENT) {
			mcopyin(mp, NULL, sizeof (uint_t), NULL);
			qreply(wq, mp);
		} else {
			error = miocpullup(mp, sizeof (uint_t));
			if (error != 0) {
				miocnak(wq, mp, 0, error);
				break;
			}
			sbp->sb_flags = *(uint_t *)mp->b_cont->b_rptr;
			miocack(wq, mp, 0, 0);
		}
		break;

	case SBIOCGFLAGS:
		/*
		 * Verify argument length
		 */
		if (transparent != TRANSPARENT) {
			error = miocpullup(mp, sizeof (uint_t));
			if (error != 0) {
				miocnak(wq, mp, 0, error);
				break;
			}
		}

		if (transparent == TRANSPARENT) {
			datamp = allocb(sizeof (uint_t), BPRI_MED);
			if (datamp == NULL) {
				miocnak(wq, mp, 0, EAGAIN);
				break;
			}
			mcopyout(mp, NULL, sizeof (uint_t), NULL, datamp);
		}

		*(uint_t *)mp->b_cont->b_rptr = sbp->sb_flags;

		if (transparent == TRANSPARENT)
			qreply(wq, mp);
		else
			miocack(wq, mp, sizeof (uint_t), 0);
		break;


	default:
		/* not ours: pass the ioctl on downstream */
		putnext(wq, mp);
		break;
	}
}

/*
 * Given a length l, calculate the amount of extra storage
 * required to round it up to the next multiple of the alignment a.
 */
#define	RoundUpAmt(l, a)	((l) % (a) ? (a) - ((l) % (a)) : 0)
/*
 * Calculate additional amount of space required for alignment.
 */
#define	Align(l)		RoundUpAmt(l, sizeof (ulong_t))
/*
 * Smallest possible message size when headers are enabled.
 * This is used to calculate whether a chunk is nearly full.
 */
#define	SMALLEST_MESSAGE	sizeof (struct sb_hdr) + _POINTER_ALIGNMENT

/*
 * Process a read-side M_DATA message.
 *
 * If the currently accumulating chunk doesn't have enough room
 * for the message, close off the chunk, pass it upward, and start
 * a new one. Then add the message to the current chunk, taking
 * account of the possibility that the message's size exceeds the
 * chunk size.
 *
 * If headers are enabled add an sb_hdr header and trailing alignment padding.
 *
 * To optimise performance the total number of msgbs should be kept
 * to a minimum. This is achieved by using any remaining space in message N
 * for both its own padding as well as the header of message N+1 if possible.
 * If there's insufficient space we allocate one message to hold this 'wrapper'.
 * (there's likely to be space beyond message N, since allocb would have
 * rounded up the required size to one of the dblk_sizes).
 *
 */
static void
sbaddmsg(queue_t *rq, mblk_t *mp)
{
	struct sb	*sbp;
	struct timeval	t;
	struct sb_hdr	hp;
	mblk_t *wrapper;	/* padding for msg N, header for msg N+1 */
	mblk_t *last;		/* last mblk of current message */
	size_t wrapperlen;	/* length of header + padding */
	size_t origlen;		/* data length before truncation */
	size_t pad;		/* bytes required to align header */

	sbp = (struct sb *)rq->q_ptr;

	origlen = msgdsize(mp);

	/*
	 * Truncate the message to sb_snap bytes if a snapshot length is
	 * set (adjmsg() returns 1 on success).  The header records both
	 * the original and the truncated length.
	 */
	if ((sbp->sb_snap > 0) && (origlen > sbp->sb_snap) &&
	    (adjmsg(mp, -(origlen - sbp->sb_snap)) == 1))
		hp.sbh_totlen = hp.sbh_msglen = sbp->sb_snap;
	else
		hp.sbh_totlen = hp.sbh_msglen = origlen;

	if (sbp->sb_flags & SB_NO_HEADER) {

		/*
		 * Would the inclusion of this message overflow the current
		 * chunk? If so close the chunk off and start a new one.
		 */
		if ((hp.sbh_totlen + sbp->sb_mlen) > sbp->sb_chunk)
			sbclosechunk(sbp);
		/*
		 * First message too big for chunk - just send it up.
		 * This will always be true when we're not chunking.
		 */
		if (hp.sbh_totlen > sbp->sb_chunk) {
			sbsendit(rq, mp);
			return;
		}

		/*
		 * We now know that the msg will fit in the chunk.
		 * Link it onto the end of the chunk.
		 * Since linkb() walks the entire chain, we keep a pointer to
		 * the first mblk of the last msgb added and call linkb on that
		 * that last message, rather than performing the
		 * O(n) linkb() operation on the whole chain.
		 * sb_head isn't needed in this SB_NO_HEADER mode.
		 */
		if (sbp->sb_mp)
			linkb(sbp->sb_tail, mp);
		else
			sbp->sb_mp = mp;

		sbp->sb_tail = mp;
		sbp->sb_mlen += hp.sbh_totlen;
		sbp->sb_mcount++;
	} else {
		/* Timestamp must be done immediately */
		uniqtime(&t);
		TIMEVAL_TO_TIMEVAL32(&hp.sbh_timestamp, &t);

		pad = Align(hp.sbh_totlen);
		hp.sbh_totlen += sizeof (hp);

		/* We can't fit this message on the current chunk. */
		if ((sbp->sb_mlen + hp.sbh_totlen) > sbp->sb_chunk)
			sbclosechunk(sbp);

		/*
		 * If we closed it (just now or during a previous
		 * call) then allocate the head of a new chunk.
		 */
		if (sbp->sb_head == NULL) {
			/* Allocate leading header of new chunk */
			sbp->sb_head = allocb(sizeof (hp), BPRI_MED);
			if (sbp->sb_head == NULL) {
				/*
				 * Memory allocation failure.
				 * This will need to be revisited
				 * since using certain flag combinations
				 * can result in messages being dropped
				 * silently.
				 */
				freemsg(mp);
				sbp->sb_drops++;
				return;
			}
			sbp->sb_mp = sbp->sb_head;
		}

		/*
		 * Set the header values and join the message to the
		 * chunk. The header values are copied into the chunk
		 * after we adjust for padding below.
		 */
		hp.sbh_drops = sbp->sb_drops;
		hp.sbh_origlen = origlen;
		linkb(sbp->sb_head, mp);
		sbp->sb_mcount++;
		sbp->sb_mlen += hp.sbh_totlen;

		/*
		 * There's no chance to fit another message on the
		 * chunk -- forgo the padding and close the chunk.
		 */
		if ((sbp->sb_mlen + pad + SMALLEST_MESSAGE) > sbp->sb_chunk) {
			(void) memcpy(sbp->sb_head->b_wptr, (char *)&hp,
			    sizeof (hp));
			sbp->sb_head->b_wptr += sizeof (hp);
			ASSERT(sbp->sb_head->b_wptr <=
			    sbp->sb_head->b_datap->db_lim);
			sbclosechunk(sbp);
			return;
		}

		/*
		 * We may add another message to this chunk -- adjust
		 * the headers for padding to be added below.
		 */
		hp.sbh_totlen += pad;
		(void) memcpy(sbp->sb_head->b_wptr, (char *)&hp, sizeof (hp));
		sbp->sb_head->b_wptr += sizeof (hp);
		ASSERT(sbp->sb_head->b_wptr <= sbp->sb_head->b_datap->db_lim);
		sbp->sb_mlen += pad;

		/*
		 * Find space for the wrapper. The wrapper consists of:
		 *
		 * 1) Padding for this message (this is to ensure each header
		 * begins on an 8 byte boundary in the userland buffer).
		 *
		 * 2) Space for the next message's header, in case the next
		 * next message will fit in this chunk.
		 *
		 * It may be possible to append the wrapper to the last mblk
		 * of the message, but only if we 'own' the data. If the dblk
		 * has been shared through dupmsg() we mustn't alter it.
		 */
		wrapperlen = (sizeof (hp) + pad);

		/* Is there space for the wrapper beyond the message's data ? */
		for (last = mp; last->b_cont; last = last->b_cont)
			;

		if ((wrapperlen <= MBLKTAIL(last)) &&
		    (last->b_datap->db_ref == 1)) {
			if (pad > 0) {
				/*
				 * Pad with zeroes to the next pointer boundary
				 * (we don't want to disclose kernel data to
				 * users), then advance wptr.
				 */
				(void) memset(last->b_wptr, 0, pad);
				last->b_wptr += pad;
			}
			/* Remember where to write the header information */
			sbp->sb_head = last;
		} else {
			/* Have to allocate additional space for the wrapper */
			wrapper = allocb(wrapperlen, BPRI_MED);
			if (wrapper == NULL) {
				sbclosechunk(sbp);
				return;
			}
			if (pad > 0) {
				/*
				 * Pad with zeroes (we don't want to disclose
				 * kernel data to users).
				 */
				(void) memset(wrapper->b_wptr, 0, pad);
				wrapper->b_wptr += pad;
			}
			/* Link the wrapper msg onto the end of the chunk */
			linkb(mp, wrapper);
			/* Remember to write the next header in this wrapper */
			sbp->sb_head = wrapper;
		}
	}
}

/*
 * Called from timeout().
 * Signal a timeout by passing a zero-length M_CTL msg in the read-side
 * to synchronize with any active module threads (open, close, wput, rput).
 */
static void
sbtick(void *arg)
{
	struct sb *sbp = arg;
	queue_t	*rq;

	ASSERT(sbp);

	rq = sbp->sb_rq;
	sbp->sb_timeoutid = 0;		/* timeout has fired */

	/* If the M_CTL can't be sent, re-arm the timer and try again later */
	if (putctl(rq, M_CTL) == 0)	/* failure */
		sbp->sb_timeoutid = qtimeout(rq, sbtick, sbp, sbp->sb_ticks);
}

/*
 * Close off the currently accumulating chunk and pass
 * it upward. Takes care of resetting timers as well.
 *
 * This routine is called both directly and as a result
 * of the chunk timeout expiring.
 */
static void
sbclosechunk(struct sb *sbp)
{
	mblk_t	*mp;
	queue_t	*rq;

	ASSERT(sbp);

	if (sbp->sb_timeoutid) {
		(void) quntimeout(sbp->sb_rq, sbp->sb_timeoutid);
		sbp->sb_timeoutid = 0;
	}

	mp = sbp->sb_mp;
	rq = sbp->sb_rq;

	/*
	 * If there's currently a chunk in progress, close it off
	 * and try to send it up.
	 */
	if (mp) {
		sbsendit(rq, mp);
	}

	/*
	 * Clear old chunk. Ready for new msgs.
	 */
	sbp->sb_tail = sbp->sb_mp = sbp->sb_head = NULL;
	sbp->sb_mlen = 0;
	sbp->sb_mcount = 0;
	/* deferred chunking starts over: next message goes straight up */
	if (sbp->sb_flags & SB_DEFER_CHUNK)
		sbp->sb_state &= ~SB_FRCVD;

}

/*
 * Deliver a message upstream, honoring flow control.
 * With SB_NO_DROPS set, blocked messages are queued for sbrsrv();
 * otherwise they are dropped and counted in sb_drops.
 */
static void
sbsendit(queue_t *rq, mblk_t *mp)
{
	struct sb	*sbp = (struct sb *)rq->q_ptr;

	if (!canputnext(rq)) {
		if (sbp->sb_flags & SB_NO_DROPS)
			(void) putq(rq, mp);
		else {
			freemsg(mp);
			sbp->sb_drops += sbp->sb_mcount;
		}
		return;
	}
	/*
	 * If there are messages on the q already, keep
	 * queueing them since they need to be processed in order.
	 */
	if (qsize(rq) > 0) {
		/* should only get here if SB_NO_DROPS */
		(void) putq(rq, mp);
	} else
		putnext(rq, mp);
}