/** @file

  Unix implementation of the EventProcessor: thread spawning, CPU and NUMA affinity,
  and event loop statistics.

  @section license License

  Licensed to the Apache Software Foundation (ASF) under one
  or more contributor license agreements.  See the NOTICE file
  distributed with this work for additional information
  regarding copyright ownership.  The ASF licenses this file
  to you under the Apache License, Version 2.0 (the
  "License"); you may not use this file except in compliance
  with the License.  You may obtain a copy of the License at

      http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
 */

#include "P_EventSystem.h"
#include <sched.h>
#if TS_USE_HWLOC
#if HAVE_ALLOCA_H
#include <alloca.h>
#endif
#include <hwloc.h>
#endif
#include "tscore/ink_defs.h"
#include "tscore/hugepages.h"

/// Global singleton.
class EventProcessor eventProcessor;

class ThreadAffinityInitializer : public Continuation
{
  using self = ThreadAffinityInitializer;

public:
  /// Default construct.
  ThreadAffinityInitializer() { SET_HANDLER(&self::set_affinity); }
  /// Load up basic affinity data.
  void init();
  /// Set the affinity for the current thread.
  int set_affinity(int, Event *);
  /// Allocate a stack.
  /// @internal This is the external entry point and is different depending on
  /// whether HWLOC is enabled.
  void *alloc_stack(EThread *t, size_t stacksize);

protected:
  /// Allocate a hugepage stack.
  /// If huge pages are not enabled, allocate a basic stack.
  void *alloc_hugepage_stack(size_t stacksize);

#if TS_USE_HWLOC

  /// Allocate a stack based on NUMA information, if possible.
  void *alloc_numa_stack(EThread *t, size_t stacksize);

private:
  hwloc_obj_type_t obj_type = HWLOC_OBJ_MACHINE;
  int obj_count             = 0;
  char const *obj_name      = nullptr;
#endif
};

ThreadAffinityInitializer Thread_Affinity_Initializer;
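/// @internal This singleton is pushed onto the ET_CALL spawn queue by EventProcessor::start()
/// below so that setting CPU affinity is the first thing each event thread does.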

namespace
{
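// Raw statistic sync callback: folds the per-thread event loop metrics of every ET_CALL
// thread into the global raw statistic block. Registered via RecRegisterRawStatSyncCb()
// in EventProcessor::start().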
int
EventMetricStatSync(const char *, RecDataT, RecData *, RecRawStatBlock *rsb, int)
{
  int id = 0;
  EThread::EventMetrics summary[EThread::N_EVENT_TIMESCALES];

  // scan the thread local values
  for (EThread *t : eventProcessor.active_group_threads(ET_CALL)) {
    t->summarize_stats(summary);
  }

  ink_mutex_acquire(&(rsb->mutex));

  for (int ts_idx = 0; ts_idx < EThread::N_EVENT_TIMESCALES; ++ts_idx, id += EThread::N_EVENT_STATS) {
    EThread::EventMetrics *m = summary + ts_idx;
    // The globals are written directly; using atomic swaps here doesn't seem to do anything useful.
    rsb->global[id + EThread::STAT_LOOP_COUNT]->sum   = m->_count;
    rsb->global[id + EThread::STAT_LOOP_COUNT]->count = 1;
    RecRawStatUpdateSum(rsb, id + EThread::STAT_LOOP_COUNT);

    rsb->global[id + EThread::STAT_LOOP_WAIT]->sum   = m->_wait;
    rsb->global[id + EThread::STAT_LOOP_WAIT]->count = 1;
    RecRawStatUpdateSum(rsb, id + EThread::STAT_LOOP_WAIT);

    rsb->global[id + EThread::STAT_LOOP_TIME_MIN]->sum   = m->_loop_time._min;
    rsb->global[id + EThread::STAT_LOOP_TIME_MIN]->count = 1;
    RecRawStatUpdateSum(rsb, id + EThread::STAT_LOOP_TIME_MIN);
    rsb->global[id + EThread::STAT_LOOP_TIME_MAX]->sum   = m->_loop_time._max;
    rsb->global[id + EThread::STAT_LOOP_TIME_MAX]->count = 1;
    RecRawStatUpdateSum(rsb, id + EThread::STAT_LOOP_TIME_MAX);

    rsb->global[id + EThread::STAT_LOOP_EVENTS]->sum   = m->_events._total;
    rsb->global[id + EThread::STAT_LOOP_EVENTS]->count = 1;
    RecRawStatUpdateSum(rsb, id + EThread::STAT_LOOP_EVENTS);
    rsb->global[id + EThread::STAT_LOOP_EVENTS_MIN]->sum   = m->_events._min;
    rsb->global[id + EThread::STAT_LOOP_EVENTS_MIN]->count = 1;
    RecRawStatUpdateSum(rsb, id + EThread::STAT_LOOP_EVENTS_MIN);
    rsb->global[id + EThread::STAT_LOOP_EVENTS_MAX]->sum   = m->_events._max;
    rsb->global[id + EThread::STAT_LOOP_EVENTS_MAX]->count = 1;
    RecRawStatUpdateSum(rsb, id + EThread::STAT_LOOP_EVENTS_MAX);
  }

  ink_mutex_release(&(rsb->mutex));
  return REC_ERR_OKAY;
}

/// This is a wrapper used to convert a static function into a continuation. The function pointer is
/// passed in the cookie. For this reason the class is used as a singleton.
/// @internal This is the implementation for @c schedule_spawn... overloads.
class ThreadInitByFunc : public Continuation
{
public:
  ThreadInitByFunc() { SET_HANDLER(&ThreadInitByFunc::invoke); }
  int
  invoke(int, Event *ev)
  {
    void (*f)(EThread *) = reinterpret_cast<void (*)(EThread *)>(ev->cookie);
    f(ev->ethread);
    return 0;
  }
} Thread_Init_Func;
} // namespace

void *
ThreadAffinityInitializer::alloc_hugepage_stack(size_t stacksize)
{
  return ats_hugepage_enabled() ? ats_alloc_hugepage(stacksize) : ats_memalign(ats_pagesize(), stacksize);
}

#if TS_USE_HWLOC
void
ThreadAffinityInitializer::init()
{
  int affinity = 1;
  REC_ReadConfigInteger(affinity, "proxy.config.exec_thread.affinity");

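  // Map the configured affinity scope onto an hwloc object type:
  //   4 = logical processing unit, 3 = core, 2 = socket,
  //   1 = NUMA node (falling back to socket if no NUMA nodes are reported),
  //   anything else = the machine as a whole.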
  switch (affinity) {
  case 4: // assign threads to logical processing units
// Older versions of libhwloc (eg. Ubuntu 10.04) don't have HWLOC_OBJ_PU.
#if HAVE_HWLOC_OBJ_PU
    obj_type = HWLOC_OBJ_PU;
    obj_name = "Logical Processor";
    break;
#endif

  case 3: // assign threads to real cores
    obj_type = HWLOC_OBJ_CORE;
    obj_name = "Core";
    break;

  case 1: // assign threads to NUMA nodes (often 1:1 with sockets)
    obj_type = HWLOC_OBJ_NODE;
    obj_name = "NUMA Node";
    if (hwloc_get_nbobjs_by_type(ink_get_topology(), obj_type) > 0) {
      break;
    }
    // fallthrough

  case 2: // assign threads to sockets
    obj_type = HWLOC_OBJ_SOCKET;
    obj_name = "Socket";
    break;
  default: // assign threads to the machine as a whole (a level below SYSTEM)
    obj_type = HWLOC_OBJ_MACHINE;
    obj_name = "Machine";
  }

  obj_count = hwloc_get_nbobjs_by_type(ink_get_topology(), obj_type);
  Debug("iocore_thread", "Affinity: %d %ss: %d PU: %d", affinity, obj_name, obj_count, ink_number_of_processors());
}

int
ThreadAffinityInitializer::set_affinity(int, Event *)
{
  EThread *t = this_ethread();

  if (obj_count > 0) {
    // Get our `obj` instance with index based on the thread number we are on.
    hwloc_obj_t obj = hwloc_get_obj_by_type(ink_get_topology(), obj_type, t->id % obj_count);
#if HWLOC_API_VERSION >= 0x00010100
    int cpu_mask_len = hwloc_bitmap_snprintf(nullptr, 0, obj->cpuset) + 1;
    char *cpu_mask   = static_cast<char *>(alloca(cpu_mask_len));
    hwloc_bitmap_snprintf(cpu_mask, cpu_mask_len, obj->cpuset);
    Debug("iocore_thread", "EThread: %p %s: %d CPU Mask: %s\n", t, obj_name, obj->logical_index, cpu_mask);
#else
    Debug("iocore_thread", "EThread: %p %s: %d", t, obj_name, obj->logical_index);
#endif // HWLOC_API_VERSION
    hwloc_set_thread_cpubind(ink_get_topology(), t->tid, obj->cpuset, HWLOC_CPUBIND_STRICT);
  } else {
    Warning("hwloc returned an unexpected number of objects -- CPU affinity disabled");
  }
  return 0;
}

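// @internal The stack is placed using the same hwloc object selection as set_affinity()
// (t->id % obj_count), so each thread's stack is bound to, or interleaved across, the
// NUMA node(s) that thread will later run on.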
void *
ThreadAffinityInitializer::alloc_numa_stack(EThread *t, size_t stacksize)
{
  hwloc_membind_policy_t mem_policy = HWLOC_MEMBIND_DEFAULT;
  hwloc_nodeset_t nodeset           = hwloc_bitmap_alloc();
  int num_nodes                     = 0;
  void *stack                       = nullptr;
  hwloc_obj_t obj                   = hwloc_get_obj_by_type(ink_get_topology(), obj_type, t->id % obj_count);

  // Find the NUMA node set that correlates to our next thread CPU set
  hwloc_cpuset_to_nodeset(ink_get_topology(), obj->cpuset, nodeset);
  // How many NUMA nodes do we need to allocate across?
  num_nodes = hwloc_get_nbobjs_inside_cpuset_by_type(ink_get_topology(), obj->cpuset, HWLOC_OBJ_NODE);

  if (num_nodes == 1) {
    // The preferred memory policy. The thread lives in one NUMA node.
    mem_policy = HWLOC_MEMBIND_BIND;
  } else if (num_nodes > 1) {
    // If we have more than one NUMA node we should interleave over them.
    mem_policy = HWLOC_MEMBIND_INTERLEAVE;
  }

  if (mem_policy != HWLOC_MEMBIND_DEFAULT) {
    // Let's temporarily set the memory binding to our destination NUMA node
#if HWLOC_API_VERSION >= 0x20000
    hwloc_set_membind(ink_get_topology(), nodeset, mem_policy, HWLOC_MEMBIND_THREAD | HWLOC_MEMBIND_BYNODESET);
#else
    hwloc_set_membind_nodeset(ink_get_topology(), nodeset, mem_policy, HWLOC_MEMBIND_THREAD);
#endif
  }

  // Alloc our stack
  stack = this->alloc_hugepage_stack(stacksize);

  if (mem_policy != HWLOC_MEMBIND_DEFAULT) {
    // Now let's set it back to default for this thread.
#if HWLOC_API_VERSION >= 0x20000
    hwloc_set_membind(ink_get_topology(), hwloc_topology_get_topology_nodeset(ink_get_topology()), HWLOC_MEMBIND_DEFAULT,
                      HWLOC_MEMBIND_THREAD | HWLOC_MEMBIND_BYNODESET);
#else
    hwloc_set_membind_nodeset(ink_get_topology(), hwloc_topology_get_topology_nodeset(ink_get_topology()), HWLOC_MEMBIND_DEFAULT,
                              HWLOC_MEMBIND_THREAD);
#endif
  }

  hwloc_bitmap_free(nodeset);

  return stack;
}

void *
ThreadAffinityInitializer::alloc_stack(EThread *t, size_t stacksize)
{
  return this->obj_count > 0 ? this->alloc_numa_stack(t, stacksize) : this->alloc_hugepage_stack(stacksize);
}

#else

void
ThreadAffinityInitializer::init()
{
}

int
ThreadAffinityInitializer::set_affinity(int, Event *)
{
  return 0;
}

void *
ThreadAffinityInitializer::alloc_stack(EThread *, size_t stacksize)
{
  return this->alloc_hugepage_stack(stacksize);
}

#endif // TS_USE_HWLOC

EventProcessor::EventProcessor() : thread_initializer(this)
{
  ink_zero(all_ethreads);
  ink_zero(all_dthreads);
  ink_mutex_init(&dedicated_thread_spawn_mutex);
  // Because ET_NET is compile time set to 0 it *must* be the first type registered.
  this->register_event_type("ET_NET");
}

EventProcessor::~EventProcessor()
{
  ink_mutex_destroy(&dedicated_thread_spawn_mutex);
}

namespace
{
Event *
make_event_for_scheduling(Continuation *c, int event_code, void *cookie)
{
  Event *e = eventAllocator.alloc();

  e->init(c);
  e->mutex          = c->mutex;
  e->callback_event = event_code;
  e->cookie         = cookie;

  return e;
}
} // namespace

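// Schedule @a c to be called with @a event_code on each thread of group @a ev_type as the
// thread starts. A minimal usage sketch (hypothetical continuation `init_cont` owned by the
// caller):
//
//   eventProcessor.schedule_spawn(init_cont, ET_NET, EVENT_IMMEDIATE, nullptr);
//
// The returned Event is a read-only model held in the group's spawn queue; initThreadState()
// below replays it with a fresh Event on every new thread.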
Event *
EventProcessor::schedule_spawn(Continuation *c, EventType ev_type, int event_code, void *cookie)
{
  Event *e = make_event_for_scheduling(c, event_code, cookie);
  ink_assert(ev_type < MAX_EVENT_TYPES);
  thread_group[ev_type]._spawnQueue.enqueue(e);
  return e;
}

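// Overload taking a plain function: the function pointer travels in the event cookie and is
// invoked through the Thread_Init_Func singleton above. This is how spawn_event_threads()
// registers thread_started() below.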
Event *
EventProcessor::schedule_spawn(void (*f)(EThread *), EventType ev_type)
{
  Event *e = make_event_for_scheduling(&Thread_Init_Func, EVENT_IMMEDIATE, reinterpret_cast<void *>(f));
  ink_assert(ev_type < MAX_EVENT_TYPES);
  thread_group[ev_type]._spawnQueue.enqueue(e);
  return e;
}

EventType
EventProcessor::register_event_type(char const *name)
{
  ThreadGroupDescriptor *tg = &(thread_group[n_thread_groups++]);
  ink_release_assert(n_thread_groups <= MAX_EVENT_TYPES); // check for overflow

  tg->_name = name;
  return n_thread_groups - 1;
}

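// Register a new event type and spawn its threads in one call. A minimal usage sketch with a
// hypothetical group name and a caller-chosen stack size:
//
//   EventType et = eventProcessor.spawn_event_threads("ET_EXAMPLE", 2, stacksize);
//
// The stack size is clamped to at least INK_THREAD_STACK_MIN and rounded up to a (huge)page
// multiple by the overload below.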
EventType
EventProcessor::spawn_event_threads(char const *name, int n_threads, size_t stacksize)
{
  int ev_type = this->register_event_type(name);
  this->spawn_event_threads(ev_type, n_threads, stacksize);
  return ev_type;
}

EventType
EventProcessor::spawn_event_threads(EventType ev_type, int n_threads, size_t stacksize)
{
  char thr_name[MAX_THREAD_NAME_LENGTH];
  int i;
  ThreadGroupDescriptor *tg = &(thread_group[ev_type]);

  ink_release_assert(n_threads > 0);
  ink_release_assert((n_ethreads + n_threads) <= MAX_EVENT_THREADS);
  ink_release_assert(ev_type < MAX_EVENT_TYPES);

  stacksize = std::max(stacksize, static_cast<decltype(stacksize)>(INK_THREAD_STACK_MIN));
  // Make sure it is a multiple of our page size
  if (ats_hugepage_enabled()) {
    stacksize = INK_ALIGN(stacksize, ats_hugepage_size());
  } else {
    stacksize = INK_ALIGN(stacksize, ats_pagesize());
  }

  Debug("iocore_thread", "Thread stack size set to %zu", stacksize);

  for (i = 0; i < n_threads; ++i) {
    EThread *t                   = new EThread(REGULAR, n_ethreads + i);
    all_ethreads[n_ethreads + i] = t;
    tg->_thread[i]               = t;
    t->id                        = i; // unfortunately needed to support affinity and NUMA logic.
    t->set_event_type(ev_type);
    t->schedule_spawn(&thread_initializer);
  }
  tg->_count = n_threads;
  n_ethreads += n_threads;
  schedule_spawn(&thread_started, ev_type);

  // Separate loop to avoid race conditions between spawn events and updating the thread table for
  // the group. Some thread setup depends on knowing the total number of threads, but that can't be
  // safely updated until all the EThread instances are created and stored in the table.
  for (i = 0; i < n_threads; ++i) {
    Debug("iocore_thread_start", "Created %s thread #%d", tg->_name.c_str(), i + 1);
    snprintf(thr_name, MAX_THREAD_NAME_LENGTH, "[%s %d]", tg->_name.c_str(), i);
    void *stack = Thread_Affinity_Initializer.alloc_stack(tg->_thread[i], stacksize);
    tg->_thread[i]->start(thr_name, stack, stacksize);
  }

  Debug("iocore_thread", "Created thread group '%s' id %d with %d threads", tg->_name.c_str(), ev_type, n_threads);

  return ev_type; // useless but not sure what would be better.
}

// This is called from inside a thread as the @a start_event for that thread. It runs the
// startup continuations registered for the thread groups this thread belongs to.
void
EventProcessor::initThreadState(EThread *t)
{
  // Run all thread type initialization continuations that match the event types for this thread.
  for (int i = 0; i < MAX_EVENT_TYPES; ++i) {
    if (t->is_event_type(i)) {
      // To avoid race conditions on the event in the spawn queue, create a local one to actually send.
      // Use the spawn queue event as a read only model.
      Event *nev = eventAllocator.alloc();
      for (Event *ev = thread_group[i]._spawnQueue.head; nullptr != ev; ev = ev->link.next) {
        nev->init(ev->continuation, 0, 0);
        nev->ethread        = t;
        nev->callback_event = ev->callback_event;
        nev->mutex          = ev->continuation->mutex;
        nev->cookie         = ev->cookie;
        ev->continuation->handleEvent(ev->callback_event, nev);
      }
      nev->free();
    }
  }
}

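// Start the event processor. A sketch of the typical call from system startup (the thread
// count and stack size normally come from configuration):
//
//   eventProcessor.start(num_event_threads, stacksize);
//
// This may be called only once; a second call trips the release assert below.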
int
EventProcessor::start(int n_event_threads, size_t stacksize)
{
  // do some sanity checking.
  static bool started = false;
  ink_release_assert(!started);
  ink_release_assert(n_event_threads > 0 && n_event_threads <= MAX_EVENT_THREADS);
  started = true;

  Thread_Affinity_Initializer.init();
  // Least ugly thing - this needs to be the first callback from the thread but by the time this
  // method is called other spawn callbacks have been registered. This forces thread affinity
  // first. The other alternative would be to require a call to an @c init method which I like even
  // less because this cannot be done in the constructor - that depends on too much other
  // infrastructure being in place (e.g. the proxy allocators).
  thread_group[ET_CALL]._spawnQueue.push(make_event_for_scheduling(&Thread_Affinity_Initializer, EVENT_IMMEDIATE, nullptr));

  // Get our statistics set up
  RecRawStatBlock *rsb = RecAllocateRawStatBlock(EThread::N_EVENT_STATS * EThread::N_EVENT_TIMESCALES);
  char name[256];

  for (int ts_idx = 0; ts_idx < EThread::N_EVENT_TIMESCALES; ++ts_idx) {
    for (int id = 0; id < EThread::N_EVENT_STATS; ++id) {
      snprintf(name, sizeof(name), "%s.%ds", EThread::STAT_NAME[id], EThread::SAMPLE_COUNT[ts_idx]);
      RecRegisterRawStat(rsb, RECT_PROCESS, name, RECD_INT, RECP_NON_PERSISTENT, id + (ts_idx * EThread::N_EVENT_STATS), nullptr);
    }
  }

  // Name must be that of a stat, pick one at random since we do all of them in one pass/callback.
  RecRegisterRawStatSyncCb(name, EventMetricStatSync, rsb, 0);

  this->spawn_event_threads(ET_CALL, n_event_threads, stacksize);

  Debug("iocore_thread", "Created event thread group id %d with %d threads", ET_CALL, n_event_threads);
  return 0;
}

void
EventProcessor::shutdown()
{
}

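// Spawn a DEDICATED thread that runs @a cont directly instead of a regular event loop. A
// minimal usage sketch (hypothetical continuation `blocking_worker` and thread name):
//
//   eventProcessor.spawn_thread(blocking_worker, "[EXAMPLE]", stacksize);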
Event *
EventProcessor::spawn_thread(Continuation *cont, const char *thr_name, size_t stacksize)
{
  /* Spawning threads in a live system - There are two potential race conditions in this logic. The
     first is multiple calls to this method.  In that case @a all_dthreads can end up in a bad state
     as the same entry is overwritten while another is left uninitialized.

     The other is read/write contention where another thread (e.g. the stats collection thread) is
     iterating over the threads while the active count (@a n_dthreads) is being updated causing use
     of a not yet initialized array element.

     This logic covers both situations. For write/write the actual array update is locked. The
     potentially expensive setup is done outside the lock making the time spent locked small. For
     read/write it suffices to do the active count increment after initializing the array
     element. It's not a problem if, for one cycle, a new thread is skipped.
  */

  // Do as much as possible outside the lock. Until the array element and count are changed
  // this is thread safe.
  Event *e = eventAllocator.alloc();
  e->init(cont, 0, 0);
  e->ethread  = new EThread(DEDICATED, e);
  e->mutex    = e->ethread->mutex;
  cont->mutex = e->ethread->mutex;
  {
    ink_scoped_mutex_lock lock(dedicated_thread_spawn_mutex);
    ink_release_assert(n_dthreads < MAX_EVENT_THREADS);
    all_dthreads[n_dthreads] = e->ethread;
    ++n_dthreads; // Be very sure this is after the array element update.
  }

  e->ethread->start(thr_name, nullptr, stacksize);

  return e;
}


bool
EventProcessor::has_tg_started(int etype)
{
  return thread_group[etype]._started == thread_group[etype]._count;
}

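// Invoked on each event thread through the schedule_spawn(&thread_started, ev_type) call in
// spawn_event_threads() above; it counts how many threads of each group have started and fires
// the group's after-start callback once the last one checks in.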
void
thread_started(EThread *t)
{
  // Find what type of thread this is, and increment the "_started" counter of that thread type.
  for (int i = 0; i < MAX_EVENT_TYPES; ++i) {
    if (t->is_event_type(i)) {
      if (++eventProcessor.thread_group[i]._started == eventProcessor.thread_group[i]._count &&
          eventProcessor.thread_group[i]._afterStartCallback != nullptr) {
        eventProcessor.thread_group[i]._afterStartCallback();
      }
      break;
    }
  }
}