1 /*
2 * CDDL HEADER START
3 *
4 * The contents of this file are subject to the terms of the
5 * Common Development and Distribution License, Version 1.0 only
6 * (the "License"). You may not use this file except in compliance
7 * with the License.
8 *
9 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
10 * or http://www.opensolaris.org/os/licensing.
11 * See the License for the specific language governing permissions
12 * and limitations under the License.
13 *
14 * When distributing Covered Code, include this CDDL HEADER in each
15 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
16 * If applicable, add the following below this CDDL HEADER, with the
17 * fields enclosed by brackets "[]" replaced with your own identifying
18 * information: Portions Copyright [yyyy] [name of copyright owner]
19 *
20 * CDDL HEADER END
21 */
22 /*
23 * Copyright 2004 Sun Microsystems, Inc. All rights reserved.
24 * Use is subject to license terms.
25 * Copyright 2018 Joyent, Inc.
26 */
27
28 /*
29 * STREAMS Buffering module
30 *
31 * This streams module collects incoming messages from modules below
32 * it on the stream and buffers them up into a smaller number of
33 * aggregated messages. Its main purpose is to reduce overhead by
34 * cutting down on the number of read (or getmsg) calls its client
35 * user process makes.
36 * - only M_DATA is buffered.
37 * - multithreading assumes configured as D_MTQPAIR
38 * - packets are lost only if flag SB_NO_HEADER is clear and buffer
39 * allocation fails.
40 * - in order message transmission. This is enforced for messages other
41 * than high priority messages.
42 * - zero length messages on the read side are not passed up the
43 * stream but used internally for synchronization.
44 * FLAGS:
45 * - SB_NO_PROTO_CVT - no conversion of M_PROTO messages to M_DATA.
46 * (conversion is the default for backwards compatibility
47 * hence the negative logic).
48 * - SB_NO_HEADER - no headers in buffered data.
49 * (adding headers is the default for backwards compatibility
50 * hence the negative logic).
51 * - SB_DEFER_CHUNK - provides improved response time in question-answer
52 * applications. Buffering is not enabled until the second message
53 * is received on the read side within the sb_ticks interval.
54 * This option will often be used in combination with flag SB_SEND_ON_WRITE.
55 * - SB_SEND_ON_WRITE - a write message results in any pending buffered read
56 * data being immediately sent upstream.
57 * - SB_NO_DROPS - bufmod behaves transparently in flow control and propagates
58 * the blocked flow condition downstream. If this flag is clear (default)
59 * messages will be dropped if the upstream flow is blocked.
60 */
61
62
63 #include <sys/types.h>
64 #include <sys/errno.h>
65 #include <sys/debug.h>
66 #include <sys/stropts.h>
67 #include <sys/time.h>
68 #include <sys/stream.h>
69 #include <sys/conf.h>
70 #include <sys/ddi.h>
71 #include <sys/sunddi.h>
72 #include <sys/kmem.h>
73 #include <sys/strsun.h>
74 #include <sys/bufmod.h>
75 #include <sys/modctl.h>
76 #include <sys/isa_defs.h>
77
78 /*
79 * Per-Stream state information.
80 *
81 * If sb_ticks is negative, we don't deliver chunks until they're
82 * full. If it's zero, we deliver every packet as it arrives. (In
83 * this case we force sb_chunk to zero, to make the implementation
84 * easier.) Otherwise, sb_ticks gives the number of ticks in a
85 * buffering interval. The interval begins when the a read side data
86 * message is received and a timeout is not active. If sb_snap is
87 * zero, no truncation of the msg is done.
88 */
89 struct sb {
90 queue_t *sb_rq; /* our rq */
91 mblk_t *sb_mp; /* partial chunk */
92 mblk_t *sb_head; /* pre-allocated space for the next header */
93 mblk_t *sb_tail; /* first mblk of last message appended */
94 uint_t sb_mlen; /* sb_mp length */
95 uint_t sb_mcount; /* input msg count in sb_mp */
96 uint_t sb_chunk; /* max chunk size */
97 clock_t sb_ticks; /* timeout interval */
98 timeout_id_t sb_timeoutid; /* qtimeout() id */
99 uint_t sb_drops; /* cumulative # discarded msgs */
100 uint_t sb_snap; /* snapshot length */
101 uint_t sb_flags; /* flags field */
102 uint_t sb_state; /* state variable */
103 };
104
105 /*
106 * Function prototypes.
107 */
108 static int sbopen(queue_t *, dev_t *, int, int, cred_t *);
109 static int sbclose(queue_t *, int, cred_t *);
110 static int sbwput(queue_t *, mblk_t *);
111 static int sbrput(queue_t *, mblk_t *);
112 static int sbrsrv(queue_t *);
113 static void sbioctl(queue_t *, mblk_t *);
114 static void sbaddmsg(queue_t *, mblk_t *);
115 static void sbtick(void *);
116 static void sbclosechunk(struct sb *);
117 static void sbsendit(queue_t *, mblk_t *);
118
119 static struct module_info sb_minfo = {
120 21, /* mi_idnum */
121 "bufmod", /* mi_idname */
122 0, /* mi_minpsz */
123 INFPSZ, /* mi_maxpsz */
124 1, /* mi_hiwat */
125 0 /* mi_lowat */
126 };
127
128 static struct qinit sb_rinit = {
129 sbrput, /* qi_putp */
130 sbrsrv, /* qi_srvp */
131 sbopen, /* qi_qopen */
132 sbclose, /* qi_qclose */
133 NULL, /* qi_qadmin */
134 &sb_minfo, /* qi_minfo */
135 NULL /* qi_mstat */
136 };
137
138 static struct qinit sb_winit = {
139 sbwput, /* qi_putp */
140 NULL, /* qi_srvp */
141 NULL, /* qi_qopen */
142 NULL, /* qi_qclose */
143 NULL, /* qi_qadmin */
144 &sb_minfo, /* qi_minfo */
145 NULL /* qi_mstat */
146 };
147
148 static struct streamtab sb_info = {
149 &sb_rinit, /* st_rdinit */
150 &sb_winit, /* st_wrinit */
151 NULL, /* st_muxrinit */
152 NULL /* st_muxwinit */
153 };
154
155
156 /*
157 * This is the loadable module wrapper.
158 */
159
160 static struct fmodsw fsw = {
161 "bufmod",
162 &sb_info,
163 D_MTQPAIR | D_MP
164 };
165
166 /*
167 * Module linkage information for the kernel.
168 */
169
170 static struct modlstrmod modlstrmod = {
171 &mod_strmodops, "streams buffer mod", &fsw
172 };
173
174 static struct modlinkage modlinkage = {
175 MODREV_1, &modlstrmod, NULL
176 };
177
178
179 int
_init(void)180 _init(void)
181 {
182 return (mod_install(&modlinkage));
183 }
184
185 int
_fini(void)186 _fini(void)
187 {
188 return (mod_remove(&modlinkage));
189 }
190
191 int
_info(struct modinfo * modinfop)192 _info(struct modinfo *modinfop)
193 {
194 return (mod_info(&modlinkage, modinfop));
195 }
196
197
198 /* ARGSUSED */
199 static int
sbopen(queue_t * rq,dev_t * dev,int oflag,int sflag,cred_t * crp)200 sbopen(queue_t *rq, dev_t *dev, int oflag, int sflag, cred_t *crp)
201 {
202 struct sb *sbp;
203 ASSERT(rq);
204
205 if (sflag != MODOPEN)
206 return (EINVAL);
207
208 if (rq->q_ptr)
209 return (0);
210
211 /*
212 * Allocate and initialize per-Stream structure.
213 */
214 sbp = kmem_alloc(sizeof (struct sb), KM_SLEEP);
215 sbp->sb_rq = rq;
216 sbp->sb_ticks = -1;
217 sbp->sb_chunk = SB_DFLT_CHUNK;
218 sbp->sb_tail = sbp->sb_mp = sbp->sb_head = NULL;
219 sbp->sb_mlen = 0;
220 sbp->sb_mcount = 0;
221 sbp->sb_timeoutid = 0;
222 sbp->sb_drops = 0;
223 sbp->sb_snap = 0;
224 sbp->sb_flags = 0;
225 sbp->sb_state = 0;
226
227 rq->q_ptr = WR(rq)->q_ptr = sbp;
228
229 qprocson(rq);
230
231
232 return (0);
233 }
234
235 /* ARGSUSED1 */
236 static int
sbclose(queue_t * rq,int flag,cred_t * credp)237 sbclose(queue_t *rq, int flag, cred_t *credp)
238 {
239 struct sb *sbp = (struct sb *)rq->q_ptr;
240
241 ASSERT(sbp);
242
243 qprocsoff(rq);
244 /*
245 * Cancel an outstanding timeout
246 */
247 if (sbp->sb_timeoutid != 0) {
248 (void) quntimeout(rq, sbp->sb_timeoutid);
249 sbp->sb_timeoutid = 0;
250 }
251 /*
252 * Free the current chunk.
253 */
254 if (sbp->sb_mp) {
255 freemsg(sbp->sb_mp);
256 sbp->sb_tail = sbp->sb_mp = sbp->sb_head = NULL;
257 sbp->sb_mlen = 0;
258 }
259
260 /*
261 * Free the per-Stream structure.
262 */
263 kmem_free((caddr_t)sbp, sizeof (struct sb));
264 rq->q_ptr = WR(rq)->q_ptr = NULL;
265
266 return (0);
267 }
268
/*
 * Stream head high/low water marks installed when the client changes
 * the chunk or snapshot size.  The correction factor ("fudge") is
 * introduced to compensate for whatever assumptions the modules below
 * have made about how much traffic is flowing through the stream and
 * the fact that bufmod may be snipping messages with the sb_snap length.
 *
 * Parameters are fully parenthesized so expression arguments
 * (e.g. SNIT_HIWAT(a + b, f)) expand correctly.
 */
