1 /* This software was written by Dirk Engling <erdgeist@erdgeist.org>
2    It is considered beerware. Prost. Skol. Cheers or whatever.
3 
4    $id$ */
5 
6 /* System */
7 #include <pthread.h>
8 #include <stdio.h>
9 #include <stdlib.h>
10 #include <sys/mman.h>
11 #include <sys/uio.h>
12 
13 /* Libowfat */
14 #include "byte.h"
15 #include "io.h"
16 #include "uint32.h"
17 
18 /* Opentracker */
19 #include "trackerlogic.h"
20 #include "ot_mutex.h"
21 #include "ot_stats.h"
22 
/* Debug tracing hook used at the lock/unlock call sites in this file.
   Swap in the fprintf variant below to trace to stderr: */
/* #define MTX_DBG( STRING ) fprintf( stderr, STRING ) */
/* Default: expands to nothing, so the trace calls compile away. */
#define MTX_DBG( STRING )

/* Our global all torrents list: one vector per bucket,
   handed out to callers by mutex_bucket_lock */
static ot_vector all_torrents[OT_BUCKET_COUNT];
/* Running torrent total; adjusted in mutex_bucket_unlock and read in
   mutex_get_torrent_count, both while holding bucket_mutex */
static size_t    g_torrent_count;

/* Bucket Magic */
/* Numbers of the buckets that are currently locked; only the first
   bucket_locklist_count slots are in use */
static int bucket_locklist[ OT_MAX_THREADS ];
static int bucket_locklist_count = 0;
/* Held around every access to the lock list and g_torrent_count
   by the mutex_bucket_* functions in this file */
static pthread_mutex_t bucket_mutex;
/* Broadcast by mutex_bucket_unlock each time a bucket is released */
static pthread_cond_t bucket_being_unlocked;

/* Self pipe from opentracker.c; mutex_workqueue_pushresult writes a
   single byte to g_self_pipe[1] */
extern int g_self_pipe[2];
38 
bucket_check(int bucket)39 static int bucket_check( int bucket ) {
40   /* C should come with auto-i ;) */
41   int i;
42 
43   /* No more space to acquire lock to bucket -- should not happen */
44   if( bucket_locklist_count == OT_MAX_THREADS ) {
45     fprintf( stderr, "More lock requests than mutexes. Consult source code.\n" );
46     return -1;
47   }
48 
49   /* See, if bucket is already locked */
50   for( i=0; i<bucket_locklist_count; ++i )
51     if( bucket_locklist[ i ] == bucket ) {
52       stats_issue_event( EVENT_BUCKET_LOCKED, 0, 0 );
53       return -1;
54     }
55 
56   return 0;
57 }
58 
bucket_push(int bucket)59 static void bucket_push( int bucket ) {
60   bucket_locklist[ bucket_locklist_count++ ] = bucket;
61 }
62 
bucket_remove(int bucket)63 static void bucket_remove( int bucket ) {
64   int i = 0;
65 
66   while( ( i < bucket_locklist_count ) && ( bucket_locklist[ i ] != bucket ) )
67     ++i;
68 
69   if( i == bucket_locklist_count ) {
70     fprintf( stderr, "Request to unlock bucket that was never locked. Consult source code.\n" );
71     return;
72   }
73 
74   for( ; i < bucket_locklist_count - 1; ++i )
75     bucket_locklist[ i ] = bucket_locklist[ i + 1 ];
76 
77   --bucket_locklist_count;
78 }
79 
80 /* Can block */
mutex_bucket_lock(int bucket)81 ot_vector *mutex_bucket_lock( int bucket ) {
82   pthread_mutex_lock( &bucket_mutex );
83   while( bucket_check( bucket ) )
84     pthread_cond_wait( &bucket_being_unlocked, &bucket_mutex );
85   bucket_push( bucket );
86   pthread_mutex_unlock( &bucket_mutex );
87   return all_torrents + bucket;
88 }
89 
mutex_bucket_lock_by_hash(ot_hash hash)90 ot_vector *mutex_bucket_lock_by_hash( ot_hash hash ) {
91   return mutex_bucket_lock( uint32_read_big( (char*)hash ) >> OT_BUCKET_COUNT_SHIFT );
92 }
93 
mutex_bucket_unlock(int bucket,int delta_torrentcount)94 void mutex_bucket_unlock( int bucket, int delta_torrentcount ) {
95   pthread_mutex_lock( &bucket_mutex );
96   bucket_remove( bucket );
97   g_torrent_count += delta_torrentcount;
98   pthread_cond_broadcast( &bucket_being_unlocked );
99   pthread_mutex_unlock( &bucket_mutex );
100 }
101 
/* Unlock the bucket selected by `hash`, using the same bucket derivation
   as mutex_bucket_lock_by_hash, and apply `delta_torrentcount` to the
   global torrent count. */
