1 #ifdef HAVE_CONFIG_H
2 #include "config.h"
3 #endif
4 
5 #include <stdio.h>
6 #include <stdlib.h>
7 #include <string.h>
8 
9 #if VRNA_WITH_PTHREADS
10 # include <pthread.h>
11 #endif
12 
13 #include "ViennaRNA/utils/basic.h"
14 #include "ViennaRNA/datastructures/stream_output.h"
15 
16 #ifdef __GNUC__
17 # define INLINE inline
18 #else
19 # define INLINE
20 #endif
21 
22 #define QUEUE_OVERHEAD  32
23 
24 
25 struct vrna_ordered_stream_s {
26   unsigned int                start;      /* first element index in queue, i.e. start of queue */
27   unsigned int                end;        /* last element index in queue */
28   unsigned int                size;       /* available memory size for 'data' and 'provided' */
29   unsigned int                shift;      /* pointer offset for 'data' and 'provided' */
30 
31   vrna_callback_stream_output *output;    /* callback to execute if consecutive elements from head are available */
32   void                        **data;     /* actual data passed to the callback */
33   unsigned char               *provided;  /* for simplicity we use unsigned char instead of single bits per element */
34   void                        *auxdata;   /* auxiliary data passed to the callback */
35 #if VRNA_WITH_PTHREADS
36   pthread_mutex_t             mtx;        /* semaphore to provide concurrent access */
37 #endif
38 };
39 
40 
41 PRIVATE INLINE void
flush_output(struct vrna_ordered_stream_s * queue)42 flush_output(struct vrna_ordered_stream_s *queue)
43 {
44   unsigned int j;
45 
46   /* flush all consecutive blocks available from the start of queue */
47 
48   /* 1. process output callback */
49   if (queue->output)
50     for (j = queue->start; (j <= queue->end) && (queue->provided[j]); j++)
51       queue->output(queue->auxdata, j, queue->data[j]);
52 
53   /* 2. move start of queue */
54   while ((queue->start <= queue->end) && (queue->provided[queue->start]))
55     queue->start++;
56 
57   /* 3. set empty queue condition if necessary */
58   if (queue->end < queue->start) {
59     queue->provided[queue->start] = 0;
60     queue->end                    = queue->start;
61   }
62 }
63 
64 
65 PUBLIC struct vrna_ordered_stream_s *
vrna_ostream_init(vrna_callback_stream_output * output,void * auxdata)66 vrna_ostream_init(vrna_callback_stream_output *output,
67                   void                        *auxdata)
68 {
69   struct vrna_ordered_stream_s *queue;
70 
71   queue = (struct vrna_ordered_stream_s *)vrna_alloc(sizeof(struct vrna_ordered_stream_s));
72 
73   queue->start    = 0;
74   queue->end      = 0;
75   queue->size     = QUEUE_OVERHEAD;
76   queue->shift    = 0;
77   queue->output   = output;
78   queue->auxdata  = auxdata;
79   queue->data     = (void **)vrna_alloc(sizeof(void *) * QUEUE_OVERHEAD);
80   queue->provided = (unsigned char *)vrna_alloc(sizeof(unsigned char) * QUEUE_OVERHEAD);
81 
82 #if VRNA_WITH_PTHREADS
83   pthread_mutex_init(&queue->mtx, NULL);
84 #endif
85 
86   return queue;
87 }
88 
89 
90 PUBLIC void
vrna_ostream_free(struct vrna_ordered_stream_s * queue)91 vrna_ostream_free(struct vrna_ordered_stream_s *queue)
92 {
93   if (queue) {
94 #if VRNA_WITH_PTHREADS
95     pthread_mutex_lock(&queue->mtx);
96 #endif
97 
98     flush_output(queue);
99 
100 #if VRNA_WITH_PTHREADS
101     pthread_mutex_unlock(&queue->mtx);
102 #endif
103 
104     /* free remaining memory */
105     queue->data     += queue->shift;
106     queue->provided += queue->shift;
107     free(queue->data);
108     free(queue->provided);
109 
110     /* free ostream itself */
111     free(queue);
112   }
113 }
114 
115 
116 PUBLIC int
vrna_ostream_threadsafe(void)117 vrna_ostream_threadsafe(void)
118 {
119 #if VRNA_WITH_PTHREADS
120   return 1;
121 #else
122   return 0;
123 #endif
124 }
125 
126 
127 PUBLIC void
vrna_ostream_request(struct vrna_ordered_stream_s * queue,unsigned int num)128 vrna_ostream_request(struct vrna_ordered_stream_s *queue,
129                      unsigned int                 num)
130 {
131   unsigned int i;
132 
133   if (queue) {
134 #if VRNA_WITH_PTHREADS
135     pthread_mutex_lock(&queue->mtx);
136 #endif
137     if (num >= queue->end) {
138       /* check whether we have to increase memory */
139       unsigned int new_size = num - queue->shift + 1;
140 
141       if (queue->size < new_size + 1) {
142         /*
143          *  Check whether we can simply move data around to get more space.
144          *  We do this only, if more than half of the first buffer is empty.
145          *  Otherwise, we simply increase the buffer to fit incoming data.
146          */
147         unsigned int mem_unavail = queue->start - queue->shift;
148         if ((mem_unavail > (queue->size / 2)) &&
149             (new_size - mem_unavail < queue->size + 1)) {
150           /* reset pointer shift */
151           queue->data     += queue->shift;
152           queue->provided += queue->shift;
153 
154           /* move remaining data to the front */
155           queue->data = memmove(queue->data,
156                                 queue->data + mem_unavail,
157                                 sizeof(void *) * (queue->end - queue->start + 1));
158 
159           /* move provider bitmask to the front */
160           queue->provided = memmove(queue->provided,
161                                     queue->provided + mem_unavail,
162                                     sizeof(unsigned char) * (queue->end - queue->start + 1));
163 
164           /* reset start and pointer shifts */
165           queue->shift    = queue->start;
166           queue->data     -= queue->start;
167           queue->provided -= queue->start;
168         } else {
169           /* increase stream buffer size */
170           new_size += QUEUE_OVERHEAD;
171           /* reset pointer shift */
172           queue->data     += queue->shift;
173           queue->provided += queue->shift;
174 
175           /* reallocate memory blocks */
176           queue->data     = (void **)vrna_realloc(queue->data, sizeof(void *) * new_size);
177           queue->provided =
178             (unsigned char *)vrna_realloc(queue->provided, sizeof(void *) * new_size);
179           queue->size = new_size;
180 
181           /* restore pointer shift */
182           queue->data     -= queue->shift;
183           queue->provided -= queue->shift;
184         }
185       }
186 
187       for (i = queue->end + 1; i <= num; i++)
188         queue->provided[i] = 0;
189 
190       queue->end = num;
191     }
192 
193 #if VRNA_WITH_PTHREADS
194     pthread_mutex_unlock(&queue->mtx);
195 #endif
196   }
197 }
198 
199 
200 PUBLIC void
vrna_ostream_provide(struct vrna_ordered_stream_s * queue,unsigned int i,void * data)201 vrna_ostream_provide(struct vrna_ordered_stream_s *queue,
202                      unsigned int                 i,
203                      void                         *data)
204 {
205   if (queue) {
206 #if VRNA_WITH_PTHREADS
207     pthread_mutex_lock(&queue->mtx);
208 #endif
209     if ((queue->end < i) || (i < queue->start)) {
210       vrna_message_warning(
211         "vrna_ostream_provide(): data position (%d) out of range [%d:%d]!",
212         i,
213         queue->start,
214         queue->end);
215       return;
216     }
217 
218     /* store data */
219     queue->data[i]      = data;
220     queue->provided[i]  = 1;
221 
222     /* process all consecutive blocks available from the start */
223     if (i == queue->start)
224       flush_output(queue);
225 
226 #if VRNA_WITH_PTHREADS
227     pthread_mutex_unlock(&queue->mtx);
228 #endif
229   }
230 }
231