/** @file

  Inline implementations of the EThread scheduling interface.

  @section license License

  Licensed to the Apache Software Foundation (ASF) under one
  or more contributor license agreements.  See the NOTICE file
  distributed with this work for additional information
  regarding copyright ownership.  The ASF licenses this file
  to you under the Apache License, Version 2.0 (the
  "License"); you may not use this file except in compliance
  with the License.  You may obtain a copy of the License at

      http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
 */

/****************************************************************************

  P_UnixEThread.h

*****************************************************************************/
#pragma once

#include "I_EThread.h"
#include "I_EventProcessor.h"
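
// Delay used when an operation has to be rescheduled for a retry.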
const ink_hrtime DELAY_FOR_RETRY = HRTIME_MSECONDS(10);
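// Thread-local key under which each EThread registers itself; read by this_ethread().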
extern ink_thread_key ethread_key;
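
// Schedule an immediate callback of cont with callback_event on this thread,
// using a globally allocated Event.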
TS_INLINE Event *
EThread::schedule_imm(Continuation *cont, int callback_event, void *cookie)
{
  Event *e          = ::eventAllocator.alloc();
  e->callback_event = callback_event;
  e->cookie         = cookie;
  return schedule(e->init(cont, 0, 0));
}
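
// Schedule a callback of cont at the absolute time t.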
TS_INLINE Event *
EThread::schedule_at(Continuation *cont, ink_hrtime t, int callback_event, void *cookie)
{
  Event *e          = ::eventAllocator.alloc();
  e->callback_event = callback_event;
  e->cookie         = cookie;
  return schedule(e->init(cont, t, 0));
}
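
// Schedule a callback of cont after the relative delay t.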
TS_INLINE Event *
EThread::schedule_in(Continuation *cont, ink_hrtime t, int callback_event, void *cookie)
{
  Event *e          = ::eventAllocator.alloc();
  e->callback_event = callback_event;
  e->cookie         = cookie;
  return schedule(e->init(cont, get_hrtime() + t, 0));
}
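
// Schedule a periodic callback of cont with period t. A negative t is passed
// through unchanged, i.e. the timeout is not offset by the current time.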
TS_INLINE Event *
EThread::schedule_every(Continuation *cont, ink_hrtime t, int callback_event, void *cookie)
{
  Event *e          = ::eventAllocator.alloc();
  e->callback_event = callback_event;
  e->cookie         = cookie;
  if (t < 0) {
    return schedule(e->init(cont, t, t));
  } else {
    return schedule(e->init(cont, get_hrtime() + t, t));
  }
}
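
// Common path for the global schedule_*() variants: bind the event to this thread,
// inherit the continuation's mutex (or assign this thread's mutex if the continuation
// has none), and enqueue on the external event queue. Dedicated threads forward the
// event to the ET_CALL event processor instead.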
TS_INLINE Event *
EThread::schedule(Event *e)
{
  e->ethread = this;
  if (tt != REGULAR) {
    ink_assert(tt == DEDICATED);
    return eventProcessor.schedule(e, ET_CALL);
  }
  if (e->continuation->mutex) {
    e->mutex = e->continuation->mutex;
  } else {
    e->mutex = e->continuation->mutex = e->ethread->mutex;
  }
  ink_assert(e->mutex.get());

  // Make sure client IP debugging works consistently.
  // The continuation that gets scheduled later is not always the client VC;
  // it can be HttpCacheSM etc., so save the flags.
  e->continuation->control_flags.set_flags(get_cont_flags().get_flags());

  if (e->ethread == this_ethread()) {
    EventQueueExternal.enqueue_local(e);
  } else {
    EventQueueExternal.enqueue(e);
  }

  return e;
}
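
// Thread-local variant of schedule_imm(): the Event is allocated for this thread
// and enqueued directly with schedule_local().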
TS_INLINE Event *
EThread::schedule_imm_local(Continuation *cont, int callback_event, void *cookie)
{
  Event *e          = EVENT_ALLOC(eventAllocator, this);
  e->callback_event = callback_event;
  e->cookie         = cookie;
  return schedule_local(e->init(cont, 0, 0));
}
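
// Thread-local variant of schedule_at(): callback at the absolute time t.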
TS_INLINE Event *
EThread::schedule_at_local(Continuation *cont, ink_hrtime t, int callback_event, void *cookie)
{
  Event *e          = EVENT_ALLOC(eventAllocator, this);
  e->callback_event = callback_event;
  e->cookie         = cookie;
  return schedule_local(e->init(cont, t, 0));
}
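
// Thread-local variant of schedule_in(): callback after the relative delay t.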
TS_INLINE Event *
EThread::schedule_in_local(Continuation *cont, ink_hrtime t, int callback_event, void *cookie)
{
  Event *e          = EVENT_ALLOC(eventAllocator, this);
  e->callback_event = callback_event;
  e->cookie         = cookie;
  return schedule_local(e->init(cont, get_hrtime() + t, 0));
}
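
// Thread-local variant of schedule_every(): periodic callback with period t.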
TS_INLINE Event *
EThread::schedule_every_local(Continuation *cont, ink_hrtime t, int callback_event, void *cookie)
{
  Event *e          = EVENT_ALLOC(eventAllocator, this);
  e->callback_event = callback_event;
  e->cookie         = cookie;
  if (t < 0) {
    return schedule_local(e->init(cont, t, t));
  } else {
    return schedule_local(e->init(cont, get_hrtime() + t, t));
  }
}
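
// Common path for the *_local() variants: mark the event as locally allocated,
// inherit the continuation's mutex if the event does not already have one, and
// enqueue on this thread's local external queue.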
TS_INLINE Event *
EThread::schedule_local(Event *e)
{
  if (tt != REGULAR) {
    ink_assert(tt == DEDICATED);
    return eventProcessor.schedule(e, ET_CALL);
  }
  if (!e->mutex) {
    e->ethread = this;
    e->mutex   = e->continuation->mutex;
  } else {
    ink_assert(e->ethread == this);
  }
  e->globally_allocated = false;

  // Make sure client IP debugging works consistently.
  // The continuation that gets scheduled later is not always the client VC;
  // it can be HttpCacheSM etc., so save the flags.
  e->continuation->control_flags.set_flags(get_cont_flags().get_flags());
  EventQueueExternal.enqueue_local(e);
  return e;
}
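
// Install c as the continuation to call back (with event ev) when this thread
// starts, replacing any previously installed start event. Must not be called
// from the target thread itself.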
TS_INLINE Event *
EThread::schedule_spawn(Continuation *c, int ev, void *cookie)
{
  ink_assert(this != this_ethread()); // really broken to call this from the same thread.
  if (start_event) {
    free_event(start_event);
  }
  start_event          = EVENT_ALLOC(eventAllocator, this);
  start_event->ethread = this;
  start_event->mutex   = this->mutex;
  start_event->init(c);
  start_event->callback_event = ev;
  start_event->cookie         = cookie;
  return start_event;
}
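
// Return the EThread bound to the current OS thread, or nullptr if the current
// thread is not an event thread.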
TS_INLINE EThread *
this_ethread()
{
  // The `dynamic_cast` here has a significant performance impact (~6%).
  // Reported by masaori, who created PR #6281 to fix it.
  return static_cast<EThread *>(ink_thread_getspecific(ethread_key));
}
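
// Return the current EThread only if it is a REGULAR event thread; otherwise nullptr.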
TS_INLINE EThread *
this_event_thread()
{
  EThread *ethread = this_ethread();
  if (ethread != nullptr && ethread->tt == REGULAR) {
    return ethread;
  } else {
    return nullptr;
  }
}
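
// Release an Event back to its allocator. The event must not be in either the
// priority queue or the protected queue.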
TS_INLINE void
EThread::free_event(Event *e)
{
  ink_assert(!e->in_the_priority_queue && !e->in_the_prot_queue);
  e->mutex = nullptr;
  EVENT_FREE(e, eventAllocator, this);
}
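
// Atomically install handler as this thread's event loop tail handler.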
TS_INLINE void
EThread::set_tail_handler(LoopTailHandler *handler)
{
  ink_atomic_swap(&tail_cb, handler);
}