1 /*
2 ** Copyright (C) 2004-2020 by Carnegie Mellon University.
3 **
4 ** @OPENSOURCE_LICENSE_START@
5 ** See license information in ../../LICENSE.txt
6 ** @OPENSOURCE_LICENSE_END@
7 */
8 
9 /* #define SKTHREAD_DEBUG_MUTEX */
#include <silk/silk.h>

RCSIDENT("$SiLK: stream-cache.c ef14e54179be 2020-04-14 21:57:45Z mthomas $");

#include <limits.h>
#include <silk/redblack.h>
#include <silk/sklog.h>
#include <silk/sksite.h>
#include <silk/skvector.h>
#include <silk/utils.h>
#include "stream-cache.h"
20 
21 /* use TRACEMSG_LEVEL as our tracing variable */
22 #define TRACEMSG(lvl, msg) TRACEMSG_TO_TRACEMSGLVL(lvl, msg)
23 #include <silk/sktracemsg.h>
24 
25 
26 /* DEFINES AND TYPEDEFS */
27 
28 /*
29  *    Maximum value for the time stamp
30  */
31 #define MAX_TIME  (sktime_t)INT64_MAX
32 
33 
34 /* Message to print when fail to initialize mutex */
35 #define FMT_MUTEX_FAILURE \
36     "Could not initialize a mutex at %s:%d", __FILE__, __LINE__
37 
38 
39 /**
40  *    stream_cache_t contains a red-black tree of cache_entry_t objects.
41  *    The number of valid entries in the array is specified by
42  *    'size'.  The cache also contains a redblack tree that is used to
43  *    index the entries.  Finally, there is a mutex on the cache; the
44  *    mutex will be a pthread_rwlock_t mutex if read/write locks are
45  *    supported on this system.
46  */
47 struct stream_cache_st {
48     /* the redblack tree used for searching */
49     struct rbtree      *rbtree;
50     /* function called by skCacheLookupOrOpenAdd() to open a file that
51      * is not currently in the cache */
52     cache_open_fn_t     open_callback;
53     /* current number of open entries */
54     unsigned int        open_count;
55     /* maximum number of open entries the user specified */
56     unsigned int        max_open_count;
57     /* total number of entries (open and closed) */
58     unsigned int        total_count;
59     /* mutex for the cache */
60     RWMUTEX             mutex;
61 };
62 /* typedef struct stream_cache_st stream_cache_t; // stream-cache.h */
63 
64 
65 /**
66  *    The cache_entry_t contains information about an active file in
67  *    the stream cache.  The structure contains the file key, the
68  *    number of records in the file, and the open file handle.
69  *
70  *    Users of the stream-cache should view the cache_entry_t as
71  *    opaque.  Use the macros and functions to access its members.
72  */
73 struct cache_entry_st {
74     /** the key */
75     cache_key_t         key;
76     /** The mutex associated with this entry */
77     pthread_mutex_t     mutex;
78     /** the number of records written to the file since it was added
79      * to the cache */
80     uint64_t            total_rec_count;
81     /** the number of records in the file when it was opened */
82     uint64_t            opened_rec_count;
83     /** when this entry was last accessed */
84     sktime_t            last_accessed;
85     /** the name of the file */
86     const char         *filename;
87     /** the open file handle */
88     skstream_t         *stream;
89     /** the more recently accessed entry */
90     cache_entry_t      *more_recent;
91     /** the less recently accessed entry */
92     cache_entry_t      *less_recent;
93 };
94 /* typedef struct cache_entry_st cache_entry_t; // stream-cache.h */
95 
96 
97 /**
98  *    cache_file_iter_t is returned by skCacheFlush() and
99  *    skCacheCloseAll().
100  */
101 struct cache_file_iter_st {
102     sk_vector_t        *vector;
103     size_t              pos;
104 };
105 /* typedef struct cache_file_iter_st cache_file_iter_t; // stream-cache.h */
106 
107 
108 /**
109  *    cache_file_t contains information about a file that exists in
110  *    the stream cache.  The structure contains the file's key, name,
111  *    and the number of records written to the file since it was added
112  *    to the cache or since the most recent call to skCacheFlush().
113  *
114  *    A vector of these structures may be returned by skCacheFlush()
115  *    or skCacheCloseAll().
116  */
117 struct cache_file_st {
118     /* the key for this closed file */
119     cache_key_t         key;
120     /* the number of records in the file as of opening or the most
121      * recent flush, used for log messages */
122     uint64_t            rec_count;
123     /* the name of the file */
124     const char         *filename;
125 };
126 typedef struct cache_file_st cache_file_t;
127 
128 
129 /* FUNCTION DEFINITIONS */
130 
131 /**
132  *    Close the stream that 'entry' wraps and destroy the stream.  In
133  *    addition, update the entry's 'total_rec_count'.
134  *
135  *    This function expects the caller to have the entry's mutex.
136  *
137  *    The entry's stream must be open.
138  *
139  *    Return the result of calling skStreamClose().  Log an error
140  *    message if skStreamClose() fails.
141  */
142 static int
cacheEntryClose(cache_entry_t * entry)143 cacheEntryClose(
144     cache_entry_t      *entry)
145 {
146     uint64_t new_count;
147     int rv;
148 
149     assert(entry);
150     assert(entry->stream);
151     ASSERT_MUTEX_LOCKED(&entry->mutex);
152 
153     TRACEMSG(2, ("cache: Closing file '%s'", entry->filename));
154 
155     /* update the record count */
156     new_count = skStreamGetRecordCount(entry->stream);
157     assert(entry->opened_rec_count <= new_count);
158     entry->total_rec_count += new_count - entry->opened_rec_count;
159 
160     /* close the stream */
161     rv = skStreamClose(entry->stream);
162     if (rv) {
163         skStreamPrintLastErr(entry->stream, rv, &NOTICEMSG);
164     }
165     skStreamDestroy(&entry->stream);
166 
167     return rv;
168 }
169 
170 /*
171  *  direction = cacheEntryCompare(a, b, config);
172  *
173  *    The comparison function used by the redblack tree.
174  */
175 static int
cacheEntryCompare(const void * entry1_v,const void * entry2_v,const void UNUSED (* config))176 cacheEntryCompare(
177     const void         *entry1_v,
178     const void         *entry2_v,
179     const void  UNUSED(*config))
180 {
181     cache_key_t *key1 = &((cache_entry_t *)entry1_v)->key;
182     cache_key_t *key2 = &((cache_entry_t *)entry2_v)->key;
183 
184     if (key1->sensor_id != key2->sensor_id) {
185         return ((key1->sensor_id < key2->sensor_id) ? -1 : 1);
186     }
187     if (key1->flowtype_id != key2->flowtype_id) {
188         return ((key1->flowtype_id < key2->flowtype_id) ? -1 : 1);
189     }
190     if (key1->time_stamp < key2->time_stamp) {
191         return -1;
192     }
193     return (key1->time_stamp > key2->time_stamp);
194 }
195 
196 
197 /**
198  *    Close the stream associated with the cache_entry_t 'entry' if it
199  *    is open and destroy the 'entry'.  Does not remove 'entry' from
200  *    the red-black tree.  This function assumes the caller holds the
201  *    entry's mutex.  This is a no-op if 'entry' is NULL.
202  *
203  *    Return the result of skStreamClose() or 0 if stream was already
204  *    closed.
205  */
206 static int
cacheEntryDestroy(cache_entry_t * entry)207 cacheEntryDestroy(
208     cache_entry_t      *entry)
209 {
210     int rv = 0;
211 
212     if (entry) {
213         ASSERT_MUTEX_LOCKED(&entry->mutex);
214 
215         if (entry->stream) {
216             rv = cacheEntryClose(entry);
217         }
218         free((void*)entry->filename);
219         MUTEX_UNLOCK(&entry->mutex);
220         MUTEX_DESTROY(&entry->mutex);
221         free(entry);
222     }
223     return rv;
224 }
225 
226 
227 /**
228  *    Return the interator entry at 'pos' or return NULL if 'pos' is
229  *    out of range.
230  */
231 static cache_file_t *
cacheFileIterAt(cache_file_iter_t * iter,size_t pos)232 cacheFileIterAt(
233     cache_file_iter_t  *iter,
234     size_t              pos)
235 {
236     assert(iter);
237     assert(iter->vector);
238     return (cache_file_t *)skVectorGetValuePointer(iter->vector, pos);
239 }
240 
241 
242 /* lock cache, then close and destroy all streams.  unlock cache. */
243 int
skCacheCloseAll(stream_cache_t * cache,cache_file_iter_t ** file_iter)244 skCacheCloseAll(
245     stream_cache_t     *cache,
246     cache_file_iter_t **file_iter)
247 {
248     sk_vector_t *vector;
249     struct rbtree *closed_tree;
250     RBLIST *iter;
251     cache_file_t closed;
252     cache_entry_t *entry;
253     int retval = 0;
254     int rv;
255 
256     assert(cache);
257 
258     if (NULL == file_iter) {
259         vector = NULL;
260     } else {
261         *file_iter = (cache_file_iter_t *)calloc(1, sizeof(cache_file_iter_t));
262         vector = skVectorNew(sizeof(cache_file_t));
263         if (*file_iter && vector) {
264             (*file_iter)->vector = vector;
265         } else {
266             skAppPrintOutOfMemory(NULL);
267             skVectorDestroy(vector);
268             free(*file_iter);
269             *file_iter = NULL;
270             vector = NULL;
271         }
272     }
273 
274     TRACEMSG(1, ("cache: Closing cache: %u total, %u open, %u closed...",
275                  cache->total_count, cache->open_count,
276                  cache->total_count - cache->open_count));
277 
278     WRITE_LOCK(&cache->mutex);
279     if (0 == cache->total_count) {
280         RW_MUTEX_UNLOCK(&cache->mutex);
281         return 0;
282     }
283 
284     TRACEMSG(2, ("cache: Closing cache: Closing files..."));
285 
286     /* close all open streams */
287     iter = rbopenlist(cache->rbtree);
288     while ((entry = (cache_entry_t *)rbreadlist(iter)) != NULL) {
289         MUTEX_LOCK(&entry->mutex);
290         if (entry->stream) {
291             rv = cacheEntryClose(entry);
292             if (rv) {
293                 retval = -1;
294             }
295         }
296         MUTEX_UNLOCK(&entry->mutex);
297     }
298     rbcloselist(iter);
299 
300     TRACEMSG(2, ("cache: Closing cache: Creating new rbtree..."));
301 
302     /* get a handle to the existing red-black tree and create a new
303      * one on the cache. */
304     closed_tree = cache->rbtree;
305     cache->rbtree = rbinit(&cacheEntryCompare, NULL);
306     if (cache->rbtree == NULL) {
307         skAppPrintOutOfMemory(NULL);
308         RW_MUTEX_UNLOCK(&cache->mutex);
309         skAbort();
310     }
311     cache->open_count = 0;
312     cache->total_count = 0;
313 
314     /* release the mutex */
315     RW_MUTEX_UNLOCK(&cache->mutex);
316 
317     if (NULL == vector) {
318         /* destroy all the entries */
319         TRACEMSG(2, ("cache: Closing cache: Destroying entries..."));
320         iter = rbopenlist(closed_tree);
321         while ((entry = (cache_entry_t *)rbreadlist(iter)) != NULL) {
322             MUTEX_LOCK(&entry->mutex);
323             assert(NULL == entry->stream);
324             cacheEntryDestroy(entry);
325         }
326         rbcloselist(iter);
327     } else {
328         /* move all entries that have a record count from the rbtree
329          * into the vector */
330         TRACEMSG(2, ("cache: Closing cache: Filling iterator..."));
331         iter = rbopenlist(closed_tree);
332         while ((entry = (cache_entry_t *)rbreadlist(iter)) != NULL) {
333             MUTEX_LOCK(&entry->mutex);
334             assert(NULL == entry->stream);
335             if (entry->total_rec_count) {
336                 closed.key = entry->key;
337                 closed.rec_count = entry->total_rec_count;
338                 closed.filename = entry->filename;
339                 entry->filename = NULL;
340                 if (skVectorAppendValue(vector, &closed)) {
341                     skAppPrintOutOfMemory(NULL);
342                     free((void *)closed.filename);
343                 }
344             }
345             cacheEntryDestroy(entry);
346         }
347         rbcloselist(iter);
348     }
349 
350     /* done with the tree */
351     rbdestroy(closed_tree);
352 
353     TRACEMSG(1, ("cache: Closing cache: Done."));
354 
355     return retval;
356 }
357 
358 
359 /* create a cache with the specified size and open callback function */
360 stream_cache_t *
skCacheCreate(size_t max_size,cache_open_fn_t open_fn)361 skCacheCreate(
362     size_t              max_size,
363     cache_open_fn_t     open_fn)
364 {
365     stream_cache_t *cache = NULL;
366 
367     /* verify input */
368     if (max_size < STREAM_CACHE_MINIMUM_SIZE) {
369         CRITMSG(("Illegal maximum size (%" SK_PRIuZ ") for stream cache;"
370                  " must use value >= %u"),
371                 max_size, STREAM_CACHE_MINIMUM_SIZE);
372         return NULL;
373     }
374     if (NULL == open_fn) {
375         CRITMSG("No open function provided to stream cache");
376         return NULL;
377     }
378 
379     cache = (stream_cache_t *)calloc(1, sizeof(stream_cache_t));
380     if (cache == NULL) {
381         skAppPrintOutOfMemory(NULL);
382         return NULL;
383     }
384 
385     if (RW_MUTEX_INIT(&cache->mutex)) {
386         CRITMSG(FMT_MUTEX_FAILURE);
387         free(cache);
388         return NULL;
389     }
390 
391     cache->rbtree = rbinit(&cacheEntryCompare, NULL);
392     if (cache->rbtree == NULL) {
393         skAppPrintOutOfMemory(NULL);
394         RW_MUTEX_DESTROY(&cache->mutex);
395         free(cache);
396         return NULL;
397     }
398 
399     cache->max_open_count = max_size;
400     cache->open_callback = open_fn;
401 
402     return cache;
403 }
404 
405 
406 /* close all streams, destroy them, and destroy the cache */
407 int
skCacheDestroy(stream_cache_t * cache)408 skCacheDestroy(
409     stream_cache_t     *cache)
410 {
411     int retval;
412 
413     if (NULL == cache) {
414         TRACEMSG(1, ("cache: Tried to destroy unitialized stream cache"));
415         return 0;
416     }
417 
418     TRACEMSG(1, ("cache: Destroying cache: %u total, %u open, %u closed...",
419                  cache->total_count, cache->open_count,
420                  cache->total_count - cache->open_count));
421 
422     /* close any open files */
423     retval = skCacheCloseAll(cache, NULL);
424 
425     /* destroy the redblack tree */
426     rbdestroy(cache->rbtree);
427 
428     RW_MUTEX_UNLOCK(&cache->mutex);
429     RW_MUTEX_DESTROY(&cache->mutex);
430 
431     /* Free the structure itself */
432     free(cache);
433 
434     TRACEMSG(1, ("cache: Destroying cache: Done."));
435 
436     return retval;
437 }
438 
439 
/*
 *    Return the stream member of 'entry'.  The value is NULL when the
 *    entry's file is closed.  The caller must hold the entry's mutex,
 *    as it does between skCacheLookupOrOpenAdd() and
 *    skCacheEntryRelease().
 */