void mutex_bucket_unlock_by_hash( ot_hash hash, int delta_torrentcount ) {
  int bucket = uint32_read_big( (char*)hash ) >> OT_BUCKET_COUNT_SHIFT;

  mutex_bucket_unlock( bucket, delta_torrentcount );
}
105 
mutex_get_torrent_count()106 size_t mutex_get_torrent_count( ) {
107   size_t torrent_count;
108   pthread_mutex_lock( &bucket_mutex );
109   torrent_count = g_torrent_count;
110   pthread_mutex_unlock( &bucket_mutex );
111   return torrent_count;
112 }
113 
/* TaskQueue Magic */

/* One entry of the singly linked task queue */
struct ot_task {
  ot_taskid       taskid;        /* 0 until mutex_workqueue_poptask assigns an id */
  ot_tasktype     tasktype;      /* as pushed; becomes TASK_DONE in mutex_workqueue_pushresult */
  int64           sock;          /* socket the task was pushed for; search key in canceltask */
  int             iovec_entries; /* number of entries in iovec; 0 until a result is pushed */
  struct iovec   *iovec;         /* result vector; NULL until a result is pushed */
  struct ot_task *next;          /* following task, NULL at the tail */
};

/* Last task id handed out; poptask pre-increments it, so the first id issued is 2 */
static ot_taskid next_free_taskid = 1;
/* Head of the task list; mutex_workqueue_pushtask appends at the tail */
static struct ot_task *tasklist;
/* Held around every access to tasklist and next_free_taskid in this file */
static pthread_mutex_t tasklist_mutex;
/* Broadcast by mutex_workqueue_pushtask whenever a task has been appended */
static pthread_cond_t tasklist_being_filled;
129 
/* Append a new, not yet assigned task of type `tasktype` for `sock` to the
   end of the task list and wake all workers waiting in
   mutex_workqueue_poptask.

   Returns 0 on success, or -1 if the task could not be allocated; the list
   is left untouched in that case. */
int mutex_workqueue_pushtask( int64 sock, ot_tasktype tasktype ) {
  struct ot_task ** tail, * task;

  /* Allocate and fully initialise the node before taking the lock: this
     keeps malloc out of the critical section and guarantees that the node
     is complete by the time it becomes reachable through tasklist. */
  task = malloc( sizeof( *task ) );
  if( !task )
    return -1;

  task->taskid        = 0;
  task->tasktype      = tasktype;
  task->sock          = sock;
  task->iovec_entries = 0;
  task->iovec         = NULL;
  task->next          = NULL;

  /* Want exclusive access to tasklist */
  MTX_DBG( "pushtask locks.\n" );
  pthread_mutex_lock( &tasklist_mutex );
  MTX_DBG( "pushtask locked.\n" );

  /* Skip to end of list and link the new task in */
  tail = &tasklist;
  while( *tail )
    tail = &(*tail)->next;
  *tail = task;

  /* Inform waiting workers and release lock */
  MTX_DBG( "pushtask broadcasts.\n" );
  pthread_cond_broadcast( &tasklist_being_filled );
  MTX_DBG( "pushtask broadcasted, mutex unlocks.\n" );
  pthread_mutex_unlock( &tasklist_mutex );
  MTX_DBG( "pushtask end mutex unlocked.\n" );
  return 0;
}
167 
/* Remove the first task queued for `sock`, if there is one: every entry of
   its iovec is munmap'ed and the task node is freed. All other tasks stay
   in the list. Does nothing when no task matches. */
