1 /*
2 ** Copyright (C) 2004-2020 by Carnegie Mellon University.
3 **
4 ** @OPENSOURCE_LICENSE_START@
5 ** See license information in ../../LICENSE.txt
6 ** @OPENSOURCE_LICENSE_END@
7 */
8
9 /* #define SKTHREAD_DEBUG_MUTEX */
10 #include <silk/silk.h>
11
12 RCSIDENT("$SiLK: stream-cache.c ef14e54179be 2020-04-14 21:57:45Z mthomas $");
13
14 #include <silk/redblack.h>
15 #include <silk/sklog.h>
16 #include <silk/sksite.h>
17 #include <silk/skvector.h>
18 #include <silk/utils.h>
19 #include "stream-cache.h"
20
21 /* use TRACEMSG_LEVEL as our tracing variable */
22 #define TRACEMSG(lvl, msg) TRACEMSG_TO_TRACEMSGLVL(lvl, msg)
23 #include <silk/sktracemsg.h>
24
25
26 /* DEFINES AND TYPEDEFS */
27
/*
 *    Maximum value for the time stamp.
 *
 *    The expansion is wrapped in parentheses so that the macro acts
 *    as a single operand wherever it appears (for example, as the
 *    operand of sizeof or next to a postfix operator).
 */
#define MAX_TIME    ((sktime_t)INT64_MAX)
32
33
/*
 *    Message to print when initializing a mutex fails.
 *
 *    NOTE: this does not expand to a single expression; it expands to
 *    a comma-separated list consisting of a format string followed by
 *    __FILE__ and __LINE__.  It is meant to be used as the complete
 *    argument list of a printf-style logging call, as in
 *    CRITMSG(FMT_MUTEX_FAILURE).
 */
#define FMT_MUTEX_FAILURE                                       \
    "Could not initialize a mutex at %s:%d", __FILE__, __LINE__
37
38
/**
 *    stream_cache_t holds the state of a stream cache.
 *
 *    The cache_entry_t objects that belong to the cache are stored in
 *    a red-black tree that is keyed by each entry's cache_key_t; the
 *    tree holds both entries whose stream is open and entries whose
 *    stream has been closed.  The cache counts the open entries and
 *    the total number of entries separately.
 *
 *    Finally, there is a mutex on the cache; the mutex will be a
 *    pthread_rwlock_t mutex if read/write locks are supported on this
 *    system.
 */
struct stream_cache_st {
    /* the redblack tree used for searching; it holds cache_entry_t
     * objects ordered by cacheEntryCompare() */
    struct rbtree      *rbtree;
    /* function called by skCacheLookupOrOpenAdd() to open a file that
     * is not currently in the cache */
    cache_open_fn_t     open_callback;
    /* current number of open entries */
    unsigned int        open_count;
    /* maximum number of open entries the user specified */
    unsigned int        max_open_count;
    /* total number of entries (open and closed) */
    unsigned int        total_count;
    /* mutex for the cache; protects the tree and the counters */
    RWMUTEX             mutex;
};
62 /* typedef struct stream_cache_st stream_cache_t; // stream-cache.h */
63
64
/**
 *    The cache_entry_t contains information about an active file in
 *    the stream cache.  The structure contains the file key, record
 *    counts for the file, and the open file handle.
 *
 *    Users of the stream-cache should view the cache_entry_t as
 *    opaque.  Use the macros and functions to access its members.
 */
