1 /*
2  * This file Copyright (C) 2008-2014 Mnemosyne LLC
3  *
4  * It may be used under the GNU GPL versions 2 or 3
5  * or any future license endorsed by Mnemosyne LLC.
6  *
7  */
8 
9 #include <string.h> /* strlen() */
10 
11 #include <event2/buffer.h>
12 #include <event2/event.h>
13 
14 #include "transmission.h"
15 #include "bandwidth.h"
16 #include "cache.h"
17 #include "inout.h" /* tr_ioFindFileLocation() */
18 #include "list.h"
19 #include "peer-mgr.h"
20 #include "torrent.h"
21 #include "trevent.h" /* tr_runInEventThread() */
22 #include "utils.h"
23 #include "web.h"
24 #include "webseed.h"
25 
/* State for one in-flight webseed request covering a contiguous run of blocks. */
struct tr_webseed_task
{
    /* set by webseed_destruct(); a dead task is ignored by on_content_changed()
       and freed by web_response_func() */
    bool dead;
    /* buffer handed to tr_webRunWebseed(); on_content_changed() is attached to it */
    struct evbuffer* content;
    /* the webseed that created this task */
    struct tr_webseed* webseed;
    tr_session* session;
    /* first block of the requested range */
    tr_block_index_t block;
    /* piece that contains `block` */
    tr_piece_index_t piece_index;
    /* byte offset of `block` inside that piece */
    uint32_t piece_offset;
    /* total number of bytes this task has to fetch */
    uint32_t length;
    /* number of whole blocks already moved out of `content` for writing */
    tr_block_index_t blocks_done;
    /* copy of the torrent's blockSize taken when the task was created */
    uint32_t block_size;
    /* handle returned by tr_webRunWebseed() for the current request */
    struct tr_web_task* web_task;
    /* response code of the current request; 0 until it has been queried,
       206 is the only value treated as success */
    long response_code;
};
41 
/* A webseed, modelled as a specialised tr_peer. */
struct tr_webseed
{
    /* base "class"; code in this file casts tr_peer* to tr_webseed*,
       so this has to remain the first member */
    tr_peer parent;
    tr_bandwidth bandwidth;
    tr_session* session;
    /* receiver of the tr_peer_event notifications sent by publish() */
    tr_peer_callback callback;
    void* callback_data;
    /* list of struct tr_webseed_task that are currently in flight */
    tr_list* tasks;
    /* periodic timer that runs webseed_timer_func() */
    struct event* timer;
    char* base_url;
    size_t base_url_len;
    /* id used to look the torrent up again with tr_torrentFindFromId() */
    int torrent_id;
    /* requests in a row that failed before delivering a single block */
    int consecutive_failures;
    /* 0 while not counting; once nonzero it grows by one per timer tick
       and is compared against FAILURE_RETRY_INTERVAL */
    int retry_tickcount;
    /* number of active transfers that must be reached for the failure state to be reset */
    int retry_challenge;
    /* incremented when a task ends after transferring data, reduced when new requests are issued */
    int idle_connections;
    /* number of requests that have been answered with a 206 and are still running */
    int active_transfers;
    /* per-file URL cache indexed by file index; entries are filled in lazily */
    char** file_urls;
};
61 
/* Tuning constants for the webseed scheduler. */
enum
{
    /* interval, in milliseconds, at which webseed_timer_func() is re-armed */
    TR_IDLE_TIMER_MSEC = 2000,
    /* number of timer ticks retry_tickcount must reach before another connection attempt is made */
    FAILURE_RETRY_INTERVAL = 150,
    /* number of consecutive failed requests after which on_idle() stops opening extra connections */
    MAX_CONSECUTIVE_FAILURES = 5,
    /* upper bound on concurrently running tasks per webseed */
    MAX_WEBSEED_CONNECTIONS = 4
};
72 
73 /***
74 ****
75 ***/
76 
/* Hand a peer event to the webseed's registered callback; does nothing when no callback is set. */
static void publish(tr_webseed* w, tr_peer_event* e)
{
    if (w->callback == NULL)
    {
        return;
    }

    w->callback(&w->parent, e, w->callback_data);
}
84 
/*
 * Publish one TR_PEER_CLIENT_GOT_REJ event for each of `count` consecutive
 * blocks starting at `block`.
 *
 * The location and length of the first block come from
 * tr_torrentGetBlockLocation(); the offset is then advanced by the event
 * length after every event. The final event takes its length from
 * tr_torBlockCountBytes() for the last block in the run.
 */
