1 /*
2 * Copyright 2011, Ben Langmead <langmea@cs.jhu.edu>
3 *
4 * This file is part of Bowtie 2.
5 *
6 * Bowtie 2 is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, either version 3 of the License, or
9 * (at your option) any later version.
10 *
11 * Bowtie 2 is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with Bowtie 2. If not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #ifndef THREADING_H_
21 #define THREADING_H_
22
23 #include <cstdio>
24 #include <cstdlib>
25 #include <cstring>
26 #include <iostream>
27 #include <mutex>
28
29 #include "bt2_locks.h"
30
/*
 * Pick the lock type that MUTEX_T names, based on the build flags:
 *   WITH_QUEUELOCK defined        -> mcs_lock
 *   otherwise NO_SPINLOCK defined -> std::mutex
 *   neither defined               -> spin_lock
 *
 * NOTE(review): ThreadSafe in this header only hands an mcs_node to the lock
 * when NO_SPINLOCK is set as well, so the build presumably always defines
 * WITH_QUEUELOCK together with NO_SPINLOCK -- confirm against the Makefile.
 */
#if defined(WITH_QUEUELOCK)
#  define MUTEX_T mcs_lock
#elif defined(NO_SPINLOCK)
#  define MUTEX_T std::mutex
#else
#  define MUTEX_T spin_lock
#endif
38
/**
 * Plain aggregate pairing an integer thread id with a pointer to an atomic
 * int. The struct stores the pointer only; it never allocates or frees the
 * pointee.
 *
 * NOTE(review): `done` is presumably a completion flag that the worker sets
 * and its creator polls -- confirm against the callers.
 */
struct thread_tracking_pair {
	int               tid;   // integer id carried alongside the flag
	std::atomic<int>* done;  // atomic int shared with whoever filled in this pair
};
43
/*
 * SLEEP(x): suspend the calling thread for x milliseconds.
 *
 * With _TTHREAD_WIN32_ defined this forwards to Sleep(x). Otherwise it calls
 * nanosleep():
 *   - x is evaluated exactly once per call and copied into a local, so
 *     expressions such as SLEEP(a + b) behave as expected;
 *   - the timespec is an ordinary local rebuilt on every call (a function
 *     static would keep the duration of the first call at that call site);
 *   - the value is split into seconds and nanoseconds so tv_nsec stays below
 *     one second, as nanosleep() requires.
 * The return value of nanosleep() is ignored, so a sleep cut short by a
 * signal is not resumed.
 */
#if defined(_TTHREAD_WIN32_)
#define SLEEP(x) Sleep(x)
#else
#define SLEEP(x) do { \
	const long bt2_sleep_ms_ = static_cast<long>(x); \
	timespec bt2_sleep_ts_; \
	bt2_sleep_ts_.tv_sec = bt2_sleep_ms_ / 1000; \
	bt2_sleep_ts_.tv_nsec = (bt2_sleep_ms_ % 1000) * 1000000L; \
	nanosleep(&bt2_sleep_ts_, NULL); \
} while(false)
#endif
52
/*
 * Second selection of MUTEX_T:
 *   NO_SPINLOCK and WITH_QUEUELOCK -> mcs_lock
 *   NO_SPINLOCK only               -> std::mutex
 *   WITH_TBB without NO_SPINLOCK   -> spin_lock
 *   none of the above              -> MUTEX_T is left as it was
 *
 * MUTEX_T may already be defined earlier in this header. Re-defining a macro
 * with a different replacement list is ill-formed, so any previous definition
 * is dropped with #undef first; the definition chosen here still wins.
 */
#ifdef NO_SPINLOCK
#  undef MUTEX_T
#  ifdef WITH_QUEUELOCK
#    define MUTEX_T mcs_lock
#  else
#    define MUTEX_T std::mutex
#  endif
#else
#  ifdef WITH_TBB
#    undef MUTEX_T
#    define MUTEX_T spin_lock
#  endif
#endif /* NO_SPINLOCK */
/**
 * Wrap a lock; obtain lock upon construction, release upon destruction.
 *
 * The guard keeps a reference to the mutex, so the mutex must outlive it.
 * When both NO_SPINLOCK and WITH_QUEUELOCK are set, the guard also owns the
 * mcs_node that is passed to lock() and unlock().
 *
 * Guards cannot be copied: a copy would call unlock() on the same mutex a
 * second time when it is destroyed.
 */
class ThreadSafe {
public:

	/**
	 * Acquire `mutex`; blocks until the lock is obtained.
	 */
	ThreadSafe(MUTEX_T& mutex) :
#if NO_SPINLOCK && WITH_QUEUELOCK
		node_{},
#endif
		mutex_(mutex) {
#if NO_SPINLOCK && WITH_QUEUELOCK
		mutex_.lock(node_);
#else
		mutex_.lock();
#endif
	}

