xref: /illumos-gate/usr/src/uts/common/inet/squeue.c (revision fe0e7ec4)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License, Version 1.0 only
6  * (the "License").  You may not use this file except in compliance
7  * with the License.
8  *
9  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
10  * or http://www.opensolaris.org/os/licensing.
11  * See the License for the specific language governing permissions
12  * and limitations under the License.
13  *
14  * When distributing Covered Code, include this CDDL HEADER in each
15  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
16  * If applicable, add the following below this CDDL HEADER, with the
17  * fields enclosed by brackets "[]" replaced with your own identifying
18  * information: Portions Copyright [yyyy] [name of copyright owner]
19  *
20  * CDDL HEADER END
21  */
22 /*
23  * Copyright 2005 Sun Microsystems, Inc.  All rights reserved.
24  * Use is subject to license terms.
25  */
26 
27 #pragma ident	"%Z%%M%	%I%	%E% SMI"
28 
29 /*
30  * Squeues - TCP/IP serialization mechanism.
31  *
32  * This is a general purpose high-performance serialization mechanism. It is
33  * similar to a taskq with a single worker thread, the difference is that it
34  * does not imply a context switch - the thread placing a request may actually
35  * process it. It is also biased for processing requests in interrupt context.
36  *
37  * Each squeue has a worker thread which may optionally be bound to a CPU.
38  *
39  * Only one thread may process requests from a given squeue at any time. This is
40  * called "entering" squeue.
41  *
42  * Each dispatched request is processed either by
43  *
44  *	a) Dispatching thread or
45  *	b) Some other thread that is currently processing squeue at the time of
46  *		request or
47  *	c) worker thread.
48  *
49  * INTERFACES:
50  *
51  * squeue_t *squeue_create(name, bind, wait, pri)
52  *
53  *	name: symbolic name for squeue.
54  *	wait: time to wait before waking the worker thread after queueing
55  *		request.
56  *	bind: preferred CPU binding for the worker thread.
57  *	pri:  thread priority for the worker thread.
58  *
59  *   This function never fails and may sleep. It returns a transparent pointer
60  *   to the squeue_t structure that is passed to all other squeue operations.
61  *
62  * void squeue_bind(sqp, bind)
63  *
64  *   Bind squeue worker thread to a CPU specified by the 'bind' argument. The
65  *   'bind' value of -1 binds to the preferred CPU specified for
66  *   squeue_create.
67  *
68  *   NOTE: Any value of 'bind' other than -1 is not supported currently, but the
69  *	 API is present - in the future it may be useful to specify different
70  *	 binding.
71  *
72  * void squeue_unbind(sqp)
73  *
74  *   Unbind the worker thread from its preferred CPU.
75  *
76  * void squeue_enter(*sqp, *mp, proc, arg, tag)
77  *
78  *   Post a single request for processing. Each request consists of mblock 'mp',
79  *   function 'proc' to execute and an argument 'arg' to pass to this
80  *   function. The function is called as (*proc)(arg, mp, sqp); The tag is an
81  *   arbitrary number from 0 to 255 which will be stored in mp to track exact
82  *   caller of squeue_enter. The combination of function name and the tag should
83  *   provide enough information to identify the caller.
84  *
85  *   If no one is processing the squeue, squeue_enter() will call the function
86  *   immediately. Otherwise it will add the request to the queue for later
87  *   processing. Once the function is executed, the thread may continue
88  *   executing all other requests pending on the queue.
89  *
90  *   NOTE: The tagging information is only used when SQUEUE_DEBUG is set to 1.
91  *   NOTE: The argument can be conn_t only. Ideally we'd like to have generic
92  *	   argument, but we want to drop connection reference count here - this
93  *	   improves tail-call optimizations.
94  *	   XXX: The arg should have type conn_t.
95  *
96  * void squeue_enter_nodrain(*sqp, *mp, proc, arg, tag)
97  *
98  *   Same as squeue_enter(), but the entering thread will only try to execute a
99  *   single request. It will not continue executing any pending requests.
100  *
101  * void squeue_fill(*sqp, *mp, proc, arg, tag)
102  *
103  *   Just place the request on the queue without trying to execute it. Arrange
104  *   for the worker thread to process the request.
105  *
106  * void squeue_profile_enable(sqp)
107  * void squeue_profile_disable(sqp)
108  *
109  *    Enable or disable profiling for specified 'sqp'. Profiling is only
110  *    available when SQUEUE_PROFILE is set.
111  *
112  * void squeue_profile_reset(sqp)
113  *
114  *    Reset all profiling information to zero. Profiling is only
115  *    available when SQUEUE_PROFILE is set.
116  *
117  * void squeue_profile_start()
118  * void squeue_profile_stop()
119  *
120  *    Globally enable or disable profiling for all squeues.
121  *
122  * uintptr_t *squeue_getprivate(sqp, p)
123  *
124  *    Each squeue keeps small amount of private data space available for various
125  *    consumers. Current consumers include TCP and NCA. Other consumers need to
126  *    add their private tag to the sqprivate_t enum. The private information is
127  *    limited to an uintptr_t value. The squeue has no knowledge of its content
128  *    and does not manage it in any way.
129  *
130  *    The typical use may be a breakdown of data structures per CPU (since
131  *    squeues are usually per CPU). See NCA for examples of use.
132  *    Currently 'p' may have one legal value SQPRIVATE_TCP.
133  *
134  * processorid_t squeue_binding(sqp)
135  *
136  *    Returns the CPU binding for a given squeue.
137  *
138  * TUNABLES:
139  *
140  * squeue_intrdrain_ms: Maximum time in ms interrupts spend draining any
141  *	squeue. Note that this is approximation - squeues have no control on the
142  *	time it takes to process each request. This limit is only checked
143  *	between processing individual messages.
144  *    Default: 20 ms.
145  *
146  * squeue_writerdrain_ms: Maximum time in ms non-interrupts spend draining any
147  *	squeue. Note that this is approximation - squeues have no control on the
148  *	time it takes to process each request. This limit is only checked
149  *	between processing individual messages.
150  *    Default: 10 ms.
151  *
152  * squeue_workerdrain_ms: Maximum time in ms worker thread spends draining any
153  *	squeue. Note that this is approximation - squeues have no control on the
154  *	time it takes to process each request. This limit is only checked
155  *	between processing individual messages.
156  *    Default: 10 ms.
157  *
158  * squeue_workerwait_ms: When worker thread is interrupted because workerdrain
159  *	expired, how much time to wait before waking worker thread again.
160  *    Default: 10 ms.
161  *
162  * DEFINES:
163  *
164  * SQUEUE_DEBUG: If defined as 1, special code is compiled in which records
165  *	additional debugging information in the squeue.
166  *
167  * SQUEUE_PROFILE: If defined as 1, special code is compiled in which collects
168  *	various squeue statistics and exports them as kstats.
169  *
170  * Ideally we would like both SQUEUE_DEBUG and SQUEUE_PROFILE to be always set,
171  * but it affects performance, so they are enabled on DEBUG kernels and disabled
172  * on non-DEBUG by default.
173  */
174 
175 #include <sys/types.h>
176 #include <sys/cmn_err.h>
177 #include <sys/debug.h>
178 #include <sys/kmem.h>
179 #include <sys/cpuvar.h>
180 #include <sys/condvar_impl.h>
181 #include <sys/systm.h>
182 #include <sys/callb.h>
183 #include <sys/sdt.h>
184 #include <sys/ddi.h>
185 
186 #include <inet/ipclassifier.h>
187 
188 /*
189  * State flags.
190  * Note: The MDB IP module depends on the values of these flags.
191  */
192 #define	SQS_PROC	0x0001	/* a thread is processing the squeue */
193 #define	SQS_WORKER	0x0002	/* worker thread */
194 #define	SQS_ENTER	0x0004	/* drained by an interrupt enter thread */
195 #define	SQS_FAST	0x0008	/* enter thread is on the inline fast path */
196 #define	SQS_USER	0x0010	/* drained by a non interrupt user */
197 #define	SQS_BOUND	0x0020	/* Worker thread is bound (squeue_bind) */
198 #define	SQS_PROFILE	0x0040	/* Profiling enabled for this squeue */
199 #define	SQS_REENTER	0x0080	/* Processing thread has re-entered */
200 #define	SQS_TMO_PROG	0x0100	/* timeout() call is in progress */
201 
202 #ifdef DEBUG
203 #define	SQUEUE_DEBUG 1		/* record per-request debugging state */
204 #define	SQUEUE_PROFILE 1	/* collect statistics, export as kstats */
205 #else
206 #define	SQUEUE_DEBUG 0
207 #define	SQUEUE_PROFILE 0
208 #endif
209 
210 #include <sys/squeue_impl.h>
211 
212 static void squeue_fire(void *);	/* timeout(9F) callback */
213 static void squeue_drain(squeue_t *, uint_t, hrtime_t);
214 static void squeue_worker(squeue_t *sqp);	/* worker thread entry point */
215 
216 #if SQUEUE_PROFILE
217 static kmutex_t squeue_kstat_lock;	/* ks_lock of every squeue kstat */
218 static int  squeue_kstat_update(kstat_t *, int);
219 #endif
220 
221 kmem_cache_t *squeue_cache;	/* cache squeue_t structures come from */
222 
223 #define	SQUEUE_MSEC_TO_NSEC 1000000
224 
225 int squeue_intrdrain_ms = 20;	/* drain limit for interrupt threads */
226 int squeue_writerdrain_ms = 10;	/* drain limit for non-interrupt threads */
227 int squeue_workerdrain_ms = 10;	/* drain limit for the worker thread */
228 int squeue_workerwait_ms = 10;	/* worker re-wakeup delay */
229 
230 /* The values above converted to ticks or nanoseconds by squeue_init() */
231 static int squeue_intrdrain_ns = 0;
232 static int squeue_writerdrain_ns = 0;
233 static int squeue_workerdrain_ns = 0;
234 static int squeue_workerwait_tick = 0;
235 
236 /*
237  * The minimum number of packets queued at which the worker thread doing
238  * the drain triggers polling (if the squeue allows it). The choice of 3
239  * is arbitrary. You definitely don't want it to be 1 since that would
240  * trigger polling on very low loads as well (ssh seems to be one such
241  * example, where packet flow was very low yet somehow 1 packet ended up
242  * getting queued, the worker thread fired every 10ms and blanking also
243  * got triggered).
244  */
245 int squeue_worker_poll_min = 3;
246 
247 #if SQUEUE_PROFILE
248 /*
249  * Set to B_TRUE to enable profiling globally (see SQ_PROFILING below).
250  */
251 static int squeue_profile = B_FALSE;
252 #define	SQ_PROFILING(sqp) (squeue_profile && ((sqp)->sq_state & SQS_PROFILE))
253 
254 #define	SQSTAT(sqp, x) ((sqp)->sq_stats.x++)	/* bump counter x by one */
255 #define	SQDELTA(sqp, x, d) ((sqp)->sq_stats.x += (d))	/* add d to x */
256 
257 struct squeue_kstat {	/* named kstat template shared by all squeues */
258 	kstat_named_t	sq_count;
259 	kstat_named_t	sq_max_qlen;
260 	kstat_named_t	sq_npackets_worker;
261 	kstat_named_t	sq_npackets_intr;
262 	kstat_named_t	sq_npackets_other;
263 	kstat_named_t	sq_nqueued_intr;
264 	kstat_named_t	sq_nqueued_other;
265 	kstat_named_t	sq_ndrains_worker;
266 	kstat_named_t	sq_ndrains_intr;
267 	kstat_named_t	sq_ndrains_other;
268 	kstat_named_t	sq_time_worker;
269 	kstat_named_t	sq_time_intr;
270 	kstat_named_t	sq_time_other;
271 } squeue_kstat = {
272 	{ "count",		KSTAT_DATA_UINT64 },
273 	{ "max_qlen",		KSTAT_DATA_UINT64 },
274 	{ "packets_worker",	KSTAT_DATA_UINT64 },
275 	{ "packets_intr",	KSTAT_DATA_UINT64 },
276 	{ "packets_other",	KSTAT_DATA_UINT64 },
277 	{ "queued_intr",	KSTAT_DATA_UINT64 },
278 	{ "queued_other",	KSTAT_DATA_UINT64 },
279 	{ "ndrains_worker",	KSTAT_DATA_UINT64 },
280 	{ "ndrains_intr",	KSTAT_DATA_UINT64 },
281 	{ "ndrains_other",	KSTAT_DATA_UINT64 },
282 	{ "time_worker",	KSTAT_DATA_UINT64 },
283 	{ "time_intr",		KSTAT_DATA_UINT64 },
284 	{ "time_other",		KSTAT_DATA_UINT64 },
285 };
286 #endif
287 
/*
 * SQUEUE_WORKER_WAKEUP(sqp)
 *
 * Arrange for the worker thread to pick up work that was left on the squeue
 * by a thread which is not going to drain it. Must be invoked with sq_lock
 * held, from a function returning void: every path drops sq_lock, and
 * several of them 'return' straight out of the enclosing function.
 *
 *  - A wakeup timeout is already armed (sq_tid != 0): if at least sq_wait
 *    ticks have passed since sq_awaken, cancel it and signal the worker
 *    now; otherwise leave the timeout to fire.
 *  - Another thread is in the middle of arming the timeout (SQS_TMO_PROG):
 *    nothing to do.
 *  - sq_wait != 0: arm a timeout of sq_wait ticks that runs squeue_fire().
 *  - sq_wait == 0: signal the worker immediately.
 *
 * sq_wait is kept in clock ticks (squeue_create() converts the caller's
 * milliseconds with MSEC_TO_TICK), so the elapsed time is compared against
 * it in ticks as well.
 */