void mutex_workqueue_canceltask( int64 sock ) {
  struct ot_task ** task;

  /* Want exclusive access to tasklist */
  MTX_DBG( "canceltask locks.\n" );
  pthread_mutex_lock( &tasklist_mutex );
  MTX_DBG( "canceltask locked.\n" );

  /* `task` always points at the link that leads to the current node.
     Advance the cursor itself; assigning through it (*task = ...) would
     overwrite tasklist and unlink and leak every task in front of the
     match. */
  task = &tasklist;
  while( *task && ( (*task)->sock != sock ) )
    task = &(*task)->next;

  if( *task ) {
    struct ot_task *ptask = *task;
    struct iovec *iovec = ptask->iovec;
    int i;

    /* Free task's iovec */
    /* NOTE(review): only the mapped buffers are released here, not the
       iovec array itself -- confirm who owns that allocation. */
    for( i=0; i<ptask->iovec_entries; ++i )
      munmap( iovec[i].iov_base, iovec[i].iov_len );

    /* Unlink the node by redirecting the link that pointed at it */
    *task = ptask->next;
    free( ptask );
  }

  /* Release lock */
  MTX_DBG( "canceltask unlocks.\n" );
  pthread_mutex_unlock( &tasklist_mutex );
  MTX_DBG( "canceltask unlocked.\n" );
}
198 
mutex_workqueue_poptask(ot_tasktype * tasktype)199 ot_taskid mutex_workqueue_poptask( ot_tasktype *tasktype ) {
200   struct ot_task * task;
201   ot_taskid taskid = 0;
202 
203   /* Want exclusive access to tasklist */
204   MTX_DBG( "poptask mutex locks.\n" );
205   pthread_mutex_lock( &tasklist_mutex );
206   MTX_DBG( "poptask mutex locked.\n" );
207 
208   while( !taskid ) {
209     /* Skip to the first unassigned task this worker wants to do */
210     task = tasklist;
211     while( task && ( ( ( TASK_CLASS_MASK & task->tasktype ) != *tasktype ) || task->taskid ) )
212       task = task->next;
213 
214     /* If we found an outstanding task, assign a taskid to it
215        and leave the loop */
216     if( task ) {
217       task->taskid = taskid = ++next_free_taskid;
218       *tasktype = task->tasktype;
219     } else {
220       /* Wait until the next task is being fed */
221       MTX_DBG( "poptask cond waits.\n" );
222       pthread_cond_wait( &tasklist_being_filled, &tasklist_mutex );
223       MTX_DBG( "poptask cond waited.\n" );
224     }
225   }
226 
227   /* Release lock */
228   MTX_DBG( "poptask end mutex unlocks.\n" );
229   pthread_mutex_unlock( &tasklist_mutex );
230   MTX_DBG( "poptask end mutex unlocked.\n" );
231 
232   return taskid;
233 }
234 
/* Remove the task with id `taskid` from the task list and free its node.
   The task's iovec is not touched. All other tasks stay in the list. Does
   nothing when no task carries that id. */
void mutex_workqueue_pushsuccess( ot_taskid taskid ) {
  struct ot_task ** task;

  /* Want exclusive access to tasklist */
  MTX_DBG( "pushsuccess locks.\n" );
  pthread_mutex_lock( &tasklist_mutex );
  MTX_DBG( "pushsuccess locked.\n" );

  /* `task` always points at the link that leads to the current node.
     Advance the cursor itself; assigning through it (*task = ...) would
     overwrite tasklist and unlink and leak every task in front of the
     match. */
  task = &tasklist;
  while( *task && ( (*task)->taskid != taskid ) )
    task = &(*task)->next;

  if( *task ) {
    struct ot_task *ptask = *task;

    /* Unlink the node by redirecting the link that pointed at it */
    *task = ptask->next;
    free( ptask );
  }

  /* Release lock */
  MTX_DBG( "pushsuccess unlocks.\n" );
  pthread_mutex_unlock( &tasklist_mutex );
  MTX_DBG( "pushsuccess unlocked.\n" );
}
258 
/* Attach a result to the task with id `taskid`: store `iovec_entries` and
   `iovec` in it and mark it TASK_DONE. Afterwards one byte is written to
   g_self_pipe[1], whether or not a task was found.

   Returns 0 if the task was found, -1 if not, in which case the worker
   has to throw away its results. */