	/**
	 * Release the mutex acquired by the constructor.
	 */
	~ThreadSafe() {
#if NO_SPINLOCK && WITH_QUEUELOCK
		mutex_.unlock(node_);
#else
		mutex_.unlock();
#endif
	}

	// One guard corresponds to exactly one lock()/unlock() pair.
	ThreadSafe(const ThreadSafe&) = delete;
	ThreadSafe& operator=(const ThreadSafe&) = delete;

private:
#if NO_SPINLOCK && WITH_QUEUELOCK
	MUTEX_T::mcs_node node_; // declared first so it is initialized before mutex_
#endif
	MUTEX_T& mutex_;
};
97
98 #ifdef WITH_TBB
99 #ifdef WITH_AFFINITY
//ripped entirely from:
101 //https://software.intel.com/en-us/blogs/2013/10/31/applying-intel-threading-building-blocks-observers-for-thread-affinity-on-intel
102 class concurrency_tracker: public tbb::task_scheduler_observer {
103 std::atomic<int> num_threads;
104 public:
concurrency_tracker()105 concurrency_tracker() : num_threads() { observe(true); }
on_scheduler_entry(bool)106 /*override*/ void on_scheduler_entry( bool ) { ++num_threads; }
on_scheduler_exit(bool)107 /*override*/ void on_scheduler_exit( bool ) { --num_threads; }
108
get_concurrency()109 int get_concurrency() { return num_threads; }
110 };
111
112 class pinning_observer: public tbb::task_scheduler_observer {
113 cpu_set_t *mask;
114 int ncpus;
115
116 const int pinning_step;
117 std::atomic<int> thread_index;
118 public:
pinning_step(pinning_step)119 pinning_observer( int pinning_step=1 ) : pinning_step(pinning_step), thread_index() {
120 for ( ncpus = sizeof(cpu_set_t)/CHAR_BIT; ncpus < 16*1024 /* some reasonable limit */; ncpus <<= 1 ) {
121 mask = CPU_ALLOC( ncpus );
122 if ( !mask ) break;
123 const size_t size = CPU_ALLOC_SIZE( ncpus );
124 CPU_ZERO_S( size, mask );
125 const int err = sched_getaffinity( 0, size, mask );
126 if ( !err ) break;
127
128 CPU_FREE( mask );
129 mask = NULL;
130 if ( errno != EINVAL ) break;
131 }
132 if ( !mask )
133 std::cout << "Warning: Failed to obtain process affinity mask. Thread affinitization is disabled." << std::endl;
134 }
135
on_scheduler_entry(bool)136 /*override*/ void on_scheduler_entry( bool ) {
137 if ( !mask ) return;
138
139 const size_t size = CPU_ALLOC_SIZE( ncpus );
140 const int num_cpus = CPU_COUNT_S( size, mask );
141 int thr_idx =
142 //cwilks: we're one interface version lower than what
143 //is required for task arena (7000 vs. 7001)
144 #if USE_TASK_ARENA_CURRENT_SLOT
145 tbb::task_arena::current_slot();
146 #else
147 thread_index++;
148 #endif
149 #if __MIC__
150 thr_idx += 1; // To avoid logical thread zero for the master thread on Intel(R) Xeon Phi(tm)
151 #endif
152 thr_idx %= num_cpus; // To limit unique number in [0; num_cpus-1] range
153
154 // Place threads with specified step
155 int cpu_idx = 0;
156 for ( int i = 0, offset = 0; i<thr_idx; ++i ) {
157 cpu_idx += pinning_step;
158 if ( cpu_idx >= num_cpus )
159 cpu_idx = ++offset;
160 }
161
162 // Find index of 'cpu_idx'-th bit equal to 1
163 int mapped_idx = -1;
164 while ( cpu_idx >= 0 ) {
165 if ( CPU_ISSET_S( ++mapped_idx, size, mask ) )
166 --cpu_idx;
167 }
168
169 cpu_set_t *target_mask = CPU_ALLOC( ncpus );
170 CPU_ZERO_S( size, target_mask );
171 CPU_SET_S( mapped_idx, size, target_mask );
172 const int err = sched_setaffinity( 0, size, target_mask );
173
174 //std::cout << "Just set affinity for thread " << thr_idx << "\n";
175 if ( err ) {
176 std::cout << "Failed to set thread affinity!n";
177 exit( EXIT_FAILURE );
178 }
179 #if LOG_PINNING
180 else {
181 std::stringstream ss;
182 ss << "Set thread affinity: Thread " << thr_idx << ": CPU " << mapped_idx << std::endl;
183 std::cerr << ss.str();
184 }
185 #endif
186 CPU_FREE( target_mask );
187 }
188
~pinning_observer()189 ~pinning_observer() {
190 if ( mask )
191 CPU_FREE( mask );
192 }
193 };
194
195 #endif
196 #endif
197
198 #endif
199