#define	SQUEUE_WORKER_WAKEUP(sqp) {					\
	timeout_id_t tid = (sqp)->sq_tid;				\
									\
	ASSERT(MUTEX_HELD(&(sqp)->sq_lock));				\
	/*								\
	 * Queue isn't being processed, so take				\
	 * any post enqueue actions needed before leaving.		\
	 */								\
	if (tid != 0) {							\
		/*							\
		 * Waiting for an enter() to process mblk(s).		\
		 * Both 'waited' and sq_wait are in clock ticks.	\
		 */							\
		clock_t	waited = lbolt - (sqp)->sq_awaken;		\
									\
		if (waited >= (sqp)->sq_wait) {				\
			/*						\
			 * Times up and have a worker thread		\
			 * waiting for work, so schedule it.		\
			 */						\
			(sqp)->sq_tid = 0;				\
			(sqp)->sq_awaken = lbolt;			\
			cv_signal(&(sqp)->sq_async);			\
			mutex_exit(&(sqp)->sq_lock);			\
			(void) untimeout(tid);				\
			return;						\
		}							\
		mutex_exit(&(sqp)->sq_lock);				\
		return;							\
	} else if ((sqp)->sq_state & SQS_TMO_PROG) {			\
		mutex_exit(&(sqp)->sq_lock);				\
		return;							\
	} else if ((sqp)->sq_wait != 0) {				\
		clock_t	wait = (sqp)->sq_wait;				\
		/*							\
		 * Wait up to sqp->sq_wait ticks for an			\
		 * enter() to process this queue. We			\
		 * don't want to contend on timeout locks		\
		 * with sq_lock held for performance reasons,		\
		 * so drop the sq_lock before calling timeout		\
		 * but we need to check if timeout is required		\
		 * after re acquiring the sq_lock. Once			\
		 * the sq_lock is dropped, someone else could		\
		 * have processed the packet or the timeout could	\
		 * have already fired.					\
		 */							\
		(sqp)->sq_state |= SQS_TMO_PROG;			\
		mutex_exit(&(sqp)->sq_lock);				\
		tid = timeout(squeue_fire, (sqp), wait);		\
		mutex_enter(&(sqp)->sq_lock);				\
		/* Check again if we still need the timeout */		\
		if ((((sqp)->sq_state & (SQS_PROC|SQS_TMO_PROG)) ==	\
			SQS_TMO_PROG) && ((sqp)->sq_tid == 0) &&	\
			((sqp)->sq_first != NULL)) {			\
				(sqp)->sq_state &= ~SQS_TMO_PROG;	\
				(sqp)->sq_awaken = lbolt;		\
				(sqp)->sq_tid = tid;			\
				mutex_exit(&(sqp)->sq_lock);		\
				return;					\
		} else {						\
			if ((sqp)->sq_state & SQS_TMO_PROG) {		\
				/*					\
				 * The timeout is no longer needed	\
				 * but has not fired yet: cancel it.	\
				 */					\
				(sqp)->sq_state &= ~SQS_TMO_PROG;	\
				mutex_exit(&(sqp)->sq_lock);		\
				(void) untimeout(tid);			\
			} else {					\
				/*					\
				 * The timer fired before we could	\
				 * reacquire the sq_lock. squeue_fire	\
				 * removes the SQS_TMO_PROG flag	\
				 * and we don't need to	do anything	\
				 * else.				\
				 */					\
				mutex_exit(&(sqp)->sq_lock);		\
			}						\
		}							\
	} else {							\
		/*							\
		 * Schedule the worker thread.				\
		 */							\
		(sqp)->sq_awaken = lbolt;				\
		cv_signal(&(sqp)->sq_async);				\
		mutex_exit(&(sqp)->sq_lock);				\
	}								\
	ASSERT(MUTEX_NOT_HELD(&(sqp)->sq_lock));			\
}
372 
/*
 * ENQUEUE_MP(sqp, mp, proc, arg)
 *
 * Append the single mblk 'mp' to the tail of the squeue's pending list.
 * The function to run and its argument travel inside the mblk itself:
 * 'proc' is stashed in b_queue and 'arg' in b_prev, which is where
 * squeue_enter_chain() reads them back from.
 *
 * The caller must hold sq_lock, and 'mp' must not be linked into any list
 * (b_prev and b_next are asserted to be NULL).
 */
