1 /*------------------------------------------------------------------------------
2  *
3  * Copyright (c) 2011-2021, EURid vzw. All rights reserved.
4  * The YADIFA TM software product is provided under the BSD 3-clause license:
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions
8  * are met:
9  *
10  *        * Redistributions of source code must retain the above copyright
11  *          notice, this list of conditions and the following disclaimer.
12  *        * Redistributions in binary form must reproduce the above copyright
13  *          notice, this list of conditions and the following disclaimer in the
14  *          documentation and/or other materials provided with the distribution.
15  *        * Neither the name of EURid nor the names of its contributors may be
16  *          used to endorse or promote products derived from this software
17  *          without specific prior written permission.
18  *
19  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
22  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
23  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
24  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
25  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
27  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29  * POSSIBILITY OF SUCH DAMAGE.
30  *
31  *------------------------------------------------------------------------------
32  *
33  */
34 
35 /** @defgroup streaming Streams
36  *  @ingroup dnscore
37  *  @brief
38  *
39  *
40  *
41  * @{
42  *
43  *----------------------------------------------------------------------------*/
44 
45 #define __FILE_POOL_C__ 1
46 
47 #include "dnscore/dnscore-config.h"
48 #include <stdio.h>
49 #include <stdlib.h>
50 #include <sys/types.h>
51 #include <sys/stat.h>
52 #include <fcntl.h>
53 #include <unistd.h>
54 #include "dnscore/ptr_set.h"
55 #include "dnscore/mutex.h"
56 #include "dnscore/list-dl.h"
57 #include "dnscore/fdtools.h"
58 #include "dnscore/logger.h"
59 
60 #define FP_USE_ABSTRACT_FILES   1
61 #if HAS_FILEPOOL_CACHE
62 #define FP_USE_CACHED_FILES     1           // minimal impact apparently
63 #else
64 #define FP_USE_CACHED_FILES     0           // minimal impact apparently
65 #endif
66 
67 #if FP_USE_ABSTRACT_FILES
68 #include "dnscore/filesystem-file.h"
69 #include "dnscore/buffered-file.h"
70 #endif
71 
72 extern logger_handle *g_system_logger;
73 #define MODULE_MSG_HANDLE g_system_logger
74 
75 #define FILEPOOL_TAG    0x4c4f4f50454c4946 // FILEPOOL_TAG
76 
77 struct file_pool_t_
78 {
79     group_mutex_t mtx;
80     ptr_set name_to_file;
81     list_dl_s mru;
82     const char * name;
83     int opened_file_count_max;
84 #if FP_USE_CACHED_FILES
85     buffered_file_cache_t file_cache;
86 #endif
87 };
88 
89 typedef struct file_pool_t_* file_pool_t;
90 
91 struct file_common_t_
92 {
93     list_dl_node_s mru_node;    // the struct MUST start with a node
94     file_pool_t file_pool;      // the pool that manages this file
95     char *name;                 // the name of this file
96     size_t position;            // the current position in the current fd
97 #if FP_USE_ABSTRACT_FILES
98     file_t file;
99 #else
100     int fd;                     // the fd that accesses this file
101 #endif
102     int rc;                     // the number of references to this file      (opened)
103     int ioc;                    // the number of io operations going on       (immediately planned read/write/...)
104     bool old;                   // an old file has been renamed and cannot be closed anymore until its RC reaches 0
105 };
106 
107 typedef struct file_common_t_* file_common_t;
108 
109 struct file_pool_file_t_
110 {
111 #if DEBUG
112     u64 magic;
113 #endif
114     file_common_t common;       // the common part of the file
115     size_t position;            // the position in the file
116     int rdwr_flags;             // has this "handle" the right to access the file in read and/or write ?
117 };
118 
119 typedef struct file_pool_file_t_* file_pool_file_t;
120 
121 #include <dnscore/file-pool.h>
122 
123 file_pool_t
file_pool_init_ex(const char * const pool_name,int opened_file_count_max,u32 cache_entries)124 file_pool_init_ex(const char * const pool_name, int opened_file_count_max, u32 cache_entries)  // name is for logging
125 {
126 #if FP_USE_CACHED_FILES
127     buffered_file_cache_t file_cache = buffered_file_cache_new_instance(pool_name, cache_entries, 12, TRUE);
128 
129     if(file_cache == NULL)
130     {
131         log_err("file-pool: failed to instantiate new pool %s using %i file descriptors and a cache of %u pages", pool_name, opened_file_count_max, cache_entries);
132         return NULL;
133     }
134 #else
135     (void)cache_entries;
136 #endif
137 
138     file_pool_t fp;
139     ZALLOC_OBJECT_OR_DIE(fp, struct file_pool_t_, FILEPOOL_TAG);
140     group_mutex_init(&fp->mtx);
141     ptr_set_init(&fp->name_to_file);
142     fp->name_to_file.compare = ptr_set_asciizp_node_compare;
143     list_dl_init(&fp->mru);
144     fp->name = pool_name;
145     fp->opened_file_count_max = MAX(opened_file_count_max, 1);
146 
147 #if FP_USE_CACHED_FILES
148     fp->file_cache = file_cache;
149 #endif
150 
151     log_debug("file-pool: new pool %s using %i file descriptors at %p", pool_name, opened_file_count_max, fp);
152 
153     return fp;
154 }
155 
156 file_pool_t
file_pool_init(const char * const pool_name,int opened_file_count_max)157 file_pool_init(const char * const pool_name, int opened_file_count_max)  // name is for logging
158 {
159     file_pool_t fp = file_pool_init_ex(pool_name, opened_file_count_max, 4096);
160 
161     return fp;
162 }
163 
164 static void file_pool_close_unused_in_excess(file_pool_t fp, int fd_max);
165 static void file_common_destroy(file_common_t fc);
166 static void file_common_mru_unlink(file_common_t fc);
167 
file_pool_finalize_name_to_file(ptr_node * node)168 static void file_pool_finalize_name_to_file(ptr_node *node)
169 {
170     file_common_t fc = (file_common_t)node->value;
171     file_common_mru_unlink(fc);
172     file_common_destroy(fc);
173 }
174 
175 void
file_pool_finalize(file_pool_t fp)176 file_pool_finalize(file_pool_t fp)
177 {
178     if(fp == NULL)
179     {
180         return;
181     }
182 
183     log_debug("file-pool: deleting pool %s using %i file descriptors at %p", fp->name, fp->opened_file_count_max, fp);
184 
185     ptr_set_iterator iter;
186 
187     group_mutex_double_lock(&fp->mtx, GROUP_MUTEX_READ, GROUP_MUTEX_WRITE);
188     group_mutex_exchange_locks(&fp->mtx, GROUP_MUTEX_READ, GROUP_MUTEX_WRITE);
189     file_pool_close_unused_in_excess(fp, 0);
190 #if FP_USE_CACHED_FILES
191     buffered_file_cache_delete(fp->file_cache);
192 #endif
193     group_mutex_exchange_locks(&fp->mtx, GROUP_MUTEX_WRITE, GROUP_MUTEX_READ);
194     group_mutex_double_unlock(&fp->mtx, GROUP_MUTEX_READ, GROUP_MUTEX_WRITE);
195 
196     bool log_flush_required = FALSE;
197 
198     ptr_set_iterator_init(&fp->name_to_file, &iter);
199     while(ptr_set_iterator_hasnext(&iter))
200     {
201         ptr_node *node = ptr_set_iterator_next_node(&iter);
202         if(node->value != NULL)
203         {
204             file_common_t fc = (file_common_t)node->value;
205 
206             if(fc->rc > 0)
207             {
208                 log_err("file-pool: '%s' is still referenced %i times", fc->name, fc->rc);
209                 log_flush_required = TRUE;
210             }
211         }
212     }
213 
214     if(log_flush_required)
215     {
216         logger_flush();
217     }
218 
219     ptr_set_callback_and_destroy(&fp->name_to_file, file_pool_finalize_name_to_file);
220     group_mutex_destroy(&fp->mtx);
221     ZFREE_OBJECT(fp);
222 }
223 
224 #if FP_USE_ABSTRACT_FILES
225 static file_common_t
file_common_newinstance(file_pool_t fp,const char * name,file_t file)226 file_common_newinstance(file_pool_t fp, const char *name, file_t file)
227 #else
228 static file_common_t
229 file_common_newinstance(file_pool_t fp, const char *name, int fd)
230 #endif
231 {
232     file_common_t fc;
233     ZALLOC_OBJECT_OR_DIE(fc, struct file_common_t_, FILEPOOL_TAG);
234     fc->mru_node.next = NULL;
235     fc->mru_node.prev = NULL;
236     fc->mru_node.data = fc;
237     fc->file_pool = fp;
238     fc->name = strdup(name);
239     fc->position = 0;
240 #if FP_USE_ABSTRACT_FILES
241     fc->file = file;
242 #else
243     fc->fd = fd;
244 #endif
245     fc->rc = 0;
246     fc->ioc = 0;
247     fc->old = FALSE;
248     return fc;
249 }
250 
251 static void
file_common_destroy(file_common_t fc)252 file_common_destroy(file_common_t fc)
253 {
254     if(fc != NULL)
255     {
256 #if FP_USE_ABSTRACT_FILES
257         //yassert(fc->file == NULL);
258         if(fc->file != NULL)
259         {
260             if(fc->rc == 0)
261             {
262                 file_close(fc->file);
263                 fc->file = NULL;
264             }
265             else
266             {
267                 log_err("error: file_common_destroy called on a file with rc=%i: '%s'", fc->rc, STRNULL(fc->name));
268                 logger_flush();
269             }
270         }
271 #else
272         //yassert(fc->fd < 0);
273         if(fc->fd >= 0)
274         {
275             if(fc->rc == 0)
276             {
277                 close_ex(fc->fd);
278                 fc->fd = -1;
279             }
280             else
281             {
282                 log_err("error: file_common_destroy called on a file with rc=%i: '%s'", fc->rc, STRNULL(fc->name));
283                 logger_flush();
284             }
285         }
286 #endif
287         fc->file_pool = NULL;
288         free(fc->name);
289         ZFREE(fc, struct file_common_t_);
290     }
291 }
292 
293 static void
file_common_mru_to_link(file_common_t fc)294 file_common_mru_to_link(file_common_t fc)
295 {
296     file_common_t first_one = (file_common_t)list_dl_peek_first(&fc->file_pool->mru);
297     if(first_one != fc)
298     {
299         list_dl_insert_node(&fc->file_pool->mru, &fc->mru_node);
300     }
301 }
302 
303 static void
file_common_mru_to_first(file_common_t fc)304 file_common_mru_to_first(file_common_t fc)
305 {
306     file_common_t first_one = (file_common_t)list_dl_peek_first(&fc->file_pool->mru);
307     if(first_one != fc)
308     {
309         if(fc->mru_node.next != NULL)
310         {
311             list_dl_remove_node(&fc->file_pool->mru, &fc->mru_node);
312         }
313         list_dl_insert_node(&fc->file_pool->mru, &fc->mru_node);
314     }
315 }
316 
317 static void
file_common_mru_to_last(file_common_t fc)318 file_common_mru_to_last(file_common_t fc)
319 {
320     file_common_t last_one = (file_common_t)list_dl_peek_last(&fc->file_pool->mru);
321     if(last_one != fc)
322     {
323         if(fc->mru_node.next != NULL)
324         {
325             list_dl_remove_node(&fc->file_pool->mru, &fc->mru_node);
326         }
327         list_dl_append_node(&fc->file_pool->mru, &fc->mru_node);
328     }
329 }
330 
331 static void
file_common_mru_unlink(file_common_t fc)332 file_common_mru_unlink(file_common_t fc)
333 {
334     //file_common_t first_one = (file_common_t)list_dl_peek_first(&fc->file_pool->mru);
335     //if(first_one != fc)
336     {
337         list_dl_remove_node(&fc->file_pool->mru, &fc->mru_node);
338     }
339 }
340 
341 static bool
file_common_operating(file_common_t fc)342 file_common_operating(file_common_t fc)
343 {
344     return  (fc->ioc > 0) || (fc->old); // if there are operations running or the file is old (and thus is only waiting for RC reaching 0 to close)
345 }
346 
347 /**
348  * Close open FD above the given limit
349  *
350  * Locks must be (W,R)
351  *
352  * @param fp
353  * @param fd_max
354  */
355 
356 static void
file_pool_close_unused_in_excess(file_pool_t fp,int fd_max)357 file_pool_close_unused_in_excess(file_pool_t fp, int fd_max)
358 {
359     if(fd_max < 1)
360     {
361         fd_max = 1;
362     }
363 
364     while(list_dl_size(&fp->mru) > (u32)fd_max)
365     {
366         file_common_t fc = (file_common_t)list_dl_peek_last(&fp->mru);
367 
368         if(!file_common_operating(fc))
369         {
370 #if FP_USE_ABSTRACT_FILES
371             if(fc->file != NULL)
372             {
373                 group_mutex_exchange_locks(&fp->mtx, GROUP_MUTEX_WRITE, GROUP_MUTEX_READ);
374                 file_close(fc->file);
375                 fc->file = NULL;
376                 group_mutex_exchange_locks(&fp->mtx, GROUP_MUTEX_READ, GROUP_MUTEX_WRITE);
377             }
378 #else
379             if(fc->fd >= 0)
380             {
381                 group_mutex_exchange_locks(&fp->mtx, GROUP_MUTEX_WRITE, GROUP_MUTEX_READ);
382                 close_ex(fc->fd);
383                 group_mutex_exchange_locks(&fp->mtx, GROUP_MUTEX_READ, GROUP_MUTEX_WRITE);
384 
385                 fc->fd = -1;
386             }
387 #endif
388             file_common_mru_unlink(fc);
389 
390             if(fc->rc == 0)
391             {
392                 // nobody is pointing to this anymore
393 
394                 ptr_set_delete(&fp->name_to_file, fc->name);
395 
396                 file_common_destroy(fc);
397             }
398         }
399         else
400         {
401             group_mutex_exchange_locks(&fp->mtx, GROUP_MUTEX_WRITE, GROUP_MUTEX_READ);
402             group_mutex_exchange_locks(&fp->mtx, GROUP_MUTEX_READ, GROUP_MUTEX_WRITE);
403             break;
404         }
405     }
406 }
407 
408 ya_result
file_pool_unlink_from_pool_and_filename(file_pool_t fp,const char * filename)409 file_pool_unlink_from_pool_and_filename(file_pool_t fp, const char *filename)
410 {
411     ya_result ret;
412 
413     group_mutex_double_lock(&fp->mtx, GROUP_MUTEX_READ, GROUP_MUTEX_WRITE);
414 
415     if(unlink(filename) >= 0)
416     {
417         ret = SUCCESS;
418 
419         ptr_node *node;
420 
421         if((node = ptr_set_find(&fp->name_to_file, filename)) != NULL)
422         {
423             // the common node exists
424             // get a new file referencing that node
425 
426             group_mutex_exchange_locks(&fp->mtx, GROUP_MUTEX_READ, GROUP_MUTEX_WRITE);
427 
428             file_common_t fc = (file_common_t)node->value;
429 
430             ptr_set_delete(&fp->name_to_file, fc->name);
431             file_common_mru_unlink(fc);
432             fc->old = TRUE;
433 
434             if(fc->rc == 0)
435             {
436                 if(fc->file != NULL)
437                 {
438                     file_close(fc->file);
439                     fc->file = NULL;
440                 }
441 
442                 file_common_destroy(fc);
443             }
444 
445             group_mutex_exchange_locks(&fp->mtx, GROUP_MUTEX_WRITE, GROUP_MUTEX_READ);
446         }
447     }
448     else
449     {
450         ret = ERRNO_ERROR;
451     }
452 
453     group_mutex_double_unlock(&fp->mtx, GROUP_MUTEX_READ, GROUP_MUTEX_WRITE);
454 
455     return ret;
456 }
457 
458 static void
file_common_acquire(file_common_t fc)459 file_common_acquire(file_common_t fc)
460 {
461     ++fc->rc;
462 }
463 
464 static void
file_common_release(file_common_t fc)465 file_common_release(file_common_t fc)
466 {
467     --fc->rc;
468 
469     if(fc->rc <= 0)
470     {
471         assert(fc->rc == 0);
472 
473         // an old file has been unlinked
474         // it is not in the name_to_file set anymore
475         // it is not in the mru anymore
476 
477         if(!fc->old)
478         {
479             file_common_mru_to_last(fc);
480         }
481         else
482         {
483 #if FP_USE_ABSTRACT_FILES
484             if(fc->file != NULL)
485             {
486                 file_pool_t fp = fc->file_pool;
487 
488                 group_mutex_exchange_locks(&fp->mtx, GROUP_MUTEX_WRITE, GROUP_MUTEX_READ);
489                 file_close(fc->file);
490                 fc->file = NULL;
491                 group_mutex_exchange_locks(&fp->mtx, GROUP_MUTEX_READ, GROUP_MUTEX_WRITE);
492             }
493 #else
494             if(fc->fd >= 0)
495             {
496                 file_pool_t fp = fc->file_pool;
497 
498                 group_mutex_exchange_locks(&fp->mtx, GROUP_MUTEX_WRITE, GROUP_MUTEX_READ);
499                 close_ex(fc->fd);
500                 group_mutex_exchange_locks(&fp->mtx, GROUP_MUTEX_READ, GROUP_MUTEX_WRITE);
501                 fc->fd = -1;
502             }
503 #endif
504             file_common_mru_unlink(fc);
505 
506             file_common_destroy(fc);
507         }
508     }
509 }
510 
511 static file_pool_file_t
file_newinstance(file_common_t common,int rdwr_flags)512 file_newinstance(file_common_t common, int rdwr_flags)
513 {
514     file_pool_file_t f;
515     ZALLOC_OBJECT_OR_DIE(f, struct file_pool_file_t_, FILEPOOL_TAG);
516     file_common_acquire(common);
517 #if DEBUG
518     f->magic = 0xf113B001;
519 #endif
520     f->common = common;
521     f->position = 0;
522     f->rdwr_flags = rdwr_flags;
523     return f;
524 }
525 
526 static void
file_destroy(file_pool_file_t f)527 file_destroy(file_pool_file_t f)
528 {
529     if(f != NULL)
530     {
531         file_common_t fc = f->common;
532         ZEROMEMORY(f, sizeof(struct file_pool_file_t_));
533         ZFREE(f, struct file_pool_file_t_);
534 
535         file_common_release(fc);
536     }
537 }
538 
539 /**
540  *
541  *
542  * @param fp
543  * @param filename
544  * @param flags
545  * @param mode
546  * @return a handle or NULL (errno being set)
547  */
548 
549 file_pool_file_t
file_pool_open_ex(file_pool_t fp,const char * filename,int flags,mode_t mode)550 file_pool_open_ex(file_pool_t fp, const char *filename, int flags, mode_t mode)
551 {
552     group_mutex_double_lock(&fp->mtx, GROUP_MUTEX_READ, GROUP_MUTEX_WRITE);
553 
554     file_common_t fc;
555     file_pool_file_t f;
556 
557     char absolute_filename[1024];
558 
559     ya_result ret;
560     if(ISOK(ret = file_get_absolute_path(filename, absolute_filename, sizeof(absolute_filename))))
561     {
562         filename = absolute_filename;
563     }
564 
565     ptr_node *node;
566 
567     if((node = ptr_set_find(&fp->name_to_file, filename)) != NULL)
568     {
569         // the common node exists
570         // get a new file referencing that node
571 
572         group_mutex_exchange_locks(&fp->mtx, GROUP_MUTEX_READ, GROUP_MUTEX_WRITE);
573 
574         fc = (file_common_t)node->value;
575         f = file_newinstance(fc, flags);
576 
577         file_common_mru_to_first(fc);
578 
579         group_mutex_exchange_locks(&fp->mtx, GROUP_MUTEX_WRITE, GROUP_MUTEX_READ);
580     }
581     else
582     {
583         // the first opening is required to eventually return an error code
584 
585 #if FP_USE_ABSTRACT_FILES
586         file_t file;
587 
588         ret = filesystem_file_create_ex(&file, filename, O_RDWR|flags, mode);
589 
590         if(ISOK(ret))
591         {
592 #if FP_USE_CACHED_FILES
593             buffered_file_init(&file, file, fp->file_cache); // only fails if one of the parameters is NULL
594 #endif
595             group_mutex_exchange_locks(&fp->mtx, GROUP_MUTEX_READ, GROUP_MUTEX_WRITE);
596 
597             // close the exceeding fd(s)
598 
599             file_pool_close_unused_in_excess(fp, fp->opened_file_count_max - 1);
600 
601             // we know the node does not exist
602 
603             node = ptr_set_insert(&fp->name_to_file, (char*)filename); // cast as it will be fixed later
604             fc = file_common_newinstance(fp, filename, file);
605 #else
606         int fd = open_create_ex(filename, O_RDWR|flags, mode);
607 
608         if(fd > 0)
609         {
610             group_mutex_exchange_locks(&fp->mtx, GROUP_MUTEX_READ, GROUP_MUTEX_WRITE);
611 
612             // close the exceeding fd(s)
613 
614             file_pool_close_unused_in_excess(fp, fp->opened_file_count_max - 1);
615 
616             // we know the node does not exist
617 
618             node = ptr_set_insert(&fp->name_to_file, (char*)filename); // cast as it will be fixed later
619             fc = file_common_newinstance(fp, filename, fd);
620 #endif
621             node->value = fc;
622             node->key = fc->name;
623 
624             f = file_newinstance(fc, flags);
625 
626             file_common_mru_to_link(fc);
627 
628             group_mutex_exchange_locks(&fp->mtx, GROUP_MUTEX_WRITE, GROUP_MUTEX_READ);
629         }
630         else
631         {
632             f = NULL;
633         }
634     }
635 
636     group_mutex_double_unlock(&fp->mtx, GROUP_MUTEX_READ, GROUP_MUTEX_WRITE);
637 
638     return f;
639 }
640 
641 file_pool_file_t
642 file_dup(file_pool_file_t file)
643 {
644     group_mutex_double_lock(&file->common->file_pool->mtx, GROUP_MUTEX_READ, GROUP_MUTEX_WRITE);
645 
646     group_mutex_exchange_locks(&file->common->file_pool->mtx, GROUP_MUTEX_READ, GROUP_MUTEX_WRITE);
647 
648     file_pool_file_t file_dup = file_newinstance(file->common, file->rdwr_flags);
649 
650     file_common_mru_to_first(file->common);
651 
652     group_mutex_exchange_locks(&file->common->file_pool->mtx, GROUP_MUTEX_WRITE, GROUP_MUTEX_READ);
653 
654     group_mutex_double_unlock(&file->common->file_pool->mtx, GROUP_MUTEX_READ, GROUP_MUTEX_WRITE);
655 
656     return file_dup;
657 }
658 
659 file_pool_file_t
660 file_pool_open(file_pool_t fp, const char *filename)
661 {
662     file_pool_file_t f = file_pool_open_ex(fp, filename, O_RDONLY|O_CLOEXEC, 0);
663     return f;
664 }
665 
666 file_pool_file_t
667 file_pool_create(file_pool_t fp, const char *filename, mode_t mode)
668 {
669     file_pool_file_t f = file_pool_open_ex(fp, filename, O_RDWR|O_CREAT|O_CLOEXEC, mode);
670     return f;
671 }
672 
673 file_pool_file_t
674 file_pool_create_excl(file_pool_t fp, const char *filename, mode_t mode)
675 {
676     file_pool_file_t f = file_pool_open_ex(fp, filename, O_RDWR|O_CREAT|O_CLOEXEC|O_EXCL, mode);
677     return f;
678 }
679 
680 static void
681 file_common_release_fd(file_pool_file_t f)
682 {
683     file_common_t fc = f->common;
684     file_pool_t fp = fc->file_pool;
685 
686     group_mutex_exchange_locks(&fp->mtx, GROUP_MUTEX_READ, GROUP_MUTEX_WRITE);
687 
688     fc->position = f->position;
689 
690     --fc->ioc;
691 
692     group_mutex_exchange_locks(&fp->mtx, GROUP_MUTEX_WRITE, GROUP_MUTEX_READ);
693 
694     group_mutex_double_unlock(&fp->mtx, GROUP_MUTEX_READ, GROUP_MUTEX_WRITE);
695 }
696 
697 static void
698 file_common_advance_position_release_fd(file_pool_file_t f, size_t offset)
699 {
700     file_common_t fc = f->common;
701     file_pool_t fp = fc->file_pool;
702 
703     group_mutex_exchange_locks(&fp->mtx, GROUP_MUTEX_READ, GROUP_MUTEX_WRITE);
704 
705     f->position += offset;
706     fc->position = f->position;
707 
708     --fc->ioc;
709 
710     group_mutex_exchange_locks(&fp->mtx, GROUP_MUTEX_WRITE, GROUP_MUTEX_READ);
711 
712     group_mutex_double_unlock(&fp->mtx, GROUP_MUTEX_READ, GROUP_MUTEX_WRITE);
713 }
714 
715 #if FP_USE_ABSTRACT_FILES
716 
717 static int
718 file_common_acquire_fd(file_pool_file_t f, file_t *filep)
719 {
720 #if DEBUG
721     yassert(f->magic == 0xf113B001);
722 #endif
723 
724     // prevent changes, take dibs on making changes
725 
726     group_mutex_double_lock(&f->common->file_pool->mtx, GROUP_MUTEX_READ, GROUP_MUTEX_WRITE);
727 
728     file_common_t fc = f->common;
729     file_pool_t fp = fc->file_pool;
730 
731     file_t file;
732     ya_result ret = SUCCESS;
733 
734     if((file = fc->file) == NULL)
735     {
736         // need to get a file
737 
738         group_mutex_exchange_locks(&fp->mtx, GROUP_MUTEX_READ, GROUP_MUTEX_WRITE);
739 
740         file_pool_close_unused_in_excess(fp, fp->opened_file_count_max - 1);
741 
742         group_mutex_exchange_locks(&fp->mtx, GROUP_MUTEX_WRITE, GROUP_MUTEX_READ);
743 
744         ret = filesystem_file_open_ex(&file, fc->name, O_RDWR|O_CLOEXEC);
745 
746         if(FAIL(ret))
747         {
748             group_mutex_double_unlock(&fp->mtx, GROUP_MUTEX_READ, GROUP_MUTEX_WRITE);
749 
750             return ret;
751         }
752 
753 #if FP_USE_CACHED_FILES
754         buffered_file_init(&file, file, fp->file_cache); // only fails if one of the parameters is NULL
755 #endif
756         fc->file = file;
757         fc->position = 0;
758     }
759 
760     if(fc->position != f->position)
761     {
762         file_seek(file, f->position, SEEK_SET);
763     }
764 
765     group_mutex_exchange_locks(&fp->mtx, GROUP_MUTEX_READ, GROUP_MUTEX_WRITE);
766     file_common_mru_to_first(fc);
767     ++fc->ioc;
768     group_mutex_exchange_locks(&fp->mtx, GROUP_MUTEX_WRITE, GROUP_MUTEX_READ);
769 
770     // successful acquisition will be unlocked at release
771 
772     *filep = file;
773 
774     return ret;
775 }
776 
777 ya_result
778 file_pool_read(file_pool_file_t f, void *buffer, size_t bytes)
779 {
780     file_t file;
781     int ret = file_common_acquire_fd(f, &file);
782 
783     if(FAIL(ret))
784     {
785         return ret;
786     }
787 
788     // read
789 
790     ssize_t n = file_read(file, buffer, bytes);
791 
792     if(n >= 0)
793     {
794         file_common_advance_position_release_fd(f, n);
795     }
796     else
797     {
798         file_common_release_fd(f);
799     }
800 
801     return (ya_result)n;
802 }
803 
804 ya_result
805 file_pool_readfully(file_pool_file_t f, void *buffer, size_t bytes)
806 {
807     ya_result ret = file_pool_read(f, buffer, bytes);
808     return ret;
809 }
810 
811 ya_result
812 file_pool_write(file_pool_file_t f, const void *buffer, size_t bytes)
813 {
814     file_t file;
815     int ret = file_common_acquire_fd(f, &file);
816 
817     if(FAIL(ret))
818     {
819         return ret;
820     }
821 
822     // read
823 
824     ssize_t n = file_write(file, buffer, bytes);
825 
826     if(n >= 0)
827     {
828         file_common_advance_position_release_fd(f, n);
829     }
830     else
831     {
832         file_common_release_fd(f);
833     }
834 
835     return (ya_result)n;
836 }
837 
838 ya_result
839 file_pool_writefully(file_pool_file_t f, const void *buffer, size_t bytes)
840 {
841     ya_result ret = file_pool_write(f, buffer, bytes);
842     return ret;
843 }
844 
845 ya_result
846 file_pool_flush(file_pool_file_t f)
847 {
848     file_t file;
849     int ret = file_common_acquire_fd(f, &file);
850 
851     if(FAIL(ret))
852     {
853         return ret;
854     }
855 
856     // flush
857 
858     ret = file_flush(file);
859 
860     file_common_release_fd(f);
861 
862     return ret;
863 }
864 
865 ya_result
866 file_pool_get_size(file_pool_file_t f, size_t *sizep)
867 {
868     if(sizep != NULL)
869     {
870         file_t file;
871         int ret = file_common_acquire_fd(f, &file);
872 
873         if(FAIL(ret))
874         {
875             *sizep = 0;
876             return ret;
877         }
878 
879         *sizep = (size_t)file_size(file);
880 
881         file_common_release_fd(f);
882 
883         return ret;
884     }
885     else
886     {
887         return UNEXPECTED_NULL_ARGUMENT_ERROR;
888     }
889 }
890 
891 ya_result
892 file_pool_resize(file_pool_file_t f, size_t size)
893 {
894     file_t file;
895     int ret = file_common_acquire_fd(f, &file);
896 
897     if(FAIL(ret))
898     {
899         return ret;
900     }
901 
902     // truncate
903 
904     ret = file_resize(file, size);
905 
906     file_common_release_fd(f);
907 
908     return ret;
909 }
910 
911 #else
912 static int
913 file_common_acquire_fd(file_pool_file_t f)
914 {
915 #if DEBUG
916     yassert(f->magic == 0xf113B001);
917 #endif
918 
919     // prevent changes, take dibs on making changes
920 
921     group_mutex_double_lock(&f->common->file_pool->mtx, GROUP_MUTEX_READ, GROUP_MUTEX_WRITE);
922 
923     file_common_t fc = f->common;
924     file_pool_t fp = fc->file_pool;
925 
926     int fd;
927 
928     if((fd = fc->fd) < 0)
929     {
930         // need to get an fd
931 
932         group_mutex_exchange_locks(&fp->mtx, GROUP_MUTEX_READ, GROUP_MUTEX_WRITE);
933 
934         file_pool_close_unused_in_excess(fp, fp->opened_file_count_max - 1);
935 
936         group_mutex_exchange_locks(&fp->mtx, GROUP_MUTEX_WRITE, GROUP_MUTEX_READ);
937 
938         if((fd = open_ex(fc->name, O_RDWR|O_CLOEXEC)) < 0)
939         {
940             fd = ERRNO_ERROR;
941 
942             group_mutex_double_unlock(&fp->mtx, GROUP_MUTEX_READ, GROUP_MUTEX_WRITE);
943 
944             return fd;
945         }
946 
947         fc->fd = fd;
948         fc->position = 0;
949     }
950 
951     if(fc->position != f->position)
952     {
953         lseek(fd, f->position, SEEK_SET);
954     }
955 
956     group_mutex_exchange_locks(&fp->mtx, GROUP_MUTEX_READ, GROUP_MUTEX_WRITE);
957     file_common_mru_to_first(fc);
958     ++fc->ioc;
959     group_mutex_exchange_locks(&fp->mtx, GROUP_MUTEX_WRITE, GROUP_MUTEX_READ);
960 
961     // successful acquisition will be unlocked at release
962 
963     return fd;
964 }
965 
966 ya_result
967 file_pool_read(file_pool_file_t f, void *buffer, size_t bytes)
968 {
969     int fd = file_common_acquire_fd(f);
970 
971     if(fd < 0)
972     {
973         return fd;
974     }
975 
976     // read
977 
978     for(;;)
979     {
980         ssize_t n = read(fd, buffer, bytes);
981 
982         if(n >= 0)
983         {
984             file_common_advance_position_release_fd(f, n);
985             return n;
986         }
987         else
988         {
989             int err = errno;
990 
991             if(err == EINTR)
992             {
993                 continue;
994             }
995 
996             file_common_release_fd(f);
997 
998             return err;
999         }
1000     }
1001 }
1002 
1003 ya_result
1004 file_pool_readfully(file_pool_file_t f, void *buffer, size_t bytes)
1005 {
1006     int fd = file_common_acquire_fd(f);
1007 
1008     if(fd < 0)
1009     {
1010         return fd;
1011     }
1012 
1013     // read
1014 
1015     size_t total = 0;
1016 
1017     for(;;)
1018     {
1019         ssize_t n = read(fd, buffer, bytes);
1020 
1021         if(n >= 0)
1022         {
1023             bytes -= n;
1024             total += n;
1025 
1026             if((n == 0) || (bytes == 0))
1027             {
1028                 file_common_advance_position_release_fd(f, total);
1029                 return total;
1030             }
1031         }
1032         else
1033         {
1034             int err = errno;
1035 
1036             if(err == EINTR)
1037             {
1038                 continue;
1039             }
1040 
1041             if(err == EAGAIN)
1042             {
1043                 continue;
1044             }
1045 
1046             file_common_advance_position_release_fd(f, total);
1047 
1048             return err;
1049         }
1050     }
1051 }
1052 
1053 ya_result
1054 file_pool_write(file_pool_file_t f, const void *buffer, size_t bytes)
1055 {
1056     int fd = file_common_acquire_fd(f);
1057 
1058     if(fd < 0)
1059     {
1060         return fd;
1061     }
1062 
1063     // write
1064 
1065     for(;;)
1066     {
1067         ssize_t n = write(fd, buffer, bytes);
1068 
1069         if(n >= 0)
1070         {
1071             file_common_advance_position_release_fd(f, n);
1072             return n;
1073         }
1074         else
1075         {
1076             int err = errno;
1077 
1078             if(err == EAGAIN)
1079             {
1080                 continue;
1081             }
1082 
1083             file_common_release_fd(f);
1084 
1085             return err;
1086         }
1087     }
1088 }
1089 
1090 ya_result
1091 file_pool_writefully(file_pool_file_t f, const void *buffer, size_t bytes)
1092 {
1093     int fd = file_common_acquire_fd(f);
1094 
1095     if(fd < 0)
1096     {
1097         return fd;
1098     }
1099 
1100     // write
1101 
1102     size_t total = 0;
1103 
1104     for(;;)
1105     {
1106         ssize_t n = write(fd, buffer, bytes);
1107 
1108         if(n >= 0)
1109         {
1110             bytes -= n;
1111             total += n;
1112 
1113             if((n == 0) || (bytes == 0))
1114             {
1115                 file_common_advance_position_release_fd(f, total);
1116                 return total;
1117             }
1118         }
1119         else
1120         {
1121             int err = errno;
1122 
1123             if(err == EINTR)
1124             {
1125                 continue;
1126             }
1127 
1128             if(err == EAGAIN)
1129             {
1130                 continue;
1131             }
1132 
1133             file_common_advance_position_release_fd(f, total);
1134 
1135             return err;
1136         }
1137     }
1138 }
1139 
1140 ya_result
1141 file_pool_flush(file_pool_file_t f)
1142 {
1143     int fd = file_common_acquire_fd(f);
1144 
1145     if(fd < 0)
1146     {
1147         return fd;
1148     }
1149 
1150     // flush
1151 
1152     ya_result ret = SUCCESS;
1153 
1154     if(fdatasync_ex(fd) < 0)
1155     {
1156         ret = ERRNO_ERROR;
1157     }
1158 
1159     file_common_release_fd(f);
1160 
1161     return ret;
1162 }
1163 
1164 ya_result
1165 file_pool_get_size(file_pool_file_t f, size_t *sizep)
1166 {
1167     if(sizep != NULL)
1168     {
1169         int fd = file_common_acquire_fd(f);
1170 
1171         if(fd < 0)
1172         {
1173             return fd;
1174         }
1175 
1176         // truncate
1177 
1178         ya_result ret = SUCCESS;
1179 
1180         struct stat st;
1181 
1182         if(fstat(fd, &st) >= 0)
1183         {
1184             *sizep = st.st_size;
1185         }
1186         else
1187         {
1188             ret = ERRNO_ERROR;
1189         }
1190 
1191         file_common_release_fd(f);
1192 
1193         return ret;
1194     }
1195     else
1196     {
1197         return ERROR;
1198     }
1199 }
1200 
1201 ya_result
1202 file_pool_resize(file_pool_file_t f, size_t size)
1203 {
1204     int fd = file_common_acquire_fd(f);
1205 
1206     if(fd < 0)
1207     {
1208         return fd;
1209     }
1210 
1211     // truncate
1212 
1213     ya_result ret = SUCCESS;
1214     if(ftruncate_ex(fd, size) < 0)
1215     {
1216         ret = ERRNO_ERROR;
1217     }
1218 
1219     file_common_release_fd(f);
1220 
1221     return ret;
1222 }
1223 
1224 #endif
1225 
1226 ssize_t
1227 file_pool_seek(file_pool_file_t f, ssize_t position, int from)
1228 {
1229     ssize_t ret;
1230     group_mutex_lock(&f->common->file_pool->mtx, GROUP_MUTEX_WRITE);
1231 
1232     switch(from)
1233     {
1234         case SEEK_SET:
1235         {
1236             f->position = position;
1237             ret = (ssize_t)f->position;
1238             break;
1239         }
1240         case SEEK_CUR:
1241         {
1242             ssize_t p = f->position + position;
1243             if(p >= 0)
1244             {
1245                 f->position = p;
1246             }
1247             else
1248             {
1249                 f->position = 0;
1250             }
1251             ret = (ssize_t)f->position;
1252             break;
1253         }
1254         case SEEK_END:
1255         {
1256             size_t size;
1257             if(ISOK(ret = file_pool_get_size(f, &size))) // can only fail if a NULL is given)
1258             {
1259                 ssize_t p = size + position;
1260 
1261                 if(p >= 0)
1262                 {
1263                     f->position = p;
1264                 }
1265                 else
1266                 {
1267                     f->position = 0;
1268                 }
1269                 ret = (ssize_t)f->position;
1270             }
1271 
1272             break;
1273         }
1274         default:
1275         {
1276             ret = INVALID_ARGUMENT_ERROR;
1277             break;
1278         }
1279     }
1280 
1281     group_mutex_unlock(&f->common->file_pool->mtx, GROUP_MUTEX_WRITE);
1282 
1283     return ret;
1284 }
1285 
1286 ya_result
1287 file_pool_tell(file_pool_file_t f, size_t *positionp)
1288 {
1289     if(positionp != NULL)
1290     {
1291         group_mutex_lock(&f->common->file_pool->mtx, GROUP_MUTEX_READ);
1292         *positionp = f->position;
1293         group_mutex_unlock(&f->common->file_pool->mtx, GROUP_MUTEX_READ);
1294 
1295         return SUCCESS;
1296     }
1297     else
1298     {
1299         return ERROR;
1300     }
1301 }
1302 
1303 // flushes, but only closes the file when fd are needed
1304 
1305 ya_result
1306 file_pool_close(file_pool_file_t f)
1307 {
1308     group_mutex_double_lock(&f->common->file_pool->mtx, GROUP_MUTEX_READ, GROUP_MUTEX_WRITE);
1309 
1310     file_common_t fc = f->common;
1311     file_pool_t fp = fc->file_pool;
1312 
1313 #if FP_USE_ABSTRACT_FILES
1314     if(fc->file != NULL)
1315     {
1316         file_flush(fc->file);
1317     }
1318 #else
1319     if(fc->fd >= 0)
1320     {
1321         fdatasync_ex(fc->fd);
1322     }
1323 #endif
1324 
1325     group_mutex_exchange_locks(&fp->mtx, GROUP_MUTEX_READ, GROUP_MUTEX_WRITE);
1326     file_destroy(f);
1327     group_mutex_exchange_locks(&fp->mtx, GROUP_MUTEX_WRITE, GROUP_MUTEX_READ);
1328 
1329     group_mutex_double_unlock(&fp->mtx, GROUP_MUTEX_READ, GROUP_MUTEX_WRITE);
1330 
1331     return SUCCESS;
1332 }
1333 
1334 ya_result
1335 file_pool_unlink(file_pool_file_t f)
1336 {
1337     ya_result ret;
1338 
1339     if(f != NULL)
1340     {
1341         ret = INVALID_ARGUMENT_ERROR;
1342 
1343         if(f->common != NULL)
1344         {
1345             if(f->common->file_pool != NULL)
1346             {
1347                 ret = file_pool_unlink_from_pool_and_filename(f->common->file_pool, f->common->name);
1348             }
1349         }
1350     }
1351     else
1352     {
1353         ret = UNEXPECTED_NULL_ARGUMENT_ERROR;
1354     }
1355 
1356     return ret;
1357 }
1358 
1359 const char *
1360 file_pool_filename(const file_pool_file_t f)
1361 {
1362     if(f != NULL)
1363     {
1364         const file_common_t fc = f->common;
1365         return fc->name;
1366     }
1367     else
1368     {
1369         return "NULL";
1370     }
1371 }
1372 
1373 static ya_result
1374 file_pool_file_output_stream_write(output_stream* os, const u8* buffer, u32 len)
1375 {
1376     file_pool_file_t f = (file_pool_file_t)os->data;
1377     ya_result ret = file_pool_write(f, buffer, len);
1378     return ret;
1379 }
1380 
1381 static ya_result
1382 file_pool_file_output_stream_writefully(output_stream* os, const u8* buffer, u32 len)
1383 {
1384     file_pool_file_t f = (file_pool_file_t)os->data;
1385     ya_result ret = file_pool_writefully(f, buffer, len);
1386     return ret;
1387 }
1388 
1389 static ya_result
1390 file_pool_file_output_stream_flush(output_stream* os)
1391 {
1392     file_pool_file_t f = (file_pool_file_t)os->data;
1393     ya_result ret = INVALID_STATE_ERROR;
1394     if(f != NULL)
1395     {
1396         ret = file_pool_flush(f);
1397     }
1398     return ret;
1399 }
1400 
1401 static void
1402 file_pool_file_output_stream_close(output_stream* os)
1403 {
1404     file_pool_file_t f = (file_pool_file_t)os->data;
1405     if(f != NULL)
1406     {
1407         file_pool_close(f);
1408     }
1409 }
1410 
1411 static const output_stream_vtbl file_pool_file_output_stream_vtbl =
1412 {
1413     file_pool_file_output_stream_write,
1414     file_pool_file_output_stream_flush,
1415     file_pool_file_output_stream_close,
1416     "file_pool_file_output_stream",
1417 };
1418 
1419 static const output_stream_vtbl file_pool_file_full_output_stream_vtbl =
1420 {
1421     file_pool_file_output_stream_writefully,
1422     file_pool_file_output_stream_flush,
1423     file_pool_file_output_stream_close,
1424     "file_pool_file_output_stream",
1425 };
1426 
1427 void
1428 file_pool_file_output_stream_init(output_stream *os, file_pool_file_t f)
1429 {
1430     os->data = f;
1431     os->vtbl = &file_pool_file_output_stream_vtbl;
1432 }
1433 
1434 void
1435 file_pool_file_output_stream_set_full_writes(output_stream *os, bool full_writes)
1436 {
1437     if(full_writes)
1438     {
1439         os->vtbl = &file_pool_file_full_output_stream_vtbl;
1440     }
1441     else
1442     {
1443         os->vtbl = &file_pool_file_output_stream_vtbl;
1444     }
1445 }
1446 
1447 void
1448 file_pool_file_output_stream_detach(output_stream *os)
1449 {
1450     assert((os->vtbl == &file_pool_file_output_stream_vtbl) || (os->vtbl == &file_pool_file_full_output_stream_vtbl));
1451     os->data = NULL;
1452 }
1453 
1454 static ya_result
1455 file_pool_file_input_stream_read(input_stream* os, void* buffer, u32 len)
1456 {
1457     file_pool_file_t f = (file_pool_file_t)os->data;
1458     ya_result ret = file_pool_read(f, buffer, len);
1459     return ret;
1460 }
1461 
1462 static ya_result
1463 file_pool_file_input_stream_readfully(input_stream* os, void* buffer, u32 len)
1464 {
1465     file_pool_file_t f = (file_pool_file_t)os->data;
1466     ya_result ret = file_pool_readfully(f, buffer, len);
1467     return ret;
1468 }
1469 
1470 static ya_result
1471 file_pool_file_input_stream_skip(input_stream* os, u32 len)
1472 {
1473     file_pool_file_t f = (file_pool_file_t)os->data;
1474     ya_result ret = file_pool_seek(f, len, SEEK_CUR);
1475     return ret;
1476 }
1477 
1478 static void
1479 file_pool_file_input_stream_close(input_stream* os)
1480 {
1481     file_pool_file_t f = (file_pool_file_t)os->data;
1482     if(f != NULL)
1483     {
1484         file_pool_close(f);
1485     }
1486 }
1487 
1488 static const input_stream_vtbl file_pool_file_input_stream_vtbl =
1489 {
1490     file_pool_file_input_stream_read,
1491     file_pool_file_input_stream_skip,
1492     file_pool_file_input_stream_close,
1493     "file_pool_file_input_stream"
1494 };
1495 
1496 static const input_stream_vtbl file_pool_file_full_input_stream_vtbl =
1497 {
1498     file_pool_file_input_stream_readfully,
1499     file_pool_file_input_stream_skip,
1500     file_pool_file_input_stream_close,
1501     "file_pool_file_input_stream"
1502 };
1503 
1504 void
1505 file_pool_file_input_stream_init(input_stream *is, file_pool_file_t f)
1506 {
1507     is->data = f;
1508     is->vtbl = &file_pool_file_input_stream_vtbl;
1509 }
1510 
1511 void
1512 file_pool_file_input_stream_detach(input_stream *os)
1513 {
1514     assert((os->vtbl == &file_pool_file_input_stream_vtbl) || (os->vtbl == &file_pool_file_full_input_stream_vtbl));
1515     os->data = NULL;
1516 }
1517 
1518 void
1519 file_pool_file_input_stream_set_full_reads(input_stream *is, bool full_writes)
1520 {
1521     if(full_writes)
1522     {
1523         is->vtbl = &file_pool_file_full_input_stream_vtbl;
1524     }
1525     else
1526     {
1527         is->vtbl = &file_pool_file_input_stream_vtbl;
1528     }
1529 }
1530 
1531 /** @} */
1532