1 /*------------------------------------------------------------------------------
2 *
3 * Copyright (c) 2011-2021, EURid vzw. All rights reserved.
4 * The YADIFA TM software product is provided under the BSD 3-clause license:
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions
8 * are met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above copyright
13 * notice, this list of conditions and the following disclaimer in the
14 * documentation and/or other materials provided with the distribution.
15 * * Neither the name of EURid nor the names of its contributors may be
16 * used to endorse or promote products derived from this software
17 * without specific prior written permission.
18 *
19 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
22 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
23 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
24 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
25 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
27 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29 * POSSIBILITY OF SUCH DAMAGE.
30 *
31 *------------------------------------------------------------------------------
32 *
33 */
34
35 /** @defgroup streaming Streams
36 * @ingroup dnscore
37 * @brief
38 *
39 *
40 *
41 * @{
42 *
43 *----------------------------------------------------------------------------*/
44
45 #define __FILE_POOL_C__ 1
46
47 #include "dnscore/dnscore-config.h"
48 #include <stdio.h>
49 #include <stdlib.h>
50 #include <sys/types.h>
51 #include <sys/stat.h>
52 #include <fcntl.h>
53 #include <unistd.h>
54 #include "dnscore/ptr_set.h"
55 #include "dnscore/mutex.h"
56 #include "dnscore/list-dl.h"
57 #include "dnscore/fdtools.h"
58 #include "dnscore/logger.h"
59
60 #define FP_USE_ABSTRACT_FILES 1
61 #if HAS_FILEPOOL_CACHE
62 #define FP_USE_CACHED_FILES 1 // minimal impact apparently
63 #else
64 #define FP_USE_CACHED_FILES 0 // minimal impact apparently
65 #endif
66
67 #if FP_USE_ABSTRACT_FILES
68 #include "dnscore/filesystem-file.h"
69 #include "dnscore/buffered-file.h"
70 #endif
71
72 extern logger_handle *g_system_logger;
73 #define MODULE_MSG_HANDLE g_system_logger
74
75 #define FILEPOOL_TAG 0x4c4f4f50454c4946 // FILEPOOL_TAG
76
/**
 * A pool of files.
 *
 * Files are registered by name and kept in a most-recently-used list so that
 * the least recently used ones can be closed when too many are opened.
 */

struct file_pool_t_
{
    group_mutex_t mtx;                  // guards the fields of the pool and of the files it manages
    ptr_set name_to_file;               // file name -> file_common_t (the key is the name owned by the file_common_t)
    list_dl_s mru;                      // most-recently-used list of file_common_t (most recent first)
    const char * name;                  // name of the pool, used for logging (the pointer is stored, not copied)
    int opened_file_count_max;          // limit used when closing the files in excess (at least 1)
#if FP_USE_CACHED_FILES
    buffered_file_cache_t file_cache;   // cache handed to every buffered file opened by the pool
#endif
};

typedef struct file_pool_t_* file_pool_t;
90
/**
 * The part of a file that is shared by all the handles opened on it.
 */

struct file_common_t_
{
    list_dl_node_s mru_node;    // the struct MUST start with a node
    file_pool_t file_pool;      // the pool that manages this file
    char *name;                 // the name of this file (a private copy, released with the object)
    size_t position;            // the current position in the current fd
#if FP_USE_ABSTRACT_FILES
    file_t file;                // the file object that accesses this file, NULL while it is closed
#else
    int fd;                     // the fd that accesses this file
#endif
    int rc;                     // the number of references to this file (opened)
    int ioc;                    // the number of io operations going on (immediately planned read/write/...)
    bool old;                   // set when the file has been unlinked through the pool: it cannot be closed anymore until its RC reaches 0
};

typedef struct file_common_t_* file_common_t;
108
/**
 * A handle on a file of the pool.
 *
 * Each handle keeps its own position, the underlying file being shared.
 */

struct file_pool_file_t_
{
#if DEBUG
    u64 magic;              // set to 0xf113B001 when the handle is created, verified before acquiring the file
#endif
    file_common_t common;   // the common part of the file
    size_t position;        // the position in the file
    int rdwr_flags;         // has this "handle" the right to access the file in read and/or write ?
};

typedef struct file_pool_file_t_* file_pool_file_t;
120
121 #include <dnscore/file-pool.h>
122
123 file_pool_t
file_pool_init_ex(const char * const pool_name,int opened_file_count_max,u32 cache_entries)124 file_pool_init_ex(const char * const pool_name, int opened_file_count_max, u32 cache_entries) // name is for logging
125 {
126 #if FP_USE_CACHED_FILES
127 buffered_file_cache_t file_cache = buffered_file_cache_new_instance(pool_name, cache_entries, 12, TRUE);
128
129 if(file_cache == NULL)
130 {
131 log_err("file-pool: failed to instantiate new pool %s using %i file descriptors and a cache of %u pages", pool_name, opened_file_count_max, cache_entries);
132 return NULL;
133 }
134 #else
135 (void)cache_entries;
136 #endif
137
138 file_pool_t fp;
139 ZALLOC_OBJECT_OR_DIE(fp, struct file_pool_t_, FILEPOOL_TAG);
140 group_mutex_init(&fp->mtx);
141 ptr_set_init(&fp->name_to_file);
142 fp->name_to_file.compare = ptr_set_asciizp_node_compare;
143 list_dl_init(&fp->mru);
144 fp->name = pool_name;
145 fp->opened_file_count_max = MAX(opened_file_count_max, 1);
146
147 #if FP_USE_CACHED_FILES
148 fp->file_cache = file_cache;
149 #endif
150
151 log_debug("file-pool: new pool %s using %i file descriptors at %p", pool_name, opened_file_count_max, fp);
152
153 return fp;
154 }
155
156 file_pool_t
file_pool_init(const char * const pool_name,int opened_file_count_max)157 file_pool_init(const char * const pool_name, int opened_file_count_max) // name is for logging
158 {
159 file_pool_t fp = file_pool_init_ex(pool_name, opened_file_count_max, 4096);
160
161 return fp;
162 }
163
// forward declarations of helpers used before their definition

