/*
 * Rated submission helpers
 *
 * Copyright (C) 2015 Jens Axboe <axboe@kernel.dk>
 *
 */
#include <assert.h>
#include "fio.h"
#include "ioengines.h"
#include "lib/getrusage.h"
#include "rate-submit.h"

static void check_overlap(struct io_u *io_u)
{
	int i, res;
	struct thread_data *td;

	/*
	 * Allow only one thread to check for overlap at a time to prevent two
	 * threads from thinking the coast is clear and then submitting IOs
	 * that overlap with each other.
	 *
	 * If an overlap is found, release the lock and re-acquire it before
	 * checking again to give other threads a chance to make progress.
	 *
	 * If no overlap is found, release the lock when the io_u's
	 * IO_U_F_FLIGHT flag is set so that this io_u can be checked by other
	 * threads as they assess overlap.
	 */
	res = pthread_mutex_lock(&overlap_check);
	assert(res == 0);

retry:
	for_each_td(td, i) {
		if (td->runstate <= TD_SETTING_UP ||
		    td->runstate >= TD_FINISHING ||
		    !td->o.serialize_overlap ||
		    td->o.io_submit_mode != IO_MODE_OFFLOAD)
			continue;

		if (!in_flight_overlap(&td->io_u_all, io_u))
			continue;

		res = pthread_mutex_unlock(&overlap_check);
		assert(res == 0);
		res = pthread_mutex_lock(&overlap_check);
		assert(res == 0);
		goto retry;
	}
}

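/*
 * Worker entry point: submit a single io_u on behalf of the parent job.
 * If the engine returns FIO_Q_BUSY, reap one completion and retry until
 * the io_u is accepted or an error occurs. At an effective queue depth
 * of 1, completions are reaped synchronously before returning.
 */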
static int io_workqueue_fn(struct submit_worker *sw,
			   struct workqueue_work *work)
{
	struct io_u *io_u = container_of(work, struct io_u, work);
	const enum fio_ddir ddir = io_u->ddir;
	struct thread_data *td = sw->priv;
	int ret, error;

	if (td->o.serialize_overlap)
		check_overlap(io_u);

	dprint(FD_RATE, "io_u %p queued by %u\n", io_u, gettid());

	io_u_set(td, io_u, IO_U_F_NO_FILE_PUT);

	td->cur_depth++;

	do {
		ret = td_io_queue(td, io_u);
		if (ret != FIO_Q_BUSY)
			break;
		ret = io_u_queued_complete(td, 1);
		if (ret > 0)
			td->cur_depth -= ret;
		else if (ret < 0)
			break;
		io_u_clear(td, io_u, IO_U_F_FLIGHT);
	} while (1);

	dprint(FD_RATE, "io_u %p ret %d by %u\n", io_u, ret, gettid());

	error = io_queue_event(td, io_u, &ret, ddir, NULL, 0, NULL);

	if (ret == FIO_Q_COMPLETED)
		td->cur_depth--;
	else if (ret == FIO_Q_QUEUED) {
		unsigned int min_evts;

		if (td->o.iodepth == 1)
			min_evts = 1;
		else
			min_evts = 0;

		ret = io_u_queued_complete(td, min_evts);
		if (ret > 0)
			td->cur_depth -= ret;
	}

	if (error || td->error) {
		pthread_mutex_lock(&td->io_u_lock);
		pthread_cond_signal(&td->parent->free_cond);
		pthread_mutex_unlock(&td->io_u_lock);
	}

	return 0;
}

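/*
 * Called before a worker is put to sleep: report whether this worker
 * still has queued or in-flight I/O that must be flushed first.
 */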
static bool io_workqueue_pre_sleep_flush_fn(struct submit_worker *sw)
{
	struct thread_data *td = sw->priv;

	if (td->error)
		return false;
	if (td->io_u_queued || td->cur_depth || td->io_u_in_flight)
		return true;

	return false;
}

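/*
 * Quiesce outstanding I/O before the worker sleeps, and subtract the
 * reaped completions from the current depth.
 */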
static void io_workqueue_pre_sleep_fn(struct submit_worker *sw)
{
	struct thread_data *td = sw->priv;
	int ret;

	ret = io_u_quiesce(td);
	if (ret > 0)
		td->cur_depth -= ret;
}

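/*
 * Each worker carries a private, zeroed thread_data shadowing the
 * parent job; it is populated in io_workqueue_init_worker_fn().
 */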
static int io_workqueue_alloc_fn(struct submit_worker *sw)
{
	struct thread_data *td;

	td = calloc(1, sizeof(*td));
	if (!td)
		return 1;	/* bail out on allocation failure */
	sw->priv = td;
	return 0;
}

static void io_workqueue_free_fn(struct submit_worker *sw)
{
	free(sw->priv);
	sw->priv = NULL;
}

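/*
 * Clone the parent job's options, stats, and file table into this
 * worker's private thread_data, then load and initialize a separate
 * ioengine instance for it. Workers run at an effective iodepth of 1;
 * the parent's iodepth instead controls how many workers are spawned
 * (see rate_submit_init() below).
 */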
static int io_workqueue_init_worker_fn(struct submit_worker *sw)
{
	struct thread_data *parent = sw->wq->td;
	struct thread_data *td = sw->priv;

	memcpy(&td->o, &parent->o, sizeof(td->o));
	memcpy(&td->ts, &parent->ts, sizeof(td->ts));
	td->o.uid = td->o.gid = -1U;
	dup_files(td, parent);
	td->eo = parent->eo;
	fio_options_mem_dupe(td);

	if (ioengine_load(td))
		goto err;

	td->pid = gettid();

	INIT_FLIST_HEAD(&td->io_log_list);
	INIT_FLIST_HEAD(&td->io_hist_list);
	INIT_FLIST_HEAD(&td->verify_list);
	INIT_FLIST_HEAD(&td->trim_list);
	td->io_hist_tree = RB_ROOT;

	td->o.iodepth = 1;
	if (td_io_init(td))
		goto err_io_init;

	if (td->io_ops->post_init && td->io_ops->post_init(td))
		goto err_io_init;

	set_epoch_time(td, td->o.log_unix_epoch);
	fio_getrusage(&td->ru_start);
	clear_io_state(td, 1);

	td_set_runstate(td, TD_RUNNING);
	td->flags |= TD_F_CHILD | TD_F_NEED_LOCK;
	td->parent = parent;
	return 0;

err_io_init:
	close_ioengine(td);
err:
	return 1;
}

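/*
 * Worker teardown: fold this worker's stats into the parent's aggregate
 * (the first worker to exit seeds it), then release the files and the
 * private ioengine instance.
 */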
static void io_workqueue_exit_worker_fn(struct submit_worker *sw,
					unsigned int *sum_cnt)
{
	struct thread_data *td = sw->priv;

	(*sum_cnt)++;
	sum_thread_stats(&sw->wq->td->ts, &td->ts, *sum_cnt == 1);

	fio_options_free(td);
	close_and_free_files(td);
	if (td->io_ops)
		close_ioengine(td);
	td_set_runstate(td, TD_EXITED);
}

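/*
 * Move a worker-local counter into the shared destination and zero the
 * source. With CONFIG_SFAA the add is a __sync_fetch_and_add(); without
 * it, callers serialize through the stat locks taken by
 * pthread_double_lock() below.
 */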
#ifdef CONFIG_SFAA
static void sum_val(uint64_t *dst, uint64_t *src)
{
	if (*src) {
		__sync_fetch_and_add(dst, *src);
		*src = 0;
	}
}
#else
static void sum_val(uint64_t *dst, uint64_t *src)
{
	if (*src) {
		*dst += *src;
		*src = 0;
	}
}
#endif

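/*
 * Without atomic add support, stat updates are guarded by both the
 * source and destination locks. The locks are always taken in address
 * order so that two threads summing in opposite directions cannot
 * deadlock.
 */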
static void pthread_double_unlock(pthread_mutex_t *lock1,
				  pthread_mutex_t *lock2)
{
#ifndef CONFIG_SFAA
	pthread_mutex_unlock(lock1);
	pthread_mutex_unlock(lock2);
#endif
}

static void pthread_double_lock(pthread_mutex_t *lock1, pthread_mutex_t *lock2)
{
#ifndef CONFIG_SFAA
	if (lock1 < lock2) {
		pthread_mutex_lock(lock1);
		pthread_mutex_lock(lock2);
	} else {
		pthread_mutex_lock(lock2);
		pthread_mutex_lock(lock1);
	}
#endif
}

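/*
 * Fold one data direction's byte and block counters from a worker into
 * the parent, zeroing the source so the same delta is not counted twice
 * on the next accounting pass.
 */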
static void sum_ddir(struct thread_data *dst, struct thread_data *src,
		     enum fio_ddir ddir)
{
	pthread_double_lock(&dst->io_wq.stat_lock, &src->io_wq.stat_lock);

	sum_val(&dst->io_bytes[ddir], &src->io_bytes[ddir]);
	sum_val(&dst->io_blocks[ddir], &src->io_blocks[ddir]);
	sum_val(&dst->this_io_blocks[ddir], &src->this_io_blocks[ddir]);
	sum_val(&dst->this_io_bytes[ddir], &src->this_io_bytes[ddir]);
	sum_val(&dst->bytes_done[ddir], &src->bytes_done[ddir]);

	pthread_double_unlock(&dst->io_wq.stat_lock, &src->io_wq.stat_lock);
}

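/*
 * Accounting hook: merge the worker's counters into the parent job for
 * each data direction the workload performs.
 */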
static void io_workqueue_update_acct_fn(struct submit_worker *sw)
{
	struct thread_data *src = sw->priv;
	struct thread_data *dst = sw->wq->td;

	if (td_read(src))
		sum_ddir(dst, src, DDIR_READ);
	if (td_write(src))
		sum_ddir(dst, src, DDIR_WRITE);
	if (td_trim(src))
		sum_ddir(dst, src, DDIR_TRIM);
}

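/*
 * Callback table wiring the helpers above into the generic workqueue.
 */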
static struct workqueue_ops rated_wq_ops = {
	.fn			= io_workqueue_fn,
	.pre_sleep_flush_fn	= io_workqueue_pre_sleep_flush_fn,
	.pre_sleep_fn		= io_workqueue_pre_sleep_fn,
	.update_acct_fn		= io_workqueue_update_acct_fn,
	.alloc_worker_fn	= io_workqueue_alloc_fn,
	.free_worker_fn		= io_workqueue_free_fn,
	.init_worker_fn		= io_workqueue_init_worker_fn,
	.exit_worker_fn		= io_workqueue_exit_worker_fn,
};

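/*
 * Set up the offload workqueue for a job. A no-op unless the job runs
 * with io_submit_mode=offload, in which case one worker per unit of
 * queue depth handles submission on the job's behalf.
 */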
int rate_submit_init(struct thread_data *td, struct sk_out *sk_out)
{
	if (td->o.io_submit_mode != IO_MODE_OFFLOAD)
		return 0;

	return workqueue_init(td, &td->io_wq, &rated_wq_ops, td->o.iodepth, sk_out);
}

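/*
 * Tear down the offload workqueue, if one was created.
 */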
void rate_submit_exit(struct thread_data *td)
{
	if (td->o.io_submit_mode != IO_MODE_OFFLOAD)
		return;

	workqueue_exit(&td->io_wq);
}