1 /* This software was written by Dirk Engling <erdgeist@erdgeist.org>
2 It is considered beerware. Prost. Skol. Cheers or whatever.
3
4 $id$ */
5
6 #ifdef WANT_FULLSCRAPE
7
8 /* System */
9 #include <sys/param.h>
10 #include <stdio.h>
11 #include <string.h>
12 #include <pthread.h>
13 #include <arpa/inet.h>
14 #ifdef WANT_COMPRESSION_GZIP
15 #include <zlib.h>
16 #endif
17
18 /* Libowfat */
19 #include "byte.h"
20 #include "io.h"
21 #include "textcode.h"
22
23 /* Opentracker */
24 #include "trackerlogic.h"
25 #include "ot_mutex.h"
26 #include "ot_iovec.h"
27 #include "ot_fullscrape.h"
28
/* A full scrape lists every torrent and is therefore usually huge.
   Instead of growing one giant buffer, let's collect the output in
   chunks of 512 KiB each.
*/
#define OT_SCRAPE_CHUNK_SIZE (512*1024)

/* Room reserved for one formatted entry; the longest one looks like
   "d8:completei%zde10:downloadedi%zde10:incompletei%zdee" */
#define OT_SCRAPE_MAXENTRYLEN 256
37
/* Glue that lets gzip support be compiled in or out.

   IF_COMPRESSION( TASK )
     With WANT_COMPRESSION_GZIP: runs TASK only if TASK_FLAG_GZIP is set in a
     variable called `mode`, which must be in scope where the macro is used.
     TASK is pasted verbatim, so pass it including its trailing semicolon.
   WANT_COMPRESSION_GZIP_PARAM( a, b, c )
     With WANT_COMPRESSION_GZIP: appends ", a, b, c" to a parameter or
     argument list.

   Without WANT_COMPRESSION_GZIP both macros expand to nothing. */
#ifndef WANT_COMPRESSION_GZIP
#define IF_COMPRESSION( TASK )
#define WANT_COMPRESSION_GZIP_PARAM( param1, param2, param3 )
#else
#define IF_COMPRESSION( TASK ) if( mode & TASK_FLAG_GZIP ) TASK
#define WANT_COMPRESSION_GZIP_PARAM( param1, param2, param3 ) , param1, param2, param3
#endif
45
46 /* Forward declaration */
47 static void fullscrape_make( int *iovec_entries, struct iovec **iovector, ot_tasktype mode );
48
/* Write the 20 bytes found at s as 40 upper-case hex digits to d, terminate
   the string with a NUL byte and return d. d must offer room for 41 bytes.
   XXX - Duplicated from ot_stats. Needs fix. */
static char *to_hex( char *d, uint8_t *s ) {
  static const char digits[] = "0123456789ABCDEF";
  int i;

  for( i = 0; i < 20; ++i ) {
    d[ 2 * i ]     = digits[ s[ i ] >> 4 ];   /* high nibble first */
    d[ 2 * i + 1 ] = digits[ s[ i ] & 15 ];
  }
  d[ 40 ] = 0;

  return d;
}
52
53 /* This is the entry point into this worker thread
54 It grabs tasks from mutex_tasklist and delivers results back
55 */
fullscrape_worker(void * args)56 static void * fullscrape_worker( void * args ) {
57 int iovec_entries;
58 struct iovec *iovector;
59
60 (void) args;
61
62 while( 1 ) {
63 ot_tasktype tasktype = TASK_FULLSCRAPE;
64 ot_taskid taskid = mutex_workqueue_poptask( &tasktype );
65 fullscrape_make( &iovec_entries, &iovector, tasktype );
66 if( mutex_workqueue_pushresult( taskid, iovec_entries, iovector ) )
67 iovec_free( &iovec_entries, &iovector );
68 if( !g_opentracker_running )
69 return NULL;
70 }
71 return NULL;
72 }
73
74 static pthread_t thread_id;
fullscrape_init()75 void fullscrape_init( ) {
76 pthread_create( &thread_id, NULL, fullscrape_worker, NULL );
77 }
78
fullscrape_deinit()79 void fullscrape_deinit( ) {
80 pthread_cancel( thread_id );
81 }
82
/* Request a fullscrape of flavour tasktype for the connection sock.
   This merely enqueues the task via mutex_workqueue_pushtask(); the actual
   work is presumably picked up by the worker thread of this module. */
