xref: /minix/minix/lib/libblockdriver/driver_mt.c (revision ebfedea0)
1 /* This file contains the multithreaded driver interface.
2  *
3  * Changes:
4  *   Aug 27, 2011   created (A. Welzel)
5  *
6  * The entry points into this file are:
7  *   blockdriver_mt_task:	the main message loop of the driver
8  *   blockdriver_mt_terminate:	break out of the main message loop
9  *   blockdriver_mt_sleep:	put the current thread to sleep
10  *   blockdriver_mt_wakeup:	wake up a sleeping thread
11  *   blockdriver_mt_set_workers: set the number of worker threads
12  */
13 
14 #include <minix/blockdriver_mt.h>
15 #include <minix/mthread.h>
16 #include <assert.h>
17 
18 #include "const.h"
19 #include "driver.h"
20 #include "driver_mt.h"
21 #include "mq.h"
22 
23 /* A thread ID is composed of a device ID and a per-device worker thread ID.
24  * All thread IDs must be in the range 0..(MAX_THREADS-1) inclusive.
25  */
26 #define MAKE_TID(did, wid)	((did) * MAX_WORKERS + (wid))
27 #define TID_DEVICE(tid)		((tid) / MAX_WORKERS)
28 #define TID_WORKER(tid)		((tid) % MAX_WORKERS)
29 
30 typedef unsigned int worker_id_t;
31 
32 typedef enum {
33   STATE_DEAD,
34   STATE_RUNNING,
35   STATE_BUSY,
36   STATE_EXITED
37 } worker_state;
38 
39 /* Structure with information about a worker thread. */
40 typedef struct {
41   device_id_t device_id;
42   worker_id_t worker_id;
43   worker_state state;
44   mthread_thread_t mthread;
45   mthread_event_t sleep_event;
46 } worker_t;
47 
48 /* Structure with information about a device. */
49 typedef struct {
50   device_id_t id;
51   unsigned int workers;
52   worker_t worker[MAX_WORKERS];
53   mthread_event_t queue_event;
54   mthread_rwlock_t barrier;
55 } device_t;
56 
57 static struct blockdriver *bdtab;
58 static int running = FALSE;
59 
60 static mthread_key_t worker_key;
61 
62 static device_t device[MAX_DEVICES];
63 
64 static worker_t *exited[MAX_THREADS];
65 static int num_exited = 0;
66 
67 /*===========================================================================*
68  *				enqueue					     *
69  *===========================================================================*/
70 static void enqueue(device_t *dp, const message *m_src, int ipc_status)
71 {
72 /* Enqueue a message into the device's queue, and signal the event.
73  * Must be called from the master thread.
74  */
75 
76   if (!mq_enqueue(dp->id, m_src, ipc_status))
77 	panic("blockdriver_mt: enqueue failed (message queue full)");
78 
79   mthread_event_fire(&dp->queue_event);
80 }
81 
82 /*===========================================================================*
83  *				try_dequeue				     *
84  *===========================================================================*/
85 static int try_dequeue(device_t *dp, message *m_dst, int *ipc_status)
86 {
87 /* See if a message can be dequeued from the current worker thread's device
88  * queue. If so, dequeue the message and return TRUE. If not, return FALSE.
89  * Must be called from a worker thread. Does not block.
90  */
91 
92   return mq_dequeue(dp->id, m_dst, ipc_status);
93 }
94 
95 /*===========================================================================*
96  *				dequeue					     *
97  *===========================================================================*/
98 static int dequeue(device_t *dp, worker_t *wp, message *m_dst,
99   int *ipc_status)
100 {
101 /* Dequeue a message from the current worker thread's device queue. Block the
102  * current thread if necessary. Must be called from a worker thread. Either
103  * succeeds with a message (TRUE) or indicates that the thread should be
104  * terminated (FALSE).
105  */
106 
107   do {
108 	mthread_event_wait(&dp->queue_event);
109 
110 	/* If we were woken up as a result of terminate or set_workers, break
111 	 * out of the loop and terminate the thread.
112 	 */
113 	if (!running || wp->worker_id >= dp->workers)
114 		return FALSE;
115   } while (!try_dequeue(dp, m_dst, ipc_status));
116 
117   return TRUE;
118 }
119 
120 /*===========================================================================*
121  *				is_transfer_req				     *
122  *===========================================================================*/
123 static int is_transfer_req(int type)
124 {
125 /* Return whether the given block device request is a transfer request.
126  */
127 
128   switch (type) {
129   case BDEV_READ:
130   case BDEV_WRITE:
131   case BDEV_GATHER:
132   case BDEV_SCATTER:
133 	return TRUE;
134 
135   default:
136 	return FALSE;
137   }
138 }
139 
140 /*===========================================================================*
141  *				worker_thread				     *
142  *===========================================================================*/
static void *worker_thread(void *param)
{
/* The worker thread loop. Set up the thread-specific reference to itself and
 * start looping. The loop consists of blocking dequeuing and handling
 * messages. After handling a message, the thread might have been stopped, so
 * we check for this condition and exit if so.
 */
  worker_t *wp;		/* our own worker structure (passed in as param) */
  device_t *dp;		/* the device this worker serves */
  thread_id_t tid;	/* unique thread ID, passed on to the driver */
  message m;
  int ipc_status;

  wp = (worker_t *) param;
  assert(wp != NULL);
  dp = &device[wp->device_id];
  tid = MAKE_TID(wp->device_id, wp->worker_id);

  /* Store our worker structure as thread-local state, for use by
   * blockdriver_mt_get_tid() and blockdriver_mt_sleep().
   */
  if (mthread_setspecific(worker_key, wp))
	panic("blockdriver_mt: could not save local thread pointer");

  /* Keep handling requests until the driver shuts down (!running) or this
   * worker's ID is no longer below the device's configured worker count
   * (see blockdriver_mt_set_workers).
   */
  while (running && wp->worker_id < dp->workers) {

	/* See if a new message is available right away. */
	if (!try_dequeue(dp, &m, &ipc_status)) {

		/* If not, block waiting for a new message or a thread
		 * termination event.
		 */
		if (!dequeue(dp, wp, &m, &ipc_status))
			break;
	}

	/* Even if the thread was stopped before, a new message resumes it. */
	wp->state = STATE_BUSY;

	/* If the request is a transfer request, we acquire the read barrier
	 * lock. Otherwise, we acquire the write lock, which serializes the
	 * request against all other requests for this device.
	 */
	if (is_transfer_req(m.m_type))
		mthread_rwlock_rdlock(&dp->barrier);
	else
		mthread_rwlock_wrlock(&dp->barrier);

	/* Handle the request and send a reply. */
	blockdriver_process_on_thread(bdtab, &m, ipc_status, tid);

	/* Switch the thread back to running state, and unlock the barrier. */
	wp->state = STATE_RUNNING;
	mthread_rwlock_unlock(&dp->barrier);
  }

  /* Clean up and terminate this thread. */
  if (mthread_setspecific(worker_key, NULL))
	panic("blockdriver_mt: could not delete local thread pointer");

  wp->state = STATE_EXITED;

  /* Put ourselves on the exited list; the master thread joins and cleans up
   * exited workers in master_handle_exits().
   */
  exited[num_exited++] = wp;

  return NULL;
}
205 
206 /*===========================================================================*
207  *				master_create_worker			     *
208  *===========================================================================*/
209 static void master_create_worker(worker_t *wp, worker_id_t worker_id,
210   device_id_t device_id)
211 {
212 /* Start a new worker thread.
213  */
214   mthread_attr_t attr;
215   int r;
216 
217   wp->device_id = device_id;
218   wp->worker_id = worker_id;
219   wp->state = STATE_RUNNING;
220 
221   /* Initialize synchronization primitives. */
222   mthread_event_init(&wp->sleep_event);
223 
224   r = mthread_attr_init(&attr);
225   if (r != 0)
226 	panic("blockdriver_mt: could not initialize attributes (%d)", r);
227 
228   r = mthread_attr_setstacksize(&attr, STACK_SIZE);
229   if (r != 0)
230 	panic("blockdriver_mt: could not set stack size (%d)", r);
231 
232   r = mthread_create(&wp->mthread, &attr, worker_thread, (void *) wp);
233   if (r != 0)
234 	panic("blockdriver_mt: could not start thread %d (%d)", worker_id, r);
235 
236   mthread_attr_destroy(&attr);
237 }
238 
239 /*===========================================================================*
240  *				master_destroy_worker			     *
241  *===========================================================================*/
242 static void master_destroy_worker(worker_t *wp)
243 {
244 /* Clean up resources used by an exited worker thread.
245  */
246 
247   assert(wp != NULL);
248   assert(wp->state == STATE_EXITED);
249 
250   /* Join the thread. */
251   if (mthread_join(wp->mthread, NULL))
252 	panic("blockdriver_mt: could not join thread %d", wp->worker_id);
253 
254   /* Destroy resources. */
255   mthread_event_destroy(&wp->sleep_event);
256 
257   wp->state = STATE_DEAD;
258 }
259 
260 /*===========================================================================*
261  *				master_handle_exits			     *
262  *===========================================================================*/
263 static void master_handle_exits(void)
264 {
265 /* Destroy the remains of all exited threads.
266  */
267   int i;
268 
269   for (i = 0; i < num_exited; i++)
270 	master_destroy_worker(exited[i]);
271 
272   num_exited = 0;
273 }
274 
275 /*===========================================================================*
276  *				master_yield				     *
277  *===========================================================================*/
278 static void master_yield(void)
279 {
280 /* Let worker threads run, and clean up any exited threads.
281  */
282 
283   mthread_yield_all();
284 
285   if (num_exited > 0)
286 	master_handle_exits();
287 }
288 
289 /*===========================================================================*
290  *				master_handle_message			     *
291  *===========================================================================*/
static void master_handle_message(message *m_ptr, int ipc_status)
{
/* For real request messages, query the device ID, start a thread if none is
 * free and the maximum number of threads for that device has not yet been
 * reached, and enqueue the message in the device's message queue. All other
 * messages are handled immediately from the main thread.
 */
  device_id_t id;
  worker_t *wp;
  device_t *dp;
  unsigned int wid;
  int r;

  /* If this is not a block driver request, we cannot get the minor device
   * associated with it, and thus we can not tell which thread should process
   * it either. In that case, the master thread has to handle it instead.
   */
  if (is_ipc_notify(ipc_status) || !IS_BDEV_RQ(m_ptr->m_type)) {
	/* Process as 'other' message. */
	blockdriver_process_on_thread(bdtab, m_ptr, ipc_status, MAIN_THREAD);

	return;
  }

  /* Query the device ID. Upon failure, send the error code to the caller. */
  r = (*bdtab->bdr_device)(m_ptr->m_lbdev_lblockdriver_msg.minor, &id);
  if (r != OK) {
	blockdriver_reply(m_ptr, ipc_status, r);

	return;
  }

  /* Look up the device control block. */
  assert(id >= 0 && id < MAX_DEVICES);
  dp = &device[id];

  /* Find the first non-busy worker thread. */
  for (wid = 0; wid < dp->workers; wid++)
	if (dp->worker[wid].state != STATE_BUSY)
		break;

  /* If we found a non-busy worker, lazily create its thread if needed. If
   * all workers are busy (wid == dp->workers), no thread is created; the
   * message is enqueued below anyway and will be picked up by the first
   * worker that becomes available.
   */
  if (wid < dp->workers) {
	wp = &dp->worker[wid];

	/* Exited workers are reaped in master_yield() before the next
	 * message is handled, so none should be visible here.
	 */
	assert(wp->state != STATE_EXITED);

	/* If the non-busy thread has not yet been created, create one now. */
	if (wp->state == STATE_DEAD)
		master_create_worker(wp, wid, dp->id);
  }

  /* Enqueue the message at the device queue. */
  enqueue(dp, m_ptr, ipc_status);
}
349 
350 /*===========================================================================*
351  *				master_init				     *
352  *===========================================================================*/
353 static void master_init(struct blockdriver *bdp)
354 {
355 /* Initialize the state of the master thread.
356  */
357   int i, j;
358 
359   assert(bdp != NULL);
360   assert(bdp->bdr_device != NULL);
361 
362   bdtab = bdp;
363 
364   /* Initialize device-specific data structures. */
365   for (i = 0; i < MAX_DEVICES; i++) {
366 	device[i].id = i;
367 	device[i].workers = 1;
368 	mthread_event_init(&device[i].queue_event);
369 	mthread_rwlock_init(&device[i].barrier);
370 
371 	for (j = 0; j < MAX_WORKERS; j++)
372 		device[i].worker[j].state = STATE_DEAD;
373   }
374 
375   /* Initialize a per-thread key, where each worker thread stores its own
376    * reference to the worker structure.
377    */
378   if (mthread_key_create(&worker_key, NULL))
379 	panic("blockdriver_mt: error initializing worker key");
380 }
381 
382 /*===========================================================================*
383  *				blockdriver_mt_get_tid			     *
384  *===========================================================================*/
385 thread_id_t blockdriver_mt_get_tid(void)
386 {
387 /* Return back the ID of this thread.
388  */
389   worker_t *wp;
390 
391   wp = (worker_t *) mthread_getspecific(worker_key);
392 
393   if (wp == NULL)
394 	panic("blockdriver_mt: master thread cannot query thread ID\n");
395 
396   return MAKE_TID(wp->device_id, wp->worker_id);
397 }
398 
399 /*===========================================================================*
400  *				blockdriver_mt_receive			     *
401  *===========================================================================*/
402 static void blockdriver_mt_receive(message *m_ptr, int *ipc_status)
403 {
404 /* Receive a message.
405  */
406   int r;
407 
408   r = sef_receive_status(ANY, m_ptr, ipc_status);
409 
410   if (r != OK)
411 	panic("blockdriver_mt: sef_receive_status() returned %d", r);
412 }
413 
414 /*===========================================================================*
415  *				blockdriver_mt_task			     *
416  *===========================================================================*/
417 void blockdriver_mt_task(struct blockdriver *driver_tab)
418 {
419 /* The multithreaded driver task.
420  */
421   int ipc_status, i;
422   message mess;
423 
424   /* Initialize first if necessary. */
425   if (!running) {
426 	master_init(driver_tab);
427 
428 	running = TRUE;
429   }
430 
431   /* The main message loop. */
432   while (running) {
433 	/* Receive a message. */
434 	blockdriver_mt_receive(&mess, &ipc_status);
435 
436 	/* Dispatch the message. */
437 	master_handle_message(&mess, ipc_status);
438 
439 	/* Let worker threads run. */
440 	master_yield();
441   }
442 
443   /* Free up resources. */
444   for (i = 0; i < MAX_DEVICES; i++)
445 	mthread_event_destroy(&device[i].queue_event);
446 }
447 
448 /*===========================================================================*
449  *				blockdriver_mt_terminate		     *
450  *===========================================================================*/
451 void blockdriver_mt_terminate(void)
452 {
453 /* Instruct libblockdriver to shut down.
454  */
455 
456   running = FALSE;
457 }
458 
459 /*===========================================================================*
460  *				blockdriver_mt_sleep			     *
461  *===========================================================================*/
462 void blockdriver_mt_sleep(void)
463 {
464 /* Let the current thread sleep until it gets woken up by the master thread.
465  */
466   worker_t *wp;
467 
468   wp = (worker_t *) mthread_getspecific(worker_key);
469 
470   if (wp == NULL)
471 	panic("blockdriver_mt: master thread cannot sleep");
472 
473   mthread_event_wait(&wp->sleep_event);
474 }
475 
476 /*===========================================================================*
477  *				blockdriver_mt_wakeup			     *
478  *===========================================================================*/
479 void blockdriver_mt_wakeup(thread_id_t id)
480 {
481 /* Wake up a sleeping worker thread from the master thread.
482  */
483   worker_t *wp;
484   device_id_t device_id;
485   worker_id_t worker_id;
486 
487   device_id = TID_DEVICE(id);
488   worker_id = TID_WORKER(id);
489 
490   assert(device_id >= 0 && device_id < MAX_DEVICES);
491   assert(worker_id < MAX_WORKERS);
492 
493   wp = &device[device_id].worker[worker_id];
494 
495   assert(wp->state == STATE_RUNNING || wp->state == STATE_BUSY);
496 
497   mthread_event_fire(&wp->sleep_event);
498 }
499 
500 /*===========================================================================*
501  *				blockdriver_mt_set_workers		     *
502  *===========================================================================*/
503 void blockdriver_mt_set_workers(device_id_t id, unsigned int workers)
504 {
505 /* Set the number of worker threads for the given device.
506  */
507   device_t *dp;
508 
509   assert(id >= 0 && id < MAX_DEVICES);
510 
511   if (workers > MAX_WORKERS)
512 	workers = MAX_WORKERS;
513 
514   dp = &device[id];
515 
516   /* If we are cleaning up, wake up all threads waiting on a queue event. */
517   if (workers == 1 && dp->workers > workers)
518 	mthread_event_fire_all(&dp->queue_event);
519 
520   dp->workers = workers;
521 }
522 
523 /*===========================================================================*
524  *				blockdriver_mt_is_idle			     *
525  *===========================================================================*/
526 int blockdriver_mt_is_idle(void)
527 {
528 /* Return whether the block driver is idle. This means that it has no enqueued
529  * requests and no busy worker threads. Used for live update functionality.
530  */
531   unsigned int did, wid;
532 
533   for (did = 0; did < MAX_DEVICES; did++) {
534 	if (!mq_isempty(did))
535 		return FALSE;
536 
537 	for (wid = 0; wid < device[did].workers; wid++)
538 		if (device[did].worker[wid].state == STATE_BUSY)
539 			return FALSE;
540   }
541 
542   return TRUE;
543 }
544 
545 /*===========================================================================*
546  *				blockdriver_mt_suspend			     *
547  *===========================================================================*/
548 void blockdriver_mt_suspend(void)
549 {
550 /* Suspend the driver operation in order to facilitate a live update.
551  * Suspension involves shutting down all worker threads, because transferring
552  * thread stacks is currently not supported by the state transfer framework.
553  */
554   unsigned int did;
555 
556   assert(running);
557   assert(blockdriver_mt_is_idle());
558 
559   /* We terminate the worker threads by simulating a driver shutdown. */
560   running = FALSE;
561 
562   for (did = 0; did < MAX_DEVICES; did++)
563 	mthread_event_fire_all(&device[did].queue_event);
564 
565   master_yield();
566 }
567 
568 /*===========================================================================*
569  *				blockdriver_mt_resume			     *
570  *===========================================================================*/
571 void blockdriver_mt_resume(void)
572 {
573 /* Resume regular operation after a (successful or failed) live update. We do
574  * not recreate worker threads; instead, they are recreated lazily as new
575  * requests come in.
576  */
577 
578   assert(!running);
579 
580   running = TRUE;
581 }
582