static void fire_client_got_rejs(tr_torrent* tor, tr_webseed* w, tr_block_index_t block, tr_block_index_t count)
{
    tr_peer_event event = TR_PEER_EVENT_INIT;
    tr_block_index_t remaining = count;

    event.eventType = TR_PEER_CLIENT_GOT_REJ;
    tr_torrentGetBlockLocation(tor, block, &event.pieceIndex, &event.offset, &event.length);

    while (remaining > 0)
    {
        if (remaining == 1)
        {
            /* last block of the run: use that block's own byte count */
            event.length = tr_torBlockCountBytes(tor, block + count - 1);
        }

        publish(w, &event);
        event.offset += event.length;
        --remaining;
    }
}
102 
/*
 * Publish one TR_PEER_CLIENT_GOT_BLOCK event for each of `count` consecutive
 * blocks starting at `block`.
 *
 * The first event's piece index, offset and length are filled in by
 * tr_torrentGetBlockLocation(); each following event starts where the
 * previous one ended. The last event's length is replaced by
 * tr_torBlockCountBytes() of the final block in the run.
 */
static void fire_client_got_blocks(tr_torrent* tor, tr_webseed* w, tr_block_index_t block, tr_block_index_t count)
{
    tr_peer_event event = TR_PEER_EVENT_INIT;

    event.eventType = TR_PEER_CLIENT_GOT_BLOCK;
    tr_torrentGetBlockLocation(tor, block, &event.pieceIndex, &event.offset, &event.length);

    for (tr_block_index_t n = 0; n < count; ++n)
    {
        bool const is_last = n + 1 == count;

        if (is_last)
        {
            event.length = tr_torBlockCountBytes(tor, block + count - 1);
        }

        publish(w, &event);
        event.offset += event.length;
    }
}
120 
/* Publish a TR_PEER_CLIENT_GOT_PIECE_DATA event reporting that `length` bytes have arrived. */
static void fire_client_got_piece_data(tr_webseed* w, uint32_t length)
{
    tr_peer_event event = TR_PEER_EVENT_INIT;

    event.length = length;
    event.eventType = TR_PEER_CLIENT_GOT_PIECE_DATA;

    publish(w, &event);
}
128 
129 /***
130 ****
131 ***/
132 
/* Work item built by on_content_changed() and consumed (and freed) by write_block_func(). */
struct write_block_data
{
    tr_session* session;
    /* used to look the torrent up again when the work item is processed */
    int torrent_id;
    struct tr_webseed* webseed;
    /* private copy of the downloaded bytes; freed by write_block_func() */
    struct evbuffer* content;
    tr_piece_index_t piece_index;
    /* first block contained in `content` */
    tr_block_index_t block_index;
    /* number of whole blocks contained in `content` */
    tr_block_index_t count;
    /* byte offset inside the piece at which `content` begins */
    uint32_t block_offset;
};
144 
write_block_func(void * vdata)145 static void write_block_func(void* vdata)
146 {
147     struct write_block_data* data = vdata;
148     struct tr_webseed* w = data->webseed;
149     struct evbuffer* buf = data->content;
150     struct tr_torrent* tor;
151 
152     tor = tr_torrentFindFromId(data->session, data->torrent_id);
153 
154     if (tor != NULL)
155     {
156         uint32_t const block_size = tor->blockSize;
157         uint32_t len = evbuffer_get_length(buf);
158         uint32_t const offset_end = data->block_offset + len;
159         tr_cache* cache = data->session->cache;
160         tr_piece_index_t const piece = data->piece_index;
161 
162         if (!tr_torrentPieceIsComplete(tor, piece))
163         {
164             while (len > 0)
165             {
166                 uint32_t const bytes_this_pass = MIN(len, block_size);
167                 tr_cacheWriteBlock(cache, tor, piece, offset_end - len, bytes_this_pass, buf);
168                 len -= bytes_this_pass;
169             }
170 
171             fire_client_got_blocks(tor, w, data->block_index, data->count);
172         }
173     }
174 
175     evbuffer_free(buf);
176     tr_free(data);
177 }
178 
179 /***
180 ****
181 ***/
182 
/* Work item built by on_content_changed() and consumed (and freed) by connection_succeeded(). */
struct connection_succeeded_data
{
    struct tr_webseed* webseed;
    /* heap copy of the URL reported by the web task, may be NULL;
       connection_succeeded() either stores it in the webseed or frees it */
    char* real_url;
    /* piece index / offset used to find which file the URL belongs to */
    tr_piece_index_t piece_index;
    uint32_t piece_offset;
};
190 
connection_succeeded(void * vdata)191 static void connection_succeeded(void* vdata)
192 {
193     struct connection_succeeded_data* data = vdata;
194     struct tr_webseed* w = data->webseed;
195 
196     if (++w->active_transfers >= w->retry_challenge && w->retry_challenge != 0)
197     {
198         /* the server seems to be accepting more connections now */
199         w->consecutive_failures = w->retry_tickcount = w->retry_challenge = 0;
200     }
201 
202     if (data->real_url != NULL)
203     {
204         tr_torrent* tor = tr_torrentFindFromId(w->session, w->torrent_id);
205 
206         if (tor != NULL)
207         {
208             uint64_t file_offset;
209             tr_file_index_t file_index;
210 
211             tr_ioFindFileLocation(tor, data->piece_index, data->piece_offset, &file_index, &file_offset);
212             tr_free(w->file_urls[file_index]);
213             w->file_urls[file_index] = data->real_url;
214             data->real_url = NULL;
215         }
216     }
217 
218     tr_free(data->real_url);
219     tr_free(data);
220 }
221 
222 /***
223 ****
224 ***/
225 
/*
 * evbuffer callback attached to a task's content buffer (see on_idle()).
 *
 * For every batch of newly added bytes it:
 *  - records the bandwidth used and publishes a "got piece data" event,
 *  - on the first data of a request, queries the response code and, for a
 *    206, queues connection_succeeded() with the request's real URL,
 *  - once at least one whole block is buffered, moves the whole blocks into
 *    a private buffer and queues write_block_func() for them.
 *
 * All of this happens while holding the session lock. Dead tasks and
 * callbacks that added no bytes are ignored.
 */