struct cache_entry_st {
    /** the key; this is what the red-black tree orders entries by */
    cache_key_t         key;
    /** The mutex associated with this entry */
    pthread_mutex_t     mutex;
    /** the number of records written to the file since it was added
     * to the cache or since skCacheFlush() last reported the file;
     * updated when the stream is closed or flushed */
    uint64_t            total_rec_count;
    /** the number of records in the file when it was opened or when
     * it was most recently flushed by skCacheFlush() */
    uint64_t            opened_rec_count;
    /** when this entry was last accessed; holds MAX_TIME for a new
     * entry and for an entry whose stream was closed because the
     * cache was full */
    sktime_t            last_accessed;
    /** the name of the file; heap-allocated and owned by the entry */
    const char         *filename;
    /** the open file handle, or NULL when the stream is closed */
    skstream_t         *stream;
    /** the more recently accessed entry.  NOTE(review): not
     * referenced by the code visible in this file */
    cache_entry_t      *more_recent;
    /** the less recently accessed entry.  NOTE(review): not
     * referenced by the code visible in this file */
    cache_entry_t      *less_recent;
};
94 /* typedef struct cache_entry_st cache_entry_t; // stream-cache.h */
95
96
/**
 *    cache_file_iter_t is returned by skCacheFlush() and
 *    skCacheCloseAll().  It is visited with skCacheFileIterNext() and
 *    released with skCacheFileIterDestroy().
 */
struct cache_file_iter_st {
    /* vector of cache_file_t structures, one per reported file */
    sk_vector_t        *vector;
    /* index into 'vector' of the element that the next call to
     * skCacheFileIterNext() returns */
    size_t              pos;
};
105 /* typedef struct cache_file_iter_st cache_file_iter_t; // stream-cache.h */
106
107
/**
 *    cache_file_t contains information about a file that exists in
 *    the stream cache.  The structure contains the file's key, name,
 *    and the number of records written to the file since it was added
 *    to the cache or since the most recent call to skCacheFlush().
 *
 *    A vector of these structures may be returned by skCacheFlush()
 *    or skCacheCloseAll().
 */
