1 // libTorrent - BitTorrent library
2 // Copyright (C) 2005-2011, Jari Sundell
3 //
4 // This program is free software; you can redistribute it and/or modify
5 // it under the terms of the GNU General Public License as published by
6 // the Free Software Foundation; either version 2 of the License, or
7 // (at your option) any later version.
8 //
9 // This program is distributed in the hope that it will be useful,
10 // but WITHOUT ANY WARRANTY; without even the implied warranty of
11 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 // GNU General Public License for more details.
13 //
14 // You should have received a copy of the GNU General Public License
15 // along with this program; if not, write to the Free Software
16 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
17 //
18 // In addition, as a special exception, the copyright holders give
19 // permission to link the code of portions of this program with the
20 // OpenSSL library under certain conditions as described in each
21 // individual source file, and distribute linked combinations
22 // including the two.
23 //
24 // You must obey the GNU General Public License in all respects for
25 // all of the code used other than OpenSSL.  If you modify file(s)
26 // with this exception, you may extend this exception to your version
27 // of the file(s), but you are not obligated to do so.  If you do not
28 // wish to do so, delete this exception statement from your version.
29 // If you delete this exception statement from all source files in the
30 // program, then also delete it here.
31 //
32 // Contact:  Jari Sundell <jaris@ifi.uio.no>
33 //
34 //           Skomakerveien 33
35 //           3185 Skoppum, NORWAY
36 
37 #include "config.h"
38 
39 #include <cstring>
40 #include <signal.h>
41 #include <unistd.h>
42 
43 #include "exceptions.h"
44 #include "poll.h"
45 #include "thread_base.h"
46 #include "thread_interrupt.h"
47 #include "utils/log.h"
48 #include "utils/instrumentation.h"
49 
50 namespace torrent {
51 
52 thread_base::global_lock_type lt_cacheline_aligned thread_base::m_global = { 0, 0, PTHREAD_MUTEX_INITIALIZER };
53 
thread_base()54 thread_base::thread_base() :
55   m_state(STATE_UNKNOWN),
56   m_flags(0),
57   m_instrumentation_index(INSTRUMENTATION_POLLING_DO_POLL_OTHERS - INSTRUMENTATION_POLLING_DO_POLL),
58 
59   m_poll(NULL),
60   m_interrupt_sender(NULL),
61   m_interrupt_receiver(NULL)
62 {
63   std::memset(&m_thread, 0, sizeof(pthread_t));
64 
65 // #ifdef USE_INTERRUPT_SOCKET
66   thread_interrupt::pair_type interrupt_sockets = thread_interrupt::create_pair();
67 
68   m_interrupt_sender = interrupt_sockets.first;
69   m_interrupt_receiver = interrupt_sockets.second;
70 // #endif
71 }
72 
~thread_base()73 thread_base::~thread_base() {
74   delete m_interrupt_sender;
75   delete m_interrupt_receiver;
76 }
77 
78 void
start_thread()79 thread_base::start_thread() {
80   if (m_poll == NULL)
81     throw internal_error("No poll object for thread defined.");
82 
83   if (!is_initialized() || pthread_create(&m_thread, NULL, (pthread_func)&thread_base::event_loop, this))
84     throw internal_error("Failed to create thread.");
85 }
86 
87 void
stop_thread()88 thread_base::stop_thread() {
89   __sync_fetch_and_or(&m_flags, flag_do_shutdown);
90   interrupt();
91 }
92 
93 void
stop_thread_wait()94 thread_base::stop_thread_wait() {
95   stop_thread();
96 
97   release_global_lock();
98 
99   while (!is_inactive()) {
100     usleep(1000);
101   }
102 
103   acquire_global_lock();
104 }
105 
106 // Fix interrupting when shutting down thread.
107 void
interrupt()108 thread_base::interrupt() {
109   // Only poke when polling, set no_timeout
110   if (is_polling())
111     m_interrupt_sender->poke();
112 }
113 
114 bool
should_handle_sigusr1()115 thread_base::should_handle_sigusr1() {
116   return false;
117 }
118 
119 void*
event_loop(thread_base * thread)120 thread_base::event_loop(thread_base* thread) {
121   __sync_lock_test_and_set(&thread->m_state, STATE_ACTIVE);
122 
123 #if defined(HAS_PTHREAD_SETNAME_NP_DARWIN)
124   pthread_setname_np(thread->name());
125 #elif defined(HAS_PTHREAD_SETNAME_NP_GENERIC)
126   pthread_setname_np(thread->m_thread, thread->name());
127 #endif
128 
129   lt_log_print(torrent::LOG_THREAD_NOTICE, "%s: Starting thread.", thread->name());
130 
131   try {
132 
133 // #ifdef USE_INTERRUPT_SOCKET
134     thread->m_poll->insert_read(thread->m_interrupt_receiver);
135 // #endif
136 
137     while (true) {
138       if (thread->m_slot_do_work)
139         thread->m_slot_do_work();
140 
141       thread->call_events();
142       thread->signal_bitfield()->work();
143 
144       __sync_fetch_and_or(&thread->m_flags, flag_polling);
145 
146       // Call again after setting flag_polling to ensure we process
147       // any events set while it was working.
148       if (thread->m_slot_do_work)
149         thread->m_slot_do_work();
150 
151       thread->call_events();
152       thread->signal_bitfield()->work();
153 
154       uint64_t next_timeout = 0;
155 
156       if (!thread->has_no_timeout()) {
157         next_timeout = thread->next_timeout_usec();
158 
159         if (thread->m_slot_next_timeout)
160           next_timeout = std::min(next_timeout, thread->m_slot_next_timeout());
161       }
162 
163       // Add the sleep call when testing interrupts, etc.
164       // usleep(50);
165 
166       int poll_flags = 0;
167 
168       if (!(thread->flags() & flag_main_thread))
169         poll_flags = torrent::Poll::poll_worker_thread;
170 
171       instrumentation_update(INSTRUMENTATION_POLLING_DO_POLL, 1);
172       instrumentation_update(instrumentation_enum(INSTRUMENTATION_POLLING_DO_POLL + thread->m_instrumentation_index), 1);
173 
174       int event_count = thread->m_poll->do_poll(next_timeout, poll_flags);
175 
176       instrumentation_update(INSTRUMENTATION_POLLING_EVENTS, event_count);
177       instrumentation_update(instrumentation_enum(INSTRUMENTATION_POLLING_EVENTS + thread->m_instrumentation_index), event_count);
178 
179       __sync_fetch_and_and(&thread->m_flags, ~(flag_polling | flag_no_timeout));
180     }
181 
182 // #ifdef USE_INTERRUPT_SOCKET
183     thread->m_poll->remove_write(thread->m_interrupt_receiver);
184 // #endif
185 
186   } catch (torrent::shutdown_exception& e) {
187     lt_log_print(torrent::LOG_THREAD_NOTICE, "%s: Shutting down thread.", thread->name());
188   }
189 
190   __sync_lock_test_and_set(&thread->m_state, STATE_INACTIVE);
191   return NULL;
192 }
193 
194 }
195