static void on_content_changed(struct evbuffer* buf, struct evbuffer_cb_info const* info, void* vtask)
{
    size_t const n_added = info->n_added;
    struct tr_webseed_task* task = vtask;
    tr_session* session = task->session;

    tr_sessionLock(session);

    if (!task->dead && n_added > 0)
    {
        uint32_t len;
        struct tr_webseed* w = task->webseed;

        tr_bandwidthUsed(&w->bandwidth, TR_DOWN, n_added, true, tr_time_msec());
        fire_client_got_piece_data(w, n_added);
        len = evbuffer_get_length(buf);

        /* response_code is 0 until it has been looked up for the current request */
        if (task->response_code == 0)
        {
            tr_webGetTaskInfo(task->web_task, TR_WEB_GET_CODE, &task->response_code);

            if (task->response_code == 206)
            {
                char const* url;
                struct connection_succeeded_data* data;

                url = NULL;
                tr_webGetTaskInfo(task->web_task, TR_WEB_GET_REAL_URL, &url);

                data = tr_new(struct connection_succeeded_data, 1);
                data->webseed = w;
                data->real_url = tr_strdup(url);
                data->piece_index = task->piece_index;
                /* piece-relative offset of the last byte received so far */
                data->piece_offset = task->piece_offset + task->blocks_done * task->block_size + len - 1;

                /* processing this uses a tr_torrent pointer,
                   so push the work to the libevent thread... */
                tr_runInEventThread(w->session, connection_succeeded, data);
            }
        }

        if (task->response_code == 206 && len >= task->block_size)
        {
            /* once we've got at least one full block, save it */

            struct write_block_data* data;
            uint32_t const block_size = task->block_size;
            /* number of whole blocks currently buffered; a trailing partial block stays behind */
            tr_block_index_t const completed = len / block_size;

            data = tr_new(struct write_block_data, 1);
            data->webseed = task->webseed;
            data->piece_index = task->piece_index;
            data->block_index = task->block + task->blocks_done;
            data->count = completed;
            data->block_offset = task->piece_offset + task->blocks_done * block_size;
            data->content = evbuffer_new();
            data->torrent_id = w->torrent_id;
            data->session = w->session;

            /* we don't use locking on this evbuffer so we must copy out the data
               that will be needed when writing the block in a different thread */
            evbuffer_remove_buffer(task->content, data->content, block_size * completed);

            tr_runInEventThread(w->session, write_block_func, data);
            task->blocks_done += completed;
        }
    }

    tr_sessionUnlock(session);
}
296 
/* forward declaration: on_idle() and web_response_func() call this before its definition */
static void task_request_next_chunk(struct tr_webseed_task* task);
298 
/*
 * Decide how many new requests this webseed should start and start them.
 *
 * While the webseed is below MAX_CONSECUTIVE_FAILURES it fills up to
 * MAX_WEBSEED_CONNECTIONS running tasks. Once that many requests have failed
 * in a row it only re-uses idle connections, plus one probing connection
 * after FAILURE_RETRY_INTERVAL ticks have elapsed. Nothing is started unless
 * the torrent exists, is running and is not a seed.
 */
