1 /*
2 * Copyright (C) 2004 Nathan Lutchansky <lutchann@litech.org>
3 *
4 * This program is free software; you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License as published by
6 * the Free Software Foundation; either version 2 of the License, or
7 * (at your option) any later version.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License
15 * along with this program; if not, write to the Free Software Foundation,
16 * Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
17 */
18
19 #include <sys/types.h>
20 #include <stdlib.h>
21 #include <stdio.h>
22 #include <string.h>
23 #include <unistd.h>
24 #include <sys/time.h>
25 #include <fcntl.h>
26 #include <errno.h>
27 #include <pthread.h>
28
29 #include <event.h>
30 #include <log.h>
31 #include <frame.h>
32
33 // #define METER_DEBUG
34
/* Cursors into the ring of pooled frame slots built by init_frame_heap():
 * new_frame() takes frames at frame_heap_take, unref_frame() puts them back
 * at frame_heap_put. */
static struct frame_slot *frame_heap_take = NULL, *frame_heap_put = NULL;
/* Locked by new_frame() and unref_frame() around their use of the two cursors
 * above; initialized in init_frame_heap(). */
static pthread_mutex_t frame_heap_mutex;
/* Payload size given to init_frame_heap(); returned by get_max_frame_size(). */
static int max_frame_size = 0;
38
init_frame_heap(int size,int count)39 void init_frame_heap( int size, int count )
40 {
41 struct frame_slot *f = NULL, *prev = NULL;
42
43 max_frame_size = size;
44 pthread_mutex_init( &frame_heap_mutex, NULL );
45 while( count-- > 0 )
46 {
47 f = (struct frame_slot *)
48 malloc( sizeof( struct frame_slot ) );
49 f->f = (struct frame *)malloc( sizeof( struct frame ) + size );
50 f->f->size = size;
51 pthread_mutex_init( &f->f->mutex, NULL );
52 f->prev = prev;
53 if( prev ) prev->next = f;
54 else frame_heap_take = f;
55 prev = f;
56 }
57 frame_heap_take->prev = f;
58 f->next = frame_heap_take;
59 frame_heap_put = frame_heap_take;
60 }
61
get_max_frame_size(void)62 int get_max_frame_size(void)
63 {
64 return max_frame_size;
65 }
66
new_frame(void)67 struct frame *new_frame(void)
68 {
69 struct frame *f;
70
71 pthread_mutex_lock( &frame_heap_mutex );
72
73 if( frame_heap_take->f )
74 {
75 f = frame_heap_take->f;
76 frame_heap_take->f = NULL;
77 frame_heap_take = frame_heap_take->next;
78 f->ref_count = 1;
79 f->destructor = NULL;
80 f->destructor_data = NULL;
81 f->d = (unsigned char *)f + sizeof( struct frame );
82 f->format = FORMAT_EMPTY;
83 f->width = 0;
84 f->height = 0;
85 f->length = 0;
86 f->key = 0;
87 } else
88 {
89 spook_log( SL_WARN, "Ack! Out of frame buffers!" );
90 f = NULL;
91 }
92
93 pthread_mutex_unlock( &frame_heap_mutex );
94
95 return f;
96 }
97
/*
 * Destructor installed on frames created by clone_frame().
 *
 * `d` is the original frame the clone points into; the reference taken on it
 * in clone_frame() is released here.  Always returns 0, so unref_frame()
 * goes on to put the clone itself back in the pool.
 */
