1 /* This software was written by Dirk Engling <erdgeist@erdgeist.org>
2    It is considered beerware. Prost. Skol. Cheers or whatever.
3 
4    $id$ */
5 
6 #ifdef WANT_FULLSCRAPE
7 
8 /* System */
9 #include <sys/param.h>
10 #include <stdio.h>
11 #include <string.h>
12 #include <pthread.h>
13 #include <arpa/inet.h>
14 #ifdef WANT_COMPRESSION_GZIP
15 #include <zlib.h>
16 #endif
17 
18 /* Libowfat */
19 #include "byte.h"
20 #include "io.h"
21 #include "textcode.h"
22 
23 /* Opentracker */
24 #include "trackerlogic.h"
25 #include "ot_mutex.h"
26 #include "ot_iovec.h"
27 #include "ot_fullscrape.h"
28 
29 /* Fetch full scrape info for all torrents
30    Full scrapes usually are huge and one does not want to
31    allocate more memory. So lets get them in 512k units
32 */
33 #define OT_SCRAPE_CHUNK_SIZE (512*1024)
34 
35 /* "d8:completei%zde10:downloadedi%zde10:incompletei%zdee" */
36 #define OT_SCRAPE_MAXENTRYLEN 256
37 
38 #ifdef WANT_COMPRESSION_GZIP
39 #define IF_COMPRESSION( TASK ) if( mode & TASK_FLAG_GZIP ) TASK
40 #define WANT_COMPRESSION_GZIP_PARAM( param1, param2, param3 ) , param1, param2, param3
41 #else
42 #define IF_COMPRESSION( TASK )
43 #define WANT_COMPRESSION_GZIP_PARAM( param1, param2, param3 )
44 #endif
45 
46 /* Forward declaration */
47 static void fullscrape_make( int *iovec_entries, struct iovec **iovector, ot_tasktype mode );
48 
49 /* Converter function from memory to human readable hex strings
50    XXX - Duplicated from ot_stats. Needs fix. */
to_hex(char * d,uint8_t * s)51 static char*to_hex(char*d,uint8_t*s){char*m="0123456789ABCDEF";char *t=d;char*e=d+40;while(d<e){*d++=m[*s>>4];*d++=m[*s++&15];}*d=0;return t;}
52 
53 /* This is the entry point into this worker thread
54    It grabs tasks from mutex_tasklist and delivers results back
55 */
fullscrape_worker(void * args)56 static void * fullscrape_worker( void * args ) {
57   int iovec_entries;
58   struct iovec *iovector;
59 
60   (void) args;
61 
62   while( 1 ) {
63     ot_tasktype tasktype = TASK_FULLSCRAPE;
64     ot_taskid   taskid   = mutex_workqueue_poptask( &tasktype );
65     fullscrape_make( &iovec_entries, &iovector, tasktype );
66     if( mutex_workqueue_pushresult( taskid, iovec_entries, iovector ) )
67       iovec_free( &iovec_entries, &iovector );
68     if( !g_opentracker_running )
69       return NULL;
70   }
71   return NULL;
72 }
73 
74 static pthread_t thread_id;
fullscrape_init()75 void fullscrape_init( ) {
76   pthread_create( &thread_id, NULL, fullscrape_worker, NULL );
77 }
78 
fullscrape_deinit()79 void fullscrape_deinit( ) {
80   pthread_cancel( thread_id );
81 }
82 
fullscrape_deliver(int64 sock,ot_tasktype tasktype)83 void fullscrape_deliver( int64 sock, ot_tasktype tasktype ) {
84   mutex_workqueue_pushtask( sock, tasktype );
85 }
86 
fullscrape_increase(int * iovec_entries,struct iovec ** iovector,char ** r,char ** re WANT_COMPRESSION_GZIP_PARAM (z_stream * strm,ot_tasktype mode,int zaction))87 static int fullscrape_increase( int *iovec_entries, struct iovec **iovector,
88                          char **r, char **re  WANT_COMPRESSION_GZIP_PARAM( z_stream *strm, ot_tasktype mode, int zaction ) ) {
89   /* Allocate a fresh output buffer at the end of our buffers list */
90   if( !( *r = iovec_fix_increase_or_free( iovec_entries, iovector, *r, OT_SCRAPE_CHUNK_SIZE ) ) ) {
91 
92     /* Deallocate gzip buffers */
93     IF_COMPRESSION( deflateEnd(strm); )
94 
95     /* Release lock on current bucket and return */
96     return -1;
97   }
98 
99   /* Adjust new end of output buffer */
100   *re = *r + OT_SCRAPE_CHUNK_SIZE - OT_SCRAPE_MAXENTRYLEN;
101 
102   /* When compressing, we have all the bytes in output buffer */
103 #ifdef WANT_COMPRESSION_GZIP
104   if( mode & TASK_FLAG_GZIP ) {
105     int zres;
106     *re -= OT_SCRAPE_MAXENTRYLEN;
107     strm->next_out  = (uint8_t*)*r;
108     strm->avail_out = OT_SCRAPE_CHUNK_SIZE;
109     zres = deflate( strm, zaction );
110     if( ( zres < Z_OK ) && ( zres != Z_BUF_ERROR ) )
111       fprintf( stderr, "deflate() failed while in fullscrape_increase(%d).\n", zaction );
112     *r = (char*)strm->next_out;
113   }
114 #endif
115 
116   return 0;
117 }
118 
fullscrape_make(int * iovec_entries,struct iovec ** iovector,ot_tasktype mode)119 static void fullscrape_make( int *iovec_entries, struct iovec **iovector, ot_tasktype mode ) {
120   int      bucket;
121   char    *r, *re;
122 #ifdef WANT_COMPRESSION_GZIP
123   char     compress_buffer[OT_SCRAPE_MAXENTRYLEN];
124   z_stream strm;
125 #endif
126 
127   /* Setup return vector... */
128   *iovec_entries = 0;
129   *iovector = NULL;
130   if( !( r = iovec_increase( iovec_entries, iovector, OT_SCRAPE_CHUNK_SIZE ) ) )
131     return;
132 
133   /* re points to low watermark */
134   re = r + OT_SCRAPE_CHUNK_SIZE - OT_SCRAPE_MAXENTRYLEN;
135 
136 #ifdef WANT_COMPRESSION_GZIP
137   if( mode & TASK_FLAG_GZIP ) {
138     re += OT_SCRAPE_MAXENTRYLEN;
139     byte_zero( &strm, sizeof(strm) );
140     strm.next_in   = (uint8_t*)compress_buffer;
141     strm.next_out  = (uint8_t*)r;
142     strm.avail_out = OT_SCRAPE_CHUNK_SIZE;
143     if( deflateInit2(&strm,7,Z_DEFLATED,31,8,Z_DEFAULT_STRATEGY) != Z_OK )
144       fprintf( stderr, "not ok.\n" );
145     r = compress_buffer;
146   }
147 #endif
148 
149   if( ( mode & TASK_TASK_MASK ) == TASK_FULLSCRAPE )
150     r += sprintf( r, "d5:filesd" );
151 
152   /* For each bucket... */
153   for( bucket=0; bucket<OT_BUCKET_COUNT; ++bucket ) {
154     /* Get exclusive access to that bucket */
155     ot_vector *torrents_list = mutex_bucket_lock( bucket );
156     size_t tor_offset;
157 
158     /* For each torrent in this bucket.. */
159     for( tor_offset=0; tor_offset<torrents_list->size; ++tor_offset ) {
160       /* Address torrents members */
161       ot_peerlist *peer_list = ( ((ot_torrent*)(torrents_list->data))[tor_offset] ).peer_list;
162       ot_hash     *hash      =&( ((ot_torrent*)(torrents_list->data))[tor_offset] ).hash;
163 
164       switch( mode & TASK_TASK_MASK ) {
165       case TASK_FULLSCRAPE:
166       default:
167         /* push hash as bencoded string */
168         *r++='2'; *r++='0'; *r++=':';
169         memcpy( r, hash, sizeof(ot_hash) ); r += sizeof(ot_hash);
170         /* push rest of the scrape string */
171         r += sprintf( r, "d8:completei%zde10:downloadedi%zde10:incompletei%zdee", peer_list->seed_count, peer_list->down_count, peer_list->peer_count-peer_list->seed_count );
172 
173         break;
174       case TASK_FULLSCRAPE_TPB_ASCII:
175         to_hex( r, *hash ); r+= 2 * sizeof(ot_hash);
176         r += sprintf( r, ":%zd:%zd\n", peer_list->seed_count, peer_list->peer_count-peer_list->seed_count );
177         break;
178       case TASK_FULLSCRAPE_TPB_ASCII_PLUS:
179         to_hex( r, *hash ); r+= 2 * sizeof(ot_hash);
180         r += sprintf( r, ":%zd:%zd:%zd\n", peer_list->seed_count, peer_list->peer_count-peer_list->seed_count, peer_list->down_count );
181         break;
182       case TASK_FULLSCRAPE_TPB_BINARY:
183         memcpy( r, *hash, sizeof(ot_hash) ); r += sizeof(ot_hash);
184         *(uint32_t*)(r+0) = htonl( (uint32_t)  peer_list->seed_count );
185         *(uint32_t*)(r+4) = htonl( (uint32_t)( peer_list->peer_count-peer_list->seed_count) );
186         r+=8;
187         break;
188       case TASK_FULLSCRAPE_TPB_URLENCODED:
189         r += fmt_urlencoded( r, (char *)*hash, 20 );
190         r += sprintf( r, ":%zd:%zd\n", peer_list->seed_count, peer_list->peer_count-peer_list->seed_count );
191         break;
192       case TASK_FULLSCRAPE_TRACKERSTATE:
193         to_hex( r, *hash ); r+= 2 * sizeof(ot_hash);
194         r += sprintf( r, ":%zd:%zd\n", peer_list->base, peer_list->down_count );
195         break;
196       }
197 
198 #ifdef WANT_COMPRESSION_GZIP
199      if( mode & TASK_FLAG_GZIP ) {
200         int zres;
201         strm.next_in  = (uint8_t*)compress_buffer;
202         strm.avail_in = r - compress_buffer;
203         zres = deflate( &strm, Z_NO_FLUSH );
204         if( ( zres < Z_OK ) && ( zres != Z_BUF_ERROR ) )
205           fprintf( stderr, "deflate() failed while in fullscrape_make().\n" );
206         r = (char*)strm.next_out;
207       }
208 #endif
209 
210       /* Check if there still is enough buffer left */
211       while( r >= re )
212        if( fullscrape_increase( iovec_entries, iovector, &r, &re WANT_COMPRESSION_GZIP_PARAM( &strm, mode, Z_NO_FLUSH ) ) )
213          return mutex_bucket_unlock( bucket, 0 );
214 
215       IF_COMPRESSION( r = compress_buffer; )
216     }
217 
218     /* All torrents done: release lock on current bucket */
219     mutex_bucket_unlock( bucket, 0 );
220 
221     /* Parent thread died? */
222     if( !g_opentracker_running )
223       return;
224   }
225 
226   if( ( mode & TASK_TASK_MASK ) == TASK_FULLSCRAPE )
227     r += sprintf( r, "ee" );
228 
229 #ifdef WANT_COMPRESSION_GZIP
230   if( mode & TASK_FLAG_GZIP ) {
231     strm.next_in  = (uint8_t*)compress_buffer;
232     strm.avail_in = r - compress_buffer;
233     if( deflate( &strm, Z_FINISH ) < Z_OK )
234       fprintf( stderr, "deflate() failed while in fullscrape_make()'s endgame.\n" );
235     r = (char*)strm.next_out;
236 
237     while( r >= re )
238       if( fullscrape_increase( iovec_entries, iovector, &r, &re WANT_COMPRESSION_GZIP_PARAM( &strm, mode, Z_FINISH ) ) )
239         return mutex_bucket_unlock( bucket, 0 );
240     deflateEnd(&strm);
241   }
242 #endif
243 
244   /* Release unused memory in current output buffer */
245   iovec_fixlast( iovec_entries, iovector, r );
246 }
247 #endif
248 
249 const char *g_version_fullscrape_c = "$Source$: $Revision$\n";
250