1 /* Copyright (C) 2018-2020 Free Software Foundation, Inc.
2 Contributed by Nicolas Koenig
3
4 This file is part of the GNU Fortran runtime library (libgfortran).
5
6 Libgfortran is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3, or (at your option)
9 any later version.
10
11 Libgfortran is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
15
16 Under Section 7 of GPL version 3, you are granted additional
17 permissions described in the GCC Runtime Library Exception, version
18 3.1, as published by the Free Software Foundation.
19
20 You should have received a copy of the GNU General Public License and
21 a copy of the GCC Runtime Library Exception along with this program;
22 see the files COPYING3 and COPYING.RUNTIME respectively. If not, see
23 <http://www.gnu.org/licenses/>. */
24
25 #include "libgfortran.h"
26
27 #define _GTHREAD_USE_COND_INIT_FUNC
28 #include "../../libgcc/gthr.h"
29 #include "io.h"
30 #include "fbuf.h"
31 #include "format.h"
32 #include "unix.h"
33 #include <string.h>
34 #include <assert.h>
35
36 #include <sys/types.h>
37
38 #include "async.h"
39 #if ASYNC_IO
40
41 DEBUG_LINE (__thread const char *aio_prefix = MPREFIX);
42
43 DEBUG_LINE (__gthread_mutex_t debug_queue_lock = __GTHREAD_MUTEX_INIT;)
44 DEBUG_LINE (aio_lock_debug *aio_debug_head = NULL;)
45
46 /* Current unit for asynchronous I/O. Needed for error reporting. */
47
48 __thread gfc_unit *thread_unit = NULL;
49
50 /* Queue entry for the asynchronous I/O entry. */
51 typedef struct transfer_queue
52 {
53 enum aio_do type;
54 struct transfer_queue *next;
55 struct st_parameter_dt *new_pdt;
56 transfer_args arg;
57 _Bool has_id;
58 int read_flag;
59 } transfer_queue;
60
61 struct error {
62 st_parameter_dt *dtp;
63 int id;
64 };
65
66 /* Helper function to exchange the old vs. a new PDT. */
67
68 static void
update_pdt(st_parameter_dt ** old,st_parameter_dt * new)69 update_pdt (st_parameter_dt **old, st_parameter_dt *new) {
70 st_parameter_dt *temp;
71 NOTE ("Changing pdts, current_unit = %p", (void *) (new->u.p.current_unit));
72 temp = *old;
73 *old = new;
74 if (temp)
75 free (temp);
76 }
77
78 /* Destroy an adv_cond structure. */
79
80 static void
destroy_adv_cond(struct adv_cond * ac)81 destroy_adv_cond (struct adv_cond *ac)
82 {
83 T_ERROR (__gthread_cond_destroy, &ac->signal);
84 }
85
86 /* Function invoked as start routine for a new asynchronous I/O unit.
87 Contains the main loop for accepting requests and handling them. */
88
89 static void *
async_io(void * arg)90 async_io (void *arg)
91 {
92 DEBUG_LINE (aio_prefix = TPREFIX);
93 transfer_queue *ctq = NULL, *prev = NULL;
94 gfc_unit *u = (gfc_unit *) arg;
95 async_unit *au = u->au;
96 LOCK (&au->lock);
97 thread_unit = u;
98 au->thread = __gthread_self ();
99 while (true)
100 {
101 /* Main loop. At this point, au->lock is always held. */
102 WAIT_SIGNAL_MUTEX (&au->work, au->tail != NULL, &au->lock);
103 LOCK (&au->lock);
104 ctq = au->head;
105 prev = NULL;
106 /* Loop over the queue entries until they are finished. */
107 while (ctq)
108 {
109 if (prev)
110 free (prev);
111 prev = ctq;
112 if (!au->error.has_error)
113 {
114 UNLOCK (&au->lock);
115
116 switch (ctq->type)
117 {
118 case AIO_WRITE_DONE:
119 NOTE ("Finalizing write");
120 st_write_done_worker (au->pdt);
121 UNLOCK (&au->io_lock);
122 break;
123
124 case AIO_READ_DONE:
125 NOTE ("Finalizing read");
126 st_read_done_worker (au->pdt);
127 UNLOCK (&au->io_lock);
128 break;
129
130 case AIO_DATA_TRANSFER_INIT:
131 NOTE ("Data transfer init");
132 LOCK (&au->io_lock);
133 update_pdt (&au->pdt, ctq->new_pdt);
134 data_transfer_init_worker (au->pdt, ctq->read_flag);
135 break;
136
137 case AIO_TRANSFER_SCALAR:
138 NOTE ("Starting scalar transfer");
139 ctq->arg.scalar.transfer (au->pdt, ctq->arg.scalar.arg_bt,
140 ctq->arg.scalar.data,
141 ctq->arg.scalar.i,
142 ctq->arg.scalar.s1,
143 ctq->arg.scalar.s2);
144 break;
145
146 case AIO_TRANSFER_ARRAY:
147 NOTE ("Starting array transfer");
148 NOTE ("ctq->arg.array.desc = %p",
149 (void *) (ctq->arg.array.desc));
150 transfer_array_inner (au->pdt, ctq->arg.array.desc,
151 ctq->arg.array.kind,
152 ctq->arg.array.charlen);
153 free (ctq->arg.array.desc);
154 break;
155
156 case AIO_CLOSE:
157 NOTE ("Received AIO_CLOSE");
158 LOCK (&au->lock);
159 goto finish_thread;
160
161 default:
162 internal_error (NULL, "Invalid queue type");
163 break;
164 }
165 LOCK (&au->lock);
166 if (unlikely (au->error.has_error))
167 au->error.last_good_id = au->id.low - 1;
168 }
169 else
170 {
171 if (ctq->type == AIO_WRITE_DONE || ctq->type == AIO_READ_DONE)
172 {
173 UNLOCK (&au->io_lock);
174 }
175 else if (ctq->type == AIO_CLOSE)
176 {
177 NOTE ("Received AIO_CLOSE during error condition");
178 goto finish_thread;
179 }
180 }
181
182 NOTE ("Next ctq, current id: %d", au->id.low);
183 if (ctq->has_id && au->id.waiting == au->id.low++)
184 SIGNAL (&au->id.done);
185
186 ctq = ctq->next;
187 }
188 au->tail = NULL;
189 au->head = NULL;
190 au->empty = 1;
191 SIGNAL (&au->emptysignal);
192 }
193 finish_thread:
194 au->tail = NULL;
195 au->head = NULL;
196 au->empty = 1;
197 SIGNAL (&au->emptysignal);
198 free (ctq);
199 UNLOCK (&au->lock);
200 return NULL;
201 }
202
203 /* Free an asynchronous unit. */
204
205 static void
free_async_unit(async_unit * au)206 free_async_unit (async_unit *au)
207 {
208 if (au->tail)
209 internal_error (NULL, "Trying to free nonempty asynchronous unit");
210
211 destroy_adv_cond (&au->work);
212 destroy_adv_cond (&au->emptysignal);
213 destroy_adv_cond (&au->id.done);
214 T_ERROR (__gthread_mutex_destroy, &au->lock);
215 free (au);
216 }
217
218 /* Initialize an adv_cond structure. */
219
220 static void
init_adv_cond(struct adv_cond * ac)221 init_adv_cond (struct adv_cond *ac)
222 {
223 ac->pending = 0;
224 __GTHREAD_COND_INIT_FUNCTION (&ac->signal);
225 }
226
227 /* Initialize an asyncronous unit, returning zero on success,
228 nonzero on failure. It also sets u->au. */
229
230 void
init_async_unit(gfc_unit * u)231 init_async_unit (gfc_unit *u)
232 {
233 async_unit *au;
234 if (!__gthread_active_p ())
235 {
236 u->au = NULL;
237 return;
238 }
239
240 au = (async_unit *) xmalloc (sizeof (async_unit));
241 u->au = au;
242 init_adv_cond (&au->work);
243 init_adv_cond (&au->emptysignal);
244 __GTHREAD_MUTEX_INIT_FUNCTION (&au->lock);
245 __GTHREAD_MUTEX_INIT_FUNCTION (&au->io_lock);
246 LOCK (&au->lock);
247 T_ERROR (__gthread_create, &au->thread, &async_io, (void *) u);
248 au->pdt = NULL;
249 au->head = NULL;
250 au->tail = NULL;
251 au->empty = true;
252 au->id.waiting = -1;
253 au->id.low = 0;
254 au->id.high = 0;
255 au->error.fatal_error = 0;
256 au->error.has_error = 0;
257 au->error.last_good_id = 0;
258 init_adv_cond (&au->id.done);
259 UNLOCK (&au->lock);
260 }
261
262 /* Enqueue a transfer statement. */
263
264 void
enqueue_transfer(async_unit * au,transfer_args * arg,enum aio_do type)265 enqueue_transfer (async_unit *au, transfer_args *arg, enum aio_do type)
266 {
267 transfer_queue *tq = calloc (sizeof (transfer_queue), 1);
268 tq->arg = *arg;
269 tq->type = type;
270 tq->has_id = 0;
271 LOCK (&au->lock);
272 if (!au->tail)
273 au->head = tq;
274 else
275 au->tail->next = tq;
276 au->tail = tq;
277 REVOKE_SIGNAL (&(au->emptysignal));
278 au->empty = false;
279 SIGNAL (&au->work);
280 UNLOCK (&au->lock);
281 }
282
283 /* Enqueue an st_write_done or st_read_done which contains an ID. */
284
285 int
enqueue_done_id(async_unit * au,enum aio_do type)286 enqueue_done_id (async_unit *au, enum aio_do type)
287 {
288 int ret;
289 transfer_queue *tq = calloc (sizeof (transfer_queue), 1);
290
291 tq->type = type;
292 tq->has_id = 1;
293 LOCK (&au->lock);
294 if (!au->tail)
295 au->head = tq;
296 else
297 au->tail->next = tq;
298 au->tail = tq;
299 REVOKE_SIGNAL (&(au->emptysignal));
300 au->empty = false;
301 ret = au->id.high++;
302 NOTE ("Enqueue id: %d", ret);
303 SIGNAL (&au->work);
304 UNLOCK (&au->lock);
305 return ret;
306 }
307
308 /* Enqueue an st_write_done or st_read_done without an ID. */
309
310 void
enqueue_done(async_unit * au,enum aio_do type)311 enqueue_done (async_unit *au, enum aio_do type)
312 {
313 transfer_queue *tq = calloc (sizeof (transfer_queue), 1);
314 tq->type = type;
315 tq->has_id = 0;
316 LOCK (&au->lock);
317 if (!au->tail)
318 au->head = tq;
319 else
320 au->tail->next = tq;
321 au->tail = tq;
322 REVOKE_SIGNAL (&(au->emptysignal));
323 au->empty = false;
324 SIGNAL (&au->work);
325 UNLOCK (&au->lock);
326 }
327
328 /* Enqueue a CLOSE statement. */
329
330 void
enqueue_close(async_unit * au)331 enqueue_close (async_unit *au)
332 {
333 transfer_queue *tq = calloc (sizeof (transfer_queue), 1);
334
335 tq->type = AIO_CLOSE;
336 LOCK (&au->lock);
337 if (!au->tail)
338 au->head = tq;
339 else
340 au->tail->next = tq;
341 au->tail = tq;
342 REVOKE_SIGNAL (&(au->emptysignal));
343 au->empty = false;
344 SIGNAL (&au->work);
345 UNLOCK (&au->lock);
346 }
347
348 /* The asynchronous unit keeps the currently active PDT around.
349 This function changes that to the current one. */
350
351 void
enqueue_data_transfer_init(async_unit * au,st_parameter_dt * dt,int read_flag)352 enqueue_data_transfer_init (async_unit *au, st_parameter_dt *dt, int read_flag)
353 {
354 st_parameter_dt *new = xmalloc (sizeof (st_parameter_dt));
355 transfer_queue *tq = xmalloc (sizeof (transfer_queue));
356
357 memcpy ((void *) new, (void *) dt, sizeof (st_parameter_dt));
358
359 NOTE ("dt->internal_unit_desc = %p", dt->internal_unit_desc);
360 NOTE ("common.flags & mask = %d", dt->common.flags & IOPARM_LIBRETURN_MASK);
361 tq->next = NULL;
362 tq->type = AIO_DATA_TRANSFER_INIT;
363 tq->read_flag = read_flag;
364 tq->has_id = 0;
365 tq->new_pdt = new;
366 LOCK (&au->lock);
367
368 if (!au->tail)
369 au->head = tq;
370 else
371 au->tail->next = tq;
372 au->tail = tq;
373 REVOKE_SIGNAL (&(au->emptysignal));
374 au->empty = false;
375 SIGNAL (&au->work);
376 UNLOCK (&au->lock);
377 }
378
379 /* Collect the errors that may have happened asynchronously. Return true if
380 an error has been encountered. */
381
382 bool
collect_async_errors(st_parameter_common * cmp,async_unit * au)383 collect_async_errors (st_parameter_common *cmp, async_unit *au)
384 {
385 bool has_error = au->error.has_error;
386
387 if (has_error)
388 {
389 if (generate_error_common (cmp, au->error.family, au->error.message))
390 {
391 au->error.has_error = 0;
392 au->error.cmp = NULL;
393 }
394 else
395 {
396 /* The program will exit later. */
397 au->error.fatal_error = true;
398 }
399 }
400 return has_error;
401 }
402
403 /* Perform a wait operation on an asynchronous unit with an ID specified,
404 which means collecting the errors that may have happened asynchronously.
405 Return true if an error has been encountered. */
406
407 bool
async_wait_id(st_parameter_common * cmp,async_unit * au,int i)408 async_wait_id (st_parameter_common *cmp, async_unit *au, int i)
409 {
410 bool ret;
411
412 if (au == NULL)
413 return false;
414
415 if (cmp == NULL)
416 cmp = au->error.cmp;
417
418 if (au->error.has_error)
419 {
420 if (i <= au->error.last_good_id)
421 return false;
422
423 return collect_async_errors (cmp, au);
424 }
425
426 LOCK (&au->lock);
427 if (i > au->id.high)
428 {
429 generate_error_common (cmp, LIBERROR_BAD_WAIT_ID, NULL);
430 UNLOCK (&au->lock);
431 return true;
432 }
433
434 NOTE ("Waiting for id %d", i);
435 if (au->id.waiting < i)
436 au->id.waiting = i;
437 SIGNAL (&(au->work));
438 WAIT_SIGNAL_MUTEX (&(au->id.done),
439 (au->id.low >= au->id.waiting || au->empty), &au->lock);
440 LOCK (&au->lock);
441 ret = collect_async_errors (cmp, au);
442 UNLOCK (&au->lock);
443 return ret;
444 }
445
446 /* Perform a wait operation an an asynchronous unit without an ID. */
447
448 bool
async_wait(st_parameter_common * cmp,async_unit * au)449 async_wait (st_parameter_common *cmp, async_unit *au)
450 {
451 bool ret;
452
453 if (au == NULL)
454 return false;
455
456 if (cmp == NULL)
457 cmp = au->error.cmp;
458
459 LOCK (&(au->lock));
460 SIGNAL (&(au->work));
461
462 if (au->empty)
463 {
464 ret = collect_async_errors (cmp, au);
465 UNLOCK (&au->lock);
466 return ret;
467 }
468
469 WAIT_SIGNAL_MUTEX (&(au->emptysignal), (au->empty), &au->lock);
470 ret = collect_async_errors (cmp, au);
471 return ret;
472 }
473
474 /* Close an asynchronous unit. */
475
476 void
async_close(async_unit * au)477 async_close (async_unit *au)
478 {
479 if (au == NULL)
480 return;
481
482 NOTE ("Closing async unit");
483 enqueue_close (au);
484 T_ERROR (__gthread_join, au->thread, NULL);
485 free_async_unit (au);
486 }
487
488 #else
489
490 /* Only set u->au to NULL so no async I/O will happen. */
491
492 void
init_async_unit(gfc_unit * u)493 init_async_unit (gfc_unit *u)
494 {
495 u->au = NULL;
496 return;
497 }
498
499 /* Do-nothing function, which will not be called. */
500
501 void
enqueue_transfer(async_unit * au,transfer_args * arg,enum aio_do type)502 enqueue_transfer (async_unit *au, transfer_args *arg, enum aio_do type)
503 {
504 return;
505 }
506
507 /* Do-nothing function, which will not be called. */
508
509 int
enqueue_done_id(async_unit * au,enum aio_do type)510 enqueue_done_id (async_unit *au, enum aio_do type)
511 {
512 return 0;
513 }
514
515 /* Do-nothing function, which will not be called. */
516
517 void
enqueue_done(async_unit * au,enum aio_do type)518 enqueue_done (async_unit *au, enum aio_do type)
519 {
520 return;
521 }
522
523 /* Do-nothing function, which will not be called. */
524
525 void
enqueue_close(async_unit * au)526 enqueue_close (async_unit *au)
527 {
528 return;
529 }
530
531 /* Do-nothing function, which will not be called. */
532
533 void
enqueue_data_transfer_init(async_unit * au,st_parameter_dt * dt,int read_flag)534 enqueue_data_transfer_init (async_unit *au, st_parameter_dt *dt, int read_flag)
535 {
536 return;
537 }
538
539 /* Do-nothing function, which will not be called. */
540
541 bool
collect_async_errors(st_parameter_common * cmp,async_unit * au)542 collect_async_errors (st_parameter_common *cmp, async_unit *au)
543 {
544 return false;
545 }
546
547 /* Do-nothing function, which will not be called. */
548
549 bool
async_wait_id(st_parameter_common * cmp,async_unit * au,int i)550 async_wait_id (st_parameter_common *cmp, async_unit *au, int i)
551 {
552 return false;
553 }
554
555 /* Do-nothing function, which will not be called. */
556
557 bool
async_wait(st_parameter_common * cmp,async_unit * au)558 async_wait (st_parameter_common *cmp, async_unit *au)
559 {
560 return false;
561 }
562
563 /* Do-nothing function, which will not be called. */
564
565 void
async_close(async_unit * au)566 async_close (async_unit *au)
567 {
568 return;
569 }
570
571 #endif
572