int mutex_workqueue_pushresult( ot_taskid taskid, int iovec_entries, struct iovec *iovec ) {
  struct ot_task * cursor;
  const char wakeup = 'o';
  int status = -1;

  /* Want exclusive access to tasklist */
  MTX_DBG( "pushresult locks.\n" );
  pthread_mutex_lock( &tasklist_mutex );
  MTX_DBG( "pushresult locked.\n" );

  for( cursor = tasklist; cursor; cursor = cursor->next ) {
    if( cursor->taskid != taskid )
      continue;

    /* First task with this id takes the result */
    cursor->iovec_entries = iovec_entries;
    cursor->iovec         = iovec;
    cursor->tasktype      = TASK_DONE;
    status = 0;
    break;
  }

  /* Release lock */
  MTX_DBG( "pushresult unlocks.\n" );
  pthread_mutex_unlock( &tasklist_mutex );
  MTX_DBG( "pushresult unlocked.\n" );

  io_trywrite( g_self_pipe[1], &wakeup, 1 );

  /* Indicate whether the worker has to throw away results */
  return status;
}
288 
mutex_workqueue_popresult(int * iovec_entries,struct iovec ** iovec)289 int64 mutex_workqueue_popresult( int *iovec_entries, struct iovec ** iovec ) {
290   struct ot_task ** task;
291   int64 sock = -1;
292 
293   /* Want exclusive access to tasklist */
294   MTX_DBG( "popresult locks.\n" );
295   pthread_mutex_lock( &tasklist_mutex );
296   MTX_DBG( "popresult locked.\n" );
297 
298   task = &tasklist;
299   while( *task && ( (*task)->tasktype != TASK_DONE ) )
300     task = &(*task)->next;
301 
302   if( *task && ( (*task)->tasktype == TASK_DONE ) ) {
303     struct ot_task *ptask = *task;
304 
305     *iovec_entries = (*task)->iovec_entries;
306     *iovec         = (*task)->iovec;
307     sock           = (*task)->sock;
308 
309     *task = (*task)->next;
310     free( ptask );
311   }
312 
313   /* Release lock */
314   MTX_DBG( "popresult unlocks.\n" );
315   pthread_mutex_unlock( &tasklist_mutex );
316   MTX_DBG( "popresult unlocked.\n" );
317   return sock;
318 }
319 
mutex_init()320 void mutex_init( ) {
321   pthread_mutex_init(&tasklist_mutex, NULL);
322   pthread_cond_init (&tasklist_being_filled, NULL);
323   pthread_mutex_init(&bucket_mutex, NULL);
324   pthread_cond_init (&bucket_being_unlocked, NULL);
325   byte_zero( all_torrents, sizeof( all_torrents ) );
326 }
327 
mutex_deinit()328 void mutex_deinit( ) {
329   pthread_mutex_destroy(&bucket_mutex);
330   pthread_cond_destroy(&bucket_being_unlocked);
331   pthread_mutex_destroy(&tasklist_mutex);
332   pthread_cond_destroy(&tasklist_being_filled);
333   byte_zero( all_torrents, sizeof( all_torrents ) );
334 }
335 
/* Version tag for this source file. The $Source$ and $Revision$ markers
   look like unexpanded RCS/CVS keywords -- presumably filled in by the
   version control system on checkout; confirm. */
const char *g_version_mutex_c = "$Source$: $Revision$\n";
337