void fullscrape_deliver( int64 sock, ot_tasktype tasktype ) {
  (void) mutex_workqueue_pushtask( sock, tasktype );
}
86
/* Finish the current output chunk at *r and start a new one.

   iovec_entries, iovector  buffer chain being built
   r                        in: current write position in the last chunk
                            out: write position in the new chunk
   re                       out: new low watermark, i.e. the position from
                            which on the caller has to ask for another chunk
   strm, mode, zaction      (gzip builds only) deflate state, task mode and
                            the flush mode to pass to deflate()

   Returns 0 on success and -1 if no new chunk could be obtained. In the
   latter case the deflate state has been torn down already (gzip mode);
   judging by its name, iovec_fix_increase_or_free() has released the buffer
   chain as well - TODO confirm. Bucket locks are NOT touched here, the
   caller has to release whatever it holds.
*/
static int fullscrape_increase( int *iovec_entries, struct iovec **iovector,
                                char **r, char **re  WANT_COMPRESSION_GZIP_PARAM( z_stream *strm, ot_tasktype mode, int zaction ) ) {
  /* Ask for a fresh output buffer at the end of our buffers list */
  char *chunk = iovec_fix_increase_or_free( iovec_entries, iovector, *r, OT_SCRAPE_CHUNK_SIZE );

  *r = chunk;
  if( chunk == NULL ) {
    /* Deallocate gzip buffers, then report the failure */
    IF_COMPRESSION( deflateEnd(strm); )
    return -1;
  }

  /* Keep room for one more entry below the end of the new chunk */
  *re = chunk + OT_SCRAPE_CHUNK_SIZE - OT_SCRAPE_MAXENTRYLEN;

#ifdef WANT_COMPRESSION_GZIP
  /* When compressing, deflate() is the one writing into the chunk: point it
     to the new buffer and let it go on with whatever is still pending */
  if( mode & TASK_FLAG_GZIP ) {
    int zres;

    *re            -= OT_SCRAPE_MAXENTRYLEN;
    strm->next_out  = (uint8_t*)chunk;
    strm->avail_out = OT_SCRAPE_CHUNK_SIZE;

    zres = deflate( strm, zaction );
    /* Z_BUF_ERROR only says that there was nothing to do */
    if( ( zres != Z_BUF_ERROR ) && ( zres < Z_OK ) )
      fprintf( stderr, "deflate() failed while in fullscrape_increase(%d).\n", zaction );

    *r = (char*)strm->next_out;
  }
#endif

  return 0;
}
118
/* Render a scrape of all torrents into a chain of output buffers.

   iovec_entries, iovector
     Out parameters describing the buffer chain. Both are reset to 0/NULL
     first and stay that way if not even the first chunk can be allocated.
   mode
     ( mode & TASK_TASK_MASK ) selects the output format, see the switch
     below. In builds with WANT_COMPRESSION_GZIP, TASK_FLAG_GZIP makes every
     entry get formatted into a small stack buffer first and deflated into
     the output chunks from there.

   Buckets are locked one at a time while their torrents are dumped. The
   function returns early if a new output chunk cannot be obtained or if
   g_opentracker_running was cleared; no bucket lock is held on return.
*/
static void fullscrape_make( int *iovec_entries, struct iovec **iovector, ot_tasktype mode ) {
  int   bucket;
  char *r, *re;
#ifdef WANT_COMPRESSION_GZIP
  char     compress_buffer[OT_SCRAPE_MAXENTRYLEN];
  z_stream strm;
#endif

  /* Setup return vector... */
  *iovec_entries = 0;
  *iovector = NULL;
  if( !( r = iovec_increase( iovec_entries, iovector, OT_SCRAPE_CHUNK_SIZE ) ) )
    return;

  /* re points to low watermark */
  re = r + OT_SCRAPE_CHUNK_SIZE - OT_SCRAPE_MAXENTRYLEN;

#ifdef WANT_COMPRESSION_GZIP
  if( mode & TASK_FLAG_GZIP ) {
    /* deflate() never writes beyond avail_out, so no reserve is needed in
       the first chunk */
    re += OT_SCRAPE_MAXENTRYLEN;
    byte_zero( &strm, sizeof(strm) );
    strm.next_in   = (uint8_t*)compress_buffer;
    strm.next_out  = (uint8_t*)r;
    strm.avail_out = OT_SCRAPE_CHUNK_SIZE;
    /* windowBits 31 = 15 + 16 asks zlib for a gzip wrapper */
    if( deflateInit2(&strm,7,Z_DEFLATED,31,8,Z_DEFAULT_STRATEGY) != Z_OK )
      fprintf( stderr, "not ok.\n" );
    /* From now on entries are formatted into the stack buffer */
    r = compress_buffer;
  }
#endif

  if( ( mode & TASK_TASK_MASK ) == TASK_FULLSCRAPE )
    r += sprintf( r, "d5:filesd" );

  /* For each bucket... */
  for( bucket=0; bucket<OT_BUCKET_COUNT; ++bucket ) {
    /* Get exclusive access to that bucket */
    ot_vector *torrents_list = mutex_bucket_lock( bucket );
    size_t tor_offset;

    /* For each torrent in this bucket.. */
    for( tor_offset=0; tor_offset<torrents_list->size; ++tor_offset ) {
      /* Address torrents members */
      ot_peerlist *peer_list = ( ((ot_torrent*)(torrents_list->data))[tor_offset] ).peer_list;
      ot_hash     *hash      =&( ((ot_torrent*)(torrents_list->data))[tor_offset] ).hash;

      switch( mode & TASK_TASK_MASK ) {
      case TASK_FULLSCRAPE:
      default:
        /* push hash as bencoded string */
        *r++='2'; *r++='0'; *r++=':';
        memcpy( r, hash, sizeof(ot_hash) ); r += sizeof(ot_hash);
        /* push rest of the scrape string */
        r += sprintf( r, "d8:completei%zde10:downloadedi%zde10:incompletei%zdee", peer_list->seed_count, peer_list->down_count, peer_list->peer_count-peer_list->seed_count );

        break;
      case TASK_FULLSCRAPE_TPB_ASCII:
        to_hex( r, *hash ); r+= 2 * sizeof(ot_hash);
        r += sprintf( r, ":%zd:%zd\n", peer_list->seed_count, peer_list->peer_count-peer_list->seed_count );
        break;
      case TASK_FULLSCRAPE_TPB_ASCII_PLUS:
        to_hex( r, *hash ); r+= 2 * sizeof(ot_hash);
        r += sprintf( r, ":%zd:%zd:%zd\n", peer_list->seed_count, peer_list->peer_count-peer_list->seed_count, peer_list->down_count );
        break;
      case TASK_FULLSCRAPE_TPB_BINARY: {
          /* r is a plain char pointer that may sit at any offset (e.g.
             inside compress_buffer), so the two big endian counters are
             copied bytewise instead of being stored through a uint32_t* */
          uint32_t seeds_be    = htonl( (uint32_t) peer_list->seed_count );
          uint32_t leechers_be = htonl( (uint32_t)( peer_list->peer_count-peer_list->seed_count) );
          memcpy( r, *hash, sizeof(ot_hash) );          r += sizeof(ot_hash);
          memcpy( r, &seeds_be, sizeof(seeds_be) );       r += sizeof(seeds_be);
          memcpy( r, &leechers_be, sizeof(leechers_be) ); r += sizeof(leechers_be);
        }
        break;
      case TASK_FULLSCRAPE_TPB_URLENCODED:
        r += fmt_urlencoded( r, (char *)*hash, 20 );
        r += sprintf( r, ":%zd:%zd\n", peer_list->seed_count, peer_list->peer_count-peer_list->seed_count );
        break;
      case TASK_FULLSCRAPE_TRACKERSTATE:
        to_hex( r, *hash ); r+= 2 * sizeof(ot_hash);
        r += sprintf( r, ":%zd:%zd\n", peer_list->base, peer_list->down_count );
        break;
      }

#ifdef WANT_COMPRESSION_GZIP
      if( mode & TASK_FLAG_GZIP ) {
        int zres;
        /* Feed the entry just formatted into the compressor */
        strm.next_in  = (uint8_t*)compress_buffer;
        strm.avail_in = r - compress_buffer;
        zres = deflate( &strm, Z_NO_FLUSH );
        if( ( zres < Z_OK ) && ( zres != Z_BUF_ERROR ) )
          fprintf( stderr, "deflate() failed while in fullscrape_make().\n" );
        /* For the check below, r has to be the position in the output chunk */
        r = (char*)strm.next_out;
      }
#endif

      /* Check if there still is enough buffer left */
      while( r >= re )
        if( fullscrape_increase( iovec_entries, iovector, &r, &re WANT_COMPRESSION_GZIP_PARAM( &strm, mode, Z_NO_FLUSH ) ) ) {
          /* No more memory: drop the lock on the current bucket and give up */
          mutex_bucket_unlock( bucket, 0 );
          return;
        }

      /* Next entry goes to the start of the stack buffer again */
      IF_COMPRESSION( r = compress_buffer; )
    }

    /* All torrents done: release lock on current bucket */
    mutex_bucket_unlock( bucket, 0 );

    /* Parent thread died? */
    if( !g_opentracker_running ) {
      /* Do not leave the compressor state behind */
      IF_COMPRESSION( deflateEnd(&strm); )
      return;
    }
  }

  if( ( mode & TASK_TASK_MASK ) == TASK_FULLSCRAPE )
    r += sprintf( r, "ee" );

#ifdef WANT_COMPRESSION_GZIP
  if( mode & TASK_FLAG_GZIP ) {
    strm.next_in  = (uint8_t*)compress_buffer;
    strm.avail_in = r - compress_buffer;
    if( deflate( &strm, Z_FINISH ) < Z_OK )
      fprintf( stderr, "deflate() failed while in fullscrape_make()'s endgame.\n" );
    r = (char*)strm.next_out;

    /* Every bucket has been unlocked by the loop above and bucket equals
       OT_BUCKET_COUNT by now, so on failure there is nothing to unlock */
    while( r >= re )
      if( fullscrape_increase( iovec_entries, iovector, &r, &re WANT_COMPRESSION_GZIP_PARAM( &strm, mode, Z_FINISH ) ) )
        return;
    deflateEnd(&strm);
  }
#endif

  /* Release unused memory in current output buffer */
  iovec_fixlast( iovec_entries, iovector, r );
}
247 #endif
248
/* Version tag of this source file. $Source$ and $Revision$ look like
   version control keywords, presumably expanded on checkout. */
const char *g_version_fullscrape_c =
  "$Source$: $Revision$\n";
250