1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright 2009 Sun Microsystems, Inc.  All rights reserved.
23  * Use is subject to license terms.
24  */
25 
26 #include "config.h"
27 
28 #ifdef HAVE_LWPS
29 #include <sys/lwp.h>
30 #endif
31 #include <fcntl.h>
32 #include "filebench.h"
33 #include "flowop.h"
34 #include "stats.h"
35 #include "ioprio.h"
36 
37 static flowop_t *flowop_define_common(threadflow_t *threadflow, char *name,
38     flowop_t *inherit, flowop_t **flowoplist_hdp, int instance, int type);
39 static int flowop_composite(threadflow_t *threadflow, flowop_t *flowop);
40 static int flowop_composite_init(flowop_t *flowop);
41 static void flowop_composite_destruct(flowop_t *flowop);
42 
43 /*
44  * A collection of flowop support functions. The actual code that
45  * implements the various flowops is in flowop_library.c.
46  *
47  * Routines for defining, creating, initializing and destroying
48  * flowops, cyclically invoking the flowops on each threadflow's flowop
49  * list, collecting statistics about flowop execution, and other
50  * housekeeping duties are included in this file.
51  *
52  * User Defined Composite Flowops
53  *    The ability to define new flowops as lists of built-in or previously
54  * defined flowops has been added to Filebench. In a sense they are like
55  * in-line subroutines, which can have default attributes set at definition
56  * time and passed arguments at invocation time. Like other flowops (and
57  * unlike conventional subroutines) you can invoke them with an iteration
58  * count (the "iter" attribute), and they will loop through their associated
59  * list of flowops for "iter" number of times each time they are encountered
60  * in the thread or outer composite flowop which invokes them.
61  *
62  * Composite flowops are created with a "define" command, are given a name,
63  * optional default attributes, and local variable definitions on the
64  * "define" command line, followed by a brace enclosed list of flowops
65  * to execute. The enclosed flowops may include attributes that reference
66  * the local variables, as well as constants and global variables.
67  *
68  * Composite flowops are used pretty much like regular flowops, but you can
69  * also set local variables to constants or global variables ($local_var =
70  * [$var | $random_var | string | boolean | integer | double]) as part of
71  * the invocation. Thus each invocation can pass customized values to its
72  * inner flowops, greatly increasing their generality.
73  *
74  * All flowops are placed on a global, singly linked list, with fo_next
75  * being the link pointer for this list. The are also placed on a private
76  * list for the thread or composite flowop they are part of. The tf_thrd_fops
77  * pointer in the thread will point to the list of top level flowops in the
78  * thread, which are linked together by fo_exec_next. If any of these flowops
79  * are composite flowops, they will have a list of second level flowops rooted
80  * at the composite flowop's fo_comp_fops pointer. So, there is one big list
81  * of all flowops, and an n-arry tree of threads, composite flowops, and
82  * flowops, with composite flowops being the branch nodes in the tree.
83  *
84  * To illustrate, if we have three first level flowops, the first of which is
85  * a composite flowop consisting of two other flowops, we get:
86  *
87  * Thread->tf_thrd_fops -> flowop->fo_exec_next -> flowop->fo_exec_next
88  *			   flowop->fo_comp_fops		    |
89  *				    |			    V
90  *				    |			flowop->fo_exec_next
91  *				    |
92  *				    V
93  *				flowop->fo_exec_next -> flowop->fo_exec_next
94  *
95  * And all five flowops (plus others from any other threads) are on a global
96  * list linked with fo_next.
97  */
98 
99 /*
100  * Prints the name and instance number of each flowop in
101  * the supplied list to the filebench log.
102  */
103 int
flowop_printlist(flowop_t * list)104 flowop_printlist(flowop_t *list)
105 {
106 	flowop_t *flowop = list;
107 
108 	while (flowop) {
109 		filebench_log(LOG_DEBUG_IMPL, "flowop-list %s-%d",
110 		    flowop->fo_name, flowop->fo_instance);
111 		flowop = flowop->fo_exec_next;
112 	}
113 	return (0);
114 }
115 
116 /*
117  * Prints the name and instance number of all flowops on
118  * the master flowop list to the console and the filebench log.
119  */
120 void
flowop_printall(void)121 flowop_printall(void)
122 {
123 	flowop_t *flowop = filebench_shm->shm_flowoplist;
124 
125 	while (flowop) {
126 		filebench_log(LOG_INFO, "flowop-list %s-%d",
127 		    flowop->fo_name, flowop->fo_instance);
128 		flowop = flowop->fo_next;
129 	}
130 }
131 
132 #define	TIMESPEC_TO_HRTIME(s, e) (((e.tv_sec - s.tv_sec) * 1000000000LL) + \
133 					(e.tv_nsec - s.tv_nsec))
134 /*
135  * Puts current high-resolution time in start time entry for threadflow.
136  */
137 void
flowop_beginop(threadflow_t * threadflow,flowop_t * flowop)138 flowop_beginop(threadflow_t *threadflow, flowop_t *flowop)
139 {
140 	/* Start of op for this thread */
141 	threadflow->tf_stime = gethrtime();
142 }
143 
144 struct flowstats controlstats;
145 pthread_mutex_t controlstats_lock;
146 static int controlstats_zeroed = 0;
147 
148 static void
flowop_populate_distribution(flowop_t * flowop,unsigned long long ll_delay)149 flowop_populate_distribution(flowop_t *flowop,  unsigned long long ll_delay)
150 {
151 	unsigned int i;
152 	unsigned long long iii;
153 
154 	iii = 1;
155 	for (i = 0; i < OSPROF_BUCKET_NUMBER; i++) {
156 		if (ll_delay < iii)
157 			break;
158 		iii <<= 1;
159 	}
160 	flowop->fo_stats.fs_distribution[i]++;
161 }
162 
163 /*
164  * Updates flowop's latency statistics, using saved start
165  * time and current high resolution time. Updates flowop's
166  * io count and transferred bytes statistics. Also updates
167  * threadflow's and flowop's cumulative read or write byte
168  * and io count statistics.
169  */
170 void
flowop_endop(threadflow_t * threadflow,flowop_t * flowop,int64_t bytes)171 flowop_endop(threadflow_t *threadflow, flowop_t *flowop, int64_t bytes)
172 {
173 	unsigned long long ll_delay;
174 
175 	ll_delay = (gethrtime() - threadflow->tf_stime);
176 
177 	/* setting minimum and maximum latencies for this flowop */
178 	if (!flowop->fo_stats.fs_minlat || ll_delay < flowop->fo_stats.fs_minlat)
179 		flowop->fo_stats.fs_minlat = ll_delay;
180 
181 	if (ll_delay > flowop->fo_stats.fs_maxlat)
182 		flowop->fo_stats.fs_maxlat = ll_delay;
183 
184 	flowop->fo_stats.fs_total_lat += ll_delay;
185 	flowop->fo_stats.fs_count++;
186 	flowop->fo_stats.fs_bytes += bytes;
187 	(void) ipc_mutex_lock(&controlstats_lock);
188 	if ((flowop->fo_type & FLOW_TYPE_IO) ||
189 	    (flowop->fo_type & FLOW_TYPE_AIO)) {
190 		controlstats.fs_count++;
191 		controlstats.fs_bytes += bytes;
192 	}
193 	if (flowop->fo_attrs & FLOW_ATTR_READ) {
194 		threadflow->tf_stats.fs_rbytes += bytes;
195 		threadflow->tf_stats.fs_rcount++;
196 		flowop->fo_stats.fs_rcount++;
197 		controlstats.fs_rbytes += bytes;
198 		controlstats.fs_rcount++;
199 	} else if (flowop->fo_attrs & FLOW_ATTR_WRITE) {
200 		threadflow->tf_stats.fs_wbytes += bytes;
201 		threadflow->tf_stats.fs_wcount++;
202 		flowop->fo_stats.fs_wcount++;
203 		controlstats.fs_wbytes += bytes;
204 		controlstats.fs_wcount++;
205 	}
206 
207 	if (filebench_shm->lathist_enabled)
208 		flowop_populate_distribution(flowop, ll_delay);
209 
210 	(void) ipc_mutex_unlock(&controlstats_lock);
211 }
212 
213 /*
214  * Calls the flowop's initialization function, pointed to by
215  * flowop->fo_init.
216  */
217 static int
flowop_initflow(flowop_t * flowop)218 flowop_initflow(flowop_t *flowop)
219 {
220 	/*
221 	 * save static copies of two items, in case they are supplied
222 	 * from random variables
223 	 */
224 	if (flowop->fo_value)
225 		flowop->fo_constvalue = avd_get_int(flowop->fo_value);
226 
227 	if (flowop->fo_wss)
228 		flowop->fo_constwss = avd_get_int(flowop->fo_wss);
229 
230 	if ((*flowop->fo_init)(flowop) < 0) {
231 		filebench_log(LOG_ERROR, "flowop %s-%d init failed",
232 		    flowop->fo_name, flowop->fo_instance);
233 		return (-1);
234 	}
235 	return (0);
236 }
237 
238 static int
flowop_create_runtime_flowops(threadflow_t * threadflow,flowop_t ** ops_list_ptr)239 flowop_create_runtime_flowops(threadflow_t *threadflow, flowop_t **ops_list_ptr)
240 {
241 	flowop_t *flowop = *ops_list_ptr;
242 	char *name;
243 
244 	while (flowop) {
245 		flowop_t *newflowop;
246 
247 		if (flowop == *ops_list_ptr)
248 			*ops_list_ptr = NULL;
249 
250 		newflowop = flowop_define_common(threadflow, flowop->fo_name,
251 		    flowop, ops_list_ptr, 1, 0);
252 		if (newflowop == NULL)
253 			return (FILEBENCH_ERROR);
254 
255 		/* check for fo_filename attribute, and resolve if present */
256 		if (flowop->fo_filename) {
257 			name = avd_get_str(flowop->fo_filename);
258 			newflowop->fo_fileset = fileset_find(name);
259 
260 			if (newflowop->fo_fileset == NULL) {
261 				filebench_log(LOG_ERROR,
262 				    "flowop %s: file %s not found",
263 				    newflowop->fo_name, name);
264 				filebench_shutdown(1);
265 			}
266 		}
267 
268 		if (flowop_initflow(newflowop) < 0) {
269 			filebench_log(LOG_ERROR, "Flowop init of %s failed",
270 			    newflowop->fo_name);
271 		}
272 
273 		flowop = flowop->fo_exec_next;
274 	}
275 
276 	return (FILEBENCH_OK);
277 }
278 
279 /*
280  * Calls the flowop's destruct function, pointed to by
281  * flowop->fo_destruct.
282  */
283 static void
flowop_destructflow(flowop_t * flowop)284 flowop_destructflow(flowop_t *flowop)
285 {
286 	(*flowop->fo_destruct)(flowop);
287 }
288 
289 /*
290  * call the destruct funtions of all the threadflow's flowops,
291  * if it is still flagged as "running".
292  */
293 void
flowop_destruct_all_flows(threadflow_t * threadflow)294 flowop_destruct_all_flows(threadflow_t *threadflow)
295 {
296 	flowop_t *flowop;
297 
298 	/* wait a moment to give other threads a chance to stop too */
299 	(void) sleep(1);
300 
301 	(void) ipc_mutex_lock(&threadflow->tf_lock);
302 
303 	/* prepare to call destruct flow routines, if necessary */
304 	if (threadflow->tf_running == 0) {
305 
306 		/* allready destroyed */
307 		(void) ipc_mutex_unlock(&threadflow->tf_lock);
308 		return;
309 	}
310 
311 	flowop = threadflow->tf_thrd_fops;
312 	threadflow->tf_running = 0;
313 	(void) ipc_mutex_unlock(&threadflow->tf_lock);
314 
315 	while (flowop) {
316 		flowop_destructflow(flowop);
317 		flowop = flowop->fo_exec_next;
318 	}
319 }
320 
321 /*
322  * The final initialization and main execution loop for the
323  * worker threads. Sets threadflow and flowop start times,
324  * waits for all process to start, then creates the runtime
325  * flowops from those defined by the F language workload
326  * script. It does some more initialization, then enters a
327  * loop to repeatedly execute the flowops on the flowop list
328  * until an abort condition is detected, at which time it exits.
329  * This is the starting routine for the new worker thread
330  * created by threadflow_createthread(), and is not currently
331  * called from anywhere else.
332  */
333 void
flowop_start(threadflow_t * threadflow)334 flowop_start(threadflow_t *threadflow)
335 {
336 	flowop_t *flowop;
337 	size_t memsize;
338 	int ret = FILEBENCH_OK;
339 
340 	set_thread_ioprio(threadflow);
341 
342 	(void) ipc_mutex_lock(&controlstats_lock);
343 	if (!controlstats_zeroed) {
344 		(void) memset(&controlstats, 0, sizeof (controlstats));
345 		controlstats_zeroed = 1;
346 	}
347 	(void) ipc_mutex_unlock(&controlstats_lock);
348 
349 	flowop = threadflow->tf_thrd_fops;
350 
351 	/* Hold the flowop find lock as reader to prevent lookups */
352 	(void) pthread_rwlock_rdlock(&filebench_shm->shm_flowop_find_lock);
353 
354 	/* Create the runtime flowops from those defined by the script */
355 	(void) ipc_mutex_lock(&filebench_shm->shm_flowop_lock);
356 	if (flowop_create_runtime_flowops(threadflow, &threadflow->tf_thrd_fops)
357 	    != FILEBENCH_OK) {
358 		(void) ipc_mutex_unlock(&filebench_shm->shm_flowop_lock);
359 		filebench_shutdown(1);
360 		return;
361 	}
362 	(void) ipc_mutex_unlock(&filebench_shm->shm_flowop_lock);
363 
364 	/* Release the find lock as reader to allow lookups */
365 	(void) pthread_rwlock_unlock(&filebench_shm->shm_flowop_find_lock);
366 
367 	/* Set to the start of the new flowop list */
368 	flowop = threadflow->tf_thrd_fops;
369 
370 #ifdef HAVE_LWPS
371 	filebench_log(LOG_DEBUG_SCRIPT, "Thread %zx (%d) started",
372 	    threadflow,
373 	    _lwp_self());
374 #endif
375 
376 	/*
377 	 * Now we set tf_running flag to indicate to the main process
378 	 * that the worker thread is running. However, the thread is
379 	 * still not executing the workload, as it is blocked by the
380 	 * shm_run_lock. Main thread will release this lock when all
381 	 * threads set their tf_running flag to 1.
382 	 */
383 	threadflow->tf_abort = 0;
384 	threadflow->tf_running = 1;
385 
386 	/*
387 	 * Block until all processes have started, acting like
388 	 * a barrier. The original filebench process initially
389 	 * holds the run_lock as a reader, preventing any of the
390 	 * threads from obtaining the writer lock, and hence
391 	 * passing this point. Once all processes and threads
392 	 * have been created, the original process unlocks
393 	 * run_lock, allowing each waiting thread to lock
394 	 * and then immediately unlock it, then begin running.
395 	 */
396 	(void) pthread_rwlock_wrlock(&filebench_shm->shm_run_lock);
397 	(void) pthread_rwlock_unlock(&filebench_shm->shm_run_lock);
398 
399 	memsize = (size_t)threadflow->tf_constmemsize;
400 
401 	/*
402 	 * Alloc from ISM, which should have been created before the main process
403 	 * wakes up the current process by releasing shm_run_lock.
404 	 */
405 	if (threadflow->tf_attrs & THREADFLOW_USEISM) {
406 		threadflow->tf_mem =
407 		    ipc_ismmalloc(memsize);
408 	} else {
409 		threadflow->tf_mem =
410 		    malloc(memsize);
411 	}
412 
413 	(void) memset(threadflow->tf_mem, 0, memsize);
414 	filebench_log(LOG_DEBUG_SCRIPT, "Thread allocated %d bytes", memsize);
415 
416 	/* Main filebench worker loop */
417 	while (ret == FILEBENCH_OK) {
418 		int i, count;
419 
420 		/* Abort if asked */
421 		if (threadflow->tf_abort || filebench_shm->shm_f_abort)
422 			break;
423 
424 		/* Be quiet while stats are gathered */
425 		if (filebench_shm->shm_bequiet) {
426 			(void) sleep(1);
427 			continue;
428 		}
429 
430 		/* Take it easy until everyone is ready to go */
431 		if (!filebench_shm->shm_procs_running) {
432 			(void) sleep(1);
433 			continue;
434 		}
435 
436 		if (flowop == NULL) {
437 			filebench_log(LOG_ERROR, "flowop_read null flowop");
438 			return;
439 		}
440 
441 		/* Execute the flowop for fo_iters times */
442 		count = (int)avd_get_int(flowop->fo_iters);
443 		for (i = 0; i < count; i++) {
444 
445 			filebench_log(LOG_DEBUG_SCRIPT, "%s: executing flowop "
446 			    "%s-%d", threadflow->tf_name, flowop->fo_name,
447 			    flowop->fo_instance);
448 
449 			ret = (*flowop->fo_func)(threadflow, flowop);
450 
451 			/*
452 			 * Return value FILEBENCH_ERROR means "flowop
453 			 * failed, stop the filebench run"
454 			 */
455 			if (ret == FILEBENCH_ERROR) {
456 				filebench_log(LOG_ERROR,
457 				    "%s-%d: flowop %s-%d failed",
458 				    threadflow->tf_name,
459 				    threadflow->tf_instance,
460 				    flowop->fo_name,
461 				    flowop->fo_instance);
462 				(void) ipc_mutex_lock(&threadflow->tf_lock);
463 				threadflow->tf_abort = 1;
464 				filebench_shm->shm_f_abort =
465 				    FILEBENCH_ABORT_ERROR;
466 				(void) ipc_mutex_unlock(&threadflow->tf_lock);
467 				break;
468 			}
469 
470 			/*
471 			 * Return value of FILEBENCH_NORSC means "stop
472 			 * the filebench run" if in "end on no work mode",
473 			 * otherwise it indicates an error
474 			 */
475 			if (ret == FILEBENCH_NORSC) {
476 				(void) ipc_mutex_lock(&threadflow->tf_lock);
477 				threadflow->tf_abort = FILEBENCH_DONE;
478 				if (filebench_shm->shm_rmode ==
479 				    FILEBENCH_MODE_Q1STDONE) {
480 					filebench_shm->shm_f_abort =
481 					    FILEBENCH_ABORT_RSRC;
482 				} else if (filebench_shm->shm_rmode !=
483 				    FILEBENCH_MODE_QALLDONE) {
484 					filebench_log(LOG_ERROR1,
485 					    "WARNING! Run stopped early:\n   "
486 					    "             flowop %s-%d could "
487 					    "not obtain a file. Please\n      "
488 					    "          reduce runtime, "
489 					    "increase fileset entries "
490 					    "($nfiles), or switch modes.",
491 					    flowop->fo_name,
492 					    flowop->fo_instance);
493 					filebench_shm->shm_f_abort =
494 					    FILEBENCH_ABORT_ERROR;
495 				}
496 				(void) ipc_mutex_unlock(&threadflow->tf_lock);
497 				break;
498 			}
499 
500 			/*
501 			 * Return value of FILEBENCH_DONE means "stop
502 			 * the filebench run without error"
503 			 */
504 			if (ret == FILEBENCH_DONE) {
505 				(void) ipc_mutex_lock(&threadflow->tf_lock);
506 				threadflow->tf_abort = FILEBENCH_DONE;
507 				filebench_shm->shm_f_abort =
508 				    FILEBENCH_ABORT_DONE;
509 				(void) ipc_mutex_unlock(&threadflow->tf_lock);
510 				break;
511 			}
512 
513 			/*
514 			 * If we get here and the return is something other
515 			 * than FILEBENCH_OK, it means a spurious code
516 			 * was returned, so treat as major error. This
517 			 * probably indicates a bug in the flowop.
518 			 */
519 			if (ret != FILEBENCH_OK) {
520 				filebench_log(LOG_ERROR,
521 				    "Flowop %s unexpected return value = %d\n",
522 				    flowop->fo_name, ret);
523 				filebench_shm->shm_f_abort =
524 				    FILEBENCH_ABORT_ERROR;
525 				break;
526 			}
527 		}
528 
529 		/* advance to next flowop */
530 		flowop = flowop->fo_exec_next;
531 
532 		/* but if at end of list, start over from the beginning */
533 		if (flowop == NULL) {
534 			flowop = threadflow->tf_thrd_fops;
535 			threadflow->tf_stats.fs_count++;
536 		}
537 	}
538 
539 #ifdef HAVE_LWPS
540 	filebench_log(LOG_DEBUG_SCRIPT, "Thread %d exiting",
541 	    _lwp_self());
542 #endif
543 
544 	/* Tell flowops to destroy locally acquired state */
545 	flowop_destruct_all_flows(threadflow);
546 
547 	pthread_exit(&threadflow->tf_abort);
548 }
549 
550  /*
551   * For master mode we add flowops from the generic library, flowops that are
552   * file system specific, and adjust the vector of functions used by the
553   * generic library.  For worker mode we only need to adjust the vector.
554   */
555 void
flowop_init(int ismaster)556 flowop_init(int ismaster)
557 {
558 	if (ismaster) {
559 		(void) pthread_mutex_init(&controlstats_lock,
560 	   			 ipc_mutexattr(IPC_MUTEX_NORMAL));
561 		flowoplib_flowinit();
562 	}
563 
564 	switch (filebench_shm->shm_filesys_type) {
565 	case LOCAL_FS_PLUG:
566 		if (ismaster)
567 			fb_lfs_newflowops();
568 		fb_lfs_funcvecinit();
569 		break;
570 	case NFS3_PLUG:
571 	case NFS4_PLUG:
572 	case CIFS_PLUG:
573 		break;
574 	}
575 }
576 
577 /*
578  * Delete the designated flowop from the thread's flowop list.
579  */
580 static void
flowop_delete(flowop_t ** flowoplist,flowop_t * flowop)581 flowop_delete(flowop_t **flowoplist, flowop_t *flowop)
582 {
583 	flowop_t *entry = *flowoplist;
584 	int found = 0;
585 
586 	filebench_log(LOG_DEBUG_IMPL, "Deleting flowop (%s-%d)",
587 	    flowop->fo_name,
588 	    flowop->fo_instance);
589 
590 	/* Delete from thread's flowop list */
591 	if (flowop == *flowoplist) {
592 		/* First on list */
593 		*flowoplist = flowop->fo_exec_next;
594 		filebench_log(LOG_DEBUG_IMPL,
595 		    "Delete0 flowop: (%s-%d)",
596 		    flowop->fo_name,
597 		    flowop->fo_instance);
598 	} else {
599 		while (entry->fo_exec_next) {
600 			filebench_log(LOG_DEBUG_IMPL,
601 			    "Delete0 flowop: (%s-%d) == (%s-%d)",
602 			    entry->fo_exec_next->fo_name,
603 			    entry->fo_exec_next->fo_instance,
604 			    flowop->fo_name,
605 			    flowop->fo_instance);
606 
607 			if (flowop == entry->fo_exec_next) {
608 				/* Delete */
609 				filebench_log(LOG_DEBUG_IMPL,
610 				    "Deleted0 flowop: (%s-%d)",
611 				    entry->fo_exec_next->fo_name,
612 				    entry->fo_exec_next->fo_instance);
613 				entry->fo_exec_next =
614 				    entry->fo_exec_next->fo_exec_next;
615 				break;
616 			}
617 			entry = entry->fo_exec_next;
618 		}
619 	}
620 
621 	/* Delete from global list */
622 	entry = filebench_shm->shm_flowoplist;
623 
624 	if (flowop == filebench_shm->shm_flowoplist) {
625 		/* First on list */
626 		filebench_shm->shm_flowoplist = flowop->fo_next;
627 		found = 1;
628 	} else {
629 		while (entry->fo_next) {
630 			filebench_log(LOG_DEBUG_IMPL,
631 			    "Delete flowop: (%s-%d) == (%s-%d)",
632 			    entry->fo_next->fo_name,
633 			    entry->fo_next->fo_instance,
634 			    flowop->fo_name,
635 			    flowop->fo_instance);
636 
637 			if (flowop == entry->fo_next) {
638 				/* Delete */
639 				entry->fo_next = entry->fo_next->fo_next;
640 				found = 1;
641 				break;
642 			}
643 
644 			entry = entry->fo_next;
645 		}
646 	}
647 	if (found) {
648 		filebench_log(LOG_DEBUG_IMPL,
649 		    "Deleted flowop: (%s-%d)",
650 		    flowop->fo_name,
651 		    flowop->fo_instance);
652 		ipc_free(FILEBENCH_FLOWOP, (char *)flowop);
653 	} else {
654 		filebench_log(LOG_DEBUG_IMPL, "Flowop %s-%d not found!",
655 		    flowop->fo_name,
656 		    flowop->fo_instance);
657 	}
658 }
659 
660 /*
661  * Deletes all the flowops from a flowop list.
662  */
663 void
flowop_delete_all(flowop_t ** flowoplist)664 flowop_delete_all(flowop_t **flowoplist)
665 {
666 	flowop_t *flowop = *flowoplist;
667 	flowop_t *next_flowop;
668 
669 	(void) ipc_mutex_lock(&filebench_shm->shm_flowop_lock);
670 
671 	while (flowop) {
672 		filebench_log(LOG_DEBUG_IMPL, "Deleting flowop (%s-%d)",
673 		    flowop->fo_name, flowop->fo_instance);
674 
675 		if (flowop->fo_instance &&
676 		    (flowop->fo_instance == FLOW_MASTER)) {
677 			flowop = flowop->fo_exec_next;
678 			continue;
679 		}
680 		next_flowop = flowop->fo_exec_next;
681 		flowop_delete(flowoplist, flowop);
682 		flowop = next_flowop;
683 	}
684 
685 	(void) ipc_mutex_unlock(&filebench_shm->shm_flowop_lock);
686 }
687 
688 /*
689  * Allocates a flowop entity and initializes it with inherited
690  * contents from the "inherit" flowop, if it is supplied, or
691  * with zeros otherwise. In either case the fo_next and fo_exec_next
692  * pointers are set to NULL, and fo_thread is set to point to
693  * the owning threadflow. The initialized flowop is placed at
694  * the head of the global flowop list, and also placed on the
695  * tail of the supplied local flowop list, which will either
696  * be a threadflow's tf_thrd_fops list or a composite flowop's
697  * fo_comp_fops list. The routine locks the flowop's fo_lock and
698  * leaves it held on return. If successful, it returns a pointer
699  * to the allocated and initialized flowop, otherwise it returns NULL.
700  *
701  * filebench_shm->shm_flowop_lock must be held by caller.
702  */
703 static flowop_t *
flowop_define_common(threadflow_t * threadflow,char * name,flowop_t * inherit,flowop_t ** flowoplist_hdp,int instance,int type)704 flowop_define_common(threadflow_t *threadflow, char *name, flowop_t *inherit,
705     flowop_t **flowoplist_hdp, int instance, int type)
706 {
707 	flowop_t *flowop;
708 
709 	if (name == NULL)
710 		return (NULL);
711 
712 	if ((flowop = (flowop_t *)ipc_malloc(FILEBENCH_FLOWOP)) == NULL) {
713 		filebench_log(LOG_ERROR,
714 		    "flowop_define: Can't malloc flowop");
715 		return (NULL);
716 	}
717 
718 	filebench_log(LOG_DEBUG_IMPL, "defining flowops %s-%d, addr %zx",
719 	    name, instance, flowop);
720 
721 	if (flowop == NULL)
722 		return (NULL);
723 
724 	if (inherit) {
725 		(void) memcpy(flowop, inherit, sizeof (flowop_t));
726 		(void) pthread_mutex_init(&flowop->fo_lock,
727 		    ipc_mutexattr(IPC_MUTEX_PRI_ROB));
728 		(void) ipc_mutex_lock(&flowop->fo_lock);
729 		flowop->fo_next = NULL;
730 		flowop->fo_exec_next = NULL;
731 		filebench_log(LOG_DEBUG_IMPL,
732 		    "flowop %s-%d calling init", name, instance);
733 	} else {
734 		(void) memset(flowop, 0, sizeof (flowop_t));
735 		flowop->fo_iters = avd_int_alloc(1);
736 		flowop->fo_type = type;
737 		(void) pthread_mutex_init(&flowop->fo_lock,
738 		    ipc_mutexattr(IPC_MUTEX_PRI_ROB));
739 		(void) ipc_mutex_lock(&flowop->fo_lock);
740 	}
741 
742 	/* Create backpointer to thread */
743 	flowop->fo_thread = threadflow;
744 
745 	/* Add flowop to global list */
746 	if (filebench_shm->shm_flowoplist == NULL) {
747 		filebench_shm->shm_flowoplist = flowop;
748 		flowop->fo_next = NULL;
749 	} else {
750 		flowop->fo_next = filebench_shm->shm_flowoplist;
751 		filebench_shm->shm_flowoplist = flowop;
752 	}
753 
754 	(void) strcpy(flowop->fo_name, name);
755 	flowop->fo_instance = instance;
756 
757 	if (flowoplist_hdp == NULL)
758 		return (flowop);
759 
760 	/* Add flowop to thread op list */
761 	if (*flowoplist_hdp == NULL) {
762 		*flowoplist_hdp = flowop;
763 		flowop->fo_exec_next = NULL;
764 	} else {
765 		flowop_t *flowend;
766 
767 		/* Find the end of the thread list */
768 		flowend = *flowoplist_hdp;
769 		while (flowend->fo_exec_next != NULL)
770 			flowend = flowend->fo_exec_next;
771 		flowend->fo_exec_next = flowop;
772 		flowop->fo_exec_next = NULL;
773 	}
774 
775 	return (flowop);
776 }
777 
778 /*
779  * Calls flowop_define_common() to allocate and initialize a
780  * flowop, and holds the shared flowop_lock during the call.
781  * It releases the created flowop's fo_lock when done.
782  */
783 flowop_t *
flowop_define(threadflow_t * threadflow,char * name,flowop_t * inherit,flowop_t ** flowoplist_hdp,int instance,int type)784 flowop_define(threadflow_t *threadflow, char *name, flowop_t *inherit,
785     flowop_t **flowoplist_hdp, int instance, int type)
786 {
787 	flowop_t	*flowop;
788 
789 	(void) ipc_mutex_lock(&filebench_shm->shm_flowop_lock);
790 	flowop = flowop_define_common(threadflow, name,
791 	    inherit, flowoplist_hdp, instance, type);
792 	(void) ipc_mutex_unlock(&filebench_shm->shm_flowop_lock);
793 
794 	if (flowop == NULL)
795 		return (NULL);
796 
797 	(void) ipc_mutex_unlock(&flowop->fo_lock);
798 
799 	return (flowop);
800 }
801 
802 /*
803  * Calls flowop_define_common() to allocate and initialize a
804  * composite flowop, and holds the shared flowop_lock during the call.
805  * It releases the created flowop's fo_lock when done.
806  */
807 flowop_t *
flowop_new_composite_define(char * name)808 flowop_new_composite_define(char *name)
809 {
810 	flowop_t *flowop;
811 
812 	(void) ipc_mutex_lock(&filebench_shm->shm_flowop_lock);
813 	flowop = flowop_define_common(NULL, name,
814 	    NULL, NULL, 0, FLOW_TYPE_COMPOSITE);
815 	(void) ipc_mutex_unlock(&filebench_shm->shm_flowop_lock);
816 
817 	if (flowop == NULL)
818 		return (NULL);
819 
820 	flowop->fo_func = flowop_composite;
821 	flowop->fo_init = flowop_composite_init;
822 	flowop->fo_destruct = flowop_composite_destruct;
823 	(void) ipc_mutex_unlock(&flowop->fo_lock);
824 
825 	return (flowop);
826 }
827 
828 /*
829  * Attempts to take a write lock on the flowop_find_lock that is
830  * defined in interprocess shared memory. Since each call to
831  * flowop_start() holds a read lock on flowop_find_lock, this
832  * routine effectively blocks until all instances of
833  * flowop_start() have finished. The flowop_find() routine calls
834  * this routine so that flowops won't be searched for until all
835  * flowops have been created by flowop_start.
836  */
837 static void
flowop_find_barrier(void)838 flowop_find_barrier(void)
839 {
840 	/* Block on wrlock to ensure find waits for all creates */
841 	(void) pthread_rwlock_wrlock(&filebench_shm->shm_flowop_find_lock);
842 	(void) pthread_rwlock_unlock(&filebench_shm->shm_flowop_find_lock);
843 }
844 
845 /*
846  * Returns a list of flowops named "name" from the master
847  * flowop list.
848  */
849 flowop_t *
flowop_find(char * name)850 flowop_find(char *name)
851 {
852 	flowop_t *flowop;
853 	flowop_t *result = NULL;
854 
855 	flowop_find_barrier();
856 
857 	(void) ipc_mutex_lock(&filebench_shm->shm_flowop_lock);
858 
859 	flowop = filebench_shm->shm_flowoplist;
860 
861 	while (flowop) {
862 		if (strcmp(name, flowop->fo_name) == 0) {
863 
864 			/* Add flowop to result list */
865 			if (result == NULL) {
866 				result = flowop;
867 				flowop->fo_resultnext = NULL;
868 			} else {
869 				flowop->fo_resultnext = result;
870 				result = flowop;
871 			}
872 		}
873 		flowop = flowop->fo_next;
874 	}
875 
876 	(void) ipc_mutex_unlock(&filebench_shm->shm_flowop_lock);
877 
878 	return result;
879 }
880 
881 /*
882  * Returns a pointer to the specified instance of flowop
883  * "name" from the global list.
884  */
885 flowop_t *
flowop_find_one(char * name,int instance)886 flowop_find_one(char *name, int instance)
887 {
888 	flowop_t *test_flowop;
889 
890 	flowop_find_barrier();
891 
892 	(void) ipc_mutex_lock(&filebench_shm->shm_flowop_lock);
893 
894 	test_flowop = filebench_shm->shm_flowoplist;
895 
896 	while (test_flowop) {
897 		if ((strcmp(name, test_flowop->fo_name) == 0) &&
898 		    (instance == test_flowop->fo_instance))
899 			break;
900 
901 		test_flowop = test_flowop->fo_next;
902 	}
903 
904 	(void) ipc_mutex_unlock(&filebench_shm->shm_flowop_lock);
905 
906 	return (test_flowop);
907 }
908 
909 /*
910  * recursively searches through lists of flowops on a given thread
911  * and those on any included composite flowops for the named flowop.
912  * either returns with a pointer to the named flowop or NULL if it
913  * cannot be found.
914  */
915 static flowop_t *
flowop_recurse_search(char * path,char * name,flowop_t * list)916 flowop_recurse_search(char *path, char *name, flowop_t *list)
917 {
918 	flowop_t *test_flowop;
919 	char fullname[MAXPATHLEN];
920 
921 	test_flowop = list;
922 
923 	/*
924 	 * when searching a list of inner flowops, "path" is the fullname
925 	 * of the containing composite flowop. Use it to form the
926 	 * full name of the inner flowop to search for.
927 	 */
928 	if (path) {
929 		if ((strlen(path) + strlen(name) + 1) > MAXPATHLEN) {
930 			filebench_log(LOG_ERROR,
931 			    "composite flowop path name %s.%s too long",
932 			    path, name);
933 			return (NULL);
934 		}
935 
936 		/* create composite_name.name for recursive search */
937 		(void) strcpy(fullname, path);
938 		(void) strcat(fullname, ".");
939 		(void) strcat(fullname, name);
940 	} else {
941 		(void) strcpy(fullname, name);
942 	}
943 
944 	/*
945 	 * loop through all flowops on the supplied tf_thrd_fops (flowop)
946 	 * list or fo_comp_fops (inner flowop) list.
947 	 */
948 	while (test_flowop) {
949 		if (strcmp(fullname, test_flowop->fo_name) == 0)
950 			return (test_flowop);
951 
952 		if (test_flowop->fo_type == FLOW_TYPE_COMPOSITE) {
953 			flowop_t *found_flowop;
954 
955 			found_flowop = flowop_recurse_search(
956 			    test_flowop->fo_name, name,
957 			    test_flowop->fo_comp_fops);
958 
959 			if (found_flowop)
960 				return (found_flowop);
961 		}
962 		test_flowop = test_flowop->fo_exec_next;
963 	}
964 
965 	/* not found here or on any child lists */
966 	return (NULL);
967 }
968 
969 /*
970  * Returns a pointer to flowop named "name" from the supplied tf_thrd_fops
971  * list of flowops. Returns the named flowop if found, or NULL.
972  */
973 flowop_t *
flowop_find_from_list(char * name,flowop_t * list)974 flowop_find_from_list(char *name, flowop_t *list)
975 {
976 	flowop_t *found_flowop;
977 
978 	flowop_find_barrier();
979 
980 	(void) ipc_mutex_lock(&filebench_shm->shm_flowop_lock);
981 
982 	found_flowop = flowop_recurse_search(NULL, name, list);
983 
984 	(void) ipc_mutex_unlock(&filebench_shm->shm_flowop_lock);
985 
986 	return (found_flowop);
987 }
988 
989 /*
990  * Composite flowop method. Does one pass through its list of
991  * inner flowops per iteration.
992  */
993 static int
flowop_composite(threadflow_t * threadflow,flowop_t * flowop)994 flowop_composite(threadflow_t *threadflow, flowop_t *flowop)
995 {
996 	flowop_t	*inner_flowop;
997 
998 	/* get the first flowop in the list */
999 	inner_flowop = flowop->fo_comp_fops;
1000 
1001 	/* make a pass through the list of sub flowops */
1002 	while (inner_flowop) {
1003 		int	i, count;
1004 
1005 		/* Abort if asked */
1006 		if (threadflow->tf_abort || filebench_shm->shm_f_abort)
1007 			return (FILEBENCH_DONE);
1008 
1009 		/* Execute the flowop for fo_iters times */
1010 		count = (int)avd_get_int(inner_flowop->fo_iters);
1011 		for (i = 0; i < count; i++) {
1012 
1013 			filebench_log(LOG_DEBUG_SCRIPT, "%s: executing flowop "
1014 			    "%s-%d", threadflow->tf_name,
1015 			    inner_flowop->fo_name,
1016 			    inner_flowop->fo_instance);
1017 
1018 			switch ((*inner_flowop->fo_func)(threadflow,
1019 			    inner_flowop)) {
1020 
1021 			/* all done */
1022 			case FILEBENCH_DONE:
1023 				return (FILEBENCH_DONE);
1024 
1025 			/* quit if inner flowop limit reached */
1026 			case FILEBENCH_NORSC:
1027 				return (FILEBENCH_NORSC);
1028 
1029 			/* quit on inner flowop error */
1030 			case FILEBENCH_ERROR:
1031 				filebench_log(LOG_ERROR,
1032 				    "inner flowop %s failed",
1033 				    inner_flowop->fo_name);
1034 				return (FILEBENCH_ERROR);
1035 
1036 			/* otherwise keep going */
1037 			default:
1038 				break;
1039 			}
1040 
1041 		}
1042 
1043 		/* advance to next flowop */
1044 		inner_flowop = inner_flowop->fo_exec_next;
1045 	}
1046 
1047 	/* finished with this pass */
1048 	return (FILEBENCH_OK);
1049 }
1050 
1051 /*
1052  * Composite flowop initialization. Creates runtime inner flowops
1053  * from prototype inner flowops.
1054  */
1055 static int
flowop_composite_init(flowop_t * flowop)1056 flowop_composite_init(flowop_t *flowop)
1057 {
1058 	int err;
1059 
1060 	err = flowop_create_runtime_flowops(flowop->fo_thread,
1061 	    &flowop->fo_comp_fops);
1062 	if (err != FILEBENCH_OK)
1063 		return (err);
1064 
1065 	(void) ipc_mutex_unlock(&flowop->fo_lock);
1066 	return (0);
1067 }
1068 
1069 /*
1070  * clean up inner flowops
1071  */
1072 static void
flowop_composite_destruct(flowop_t * flowop)1073 flowop_composite_destruct(flowop_t *flowop)
1074 {
1075 	flowop_t *inner_flowop = flowop->fo_comp_fops;
1076 
1077 	while (inner_flowop) {
1078 		filebench_log(LOG_DEBUG_IMPL, "Deleting inner flowop (%s-%d)",
1079 		    inner_flowop->fo_name, inner_flowop->fo_instance);
1080 
1081 		if (inner_flowop->fo_instance &&
1082 		    (inner_flowop->fo_instance == FLOW_MASTER)) {
1083 			inner_flowop = inner_flowop->fo_exec_next;
1084 			continue;
1085 		}
1086 		flowop_delete(&flowop->fo_comp_fops, inner_flowop);
1087 		inner_flowop = inner_flowop->fo_exec_next;
1088 	}
1089 }
1090 
1091 /*
1092  * Support routines for libraries of flowops
1093  */
1094 
1095 int
flowop_init_generic(flowop_t * flowop)1096 flowop_init_generic(flowop_t *flowop)
1097 {
1098 	(void) ipc_mutex_unlock(&flowop->fo_lock);
1099 	return (FILEBENCH_OK);
1100 }
1101 
1102 void
flowop_destruct_generic(flowop_t * flowop)1103 flowop_destruct_generic(flowop_t *flowop)
1104 {
1105 	char *buf;
1106 
1107 	/* release any local resources held by the flowop */
1108 	(void) ipc_mutex_lock(&flowop->fo_lock);
1109 	buf = flowop->fo_buf;
1110 	flowop->fo_buf = NULL;
1111 	(void) ipc_mutex_unlock(&flowop->fo_lock);
1112 
1113 	if (buf)
1114 		free(buf);
1115 }
1116 
1117 
1118 /*
1119  * Loops through the supplied list of flowop *prototypes*, creates
1120  * corresponding flowops by calling flowop_define(), and initializes flowops.
1121  * flowop_define() places flowops on the master flowop list.  All created
1122  * flowops are set to instance FLOW_DEFINITION (0).
1123  */
1124 void
flowop_add_from_proto(flowop_proto_t * list,int nops)1125 flowop_add_from_proto(flowop_proto_t *list, int nops)
1126 {
1127 	int i;
1128 
1129 	for (i = 0; i < nops; i++) {
1130 		flowop_t *flowop;
1131 		flowop_proto_t *flproto;
1132 
1133 		flproto = &(list[i]);
1134 
1135 
1136 		flowop = flowop_define(NULL, flproto->fl_name, NULL,
1137 			NULL, FLOW_DEFINITION, flproto->fl_type);
1138 		if (!flowop) {
1139 			filebench_log(LOG_ERROR, "failed to create flowop %s\n",
1140 			  	  flproto->fl_name);
1141 			filebench_shutdown(1);
1142 		}
1143 
1144 		flowop->fo_func = flproto->fl_func;
1145 		flowop->fo_init = flproto->fl_init;
1146 		flowop->fo_destruct = flproto->fl_destruct;
1147 		flowop->fo_attrs = flproto->fl_attrs;
1148 	}
1149 }
1150