1 /*
2 * CDDL HEADER START
3 *
4 * The contents of this file are subject to the terms of the
5 * Common Development and Distribution License (the "License").
6 * You may not use this file except in compliance with the License.
7 *
8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9 * or http://www.opensolaris.org/os/licensing.
10 * See the License for the specific language governing permissions
11 * and limitations under the License.
12 *
13 * When distributing Covered Code, include this CDDL HEADER in each
14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15 * If applicable, add the following below this CDDL HEADER, with the
16 * fields enclosed by brackets "[]" replaced with your own identifying
17 * information: Portions Copyright [yyyy] [name of copyright owner]
18 *
19 * CDDL HEADER END
20 */
21 /*
22 * Copyright 2009 Sun Microsystems, Inc. All rights reserved.
23 * Use is subject to license terms.
24 */
25
26 #include "config.h"
27
28 #ifdef HAVE_LWPS
29 #include <sys/lwp.h>
30 #endif
31 #include <fcntl.h>
32 #include "filebench.h"
33 #include "flowop.h"
34 #include "stats.h"
35 #include "ioprio.h"
36
37 static flowop_t *flowop_define_common(threadflow_t *threadflow, char *name,
38 flowop_t *inherit, flowop_t **flowoplist_hdp, int instance, int type);
39 static int flowop_composite(threadflow_t *threadflow, flowop_t *flowop);
40 static int flowop_composite_init(flowop_t *flowop);
41 static void flowop_composite_destruct(flowop_t *flowop);
42
43 /*
44 * A collection of flowop support functions. The actual code that
45 * implements the various flowops is in flowop_library.c.
46 *
47 * Routines for defining, creating, initializing and destroying
48 * flowops, cyclically invoking the flowops on each threadflow's flowop
49 * list, collecting statistics about flowop execution, and other
50 * housekeeping duties are included in this file.
51 *
52 * User Defined Composite Flowops
53 * The ability to define new flowops as lists of built-in or previously
54 * defined flowops has been added to Filebench. In a sense they are like
55 * in-line subroutines, which can have default attributes set at definition
56 * time and passed arguments at invocation time. Like other flowops (and
57 * unlike conventional subroutines) you can invoke them with an iteration
58 * count (the "iter" attribute), and they will loop through their associated
59 * list of flowops for "iter" number of times each time they are encountered
60 * in the thread or outer composite flowop which invokes them.
61 *
62 * Composite flowops are created with a "define" command, are given a name,
63 * optional default attributes, and local variable definitions on the
64 * "define" command line, followed by a brace enclosed list of flowops
65 * to execute. The enclosed flowops may include attributes that reference
66 * the local variables, as well as constants and global variables.
67 *
68 * Composite flowops are used pretty much like regular flowops, but you can
69 * also set local variables to constants or global variables ($local_var =
70 * [$var | $random_var | string | boolean | integer | double]) as part of
71 * the invocation. Thus each invocation can pass customized values to its
72 * inner flowops, greatly increasing their generality.
73 *
74 * All flowops are placed on a global, singly linked list, with fo_next
 * being the link pointer for this list. They are also placed on a private
76 * list for the thread or composite flowop they are part of. The tf_thrd_fops
77 * pointer in the thread will point to the list of top level flowops in the
78 * thread, which are linked together by fo_exec_next. If any of these flowops
79 * are composite flowops, they will have a list of second level flowops rooted
80 * at the composite flowop's fo_comp_fops pointer. So, there is one big list
 * of all flowops, and an n-ary tree of threads, composite flowops, and
82 * flowops, with composite flowops being the branch nodes in the tree.
83 *
84 * To illustrate, if we have three first level flowops, the first of which is
85 * a composite flowop consisting of two other flowops, we get:
86 *
87 * Thread->tf_thrd_fops -> flowop->fo_exec_next -> flowop->fo_exec_next
88 * flowop->fo_comp_fops |
89 * | V
90 * | flowop->fo_exec_next
91 * |
92 * V
93 * flowop->fo_exec_next -> flowop->fo_exec_next
94 *
95 * And all five flowops (plus others from any other threads) are on a global
96 * list linked with fo_next.
97 */
98
99 /*
100 * Prints the name and instance number of each flowop in
101 * the supplied list to the filebench log.
102 */
103 int
flowop_printlist(flowop_t * list)104 flowop_printlist(flowop_t *list)
105 {
106 flowop_t *flowop = list;
107
108 while (flowop) {
109 filebench_log(LOG_DEBUG_IMPL, "flowop-list %s-%d",
110 flowop->fo_name, flowop->fo_instance);
111 flowop = flowop->fo_exec_next;
112 }
113 return (0);
114 }
115
116 /*
117 * Prints the name and instance number of all flowops on
118 * the master flowop list to the console and the filebench log.
119 */
120 void
flowop_printall(void)121 flowop_printall(void)
122 {
123 flowop_t *flowop = filebench_shm->shm_flowoplist;
124
125 while (flowop) {
126 filebench_log(LOG_INFO, "flowop-list %s-%d",
127 flowop->fo_name, flowop->fo_instance);
128 flowop = flowop->fo_next;
129 }
130 }
131
/*
 * Nanoseconds elapsed from timespec "s" (start) to timespec "e" (end).
 * Both arguments are struct timespec values (not pointers). Each
 * argument is parenthesized so that expressions such as "*ptr" expand
 * correctly; note that each argument is evaluated twice.
 */