struct cache_file_st {
    /* the key for this file, copied from its cache_entry_t */
    cache_key_t         key;
    /* the number of records written to the file since it was added
     * to the cache or since the most recent flush; this is a copy of
     * the entry's 'total_rec_count' */
    uint64_t            rec_count;
    /* the name of the file; heap-allocated and freed by
     * skCacheFileIterDestroy() */
    const char         *filename;
};
typedef struct cache_file_st cache_file_t;
127
128
129 /* FUNCTION DEFINITIONS */
130
131 /**
132 * Close the stream that 'entry' wraps and destroy the stream. In
133 * addition, update the entry's 'total_rec_count'.
134 *
135 * This function expects the caller to have the entry's mutex.
136 *
137 * The entry's stream must be open.
138 *
139 * Return the result of calling skStreamClose(). Log an error
140 * message if skStreamClose() fails.
141 */
142 static int
cacheEntryClose(cache_entry_t * entry)143 cacheEntryClose(
144 cache_entry_t *entry)
145 {
146 uint64_t new_count;
147 int rv;
148
149 assert(entry);
150 assert(entry->stream);
151 ASSERT_MUTEX_LOCKED(&entry->mutex);
152
153 TRACEMSG(2, ("cache: Closing file '%s'", entry->filename));
154
155 /* update the record count */
156 new_count = skStreamGetRecordCount(entry->stream);
157 assert(entry->opened_rec_count <= new_count);
158 entry->total_rec_count += new_count - entry->opened_rec_count;
159
160 /* close the stream */
161 rv = skStreamClose(entry->stream);
162 if (rv) {
163 skStreamPrintLastErr(entry->stream, rv, &NOTICEMSG);
164 }
165 skStreamDestroy(&entry->stream);
166
167 return rv;
168 }
169
170 /*
171 * direction = cacheEntryCompare(a, b, config);
172 *
173 * The comparison function used by the redblack tree.
174 */
175 static int
cacheEntryCompare(const void * entry1_v,const void * entry2_v,const void UNUSED (* config))176 cacheEntryCompare(
177 const void *entry1_v,
178 const void *entry2_v,
179 const void UNUSED(*config))
180 {
181 cache_key_t *key1 = &((cache_entry_t *)entry1_v)->key;
182 cache_key_t *key2 = &((cache_entry_t *)entry2_v)->key;
183
184 if (key1->sensor_id != key2->sensor_id) {
185 return ((key1->sensor_id < key2->sensor_id) ? -1 : 1);
186 }
187 if (key1->flowtype_id != key2->flowtype_id) {
188 return ((key1->flowtype_id < key2->flowtype_id) ? -1 : 1);
189 }
190 if (key1->time_stamp < key2->time_stamp) {
191 return -1;
192 }
193 return (key1->time_stamp > key2->time_stamp);
194 }
195
196
197 /**
198 * Close the stream associated with the cache_entry_t 'entry' if it
199 * is open and destroy the 'entry'. Does not remove 'entry' from
200 * the red-black tree. This function assumes the caller holds the
201 * entry's mutex. This is a no-op if 'entry' is NULL.
202 *
203 * Return the result of skStreamClose() or 0 if stream was already
204 * closed.
205 */
206 static int
cacheEntryDestroy(cache_entry_t * entry)207 cacheEntryDestroy(
208 cache_entry_t *entry)
209 {
210 int rv = 0;
211
212 if (entry) {
213 ASSERT_MUTEX_LOCKED(&entry->mutex);
214
215 if (entry->stream) {
216 rv = cacheEntryClose(entry);
217 }
218 free((void*)entry->filename);
219 MUTEX_UNLOCK(&entry->mutex);
220 MUTEX_DESTROY(&entry->mutex);
221 free(entry);
222 }
223 return rv;
224 }
225
226
227 /**
228 * Return the interator entry at 'pos' or return NULL if 'pos' is
229 * out of range.
230 */
231 static cache_file_t *
cacheFileIterAt(cache_file_iter_t * iter,size_t pos)232 cacheFileIterAt(
233 cache_file_iter_t *iter,
234 size_t pos)
235 {
236 assert(iter);
237 assert(iter->vector);
238 return (cache_file_t *)skVectorGetValuePointer(iter->vector, pos);
239 }
240
241
242 /* lock cache, then close and destroy all streams. unlock cache. */
243 int
skCacheCloseAll(stream_cache_t * cache,cache_file_iter_t ** file_iter)244 skCacheCloseAll(
245 stream_cache_t *cache,
246 cache_file_iter_t **file_iter)
247 {
248 sk_vector_t *vector;
249 struct rbtree *closed_tree;
250 RBLIST *iter;
251 cache_file_t closed;
252 cache_entry_t *entry;
253 int retval = 0;
254 int rv;
255
256 assert(cache);
257
258 if (NULL == file_iter) {
259 vector = NULL;
260 } else {
261 *file_iter = (cache_file_iter_t *)calloc(1, sizeof(cache_file_iter_t));
262 vector = skVectorNew(sizeof(cache_file_t));
263 if (*file_iter && vector) {
264 (*file_iter)->vector = vector;
265 } else {
266 skAppPrintOutOfMemory(NULL);
267 skVectorDestroy(vector);
268 free(*file_iter);
269 *file_iter = NULL;
270 vector = NULL;
271 }
272 }
273
274 TRACEMSG(1, ("cache: Closing cache: %u total, %u open, %u closed...",
275 cache->total_count, cache->open_count,
276 cache->total_count - cache->open_count));
277
278 WRITE_LOCK(&cache->mutex);
279 if (0 == cache->total_count) {
280 RW_MUTEX_UNLOCK(&cache->mutex);
281 return 0;
282 }
283
284 TRACEMSG(2, ("cache: Closing cache: Closing files..."));
285
286 /* close all open streams */
287 iter = rbopenlist(cache->rbtree);
288 while ((entry = (cache_entry_t *)rbreadlist(iter)) != NULL) {
289 MUTEX_LOCK(&entry->mutex);
290 if (entry->stream) {
291 rv = cacheEntryClose(entry);
292 if (rv) {
293 retval = -1;
294 }
295 }
296 MUTEX_UNLOCK(&entry->mutex);
297 }
298 rbcloselist(iter);
299
300 TRACEMSG(2, ("cache: Closing cache: Creating new rbtree..."));
301
302 /* get a handle to the existing red-black tree and create a new
303 * one on the cache. */
304 closed_tree = cache->rbtree;
305 cache->rbtree = rbinit(&cacheEntryCompare, NULL);
306 if (cache->rbtree == NULL) {
307 skAppPrintOutOfMemory(NULL);
308 RW_MUTEX_UNLOCK(&cache->mutex);
309 skAbort();
310 }
311 cache->open_count = 0;
312 cache->total_count = 0;
313
314 /* release the mutex */
315 RW_MUTEX_UNLOCK(&cache->mutex);
316
317 if (NULL == vector) {
318 /* destroy all the entries */
319 TRACEMSG(2, ("cache: Closing cache: Destroying entries..."));
320 iter = rbopenlist(closed_tree);
321 while ((entry = (cache_entry_t *)rbreadlist(iter)) != NULL) {
322 MUTEX_LOCK(&entry->mutex);
323 assert(NULL == entry->stream);
324 cacheEntryDestroy(entry);
325 }
326 rbcloselist(iter);
327 } else {
328 /* move all entries that have a record count from the rbtree
329 * into the vector */
330 TRACEMSG(2, ("cache: Closing cache: Filling iterator..."));
331 iter = rbopenlist(closed_tree);
332 while ((entry = (cache_entry_t *)rbreadlist(iter)) != NULL) {
333 MUTEX_LOCK(&entry->mutex);
334 assert(NULL == entry->stream);
335 if (entry->total_rec_count) {
336 closed.key = entry->key;
337 closed.rec_count = entry->total_rec_count;
338 closed.filename = entry->filename;
339 entry->filename = NULL;
340 if (skVectorAppendValue(vector, &closed)) {
341 skAppPrintOutOfMemory(NULL);
342 free((void *)closed.filename);
343 }
344 }
345 cacheEntryDestroy(entry);
346 }
347 rbcloselist(iter);
348 }
349
350 /* done with the tree */
351 rbdestroy(closed_tree);
352
353 TRACEMSG(1, ("cache: Closing cache: Done."));
354
355 return retval;
356 }
357
358
359 /* create a cache with the specified size and open callback function */
360 stream_cache_t *
skCacheCreate(size_t max_size,cache_open_fn_t open_fn)361 skCacheCreate(
362 size_t max_size,
363 cache_open_fn_t open_fn)
364 {
365 stream_cache_t *cache = NULL;
366
367 /* verify input */
368 if (max_size < STREAM_CACHE_MINIMUM_SIZE) {
369 CRITMSG(("Illegal maximum size (%" SK_PRIuZ ") for stream cache;"
370 " must use value >= %u"),
371 max_size, STREAM_CACHE_MINIMUM_SIZE);
372 return NULL;
373 }
374 if (NULL == open_fn) {
375 CRITMSG("No open function provided to stream cache");
376 return NULL;
377 }
378
379 cache = (stream_cache_t *)calloc(1, sizeof(stream_cache_t));
380 if (cache == NULL) {
381 skAppPrintOutOfMemory(NULL);
382 return NULL;
383 }
384
385 if (RW_MUTEX_INIT(&cache->mutex)) {
386 CRITMSG(FMT_MUTEX_FAILURE);
387 free(cache);
388 return NULL;
389 }
390
391 cache->rbtree = rbinit(&cacheEntryCompare, NULL);
392 if (cache->rbtree == NULL) {
393 skAppPrintOutOfMemory(NULL);
394 RW_MUTEX_DESTROY(&cache->mutex);
395 free(cache);
396 return NULL;
397 }
398
399 cache->max_open_count = max_size;
400 cache->open_callback = open_fn;
401
402 return cache;
403 }
404
405
406 /* close all streams, destroy them, and destroy the cache */
407 int
skCacheDestroy(stream_cache_t * cache)408 skCacheDestroy(
409 stream_cache_t *cache)
410 {
411 int retval;
412
413 if (NULL == cache) {
414 TRACEMSG(1, ("cache: Tried to destroy unitialized stream cache"));
415 return 0;
416 }
417
418 TRACEMSG(1, ("cache: Destroying cache: %u total, %u open, %u closed...",
419 cache->total_count, cache->open_count,
420 cache->total_count - cache->open_count));
421
422 /* close any open files */
423 retval = skCacheCloseAll(cache, NULL);
424
425 /* destroy the redblack tree */
426 rbdestroy(cache->rbtree);
427
428 RW_MUTEX_UNLOCK(&cache->mutex);
429 RW_MUTEX_DESTROY(&cache->mutex);
430
431 /* Free the structure itself */
432 free(cache);
433
434 TRACEMSG(1, ("cache: Destroying cache: Done."));
435
436 return retval;
437 }
438
439
440 /* return the stream member of an entry */
441 skstream_t *
skCacheEntryGetStream(const cache_entry_t * entry)442 skCacheEntryGetStream(
443 const cache_entry_t *entry)
444 {
445 assert(entry);
446 ASSERT_MUTEX_LOCKED(&((cache_entry_t *)entry)->mutex);
447 return entry->stream;
448 }
449
450
451 /* unlock the entry */
452 void
skCacheEntryRelease(cache_entry_t * entry)453 skCacheEntryRelease(
454 cache_entry_t *entry)
455 {
456 assert(entry);
457 ASSERT_MUTEX_LOCKED(&entry->mutex);
458 MUTEX_UNLOCK(&entry->mutex);
459 }
460
461
462 /* return the number of files in the iterator */
463 size_t
skCacheFileIterCountEntries(const cache_file_iter_t * iter)464 skCacheFileIterCountEntries(
465 const cache_file_iter_t *iter)
466 {
467 assert(iter);
468 return skVectorGetCount(iter->vector);
469 }
470
471 /* destory the iterator and all the files it contains */
472 void
skCacheFileIterDestroy(cache_file_iter_t * iter)473 skCacheFileIterDestroy(
474 cache_file_iter_t *iter)
475 {
476 cache_file_t *file;
477 size_t i;
478
479 if (iter) {
480 for (i = 0; (file = (cache_file_t *)cacheFileIterAt(iter, i)); ++i) {
481 free((void *)file->filename);
482 }
483 skVectorDestroy(iter->vector);
484 free(iter);
485 }
486 }
487
488
489 /* get the next filename and record-count from the iterator */
490 int
skCacheFileIterNext(cache_file_iter_t * iter,const char ** filename,uint64_t * record_count)491 skCacheFileIterNext(
492 cache_file_iter_t *iter,
493 const char **filename,
494 uint64_t *record_count)
495 {
496 cache_file_t *file;
497
498 assert(iter);
499 assert(filename);
500 assert(record_count);
501
502 file = cacheFileIterAt(iter, iter->pos);
503 if (NULL == file) {
504 assert(skVectorGetCount(iter->vector) == iter->pos);
505 return SK_ITERATOR_NO_MORE_ENTRIES;
506 }
507 ++iter->pos;
508 *filename = file->filename;
509 *record_count = file->rec_count;
510 return SK_ITERATOR_OK;
511 }
512
513
514
515 /* flush all streams in the cache */
516 int
skCacheFlush(stream_cache_t * cache,cache_file_iter_t ** file_iter)517 skCacheFlush(
518 stream_cache_t *cache,
519 cache_file_iter_t **file_iter)
520 {
521 #if TRACEMSG_LEVEL >= 3
522 char tstamp[SKTIMESTAMP_STRLEN];
523 #endif
524 sktime_t inactive_time;
525 cache_entry_t *entry;
526 cache_entry_t *del_entry;
527 uint64_t old_count;
528 sk_vector_t *vector;
529 RBLIST *iter;
530 cache_file_t flushed;
531 int retval = 0;
532 int rv;
533
534 assert(cache);
535 assert(file_iter);
536
537 *file_iter = (cache_file_iter_t *)calloc(1, sizeof(cache_file_iter_t));
538 vector = skVectorNew(sizeof(cache_file_t));
539 if (NULL == *file_iter || NULL == vector) {
540 skAppPrintOutOfMemory(NULL);
541 skVectorDestroy(vector);
542 free(*file_iter);
543 *file_iter = NULL;
544 return -1;
545 }
546 (*file_iter)->vector = vector;
547
548 if (NULL == cache) {
549 TRACEMSG(1, ("cache: Tried to flush unitialized stream cache."));
550 return 0;
551 }
552
553 WRITE_LOCK(&cache->mutex);
554
555 /* compute the time for determining the inactive files */
556 inactive_time = sktimeNow() - STREAM_CACHE_INACTIVE_TIMEOUT;
557
558 TRACEMSG(1, ("cache: Flushing cache: %u total, %u open, %u closed...",
559 cache->total_count, cache->open_count,
560 cache->total_count - cache->open_count));
561 TRACEMSG(3, ("cache: Flushing cache: Closing files inactive since %s...",
562 sktimestamp_r(tstamp, inactive_time, 0)));
563
564 /* entry to delete from rbtree; delete it after moving to the next
565 * entry in the tree */
566 del_entry = NULL;
567
568 iter = rbopenlist(cache->rbtree);
569 while ((entry = (cache_entry_t *)rbreadlist(iter)) != NULL) {
570 if (del_entry) {
571 rbdelete(del_entry, cache->rbtree);
572 cacheEntryDestroy(del_entry);
573 --cache->total_count;
574 del_entry = NULL;
575 }
576 MUTEX_LOCK(&entry->mutex);
577 if (entry->stream && (entry->last_accessed > inactive_time)) {
578 /* file is still active; flush it */
579 rv = skStreamFlush(entry->stream);
580 if (rv) {
581 skStreamPrintLastErr(entry->stream, rv, &NOTICEMSG);
582 retval = -1;
583 }
584 old_count = entry->opened_rec_count;
585 entry->opened_rec_count = skStreamGetRecordCount(entry->stream);
586 assert(old_count <= entry->opened_rec_count);
587 entry->total_rec_count += entry->opened_rec_count - old_count;
588 if (entry->total_rec_count) {
589 /* append an entry to vector; copy the filename */
590 flushed.filename = strdup(entry->filename);
591 if (!flushed.filename) {
592 skAppPrintOutOfMemory(NULL);
593 } else {
594 flushed.key = entry->key;
595 flushed.rec_count = entry->total_rec_count;
596 entry->total_rec_count = 0;
597 if (skVectorAppendValue(vector, &flushed)) {
598 skAppPrintOutOfMemory(NULL);
599 free((void *)flushed.filename);
600 }
601 }
602 }
603 MUTEX_UNLOCK(&entry->mutex);
604 } else {
605 /* stream is inactive or closed; delete the entry */
606 del_entry = entry;
607 if (entry->stream) {
608 TRACEMSG(3, ("cache: Flushing cache:"
609 " Closing inactive file %s; last_accessed %s",
610 entry->filename,
611 sktimestamp_r(tstamp, entry->last_accessed, 0)));
612 rv = cacheEntryClose(entry);
613 if (rv) {
614 retval = -1;
615 }
616 --cache->open_count;
617 }
618 if (entry->total_rec_count) {
619 /* append an entry to the vector; steal the filename
620 * since the entry is being destroyed */
621 flushed.key = entry->key;
622 flushed.rec_count = entry->total_rec_count;
623 flushed.filename = entry->filename;
624 entry->filename = NULL;
625 if (skVectorAppendValue(vector, &flushed)) {
626 skAppPrintOutOfMemory(NULL);
627 free((void *)flushed.filename);
628 }
629 }
630 }
631 }
632 rbcloselist(iter);
633
634 if (del_entry) {
635 rbdelete(del_entry, cache->rbtree);
636 cacheEntryDestroy(del_entry);
637 --cache->total_count;
638 }
639
640 TRACEMSG(1, ("cache: Flushing cache. %u total, %u open. Done.",
641 cache->total_count, cache->open_count));
642
643 RW_MUTEX_UNLOCK(&cache->mutex);
644
645 return retval;
646 }
647
648
649 /* find an entry in the cache. if not present, use the open-callback
650 * function to open/create the stream and then add it. */
651 int
skCacheLookupOrOpenAdd(stream_cache_t * cache,const cache_key_t * key,void * caller_data,cache_entry_t ** out_entry)652 skCacheLookupOrOpenAdd(
653 stream_cache_t *cache,
654 const cache_key_t *key,
655 void *caller_data,
656 cache_entry_t **out_entry)
657 {
658 #ifdef SK_HAVE_PTHREAD_RWLOCK
659 int have_writelock = 0;
660 #endif
661 cache_entry_t search_key;
662 cache_entry_t *e;
663 cache_entry_t *entry;
664 int retval = 0;
665 int rv;
666 #if TRACEMSG_LEVEL >= 3
667 char tstamp[SKTIMESTAMP_STRLEN];
668 char sensor[SK_MAX_STRLEN_SENSOR+1];
669 char flowtype[SK_MAX_STRLEN_FLOWTYPE+1];
670
671 sktimestamp_r(tstamp, key->time_stamp, SKTIMESTAMP_NOMSEC);
672 sksiteSensorGetName(sensor, sizeof(sensor), key->sensor_id);
673 sksiteFlowtypeGetName(flowtype, sizeof(flowtype), key->flowtype_id);
674 #endif /* TRACEMSG_LEVEL */
675
676 search_key.key.time_stamp = key->time_stamp;
677 search_key.key.sensor_id = key->sensor_id;
678 search_key.key.flowtype_id = key->flowtype_id;
679
680 /* do a lookup holding only the read lock; if there is no support
681 * for read-write locks, the entire cache is locked. */
682 READ_LOCK(&cache->mutex);
683
684 LOOKUP:
685 /* try to find the entry */
686 entry = (cache_entry_t *)rbfind(&search_key, cache->rbtree);
687 TRACEMSG(3, ("cache: Lookup: %s for stream %s %s %s",
688 ((entry) ? "hit" : "miss"), tstamp, sensor, flowtype));
689
690 /* if we find it and the stream is open, return it */
691 if (entry && entry->stream) {
692 MUTEX_LOCK(&entry->mutex);
693 TRACEMSG(2, ("cache: Lookup: found open stream '%s'",entry->filename));
694 entry->last_accessed = sktimeNow();
695 *out_entry = entry;
696 retval = 0;
697 goto END;
698 }
699
700 #ifdef SK_HAVE_PTHREAD_RWLOCK
701 if (!have_writelock) {
702 have_writelock = 1;
703 /*
704 * we need to either add or reopen the stream. We want to get a
705 * write lock on the cache, but first we must release the read
706 * lock on the cache.
707 *
708 * skip all of these steps if there is no support for read-write
709 * locks, since the entire cache is already locked.
710 */
711 RW_MUTEX_UNLOCK(&cache->mutex);
712 WRITE_LOCK(&cache->mutex);
713
714 /* search for the entry again, in case it was added or opened
715 * between releasing the read lock on the cache and getting
716 * the write lock on the cache */
717 goto LOOKUP;
718 }
719 #endif /* SK_HAVE_PTHREAD_RWLOCK */
720
721 *out_entry = NULL;
722 retval = -1;
723
724 if (entry) {
725 MUTEX_LOCK(&entry->mutex);
726 /* use the callback to open the file */
727 entry->stream = cache->open_callback(key, caller_data, entry->filename);
728 if (NULL == entry->stream) {
729 MUTEX_UNLOCK(&entry->mutex);
730 goto END;
731 }
732 if (strcmp(entry->filename, skStreamGetPathname(entry->stream))) {
733 DEBUGMSG("Pathname changed");
734 free((void *)entry->filename);
735 entry->filename = strdup(skStreamGetPathname(entry->stream));
736 if (NULL == entry->filename) {
737 skAppPrintOutOfMemory(NULL);
738 MUTEX_UNLOCK(&entry->mutex);
739 goto END;
740 }
741 }
742 ++cache->open_count;
743 TRACEMSG(1, ("cache: Lookup: Opened known file '%s'", entry->filename));
744
745 } else {
746 /* create a new entry */
747 entry = (cache_entry_t *)calloc(1, sizeof(cache_entry_t));
748 if (NULL == entry) {
749 skAppPrintOutOfMemory(NULL);
750 goto END;
751 }
752 if (MUTEX_INIT(&entry->mutex)) {
753 CRITMSG(FMT_MUTEX_FAILURE);
754 free(entry);
755 goto END;
756 }
757 MUTEX_LOCK(&entry->mutex);
758 /* use the callback to open the file */
759 entry->stream = cache->open_callback(key, caller_data, NULL);
760 if (NULL == entry->stream) {
761 cacheEntryDestroy(entry);
762 goto END;
763 }
764 entry->filename = strdup(skStreamGetPathname(entry->stream));
765 if (NULL == entry->filename) {
766 skAppPrintOutOfMemory(NULL);
767 cacheEntryDestroy(entry);
768 goto END;
769 }
770
771 entry->key.time_stamp = key->time_stamp;
772 entry->key.sensor_id = key->sensor_id;
773 entry->key.flowtype_id = key->flowtype_id;
774 entry->total_rec_count = 0;
775 entry->last_accessed = MAX_TIME;
776
777 /* add the entry to the redblack tree */
778 e = (cache_entry_t *)rbsearch(entry, cache->rbtree);
779 if (e != entry) {
780 if (e == NULL) {
781 skAppPrintOutOfMemory(NULL);
782 cacheEntryDestroy(entry);
783 goto END;
784 }
785 CRITMSG(("Duplicate entries in stream cache "
786 "for time=%" PRId64 " sensor=%d flowtype=%d"),
787 key->time_stamp, key->sensor_id, key->flowtype_id);
788 skAbort();
789 }
790
791 ++cache->total_count;
792 ++cache->open_count;
793
794 TRACEMSG(1, ("cache: Lookup: Opened new file '%s'", entry->filename));
795 }
796
797 retval = 0;
798
799 TRACEMSG(2, ("cache: Lookup: %u total, %u open, %u max, %u closed",
800 cache->total_count, cache->open_count, cache->max_open_count,
801 cache->total_count - cache->open_count));
802
803 if (cache->open_count > cache->max_open_count) {
804 /* The cache is full: close the least recently used stream.
805 * This uses a linear search over all entries. */
806 RBLIST *iter;
807 cache_entry_t *min_entry;
808 sktime_t min_time;
809
810 min_entry = NULL;
811 min_time = MAX_TIME;
812
813 /* unlock the entry's mutex to avoid a deadlock */
814 MUTEX_UNLOCK(&entry->mutex);
815
816 /* visit entries in the red-black tree; this only finds open
817 * entries since closed entries have their time set to
818 * MAX_TIME. */
819 iter = rbopenlist(cache->rbtree);
820 while ((e = (cache_entry_t *)rbreadlist(iter)) != NULL) {
821 MUTEX_LOCK(&e->mutex);
822 if (e->last_accessed < min_time) {
823 min_time = e->last_accessed;
824 min_entry = e;
825 }
826 MUTEX_UNLOCK(&e->mutex);
827 }
828 rbcloselist(iter);
829
830 assert(min_time < MAX_TIME);
831 assert(min_entry != NULL);
832 assert(min_entry != entry);
833 assert(min_entry->stream != NULL);
834
835 MUTEX_LOCK(&min_entry->mutex);
836 rv = cacheEntryClose(min_entry);
837 if (rv) {
838 retval = -1;
839 }
840 min_entry->last_accessed = MAX_TIME;
841 MUTEX_UNLOCK(&min_entry->mutex);
842
843 --cache->open_count;
844
845 /* re-lock the entry's mutex */
846 MUTEX_LOCK(&entry->mutex);
847 }
848
849 /* update access time and record count */
850 entry->last_accessed = sktimeNow();
851 entry->opened_rec_count = skStreamGetRecordCount(entry->stream);
852 *out_entry = entry;
853
854 END:
855 RW_MUTEX_UNLOCK(&cache->mutex);
856 return retval;
857 }
858
859
860 /*
861 ** Local Variables:
862 ** mode:c
863 ** indent-tabs-mode:nil
864 ** c-basic-offset:4
865 ** End:
866 */
867