skstream_t *
skCacheEntryGetStream(
    const cache_entry_t    *entry)
{
    assert(entry);
    /* cast away const, presumably because the lock-check macro needs a
     * non-const mutex pointer */
    ASSERT_MUTEX_LOCKED(&((cache_entry_t *)entry)->mutex);
    return entry->stream;
}
449 
450 
/*
 *    Unlock the mutex of 'entry', which skCacheLookupOrOpenAdd() left
 *    locked.  Once the entry is released, other cache operations (for
 *    example, skCacheFlush()) may close or destroy it.
 */
void
skCacheEntryRelease(
    cache_entry_t  *entry)
{
    assert(entry);
    ASSERT_MUTEX_LOCKED(&entry->mutex);
    MUTEX_UNLOCK(&entry->mutex);
}
460 
461 
462 /* return the number of files in the iterator */
463 size_t
skCacheFileIterCountEntries(const cache_file_iter_t * iter)464 skCacheFileIterCountEntries(
465     const cache_file_iter_t    *iter)
466 {
467     assert(iter);
468     return skVectorGetCount(iter->vector);
469 }
470 
471 /* destory the iterator and all the files it contains */
472 void
skCacheFileIterDestroy(cache_file_iter_t * iter)473 skCacheFileIterDestroy(
474     cache_file_iter_t  *iter)
475 {
476     cache_file_t *file;
477     size_t i;
478 
479     if (iter) {
480         for (i = 0; (file = (cache_file_t *)cacheFileIterAt(iter, i)); ++i) {
481             free((void *)file->filename);
482         }
483         skVectorDestroy(iter->vector);
484         free(iter);
485     }
486 }
487 
488 
489 /* get the next filename and record-count from the iterator */
490 int
skCacheFileIterNext(cache_file_iter_t * iter,const char ** filename,uint64_t * record_count)491 skCacheFileIterNext(
492     cache_file_iter_t  *iter,
493     const char        **filename,
494     uint64_t           *record_count)
495 {
496     cache_file_t *file;
497 
498     assert(iter);
499     assert(filename);
500     assert(record_count);
501 
502     file = cacheFileIterAt(iter, iter->pos);
503     if (NULL == file) {
504         assert(skVectorGetCount(iter->vector) == iter->pos);
505         return SK_ITERATOR_NO_MORE_ENTRIES;
506     }
507     ++iter->pos;
508     *filename = file->filename;
509     *record_count = file->rec_count;
510     return SK_ITERATOR_OK;
511 }
512 
513 
514 
/*
 *    Flush all streams in the cache.
 *
 *    Streams accessed after sktimeNow() minus
 *    STREAM_CACHE_INACTIVE_TIMEOUT are flushed and stay open.  All
 *    other entries are removed from the cache and destroyed, closing
 *    their stream if it is open.  Those other entries are the ones
 *    whose stream is inactive or already closed.
 *
 *    Set 'file_iter' to a newly allocated iterator over the files that
 *    had records written to them since they were added to the cache or
 *    since the previous flush.  The caller frees the iterator with
 *    skCacheFileIterDestroy().  The record count of each open entry
 *    that is reported is reset to zero.
 *
 *    Return 0 on success.  Return -1 in either of these cases:
 *    - the iterator cannot be allocated (*file_iter is set to NULL);
 *    - flushing or closing any stream fails.
 */
