1 /* Copyright (C) 2018-2020 Free Software Foundation, Inc.
2    Contributed by Nicolas Koenig
3 
4    This file is part of the GNU Fortran runtime library (libgfortran).
5 
6    Libgfortran is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3, or (at your option)
9    any later version.
10 
11    Libgfortran is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15 
16    Under Section 7 of GPL version 3, you are granted additional
17    permissions described in the GCC Runtime Library Exception, version
18    3.1, as published by the Free Software Foundation.
19 
20    You should have received a copy of the GNU General Public License and
21    a copy of the GCC Runtime Library Exception along with this program;
22    see the files COPYING3 and COPYING.RUNTIME respectively.  If not, see
23    <http://www.gnu.org/licenses/>.  */
24 
25 #include "libgfortran.h"
26 
27 #define _GTHREAD_USE_COND_INIT_FUNC
28 #include "../../libgcc/gthr.h"
29 #include "io.h"
30 #include "fbuf.h"
31 #include "format.h"
32 #include "unix.h"
33 #include <string.h>
34 #include <assert.h>
35 
36 #include <sys/types.h>
37 
38 #include "async.h"
39 #if ASYNC_IO
40 
41 DEBUG_LINE (__thread const char *aio_prefix = MPREFIX);
42 
43 DEBUG_LINE (__gthread_mutex_t debug_queue_lock = __GTHREAD_MUTEX_INIT;)
44 DEBUG_LINE (aio_lock_debug *aio_debug_head = NULL;)
45 
46 /* Current unit for asynchronous I/O.  Needed for error reporting.  */
47 
48 __thread gfc_unit *thread_unit = NULL;
49 
50 /* Queue entry for the asynchronous I/O entry.  */
51 typedef struct transfer_queue
52 {
53   enum aio_do type;
54   struct transfer_queue *next;
55   struct st_parameter_dt *new_pdt;
56   transfer_args arg;
57   _Bool has_id;
58   int read_flag;
59 } transfer_queue;
60 
61 struct error {
62   st_parameter_dt *dtp;
63   int id;
64 };
65 
66 /* Helper function to exchange the old vs. a new PDT.  */
67 
68 static void
update_pdt(st_parameter_dt ** old,st_parameter_dt * new)69 update_pdt (st_parameter_dt **old, st_parameter_dt *new) {
70   st_parameter_dt *temp;
71   NOTE ("Changing pdts, current_unit = %p", (void *) (new->u.p.current_unit));
72   temp = *old;
73   *old = new;
74   if (temp)
75     free (temp);
76 }
77 
78 /* Destroy an adv_cond structure.  */
79 
80 static void
destroy_adv_cond(struct adv_cond * ac)81 destroy_adv_cond (struct adv_cond *ac)
82 {
83   T_ERROR (__gthread_cond_destroy, &ac->signal);
84 }
85 
86 /* Function invoked as start routine for a new asynchronous I/O unit.
87    Contains the main loop for accepting requests and handling them.  */
88 
89 static void *
async_io(void * arg)90 async_io (void *arg)
91 {
92   DEBUG_LINE (aio_prefix = TPREFIX);
93   transfer_queue *ctq = NULL, *prev = NULL;
94   gfc_unit *u = (gfc_unit *) arg;
95   async_unit *au = u->au;
96   LOCK (&au->lock);
97   thread_unit = u;
98   au->thread = __gthread_self ();
99   while (true)
100     {
101       /* Main loop.  At this point, au->lock is always held. */
102       WAIT_SIGNAL_MUTEX (&au->work, au->tail != NULL, &au->lock);
103       LOCK (&au->lock);
104       ctq = au->head;
105       prev = NULL;
106       /* Loop over the queue entries until they are finished.  */
107       while (ctq)
108 	{
109 	  if (prev)
110 	    free (prev);
111 	  prev = ctq;
112 	  if (!au->error.has_error)
113 	    {
114 	      UNLOCK (&au->lock);
115 
116 	      switch (ctq->type)
117 		{
118 		case AIO_WRITE_DONE:
119 		  NOTE ("Finalizing write");
120 		  st_write_done_worker (au->pdt);
121 		  UNLOCK (&au->io_lock);
122 		  break;
123 
124 		case AIO_READ_DONE:
125 		  NOTE ("Finalizing read");
126 		  st_read_done_worker (au->pdt);
127 		  UNLOCK (&au->io_lock);
128 		  break;
129 
130 		case AIO_DATA_TRANSFER_INIT:
131 		  NOTE ("Data transfer init");
132 		  LOCK (&au->io_lock);
133 		  update_pdt (&au->pdt, ctq->new_pdt);
134 		  data_transfer_init_worker (au->pdt, ctq->read_flag);
135 		  break;
136 
137 		case AIO_TRANSFER_SCALAR:
138 		  NOTE ("Starting scalar transfer");
139 		  ctq->arg.scalar.transfer (au->pdt, ctq->arg.scalar.arg_bt,
140 					    ctq->arg.scalar.data,
141 					    ctq->arg.scalar.i,
142 					    ctq->arg.scalar.s1,
143 					    ctq->arg.scalar.s2);
144 		  break;
145 
146 		case AIO_TRANSFER_ARRAY:
147 		  NOTE ("Starting array transfer");
148 		  NOTE ("ctq->arg.array.desc = %p",
149 			(void *) (ctq->arg.array.desc));
150 		  transfer_array_inner (au->pdt, ctq->arg.array.desc,
151 					ctq->arg.array.kind,
152 					ctq->arg.array.charlen);
153 		  free (ctq->arg.array.desc);
154 		  break;
155 
156 		case AIO_CLOSE:
157 		  NOTE ("Received AIO_CLOSE");
158 		  LOCK (&au->lock);
159 		  goto finish_thread;
160 
161 		default:
162 		  internal_error (NULL, "Invalid queue type");
163 		  break;
164 		}
165 	      LOCK (&au->lock);
166 	      if (unlikely (au->error.has_error))
167 		au->error.last_good_id = au->id.low - 1;
168 	    }
169 	  else
170 	    {
171 	      if (ctq->type == AIO_WRITE_DONE || ctq->type == AIO_READ_DONE)
172 		{
173 		  UNLOCK (&au->io_lock);
174 		}
175 	      else if (ctq->type == AIO_CLOSE)
176 		{
177 		  NOTE ("Received AIO_CLOSE during error condition");
178 		  goto finish_thread;
179 		}
180 	    }
181 
182   	  NOTE ("Next ctq, current id: %d", au->id.low);
183   	  if (ctq->has_id && au->id.waiting == au->id.low++)
184 	    SIGNAL (&au->id.done);
185 
186 	  ctq = ctq->next;
187 	}
188       au->tail = NULL;
189       au->head = NULL;
190       au->empty = 1;
191       SIGNAL (&au->emptysignal);
192     }
193  finish_thread:
194   au->tail = NULL;
195   au->head = NULL;
196   au->empty = 1;
197   SIGNAL (&au->emptysignal);
198   free (ctq);
199   UNLOCK (&au->lock);
200   return NULL;
201 }
202 
203 /* Free an asynchronous unit.  */
204 
205 static void
free_async_unit(async_unit * au)206 free_async_unit (async_unit *au)
207 {
208   if (au->tail)
209     internal_error (NULL, "Trying to free nonempty asynchronous unit");
210 
211   destroy_adv_cond (&au->work);
212   destroy_adv_cond (&au->emptysignal);
213   destroy_adv_cond (&au->id.done);
214   T_ERROR (__gthread_mutex_destroy, &au->lock);
215   free (au);
216 }
217 
218 /* Initialize an adv_cond structure.  */
219 
220 static void
init_adv_cond(struct adv_cond * ac)221 init_adv_cond (struct adv_cond *ac)
222 {
223   ac->pending = 0;
224   __GTHREAD_COND_INIT_FUNCTION (&ac->signal);
225 }
226 
227 /* Initialize an asyncronous unit, returning zero on success,
228  nonzero on failure.  It also sets u->au.  */
229 
230 void
init_async_unit(gfc_unit * u)231 init_async_unit (gfc_unit *u)
232 {
233   async_unit *au;
234   if (!__gthread_active_p ())
235     {
236       u->au = NULL;
237       return;
238     }
239 
240   au = (async_unit *) xmalloc (sizeof (async_unit));
241   u->au = au;
242   init_adv_cond (&au->work);
243   init_adv_cond (&au->emptysignal);
244   __GTHREAD_MUTEX_INIT_FUNCTION (&au->lock);
245   __GTHREAD_MUTEX_INIT_FUNCTION (&au->io_lock);
246   LOCK (&au->lock);
247   T_ERROR (__gthread_create, &au->thread, &async_io, (void *) u);
248   au->pdt = NULL;
249   au->head = NULL;
250   au->tail = NULL;
251   au->empty = true;
252   au->id.waiting = -1;
253   au->id.low = 0;
254   au->id.high = 0;
255   au->error.fatal_error = 0;
256   au->error.has_error = 0;
257   au->error.last_good_id = 0;
258   init_adv_cond (&au->id.done);
259   UNLOCK (&au->lock);
260 }
261 
262 /* Enqueue a transfer statement.  */
263 
264 void
enqueue_transfer(async_unit * au,transfer_args * arg,enum aio_do type)265 enqueue_transfer (async_unit *au, transfer_args *arg, enum aio_do type)
266 {
267   transfer_queue *tq = calloc (sizeof (transfer_queue), 1);
268   tq->arg = *arg;
269   tq->type = type;
270   tq->has_id = 0;
271   LOCK (&au->lock);
272   if (!au->tail)
273     au->head = tq;
274   else
275     au->tail->next = tq;
276   au->tail = tq;
277   REVOKE_SIGNAL (&(au->emptysignal));
278   au->empty = false;
279   SIGNAL (&au->work);
280   UNLOCK (&au->lock);
281 }
282 
283 /* Enqueue an st_write_done or st_read_done which contains an ID.  */
284 
285 int
enqueue_done_id(async_unit * au,enum aio_do type)286 enqueue_done_id (async_unit *au, enum aio_do type)
287 {
288   int ret;
289   transfer_queue *tq = calloc (sizeof (transfer_queue), 1);
290 
291   tq->type = type;
292   tq->has_id = 1;
293   LOCK (&au->lock);
294   if (!au->tail)
295     au->head = tq;
296   else
297     au->tail->next = tq;
298   au->tail = tq;
299   REVOKE_SIGNAL (&(au->emptysignal));
300   au->empty = false;
301   ret = au->id.high++;
302   NOTE ("Enqueue id: %d", ret);
303   SIGNAL (&au->work);
304   UNLOCK (&au->lock);
305   return ret;
306 }
307 
308 /* Enqueue an st_write_done or st_read_done without an ID.  */
309 
310 void
enqueue_done(async_unit * au,enum aio_do type)311 enqueue_done (async_unit *au, enum aio_do type)
312 {
313   transfer_queue *tq = calloc (sizeof (transfer_queue), 1);
314   tq->type = type;
315   tq->has_id = 0;
316   LOCK (&au->lock);
317   if (!au->tail)
318     au->head = tq;
319   else
320     au->tail->next = tq;
321   au->tail = tq;
322   REVOKE_SIGNAL (&(au->emptysignal));
323   au->empty = false;
324   SIGNAL (&au->work);
325   UNLOCK (&au->lock);
326 }
327 
328 /* Enqueue a CLOSE statement.  */
329 
330 void
enqueue_close(async_unit * au)331 enqueue_close (async_unit *au)
332 {
333   transfer_queue *tq = calloc (sizeof (transfer_queue), 1);
334 
335   tq->type = AIO_CLOSE;
336   LOCK (&au->lock);
337   if (!au->tail)
338     au->head = tq;
339   else
340     au->tail->next = tq;
341   au->tail = tq;
342   REVOKE_SIGNAL (&(au->emptysignal));
343   au->empty = false;
344   SIGNAL (&au->work);
345   UNLOCK (&au->lock);
346 }
347 
348 /* The asynchronous unit keeps the currently active PDT around.
349    This function changes that to the current one.  */
350 
351 void
enqueue_data_transfer_init(async_unit * au,st_parameter_dt * dt,int read_flag)352 enqueue_data_transfer_init (async_unit *au, st_parameter_dt *dt, int read_flag)
353 {
354   st_parameter_dt *new = xmalloc (sizeof (st_parameter_dt));
355   transfer_queue *tq = xmalloc (sizeof (transfer_queue));
356 
357   memcpy ((void *) new, (void *) dt, sizeof (st_parameter_dt));
358 
359   NOTE ("dt->internal_unit_desc = %p", dt->internal_unit_desc);
360   NOTE ("common.flags & mask = %d", dt->common.flags & IOPARM_LIBRETURN_MASK);
361   tq->next = NULL;
362   tq->type = AIO_DATA_TRANSFER_INIT;
363   tq->read_flag = read_flag;
364   tq->has_id = 0;
365   tq->new_pdt = new;
366   LOCK (&au->lock);
367 
368   if (!au->tail)
369     au->head = tq;
370   else
371     au->tail->next = tq;
372   au->tail = tq;
373   REVOKE_SIGNAL (&(au->emptysignal));
374   au->empty = false;
375   SIGNAL (&au->work);
376   UNLOCK (&au->lock);
377 }
378 
379 /* Collect the errors that may have happened asynchronously.  Return true if
380    an error has been encountered.  */
381 
382 bool
collect_async_errors(st_parameter_common * cmp,async_unit * au)383 collect_async_errors (st_parameter_common *cmp, async_unit *au)
384 {
385   bool has_error = au->error.has_error;
386 
387   if (has_error)
388     {
389       if (generate_error_common (cmp, au->error.family, au->error.message))
390 	{
391 	  au->error.has_error = 0;
392 	  au->error.cmp = NULL;
393 	}
394       else
395 	{
396 	  /* The program will exit later.  */
397 	  au->error.fatal_error = true;
398 	}
399     }
400   return has_error;
401 }
402 
403 /* Perform a wait operation on an asynchronous unit with an ID specified,
404    which means collecting the errors that may have happened asynchronously.
405    Return true if an error has been encountered.  */
406 
407 bool
async_wait_id(st_parameter_common * cmp,async_unit * au,int i)408 async_wait_id (st_parameter_common *cmp, async_unit *au, int i)
409 {
410   bool ret;
411 
412   if (au == NULL)
413     return false;
414 
415   if (cmp == NULL)
416     cmp = au->error.cmp;
417 
418   if (au->error.has_error)
419     {
420       if (i <= au->error.last_good_id)
421 	return false;
422 
423       return collect_async_errors (cmp, au);
424     }
425 
426   LOCK (&au->lock);
427   if (i > au->id.high)
428     {
429       generate_error_common (cmp, LIBERROR_BAD_WAIT_ID, NULL);
430       UNLOCK (&au->lock);
431       return true;
432     }
433 
434   NOTE ("Waiting for id %d", i);
435   if (au->id.waiting < i)
436     au->id.waiting = i;
437   SIGNAL (&(au->work));
438   WAIT_SIGNAL_MUTEX (&(au->id.done),
439 		     (au->id.low >= au->id.waiting || au->empty), &au->lock);
440   LOCK (&au->lock);
441   ret = collect_async_errors (cmp, au);
442   UNLOCK (&au->lock);
443   return ret;
444 }
445 
446 /* Perform a wait operation an an asynchronous unit without an ID.  */
447 
448 bool
async_wait(st_parameter_common * cmp,async_unit * au)449 async_wait (st_parameter_common *cmp, async_unit *au)
450 {
451   bool ret;
452 
453   if (au == NULL)
454     return false;
455 
456   if (cmp == NULL)
457     cmp = au->error.cmp;
458 
459   LOCK (&(au->lock));
460   SIGNAL (&(au->work));
461 
462   if (au->empty)
463     {
464       ret = collect_async_errors (cmp, au);
465       UNLOCK (&au->lock);
466       return ret;
467     }
468 
469   WAIT_SIGNAL_MUTEX (&(au->emptysignal), (au->empty), &au->lock);
470   ret = collect_async_errors (cmp, au);
471   return ret;
472 }
473 
474 /* Close an asynchronous unit.  */
475 
476 void
async_close(async_unit * au)477 async_close (async_unit *au)
478 {
479   if (au == NULL)
480     return;
481 
482   NOTE ("Closing async unit");
483   enqueue_close (au);
484   T_ERROR (__gthread_join, au->thread, NULL);
485   free_async_unit (au);
486 }
487 
488 #else
489 
490 /* Only set u->au to NULL so no async I/O will happen.  */
491 
492 void
init_async_unit(gfc_unit * u)493 init_async_unit (gfc_unit *u)
494 {
495   u->au = NULL;
496   return;
497 }
498 
499 /* Do-nothing function, which will not be called.  */
500 
501 void
enqueue_transfer(async_unit * au,transfer_args * arg,enum aio_do type)502 enqueue_transfer (async_unit *au, transfer_args *arg, enum aio_do type)
503 {
504   return;
505 }
506 
507 /* Do-nothing function, which will not be called.  */
508 
509 int
enqueue_done_id(async_unit * au,enum aio_do type)510 enqueue_done_id (async_unit *au, enum aio_do type)
511 {
512   return 0;
513 }
514 
515 /* Do-nothing function, which will not be called.  */
516 
517 void
enqueue_done(async_unit * au,enum aio_do type)518 enqueue_done (async_unit *au, enum aio_do type)
519 {
520   return;
521 }
522 
523 /* Do-nothing function, which will not be called.  */
524 
525 void
enqueue_close(async_unit * au)526 enqueue_close (async_unit *au)
527 {
528   return;
529 }
530 
531 /* Do-nothing function, which will not be called.  */
532 
533 void
enqueue_data_transfer_init(async_unit * au,st_parameter_dt * dt,int read_flag)534 enqueue_data_transfer_init (async_unit *au, st_parameter_dt *dt, int read_flag)
535 {
536   return;
537 }
538 
539 /* Do-nothing function, which will not be called.  */
540 
541 bool
collect_async_errors(st_parameter_common * cmp,async_unit * au)542 collect_async_errors (st_parameter_common *cmp, async_unit *au)
543 {
544   return false;
545 }
546 
547 /* Do-nothing function, which will not be called.  */
548 
549 bool
async_wait_id(st_parameter_common * cmp,async_unit * au,int i)550 async_wait_id (st_parameter_common *cmp, async_unit *au, int i)
551 {
552   return false;
553 }
554 
555 /* Do-nothing function, which will not be called.  */
556 
557 bool
async_wait(st_parameter_common * cmp,async_unit * au)558 async_wait (st_parameter_common *cmp, async_unit *au)
559 {
560   return false;
561 }
562 
563 /* Do-nothing function, which will not be called.  */
564 
565 void
async_close(async_unit * au)566 async_close (async_unit *au)
567 {
568   return;
569 }
570 
571 #endif
572