static void file_pool_close_unused_in_excess(file_pool_t fp, int fd_max);
static void file_common_destroy(file_common_t fc);
static void file_common_mru_unlink(file_common_t fc);
167
/**
 * Callback used while destroying the name_to_file set of a pool:
 * removes the file from the MRU then destroys it.
 *
 * @param node the node of the set, its value being a file_common_t
 */

static void file_pool_finalize_name_to_file(ptr_node *node)
{
    file_common_t common = (file_common_t)node->value;

    file_common_mru_unlink(common);
    file_common_destroy(common);
}
174
/**
 * Destroys a pool of files.
 *
 * Closes the unused files, reports the files that are still referenced
 * then destroys every file still registered in the pool and the pool itself.
 *
 * @param fp the pool, can be NULL
 */

void
file_pool_finalize(file_pool_t fp)
{
    if(fp == NULL)
    {
        return;
    }

    log_debug("file-pool: deleting pool %s using %i file descriptors at %p", fp->name, fp->opened_file_count_max, fp);

    ptr_set_iterator iter;

    // the closing of the files in excess requires the write lock

    group_mutex_double_lock(&fp->mtx, GROUP_MUTEX_READ, GROUP_MUTEX_WRITE);
    group_mutex_exchange_locks(&fp->mtx, GROUP_MUTEX_READ, GROUP_MUTEX_WRITE);
    // note that the callee handles a limit below 1 as 1
    file_pool_close_unused_in_excess(fp, 0);
#if FP_USE_CACHED_FILES
    buffered_file_cache_delete(fp->file_cache);
#endif
    group_mutex_exchange_locks(&fp->mtx, GROUP_MUTEX_WRITE, GROUP_MUTEX_READ);
    group_mutex_double_unlock(&fp->mtx, GROUP_MUTEX_READ, GROUP_MUTEX_WRITE);

    bool log_flush_required = FALSE;

    // reports the files that are still referenced by a handle

    ptr_set_iterator_init(&fp->name_to_file, &iter);
    while(ptr_set_iterator_hasnext(&iter))
    {
        ptr_node *node = ptr_set_iterator_next_node(&iter);
        if(node->value != NULL)
        {
            file_common_t fc = (file_common_t)node->value;

            if(fc->rc > 0)
            {
                log_err("file-pool: '%s' is still referenced %i times", fc->name, fc->rc);
                log_flush_required = TRUE;
            }
        }
    }

    if(log_flush_required)
    {
        logger_flush();
    }

    // destroys the remaining files, then the pool

    ptr_set_callback_and_destroy(&fp->name_to_file, file_pool_finalize_name_to_file);
    group_mutex_destroy(&fp->mtx);
    ZFREE_OBJECT(fp);
}
223
224 #if FP_USE_ABSTRACT_FILES
225 static file_common_t
file_common_newinstance(file_pool_t fp,const char * name,file_t file)226 file_common_newinstance(file_pool_t fp, const char *name, file_t file)
227 #else
228 static file_common_t
229 file_common_newinstance(file_pool_t fp, const char *name, int fd)
230 #endif
231 {
232 file_common_t fc;
233 ZALLOC_OBJECT_OR_DIE(fc, struct file_common_t_, FILEPOOL_TAG);
234 fc->mru_node.next = NULL;
235 fc->mru_node.prev = NULL;
236 fc->mru_node.data = fc;
237 fc->file_pool = fp;
238 fc->name = strdup(name);
239 fc->position = 0;
240 #if FP_USE_ABSTRACT_FILES
241 fc->file = file;
242 #else
243 fc->fd = fd;
244 #endif
245 fc->rc = 0;
246 fc->ioc = 0;
247 fc->old = FALSE;
248 return fc;
249 }
250
251 static void
file_common_destroy(file_common_t fc)252 file_common_destroy(file_common_t fc)
253 {
254 if(fc != NULL)
255 {
256 #if FP_USE_ABSTRACT_FILES
257 //yassert(fc->file == NULL);
258 if(fc->file != NULL)
259 {
260 if(fc->rc == 0)
261 {
262 file_close(fc->file);
263 fc->file = NULL;
264 }
265 else
266 {
267 log_err("error: file_common_destroy called on a file with rc=%i: '%s'", fc->rc, STRNULL(fc->name));
268 logger_flush();
269 }
270 }
271 #else
272 //yassert(fc->fd < 0);
273 if(fc->fd >= 0)
274 {
275 if(fc->rc == 0)
276 {
277 close_ex(fc->fd);
278 fc->fd = -1;
279 }
280 else
281 {
282 log_err("error: file_common_destroy called on a file with rc=%i: '%s'", fc->rc, STRNULL(fc->name));
283 logger_flush();
284 }
285 }
286 #endif
287 fc->file_pool = NULL;
288 free(fc->name);
289 ZFREE(fc, struct file_common_t_);
290 }
291 }
292
293 static void
file_common_mru_to_link(file_common_t fc)294 file_common_mru_to_link(file_common_t fc)
295 {
296 file_common_t first_one = (file_common_t)list_dl_peek_first(&fc->file_pool->mru);
297 if(first_one != fc)
298 {
299 list_dl_insert_node(&fc->file_pool->mru, &fc->mru_node);
300 }
301 }
302
303 static void
file_common_mru_to_first(file_common_t fc)304 file_common_mru_to_first(file_common_t fc)
305 {
306 file_common_t first_one = (file_common_t)list_dl_peek_first(&fc->file_pool->mru);
307 if(first_one != fc)
308 {
309 if(fc->mru_node.next != NULL)
310 {
311 list_dl_remove_node(&fc->file_pool->mru, &fc->mru_node);
312 }
313 list_dl_insert_node(&fc->file_pool->mru, &fc->mru_node);
314 }
315 }
316
317 static void
file_common_mru_to_last(file_common_t fc)318 file_common_mru_to_last(file_common_t fc)
319 {
320 file_common_t last_one = (file_common_t)list_dl_peek_last(&fc->file_pool->mru);
321 if(last_one != fc)
322 {
323 if(fc->mru_node.next != NULL)
324 {
325 list_dl_remove_node(&fc->file_pool->mru, &fc->mru_node);
326 }
327 list_dl_append_node(&fc->file_pool->mru, &fc->mru_node);
328 }
329 }
330
331 static void
file_common_mru_unlink(file_common_t fc)332 file_common_mru_unlink(file_common_t fc)
333 {
334 //file_common_t first_one = (file_common_t)list_dl_peek_first(&fc->file_pool->mru);
335 //if(first_one != fc)
336 {
337 list_dl_remove_node(&fc->file_pool->mru, &fc->mru_node);
338 }
339 }
340
341 static bool
file_common_operating(file_common_t fc)342 file_common_operating(file_common_t fc)
343 {
344 return (fc->ioc > 0) || (fc->old); // if there are operations running or the file is old (and thus is only waiting for RC reaching 0 to close)
345 }
346
/**
 * Close open FD above the given limit
 *
 * Starting from the least recently used file, closes and removes from the
 * MRU the files that are not operating, until the size of the MRU is not
 * above the limit anymore. The loop stops at the first file that is operating.
 * A file that is not referenced anymore is also removed from the pool
 * and destroyed.
 *
 * Locks must be (W,R)
 *
 * @param fp the pool
 * @param fd_max the limit, values below 1 are handled as 1
 */