int
skCacheFlush(
    stream_cache_t     *cache,
    cache_file_iter_t **file_iter)
{
#if TRACEMSG_LEVEL >= 3
    char tstamp[SKTIMESTAMP_STRLEN];
#endif
    sktime_t inactive_time;
    cache_entry_t *entry;
    cache_entry_t *del_entry;
    uint64_t old_count;
    sk_vector_t *vector;
    RBLIST *iter;
    cache_file_t flushed;
    int retval = 0;
    int rv;

    assert(cache);
    assert(file_iter);

    *file_iter = (cache_file_iter_t *)calloc(1, sizeof(cache_file_iter_t));
    vector = skVectorNew(sizeof(cache_file_t));
    if (NULL == *file_iter || NULL == vector) {
        skAppPrintOutOfMemory(NULL);
        skVectorDestroy(vector);
        free(*file_iter);
        *file_iter = NULL;
        return -1;
    }
    (*file_iter)->vector = vector;

    /* NOTE(review): 'cache' is asserted non-NULL above, so this check
     * only takes effect when assertions are disabled; the caller then
     * receives an empty iterator */
    if (NULL == cache) {
        TRACEMSG(1, ("cache: Tried to flush unitialized stream cache."));
        return 0;
    }

    WRITE_LOCK(&cache->mutex);

    /* compute the time for determining the inactive files */
    inactive_time = sktimeNow() - STREAM_CACHE_INACTIVE_TIMEOUT;

    TRACEMSG(1, ("cache: Flushing cache: %u total, %u open, %u closed...",
                 cache->total_count, cache->open_count,
                 cache->total_count - cache->open_count));
    TRACEMSG(3, ("cache: Flushing cache: Closing files inactive since %s...",
                 sktimestamp_r(tstamp, inactive_time, 0)));

    /* entry to delete from rbtree; delete it after moving to the next
     * entry in the tree */
    del_entry = NULL;

    iter = rbopenlist(cache->rbtree);
    while ((entry = (cache_entry_t *)rbreadlist(iter)) != NULL) {
        /* the iterator has now advanced past the entry marked on the
         * previous pass, so it is safe to remove it from the tree.
         * Its mutex is still locked; cacheEntryDestroy() unlocks it. */
        if (del_entry) {
            rbdelete(del_entry, cache->rbtree);
            cacheEntryDestroy(del_entry);
            --cache->total_count;
            del_entry = NULL;
        }
        MUTEX_LOCK(&entry->mutex);
        if (entry->stream && (entry->last_accessed > inactive_time)) {
            /* file is still active; flush it */
            rv = skStreamFlush(entry->stream);
            if (rv) {
                skStreamPrintLastErr(entry->stream, rv, &NOTICEMSG);
                retval = -1;
            }
            /* move the records written since the last open/flush into
             * total_rec_count and restart the count from here */
            old_count = entry->opened_rec_count;
            entry->opened_rec_count = skStreamGetRecordCount(entry->stream);
            assert(old_count <= entry->opened_rec_count);
            entry->total_rec_count += entry->opened_rec_count - old_count;
            if (entry->total_rec_count) {
                /* append an entry to vector; copy the filename */
                flushed.filename = strdup(entry->filename);
                if (!flushed.filename) {
                    /* count is kept and reported by a later flush */
                    skAppPrintOutOfMemory(NULL);
                } else {
                    flushed.key = entry->key;
                    flushed.rec_count = entry->total_rec_count;
                    entry->total_rec_count = 0;
                    if (skVectorAppendValue(vector, &flushed)) {
                        skAppPrintOutOfMemory(NULL);
                        free((void *)flushed.filename);
                    }
                }
            }
            MUTEX_UNLOCK(&entry->mutex);
        } else {
            /* stream is inactive or closed; delete the entry.  The
             * entry's mutex stays locked until it is destroyed. */
            del_entry = entry;
            if (entry->stream) {
                TRACEMSG(3, ("cache: Flushing cache:"
                             " Closing inactive file %s; last_accessed %s",
                             entry->filename,
                             sktimestamp_r(tstamp, entry->last_accessed, 0)));
                rv = cacheEntryClose(entry);
                if (rv) {
                    retval = -1;
                }
                --cache->open_count;
            }
            if (entry->total_rec_count) {
                /* append an entry to the vector; steal the filename
                 * since the entry is being destroyed */
                flushed.key = entry->key;
                flushed.rec_count = entry->total_rec_count;
                flushed.filename = entry->filename;
                entry->filename = NULL;
                if (skVectorAppendValue(vector, &flushed)) {
                    skAppPrintOutOfMemory(NULL);
                    free((void *)flushed.filename);
                }
            }
        }
    }
    rbcloselist(iter);

    /* remove the entry marked on the final pass, if any */
    if (del_entry) {
        rbdelete(del_entry, cache->rbtree);
        cacheEntryDestroy(del_entry);
        --cache->total_count;
    }

    TRACEMSG(1, ("cache: Flushing cache. %u total, %u open. Done.",
                 cache->total_count, cache->open_count));

    RW_MUTEX_UNLOCK(&cache->mutex);

    return retval;
}
647 
648 
649 /* find an entry in the cache.  if not present, use the open-callback
650  * function to open/create the stream and then add it. */
651 int
skCacheLookupOrOpenAdd(stream_cache_t * cache,const cache_key_t * key,void * caller_data,cache_entry_t ** out_entry)652 skCacheLookupOrOpenAdd(
653     stream_cache_t     *cache,
654     const cache_key_t  *key,
655     void               *caller_data,
656     cache_entry_t     **out_entry)
657 {
658 #ifdef SK_HAVE_PTHREAD_RWLOCK
659     int have_writelock = 0;
660 #endif
661     cache_entry_t search_key;
662     cache_entry_t *e;
663     cache_entry_t *entry;
664     int retval = 0;
665     int rv;
666 #if TRACEMSG_LEVEL >= 3
667     char tstamp[SKTIMESTAMP_STRLEN];
668     char sensor[SK_MAX_STRLEN_SENSOR+1];
669     char flowtype[SK_MAX_STRLEN_FLOWTYPE+1];
670 
671     sktimestamp_r(tstamp, key->time_stamp, SKTIMESTAMP_NOMSEC);
672     sksiteSensorGetName(sensor, sizeof(sensor), key->sensor_id);
673     sksiteFlowtypeGetName(flowtype, sizeof(flowtype), key->flowtype_id);
674 #endif /* TRACEMSG_LEVEL */
675 
676     search_key.key.time_stamp = key->time_stamp;
677     search_key.key.sensor_id = key->sensor_id;
678     search_key.key.flowtype_id = key->flowtype_id;
679 
680     /* do a lookup holding only the read lock; if there is no support
681      * for read-write locks, the entire cache is locked. */
682     READ_LOCK(&cache->mutex);
683 
684   LOOKUP:
685     /* try to find the entry */
686     entry = (cache_entry_t *)rbfind(&search_key, cache->rbtree);
687     TRACEMSG(3, ("cache: Lookup: %s for stream %s %s %s",
688                  ((entry) ? "hit" : "miss"), tstamp, sensor, flowtype));
689 
690     /* if we find it and the stream is open, return it */
691     if (entry && entry->stream) {
692         MUTEX_LOCK(&entry->mutex);
693         TRACEMSG(2, ("cache: Lookup: found open stream '%s'",entry->filename));
694         entry->last_accessed = sktimeNow();
695         *out_entry = entry;
696         retval = 0;
697         goto END;
698     }
699 
700 #ifdef SK_HAVE_PTHREAD_RWLOCK
701     if (!have_writelock) {
702         have_writelock = 1;
703         /*
704          *  we need to either add or reopen the stream.  We want to get a
705          *  write lock on the cache, but first we must release the read
706          *  lock on the cache.
707          *
708          *  skip all of these steps if there is no support for read-write
709          *  locks, since the entire cache is already locked.
710          */
711         RW_MUTEX_UNLOCK(&cache->mutex);
712         WRITE_LOCK(&cache->mutex);
713 
714         /* search for the entry again, in case it was added or opened
715          * between releasing the read lock on the cache and getting
716          * the write lock on the cache */
717         goto LOOKUP;
718     }
719 #endif /* SK_HAVE_PTHREAD_RWLOCK */
720 
721     *out_entry = NULL;
722     retval = -1;
723 
724     if (entry) {
725         MUTEX_LOCK(&entry->mutex);
726         /* use the callback to open the file */
727         entry->stream = cache->open_callback(key, caller_data, entry->filename);
728         if (NULL == entry->stream) {
729             MUTEX_UNLOCK(&entry->mutex);
730             goto END;
731         }
732         if (strcmp(entry->filename, skStreamGetPathname(entry->stream))) {
733             DEBUGMSG("Pathname changed");
734             free((void *)entry->filename);
735             entry->filename = strdup(skStreamGetPathname(entry->stream));
736             if (NULL == entry->filename) {
737                 skAppPrintOutOfMemory(NULL);
738                 MUTEX_UNLOCK(&entry->mutex);
739                 goto END;
740             }
741         }
742         ++cache->open_count;
743         TRACEMSG(1, ("cache: Lookup: Opened known file '%s'", entry->filename));
744 
745     } else {
746         /* create a new entry */
747         entry = (cache_entry_t *)calloc(1, sizeof(cache_entry_t));
748         if (NULL == entry) {
749             skAppPrintOutOfMemory(NULL);
750             goto END;
751         }
752         if (MUTEX_INIT(&entry->mutex)) {
753             CRITMSG(FMT_MUTEX_FAILURE);
754             free(entry);
755             goto END;
756         }
757         MUTEX_LOCK(&entry->mutex);
758         /* use the callback to open the file */
759         entry->stream = cache->open_callback(key, caller_data, NULL);
760         if (NULL == entry->stream) {
761             cacheEntryDestroy(entry);
762             goto END;
763         }
764         entry->filename = strdup(skStreamGetPathname(entry->stream));
765         if (NULL == entry->filename) {
766             skAppPrintOutOfMemory(NULL);
767             cacheEntryDestroy(entry);
768             goto END;
769         }
770 
771         entry->key.time_stamp = key->time_stamp;
772         entry->key.sensor_id = key->sensor_id;
773         entry->key.flowtype_id = key->flowtype_id;
774         entry->total_rec_count = 0;
775         entry->last_accessed = MAX_TIME;
776 
777         /* add the entry to the redblack tree */
778         e = (cache_entry_t *)rbsearch(entry, cache->rbtree);
779         if (e != entry) {
780             if (e == NULL) {
781                 skAppPrintOutOfMemory(NULL);
782                 cacheEntryDestroy(entry);
783                 goto END;
784             }
785             CRITMSG(("Duplicate entries in stream cache "
786                      "for time=%" PRId64 " sensor=%d flowtype=%d"),
787                     key->time_stamp, key->sensor_id, key->flowtype_id);
788             skAbort();
789         }
790 
791         ++cache->total_count;
792         ++cache->open_count;
793 
794         TRACEMSG(1, ("cache: Lookup: Opened new file '%s'", entry->filename));
795     }
796 
797     retval = 0;
798 
799     TRACEMSG(2, ("cache: Lookup: %u total, %u open, %u max, %u closed",
800                  cache->total_count, cache->open_count, cache->max_open_count,
801                  cache->total_count - cache->open_count));
802 
803     if (cache->open_count > cache->max_open_count) {
804         /* The cache is full: close the least recently used stream.
805          * This uses a linear search over all entries. */
806         RBLIST *iter;
807         cache_entry_t *min_entry;
808         sktime_t min_time;
809 
810         min_entry = NULL;
811         min_time = MAX_TIME;
812 
813         /* unlock the entry's mutex to avoid a deadlock */
814         MUTEX_UNLOCK(&entry->mutex);
815 
816         /* visit entries in the red-black tree; this only finds open
817          * entries since closed entries have their time set to
818          * MAX_TIME. */
819         iter = rbopenlist(cache->rbtree);
820         while ((e = (cache_entry_t *)rbreadlist(iter)) != NULL) {
821             MUTEX_LOCK(&e->mutex);
822             if (e->last_accessed < min_time) {
823                 min_time = e->last_accessed;
824                 min_entry = e;
825             }
826             MUTEX_UNLOCK(&e->mutex);
827         }
828         rbcloselist(iter);
829 
830         assert(min_time < MAX_TIME);
831         assert(min_entry != NULL);
832         assert(min_entry != entry);
833         assert(min_entry->stream != NULL);
834 
835         MUTEX_LOCK(&min_entry->mutex);
836         rv = cacheEntryClose(min_entry);
837         if (rv) {
838             retval = -1;
839         }
840         min_entry->last_accessed = MAX_TIME;
841         MUTEX_UNLOCK(&min_entry->mutex);
842 
843         --cache->open_count;
844 
845         /* re-lock the entry's mutex */
846         MUTEX_LOCK(&entry->mutex);
847     }
848 
849     /* update access time and record count */
850     entry->last_accessed = sktimeNow();
851     entry->opened_rec_count = skStreamGetRecordCount(entry->stream);
852     *out_entry = entry;
853 
854   END:
855     RW_MUTEX_UNLOCK(&cache->mutex);
856     return retval;
857 }
858 
859 
860 /*
861 ** Local Variables:
862 ** mode:c
863 ** indent-tabs-mode:nil
864 ** c-basic-offset:4
865 ** End:
866 */
867