1 /** @file
2
3 Inline definitions of the EThread event-scheduling methods and of the this_ethread() / this_event_thread() accessors.
4
5 @section license License
6
7 Licensed to the Apache Software Foundation (ASF) under one
8 or more contributor license agreements. See the NOTICE file
9 distributed with this work for additional information
10 regarding copyright ownership. The ASF licenses this file
11 to you under the Apache License, Version 2.0 (the
12 "License"); you may not use this file except in compliance
13 with the License. You may obtain a copy of the License at
14
15 http://www.apache.org/licenses/LICENSE-2.0
16
17 Unless required by applicable law or agreed to in writing, software
18 distributed under the License is distributed on an "AS IS" BASIS,
19 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20 See the License for the specific language governing permissions and
21 limitations under the License.
22 */
23
24 /****************************************************************************
25
26 P_UnixEThread.h
27
28
29
30 *****************************************************************************/
31 #pragma once
32
33 #include "I_EThread.h"
34 #include "I_EventProcessor.h"
35
// Retry delay constant, built with HRTIME_MSECONDS(10) (presumably 10 milliseconds
// in ink_hrtime units -- confirm against the macro). Not referenced by the
// definitions visible in this part of the file.
36 const ink_hrtime DELAY_FOR_RETRY = HRTIME_MSECONDS(10);
// Thread-specific-data key defined elsewhere; this_ethread() looks up the calling
// thread's EThread pointer through it.
37 extern ink_thread_key ethread_key;
38
39 TS_INLINE Event *
schedule_imm(Continuation * cont,int callback_event,void * cookie)40 EThread::schedule_imm(Continuation *cont, int callback_event, void *cookie)
41 {
42 Event *e = ::eventAllocator.alloc();
43 e->callback_event = callback_event;
44 e->cookie = cookie;
45 return schedule(e->init(cont, 0, 0));
46 }
47
48 TS_INLINE Event *
schedule_at(Continuation * cont,ink_hrtime t,int callback_event,void * cookie)49 EThread::schedule_at(Continuation *cont, ink_hrtime t, int callback_event, void *cookie)
50 {
51 Event *e = ::eventAllocator.alloc();
52 e->callback_event = callback_event;
53 e->cookie = cookie;
54 return schedule(e->init(cont, t, 0));
55 }
56
57 TS_INLINE Event *
schedule_in(Continuation * cont,ink_hrtime t,int callback_event,void * cookie)58 EThread::schedule_in(Continuation *cont, ink_hrtime t, int callback_event, void *cookie)
59 {
60 Event *e = ::eventAllocator.alloc();
61 e->callback_event = callback_event;
62 e->cookie = cookie;
63 return schedule(e->init(cont, get_hrtime() + t, 0));
64 }
65
66 TS_INLINE Event *
schedule_every(Continuation * cont,ink_hrtime t,int callback_event,void * cookie)67 EThread::schedule_every(Continuation *cont, ink_hrtime t, int callback_event, void *cookie)
68 {
69 Event *e = ::eventAllocator.alloc();
70 e->callback_event = callback_event;
71 e->cookie = cookie;
72 if (t < 0) {
73 return schedule(e->init(cont, t, t));
74 } else {
75 return schedule(e->init(cont, get_hrtime() + t, t));
76 }
77 }
78
79 TS_INLINE Event *
schedule(Event * e)80 EThread::schedule(Event *e)
81 {
82 e->ethread = this;
83 if (tt != REGULAR) {
84 ink_assert(tt == DEDICATED);
85 return eventProcessor.schedule(e, ET_CALL);
86 }
87 if (e->continuation->mutex) {
88 e->mutex = e->continuation->mutex;
89 } else {
90 e->mutex = e->continuation->mutex = e->ethread->mutex;
91 }
92 ink_assert(e->mutex.get());
93
94 // Make sure client IP debugging works consistently
95 // The continuation that gets scheduled later is not always the
96 // client VC, it can be HttpCacheSM etc. so save the flags
97 e->continuation->control_flags.set_flags(get_cont_flags().get_flags());
98
99 if (e->ethread == this_ethread()) {
100 EventQueueExternal.enqueue_local(e);
101 } else {
102 EventQueueExternal.enqueue(e);
103 }
104
105 return e;
106 }
107
108 TS_INLINE Event *
schedule_imm_local(Continuation * cont,int callback_event,void * cookie)109 EThread::schedule_imm_local(Continuation *cont, int callback_event, void *cookie)
110 {
111 Event *e = EVENT_ALLOC(eventAllocator, this);
112 e->callback_event = callback_event;
113 e->cookie = cookie;
114 return schedule_local(e->init(cont, 0, 0));
115 }
116
117 TS_INLINE Event *
schedule_at_local(Continuation * cont,ink_hrtime t,int callback_event,void * cookie)118 EThread::schedule_at_local(Continuation *cont, ink_hrtime t, int callback_event, void *cookie)
119 {
120 Event *e = EVENT_ALLOC(eventAllocator, this);
121 e->callback_event = callback_event;
122 e->cookie = cookie;
123 return schedule_local(e->init(cont, t, 0));
124 }
125
126 TS_INLINE Event *
schedule_in_local(Continuation * cont,ink_hrtime t,int callback_event,void * cookie)127 EThread::schedule_in_local(Continuation *cont, ink_hrtime t, int callback_event, void *cookie)
128 {
129 Event *e = EVENT_ALLOC(eventAllocator, this);
130 e->callback_event = callback_event;
131 e->cookie = cookie;
132 return schedule_local(e->init(cont, get_hrtime() + t, 0));
133 }
134
135 TS_INLINE Event *
schedule_every_local(Continuation * cont,ink_hrtime t,int callback_event,void * cookie)136 EThread::schedule_every_local(Continuation *cont, ink_hrtime t, int callback_event, void *cookie)
137 {
138 Event *e = EVENT_ALLOC(eventAllocator, this);
139 e->callback_event = callback_event;
140 e->cookie = cookie;
141 if (t < 0) {
142 return schedule_local(e->init(cont, t, t));
143 } else {
144 return schedule_local(e->init(cont, get_hrtime() + t, t));
145 }
146 }
147
148 TS_INLINE Event *
schedule_local(Event * e)149 EThread::schedule_local(Event *e)
150 {
151 if (tt != REGULAR) {
152 ink_assert(tt == DEDICATED);
153 return eventProcessor.schedule(e, ET_CALL);
154 }
155 if (!e->mutex) {
156 e->ethread = this;
157 e->mutex = e->continuation->mutex;
158 } else {
159 ink_assert(e->ethread == this);
160 }
161 e->globally_allocated = false;
162
163 // Make sure client IP debugging works consistently
164 // The continuation that gets scheduled later is not always the
165 // client VC, it can be HttpCacheSM etc. so save the flags
166 e->continuation->control_flags.set_flags(get_cont_flags().get_flags());
167 EventQueueExternal.enqueue_local(e);
168 return e;
169 }
170
171 TS_INLINE Event *
schedule_spawn(Continuation * c,int ev,void * cookie)172 EThread::schedule_spawn(Continuation *c, int ev, void *cookie)
173 {
174 ink_assert(this != this_ethread()); // really broken to call this from the same thread.
175 if (start_event) {
176 free_event(start_event);
177 }
178 start_event = EVENT_ALLOC(eventAllocator, this);
179 start_event->ethread = this;
180 start_event->mutex = this->mutex;
181 start_event->init(c);
182 start_event->callback_event = ev;
183 start_event->cookie = cookie;
184 return start_event;
185 }
186
187 TS_INLINE EThread *
this_ethread()188 this_ethread()
189 {
190 // The `dynamic_cast` has a significant performance impact (~6%).
191 // Reported by masaori and create PR #6281 to fix it.
192 return static_cast<EThread *>(ink_thread_getspecific(ethread_key));
193 }
194
195 TS_INLINE EThread *
this_event_thread()196 this_event_thread()
197 {
198 EThread *ethread = this_ethread();
199 if (ethread != nullptr && ethread->tt == REGULAR) {
200 return ethread;
201 } else {
202 return nullptr;
203 }
204 }
205
206 TS_INLINE void
free_event(Event * e)207 EThread::free_event(Event *e)
208 {
209 ink_assert(!e->in_the_priority_queue && !e->in_the_prot_queue);
210 e->mutex = nullptr;
211 EVENT_FREE(e, eventAllocator, this);
212 }
213
214 TS_INLINE void
set_tail_handler(LoopTailHandler * handler)215 EThread::set_tail_handler(LoopTailHandler *handler)
216 {
217 ink_atomic_swap(&tail_cb, handler);
218 }
219