1 /*
2  * Copyright (C) 2004 Nathan Lutchansky <lutchann@litech.org>
3  *
4  * This program is free software; you can redistribute it and/or modify
5  * it under the terms of the GNU General Public License as published by
6  * the Free Software Foundation; either version 2 of the License, or
7  * (at your option) any later version.
8  *
9  * This program is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  * GNU General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License
15  * along with this program; if not, write to the Free Software Foundation,
16  * Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
17  */
18 
19 #include <sys/types.h>
20 #include <stdlib.h>
21 #include <stdio.h>
22 #include <string.h>
23 #include <unistd.h>
24 #include <sys/time.h>
25 #include <fcntl.h>
26 #include <errno.h>
27 #include <pthread.h>
28 
29 #include <event.h>
30 #include <log.h>
31 #include <frame.h>
32 
33 // #define METER_DEBUG
34 
35 static struct frame_slot *frame_heap_take = NULL, *frame_heap_put = NULL;
36 static pthread_mutex_t frame_heap_mutex;
37 static int max_frame_size = 0;
38 
init_frame_heap(int size,int count)39 void init_frame_heap( int size, int count )
40 {
41 	struct frame_slot *f = NULL, *prev = NULL;
42 
43 	max_frame_size = size;
44 	pthread_mutex_init( &frame_heap_mutex, NULL );
45 	while( count-- > 0 )
46 	{
47 		f = (struct frame_slot *)
48 				malloc( sizeof( struct frame_slot ) );
49 		f->f = (struct frame *)malloc( sizeof( struct frame ) + size );
50 		f->f->size = size;
51 		pthread_mutex_init( &f->f->mutex, NULL );
52 		f->prev = prev;
53 		if( prev ) prev->next = f;
54 		else frame_heap_take = f;
55 		prev = f;
56 	}
57 	frame_heap_take->prev = f;
58 	f->next = frame_heap_take;
59 	frame_heap_put = frame_heap_take;
60 }
61 
get_max_frame_size(void)62 int get_max_frame_size(void)
63 {
64 	return max_frame_size;
65 }
66 
new_frame(void)67 struct frame *new_frame(void)
68 {
69 	struct frame *f;
70 
71 	pthread_mutex_lock( &frame_heap_mutex );
72 
73 	if( frame_heap_take->f )
74 	{
75 		f = frame_heap_take->f;
76 		frame_heap_take->f = NULL;
77 		frame_heap_take = frame_heap_take->next;
78 		f->ref_count = 1;
79 		f->destructor = NULL;
80 		f->destructor_data = NULL;
81 		f->d = (unsigned char *)f + sizeof( struct frame );
82 		f->format = FORMAT_EMPTY;
83 		f->width = 0;
84 		f->height = 0;
85 		f->length = 0;
86 		f->key = 0;
87 	} else
88 	{
89 		spook_log( SL_WARN, "Ack!  Out of frame buffers!" );
90 		f = NULL;
91 	}
92 
93 	pthread_mutex_unlock( &frame_heap_mutex );
94 
95 	return f;
96 }
97 
clone_destructor(struct frame * f,void * d)98 static int clone_destructor( struct frame *f, void *d )
99 {
100 	unref_frame( (struct frame *)d );
101 	return 0;
102 }
103 
clone_frame(struct frame * orig)104 struct frame *clone_frame( struct frame *orig )
105 {
106 	struct frame *f;
107 
108 	if( ! ( f = new_frame() ) ) return NULL;
109 	f->destructor = clone_destructor;
110 	f->destructor_data = orig;
111 	f->format = orig->format;
112 	f->width = orig->width;
113 	f->height = orig->height;
114 	f->length = orig->length;
115 	f->key = orig->key;
116 	f->step = orig->step;
117 	f->d = orig->d;
118 	ref_frame( orig );
119 	return f;
120 }
121 
ref_frame(struct frame * f)122 void ref_frame( struct frame *f )
123 {
124 	pthread_mutex_lock( &f->mutex );
125 	++f->ref_count;
126 	pthread_mutex_unlock( &f->mutex );
127 }
128 
unref_frame(struct frame * f)129 void unref_frame( struct frame *f )
130 {
131 	int r;
132 
133 	pthread_mutex_lock( &f->mutex );
134 	r = --f->ref_count;
135 	pthread_mutex_unlock( &f->mutex );
136 	if( r > 0 ) return;
137 
138 	if( f->destructor )
139 	{
140 		f->ref_count = 1;
141 		if( f->destructor( f, f->destructor_data ) ) return;
142 	}
143 
144 	pthread_mutex_lock( &frame_heap_mutex );
145 	if( frame_heap_put->f )
146 	{
147 		spook_log( SL_WARN, "Ack!  There is a frame at frame_heap_put!" );
148 		return;
149 	}
150 	frame_heap_put->f = f;
151 	frame_heap_put = frame_heap_put->next;
152 	pthread_mutex_unlock( &frame_heap_mutex );
153 }
154 
exchanger_read(struct event_info * ei,void * d)155 static void exchanger_read( struct event_info *ei, void *d )
156 {
157 	struct frame_exchanger *ex = (struct frame_exchanger *)d;
158 	unsigned char c;
159 	int ret;
160 	struct frame *f;
161 
162 	for(;;)
163 	{
164 		ret = read( ex->master_fd, &c, 1 );
165 		if( ret <= 0 )
166 		{
167 			if( ret < 0 && errno == EAGAIN ) return;
168 			spook_log( SL_ERR, "We lost an exchanger fd!" );
169 			exit( 1 );
170 		}
171 		pthread_mutex_lock( &ex->mutex );
172 		f = ex->master_read->f;
173 		ex->master_read->f = NULL;
174 		ex->master_read = ex->master_read->next;
175 		pthread_mutex_unlock( &ex->mutex );
176 		ex->f( f, ex->d );
177 	}
178 }
179 
new_exchanger(int slots,frame_deliver_func func,void * d)180 struct frame_exchanger *new_exchanger( int slots,
181 					frame_deliver_func func, void *d )
182 {
183 	struct frame_slot *f = NULL, *prev = NULL;
184 	struct frame_exchanger *ex;
185 	int fds[2];
186 
187 	ex = (struct frame_exchanger *)
188 			malloc( sizeof( struct frame_exchanger ) );
189 
190 	while( slots-- > 0 )
191 	{
192 		f = (struct frame_slot *)
193 				malloc( sizeof( struct frame_slot ) );
194 		f->f = NULL;
195 		f->pending = 0;
196 		f->prev = prev;
197 		if( prev ) prev->next = f;
198 		else ex->slave_cur = f;
199 		prev = f;
200 	}
201 	ex->slave_cur->prev = f;
202 	f->next = ex->slave_cur;
203 	ex->master_read = ex->master_write = ex->slave_cur;
204 
205 	pipe( fds );
206 	ex->master_fd = fds[0];
207 	ex->slave_fd = fds[1];
208 
209 	fcntl( ex->master_fd, F_SETFL, O_NONBLOCK );
210 	ex->master_event = add_fd_event( ex->master_fd, 0, 0,
211 						exchanger_read, ex );
212 
213 	pthread_mutex_init( &ex->mutex, NULL );
214 	pthread_cond_init( &ex->slave_wait, NULL );
215 
216 	ex->f = func;
217 	ex->d = d;
218 
219 	return ex;
220 }
221 
exchange_frame(struct frame_exchanger * ex,struct frame * frame)222 int exchange_frame( struct frame_exchanger *ex, struct frame *frame )
223 {
224 	pthread_mutex_lock( &ex->mutex );
225 	if( ex->master_write->f )
226 	{
227 		spook_log( SL_WARN, "Exchanger is full, dropping frame!" );
228 		pthread_mutex_unlock( &ex->mutex );
229 		return -1;
230 	}
231 	ex->master_write->f = frame;
232 	ex->master_write->pending = 1;
233 	ex->master_write = ex->master_write->next;
234 	pthread_cond_signal( &ex->slave_wait );
235 	pthread_mutex_unlock( &ex->mutex );
236 
237 	return 0;
238 }
239 
get_next_frame(struct frame_exchanger * ex,int wait)240 struct frame *get_next_frame( struct frame_exchanger *ex, int wait )
241 {
242 	struct frame *f = NULL;
243 
244 	pthread_mutex_lock( &ex->mutex );
245 	if( ex->slave_cur->pending ) f = ex->slave_cur->f;
246 	if( ! f && wait )
247 	{
248 		pthread_cond_wait( &ex->slave_wait, &ex->mutex );
249 		if( ex->slave_cur->pending ) f = ex->slave_cur->f;
250 		if( ! f ) spook_log( SL_WARN, "Slave signalled but no frame??" );
251 	}
252 	pthread_mutex_unlock( &ex->mutex );
253 	return f;
254 }
255 
deliver_frame(struct frame_exchanger * ex,struct frame * f)256 void deliver_frame( struct frame_exchanger *ex, struct frame *f )
257 {
258 	unsigned char c = 0;
259 
260 	ex->slave_cur->f = f;
261 	ex->slave_cur->pending = 0;
262 	if( write( ex->slave_fd, &c, 1 ) <= 0 ) exit( 0 );
263 	ex->slave_cur = ex->slave_cur->next;
264 }
265 
meter_init(struct meter * m,int fbase,int downstream)266 void meter_init( struct meter *m, int fbase, int downstream )
267 {
268 	m->started = 0;
269 	m->downstream = downstream;
270 	m->rate = fbase;
271 	m->ticks = 0;
272 	m->slip = 0;
273 	m->avg_slip = 0;
274 	m->precomp = 0;
275 	m->correction_rate = 0;
276 	m->corrected = 0;
277 }
278 
meter_count(struct meter * m,int ticks,int * rate)279 int meter_count( struct meter *m, int ticks, int *rate )
280 {
281 	int msec;
282 	time_ref now;
283 	double expected;
284 
285 	if( ! m->started )
286 	{
287 		time_now( &m->last_check );
288 		m->started = 1;
289 		m->ticks = ticks;
290 		return 0;
291 	}
292 	time_now( &now );
293 	msec = time_diff( &m->last_check, &now );
294 	if( msec < 30000 )
295 	{
296 		m->ticks += ticks;
297 		return 0;
298 	}
299 	expected = m->rate * (double)msec / 1000.0;
300 	if( ! m->downstream ) expected -= m->corrected;
301 #ifdef METER_DEBUG
302 	printf( "meter: expected %f in %d msec, got %d\n",
303 			expected, msec, m->ticks );
304 #endif
305 	m->slip += expected - (double)m->ticks - (double)m->precomp;
306 	m->slip += m->corrected;
307 	m->avg_slip = 4.0 * m->avg_slip / 5.0 +
308 		( expected - (double)m->ticks ) * 1000.0 / (double)msec / 5.0;
309 	/* Pre-compensate by 1/2 the average slip */
310 	m->precomp = 30 * m->avg_slip / 2;
311 #ifdef METER_DEBUG
312 	printf( "meter: total slip = %f avg slip = %.3f/sec precomp = %d\n",
313 			m->slip, m->avg_slip, m->precomp );
314 #endif
315 	m->slip += m->precomp;
316 	m->correction_rate = m->ticks / m->slip;
317 	if( rate ) *rate = m->correction_rate;
318 	m->ticks = ticks;
319 	m->last_check = now;
320 	m->corrected = 0;
321 	return rate != NULL;
322 }
323 
meter_get_adjustment(struct meter * m)324 int meter_get_adjustment( struct meter *m )
325 {
326 	int msec;
327 
328 	if( m->slip == 0 ) return 0;
329 	msec = time_ago( &m->last_check );
330 	return m->slip * msec / 30000 + m->corrected;
331 }
332 
meter_report_correction(struct meter * m,int ticks)333 void meter_report_correction( struct meter *m, int ticks )
334 {
335 	m->corrected -= ticks;
336 #ifdef METER_DEBUG
337 //	printf( "meter: total slip = %f correction = %d\n",
338 //			m->slip, m->corrected );
339 #endif
340 }
341