/** @file

  Declaration of EThread, the thread class created and managed by the Event System.

  @section license License

  Licensed to the Apache Software Foundation (ASF) under one
  or more contributor license agreements.  See the NOTICE file
  distributed with this work for additional information
  regarding copyright ownership.  The ASF licenses this file
  to you under the Apache License, Version 2.0 (the
  "License"); you may not use this file except in compliance
  with the License.  You may obtain a copy of the License at

      http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.

 */
24 
#pragma once

#include <climits>
#include <cstddef>
#include <cstdint>

#include "tscore/ink_platform.h"
#include "tscore/ink_rand.h"
#include "tscore/I_Version.h"
#include "I_Thread.h"
#include "I_PriorityEventQueue.h"
#include "I_ProtectedQueue.h"
33 
/// Size, in bytes, of the @c EThread::thread_private block reserved in every EThread.
// TODO: This would be much nicer to have "run-time" configurable (or something),
// perhaps based on proxy.config.stat_api.max_stats_allowed or other configs. XXX
#define PER_THREAD_DATA (1024 * 1024)

/// Default delay before retrying a mutex (20 milliseconds expressed through HRTIME_MSECONDS).
// This is not used by the cache anymore, it uses proxy.config.cache.mutex_retry_delay
// instead.
#define MUTEX_RETRY_DELAY HRTIME_MSECONDS(20)

// Forward declarations: EThread only refers to these types through pointers
// (or as macro arguments), so their full definitions are not needed here.
struct DiskHandler;
struct EventIO;

