1 /* This software was written by Dirk Engling <erdgeist@erdgeist.org>
2 It is considered beerware. Prost. Skol. Cheers or whatever.
3
4 $id$ */
5
6 /* System */
7 #include <pthread.h>
8 #include <stdio.h>
9 #include <stdlib.h>
10 #include <sys/mman.h>
11 #include <sys/uio.h>
12
13 /* Libowfat */
14 #include "byte.h"
15 #include "io.h"
16 #include "uint32.h"
17
18 /* Opentracker */
19 #include "trackerlogic.h"
20 #include "ot_mutex.h"
21 #include "ot_stats.h"
22
23 /* #define MTX_DBG( STRING ) fprintf( stderr, STRING ) */
24 #define MTX_DBG( STRING )
25
26 /* Our global all torrents list */
27 static ot_vector all_torrents[OT_BUCKET_COUNT];
28 static size_t g_torrent_count;
29
30 /* Bucket Magic */
31 static int bucket_locklist[ OT_MAX_THREADS ];
32 static int bucket_locklist_count = 0;
33 static pthread_mutex_t bucket_mutex;
34 static pthread_cond_t bucket_being_unlocked;
35
36 /* Self pipe from opentracker.c */
37 extern int g_self_pipe[2];
38
bucket_check(int bucket)39 static int bucket_check( int bucket ) {
40 /* C should come with auto-i ;) */
41 int i;
42
43 /* No more space to acquire lock to bucket -- should not happen */
44 if( bucket_locklist_count == OT_MAX_THREADS ) {
45 fprintf( stderr, "More lock requests than mutexes. Consult source code.\n" );
46 return -1;
47 }
48
49 /* See, if bucket is already locked */
50 for( i=0; i<bucket_locklist_count; ++i )
51 if( bucket_locklist[ i ] == bucket ) {
52 stats_issue_event( EVENT_BUCKET_LOCKED, 0, 0 );
53 return -1;
54 }
55
56 return 0;
57 }
58
bucket_push(int bucket)59 static void bucket_push( int bucket ) {
60 bucket_locklist[ bucket_locklist_count++ ] = bucket;
61 }
62
bucket_remove(int bucket)63 static void bucket_remove( int bucket ) {
64 int i = 0;
65
66 while( ( i < bucket_locklist_count ) && ( bucket_locklist[ i ] != bucket ) )
67 ++i;
68
69 if( i == bucket_locklist_count ) {
70 fprintf( stderr, "Request to unlock bucket that was never locked. Consult source code.\n" );
71 return;
72 }
73
74 for( ; i < bucket_locklist_count - 1; ++i )
75 bucket_locklist[ i ] = bucket_locklist[ i + 1 ];
76
77 --bucket_locklist_count;
78 }
79
80 /* Can block */
mutex_bucket_lock(int bucket)81 ot_vector *mutex_bucket_lock( int bucket ) {
82 pthread_mutex_lock( &bucket_mutex );
83 while( bucket_check( bucket ) )
84 pthread_cond_wait( &bucket_being_unlocked, &bucket_mutex );
85 bucket_push( bucket );
86 pthread_mutex_unlock( &bucket_mutex );
87 return all_torrents + bucket;
88 }
89
mutex_bucket_lock_by_hash(ot_hash hash)90 ot_vector *mutex_bucket_lock_by_hash( ot_hash hash ) {
91 return mutex_bucket_lock( uint32_read_big( (char*)hash ) >> OT_BUCKET_COUNT_SHIFT );
92 }
93
mutex_bucket_unlock(int bucket,int delta_torrentcount)94 void mutex_bucket_unlock( int bucket, int delta_torrentcount ) {
95 pthread_mutex_lock( &bucket_mutex );
96 bucket_remove( bucket );
97 g_torrent_count += delta_torrentcount;
98 pthread_cond_broadcast( &bucket_being_unlocked );
99 pthread_mutex_unlock( &bucket_mutex );
100 }
101
mutex_bucket_unlock_by_hash(ot_hash hash,int delta_torrentcount)102 void mutex_bucket_unlock_by_hash( ot_hash hash, int delta_torrentcount ) {
103 mutex_bucket_unlock( uint32_read_big( (char*)hash ) >> OT_BUCKET_COUNT_SHIFT, delta_torrentcount );
104 }
105
mutex_get_torrent_count()106 size_t mutex_get_torrent_count( ) {
107 size_t torrent_count;
108 pthread_mutex_lock( &bucket_mutex );
109 torrent_count = g_torrent_count;
110 pthread_mutex_unlock( &bucket_mutex );
111 return torrent_count;
112 }
113
114 /* TaskQueue Magic */
115
116 struct ot_task {
117 ot_taskid taskid;
118 ot_tasktype tasktype;
119 int64 sock;
120 int iovec_entries;
121 struct iovec *iovec;
122 struct ot_task *next;
123 };
124
125 static ot_taskid next_free_taskid = 1;
126 static struct ot_task *tasklist;
127 static pthread_mutex_t tasklist_mutex;
128 static pthread_cond_t tasklist_being_filled;
129
mutex_workqueue_pushtask(int64 sock,ot_tasktype tasktype)130 int mutex_workqueue_pushtask( int64 sock, ot_tasktype tasktype ) {
131 struct ot_task ** tmptask, * task;
132
133 /* Want exclusive access to tasklist */
134 MTX_DBG( "pushtask locks.\n" );
135 pthread_mutex_lock( &tasklist_mutex );
136 MTX_DBG( "pushtask locked.\n" );
137
138 task = malloc(sizeof( struct ot_task));
139 if( !task ) {
140 MTX_DBG( "pushtask fail unlocks.\n" );
141 pthread_mutex_unlock( &tasklist_mutex );
142 MTX_DBG( "pushtask fail unlocked.\n" );
143 return -1;
144 }
145
146 /* Skip to end of list */
147 tmptask = &tasklist;
148 while( *tmptask )
149 tmptask = &(*tmptask)->next;
150 *tmptask = task;
151
152 task->taskid = 0;
153 task->tasktype = tasktype;
154 task->sock = sock;
155 task->iovec_entries = 0;
156 task->iovec = NULL;
157 task->next = 0;
158
159 /* Inform waiting workers and release lock */
160 MTX_DBG( "pushtask broadcasts.\n" );
161 pthread_cond_broadcast( &tasklist_being_filled );
162 MTX_DBG( "pushtask broadcasted, mutex unlocks.\n" );
163 pthread_mutex_unlock( &tasklist_mutex );
164 MTX_DBG( "pushtask end mutex unlocked.\n" );
165 return 0;
166 }
167
mutex_workqueue_canceltask(int64 sock)168 void mutex_workqueue_canceltask( int64 sock ) {
169 struct ot_task ** task;
170
171 /* Want exclusive access to tasklist */
172 MTX_DBG( "canceltask locks.\n" );
173 pthread_mutex_lock( &tasklist_mutex );
174 MTX_DBG( "canceltask locked.\n" );
175
176 task = &tasklist;
177 while( *task && ( (*task)->sock != sock ) )
178 *task = (*task)->next;
179
180 if( *task && ( (*task)->sock == sock ) ) {
181 struct iovec *iovec = (*task)->iovec;
182 struct ot_task *ptask = *task;
183 int i;
184
185 /* Free task's iovec */
186 for( i=0; i<(*task)->iovec_entries; ++i )
187 munmap( iovec[i].iov_base, iovec[i].iov_len );
188
189 *task = (*task)->next;
190 free( ptask );
191 }
192
193 /* Release lock */
194 MTX_DBG( "canceltask unlocks.\n" );
195 pthread_mutex_unlock( &tasklist_mutex );
196 MTX_DBG( "canceltask unlocked.\n" );
197 }
198
mutex_workqueue_poptask(ot_tasktype * tasktype)199 ot_taskid mutex_workqueue_poptask( ot_tasktype *tasktype ) {
200 struct ot_task * task;
201 ot_taskid taskid = 0;
202
203 /* Want exclusive access to tasklist */
204 MTX_DBG( "poptask mutex locks.\n" );
205 pthread_mutex_lock( &tasklist_mutex );
206 MTX_DBG( "poptask mutex locked.\n" );
207
208 while( !taskid ) {
209 /* Skip to the first unassigned task this worker wants to do */
210 task = tasklist;
211 while( task && ( ( ( TASK_CLASS_MASK & task->tasktype ) != *tasktype ) || task->taskid ) )
212 task = task->next;
213
214 /* If we found an outstanding task, assign a taskid to it
215 and leave the loop */
216 if( task ) {
217 task->taskid = taskid = ++next_free_taskid;
218 *tasktype = task->tasktype;
219 } else {
220 /* Wait until the next task is being fed */
221 MTX_DBG( "poptask cond waits.\n" );
222 pthread_cond_wait( &tasklist_being_filled, &tasklist_mutex );
223 MTX_DBG( "poptask cond waited.\n" );
224 }
225 }
226
227 /* Release lock */
228 MTX_DBG( "poptask end mutex unlocks.\n" );
229 pthread_mutex_unlock( &tasklist_mutex );
230 MTX_DBG( "poptask end mutex unlocked.\n" );
231
232 return taskid;
233 }
234
mutex_workqueue_pushsuccess(ot_taskid taskid)235 void mutex_workqueue_pushsuccess( ot_taskid taskid ) {
236 struct ot_task ** task;
237
238 /* Want exclusive access to tasklist */
239 MTX_DBG( "pushsuccess locks.\n" );
240 pthread_mutex_lock( &tasklist_mutex );
241 MTX_DBG( "pushsuccess locked.\n" );
242
243 task = &tasklist;
244 while( *task && ( (*task)->taskid != taskid ) )
245 *task = (*task)->next;
246
247 if( *task && ( (*task)->taskid == taskid ) ) {
248 struct ot_task *ptask = *task;
249 *task = (*task)->next;
250 free( ptask );
251 }
252
253 /* Release lock */
254 MTX_DBG( "pushsuccess unlocks.\n" );
255 pthread_mutex_unlock( &tasklist_mutex );
256 MTX_DBG( "pushsuccess unlocked.\n" );
257 }
258
mutex_workqueue_pushresult(ot_taskid taskid,int iovec_entries,struct iovec * iovec)259 int mutex_workqueue_pushresult( ot_taskid taskid, int iovec_entries, struct iovec *iovec ) {
260 struct ot_task * task;
261 const char byte = 'o';
262
263 /* Want exclusive access to tasklist */
264 MTX_DBG( "pushresult locks.\n" );
265 pthread_mutex_lock( &tasklist_mutex );
266 MTX_DBG( "pushresult locked.\n" );
267
268 task = tasklist;
269 while( task && ( task->taskid != taskid ) )
270 task = task->next;
271
272 if( task ) {
273 task->iovec_entries = iovec_entries;
274 task->iovec = iovec;
275 task->tasktype = TASK_DONE;
276 }
277
278 /* Release lock */
279 MTX_DBG( "pushresult unlocks.\n" );
280 pthread_mutex_unlock( &tasklist_mutex );
281 MTX_DBG( "pushresult unlocked.\n" );
282
283 io_trywrite( g_self_pipe[1], &byte, 1 );
284
285 /* Indicate whether the worker has to throw away results */
286 return task ? 0 : -1;
287 }
288
mutex_workqueue_popresult(int * iovec_entries,struct iovec ** iovec)289 int64 mutex_workqueue_popresult( int *iovec_entries, struct iovec ** iovec ) {
290 struct ot_task ** task;
291 int64 sock = -1;
292
293 /* Want exclusive access to tasklist */
294 MTX_DBG( "popresult locks.\n" );
295 pthread_mutex_lock( &tasklist_mutex );
296 MTX_DBG( "popresult locked.\n" );
297
298 task = &tasklist;
299 while( *task && ( (*task)->tasktype != TASK_DONE ) )
300 task = &(*task)->next;
301
302 if( *task && ( (*task)->tasktype == TASK_DONE ) ) {
303 struct ot_task *ptask = *task;
304
305 *iovec_entries = (*task)->iovec_entries;
306 *iovec = (*task)->iovec;
307 sock = (*task)->sock;
308
309 *task = (*task)->next;
310 free( ptask );
311 }
312
313 /* Release lock */
314 MTX_DBG( "popresult unlocks.\n" );
315 pthread_mutex_unlock( &tasklist_mutex );
316 MTX_DBG( "popresult unlocked.\n" );
317 return sock;
318 }
319
mutex_init()320 void mutex_init( ) {
321 pthread_mutex_init(&tasklist_mutex, NULL);
322 pthread_cond_init (&tasklist_being_filled, NULL);
323 pthread_mutex_init(&bucket_mutex, NULL);
324 pthread_cond_init (&bucket_being_unlocked, NULL);
325 byte_zero( all_torrents, sizeof( all_torrents ) );
326 }
327
mutex_deinit()328 void mutex_deinit( ) {
329 pthread_mutex_destroy(&bucket_mutex);
330 pthread_cond_destroy(&bucket_being_unlocked);
331 pthread_mutex_destroy(&tasklist_mutex);
332 pthread_cond_destroy(&tasklist_being_filled);
333 byte_zero( all_torrents, sizeof( all_torrents ) );
334 }
335
336 const char *g_version_mutex_c = "$Source$: $Revision$\n";
337