1 /** @file
2
3 File contains the member function defs and thread loop for the process manager.
4
5 @section license License
6
7 Licensed to the Apache Software Foundation (ASF) under one
8 or more contributor license agreements. See the NOTICE file
9 distributed with this work for additional information
10 regarding copyright ownership. The ASF licenses this file
11 to you under the Apache License, Version 2.0 (the
12 "License"); you may not use this file except in compliance
13 with the License. You may obtain a copy of the License at
14
15 http://www.apache.org/licenses/LICENSE-2.0
16
17 Unless required by applicable law or agreed to in writing, software
18 distributed under the License is distributed on an "AS IS" BASIS,
19 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20 See the License for the specific language governing permissions and
21 limitations under the License.
22 */
23
24 #include "InkAPIInternal.h"
25 #include "ProcessManager.h"
26
27 #include "tscore/ink_apidefs.h"
28 #include "tscore/TSSystemState.h"
29 #include "MgmtSocket.h"
30 #include "tscore/I_Layout.h"
31
32 /*
33 * Global ProcessManager
34 */
35 inkcoreapi ProcessManager *pmgmt = nullptr;
36
37 // read_management_message attempts to read a message from the management
38 // socket. Returns -errno on error, otherwise 0. If a message was read the
39 // *msg pointer will be filled in with the message that was read.
40 static int
read_management_message(int sockfd,MgmtMessageHdr ** msg)41 read_management_message(int sockfd, MgmtMessageHdr **msg)
42 {
43 MgmtMessageHdr hdr;
44 int ret;
45
46 *msg = nullptr;
47
48 // We have a message, try to read the message header.
49 ret = mgmt_read_pipe(sockfd, reinterpret_cast<char *>(&hdr), sizeof(MgmtMessageHdr));
50 switch (ret) {
51 case 0:
52 // Received EOF.
53 return 0;
54 case sizeof(MgmtMessageHdr):
55 break;
56 default:
57 // Received -errno.
58 return -errno;
59 }
60
61 size_t msg_size = sizeof(MgmtMessageHdr) + hdr.data_len;
62 MgmtMessageHdr *full_msg = static_cast<MgmtMessageHdr *>(ats_malloc(msg_size));
63
64 memcpy(full_msg, &hdr, sizeof(MgmtMessageHdr));
65 char *data_raw = reinterpret_cast<char *>(full_msg) + sizeof(MgmtMessageHdr);
66
67 ret = mgmt_read_pipe(sockfd, data_raw, hdr.data_len);
68 if (ret == 0) {
69 // Received EOF.
70 ats_free(full_msg);
71 return 0;
72 } else if (ret < 0) {
73 // Received -errno.
74 ats_free(full_msg);
75 return ret;
76 } else {
77 ink_release_assert(ret == hdr.data_len);
78 // Received the message.
79 *msg = full_msg;
80 return 0;
81 }
82 }
83
84 void
start(std::function<TSThread ()> const & cb_init,std::function<void (TSThread)> const & cb_destroy)85 ProcessManager::start(std::function<TSThread()> const &cb_init, std::function<void(TSThread)> const &cb_destroy)
86 {
87 Debug("pmgmt", "starting process manager");
88
89 init = cb_init;
90 destroy = cb_destroy;
91
92 ink_release_assert(running == 0);
93 ink_atomic_increment(&running, 1);
94 ink_thread_create(&poll_thread, processManagerThread, nullptr, 0, 0, nullptr);
95 }
96
97 void
stop()98 ProcessManager::stop()
99 {
100 Debug("pmgmt", "stopping process manager");
101
102 ink_release_assert(running == 1);
103 ink_atomic_decrement(&running, 1);
104
105 int tmp;
106
107 if (local_manager_sockfd != ts::NO_FD) {
108 tmp = local_manager_sockfd;
109 local_manager_sockfd = ts::NO_FD;
110 close_socket(tmp);
111 }
112
113 #if HAVE_EVENTFD
114 if (wakeup_fd != ts::NO_FD) {
115 tmp = wakeup_fd;
116 wakeup_fd = ts::NO_FD;
117 close_socket(tmp);
118 }
119 #endif
120
121 ink_thread_kill(poll_thread, SIGINT);
122
123 ink_thread_join(poll_thread);
124 poll_thread = ink_thread_null();
125
126 while (!queue_is_empty(mgmt_signal_queue)) {
127 char *sig = static_cast<char *>(::dequeue(mgmt_signal_queue));
128 ats_free(sig);
129 }
130
131 LLQ *tmp_queue = mgmt_signal_queue;
132 mgmt_signal_queue = nullptr;
133 delete_queue(tmp_queue);
134 }
135
136 /*
137 * processManagerThread(...)
138 * The start function and thread loop for the process manager.
139 */
140 void *
processManagerThread(void * arg)141 ProcessManager::processManagerThread(void *arg)
142 {
143 void *ret = arg;
144
145 while (!pmgmt) { /* Avert race condition, thread spun during constructor */
146 Debug("pmgmt", "waiting for initialization");
147 mgmt_sleep_sec(1);
148 }
149
150 if (pmgmt->require_lm) { /* Allow p. process to run w/o a lm */
151 pmgmt->initLMConnection();
152 } else {
153 return ret;
154 }
155
156 if (pmgmt->init) {
157 pmgmt->managerThread = pmgmt->init();
158 }
159
160 // Start pumping messages between the local process and the process
161 // manager. This will terminate when the process manager terminates
162 // or the local process calls stop(). In either case, it is likely
163 // that we will first notice because we got a socket error, but in
164 // the latter case, the `running` flag has already been toggled so
165 // we know that we are really doing a shutdown.
166 while (pmgmt->running) {
167 int ret;
168
169 if (pmgmt->require_lm) {
170 ret = pmgmt->pollLMConnection();
171 if (ret < 0 && pmgmt->running && !TSSystemState::is_event_system_shut_down()) {
172 Alert("exiting with read error from process manager: %s", strerror(-ret));
173 }
174 }
175
176 ret = pmgmt->processSignalQueue();
177 if (ret < 0 && pmgmt->running && !TSSystemState::is_event_system_shut_down()) {
178 Alert("exiting with write error from process manager: %s", strerror(-ret));
179 }
180 }
181
182 if (pmgmt->destroy && pmgmt->managerThread != nullptr) {
183 pmgmt->destroy(pmgmt->managerThread);
184 pmgmt->managerThread = nullptr;
185 }
186
187 return ret;
188 }
189
ProcessManager(bool rlm)190 ProcessManager::ProcessManager(bool rlm)
191 : BaseManager(), require_lm(rlm), pid(getpid()), local_manager_sockfd(0), cbtable(nullptr), max_msgs_in_a_row(1)
192 {
193 mgmt_signal_queue = create_queue();
194
195 local_manager_sockfd = ts::NO_FD;
196 #if HAVE_EVENTFD
197 wakeup_fd = ts::NO_FD;
198 #endif
199
200 // Set temp. process/manager timeout. Will be reconfigure later.
201 // Making the process_manager thread a spinning thread to start traffic server
202 // as quickly as possible. Will reset this timeout when reconfigure()
203 timeout = 0;
204 }
205
~ProcessManager()206 ProcessManager::~ProcessManager()
207 {
208 if (running) {
209 stop();
210 }
211 }
212
213 void
reconfigure()214 ProcessManager::reconfigure()
215 {
216 max_msgs_in_a_row = MAX_MSGS_IN_A_ROW;
217
218 if (RecGetRecordInt("proxy.config.process_manager.timeout", &timeout) != REC_ERR_OKAY) {
219 // Default to 5sec if the timeout is unspecified.
220 timeout = 5;
221 }
222 }
223
224 void
signalConfigFileChild(const char * parent,const char * child)225 ProcessManager::signalConfigFileChild(const char *parent, const char *child)
226 {
227 static const MgmtMarshallType fields[] = {MGMT_MARSHALL_STRING, MGMT_MARSHALL_STRING};
228
229 size_t len = mgmt_message_length(fields, countof(fields), &parent, &child);
230 void *buffer = ats_malloc(len);
231
232 mgmt_message_marshall(buffer, len, fields, countof(fields), &parent, &child);
233 signalManager(MGMT_SIGNAL_CONFIG_FILE_CHILD, static_cast<const char *>(buffer), len);
234
235 ats_free(buffer);
236 }
237
238 void
signalManager(int msg_id,const char * data_str)239 ProcessManager::signalManager(int msg_id, const char *data_str)
240 {
241 signalManager(msg_id, data_str, strlen(data_str) + 1);
242 }
243
244 void
signalManager(int msg_id,const char * data_raw,int data_len)245 ProcessManager::signalManager(int msg_id, const char *data_raw, int data_len)
246 {
247 MgmtMessageHdr *mh;
248
249 mh = static_cast<MgmtMessageHdr *>(ats_malloc(sizeof(MgmtMessageHdr) + data_len));
250 mh->msg_id = msg_id;
251 mh->data_len = data_len;
252 memcpy(reinterpret_cast<char *>(mh) + sizeof(MgmtMessageHdr), data_raw, data_len);
253 this->signalManager(mh);
254 }
255
256 void
signalManager(int msg_id,std::string_view text)257 ProcessManager::signalManager(int msg_id, std::string_view text)
258 {
259 MgmtMessageHdr *mh;
260
261 // Make space for the extra null terminator.
262 mh = static_cast<MgmtMessageHdr *>(ats_malloc(sizeof(MgmtMessageHdr) + text.size() + 1));
263 auto body = reinterpret_cast<char *>(mh + 1); // start of the message body.
264 mh->msg_id = msg_id;
265 mh->data_len = text.size() + 1;
266 memcpy(body, text.data(), text.size());
267 body[text.size()] = '\0';
268
269 this->signalManager(mh);
270 }
271
272 void
signalManager(MgmtMessageHdr * mh)273 ProcessManager::signalManager(MgmtMessageHdr *mh)
274 {
275 if (!this->running) {
276 Warning("MgmtMessageHdr is ignored. Because ProcessManager is not running");
277 return;
278 }
279 ink_release_assert(::enqueue(mgmt_signal_queue, mh));
280
281 #if HAVE_EVENTFD
282 // we don't care about the actual value of wakeup_fd, so just keep adding 1. just need to
283 // wakeup the fd. also, note that wakeup_fd was initialized to non-blocking so we can
284 // directly write to it without any timeout checking.
285 //
286 // don't trigger if MGMT_EVENT_LIBRECORD because they happen all the time
287 // and don't require a quick response. for MGMT_EVENT_LIBRECORD, rely on timeouts so
288 // traffic_server can spend more time doing other things/
289 uint64_t one = 1;
290 if (wakeup_fd != ts::NO_FD && mh->msg_id != MGMT_SIGNAL_LIBRECORDS) {
291 ATS_UNUSED_RETURN(write(wakeup_fd, &one, sizeof(uint64_t))); // trigger to stop polling
292 }
293 #endif
294 }
295
296 int
processSignalQueue()297 ProcessManager::processSignalQueue()
298 {
299 while (!queue_is_empty(mgmt_signal_queue)) {
300 MgmtMessageHdr *mh = static_cast<MgmtMessageHdr *>(::dequeue(mgmt_signal_queue));
301
302 Debug("pmgmt", "signaling local manager with message ID %d", mh->msg_id);
303
304 if (require_lm) {
305 int ret = mgmt_write_pipe(local_manager_sockfd, reinterpret_cast<char *>(mh), sizeof(MgmtMessageHdr) + mh->data_len);
306 ats_free(mh);
307
308 if (ret < 0) {
309 return ret;
310 }
311 }
312 }
313
314 return 0;
315 }
316
317 void
initLMConnection()318 ProcessManager::initLMConnection()
319 {
320 std::string rundir(RecConfigReadRuntimeDir());
321 std::string sockpath(Layout::relative_to(rundir, LM_CONNECTION_SERVER));
322
323 MgmtMessageHdr *mh_full;
324 int data_len;
325
326 int servlen;
327 struct sockaddr_un serv_addr;
328
329 if (sockpath.length() > sizeof(serv_addr.sun_path) - 1) {
330 errno = ENAMETOOLONG;
331 Fatal("Unable to create socket '%s': %s", sockpath.c_str(), strerror(errno));
332 }
333
334 /* Setup Connection to LocalManager */
335 memset(reinterpret_cast<char *>(&serv_addr), 0, sizeof(serv_addr));
336 serv_addr.sun_family = AF_UNIX;
337
338 ink_strlcpy(serv_addr.sun_path, sockpath.c_str(), sizeof(serv_addr.sun_path));
339 #if defined(darwin) || defined(freebsd)
340 servlen = sizeof(sockaddr_un);
341 #else
342 servlen = strlen(serv_addr.sun_path) + sizeof(serv_addr.sun_family);
343 #endif
344
345 if ((local_manager_sockfd = socket(AF_UNIX, SOCK_STREAM, 0)) < 0) {
346 Fatal("Unable to create socket '%s': %s", sockpath.c_str(), strerror(errno));
347 }
348
349 if (fcntl(local_manager_sockfd, F_SETFD, FD_CLOEXEC) < 0) {
350 Fatal("unable to set close-on-exec flag: %s", strerror(errno));
351 }
352
353 if ((connect(local_manager_sockfd, reinterpret_cast<struct sockaddr *>(&serv_addr), servlen)) < 0) {
354 Fatal("failed to connect management socket '%s': %s", sockpath.c_str(), strerror(errno));
355 }
356
357 #if HAVE_EVENTFD
358 wakeup_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
359 if (wakeup_fd < 0) {
360 Fatal("unable to create wakeup eventfd. errno: %s", strerror(errno));
361 }
362 #endif
363
364 data_len = sizeof(pid_t);
365 mh_full = static_cast<MgmtMessageHdr *>(alloca(sizeof(MgmtMessageHdr) + data_len));
366 mh_full->msg_id = MGMT_SIGNAL_PID;
367 mh_full->data_len = data_len;
368
369 memcpy(reinterpret_cast<char *>(mh_full) + sizeof(MgmtMessageHdr), &(pid), data_len);
370
371 if (mgmt_write_pipe(local_manager_sockfd, reinterpret_cast<char *>(mh_full), sizeof(MgmtMessageHdr) + data_len) <= 0) {
372 Fatal("error writing message: %s", strerror(errno));
373 }
374 }
375
376 int
pollLMConnection()377 ProcessManager::pollLMConnection()
378 {
379 int count;
380 int ready;
381 struct timeval timeout;
382 fd_set fdlist;
383
384 // Avoid getting stuck enqueuing too many requests in a row, limit to MAX_MSGS_IN_A_ROW.
385 for (count = 0; running && count < max_msgs_in_a_row; ++count) {
386 timeout.tv_sec = 1;
387 timeout.tv_usec = 0;
388
389 FD_ZERO(&fdlist);
390
391 if (local_manager_sockfd != ts::NO_FD) {
392 FD_SET(local_manager_sockfd, &fdlist);
393 }
394
395 #if HAVE_EVENTFD
396 if (wakeup_fd != ts::NO_FD) {
397 FD_SET(wakeup_fd, &fdlist);
398 }
399 #endif
400
401 // wait for data on socket
402 ready = mgmt_select(FD_SETSIZE, &fdlist, nullptr, nullptr, &timeout);
403
404 switch (ready) {
405 case 0:
406 // Timed out.
407 return 0;
408 case -1:
409 if (mgmt_transient_error()) {
410 continue;
411 }
412 return -errno;
413 }
414
415 if (local_manager_sockfd != ts::NO_FD && FD_ISSET(local_manager_sockfd, &fdlist)) { /* Message from manager */
416 MgmtMessageHdr *msg;
417
418 int ret = read_management_message(local_manager_sockfd, &msg);
419 if (ret < 0) {
420 return ret;
421 }
422
423 // No message, we are done polling. */
424 if (msg == nullptr) {
425 return 0;
426 }
427
428 Debug("pmgmt", "received message ID %d", msg->msg_id);
429 handleMgmtMsgFromLM(msg);
430 }
431 #if HAVE_EVENTFD
432 else if (wakeup_fd != ts::NO_FD && FD_ISSET(wakeup_fd, &fdlist)) { /* if msg, keep polling for more */
433 // read or else fd will always be set.
434 uint64_t ignore;
435 ATS_UNUSED_RETURN(read(wakeup_fd, &ignore, sizeof(uint64_t)));
436 break;
437 }
438 #endif
439 }
440 Debug("pmgmt", "enqueued %d of max %d messages in a row", count, max_msgs_in_a_row);
441 return 0;
442 }
443
444 void
handleMgmtMsgFromLM(MgmtMessageHdr * mh)445 ProcessManager::handleMgmtMsgFromLM(MgmtMessageHdr *mh)
446 {
447 ink_assert(mh != nullptr);
448
449 auto payload = mh->payload();
450
451 Debug("pmgmt", "processing event id '%d' payload=%d", mh->msg_id, mh->data_len);
452 switch (mh->msg_id) {
453 case MGMT_EVENT_SHUTDOWN:
454 executeMgmtCallback(MGMT_EVENT_SHUTDOWN, {});
455 Alert("exiting on shutdown message");
456 break;
457 case MGMT_EVENT_RESTART:
458 executeMgmtCallback(MGMT_EVENT_RESTART, {});
459 break;
460 case MGMT_EVENT_DRAIN:
461 executeMgmtCallback(MGMT_EVENT_DRAIN, payload);
462 break;
463 case MGMT_EVENT_CLEAR_STATS:
464 executeMgmtCallback(MGMT_EVENT_CLEAR_STATS, {});
465 break;
466 case MGMT_EVENT_HOST_STATUS_UP:
467 executeMgmtCallback(MGMT_EVENT_HOST_STATUS_UP, payload);
468 break;
469 case MGMT_EVENT_HOST_STATUS_DOWN:
470 executeMgmtCallback(MGMT_EVENT_HOST_STATUS_DOWN, payload);
471 break;
472 case MGMT_EVENT_ROLL_LOG_FILES:
473 executeMgmtCallback(MGMT_EVENT_ROLL_LOG_FILES, {});
474 break;
475 case MGMT_EVENT_PLUGIN_CONFIG_UPDATE: {
476 auto msg{payload.rebind<char>()};
477 if (!msg.empty() && msg[0] != '\0' && this->cbtable) {
478 this->cbtable->invoke(msg.data());
479 }
480 } break;
481 case MGMT_EVENT_CONFIG_FILE_UPDATE:
482 /*
483 librecords -- we don't do anything in here because we are traffic_server
484 and we are not the owner of proxy.config.* variables.
485 Even if we trigger the sync_required bit, by
486 RecSetSynRequired, the sync. message will send back to
487 traffic_manager. And traffic_manager founds out that, the
488 actual value of the config variable didn't changed.
489 At the end, the sync_required bit is not set and we will
490 never get notified and callbacks are never invoked.
491
492 The solution is to set the sync_required bit on the
493 manager side. See LocalManager::sendMgmtMsgToProcesses()
494 for details.
495 */
496 break;
497 case MGMT_EVENT_LIBRECORDS:
498 executeMgmtCallback(MGMT_EVENT_LIBRECORDS, payload);
499 break;
500 case MGMT_EVENT_STORAGE_DEVICE_CMD_OFFLINE:
501 executeMgmtCallback(MGMT_EVENT_STORAGE_DEVICE_CMD_OFFLINE, payload);
502 break;
503 case MGMT_EVENT_LIFECYCLE_MESSAGE:
504 executeMgmtCallback(MGMT_EVENT_LIFECYCLE_MESSAGE, payload);
505 break;
506 default:
507 Warning("received unknown message ID %d\n", mh->msg_id);
508 break;
509 }
510
511 ats_free(mh);
512 }
513