#define	TIMESPEC_TO_HRTIME(s, e)	\
	((((e).tv_sec - (s).tv_sec) * 1000000000LL) + \
	((e).tv_nsec - (s).tv_nsec))
134 /*
135 * Puts current high-resolution time in start time entry for threadflow.
136 */
137 void
flowop_beginop(threadflow_t * threadflow,flowop_t * flowop)138 flowop_beginop(threadflow_t *threadflow, flowop_t *flowop)
139 {
140 /* Start of op for this thread */
141 threadflow->tf_stime = gethrtime();
142 }
143
144 struct flowstats controlstats;
145 pthread_mutex_t controlstats_lock;
146 static int controlstats_zeroed = 0;
147
148 static void
flowop_populate_distribution(flowop_t * flowop,unsigned long long ll_delay)149 flowop_populate_distribution(flowop_t *flowop, unsigned long long ll_delay)
150 {
151 unsigned int i;
152 unsigned long long iii;
153
154 iii = 1;
155 for (i = 0; i < OSPROF_BUCKET_NUMBER; i++) {
156 if (ll_delay < iii)
157 break;
158 iii <<= 1;
159 }
160 flowop->fo_stats.fs_distribution[i]++;
161 }
162
163 /*
164 * Updates flowop's latency statistics, using saved start
165 * time and current high resolution time. Updates flowop's
166 * io count and transferred bytes statistics. Also updates
167 * threadflow's and flowop's cumulative read or write byte
168 * and io count statistics.
169 */
170 void
flowop_endop(threadflow_t * threadflow,flowop_t * flowop,int64_t bytes)171 flowop_endop(threadflow_t *threadflow, flowop_t *flowop, int64_t bytes)
172 {
173 unsigned long long ll_delay;
174
175 ll_delay = (gethrtime() - threadflow->tf_stime);
176
177 /* setting minimum and maximum latencies for this flowop */
178 if (!flowop->fo_stats.fs_minlat || ll_delay < flowop->fo_stats.fs_minlat)
179 flowop->fo_stats.fs_minlat = ll_delay;
180
181 if (ll_delay > flowop->fo_stats.fs_maxlat)
182 flowop->fo_stats.fs_maxlat = ll_delay;
183
184 flowop->fo_stats.fs_total_lat += ll_delay;
185 flowop->fo_stats.fs_count++;
186 flowop->fo_stats.fs_bytes += bytes;
187 (void) ipc_mutex_lock(&controlstats_lock);
188 if ((flowop->fo_type & FLOW_TYPE_IO) ||
189 (flowop->fo_type & FLOW_TYPE_AIO)) {
190 controlstats.fs_count++;
191 controlstats.fs_bytes += bytes;
192 }
193 if (flowop->fo_attrs & FLOW_ATTR_READ) {
194 threadflow->tf_stats.fs_rbytes += bytes;
195 threadflow->tf_stats.fs_rcount++;
196 flowop->fo_stats.fs_rcount++;
197 controlstats.fs_rbytes += bytes;
198 controlstats.fs_rcount++;
199 } else if (flowop->fo_attrs & FLOW_ATTR_WRITE) {
200 threadflow->tf_stats.fs_wbytes += bytes;
201 threadflow->tf_stats.fs_wcount++;
202 flowop->fo_stats.fs_wcount++;
203 controlstats.fs_wbytes += bytes;
204 controlstats.fs_wcount++;
205 }
206
207 if (filebench_shm->lathist_enabled)
208 flowop_populate_distribution(flowop, ll_delay);
209
210 (void) ipc_mutex_unlock(&controlstats_lock);
211 }
212
213 /*
214 * Calls the flowop's initialization function, pointed to by
215 * flowop->fo_init.
216 */
217 static int
flowop_initflow(flowop_t * flowop)218 flowop_initflow(flowop_t *flowop)
219 {
220 /*
221 * save static copies of two items, in case they are supplied
222 * from random variables
223 */
224 if (flowop->fo_value)
225 flowop->fo_constvalue = avd_get_int(flowop->fo_value);
226
227 if (flowop->fo_wss)
228 flowop->fo_constwss = avd_get_int(flowop->fo_wss);
229
230 if ((*flowop->fo_init)(flowop) < 0) {
231 filebench_log(LOG_ERROR, "flowop %s-%d init failed",
232 flowop->fo_name, flowop->fo_instance);
233 return (-1);
234 }
235 return (0);
236 }
237
238 static int
flowop_create_runtime_flowops(threadflow_t * threadflow,flowop_t ** ops_list_ptr)239 flowop_create_runtime_flowops(threadflow_t *threadflow, flowop_t **ops_list_ptr)
240 {
241 flowop_t *flowop = *ops_list_ptr;
242 char *name;
243
244 while (flowop) {
245 flowop_t *newflowop;
246
247 if (flowop == *ops_list_ptr)
248 *ops_list_ptr = NULL;
249
250 newflowop = flowop_define_common(threadflow, flowop->fo_name,
251 flowop, ops_list_ptr, 1, 0);
252 if (newflowop == NULL)
253 return (FILEBENCH_ERROR);
254
255 /* check for fo_filename attribute, and resolve if present */
256 if (flowop->fo_filename) {
257 name = avd_get_str(flowop->fo_filename);
258 newflowop->fo_fileset = fileset_find(name);
259
260 if (newflowop->fo_fileset == NULL) {
261 filebench_log(LOG_ERROR,
262 "flowop %s: file %s not found",
263 newflowop->fo_name, name);
264 filebench_shutdown(1);
265 }
266 }
267
268 if (flowop_initflow(newflowop) < 0) {
269 filebench_log(LOG_ERROR, "Flowop init of %s failed",
270 newflowop->fo_name);
271 }
272
273 flowop = flowop->fo_exec_next;
274 }
275
276 return (FILEBENCH_OK);
277 }
278
279 /*
280 * Calls the flowop's destruct function, pointed to by
281 * flowop->fo_destruct.
282 */
283 static void
flowop_destructflow(flowop_t * flowop)284 flowop_destructflow(flowop_t *flowop)
285 {
286 (*flowop->fo_destruct)(flowop);
287 }
288
289 /*
290 * call the destruct funtions of all the threadflow's flowops,
291 * if it is still flagged as "running".
292 */
293 void
flowop_destruct_all_flows(threadflow_t * threadflow)294 flowop_destruct_all_flows(threadflow_t *threadflow)
295 {
296 flowop_t *flowop;
297
298 /* wait a moment to give other threads a chance to stop too */
299 (void) sleep(1);
300
301 (void) ipc_mutex_lock(&threadflow->tf_lock);
302
303 /* prepare to call destruct flow routines, if necessary */
304 if (threadflow->tf_running == 0) {
305
306 /* allready destroyed */
307 (void) ipc_mutex_unlock(&threadflow->tf_lock);
308 return;
309 }
310
311 flowop = threadflow->tf_thrd_fops;
312 threadflow->tf_running = 0;
313 (void) ipc_mutex_unlock(&threadflow->tf_lock);
314
315 while (flowop) {
316 flowop_destructflow(flowop);
317 flowop = flowop->fo_exec_next;
318 }
319 }
320
321 /*
322 * The final initialization and main execution loop for the
323 * worker threads. Sets threadflow and flowop start times,
324 * waits for all process to start, then creates the runtime
325 * flowops from those defined by the F language workload
326 * script. It does some more initialization, then enters a
327 * loop to repeatedly execute the flowops on the flowop list
328 * until an abort condition is detected, at which time it exits.
329 * This is the starting routine for the new worker thread
330 * created by threadflow_createthread(), and is not currently
331 * called from anywhere else.
332 */
333 void
flowop_start(threadflow_t * threadflow)334 flowop_start(threadflow_t *threadflow)
335 {
336 flowop_t *flowop;
337 size_t memsize;
338 int ret = FILEBENCH_OK;
339
340 set_thread_ioprio(threadflow);
341
342 (void) ipc_mutex_lock(&controlstats_lock);
343 if (!controlstats_zeroed) {
344 (void) memset(&controlstats, 0, sizeof (controlstats));
345 controlstats_zeroed = 1;
346 }
347 (void) ipc_mutex_unlock(&controlstats_lock);
348
349 flowop = threadflow->tf_thrd_fops;
350
351 /* Hold the flowop find lock as reader to prevent lookups */
352 (void) pthread_rwlock_rdlock(&filebench_shm->shm_flowop_find_lock);
353
354 /* Create the runtime flowops from those defined by the script */
355 (void) ipc_mutex_lock(&filebench_shm->shm_flowop_lock);
356 if (flowop_create_runtime_flowops(threadflow, &threadflow->tf_thrd_fops)
357 != FILEBENCH_OK) {
358 (void) ipc_mutex_unlock(&filebench_shm->shm_flowop_lock);
359 filebench_shutdown(1);
360 return;
361 }
362 (void) ipc_mutex_unlock(&filebench_shm->shm_flowop_lock);
363
364 /* Release the find lock as reader to allow lookups */
365 (void) pthread_rwlock_unlock(&filebench_shm->shm_flowop_find_lock);
366
367 /* Set to the start of the new flowop list */
368 flowop = threadflow->tf_thrd_fops;
369
370 #ifdef HAVE_LWPS
371 filebench_log(LOG_DEBUG_SCRIPT, "Thread %zx (%d) started",
372 threadflow,
373 _lwp_self());
374 #endif
375
376 /*
377 * Now we set tf_running flag to indicate to the main process
378 * that the worker thread is running. However, the thread is
379 * still not executing the workload, as it is blocked by the
380 * shm_run_lock. Main thread will release this lock when all
381 * threads set their tf_running flag to 1.
382 */
383 threadflow->tf_abort = 0;
384 threadflow->tf_running = 1;
385
386 /*
387 * Block until all processes have started, acting like
388 * a barrier. The original filebench process initially
389 * holds the run_lock as a reader, preventing any of the
390 * threads from obtaining the writer lock, and hence
391 * passing this point. Once all processes and threads
392 * have been created, the original process unlocks
393 * run_lock, allowing each waiting thread to lock
394 * and then immediately unlock it, then begin running.
395 */
396 (void) pthread_rwlock_wrlock(&filebench_shm->shm_run_lock);
397 (void) pthread_rwlock_unlock(&filebench_shm->shm_run_lock);
398
399 memsize = (size_t)threadflow->tf_constmemsize;
400
401 /*
402 * Alloc from ISM, which should have been created before the main process
403 * wakes up the current process by releasing shm_run_lock.
404 */
405 if (threadflow->tf_attrs & THREADFLOW_USEISM) {
406 threadflow->tf_mem =
407 ipc_ismmalloc(memsize);
408 } else {
409 threadflow->tf_mem =
410 malloc(memsize);
411 }
412
413 (void) memset(threadflow->tf_mem, 0, memsize);
414 filebench_log(LOG_DEBUG_SCRIPT, "Thread allocated %d bytes", memsize);
415
416 /* Main filebench worker loop */
417 while (ret == FILEBENCH_OK) {
418 int i, count;
419
420 /* Abort if asked */
421 if (threadflow->tf_abort || filebench_shm->shm_f_abort)
422 break;
423
424 /* Be quiet while stats are gathered */
425 if (filebench_shm->shm_bequiet) {
426 (void) sleep(1);
427 continue;
428 }
429
430 /* Take it easy until everyone is ready to go */
431 if (!filebench_shm->shm_procs_running) {
432 (void) sleep(1);
433 continue;
434 }
435
436 if (flowop == NULL) {
437 filebench_log(LOG_ERROR, "flowop_read null flowop");
438 return;
439 }
440
441 /* Execute the flowop for fo_iters times */
442 count = (int)avd_get_int(flowop->fo_iters);
443 for (i = 0; i < count; i++) {
444
445 filebench_log(LOG_DEBUG_SCRIPT, "%s: executing flowop "
446 "%s-%d", threadflow->tf_name, flowop->fo_name,
447 flowop->fo_instance);
448
449 ret = (*flowop->fo_func)(threadflow, flowop);
450
451 /*
452 * Return value FILEBENCH_ERROR means "flowop
453 * failed, stop the filebench run"
454 */
455 if (ret == FILEBENCH_ERROR) {
456 filebench_log(LOG_ERROR,
457 "%s-%d: flowop %s-%d failed",
458 threadflow->tf_name,
459 threadflow->tf_instance,
460 flowop->fo_name,
461 flowop->fo_instance);
462 (void) ipc_mutex_lock(&threadflow->tf_lock);
463 threadflow->tf_abort = 1;
464 filebench_shm->shm_f_abort =
465 FILEBENCH_ABORT_ERROR;
466 (void) ipc_mutex_unlock(&threadflow->tf_lock);
467 break;
468 }
469
470 /*
471 * Return value of FILEBENCH_NORSC means "stop
472 * the filebench run" if in "end on no work mode",
473 * otherwise it indicates an error
474 */
475 if (ret == FILEBENCH_NORSC) {
476 (void) ipc_mutex_lock(&threadflow->tf_lock);
477 threadflow->tf_abort = FILEBENCH_DONE;
478 if (filebench_shm->shm_rmode ==
479 FILEBENCH_MODE_Q1STDONE) {
480 filebench_shm->shm_f_abort =
481 FILEBENCH_ABORT_RSRC;
482 } else if (filebench_shm->shm_rmode !=
483 FILEBENCH_MODE_QALLDONE) {
484 filebench_log(LOG_ERROR1,
485 "WARNING! Run stopped early:\n "
486 " flowop %s-%d could "
487 "not obtain a file. Please\n "
488 " reduce runtime, "
489 "increase fileset entries "
490 "($nfiles), or switch modes.",
491 flowop->fo_name,
492 flowop->fo_instance);
493 filebench_shm->shm_f_abort =
494 FILEBENCH_ABORT_ERROR;
495 }
496 (void) ipc_mutex_unlock(&threadflow->tf_lock);
497 break;
498 }
499
500 /*
501 * Return value of FILEBENCH_DONE means "stop
502 * the filebench run without error"
503 */
504 if (ret == FILEBENCH_DONE) {
505 (void) ipc_mutex_lock(&threadflow->tf_lock);
506 threadflow->tf_abort = FILEBENCH_DONE;
507 filebench_shm->shm_f_abort =
508 FILEBENCH_ABORT_DONE;
509 (void) ipc_mutex_unlock(&threadflow->tf_lock);
510 break;
511 }
512
513 /*
514 * If we get here and the return is something other
515 * than FILEBENCH_OK, it means a spurious code
516 * was returned, so treat as major error. This
517 * probably indicates a bug in the flowop.
518 */
519 if (ret != FILEBENCH_OK) {
520 filebench_log(LOG_ERROR,
521 "Flowop %s unexpected return value = %d\n",
522 flowop->fo_name, ret);
523 filebench_shm->shm_f_abort =
524 FILEBENCH_ABORT_ERROR;
525 break;
526 }
527 }
528
529 /* advance to next flowop */
530 flowop = flowop->fo_exec_next;
531
532 /* but if at end of list, start over from the beginning */
533 if (flowop == NULL) {
534 flowop = threadflow->tf_thrd_fops;
535 threadflow->tf_stats.fs_count++;
536 }
537 }
538
539 #ifdef HAVE_LWPS
540 filebench_log(LOG_DEBUG_SCRIPT, "Thread %d exiting",
541 _lwp_self());
542 #endif
543
544 /* Tell flowops to destroy locally acquired state */
545 flowop_destruct_all_flows(threadflow);
546
547 pthread_exit(&threadflow->tf_abort);
548 }
549
550 /*
551 * For master mode we add flowops from the generic library, flowops that are
552 * file system specific, and adjust the vector of functions used by the
553 * generic library. For worker mode we only need to adjust the vector.
554 */
555 void
flowop_init(int ismaster)556 flowop_init(int ismaster)
557 {
558 if (ismaster) {
559 (void) pthread_mutex_init(&controlstats_lock,
560 ipc_mutexattr(IPC_MUTEX_NORMAL));
561 flowoplib_flowinit();
562 }
563
564 switch (filebench_shm->shm_filesys_type) {
565 case LOCAL_FS_PLUG:
566 if (ismaster)
567 fb_lfs_newflowops();
568 fb_lfs_funcvecinit();
569 break;
570 case NFS3_PLUG:
571 case NFS4_PLUG:
572 case CIFS_PLUG:
573 break;
574 }
575 }
576
577 /*
578 * Delete the designated flowop from the thread's flowop list.
579 */
580 static void
flowop_delete(flowop_t ** flowoplist,flowop_t * flowop)581 flowop_delete(flowop_t **flowoplist, flowop_t *flowop)
582 {
583 flowop_t *entry = *flowoplist;
584 int found = 0;
585
586 filebench_log(LOG_DEBUG_IMPL, "Deleting flowop (%s-%d)",
587 flowop->fo_name,
588 flowop->fo_instance);
589
590 /* Delete from thread's flowop list */
591 if (flowop == *flowoplist) {
592 /* First on list */
593 *flowoplist = flowop->fo_exec_next;
594 filebench_log(LOG_DEBUG_IMPL,
595 "Delete0 flowop: (%s-%d)",
596 flowop->fo_name,
597 flowop->fo_instance);
598 } else {
599 while (entry->fo_exec_next) {
600 filebench_log(LOG_DEBUG_IMPL,
601 "Delete0 flowop: (%s-%d) == (%s-%d)",
602 entry->fo_exec_next->fo_name,
603 entry->fo_exec_next->fo_instance,
604 flowop->fo_name,
605 flowop->fo_instance);
606
607 if (flowop == entry->fo_exec_next) {
608 /* Delete */
609 filebench_log(LOG_DEBUG_IMPL,
610 "Deleted0 flowop: (%s-%d)",
611 entry->fo_exec_next->fo_name,
612 entry->fo_exec_next->fo_instance);
613 entry->fo_exec_next =
614 entry->fo_exec_next->fo_exec_next;
615 break;
616 }
617 entry = entry->fo_exec_next;
618 }
619 }
620
621 /* Delete from global list */
622 entry = filebench_shm->shm_flowoplist;
623
624 if (flowop == filebench_shm->shm_flowoplist) {
625 /* First on list */
626 filebench_shm->shm_flowoplist = flowop->fo_next;
627 found = 1;
628 } else {
629 while (entry->fo_next) {
630 filebench_log(LOG_DEBUG_IMPL,
631 "Delete flowop: (%s-%d) == (%s-%d)",
632 entry->fo_next->fo_name,
633 entry->fo_next->fo_instance,
634 flowop->fo_name,
635 flowop->fo_instance);
636
637 if (flowop == entry->fo_next) {
638 /* Delete */
639 entry->fo_next = entry->fo_next->fo_next;
640 found = 1;
641 break;
642 }
643
644 entry = entry->fo_next;
645 }
646 }
647 if (found) {
648 filebench_log(LOG_DEBUG_IMPL,
649 "Deleted flowop: (%s-%d)",
650 flowop->fo_name,
651 flowop->fo_instance);
652 ipc_free(FILEBENCH_FLOWOP, (char *)flowop);
653 } else {
654 filebench_log(LOG_DEBUG_IMPL, "Flowop %s-%d not found!",
655 flowop->fo_name,
656 flowop->fo_instance);
657 }
658 }
659
660 /*
661 * Deletes all the flowops from a flowop list.
662 */
663 void
flowop_delete_all(flowop_t ** flowoplist)664 flowop_delete_all(flowop_t **flowoplist)
665 {
666 flowop_t *flowop = *flowoplist;
667 flowop_t *next_flowop;
668
669 (void) ipc_mutex_lock(&filebench_shm->shm_flowop_lock);
670
671 while (flowop) {
672 filebench_log(LOG_DEBUG_IMPL, "Deleting flowop (%s-%d)",
673 flowop->fo_name, flowop->fo_instance);
674
675 if (flowop->fo_instance &&
676 (flowop->fo_instance == FLOW_MASTER)) {
677 flowop = flowop->fo_exec_next;
678 continue;
679 }
680 next_flowop = flowop->fo_exec_next;
681 flowop_delete(flowoplist, flowop);
682 flowop = next_flowop;
683 }
684
685 (void) ipc_mutex_unlock(&filebench_shm->shm_flowop_lock);
686 }
687
688 /*
689 * Allocates a flowop entity and initializes it with inherited
690 * contents from the "inherit" flowop, if it is supplied, or
691 * with zeros otherwise. In either case the fo_next and fo_exec_next
692 * pointers are set to NULL, and fo_thread is set to point to
693 * the owning threadflow. The initialized flowop is placed at
694 * the head of the global flowop list, and also placed on the
695 * tail of the supplied local flowop list, which will either
696 * be a threadflow's tf_thrd_fops list or a composite flowop's
697 * fo_comp_fops list. The routine locks the flowop's fo_lock and
698 * leaves it held on return. If successful, it returns a pointer
699 * to the allocated and initialized flowop, otherwise it returns NULL.
700 *
701 * filebench_shm->shm_flowop_lock must be held by caller.
702 */
703 static flowop_t *
flowop_define_common(threadflow_t * threadflow,char * name,flowop_t * inherit,flowop_t ** flowoplist_hdp,int instance,int type)704 flowop_define_common(threadflow_t *threadflow, char *name, flowop_t *inherit,
705 flowop_t **flowoplist_hdp, int instance, int type)
706 {
707 flowop_t *flowop;
708
709 if (name == NULL)
710 return (NULL);
711
712 if ((flowop = (flowop_t *)ipc_malloc(FILEBENCH_FLOWOP)) == NULL) {
713 filebench_log(LOG_ERROR,
714 "flowop_define: Can't malloc flowop");
715 return (NULL);
716 }
717
718 filebench_log(LOG_DEBUG_IMPL, "defining flowops %s-%d, addr %zx",
719 name, instance, flowop);
720
721 if (flowop == NULL)
722 return (NULL);
723
724 if (inherit) {
725 (void) memcpy(flowop, inherit, sizeof (flowop_t));
726 (void) pthread_mutex_init(&flowop->fo_lock,
727 ipc_mutexattr(IPC_MUTEX_PRI_ROB));
728 (void) ipc_mutex_lock(&flowop->fo_lock);
729 flowop->fo_next = NULL;
730 flowop->fo_exec_next = NULL;
731 filebench_log(LOG_DEBUG_IMPL,
732 "flowop %s-%d calling init", name, instance);
733 } else {
734 (void) memset(flowop, 0, sizeof (flowop_t));
735 flowop->fo_iters = avd_int_alloc(1);
736 flowop->fo_type = type;
737 (void) pthread_mutex_init(&flowop->fo_lock,
738 ipc_mutexattr(IPC_MUTEX_PRI_ROB));
739 (void) ipc_mutex_lock(&flowop->fo_lock);
740 }
741
742 /* Create backpointer to thread */
743 flowop->fo_thread = threadflow;
744
745 /* Add flowop to global list */
746 if (filebench_shm->shm_flowoplist == NULL) {
747 filebench_shm->shm_flowoplist = flowop;
748 flowop->fo_next = NULL;
749 } else {
750 flowop->fo_next = filebench_shm->shm_flowoplist;
751 filebench_shm->shm_flowoplist = flowop;
752 }
753
754 (void) strcpy(flowop->fo_name, name);
755 flowop->fo_instance = instance;
756
757 if (flowoplist_hdp == NULL)
758 return (flowop);
759
760 /* Add flowop to thread op list */
761 if (*flowoplist_hdp == NULL) {
762 *flowoplist_hdp = flowop;
763 flowop->fo_exec_next = NULL;
764 } else {
765 flowop_t *flowend;
766
767 /* Find the end of the thread list */
768 flowend = *flowoplist_hdp;
769 while (flowend->fo_exec_next != NULL)
770 flowend = flowend->fo_exec_next;
771 flowend->fo_exec_next = flowop;
772 flowop->fo_exec_next = NULL;
773 }
774
775 return (flowop);
776 }
777
778 /*
779 * Calls flowop_define_common() to allocate and initialize a
780 * flowop, and holds the shared flowop_lock during the call.
781 * It releases the created flowop's fo_lock when done.
782 */
783 flowop_t *
flowop_define(threadflow_t * threadflow,char * name,flowop_t * inherit,flowop_t ** flowoplist_hdp,int instance,int type)784 flowop_define(threadflow_t *threadflow, char *name, flowop_t *inherit,
785 flowop_t **flowoplist_hdp, int instance, int type)
786 {
787 flowop_t *flowop;
788
789 (void) ipc_mutex_lock(&filebench_shm->shm_flowop_lock);
790 flowop = flowop_define_common(threadflow, name,
791 inherit, flowoplist_hdp, instance, type);
792 (void) ipc_mutex_unlock(&filebench_shm->shm_flowop_lock);
793
794 if (flowop == NULL)
795 return (NULL);
796
797 (void) ipc_mutex_unlock(&flowop->fo_lock);
798
799 return (flowop);
800 }
801
802 /*
803 * Calls flowop_define_common() to allocate and initialize a
804 * composite flowop, and holds the shared flowop_lock during the call.
805 * It releases the created flowop's fo_lock when done.
806 */
807 flowop_t *
flowop_new_composite_define(char * name)808 flowop_new_composite_define(char *name)
809 {
810 flowop_t *flowop;
811
812 (void) ipc_mutex_lock(&filebench_shm->shm_flowop_lock);
813 flowop = flowop_define_common(NULL, name,
814 NULL, NULL, 0, FLOW_TYPE_COMPOSITE);
815 (void) ipc_mutex_unlock(&filebench_shm->shm_flowop_lock);
816
817 if (flowop == NULL)
818 return (NULL);
819
820 flowop->fo_func = flowop_composite;
821 flowop->fo_init = flowop_composite_init;
822 flowop->fo_destruct = flowop_composite_destruct;
823 (void) ipc_mutex_unlock(&flowop->fo_lock);
824
825 return (flowop);
826 }
827
828 /*
829 * Attempts to take a write lock on the flowop_find_lock that is
830 * defined in interprocess shared memory. Since each call to
831 * flowop_start() holds a read lock on flowop_find_lock, this
832 * routine effectively blocks until all instances of
833 * flowop_start() have finished. The flowop_find() routine calls
834 * this routine so that flowops won't be searched for until all
835 * flowops have been created by flowop_start.
836 */
837 static void
flowop_find_barrier(void)838 flowop_find_barrier(void)
839 {
840 /* Block on wrlock to ensure find waits for all creates */
841 (void) pthread_rwlock_wrlock(&filebench_shm->shm_flowop_find_lock);
842 (void) pthread_rwlock_unlock(&filebench_shm->shm_flowop_find_lock);
843 }
844
845 /*
846 * Returns a list of flowops named "name" from the master
847 * flowop list.
848 */
849 flowop_t *
flowop_find(char * name)850 flowop_find(char *name)
851 {
852 flowop_t *flowop;
853 flowop_t *result = NULL;
854
855 flowop_find_barrier();
856
857 (void) ipc_mutex_lock(&filebench_shm->shm_flowop_lock);
858
859 flowop = filebench_shm->shm_flowoplist;
860
861 while (flowop) {
862 if (strcmp(name, flowop->fo_name) == 0) {
863
864 /* Add flowop to result list */
865 if (result == NULL) {
866 result = flowop;
867 flowop->fo_resultnext = NULL;
868 } else {
869 flowop->fo_resultnext = result;
870 result = flowop;
871 }
872 }
873 flowop = flowop->fo_next;
874 }
875
876 (void) ipc_mutex_unlock(&filebench_shm->shm_flowop_lock);
877
878 return result;
879 }
880
881 /*
882 * Returns a pointer to the specified instance of flowop
883 * "name" from the global list.
884 */
885 flowop_t *
flowop_find_one(char * name,int instance)886 flowop_find_one(char *name, int instance)
887 {
888 flowop_t *test_flowop;
889
890 flowop_find_barrier();
891
892 (void) ipc_mutex_lock(&filebench_shm->shm_flowop_lock);
893
894 test_flowop = filebench_shm->shm_flowoplist;
895
896 while (test_flowop) {
897 if ((strcmp(name, test_flowop->fo_name) == 0) &&
898 (instance == test_flowop->fo_instance))
899 break;
900
901 test_flowop = test_flowop->fo_next;
902 }
903
904 (void) ipc_mutex_unlock(&filebench_shm->shm_flowop_lock);
905
906 return (test_flowop);
907 }
908
909 /*
910 * recursively searches through lists of flowops on a given thread
911 * and those on any included composite flowops for the named flowop.
912 * either returns with a pointer to the named flowop or NULL if it
913 * cannot be found.
914 */
915 static flowop_t *
flowop_recurse_search(char * path,char * name,flowop_t * list)916 flowop_recurse_search(char *path, char *name, flowop_t *list)
917 {
918 flowop_t *test_flowop;
919 char fullname[MAXPATHLEN];
920
921 test_flowop = list;
922
923 /*
924 * when searching a list of inner flowops, "path" is the fullname
925 * of the containing composite flowop. Use it to form the
926 * full name of the inner flowop to search for.
927 */
928 if (path) {
929 if ((strlen(path) + strlen(name) + 1) > MAXPATHLEN) {
930 filebench_log(LOG_ERROR,
931 "composite flowop path name %s.%s too long",
932 path, name);
933 return (NULL);
934 }
935
936 /* create composite_name.name for recursive search */
937 (void) strcpy(fullname, path);
938 (void) strcat(fullname, ".");
939 (void) strcat(fullname, name);
940 } else {
941 (void) strcpy(fullname, name);
942 }
943
944 /*
945 * loop through all flowops on the supplied tf_thrd_fops (flowop)
946 * list or fo_comp_fops (inner flowop) list.
947 */
948 while (test_flowop) {
949 if (strcmp(fullname, test_flowop->fo_name) == 0)
950 return (test_flowop);
951
952 if (test_flowop->fo_type == FLOW_TYPE_COMPOSITE) {
953 flowop_t *found_flowop;
954
955 found_flowop = flowop_recurse_search(
956 test_flowop->fo_name, name,
957 test_flowop->fo_comp_fops);
958
959 if (found_flowop)
960 return (found_flowop);
961 }
962 test_flowop = test_flowop->fo_exec_next;
963 }
964
965 /* not found here or on any child lists */
966 return (NULL);
967 }
968
969 /*
970 * Returns a pointer to flowop named "name" from the supplied tf_thrd_fops
971 * list of flowops. Returns the named flowop if found, or NULL.
972 */
973 flowop_t *
flowop_find_from_list(char * name,flowop_t * list)974 flowop_find_from_list(char *name, flowop_t *list)
975 {
976 flowop_t *found_flowop;
977
978 flowop_find_barrier();
979
980 (void) ipc_mutex_lock(&filebench_shm->shm_flowop_lock);
981
982 found_flowop = flowop_recurse_search(NULL, name, list);
983
984 (void) ipc_mutex_unlock(&filebench_shm->shm_flowop_lock);
985
986 return (found_flowop);
987 }
988
989 /*
990 * Composite flowop method. Does one pass through its list of
991 * inner flowops per iteration.
992 */
993 static int
flowop_composite(threadflow_t * threadflow,flowop_t * flowop)994 flowop_composite(threadflow_t *threadflow, flowop_t *flowop)
995 {
996 flowop_t *inner_flowop;
997
998 /* get the first flowop in the list */
999 inner_flowop = flowop->fo_comp_fops;
1000
1001 /* make a pass through the list of sub flowops */
1002 while (inner_flowop) {
1003 int i, count;
1004
1005 /* Abort if asked */
1006 if (threadflow->tf_abort || filebench_shm->shm_f_abort)
1007 return (FILEBENCH_DONE);
1008
1009 /* Execute the flowop for fo_iters times */
1010 count = (int)avd_get_int(inner_flowop->fo_iters);
1011 for (i = 0; i < count; i++) {
1012
1013 filebench_log(LOG_DEBUG_SCRIPT, "%s: executing flowop "
1014 "%s-%d", threadflow->tf_name,
1015 inner_flowop->fo_name,
1016 inner_flowop->fo_instance);
1017
1018 switch ((*inner_flowop->fo_func)(threadflow,
1019 inner_flowop)) {
1020
1021 /* all done */
1022 case FILEBENCH_DONE:
1023 return (FILEBENCH_DONE);
1024
1025 /* quit if inner flowop limit reached */
1026 case FILEBENCH_NORSC:
1027 return (FILEBENCH_NORSC);
1028
1029 /* quit on inner flowop error */
1030 case FILEBENCH_ERROR:
1031 filebench_log(LOG_ERROR,
1032 "inner flowop %s failed",
1033 inner_flowop->fo_name);
1034 return (FILEBENCH_ERROR);
1035
1036 /* otherwise keep going */
1037 default:
1038 break;
1039 }
1040
1041 }
1042
1043 /* advance to next flowop */
1044 inner_flowop = inner_flowop->fo_exec_next;
1045 }
1046
1047 /* finished with this pass */
1048 return (FILEBENCH_OK);
1049 }
1050
1051 /*
1052 * Composite flowop initialization. Creates runtime inner flowops
1053 * from prototype inner flowops.
1054 */
1055 static int
flowop_composite_init(flowop_t * flowop)1056 flowop_composite_init(flowop_t *flowop)
1057 {
1058 int err;
1059
1060 err = flowop_create_runtime_flowops(flowop->fo_thread,
1061 &flowop->fo_comp_fops);
1062 if (err != FILEBENCH_OK)
1063 return (err);
1064
1065 (void) ipc_mutex_unlock(&flowop->fo_lock);
1066 return (0);
1067 }
1068
1069 /*
1070 * clean up inner flowops
1071 */
1072 static void
flowop_composite_destruct(flowop_t * flowop)1073 flowop_composite_destruct(flowop_t *flowop)
1074 {
1075 flowop_t *inner_flowop = flowop->fo_comp_fops;
1076
1077 while (inner_flowop) {
1078 filebench_log(LOG_DEBUG_IMPL, "Deleting inner flowop (%s-%d)",
1079 inner_flowop->fo_name, inner_flowop->fo_instance);
1080
1081 if (inner_flowop->fo_instance &&
1082 (inner_flowop->fo_instance == FLOW_MASTER)) {
1083 inner_flowop = inner_flowop->fo_exec_next;
1084 continue;
1085 }
1086 flowop_delete(&flowop->fo_comp_fops, inner_flowop);
1087 inner_flowop = inner_flowop->fo_exec_next;
1088 }
1089 }
1090
1091 /*
1092 * Support routines for libraries of flowops
1093 */
1094
1095 int
flowop_init_generic(flowop_t * flowop)1096 flowop_init_generic(flowop_t *flowop)
1097 {
1098 (void) ipc_mutex_unlock(&flowop->fo_lock);
1099 return (FILEBENCH_OK);
1100 }
1101
1102 void
flowop_destruct_generic(flowop_t * flowop)1103 flowop_destruct_generic(flowop_t *flowop)
1104 {
1105 char *buf;
1106
1107 /* release any local resources held by the flowop */
1108 (void) ipc_mutex_lock(&flowop->fo_lock);
1109 buf = flowop->fo_buf;
1110 flowop->fo_buf = NULL;
1111 (void) ipc_mutex_unlock(&flowop->fo_lock);
1112
1113 if (buf)
1114 free(buf);
1115 }
1116
1117
1118 /*
1119 * Loops through the supplied list of flowop *prototypes*, creates
1120 * corresponding flowops by calling flowop_define(), and initializes flowops.
1121 * flowop_define() places flowops on the master flowop list. All created
1122 * flowops are set to instance FLOW_DEFINITION (0).
1123 */
1124 void
flowop_add_from_proto(flowop_proto_t * list,int nops)1125 flowop_add_from_proto(flowop_proto_t *list, int nops)
1126 {
1127 int i;
1128
1129 for (i = 0; i < nops; i++) {
1130 flowop_t *flowop;
1131 flowop_proto_t *flproto;
1132
1133 flproto = &(list[i]);
1134
1135
1136 flowop = flowop_define(NULL, flproto->fl_name, NULL,
1137 NULL, FLOW_DEFINITION, flproto->fl_type);
1138 if (!flowop) {
1139 filebench_log(LOG_ERROR, "failed to create flowop %s\n",
1140 flproto->fl_name);
1141 filebench_shutdown(1);
1142 }
1143
1144 flowop->fo_func = flproto->fl_func;
1145 flowop->fo_init = flproto->fl_init;
1146 flowop->fo_destruct = flproto->fl_destruct;
1147 flowop->fo_attrs = flproto->fl_attrs;
1148 }
1149 }
1150