#define	SNIT_HIWAT(msgsize, fudge)	((4 * (msgsize) * (fudge)) + 512)
#define	SNIT_LOWAT(msgsize, fudge)	((2 * (msgsize) * (fudge)) + 256)
277
278
279 static void
sbioc(queue_t * wq,mblk_t * mp)280 sbioc(queue_t *wq, mblk_t *mp)
281 {
282 struct iocblk *iocp;
283 struct sb *sbp = (struct sb *)wq->q_ptr;
284 clock_t ticks;
285 mblk_t *mop;
286
287 iocp = (struct iocblk *)mp->b_rptr;
288
289 switch (iocp->ioc_cmd) {
290 case SBIOCGCHUNK:
291 case SBIOCGSNAP:
292 case SBIOCGFLAGS:
293 case SBIOCGTIME:
294 miocack(wq, mp, 0, 0);
295 return;
296
297 case SBIOCSTIME:
298 #ifdef _SYSCALL32_IMPL
299 if ((iocp->ioc_flag & IOC_MODELS) != IOC_NATIVE) {
300 struct timeval32 *t32;
301
302 t32 = (struct timeval32 *)mp->b_cont->b_rptr;
303 if (t32->tv_sec < 0 || t32->tv_usec < 0) {
304 miocnak(wq, mp, 0, EINVAL);
305 break;
306 }
307 ticks = TIMEVAL_TO_TICK(t32);
308 } else
309 #endif /* _SYSCALL32_IMPL */
310 {
311 struct timeval *tb;
312
313 tb = (struct timeval *)mp->b_cont->b_rptr;
314
315 if (tb->tv_sec < 0 || tb->tv_usec < 0) {
316 miocnak(wq, mp, 0, EINVAL);
317 break;
318 }
319 ticks = TIMEVAL_TO_TICK(tb);
320 }
321 sbp->sb_ticks = ticks;
322 if (ticks == 0)
323 sbp->sb_chunk = 0;
324 miocack(wq, mp, 0, 0);
325 sbclosechunk(sbp);
326 return;
327
328 case SBIOCSCHUNK:
329 /*
330 * set up hi/lo water marks on stream head read queue.
331 * unlikely to run out of resources. Fix at later date.
332 */
333 if ((mop = allocb(sizeof (struct stroptions),
334 BPRI_MED)) != NULL) {
335 struct stroptions *sop;
336 uint_t chunk;
337
338 chunk = *(uint_t *)mp->b_cont->b_rptr;
339 mop->b_datap->db_type = M_SETOPTS;
340 mop->b_wptr += sizeof (struct stroptions);
341 sop = (struct stroptions *)mop->b_rptr;
342 sop->so_flags = SO_HIWAT | SO_LOWAT;
343 sop->so_hiwat = SNIT_HIWAT(chunk, 1);
344 sop->so_lowat = SNIT_LOWAT(chunk, 1);
345 qreply(wq, mop);
346 }
347
348 sbp->sb_chunk = *(uint_t *)mp->b_cont->b_rptr;
349 miocack(wq, mp, 0, 0);
350 sbclosechunk(sbp);
351 return;
352
353 case SBIOCSFLAGS:
354 sbp->sb_flags = *(uint_t *)mp->b_cont->b_rptr;
355 miocack(wq, mp, 0, 0);
356 return;
357
358 case SBIOCSSNAP:
359 /*
360 * if chunking dont worry about effects of
361 * snipping of message size on head flow control
362 * since it has a relatively small bearing on the
363 * data rate onto the streamn head.
364 */
365 if (!sbp->sb_chunk) {
366 /*
367 * set up hi/lo water marks on stream head read queue.
368 * unlikely to run out of resources. Fix at later date.
369 */
370 if ((mop = allocb(sizeof (struct stroptions),
371 BPRI_MED)) != NULL) {
372 struct stroptions *sop;
373 uint_t snap;
374 int fudge;
375
376 snap = *(uint_t *)mp->b_cont->b_rptr;
377 mop->b_datap->db_type = M_SETOPTS;
378 mop->b_wptr += sizeof (struct stroptions);
379 sop = (struct stroptions *)mop->b_rptr;
380 sop->so_flags = SO_HIWAT | SO_LOWAT;
381 fudge = snap <= 100 ? 4 :
382 snap <= 400 ? 2 :
383 1;
384 sop->so_hiwat = SNIT_HIWAT(snap, fudge);
385 sop->so_lowat = SNIT_LOWAT(snap, fudge);
386 qreply(wq, mop);
387 }
388 }
389
390 sbp->sb_snap = *(uint_t *)mp->b_cont->b_rptr;
391 miocack(wq, mp, 0, 0);
392 return;
393
394 default:
395 ASSERT(0);
396 return;
397 }
398 }
399
400 /*
401 * Write-side put procedure. Its main task is to detect ioctls
402 * for manipulating the buffering state and hand them to sbioctl.
403 * Other message types are passed on through.
404 */
405 static int
sbwput(queue_t * wq,mblk_t * mp)406 sbwput(queue_t *wq, mblk_t *mp)
407 {
408 struct sb *sbp = (struct sb *)wq->q_ptr;
409 struct copyresp *resp;
410
411 if (sbp->sb_flags & SB_SEND_ON_WRITE)
412 sbclosechunk(sbp);
413 switch (mp->b_datap->db_type) {
414 case M_IOCTL:
415 sbioctl(wq, mp);
416 break;
417
418 case M_IOCDATA:
419 resp = (struct copyresp *)mp->b_rptr;
420 if (resp->cp_rval) {
421 /*
422 * Just free message on failure.
423 */
424 freemsg(mp);
425 break;
426 }
427
428 switch (resp->cp_cmd) {
429 case SBIOCSTIME:
430 case SBIOCSCHUNK:
431 case SBIOCSFLAGS:
432 case SBIOCSSNAP:
433 case SBIOCGTIME:
434 case SBIOCGCHUNK:
435 case SBIOCGSNAP:
436 case SBIOCGFLAGS:
437 sbioc(wq, mp);
438 break;
439
440 default:
441 putnext(wq, mp);
442 break;
443 }
444 break;
445
446 default:
447 putnext(wq, mp);
448 break;
449 }
450 return (0);
451 }
452
453 /*
454 * Read-side put procedure. It's responsible for buffering up incoming
455 * messages and grouping them into aggregates according to the current
456 * buffering parameters.
457 */
458 static int
sbrput(queue_t * rq,mblk_t * mp)459 sbrput(queue_t *rq, mblk_t *mp)
460 {
461 struct sb *sbp = (struct sb *)rq->q_ptr;
462
463 ASSERT(sbp);
464
465 switch (mp->b_datap->db_type) {
466 case M_PROTO:
467 if (sbp->sb_flags & SB_NO_PROTO_CVT) {
468 sbclosechunk(sbp);
469 sbsendit(rq, mp);
470 break;
471 } else {
472 /*
473 * Convert M_PROTO to M_DATA.
474 */
475 mp->b_datap->db_type = M_DATA;
476 }
477 /* FALLTHRU */
478
479 case M_DATA:
480 if ((sbp->sb_flags & SB_DEFER_CHUNK) &&
481 !(sbp->sb_state & SB_FRCVD)) {
482 sbclosechunk(sbp);
483 sbsendit(rq, mp);
484 sbp->sb_state |= SB_FRCVD;
485 } else
486 sbaddmsg(rq, mp);
487
488 if ((sbp->sb_ticks > 0) && !(sbp->sb_timeoutid))
489 sbp->sb_timeoutid = qtimeout(sbp->sb_rq, sbtick,
490 sbp, sbp->sb_ticks);
491
492 break;
493
494 case M_FLUSH:
495 if (*mp->b_rptr & FLUSHR) {
496 /*
497 * Reset timeout, flush the chunk currently in
498 * progress, and start a new chunk.
499 */
500 if (sbp->sb_timeoutid) {
501 (void) quntimeout(sbp->sb_rq,
502 sbp->sb_timeoutid);
503 sbp->sb_timeoutid = 0;
504 }
505 if (sbp->sb_mp) {
506 freemsg(sbp->sb_mp);
507 sbp->sb_tail = sbp->sb_mp = sbp->sb_head = NULL;
508 sbp->sb_mlen = 0;
509 sbp->sb_mcount = 0;
510 }
511 flushq(rq, FLUSHALL);
512 }
513 putnext(rq, mp);
514 break;
515
516 case M_CTL:
517 /*
518 * Zero-length M_CTL means our timeout() popped.
519 */
520 if (MBLKL(mp) == 0) {
521 freemsg(mp);
522 sbclosechunk(sbp);
523 } else {
524 sbclosechunk(sbp);
525 sbsendit(rq, mp);
526 }
527 break;
528
529 default:
530 if (mp->b_datap->db_type <= QPCTL) {
531 sbclosechunk(sbp);
532 sbsendit(rq, mp);
533 } else {
534 /* Note: out of band */
535 putnext(rq, mp);
536 }
537 break;
538 }
539 return (0);
540 }
541
542 /*
543 * read service procedure.
544 */
545 /* ARGSUSED */
546 static int
sbrsrv(queue_t * rq)547 sbrsrv(queue_t *rq)
548 {
549 mblk_t *mp;
550
551 /*
552 * High priority messages shouldn't get here but if
553 * one does, jam it through to avoid infinite loop.
554 */
555 while ((mp = getq(rq)) != NULL) {
556 if (!canputnext(rq) && (mp->b_datap->db_type <= QPCTL)) {
557 /* should only get here if SB_NO_SROPS */
558 (void) putbq(rq, mp);
559 return (0);
560 }
561 putnext(rq, mp);
562 }
563 return (0);
564 }
565
566 /*
567 * Handle write-side M_IOCTL messages.
568 */
569 static void
sbioctl(queue_t * wq,mblk_t * mp)570 sbioctl(queue_t *wq, mblk_t *mp)
571 {
572 struct sb *sbp = (struct sb *)wq->q_ptr;
573 struct iocblk *iocp = (struct iocblk *)mp->b_rptr;
574 struct timeval *t;
575 clock_t ticks;
576 mblk_t *mop;
577 int transparent = iocp->ioc_count;
578 mblk_t *datamp;
579 int error;
580
581 switch (iocp->ioc_cmd) {
582 case SBIOCSTIME:
583 if (iocp->ioc_count == TRANSPARENT) {
584 #ifdef _SYSCALL32_IMPL
585 if ((iocp->ioc_flag & IOC_MODELS) != IOC_NATIVE) {
586 mcopyin(mp, NULL, sizeof (struct timeval32),
587 NULL);
588 } else
589 #endif /* _SYSCALL32_IMPL */
590 {
591 mcopyin(mp, NULL, sizeof (*t), NULL);
592 }
593 qreply(wq, mp);
594 } else {
595 /*
596 * Verify argument length.
597 */
598 #ifdef _SYSCALL32_IMPL
599 if ((iocp->ioc_flag & IOC_MODELS) != IOC_NATIVE) {
600 struct timeval32 *t32;
601
602 error = miocpullup(mp,
603 sizeof (struct timeval32));
604 if (error != 0) {
605 miocnak(wq, mp, 0, error);
606 break;
607 }
608 t32 = (struct timeval32 *)mp->b_cont->b_rptr;
609 if (t32->tv_sec < 0 || t32->tv_usec < 0) {
610 miocnak(wq, mp, 0, EINVAL);
611 break;
612 }
613 ticks = TIMEVAL_TO_TICK(t32);
614 } else
615 #endif /* _SYSCALL32_IMPL */
616 {
617 error = miocpullup(mp, sizeof (struct timeval));
618 if (error != 0) {
619 miocnak(wq, mp, 0, error);
620 break;
621 }
622
623 t = (struct timeval *)mp->b_cont->b_rptr;
624 if (t->tv_sec < 0 || t->tv_usec < 0) {
625 miocnak(wq, mp, 0, EINVAL);
626 break;
627 }
628 ticks = TIMEVAL_TO_TICK(t);
629 }
630 sbp->sb_ticks = ticks;
631 if (ticks == 0)
632 sbp->sb_chunk = 0;
633 miocack(wq, mp, 0, 0);
634 sbclosechunk(sbp);
635 }
636 break;
637
638 case SBIOCGTIME: {
639 struct timeval *t;
640
641 /*
642 * Verify argument length.
643 */
644 if (transparent != TRANSPARENT) {
645 #ifdef _SYSCALL32_IMPL
646 if ((iocp->ioc_flag & IOC_MODELS) != IOC_NATIVE) {
647 error = miocpullup(mp,
648 sizeof (struct timeval32));
649 if (error != 0) {
650 miocnak(wq, mp, 0, error);
651 break;
652 }
653 } else
654 #endif /* _SYSCALL32_IMPL */
655 error = miocpullup(mp, sizeof (struct timeval));
656 if (error != 0) {
657 miocnak(wq, mp, 0, error);
658 break;
659 }
660 }
661
662 /*
663 * If infinite timeout, return range error
664 * for the ioctl.
665 */
666 if (sbp->sb_ticks < 0) {
667 miocnak(wq, mp, 0, ERANGE);
668 break;
669 }
670
671 #ifdef _SYSCALL32_IMPL
672 if ((iocp->ioc_flag & IOC_MODELS) != IOC_NATIVE) {
673 struct timeval32 *t32;
674
675 if (transparent == TRANSPARENT) {
676 datamp = allocb(sizeof (*t32), BPRI_MED);
677 if (datamp == NULL) {
678 miocnak(wq, mp, 0, EAGAIN);
679 break;
680 }
681 mcopyout(mp, NULL, sizeof (*t32), NULL, datamp);
682 }
683
684 t32 = (struct timeval32 *)mp->b_cont->b_rptr;
685 TICK_TO_TIMEVAL32(sbp->sb_ticks, t32);
686
687 if (transparent == TRANSPARENT)
688 qreply(wq, mp);
689 else
690 miocack(wq, mp, sizeof (*t32), 0);
691 } else
692 #endif /* _SYSCALL32_IMPL */
693 {
694 if (transparent == TRANSPARENT) {
695 datamp = allocb(sizeof (*t), BPRI_MED);
696 if (datamp == NULL) {
697 miocnak(wq, mp, 0, EAGAIN);
698 break;
699 }
700 mcopyout(mp, NULL, sizeof (*t), NULL, datamp);
701 }
702
703 t = (struct timeval *)mp->b_cont->b_rptr;
704 TICK_TO_TIMEVAL(sbp->sb_ticks, t);
705
706 if (transparent == TRANSPARENT)
707 qreply(wq, mp);
708 else
709 miocack(wq, mp, sizeof (*t), 0);
710 }
711 break;
712 }
713
714 case SBIOCCTIME:
715 sbp->sb_ticks = -1;
716 miocack(wq, mp, 0, 0);
717 break;
718
719 case SBIOCSCHUNK:
720 if (iocp->ioc_count == TRANSPARENT) {
721 mcopyin(mp, NULL, sizeof (uint_t), NULL);
722 qreply(wq, mp);
723 } else {
724 /*
725 * Verify argument length.
726 */
727 error = miocpullup(mp, sizeof (uint_t));
728 if (error != 0) {
729 miocnak(wq, mp, 0, error);
730 break;
731 }
732
733 /*
734 * set up hi/lo water marks on stream head read queue.
735 * unlikely to run out of resources. Fix at later date.
736 */
737 if ((mop = allocb(sizeof (struct stroptions),
738 BPRI_MED)) != NULL) {
739 struct stroptions *sop;
740 uint_t chunk;
741
742 chunk = *(uint_t *)mp->b_cont->b_rptr;
743 mop->b_datap->db_type = M_SETOPTS;
744 mop->b_wptr += sizeof (struct stroptions);
745 sop = (struct stroptions *)mop->b_rptr;
746 sop->so_flags = SO_HIWAT | SO_LOWAT;
747 sop->so_hiwat = SNIT_HIWAT(chunk, 1);
748 sop->so_lowat = SNIT_LOWAT(chunk, 1);
749 qreply(wq, mop);
750 }
751
752 sbp->sb_chunk = *(uint_t *)mp->b_cont->b_rptr;
753 miocack(wq, mp, 0, 0);
754 sbclosechunk(sbp);
755 }
756 break;
757
758 case SBIOCGCHUNK:
759 /*
760 * Verify argument length.
761 */
762 if (transparent != TRANSPARENT) {
763 error = miocpullup(mp, sizeof (uint_t));
764 if (error != 0) {
765 miocnak(wq, mp, 0, error);
766 break;
767 }
768 }
769
770 if (transparent == TRANSPARENT) {
771 datamp = allocb(sizeof (uint_t), BPRI_MED);
772 if (datamp == NULL) {
773 miocnak(wq, mp, 0, EAGAIN);
774 break;
775 }
776 mcopyout(mp, NULL, sizeof (uint_t), NULL, datamp);
777 }
778
779 *(uint_t *)mp->b_cont->b_rptr = sbp->sb_chunk;
780
781 if (transparent == TRANSPARENT)
782 qreply(wq, mp);
783 else
784 miocack(wq, mp, sizeof (uint_t), 0);
785 break;
786
787 case SBIOCSSNAP:
788 if (iocp->ioc_count == TRANSPARENT) {
789 mcopyin(mp, NULL, sizeof (uint_t), NULL);
790 qreply(wq, mp);
791 } else {
792 /*
793 * Verify argument length.
794 */
795 error = miocpullup(mp, sizeof (uint_t));
796 if (error != 0) {
797 miocnak(wq, mp, 0, error);
798 break;
799 }
800
801 /*
802 * if chunking dont worry about effects of
803 * snipping of message size on head flow control
804 * since it has a relatively small bearing on the
805 * data rate onto the streamn head.
806 */
807 if (!sbp->sb_chunk) {
808 /*
809 * set up hi/lo water marks on stream
810 * head read queue. unlikely to run out
811 * of resources. Fix at later date.
812 */
813 if ((mop = allocb(sizeof (struct stroptions),
814 BPRI_MED)) != NULL) {
815 struct stroptions *sop;
816 uint_t snap;
817 int fudge;
818
819 snap = *(uint_t *)mp->b_cont->b_rptr;
820 mop->b_datap->db_type = M_SETOPTS;
821 mop->b_wptr += sizeof (*sop);
822 sop = (struct stroptions *)mop->b_rptr;
823 sop->so_flags = SO_HIWAT | SO_LOWAT;
824 fudge = (snap <= 100) ? 4 :
825 (snap <= 400) ? 2 : 1;
826 sop->so_hiwat = SNIT_HIWAT(snap, fudge);
827 sop->so_lowat = SNIT_LOWAT(snap, fudge);
828 qreply(wq, mop);
829 }
830 }
831
832 sbp->sb_snap = *(uint_t *)mp->b_cont->b_rptr;
833
834 miocack(wq, mp, 0, 0);
835 }
836 break;
837
838 case SBIOCGSNAP:
839 /*
840 * Verify argument length
841 */
842 if (transparent != TRANSPARENT) {
843 error = miocpullup(mp, sizeof (uint_t));
844 if (error != 0) {
845 miocnak(wq, mp, 0, error);
846 break;
847 }
848 }
849
850 if (transparent == TRANSPARENT) {
851 datamp = allocb(sizeof (uint_t), BPRI_MED);
852 if (datamp == NULL) {
853 miocnak(wq, mp, 0, EAGAIN);
854 break;
855 }
856 mcopyout(mp, NULL, sizeof (uint_t), NULL, datamp);
857 }
858
859 *(uint_t *)mp->b_cont->b_rptr = sbp->sb_snap;
860
861 if (transparent == TRANSPARENT)
862 qreply(wq, mp);
863 else
864 miocack(wq, mp, sizeof (uint_t), 0);
865 break;
866
867 case SBIOCSFLAGS:
868 /*
869 * set the flags.
870 */
871 if (iocp->ioc_count == TRANSPARENT) {
872 mcopyin(mp, NULL, sizeof (uint_t), NULL);
873 qreply(wq, mp);
874 } else {
875 error = miocpullup(mp, sizeof (uint_t));
876 if (error != 0) {
877 miocnak(wq, mp, 0, error);
878 break;
879 }
880 sbp->sb_flags = *(uint_t *)mp->b_cont->b_rptr;
881 miocack(wq, mp, 0, 0);
882 }
883 break;
884
885 case SBIOCGFLAGS:
886 /*
887 * Verify argument length
888 */
889 if (transparent != TRANSPARENT) {
890 error = miocpullup(mp, sizeof (uint_t));
891 if (error != 0) {
892 miocnak(wq, mp, 0, error);
893 break;
894 }
895 }
896
897 if (transparent == TRANSPARENT) {
898 datamp = allocb(sizeof (uint_t), BPRI_MED);
899 if (datamp == NULL) {
900 miocnak(wq, mp, 0, EAGAIN);
901 break;
902 }
903 mcopyout(mp, NULL, sizeof (uint_t), NULL, datamp);
904 }
905
906 *(uint_t *)mp->b_cont->b_rptr = sbp->sb_flags;
907
908 if (transparent == TRANSPARENT)
909 qreply(wq, mp);
910 else
911 miocack(wq, mp, sizeof (uint_t), 0);
912 break;
913
914
915 default:
916 putnext(wq, mp);
917 break;
918 }
919 }
920
/*
 * Given a length l, calculate the amount of extra storage
 * required to round it up to the next multiple of the alignment a.
 */