static void
file_pool_close_unused_in_excess(file_pool_t fp, int fd_max)
{
    if(fd_max < 1)
    {
        fd_max = 1;
    }

    while(list_dl_size(&fp->mru) > (u32)fd_max)
    {
        // the candidate is the least recently used file

        file_common_t fc = (file_common_t)list_dl_peek_last(&fp->mru);

        if(!file_common_operating(fc))
        {
#if FP_USE_ABSTRACT_FILES
            if(fc->file != NULL)
            {
                // the write lock is exchanged for the read lock for the duration of the close

                group_mutex_exchange_locks(&fp->mtx, GROUP_MUTEX_WRITE, GROUP_MUTEX_READ);
                file_close(fc->file);
                fc->file = NULL;
                group_mutex_exchange_locks(&fp->mtx, GROUP_MUTEX_READ, GROUP_MUTEX_WRITE);
            }
#else
            if(fc->fd >= 0)
            {
                // the write lock is exchanged for the read lock for the duration of the close

                group_mutex_exchange_locks(&fp->mtx, GROUP_MUTEX_WRITE, GROUP_MUTEX_READ);
                close_ex(fc->fd);
                group_mutex_exchange_locks(&fp->mtx, GROUP_MUTEX_READ, GROUP_MUTEX_WRITE);

                fc->fd = -1;
            }
#endif
            file_common_mru_unlink(fc);

            if(fc->rc == 0)
            {
                // nobody is pointing to this anymore

                ptr_set_delete(&fp->name_to_file, fc->name);

                file_common_destroy(fc);
            }
        }
        else
        {
            // the least recently used file is busy: gives up

            group_mutex_exchange_locks(&fp->mtx, GROUP_MUTEX_WRITE, GROUP_MUTEX_READ);
            group_mutex_exchange_locks(&fp->mtx, GROUP_MUTEX_READ, GROUP_MUTEX_WRITE);
            break;
        }
    }
}
407
408 ya_result
file_pool_unlink_from_pool_and_filename(file_pool_t fp,const char * filename)409 file_pool_unlink_from_pool_and_filename(file_pool_t fp, const char *filename)
410 {
411 ya_result ret;
412
413 group_mutex_double_lock(&fp->mtx, GROUP_MUTEX_READ, GROUP_MUTEX_WRITE);
414
415 if(unlink(filename) >= 0)
416 {
417 ret = SUCCESS;
418
419 ptr_node *node;
420
421 if((node = ptr_set_find(&fp->name_to_file, filename)) != NULL)
422 {
423 // the common node exists
424 // get a new file referencing that node
425
426 group_mutex_exchange_locks(&fp->mtx, GROUP_MUTEX_READ, GROUP_MUTEX_WRITE);
427
428 file_common_t fc = (file_common_t)node->value;
429
430 ptr_set_delete(&fp->name_to_file, fc->name);
431 file_common_mru_unlink(fc);
432 fc->old = TRUE;
433
434 if(fc->rc == 0)
435 {
436 if(fc->file != NULL)
437 {
438 file_close(fc->file);
439 fc->file = NULL;
440 }
441
442 file_common_destroy(fc);
443 }
444
445 group_mutex_exchange_locks(&fp->mtx, GROUP_MUTEX_WRITE, GROUP_MUTEX_READ);
446 }
447 }
448 else
449 {
450 ret = ERRNO_ERROR;
451 }
452
453 group_mutex_double_unlock(&fp->mtx, GROUP_MUTEX_READ, GROUP_MUTEX_WRITE);
454
455 return ret;
456 }
457
458 static void
file_common_acquire(file_common_t fc)459 file_common_acquire(file_common_t fc)
460 {
461 ++fc->rc;
462 }
463
464 static void
file_common_release(file_common_t fc)465 file_common_release(file_common_t fc)
466 {
467 --fc->rc;
468
469 if(fc->rc <= 0)
470 {
471 assert(fc->rc == 0);
472
473 // an old file has been unlinked
474 // it is not in the name_to_file set anymore
475 // it is not in the mru anymore
476
477 if(!fc->old)
478 {
479 file_common_mru_to_last(fc);
480 }
481 else
482 {
483 #if FP_USE_ABSTRACT_FILES
484 if(fc->file != NULL)
485 {
486 file_pool_t fp = fc->file_pool;
487
488 group_mutex_exchange_locks(&fp->mtx, GROUP_MUTEX_WRITE, GROUP_MUTEX_READ);
489 file_close(fc->file);
490 fc->file = NULL;
491 group_mutex_exchange_locks(&fp->mtx, GROUP_MUTEX_READ, GROUP_MUTEX_WRITE);
492 }
493 #else
494 if(fc->fd >= 0)
495 {
496 file_pool_t fp = fc->file_pool;
497
498 group_mutex_exchange_locks(&fp->mtx, GROUP_MUTEX_WRITE, GROUP_MUTEX_READ);
499 close_ex(fc->fd);
500 group_mutex_exchange_locks(&fp->mtx, GROUP_MUTEX_READ, GROUP_MUTEX_WRITE);
501 fc->fd = -1;
502 }
503 #endif
504 file_common_mru_unlink(fc);
505
506 file_common_destroy(fc);
507 }
508 }
509 }
510
511 static file_pool_file_t
file_newinstance(file_common_t common,int rdwr_flags)512 file_newinstance(file_common_t common, int rdwr_flags)
513 {
514 file_pool_file_t f;
515 ZALLOC_OBJECT_OR_DIE(f, struct file_pool_file_t_, FILEPOOL_TAG);
516 file_common_acquire(common);
517 #if DEBUG
518 f->magic = 0xf113B001;
519 #endif
520 f->common = common;
521 f->position = 0;
522 f->rdwr_flags = rdwr_flags;
523 return f;
524 }
525
526 static void
file_destroy(file_pool_file_t f)527 file_destroy(file_pool_file_t f)
528 {
529 if(f != NULL)
530 {
531 file_common_t fc = f->common;
532 ZEROMEMORY(f, sizeof(struct file_pool_file_t_));
533 ZFREE(f, struct file_pool_file_t_);
534
535 file_common_release(fc);
536 }
537 }
538
539 /**
540 *
541 *
542 * @param fp
543 * @param filename
544 * @param flags
545 * @param mode
546 * @return a handle or NULL (errno being set)
547 */
548
549 file_pool_file_t
file_pool_open_ex(file_pool_t fp,const char * filename,int flags,mode_t mode)550 file_pool_open_ex(file_pool_t fp, const char *filename, int flags, mode_t mode)
551 {
552 group_mutex_double_lock(&fp->mtx, GROUP_MUTEX_READ, GROUP_MUTEX_WRITE);
553
554 file_common_t fc;
555 file_pool_file_t f;
556
557 char absolute_filename[1024];
558
559 ya_result ret;
560 if(ISOK(ret = file_get_absolute_path(filename, absolute_filename, sizeof(absolute_filename))))
561 {
562 filename = absolute_filename;
563 }
564
565 ptr_node *node;
566
567 if((node = ptr_set_find(&fp->name_to_file, filename)) != NULL)
568 {
569 // the common node exists
570 // get a new file referencing that node
571
572 group_mutex_exchange_locks(&fp->mtx, GROUP_MUTEX_READ, GROUP_MUTEX_WRITE);
573
574 fc = (file_common_t)node->value;
575 f = file_newinstance(fc, flags);
576
577 file_common_mru_to_first(fc);
578
579 group_mutex_exchange_locks(&fp->mtx, GROUP_MUTEX_WRITE, GROUP_MUTEX_READ);
580 }
581 else
582 {
583 // the first opening is required to eventually return an error code
584
585 #if FP_USE_ABSTRACT_FILES
586 file_t file;
587
588 ret = filesystem_file_create_ex(&file, filename, O_RDWR|flags, mode);
589
590 if(ISOK(ret))
591 {
592 #if FP_USE_CACHED_FILES
593 buffered_file_init(&file, file, fp->file_cache); // only fails if one of the parameters is NULL
594 #endif
595 group_mutex_exchange_locks(&fp->mtx, GROUP_MUTEX_READ, GROUP_MUTEX_WRITE);
596
597 // close the exceeding fd(s)
598
599 file_pool_close_unused_in_excess(fp, fp->opened_file_count_max - 1);
600
601 // we know the node does not exist
602
603 node = ptr_set_insert(&fp->name_to_file, (char*)filename); // cast as it will be fixed later
604 fc = file_common_newinstance(fp, filename, file);
605 #else
606 int fd = open_create_ex(filename, O_RDWR|flags, mode);
607
608 if(fd > 0)
609 {
610 group_mutex_exchange_locks(&fp->mtx, GROUP_MUTEX_READ, GROUP_MUTEX_WRITE);
611
612 // close the exceeding fd(s)
613
614 file_pool_close_unused_in_excess(fp, fp->opened_file_count_max - 1);
615
616 // we know the node does not exist
617
618 node = ptr_set_insert(&fp->name_to_file, (char*)filename); // cast as it will be fixed later
619 fc = file_common_newinstance(fp, filename, fd);
620 #endif
621 node->value = fc;
622 node->key = fc->name;
623
624 f = file_newinstance(fc, flags);
625
626 file_common_mru_to_link(fc);
627
628 group_mutex_exchange_locks(&fp->mtx, GROUP_MUTEX_WRITE, GROUP_MUTEX_READ);
629 }
630 else
631 {
632 f = NULL;
633 }
634 }
635
636 group_mutex_double_unlock(&fp->mtx, GROUP_MUTEX_READ, GROUP_MUTEX_WRITE);
637
638 return f;
639 }
640
641 file_pool_file_t
642 file_dup(file_pool_file_t file)
643 {
644 group_mutex_double_lock(&file->common->file_pool->mtx, GROUP_MUTEX_READ, GROUP_MUTEX_WRITE);
645
646 group_mutex_exchange_locks(&file->common->file_pool->mtx, GROUP_MUTEX_READ, GROUP_MUTEX_WRITE);
647
648 file_pool_file_t file_dup = file_newinstance(file->common, file->rdwr_flags);
649
650 file_common_mru_to_first(file->common);
651
652 group_mutex_exchange_locks(&file->common->file_pool->mtx, GROUP_MUTEX_WRITE, GROUP_MUTEX_READ);
653
654 group_mutex_double_unlock(&file->common->file_pool->mtx, GROUP_MUTEX_READ, GROUP_MUTEX_WRITE);
655
656 return file_dup;
657 }
658
659 file_pool_file_t
660 file_pool_open(file_pool_t fp, const char *filename)
661 {
662 file_pool_file_t f = file_pool_open_ex(fp, filename, O_RDONLY|O_CLOEXEC, 0);
663 return f;
664 }
665
666 file_pool_file_t
667 file_pool_create(file_pool_t fp, const char *filename, mode_t mode)
668 {
669 file_pool_file_t f = file_pool_open_ex(fp, filename, O_RDWR|O_CREAT|O_CLOEXEC, mode);
670 return f;
671 }
672
673 file_pool_file_t
674 file_pool_create_excl(file_pool_t fp, const char *filename, mode_t mode)
675 {
676 file_pool_file_t f = file_pool_open_ex(fp, filename, O_RDWR|O_CREAT|O_CLOEXEC|O_EXCL, mode);
677 return f;
678 }
679
680 static void
681 file_common_release_fd(file_pool_file_t f)
682 {
683 file_common_t fc = f->common;
684 file_pool_t fp = fc->file_pool;
685
686 group_mutex_exchange_locks(&fp->mtx, GROUP_MUTEX_READ, GROUP_MUTEX_WRITE);
687
688 fc->position = f->position;
689
690 --fc->ioc;
691
692 group_mutex_exchange_locks(&fp->mtx, GROUP_MUTEX_WRITE, GROUP_MUTEX_READ);
693
694 group_mutex_double_unlock(&fp->mtx, GROUP_MUTEX_READ, GROUP_MUTEX_WRITE);
695 }
696
697 static void
698 file_common_advance_position_release_fd(file_pool_file_t f, size_t offset)
699 {
700 file_common_t fc = f->common;
701 file_pool_t fp = fc->file_pool;
702
703 group_mutex_exchange_locks(&fp->mtx, GROUP_MUTEX_READ, GROUP_MUTEX_WRITE);
704
705 f->position += offset;
706 fc->position = f->position;
707
708 --fc->ioc;
709
710 group_mutex_exchange_locks(&fp->mtx, GROUP_MUTEX_WRITE, GROUP_MUTEX_READ);
711
712 group_mutex_double_unlock(&fp->mtx, GROUP_MUTEX_READ, GROUP_MUTEX_WRITE);
713 }
714
715 #if FP_USE_ABSTRACT_FILES
716
/**
 * Gets the file object of a handle, ready for an operation at the position
 * of the handle.
 *
 * If the file is closed, the files in excess are closed then the file is
 * opened again using its name. The file is moved at the first position of the
 * MRU and its count of running operations is increased.
 *
 * On success the locks of the pool are still held: they are released by
 * file_common_release_fd or file_common_advance_position_release_fd.
 * On failure the locks have been released.
 *
 * @param f the handle
 * @param filep receives the file object (only set on success)
 *
 * @return SUCCESS or the error code returned by the opening of the file
 */

static int
file_common_acquire_fd(file_pool_file_t f, file_t *filep)
{
#if DEBUG
    yassert(f->magic == 0xf113B001);
#endif

    // prevent changes, take dibs on making changes

    group_mutex_double_lock(&f->common->file_pool->mtx, GROUP_MUTEX_READ, GROUP_MUTEX_WRITE);

    file_common_t fc = f->common;
    file_pool_t fp = fc->file_pool;

    file_t file;
    ya_result ret = SUCCESS;

    if((file = fc->file) == NULL)
    {
        // need to get a file

        group_mutex_exchange_locks(&fp->mtx, GROUP_MUTEX_READ, GROUP_MUTEX_WRITE);

        file_pool_close_unused_in_excess(fp, fp->opened_file_count_max - 1);

        group_mutex_exchange_locks(&fp->mtx, GROUP_MUTEX_WRITE, GROUP_MUTEX_READ);

        ret = filesystem_file_open_ex(&file, fc->name, O_RDWR|O_CLOEXEC);

        if(FAIL(ret))
        {
            group_mutex_double_unlock(&fp->mtx, GROUP_MUTEX_READ, GROUP_MUTEX_WRITE);

            return ret;
        }

#if FP_USE_CACHED_FILES
        buffered_file_init(&file, file, fp->file_cache); // only fails if one of the parameters is NULL
#endif
        // a file that has just been opened is at position 0

        fc->file = file;
        fc->position = 0;
    }

    // the file is shared: it may have been left at the position of another handle

    if(fc->position != f->position)
    {
        file_seek(file, f->position, SEEK_SET);
    }

    group_mutex_exchange_locks(&fp->mtx, GROUP_MUTEX_READ, GROUP_MUTEX_WRITE);
    file_common_mru_to_first(fc);
    ++fc->ioc;
    group_mutex_exchange_locks(&fp->mtx, GROUP_MUTEX_WRITE, GROUP_MUTEX_READ);

    // successful acquisition will be unlocked at release

    *filep = file;

    return ret;
}
776
777 ya_result
778 file_pool_read(file_pool_file_t f, void *buffer, size_t bytes)
779 {
780 file_t file;
781 int ret = file_common_acquire_fd(f, &file);
782
783 if(FAIL(ret))
784 {
785 return ret;
786 }
787
788 // read
789
790 ssize_t n = file_read(file, buffer, bytes);
791
792 if(n >= 0)
793 {
794 file_common_advance_position_release_fd(f, n);
795 }
796 else
797 {
798 file_common_release_fd(f);
799 }
800
801 return (ya_result)n;
802 }
803
804 ya_result
805 file_pool_readfully(file_pool_file_t f, void *buffer, size_t bytes)
806 {
807 ya_result ret = file_pool_read(f, buffer, bytes);
808 return ret;
809 }
810
811 ya_result
812 file_pool_write(file_pool_file_t f, const void *buffer, size_t bytes)
813 {
814 file_t file;
815 int ret = file_common_acquire_fd(f, &file);
816
817 if(FAIL(ret))
818 {
819 return ret;
820 }
821
822 // read
823
824 ssize_t n = file_write(file, buffer, bytes);
825
826 if(n >= 0)
827 {
828 file_common_advance_position_release_fd(f, n);
829 }
830 else
831 {
832 file_common_release_fd(f);
833 }
834
835 return (ya_result)n;
836 }
837
838 ya_result
839 file_pool_writefully(file_pool_file_t f, const void *buffer, size_t bytes)
840 {
841 ya_result ret = file_pool_write(f, buffer, bytes);
842 return ret;
843 }
844
845 ya_result
846 file_pool_flush(file_pool_file_t f)
847 {
848 file_t file;
849 int ret = file_common_acquire_fd(f, &file);
850
851 if(FAIL(ret))
852 {
853 return ret;
854 }
855
856 // flush
857
858 ret = file_flush(file);
859
860 file_common_release_fd(f);
861
862 return ret;
863 }
864
865 ya_result
866 file_pool_get_size(file_pool_file_t f, size_t *sizep)
867 {
868 if(sizep != NULL)
869 {
870 file_t file;
871 int ret = file_common_acquire_fd(f, &file);
872
873 if(FAIL(ret))
874 {
875 *sizep = 0;
876 return ret;
877 }
878
879 *sizep = (size_t)file_size(file);
880
881 file_common_release_fd(f);
882
883 return ret;
884 }
885 else
886 {
887 return UNEXPECTED_NULL_ARGUMENT_ERROR;
888 }
889 }
890
891 ya_result
892 file_pool_resize(file_pool_file_t f, size_t size)
893 {
894 file_t file;
895 int ret = file_common_acquire_fd(f, &file);
896
897 if(FAIL(ret))
898 {
899 return ret;
900 }
901
902 // truncate
903
904 ret = file_resize(file, size);
905
906 file_common_release_fd(f);
907
908 return ret;
909 }
910
911 #else
/**
 * Gets the file descriptor of a handle, ready for an operation at the
 * position of the handle.
 *
 * If the file is closed, the files in excess are closed then the file is
 * opened again using its name. The file is moved at the first position of the
 * MRU and its count of running operations is increased.
 *
 * On success the locks of the pool are still held: they are released by
 * file_common_release_fd or file_common_advance_position_release_fd.
 * On failure the locks have been released.
 *
 * @param f the handle
 *
 * @return the file descriptor or an errno-based error code
 */

static int
file_common_acquire_fd(file_pool_file_t f)
{
#if DEBUG
    yassert(f->magic == 0xf113B001);
#endif

    // prevent changes, take dibs on making changes

    group_mutex_double_lock(&f->common->file_pool->mtx, GROUP_MUTEX_READ, GROUP_MUTEX_WRITE);

    file_common_t fc = f->common;
    file_pool_t fp = fc->file_pool;

    int fd;

    if((fd = fc->fd) < 0)
    {
        // need to get an fd

        group_mutex_exchange_locks(&fp->mtx, GROUP_MUTEX_READ, GROUP_MUTEX_WRITE);

        file_pool_close_unused_in_excess(fp, fp->opened_file_count_max - 1);

        group_mutex_exchange_locks(&fp->mtx, GROUP_MUTEX_WRITE, GROUP_MUTEX_READ);

        if((fd = open_ex(fc->name, O_RDWR|O_CLOEXEC)) < 0)
        {
            fd = ERRNO_ERROR;

            group_mutex_double_unlock(&fp->mtx, GROUP_MUTEX_READ, GROUP_MUTEX_WRITE);

            return fd;
        }

        // a file that has just been opened is at position 0

        fc->fd = fd;
        fc->position = 0;
    }

    // the fd is shared: it may have been left at the position of another handle

    if(fc->position != f->position)
    {
        lseek(fd, f->position, SEEK_SET);
    }

    group_mutex_exchange_locks(&fp->mtx, GROUP_MUTEX_READ, GROUP_MUTEX_WRITE);
    file_common_mru_to_first(fc);
    ++fc->ioc;
    group_mutex_exchange_locks(&fp->mtx, GROUP_MUTEX_WRITE, GROUP_MUTEX_READ);

    // successful acquisition will be unlocked at release

    return fd;
}
965
966 ya_result
967 file_pool_read(file_pool_file_t f, void *buffer, size_t bytes)
968 {
969 int fd = file_common_acquire_fd(f);
970
971 if(fd < 0)
972 {
973 return fd;
974 }
975
976 // read
977
978 for(;;)
979 {
980 ssize_t n = read(fd, buffer, bytes);
981
982 if(n >= 0)
983 {
984 file_common_advance_position_release_fd(f, n);
985 return n;
986 }
987 else
988 {
989 int err = errno;
990
991 if(err == EINTR)
992 {
993 continue;
994 }
995
996 file_common_release_fd(f);
997
998 return err;
999 }
1000 }
1001 }
1002
1003 ya_result
1004 file_pool_readfully(file_pool_file_t f, void *buffer, size_t bytes)
1005 {
1006 int fd = file_common_acquire_fd(f);
1007
1008 if(fd < 0)
1009 {
1010 return fd;
1011 }
1012
1013 // read
1014
1015 size_t total = 0;
1016
1017 for(;;)
1018 {
1019 ssize_t n = read(fd, buffer, bytes);
1020
1021 if(n >= 0)
1022 {
1023 bytes -= n;
1024 total += n;
1025
1026 if((n == 0) || (bytes == 0))
1027 {
1028 file_common_advance_position_release_fd(f, total);
1029 return total;
1030 }
1031 }
1032 else
1033 {
1034 int err = errno;
1035
1036 if(err == EINTR)
1037 {
1038 continue;
1039 }
1040
1041 if(err == EAGAIN)
1042 {
1043 continue;
1044 }
1045
1046 file_common_advance_position_release_fd(f, total);
1047
1048 return err;
1049 }
1050 }
1051 }
1052
1053 ya_result
1054 file_pool_write(file_pool_file_t f, const void *buffer, size_t bytes)
1055 {
1056 int fd = file_common_acquire_fd(f);
1057
1058 if(fd < 0)
1059 {
1060 return fd;
1061 }
1062
1063 // write
1064
1065 for(;;)
1066 {
1067 ssize_t n = write(fd, buffer, bytes);
1068
1069 if(n >= 0)
1070 {
1071 file_common_advance_position_release_fd(f, n);
1072 return n;
1073 }
1074 else
1075 {
1076 int err = errno;
1077
1078 if(err == EAGAIN)
1079 {
1080 continue;
1081 }
1082
1083 file_common_release_fd(f);
1084
1085 return err;
1086 }
1087 }
1088 }
1089
1090 ya_result
1091 file_pool_writefully(file_pool_file_t f, const void *buffer, size_t bytes)
1092 {
1093 int fd = file_common_acquire_fd(f);
1094
1095 if(fd < 0)
1096 {
1097 return fd;
1098 }
1099
1100 // write
1101
1102 size_t total = 0;
1103
1104 for(;;)
1105 {
1106 ssize_t n = write(fd, buffer, bytes);
1107
1108 if(n >= 0)
1109 {
1110 bytes -= n;
1111 total += n;
1112
1113 if((n == 0) || (bytes == 0))
1114 {
1115 file_common_advance_position_release_fd(f, total);
1116 return total;
1117 }
1118 }
1119 else
1120 {
1121 int err = errno;
1122
1123 if(err == EINTR)
1124 {
1125 continue;
1126 }
1127
1128 if(err == EAGAIN)
1129 {
1130 continue;
1131 }
1132
1133 file_common_advance_position_release_fd(f, total);
1134
1135 return err;
1136 }
1137 }
1138 }
1139
1140 ya_result
1141 file_pool_flush(file_pool_file_t f)
1142 {
1143 int fd = file_common_acquire_fd(f);
1144
1145 if(fd < 0)
1146 {
1147 return fd;
1148 }
1149
1150 // flush
1151
1152 ya_result ret = SUCCESS;
1153
1154 if(fdatasync_ex(fd) < 0)
1155 {
1156 ret = ERRNO_ERROR;
1157 }
1158
1159 file_common_release_fd(f);
1160
1161 return ret;
1162 }
1163
1164 ya_result
1165 file_pool_get_size(file_pool_file_t f, size_t *sizep)
1166 {
1167 if(sizep != NULL)
1168 {
1169 int fd = file_common_acquire_fd(f);
1170
1171 if(fd < 0)
1172 {
1173 return fd;
1174 }
1175
1176 // truncate
1177
1178 ya_result ret = SUCCESS;
1179
1180 struct stat st;
1181
1182 if(fstat(fd, &st) >= 0)
1183 {
1184 *sizep = st.st_size;
1185 }
1186 else
1187 {
1188 ret = ERRNO_ERROR;
1189 }
1190
1191 file_common_release_fd(f);
1192
1193 return ret;
1194 }
1195 else
1196 {
1197 return ERROR;
1198 }
1199 }
1200
1201 ya_result
1202 file_pool_resize(file_pool_file_t f, size_t size)
1203 {
1204 int fd = file_common_acquire_fd(f);
1205
1206 if(fd < 0)
1207 {
1208 return fd;
1209 }
1210
1211 // truncate
1212
1213 ya_result ret = SUCCESS;
1214 if(ftruncate_ex(fd, size) < 0)
1215 {
1216 ret = ERRNO_ERROR;
1217 }
1218
1219 file_common_release_fd(f);
1220
1221 return ret;
1222 }
1223
1224 #endif
1225
1226 ssize_t
1227 file_pool_seek(file_pool_file_t f, ssize_t position, int from)
1228 {
1229 ssize_t ret;
1230 group_mutex_lock(&f->common->file_pool->mtx, GROUP_MUTEX_WRITE);
1231
1232 switch(from)
1233 {
1234 case SEEK_SET:
1235 {
1236 f->position = position;
1237 ret = (ssize_t)f->position;
1238 break;
1239 }
1240 case SEEK_CUR:
1241 {
1242 ssize_t p = f->position + position;
1243 if(p >= 0)
1244 {
1245 f->position = p;
1246 }
1247 else
1248 {
1249 f->position = 0;
1250 }
1251 ret = (ssize_t)f->position;
1252 break;
1253 }
1254 case SEEK_END:
1255 {
1256 size_t size;
1257 if(ISOK(ret = file_pool_get_size(f, &size))) // can only fail if a NULL is given)
1258 {
1259 ssize_t p = size + position;
1260
1261 if(p >= 0)
1262 {
1263 f->position = p;
1264 }
1265 else
1266 {
1267 f->position = 0;
1268 }
1269 ret = (ssize_t)f->position;
1270 }
1271
1272 break;
1273 }
1274 default:
1275 {
1276 ret = INVALID_ARGUMENT_ERROR;
1277 break;
1278 }
1279 }
1280
1281 group_mutex_unlock(&f->common->file_pool->mtx, GROUP_MUTEX_WRITE);
1282
1283 return ret;
1284 }
1285
1286 ya_result
1287 file_pool_tell(file_pool_file_t f, size_t *positionp)
1288 {
1289 if(positionp != NULL)
1290 {
1291 group_mutex_lock(&f->common->file_pool->mtx, GROUP_MUTEX_READ);
1292 *positionp = f->position;
1293 group_mutex_unlock(&f->common->file_pool->mtx, GROUP_MUTEX_READ);
1294
1295 return SUCCESS;
1296 }
1297 else
1298 {
1299 return ERROR;
1300 }
1301 }
1302
1303 // flushes, but only closes the file when fd are needed
1304
1305 ya_result
1306 file_pool_close(file_pool_file_t f)
1307 {
1308 group_mutex_double_lock(&f->common->file_pool->mtx, GROUP_MUTEX_READ, GROUP_MUTEX_WRITE);
1309
1310 file_common_t fc = f->common;
1311 file_pool_t fp = fc->file_pool;
1312
1313 #if FP_USE_ABSTRACT_FILES
1314 if(fc->file != NULL)
1315 {
1316 file_flush(fc->file);
1317 }
1318 #else
1319 if(fc->fd >= 0)
1320 {
1321 fdatasync_ex(fc->fd);
1322 }
1323 #endif
1324
1325 group_mutex_exchange_locks(&fp->mtx, GROUP_MUTEX_READ, GROUP_MUTEX_WRITE);
1326 file_destroy(f);
1327 group_mutex_exchange_locks(&fp->mtx, GROUP_MUTEX_WRITE, GROUP_MUTEX_READ);
1328
1329 group_mutex_double_unlock(&fp->mtx, GROUP_MUTEX_READ, GROUP_MUTEX_WRITE);
1330
1331 return SUCCESS;
1332 }
1333
1334 ya_result
1335 file_pool_unlink(file_pool_file_t f)
1336 {
1337 ya_result ret;
1338
1339 if(f != NULL)
1340 {
1341 ret = INVALID_ARGUMENT_ERROR;
1342
1343 if(f->common != NULL)
1344 {
1345 if(f->common->file_pool != NULL)
1346 {
1347 ret = file_pool_unlink_from_pool_and_filename(f->common->file_pool, f->common->name);
1348 }
1349 }
1350 }
1351 else
1352 {
1353 ret = UNEXPECTED_NULL_ARGUMENT_ERROR;
1354 }
1355
1356 return ret;
1357 }
1358
1359 const char *
1360 file_pool_filename(const file_pool_file_t f)
1361 {
1362 if(f != NULL)
1363 {
1364 const file_common_t fc = f->common;
1365 return fc->name;
1366 }
1367 else
1368 {
1369 return "NULL";
1370 }
1371 }
1372
1373 static ya_result
1374 file_pool_file_output_stream_write(output_stream* os, const u8* buffer, u32 len)
1375 {
1376 file_pool_file_t f = (file_pool_file_t)os->data;
1377 ya_result ret = file_pool_write(f, buffer, len);
1378 return ret;
1379 }
1380
1381 static ya_result
1382 file_pool_file_output_stream_writefully(output_stream* os, const u8* buffer, u32 len)
1383 {
1384 file_pool_file_t f = (file_pool_file_t)os->data;
1385 ya_result ret = file_pool_writefully(f, buffer, len);
1386 return ret;
1387 }
1388
1389 static ya_result
1390 file_pool_file_output_stream_flush(output_stream* os)
1391 {
1392 file_pool_file_t f = (file_pool_file_t)os->data;
1393 ya_result ret = INVALID_STATE_ERROR;
1394 if(f != NULL)
1395 {
1396 ret = file_pool_flush(f);
1397 }
1398 return ret;
1399 }
1400
1401 static void
1402 file_pool_file_output_stream_close(output_stream* os)
1403 {
1404 file_pool_file_t f = (file_pool_file_t)os->data;
1405 if(f != NULL)
1406 {
1407 file_pool_close(f);
1408 }
1409 }
1410
1411 static const output_stream_vtbl file_pool_file_output_stream_vtbl =
1412 {
1413 file_pool_file_output_stream_write,
1414 file_pool_file_output_stream_flush,
1415 file_pool_file_output_stream_close,
1416 "file_pool_file_output_stream",
1417 };
1418
1419 static const output_stream_vtbl file_pool_file_full_output_stream_vtbl =
1420 {
1421 file_pool_file_output_stream_writefully,
1422 file_pool_file_output_stream_flush,
1423 file_pool_file_output_stream_close,
1424 "file_pool_file_output_stream",
1425 };
1426
1427 void
1428 file_pool_file_output_stream_init(output_stream *os, file_pool_file_t f)
1429 {
1430 os->data = f;
1431 os->vtbl = &file_pool_file_output_stream_vtbl;
1432 }
1433
1434 void
1435 file_pool_file_output_stream_set_full_writes(output_stream *os, bool full_writes)
1436 {
1437 if(full_writes)
1438 {
1439 os->vtbl = &file_pool_file_full_output_stream_vtbl;
1440 }
1441 else
1442 {
1443 os->vtbl = &file_pool_file_output_stream_vtbl;
1444 }
1445 }
1446
1447 void
1448 file_pool_file_output_stream_detach(output_stream *os)
1449 {
1450 assert((os->vtbl == &file_pool_file_output_stream_vtbl) || (os->vtbl == &file_pool_file_full_output_stream_vtbl));
1451 os->data = NULL;
1452 }
1453
1454 static ya_result
1455 file_pool_file_input_stream_read(input_stream* os, void* buffer, u32 len)
1456 {
1457 file_pool_file_t f = (file_pool_file_t)os->data;
1458 ya_result ret = file_pool_read(f, buffer, len);
1459 return ret;
1460 }
1461
1462 static ya_result
1463 file_pool_file_input_stream_readfully(input_stream* os, void* buffer, u32 len)
1464 {
1465 file_pool_file_t f = (file_pool_file_t)os->data;
1466 ya_result ret = file_pool_readfully(f, buffer, len);
1467 return ret;
1468 }
1469
1470 static ya_result
1471 file_pool_file_input_stream_skip(input_stream* os, u32 len)
1472 {
1473 file_pool_file_t f = (file_pool_file_t)os->data;
1474 ya_result ret = file_pool_seek(f, len, SEEK_CUR);
1475 return ret;
1476 }
1477
1478 static void
1479 file_pool_file_input_stream_close(input_stream* os)
1480 {
1481 file_pool_file_t f = (file_pool_file_t)os->data;
1482 if(f != NULL)
1483 {
1484 file_pool_close(f);
1485 }
1486 }
1487
1488 static const input_stream_vtbl file_pool_file_input_stream_vtbl =
1489 {
1490 file_pool_file_input_stream_read,
1491 file_pool_file_input_stream_skip,
1492 file_pool_file_input_stream_close,
1493 "file_pool_file_input_stream"
1494 };
1495
1496 static const input_stream_vtbl file_pool_file_full_input_stream_vtbl =
1497 {
1498 file_pool_file_input_stream_readfully,
1499 file_pool_file_input_stream_skip,
1500 file_pool_file_input_stream_close,
1501 "file_pool_file_input_stream"
1502 };
1503
1504 void
1505 file_pool_file_input_stream_init(input_stream *is, file_pool_file_t f)
1506 {
1507 is->data = f;
1508 is->vtbl = &file_pool_file_input_stream_vtbl;
1509 }
1510
1511 void
1512 file_pool_file_input_stream_detach(input_stream *os)
1513 {
1514 assert((os->vtbl == &file_pool_file_input_stream_vtbl) || (os->vtbl == &file_pool_file_full_input_stream_vtbl));
1515 os->data = NULL;
1516 }
1517
1518 void
1519 file_pool_file_input_stream_set_full_reads(input_stream *is, bool full_writes)
1520 {
1521 if(full_writes)
1522 {
1523 is->vtbl = &file_pool_file_full_input_stream_vtbl;
1524 }
1525 else
1526 {
1527 is->vtbl = &file_pool_file_input_stream_vtbl;
1528 }
1529 }
1530
1531 /** @} */
1532