1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright 2009 Sun Microsystems, Inc.  All rights reserved.
23  * Use is subject to license terms.
24  *
25  * Portions Copyright 2008 Denis Cheng
26  */
27 
28 #include "config.h"
29 
30 #include <sys/types.h>
31 #include <stddef.h>
32 #include <sys/ipc.h>
33 #include <sys/sem.h>
34 #include <sys/errno.h>
35 #include <sys/time.h>
36 #include <inttypes.h>
37 #include <fcntl.h>
38 #include <math.h>
39 #include <dirent.h>
40 
41 #ifndef HAVE_SYSV_SEM
42 #include <semaphore.h>
43 #endif /* HAVE_SYSV_SEM */
44 
45 #include "filebench.h"
46 #include "flowop.h"
47 #include "fileset.h"
48 #include "fb_random.h"
49 #include "utils.h"
50 #include "fsplug.h"
51 
52 /*
53  * These routines implement the flowops from the f language. Each
 * flowop has a name such as "read", and a set of function pointers
55  * to call for initialization, execution and destruction of the flowop.
56  * The table flowoplib_funcs[] contains a flowoplib struct for each
57  * implemented flowop. Most flowops use a generic initialization function
58  * and all currently use a generic destruction function. All flowop
59  * functions referenced from the table are in this file, though, of
60  * course, they often call functions from other files.
61  *
62  * The flowop_init() routine uses the flowoplib_funcs[] table to
63  * create an initial set of "instance 0" flowops, one for each type of
64  * flowop, from which all other flowops are derived. These "instance 0"
65  * flowops are initialized with information from the table including
66  * pointers for their fo_init, fo_func and fo_destroy functions. When
67  * a flowop definition is encountered in an f language script, the
68  * "type" of flowop, such as "read" is used to search for the
69  * "instance 0" flowop named "read", then a new flowop is allocated
70  * which inherits its function pointers and other initial properties
71  * from the instance 0 flowop, and is given a new name as specified
72  * by the "name=" attribute.
73  */
74 
75 static void flowoplib_destruct_noop(flowop_t *flowop);
76 static int flowoplib_fdnum(threadflow_t *threadflow, flowop_t *flowop);
77 static int flowoplib_print(threadflow_t *threadflow, flowop_t *flowop);
78 static int flowoplib_write(threadflow_t *threadflow, flowop_t *flowop);
79 static int flowoplib_read(threadflow_t *threadflow, flowop_t *flowop);
80 static int flowoplib_block_init(flowop_t *flowop);
81 static int flowoplib_block(threadflow_t *threadflow, flowop_t *flowop);
82 static int flowoplib_wakeup(threadflow_t *threadflow, flowop_t *flowop);
83 static int flowoplib_hog(threadflow_t *threadflow, flowop_t *flowop);
84 static int flowoplib_delay(threadflow_t *threadflow, flowop_t *flowop);
85 static int flowoplib_sempost(threadflow_t *threadflow, flowop_t *flowop);
86 static int flowoplib_sempost_init(flowop_t *flowop);
87 static int flowoplib_semblock(threadflow_t *threadflow, flowop_t *flowop);
88 static int flowoplib_semblock_init(flowop_t *flowop);
89 static void flowoplib_semblock_destruct(flowop_t *flowop);
90 static int flowoplib_eventlimit(threadflow_t *, flowop_t *flowop);
91 static int flowoplib_bwlimit(threadflow_t *, flowop_t *flowop);
92 static int flowoplib_iopslimit(threadflow_t *, flowop_t *flowop);
93 static int flowoplib_opslimit(threadflow_t *, flowop_t *flowop);
94 static int flowoplib_openfile(threadflow_t *, flowop_t *flowop);
95 static int flowoplib_openfile_common(threadflow_t *, flowop_t *flowop, int fd);
96 static int flowoplib_createfile(threadflow_t *, flowop_t *flowop);
97 static int flowoplib_closefile(threadflow_t *, flowop_t *flowop);
98 static int flowoplib_makedir(threadflow_t *, flowop_t *flowop);
99 static int flowoplib_removedir(threadflow_t *, flowop_t *flowop);
100 static int flowoplib_listdir(threadflow_t *, flowop_t *flowop);
101 static int flowoplib_fsync(threadflow_t *, flowop_t *flowop);
102 static int flowoplib_readwholefile(threadflow_t *, flowop_t *flowop);
103 static int flowoplib_writewholefile(threadflow_t *, flowop_t *flowop);
104 static int flowoplib_appendfile(threadflow_t *threadflow, flowop_t *flowop);
105 static int flowoplib_appendfilerand(threadflow_t *threadflow, flowop_t *flowop);
106 static int flowoplib_deletefile(threadflow_t *threadflow, flowop_t *flowop);
107 static int flowoplib_statfile(threadflow_t *threadflow, flowop_t *flowop);
108 static int flowoplib_finishoncount(threadflow_t *threadflow, flowop_t *flowop);
109 static int flowoplib_finishonbytes(threadflow_t *threadflow, flowop_t *flowop);
110 static int flowoplib_fsyncset(threadflow_t *threadflow, flowop_t *flowop);
111 static int flowoplib_testrandvar(threadflow_t *threadflow, flowop_t *flowop);
112 static int flowoplib_testrandvar_init(flowop_t *flowop);
113 static void flowoplib_testrandvar_destruct(flowop_t *flowop);
114 
115 static flowop_proto_t flowoplib_funcs[] = {
116 	{FLOW_TYPE_IO, FLOW_ATTR_WRITE, "write", flowop_init_generic,
117 	flowoplib_write, flowop_destruct_generic},
118 	{FLOW_TYPE_IO, FLOW_ATTR_READ, "read", flowop_init_generic,
119 	flowoplib_read, flowop_destruct_generic},
120 	{FLOW_TYPE_SYNC, 0, "block", flowoplib_block_init,
121 	flowoplib_block, flowop_destruct_generic},
122 	{FLOW_TYPE_SYNC, 0, "wakeup", flowop_init_generic,
123 	flowoplib_wakeup, flowop_destruct_generic},
124 	{FLOW_TYPE_SYNC, 0, "semblock", flowoplib_semblock_init,
125 	flowoplib_semblock, flowoplib_semblock_destruct},
126 	{FLOW_TYPE_SYNC, 0, "sempost", flowoplib_sempost_init,
127 	flowoplib_sempost, flowoplib_destruct_noop},
128 	{FLOW_TYPE_OTHER, 0, "hog", flowop_init_generic,
129 	flowoplib_hog, flowop_destruct_generic},
130 	{FLOW_TYPE_OTHER, 0, "delay", flowop_init_generic,
131 	flowoplib_delay, flowop_destruct_generic},
132 	{FLOW_TYPE_OTHER, 0, "eventlimit", flowop_init_generic,
133 	flowoplib_eventlimit, flowop_destruct_generic},
134 	{FLOW_TYPE_OTHER, 0, "bwlimit", flowop_init_generic,
135 	flowoplib_bwlimit, flowop_destruct_generic},
136 	{FLOW_TYPE_OTHER, 0, "iopslimit", flowop_init_generic,
137 	flowoplib_iopslimit, flowop_destruct_generic},
138 	{FLOW_TYPE_OTHER, 0, "opslimit", flowop_init_generic,
139 	flowoplib_opslimit, flowop_destruct_generic},
140 	{FLOW_TYPE_OTHER, 0, "finishoncount", flowop_init_generic,
141 	flowoplib_finishoncount, flowop_destruct_generic},
142 	{FLOW_TYPE_OTHER, 0, "finishonbytes", flowop_init_generic,
143 	flowoplib_finishonbytes, flowop_destruct_generic},
144 	{FLOW_TYPE_IO, 0, "openfile", flowop_init_generic,
145 	flowoplib_openfile, flowop_destruct_generic},
146 	{FLOW_TYPE_IO, 0, "createfile", flowop_init_generic,
147 	flowoplib_createfile, flowop_destruct_generic},
148 	{FLOW_TYPE_IO, 0, "closefile", flowop_init_generic,
149 	flowoplib_closefile, flowop_destruct_generic},
150 	{FLOW_TYPE_IO, 0, "makedir", flowop_init_generic,
151 	flowoplib_makedir, flowop_destruct_generic},
152 	{FLOW_TYPE_IO, 0, "removedir", flowop_init_generic,
153 	flowoplib_removedir, flowop_destruct_generic},
154 	{FLOW_TYPE_IO, 0, "listdir", flowop_init_generic,
155 	flowoplib_listdir, flowop_destruct_generic},
156 	{FLOW_TYPE_IO, 0, "fsync", flowop_init_generic,
157 	flowoplib_fsync, flowop_destruct_generic},
158 	{FLOW_TYPE_IO, 0, "fsyncset", flowop_init_generic,
159 	flowoplib_fsyncset, flowop_destruct_generic},
160 	{FLOW_TYPE_IO, 0, "statfile", flowop_init_generic,
161 	flowoplib_statfile, flowop_destruct_generic},
162 	{FLOW_TYPE_IO, FLOW_ATTR_READ, "readwholefile", flowop_init_generic,
163 	flowoplib_readwholefile, flowop_destruct_generic},
164 	{FLOW_TYPE_IO, FLOW_ATTR_WRITE, "appendfile", flowop_init_generic,
165 	flowoplib_appendfile, flowop_destruct_generic},
166 	{FLOW_TYPE_IO, FLOW_ATTR_WRITE, "appendfilerand", flowop_init_generic,
167 	flowoplib_appendfilerand, flowop_destruct_generic},
168 	{FLOW_TYPE_IO, 0, "deletefile", flowop_init_generic,
169 	flowoplib_deletefile, flowop_destruct_generic},
170 	{FLOW_TYPE_IO, FLOW_ATTR_WRITE, "writewholefile", flowop_init_generic,
171 	flowoplib_writewholefile, flowop_destruct_generic},
172 	{FLOW_TYPE_OTHER, 0, "print", flowop_init_generic,
173 	flowoplib_print, flowop_destruct_generic},
174 	/* routine to calculate mean and stddev for output from a randvar */
175 	{FLOW_TYPE_OTHER, 0, "testrandvar", flowoplib_testrandvar_init,
176 	flowoplib_testrandvar, flowoplib_testrandvar_destruct}
177 };
178 
179 /*
180  * Loops through the list of flowops defined in this
181  * module, and creates and initializes a flowop for each one
182  * by calling flowop_flow_init. As a side effect of calling
183  * flowop_flow_init, the created flowops are placed on the
184  * master flowop list. All created flowops are set to
185  * instance "0".
186  */
187 void
188 flowoplib_flowinit(void)
189 {
190 	int nops = sizeof (flowoplib_funcs) / sizeof (flowop_proto_t);
191 
192 	flowop_add_from_proto(flowoplib_funcs, nops);
193 }
194 
195 /*
196  * Special total noop destruct
197  */
198 /* ARGSUSED */
199 static void
200 flowoplib_destruct_noop(flowop_t *flowop)
201 {
202 }
203 
204 /*
205  * Generates a file attribute from flags in the supplied flowop.
206  * Sets FLOW_ATTR_DIRECTIO and/or FLOW_ATTR_DSYNC and advise for
207  * no random read (POSIX_FADV_RANDOM) as needed.
208  */
209 static int
210 flowoplib_fileattrs(flowop_t *flowop)
211 {
212 	int attrs = 0;
213 
214 	if (avd_get_bool(flowop->fo_directio))
215 		attrs |= FLOW_ATTR_DIRECTIO;
216 
217 	if (avd_get_bool(flowop->fo_dsync))
218 		attrs |= FLOW_ATTR_DSYNC;
219 
220 	if (avd_get_bool(flowop->fo_noreadahead))
221 		attrs |= FLOW_ATTR_FADV_RANDOM;
222 
223 	return (attrs);
224 }
225 
226 /*
227  * Obtain a filesetentry for a file. Result placed where filep points.
228  * Supply with a flowop and a flag to indicate whether an existent or
229  * non-existent file is required. Returns FILEBENCH_NORSC if all out
230  * of the appropriate type of directories, FILEBENCH_ERROR if the
231  * flowop does not point to a fileset, and FILEBENCH_OK otherwise.
232  */
233 static int
234 flowoplib_pickfile(filesetentry_t **filep, flowop_t *flowop, int flags, int tid)
235 {
236 	fileset_t	*fileset;
237 	int		fileindex;
238 
239 	if ((fileset = flowop->fo_fileset) == NULL) {
240 		filebench_log(LOG_ERROR, "flowop NO fileset");
241 		return (FILEBENCH_ERROR);
242 	}
243 
244 	if (flowop->fo_fileindex) {
245 		fileindex = (int)(avd_get_dbl(flowop->fo_fileindex));
246 		fileindex = fileindex % fileset->fs_constentries;
247 		flags |= FILESET_PICKBYINDEX;
248 	} else {
249 		fileindex = 0;
250 	}
251 
252 	if ((*filep = fileset_pick(fileset, FILESET_PICKFILE | flags,
253 	    tid, fileindex)) == NULL) {
254 		filebench_log(LOG_DEBUG_SCRIPT,
255 		    "flowop %s failed to pick file from fileset %s",
256 		    flowop->fo_name,
257 		    avd_get_str(fileset->fs_name));
258 		return (FILEBENCH_NORSC);
259 	}
260 
261 	return (FILEBENCH_OK);
262 }
263 
264 /*
265  * Obtain a filesetentry for a leaf directory. Result placed where dirp
266  * points. Supply with flowop and a flag to indicate whether an existent
267  * or non-existent leaf directory is required. Returns FILEBENCH_NORSC
268  * if all out of the appropriate type of directories, FILEBENCH_ERROR
269  * if the flowop does not point to a fileset, and FILEBENCH_OK otherwise.
270  */
271 static int
272 flowoplib_pickleafdir(filesetentry_t **dirp, flowop_t *flowop, int flags)
273 {
274 	fileset_t	*fileset;
275 	int		dirindex;
276 
277 	if ((fileset = flowop->fo_fileset) == NULL) {
278 		filebench_log(LOG_ERROR, "flowop NO fileset");
279 		return (FILEBENCH_ERROR);
280 	}
281 
282 	if (flowop->fo_fileindex) {
283 		dirindex = (int)(avd_get_dbl(flowop->fo_fileindex) *
284 		    ((double)(fileset->fs_constleafdirs / 2)));
285 		dirindex = dirindex % fileset->fs_constleafdirs;
286 		flags |= FILESET_PICKBYINDEX;
287 	} else {
288 		dirindex = 0;
289 	}
290 
291 	if ((*dirp = fileset_pick(fileset,
292 	    FILESET_PICKLEAFDIR | flags, 0, dirindex)) == NULL) {
293 		filebench_log(LOG_DEBUG_SCRIPT,
294 		    "flowop %s failed to pick directory from fileset %s",
295 		    flowop->fo_name,
296 		    avd_get_str(fileset->fs_name));
297 		return (FILEBENCH_NORSC);
298 	}
299 
300 	return (FILEBENCH_OK);
301 }
302 
303 /*
304  * Searches for a file descriptor. Tries the flowop's fo_fdnumber first and
305  * returns with it if it has been explicitly set (greater than 0). It next
306  * checks to see if a rotating file descriptor policy is in effect, and if not
307  * returns the fdnumber regardless of what it is. (note that if it is 0, it
308  * just selects to the default file descriptor in the threadflow's tf_fd
309  * array). If the rotating fd policy is in effect, it cycles from the end of
310  * the tf_fd array to 0 and then starts over from the end.
311  *
312  * The routine returns an index into the threadflow's tf_fd table where the
313  * actual file descriptor will be found.
314  */
315 static int
316 flowoplib_fdnum(threadflow_t *threadflow, flowop_t *flowop)
317 {
318 	int fd = flowop->fo_fdnumber;
319 
320 	if (fd > 0) {
321 		filebench_log(LOG_DEBUG_IMPL, "picking explicitly set fd");
322 		goto retfd;
323 	}
324 
325 	if (!avd_get_bool(flowop->fo_rotatefd)) {
326 		filebench_log(LOG_DEBUG_IMPL, "picking default fd");
327 		goto retfd;
328 	}
329 
330 	filebench_log(LOG_DEBUG_IMPL, "picking rotor fd");
331 
332 	/* first time or we wraped around */
333 	if (!threadflow->tf_fdrotor)
334 		threadflow->tf_fdrotor = THREADFLOW_MAXFD;
335 
336 	threadflow->tf_fdrotor--;
337 	fd = threadflow->tf_fdrotor;
338 
339 retfd:
340 	filebench_log(LOG_DEBUG_IMPL, "picked fd = %d", fd);
341 	return fd;
342 }
343 
344 /*
345  * Determines the file descriptor to use, and attempts to open
346  * the file if it is not already open. Also determines the wss
347  * value. Returns FILEBENCH_ERROR on errors, FILESET_NORSC if
348  * if flowop_openfile_common couldn't obtain an appropriate file
349  * from a the fileset, and FILEBENCH_OK otherwise.
350  */
351 static int
352 flowoplib_filesetup(threadflow_t *threadflow, flowop_t *flowop,
353     fbint_t *wssp, fb_fdesc_t **fdescp)
354 {
355 	int fd = flowoplib_fdnum(threadflow, flowop);
356 
357 	if (fd == -1)
358 		return (FILEBENCH_ERROR);
359 
360 	/* check for conflicting fdnumber and file name */
361 	if ((fd > 0) && (threadflow->tf_fse[fd] != NULL)) {
362 		char *fd_based_name;
363 
364 		fd_based_name =
365 		    avd_get_str(threadflow->tf_fse[fd]->fse_fileset->fs_name);
366 
367 		if (flowop->fo_filename != NULL) {
368 			char *fo_based_name;
369 
370 			fo_based_name = avd_get_str(flowop->fo_filename);
371 			if (strcmp(fd_based_name, fo_based_name) != 0) {
372 				filebench_log(LOG_ERROR, "Name of fd refer"
373 				    "enced fileset name (%s) CONFLICTS with"
374 				    " flowop supplied fileset name (%s)",
375 				    fd_based_name, fo_based_name);
376 				filebench_shutdown(1);
377 				return (FILEBENCH_ERROR);
378 			}
379 		}
380 	}
381 
382 	if (threadflow->tf_fd[fd].fd_ptr == NULL) {
383 		int ret;
384 
385 		if ((ret = flowoplib_openfile_common(
386 		    threadflow, flowop, fd)) != FILEBENCH_OK)
387 			return (ret);
388 
389 		if (threadflow->tf_fse[fd]) {
390 			filebench_log(LOG_DEBUG_IMPL, "opened file %s",
391 			    threadflow->tf_fse[fd]->fse_path);
392 		} else {
393 			filebench_log(LOG_DEBUG_IMPL,
394 			    "opened device %s/%s",
395 			    avd_get_str(flowop->fo_fileset->fs_path),
396 			    avd_get_str(flowop->fo_fileset->fs_name));
397 		}
398 	}
399 
400 	*fdescp = &(threadflow->tf_fd[fd]);
401 
402 	if ((*wssp = flowop->fo_constwss) == 0) {
403 		if (threadflow->tf_fse[fd])
404 			*wssp = threadflow->tf_fse[fd]->fse_size;
405 		else
406 			*wssp = avd_get_int(flowop->fo_fileset->fs_size);
407 	}
408 
409 	return (FILEBENCH_OK);
410 }
411 
412 /*
413  * Determines the io buffer or random offset into tf_mem for
414  * the IO operation. Returns FILEBENCH_ERROR on errors, FILEBENCH_OK otherwise.
415  */
416 static int
417 flowoplib_iobufsetup(threadflow_t *threadflow, flowop_t *flowop,
418     caddr_t *iobufp, fbint_t iosize)
419 {
420 	long memsize;
421 	size_t memoffset;
422 
423 	if (iosize == 0) {
424 		filebench_log(LOG_ERROR, "zero iosize for thread %s",
425 		    flowop->fo_name);
426 		return (FILEBENCH_ERROR);
427 	}
428 
429 	/* If directio, we need to align buffer address by sector */
430 	if (flowoplib_fileattrs(flowop) & FLOW_ATTR_DIRECTIO)
431 		iosize = iosize + 512;
432 
433 	if ((memsize = threadflow->tf_constmemsize) != 0) {
434 		/* use tf_mem for I/O with random offset */
435 
436 		if (memsize < iosize) {
437 			filebench_log(LOG_ERROR,
438 			    "tf_memsize smaller than IO size for thread %s",
439 			    flowop->fo_name);
440 			return (FILEBENCH_ERROR);
441 		}
442 
443 		fb_random(&memoffset, memsize, iosize, NULL);
444 		*iobufp = threadflow->tf_mem + memoffset;
445 
446 	} else {
447 		/* use private I/O buffer */
448 		if ((flowop->fo_buf != NULL) &&
449 		    (flowop->fo_buf_size < iosize)) {
450 			/* too small, so free up and re-allocate */
451 			free(flowop->fo_buf);
452 			flowop->fo_buf = NULL;
453 		}
454 
455 		/*
456 		 * Allocate memory for the  buffer. The memory is freed
457 		 * by flowop_destruct_generic() or by this routine if more
458 		 * memory is needed for the buffer.
459 		 */
460 		if ((flowop->fo_buf == NULL) && ((flowop->fo_buf
461 		    = (char *)malloc(iosize)) == NULL))
462 			return (FILEBENCH_ERROR);
463 
464 		flowop->fo_buf_size = iosize;
465 		*iobufp = flowop->fo_buf;
466 	}
467 
468 	if (flowoplib_fileattrs(flowop) & FLOW_ATTR_DIRECTIO)
469 		*iobufp = (caddr_t)((((unsigned long)(*iobufp) + 512) / 512) * 512);
470 
471 	return (FILEBENCH_OK);
472 }
473 
474 /*
475  * Determines the file descriptor to use, opens it if necessary, the
476  * io buffer or random offset into tf_mem for IO operation and the wss
477  * value. Returns FILEBENCH_ERROR on errors, FILEBENCH_OK otherwise.
478  */
479 int
480 flowoplib_iosetup(threadflow_t *threadflow, flowop_t *flowop,
481     fbint_t *wssp, caddr_t *iobufp, fb_fdesc_t **filedescp, fbint_t iosize)
482 {
483 	int ret;
484 
485 	if ((ret = flowoplib_filesetup(threadflow, flowop, wssp, filedescp)) !=
486 	    FILEBENCH_OK)
487 		return (ret);
488 
489 	if ((ret = flowoplib_iobufsetup(threadflow, flowop, iobufp, iosize)) !=
490 	    FILEBENCH_OK)
491 		return (ret);
492 
493 	return (FILEBENCH_OK);
494 }
495 
496 /*
497  * Emulate posix read / pread. If the flowop has a fileset,
498  * a file descriptor number index is fetched, otherwise a
499  * supplied fileobj file is used. In either case the specified
500  * file will be opened if not already open. If the flowop has
501  * neither a fileset or fileobj, an error is logged and FILEBENCH_ERROR
502  * returned.
503  *
504  * The actual read is done to a random offset in the
505  * threadflow's thread memory (tf_mem), with a size set by
506  * fo_iosize and at either a random disk offset within the
507  * working set size, or at the next sequential location. If
508  * any errors are encountered, FILEBENCH_ERROR is returned,
509  * if no appropriate file can be obtained from the fileset then
510  * FILEBENCH_NORSC is returned, otherise FILEBENCH_OK is returned.
511  */
512 static int
513 flowoplib_read(threadflow_t *threadflow, flowop_t *flowop)
514 {
515 	caddr_t iobuf;
516 	fbint_t wss;
517 	fbint_t iosize;
518 	fb_fdesc_t *fdesc;
519 	int ret;
520 
521 	iosize = avd_get_int(flowop->fo_iosize);
522 
523 	if ((ret = flowoplib_iosetup(threadflow, flowop, &wss, &iobuf,
524 	    &fdesc, iosize)) != FILEBENCH_OK)
525 		return (ret);
526 
527 	if (avd_get_bool(flowop->fo_random)) {
528 		uint64_t fileoffset;
529 
530 		if (iosize > wss) {
531 			filebench_log(LOG_ERROR,
532 			    "file size smaller than IO size for thread %s",
533 			    flowop->fo_name);
534 			return (FILEBENCH_ERROR);
535 		}
536 
537 		/* select randomly */
538 		fb_random64(&fileoffset, wss, iosize, NULL);
539 
540 		(void) flowop_beginop(threadflow, flowop);
541 		if ((ret = FB_PREAD(fdesc, iobuf,
542 		    iosize, (off64_t)fileoffset)) == -1) {
543 			(void) flowop_endop(threadflow, flowop, 0);
544 			filebench_log(LOG_ERROR,
545 			    "read file %s failed, offset %llu "
546 			    "io buffer %zd: %s",
547 			    avd_get_str(flowop->fo_fileset->fs_name),
548 			    (u_longlong_t)fileoffset, iobuf, strerror(errno));
549 			flowop_endop(threadflow, flowop, 0);
550 			return (FILEBENCH_ERROR);
551 		}
552 		(void) flowop_endop(threadflow, flowop, ret);
553 
554 		if ((ret == 0))
555 			(void) FB_LSEEK(fdesc, 0, SEEK_SET);
556 
557 	} else {
558 		(void) flowop_beginop(threadflow, flowop);
559 		if ((ret = FB_READ(fdesc, iobuf, iosize)) == -1) {
560 			(void) flowop_endop(threadflow, flowop, 0);
561 			filebench_log(LOG_ERROR,
562 			    "read file %s failed, io buffer %zd: %s",
563 			    avd_get_str(flowop->fo_fileset->fs_name),
564 			    iobuf, strerror(errno));
565 			(void) flowop_endop(threadflow, flowop, 0);
566 			return (FILEBENCH_ERROR);
567 		}
568 		(void) flowop_endop(threadflow, flowop, ret);
569 
570 		if ((ret == 0))
571 			(void) FB_LSEEK(fdesc, 0, SEEK_SET);
572 	}
573 
574 	return (FILEBENCH_OK);
575 }
576 
577 /*
578  * Initializes a "flowop_block" flowop. Specifically, it
579  * initializes the flowop's fo_cv and unlocks the fo_lock.
580  */
581 static int
582 flowoplib_block_init(flowop_t *flowop)
583 {
584 	filebench_log(LOG_DEBUG_IMPL, "flow %s-%d block init address %zx",
585 	    flowop->fo_name, flowop->fo_instance, &flowop->fo_cv);
586 	(void) pthread_cond_init(&flowop->fo_cv, ipc_condattr());
587 	(void) ipc_mutex_unlock(&flowop->fo_lock);
588 
589 	return (FILEBENCH_OK);
590 }
591 
592 /*
593  * Blocks the threadflow until woken up by flowoplib_wakeup.
594  * The routine blocks on the flowop's fo_cv condition variable.
595  */
596 static int
597 flowoplib_block(threadflow_t *threadflow, flowop_t *flowop)
598 {
599 	filebench_log(LOG_DEBUG_IMPL, "flow %s-%d blocking at address %zx",
600 	    flowop->fo_name, flowop->fo_instance, &flowop->fo_cv);
601 	(void) ipc_mutex_lock(&flowop->fo_lock);
602 
603 	flowop_beginop(threadflow, flowop);
604 	(void) pthread_cond_wait(&flowop->fo_cv, &flowop->fo_lock);
605 	flowop_endop(threadflow, flowop, 0);
606 
607 	filebench_log(LOG_DEBUG_IMPL, "flow %s-%d unblocking",
608 	    flowop->fo_name, flowop->fo_instance);
609 
610 	(void) ipc_mutex_unlock(&flowop->fo_lock);
611 
612 	return (FILEBENCH_OK);
613 }
614 
615 /*
616  * Wakes up one or more target blocking flowops.
617  * Sends broadcasts on the fo_cv condition variables of all
618  * flowops on the target list, except those that are
619  * FLOW_MASTER flowops. The target list consists of all
620  * flowops whose name matches this flowop's "fo_targetname"
621  * attribute. The target list is generated on the first
622  * invocation, and the run will be shutdown if no targets
623  * are found. Otherwise the routine always returns FILEBENCH_OK.
624  */
625 static int
626 flowoplib_wakeup(threadflow_t *threadflow, flowop_t *flowop)
627 {
628 	flowop_t *target;
629 
630 	/* if this is the first wakeup, create the wakeup list */
631 	if (flowop->fo_targets == NULL) {
632 		flowop_t *result = flowop_find(flowop->fo_targetname);
633 
634 		flowop->fo_targets = result;
635 		if (result == NULL) {
636 			filebench_log(LOG_ERROR,
637 			    "wakeup: could not find op %s for thread %s",
638 			    flowop->fo_targetname,
639 			    threadflow->tf_name);
640 			filebench_shutdown(1);
641 		}
642 		while (result) {
643 			result->fo_targetnext =
644 			    result->fo_resultnext;
645 			result = result->fo_resultnext;
646 		}
647 	}
648 
649 	target = flowop->fo_targets;
650 
651 	/* wakeup the targets */
652 	while (target) {
653 		if (target->fo_instance == FLOW_MASTER) {
654 			target = target->fo_targetnext;
655 			continue;
656 		}
657 		filebench_log(LOG_DEBUG_IMPL,
658 		    "wakeup flow %s-%d at address %zx",
659 		    target->fo_name,
660 		    target->fo_instance,
661 		    &target->fo_cv);
662 
663 		flowop_beginop(threadflow, flowop);
664 		(void) ipc_mutex_lock(&target->fo_lock);
665 		(void) pthread_cond_broadcast(&target->fo_cv);
666 		(void) ipc_mutex_unlock(&target->fo_lock);
667 		flowop_endop(threadflow, flowop, 0);
668 
669 		target = target->fo_targetnext;
670 	}
671 
672 	return (FILEBENCH_OK);
673 }
674 
675 /*
676  * "think time" routines. the "hog" routine consumes cpu cycles as
677  * it "thinks", while the "delay" flowop simply calls sleep() to delay
678  * for a given number of seconds without consuming cpu cycles.
679  */
680 
681 
682 /*
683  * Consumes CPU cycles and memory bandwidth by looping for
684  * flowop->fo_value times. With each loop sets memory location
685  * threadflow->tf_mem to 1.
686  */
687 static int
688 flowoplib_hog(threadflow_t *threadflow, flowop_t *flowop)
689 {
690 	uint64_t value = avd_get_int(flowop->fo_value);
691 	int i;
692 
693 	filebench_log(LOG_DEBUG_IMPL, "hog enter");
694 	flowop_beginop(threadflow, flowop);
695 	if (threadflow->tf_mem != NULL) {
696 		for (i = 0; i < value; i++)
697 			*(threadflow->tf_mem) = 1;
698 	}
699 	flowop_endop(threadflow, flowop, 0);
700 	filebench_log(LOG_DEBUG_IMPL, "hog exit");
701 	return (FILEBENCH_OK);
702 }
703 
704 
705 /*
706  * Delays for fo_value seconds.
707  */
708 static int
709 flowoplib_delay(threadflow_t *threadflow, flowop_t *flowop)
710 {
711 	int value = avd_get_int(flowop->fo_value);
712 
713 	flowop_beginop(threadflow, flowop);
714 	(void) sleep(value);
715 	flowop_endop(threadflow, flowop, 0);
716 	return (FILEBENCH_OK);
717 }
718 
719 /*
720  * Rate limiting routines. This is the event consuming half of the
721  * event system. Each of the four following routines will limit the rate
722  * to one unit of either calls, issued I/O operations, issued filebench
723  * operations, or I/O bandwidth. Since there is only one event generator,
 * the events will be divided among multiple instances of an event
725  * consumer, and further divided among different consumers if more than
726  * one has been defined. There is no mechanism to enforce equal sharing
727  * of events.
728  */
729 
730 /*
731  * Completes one invocation per posted event. If eventgen_q
732  * has an event count greater than zero, one will be removed
733  * (count decremented), otherwise the calling thread will
734  * block until another event has been posted. Always returns 0
735  */
736 static int
737 flowoplib_eventlimit(threadflow_t *threadflow, flowop_t *flowop)
738 {
739 	/* Immediately bail if not set/enabled */
740 	if (!filebench_shm->shm_eventgen_enabled)
741 		return (FILEBENCH_OK);
742 
743 	if (flowop->fo_initted == 0) {
744 		filebench_log(LOG_DEBUG_IMPL, "rate %zx %s-%d locking",
745 		    flowop, threadflow->tf_name, threadflow->tf_instance);
746 		flowop->fo_initted = 1;
747 	}
748 
749 	flowop_beginop(threadflow, flowop);
750 	while (filebench_shm->shm_eventgen_enabled) {
751 		(void) ipc_mutex_lock(&filebench_shm->shm_eventgen_lock);
752 		if (filebench_shm->shm_eventgen_q > 0) {
753 			filebench_shm->shm_eventgen_q--;
754 			(void) ipc_mutex_unlock(
755 			    &filebench_shm->shm_eventgen_lock);
756 			break;
757 		}
758 		(void) pthread_cond_wait(&filebench_shm->shm_eventgen_cv,
759 		    &filebench_shm->shm_eventgen_lock);
760 		(void) ipc_mutex_unlock(&filebench_shm->shm_eventgen_lock);
761 	}
762 	flowop_endop(threadflow, flowop, 0);
763 	return (FILEBENCH_OK);
764 }
765 
766 static int
767 flowoplib_event_find_target(threadflow_t *threadflow, flowop_t *flowop)
768 {
769 	if (flowop->fo_targetname[0] != '\0') {
770 
771 		/* Try to use statistics from specific flowop */
772 		flowop->fo_targets =
773 		    flowop_find_from_list(flowop->fo_targetname,
774 		    threadflow->tf_thrd_fops);
775 		if (flowop->fo_targets == NULL) {
776 			filebench_log(LOG_ERROR,
777 			    "limit target: could not find flowop %s",
778 			    flowop->fo_targetname);
779 			filebench_shutdown(1);
780 			return (FILEBENCH_ERROR);
781 		}
782 	} else {
783 		/* use total workload statistics */
784 		flowop->fo_targets = NULL;
785 	}
786 	return (FILEBENCH_OK);
787 }
788 
789 /*
790  * Blocks the calling thread if the number of issued I/O
791  * operations exceeds the number of posted events, thus
792  * limiting the average I/O operation rate to the rate
793  * specified by eventgen_hz. Always returns FILEBENCH_OK.
794  */
795 static int
796 flowoplib_iopslimit(threadflow_t *threadflow, flowop_t *flowop)
797 {
798 	uint64_t iops;
799 	uint64_t delta;
800 	uint64_t events;
801 
802 	/* Immediately bail if not set/enabled */
803 	if (!filebench_shm->shm_eventgen_enabled)
804 		return (FILEBENCH_OK);
805 
806 	if (flowop->fo_initted == 0) {
807 		filebench_log(LOG_DEBUG_IMPL, "rate %zx %s-%d locking",
808 		    flowop, threadflow->tf_name, threadflow->tf_instance);
809 		flowop->fo_initted = 1;
810 
811 		if (flowoplib_event_find_target(threadflow, flowop)
812 		    == FILEBENCH_ERROR)
813 			return (FILEBENCH_ERROR);
814 
815 		if (flowop->fo_targets && ((flowop->fo_targets->fo_attrs &
816 		    (FLOW_ATTR_READ | FLOW_ATTR_WRITE)) == 0)) {
817 			filebench_log(LOG_ERROR,
818 			    "WARNING: Flowop %s does no IO",
819 			    flowop->fo_targets->fo_name);
820 			filebench_shutdown(1);
821 			return (FILEBENCH_ERROR);
822 		}
823 	}
824 
825 	if (flowop->fo_targets) {
826 		/*
827 		 * Note that fs_count is already the sum of fs_rcount
828 		 * and fs_wcount if looking at a single flowop.
829 		 */
830 		iops = flowop->fo_targets->fo_stats.fs_count;
831 	} else {
832 		(void) ipc_mutex_lock(&controlstats_lock);
833 		iops = (controlstats.fs_rcount +
834 		    controlstats.fs_wcount);
835 		(void) ipc_mutex_unlock(&controlstats_lock);
836 	}
837 
838 	/* Is this the first time around */
839 	if (flowop->fo_tputlast == 0) {
840 		flowop->fo_tputlast = iops;
841 		return (FILEBENCH_OK);
842 	}
843 
844 	delta = iops - flowop->fo_tputlast;
845 	flowop->fo_tputbucket -= delta;
846 	flowop->fo_tputlast = iops;
847 
848 	/* No need to block if the q isn't empty */
849 	if (flowop->fo_tputbucket >= 0LL) {
850 		flowop_endop(threadflow, flowop, 0);
851 		return (FILEBENCH_OK);
852 	}
853 
854 	iops = flowop->fo_tputbucket * -1;
855 	events = iops;
856 
857 	flowop_beginop(threadflow, flowop);
858 	while (filebench_shm->shm_eventgen_enabled) {
859 
860 		(void) ipc_mutex_lock(&filebench_shm->shm_eventgen_lock);
861 		if (filebench_shm->shm_eventgen_q >= events) {
862 			filebench_shm->shm_eventgen_q -= events;
863 			(void) ipc_mutex_unlock(
864 			    &filebench_shm->shm_eventgen_lock);
865 			flowop->fo_tputbucket += events;
866 			break;
867 		}
868 		(void) pthread_cond_wait(&filebench_shm->shm_eventgen_cv,
869 		    &filebench_shm->shm_eventgen_lock);
870 		(void) ipc_mutex_unlock(&filebench_shm->shm_eventgen_lock);
871 	}
872 	flowop_endop(threadflow, flowop, 0);
873 
874 	return (FILEBENCH_OK);
875 }
876 
877 /*
878  * Blocks the calling thread if the number of issued filebench
879  * operations exceeds the number of posted events, thus limiting
880  * the average filebench operation rate to the rate specified by
881  * eventgen_hz. Always returns FILEBENCH_OK.
882  */
883 static int
884 flowoplib_opslimit(threadflow_t *threadflow, flowop_t *flowop)
885 {
886 	uint64_t ops;
887 	uint64_t delta;
888 	uint64_t events;
889 
890 	/* Immediately bail if not set/enabled */
891 	if (!filebench_shm->shm_eventgen_enabled)
892 		return (FILEBENCH_OK);
893 
894 	if (flowop->fo_initted == 0) {
895 		filebench_log(LOG_DEBUG_IMPL, "rate %zx %s-%d locking",
896 		    flowop, threadflow->tf_name, threadflow->tf_instance);
897 		flowop->fo_initted = 1;
898 
899 		if (flowoplib_event_find_target(threadflow, flowop)
900 		    == FILEBENCH_ERROR)
901 			return (FILEBENCH_ERROR);
902 	}
903 
904 	if (flowop->fo_targets) {
905 		ops = flowop->fo_targets->fo_stats.fs_count;
906 	} else {
907 		(void) ipc_mutex_lock(&controlstats_lock);
908 		ops = controlstats.fs_count;
909 		(void) ipc_mutex_unlock(&controlstats_lock);
910 	}
911 
912 	/* Is this the first time around */
913 	if (flowop->fo_tputlast == 0) {
914 		flowop->fo_tputlast = ops;
915 		return (FILEBENCH_OK);
916 	}
917 
918 	delta = ops - flowop->fo_tputlast;
919 	flowop->fo_tputbucket -= delta;
920 	flowop->fo_tputlast = ops;
921 
922 	/* No need to block if the q isn't empty */
923 	if (flowop->fo_tputbucket >= 0LL) {
924 		flowop_endop(threadflow, flowop, 0);
925 		return (FILEBENCH_OK);
926 	}
927 
928 	ops = flowop->fo_tputbucket * -1;
929 	events = ops;
930 
931 	flowop_beginop(threadflow, flowop);
932 	while (filebench_shm->shm_eventgen_enabled) {
933 		(void) ipc_mutex_lock(&filebench_shm->shm_eventgen_lock);
934 		if (filebench_shm->shm_eventgen_q >= events) {
935 			filebench_shm->shm_eventgen_q -= events;
936 			(void) ipc_mutex_unlock(
937 			    &filebench_shm->shm_eventgen_lock);
938 			flowop->fo_tputbucket += events;
939 			break;
940 		}
941 		(void) pthread_cond_wait(&filebench_shm->shm_eventgen_cv,
942 		    &filebench_shm->shm_eventgen_lock);
943 		(void) ipc_mutex_unlock(&filebench_shm->shm_eventgen_lock);
944 	}
945 	flowop_endop(threadflow, flowop, 0);
946 
947 	return (FILEBENCH_OK);
948 }
949 
950 
951 /*
952  * Blocks the calling thread if the number of bytes of I/O
953  * issued exceeds one megabyte times the number of posted
954  * events, thus limiting the average I/O byte rate to one
955  * megabyte times the event rate as set by eventgen_hz.
956  * Always retuns FILEBENCH_OK.
957  */
958 static int
959 flowoplib_bwlimit(threadflow_t *threadflow, flowop_t *flowop)
960 {
961 	uint64_t bytes;
962 	uint64_t delta;
963 	uint64_t events;
964 
965 	/* Immediately bail if not set/enabled */
966 	if (!filebench_shm->shm_eventgen_enabled)
967 		return (FILEBENCH_OK);
968 
969 	if (flowop->fo_initted == 0) {
970 		filebench_log(LOG_DEBUG_IMPL, "rate %zx %s-%d locking",
971 		    flowop, threadflow->tf_name, threadflow->tf_instance);
972 		flowop->fo_initted = 1;
973 
974 		if (flowoplib_event_find_target(threadflow, flowop)
975 		    == FILEBENCH_ERROR)
976 			return (FILEBENCH_ERROR);
977 
978 		if ((flowop->fo_targets) &&
979 		    ((flowop->fo_targets->fo_attrs &
980 		    (FLOW_ATTR_READ | FLOW_ATTR_WRITE)) == 0)) {
981 			filebench_log(LOG_ERROR,
982 			    "WARNING: Flowop %s does no Reads or Writes",
983 			    flowop->fo_targets->fo_name);
984 			filebench_shutdown(1);
985 			return (FILEBENCH_ERROR);
986 		}
987 	}
988 
989 	if (flowop->fo_targets) {
990 		/*
991 		 * Note that fs_bytes is already the sum of fs_rbytes
992 		 * and fs_wbytes if looking at a single flowop.
993 		 */
994 		bytes = flowop->fo_targets->fo_stats.fs_bytes;
995 	} else {
996 		(void) ipc_mutex_lock(&controlstats_lock);
997 		bytes = (controlstats.fs_rbytes +
998 		    controlstats.fs_wbytes);
999 		(void) ipc_mutex_unlock(&controlstats_lock);
1000 	}
1001 
1002 	/* Is this the first time around? */
1003 	if (flowop->fo_tputlast == 0) {
1004 		flowop->fo_tputlast = bytes;
1005 		return (FILEBENCH_OK);
1006 	}
1007 
1008 	delta = bytes - flowop->fo_tputlast;
1009 	flowop->fo_tputbucket -= delta;
1010 	flowop->fo_tputlast = bytes;
1011 
1012 	/* No need to block if the q isn't empty */
1013 	if (flowop->fo_tputbucket >= 0LL) {
1014 		flowop_endop(threadflow, flowop, 0);
1015 		return (FILEBENCH_OK);
1016 	}
1017 
1018 	bytes = flowop->fo_tputbucket * -1;
1019 	events = (bytes / MB) + 1;
1020 
1021 	filebench_log(LOG_DEBUG_IMPL, "%llu bytes, %llu events",
1022 	    (u_longlong_t)bytes, (u_longlong_t)events);
1023 
1024 	flowop_beginop(threadflow, flowop);
1025 	while (filebench_shm->shm_eventgen_enabled) {
1026 		(void) ipc_mutex_lock(&filebench_shm->shm_eventgen_lock);
1027 		if (filebench_shm->shm_eventgen_q >= events) {
1028 			filebench_shm->shm_eventgen_q -= events;
1029 			(void) ipc_mutex_unlock(
1030 			    &filebench_shm->shm_eventgen_lock);
1031 			flowop->fo_tputbucket += (events * MB);
1032 			break;
1033 		}
1034 		(void) pthread_cond_wait(&filebench_shm->shm_eventgen_cv,
1035 		    &filebench_shm->shm_eventgen_lock);
1036 		(void) ipc_mutex_unlock(&filebench_shm->shm_eventgen_lock);
1037 	}
1038 	flowop_endop(threadflow, flowop, 0);
1039 
1040 	return (FILEBENCH_OK);
1041 }
1042 
1043 /*
1044  * Stop worker thread when specified number of I/O bytes have been transferred.
1045  */
1046 static int
1047 flowoplib_finishonbytes(threadflow_t *threadflow, flowop_t *flowop)
1048 {
1049 	uint64_t bytes_io;		/* Bytes of I/O delivered so far */
1050 	uint64_t byte_lim = flowop->fo_constvalue;  /* Total Bytes desired */
1051 						    /* Uses constant value */
1052 
1053 	if (flowop->fo_initted == 0) {
1054 		filebench_log(LOG_DEBUG_IMPL, "rate %zx %s-%d locking",
1055 		    flowop, threadflow->tf_name, threadflow->tf_instance);
1056 		flowop->fo_initted = 1;
1057 
1058 		if (flowoplib_event_find_target(threadflow, flowop)
1059 		    == FILEBENCH_ERROR)
1060 			return (FILEBENCH_ERROR);
1061 
1062 		if ((flowop->fo_targets) &&
1063 		    ((flowop->fo_targets->fo_attrs &
1064 		    (FLOW_ATTR_READ | FLOW_ATTR_WRITE)) == 0)) {
1065 			filebench_log(LOG_ERROR,
1066 			    "WARNING: Flowop %s does no Reads or Writes",
1067 			    flowop->fo_targets->fo_name);
1068 			filebench_shutdown(1);
1069 			return (FILEBENCH_ERROR);
1070 		}
1071 	}
1072 
1073 	if (flowop->fo_targets) {
1074 		bytes_io = flowop->fo_targets->fo_stats.fs_bytes;
1075 	} else {
1076 		(void) ipc_mutex_lock(&controlstats_lock);
1077 		bytes_io = controlstats.fs_bytes;
1078 		(void) ipc_mutex_unlock(&controlstats_lock);
1079 	}
1080 
1081 	flowop_beginop(threadflow, flowop);
1082 	if (bytes_io > byte_lim) {
1083 		flowop_endop(threadflow, flowop, 0);
1084 		return (FILEBENCH_NORSC);
1085 	}
1086 	flowop_endop(threadflow, flowop, 0);
1087 
1088 	return (FILEBENCH_OK);
1089 }
1090 
1091 /*
1092  * Stop worker thread when specified number of I/O operations have been
1093  * transferred.
1094  */
1095 static int
1096 flowoplib_finishoncount(threadflow_t *threadflow, flowop_t *flowop)
1097 {
1098 	uint64_t ops;
1099 	uint64_t count = flowop->fo_constvalue; /* use constant value */
1100 
1101 	if (flowop->fo_initted == 0) {
1102 		filebench_log(LOG_DEBUG_IMPL, "rate %zx %s-%d locking",
1103 		    flowop, threadflow->tf_name, threadflow->tf_instance);
1104 		flowop->fo_initted = 1;
1105 
1106 		if (flowoplib_event_find_target(threadflow, flowop)
1107 		    == FILEBENCH_ERROR)
1108 			return (FILEBENCH_ERROR);
1109 	}
1110 
1111 	if (flowop->fo_targets) {
1112 		ops = flowop->fo_targets->fo_stats.fs_count;
1113 	} else {
1114 		(void) ipc_mutex_lock(&controlstats_lock);
1115 		ops = controlstats.fs_count;
1116 		(void) ipc_mutex_unlock(&controlstats_lock);
1117 	}
1118 
1119 	flowop_beginop(threadflow, flowop);
1120 	if (ops >= count) {
1121 		flowop_endop(threadflow, flowop, 0);
1122 		return (FILEBENCH_NORSC);
1123 	}
1124 	flowop_endop(threadflow, flowop, 0);
1125 
1126 	return (FILEBENCH_OK);
1127 }
1128 
1129 /*
1130  * Semaphore synchronization using either System V semaphores or
1131  * posix semaphores. If System V semaphores are available, they will be
1132  * used, otherwise posix semaphores will be used.
1133  */
1134 
1135 
1136 /*
1137  * Initializes the filebench "block on semaphore" flowop.
1138  * If System V semaphores are implemented, the routine
1139  * initializes the System V semaphore subsystem if it hasn't
1140  * already been initialized, also allocates a pair of semids
1141  * and initializes the highwater System V semaphore.
1142  * If no System V semaphores, then does nothing special.
1143  * Returns FILEBENCH_ERROR if it cannot acquire a set of System V semphores
1144  * or if the initial post to the semaphore set fails. Returns FILEBENCH_OK
1145  * on success.
1146  */
1147 static int
1148 flowoplib_semblock_init(flowop_t *flowop)
1149 {
1150 
1151 #ifdef HAVE_SYSV_SEM
1152 	int sys_semid;
1153 	struct sembuf sbuf[2];
1154 	int highwater;
1155 
1156 	ipc_seminit();
1157 
1158 	flowop->fo_semid_lw = ipc_semidalloc();
1159 	flowop->fo_semid_hw = ipc_semidalloc();
1160 
1161 	filebench_log(LOG_DEBUG_IMPL, "flow %s-%d semblock init semid=%x",
1162 	    flowop->fo_name, flowop->fo_instance, flowop->fo_semid_lw);
1163 
1164 	sys_semid = filebench_shm->shm_sys_semid;
1165 
1166 	if ((highwater = flowop->fo_semid_hw) == 0)
1167 		highwater = flowop->fo_constvalue; /* use constant value */
1168 
1169 	filebench_log(LOG_DEBUG_IMPL, "setting highwater to : %d", highwater);
1170 
1171 	sbuf[0].sem_num = (short)highwater;
1172 	sbuf[0].sem_op = avd_get_int(flowop->fo_highwater);
1173 	sbuf[0].sem_flg = 0;
1174 	if ((semop(sys_semid, &sbuf[0], 1) == -1) && errno) {
1175 		filebench_log(LOG_ERROR, "semblock init post failed: %s (%d,"
1176 		    "%d)", strerror(errno), sbuf[0].sem_num, sbuf[0].sem_op);
1177 		return (FILEBENCH_ERROR);
1178 	}
1179 #else
1180 	filebench_log(LOG_DEBUG_IMPL,
1181 	    "flow %s-%d semblock init with posix semaphore",
1182 	    flowop->fo_name, flowop->fo_instance);
1183 
1184 	sem_init(&flowop->fo_sem, 1, 0);
1185 #endif	/* HAVE_SYSV_SEM */
1186 
1187 	if (!(avd_get_bool(flowop->fo_blocking)))
1188 		(void) ipc_mutex_unlock(&flowop->fo_lock);
1189 
1190 	return (FILEBENCH_OK);
1191 }
1192 
1193 /*
1194  * Releases the semids for the System V semaphore allocated
1195  * to this flowop. If not using System V semaphores, then
1196  * it is effectively just a no-op.
1197  */
1198 static void
1199 flowoplib_semblock_destruct(flowop_t *flowop)
1200 {
1201 #ifdef HAVE_SYSV_SEM
1202 	ipc_semidfree(flowop->fo_semid_lw);
1203 	ipc_semidfree(flowop->fo_semid_hw);
1204 #else
1205 	sem_destroy(&flowop->fo_sem);
1206 #endif /* HAVE_SYSV_SEM */
1207 }
1208 
1209 /*
1210  * Attempts to pass a System V or posix semaphore as appropriate,
1211  * and blocks if necessary. Returns FILEBENCH_ERROR if a set of System V
1212  * semphores is not available or cannot be acquired, or if the initial
1213  * post to the semaphore set fails. Returns FILEBENCH_OK on success.
1214  */
1215 static int
1216 flowoplib_semblock(threadflow_t *threadflow, flowop_t *flowop)
1217 {
1218 
1219 #ifdef HAVE_SYSV_SEM
1220 	struct sembuf sbuf[2];
1221 	int value = avd_get_int(flowop->fo_value);
1222 	int sys_semid;
1223 	struct timespec timeout;
1224 
1225 	sys_semid = filebench_shm->shm_sys_semid;
1226 
1227 	filebench_log(LOG_DEBUG_IMPL,
1228 	    "flow %s-%d sem blocking on id %x num %x value %d",
1229 	    flowop->fo_name, flowop->fo_instance, sys_semid,
1230 	    flowop->fo_semid_hw, value);
1231 
1232 	/* Post, decrement the increment the hw queue */
1233 	sbuf[0].sem_num = flowop->fo_semid_hw;
1234 	sbuf[0].sem_op = (short)value;
1235 	sbuf[0].sem_flg = 0;
1236 	sbuf[1].sem_num = flowop->fo_semid_lw;
1237 	sbuf[1].sem_op = value * -1;
1238 	sbuf[1].sem_flg = 0;
1239 	timeout.tv_sec = 600;
1240 	timeout.tv_nsec = 0;
1241 
1242 	if (avd_get_bool(flowop->fo_blocking))
1243 		(void) ipc_mutex_unlock(&flowop->fo_lock);
1244 
1245 	flowop_beginop(threadflow, flowop);
1246 
1247 #ifdef HAVE_SEMTIMEDOP
1248 	(void) semtimedop(sys_semid, &sbuf[0], 1, &timeout);
1249 	(void) semtimedop(sys_semid, &sbuf[1], 1, &timeout);
1250 #else
1251 	(void) semop(sys_semid, &sbuf[0], 1);
1252 	(void) semop(sys_semid, &sbuf[1], 1);
1253 #endif /* HAVE_SEMTIMEDOP */
1254 
1255 	if (avd_get_bool(flowop->fo_blocking))
1256 		(void) ipc_mutex_lock(&flowop->fo_lock);
1257 
1258 	flowop_endop(threadflow, flowop, 0);
1259 
1260 #else
1261 	int value = avd_get_int(flowop->fo_value);
1262 	int i;
1263 
1264 	filebench_log(LOG_DEBUG_IMPL,
1265 	    "flow %s-%d sem blocking on posix semaphore",
1266 	    flowop->fo_name, flowop->fo_instance);
1267 
1268 	/* Decrement sem by value */
1269 	for (i = 0; i < value; i++) {
1270 		if (sem_wait(&flowop->fo_sem) == -1) {
1271 			filebench_log(LOG_ERROR, "semop wait failed");
1272 			return (FILEBENCH_ERROR);
1273 		}
1274 	}
1275 
1276 	filebench_log(LOG_DEBUG_IMPL, "flow %s-%d sem unblocking",
1277 	    flowop->fo_name, flowop->fo_instance);
1278 #endif /* HAVE_SYSV_SEM */
1279 
1280 	return (FILEBENCH_OK);
1281 }
1282 
1283 /*
1284  * Calls ipc_seminit(). Always returns FILEBENCH_OK.
1285  */
1286 /* ARGSUSED */
1287 static int
1288 flowoplib_sempost_init(flowop_t *flowop)
1289 {
1290 #ifdef HAVE_SYSV_SEM
1291 	ipc_seminit();
1292 #endif /* HAVE_SYSV_SEM */
1293 	return (FILEBENCH_OK);
1294 }
1295 
1296 /*
1297  * Post to a System V or posix semaphore as appropriate.
1298  * On the first call for a given flowop instance, this routine
1299  * will use the fo_targetname attribute to locate all semblock
1300  * flowops that are expecting posts from this flowop. All
1301  * target flowops on this list will have a post operation done
1302  * to their semaphores on each call.
1303  */
1304 static int
1305 flowoplib_sempost(threadflow_t *threadflow, flowop_t *flowop)
1306 {
1307 	flowop_t *target;
1308 
1309 	filebench_log(LOG_DEBUG_IMPL,
1310 	    "sempost flow %s-%d",
1311 	    flowop->fo_name,
1312 	    flowop->fo_instance);
1313 
1314 	/* if this is the first post, create the post list */
1315 	if (flowop->fo_targets == NULL) {
1316 		flowop_t *result = flowop_find(flowop->fo_targetname);
1317 
1318 		flowop->fo_targets = result;
1319 
1320 		if (result == NULL) {
1321 			filebench_log(LOG_ERROR,
1322 			    "sempost: could not find op %s for thread %s",
1323 			    flowop->fo_targetname,
1324 			    threadflow->tf_name);
1325 			filebench_shutdown(1);
1326 		}
1327 
1328 		while (result) {
1329 			result->fo_targetnext =
1330 			    result->fo_resultnext;
1331 			result = result->fo_resultnext;
1332 		}
1333 	}
1334 
1335 	target = flowop->fo_targets;
1336 
1337 	flowop_beginop(threadflow, flowop);
1338 	/* post to the targets */
1339 	while (target) {
1340 #ifdef HAVE_SYSV_SEM
1341 		struct sembuf sbuf[2];
1342 		int sys_semid;
1343 		int blocking;
1344 #else
1345 		int i;
1346 #endif /* HAVE_SYSV_SEM */
1347 		struct timespec timeout;
1348 		int value = (int)avd_get_int(flowop->fo_value);
1349 
1350 		if (target->fo_instance == FLOW_MASTER) {
1351 			target = target->fo_targetnext;
1352 			continue;
1353 		}
1354 
1355 #ifdef HAVE_SYSV_SEM
1356 
1357 		filebench_log(LOG_DEBUG_IMPL,
1358 		    "sempost flow %s-%d num %x",
1359 		    target->fo_name,
1360 		    target->fo_instance,
1361 		    target->fo_semid_lw);
1362 
1363 		sys_semid = filebench_shm->shm_sys_semid;
1364 		sbuf[0].sem_num = target->fo_semid_lw;
1365 		sbuf[0].sem_op = (short)value;
1366 		sbuf[0].sem_flg = 0;
1367 		sbuf[1].sem_num = target->fo_semid_hw;
1368 		sbuf[1].sem_op = value * -1;
1369 		sbuf[1].sem_flg = 0;
1370 		timeout.tv_sec = 600;
1371 		timeout.tv_nsec = 0;
1372 
1373 		if (avd_get_bool(flowop->fo_blocking))
1374 			blocking = 1;
1375 		else
1376 			blocking = 0;
1377 
1378 #ifdef HAVE_SEMTIMEDOP
1379 		if ((semtimedop(sys_semid, &sbuf[0], blocking + 1,
1380 		    &timeout) == -1) && (errno && (errno != EAGAIN))) {
1381 #else
1382 		if ((semop(sys_semid, &sbuf[0], blocking + 1) == -1) &&
1383 		    (errno && (errno != EAGAIN))) {
1384 #endif /* HAVE_SEMTIMEDOP */
1385 			filebench_log(LOG_ERROR, "semop post failed: %s",
1386 			    strerror(errno));
1387 			return (FILEBENCH_ERROR);
1388 		}
1389 
1390 		filebench_log(LOG_DEBUG_IMPL,
1391 		    "flow %s-%d finished posting",
1392 		    target->fo_name, target->fo_instance);
1393 #else
1394 		filebench_log(LOG_DEBUG_IMPL,
1395 		    "sempost flow %s-%d to posix semaphore",
1396 		    target->fo_name,
1397 		    target->fo_instance);
1398 
1399 		/* Increment sem by value */
1400 		for (i = 0; i < value; i++) {
1401 			if (sem_post(&target->fo_sem) == -1) {
1402 				filebench_log(LOG_ERROR, "semop post failed");
1403 				return (FILEBENCH_ERROR);
1404 			}
1405 		}
1406 
1407 		filebench_log(LOG_DEBUG_IMPL, "flow %s-%d unblocking",
1408 		    target->fo_name, target->fo_instance);
1409 #endif /* HAVE_SYSV_SEM */
1410 
1411 		target = target->fo_targetnext;
1412 	}
1413 	flowop_endop(threadflow, flowop, 0);
1414 
1415 	return (FILEBENCH_OK);
1416 }
1417 
1418 
1419 /*
1420  * Section for exercising create / open / close / delete operations
1421  * on files within a fileset. For proper operation, the flowop attribute
1422  * "fd", which sets the fo_fdnumber field in the flowop, must be used
1423  * so that the same file is opened and later closed. "fd" is an index
1424  * into a pair of arrays maintained by threadflows, one of which
1425  * contains the operating system assigned file descriptors and the other
1426  * a pointer to the filesetentry whose file the file descriptor
1427  * references. An openfile flowop defined without fd being set will use
1428  * the default (0) fd or, if specified, rotate through fd indices, but
1429  * createfile and closefile must use the default or a specified fd.
 * Meanwhile deletefile deletes the file at the specified fd when one is
 * set and in use, and otherwise picks an arbitrary file to delete.
1432  */
1433 
1434 /*
1435  * Emulates (and actually does) file open. Obtains a file descriptor
1436  * index, then calls flowoplib_openfile_common() to open. Returns
1437  * FILEBENCH_ERROR if no file descriptor is found, and returns the
1438  * status from flowoplib_openfile_common otherwise (FILEBENCH_ERROR,
1439  * FILEBENCH_NORSC, FILEBENCH_OK).
1440  */
1441 static int
1442 flowoplib_openfile(threadflow_t *threadflow, flowop_t *flowop)
1443 {
1444 	int fd = flowoplib_fdnum(threadflow, flowop);
1445 
1446 	if (fd == -1)
1447 		return (FILEBENCH_ERROR);
1448 
1449 	return (flowoplib_openfile_common(threadflow, flowop, fd));
1450 }
1451 
1452 /*
1453  * Common file opening code for filesets. Uses the supplied
1454  * file descriptor index to determine the tf_fd entry to use.
1455  * If the entry is empty (0) and the fileset exists, fileset
1456  * pick is called to select a fileset entry to use. The file
1457  * specified in the filesetentry is opened, and the returned
1458  * operating system file descriptor and a pointer to the
1459  * filesetentry are stored in tf_fd[fd] and tf_fse[fd],
1460  * respectively. Returns FILEBENCH_ERROR on error,
1461  * FILEBENCH_NORSC if no suitable filesetentry can be found,
1462  * and FILEBENCH_OK on success.
1463  */
1464 static int
1465 flowoplib_openfile_common(threadflow_t *threadflow, flowop_t *flowop, int fd)
1466 {
1467 	filesetentry_t *file;
1468 	char *fileset_name;
1469 	int tid = 0;
1470 	int openflag = 0;
1471 	int err;
1472 
1473 	if (flowop->fo_fileset == NULL) {
1474 		filebench_log(LOG_ERROR, "flowop NULL file");
1475 		return (FILEBENCH_ERROR);
1476 	}
1477 
1478 	if ((fileset_name =
1479 	    avd_get_str(flowop->fo_fileset->fs_name)) == NULL) {
1480 		filebench_log(LOG_ERROR,
1481 		    "flowop %s: fileset has no name", flowop->fo_name);
1482 		return (FILEBENCH_ERROR);
1483 	}
1484 
1485 	/*
1486 	 * set the open flag for read only or read/write, as appropriate.
1487 	 */
1488 	if (avd_get_bool(flowop->fo_fileset->fs_readonly) == TRUE)
1489 		openflag = O_RDONLY;
1490 	else if (avd_get_bool(flowop->fo_fileset->fs_writeonly) == TRUE)
1491 		openflag = O_WRONLY;
1492 	else
1493 		openflag = O_RDWR;
1494 
1495 	/*
1496 	 * If the flowop doesn't default to persistent fd
1497 	 * then get unique thread ID for use by fileset_pick
1498 	 */
1499 	if (avd_get_bool(flowop->fo_rotatefd))
1500 		tid = threadflow->tf_utid;
1501 
1502 	if (threadflow->tf_fd[fd].fd_ptr != NULL) {
1503 		filebench_log(LOG_ERROR,
1504 		    "flowop %s attempted to open without closing on fd %d",
1505 		    flowop->fo_name, fd);
1506 		return (FILEBENCH_ERROR);
1507 	}
1508 
1509 	if (flowop->fo_fileset->fs_attrs & FILESET_IS_RAW_DEV) {
1510 		int open_attrs = 0;
1511 		char name[MAXPATHLEN];
1512 
1513 		(void) fb_strlcpy(name,
1514 		    avd_get_str(flowop->fo_fileset->fs_path), MAXPATHLEN);
1515 		(void) fb_strlcat(name, "/", MAXPATHLEN);
1516 		(void) fb_strlcat(name, fileset_name, MAXPATHLEN);
1517 
1518 		if (avd_get_bool(flowop->fo_dsync))
1519 			open_attrs |= O_SYNC;
1520 
1521 #ifdef HAVE_O_DIRECT
1522 		if (flowoplib_fileattrs(flowop) & FLOW_ATTR_DIRECTIO)
1523 			open_attrs |= O_DIRECT;
1524 #endif /* HAVE_O_DIRECT */
1525 
1526 		filebench_log(LOG_DEBUG_SCRIPT,
1527 		    "open raw device %s flags %d = %d", name, open_attrs, fd);
1528 
1529 		if (FB_OPEN(&(threadflow->tf_fd[fd]), name,
1530 		    openflag | open_attrs, 0666) == FILEBENCH_ERROR) {
1531 			filebench_log(LOG_ERROR,
1532 			    "Failed to open raw device %s: %s",
1533 			    name, strerror(errno));
1534 			return (FILEBENCH_ERROR);
1535 		}
1536 
1537 #ifdef HAVE_DIRECTIO
1538 		if (flowoplib_fileattrs(flowop) & FLOW_ATTR_DIRECTIO)
1539 			(void)directio(threadflow->tf_fd[fd].fd_num, DIRECTIO_ON);
1540 #endif /* HAVE_DIRECTIO */
1541 
1542 #ifdef HAVE_NOCACHE_FCNTL
1543 		if (flowoplib_fileattrs(flowop) & FLOW_ATTR_DIRECTIO)
1544 			(void)fcntl(threadflow->tf_fd[fd].fd_num, F_NOCACHE, 1);
1545 #endif /* HAVE_NOCACHE_FCNTL */
1546 
1547 		/* Disable read ahead with the help of fadvise, if asked for */
1548 		if (flowoplib_fileattrs(flowop) & FLOW_ATTR_FADV_RANDOM) {
1549 #ifdef HAVE_FADVISE
1550 			if (posix_fadvise(threadflow->tf_fd[fd].fd_num, 0, 0, POSIX_FADV_RANDOM)
1551 				!= FILEBENCH_OK) {
1552 				filebench_log(LOG_ERROR,
1553 					"Failed to disable read ahead for raw device %s, with status %s",
1554 				    	name, strerror(errno));
1555 				return (FILEBENCH_ERROR);
1556 			}
1557 			filebench_log(LOG_INFO, "** Read ahead disabled ** ");
1558 #else
1559 		filebench_log(LOG_INFO, "** Read ahead was NOT disabled: not supported on this platform! **");
1560 #endif
1561 		}
1562 
1563 		threadflow->tf_fse[fd] = NULL;
1564 
1565 		return (FILEBENCH_OK);
1566 	}
1567 
1568 	if ((err = flowoplib_pickfile(&file, flowop,
1569 	    FILESET_PICKEXISTS, tid)) != FILEBENCH_OK) {
1570 		filebench_log(LOG_DEBUG_SCRIPT,
1571 		    "flowop %s failed to pick file from %s on fd %d",
1572 		    flowop->fo_name, fileset_name, fd);
1573 		return (err);
1574 	}
1575 
1576 	threadflow->tf_fse[fd] = file;
1577 
1578 	flowop_beginop(threadflow, flowop);
1579 	err = fileset_openfile(&threadflow->tf_fd[fd], flowop->fo_fileset,
1580 	    file, openflag, 0666, flowoplib_fileattrs(flowop));
1581 	flowop_endop(threadflow, flowop, 0);
1582 
1583 	if (err == FILEBENCH_ERROR) {
1584 		filebench_log(LOG_ERROR, "flowop %s failed to open file %s",
1585 		    flowop->fo_name, file->fse_path);
1586 		return (FILEBENCH_ERROR);
1587 	}
1588 
1589 	filebench_log(LOG_DEBUG_SCRIPT,
1590 	    "flowop %s: opened %s fd[%d] = %d",
1591 	    flowop->fo_name, file->fse_path, fd, threadflow->tf_fd[fd]);
1592 
1593 	return (FILEBENCH_OK);
1594 }
1595 
1596 /*
1597  * Emulate create of a file. Uses the flowoplib_fdnum to select
1598  * tf_fd and tf_fse array locations to put the created file's file
1599  * descriptor and filesetentry respectively. Uses flowoplib_pickfile()
1600  * to select a specific filesetentry whose file does not currently
1601  * exist for the file create operation. Then calls
1602  * fileset_openfile() with the O_CREATE flag set to create the
1603  * file. Returns FILEBENCH_ERROR if the array index specified by fdnumber is
1604  * already in use, the flowop has no associated fileset, or
1605  * the create call fails. Returns 1 if a filesetentry with a
1606  * nonexistent file cannot be found. Returns FILEBENCH_OK on success.
1607  */
1608 static int
1609 flowoplib_createfile(threadflow_t *threadflow, flowop_t *flowop)
1610 {
1611 	filesetentry_t *file;
1612 	int openflag = O_CREAT;
1613 	int fd;
1614 	int err;
1615 
1616 	fd = flowoplib_fdnum(threadflow, flowop);
1617 
1618 	if (threadflow->tf_fd[fd].fd_ptr != NULL) {
1619 		filebench_log(LOG_ERROR,
1620 		    "flowop %s attempted to create without closing on fd %d",
1621 		    flowop->fo_name, fd);
1622 		return (FILEBENCH_ERROR);
1623 	}
1624 
1625 	if (flowop->fo_fileset == NULL) {
1626 		filebench_log(LOG_ERROR, "flowop NULL file");
1627 		return (FILEBENCH_ERROR);
1628 	}
1629 
1630 	if (avd_get_bool(flowop->fo_fileset->fs_readonly) == TRUE)
1631 		openflag |= O_RDONLY;
1632 	else if (avd_get_bool(flowop->fo_fileset->fs_writeonly) == TRUE)
1633 		openflag |= O_WRONLY;
1634 	else
1635 		openflag |= O_RDWR;
1636 
1637 	/* can't be used with raw devices */
1638 	if (flowop->fo_fileset->fs_attrs & FILESET_IS_RAW_DEV) {
1639 		filebench_log(LOG_ERROR,
1640 		    "flowop %s attempted to a createfile on RAW device",
1641 		    flowop->fo_name);
1642 		return (FILEBENCH_ERROR);
1643 	}
1644 
1645 	if ((err = flowoplib_pickfile(&file, flowop,
1646 	    FILESET_PICKNOEXIST, 0)) != FILEBENCH_OK) {
1647 		filebench_log(LOG_DEBUG_SCRIPT,
1648 		    "flowop %s failed to pick file from fileset %s",
1649 		    flowop->fo_name,
1650 		    avd_get_str(flowop->fo_fileset->fs_name));
1651 		return (err);
1652 	}
1653 
1654 	threadflow->tf_fse[fd] = file;
1655 
1656 	flowop_beginop(threadflow, flowop);
1657 	err = fileset_openfile(&threadflow->tf_fd[fd], flowop->fo_fileset,
1658 		file, openflag, 0666, flowoplib_fileattrs(flowop));
1659 	flowop_endop(threadflow, flowop, 0);
1660 
1661 	if (err == FILEBENCH_ERROR) {
1662 		filebench_log(LOG_ERROR, "failed to create file %s",
1663 		    flowop->fo_name);
1664 		return (FILEBENCH_ERROR);
1665 	}
1666 
1667 	filebench_log(LOG_DEBUG_SCRIPT,
1668 	    "flowop %s: created %s fd[%d] = %d",
1669 	    flowop->fo_name, file->fse_path, fd, threadflow->tf_fd[fd]);
1670 
1671 	return (FILEBENCH_OK);
1672 }
1673 
1674 /*
1675  * Emulates delete of a file. If a valid fd is provided, it uses the
1676  * filesetentry stored at that fd location to select the file to be
1677  * deleted, otherwise it picks an arbitrary filesetentry
1678  * whose file exists. It then uses unlink() to delete it and Clears
1679  * the FSE_EXISTS flag for the filesetentry. Returns FILEBENCH_ERROR if the
1680  * flowop has no associated fileset. Returns FILEBENCH_NORSC if an appropriate
1681  * filesetentry cannot be found, and FILEBENCH_OK on success.
1682  */
1683 static int
1684 flowoplib_deletefile(threadflow_t *threadflow, flowop_t *flowop)
1685 {
1686 	filesetentry_t *file;
1687 	fileset_t *fileset;
1688 	char path[MAXPATHLEN];
1689 	char *pathtmp;
1690 	int fd;
1691 
1692 	fd = flowoplib_fdnum(threadflow, flowop);
1693 
1694 	/* if fd specified, use it to access file */
1695 	if ((fd > 0) && ((file = threadflow->tf_fse[fd]) != NULL)) {
1696 
1697 		/* indicate that the file will be deleted */
1698 		threadflow->tf_fse[fd] = NULL;
1699 
1700 		/* if here, we still have a valid file pointer */
1701 		fileset = file->fse_fileset;
1702 	} else {
1703 
1704 		/* Otherwise, pick arbitrary file */
1705 		file = NULL;
1706 		fileset = flowop->fo_fileset;
1707 	}
1708 
1709 
1710 	if (fileset == NULL) {
1711 		filebench_log(LOG_ERROR, "flowop NULL file");
1712 		return (FILEBENCH_ERROR);
1713 	}
1714 
1715 	/* can't be used with raw devices */
1716 	if (fileset->fs_attrs & FILESET_IS_RAW_DEV) {
1717 		filebench_log(LOG_ERROR,
1718 		    "flowop %s attempted a deletefile on RAW device",
1719 		    flowop->fo_name);
1720 		return (FILEBENCH_ERROR);
1721 	}
1722 
1723 	if (file == NULL) {
1724 		int err;
1725 
1726 		/* pick arbitrary, existing (allocated) file */
1727 		if ((err = flowoplib_pickfile(&file, flowop,
1728 		    FILESET_PICKEXISTS, 0)) != FILEBENCH_OK) {
1729 			filebench_log(LOG_DEBUG_SCRIPT,
1730 			    "flowop %s failed to pick file", flowop->fo_name);
1731 			return (err);
1732 		}
1733 	} else {
1734 		/* delete specific file. wait for it to be non-busy */
1735 		(void) ipc_mutex_lock(&fileset->fs_pick_lock);
1736 		while (file->fse_flags & FSE_BUSY) {
1737 			file->fse_flags |= FSE_THRD_WAITNG;
1738 			(void) pthread_cond_wait(&fileset->fs_thrd_wait_cv,
1739 			    &fileset->fs_pick_lock);
1740 		}
1741 
1742 		/* File now available, grab it for deletion */
1743 		file->fse_flags |= FSE_BUSY;
1744 		fileset->fs_idle_files--;
1745 		(void) ipc_mutex_unlock(&fileset->fs_pick_lock);
1746 	}
1747 
1748 	/* don't delete if anyone (other than me) has file open */
1749 	if ((fd > 0) && (threadflow->tf_fd[fd].fd_num > 0)) {
1750 		if (file->fse_open_cnt > 1) {
1751 			filebench_log(LOG_DEBUG_SCRIPT,
1752 			    "flowop %s can't delete file opened by other"
1753 			    " threads at fd = %d", flowop->fo_name, fd);
1754 			fileset_unbusy(file, FALSE, FALSE, 0);
1755 			return (FILEBENCH_OK);
1756 		} else {
1757 			filebench_log(LOG_DEBUG_SCRIPT,
1758 			    "flowop %s deleting still open file at fd = %d",
1759 			    flowop->fo_name, fd);
1760 		}
1761 	} else if (file->fse_open_cnt > 0) {
1762 		filebench_log(LOG_DEBUG_SCRIPT,
1763 		    "flowop %s can't delete file opened by other"
1764 		    " threads at fd = %d, open count = %d",
1765 		    flowop->fo_name, fd, file->fse_open_cnt);
1766 		fileset_unbusy(file, FALSE, FALSE, 0);
1767 		return (FILEBENCH_OK);
1768 	}
1769 
1770 	(void) fb_strlcpy(path, avd_get_str(fileset->fs_path), MAXPATHLEN);
1771 	(void) fb_strlcat(path, "/", MAXPATHLEN);
1772 	(void) fb_strlcat(path, avd_get_str(fileset->fs_name), MAXPATHLEN);
1773 	pathtmp = fileset_resolvepath(file);
1774 	(void) fb_strlcat(path, pathtmp, MAXPATHLEN);
1775 	free(pathtmp);
1776 
1777 	/* delete the selected file */
1778 	flowop_beginop(threadflow, flowop);
1779 	(void) FB_UNLINK(path);
1780 	flowop_endop(threadflow, flowop, 0);
1781 
1782 	/* indicate that it is no longer busy and no longer exists */
1783 	fileset_unbusy(file, TRUE, FALSE, -file->fse_open_cnt);
1784 
1785 	filebench_log(LOG_DEBUG_SCRIPT, "deleted file %s", file->fse_path);
1786 
1787 	return (FILEBENCH_OK);
1788 }
1789 
1790 /*
1791  * Emulates fsync of a file. Obtains the file descriptor index
1792  * from the flowop, obtains the actual file descriptor from
1793  * the threadflow's table, checks to be sure it is still an
1794  * open file, then does an fsync operation on it. Returns FILEBENCH_ERROR
1795  * if the file no longer is open, FILEBENCH_OK otherwise.
1796  */
1797 static int
1798 flowoplib_fsync(threadflow_t *threadflow, flowop_t *flowop)
1799 {
1800 	filesetentry_t *file;
1801 	int fd;
1802 
1803 	fd = flowoplib_fdnum(threadflow, flowop);
1804 
1805 	if (threadflow->tf_fd[fd].fd_ptr == NULL) {
1806 		filebench_log(LOG_ERROR,
1807 		    "flowop %s attempted to fsync a closed fd %d",
1808 		    flowop->fo_name, fd);
1809 		return (FILEBENCH_ERROR);
1810 	}
1811 
1812 	file = threadflow->tf_fse[fd];
1813 
1814 	if ((file == NULL) ||
1815 	    (file->fse_fileset->fs_attrs & FILESET_IS_RAW_DEV)) {
1816 		filebench_log(LOG_ERROR,
1817 		    "flowop %s attempted to a fsync a RAW device",
1818 		    flowop->fo_name);
1819 		return (FILEBENCH_ERROR);
1820 	}
1821 
1822 	/* Measure time to fsync */
1823 	flowop_beginop(threadflow, flowop);
1824 	(void) FB_FSYNC(&threadflow->tf_fd[fd]);
1825 	flowop_endop(threadflow, flowop, 0);
1826 
1827 	filebench_log(LOG_DEBUG_SCRIPT, "fsync file %s", file->fse_path);
1828 
1829 	return (FILEBENCH_OK);
1830 }
1831 
1832 /*
1833  * Emulate fsync of an entire fileset. Search through the
1834  * threadflow's file descriptor array, doing fsync() on each
1835  * open file that belongs to the flowop's fileset. Always
1836  * returns FILEBENCH_OK.
1837  */
1838 static int
1839 flowoplib_fsyncset(threadflow_t *threadflow, flowop_t *flowop)
1840 {
1841 	int fd;
1842 
1843 	for (fd = 0; fd < THREADFLOW_MAXFD; fd++) {
1844 		filesetentry_t *file;
1845 
1846 		/* Match the file set to fsync */
1847 		if ((threadflow->tf_fse[fd] == NULL) ||
1848 		    (flowop->fo_fileset != threadflow->tf_fse[fd]->fse_fileset))
1849 			continue;
1850 
1851 		/* Measure time to fsync */
1852 		flowop_beginop(threadflow, flowop);
1853 		(void) FB_FSYNC(&threadflow->tf_fd[fd]);
1854 		flowop_endop(threadflow, flowop, 0);
1855 
1856 		file = threadflow->tf_fse[fd];
1857 
1858 		filebench_log(LOG_DEBUG_SCRIPT, "fsync file %s",
1859 		    file->fse_path);
1860 	}
1861 
1862 	return (FILEBENCH_OK);
1863 }
1864 
1865 /*
1866  * Emulate close of a file.  Obtains the file descriptor index
1867  * from the flowop, obtains the actual file descriptor from the
1868  * threadflow's table, checks to be sure it is still an open
1869  * file, then does a close operation on it. Then sets the
1870  * threadflow file descriptor table entry to 0, and the file set
1871  * entry pointer to NULL. Returns FILEBENCH_ERROR if the file was not open,
1872  * FILEBENCH_OK otherwise.
1873  */
1874 static int
1875 flowoplib_closefile(threadflow_t *threadflow, flowop_t *flowop)
1876 {
1877 	filesetentry_t *file;
1878 	fileset_t *fileset;
1879 	int fd;
1880 
1881 	fd = flowoplib_fdnum(threadflow, flowop);
1882 
1883 	if (threadflow->tf_fd[fd].fd_ptr == NULL) {
1884 		filebench_log(LOG_ERROR,
1885 		    "flowop %s attempted to close an already closed fd %d",
1886 		    flowop->fo_name, fd);
1887 		return (FILEBENCH_ERROR);
1888 	}
1889 
1890 	file = threadflow->tf_fse[fd];
1891 	fileset = file->fse_fileset;
1892 
1893 	/* Wait for it to be non-busy */
1894 	(void) ipc_mutex_lock(&fileset->fs_pick_lock);
1895 	while (file->fse_flags & FSE_BUSY) {
1896 		file->fse_flags |= FSE_THRD_WAITNG;
1897 		(void) pthread_cond_wait(&fileset->fs_thrd_wait_cv,
1898 		    &fileset->fs_pick_lock);
1899 	}
1900 
1901 	/* File now available, grab it for closing */
1902 	file->fse_flags |= FSE_BUSY;
1903 
1904 	/* if last open, set declare idle */
1905 	if (file->fse_open_cnt == 1)
1906 		fileset->fs_idle_files--;
1907 
1908 	(void) ipc_mutex_unlock(&fileset->fs_pick_lock);
1909 
1910 	/* Measure time to close */
1911 	flowop_beginop(threadflow, flowop);
1912 	(void) FB_CLOSE(&threadflow->tf_fd[fd]);
1913 	flowop_endop(threadflow, flowop, 0);
1914 
1915 	fileset_unbusy(file, FALSE, FALSE, -1);
1916 
1917 	threadflow->tf_fd[fd].fd_ptr = NULL;
1918 
1919 	filebench_log(LOG_DEBUG_SCRIPT, "closed file %s", file->fse_path);
1920 
1921 	return (FILEBENCH_OK);
1922 }
1923 
1924 /*
1925  * Obtain the full pathname of the directory described by the filesetentry
1926  * indicated by "dir", and copy it into the character array pointed to by
1927  * path. Returns FILEBENCH_ERROR on errors, FILEBENCH_OK otherwise.
1928  */
1929 static int
1930 flowoplib_getdirpath(filesetentry_t *dir, char *path)
1931 {
1932 	char		*fileset_path;
1933 	char		*fileset_name;
1934 	char		*part_path;
1935 
1936 	if ((fileset_path = avd_get_str(dir->fse_fileset->fs_path)) == NULL) {
1937 		filebench_log(LOG_ERROR, "Fileset path not set");
1938 		return (FILEBENCH_ERROR);
1939 	}
1940 
1941 	if ((fileset_name = avd_get_str(dir->fse_fileset->fs_name)) == NULL) {
1942 		filebench_log(LOG_ERROR, "Fileset name not set");
1943 		return (FILEBENCH_ERROR);
1944 	}
1945 
1946 	(void) fb_strlcpy(path, fileset_path, MAXPATHLEN);
1947 	(void) fb_strlcat(path, "/", MAXPATHLEN);
1948 	(void) fb_strlcat(path, fileset_name, MAXPATHLEN);
1949 
1950 	if ((part_path = fileset_resolvepath(dir)) == NULL)
1951 		return (FILEBENCH_ERROR);
1952 
1953 	(void) fb_strlcat(path, part_path, MAXPATHLEN);
1954 	free(part_path);
1955 
1956 	return (FILEBENCH_OK);
1957 }
1958 
1959 /*
1960  * Use mkdir to create a directory.  Obtains the fileset name from the
1961  * flowop, selects a non-existent leaf directory and obtains its full
1962  * path, then uses mkdir to create it on the storage subsystem (make it
1963  * existent). Returns FILEBENCH_NORSC is there are no more non-existent
1964  * directories in the fileset, FILEBENCH_ERROR on other errors, and
1965  * FILEBENCH_OK on success.
1966  */
1967 static int
1968 flowoplib_makedir(threadflow_t *threadflow, flowop_t *flowop)
1969 {
1970 	filesetentry_t	*dir;
1971 	int		ret;
1972 	char		full_path[MAXPATHLEN];
1973 
1974 	if ((ret = flowoplib_pickleafdir(&dir, flowop,
1975 	    FILESET_PICKNOEXIST)) != FILEBENCH_OK)
1976 		return (ret);
1977 
1978 	if ((ret = flowoplib_getdirpath(dir, full_path)) != FILEBENCH_OK)
1979 		return (ret);
1980 
1981 	flowop_beginop(threadflow, flowop);
1982 	(void) FB_MKDIR(full_path, 0755);
1983 	flowop_endop(threadflow, flowop, 0);
1984 
1985 	/* indicate that it is no longer busy and now exists */
1986 	fileset_unbusy(dir, TRUE, TRUE, 0);
1987 
1988 	return (FILEBENCH_OK);
1989 }
1990 
1991 /*
1992  * Use rmdir to delete a directory.  Obtains the fileset name from the
1993  * flowop, selects an existent leaf directory and obtains its full path,
1994  * then uses rmdir to remove it from the storage subsystem (make it
1995  * non-existent). Returns FILEBENCH_NORSC is there are no more existent
1996  * directories in the fileset, FILEBENCH_ERROR on other errors, and
1997  * FILEBENCH_OK on success.
1998  */
1999 static int
2000 flowoplib_removedir(threadflow_t *threadflow, flowop_t *flowop)
2001 {
2002 	filesetentry_t *dir;
2003 	int		ret;
2004 	char		full_path[MAXPATHLEN];
2005 
2006 	if ((ret = flowoplib_pickleafdir(&dir, flowop,
2007 	    FILESET_PICKEXISTS)) != FILEBENCH_OK)
2008 		return (ret);
2009 
2010 	if ((ret = flowoplib_getdirpath(dir, full_path)) != FILEBENCH_OK)
2011 		return (ret);
2012 
2013 	flowop_beginop(threadflow, flowop);
2014 	(void) FB_RMDIR(full_path);
2015 	flowop_endop(threadflow, flowop, 0);
2016 
2017 	/* indicate that it is no longer busy and no longer exists */
2018 	fileset_unbusy(dir, TRUE, FALSE, 0);
2019 
2020 	return (FILEBENCH_OK);
2021 }
2022 
2023 /*
2024  * Use opendir(), multiple readdir() calls, and closedir() to list the
2025  * contents of a directory.  Obtains the fileset name from the
2026  * flowop, selects a normal subdirectory (which always exist) and obtains
2027  * its full path, then uses opendir() to get a DIR handle to it from the
2028  * file system, a readdir() loop to access each directory entry, and
2029  * finally cleans up with a closedir(). The latency reported is the total
2030  * for all this activity, and it also reports the total number of bytes
2031  * in the entries as the amount "read". Returns FILEBENCH_ERROR on errors,
2032  * and FILEBENCH_OK on success.
2033  */
2034 static int
2035 flowoplib_listdir(threadflow_t *threadflow, flowop_t *flowop)
2036 {
2037 	fileset_t	*fileset;
2038 	filesetentry_t	*dir;
2039 	DIR		*dir_handle;
2040 	struct dirent	*direntp;
2041 	int		dir_bytes = 0;
2042 	int		ret;
2043 	char		full_path[MAXPATHLEN];
2044 
2045 	if ((fileset = flowop->fo_fileset) == NULL) {
2046 		filebench_log(LOG_ERROR, "flowop NO fileset");
2047 		return (FILEBENCH_ERROR);
2048 	}
2049 
2050 	if ((dir = fileset_pick(fileset, FILESET_PICKDIR, 0, 0)) == NULL) {
2051 		filebench_log(LOG_DEBUG_SCRIPT,
2052 		    "flowop %s failed to pick directory from fileset %s",
2053 		    flowop->fo_name,
2054 		    avd_get_str(fileset->fs_name));
2055 		return (FILEBENCH_ERROR);
2056 	}
2057 
2058 	if ((ret = flowoplib_getdirpath(dir, full_path)) != FILEBENCH_OK)
2059 		return (ret);
2060 
2061 	flowop_beginop(threadflow, flowop);
2062 
2063 	/* open the directory */
2064 	if ((dir_handle = FB_OPENDIR(full_path)) == NULL) {
2065 		filebench_log(LOG_ERROR,
2066 		    "flowop %s failed to open directory in fileset %s\n",
2067 		    flowop->fo_name, avd_get_str(fileset->fs_name));
2068 		return (FILEBENCH_ERROR);
2069 	}
2070 
2071 	/* read through the directory entries */
2072 	while ((direntp = FB_READDIR(dir_handle)) != NULL) {
2073 		dir_bytes += (strlen(direntp->d_name) +
2074 		    sizeof (struct dirent) - 1);
2075 	}
2076 
2077 	/* close the directory */
2078 	(void) FB_CLOSEDIR(dir_handle);
2079 
2080 	flowop_endop(threadflow, flowop, dir_bytes);
2081 
2082 	/* indicate that it is no longer busy */
2083 	fileset_unbusy(dir, FALSE, FALSE, 0);
2084 
2085 	return (FILEBENCH_OK);
2086 }
2087 
2088 /*
2089  * Emulate stat of a file. Picks an arbitrary filesetentry with
2090  * an existing file from the flowop's fileset, then performs a
2091  * stat() operation on it. Returns FILEBENCH_ERROR if the flowop has no
2092  * associated fileset. Returns FILEBENCH_NORSC if an appropriate filesetentry
2093  * cannot be found, and FILEBENCH_OK on success.
2094  */
2095 static int
2096 flowoplib_statfile(threadflow_t *threadflow, flowop_t *flowop)
2097 {
2098 	filesetentry_t *file;
2099 	fileset_t *fileset;
2100 	struct stat64 statbuf;
2101 	int fd;
2102 
2103 	fd = flowoplib_fdnum(threadflow, flowop);
2104 
2105 	/* if fd specified and the file is open, use it to access file */
2106 	if ((fd > 0) && (threadflow->tf_fd[fd].fd_num > 0)) {
2107 
2108 		/* check whether file handle still valid */
2109 		if ((file = threadflow->tf_fse[fd]) == NULL) {
2110 			filebench_log(LOG_DEBUG_SCRIPT,
2111 			    "flowop %s trying to stat NULL file at fd = %d",
2112 			    flowop->fo_name, fd);
2113 			return (FILEBENCH_ERROR);
2114 		}
2115 
2116 		/* if here, we still have a valid file pointer */
2117 		fileset = file->fse_fileset;
2118 	} else {
2119 		/* Otherwise, pick arbitrary file */
2120 		file = NULL;
2121 		fileset = flowop->fo_fileset;
2122 	}
2123 
2124 	if (fileset == NULL) {
2125 		filebench_log(LOG_ERROR,
2126 		    "statfile with no fileset specified");
2127 		return (FILEBENCH_ERROR);
2128 	}
2129 
2130 	/* can't be used with raw devices */
2131 	if (fileset->fs_attrs & FILESET_IS_RAW_DEV) {
2132 		filebench_log(LOG_ERROR,
2133 		    "flowop %s attempted do a statfile on a RAW device",
2134 		    flowop->fo_name);
2135 		return (FILEBENCH_ERROR);
2136 	}
2137 
2138 	if (file == NULL) {
2139 		char path[MAXPATHLEN];
2140 		char *pathtmp;
2141 		int err;
2142 
2143 		/* pick arbitrary, existing (allocated) file */
2144 		if ((err = flowoplib_pickfile(&file, flowop,
2145 		    FILESET_PICKEXISTS, 0)) != FILEBENCH_OK) {
2146 			filebench_log(LOG_DEBUG_SCRIPT,
2147 			    "Statfile flowop %s failed to pick file",
2148 			    flowop->fo_name);
2149 			return (err);
2150 		}
2151 
2152 		/* resolve path and do a stat on file */
2153 		(void) fb_strlcpy(path, avd_get_str(fileset->fs_path),
2154 		    MAXPATHLEN);
2155 		(void) fb_strlcat(path, "/", MAXPATHLEN);
2156 		(void) fb_strlcat(path, avd_get_str(fileset->fs_name),
2157 		    MAXPATHLEN);
2158 		pathtmp = fileset_resolvepath(file);
2159 		(void) fb_strlcat(path, pathtmp, MAXPATHLEN);
2160 		free(pathtmp);
2161 
2162 		/* stat the file */
2163 		flowop_beginop(threadflow, flowop);
2164 		if (FB_STAT(path, &statbuf) == -1)
2165 			filebench_log(LOG_ERROR,
2166 			    "statfile flowop %s failed", flowop->fo_name);
2167 		flowop_endop(threadflow, flowop, 0);
2168 
2169 		fileset_unbusy(file, FALSE, FALSE, 0);
2170 	} else {
2171 		/* stat specific file */
2172 		flowop_beginop(threadflow, flowop);
2173 		if (FB_FSTAT(&threadflow->tf_fd[fd], &statbuf) == -1)
2174 			filebench_log(LOG_ERROR,
2175 			    "statfile flowop %s failed", flowop->fo_name);
2176 		flowop_endop(threadflow, flowop, 0);
2177 
2178 	}
2179 
2180 	return (FILEBENCH_OK);
2181 }
2182 
2183 
2184 /*
2185  * Additional reads and writes. Read and write whole files, write
2186  * and append to files. Some of these work with both fileobjs and
2187  * filesets, others only with filesets. The flowoplib_write routine
2188  * writes from thread memory, while the others read or write using
2189  * fo_buf memory. Note that both flowoplib_read() and
2190  * flowoplib_aiowrite() use thread memory as well.
2191  */
2192 
2193 
2194 /*
2195  * Emulate a read of a whole file. The file must be open with
2196  * file descriptor and filesetentry stored at the locations indexed
2197  * by the flowop's fdnumber. It then seeks to the beginning of the
2198  * associated file, and reads fs_iosize bytes at a time until the end
2199  * of the file. Returns FILEBENCH_ERROR on error, FILEBENCH_NORSC if
2200  * out of files, and FILEBENCH_OK on success.
2201  */
2202 static int
2203 flowoplib_readwholefile(threadflow_t *threadflow, flowop_t *flowop)
2204 {
2205 	caddr_t iobuf;
2206 	off64_t bytes = 0;
2207 	fb_fdesc_t *fdesc;
2208 	uint64_t wss;
2209 	fbint_t iosize;
2210 	int ret;
2211 	char zerordbuf;
2212 
2213 	/* get the file to use */
2214 	if ((ret = flowoplib_filesetup(threadflow, flowop, &wss,
2215 	    &fdesc)) != FILEBENCH_OK)
2216 		return (ret);
2217 
2218 	/* an I/O size of zero means read entire working set with one I/O */
2219 	if ((iosize = avd_get_int(flowop->fo_iosize)) == 0)
2220 		iosize = wss;
2221 
2222 	/*
2223 	 * The file may actually be 0 bytes long, in which case skip
2224 	 * the buffer set up call (which would fail) and substitute
2225 	 * a small buffer, which won't really be used.
2226 	 */
2227 	if (iosize == 0) {
2228 		iobuf = (caddr_t)&zerordbuf;
2229 		filebench_log(LOG_DEBUG_SCRIPT,
2230 		    "flowop %s read zero length file", flowop->fo_name);
2231 	} else {
2232 		if (flowoplib_iobufsetup(threadflow, flowop, &iobuf,
2233 		    iosize) != 0)
2234 			return (FILEBENCH_ERROR);
2235 	}
2236 
2237 	/* Measure time to read bytes */
2238 	flowop_beginop(threadflow, flowop);
2239 	(void) FB_LSEEK(fdesc, 0, SEEK_SET);
2240 	while ((ret = FB_READ(fdesc, iobuf, iosize)) > 0)
2241 		bytes += ret;
2242 
2243 	flowop_endop(threadflow, flowop, bytes);
2244 
2245 	if (ret < 0) {
2246 		filebench_log(LOG_ERROR,
2247 		    "readwhole fail Failed to read whole file: %s",
2248 		    strerror(errno));
2249 		return (FILEBENCH_ERROR);
2250 	}
2251 
2252 	return (FILEBENCH_OK);
2253 }
2254 
2255 /*
2256  * Emulate a write to a file of size fo_iosize.  Will write
2257  * to a file from a fileset if the flowop's fo_fileset field
2258  * specifies one or its fdnumber is non zero. Otherwise it
2259  * will write to a fileobj file, if one exists. If the file
2260  * is not currently open, the routine will attempt to open
2261  * it. The flowop's fo_wss parameter will be used to set the
2262  * maximum file size if it is non-zero, otherwise the
2263  * filesetentry's  fse_size will be used. A random memory
2264  * buffer offset is calculated, and, if fo_random is TRUE,
2265  * a random file offset is used for the write. Otherwise the
2266  * write is to the next sequential location. Returns
2267  * FILEBENCH_ERROR on errors, FILEBENCH_NORSC if iosetup can't
2268  * obtain a file, or FILEBENCH_OK on success.
2269  */
2270 static int
2271 flowoplib_write(threadflow_t *threadflow, flowop_t *flowop)
2272 {
2273 	caddr_t iobuf;
2274 	fbint_t wss;
2275 	fbint_t iosize;
2276 	fb_fdesc_t *fdesc;
2277 	int ret;
2278 
2279 	iosize = avd_get_int(flowop->fo_iosize);
2280 	if ((ret = flowoplib_iosetup(threadflow, flowop, &wss, &iobuf,
2281 	    &fdesc, iosize)) != FILEBENCH_OK)
2282 		return (ret);
2283 
2284 	if (avd_get_bool(flowop->fo_random)) {
2285 		uint64_t fileoffset;
2286 
2287 		if (wss < iosize) {
2288 			filebench_log(LOG_ERROR,
2289 			    "file size smaller than IO size for thread %s",
2290 			    flowop->fo_name);
2291 			return (FILEBENCH_ERROR);
2292 		}
2293 
2294 		/* select randomly */
2295 		fb_random64(&fileoffset, wss, iosize, NULL);
2296 
2297 		flowop_beginop(threadflow, flowop);
2298 		if (FB_PWRITE(fdesc, iobuf,
2299 		    iosize, (off64_t)fileoffset) == -1) {
2300 			filebench_log(LOG_ERROR, "write failed, "
2301 			    "offset %llu io buffer %zd: %s",
2302 			    (u_longlong_t)fileoffset, iobuf, strerror(errno));
2303 			flowop_endop(threadflow, flowop, 0);
2304 			return (FILEBENCH_ERROR);
2305 		}
2306 		flowop_endop(threadflow, flowop, iosize);
2307 	} else {
2308 		flowop_beginop(threadflow, flowop);
2309 		if (FB_WRITE(fdesc, iobuf, iosize) == -1) {
2310 			filebench_log(LOG_ERROR,
2311 			    "write failed, io buffer %zd: %s",
2312 			    iobuf, strerror(errno));
2313 			flowop_endop(threadflow, flowop, 0);
2314 			return (FILEBENCH_ERROR);
2315 		}
2316 		flowop_endop(threadflow, flowop, iosize);
2317 	}
2318 
2319 	return (FILEBENCH_OK);
2320 }
2321 
2322 /*
2323  * Emulate a write of a whole file.  The size of the file
2324  * is taken from a filesetentry identified by fo_srcfdnumber or
2325  * from the working set size, while the file descriptor used is
2326  * identified by fo_fdnumber. Does multiple writes of fo_iosize
2327  * length length until full file has been written. Returns FILEBENCH_ERROR on
2328  * error, FILEBENCH_NORSC if out of files, FILEBENCH_OK on success.
2329  */
2330 static int
2331 flowoplib_writewholefile(threadflow_t *threadflow, flowop_t *flowop)
2332 {
2333 	caddr_t iobuf;
2334 	filesetentry_t *file;
2335 	int wsize;
2336 	off64_t seek;
2337 	off64_t bytes = 0;
2338 	uint64_t wss;
2339 	fbint_t iosize;
2340 	fb_fdesc_t *fdesc;
2341 	int srcfd = flowop->fo_srcfdnumber;
2342 	int ret;
2343 	char zerowrtbuf;
2344 
2345 	/* get the file to use */
2346 	if ((ret = flowoplib_filesetup(threadflow, flowop, &wss,
2347 	    &fdesc)) != FILEBENCH_OK)
2348 		return (ret);
2349 
2350 	/* an I/O size of zero means write entire working set with one I/O */
2351 	if ((iosize = avd_get_int(flowop->fo_iosize)) == 0)
2352 		iosize = wss;
2353 
2354 	/*
2355 	 * The file may actually be 0 bytes long, in which case skip
2356 	 * the buffer set up call (which would fail) and substitute
2357 	 * a small buffer, which won't really be used.
2358 	 */
2359 	if (iosize == 0) {
2360 		iobuf = (caddr_t)&zerowrtbuf;
2361 		filebench_log(LOG_DEBUG_SCRIPT,
2362 		    "flowop %s wrote zero length file", flowop->fo_name);
2363 	} else {
2364 		if (flowoplib_iobufsetup(threadflow, flowop, &iobuf,
2365 		    iosize) != 0)
2366 			return (FILEBENCH_ERROR);
2367 	}
2368 
2369 	file = threadflow->tf_fse[srcfd];
2370 	if ((srcfd != 0) && (file == NULL)) {
2371 		filebench_log(LOG_ERROR, "flowop %s: NULL src file",
2372 		    flowop->fo_name);
2373 		return (FILEBENCH_ERROR);
2374 	}
2375 
2376 	if (file)
2377 		wss = file->fse_size;
2378 
2379 	wsize = (int)MIN(wss, iosize);
2380 
2381 	/* Measure time to write bytes */
2382 	flowop_beginop(threadflow, flowop);
2383 	for (seek = 0; seek < wss; seek += wsize) {
2384 		ret = FB_WRITE(fdesc, iobuf, wsize);
2385 		if (ret != wsize) {
2386 			filebench_log(LOG_ERROR,
2387 			    "Failed to write %d bytes on fd %d: %s",
2388 			    wsize, fdesc->fd_num, strerror(errno));
2389 			flowop_endop(threadflow, flowop, 0);
2390 			return (FILEBENCH_ERROR);
2391 		}
2392 		wsize = (int)MIN(wss - seek, iosize);
2393 		bytes += ret;
2394 	}
2395 	flowop_endop(threadflow, flowop, bytes);
2396 
2397 	return (FILEBENCH_OK);
2398 }
2399 
2400 
2401 /*
2402  * Emulate a fixed size append to a file. Will append data to
2403  * a file chosen from a fileset if the flowop's fo_fileset
2404  * field specifies one or if its fdnumber is non zero.
2405  * Otherwise it will write to a fileobj file, if one exists.
2406  * The flowop's fo_wss parameter will be used to set the
2407  * maximum file size if it is non-zero, otherwise the
2408  * filesetentry's fse_size will be used. A random memory
2409  * buffer offset is calculated, then a logical seek to the
2410  * end of file is done followed by a write of fo_iosize
2411  * bytes. Writes are actually done from fo_buf, rather than
2412  * tf_mem as is done with flowoplib_write(), and no check
2413  * is made to see if fo_iosize exceeds the size of fo_buf.
2414  * Returns FILEBENCH_ERROR on error, FILEBENCH_NORSC if out of
2415  * files in the fileset, FILEBENCH_OK on success.
2416  */
2417 static int
2418 flowoplib_appendfile(threadflow_t *threadflow, flowop_t *flowop)
2419 {
2420 	caddr_t iobuf;
2421 	fb_fdesc_t *fdesc;
2422 	fbint_t wss;
2423 	fbint_t iosize;
2424 	int ret;
2425 
2426 	iosize = avd_get_int(flowop->fo_iosize);
2427 	if ((ret = flowoplib_iosetup(threadflow, flowop, &wss, &iobuf,
2428 	    &fdesc, iosize)) != FILEBENCH_OK)
2429 		return (ret);
2430 
2431 	/* XXX wss is not being used */
2432 
2433 	/* Measure time to write bytes */
2434 	flowop_beginop(threadflow, flowop);
2435 	(void) FB_LSEEK(fdesc, 0, SEEK_END);
2436 	ret = FB_WRITE(fdesc, iobuf, iosize);
2437 	if (ret != iosize) {
2438 		filebench_log(LOG_ERROR,
2439 		    "Failed to write %llu bytes on fd %d: %s",
2440 		    (u_longlong_t)iosize, fdesc->fd_num, strerror(errno));
2441 		flowop_endop(threadflow, flowop, ret);
2442 		return (FILEBENCH_ERROR);
2443 	}
2444 	flowop_endop(threadflow, flowop, ret);
2445 
2446 	return (FILEBENCH_OK);
2447 }
2448 
2449 /*
2450  * Emulate a random size append to a file. Will append data
2451  * to a file chosen from a fileset if the flowop's fo_fileset
2452  * field specifies one or if its fdnumber is non zero. Otherwise
2453  * it will write to a fileobj file, if one exists. The flowop's
2454  * fo_wss parameter will be used to set the maximum file size
2455  * if it is non-zero, otherwise the filesetentry's fse_size
2456  * will be used.  A random transfer size (but at most fo_iosize
2457  * bytes) and a random memory offset are calculated. A logical
2458  * seek to the end of file is done, then writes of up to
2459  * FILE_ALLOC_BLOCK in size are done until the full transfer
2460  * size has been written. Writes are actually done from fo_buf,
2461  * rather than tf_mem as is done with flowoplib_write().
2462  * Returns FILEBENCH_ERROR on error, FILEBENCH_NORSC if out of
2463  * files in the fileset, FILEBENCH_OK on success.
2464  */
2465 static int
2466 flowoplib_appendfilerand(threadflow_t *threadflow, flowop_t *flowop)
2467 {
2468 	caddr_t iobuf;
2469 	uint64_t appendsize;
2470 	fb_fdesc_t *fdesc;
2471 	fbint_t wss;
2472 	fbint_t iosize;
2473 	int ret = 0;
2474 
2475 	if ((iosize = avd_get_int(flowop->fo_iosize)) == 0) {
2476 		filebench_log(LOG_ERROR, "zero iosize for flowop %s",
2477 		    flowop->fo_name);
2478 		return (FILEBENCH_ERROR);
2479 	}
2480 
2481 	fb_random64(&appendsize, iosize, 1LL, NULL);
2482 
2483 	/* skip if attempting zero length append */
2484 	if (appendsize == 0) {
2485 		flowop_beginop(threadflow, flowop);
2486 		flowop_endop(threadflow, flowop, 0LL);
2487 		return (FILEBENCH_OK);
2488 	}
2489 
2490 	if ((ret = flowoplib_iosetup(threadflow, flowop, &wss, &iobuf,
2491 	    &fdesc, appendsize)) != FILEBENCH_OK)
2492 		return (ret);
2493 
2494 	/* XXX wss is not being used */
2495 
2496 	/* Measure time to write bytes */
2497 	flowop_beginop(threadflow, flowop);
2498 
2499 	(void) FB_LSEEK(fdesc, 0, SEEK_END);
2500 	ret = FB_WRITE(fdesc, iobuf, appendsize);
2501 	if (ret != appendsize) {
2502 		filebench_log(LOG_ERROR,
2503 		    "Failed to write %llu bytes on fd %d: %s",
2504 		    (u_longlong_t)appendsize, fdesc->fd_num, strerror(errno));
2505 		flowop_endop(threadflow, flowop, 0);
2506 		return (FILEBENCH_ERROR);
2507 	}
2508 
2509 	flowop_endop(threadflow, flowop, appendsize);
2510 
2511 	return (FILEBENCH_OK);
2512 }
2513 
/*
 * Private accumulator for the testrandvar flowop. An instance is
 * allocated by flowoplib_testrandvar_init() and attached to the
 * flowop's fo_private pointer.
 */
typedef struct testrandvar_priv {
	uint64_t sample_count;	/* number of values sampled so far */
	double val_sum;		/* running sum of the sampled values */
	double sqr_sum;		/* running sum of the squared values */
} testrandvar_priv_t;
2519 
2520 /*
2521  * flowop to calculate various statistics from the number stream
2522  * produced by a random variable. This allows verification that the
2523  * random distribution used to define the random variable is producing
2524  * the expected distribution of random numbers.
2525  */
2526 /* ARGSUSED */
2527 static int
2528 flowoplib_testrandvar(threadflow_t *threadflow, flowop_t *flowop)
2529 {
2530 	testrandvar_priv_t	*mystats;
2531 	double			value;
2532 
2533 	if ((mystats = (testrandvar_priv_t *)flowop->fo_private) == NULL) {
2534 		filebench_log(LOG_ERROR, "testrandvar not initialized\n");
2535 		filebench_shutdown(1);
2536 		return (-1);
2537 	}
2538 
2539 	value = avd_get_dbl(flowop->fo_value);
2540 
2541 	mystats->sample_count++;
2542 	mystats->val_sum += value;
2543 	mystats->sqr_sum += (value * value);
2544 
2545 	return (0);
2546 }
2547 
2548 /*
2549  * Initialize the private data area used to accumulate the statistics
2550  */
2551 static int
2552 flowoplib_testrandvar_init(flowop_t *flowop)
2553 {
2554 	testrandvar_priv_t	*mystats;
2555 
2556 	if ((mystats = (testrandvar_priv_t *)
2557 	    malloc(sizeof (testrandvar_priv_t))) == NULL) {
2558 		filebench_log(LOG_ERROR, "could not initialize testrandvar");
2559 		filebench_shutdown(1);
2560 		return (-1);
2561 	}
2562 
2563 	mystats->sample_count = 0;
2564 	mystats->val_sum = 0;
2565 	mystats->sqr_sum = 0;
2566 	flowop->fo_private = (void *)mystats;
2567 
2568 	(void) ipc_mutex_unlock(&flowop->fo_lock);
2569 	return (0);
2570 }
2571 
2572 /*
2573  * Print out the accumulated statistics, and free the private storage
2574  */
2575 static void
2576 flowoplib_testrandvar_destruct(flowop_t *flowop)
2577 {
2578 	testrandvar_priv_t	*mystats;
2579 	double mean, std_dev, dbl_count;
2580 
2581 	(void) ipc_mutex_lock(&flowop->fo_lock);
2582 	if ((mystats = (testrandvar_priv_t *)
2583 	    flowop->fo_private) == NULL) {
2584 		(void) ipc_mutex_unlock(&flowop->fo_lock);
2585 		return;
2586 	}
2587 
2588 	flowop->fo_private = NULL;
2589 	(void) ipc_mutex_unlock(&flowop->fo_lock);
2590 
2591 	dbl_count = (double)mystats->sample_count;
2592 	mean = mystats->val_sum / dbl_count;
2593 	std_dev = sqrt((mystats->sqr_sum / dbl_count) - (mean * mean)) / mean;
2594 
2595 	filebench_log(LOG_VERBOSE,
2596 	    "testrandvar: ops = %llu, mean = %8.2lf, stddev = %8.2lf",
2597 	    (u_longlong_t)mystats->sample_count, mean, std_dev);
2598 	free(mystats);
2599 }
2600 
2601 /*
2602  * prints message to the console from within a thread
2603  */
2604 static int
2605 flowoplib_print(threadflow_t *threadflow, flowop_t *flowop)
2606 {
2607 	procflow_t *procflow;
2608 
2609 	procflow = threadflow->tf_process;
2610 	filebench_log(LOG_INFO,
2611 	    "Message from process (%s,%d), thread (%s,%d): %s",
2612 	    procflow->pf_name, procflow->pf_instance,
2613 	    threadflow->tf_name, threadflow->tf_instance,
2614 	    avd_get_str(flowop->fo_value));
2615 
2616 	return (FILEBENCH_OK);
2617 }
2618