#define	RoundUpAmt(l, a)	((l) % (a) ? (a) - ((l) % (a)) : 0)
/*
 * Calculate additional amount of space required for alignment.
 */
#define	Align(l)		RoundUpAmt(l, sizeof (ulong_t))
/*
 * Smallest possible message size when headers are enabled.
 * This is used to calculate whether a chunk is nearly full.
 * The expansion is parenthesized so the macro composes safely
 * inside larger expressions.
 */
#define	SMALLEST_MESSAGE	(sizeof (struct sb_hdr) + _POINTER_ALIGNMENT)
935
936 /*
937 * Process a read-side M_DATA message.
938 *
939 * If the currently accumulating chunk doesn't have enough room
940 * for the message, close off the chunk, pass it upward, and start
941 * a new one. Then add the message to the current chunk, taking
942 * account of the possibility that the message's size exceeds the
943 * chunk size.
944 *
945 * If headers are enabled add an sb_hdr header and trailing alignment padding.
946 *
947 * To optimise performance the total number of msgbs should be kept
948 * to a minimum. This is achieved by using any remaining space in message N
949 * for both its own padding as well as the header of message N+1 if possible.
950 * If there's insufficient space we allocate one message to hold this 'wrapper'.
951 * (there's likely to be space beyond message N, since allocb would have
952 * rounded up the required size to one of the dblk_sizes).
953 *
954 */
955 static void
sbaddmsg(queue_t * rq,mblk_t * mp)956 sbaddmsg(queue_t *rq, mblk_t *mp)
957 {
958 struct sb *sbp;
959 struct timeval t;
960 struct sb_hdr hp;
961 mblk_t *wrapper; /* padding for msg N, header for msg N+1 */
962 mblk_t *last; /* last mblk of current message */
963 size_t wrapperlen; /* length of header + padding */
964 size_t origlen; /* data length before truncation */
965 size_t pad; /* bytes required to align header */
966
967 sbp = (struct sb *)rq->q_ptr;
968
969 origlen = msgdsize(mp);
970
971 /*
972 * Truncate the message.
973 */
974 if ((sbp->sb_snap > 0) && (origlen > sbp->sb_snap) &&
975 (adjmsg(mp, -(origlen - sbp->sb_snap)) == 1))
976 hp.sbh_totlen = hp.sbh_msglen = sbp->sb_snap;
977 else
978 hp.sbh_totlen = hp.sbh_msglen = origlen;
979
980 if (sbp->sb_flags & SB_NO_HEADER) {
981
982 /*
983 * Would the inclusion of this message overflow the current
984 * chunk? If so close the chunk off and start a new one.
985 */
986 if ((hp.sbh_totlen + sbp->sb_mlen) > sbp->sb_chunk)
987 sbclosechunk(sbp);
988 /*
989 * First message too big for chunk - just send it up.
990 * This will always be true when we're not chunking.
991 */
992 if (hp.sbh_totlen > sbp->sb_chunk) {
993 sbsendit(rq, mp);
994 return;
995 }
996
997 /*
998 * We now know that the msg will fit in the chunk.
999 * Link it onto the end of the chunk.
1000 * Since linkb() walks the entire chain, we keep a pointer to
1001 * the first mblk of the last msgb added and call linkb on that
1002 * that last message, rather than performing the
1003 * O(n) linkb() operation on the whole chain.
1004 * sb_head isn't needed in this SB_NO_HEADER mode.
1005 */
1006 if (sbp->sb_mp)
1007 linkb(sbp->sb_tail, mp);
1008 else
1009 sbp->sb_mp = mp;
1010
1011 sbp->sb_tail = mp;
1012 sbp->sb_mlen += hp.sbh_totlen;
1013 sbp->sb_mcount++;
1014 } else {
1015 /* Timestamp must be done immediately */
1016 uniqtime(&t);
1017 TIMEVAL_TO_TIMEVAL32(&hp.sbh_timestamp, &t);
1018
1019 pad = Align(hp.sbh_totlen);
1020 hp.sbh_totlen += sizeof (hp);
1021
1022 /* We can't fit this message on the current chunk. */
1023 if ((sbp->sb_mlen + hp.sbh_totlen) > sbp->sb_chunk)
1024 sbclosechunk(sbp);
1025
1026 /*
1027 * If we closed it (just now or during a previous
1028 * call) then allocate the head of a new chunk.
1029 */
1030 if (sbp->sb_head == NULL) {
1031 /* Allocate leading header of new chunk */
1032 sbp->sb_head = allocb(sizeof (hp), BPRI_MED);
1033 if (sbp->sb_head == NULL) {
1034 /*
1035 * Memory allocation failure.
1036 * This will need to be revisited
1037 * since using certain flag combinations
1038 * can result in messages being dropped
1039 * silently.
1040 */
1041 freemsg(mp);
1042 sbp->sb_drops++;
1043 return;
1044 }
1045 sbp->sb_mp = sbp->sb_head;
1046 }
1047
1048 /*
1049 * Set the header values and join the message to the
1050 * chunk. The header values are copied into the chunk
1051 * after we adjust for padding below.
1052 */
1053 hp.sbh_drops = sbp->sb_drops;
1054 hp.sbh_origlen = origlen;
1055 linkb(sbp->sb_head, mp);
1056 sbp->sb_mcount++;
1057 sbp->sb_mlen += hp.sbh_totlen;
1058
1059 /*
1060 * There's no chance to fit another message on the
1061 * chunk -- forgo the padding and close the chunk.
1062 */
1063 if ((sbp->sb_mlen + pad + SMALLEST_MESSAGE) > sbp->sb_chunk) {
1064 (void) memcpy(sbp->sb_head->b_wptr, (char *)&hp,
1065 sizeof (hp));
1066 sbp->sb_head->b_wptr += sizeof (hp);
1067 ASSERT(sbp->sb_head->b_wptr <=
1068 sbp->sb_head->b_datap->db_lim);
1069 sbclosechunk(sbp);
1070 return;
1071 }
1072
1073 /*
1074 * We may add another message to this chunk -- adjust
1075 * the headers for padding to be added below.
1076 */
1077 hp.sbh_totlen += pad;
1078 (void) memcpy(sbp->sb_head->b_wptr, (char *)&hp, sizeof (hp));
1079 sbp->sb_head->b_wptr += sizeof (hp);
1080 ASSERT(sbp->sb_head->b_wptr <= sbp->sb_head->b_datap->db_lim);
1081 sbp->sb_mlen += pad;
1082
1083 /*
1084 * Find space for the wrapper. The wrapper consists of:
1085 *
1086 * 1) Padding for this message (this is to ensure each header
1087 * begins on an 8 byte boundary in the userland buffer).
1088 *
1089 * 2) Space for the next message's header, in case the next
1090 * next message will fit in this chunk.
1091 *
1092 * It may be possible to append the wrapper to the last mblk
1093 * of the message, but only if we 'own' the data. If the dblk
1094 * has been shared through dupmsg() we mustn't alter it.
1095 */
1096 wrapperlen = (sizeof (hp) + pad);
1097
1098 /* Is there space for the wrapper beyond the message's data ? */
1099 for (last = mp; last->b_cont; last = last->b_cont)
1100 ;
1101
1102 if ((wrapperlen <= MBLKTAIL(last)) &&
1103 (last->b_datap->db_ref == 1)) {
1104 if (pad > 0) {
1105 /*
1106 * Pad with zeroes to the next pointer boundary
1107 * (we don't want to disclose kernel data to
1108 * users), then advance wptr.
1109 */
1110 (void) memset(last->b_wptr, 0, pad);
1111 last->b_wptr += pad;
1112 }
1113 /* Remember where to write the header information */
1114 sbp->sb_head = last;
1115 } else {
1116 /* Have to allocate additional space for the wrapper */
1117 wrapper = allocb(wrapperlen, BPRI_MED);
1118 if (wrapper == NULL) {
1119 sbclosechunk(sbp);
1120 return;
1121 }
1122 if (pad > 0) {
1123 /*
1124 * Pad with zeroes (we don't want to disclose
1125 * kernel data to users).
1126 */
1127 (void) memset(wrapper->b_wptr, 0, pad);
1128 wrapper->b_wptr += pad;
1129 }
1130 /* Link the wrapper msg onto the end of the chunk */
1131 linkb(mp, wrapper);
1132 /* Remember to write the next header in this wrapper */
1133 sbp->sb_head = wrapper;
1134 }
1135 }
1136 }
1137
1138 /*
1139 * Called from timeout().
1140 * Signal a timeout by passing a zero-length M_CTL msg in the read-side
1141 * to synchronize with any active module threads (open, close, wput, rput).
1142 */
1143 static void
sbtick(void * arg)1144 sbtick(void *arg)
1145 {
1146 struct sb *sbp = arg;
1147 queue_t *rq;
1148
1149 ASSERT(sbp);
1150
1151 rq = sbp->sb_rq;
1152 sbp->sb_timeoutid = 0; /* timeout has fired */
1153
1154 if (putctl(rq, M_CTL) == 0) /* failure */
1155 sbp->sb_timeoutid = qtimeout(rq, sbtick, sbp, sbp->sb_ticks);
1156 }
1157
1158 /*
1159 * Close off the currently accumulating chunk and pass
1160 * it upward. Takes care of resetting timers as well.
1161 *
1162 * This routine is called both directly and as a result
1163 * of the chunk timeout expiring.
1164 */
1165 static void
sbclosechunk(struct sb * sbp)1166 sbclosechunk(struct sb *sbp)
1167 {
1168 mblk_t *mp;
1169 queue_t *rq;
1170
1171 ASSERT(sbp);
1172
1173 if (sbp->sb_timeoutid) {
1174 (void) quntimeout(sbp->sb_rq, sbp->sb_timeoutid);
1175 sbp->sb_timeoutid = 0;
1176 }
1177
1178 mp = sbp->sb_mp;
1179 rq = sbp->sb_rq;
1180
1181 /*
1182 * If there's currently a chunk in progress, close it off
1183 * and try to send it up.
1184 */
1185 if (mp) {
1186 sbsendit(rq, mp);
1187 }
1188
1189 /*
1190 * Clear old chunk. Ready for new msgs.
1191 */
1192 sbp->sb_tail = sbp->sb_mp = sbp->sb_head = NULL;
1193 sbp->sb_mlen = 0;
1194 sbp->sb_mcount = 0;
1195 if (sbp->sb_flags & SB_DEFER_CHUNK)
1196 sbp->sb_state &= ~SB_FRCVD;
1197
1198 }
1199
1200 static void
sbsendit(queue_t * rq,mblk_t * mp)1201 sbsendit(queue_t *rq, mblk_t *mp)
1202 {
1203 struct sb *sbp = (struct sb *)rq->q_ptr;
1204
1205 if (!canputnext(rq)) {
1206 if (sbp->sb_flags & SB_NO_DROPS)
1207 (void) putq(rq, mp);
1208 else {
1209 freemsg(mp);
1210 sbp->sb_drops += sbp->sb_mcount;
1211 }
1212 return;
1213 }
1214 /*
1215 * If there are messages on the q already, keep
1216 * queueing them since they need to be processed in order.
1217 */
1218 if (qsize(rq) > 0) {
1219 /* should only get here if SB_NO_DROPS */
1220 (void) putq(rq, mp);
1221 }
1222 else
1223 putnext(rq, mp);
1224 }
1225