static void on_idle(tr_webseed* w)
{
    int const n_running = tr_list_size(w->tasks);
    tr_torrent* const tor = tr_torrentFindFromId(w->session, w->torrent_id);
    int n_wanted;

    if (w->consecutive_failures < MAX_CONSECUTIVE_FAILURES)
    {
        n_wanted = MAX_WEBSEED_CONNECTIONS - n_running;
        w->retry_challenge = n_running + w->idle_connections + 1;
    }
    else
    {
        n_wanted = w->idle_connections;

        if (w->retry_tickcount >= FAILURE_RETRY_INTERVAL)
        {
            /* some time has passed since our connection attempts failed. try again */
            ++n_wanted;
            /* if this challenge is fulfilled we will reset consecutive_failures */
            w->retry_challenge = n_running + n_wanted;
        }
    }

    if (tor == NULL || !tor->isRunning || tr_torrentIsSeed(tor) || n_wanted <= 0)
    {
        return;
    }

    /* each request is described by a pair of entries: first block, last block */
    int n_got = 0;
    tr_block_index_t* const ranges = tr_new(tr_block_index_t, n_wanted * 2);

    tr_peerMgrGetNextRequests(tor, &w->parent, n_wanted, ranges, &n_got, true);

    w->idle_connections -= MIN(w->idle_connections, n_got);

    if (w->retry_tickcount >= FAILURE_RETRY_INTERVAL && n_got == n_wanted)
    {
        w->retry_tickcount = 0;
    }

    for (int i = 0; i < n_got; ++i)
    {
        tr_block_index_t const first = ranges[i * 2];
        tr_block_index_t const last = ranges[i * 2 + 1];
        struct tr_webseed_task* const task = tr_new0(struct tr_webseed_task, 1);

        task->session = tor->session;
        task->webseed = w;
        task->block = first;
        task->piece_index = tr_torBlockPiece(tor, first);
        task->piece_offset = tor->blockSize * first - tor->info.pieceSize * task->piece_index;
        task->length = (last - first) * tor->blockSize + tr_torBlockCountBytes(tor, last);
        task->blocks_done = 0;
        task->response_code = 0;
        task->block_size = tor->blockSize;
        task->content = evbuffer_new();
        evbuffer_add_cb(task->content, on_content_changed, task);
        tr_list_append(&w->tasks, task);
        task_request_next_chunk(task);
    }

    tr_free(ranges);
}
363 
/*
 * Completion callback passed to tr_webRunWebseed() for a task's request.
 *
 * - A task flagged dead is simply freed.
 * - Any response code other than 206 is a failure: the blocks that were not
 *   delivered are reported as rejected, the failure accounting is updated
 *   and the task is discarded.
 * - On a 206, either the next chunk is requested (the task still lacks data)
 *   or the remaining buffered bytes are written, the task is discarded and
 *   on_idle() is asked for more work.
 *
 * NOTE(review): when the task is not dead but the torrent cannot be found,
 * the task is neither freed nor removed from w->tasks here — confirm that
 * this situation cannot occur or is cleaned up elsewhere.
 */
