xref: /freebsd/contrib/lib9p/threadpool.c (revision 134e1779)
1*134e1779SJakub Wojciech Klama /*
2*134e1779SJakub Wojciech Klama  * Copyright 2016 Jakub Klama <jceel@FreeBSD.org>
3*134e1779SJakub Wojciech Klama  * All rights reserved
4*134e1779SJakub Wojciech Klama  *
5*134e1779SJakub Wojciech Klama  * Redistribution and use in source and binary forms, with or without
6*134e1779SJakub Wojciech Klama  * modification, are permitted providing that the following conditions
7*134e1779SJakub Wojciech Klama  * are met:
8*134e1779SJakub Wojciech Klama  * 1. Redistributions of source code must retain the above copyright
9*134e1779SJakub Wojciech Klama  *    notice, this list of conditions and the following disclaimer.
10*134e1779SJakub Wojciech Klama  * 2. Redistributions in binary form must reproduce the above copyright
11*134e1779SJakub Wojciech Klama  *    notice, this list of conditions and the following disclaimer in the
12*134e1779SJakub Wojciech Klama  *    documentation and/or other materials provided with the distribution.
13*134e1779SJakub Wojciech Klama  *
14*134e1779SJakub Wojciech Klama  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
15*134e1779SJakub Wojciech Klama  * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
16*134e1779SJakub Wojciech Klama  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17*134e1779SJakub Wojciech Klama  * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY
18*134e1779SJakub Wojciech Klama  * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19*134e1779SJakub Wojciech Klama  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20*134e1779SJakub Wojciech Klama  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21*134e1779SJakub Wojciech Klama  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
22*134e1779SJakub Wojciech Klama  * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
23*134e1779SJakub Wojciech Klama  * IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
24*134e1779SJakub Wojciech Klama  * POSSIBILITY OF SUCH DAMAGE.
25*134e1779SJakub Wojciech Klama  *
26*134e1779SJakub Wojciech Klama  */
27*134e1779SJakub Wojciech Klama 
28*134e1779SJakub Wojciech Klama #include <errno.h>
29*134e1779SJakub Wojciech Klama #include <stdlib.h>
30*134e1779SJakub Wojciech Klama #include <pthread.h>
31*134e1779SJakub Wojciech Klama #if defined(__FreeBSD__)
32*134e1779SJakub Wojciech Klama #include <pthread_np.h>
33*134e1779SJakub Wojciech Klama #endif
34*134e1779SJakub Wojciech Klama #include <sys/queue.h>
35*134e1779SJakub Wojciech Klama #include "lib9p.h"
36*134e1779SJakub Wojciech Klama #include "threadpool.h"
37*134e1779SJakub Wojciech Klama 
38*134e1779SJakub Wojciech Klama static void l9p_threadpool_rflush(struct l9p_threadpool *tp,
39*134e1779SJakub Wojciech Klama     struct l9p_request *req);
40*134e1779SJakub Wojciech Klama 
41*134e1779SJakub Wojciech Klama static void *
l9p_responder(void * arg)42*134e1779SJakub Wojciech Klama l9p_responder(void *arg)
43*134e1779SJakub Wojciech Klama {
44*134e1779SJakub Wojciech Klama 	struct l9p_threadpool *tp;
45*134e1779SJakub Wojciech Klama 	struct l9p_worker *worker = arg;
46*134e1779SJakub Wojciech Klama 	struct l9p_request *req;
47*134e1779SJakub Wojciech Klama 
48*134e1779SJakub Wojciech Klama 	tp = worker->ltw_tp;
49*134e1779SJakub Wojciech Klama 	for (;;) {
50*134e1779SJakub Wojciech Klama 		/* get next reply to send */
51*134e1779SJakub Wojciech Klama 		pthread_mutex_lock(&tp->ltp_mtx);
52*134e1779SJakub Wojciech Klama 		while (STAILQ_EMPTY(&tp->ltp_replyq) && !worker->ltw_exiting)
53*134e1779SJakub Wojciech Klama 			pthread_cond_wait(&tp->ltp_reply_cv, &tp->ltp_mtx);
54*134e1779SJakub Wojciech Klama 		if (worker->ltw_exiting) {
55*134e1779SJakub Wojciech Klama 			pthread_mutex_unlock(&tp->ltp_mtx);
56*134e1779SJakub Wojciech Klama 			break;
57*134e1779SJakub Wojciech Klama 		}
58*134e1779SJakub Wojciech Klama 
59*134e1779SJakub Wojciech Klama 		/* off reply queue */
60*134e1779SJakub Wojciech Klama 		req = STAILQ_FIRST(&tp->ltp_replyq);
61*134e1779SJakub Wojciech Klama 		STAILQ_REMOVE_HEAD(&tp->ltp_replyq, lr_worklink);
62*134e1779SJakub Wojciech Klama 
63*134e1779SJakub Wojciech Klama 		/* request is now in final glide path, can't be Tflush-ed */
64*134e1779SJakub Wojciech Klama 		req->lr_workstate = L9P_WS_REPLYING;
65*134e1779SJakub Wojciech Klama 
66*134e1779SJakub Wojciech Klama 		/* any flushers waiting for this request can go now */
67*134e1779SJakub Wojciech Klama 		if (req->lr_flushstate != L9P_FLUSH_NONE)
68*134e1779SJakub Wojciech Klama 			l9p_threadpool_rflush(tp, req);
69*134e1779SJakub Wojciech Klama 
70*134e1779SJakub Wojciech Klama 		pthread_mutex_unlock(&tp->ltp_mtx);
71*134e1779SJakub Wojciech Klama 
72*134e1779SJakub Wojciech Klama 		/* send response */
73*134e1779SJakub Wojciech Klama 		l9p_respond(req, false, true);
74*134e1779SJakub Wojciech Klama 	}
75*134e1779SJakub Wojciech Klama 	return (NULL);
76*134e1779SJakub Wojciech Klama }
77*134e1779SJakub Wojciech Klama 
78*134e1779SJakub Wojciech Klama static void *
l9p_worker(void * arg)79*134e1779SJakub Wojciech Klama l9p_worker(void *arg)
80*134e1779SJakub Wojciech Klama {
81*134e1779SJakub Wojciech Klama 	struct l9p_threadpool *tp;
82*134e1779SJakub Wojciech Klama 	struct l9p_worker *worker = arg;
83*134e1779SJakub Wojciech Klama 	struct l9p_request *req;
84*134e1779SJakub Wojciech Klama 
85*134e1779SJakub Wojciech Klama 	tp = worker->ltw_tp;
86*134e1779SJakub Wojciech Klama 	pthread_mutex_lock(&tp->ltp_mtx);
87*134e1779SJakub Wojciech Klama 	for (;;) {
88*134e1779SJakub Wojciech Klama 		while (STAILQ_EMPTY(&tp->ltp_workq) && !worker->ltw_exiting)
89*134e1779SJakub Wojciech Klama 			pthread_cond_wait(&tp->ltp_work_cv, &tp->ltp_mtx);
90*134e1779SJakub Wojciech Klama 		if (worker->ltw_exiting)
91*134e1779SJakub Wojciech Klama 			break;
92*134e1779SJakub Wojciech Klama 
93*134e1779SJakub Wojciech Klama 		/* off work queue; now work-in-progress, by us */
94*134e1779SJakub Wojciech Klama 		req = STAILQ_FIRST(&tp->ltp_workq);
95*134e1779SJakub Wojciech Klama 		STAILQ_REMOVE_HEAD(&tp->ltp_workq, lr_worklink);
96*134e1779SJakub Wojciech Klama 		req->lr_workstate = L9P_WS_INPROGRESS;
97*134e1779SJakub Wojciech Klama 		req->lr_worker = worker;
98*134e1779SJakub Wojciech Klama 		pthread_mutex_unlock(&tp->ltp_mtx);
99*134e1779SJakub Wojciech Klama 
100*134e1779SJakub Wojciech Klama 		/* actually try the request */
101*134e1779SJakub Wojciech Klama 		req->lr_error = l9p_dispatch_request(req);
102*134e1779SJakub Wojciech Klama 
103*134e1779SJakub Wojciech Klama 		/* move to responder queue, updating work-state */
104*134e1779SJakub Wojciech Klama 		pthread_mutex_lock(&tp->ltp_mtx);
105*134e1779SJakub Wojciech Klama 		req->lr_workstate = L9P_WS_RESPQUEUED;
106*134e1779SJakub Wojciech Klama 		req->lr_worker = NULL;
107*134e1779SJakub Wojciech Klama 		STAILQ_INSERT_TAIL(&tp->ltp_replyq, req, lr_worklink);
108*134e1779SJakub Wojciech Klama 
109*134e1779SJakub Wojciech Klama 		/* signal the responder */
110*134e1779SJakub Wojciech Klama 		pthread_cond_signal(&tp->ltp_reply_cv);
111*134e1779SJakub Wojciech Klama 	}
112*134e1779SJakub Wojciech Klama 	pthread_mutex_unlock(&tp->ltp_mtx);
113*134e1779SJakub Wojciech Klama 	return (NULL);
114*134e1779SJakub Wojciech Klama }
115*134e1779SJakub Wojciech Klama 
116*134e1779SJakub Wojciech Klama /*
117*134e1779SJakub Wojciech Klama  * Just before finally replying to a request that got touched by
118*134e1779SJakub Wojciech Klama  * a Tflush request, we enqueue its flushers (requests of type
119*134e1779SJakub Wojciech Klama  * Tflush, which are now on the flushee's lr_flushq) onto the
120*134e1779SJakub Wojciech Klama  * response queue.
121*134e1779SJakub Wojciech Klama  */
122*134e1779SJakub Wojciech Klama static void
l9p_threadpool_rflush(struct l9p_threadpool * tp,struct l9p_request * req)123*134e1779SJakub Wojciech Klama l9p_threadpool_rflush(struct l9p_threadpool *tp, struct l9p_request *req)
124*134e1779SJakub Wojciech Klama {
125*134e1779SJakub Wojciech Klama 	struct l9p_request *flusher;
126*134e1779SJakub Wojciech Klama 
127*134e1779SJakub Wojciech Klama 	/*
128*134e1779SJakub Wojciech Klama 	 * https://swtch.com/plan9port/man/man9/flush.html says:
129*134e1779SJakub Wojciech Klama 	 *
130*134e1779SJakub Wojciech Klama 	 * "Should multiple Tflushes be received for a pending
131*134e1779SJakub Wojciech Klama 	 * request, they must be answered in order.  A Rflush for
132*134e1779SJakub Wojciech Klama 	 * any of the multiple Tflushes implies an answer for all
133*134e1779SJakub Wojciech Klama 	 * previous ones.  Therefore, should a server receive a
134*134e1779SJakub Wojciech Klama 	 * request and then multiple flushes for that request, it
135*134e1779SJakub Wojciech Klama 	 * need respond only to the last flush."  This means
136*134e1779SJakub Wojciech Klama 	 * we could march through the queue of flushers here,
137*134e1779SJakub Wojciech Klama 	 * marking all but the last one as "to be dropped" rather
138*134e1779SJakub Wojciech Klama 	 * than "to be replied-to".
139*134e1779SJakub Wojciech Klama 	 *
140*134e1779SJakub Wojciech Klama 	 * However, we'll leave that for later, if ever -- it
141*134e1779SJakub Wojciech Klama 	 * should be harmless to respond to each, in order.
142*134e1779SJakub Wojciech Klama 	 */
143*134e1779SJakub Wojciech Klama 	STAILQ_FOREACH(flusher, &req->lr_flushq, lr_flushlink) {
144*134e1779SJakub Wojciech Klama 		flusher->lr_workstate = L9P_WS_RESPQUEUED;
145*134e1779SJakub Wojciech Klama #ifdef notdef
146*134e1779SJakub Wojciech Klama 		if (not the last) {
147*134e1779SJakub Wojciech Klama 			flusher->lr_flushstate = L9P_FLUSH_NOT_RUN;
148*134e1779SJakub Wojciech Klama 			/* or, flusher->lr_drop = true ? */
149*134e1779SJakub Wojciech Klama 		}
150*134e1779SJakub Wojciech Klama #endif
151*134e1779SJakub Wojciech Klama 		STAILQ_INSERT_TAIL(&tp->ltp_replyq, flusher, lr_worklink);
152*134e1779SJakub Wojciech Klama 	}
153*134e1779SJakub Wojciech Klama }
154*134e1779SJakub Wojciech Klama 
155*134e1779SJakub Wojciech Klama int
l9p_threadpool_init(struct l9p_threadpool * tp,int size)156*134e1779SJakub Wojciech Klama l9p_threadpool_init(struct l9p_threadpool *tp, int size)
157*134e1779SJakub Wojciech Klama {
158*134e1779SJakub Wojciech Klama 	struct l9p_worker *worker;
159*134e1779SJakub Wojciech Klama #if defined(__FreeBSD__)
160*134e1779SJakub Wojciech Klama 	char threadname[16];
161*134e1779SJakub Wojciech Klama #endif
162*134e1779SJakub Wojciech Klama 	int error;
163*134e1779SJakub Wojciech Klama 	int i, nworkers, nresponders;
164*134e1779SJakub Wojciech Klama 
165*134e1779SJakub Wojciech Klama 	if (size <= 0)
166*134e1779SJakub Wojciech Klama 		return (EINVAL);
167*134e1779SJakub Wojciech Klama 	error = pthread_mutex_init(&tp->ltp_mtx, NULL);
168*134e1779SJakub Wojciech Klama 	if (error)
169*134e1779SJakub Wojciech Klama 		return (error);
170*134e1779SJakub Wojciech Klama 	error = pthread_cond_init(&tp->ltp_work_cv, NULL);
171*134e1779SJakub Wojciech Klama 	if (error)
172*134e1779SJakub Wojciech Klama 		goto fail_work_cv;
173*134e1779SJakub Wojciech Klama 	error = pthread_cond_init(&tp->ltp_reply_cv, NULL);
174*134e1779SJakub Wojciech Klama 	if (error)
175*134e1779SJakub Wojciech Klama 		goto fail_reply_cv;
176*134e1779SJakub Wojciech Klama 
177*134e1779SJakub Wojciech Klama 	STAILQ_INIT(&tp->ltp_workq);
178*134e1779SJakub Wojciech Klama 	STAILQ_INIT(&tp->ltp_replyq);
179*134e1779SJakub Wojciech Klama 	LIST_INIT(&tp->ltp_workers);
180*134e1779SJakub Wojciech Klama 
181*134e1779SJakub Wojciech Klama 	nresponders = 0;
182*134e1779SJakub Wojciech Klama 	nworkers = 0;
183*134e1779SJakub Wojciech Klama 	for (i = 0; i <= size; i++) {
184*134e1779SJakub Wojciech Klama 		worker = calloc(1, sizeof(struct l9p_worker));
185*134e1779SJakub Wojciech Klama 		worker->ltw_tp = tp;
186*134e1779SJakub Wojciech Klama 		worker->ltw_responder = i == 0;
187*134e1779SJakub Wojciech Klama 		error = pthread_create(&worker->ltw_thread, NULL,
188*134e1779SJakub Wojciech Klama 		    worker->ltw_responder ? l9p_responder : l9p_worker,
189*134e1779SJakub Wojciech Klama 		    (void *)worker);
190*134e1779SJakub Wojciech Klama 		if (error) {
191*134e1779SJakub Wojciech Klama 			free(worker);
192*134e1779SJakub Wojciech Klama 			break;
193*134e1779SJakub Wojciech Klama 		}
194*134e1779SJakub Wojciech Klama 		if (worker->ltw_responder)
195*134e1779SJakub Wojciech Klama 			nresponders++;
196*134e1779SJakub Wojciech Klama 		else
197*134e1779SJakub Wojciech Klama 			nworkers++;
198*134e1779SJakub Wojciech Klama 
199*134e1779SJakub Wojciech Klama #if defined(__FreeBSD__)
200*134e1779SJakub Wojciech Klama 		if (worker->ltw_responder) {
201*134e1779SJakub Wojciech Klama 			pthread_set_name_np(worker->ltw_thread, "9p-responder");
202*134e1779SJakub Wojciech Klama 		} else {
203*134e1779SJakub Wojciech Klama 			sprintf(threadname, "9p-worker:%d", i - 1);
204*134e1779SJakub Wojciech Klama 			pthread_set_name_np(worker->ltw_thread, threadname);
205*134e1779SJakub Wojciech Klama 		}
206*134e1779SJakub Wojciech Klama #endif
207*134e1779SJakub Wojciech Klama 
208*134e1779SJakub Wojciech Klama 		LIST_INSERT_HEAD(&tp->ltp_workers, worker, ltw_link);
209*134e1779SJakub Wojciech Klama 	}
210*134e1779SJakub Wojciech Klama 	if (nresponders == 0 || nworkers == 0) {
211*134e1779SJakub Wojciech Klama 		/* need the one responder, and at least one worker */
212*134e1779SJakub Wojciech Klama 		l9p_threadpool_shutdown(tp);
213*134e1779SJakub Wojciech Klama 		return (error);
214*134e1779SJakub Wojciech Klama 	}
215*134e1779SJakub Wojciech Klama 	return (0);
216*134e1779SJakub Wojciech Klama 
217*134e1779SJakub Wojciech Klama 	/*
218*134e1779SJakub Wojciech Klama 	 * We could avoid these labels by having multiple destroy
219*134e1779SJakub Wojciech Klama 	 * paths (one for each error case), or by having booleans
220*134e1779SJakub Wojciech Klama 	 * for which variables were initialized.  Neither is very
221*134e1779SJakub Wojciech Klama 	 * appealing...
222*134e1779SJakub Wojciech Klama 	 */
223*134e1779SJakub Wojciech Klama fail_reply_cv:
224*134e1779SJakub Wojciech Klama 	pthread_cond_destroy(&tp->ltp_work_cv);
225*134e1779SJakub Wojciech Klama fail_work_cv:
226*134e1779SJakub Wojciech Klama 	pthread_mutex_destroy(&tp->ltp_mtx);
227*134e1779SJakub Wojciech Klama 
228*134e1779SJakub Wojciech Klama 	return (error);
229*134e1779SJakub Wojciech Klama }
230*134e1779SJakub Wojciech Klama 
231*134e1779SJakub Wojciech Klama /*
232*134e1779SJakub Wojciech Klama  * Run a request, usually by queueing it.
233*134e1779SJakub Wojciech Klama  */
234*134e1779SJakub Wojciech Klama void
l9p_threadpool_run(struct l9p_threadpool * tp,struct l9p_request * req)235*134e1779SJakub Wojciech Klama l9p_threadpool_run(struct l9p_threadpool *tp, struct l9p_request *req)
236*134e1779SJakub Wojciech Klama {
237*134e1779SJakub Wojciech Klama 
238*134e1779SJakub Wojciech Klama 	/*
239*134e1779SJakub Wojciech Klama 	 * Flush requests must be handled specially, since they
240*134e1779SJakub Wojciech Klama 	 * can cancel / kill off regular requests.  (But we can
241*134e1779SJakub Wojciech Klama 	 * run them through the regular dispatch mechanism.)
242*134e1779SJakub Wojciech Klama 	 */
243*134e1779SJakub Wojciech Klama 	if (req->lr_req.hdr.type == L9P_TFLUSH) {
244*134e1779SJakub Wojciech Klama 		/* not on a work queue yet so we can touch state */
245*134e1779SJakub Wojciech Klama 		req->lr_workstate = L9P_WS_IMMEDIATE;
246*134e1779SJakub Wojciech Klama 		(void) l9p_dispatch_request(req);
247*134e1779SJakub Wojciech Klama 	} else {
248*134e1779SJakub Wojciech Klama 		pthread_mutex_lock(&tp->ltp_mtx);
249*134e1779SJakub Wojciech Klama 		req->lr_workstate = L9P_WS_NOTSTARTED;
250*134e1779SJakub Wojciech Klama 		STAILQ_INSERT_TAIL(&tp->ltp_workq, req, lr_worklink);
251*134e1779SJakub Wojciech Klama 		pthread_cond_signal(&tp->ltp_work_cv);
252*134e1779SJakub Wojciech Klama 		pthread_mutex_unlock(&tp->ltp_mtx);
253*134e1779SJakub Wojciech Klama 	}
254*134e1779SJakub Wojciech Klama }
255*134e1779SJakub Wojciech Klama 
256*134e1779SJakub Wojciech Klama /*
257*134e1779SJakub Wojciech Klama  * Run a Tflush request.  Called via l9p_dispatch_request() since
258*134e1779SJakub Wojciech Klama  * it has some debug code in it, but not called from worker thread.
259*134e1779SJakub Wojciech Klama  */
260*134e1779SJakub Wojciech Klama int
l9p_threadpool_tflush(struct l9p_request * req)261*134e1779SJakub Wojciech Klama l9p_threadpool_tflush(struct l9p_request *req)
262*134e1779SJakub Wojciech Klama {
263*134e1779SJakub Wojciech Klama 	struct l9p_connection *conn;
264*134e1779SJakub Wojciech Klama 	struct l9p_threadpool *tp;
265*134e1779SJakub Wojciech Klama 	struct l9p_request *flushee;
266*134e1779SJakub Wojciech Klama 	uint16_t oldtag;
267*134e1779SJakub Wojciech Klama 	enum l9p_flushstate nstate;
268*134e1779SJakub Wojciech Klama 
269*134e1779SJakub Wojciech Klama 	/*
270*134e1779SJakub Wojciech Klama 	 * Find what we're supposed to flush (the flushee, as it were).
271*134e1779SJakub Wojciech Klama 	 */
272*134e1779SJakub Wojciech Klama 	req->lr_error = 0;	/* Tflush always succeeds */
273*134e1779SJakub Wojciech Klama 	conn = req->lr_conn;
274*134e1779SJakub Wojciech Klama 	tp = &conn->lc_tp;
275*134e1779SJakub Wojciech Klama 	oldtag = req->lr_req.tflush.oldtag;
276*134e1779SJakub Wojciech Klama 	ht_wrlock(&conn->lc_requests);
277*134e1779SJakub Wojciech Klama 	flushee = ht_find_locked(&conn->lc_requests, oldtag);
278*134e1779SJakub Wojciech Klama 	if (flushee == NULL) {
279*134e1779SJakub Wojciech Klama 		/*
280*134e1779SJakub Wojciech Klama 		 * Nothing to flush!  The old request must have
281*134e1779SJakub Wojciech Klama 		 * been done and gone already.  Just queue this
282*134e1779SJakub Wojciech Klama 		 * Tflush for a success reply.
283*134e1779SJakub Wojciech Klama 		 */
284*134e1779SJakub Wojciech Klama 		ht_unlock(&conn->lc_requests);
285*134e1779SJakub Wojciech Klama 		pthread_mutex_lock(&tp->ltp_mtx);
286*134e1779SJakub Wojciech Klama 		goto done;
287*134e1779SJakub Wojciech Klama 	}
288*134e1779SJakub Wojciech Klama 
289*134e1779SJakub Wojciech Klama 	/*
290*134e1779SJakub Wojciech Klama 	 * Found the original request.  We'll need to inspect its
291*134e1779SJakub Wojciech Klama 	 * work-state to figure out what to do.
292*134e1779SJakub Wojciech Klama 	 */
293*134e1779SJakub Wojciech Klama 	pthread_mutex_lock(&tp->ltp_mtx);
294*134e1779SJakub Wojciech Klama 	ht_unlock(&conn->lc_requests);
295*134e1779SJakub Wojciech Klama 
296*134e1779SJakub Wojciech Klama 	switch (flushee->lr_workstate) {
297*134e1779SJakub Wojciech Klama 
298*134e1779SJakub Wojciech Klama 	case L9P_WS_NOTSTARTED:
299*134e1779SJakub Wojciech Klama 		/*
300*134e1779SJakub Wojciech Klama 		 * Flushee is on work queue, but not yet being
301*134e1779SJakub Wojciech Klama 		 * handled by a worker.
302*134e1779SJakub Wojciech Klama 		 *
303*134e1779SJakub Wojciech Klama 		 * The documentation -- see
304*134e1779SJakub Wojciech Klama 		 * http://ericvh.github.io/9p-rfc/rfc9p2000.html
305*134e1779SJakub Wojciech Klama 		 * https://swtch.com/plan9port/man/man9/flush.html
306*134e1779SJakub Wojciech Klama 		 * -- says that "the server should answer the
307*134e1779SJakub Wojciech Klama 		 * flush message immediately".  However, Linux
308*134e1779SJakub Wojciech Klama 		 * sends flush requests for operations that
309*134e1779SJakub Wojciech Klama 		 * must finish, such as Tclunk, and it's not
310*134e1779SJakub Wojciech Klama 		 * possible to *answer* the flush request until
311*134e1779SJakub Wojciech Klama 		 * it has been handled (if necessary) or aborted
312*134e1779SJakub Wojciech Klama 		 * (if allowed).
313*134e1779SJakub Wojciech Klama 		 *
314*134e1779SJakub Wojciech Klama 		 * We therefore now just  the original request
315*134e1779SJakub Wojciech Klama 		 * and let the request-handler do whatever is
316*134e1779SJakub Wojciech Klama 		 * appropriate.  NOTE: we could have a table of
317*134e1779SJakub Wojciech Klama 		 * "requests that can be aborted without being
318*134e1779SJakub Wojciech Klama 		 * run" vs "requests that must be run to be
319*134e1779SJakub Wojciech Klama 		 * aborted", but for now that seems like an
320*134e1779SJakub Wojciech Klama 		 * unnecessary complication.
321*134e1779SJakub Wojciech Klama 		 */
322*134e1779SJakub Wojciech Klama 		nstate = L9P_FLUSH_REQUESTED_PRE_START;
323*134e1779SJakub Wojciech Klama 		break;
324*134e1779SJakub Wojciech Klama 
325*134e1779SJakub Wojciech Klama 	case L9P_WS_IMMEDIATE:
326*134e1779SJakub Wojciech Klama 		/*
327*134e1779SJakub Wojciech Klama 		 * This state only applies to Tflush requests, and
328*134e1779SJakub Wojciech Klama 		 * flushing a Tflush is illegal.  But we'll do nothing
329*134e1779SJakub Wojciech Klama 		 * special here, which will make us act like a flush
330*134e1779SJakub Wojciech Klama 		 * request for the flushee that arrived too late to
331*134e1779SJakub Wojciech Klama 		 * do anything about the flushee.
332*134e1779SJakub Wojciech Klama 		 */
333*134e1779SJakub Wojciech Klama 		nstate = L9P_FLUSH_REQUESTED_POST_START;
334*134e1779SJakub Wojciech Klama 		break;
335*134e1779SJakub Wojciech Klama 
336*134e1779SJakub Wojciech Klama 	case L9P_WS_INPROGRESS:
337*134e1779SJakub Wojciech Klama 		/*
338*134e1779SJakub Wojciech Klama 		 * Worker thread flushee->lr_worker is working on it.
339*134e1779SJakub Wojciech Klama 		 * Kick it to get it out of blocking system calls.
340*134e1779SJakub Wojciech Klama 		 * (This requires that it carefully set up some
341*134e1779SJakub Wojciech Klama 		 * signal handlers, and may be FreeBSD-dependent,
342*134e1779SJakub Wojciech Klama 		 * it probably cannot be handled this way on MacOS.)
343*134e1779SJakub Wojciech Klama 		 */
344*134e1779SJakub Wojciech Klama #ifdef notyet
345*134e1779SJakub Wojciech Klama 		pthread_kill(...);
346*134e1779SJakub Wojciech Klama #endif
347*134e1779SJakub Wojciech Klama 		nstate = L9P_FLUSH_REQUESTED_POST_START;
348*134e1779SJakub Wojciech Klama 		break;
349*134e1779SJakub Wojciech Klama 
350*134e1779SJakub Wojciech Klama 	case L9P_WS_RESPQUEUED:
351*134e1779SJakub Wojciech Klama 		/*
352*134e1779SJakub Wojciech Klama 		 * The flushee is already in the response queue.
353*134e1779SJakub Wojciech Klama 		 * We'll just mark it as having had some flush
354*134e1779SJakub Wojciech Klama 		 * action applied.
355*134e1779SJakub Wojciech Klama 		 */
356*134e1779SJakub Wojciech Klama 		nstate = L9P_FLUSH_TOOLATE;
357*134e1779SJakub Wojciech Klama 		break;
358*134e1779SJakub Wojciech Klama 
359*134e1779SJakub Wojciech Klama 	case L9P_WS_REPLYING:
360*134e1779SJakub Wojciech Klama 		/*
361*134e1779SJakub Wojciech Klama 		 * Although we found the flushee, it's too late to
362*134e1779SJakub Wojciech Klama 		 * make us depend on it: it's already heading out
363*134e1779SJakub Wojciech Klama 		 * the door as a reply.
364*134e1779SJakub Wojciech Klama 		 *
365*134e1779SJakub Wojciech Klama 		 * We don't want to do anything to the flushee.
366*134e1779SJakub Wojciech Klama 		 * Instead, we want to work the same way as if
367*134e1779SJakub Wojciech Klama 		 * we had never found the tag.
368*134e1779SJakub Wojciech Klama 		 */
369*134e1779SJakub Wojciech Klama 		goto done;
370*134e1779SJakub Wojciech Klama 	}
371*134e1779SJakub Wojciech Klama 
372*134e1779SJakub Wojciech Klama 	/*
373*134e1779SJakub Wojciech Klama 	 * Now add us to the list of Tflush-es that are waiting
374*134e1779SJakub Wojciech Klama 	 * for the flushee (creating the list if needed, i.e., if
375*134e1779SJakub Wojciech Klama 	 * this is the first Tflush for the flushee).  We (req)
376*134e1779SJakub Wojciech Klama 	 * will get queued for reply later, when the responder
377*134e1779SJakub Wojciech Klama 	 * processes the flushee and calls l9p_threadpool_rflush().
378*134e1779SJakub Wojciech Klama 	 */
379*134e1779SJakub Wojciech Klama 	if (flushee->lr_flushstate == L9P_FLUSH_NONE)
380*134e1779SJakub Wojciech Klama 		STAILQ_INIT(&flushee->lr_flushq);
381*134e1779SJakub Wojciech Klama 	flushee->lr_flushstate = nstate;
382*134e1779SJakub Wojciech Klama 	STAILQ_INSERT_TAIL(&flushee->lr_flushq, req, lr_flushlink);
383*134e1779SJakub Wojciech Klama 
384*134e1779SJakub Wojciech Klama 	pthread_mutex_unlock(&tp->ltp_mtx);
385*134e1779SJakub Wojciech Klama 
386*134e1779SJakub Wojciech Klama 	return (0);
387*134e1779SJakub Wojciech Klama 
388*134e1779SJakub Wojciech Klama done:
389*134e1779SJakub Wojciech Klama 	/*
390*134e1779SJakub Wojciech Klama 	 * This immediate op is ready to be replied-to now, so just
391*134e1779SJakub Wojciech Klama 	 * stick it onto the reply queue.
392*134e1779SJakub Wojciech Klama 	 */
393*134e1779SJakub Wojciech Klama 	req->lr_workstate = L9P_WS_RESPQUEUED;
394*134e1779SJakub Wojciech Klama 	STAILQ_INSERT_TAIL(&tp->ltp_replyq, req, lr_worklink);
395*134e1779SJakub Wojciech Klama 	pthread_mutex_unlock(&tp->ltp_mtx);
396*134e1779SJakub Wojciech Klama 	pthread_cond_signal(&tp->ltp_reply_cv);
397*134e1779SJakub Wojciech Klama 	return (0);
398*134e1779SJakub Wojciech Klama }
399*134e1779SJakub Wojciech Klama 
400*134e1779SJakub Wojciech Klama int
l9p_threadpool_shutdown(struct l9p_threadpool * tp)401*134e1779SJakub Wojciech Klama l9p_threadpool_shutdown(struct l9p_threadpool *tp)
402*134e1779SJakub Wojciech Klama {
403*134e1779SJakub Wojciech Klama 	struct l9p_worker *worker, *tmp;
404*134e1779SJakub Wojciech Klama 
405*134e1779SJakub Wojciech Klama 	LIST_FOREACH_SAFE(worker, &tp->ltp_workers, ltw_link, tmp) {
406*134e1779SJakub Wojciech Klama 		pthread_mutex_lock(&tp->ltp_mtx);
407*134e1779SJakub Wojciech Klama 		worker->ltw_exiting = true;
408*134e1779SJakub Wojciech Klama 		if (worker->ltw_responder)
409*134e1779SJakub Wojciech Klama 			pthread_cond_signal(&tp->ltp_reply_cv);
410*134e1779SJakub Wojciech Klama 		else
411*134e1779SJakub Wojciech Klama 			pthread_cond_broadcast(&tp->ltp_work_cv);
412*134e1779SJakub Wojciech Klama 		pthread_mutex_unlock(&tp->ltp_mtx);
413*134e1779SJakub Wojciech Klama 		pthread_join(worker->ltw_thread, NULL);
414*134e1779SJakub Wojciech Klama 		LIST_REMOVE(worker, ltw_link);
415*134e1779SJakub Wojciech Klama 		free(worker);
416*134e1779SJakub Wojciech Klama 	}
417*134e1779SJakub Wojciech Klama 	pthread_cond_destroy(&tp->ltp_reply_cv);
418*134e1779SJakub Wojciech Klama 	pthread_cond_destroy(&tp->ltp_work_cv);
419*134e1779SJakub Wojciech Klama 	pthread_mutex_destroy(&tp->ltp_mtx);
420*134e1779SJakub Wojciech Klama 
421*134e1779SJakub Wojciech Klama 	return (0);
422*134e1779SJakub Wojciech Klama }
423