static int clone_destructor( struct frame *f, void *d )
{
	struct frame *original = d;

	(void)f;	/* the clone itself needs no cleanup here */
	unref_frame( original );
	return 0;
}
103
clone_frame(struct frame * orig)104 struct frame *clone_frame( struct frame *orig )
105 {
106 struct frame *f;
107
108 if( ! ( f = new_frame() ) ) return NULL;
109 f->destructor = clone_destructor;
110 f->destructor_data = orig;
111 f->format = orig->format;
112 f->width = orig->width;
113 f->height = orig->height;
114 f->length = orig->length;
115 f->key = orig->key;
116 f->step = orig->step;
117 f->d = orig->d;
118 ref_frame( orig );
119 return f;
120 }
121
ref_frame(struct frame * f)122 void ref_frame( struct frame *f )
123 {
124 pthread_mutex_lock( &f->mutex );
125 ++f->ref_count;
126 pthread_mutex_unlock( &f->mutex );
127 }
128
unref_frame(struct frame * f)129 void unref_frame( struct frame *f )
130 {
131 int r;
132
133 pthread_mutex_lock( &f->mutex );
134 r = --f->ref_count;
135 pthread_mutex_unlock( &f->mutex );
136 if( r > 0 ) return;
137
138 if( f->destructor )
139 {
140 f->ref_count = 1;
141 if( f->destructor( f, f->destructor_data ) ) return;
142 }
143
144 pthread_mutex_lock( &frame_heap_mutex );
145 if( frame_heap_put->f )
146 {
147 spook_log( SL_WARN, "Ack! There is a frame at frame_heap_put!" );
148 return;
149 }
150 frame_heap_put->f = f;
151 frame_heap_put = frame_heap_put->next;
152 pthread_mutex_unlock( &frame_heap_mutex );
153 }
154
exchanger_read(struct event_info * ei,void * d)155 static void exchanger_read( struct event_info *ei, void *d )
156 {
157 struct frame_exchanger *ex = (struct frame_exchanger *)d;
158 unsigned char c;
159 int ret;
160 struct frame *f;
161
162 for(;;)
163 {
164 ret = read( ex->master_fd, &c, 1 );
165 if( ret <= 0 )
166 {
167 if( ret < 0 && errno == EAGAIN ) return;
168 spook_log( SL_ERR, "We lost an exchanger fd!" );
169 exit( 1 );
170 }
171 pthread_mutex_lock( &ex->mutex );
172 f = ex->master_read->f;
173 ex->master_read->f = NULL;
174 ex->master_read = ex->master_read->next;
175 pthread_mutex_unlock( &ex->mutex );
176 ex->f( f, ex->d );
177 }
178 }
179
new_exchanger(int slots,frame_deliver_func func,void * d)180 struct frame_exchanger *new_exchanger( int slots,
181 frame_deliver_func func, void *d )
182 {
183 struct frame_slot *f = NULL, *prev = NULL;
184 struct frame_exchanger *ex;
185 int fds[2];
186
187 ex = (struct frame_exchanger *)
188 malloc( sizeof( struct frame_exchanger ) );
189
190 while( slots-- > 0 )
191 {
192 f = (struct frame_slot *)
193 malloc( sizeof( struct frame_slot ) );
194 f->f = NULL;
195 f->pending = 0;
196 f->prev = prev;
197 if( prev ) prev->next = f;
198 else ex->slave_cur = f;
199 prev = f;
200 }
201 ex->slave_cur->prev = f;
202 f->next = ex->slave_cur;
203 ex->master_read = ex->master_write = ex->slave_cur;
204
205 pipe( fds );
206 ex->master_fd = fds[0];
207 ex->slave_fd = fds[1];
208
209 fcntl( ex->master_fd, F_SETFL, O_NONBLOCK );
210 ex->master_event = add_fd_event( ex->master_fd, 0, 0,
211 exchanger_read, ex );
212
213 pthread_mutex_init( &ex->mutex, NULL );
214 pthread_cond_init( &ex->slave_wait, NULL );
215
216 ex->f = func;
217 ex->d = d;
218
219 return ex;
220 }
221
exchange_frame(struct frame_exchanger * ex,struct frame * frame)222 int exchange_frame( struct frame_exchanger *ex, struct frame *frame )
223 {
224 pthread_mutex_lock( &ex->mutex );
225 if( ex->master_write->f )
226 {
227 spook_log( SL_WARN, "Exchanger is full, dropping frame!" );
228 pthread_mutex_unlock( &ex->mutex );
229 return -1;
230 }
231 ex->master_write->f = frame;
232 ex->master_write->pending = 1;
233 ex->master_write = ex->master_write->next;
234 pthread_cond_signal( &ex->slave_wait );
235 pthread_mutex_unlock( &ex->mutex );
236
237 return 0;
238 }
239
get_next_frame(struct frame_exchanger * ex,int wait)240 struct frame *get_next_frame( struct frame_exchanger *ex, int wait )
241 {
242 struct frame *f = NULL;
243
244 pthread_mutex_lock( &ex->mutex );
245 if( ex->slave_cur->pending ) f = ex->slave_cur->f;
246 if( ! f && wait )
247 {
248 pthread_cond_wait( &ex->slave_wait, &ex->mutex );
249 if( ex->slave_cur->pending ) f = ex->slave_cur->f;
250 if( ! f ) spook_log( SL_WARN, "Slave signalled but no frame??" );
251 }
252 pthread_mutex_unlock( &ex->mutex );
253 return f;
254 }
255
deliver_frame(struct frame_exchanger * ex,struct frame * f)256 void deliver_frame( struct frame_exchanger *ex, struct frame *f )
257 {
258 unsigned char c = 0;
259
260 ex->slave_cur->f = f;
261 ex->slave_cur->pending = 0;
262 if( write( ex->slave_fd, &c, 1 ) <= 0 ) exit( 0 );
263 ex->slave_cur = ex->slave_cur->next;
264 }
265
meter_init(struct meter * m,int fbase,int downstream)266 void meter_init( struct meter *m, int fbase, int downstream )
267 {
268 m->started = 0;
269 m->downstream = downstream;
270 m->rate = fbase;
271 m->ticks = 0;
272 m->slip = 0;
273 m->avg_slip = 0;
274 m->precomp = 0;
275 m->correction_rate = 0;
276 m->corrected = 0;
277 }
278
/*
 * Add `ticks` to meter `m` and, once a measurement window has elapsed,
 * update the slip estimate and the correction rate.
 *
 * The first call only starts the clock and records `ticks`.  Later calls
 * accumulate ticks until time_diff() reports at least 30000 since the last
 * check (presumably milliseconds, going by the variable name and the
 * division by 1000.0 - confirm against time_diff()).  At that point the
 * accumulated ticks are compared with the count expected from m->rate, the
 * slip figures are updated, and a new correction rate is computed.
 *
 *   rate  if not NULL, receives the new correction rate at the end of a
 *         window; left untouched otherwise
 *
 * Returns 1 when a window ended and `rate` was non-NULL, 0 in every other
 * case.
 */