#define	ENQUEUE_MP(sqp, mp, proc, arg) {			\
	ASSERT(MUTEX_HELD(&(sqp)->sq_lock));			\
	ASSERT((mp)->b_prev == NULL && (mp)->b_next == NULL);	\
	(mp)->b_queue = (queue_t *)(proc);			\
	(mp)->b_prev = (mblk_t *)(arg);				\
								\
	if ((sqp)->sq_last != NULL)				\
		(sqp)->sq_last->b_next = (mp);			\
	else							\
		(sqp)->sq_first = (mp);				\
	(sqp)->sq_last = (mp);					\
	(sqp)->sq_count++;					\
	ASSERT((sqp)->sq_count > 0);				\
	DTRACE_PROBE2(squeue__enqueue, squeue_t *, sqp,		\
	    mblk_t *, mp);					\
}
393 
394 
/*
 * ENQUEUE_CHAIN(sqp, mp, tail, cnt)
 *
 * Splice an already linked chain of 'cnt' mblks, running from 'mp' to
 * 'tail', onto the end of the squeue's pending list. The caller must hold
 * sq_lock.
 */
#define	ENQUEUE_CHAIN(sqp, mp, tail, cnt) {			\
	ASSERT(MUTEX_HELD(&(sqp)->sq_lock));			\
								\
	if ((sqp)->sq_last == NULL)				\
		(sqp)->sq_first = (mp);				\
	else							\
		(sqp)->sq_last->b_next = (mp);			\
	(sqp)->sq_count += (cnt);				\
	(sqp)->sq_last = (tail);				\
	ASSERT((sqp)->sq_count > 0);				\
	DTRACE_PROBE4(squeue__enqueuechain, squeue_t *, sqp,	\
		mblk_t *, mp, mblk_t *, tail, int, cnt);	\
}
412 
/*
 * SQS_POLLING_ON(sqp, rx_ring)
 *
 * Call the ring's rr_blank routine with a blanking time of
 * sq_avg_drain_time * sq_count, capped at rr_max_blank_time, and the
 * ring's maximum packet count. Then flag the ring as ILL_POLLING and
 * record the current lbolt in rr_poll_time. The caller must hold sq_lock.
 */
#define	SQS_POLLING_ON(sqp, rx_ring) {				\
	ASSERT((rx_ring) != NULL);				\
	ASSERT(MUTEX_HELD(&(sqp)->sq_lock));			\
	(rx_ring)->rr_blank((rx_ring)->rr_handle,		\
	    MIN(((sqp)->sq_avg_drain_time * (sqp)->sq_count),	\
		(rx_ring)->rr_max_blank_time),			\
		(rx_ring)->rr_max_pkt_cnt);			\
	(rx_ring)->rr_poll_state |= ILL_POLLING;		\
	(rx_ring)->rr_poll_time = lbolt;			\
}
423 
424 
/*
 * SQS_POLLING_OFF(sqp, rx_ring)
 *
 * Call the ring's rr_blank routine with its minimum blanking time and
 * minimum packet count. rr_poll_state is not modified here. The caller
 * must hold sq_lock.
 */
