1 /*-
2  * Copyright (c) 2017-2019 Hans Petter Selasky
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  * 1. Redistributions of source code must retain the above copyright
9  *    notice unmodified, this list of conditions, and the following
10  *    disclaimer.
11  * 2. Redistributions in binary form must reproduce the above copyright
12  *    notice, this list of conditions and the following disclaimer in the
13  *    documentation and/or other materials provided with the distribution.
14  *
15  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
16  * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
17  * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
18  * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
19  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
20  * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
21  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
22  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
23  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
24  * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
25  */
26 
27 #include <sys/cdefs.h>
28 __FBSDID("$FreeBSD$");
29 
30 #include <linux/workqueue.h>
31 #include <linux/wait.h>
32 #include <linux/compat.h>
33 #include <linux/spinlock.h>
34 
35 #include <sys/kernel.h>
36 
37 /*
38  * Define all work struct states
39  */
40 enum {
41 	WORK_ST_IDLE,			/* idle - not started */
42 	WORK_ST_TIMER,			/* timer is being started */
43 	WORK_ST_TASK,			/* taskqueue is being queued */
44 	WORK_ST_EXEC,			/* callback is being called */
45 	WORK_ST_CANCEL,			/* cancel is being requested */
46 	WORK_ST_MAX,
47 };
48 
49 /*
50  * Define global workqueues
51  */
52 static struct workqueue_struct *linux_system_short_wq;
53 static struct workqueue_struct *linux_system_long_wq;
54 
55 struct workqueue_struct *system_wq;
56 struct workqueue_struct *system_long_wq;
57 struct workqueue_struct *system_unbound_wq;
58 struct workqueue_struct *system_highpri_wq;
59 struct workqueue_struct *system_power_efficient_wq;
60 
61 static int linux_default_wq_cpus = 4;
62 
63 static void linux_delayed_work_timer_fn(void *);
64 
65 /*
66  * This function atomically updates the work state and returns the
67  * previous state at the time of update.
68  */
69 static uint8_t
70 linux_update_state(atomic_t *v, const uint8_t *pstate)
71 {
72 	int c, old;
73 
74 	c = v->counter;
75 
76 	while ((old = atomic_cmpxchg(v, c, pstate[c])) != c)
77 		c = old;
78 
79 	return (c);
80 }
81 
82 /*
83  * A LinuxKPI task is allowed to free itself inside the callback function
84  * and cannot safely be referred after the callback function has
85  * completed. This function gives the linux_work_fn() function a hint,
86  * that the task is not going away and can have its state checked
87  * again. Without this extra hint LinuxKPI tasks cannot be serialized
88  * accross multiple worker threads.
89  */
90 static bool
91 linux_work_exec_unblock(struct work_struct *work)
92 {
93 	struct workqueue_struct *wq;
94 	struct work_exec *exec;
95 	bool retval = false;
96 
97 	wq = work->work_queue;
98 	if (unlikely(wq == NULL))
99 		goto done;
100 
101 	WQ_EXEC_LOCK(wq);
102 	TAILQ_FOREACH(exec, &wq->exec_head, entry) {
103 		if (exec->target == work) {
104 			exec->target = NULL;
105 			retval = true;
106 			break;
107 		}
108 	}
109 	WQ_EXEC_UNLOCK(wq);
110 done:
111 	return (retval);
112 }
113 
114 static void
115 linux_delayed_work_enqueue(struct delayed_work *dwork)
116 {
117 	struct taskqueue *tq;
118 
119 	tq = dwork->work.work_queue->taskqueue;
120 	taskqueue_enqueue(tq, &dwork->work.work_task);
121 }
122 
123 /*
124  * This function queues the given work structure on the given
125  * workqueue. It returns non-zero if the work was successfully
126  * [re-]queued. Else the work is already pending for completion.
127  */
128 bool
129 linux_queue_work_on(int cpu __unused, struct workqueue_struct *wq,
130     struct work_struct *work)
131 {
132 	static const uint8_t states[WORK_ST_MAX] __aligned(8) = {
133 		[WORK_ST_IDLE] = WORK_ST_TASK,		/* start queuing task */
134 		[WORK_ST_TIMER] = WORK_ST_TIMER,	/* NOP */
135 		[WORK_ST_TASK] = WORK_ST_TASK,		/* NOP */
136 		[WORK_ST_EXEC] = WORK_ST_TASK,		/* queue task another time */
137 		[WORK_ST_CANCEL] = WORK_ST_TASK,	/* start queuing task again */
138 	};
139 
140 	if (atomic_read(&wq->draining) != 0)
141 		return (!work_pending(work));
142 
143 	switch (linux_update_state(&work->state, states)) {
144 	case WORK_ST_EXEC:
145 	case WORK_ST_CANCEL:
146 		if (linux_work_exec_unblock(work) != 0)
147 			return (true);
148 		/* FALLTHROUGH */
149 	case WORK_ST_IDLE:
150 		work->work_queue = wq;
151 		taskqueue_enqueue(wq->taskqueue, &work->work_task);
152 		return (true);
153 	default:
154 		return (false);		/* already on a queue */
155 	}
156 }
157 
158 /*
159  * This function queues the given work structure on the given
160  * workqueue after a given delay in ticks. It returns non-zero if the
161  * work was successfully [re-]queued. Else the work is already pending
162  * for completion.
163  */
164 bool
165 linux_queue_delayed_work_on(int cpu, struct workqueue_struct *wq,
166     struct delayed_work *dwork, unsigned delay)
167 {
168 	static const uint8_t states[WORK_ST_MAX] __aligned(8) = {
169 		[WORK_ST_IDLE] = WORK_ST_TIMER,		/* start timeout */
170 		[WORK_ST_TIMER] = WORK_ST_TIMER,	/* NOP */
171 		[WORK_ST_TASK] = WORK_ST_TASK,		/* NOP */
172 		[WORK_ST_EXEC] = WORK_ST_TIMER,		/* start timeout */
173 		[WORK_ST_CANCEL] = WORK_ST_TIMER,	/* start timeout */
174 	};
175 
176 	if (atomic_read(&wq->draining) != 0)
177 		return (!work_pending(&dwork->work));
178 
179 	switch (linux_update_state(&dwork->work.state, states)) {
180 	case WORK_ST_EXEC:
181 	case WORK_ST_CANCEL:
182 		if (delay == 0 && linux_work_exec_unblock(&dwork->work) != 0) {
183 			dwork->timer.expires = jiffies;
184 			return (true);
185 		}
186 		/* FALLTHROUGH */
187 	case WORK_ST_IDLE:
188 		dwork->work.work_queue = wq;
189 		dwork->timer.expires = jiffies + delay;
190 
191 		if (delay == 0) {
192 			linux_delayed_work_enqueue(dwork);
193 		} else if (unlikely(cpu != WORK_CPU_UNBOUND)) {
194 			mtx_lock(&dwork->timer.mtx);
195 			callout_reset_on(&dwork->timer.callout, delay,
196 			    &linux_delayed_work_timer_fn, dwork, cpu);
197 			mtx_unlock(&dwork->timer.mtx);
198 		} else {
199 			mtx_lock(&dwork->timer.mtx);
200 			callout_reset(&dwork->timer.callout, delay,
201 			    &linux_delayed_work_timer_fn, dwork);
202 			mtx_unlock(&dwork->timer.mtx);
203 		}
204 		return (true);
205 	default:
206 		return (false);		/* already on a queue */
207 	}
208 }
209 
210 void
211 linux_work_fn(void *context, int pending)
212 {
213 	static const uint8_t states[WORK_ST_MAX] __aligned(8) = {
214 		[WORK_ST_IDLE] = WORK_ST_IDLE,		/* NOP */
215 		[WORK_ST_TIMER] = WORK_ST_EXEC,		/* delayed work w/o timeout */
216 		[WORK_ST_TASK] = WORK_ST_EXEC,		/* call callback */
217 		[WORK_ST_EXEC] = WORK_ST_IDLE,		/* complete callback */
218 		[WORK_ST_CANCEL] = WORK_ST_EXEC,	/* failed to cancel */
219 	};
220 	struct work_struct *work;
221 	struct workqueue_struct *wq;
222 	struct work_exec exec;
223 	struct task_struct *task;
224 
225 	task = current;
226 
227 	/* setup local variables */
228 	work = context;
229 	wq = work->work_queue;
230 
231 	/* store target pointer */
232 	exec.target = work;
233 
234 	/* insert executor into list */
235 	WQ_EXEC_LOCK(wq);
236 	TAILQ_INSERT_TAIL(&wq->exec_head, &exec, entry);
237 	while (1) {
238 		switch (linux_update_state(&work->state, states)) {
239 		case WORK_ST_TIMER:
240 		case WORK_ST_TASK:
241 		case WORK_ST_CANCEL:
242 			WQ_EXEC_UNLOCK(wq);
243 
244 			/* set current work structure */
245 			task->work = work;
246 
247 			/* call work function */
248 			work->func(work);
249 
250 			/* set current work structure */
251 			task->work = NULL;
252 
253 			WQ_EXEC_LOCK(wq);
254 			/* check if unblocked */
255 			if (exec.target != work) {
256 				/* reapply block */
257 				exec.target = work;
258 				break;
259 			}
260 			/* FALLTHROUGH */
261 		default:
262 			goto done;
263 		}
264 	}
265 done:
266 	/* remove executor from list */
267 	TAILQ_REMOVE(&wq->exec_head, &exec, entry);
268 	WQ_EXEC_UNLOCK(wq);
269 }
270 
271 void
272 linux_delayed_work_fn(void *context, int pending)
273 {
274 	struct delayed_work *dwork = context;
275 
276 	/*
277 	 * Make sure the timer belonging to the delayed work gets
278 	 * drained before invoking the work function. Else the timer
279 	 * mutex may still be in use which can lead to use-after-free
280 	 * situations, because the work function might free the work
281 	 * structure before returning.
282 	 */
283 	callout_drain(&dwork->timer.callout);
284 
285 	linux_work_fn(&dwork->work, pending);
286 }
287 
288 static void
289 linux_delayed_work_timer_fn(void *arg)
290 {
291 	static const uint8_t states[WORK_ST_MAX] __aligned(8) = {
292 		[WORK_ST_IDLE] = WORK_ST_IDLE,		/* NOP */
293 		[WORK_ST_TIMER] = WORK_ST_TASK,		/* start queueing task */
294 		[WORK_ST_TASK] = WORK_ST_TASK,		/* NOP */
295 		[WORK_ST_EXEC] = WORK_ST_EXEC,		/* NOP */
296 		[WORK_ST_CANCEL] = WORK_ST_TASK,	/* failed to cancel */
297 	};
298 	struct delayed_work *dwork = arg;
299 
300 	switch (linux_update_state(&dwork->work.state, states)) {
301 	case WORK_ST_TIMER:
302 	case WORK_ST_CANCEL:
303 		linux_delayed_work_enqueue(dwork);
304 		break;
305 	default:
306 		break;
307 	}
308 }
309 
310 /*
311  * This function cancels the given work structure in a synchronous
312  * fashion. It returns non-zero if the work was successfully
313  * cancelled. Else the work was already cancelled.
314  */
315 bool
316 linux_cancel_work_sync(struct work_struct *work)
317 {
318 	static const uint8_t states[WORK_ST_MAX] __aligned(8) = {
319 		[WORK_ST_IDLE] = WORK_ST_IDLE,		/* NOP */
320 		[WORK_ST_TIMER] = WORK_ST_TIMER,	/* can't happen */
321 		[WORK_ST_TASK] = WORK_ST_IDLE,		/* cancel and drain */
322 		[WORK_ST_EXEC] = WORK_ST_IDLE,		/* too late, drain */
323 		[WORK_ST_CANCEL] = WORK_ST_IDLE,	/* cancel and drain */
324 	};
325 	struct taskqueue *tq;
326 	bool retval = false;
327 
328 	WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL,
329 	    "linux_cancel_work_sync() might sleep");
330 retry:
331 	switch (linux_update_state(&work->state, states)) {
332 	case WORK_ST_IDLE:
333 	case WORK_ST_TIMER:
334 		return (retval);
335 	case WORK_ST_EXEC:
336 		tq = work->work_queue->taskqueue;
337 		if (taskqueue_cancel(tq, &work->work_task, NULL) != 0)
338 			taskqueue_drain(tq, &work->work_task);
339 		goto retry;	/* work may have restarted itself */
340 	default:
341 		tq = work->work_queue->taskqueue;
342 		if (taskqueue_cancel(tq, &work->work_task, NULL) != 0)
343 			taskqueue_drain(tq, &work->work_task);
344 		retval = true;
345 		goto retry;
346 	}
347 }
348 
349 /*
350  * This function atomically stops the timer and callback. The timer
351  * callback will not be called after this function returns. This
352  * functions returns true when the timeout was cancelled. Else the
353  * timeout was not started or has already been called.
354  */
355 static inline bool
356 linux_cancel_timer(struct delayed_work *dwork, bool drain)
357 {
358 	bool cancelled;
359 
360 	mtx_lock(&dwork->timer.mtx);
361 	cancelled = (callout_stop(&dwork->timer.callout) == 1);
362 	mtx_unlock(&dwork->timer.mtx);
363 
364 	/* check if we should drain */
365 	if (drain)
366 		callout_drain(&dwork->timer.callout);
367 	return (cancelled);
368 }
369 
370 /*
371  * This function cancels the given delayed work structure in a
372  * non-blocking fashion. It returns non-zero if the work was
373  * successfully cancelled. Else the work may still be busy or already
374  * cancelled.
375  */
376 bool
377 linux_cancel_delayed_work(struct delayed_work *dwork)
378 {
379 	static const uint8_t states[WORK_ST_MAX] __aligned(8) = {
380 		[WORK_ST_IDLE] = WORK_ST_IDLE,		/* NOP */
381 		[WORK_ST_TIMER] = WORK_ST_CANCEL,	/* try to cancel */
382 		[WORK_ST_TASK] = WORK_ST_CANCEL,	/* try to cancel */
383 		[WORK_ST_EXEC] = WORK_ST_EXEC,		/* NOP */
384 		[WORK_ST_CANCEL] = WORK_ST_CANCEL,	/* NOP */
385 	};
386 	struct taskqueue *tq;
387 
388 	switch (linux_update_state(&dwork->work.state, states)) {
389 	case WORK_ST_TIMER:
390 	case WORK_ST_CANCEL:
391 		if (linux_cancel_timer(dwork, 0)) {
392 			atomic_cmpxchg(&dwork->work.state,
393 			    WORK_ST_CANCEL, WORK_ST_IDLE);
394 			return (true);
395 		}
396 		/* FALLTHROUGH */
397 	case WORK_ST_TASK:
398 		tq = dwork->work.work_queue->taskqueue;
399 		if (taskqueue_cancel(tq, &dwork->work.work_task, NULL) == 0) {
400 			atomic_cmpxchg(&dwork->work.state,
401 			    WORK_ST_CANCEL, WORK_ST_IDLE);
402 			return (true);
403 		}
404 		/* FALLTHROUGH */
405 	default:
406 		return (false);
407 	}
408 }
409 
410 /*
411  * This function cancels the given work structure in a synchronous
412  * fashion. It returns non-zero if the work was successfully
413  * cancelled. Else the work was already cancelled.
414  */
415 bool
416 linux_cancel_delayed_work_sync(struct delayed_work *dwork)
417 {
418 	static const uint8_t states[WORK_ST_MAX] __aligned(8) = {
419 		[WORK_ST_IDLE] = WORK_ST_IDLE,		/* NOP */
420 		[WORK_ST_TIMER] = WORK_ST_IDLE,		/* cancel and drain */
421 		[WORK_ST_TASK] = WORK_ST_IDLE,		/* cancel and drain */
422 		[WORK_ST_EXEC] = WORK_ST_IDLE,		/* too late, drain */
423 		[WORK_ST_CANCEL] = WORK_ST_IDLE,	/* cancel and drain */
424 	};
425 	struct taskqueue *tq;
426 	bool retval = false;
427 
428 	WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL,
429 	    "linux_cancel_delayed_work_sync() might sleep");
430 retry:
431 	switch (linux_update_state(&dwork->work.state, states)) {
432 	case WORK_ST_IDLE:
433 		return (retval);
434 	case WORK_ST_EXEC:
435 		tq = dwork->work.work_queue->taskqueue;
436 		if (taskqueue_cancel(tq, &dwork->work.work_task, NULL) != 0)
437 			taskqueue_drain(tq, &dwork->work.work_task);
438 		goto retry;	/* work may have restarted itself */
439 	case WORK_ST_TIMER:
440 	case WORK_ST_CANCEL:
441 		if (linux_cancel_timer(dwork, 1)) {
442 			/*
443 			 * Make sure taskqueue is also drained before
444 			 * returning:
445 			 */
446 			tq = dwork->work.work_queue->taskqueue;
447 			taskqueue_drain(tq, &dwork->work.work_task);
448 			retval = true;
449 			goto retry;
450 		}
451 		/* FALLTHROUGH */
452 	default:
453 		tq = dwork->work.work_queue->taskqueue;
454 		if (taskqueue_cancel(tq, &dwork->work.work_task, NULL) != 0)
455 			taskqueue_drain(tq, &dwork->work.work_task);
456 		retval = true;
457 		goto retry;
458 	}
459 }
460 
461 /*
462  * This function waits until the given work structure is completed.
463  * It returns non-zero if the work was successfully
464  * waited for. Else the work was not waited for.
465  */
466 bool
467 linux_flush_work(struct work_struct *work)
468 {
469 	struct taskqueue *tq;
470 	bool retval;
471 
472 	WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL,
473 	    "linux_flush_work() might sleep");
474 
475 	switch (atomic_read(&work->state)) {
476 	case WORK_ST_IDLE:
477 		return (false);
478 	default:
479 		tq = work->work_queue->taskqueue;
480 		retval = taskqueue_poll_is_busy(tq, &work->work_task);
481 		taskqueue_drain(tq, &work->work_task);
482 		return (retval);
483 	}
484 }
485 
486 /*
487  * This function waits until the given delayed work structure is
488  * completed. It returns non-zero if the work was successfully waited
489  * for. Else the work was not waited for.
490  */
491 bool
492 linux_flush_delayed_work(struct delayed_work *dwork)
493 {
494 	struct taskqueue *tq;
495 	bool retval;
496 
497 	WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL,
498 	    "linux_flush_delayed_work() might sleep");
499 
500 	switch (atomic_read(&dwork->work.state)) {
501 	case WORK_ST_IDLE:
502 		return (false);
503 	case WORK_ST_TIMER:
504 		if (linux_cancel_timer(dwork, 1))
505 			linux_delayed_work_enqueue(dwork);
506 		/* FALLTHROUGH */
507 	default:
508 		tq = dwork->work.work_queue->taskqueue;
509 		retval = taskqueue_poll_is_busy(tq, &dwork->work.work_task);
510 		taskqueue_drain(tq, &dwork->work.work_task);
511 		return (retval);
512 	}
513 }
514 
515 /*
516  * This function returns true if the given work is pending, and not
517  * yet executing:
518  */
519 bool
520 linux_work_pending(struct work_struct *work)
521 {
522 	switch (atomic_read(&work->state)) {
523 	case WORK_ST_TIMER:
524 	case WORK_ST_TASK:
525 	case WORK_ST_CANCEL:
526 		return (true);
527 	default:
528 		return (false);
529 	}
530 }
531 
532 /*
533  * This function returns true if the given work is busy.
534  */
535 bool
536 linux_work_busy(struct work_struct *work)
537 {
538 	struct taskqueue *tq;
539 
540 	switch (atomic_read(&work->state)) {
541 	case WORK_ST_IDLE:
542 		return (false);
543 	case WORK_ST_EXEC:
544 		tq = work->work_queue->taskqueue;
545 		return (taskqueue_poll_is_busy(tq, &work->work_task));
546 	default:
547 		return (true);
548 	}
549 }
550 
551 struct workqueue_struct *
552 linux_create_workqueue_common(const char *name, int cpus)
553 {
554 	struct workqueue_struct *wq;
555 
556 	/*
557 	 * If zero CPUs are specified use the default number of CPUs:
558 	 */
559 	if (cpus == 0)
560 		cpus = linux_default_wq_cpus;
561 
562 	wq = kmalloc(sizeof(*wq), M_WAITOK | M_ZERO);
563 	wq->taskqueue = taskqueue_create(name, M_WAITOK,
564 	    taskqueue_thread_enqueue, &wq->taskqueue);
565 	atomic_set(&wq->draining, 0);
566 	taskqueue_start_threads(&wq->taskqueue, cpus, PWAIT, "%s", name);
567 	TAILQ_INIT(&wq->exec_head);
568 	mtx_init(&wq->exec_mtx, "linux_wq_exec", NULL, MTX_DEF);
569 
570 	return (wq);
571 }
572 
573 void
574 linux_destroy_workqueue(struct workqueue_struct *wq)
575 {
576 	atomic_inc(&wq->draining);
577 	drain_workqueue(wq);
578 	taskqueue_free(wq->taskqueue);
579 	mtx_destroy(&wq->exec_mtx);
580 	kfree(wq);
581 }
582 
583 void
584 linux_init_delayed_work(struct delayed_work *dwork, work_func_t func)
585 {
586 	memset(dwork, 0, sizeof(*dwork));
587 	dwork->work.func = func;
588 	TASK_INIT(&dwork->work.work_task, 0, linux_delayed_work_fn, dwork);
589 	mtx_init(&dwork->timer.mtx, spin_lock_name("lkpi-dwork"), NULL,
590 	    MTX_DEF | MTX_NOWITNESS);
591 	callout_init_mtx(&dwork->timer.callout, &dwork->timer.mtx, 0);
592 }
593 
594 struct work_struct *
595 linux_current_work(void)
596 {
597 	return (current->work);
598 }
599 
600 static void
601 linux_work_init(void *arg)
602 {
603 	int max_wq_cpus = mp_ncpus + 1;
604 
605 	/* avoid deadlock when there are too few threads */
606 	if (max_wq_cpus < 4)
607 		max_wq_cpus = 4;
608 
609 	/* set default number of CPUs */
610 	linux_default_wq_cpus = max_wq_cpus;
611 
612 	linux_system_short_wq = alloc_workqueue("linuxkpi_short_wq", 0, max_wq_cpus);
613 	linux_system_long_wq = alloc_workqueue("linuxkpi_long_wq", 0, max_wq_cpus);
614 
615 	/* populate the workqueue pointers */
616 	system_long_wq = linux_system_long_wq;
617 	system_wq = linux_system_short_wq;
618 	system_power_efficient_wq = linux_system_short_wq;
619 	system_unbound_wq = linux_system_short_wq;
620 	system_highpri_wq = linux_system_short_wq;
621 }
622 SYSINIT(linux_work_init, SI_SUB_TASKQ, SI_ORDER_THIRD, linux_work_init, NULL);
623 
624 static void
625 linux_work_uninit(void *arg)
626 {
627 	destroy_workqueue(linux_system_short_wq);
628 	destroy_workqueue(linux_system_long_wq);
629 
630 	/* clear workqueue pointers */
631 	system_long_wq = NULL;
632 	system_wq = NULL;
633 	system_power_efficient_wq = NULL;
634 	system_unbound_wq = NULL;
635 	system_highpri_wq = NULL;
636 }
637 SYSUNINIT(linux_work_uninit, SI_SUB_TASKQ, SI_ORDER_THIRD, linux_work_uninit, NULL);
638