1 // libTorrent - BitTorrent library
2 // Copyright (C) 2005-2011, Jari Sundell
3 //
4 // This program is free software; you can redistribute it and/or modify
5 // it under the terms of the GNU General Public License as published by
6 // the Free Software Foundation; either version 2 of the License, or
7 // (at your option) any later version.
8 //
9 // This program is distributed in the hope that it will be useful,
10 // but WITHOUT ANY WARRANTY; without even the implied warranty of
11 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 // GNU General Public License for more details.
13 //
14 // You should have received a copy of the GNU General Public License
15 // along with this program; if not, write to the Free Software
16 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17 //
18 // In addition, as a special exception, the copyright holders give
19 // permission to link the code of portions of this program with the
20 // OpenSSL library under certain conditions as described in each
21 // individual source file, and distribute linked combinations
22 // including the two.
23 //
24 // You must obey the GNU General Public License in all respects for
25 // all of the code used other than OpenSSL. If you modify file(s)
26 // with this exception, you may extend this exception to your version
27 // of the file(s), but you are not obligated to do so. If you do not
28 // wish to do so, delete this exception statement from your version.
29 // If you delete this exception statement from all source files in the
30 // program, then also delete it here.
31 //
32 // Contact: Jari Sundell <jaris@ifi.uio.no>
33 //
34 // Skomakerveien 33
35 // 3185 Skoppum, NORWAY
36
37 #include "config.h"
38
39 #include <cstring>
40 #include <signal.h>
41 #include <unistd.h>
42
43 #include "exceptions.h"
44 #include "poll.h"
45 #include "thread_base.h"
46 #include "thread_interrupt.h"
47 #include "utils/log.h"
48 #include "utils/instrumentation.h"
49
50 namespace torrent {
51
52 thread_base::global_lock_type lt_cacheline_aligned thread_base::m_global = { 0, 0, PTHREAD_MUTEX_INITIALIZER };
53
thread_base()54 thread_base::thread_base() :
55 m_state(STATE_UNKNOWN),
56 m_flags(0),
57 m_instrumentation_index(INSTRUMENTATION_POLLING_DO_POLL_OTHERS - INSTRUMENTATION_POLLING_DO_POLL),
58
59 m_poll(NULL),
60 m_interrupt_sender(NULL),
61 m_interrupt_receiver(NULL)
62 {
63 std::memset(&m_thread, 0, sizeof(pthread_t));
64
65 // #ifdef USE_INTERRUPT_SOCKET
66 thread_interrupt::pair_type interrupt_sockets = thread_interrupt::create_pair();
67
68 m_interrupt_sender = interrupt_sockets.first;
69 m_interrupt_receiver = interrupt_sockets.second;
70 // #endif
71 }
72
~thread_base()73 thread_base::~thread_base() {
74 delete m_interrupt_sender;
75 delete m_interrupt_receiver;
76 }
77
78 void
start_thread()79 thread_base::start_thread() {
80 if (m_poll == NULL)
81 throw internal_error("No poll object for thread defined.");
82
83 if (!is_initialized() || pthread_create(&m_thread, NULL, (pthread_func)&thread_base::event_loop, this))
84 throw internal_error("Failed to create thread.");
85 }
86
87 void
stop_thread()88 thread_base::stop_thread() {
89 __sync_fetch_and_or(&m_flags, flag_do_shutdown);
90 interrupt();
91 }
92
93 void
stop_thread_wait()94 thread_base::stop_thread_wait() {
95 stop_thread();
96
97 release_global_lock();
98
99 while (!is_inactive()) {
100 usleep(1000);
101 }
102
103 acquire_global_lock();
104 }
105
106 // Fix interrupting when shutting down thread.
107 void
interrupt()108 thread_base::interrupt() {
109 // Only poke when polling, set no_timeout
110 if (is_polling())
111 m_interrupt_sender->poke();
112 }
113
114 bool
should_handle_sigusr1()115 thread_base::should_handle_sigusr1() {
116 return false;
117 }
118
119 void*
event_loop(thread_base * thread)120 thread_base::event_loop(thread_base* thread) {
121 __sync_lock_test_and_set(&thread->m_state, STATE_ACTIVE);
122
123 #if defined(HAS_PTHREAD_SETNAME_NP_DARWIN)
124 pthread_setname_np(thread->name());
125 #elif defined(HAS_PTHREAD_SETNAME_NP_GENERIC)
126 pthread_setname_np(thread->m_thread, thread->name());
127 #endif
128
129 lt_log_print(torrent::LOG_THREAD_NOTICE, "%s: Starting thread.", thread->name());
130
131 try {
132
133 // #ifdef USE_INTERRUPT_SOCKET
134 thread->m_poll->insert_read(thread->m_interrupt_receiver);
135 // #endif
136
137 while (true) {
138 if (thread->m_slot_do_work)
139 thread->m_slot_do_work();
140
141 thread->call_events();
142 thread->signal_bitfield()->work();
143
144 __sync_fetch_and_or(&thread->m_flags, flag_polling);
145
146 // Call again after setting flag_polling to ensure we process
147 // any events set while it was working.
148 if (thread->m_slot_do_work)
149 thread->m_slot_do_work();
150
151 thread->call_events();
152 thread->signal_bitfield()->work();
153
154 uint64_t next_timeout = 0;
155
156 if (!thread->has_no_timeout()) {
157 next_timeout = thread->next_timeout_usec();
158
159 if (thread->m_slot_next_timeout)
160 next_timeout = std::min(next_timeout, thread->m_slot_next_timeout());
161 }
162
163 // Add the sleep call when testing interrupts, etc.
164 // usleep(50);
165
166 int poll_flags = 0;
167
168 if (!(thread->flags() & flag_main_thread))
169 poll_flags = torrent::Poll::poll_worker_thread;
170
171 instrumentation_update(INSTRUMENTATION_POLLING_DO_POLL, 1);
172 instrumentation_update(instrumentation_enum(INSTRUMENTATION_POLLING_DO_POLL + thread->m_instrumentation_index), 1);
173
174 int event_count = thread->m_poll->do_poll(next_timeout, poll_flags);
175
176 instrumentation_update(INSTRUMENTATION_POLLING_EVENTS, event_count);
177 instrumentation_update(instrumentation_enum(INSTRUMENTATION_POLLING_EVENTS + thread->m_instrumentation_index), event_count);
178
179 __sync_fetch_and_and(&thread->m_flags, ~(flag_polling | flag_no_timeout));
180 }
181
182 // #ifdef USE_INTERRUPT_SOCKET
183 thread->m_poll->remove_write(thread->m_interrupt_receiver);
184 // #endif
185
186 } catch (torrent::shutdown_exception& e) {
187 lt_log_print(torrent::LOG_THREAD_NOTICE, "%s: Shutting down thread.", thread->name());
188 }
189
190 __sync_lock_test_and_set(&thread->m_state, STATE_INACTIVE);
191 return NULL;
192 }
193
194 }
195