1 #ifdef HAVE_CONFIG_H
2 #include "config.h"
3 #endif
4
5 #include <stdio.h>
6 #include <stdlib.h>
7 #include <string.h>
8
9 #if VRNA_WITH_PTHREADS
10 # include <pthread.h>
11 #endif
12
13 #include "ViennaRNA/utils/basic.h"
14 #include "ViennaRNA/datastructures/stream_output.h"
15
16 #ifdef __GNUC__
17 # define INLINE inline
18 #else
19 # define INLINE
20 #endif
21
22 #define QUEUE_OVERHEAD 32
23
24
25 struct vrna_ordered_stream_s {
26 unsigned int start; /* first element index in queue, i.e. start of queue */
27 unsigned int end; /* last element index in queue */
28 unsigned int size; /* available memory size for 'data' and 'provided' */
29 unsigned int shift; /* pointer offset for 'data' and 'provided' */
30
31 vrna_callback_stream_output *output; /* callback to execute if consecutive elements from head are available */
32 void **data; /* actual data passed to the callback */
33 unsigned char *provided; /* for simplicity we use unsigned char instead of single bits per element */
34 void *auxdata; /* auxiliary data passed to the callback */
35 #if VRNA_WITH_PTHREADS
36 pthread_mutex_t mtx; /* semaphore to provide concurrent access */
37 #endif
38 };
39
40
41 PRIVATE INLINE void
flush_output(struct vrna_ordered_stream_s * queue)42 flush_output(struct vrna_ordered_stream_s *queue)
43 {
44 unsigned int j;
45
46 /* flush all consecutive blocks available from the start of queue */
47
48 /* 1. process output callback */
49 if (queue->output)
50 for (j = queue->start; (j <= queue->end) && (queue->provided[j]); j++)
51 queue->output(queue->auxdata, j, queue->data[j]);
52
53 /* 2. move start of queue */
54 while ((queue->start <= queue->end) && (queue->provided[queue->start]))
55 queue->start++;
56
57 /* 3. set empty queue condition if necessary */
58 if (queue->end < queue->start) {
59 queue->provided[queue->start] = 0;
60 queue->end = queue->start;
61 }
62 }
63
64
65 PUBLIC struct vrna_ordered_stream_s *
vrna_ostream_init(vrna_callback_stream_output * output,void * auxdata)66 vrna_ostream_init(vrna_callback_stream_output *output,
67 void *auxdata)
68 {
69 struct vrna_ordered_stream_s *queue;
70
71 queue = (struct vrna_ordered_stream_s *)vrna_alloc(sizeof(struct vrna_ordered_stream_s));
72
73 queue->start = 0;
74 queue->end = 0;
75 queue->size = QUEUE_OVERHEAD;
76 queue->shift = 0;
77 queue->output = output;
78 queue->auxdata = auxdata;
79 queue->data = (void **)vrna_alloc(sizeof(void *) * QUEUE_OVERHEAD);
80 queue->provided = (unsigned char *)vrna_alloc(sizeof(unsigned char) * QUEUE_OVERHEAD);
81
82 #if VRNA_WITH_PTHREADS
83 pthread_mutex_init(&queue->mtx, NULL);
84 #endif
85
86 return queue;
87 }
88
89
90 PUBLIC void
vrna_ostream_free(struct vrna_ordered_stream_s * queue)91 vrna_ostream_free(struct vrna_ordered_stream_s *queue)
92 {
93 if (queue) {
94 #if VRNA_WITH_PTHREADS
95 pthread_mutex_lock(&queue->mtx);
96 #endif
97
98 flush_output(queue);
99
100 #if VRNA_WITH_PTHREADS
101 pthread_mutex_unlock(&queue->mtx);
102 #endif
103
104 /* free remaining memory */
105 queue->data += queue->shift;
106 queue->provided += queue->shift;
107 free(queue->data);
108 free(queue->provided);
109
110 /* free ostream itself */
111 free(queue);
112 }
113 }
114
115
116 PUBLIC int
vrna_ostream_threadsafe(void)117 vrna_ostream_threadsafe(void)
118 {
119 #if VRNA_WITH_PTHREADS
120 return 1;
121 #else
122 return 0;
123 #endif
124 }
125
126
127 PUBLIC void
vrna_ostream_request(struct vrna_ordered_stream_s * queue,unsigned int num)128 vrna_ostream_request(struct vrna_ordered_stream_s *queue,
129 unsigned int num)
130 {
131 unsigned int i;
132
133 if (queue) {
134 #if VRNA_WITH_PTHREADS
135 pthread_mutex_lock(&queue->mtx);
136 #endif
137 if (num >= queue->end) {
138 /* check whether we have to increase memory */
139 unsigned int new_size = num - queue->shift + 1;
140
141 if (queue->size < new_size + 1) {
142 /*
143 * Check whether we can simply move data around to get more space.
144 * We do this only, if more than half of the first buffer is empty.
145 * Otherwise, we simply increase the buffer to fit incoming data.
146 */
147 unsigned int mem_unavail = queue->start - queue->shift;
148 if ((mem_unavail > (queue->size / 2)) &&
149 (new_size - mem_unavail < queue->size + 1)) {
150 /* reset pointer shift */
151 queue->data += queue->shift;
152 queue->provided += queue->shift;
153
154 /* move remaining data to the front */
155 queue->data = memmove(queue->data,
156 queue->data + mem_unavail,
157 sizeof(void *) * (queue->end - queue->start + 1));
158
159 /* move provider bitmask to the front */
160 queue->provided = memmove(queue->provided,
161 queue->provided + mem_unavail,
162 sizeof(unsigned char) * (queue->end - queue->start + 1));
163
164 /* reset start and pointer shifts */
165 queue->shift = queue->start;
166 queue->data -= queue->start;
167 queue->provided -= queue->start;
168 } else {
169 /* increase stream buffer size */
170 new_size += QUEUE_OVERHEAD;
171 /* reset pointer shift */
172 queue->data += queue->shift;
173 queue->provided += queue->shift;
174
175 /* reallocate memory blocks */
176 queue->data = (void **)vrna_realloc(queue->data, sizeof(void *) * new_size);
177 queue->provided =
178 (unsigned char *)vrna_realloc(queue->provided, sizeof(void *) * new_size);
179 queue->size = new_size;
180
181 /* restore pointer shift */
182 queue->data -= queue->shift;
183 queue->provided -= queue->shift;
184 }
185 }
186
187 for (i = queue->end + 1; i <= num; i++)
188 queue->provided[i] = 0;
189
190 queue->end = num;
191 }
192
193 #if VRNA_WITH_PTHREADS
194 pthread_mutex_unlock(&queue->mtx);
195 #endif
196 }
197 }
198
199
200 PUBLIC void
vrna_ostream_provide(struct vrna_ordered_stream_s * queue,unsigned int i,void * data)201 vrna_ostream_provide(struct vrna_ordered_stream_s *queue,
202 unsigned int i,
203 void *data)
204 {
205 if (queue) {
206 #if VRNA_WITH_PTHREADS
207 pthread_mutex_lock(&queue->mtx);
208 #endif
209 if ((queue->end < i) || (i < queue->start)) {
210 vrna_message_warning(
211 "vrna_ostream_provide(): data position (%d) out of range [%d:%d]!",
212 i,
213 queue->start,
214 queue->end);
215 return;
216 }
217
218 /* store data */
219 queue->data[i] = data;
220 queue->provided[i] = 1;
221
222 /* process all consecutive blocks available from the start */
223 if (i == queue->start)
224 flush_output(queue);
225
226 #if VRNA_WITH_PTHREADS
227 pthread_mutex_unlock(&queue->mtx);
228 #endif
229 }
230 }
231