static void web_response_func(tr_session* session, bool did_connect UNUSED, bool did_timeout UNUSED, long response_code,
    void const* response UNUSED, size_t response_byte_count UNUSED, void* vtask)
{
    tr_webseed* w;
    tr_torrent* tor;
    struct tr_webseed_task* t = vtask;
    bool const success = response_code == 206;

    /* the webseed was destructed while this request was pending; do not touch t->webseed */
    if (t->dead)
    {
        evbuffer_free(t->content);
        tr_free(t);
        return;
    }

    w = t->webseed;
    tor = tr_torrentFindFromId(session, w->torrent_id);

    if (tor != NULL)
    {
        /* active_transfers was only increased if the connection was successful */
        if (t->response_code == 206)
        {
            --w->active_transfers;
        }

        if (!success)
        {
            /* blocks of this task (rounded up) that were never handed off for writing */
            tr_block_index_t const blocks_remain = (t->length + tor->blockSize - 1) / tor->blockSize - t->blocks_done;

            if (blocks_remain != 0)
            {
                fire_client_got_rejs(tor, w, t->block + t->blocks_done, blocks_remain);
            }

            if (t->blocks_done != 0)
            {
                /* some blocks did arrive, so this is not counted as a consecutive failure */
                ++w->idle_connections;
            }
            else if (++w->consecutive_failures >= MAX_CONSECUTIVE_FAILURES && w->retry_tickcount == 0)
            {
                /* now wait a while until retrying to establish a connection */
                ++w->retry_tickcount;
            }

            tr_list_remove_data(&w->tasks, t);
            evbuffer_free(t->content);
            tr_free(t);
        }
        else
        {
            uint32_t const bytes_done = t->blocks_done * tor->blockSize;
            uint32_t const buf_len = evbuffer_get_length(t->content);

            if (bytes_done + buf_len < t->length)
            {
                /* request finished successfully but there's still data missing. that
                   means we've reached the end of a file and need to request the next one */
                t->response_code = 0;
                task_request_next_chunk(t);
            }
            else
            {
                if (buf_len != 0 && !tr_torrentPieceIsComplete(tor, t->piece_index))
                {
                    /* on_content_changed() will not write a block if it is smaller than
                       the torrent's block size, i.e. the torrent's very last block */
                    tr_cacheWriteBlock(session->cache, tor, t->piece_index, t->piece_offset + bytes_done, buf_len, t->content);

                    fire_client_got_blocks(tor, t->webseed, t->block + t->blocks_done, 1);
                }

                ++w->idle_connections;

                tr_list_remove_data(&w->tasks, t);
                evbuffer_free(t->content);
                tr_free(t);

                /* the connection is free again, so look for more blocks to request */
                on_idle(w);
            }
        }
    }
}
447 
/*
 * Build the request URL for `file` in a newly allocated evbuffer.
 *
 * The result always starts with the webseed's base URL. When the base URL
 * ends with '/' and the file has a name, the HTTP-escaped file name is
 * appended to it.
 *
 * @param w     webseed whose base_url / base_url_len are used
 * @param file  file whose name may be appended
 * @return a new evbuffer holding the URL; the caller owns it
 */