#define	SQS_POLLING_OFF(sqp, rx_ring) {				\
	ASSERT((rx_ring) != NULL);				\
	ASSERT(MUTEX_HELD(&(sqp)->sq_lock));			\
	(rx_ring)->rr_blank((rx_ring)->rr_handle,		\
	    (rx_ring)->rr_min_blank_time,			\
	    (rx_ring)->rr_min_pkt_cnt);				\
}
432 
433 void
434 squeue_init(void)
435 {
436 	squeue_cache = kmem_cache_create("squeue_cache",
437 	    sizeof (squeue_t), 64, NULL, NULL, NULL, NULL, NULL, 0);
438 
439 	squeue_intrdrain_ns = squeue_intrdrain_ms * SQUEUE_MSEC_TO_NSEC;
440 	squeue_writerdrain_ns = squeue_writerdrain_ms * SQUEUE_MSEC_TO_NSEC;
441 	squeue_workerdrain_ns = squeue_workerdrain_ms * SQUEUE_MSEC_TO_NSEC;
442 	squeue_workerwait_tick = MSEC_TO_TICK_ROUNDUP(squeue_workerwait_ms);
443 }
444 
445 /* ARGSUSED */
446 squeue_t *
447 squeue_create(char *name, processorid_t bind, clock_t wait, pri_t pri)
448 {
449 	squeue_t *sqp = kmem_cache_alloc(squeue_cache, KM_SLEEP);
450 
451 	bzero(sqp, sizeof (squeue_t));
452 	(void) strncpy(sqp->sq_name, name, SQ_NAMELEN + 1);
453 	sqp->sq_name[SQ_NAMELEN] = '\0';
454 
455 	sqp->sq_bind = bind;
456 	sqp->sq_wait = MSEC_TO_TICK(wait);
457 	sqp->sq_avg_drain_time =
458 	    drv_hztousec(NSEC_TO_TICK_ROUNDUP(squeue_intrdrain_ns)) /
459 	    NSEC_TO_TICK_ROUNDUP(squeue_intrdrain_ns);
460 
461 #if SQUEUE_PROFILE
462 	if ((sqp->sq_kstat = kstat_create("ip", bind, name,
463 		"net", KSTAT_TYPE_NAMED,
464 		sizeof (squeue_kstat) / sizeof (kstat_named_t),
465 		KSTAT_FLAG_VIRTUAL)) != NULL) {
466 		sqp->sq_kstat->ks_lock = &squeue_kstat_lock;
467 		sqp->sq_kstat->ks_data = &squeue_kstat;
468 		sqp->sq_kstat->ks_update = squeue_kstat_update;
469 		sqp->sq_kstat->ks_private = sqp;
470 		kstat_install(sqp->sq_kstat);
471 	}
472 #endif
473 
474 	sqp->sq_worker = thread_create(NULL, 0, squeue_worker,
475 	    sqp, 0, &p0, TS_RUN, pri);
476 
477 	return (sqp);
478 }
479 
480 /* ARGSUSED */
481 void
482 squeue_bind(squeue_t *sqp, processorid_t bind)
483 {
484 	ASSERT(bind == -1);
485 
486 	mutex_enter(&sqp->sq_lock);
487 	if (sqp->sq_state & SQS_BOUND) {
488 		mutex_exit(&sqp->sq_lock);
489 		return;
490 	}
491 
492 	sqp->sq_state |= SQS_BOUND;
493 	mutex_exit(&sqp->sq_lock);
494 
495 	thread_affinity_set(sqp->sq_worker, sqp->sq_bind);
496 }
497 
498 void
499 squeue_unbind(squeue_t *sqp)
500 {
501 	mutex_enter(&sqp->sq_lock);
502 	if (!(sqp->sq_state & SQS_BOUND)) {
503 		mutex_exit(&sqp->sq_lock);
504 		return;
505 	}
506 
507 	sqp->sq_state &= ~SQS_BOUND;
508 	mutex_exit(&sqp->sq_lock);
509 
510 	thread_affinity_clear(sqp->sq_worker);
511 }
512 
513 /*
514  * squeue_enter() - enter squeue sqp with mblk mp (which can be
515  * a chain), while tail points to the end and cnt in number of
516  * mblks in the chain.
517  *
518  * For a chain of single packet (i.e. mp == tail), go through the
519  * fast path if no one is processing the squeue and nothing is queued.
520  *
521  * The proc and arg for each mblk is already stored in the mblk in
522  * appropriate places.
523  */
524 void
525 squeue_enter_chain(squeue_t *sqp, mblk_t *mp, mblk_t *tail,
526     uint32_t cnt, uint8_t tag)
527 {
528 	int		interrupt = servicing_interrupt();
529 	void 		*arg;
530 	sqproc_t	proc;
531 	hrtime_t	now;
532 #if SQUEUE_PROFILE
533 	hrtime_t 	start, delta;
534 #endif
535 
536 	ASSERT(sqp != NULL);
537 	ASSERT(mp != NULL);
538 	ASSERT(tail != NULL);
539 	ASSERT(cnt > 0);
540 	ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock));
541 
542 	mutex_enter(&sqp->sq_lock);
543 	if (!(sqp->sq_state & SQS_PROC)) {
544 		/*
545 		 * See if anything is already queued. If we are the
546 		 * first packet, do inline processing else queue the
547 		 * packet and do the drain.
548 		 */
549 		sqp->sq_run = curthread;
550 		if (sqp->sq_first == NULL && cnt == 1) {
551 			/*
552 			 * Fast-path, ok to process and nothing queued.
553 			 */
554 			sqp->sq_state |= (SQS_PROC|SQS_FAST);
555 			mutex_exit(&sqp->sq_lock);
556 
557 			/*
558 			 * We are the chain of 1 packet so
559 			 * go through this fast path.
560 			 */
561 			arg = mp->b_prev;
562 			mp->b_prev = NULL;
563 			proc = (sqproc_t)mp->b_queue;
564 			mp->b_queue = NULL;
565 
566 			ASSERT(proc != NULL);
567 			ASSERT(arg != NULL);
568 			ASSERT(mp->b_next == NULL);
569 
570 #if SQUEUE_DEBUG
571 			sqp->sq_isintr = interrupt;
572 			sqp->sq_curmp = mp;
573 			sqp->sq_curproc = proc;
574 			sqp->sq_connp = arg;
575 			mp->b_tag = sqp->sq_tag = tag;
576 #endif
577 #if SQUEUE_PROFILE
578 			if (SQ_PROFILING(sqp)) {
579 				if (interrupt)
580 					SQSTAT(sqp, sq_npackets_intr);
581 				else
582 					SQSTAT(sqp, sq_npackets_other);
583 				start = gethrtime();
584 			}
585 #endif
586 			((conn_t *)arg)->conn_on_sqp = B_TRUE;
587 			DTRACE_PROBE3(squeue__proc__start, squeue_t *,
588 			    sqp, mblk_t *, mp, conn_t *, arg);
589 			(*proc)(arg, mp, sqp);
590 			DTRACE_PROBE2(squeue__proc__end, squeue_t *,
591 			    sqp, conn_t *, arg);
592 			((conn_t *)arg)->conn_on_sqp = B_FALSE;
593 
594 #if SQUEUE_PROFILE
595 			if (SQ_PROFILING(sqp)) {
596 				delta = gethrtime() - start;
597 				if (interrupt)
598 					SQDELTA(sqp, sq_time_intr, delta);
599 				else
600 					SQDELTA(sqp, sq_time_other, delta);
601 			}
602 #endif
603 #if SQUEUE_DEBUG
604 			sqp->sq_curmp = NULL;
605 			sqp->sq_curproc = NULL;
606 			sqp->sq_connp = NULL;
607 			sqp->sq_isintr = 0;
608 #endif
609 
610 			CONN_DEC_REF((conn_t *)arg);
611 			ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock));
612 			mutex_enter(&sqp->sq_lock);
613 			sqp->sq_state &= ~(SQS_PROC|SQS_FAST);
614 			if (sqp->sq_first == NULL) {
615 				/*
616 				 * We processed inline our packet and
617 				 * nothing new has arrived. We are done.
618 				 */
619 				sqp->sq_run = NULL;
620 				mutex_exit(&sqp->sq_lock);
621 				return;
622 			} else if (sqp->sq_bind != CPU->cpu_id) {
623 				/*
624 				 * If the current thread is not running
625 				 * on the CPU to which this squeue is bound,
626 				 * then don't allow it to drain.
627 				 */
628 				sqp->sq_run = NULL;
629 				SQUEUE_WORKER_WAKEUP(sqp);
630 				return;
631 			}
632 		} else {
633 			ENQUEUE_CHAIN(sqp, mp, tail, cnt);
634 #if SQUEUE_DEBUG
635 			mp->b_tag = tag;
636 #endif
637 #if SQUEUE_PROFILE
638 			if (SQ_PROFILING(sqp)) {
639 				if (servicing_interrupt())
640 					SQSTAT(sqp, sq_nqueued_intr);
641 				else
642 					SQSTAT(sqp, sq_nqueued_other);
643 				if (sqp->sq_stats.sq_max_qlen < sqp->sq_count)
644 					sqp->sq_stats.sq_max_qlen =
645 					    sqp->sq_count;
646 			}
647 #endif
648 		}
649 
650 		/*
651 		 * We are here because either we couldn't do inline
652 		 * processing (because something was already queued),
653 		 * or we had a chanin of more than one packet,
654 		 * or something else arrived after we were done with
655 		 * inline processing.
656 		 */
657 		ASSERT(MUTEX_HELD(&sqp->sq_lock));
658 		ASSERT(sqp->sq_first != NULL);
659 
660 #if SQUEUE_PROFILE
661 		if (SQ_PROFILING(sqp)) {
662 			start = gethrtime();
663 		}
664 #endif
665 #if SQUEUE_DEBUG
666 		sqp->sq_isintr = interrupt;
667 #endif
668 
669 		now = gethrtime();
670 		if (interrupt) {
671 			squeue_drain(sqp, SQS_ENTER, now +
672 			    squeue_intrdrain_ns);
673 		} else {
674 			squeue_drain(sqp, SQS_USER, now +
675 			    squeue_writerdrain_ns);
676 		}
677 
678 #if SQUEUE_PROFILE
679 		if (SQ_PROFILING(sqp)) {
680 			delta = gethrtime() - start;
681 			if (interrupt)
682 				SQDELTA(sqp, sq_time_intr, delta);
683 			else
684 				SQDELTA(sqp, sq_time_other, delta);
685 		}
686 #endif
687 #if SQUEUE_DEBUG
688 		sqp->sq_isintr = 0;
689 #endif
690 
691 		/*
692 		 * If we didn't do a complete drain, the worker
693 		 * thread was already signalled by squeue_drain.
694 		 */
695 		sqp->sq_run = NULL;
696 		mutex_exit(&sqp->sq_lock);
697 		return;
698 	} else {
699 		ASSERT(sqp->sq_run != NULL);
700 		/*
701 		 * Queue is already being processed. Just enqueue
702 		 * the packet and go away.
703 		 */
704 #if SQUEUE_DEBUG
705 		mp->b_tag = tag;
706 #endif
707 #if SQUEUE_PROFILE
708 		if (SQ_PROFILING(sqp)) {
709 			if (servicing_interrupt())
710 				SQSTAT(sqp, sq_nqueued_intr);
711 			else
712 				SQSTAT(sqp, sq_nqueued_other);
713 			if (sqp->sq_stats.sq_max_qlen < sqp->sq_count)
714 				sqp->sq_stats.sq_max_qlen = sqp->sq_count;
715 		}
716 #endif
717 
718 		ENQUEUE_CHAIN(sqp, mp, tail, cnt);
719 		mutex_exit(&sqp->sq_lock);
720 		return;
721 	}
722 }
723 
724 /*
725  * squeue_enter() - enter squeue *sqp with mblk *mp with argument of *arg.
726  */
727 void
728 squeue_enter(squeue_t *sqp, mblk_t *mp, sqproc_t proc, void *arg,
729     uint8_t tag)
730 {
731 	int	interrupt = servicing_interrupt();
732 	hrtime_t now;
733 #if SQUEUE_PROFILE
734 	hrtime_t start, delta;
735 #endif
736 #if SQUEUE_DEBUG
737 	conn_t 	*connp = (conn_t *)arg;
738 	ASSERT(!IPCL_IS_TCP(connp) || connp->conn_tcp->tcp_connp == connp);
739 	ASSERT(!IPCL_IS_UDP(connp) || connp->conn_udp->udp_connp == connp);
740 #endif
741 
742 	ASSERT(proc != NULL);
743 	ASSERT(sqp != NULL);
744 	ASSERT(mp != NULL);
745 	ASSERT(mp->b_next == NULL);
746 	ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock));
747 
748 	mutex_enter(&sqp->sq_lock);
749 	if (!(sqp->sq_state & SQS_PROC)) {
750 		/*
751 		 * See if anything is already queued. If we are the
752 		 * first packet, do inline processing else queue the
753 		 * packet and do the drain.
754 		 */
755 		sqp->sq_run = curthread;
756 		if (sqp->sq_first == NULL) {
757 			/*
758 			 * Fast-path, ok to process and nothing queued.
759 			 */
760 			sqp->sq_state |= (SQS_PROC|SQS_FAST);
761 			mutex_exit(&sqp->sq_lock);
762 
763 #if SQUEUE_DEBUG
764 			sqp->sq_isintr = interrupt;
765 			sqp->sq_curmp = mp;
766 			sqp->sq_curproc = proc;
767 			sqp->sq_connp = connp;
768 			mp->b_tag = sqp->sq_tag = tag;
769 #endif
770 #if SQUEUE_PROFILE
771 			if (SQ_PROFILING(sqp)) {
772 				if (interrupt)
773 					SQSTAT(sqp, sq_npackets_intr);
774 				else
775 					SQSTAT(sqp, sq_npackets_other);
776 				start = gethrtime();
777 			}
778 #endif
779 			((conn_t *)arg)->conn_on_sqp = B_TRUE;
780 			DTRACE_PROBE3(squeue__proc__start, squeue_t *,
781 			    sqp, mblk_t *, mp, conn_t *, arg);
782 			(*proc)(arg, mp, sqp);
783 			DTRACE_PROBE2(squeue__proc__end, squeue_t *,
784 			    sqp, conn_t *, arg);
785 			((conn_t *)arg)->conn_on_sqp = B_FALSE;
786 
787 #if SQUEUE_PROFILE
788 			if (SQ_PROFILING(sqp)) {
789 				delta = gethrtime() - start;
790 				if (interrupt)
791 					SQDELTA(sqp, sq_time_intr, delta);
792 				else
793 					SQDELTA(sqp, sq_time_other, delta);
794 			}
795 #endif
796 #if SQUEUE_DEBUG
797 			sqp->sq_curmp = NULL;
798 			sqp->sq_curproc = NULL;
799 			sqp->sq_connp = NULL;
800 			sqp->sq_isintr = 0;
801 #endif
802 
803 			CONN_DEC_REF((conn_t *)arg);
804 			ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock));
805 			mutex_enter(&sqp->sq_lock);
806 			sqp->sq_state &= ~(SQS_PROC|SQS_FAST);
807 			if (sqp->sq_first == NULL) {
808 				/*
809 				 * We processed inline our packet and
810 				 * nothing new has arrived. We are done.
811 				 */
812 				sqp->sq_run = NULL;
813 				mutex_exit(&sqp->sq_lock);
814 				return;
815 			} else if (sqp->sq_bind != CPU->cpu_id) {
816 				/*
817 				 * If the current thread is not running
818 				 * on the CPU to which this squeue is bound,
819 				 * then don't allow it to drain.
820 				 */
821 				sqp->sq_run = NULL;
822 				SQUEUE_WORKER_WAKEUP(sqp);
823 				return;
824 			}
825 		} else {
826 			ENQUEUE_MP(sqp, mp, proc, arg);
827 #if SQUEUE_DEBUG
828 			mp->b_tag = tag;
829 #endif
830 #if SQUEUE_PROFILE
831 			if (SQ_PROFILING(sqp)) {
832 				if (servicing_interrupt())
833 					SQSTAT(sqp, sq_nqueued_intr);
834 				else
835 					SQSTAT(sqp, sq_nqueued_other);
836 				if (sqp->sq_stats.sq_max_qlen < sqp->sq_count)
837 					sqp->sq_stats.sq_max_qlen =
838 					    sqp->sq_count;
839 			}
840 #endif
841 		}
842 
843 		/*
844 		 * We are here because either we couldn't do inline
845 		 * processing (because something was already queued)
846 		 * or something else arrived after we were done with
847 		 * inline processing.
848 		 */
849 		ASSERT(MUTEX_HELD(&sqp->sq_lock));
850 		ASSERT(sqp->sq_first != NULL);
851 
852 #if SQUEUE_PROFILE
853 		if (SQ_PROFILING(sqp)) {
854 			start = gethrtime();
855 		}
856 #endif
857 #if SQUEUE_DEBUG
858 		sqp->sq_isintr = interrupt;
859 #endif
860 
861 		now = gethrtime();
862 		if (interrupt) {
863 			squeue_drain(sqp, SQS_ENTER, now +
864 			    squeue_intrdrain_ns);
865 		} else {
866 			squeue_drain(sqp, SQS_USER, now +
867 			    squeue_writerdrain_ns);
868 		}
869 
870 #if SQUEUE_PROFILE
871 		if (SQ_PROFILING(sqp)) {
872 			delta = gethrtime() - start;
873 			if (interrupt)
874 				SQDELTA(sqp, sq_time_intr, delta);
875 			else
876 				SQDELTA(sqp, sq_time_other, delta);
877 		}
878 #endif
879 #if SQUEUE_DEBUG
880 		sqp->sq_isintr = 0;
881 #endif
882 
883 		/*
884 		 * If we didn't do a complete drain, the worker
885 		 * thread was already signalled by squeue_drain.
886 		 */
887 		sqp->sq_run = NULL;
888 		mutex_exit(&sqp->sq_lock);
889 		return;
890 	} else {
891 		ASSERT(sqp->sq_run != NULL);
892 		/*
893 		 * We let a thread processing a squeue reenter only
894 		 * once. This helps the case of incoming connection
895 		 * where a SYN-ACK-ACK that triggers the conn_ind
896 		 * doesn't have to queue the packet if listener and
897 		 * eager are on the same squeue. Also helps the
898 		 * loopback connection where the two ends are bound
899 		 * to the same squeue (which is typical on single
900 		 * CPU machines).
901 		 * We let the thread reenter only once for the fear
902 		 * of stack getting blown with multiple traversal.
903 		 */
904 		if (!(sqp->sq_state & SQS_REENTER) &&
905 		    (sqp->sq_run == curthread) &&
906 		    (((conn_t *)arg)->conn_on_sqp == B_FALSE)) {
907 			sqp->sq_state |= SQS_REENTER;
908 			mutex_exit(&sqp->sq_lock);
909 
910 			((conn_t *)arg)->conn_on_sqp = B_TRUE;
911 			DTRACE_PROBE3(squeue__proc__start, squeue_t *,
912 			    sqp, mblk_t *, mp, conn_t *, arg);
913 			(*proc)(arg, mp, sqp);
914 			DTRACE_PROBE2(squeue__proc__end, squeue_t *,
915 			    sqp, conn_t *, arg);
916 			((conn_t *)arg)->conn_on_sqp = B_FALSE;
917 			CONN_DEC_REF((conn_t *)arg);
918 
919 			mutex_enter(&sqp->sq_lock);
920 			sqp->sq_state &= ~SQS_REENTER;
921 			mutex_exit(&sqp->sq_lock);
922 			return;
923 		}
924 		/*
925 		 * Queue is already being processed. Just enqueue
926 		 * the packet and go away.
927 		 */
928 #if SQUEUE_DEBUG
929 		mp->b_tag = tag;
930 #endif
931 #if SQUEUE_PROFILE
932 		if (SQ_PROFILING(sqp)) {
933 			if (servicing_interrupt())
934 				SQSTAT(sqp, sq_nqueued_intr);
935 			else
936 				SQSTAT(sqp, sq_nqueued_other);
937 			if (sqp->sq_stats.sq_max_qlen < sqp->sq_count)
938 				sqp->sq_stats.sq_max_qlen = sqp->sq_count;
939 		}
940 #endif
941 
942 		ENQUEUE_MP(sqp, mp, proc, arg);
943 		mutex_exit(&sqp->sq_lock);
944 		return;
945 	}
946 }
947 
948 void
949 squeue_enter_nodrain(squeue_t *sqp, mblk_t *mp, sqproc_t proc, void *arg,
950     uint8_t tag)
951 {
952 	int		interrupt = servicing_interrupt();
953 	boolean_t	being_processed;
954 #if SQUEUE_DEBUG
955 	conn_t 		*connp = (conn_t *)arg;
956 #endif
957 #if SQUEUE_PROFILE
958 	hrtime_t 	start, delta;
959 #endif
960 
961 	ASSERT(proc != NULL);
962 	ASSERT(sqp != NULL);
963 	ASSERT(mp != NULL);
964 	ASSERT(mp->b_next == NULL);
965 	ASSERT(!IPCL_IS_TCP(connp) || connp->conn_tcp->tcp_connp == connp);
966 	ASSERT(!IPCL_IS_UDP(connp) || connp->conn_udp->udp_connp == connp);
967 	ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock));
968 
969 	mutex_enter(&sqp->sq_lock);
970 
971 	being_processed = (sqp->sq_state & SQS_PROC);
972 	if (!being_processed && (sqp->sq_first == NULL)) {
973 		/*
974 		 * Fast-path, ok to process and nothing queued.
975 		 */
976 		sqp->sq_state |= (SQS_PROC|SQS_FAST);
977 		sqp->sq_run = curthread;
978 		mutex_exit(&sqp->sq_lock);
979 
980 #if SQUEUE_DEBUG
981 		sqp->sq_isintr = interrupt;
982 		sqp->sq_curmp = mp;
983 		sqp->sq_curproc = proc;
984 		sqp->sq_connp = connp;
985 		mp->b_tag = sqp->sq_tag = tag;
986 #endif
987 
988 #if SQUEUE_PROFILE
989 		if (SQ_PROFILING(sqp)) {
990 			if (interrupt)
991 				SQSTAT(sqp, sq_npackets_intr);
992 			else
993 				SQSTAT(sqp, sq_npackets_other);
994 			start = gethrtime();
995 		}
996 #endif
997 
998 		((conn_t *)arg)->conn_on_sqp = B_TRUE;
999 		DTRACE_PROBE3(squeue__proc__start, squeue_t *,
1000 		    sqp, mblk_t *, mp, conn_t *, arg);
1001 		(*proc)(arg, mp, sqp);
1002 		DTRACE_PROBE2(squeue__proc__end, squeue_t *,
1003 		    sqp, conn_t *, arg);
1004 		((conn_t *)arg)->conn_on_sqp = B_FALSE;
1005 
1006 #if SQUEUE_DEBUG
1007 		sqp->sq_curmp = NULL;
1008 		sqp->sq_curproc = NULL;
1009 		sqp->sq_connp = NULL;
1010 		sqp->sq_isintr = 0;
1011 #endif
1012 #if SQUEUE_PROFILE
1013 		if (SQ_PROFILING(sqp)) {
1014 			delta = gethrtime() - start;
1015 			if (interrupt)
1016 				SQDELTA(sqp, sq_time_intr, delta);
1017 			else
1018 				SQDELTA(sqp, sq_time_other, delta);
1019 		}
1020 #endif
1021 
1022 		CONN_DEC_REF((conn_t *)arg);
1023 		mutex_enter(&sqp->sq_lock);
1024 		sqp->sq_state &= ~(SQS_PROC|SQS_FAST);
1025 		sqp->sq_run = NULL;
1026 		if (sqp->sq_first == NULL) {
1027 			/*
1028 			 * We processed inline our packet and
1029 			 * nothing new has arrived. We are done.
1030 			 */
1031 			mutex_exit(&sqp->sq_lock);
1032 		} else {
1033 			SQUEUE_WORKER_WAKEUP(sqp);
1034 		}
1035 		return;
1036 	} else {
1037 		/*
1038 		 * We let a thread processing a squeue reenter only
1039 		 * once. This helps the case of incoming connection
1040 		 * where a SYN-ACK-ACK that triggers the conn_ind
1041 		 * doesn't have to queue the packet if listener and
1042 		 * eager are on the same squeue. Also helps the
1043 		 * loopback connection where the two ends are bound
1044 		 * to the same squeue (which is typical on single
1045 		 * CPU machines).
1046 		 * We let the thread reenter only once for the fear
1047 		 * of stack getting blown with multiple traversal.
1048 		 */
1049 		if (being_processed && !(sqp->sq_state & SQS_REENTER) &&
1050 		    (sqp->sq_run == curthread) &&
1051 		    (((conn_t *)arg)->conn_on_sqp == B_FALSE)) {
1052 			sqp->sq_state |= SQS_REENTER;
1053 			mutex_exit(&sqp->sq_lock);
1054 
1055 			((conn_t *)arg)->conn_on_sqp = B_TRUE;
1056 			DTRACE_PROBE3(squeue__proc__start, squeue_t *,
1057 			    sqp, mblk_t *, mp, conn_t *, arg);
1058 			(*proc)(arg, mp, sqp);
1059 			DTRACE_PROBE2(squeue__proc__end, squeue_t *,
1060 			    sqp, conn_t *, arg);
1061 			((conn_t *)arg)->conn_on_sqp = B_FALSE;
1062 			CONN_DEC_REF((conn_t *)arg);
1063 
1064 			mutex_enter(&sqp->sq_lock);
1065 			sqp->sq_state &= ~SQS_REENTER;
1066 			mutex_exit(&sqp->sq_lock);
1067 			return;
1068 		}
1069 
1070 #if SQUEUE_DEBUG
1071 		mp->b_tag = tag;
1072 #endif
1073 #if SQUEUE_PROFILE
1074 		if (SQ_PROFILING(sqp)) {
1075 			if (servicing_interrupt())
1076 				SQSTAT(sqp, sq_nqueued_intr);
1077 			else
1078 				SQSTAT(sqp, sq_nqueued_other);
1079 			if (sqp->sq_stats.sq_max_qlen < sqp->sq_count)
1080 				sqp->sq_stats.sq_max_qlen = sqp->sq_count;
1081 		}
1082 #endif
1083 		ENQUEUE_MP(sqp, mp, proc, arg);
1084 		if (being_processed) {
1085 			/*
1086 			 * Queue is already being processed.
1087 			 * No need to do anything.
1088 			 */
1089 			mutex_exit(&sqp->sq_lock);
1090 			return;
1091 		}
1092 		SQUEUE_WORKER_WAKEUP(sqp);
1093 	}
1094 }
1095 
1096 /*
1097  * squeue_fill() - fill squeue *sqp with mblk *mp with argument of *arg
1098  * without processing the squeue.
1099  */
1100 /* ARGSUSED */
1101 void
1102 squeue_fill(squeue_t *sqp, mblk_t *mp, sqproc_t proc, void * arg,
1103     uint8_t tag)
1104 {
1105 #if SQUEUE_DEBUG
1106 	conn_t *connp = (conn_t *)arg;
1107 #endif
1108 	ASSERT(proc != NULL);
1109 	ASSERT(sqp != NULL);
1110 	ASSERT(mp != NULL);
1111 	ASSERT(mp->b_next == NULL);
1112 	ASSERT(!IPCL_IS_TCP(connp) || connp->conn_tcp->tcp_connp == connp);
1113 	ASSERT(!IPCL_IS_UDP(connp) || connp->conn_udp->udp_connp == connp);
1114 
1115 	ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock));
1116 	mutex_enter(&sqp->sq_lock);
1117 	ENQUEUE_MP(sqp, mp, proc, arg);
1118 #if SQUEUE_DEBUG
1119 	mp->b_tag = tag;
1120 #endif
1121 #if SQUEUE_PROFILE
1122 	if (SQ_PROFILING(sqp)) {
1123 		if (servicing_interrupt())
1124 			SQSTAT(sqp, sq_nqueued_intr);
1125 		else
1126 			SQSTAT(sqp, sq_nqueued_other);
1127 		if (sqp->sq_stats.sq_max_qlen < sqp->sq_count)
1128 			sqp->sq_stats.sq_max_qlen = sqp->sq_count;
1129 	}
1130 #endif
1131 
1132 	/*
1133 	 * If queue is already being processed. No need to do anything.
1134 	 */
1135 	if (sqp->sq_state & SQS_PROC) {
1136 		mutex_exit(&sqp->sq_lock);
1137 		return;
1138 	}
1139 
1140 	SQUEUE_WORKER_WAKEUP(sqp);
1141 }
1142 
1143 
1144 /*
1145  * PRIVATE FUNCTIONS
1146  */
1147 
1148 static void
1149 squeue_fire(void *arg)
1150 {
1151 	squeue_t	*sqp = arg;
1152 	uint_t		state;
1153 
1154 	mutex_enter(&sqp->sq_lock);
1155 
1156 	state = sqp->sq_state;
1157 	if (sqp->sq_tid == 0 && !(state & SQS_TMO_PROG)) {
1158 		mutex_exit(&sqp->sq_lock);
1159 		return;
1160 	}
1161 
1162 	sqp->sq_tid = 0;
1163 	/*
1164 	 * The timeout fired before we got a chance to set it.
1165 	 * Process it anyway but remove the SQS_TMO_PROG so that
1166 	 * the guy trying to set the timeout knows that it has
1167 	 * already been processed.
1168 	 */
1169 	if (state & SQS_TMO_PROG)
1170 		sqp->sq_state &= ~SQS_TMO_PROG;
1171 
1172 	if (!(state & SQS_PROC)) {
1173 		sqp->sq_awaken = lbolt;
1174 		cv_signal(&sqp->sq_async);
1175 	}
1176 	mutex_exit(&sqp->sq_lock);
1177 }
1178 
1179 static void
1180 squeue_drain(squeue_t *sqp, uint_t proc_type, hrtime_t expire)
1181 {
1182 	mblk_t	*mp;
1183 	mblk_t 	*head;
1184 	sqproc_t proc;
1185 	conn_t	*connp;
1186 	clock_t	start = lbolt;
1187 	clock_t	drain_time;
1188 	timeout_id_t tid;
1189 	uint_t	cnt;
1190 	uint_t	total_cnt = 0;
1191 	ill_rx_ring_t	*sq_rx_ring = sqp->sq_rx_ring;
1192 	int	interrupt = servicing_interrupt();
1193 	boolean_t poll_on = B_FALSE;
1194 	hrtime_t now;
1195 
1196 	ASSERT(mutex_owned(&sqp->sq_lock));
1197 	ASSERT(!(sqp->sq_state & SQS_PROC));
1198 
1199 #if SQUEUE_PROFILE
1200 	if (SQ_PROFILING(sqp)) {
1201 		if (interrupt)
1202 			SQSTAT(sqp, sq_ndrains_intr);
1203 		else if (!(proc_type & SQS_WORKER))
1204 			SQSTAT(sqp, sq_ndrains_other);
1205 		else
1206 			SQSTAT(sqp, sq_ndrains_worker);
1207 	}
1208 #endif
1209 
1210 	if ((tid = sqp->sq_tid) != 0)
1211 		sqp->sq_tid = 0;
1212 
1213 	sqp->sq_state |= SQS_PROC | proc_type;
1214 	head = sqp->sq_first;
1215 	sqp->sq_first = NULL;
1216 	sqp->sq_last = NULL;
1217 	cnt = sqp->sq_count;
1218 
1219 	/*
1220 	 * We have backlog built up. Switch to polling mode if the
1221 	 * device underneath allows it. Need to do it only for
1222 	 * drain by non-interrupt thread so interrupts don't
1223 	 * come and disrupt us in between. If its a interrupt thread,
1224 	 * no need because most devices will not issue another
1225 	 * interrupt till this one returns.
1226 	 */
1227 	if ((sqp->sq_state & SQS_POLL_CAPAB) && !(proc_type & SQS_ENTER) &&
1228 		(sqp->sq_count > squeue_worker_poll_min)) {
1229 		ASSERT(sq_rx_ring != NULL);
1230 		SQS_POLLING_ON(sqp, sq_rx_ring);
1231 		poll_on = B_TRUE;
1232 	}
1233 
1234 	mutex_exit(&sqp->sq_lock);
1235 
1236 	if (tid != 0)
1237 		(void) untimeout(tid);
1238 again:
1239 	while ((mp = head) != NULL) {
1240 		head = mp->b_next;
1241 		mp->b_next = NULL;
1242 
1243 		proc = (sqproc_t)mp->b_queue;
1244 		mp->b_queue = NULL;
1245 		connp = (conn_t *)mp->b_prev;
1246 		mp->b_prev = NULL;
1247 #if SQUEUE_DEBUG
1248 		sqp->sq_curmp = mp;
1249 		sqp->sq_curproc = proc;
1250 		sqp->sq_connp = connp;
1251 		sqp->sq_tag = mp->b_tag;
1252 #endif
1253 
1254 #if SQUEUE_PROFILE
1255 		if (SQ_PROFILING(sqp)) {
1256 			if (interrupt)
1257 				SQSTAT(sqp, sq_npackets_intr);
1258 			else if (!(proc_type & SQS_WORKER))
1259 				SQSTAT(sqp, sq_npackets_other);
1260 			else
1261 				SQSTAT(sqp, sq_npackets_worker);
1262 		}
1263 #endif
1264 
1265 		connp->conn_on_sqp = B_TRUE;
1266 		DTRACE_PROBE3(squeue__proc__start, squeue_t *,
1267 		    sqp, mblk_t *, mp, conn_t *, connp);
1268 		(*proc)(connp, mp, sqp);
1269 		DTRACE_PROBE2(squeue__proc__end, squeue_t *,
1270 		    sqp, conn_t *, connp);
1271 		connp->conn_on_sqp = B_FALSE;
1272 		CONN_DEC_REF(connp);
1273 	}
1274 
1275 
1276 #if SQUEUE_DEBUG
1277 	sqp->sq_curmp = NULL;
1278 	sqp->sq_curproc = NULL;
1279 	sqp->sq_connp = NULL;
1280 #endif
1281 
1282 	mutex_enter(&sqp->sq_lock);
1283 	sqp->sq_count -= cnt;
1284 	total_cnt += cnt;
1285 
1286 	if (sqp->sq_first != NULL) {
1287 
1288 		now = gethrtime();
1289 		if (!expire || (now < expire)) {
1290 			/* More arrived and time not expired */
1291 			head = sqp->sq_first;
1292 			sqp->sq_first = NULL;
1293 			sqp->sq_last = NULL;
1294 			cnt = sqp->sq_count;
1295 			mutex_exit(&sqp->sq_lock);
1296 			goto again;
1297 		}
1298 
1299 		/*
1300 		 * If we are not worker thread and we
1301 		 * reached our time limit to do drain,
1302 		 * signal the worker thread to pick
1303 		 * up the work.
1304 		 * If we were the worker thread, then
1305 		 * we take a break to allow an interrupt
1306 		 * or writer to pick up the load.
1307 		 */
1308 		if (proc_type != SQS_WORKER) {
1309 			sqp->sq_awaken = lbolt;
1310 			cv_signal(&sqp->sq_async);
1311 		}
1312 	}
1313 
1314 	/*
1315 	 * Try to see if we can get a time estimate to process a packet.
1316 	 * Do it only in interrupt context since less chance of context
1317 	 * switch or pinning etc. to get a better estimate.
1318 	 */
1319 	if (interrupt && ((drain_time = (lbolt - start)) > 0))
1320 		sqp->sq_avg_drain_time = ((80 * sqp->sq_avg_drain_time) +
1321 		    (20 * (drv_hztousec(drain_time)/total_cnt)))/100;
1322 
1323 	sqp->sq_state &= ~(SQS_PROC | proc_type);
1324 
1325 	/*
1326 	 * If polling was turned on, turn it off and reduce the default
1327 	 * interrupt blank interval as well to bring new packets in faster
1328 	 * (reduces the latency when there is no backlog).
1329 	 */
1330 	if (poll_on && (sqp->sq_state & SQS_POLL_CAPAB)) {
1331 		ASSERT(sq_rx_ring != NULL);
1332 		SQS_POLLING_OFF(sqp, sq_rx_ring);
1333 	}
1334 }
1335 
1336 static void
1337 squeue_worker(squeue_t *sqp)
1338 {
1339 	kmutex_t *lock = &sqp->sq_lock;
1340 	kcondvar_t *async = &sqp->sq_async;
1341 	callb_cpr_t cprinfo;
1342 	hrtime_t now;
1343 #if SQUEUE_PROFILE
1344 	hrtime_t start;
1345 #endif
1346 
1347 	CALLB_CPR_INIT(&cprinfo, lock, callb_generic_cpr, "nca");
1348 	mutex_enter(lock);
1349 
1350 	for (;;) {
1351 		while (sqp->sq_first == NULL || (sqp->sq_state & SQS_PROC)) {
1352 			CALLB_CPR_SAFE_BEGIN(&cprinfo);
1353 still_wait:
1354 			cv_wait(async, lock);
1355 			if (sqp->sq_state & SQS_PROC) {
1356 				goto still_wait;
1357 			}
1358 			CALLB_CPR_SAFE_END(&cprinfo, lock);
1359 		}
1360 
1361 #if SQUEUE_PROFILE
1362 		if (SQ_PROFILING(sqp)) {
1363 			start = gethrtime();
1364 		}
1365 #endif
1366 
1367 		ASSERT(squeue_workerdrain_ns != 0);
1368 		now = gethrtime();
1369 		sqp->sq_run = curthread;
1370 		squeue_drain(sqp, SQS_WORKER, now +  squeue_workerdrain_ns);
1371 		sqp->sq_run = NULL;
1372 
1373 		if (sqp->sq_first != NULL) {
1374 			/*
1375 			 * Doing too much processing by worker thread
1376 			 * in presense of interrupts can be sub optimal.
1377 			 * Instead, once a drain is done by worker thread
1378 			 * for squeue_writerdrain_ns (the reason we are
1379 			 * here), we force wait for squeue_workerwait_tick
1380 			 * before doing more processing even if sq_wait is
1381 			 * set to 0.
1382 			 *
1383 			 * This can be counterproductive for performance
1384 			 * if worker thread is the only means to process
1385 			 * the packets (interrupts or writers are not
1386 			 * allowed inside the squeue).
1387 			 */
1388 			if (sqp->sq_tid == 0 &&
1389 			    !(sqp->sq_state & SQS_TMO_PROG)) {
1390 				timeout_id_t	tid;
1391 
1392 				sqp->sq_state |= SQS_TMO_PROG;
1393 				mutex_exit(&sqp->sq_lock);
1394 				tid = timeout(squeue_fire, sqp,
1395 				    squeue_workerwait_tick);
1396 				mutex_enter(&sqp->sq_lock);
1397 				/*
1398 				 * Check again if we still need
1399 				 * the timeout
1400 				 */
1401 				if (((sqp->sq_state & (SQS_TMO_PROG|SQS_PROC))
1402 				    == SQS_TMO_PROG) && (sqp->sq_tid == 0) &&
1403 				    (sqp->sq_first != NULL)) {
1404 					sqp->sq_state &= ~SQS_TMO_PROG;
1405 					sqp->sq_awaken = lbolt;
1406 					sqp->sq_tid = tid;
1407 				} else if (sqp->sq_state & SQS_TMO_PROG) {
1408 					/* timeout not needed */
1409 					sqp->sq_state &= ~SQS_TMO_PROG;
1410 					mutex_exit(&(sqp)->sq_lock);
1411 					(void) untimeout(tid);
1412 					mutex_enter(&sqp->sq_lock);
1413 				}
1414 			}
1415 			CALLB_CPR_SAFE_BEGIN(&cprinfo);
1416 			cv_wait(async, lock);
1417 			CALLB_CPR_SAFE_END(&cprinfo, lock);
1418 		}
1419 
1420 
1421 #if SQUEUE_PROFILE
1422 		if (SQ_PROFILING(sqp)) {
1423 			SQDELTA(sqp, sq_time_worker, gethrtime() - start);
1424 		}
1425 #endif
1426 	}
1427 }
1428 
#if SQUEUE_PROFILE
/*
 * squeue_kstat_update() - refresh the file-level squeue_kstat structure
 * from the statistics of the squeue stored in ksp->ks_private.
 *
 * Returns EACCES for KSTAT_WRITE requests (the statistics are read-only)
 * and 0 otherwise.  sq_count and sq_max_qlen are exported only in
 * SQUEUE_DEBUG builds.
 * NOTE(review): presumably installed as the ks_update callback where the
 * kstat is created - confirm against the creation code.
 */
