/** @file

  A brief file description

  @section license License

  Licensed to the Apache Software Foundation (ASF) under one
  or more contributor license agreements. See the NOTICE file
  distributed with this work for additional information
  regarding copyright ownership. The ASF licenses this file
  to you under the Apache License, Version 2.0 (the
  "License"); you may not use this file except in compliance
  with the License. You may obtain a copy of the License at

      http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
 */

#include "P_EventSystem.h"
#include <sched.h>
#if TS_USE_HWLOC
#if HAVE_ALLOCA_H
#include <alloca.h>
#endif
#include <hwloc.h>
#endif
#include "tscore/ink_defs.h"
#include "tscore/hugepages.h"

/// Global singleton.
class EventProcessor eventProcessor;

class ThreadAffinityInitializer : public Continuation
{
  using self = ThreadAffinityInitializer;

public:
  /// Default construct.
  ThreadAffinityInitializer() { SET_HANDLER(&self::set_affinity); }
  /// Load up basic affinity data.
  void init();
  /// Set the affinity for the current thread.
  int set_affinity(int, Event *);
  /// Allocate a stack.
  /// @internal This is the external entry point and is different depending on
  /// whether HWLOC is enabled.
  void *alloc_stack(EThread *t, size_t stacksize);

protected:
  /// Allocate a hugepage stack.
  /// If huge pages are not enabled, allocate a basic stack.
  void *alloc_hugepage_stack(size_t stacksize);

#if TS_USE_HWLOC

  /// Allocate a stack based on NUMA information, if possible.
  void *alloc_numa_stack(EThread *t, size_t stacksize);

private:
  hwloc_obj_type_t obj_type = HWLOC_OBJ_MACHINE;
  int obj_count = 0;
  char const *obj_name = nullptr;
#endif
};
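
// How the initializer gets run, as a minimal sketch: EventProcessor::start()
// (below) pushes it as the first spawn event for ET_CALL, so set_affinity()
// executes on each event thread before any other startup continuation:
//
//   thread_group[ET_CALL]._spawnQueue.push(
//     make_event_for_scheduling(&Thread_Affinity_Initializer, EVENT_IMMEDIATE, nullptr));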

ThreadAffinityInitializer Thread_Affinity_Initializer;

namespace
{
int
EventMetricStatSync(const char *, RecDataT, RecData *, RecRawStatBlock *rsb, int)
{
  int id = 0;
  EThread::EventMetrics summary[EThread::N_EVENT_TIMESCALES];

  // scan the thread local values
  for (EThread *t : eventProcessor.active_group_threads(ET_CALL)) {
    t->summarize_stats(summary);
  }

  ink_mutex_acquire(&(rsb->mutex));

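  // Global stats are laid out timescale-major: index = ts_idx * N_EVENT_STATS + stat,
  // matching the registration order in EventProcessor::start().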
  for (int ts_idx = 0; ts_idx < EThread::N_EVENT_TIMESCALES; ++ts_idx, id += EThread::N_EVENT_STATS) {
    EThread::EventMetrics *m = summary + ts_idx;
    // The globals are written directly, without atomic swaps; the swaps don't appear to provide any benefit here.
    rsb->global[id + EThread::STAT_LOOP_COUNT]->sum = m->_count;
    rsb->global[id + EThread::STAT_LOOP_COUNT]->count = 1;
    RecRawStatUpdateSum(rsb, id + EThread::STAT_LOOP_COUNT);

    rsb->global[id + EThread::STAT_LOOP_WAIT]->sum = m->_wait;
    rsb->global[id + EThread::STAT_LOOP_WAIT]->count = 1;
    RecRawStatUpdateSum(rsb, id + EThread::STAT_LOOP_WAIT);

    rsb->global[id + EThread::STAT_LOOP_TIME_MIN]->sum = m->_loop_time._min;
    rsb->global[id + EThread::STAT_LOOP_TIME_MIN]->count = 1;
    RecRawStatUpdateSum(rsb, id + EThread::STAT_LOOP_TIME_MIN);
    rsb->global[id + EThread::STAT_LOOP_TIME_MAX]->sum = m->_loop_time._max;
    rsb->global[id + EThread::STAT_LOOP_TIME_MAX]->count = 1;
    RecRawStatUpdateSum(rsb, id + EThread::STAT_LOOP_TIME_MAX);

    rsb->global[id + EThread::STAT_LOOP_EVENTS]->sum = m->_events._total;
    rsb->global[id + EThread::STAT_LOOP_EVENTS]->count = 1;
    RecRawStatUpdateSum(rsb, id + EThread::STAT_LOOP_EVENTS);
    rsb->global[id + EThread::STAT_LOOP_EVENTS_MIN]->sum = m->_events._min;
    rsb->global[id + EThread::STAT_LOOP_EVENTS_MIN]->count = 1;
    RecRawStatUpdateSum(rsb, id + EThread::STAT_LOOP_EVENTS_MIN);
    rsb->global[id + EThread::STAT_LOOP_EVENTS_MAX]->sum = m->_events._max;
    rsb->global[id + EThread::STAT_LOOP_EVENTS_MAX]->count = 1;
    RecRawStatUpdateSum(rsb, id + EThread::STAT_LOOP_EVENTS_MAX);
  }

  ink_mutex_release(&(rsb->mutex));
  return REC_ERR_OKAY;
}

/// Wrapper to convert a static function into a continuation. The function pointer is
/// passed in the event cookie rather than stored in the object, so a single shared
/// instance suffices and the class is used as a singleton.
/// @internal This is the implementation for the @c schedule_spawn... overloads.
class ThreadInitByFunc : public Continuation
{
public:
  ThreadInitByFunc() { SET_HANDLER(&ThreadInitByFunc::invoke); }
  int
  invoke(int, Event *ev)
  {
    void (*f)(EThread *) = reinterpret_cast<void (*)(EThread *)>(ev->cookie);
    f(ev->ethread);
    return 0;
  }
} Thread_Init_Func;
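
// A minimal usage sketch for the wrapper above (my_thread_init is a
// hypothetical callback): the function pointer rides in the Event cookie, and
// invoke() unpacks and calls it on the spawned thread.
//
//   void my_thread_init(EThread *t) { /* per-thread setup */ }
//   eventProcessor.schedule_spawn(&my_thread_init, ET_NET);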
} // namespace

void *
ThreadAffinityInitializer::alloc_hugepage_stack(size_t stacksize)
{
  return ats_hugepage_enabled() ? ats_alloc_hugepage(stacksize) : ats_memalign(ats_pagesize(), stacksize);
}

#if TS_USE_HWLOC
void
ThreadAffinityInitializer::init()
{
  int affinity = 1;
  REC_ReadConfigInteger(affinity, "proxy.config.exec_thread.affinity");

  switch (affinity) {
  case 4: // assign threads to logical processing units
// Older versions of libhwloc (e.g., Ubuntu 10.04) don't have HWLOC_OBJ_PU.
#if HAVE_HWLOC_OBJ_PU
    obj_type = HWLOC_OBJ_PU;
    obj_name = "Logical Processor";
    break;
#endif

  case 3: // assign threads to real cores
    obj_type = HWLOC_OBJ_CORE;
    obj_name = "Core";
    break;

  case 1: // assign threads to NUMA nodes (often 1:1 with sockets)
    obj_type = HWLOC_OBJ_NODE;
    obj_name = "NUMA Node";
    if (hwloc_get_nbobjs_by_type(ink_get_topology(), obj_type) > 0) {
      break;
    }
  // fallthrough

  case 2: // assign threads to sockets
    obj_type = HWLOC_OBJ_SOCKET;
    obj_name = "Socket";
    break;
  default: // assign threads to the machine as a whole (a level below SYSTEM)
    obj_type = HWLOC_OBJ_MACHINE;
    obj_name = "Machine";
  }

  obj_count = hwloc_get_nbobjs_by_type(ink_get_topology(), obj_type);
  Debug("iocore_thread", "Affinity: %d %ss: %d PU: %d", affinity, obj_name, obj_count, ink_number_of_processors());
}
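
// For reference, a records.config sketch selecting one of the values handled
// in the switch above (the compiled-in default is 1, binding to NUMA nodes):
//
//   CONFIG proxy.config.exec_thread.affinity INT 3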

int
ThreadAffinityInitializer::set_affinity(int, Event *)
{
  EThread *t = this_ethread();

  if (obj_count > 0) {
    // Get our `obj` instance with index based on the thread number we are on.
    hwloc_obj_t obj = hwloc_get_obj_by_type(ink_get_topology(), obj_type, t->id % obj_count);
#if HWLOC_API_VERSION >= 0x00010100
    int cpu_mask_len = hwloc_bitmap_snprintf(nullptr, 0, obj->cpuset) + 1;
    char *cpu_mask = static_cast<char *>(alloca(cpu_mask_len));
    hwloc_bitmap_snprintf(cpu_mask, cpu_mask_len, obj->cpuset);
    Debug("iocore_thread", "EThread: %p %s: %d CPU Mask: %s\n", t, obj_name, obj->logical_index, cpu_mask);
#else
    Debug("iocore_thread", "EThread: %p %s: %d", t, obj_name, obj->logical_index);
#endif // HWLOC_API_VERSION
    hwloc_set_thread_cpubind(ink_get_topology(), t->tid, obj->cpuset, HWLOC_CPUBIND_STRICT);
  } else {
    Warning("hwloc returned an unexpected number of objects -- CPU affinity disabled");
  }
  return 0;
}

void *
ThreadAffinityInitializer::alloc_numa_stack(EThread *t, size_t stacksize)
{
  hwloc_membind_policy_t mem_policy = HWLOC_MEMBIND_DEFAULT;
  hwloc_nodeset_t nodeset = hwloc_bitmap_alloc();
  int num_nodes = 0;
  void *stack = nullptr;
  hwloc_obj_t obj = hwloc_get_obj_by_type(ink_get_topology(), obj_type, t->id % obj_count);

  // Find the NUMA node set that correlates to our next thread CPU set
  hwloc_cpuset_to_nodeset(ink_get_topology(), obj->cpuset, nodeset);
  // How many NUMA nodes will we be needing to allocate across?
  num_nodes = hwloc_get_nbobjs_inside_cpuset_by_type(ink_get_topology(), obj->cpuset, HWLOC_OBJ_NODE);

  if (num_nodes == 1) {
    // The preferred memory policy. The thread lives in one NUMA node.
    mem_policy = HWLOC_MEMBIND_BIND;
  } else if (num_nodes > 1) {
    // If we have more than one NUMA node we should interleave over them.
    mem_policy = HWLOC_MEMBIND_INTERLEAVE;
  }

  if (mem_policy != HWLOC_MEMBIND_DEFAULT) {
    // Let's temporarily set the memory binding to our destination NUMA node
#if HWLOC_API_VERSION >= 0x20000
    hwloc_set_membind(ink_get_topology(), nodeset, mem_policy, HWLOC_MEMBIND_THREAD | HWLOC_MEMBIND_BYNODESET);
#else
    hwloc_set_membind_nodeset(ink_get_topology(), nodeset, mem_policy, HWLOC_MEMBIND_THREAD);
#endif
  }

  // Alloc our stack
  stack = this->alloc_hugepage_stack(stacksize);

  if (mem_policy != HWLOC_MEMBIND_DEFAULT) {
    // Now let's set it back to default for this thread.
#if HWLOC_API_VERSION >= 0x20000
    hwloc_set_membind(ink_get_topology(), hwloc_topology_get_topology_nodeset(ink_get_topology()), HWLOC_MEMBIND_DEFAULT,
                      HWLOC_MEMBIND_THREAD | HWLOC_MEMBIND_BYNODESET);
#else
    hwloc_set_membind_nodeset(ink_get_topology(), hwloc_topology_get_topology_nodeset(ink_get_topology()), HWLOC_MEMBIND_DEFAULT,
                              HWLOC_MEMBIND_THREAD);
#endif
  }

  hwloc_bitmap_free(nodeset);

  return stack;
}

void *
ThreadAffinityInitializer::alloc_stack(EThread *t, size_t stacksize)
{
  return this->obj_count > 0 ? this->alloc_numa_stack(t, stacksize) : this->alloc_hugepage_stack(stacksize);
}

#else

void
ThreadAffinityInitializer::init()
{
}

int
ThreadAffinityInitializer::set_affinity(int, Event *)
{
  return 0;
}

void *
ThreadAffinityInitializer::alloc_stack(EThread *, size_t stacksize)
{
  return this->alloc_hugepage_stack(stacksize);
}

#endif // TS_USE_HWLOC

EventProcessor::EventProcessor() : thread_initializer(this)
{
  ink_zero(all_ethreads);
  ink_zero(all_dthreads);
  ink_mutex_init(&dedicated_thread_spawn_mutex);
  // Because ET_NET is compile time set to 0 it *must* be the first type registered.
  this->register_event_type("ET_NET");
}

EventProcessor::~EventProcessor()
{
  ink_mutex_destroy(&dedicated_thread_spawn_mutex);
}

namespace
{
Event *
make_event_for_scheduling(Continuation *c, int event_code, void *cookie)
{
  Event *e = eventAllocator.alloc();

  e->init(c);
  e->mutex = c->mutex;
  e->callback_event = event_code;
  e->cookie = cookie;

  return e;
}
} // namespace

Event *
EventProcessor::schedule_spawn(Continuation *c, EventType ev_type, int event_code, void *cookie)
{
  Event *e = make_event_for_scheduling(c, event_code, cookie);
  ink_assert(ev_type < MAX_EVENT_TYPES);
  thread_group[ev_type]._spawnQueue.enqueue(e);
  return e;
}

Event *
EventProcessor::schedule_spawn(void (*f)(EThread *), EventType ev_type)
{
  Event *e = make_event_for_scheduling(&Thread_Init_Func, EVENT_IMMEDIATE, reinterpret_cast<void *>(f));
  ink_assert(ev_type < MAX_EVENT_TYPES);
  thread_group[ev_type]._spawnQueue.enqueue(e);
  return e;
}

EventType
EventProcessor::register_event_type(char const *name)
{
  ThreadGroupDescriptor *tg = &(thread_group[n_thread_groups++]);
  ink_release_assert(n_thread_groups <= MAX_EVENT_TYPES); // check for overflow

  tg->_name = name;
  return n_thread_groups - 1;
}

EventType
EventProcessor::spawn_event_threads(char const *name, int n_threads, size_t stacksize)
{
  int ev_type = this->register_event_type(name);
  this->spawn_event_threads(ev_type, n_threads, stacksize);
  return ev_type;
}
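
// A minimal usage sketch (the group name and sizes are illustrative): register
// and spawn a new thread group in one call, keeping the returned EventType for
// later scheduling.
//
//   EventType et_task = eventProcessor.spawn_event_threads("ET_TASK", 2, stacksize);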

EventType
EventProcessor::spawn_event_threads(EventType ev_type, int n_threads, size_t stacksize)
{
  char thr_name[MAX_THREAD_NAME_LENGTH];
  int i;
  ThreadGroupDescriptor *tg = &(thread_group[ev_type]);

  ink_release_assert(n_threads > 0);
  ink_release_assert((n_ethreads + n_threads) <= MAX_EVENT_THREADS);
  ink_release_assert(ev_type < MAX_EVENT_TYPES);

  stacksize = std::max(stacksize, static_cast<decltype(stacksize)>(INK_THREAD_STACK_MIN));
  // Make sure it is a multiple of our page size
  if (ats_hugepage_enabled()) {
    stacksize = INK_ALIGN(stacksize, ats_hugepage_size());
  } else {
    stacksize = INK_ALIGN(stacksize, ats_pagesize());
  }

  Debug("iocore_thread", "Thread stack size set to %zu", stacksize);

  for (i = 0; i < n_threads; ++i) {
    EThread *t = new EThread(REGULAR, n_ethreads + i);
    all_ethreads[n_ethreads + i] = t;
    tg->_thread[i] = t;
    t->id = i; // unfortunately needed to support affinity and NUMA logic.
    t->set_event_type(ev_type);
    t->schedule_spawn(&thread_initializer);
  }
  tg->_count = n_threads;
  n_ethreads += n_threads;
  schedule_spawn(&thread_started, ev_type);

  // Separate loop to avoid race conditions between the spawn events and updates to the thread
  // table for the group. Some thread setup depends on knowing the total number of threads, which
  // can't be safely read until all the EThread instances are created and stored in the table.
  for (i = 0; i < n_threads; ++i) {
    Debug("iocore_thread_start", "Created %s thread #%d", tg->_name.c_str(), i + 1);
    snprintf(thr_name, MAX_THREAD_NAME_LENGTH, "[%s %d]", tg->_name.c_str(), i);
    void *stack = Thread_Affinity_Initializer.alloc_stack(tg->_thread[i], stacksize);
    tg->_thread[i]->start(thr_name, stack, stacksize);
  }

  Debug("iocore_thread", "Created thread group '%s' id %d with %d threads", tg->_name.c_str(), ev_type, n_threads);

  return ev_type; // useless but not sure what would be better.
}

// This is called from inside a thread as the @a start_event for that thread. It chains to the
// startup continuations registered for the thread groups whose event types this thread serves.
void
EventProcessor::initThreadState(EThread *t)
{
  // Run all thread type initialization continuations that match the event types for this thread.
  for (int i = 0; i < MAX_EVENT_TYPES; ++i) {
    if (t->is_event_type(i)) {
      // To avoid race conditions on the event in the spawn queue, create a local one to actually send.
      // Use the spawn queue event as a read only model.
      Event *nev = eventAllocator.alloc();
      for (Event *ev = thread_group[i]._spawnQueue.head; nullptr != ev; ev = ev->link.next) {
        nev->init(ev->continuation, 0, 0);
        nev->ethread = t;
        nev->callback_event = ev->callback_event;
        nev->mutex = ev->continuation->mutex;
        nev->cookie = ev->cookie;
        ev->continuation->handleEvent(ev->callback_event, nev);
      }
      nev->free();
    }
  }
}

int
EventProcessor::start(int n_event_threads, size_t stacksize)
{
  // do some sanity checking.
  static bool started = false;
  ink_release_assert(!started);
  ink_release_assert(n_event_threads > 0 && n_event_threads <= MAX_EVENT_THREADS);
  started = true;

  Thread_Affinity_Initializer.init();
  // Least ugly thing - this needs to be the first callback from the thread but by the time this
  // method is called other spawn callbacks have been registered. This forces thread affinity
  // first. The other alternative would be to require a call to an @c init method which I like even
  // less because this cannot be done in the constructor - that depends on too much other
  // infrastructure being in place (e.g. the proxy allocators).
  thread_group[ET_CALL]._spawnQueue.push(make_event_for_scheduling(&Thread_Affinity_Initializer, EVENT_IMMEDIATE, nullptr));

  // Get our statistics set up
  RecRawStatBlock *rsb = RecAllocateRawStatBlock(EThread::N_EVENT_STATS * EThread::N_EVENT_TIMESCALES);
  char name[256];

  for (int ts_idx = 0; ts_idx < EThread::N_EVENT_TIMESCALES; ++ts_idx) {
    for (int id = 0; id < EThread::N_EVENT_STATS; ++id) {
      snprintf(name, sizeof(name), "%s.%ds", EThread::STAT_NAME[id], EThread::SAMPLE_COUNT[ts_idx]);
      RecRegisterRawStat(rsb, RECT_PROCESS, name, RECD_INT, RECP_NON_PERSISTENT, id + (ts_idx * EThread::N_EVENT_STATS), nullptr);
    }
  }

  // Name must be that of a stat, pick one at random since we do all of them in one pass/callback.
  RecRegisterRawStatSyncCb(name, EventMetricStatSync, rsb, 0);

  this->spawn_event_threads(ET_CALL, n_event_threads, stacksize);

  Debug("iocore_thread", "Created event thread group id %d with %d threads", ET_CALL, n_event_threads);
  return 0;
}
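
// A minimal call sketch (the thread count and stack size are illustrative):
// start() is invoked once at process startup to bring up the ET_CALL / ET_NET
// event threads.
//
//   eventProcessor.start(ink_number_of_processors(), stacksize);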

void
EventProcessor::shutdown()
{
}

Event *
EventProcessor::spawn_thread(Continuation *cont, const char *thr_name, size_t stacksize)
{
  /* Spawning threads in a live system - There are two potential race conditions in this logic. The
     first is multiple calls to this method. In that case @a all_dthreads can end up in a bad state
     as the same entry is overwritten while another is left uninitialized.

     The other is read/write contention where another thread (e.g. the stats collection thread) is
     iterating over the threads while the active count (@a n_dthreads) is being updated causing use
     of a not yet initialized array element.

     This logic covers both situations. For write/write the actual array update is locked. The
     potentially expensive setup is done outside the lock, making the time spent locked small. For
     read/write it suffices to do the active count increment after initializing the array
     element. It's not a problem if, for one cycle, a new thread is skipped.
  */

  // Do as much as possible outside the lock. Until the array element and count are changed
  // this is thread safe.
  Event *e = eventAllocator.alloc();
  e->init(cont, 0, 0);
  e->ethread = new EThread(DEDICATED, e);
  e->mutex = e->ethread->mutex;
  cont->mutex = e->ethread->mutex;
  {
    ink_scoped_mutex_lock lock(dedicated_thread_spawn_mutex);
    ink_release_assert(n_dthreads < MAX_EVENT_THREADS);
    all_dthreads[n_dthreads] = e->ethread;
    ++n_dthreads; // Be very sure this is after the array element update.
  }

  e->ethread->start(thr_name, nullptr, stacksize);

  return e;
}
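
// A minimal usage sketch (the continuation and thread name are hypothetical):
// a DEDICATED thread runs its continuation on its own thread rather than
// joining an event loop group.
//
//   Continuation *flusher = make_my_flusher(); // hypothetical continuation
//   Event *e = eventProcessor.spawn_thread(flusher, "[MY_FLUSH]", stacksize);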

bool
EventProcessor::has_tg_started(int etype)
{
  return thread_group[etype]._started == thread_group[etype]._count;
}

void
thread_started(EThread *t)
{
  // Find what type of thread this is, and increment the "_started" counter of that thread type.
  for (int i = 0; i < MAX_EVENT_TYPES; ++i) {
    if (t->is_event_type(i)) {
      if (++eventProcessor.thread_group[i]._started == eventProcessor.thread_group[i]._count &&
          eventProcessor.thread_group[i]._afterStartCallback != nullptr) {
        eventProcessor.thread_group[i]._afterStartCallback();
      }
      break;
    }
  }
}