static struct evbuffer* make_url(tr_webseed* w, tr_file const* file)
{
    struct evbuffer* buf = evbuffer_new();

    evbuffer_add(buf, w->base_url, w->base_url_len);

    /* if the url ends with a '/', add the file's name.
       the length check keeps an empty base url from being indexed at [-1] */
    if (w->base_url_len > 0 && w->base_url[w->base_url_len - 1] == '/' && file->name != NULL)
    {
        tr_http_escape(buf, file->name, strlen(file->name), false);
    }

    return buf;
}
462 
/*
 * Start the next HTTP range request for a task.
 *
 * Works out how many bytes the task still lacks, finds the file that holds
 * the next missing byte and requests that file's bytes, limited to the end
 * of the file. web_response_func() calls this again when a request ends
 * with data still missing.
 *
 * Does nothing when the torrent cannot be found.
 */
static void task_request_next_chunk(struct tr_webseed_task* t)
{
    tr_webseed* w = t->webseed;
    tr_torrent* tor = tr_torrentFindFromId(w->session, w->torrent_id);

    if (tor != NULL)
    {
        char range[64];
        char** urls = t->webseed->file_urls;

        tr_info const* inf = tr_torrentInfo(tor);
        /* bytes still to fetch: total minus what was already written and what is still buffered */
        uint64_t const remain = t->length - t->blocks_done * tor->blockSize - evbuffer_get_length(t->content);

        /* presumably the torrent-wide byte offset of the next missing byte
           (t->length - remain is the number of bytes received so far) — confirm against tr_pieceOffset() */
        uint64_t const total_offset = tr_pieceOffset(tor, t->piece_index, t->piece_offset, t->length - remain);
        tr_piece_index_t const step_piece = total_offset / inf->pieceSize;
        uint64_t const step_piece_offset = total_offset - inf->pieceSize * step_piece;

        tr_file_index_t file_index;
        tr_file const* file;
        uint64_t file_offset;
        uint64_t this_pass;

        tr_ioFindFileLocation(tor, step_piece, step_piece_offset, &file_index, &file_offset);
        file = &inf->files[file_index];
        /* never ask for more than what is left of this file */
        this_pass = MIN(remain, file->length - file_offset);

        /* build and cache the file's URL the first time it is needed */
        if (urls[file_index] == NULL)
        {
            urls[file_index] = evbuffer_free_to_str(make_url(t->webseed, file), NULL);
        }

        /* "first-last" with both ends inclusive */
        tr_snprintf(range, sizeof(range), "%" PRIu64 "-%" PRIu64, file_offset, file_offset + this_pass - 1);

        t->web_task = tr_webRunWebseed(tor, urls[file_index], range, web_response_func, t, t->content);
    }
}
499 
500 /***
501 ****
502 ***/
503 
/*
 * Periodic timer callback: advances the retry tick counter when it is
 * running, lets on_idle() schedule new requests, then re-arms the timer.
 */
static void webseed_timer_func(evutil_socket_t foo UNUSED, short bar UNUSED, void* vw)
{
    tr_webseed* const webseed = vw;

    /* a tick count of zero means "not counting" and must stay zero */
    if (webseed->retry_tickcount != 0)
    {
        webseed->retry_tickcount += 1;
    }

    on_idle(webseed);

    tr_timerAddMsec(webseed->timer, TR_IDLE_TIMER_MSEC);
}
517 
518 /***
519 ****  tr_peer virtual functions
520 ***/
521 
/*
 * tr_peer virtual function: report whether the webseed is transferring
 * piece data in `direction`.
 *
 * Only TR_DOWN can be active; it counts as active while the webseed has at
 * least one task. When `setme_Bps` is not NULL it receives the piece speed
 * for TR_DOWN, or 0 for any other direction.
 */
