xref: /illumos-gate/usr/src/uts/common/inet/squeue.c (revision 06e1a714)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright 2006 Sun Microsystems, Inc.  All rights reserved.
23  * Use is subject to license terms.
24  */
25 
26 #pragma ident	"%Z%%M%	%I%	%E% SMI"
27 
28 /*
29  * Squeues - TCP/IP serialization mechanism.
30  *
31  * This is a general purpose high-performance serialization mechanism. It is
32  * similar to a taskq with a single worker thread; the difference is that it
33  * does not imply a context switch - the thread placing a request may actually
34  * process it. It is also biased for processing requests in interrupt context.
35  *
36  * Each squeue has a worker thread which may optionally be bound to a CPU.
37  *
38  * Only one thread may process requests from a given squeue at any time. This is
39  * called "entering" the squeue.
40  *
41  * Each dispatched request is processed either by
42  *
43  *	a) the dispatching thread, or
44  *	b) some other thread that is currently processing the squeue at the
45  *		time of the request, or
46  *	c) the worker thread.
47  *
48  * INTERFACES:
49  *
50  * squeue_t *squeue_create(name, bind, wait, pri)
51  *
52  *	name: symbolic name for the squeue.
53  *	bind: preferred CPU binding for the worker thread.
54  *	wait: time in ms to wait before waking the worker thread after
55  *		queueing a request.
56  *	pri:  thread priority for the worker thread.
57  *
58  *   This function never fails and may sleep. It returns a transparent pointer
59  *   to the squeue_t structure that is passed to all other squeue operations.
60  *
61  * void squeue_bind(sqp, bind)
62  *
63  *   Bind the squeue worker thread to the CPU specified by the 'bind'
64  *   argument. A 'bind' value of -1 binds to the preferred CPU specified
65  *   at squeue_create() time.
66  *
67  *   NOTE: Any value of 'bind' other than -1 is not supported currently, but the
68  *	 API is present - in the future it may be useful to specify different
69  *	 binding.
70  *
71  * void squeue_unbind(sqp)
72  *
73  *   Unbind the worker thread from its preferred CPU.
74  *
75  * void squeue_enter(*sqp, *mp, proc, arg, tag)
76  *
77  *   Post a single request for processing. Each request consists of mblk 'mp',
78  *   function 'proc' to execute and an argument 'arg' to pass to this
79  *   function. The function is called as (*proc)(arg, mp, sqp); The tag is an
80  *   arbitrary number from 0 to 255 which will be stored in mp to track the
81  *   exact caller of squeue_enter(). The combination of function name and tag
82  *   should provide enough information to identify the caller.
83  *
84  *   If no one is processing the squeue, squeue_enter() will call the function
85  *   immediately. Otherwise it will add the request to the queue for later
86  *   processing. Once the function is executed, the thread may continue
87  *   executing all other requests pending on the queue.
88  *
89  *   NOTE: The tagging information is only used when SQUEUE_DEBUG is set to 1.
90  *   NOTE: The argument may only be a conn_t. Ideally we'd like to have a
91  *	   generic argument, but we want to drop the connection reference count
92  *	   here - this improves tail-call optimizations.
93  *	   XXX: The arg should have type conn_t.
94  *
95  * void squeue_enter_nodrain(*sqp, *mp, proc, arg, tag)
96  *
97  *   Same as squeue_enter(), but the entering thread will only try to execute a
98  *   single request. It will not continue executing any pending requests.
99  *
100  * void squeue_fill(*sqp, *mp, proc, arg, tag)
101  *
102  *   Just place the request on the queue without trying to execute it. Arrange
103  *   for the worker thread to process the request.
104  *
105  * void squeue_profile_enable(sqp)
106  * void squeue_profile_disable(sqp)
107  *
108  *    Enable or disable profiling for specified 'sqp'. Profiling is only
109  *    available when SQUEUE_PROFILE is set.
110  *
111  * void squeue_profile_reset(sqp)
112  *
113  *    Reset all profiling information to zero. Profiling is only
114  *    available when SQUEUE_PROFILE is set.
115  *
116  * void squeue_profile_start()
117  * void squeue_profile_stop()
118  *
119  *    Globally enable or disable profiling for all squeues.
120  *
121  * uintptr_t *squeue_getprivate(sqp, p)
122  *
123  *    Each squeue keeps a small amount of private data space available for various
124  *    consumers. Current consumers include TCP and NCA. Other consumers need to
125  *    add their private tag to the sqprivate_t enum. The private information is
126  *    limited to a uintptr_t value. The squeue has no knowledge of its content
127  *    and does not manage it in any way.
128  *
129  *    The typical use may be a breakdown of data structures per CPU (since
130  *    squeues are usually per CPU). See NCA for examples of use.
131  *    Currently 'p' may have only one legal value: SQPRIVATE_TCP.
132  *
133  * processorid_t squeue_binding(sqp)
134  *
135  *    Returns the CPU binding for a given squeue.
136  *
137  * TUNABLES:
138  *
139  * squeue_intrdrain_ms: Maximum time in ms an interrupt thread spends draining
140  *	any squeue. Note that this is an approximation - squeues have no control
141  *	over the time it takes to process each request. This limit is only
142  *	checked between processing individual messages.
143  *    Default: 20 ms.
144  *
145  * squeue_writerdrain_ms: Maximum time in ms a non-interrupt thread spends
146  *	draining any squeue. Note that this is an approximation - squeues have
147  *	no control over the time it takes to process each request. This limit
148  *	is only checked between processing individual messages.
149  *    Default: 10 ms.
150  *
151  * squeue_workerdrain_ms: Maximum time in ms the worker thread spends draining
152  *	any squeue. Note that this is an approximation - squeues have no control
153  *	over the time it takes to process each request. This limit is only
154  *	checked between processing individual messages.
155  *    Default: 10 ms.
156  *
157  * squeue_workerwait_ms: When the worker thread stops because its workerdrain
158  *	time expired, how long to wait before waking the worker thread again.
159  *    Default: 10 ms.
160  */
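/*
 * Illustrative usage sketch (not part of the original file). The consumer,
 * callback, connection and tag values below are hypothetical; only the
 * squeue calls themselves come from the interfaces documented above. Note
 * that the squeue drops one conn_t reference after running 'proc', so the
 * caller is expected to hold one across each call.
 *
 *	squeue_t *sqp;
 *
 *	sqp = squeue_create("example_sq", CPU->cpu_id, 10, minclsyspri);
 *	squeue_bind(sqp, -1);
 *
 *	CONN_INC_REF(connp);
 *	squeue_enter(sqp, mp, example_recv, connp, 42);		(may run inline)
 *
 *	CONN_INC_REF(connp);
 *	squeue_fill(sqp, mp2, example_recv, connp, 43);		(always queued)
 */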
161 
162 #include <sys/types.h>
163 #include <sys/cmn_err.h>
164 #include <sys/debug.h>
165 #include <sys/kmem.h>
166 #include <sys/cpuvar.h>
167 #include <sys/condvar_impl.h>
168 #include <sys/systm.h>
169 #include <sys/callb.h>
170 #include <sys/sdt.h>
171 #include <sys/ddi.h>
172 
173 #include <inet/ipclassifier.h>
174 #include <inet/udp_impl.h>
175 
176 /*
177  * State flags.
178  * Note: The MDB IP module depends on the values of these flags.
179  */
180 #define	SQS_PROC	0x0001	/* being processed */
181 #define	SQS_WORKER	0x0002	/* worker thread */
182 #define	SQS_ENTER	0x0004	/* enter thread */
183 #define	SQS_FAST	0x0008	/* enter-fast thread */
184 #define	SQS_USER	0x0010	/* A non interrupt user */
185 #define	SQS_BOUND	0x0020	/* Worker thread is bound */
186 #define	SQS_PROFILE	0x0040	/* Enable profiling */
187 #define	SQS_REENTER	0x0080	/* Re entered thread */
188 #define	SQS_TMO_PROG	0x0100	/* Timeout is being set */
189 
190 #include <sys/squeue_impl.h>
191 
192 static void squeue_fire(void *);
193 static void squeue_drain(squeue_t *, uint_t, hrtime_t);
194 static void squeue_worker(squeue_t *sqp);
195 
196 #if SQUEUE_PROFILE
197 static kmutex_t squeue_kstat_lock;
198 static int  squeue_kstat_update(kstat_t *, int);
199 #endif
200 
201 kmem_cache_t *squeue_cache;
202 
203 #define	SQUEUE_MSEC_TO_NSEC 1000000
204 
205 int squeue_intrdrain_ms = 20;
206 int squeue_writerdrain_ms = 10;
207 int squeue_workerdrain_ms = 10;
208 int squeue_workerwait_ms = 10;
209 
210 /* The values above converted to ticks or nanoseconds */
211 static int squeue_intrdrain_ns = 0;
212 static int squeue_writerdrain_ns = 0;
213 static int squeue_workerdrain_ns = 0;
214 static int squeue_workerwait_tick = 0;
215 
216 /*
217  * The minimum number of packets queued for a worker-thread drain to trigger
218  * polling (if the squeue allows it). The choice of 3 is arbitrary. You
219  * definitely don't want it to be 1, since that would trigger polling
220  * on very low loads as well (ssh seems to be one such example,
221  * where packet flow was very low yet somehow 1 packet ended up getting
222  * queued, the worker thread fired every 10 ms, and blanking also got
223  * triggered).
224  */
225 int squeue_worker_poll_min = 3;
226 
227 #if SQUEUE_PROFILE
228 /*
229  * Set to B_TRUE to enable profiling.
230  */
231 static int squeue_profile = B_FALSE;
232 #define	SQ_PROFILING(sqp) (squeue_profile && ((sqp)->sq_state & SQS_PROFILE))
233 
234 #define	SQSTAT(sqp, x) ((sqp)->sq_stats.x++)
235 #define	SQDELTA(sqp, x, d) ((sqp)->sq_stats.x += (d))
236 
237 struct squeue_kstat {
238 	kstat_named_t	sq_count;
239 	kstat_named_t	sq_max_qlen;
240 	kstat_named_t	sq_npackets_worker;
241 	kstat_named_t	sq_npackets_intr;
242 	kstat_named_t	sq_npackets_other;
243 	kstat_named_t	sq_nqueued_intr;
244 	kstat_named_t	sq_nqueued_other;
245 	kstat_named_t	sq_ndrains_worker;
246 	kstat_named_t	sq_ndrains_intr;
247 	kstat_named_t	sq_ndrains_other;
248 	kstat_named_t	sq_time_worker;
249 	kstat_named_t	sq_time_intr;
250 	kstat_named_t	sq_time_other;
251 } squeue_kstat = {
252 	{ "count",		KSTAT_DATA_UINT64 },
253 	{ "max_qlen",		KSTAT_DATA_UINT64 },
254 	{ "packets_worker",	KSTAT_DATA_UINT64 },
255 	{ "packets_intr",	KSTAT_DATA_UINT64 },
256 	{ "packets_other",	KSTAT_DATA_UINT64 },
257 	{ "queued_intr",	KSTAT_DATA_UINT64 },
258 	{ "queued_other",	KSTAT_DATA_UINT64 },
259 	{ "ndrains_worker",	KSTAT_DATA_UINT64 },
260 	{ "ndrains_intr",	KSTAT_DATA_UINT64 },
261 	{ "ndrains_other",	KSTAT_DATA_UINT64 },
262 	{ "time_worker",	KSTAT_DATA_UINT64 },
263 	{ "time_intr",		KSTAT_DATA_UINT64 },
264 	{ "time_other",		KSTAT_DATA_UINT64 },
265 };
266 #endif
267 
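/*
 * SQUEUE_WORKER_WAKEUP() is invoked after enqueueing work on a squeue that
 * no one is currently processing. Depending on the state it either signals
 * the worker thread immediately (sq_wait is 0, or an already pending timeout
 * has waited at least sq_wait ms), leaves a pending timeout alone, or arms a
 * new timeout (squeue_fire) so that an enter()ing thread gets up to sq_wait
 * ms to pick up the work before the worker is woken. It must be called with
 * sq_lock held and always returns with the lock dropped.
 */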
268 #define	SQUEUE_WORKER_WAKEUP(sqp) {					\
269 	timeout_id_t tid = (sqp)->sq_tid;				\
270 									\
271 	ASSERT(MUTEX_HELD(&(sqp)->sq_lock));				\
272 	/*								\
273 	 * Queue isn't being processed, so take				\
274 	 * any post enqueue actions needed before leaving.		\
275 	 */								\
276 	if (tid != 0) {							\
277 		/*							\
278 		 * Waiting for an enter() to process mblk(s).		\
279 		 */							\
280 		clock_t	waited = lbolt - (sqp)->sq_awaken;		\
281 									\
282 		if (TICK_TO_MSEC(waited) >= (sqp)->sq_wait) {		\
283 			/*						\
284 			 * Time's up and we have a worker thread	\
285 			 * waiting for work, so schedule it.		\
286 			 */						\
287 			(sqp)->sq_tid = 0;				\
288 			(sqp)->sq_awaken = lbolt;			\
289 			cv_signal(&(sqp)->sq_async);			\
290 			mutex_exit(&(sqp)->sq_lock);			\
291 			(void) untimeout(tid);				\
292 			return;						\
293 		}							\
294 		mutex_exit(&(sqp)->sq_lock);				\
295 		return;							\
296 	} else if ((sqp)->sq_state & SQS_TMO_PROG) {			\
297 		mutex_exit(&(sqp)->sq_lock);				\
298 		return;							\
299 	} else if ((sqp)->sq_wait != 0) {				\
300 		clock_t	wait = (sqp)->sq_wait;				\
301 		/*							\
302 		 * Wait up to sqp->sq_wait ms for an			\
303 		 * enter() to process this queue. We			\
304 		 * don't want to contend on timeout locks		\
305 		 * with sq_lock held for performance reasons,		\
306 		 * so drop the sq_lock before calling timeout		\
307 		 * but we need to check if timeout is required		\
308 		 * after reacquiring the sq_lock. Once			\
309 		 * the sq_lock is dropped, someone else could		\
310 		 * have processed the packet or the timeout could	\
311 		 * have already fired.					\
312 		 */							\
313 		(sqp)->sq_state |= SQS_TMO_PROG;			\
314 		mutex_exit(&(sqp)->sq_lock);				\
315 		tid = timeout(squeue_fire, (sqp), wait);		\
316 		mutex_enter(&(sqp)->sq_lock);				\
317 		/* Check again if we still need the timeout */		\
318 		if ((((sqp)->sq_state & (SQS_PROC|SQS_TMO_PROG)) ==	\
319 			SQS_TMO_PROG) && ((sqp)->sq_tid == 0) &&	\
320 			((sqp)->sq_first != NULL)) {			\
321 				(sqp)->sq_state &= ~SQS_TMO_PROG;	\
322 				(sqp)->sq_awaken = lbolt;		\
323 				(sqp)->sq_tid = tid;			\
324 				mutex_exit(&(sqp)->sq_lock);		\
325 				return;					\
326 		} else {						\
327 			if ((sqp)->sq_state & SQS_TMO_PROG) {		\
328 				(sqp)->sq_state &= ~SQS_TMO_PROG;	\
329 				mutex_exit(&(sqp)->sq_lock);		\
330 				(void) untimeout(tid);			\
331 			} else {					\
332 				/*					\
333 				 * The timer fired before we could 	\
334 				 * reacquire the sq_lock. squeue_fire	\
335 				 * removes the SQS_TMO_PROG flag	\
336 				 * and we don't need to	do anything	\
337 				 * else.				\
338 				 */					\
339 				mutex_exit(&(sqp)->sq_lock);		\
340 			}						\
341 		}							\
342 	} else {							\
343 		/*							\
344 		 * Schedule the worker thread.				\
345 		 */							\
346 		(sqp)->sq_awaken = lbolt;				\
347 		cv_signal(&(sqp)->sq_async);				\
348 		mutex_exit(&(sqp)->sq_lock);				\
349 	}								\
350 	ASSERT(MUTEX_NOT_HELD(&(sqp)->sq_lock)); 			\
351 }
352 
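/*
 * While an mblk sits on the squeue, its b_queue and b_prev fields are
 * borrowed to carry the 'proc' callback and its 'arg' (the conn_t), and
 * b_next links the queue itself; squeue_drain() clears all three before
 * calling 'proc'.
 */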
353 #define	ENQUEUE_MP(sqp, mp, proc, arg) {			\
354 	/*							\
355 	 * Enqueue our mblk.					\
356 	 */							\
357 	(mp)->b_queue = NULL;					\
358 	ASSERT(MUTEX_HELD(&(sqp)->sq_lock));			\
359 	ASSERT((mp)->b_prev == NULL && (mp)->b_next == NULL); 	\
360 	(mp)->b_queue = (queue_t *)(proc);			\
361 	(mp)->b_prev = (mblk_t *)(arg);				\
362 								\
363 	if ((sqp)->sq_last != NULL)				\
364 		(sqp)->sq_last->b_next = (mp);			\
365 	else							\
366 		(sqp)->sq_first = (mp);				\
367 	(sqp)->sq_last = (mp);					\
368 	(sqp)->sq_count++;					\
369 	ASSERT((sqp)->sq_count > 0);				\
370 	DTRACE_PROBE2(squeue__enqueue, squeue_t *, sqp,		\
371 	    mblk_t *, mp);					\
372 }
373 
374 
375 #define	ENQUEUE_CHAIN(sqp, mp, tail, cnt) {			\
376 	/*							\
377 	 * Enqueue our mblk chain.				\
378 	 */							\
379 	ASSERT(MUTEX_HELD(&(sqp)->sq_lock));			\
380 								\
381 	if ((sqp)->sq_last != NULL)				\
382 		(sqp)->sq_last->b_next = (mp);			\
383 	else							\
384 		(sqp)->sq_first = (mp);				\
385 	(sqp)->sq_last = (tail);				\
386 	(sqp)->sq_count += (cnt);				\
387 	ASSERT((sqp)->sq_count > 0);				\
388 	DTRACE_PROBE4(squeue__enqueuechain, squeue_t *, sqp,	\
389 		mblk_t *, mp, mblk_t *, tail, int, cnt);	\
390 								\
391 }
392 
393 #define	SQS_POLLING_ON(sqp, rx_ring) {				\
394 	ASSERT(rx_ring != NULL);				\
395 	ASSERT(MUTEX_HELD(&(sqp)->sq_lock));			\
396 	rx_ring->rr_blank(rx_ring->rr_handle,			\
397 	    MIN((sqp->sq_avg_drain_time * sqp->sq_count),	\
398 		rx_ring->rr_max_blank_time),			\
399 		rx_ring->rr_max_pkt_cnt);			\
400 	rx_ring->rr_poll_state |= ILL_POLLING;			\
401 	rx_ring->rr_poll_time = lbolt;				\
402 }
403 
404 
405 #define	SQS_POLLING_OFF(sqp, rx_ring) {				\
406 	ASSERT(rx_ring != NULL);				\
407 	ASSERT(MUTEX_HELD(&(sqp)->sq_lock));			\
408 	rx_ring->rr_blank(rx_ring->rr_handle,			\
409 	    rx_ring->rr_min_blank_time,				\
410 	    rx_ring->rr_min_pkt_cnt);				\
411 }
412 
413 void
414 squeue_init(void)
415 {
416 	squeue_cache = kmem_cache_create("squeue_cache",
417 	    sizeof (squeue_t), 64, NULL, NULL, NULL, NULL, NULL, 0);
418 
419 	squeue_intrdrain_ns = squeue_intrdrain_ms * SQUEUE_MSEC_TO_NSEC;
420 	squeue_writerdrain_ns = squeue_writerdrain_ms * SQUEUE_MSEC_TO_NSEC;
421 	squeue_workerdrain_ns = squeue_workerdrain_ms * SQUEUE_MSEC_TO_NSEC;
422 	squeue_workerwait_tick = MSEC_TO_TICK_ROUNDUP(squeue_workerwait_ms);
423 }
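/*
 * The *_ms tunables above are snapshotted into their *_ns / *_tick
 * counterparts here, once, at initialization. As a sketch (assuming the
 * usual /etc/system mechanism and that this file is linked into the "ip"
 * module), an administrator would adjust them with a line such as
 *
 *	set ip:squeue_workerdrain_ms = 20
 *
 * and reboot; changing the *_ms variables on a live system does not affect
 * the values already computed in squeue_init().
 */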
424 
425 /* ARGSUSED */
426 squeue_t *
427 squeue_create(char *name, processorid_t bind, clock_t wait, pri_t pri)
428 {
429 	squeue_t *sqp = kmem_cache_alloc(squeue_cache, KM_SLEEP);
430 
431 	bzero(sqp, sizeof (squeue_t));
432 	(void) strncpy(sqp->sq_name, name, SQ_NAMELEN + 1);
433 	sqp->sq_name[SQ_NAMELEN] = '\0';
434 
435 	sqp->sq_bind = bind;
436 	sqp->sq_wait = MSEC_TO_TICK(wait);
437 	sqp->sq_avg_drain_time =
438 	    drv_hztousec(NSEC_TO_TICK_ROUNDUP(squeue_intrdrain_ns)) /
439 	    NSEC_TO_TICK_ROUNDUP(squeue_intrdrain_ns);
440 
441 #if SQUEUE_PROFILE
442 	if ((sqp->sq_kstat = kstat_create("ip", bind, name,
443 		"net", KSTAT_TYPE_NAMED,
444 		sizeof (squeue_kstat) / sizeof (kstat_named_t),
445 		KSTAT_FLAG_VIRTUAL)) != NULL) {
446 		sqp->sq_kstat->ks_lock = &squeue_kstat_lock;
447 		sqp->sq_kstat->ks_data = &squeue_kstat;
448 		sqp->sq_kstat->ks_update = squeue_kstat_update;
449 		sqp->sq_kstat->ks_private = sqp;
450 		kstat_install(sqp->sq_kstat);
451 	}
452 #endif
453 
454 	sqp->sq_worker = thread_create(NULL, 0, squeue_worker,
455 	    sqp, 0, &p0, TS_RUN, pri);
456 
457 	return (sqp);
458 }
459 
460 /* ARGSUSED */
461 void
462 squeue_bind(squeue_t *sqp, processorid_t bind)
463 {
464 	ASSERT(bind == -1);
465 
466 	mutex_enter(&sqp->sq_lock);
467 	if (sqp->sq_state & SQS_BOUND) {
468 		mutex_exit(&sqp->sq_lock);
469 		return;
470 	}
471 
472 	sqp->sq_state |= SQS_BOUND;
473 	mutex_exit(&sqp->sq_lock);
474 
475 	thread_affinity_set(sqp->sq_worker, sqp->sq_bind);
476 }
477 
478 void
479 squeue_unbind(squeue_t *sqp)
480 {
481 	mutex_enter(&sqp->sq_lock);
482 	if (!(sqp->sq_state & SQS_BOUND)) {
483 		mutex_exit(&sqp->sq_lock);
484 		return;
485 	}
486 
487 	sqp->sq_state &= ~SQS_BOUND;
488 	mutex_exit(&sqp->sq_lock);
489 
490 	thread_affinity_clear(sqp->sq_worker);
491 }
492 
493 /*
494  * squeue_enter_chain() - enter squeue sqp with mblk mp (which can be
495  * a chain), while tail points to the end and cnt is the number of
496  * mblks in the chain.
497  *
498  * For a chain of a single packet (i.e. mp == tail), go through the
499  * fast path if no one is processing the squeue and nothing is queued.
500  *
501  * The proc and arg for each mblk are already stored in the mblk in
502  * appropriate places.
503  */
504 void
505 squeue_enter_chain(squeue_t *sqp, mblk_t *mp, mblk_t *tail,
506     uint32_t cnt, uint8_t tag)
507 {
508 	int		interrupt = servicing_interrupt();
509 	void 		*arg;
510 	sqproc_t	proc;
511 	hrtime_t	now;
512 #if SQUEUE_PROFILE
513 	hrtime_t 	start, delta;
514 #endif
515 
516 	ASSERT(sqp != NULL);
517 	ASSERT(mp != NULL);
518 	ASSERT(tail != NULL);
519 	ASSERT(cnt > 0);
520 	ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock));
521 
522 	mutex_enter(&sqp->sq_lock);
523 	if (!(sqp->sq_state & SQS_PROC)) {
524 		/*
525 		 * See if anything is already queued. If we are the
526 		 * first packet, do inline processing else queue the
527 		 * packet and do the drain.
528 		 */
529 		sqp->sq_run = curthread;
530 		if (sqp->sq_first == NULL && cnt == 1) {
531 			/*
532 			 * Fast-path, ok to process and nothing queued.
533 			 */
534 			sqp->sq_state |= (SQS_PROC|SQS_FAST);
535 			mutex_exit(&sqp->sq_lock);
536 
537 			/*
538 			 * We have a chain of 1 packet, so
539 			 * go through this fast path.
540 			 */
541 			arg = mp->b_prev;
542 			mp->b_prev = NULL;
543 			proc = (sqproc_t)mp->b_queue;
544 			mp->b_queue = NULL;
545 
546 			ASSERT(proc != NULL);
547 			ASSERT(arg != NULL);
548 			ASSERT(mp->b_next == NULL);
549 
550 #if SQUEUE_DEBUG
551 			sqp->sq_isintr = interrupt;
552 			sqp->sq_curmp = mp;
553 			sqp->sq_curproc = proc;
554 			sqp->sq_connp = arg;
555 			mp->b_tag = sqp->sq_tag = tag;
556 #endif
557 #if SQUEUE_PROFILE
558 			if (SQ_PROFILING(sqp)) {
559 				if (interrupt)
560 					SQSTAT(sqp, sq_npackets_intr);
561 				else
562 					SQSTAT(sqp, sq_npackets_other);
563 				start = gethrtime();
564 			}
565 #endif
566 			((conn_t *)arg)->conn_on_sqp = B_TRUE;
567 			DTRACE_PROBE3(squeue__proc__start, squeue_t *,
568 			    sqp, mblk_t *, mp, conn_t *, arg);
569 			(*proc)(arg, mp, sqp);
570 			DTRACE_PROBE2(squeue__proc__end, squeue_t *,
571 			    sqp, conn_t *, arg);
572 			((conn_t *)arg)->conn_on_sqp = B_FALSE;
573 
574 #if SQUEUE_PROFILE
575 			if (SQ_PROFILING(sqp)) {
576 				delta = gethrtime() - start;
577 				if (interrupt)
578 					SQDELTA(sqp, sq_time_intr, delta);
579 				else
580 					SQDELTA(sqp, sq_time_other, delta);
581 			}
582 #endif
583 #if SQUEUE_DEBUG
584 			sqp->sq_curmp = NULL;
585 			sqp->sq_curproc = NULL;
586 			sqp->sq_connp = NULL;
587 			sqp->sq_isintr = 0;
588 #endif
589 
590 			CONN_DEC_REF((conn_t *)arg);
591 			ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock));
592 			mutex_enter(&sqp->sq_lock);
593 			sqp->sq_state &= ~(SQS_PROC|SQS_FAST);
594 			if (sqp->sq_first == NULL) {
595 				/*
596 				 * We processed our packet inline and
597 				 * nothing new has arrived. We are done.
598 				 */
599 				sqp->sq_run = NULL;
600 				mutex_exit(&sqp->sq_lock);
601 				return;
602 			} else if (sqp->sq_bind != CPU->cpu_id) {
603 				/*
604 				 * If the current thread is not running
605 				 * on the CPU to which this squeue is bound,
606 				 * then don't allow it to drain.
607 				 */
608 				sqp->sq_run = NULL;
609 				SQUEUE_WORKER_WAKEUP(sqp);
610 				return;
611 			}
612 		} else {
613 			ENQUEUE_CHAIN(sqp, mp, tail, cnt);
614 #if SQUEUE_DEBUG
615 			mp->b_tag = tag;
616 #endif
617 #if SQUEUE_PROFILE
618 			if (SQ_PROFILING(sqp)) {
619 				if (servicing_interrupt())
620 					SQSTAT(sqp, sq_nqueued_intr);
621 				else
622 					SQSTAT(sqp, sq_nqueued_other);
623 				if (sqp->sq_stats.sq_max_qlen < sqp->sq_count)
624 					sqp->sq_stats.sq_max_qlen =
625 					    sqp->sq_count;
626 			}
627 #endif
628 		}
629 
630 		/*
631 		 * We are here because either we couldn't do inline
632 		 * processing (because something was already queued),
633 		 * or we had a chain of more than one packet,
634 		 * or something else arrived after we were done with
635 		 * inline processing.
636 		 */
637 		ASSERT(MUTEX_HELD(&sqp->sq_lock));
638 		ASSERT(sqp->sq_first != NULL);
639 
640 #if SQUEUE_PROFILE
641 		if (SQ_PROFILING(sqp)) {
642 			start = gethrtime();
643 		}
644 #endif
645 #if SQUEUE_DEBUG
646 		sqp->sq_isintr = interrupt;
647 #endif
648 
649 		now = gethrtime();
650 		if (interrupt) {
651 			squeue_drain(sqp, SQS_ENTER, now +
652 			    squeue_intrdrain_ns);
653 		} else {
654 			squeue_drain(sqp, SQS_USER, now +
655 			    squeue_writerdrain_ns);
656 		}
657 
658 #if SQUEUE_PROFILE
659 		if (SQ_PROFILING(sqp)) {
660 			delta = gethrtime() - start;
661 			if (interrupt)
662 				SQDELTA(sqp, sq_time_intr, delta);
663 			else
664 				SQDELTA(sqp, sq_time_other, delta);
665 		}
666 #endif
667 #if SQUEUE_DEBUG
668 		sqp->sq_isintr = 0;
669 #endif
670 
671 		/*
672 		 * If we didn't do a complete drain, the worker
673 		 * thread was already signalled by squeue_drain.
674 		 */
675 		sqp->sq_run = NULL;
676 		mutex_exit(&sqp->sq_lock);
677 		return;
678 	} else {
679 		ASSERT(sqp->sq_run != NULL);
680 		/*
681 		 * Queue is already being processed. Just enqueue
682 		 * the packet and go away.
683 		 */
684 #if SQUEUE_DEBUG
685 		mp->b_tag = tag;
686 #endif
687 #if SQUEUE_PROFILE
688 		if (SQ_PROFILING(sqp)) {
689 			if (servicing_interrupt())
690 				SQSTAT(sqp, sq_nqueued_intr);
691 			else
692 				SQSTAT(sqp, sq_nqueued_other);
693 			if (sqp->sq_stats.sq_max_qlen < sqp->sq_count)
694 				sqp->sq_stats.sq_max_qlen = sqp->sq_count;
695 		}
696 #endif
697 
698 		ENQUEUE_CHAIN(sqp, mp, tail, cnt);
699 		mutex_exit(&sqp->sq_lock);
700 		return;
701 	}
702 }
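/*
 * Minimal caller-side sketch for squeue_enter_chain() (hypothetical callback
 * and connection names). As the block comment above notes, each mblk in the
 * b_next linked chain must already carry its proc in b_queue and its conn_t
 * in b_prev, with one reference held per mblk:
 *
 *	uint32_t cnt = 0;
 *	mblk_t *tail;
 *
 *	for (tail = mp; ; tail = tail->b_next) {
 *		tail->b_queue = (queue_t *)example_recv;
 *		tail->b_prev = (mblk_t *)connp;
 *		CONN_INC_REF(connp);
 *		cnt++;
 *		if (tail->b_next == NULL)
 *			break;
 *	}
 *	squeue_enter_chain(sqp, mp, tail, cnt, 42);
 */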
703 
704 /*
705  * squeue_enter() - enter squeue *sqp with mblk *mp and argument *arg.
706  */
707 void
708 squeue_enter(squeue_t *sqp, mblk_t *mp, sqproc_t proc, void *arg,
709     uint8_t tag)
710 {
711 	int	interrupt = servicing_interrupt();
712 	hrtime_t now;
713 #if SQUEUE_PROFILE
714 	hrtime_t start, delta;
715 #endif
716 #if SQUEUE_DEBUG
717 	conn_t 	*connp = (conn_t *)arg;
718 	ASSERT(!IPCL_IS_TCP(connp) || connp->conn_tcp->tcp_connp == connp);
719 	ASSERT(!IPCL_IS_UDP(connp) || connp->conn_udp->udp_connp == connp);
720 #endif
721 
722 	ASSERT(proc != NULL);
723 	ASSERT(sqp != NULL);
724 	ASSERT(mp != NULL);
725 	ASSERT(mp->b_next == NULL);
726 	ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock));
727 
728 	mutex_enter(&sqp->sq_lock);
729 	if (!(sqp->sq_state & SQS_PROC)) {
730 		/*
731 		 * See if anything is already queued. If we are the
732 		 * first packet, do inline processing else queue the
733 		 * packet and do the drain.
734 		 */
735 		sqp->sq_run = curthread;
736 		if (sqp->sq_first == NULL) {
737 			/*
738 			 * Fast-path, ok to process and nothing queued.
739 			 */
740 			sqp->sq_state |= (SQS_PROC|SQS_FAST);
741 			mutex_exit(&sqp->sq_lock);
742 
743 #if SQUEUE_DEBUG
744 			sqp->sq_isintr = interrupt;
745 			sqp->sq_curmp = mp;
746 			sqp->sq_curproc = proc;
747 			sqp->sq_connp = connp;
748 			mp->b_tag = sqp->sq_tag = tag;
749 #endif
750 #if SQUEUE_PROFILE
751 			if (SQ_PROFILING(sqp)) {
752 				if (interrupt)
753 					SQSTAT(sqp, sq_npackets_intr);
754 				else
755 					SQSTAT(sqp, sq_npackets_other);
756 				start = gethrtime();
757 			}
758 #endif
759 			((conn_t *)arg)->conn_on_sqp = B_TRUE;
760 			DTRACE_PROBE3(squeue__proc__start, squeue_t *,
761 			    sqp, mblk_t *, mp, conn_t *, arg);
762 			(*proc)(arg, mp, sqp);
763 			DTRACE_PROBE2(squeue__proc__end, squeue_t *,
764 			    sqp, conn_t *, arg);
765 			((conn_t *)arg)->conn_on_sqp = B_FALSE;
766 
767 #if SQUEUE_PROFILE
768 			if (SQ_PROFILING(sqp)) {
769 				delta = gethrtime() - start;
770 				if (interrupt)
771 					SQDELTA(sqp, sq_time_intr, delta);
772 				else
773 					SQDELTA(sqp, sq_time_other, delta);
774 			}
775 #endif
776 #if SQUEUE_DEBUG
777 			sqp->sq_curmp = NULL;
778 			sqp->sq_curproc = NULL;
779 			sqp->sq_connp = NULL;
780 			sqp->sq_isintr = 0;
781 #endif
782 
783 			CONN_DEC_REF((conn_t *)arg);
784 			ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock));
785 			mutex_enter(&sqp->sq_lock);
786 			sqp->sq_state &= ~(SQS_PROC|SQS_FAST);
787 			if (sqp->sq_first == NULL) {
788 				/*
789 				 * We processed our packet inline and
790 				 * nothing new has arrived. We are done.
791 				 */
792 				sqp->sq_run = NULL;
793 				mutex_exit(&sqp->sq_lock);
794 				return;
795 			} else if (sqp->sq_bind != CPU->cpu_id) {
796 				/*
797 				 * If the current thread is not running
798 				 * on the CPU to which this squeue is bound,
799 				 * then don't allow it to drain.
800 				 */
801 				sqp->sq_run = NULL;
802 				SQUEUE_WORKER_WAKEUP(sqp);
803 				return;
804 			}
805 		} else {
806 			ENQUEUE_MP(sqp, mp, proc, arg);
807 #if SQUEUE_DEBUG
808 			mp->b_tag = tag;
809 #endif
810 #if SQUEUE_PROFILE
811 			if (SQ_PROFILING(sqp)) {
812 				if (servicing_interrupt())
813 					SQSTAT(sqp, sq_nqueued_intr);
814 				else
815 					SQSTAT(sqp, sq_nqueued_other);
816 				if (sqp->sq_stats.sq_max_qlen < sqp->sq_count)
817 					sqp->sq_stats.sq_max_qlen =
818 					    sqp->sq_count;
819 			}
820 #endif
821 		}
822 
823 		/*
824 		 * We are here because either we couldn't do inline
825 		 * processing (because something was already queued)
826 		 * or something else arrived after we were done with
827 		 * inline processing.
828 		 */
829 		ASSERT(MUTEX_HELD(&sqp->sq_lock));
830 		ASSERT(sqp->sq_first != NULL);
831 
832 #if SQUEUE_PROFILE
833 		if (SQ_PROFILING(sqp)) {
834 			start = gethrtime();
835 		}
836 #endif
837 #if SQUEUE_DEBUG
838 		sqp->sq_isintr = interrupt;
839 #endif
840 
841 		now = gethrtime();
842 		if (interrupt) {
843 			squeue_drain(sqp, SQS_ENTER, now +
844 			    squeue_intrdrain_ns);
845 		} else {
846 			squeue_drain(sqp, SQS_USER, now +
847 			    squeue_writerdrain_ns);
848 		}
849 
850 #if SQUEUE_PROFILE
851 		if (SQ_PROFILING(sqp)) {
852 			delta = gethrtime() - start;
853 			if (interrupt)
854 				SQDELTA(sqp, sq_time_intr, delta);
855 			else
856 				SQDELTA(sqp, sq_time_other, delta);
857 		}
858 #endif
859 #if SQUEUE_DEBUG
860 		sqp->sq_isintr = 0;
861 #endif
862 
863 		/*
864 		 * If we didn't do a complete drain, the worker
865 		 * thread was already signalled by squeue_drain.
866 		 */
867 		sqp->sq_run = NULL;
868 		mutex_exit(&sqp->sq_lock);
869 		return;
870 	} else {
871 		ASSERT(sqp->sq_run != NULL);
872 		/*
873 		 * We let a thread processing a squeue reenter only
874 		 * once. This helps the case of an incoming connection,
875 		 * where the SYN-ACK-ACK that triggers the conn_ind
876 		 * doesn't have to queue the packet if the listener and
877 		 * eager are on the same squeue. It also helps the
878 		 * loopback connection, where the two ends are bound
879 		 * to the same squeue (which is typical on single
880 		 * CPU machines).
881 		 * We allow the thread to reenter only once for fear
882 		 * of the stack getting blown by multiple traversals.
883 		 */
884 		if (!(sqp->sq_state & SQS_REENTER) &&
885 		    (sqp->sq_run == curthread) &&
886 		    (((conn_t *)arg)->conn_on_sqp == B_FALSE)) {
887 			sqp->sq_state |= SQS_REENTER;
888 			mutex_exit(&sqp->sq_lock);
889 
890 			((conn_t *)arg)->conn_on_sqp = B_TRUE;
891 			DTRACE_PROBE3(squeue__proc__start, squeue_t *,
892 			    sqp, mblk_t *, mp, conn_t *, arg);
893 			(*proc)(arg, mp, sqp);
894 			DTRACE_PROBE2(squeue__proc__end, squeue_t *,
895 			    sqp, conn_t *, arg);
896 			((conn_t *)arg)->conn_on_sqp = B_FALSE;
897 			CONN_DEC_REF((conn_t *)arg);
898 
899 			mutex_enter(&sqp->sq_lock);
900 			sqp->sq_state &= ~SQS_REENTER;
901 			mutex_exit(&sqp->sq_lock);
902 			return;
903 		}
904 		/*
905 		 * Queue is already being processed. Just enqueue
906 		 * the packet and go away.
907 		 */
908 #if SQUEUE_DEBUG
909 		mp->b_tag = tag;
910 #endif
911 #if SQUEUE_PROFILE
912 		if (SQ_PROFILING(sqp)) {
913 			if (servicing_interrupt())
914 				SQSTAT(sqp, sq_nqueued_intr);
915 			else
916 				SQSTAT(sqp, sq_nqueued_other);
917 			if (sqp->sq_stats.sq_max_qlen < sqp->sq_count)
918 				sqp->sq_stats.sq_max_qlen = sqp->sq_count;
919 		}
920 #endif
921 
922 		ENQUEUE_MP(sqp, mp, proc, arg);
923 		mutex_exit(&sqp->sq_lock);
924 		return;
925 	}
926 }
927 
928 void
929 squeue_enter_nodrain(squeue_t *sqp, mblk_t *mp, sqproc_t proc, void *arg,
930     uint8_t tag)
931 {
932 	int		interrupt = servicing_interrupt();
933 	boolean_t	being_processed;
934 #if SQUEUE_DEBUG
935 	conn_t 		*connp = (conn_t *)arg;
936 #endif
937 #if SQUEUE_PROFILE
938 	hrtime_t 	start, delta;
939 #endif
940 
941 	ASSERT(proc != NULL);
942 	ASSERT(sqp != NULL);
943 	ASSERT(mp != NULL);
944 	ASSERT(mp->b_next == NULL);
945 	ASSERT(!IPCL_IS_TCP(connp) || connp->conn_tcp->tcp_connp == connp);
946 	ASSERT(!IPCL_IS_UDP(connp) || connp->conn_udp->udp_connp == connp);
947 	ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock));
948 
949 	mutex_enter(&sqp->sq_lock);
950 
951 	being_processed = (sqp->sq_state & SQS_PROC);
952 	if (!being_processed && (sqp->sq_first == NULL)) {
953 		/*
954 		 * Fast-path, ok to process and nothing queued.
955 		 */
956 		sqp->sq_state |= (SQS_PROC|SQS_FAST);
957 		sqp->sq_run = curthread;
958 		mutex_exit(&sqp->sq_lock);
959 
960 #if SQUEUE_DEBUG
961 		sqp->sq_isintr = interrupt;
962 		sqp->sq_curmp = mp;
963 		sqp->sq_curproc = proc;
964 		sqp->sq_connp = connp;
965 		mp->b_tag = sqp->sq_tag = tag;
966 #endif
967 
968 #if SQUEUE_PROFILE
969 		if (SQ_PROFILING(sqp)) {
970 			if (interrupt)
971 				SQSTAT(sqp, sq_npackets_intr);
972 			else
973 				SQSTAT(sqp, sq_npackets_other);
974 			start = gethrtime();
975 		}
976 #endif
977 
978 		((conn_t *)arg)->conn_on_sqp = B_TRUE;
979 		DTRACE_PROBE3(squeue__proc__start, squeue_t *,
980 		    sqp, mblk_t *, mp, conn_t *, arg);
981 		(*proc)(arg, mp, sqp);
982 		DTRACE_PROBE2(squeue__proc__end, squeue_t *,
983 		    sqp, conn_t *, arg);
984 		((conn_t *)arg)->conn_on_sqp = B_FALSE;
985 
986 #if SQUEUE_DEBUG
987 		sqp->sq_curmp = NULL;
988 		sqp->sq_curproc = NULL;
989 		sqp->sq_connp = NULL;
990 		sqp->sq_isintr = 0;
991 #endif
992 #if SQUEUE_PROFILE
993 		if (SQ_PROFILING(sqp)) {
994 			delta = gethrtime() - start;
995 			if (interrupt)
996 				SQDELTA(sqp, sq_time_intr, delta);
997 			else
998 				SQDELTA(sqp, sq_time_other, delta);
999 		}
1000 #endif
1001 
1002 		CONN_DEC_REF((conn_t *)arg);
1003 		mutex_enter(&sqp->sq_lock);
1004 		sqp->sq_state &= ~(SQS_PROC|SQS_FAST);
1005 		sqp->sq_run = NULL;
1006 		if (sqp->sq_first == NULL) {
1007 			/*
1008 			 * We processed our packet inline and
1009 			 * nothing new has arrived. We are done.
1010 			 */
1011 			mutex_exit(&sqp->sq_lock);
1012 		} else {
1013 			SQUEUE_WORKER_WAKEUP(sqp);
1014 		}
1015 		return;
1016 	} else {
1017 		/*
1018 		 * We let a thread processing a squeue reenter only
1019 		 * once. This helps the case of an incoming connection,
1020 		 * where the SYN-ACK-ACK that triggers the conn_ind
1021 		 * doesn't have to queue the packet if the listener and
1022 		 * eager are on the same squeue. It also helps the
1023 		 * loopback connection, where the two ends are bound
1024 		 * to the same squeue (which is typical on single
1025 		 * CPU machines).
1026 		 * We allow the thread to reenter only once for fear
1027 		 * of the stack getting blown by multiple traversals.
1028 		 */
1029 		if (being_processed && !(sqp->sq_state & SQS_REENTER) &&
1030 		    (sqp->sq_run == curthread) &&
1031 		    (((conn_t *)arg)->conn_on_sqp == B_FALSE)) {
1032 			sqp->sq_state |= SQS_REENTER;
1033 			mutex_exit(&sqp->sq_lock);
1034 
1035 			((conn_t *)arg)->conn_on_sqp = B_TRUE;
1036 			DTRACE_PROBE3(squeue__proc__start, squeue_t *,
1037 			    sqp, mblk_t *, mp, conn_t *, arg);
1038 			(*proc)(arg, mp, sqp);
1039 			DTRACE_PROBE2(squeue__proc__end, squeue_t *,
1040 			    sqp, conn_t *, arg);
1041 			((conn_t *)arg)->conn_on_sqp = B_FALSE;
1042 			CONN_DEC_REF((conn_t *)arg);
1043 
1044 			mutex_enter(&sqp->sq_lock);
1045 			sqp->sq_state &= ~SQS_REENTER;
1046 			mutex_exit(&sqp->sq_lock);
1047 			return;
1048 		}
1049 
1050 #if SQUEUE_DEBUG
1051 		mp->b_tag = tag;
1052 #endif
1053 #if SQUEUE_PROFILE
1054 		if (SQ_PROFILING(sqp)) {
1055 			if (servicing_interrupt())
1056 				SQSTAT(sqp, sq_nqueued_intr);
1057 			else
1058 				SQSTAT(sqp, sq_nqueued_other);
1059 			if (sqp->sq_stats.sq_max_qlen < sqp->sq_count)
1060 				sqp->sq_stats.sq_max_qlen = sqp->sq_count;
1061 		}
1062 #endif
1063 		ENQUEUE_MP(sqp, mp, proc, arg);
1064 		if (being_processed) {
1065 			/*
1066 			 * Queue is already being processed.
1067 			 * No need to do anything.
1068 			 */
1069 			mutex_exit(&sqp->sq_lock);
1070 			return;
1071 		}
1072 		SQUEUE_WORKER_WAKEUP(sqp);
1073 	}
1074 }
1075 
1076 /*
1077  * squeue_fill() - fill squeue *sqp with mblk *mp and argument *arg
1078  * without processing the squeue.
1079  */
1080 /* ARGSUSED */
1081 void
1082 squeue_fill(squeue_t *sqp, mblk_t *mp, sqproc_t proc, void * arg,
1083     uint8_t tag)
1084 {
1085 #if SQUEUE_DEBUG
1086 	conn_t *connp = (conn_t *)arg;
1087 #endif
1088 	ASSERT(proc != NULL);
1089 	ASSERT(sqp != NULL);
1090 	ASSERT(mp != NULL);
1091 	ASSERT(mp->b_next == NULL);
1092 	ASSERT(!IPCL_IS_TCP(connp) || connp->conn_tcp->tcp_connp == connp);
1093 	ASSERT(!IPCL_IS_UDP(connp) || connp->conn_udp->udp_connp == connp);
1094 
1095 	ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock));
1096 	mutex_enter(&sqp->sq_lock);
1097 	ENQUEUE_MP(sqp, mp, proc, arg);
1098 #if SQUEUE_DEBUG
1099 	mp->b_tag = tag;
1100 #endif
1101 #if SQUEUE_PROFILE
1102 	if (SQ_PROFILING(sqp)) {
1103 		if (servicing_interrupt())
1104 			SQSTAT(sqp, sq_nqueued_intr);
1105 		else
1106 			SQSTAT(sqp, sq_nqueued_other);
1107 		if (sqp->sq_stats.sq_max_qlen < sqp->sq_count)
1108 			sqp->sq_stats.sq_max_qlen = sqp->sq_count;
1109 	}
1110 #endif
1111 
1112 	/*
1113 	 * If the queue is already being processed, there is no need to do anything.
1114 	 */
1115 	if (sqp->sq_state & SQS_PROC) {
1116 		mutex_exit(&sqp->sq_lock);
1117 		return;
1118 	}
1119 
1120 	SQUEUE_WORKER_WAKEUP(sqp);
1121 }
1122 
1123 
1124 /*
1125  * PRIVATE FUNCTIONS
1126  */
1127 
1128 static void
1129 squeue_fire(void *arg)
1130 {
1131 	squeue_t	*sqp = arg;
1132 	uint_t		state;
1133 
1134 	mutex_enter(&sqp->sq_lock);
1135 
1136 	state = sqp->sq_state;
1137 	if (sqp->sq_tid == 0 && !(state & SQS_TMO_PROG)) {
1138 		mutex_exit(&sqp->sq_lock);
1139 		return;
1140 	}
1141 
1142 	sqp->sq_tid = 0;
1143 	/*
1144 	 * The timeout fired before the thread arming it got a chance to
1145 	 * record the timeout id. Process it anyway, but remove SQS_TMO_PROG
1146 	 * so that the thread trying to set the timeout knows that it has
1147 	 * already been processed.
1148 	 */
1149 	if (state & SQS_TMO_PROG)
1150 		sqp->sq_state &= ~SQS_TMO_PROG;
1151 
1152 	if (!(state & SQS_PROC)) {
1153 		sqp->sq_awaken = lbolt;
1154 		cv_signal(&sqp->sq_async);
1155 	}
1156 	mutex_exit(&sqp->sq_lock);
1157 }
1158 
1159 static void
1160 squeue_drain(squeue_t *sqp, uint_t proc_type, hrtime_t expire)
1161 {
1162 	mblk_t	*mp;
1163 	mblk_t 	*head;
1164 	sqproc_t proc;
1165 	conn_t	*connp;
1166 	clock_t	start = lbolt;
1167 	clock_t	drain_time;
1168 	timeout_id_t tid;
1169 	uint_t	cnt;
1170 	uint_t	total_cnt = 0;
1171 	ill_rx_ring_t	*sq_rx_ring = sqp->sq_rx_ring;
1172 	int	interrupt = servicing_interrupt();
1173 	boolean_t poll_on = B_FALSE;
1174 	hrtime_t now;
1175 
1176 	ASSERT(mutex_owned(&sqp->sq_lock));
1177 	ASSERT(!(sqp->sq_state & SQS_PROC));
1178 
1179 #if SQUEUE_PROFILE
1180 	if (SQ_PROFILING(sqp)) {
1181 		if (interrupt)
1182 			SQSTAT(sqp, sq_ndrains_intr);
1183 		else if (!(proc_type & SQS_WORKER))
1184 			SQSTAT(sqp, sq_ndrains_other);
1185 		else
1186 			SQSTAT(sqp, sq_ndrains_worker);
1187 	}
1188 #endif
1189 
1190 	if ((tid = sqp->sq_tid) != 0)
1191 		sqp->sq_tid = 0;
1192 
1193 	sqp->sq_state |= SQS_PROC | proc_type;
1194 	head = sqp->sq_first;
1195 	sqp->sq_first = NULL;
1196 	sqp->sq_last = NULL;
1197 	cnt = sqp->sq_count;
1198 
1199 	/*
1200 	 * We have a backlog built up. Switch to polling mode if the
1201 	 * device underneath allows it. We need to do this only for a
1202 	 * drain by a non-interrupt thread, so that interrupts don't
1203 	 * come in and disrupt us in between. If it's an interrupt thread,
1204 	 * there is no need because most devices will not issue another
1205 	 * interrupt till this one returns.
1206 	 */
1207 	if ((sqp->sq_state & SQS_POLL_CAPAB) && !(proc_type & SQS_ENTER) &&
1208 		(sqp->sq_count > squeue_worker_poll_min)) {
1209 		ASSERT(sq_rx_ring != NULL);
1210 		SQS_POLLING_ON(sqp, sq_rx_ring);
1211 		poll_on = B_TRUE;
1212 	}
1213 
1214 	mutex_exit(&sqp->sq_lock);
1215 
1216 	if (tid != 0)
1217 		(void) untimeout(tid);
1218 again:
1219 	while ((mp = head) != NULL) {
1220 		head = mp->b_next;
1221 		mp->b_next = NULL;
1222 
1223 		proc = (sqproc_t)mp->b_queue;
1224 		mp->b_queue = NULL;
1225 		connp = (conn_t *)mp->b_prev;
1226 		mp->b_prev = NULL;
1227 #if SQUEUE_DEBUG
1228 		sqp->sq_curmp = mp;
1229 		sqp->sq_curproc = proc;
1230 		sqp->sq_connp = connp;
1231 		sqp->sq_tag = mp->b_tag;
1232 #endif
1233 
1234 #if SQUEUE_PROFILE
1235 		if (SQ_PROFILING(sqp)) {
1236 			if (interrupt)
1237 				SQSTAT(sqp, sq_npackets_intr);
1238 			else if (!(proc_type & SQS_WORKER))
1239 				SQSTAT(sqp, sq_npackets_other);
1240 			else
1241 				SQSTAT(sqp, sq_npackets_worker);
1242 		}
1243 #endif
1244 
1245 		connp->conn_on_sqp = B_TRUE;
1246 		DTRACE_PROBE3(squeue__proc__start, squeue_t *,
1247 		    sqp, mblk_t *, mp, conn_t *, connp);
1248 		(*proc)(connp, mp, sqp);
1249 		DTRACE_PROBE2(squeue__proc__end, squeue_t *,
1250 		    sqp, conn_t *, connp);
1251 		connp->conn_on_sqp = B_FALSE;
1252 		CONN_DEC_REF(connp);
1253 	}
1254 
1255 
1256 #if SQUEUE_DEBUG
1257 	sqp->sq_curmp = NULL;
1258 	sqp->sq_curproc = NULL;
1259 	sqp->sq_connp = NULL;
1260 #endif
1261 
1262 	mutex_enter(&sqp->sq_lock);
1263 	sqp->sq_count -= cnt;
1264 	total_cnt += cnt;
1265 
1266 	if (sqp->sq_first != NULL) {
1267 
1268 		now = gethrtime();
1269 		if (!expire || (now < expire)) {
1270 			/* More arrived and time not expired */
1271 			head = sqp->sq_first;
1272 			sqp->sq_first = NULL;
1273 			sqp->sq_last = NULL;
1274 			cnt = sqp->sq_count;
1275 			mutex_exit(&sqp->sq_lock);
1276 			goto again;
1277 		}
1278 
1279 		/*
1280 		 * If we are not the worker thread and we
1281 		 * have reached our time limit for the drain,
1282 		 * signal the worker thread to pick
1283 		 * up the work.
1284 		 * If we are the worker thread, then
1285 		 * we take a break to allow an interrupt
1286 		 * or writer to pick up the load.
1287 		 */
1288 		if (proc_type != SQS_WORKER) {
1289 			sqp->sq_awaken = lbolt;
1290 			cv_signal(&sqp->sq_async);
1291 		}
1292 	}
1293 
1294 	/*
1295 	 * Try to get an estimate of the time it takes to process a packet.
1296 	 * Do it only in interrupt context, since there is less chance of a
1297 	 * context switch or pinning there, which gives a better estimate.
1298 	 */
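	/*
	 * sq_avg_drain_time is an exponentially weighted moving average:
	 * 80% of the previous estimate plus 20% of this drain's cost per
	 * packet (the drain time converted to microseconds divided by the
	 * number of packets processed). SQS_POLLING_ON() multiplies this
	 * estimate by the queue length when sizing the blanking interval.
	 */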
1299 	if (interrupt && ((drain_time = (lbolt - start)) > 0))
1300 		sqp->sq_avg_drain_time = ((80 * sqp->sq_avg_drain_time) +
1301 		    (20 * (drv_hztousec(drain_time)/total_cnt)))/100;
1302 
1303 	sqp->sq_state &= ~(SQS_PROC | proc_type);
1304 
1305 	/*
1306 	 * If polling was turned on, turn it off and reduce the default
1307 	 * interrupt blank interval as well to bring new packets in faster
1308 	 * (reduces the latency when there is no backlog).
1309 	 */
1310 	if (poll_on && (sqp->sq_state & SQS_POLL_CAPAB)) {
1311 		ASSERT(sq_rx_ring != NULL);
1312 		SQS_POLLING_OFF(sqp, sq_rx_ring);
1313 	}
1314 }
1315 
1316 static void
1317 squeue_worker(squeue_t *sqp)
1318 {
1319 	kmutex_t *lock = &sqp->sq_lock;
1320 	kcondvar_t *async = &sqp->sq_async;
1321 	callb_cpr_t cprinfo;
1322 	hrtime_t now;
1323 #if SQUEUE_PROFILE
1324 	hrtime_t start;
1325 #endif
1326 
1327 	CALLB_CPR_INIT(&cprinfo, lock, callb_generic_cpr, "nca");
1328 	mutex_enter(lock);
1329 
1330 	for (;;) {
1331 		while (sqp->sq_first == NULL || (sqp->sq_state & SQS_PROC)) {
1332 			CALLB_CPR_SAFE_BEGIN(&cprinfo);
1333 still_wait:
1334 			cv_wait(async, lock);
1335 			if (sqp->sq_state & SQS_PROC) {
1336 				goto still_wait;
1337 			}
1338 			CALLB_CPR_SAFE_END(&cprinfo, lock);
1339 		}
1340 
1341 #if SQUEUE_PROFILE
1342 		if (SQ_PROFILING(sqp)) {
1343 			start = gethrtime();
1344 		}
1345 #endif
1346 
1347 		ASSERT(squeue_workerdrain_ns != 0);
1348 		now = gethrtime();
1349 		sqp->sq_run = curthread;
1350 		squeue_drain(sqp, SQS_WORKER, now +  squeue_workerdrain_ns);
1351 		sqp->sq_run = NULL;
1352 
1353 		if (sqp->sq_first != NULL) {
1354 			/*
1355 			 * Doing too much processing by the worker thread
1356 			 * in the presence of interrupts can be suboptimal.
1357 			 * Instead, once a drain is done by the worker thread
1358 			 * for squeue_workerdrain_ns (the reason we are
1359 			 * here), we force a wait of squeue_workerwait_tick
1360 			 * before doing more processing even if sq_wait is
1361 			 * set to 0.
1362 			 *
1363 			 * This can be counterproductive for performance
1364 			 * if the worker thread is the only means to process
1365 			 * the packets (interrupts or writers are not
1366 			 * allowed inside the squeue).
1367 			 */
1368 			if (sqp->sq_tid == 0 &&
1369 			    !(sqp->sq_state & SQS_TMO_PROG)) {
1370 				timeout_id_t	tid;
1371 
1372 				sqp->sq_state |= SQS_TMO_PROG;
1373 				mutex_exit(&sqp->sq_lock);
1374 				tid = timeout(squeue_fire, sqp,
1375 				    squeue_workerwait_tick);
1376 				mutex_enter(&sqp->sq_lock);
1377 				/*
1378 				 * Check again if we still need
1379 				 * the timeout
1380 				 */
1381 				if (((sqp->sq_state & (SQS_TMO_PROG|SQS_PROC))
1382 				    == SQS_TMO_PROG) && (sqp->sq_tid == 0) &&
1383 				    (sqp->sq_first != NULL)) {
1384 					sqp->sq_state &= ~SQS_TMO_PROG;
1385 					sqp->sq_awaken = lbolt;
1386 					sqp->sq_tid = tid;
1387 				} else if (sqp->sq_state & SQS_TMO_PROG) {
1388 					/* timeout not needed */
1389 					sqp->sq_state &= ~SQS_TMO_PROG;
1390 					mutex_exit(&(sqp)->sq_lock);
1391 					(void) untimeout(tid);
1392 					mutex_enter(&sqp->sq_lock);
1393 				}
1394 			}
1395 			CALLB_CPR_SAFE_BEGIN(&cprinfo);
1396 			cv_wait(async, lock);
1397 			CALLB_CPR_SAFE_END(&cprinfo, lock);
1398 		}
1399 
1400 
1401 #if SQUEUE_PROFILE
1402 		if (SQ_PROFILING(sqp)) {
1403 			SQDELTA(sqp, sq_time_worker, gethrtime() - start);
1404 		}
1405 #endif
1406 	}
1407 }
1408 
1409 #if SQUEUE_PROFILE
1410 static int
1411 squeue_kstat_update(kstat_t *ksp, int rw)
1412 {
1413 	struct squeue_kstat *sqsp = &squeue_kstat;
1414 	squeue_t *sqp = ksp->ks_private;
1415 
1416 	if (rw == KSTAT_WRITE)
1417 		return (EACCES);
1418 
1419 #if SQUEUE_DEBUG
1420 	sqsp->sq_count.value.ui64 = sqp->sq_count;
1421 	sqsp->sq_max_qlen.value.ui64 = sqp->sq_stats.sq_max_qlen;
1422 #endif
1423 	sqsp->sq_npackets_worker.value.ui64 = sqp->sq_stats.sq_npackets_worker;
1424 	sqsp->sq_npackets_intr.value.ui64 = sqp->sq_stats.sq_npackets_intr;
1425 	sqsp->sq_npackets_other.value.ui64 = sqp->sq_stats.sq_npackets_other;
1426 	sqsp->sq_nqueued_intr.value.ui64 = sqp->sq_stats.sq_nqueued_intr;
1427 	sqsp->sq_nqueued_other.value.ui64 = sqp->sq_stats.sq_nqueued_other;
1428 	sqsp->sq_ndrains_worker.value.ui64 = sqp->sq_stats.sq_ndrains_worker;
1429 	sqsp->sq_ndrains_intr.value.ui64 = sqp->sq_stats.sq_ndrains_intr;
1430 	sqsp->sq_ndrains_other.value.ui64 = sqp->sq_stats.sq_ndrains_other;
1431 	sqsp->sq_time_worker.value.ui64 = sqp->sq_stats.sq_time_worker;
1432 	sqsp->sq_time_intr.value.ui64 = sqp->sq_stats.sq_time_intr;
1433 	sqsp->sq_time_other.value.ui64 = sqp->sq_stats.sq_time_other;
1434 	return (0);
1435 }
1436 #endif
1437 
1438 void
1439 squeue_profile_enable(squeue_t *sqp)
1440 {
1441 	mutex_enter(&sqp->sq_lock);
1442 	sqp->sq_state |= SQS_PROFILE;
1443 	mutex_exit(&sqp->sq_lock);
1444 }
1445 
1446 void
1447 squeue_profile_disable(squeue_t *sqp)
1448 {
1449 	mutex_enter(&sqp->sq_lock);
1450 	sqp->sq_state &= ~SQS_PROFILE;
1451 	mutex_exit(&sqp->sq_lock);
1452 }
1453 
1454 void
1455 squeue_profile_reset(squeue_t *sqp)
1456 {
1457 #if SQUEUE_PROFILE
1458 	bzero(&sqp->sq_stats, sizeof (sqstat_t));
1459 #endif
1460 }
1461 
1462 void
1463 squeue_profile_start(void)
1464 {
1465 #if SQUEUE_PROFILE
1466 	squeue_profile = B_TRUE;
1467 #endif
1468 }
1469 
1470 void
1471 squeue_profile_stop(void)
1472 {
1473 #if SQUEUE_PROFILE
1474 	squeue_profile = B_FALSE;
1475 #endif
1476 }
1477 
1478 uintptr_t *
1479 squeue_getprivate(squeue_t *sqp, sqprivate_t p)
1480 {
1481 	ASSERT(p < SQPRIVATE_MAX);
1482 
1483 	return (&sqp->sq_private[p]);
1484 }
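/*
 * Illustrative sketch (hypothetical consumer state and type name): a
 * consumer such as TCP stores a pointer-sized cookie under its own tag and
 * fetches it back later; the squeue never interprets the value.
 *
 *	example_sq_state_t *st = kmem_zalloc(sizeof (*st), KM_SLEEP);
 *	*squeue_getprivate(sqp, SQPRIVATE_TCP) = (uintptr_t)st;
 *	...
 *	st = (example_sq_state_t *)*squeue_getprivate(sqp, SQPRIVATE_TCP);
 */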
1485 
1486 processorid_t
1487 squeue_binding(squeue_t *sqp)
1488 {
1489 	return (sqp->sq_bind);
1490 }
1491