static int
squeue_kstat_update(kstat_t *ksp, int rw)
{
	squeue_t		*sqp = ksp->ks_private;
	struct squeue_kstat	*sqsp = &squeue_kstat;

	if (rw == KSTAT_WRITE)
		return (EACCES);

/* Copy the sq_stats counter `field' into the kstat entry of the same name. */
#define	SQ_KSTAT_COPY(field)	\
	(sqsp->field.value.ui64 = sqp->sq_stats.field)

#if SQUEUE_DEBUG
	/* sq_count lives directly in the squeue, not in sq_stats. */
	sqsp->sq_count.value.ui64 = sqp->sq_count;
	SQ_KSTAT_COPY(sq_max_qlen);
#endif
	SQ_KSTAT_COPY(sq_npackets_worker);
	SQ_KSTAT_COPY(sq_npackets_intr);
	SQ_KSTAT_COPY(sq_npackets_other);
	SQ_KSTAT_COPY(sq_nqueued_intr);
	SQ_KSTAT_COPY(sq_nqueued_other);
	SQ_KSTAT_COPY(sq_ndrains_worker);
	SQ_KSTAT_COPY(sq_ndrains_intr);
	SQ_KSTAT_COPY(sq_ndrains_other);
	SQ_KSTAT_COPY(sq_time_worker);
	SQ_KSTAT_COPY(sq_time_intr);
	SQ_KSTAT_COPY(sq_time_other);

#undef	SQ_KSTAT_COPY

	return (0);
}
#endif
1457 
1458 void
1459 squeue_profile_enable(squeue_t *sqp)
1460 {
1461 	mutex_enter(&sqp->sq_lock);
1462 	sqp->sq_state |= SQS_PROFILE;
1463 	mutex_exit(&sqp->sq_lock);
1464 }
1465 
1466 void
1467 squeue_profile_disable(squeue_t *sqp)
1468 {
1469 	mutex_enter(&sqp->sq_lock);
1470 	sqp->sq_state &= ~SQS_PROFILE;
1471 	mutex_exit(&sqp->sq_lock);
1472 }
1473 
1474 void
1475 squeue_profile_reset(squeue_t *sqp)
1476 {
1477 #if SQUEUE_PROFILE
1478 	bzero(&sqp->sq_stats, sizeof (sqstat_t));
1479 #endif
1480 }
1481 
/*
 * squeue_profile_start() - turn on the file-wide squeue_profile switch.
 * Does nothing unless the file is built with SQUEUE_PROFILE.
 */
void
squeue_profile_start(void)
{
#if SQUEUE_PROFILE
	/* Global switch; the per-squeue SQS_PROFILE bit is set separately. */
	squeue_profile = B_TRUE;
#endif
}
1489 
/*
 * squeue_profile_stop() - turn off the file-wide squeue_profile switch.
 * Does nothing unless the file is built with SQUEUE_PROFILE.
 */
void
squeue_profile_stop(void)
{
#if SQUEUE_PROFILE
	/* Per-squeue SQS_PROFILE bits are left as they are. */
	squeue_profile = B_FALSE;
#endif
}
1497 
1498 uintptr_t *
1499 squeue_getprivate(squeue_t *sqp, sqprivate_t p)
1500 {
1501 	ASSERT(p < SQPRIVATE_MAX);
1502 
1503 	return (&sqp->sq_private[p]);
1504 }
1505 
1506 processorid_t
1507 squeue_binding(squeue_t *sqp)
1508 {
1509 	return (sqp->sq_bind);
1510 }
1511