int meter_count( struct meter *m, int ticks, int *rate )
{
	int msec;
	time_ref now;
	double expected;

	if( ! m->started )
	{
		/* First call: start the clock and open the first window */
		time_now( &m->last_check );
		m->started = 1;
		m->ticks = ticks;
		return 0;
	}
	time_now( &now );
	msec = time_diff( &m->last_check, &now );
	if( msec < 30000 )
	{
		/* Window still open: just accumulate */
		m->ticks += ticks;
		return 0;
	}
	/* Ticks the nominal rate would have produced over the elapsed time */
	expected = m->rate * (double)msec / 1000.0;
	/* Only when the downstream flag is clear, the corrections reported
	 * during this window are taken out of the expectation */
	if( ! m->downstream ) expected -= m->corrected;
#ifdef METER_DEBUG
	printf( "meter: expected %f in %d msec, got %d\n",
			expected, msec, m->ticks );
#endif
	/* Fold this window's shortfall into the running slip, removing the
	 * pre-compensation that was added at the end of the previous window */
	m->slip += expected - (double)m->ticks - (double)m->precomp;
	m->slip += m->corrected;
	/* Weighted average: 4/5 of the old value plus 1/5 of this window's
	 * per-1000-msec shortfall */
	m->avg_slip = 4.0 * m->avg_slip / 5.0 +
		( expected - (double)m->ticks ) * 1000.0 / (double)msec / 5.0;
	/* Pre-compensate by 1/2 the average slip */
	m->precomp = 30 * m->avg_slip / 2;
#ifdef METER_DEBUG
	printf( "meter: total slip = %f avg slip = %.3f/sec precomp = %d\n",
			m->slip, m->avg_slip, m->precomp );
#endif
	m->slip += m->precomp;
	/* NOTE(review): m->slip can be exactly 0 here, making this a division
	 * by zero; the field types are declared elsewhere, so whether the
	 * result is inf or undefined needs checking against struct meter. */
	m->correction_rate = m->ticks / m->slip;
	if( rate ) *rate = m->correction_rate;
	/* The ticks from this call open the next window rather than being
	 * counted in the one just closed */
	m->ticks = ticks;
	m->last_check = now;
	m->corrected = 0;
	return rate != NULL;
}
323
meter_get_adjustment(struct meter * m)324 int meter_get_adjustment( struct meter *m )
325 {
326 int msec;
327
328 if( m->slip == 0 ) return 0;
329 msec = time_ago( &m->last_check );
330 return m->slip * msec / 30000 + m->corrected;
331 }
332
meter_report_correction(struct meter * m,int ticks)333 void meter_report_correction( struct meter *m, int ticks )
334 {
335 m->corrected -= ticks;
336 #ifdef METER_DEBUG
337 // printf( "meter: total slip = %f correction = %d\n",
338 // m->slip, m->corrected );
339 #endif
340 }
341