static bool webseed_is_transferring_pieces(tr_peer const* peer, uint64_t now, tr_direction direction, unsigned int* setme_Bps)
{
    tr_webseed const* const webseed = (tr_webseed const*)peer;
    bool const downloading = direction == TR_DOWN;
    unsigned int speed = 0;

    if (downloading)
    {
        speed = tr_bandwidthGetPieceSpeed_Bps(&webseed->bandwidth, now, direction);
    }

    if (setme_Bps != NULL)
    {
        *setme_Bps = speed;
    }

    return downloading && webseed->tasks != NULL;
}
541 
/*
 * tr_peer virtual function: release everything owned by a webseed.
 *
 * Pending tasks are not freed here. They are flagged dead and detached from
 * the list so that web_response_func() frees each one when its request
 * finishes.
 *
 * @param peer  the webseed, passed as a pointer to its embedded tr_peer
 */
static void webseed_destruct(tr_peer* peer)
{
    tr_webseed* w = (tr_webseed*)peer;

    /* flag all the pending tasks as dead */
    for (tr_list* l = w->tasks; l != NULL; l = l->next)
    {
        struct tr_webseed_task* task = l->data;
        task->dead = true;
    }

    tr_list_free(&w->tasks, NULL);

    /* if we have an array of file URLs, free it */
    if (w->file_urls != NULL)
    {
        tr_torrent* tor = tr_torrentFindFromId(w->session, w->torrent_id);
        tr_info const* inf = tor != NULL ? tr_torrentInfo(tor) : NULL;

        /* the number of entries is only known through the torrent. if the
           torrent can no longer be found, skip the per-file strings rather
           than dereference a NULL pointer */
        if (inf != NULL)
        {
            for (tr_file_index_t i = 0; i < inf->fileCount; ++i)
            {
                tr_free(w->file_urls[i]);
            }
        }

        tr_free(w->file_urls);
    }

    /* webseed destruct */
    event_free(w->timer);
    tr_bandwidthDestruct(&w->bandwidth);
    tr_free(w->base_url);

    /* parent class destruct */
    tr_peerDestruct(&w->parent);
}
577 
/* virtual function table that tr_webseedNew() installs in the embedded tr_peer */
static struct tr_peer_virtual_funcs const my_funcs =
{
    .destruct = webseed_destruct,
    .is_transferring_pieces = webseed_is_transferring_pieces
};
583 
584 /***
585 ****
586 ***/
587 
tr_webseedNew(struct tr_torrent * tor,char const * url,tr_peer_callback callback,void * callback_data)588 tr_webseed* tr_webseedNew(struct tr_torrent* tor, char const* url, tr_peer_callback callback, void* callback_data)
589 {
590     tr_webseed* w = tr_new0(tr_webseed, 1);
591     tr_peer* peer = &w->parent;
592     tr_info const* inf = tr_torrentInfo(tor);
593 
594     /* construct parent class */
595     tr_peerConstruct(peer, tor);
596     peer->client = TR_KEY_webseeds;
597     peer->funcs = &my_funcs;
598     tr_bitfieldSetHasAll(&peer->have);
599     tr_peerUpdateProgress(tor, peer);
600 
601     w->torrent_id = tr_torrentId(tor);
602     w->session = tor->session;
603     w->base_url_len = strlen(url);
604     w->base_url = tr_strndup(url, w->base_url_len);
605     w->callback = callback;
606     w->callback_data = callback_data;
607     w->file_urls = tr_new0(char*, inf->fileCount);
608     // tr_rcConstruct(&w->download_rate);
609     tr_bandwidthConstruct(&w->bandwidth, tor->session, &tor->bandwidth);
610     w->timer = evtimer_new(w->session->event_base, webseed_timer_func, w);
611     tr_timerAddMsec(w->timer, TR_IDLE_TIMER_MSEC);
612     return w;
613 }
614