class ServerSessionPool;
class Event;
class Continuation;
48 
/// Kind of thread an EThread runs as.
enum ThreadType {
  REGULAR = 0, ///< Thread whose start event (if any) is called first, before its event loop starts.
  DEDICATED,   ///< Thread for which the start event is the only event called.
};
53 
54 /**
55   Event System specific type of thread.
56 
57   The EThread class is the type of thread created and managed by
58   the Event System. It is one of the available interfaces for
59   scheduling events in the event system (another two are the Event
60   and EventProcessor classes).
61 
62   In order to handle events, each EThread object has two event
63   queues, one external and one internal. The external queue is
64   provided for users of the EThread (clients) to append events to
65   that particular thread. Since it can be accessed by other threads
66   at the same time, operations using it must proceed in an atomic
67   fashion.
68 
69   The internal queue, in the other hand, is used exclusively by the
70   EThread to process timed events within a certain time frame. These
71   events are queued internally and they may come from the external
72   queue as well.
73 
74   Scheduling Interface:
75 
76   There are eight scheduling functions provided by EThread and
77   they are a wrapper around their counterparts in EventProcessor.
78 
79   @see EventProcessor
80   @see Event
81 
82 */
83 class EThread : public Thread
84 {
85 public:
86   /** Handler for tail of event loop.
87 
88       The event loop should not spin. To avoid that a tail handler is called to block for a limited time.
89       This is a protocol class that defines the interface to the handler.
90   */
91   class LoopTailHandler
92   {
93   public:
94     /** Called at the end of the event loop to block.
95         @a timeout is the maximum length of time (in ns) to block.
96     */
97     virtual int waitForActivity(ink_hrtime timeout) = 0;
98     /** Unblock.
99 
100         This is required to unblock (wake up) the block created by calling @a cb.
101     */
102     virtual void signalActivity() = 0;
103 
~LoopTailHandler()104     virtual ~LoopTailHandler() {}
105   };
106 
107   /*-------------------------------------------------------*\
108   |  Common Interface                                       |
109   \*-------------------------------------------------------*/
110 
111   /**
112     Schedules the continuation on this EThread to receive an event
113     as soon as possible.
114 
115     Forwards to the EventProcessor the schedule of the callback to
116     the continuation 'c' as soon as possible. The event is assigned
117     to EThread.
118 
119     @param c Continuation to be called back as soon as possible.
120     @param callback_event Event code to be passed back to the
121       continuation's handler. See the EventProcessor class.
122     @param cookie User-defined value or pointer to be passed back
123       in the Event's object cookie field.
124     @return Reference to an Event object representing the scheduling
125       of this callback.
126 
127   */
128   Event *schedule_imm(Continuation *c, int callback_event = EVENT_IMMEDIATE, void *cookie = nullptr);
129 
130   /**
131     Schedules the continuation on this EThread to receive an event
132     at the given timeout.
133 
134     Forwards the request to the EventProcessor to schedule the
135     callback to the continuation 'c' at the time specified in
136     'atimeout_at'. The event is assigned to this EThread.
137 
138     @param c Continuation to be called back at the time specified
139       in 'atimeout_at'.
140     @param atimeout_at Time value at which to callback.
141     @param callback_event Event code to be passed back to the
142       continuation's handler. See the EventProcessor class.
143     @param cookie User-defined value or pointer to be passed back
144       in the Event's object cookie field.
145     @return A reference to an Event object representing the scheduling
146       of this callback.
147 
148   */
149   Event *schedule_at(Continuation *c, ink_hrtime atimeout_at, int callback_event = EVENT_INTERVAL, void *cookie = nullptr);
150 
151   /**
152     Schedules the continuation on this EThread to receive an event
153     after the timeout elapses.
154 
155     Instructs the EventProcessor to schedule the callback to the
156     continuation 'c' after the time specified in atimeout_in elapses.
157     The event is assigned to this EThread.
158 
159     @param c Continuation to be called back after the timeout elapses.
160     @param atimeout_in Amount of time after which to callback.
161     @param callback_event Event code to be passed back to the
162       continuation's handler. See the EventProcessor class.
163     @param cookie User-defined value or pointer to be passed back
164       in the Event's object cookie field.
165     @return A reference to an Event object representing the scheduling
166       of this callback.
167 
168   */
169   Event *schedule_in(Continuation *c, ink_hrtime atimeout_in, int callback_event = EVENT_INTERVAL, void *cookie = nullptr);
170 
171   /**
172     Schedules the continuation on this EThread to receive an event
173     periodically.
174 
175     Schedules the callback to the continuation 'c' in the EventProcessor
176     to occur every time 'aperiod' elapses. It is scheduled on this
177     EThread.
178 
179     @param c Continuation to call back every time 'aperiod' elapses.
180     @param aperiod Duration of the time period between callbacks.
181     @param callback_event Event code to be passed back to the
182       continuation's handler. See the Remarks section in the
183       EventProcessor class.
184     @param cookie User-defined value or pointer to be passed back
185       in the Event's object cookie field.
186     @return A reference to an Event object representing the scheduling
187       of this callback.
188 
189   */
190   Event *schedule_every(Continuation *c, ink_hrtime aperiod, int callback_event = EVENT_INTERVAL, void *cookie = nullptr);
191 
192   /**
193     Schedules the continuation on this EThread to receive an event
194     as soon as possible.
195 
196     Schedules the callback to the continuation 'c' as soon as
197     possible. The event is assigned to this EThread.
198 
199     @param c Continuation to be called back as soon as possible.
200     @param callback_event Event code to be passed back to the
201       continuation's handler. See the EventProcessor class.
202     @param cookie User-defined value or pointer to be passed back
203       in the Event's object cookie field.
204     @return A reference to an Event object representing the scheduling
205       of this callback.
206 
207   */
208   Event *schedule_imm_local(Continuation *c, int callback_event = EVENT_IMMEDIATE, void *cookie = nullptr);
209 
210   /**
211     Schedules the continuation on this EThread to receive an event
212     at the given timeout.
213 
214     Schedules the callback to the continuation 'c' at the time
215     specified in 'atimeout_at'. The event is assigned to this
216     EThread.
217 
218     @param c Continuation to be called back at the time specified
219       in 'atimeout_at'.
220     @param atimeout_at Time value at which to callback.
221     @param callback_event Event code to be passed back to the
222       continuation's handler. See the EventProcessor class.
223     @param cookie User-defined value or pointer to be passed back
224       in the Event's object cookie field.
225     @return A reference to an Event object representing the scheduling
226       of this callback.
227 
228   */
229   Event *schedule_at_local(Continuation *c, ink_hrtime atimeout_at, int callback_event = EVENT_INTERVAL, void *cookie = nullptr);
230 
231   /**
232     Schedules the continuation on this EThread to receive an event
233     after the timeout elapses.
234 
235     Schedules the callback to the continuation 'c' after the time
236     specified in atimeout_in elapses. The event is assigned to this
237     EThread.
238 
239     @param c Continuation to be called back after the timeout elapses.
240     @param atimeout_in Amount of time after which to callback.
241     @param callback_event Event code to be passed back to the
242       continuation's handler. See the Remarks section in the
243       EventProcessor class.
244     @param cookie User-defined value or pointer to be passed back
245       in the Event's object cookie field.
246     @return A reference to an Event object representing the scheduling
247       of this callback.
248 
249   */
250   Event *schedule_in_local(Continuation *c, ink_hrtime atimeout_in, int callback_event = EVENT_INTERVAL, void *cookie = nullptr);
251 
252   /**
253     Schedules the continuation on this EThread to receive an event
254     periodically.
255 
256     Schedules the callback to the continuation 'c' to occur every
257     time 'aperiod' elapses. It is scheduled on this EThread.
258 
259     @param c Continuation to call back every time 'aperiod' elapses.
260     @param aperiod Duration of the time period between callbacks.
261     @param callback_event Event code to be passed back to the
262       continuation's handler. See the Remarks section in the
263       EventProcessor class.
264     @param cookie User-defined value or pointer to be passed back
265       in the Event's object cookie field.
266     @return A reference to an Event object representing the scheduling
267       of this callback.
268 
269   */
270   Event *schedule_every_local(Continuation *c, ink_hrtime aperiod, int callback_event = EVENT_INTERVAL, void *cookie = nullptr);
271 
272   /** Schedule an event called once when the thread is spawned.
273 
274       This is useful only for regular threads and if called before @c Thread::start. The event will be
275       called first before the event loop.
276 
277       @Note This will override the event for a dedicate thread so that this is called instead of the
278       event passed to the constructor.
279   */
280   Event *schedule_spawn(Continuation *c, int ev = EVENT_IMMEDIATE, void *cookie = nullptr);
281 
282   // Set the tail handler.
283   void set_tail_handler(LoopTailHandler *handler);
284 
285   void set_specific() override;
286 
287   /* private */
288 
289   Event *schedule_local(Event *e);
290 
291   InkRand generator = static_cast<uint64_t>(Thread::get_hrtime_updated() ^ reinterpret_cast<uintptr_t>(this));
292 
293   /*-------------------------------------------------------*\
294   |  UNIX Interface                                         |
295   \*-------------------------------------------------------*/
296 
297   EThread();
298   EThread(ThreadType att, int anid);
299   EThread(ThreadType att, Event *e);
300   EThread(const EThread &) = delete;
301   EThread &operator=(const EThread &) = delete;
302   ~EThread() override;
303 
304   Event *schedule(Event *e);
305 
306   /** Block of memory to allocate thread specific data e.g. stat system arrays. */
307   char thread_private[PER_THREAD_DATA];
308 
309   /** Private Data for the Disk Processor. */
310   DiskHandler *diskHandler = nullptr;
311 
312   /** Private Data for AIO. */
313   Que(Continuation, link) aio_ops;
314 
315   ProtectedQueue EventQueueExternal;
316   PriorityEventQueue EventQueue;
317 
318   static constexpr int NO_ETHREAD_ID = -1;
319   int id                             = NO_ETHREAD_ID;
320   unsigned int event_types           = 0;
321   bool is_event_type(EventType et);
322   void set_event_type(EventType et);
323 
324   // Private Interface
325 
326   void execute() override;
327   void execute_regular();
328   void process_queue(Que(Event, link) * NegativeQueue, int *ev_count, int *nq_count);
329   void process_event(Event *e, int calling_code);
330   void free_event(Event *e);
331   LoopTailHandler *tail_cb = &DEFAULT_TAIL_HANDLER;
332 
333 #if HAVE_EVENTFD
334   int evfd = ts::NO_FD;
335 #else
336   int evpipe[2];
337 #endif
338   EventIO *ep = nullptr;
339 
340   ThreadType tt = REGULAR;
341   /** Initial event to call, before any scheduling.
342 
343       For dedicated threads this is the only event called.
344       For regular threads this is called first before the event loop starts.
345       @internal For regular threads this is used by the EventProcessor to get called back after
346       the thread starts but before any other events can be dispatched to provide initializations
347       needed for the thread.
348   */
349   Event *start_event = nullptr;
350 
351   ServerSessionPool *server_session_pool = nullptr;
352 
353   /** Default handler used until it is overridden.
354 
355       This uses the cond var wait in @a ExternalQueue.
356   */
357   class DefaultTailHandler : public LoopTailHandler
358   {
359     // cppcheck-suppress noExplicitConstructor; allow implicit conversion
DefaultTailHandler(ProtectedQueue & q)360     DefaultTailHandler(ProtectedQueue &q) : _q(q) {}
361 
362     int
waitForActivity(ink_hrtime timeout)363     waitForActivity(ink_hrtime timeout) override
364     {
365       _q.wait(Thread::get_hrtime() + timeout);
366       return 0;
367     }
368     void
signalActivity()369     signalActivity() override
370     {
371       /* Try to acquire the `EThread::lock` of the Event Thread:
372        *   - Acquired, indicating that the Event Thread is sleep,
373        *               must send a wakeup signal to the Event Thread.
374        *   - Failed, indicating that the Event Thread is busy, do nothing.
375        */
376       (void)_q.try_signal();
377     }
378 
379     ProtectedQueue &_q;
380 
381     friend class EThread;
382   } DEFAULT_TAIL_HANDLER = EventQueueExternal;
383 
384   /// Statistics data for event dispatching.
385   struct EventMetrics {
386     /// Time the loop was active, not including wait time but including event dispatch time.
387     struct LoopTimes {
388       ink_hrtime _start = 0;         ///< The time of the first loop for this sample. Used to mark valid entries.
389       ink_hrtime _min   = INT64_MAX; ///< Shortest loop time.
390       ink_hrtime _max   = 0;         ///< Longest loop time.
LoopTimesEventMetrics::LoopTimes391       LoopTimes() {}
392     } _loop_time;
393 
394     struct Events {
395       int _min   = INT_MAX;
396       int _max   = 0;
397       int _total = 0;
EventsEventMetrics::Events398       Events() {}
399     } _events;
400 
401     int _count = 0; ///< # of times the loop executed.
402     int _wait  = 0; ///< # of timed wait for events
403 
404     /// Add @a that to @a this data.
405     /// This embodies the custom logic per member concerning whether each is a sum, min, or max.
406     EventMetrics &operator+=(EventMetrics const &that);
407 
EventMetricsEventMetrics408     EventMetrics() {}
409   };
410 
411   /** The number of metric blocks kept.
412       This is a circular buffer, with one block per second. We have a bit more than the required 1000
413       to provide sufficient slop for cross thread reading of the data (as only the current metric block
414       is being updated).
415   */
416   static int const N_EVENT_METRICS = 1024;
417 
418   volatile EventMetrics *current_metric = nullptr; ///< The current element of @a metrics
419   EventMetrics metrics[N_EVENT_METRICS];
420 
421   /** The various stats provided to the administrator.
422       THE ORDER IS VERY SENSITIVE.
423       More than one part of the code depends on this exact order. Be careful and thorough when changing.
424   */
425   enum STAT_ID {
426     STAT_LOOP_COUNT,      ///< # of event loops executed.
427     STAT_LOOP_EVENTS,     ///< # of events
428     STAT_LOOP_EVENTS_MIN, ///< min # of events dispatched in a loop
429     STAT_LOOP_EVENTS_MAX, ///< max # of events dispatched in a loop
430     STAT_LOOP_WAIT,       ///< # of loops that did a conditional wait.
431     STAT_LOOP_TIME_MIN,   ///< Shortest time spent in loop.
432     STAT_LOOP_TIME_MAX,   ///< Longest time spent in loop.
433     N_EVENT_STATS         ///< NOT A VALID STAT INDEX - # of different stat types.
434   };
435 
436   static char const *const STAT_NAME[N_EVENT_STATS];
437 
438   /** The number of time scales used in the event statistics.
439       Currently these are 10s, 100s, 1000s.
440   */
441   static int const N_EVENT_TIMESCALES = 3;
442   /// # of samples for each time scale.
443   static int const SAMPLE_COUNT[N_EVENT_TIMESCALES];
444 
445   /// Process the last 1000s of data and write out the summaries to @a summary.
446   void summarize_stats(EventMetrics summary[N_EVENT_TIMESCALES]);
447   /// Back up the metric pointer, wrapping as needed.
448   EventMetrics *
prev(EventMetrics volatile * current)449   prev(EventMetrics volatile *current)
450   {
451     return const_cast<EventMetrics *>(--current < metrics ? &metrics[N_EVENT_METRICS - 1] : current); // cast to remove volatile
452   }
453   /// Advance the metric pointer, wrapping as needed.
454   EventMetrics *
next(EventMetrics volatile * current)455   next(EventMetrics volatile *current)
456   {
457     return const_cast<EventMetrics *>(++current > &metrics[N_EVENT_METRICS - 1] ? metrics : current); // cast to remove volatile
458   }
459 };
460 
/**
  Tag type for the placement form of operator new declared below.

  This is used so that we don't use up operator new(size_t, void *)
  which users might want to define for themselves.

*/
class ink_dummy_for_new
{
};

/** Placement allocation function selected by an @c ink_dummy_for_new pointer.

    No memory is allocated and the requested size is ignored.

    @param p Address at which the object is to be constructed.
    @return @a p, converted to @c void*.
*/
inline void *
operator new(size_t, ink_dummy_for_new *p)
{
  return static_cast<void *>(p);
}
/// Address @a offset bytes past @a thread, as a @c void*. The offset is applied in units of @c char.
#define ETHREAD_GET_PTR(thread, offset) ((void *)((char *)(thread) + (offset)))
476 
477 extern EThread *this_ethread();
478 
479 extern int thread_max_heartbeat_mseconds;
480