/** @file

  The Local Manager process of the management system.

  @section license License

  Licensed to the Apache Software Foundation (ASF) under one
  or more contributor license agreements. See the NOTICE file
  distributed with this work for additional information
  regarding copyright ownership. The ASF licenses this file
  to you under the Apache License, Version 2.0 (the
  "License"); you may not use this file except in compliance
  with the License. You may obtain a copy of the License at

      http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
 */

#include "tscore/ink_platform.h"
#include "tscore/ink_sock.h"
#include "tscore/ink_file.h"
#include "tscore/ink_error.h"
#include "Alarms.h"
#include "MgmtUtils.h"
#include "tscore/I_Layout.h"
#include "tscore/runroot.h"
#include "LocalManager.h"
#include "MgmtSocket.h"
#include "tscore/ink_cap.h"
#include "FileManager.h"
#include <string_view>
#include <algorithm>
#include "tscpp/util/TextView.h"
#include "tscore/BufferWriter.h"
#include "tscore/bwf_std_format.h"
#include "tscore/Filenames.h"

#if TS_USE_POSIX_CAP
#include <sys/capability.h>
#endif

using namespace std::literals;
static const std::string_view MGMT_OPT{"-M"};
static const std::string_view RUNROOT_OPT{"--run-root="};

void
LocalManager::mgmtCleanup()
{
  close_socket(process_server_sockfd);
  process_server_sockfd = ts::NO_FD;

#if HAVE_EVENTFD
  if (wakeup_fd != ts::NO_FD) {
    close_socket(wakeup_fd);
    wakeup_fd = ts::NO_FD;
  }
#endif

  // fix me for librecords

  closelog();
  return;
}

void
LocalManager::mgmtShutdown()
{
  mgmt_log("[LocalManager::mgmtShutdown] Executing shutdown request.\n");
  processShutdown(true);
  // WCCP TBD: Send a shutdown message to routers.

  if (processRunning()) {
    waitpid(watched_process_pid, nullptr, 0);
#if defined(linux)
    /* Avert race condition, wait for the thread to complete,
       before getting one more restart process */
    /* Workaround for bugid INKqa10060 */
    mgmt_sleep_msec(1);
#endif
  }
  mgmtCleanup();
}

void
LocalManager::processShutdown(bool mainThread)
{
  mgmt_log("[LocalManager::processShutdown] Executing process shutdown request.\n");
  if (mainThread) {
    sendMgmtMsgToProcesses(MGMT_EVENT_SHUTDOWN, "processShutdown[main]");
  } else {
    signalEvent(MGMT_EVENT_SHUTDOWN, "processShutdown");
  }
  return;
}

void
LocalManager::processRestart()
{
  mgmt_log("[LocalManager::processRestart] Executing process restart request.\n");
  signalEvent(MGMT_EVENT_RESTART, "processRestart");
  return;
}

void
LocalManager::processBounce()
{
  mgmt_log("[LocalManager::processBounce] Executing process bounce request.\n");
  signalEvent(MGMT_EVENT_BOUNCE, "processBounce");
  return;
}

void
LocalManager::processDrain(int to_drain)
{
  mgmt_log("[LocalManager::processDrain] Executing process drain request.\n");
  signalEvent(MGMT_EVENT_DRAIN, to_drain ? "1" : "0");
  return;
}

void
LocalManager::rollLogFiles()
{
  mgmt_log("[LocalManager::rollLogFiles] Log files are being rolled.\n");
  signalEvent(MGMT_EVENT_ROLL_LOG_FILES, "rollLogs");
  return;
}

void
LocalManager::hostStatusSetDown(const char *marshalled_req, int len)
{
  signalEvent(MGMT_EVENT_HOST_STATUS_DOWN, marshalled_req, len);
  return;
}

void
LocalManager::hostStatusSetUp(const char *marshalled_req, int len)
{
  signalEvent(MGMT_EVENT_HOST_STATUS_UP, marshalled_req, len);
  return;
}

void
LocalManager::clearStats(const char *name)
{
  // Clear our records and then send the signal. There is a race condition
  // here where our stats could get re-updated from the proxy
  // before the proxy clears them, but this should be rare.
  //
  // Doing things in the opposite order prevents that race
  // but exacerbates the race between the node and cluster
  // stats getting cleared by propagation of clearing the
  // cluster stats
  //
  if (name && *name) {
    RecResetStatRecord(name);
  } else {
    RecResetStatRecord(RECT_NULL, true);
  }

  // If the proxy is not running, sending the signal does
  // not do anything. Remove the stats file to make sure
  // that operation works even when the proxy is off
  //
  if (this->proxy_running == 0) {
    ats_scoped_str statsPath(RecConfigReadPersistentStatsPath());
    if (unlink(statsPath) < 0) {
      if (errno != ENOENT) {
        mgmt_log("[LocalManager::clearStats] Unlink of %s failed : %s\n", (const char *)statsPath, strerror(errno));
      }
    }
  }
}

bool
LocalManager::processRunning()
{
  if (watched_process_fd != ts::NO_FD && watched_process_pid != -1) {
    return true;
  } else {
    return false;
  }
}

LocalManager::LocalManager(bool proxy_on, bool listen) : BaseManager(), run_proxy(proxy_on), listen_for_proxy(listen)
{
  bool found;
  std::string bindir(RecConfigReadBinDir());
  std::string sysconfdir(RecConfigReadConfigDir());

  manager_started_at = time(nullptr);

  RecRegisterStatInt(RECT_NODE, "proxy.node.proxy_running", 0, RECP_NON_PERSISTENT);

  RecInt http_enabled = REC_readInteger("proxy.config.http.enabled", &found);
  ink_assert(found);
  if (http_enabled && found) {
    HttpProxyPort::loadConfig(m_proxy_ports);
  }
  HttpProxyPort::loadDefaultIfEmpty(m_proxy_ports);

  // Get the default IP binding values.
  RecHttpLoadIp("proxy.local.incoming_ip_to_bind", m_inbound_ip4, m_inbound_ip6);

  if (access(sysconfdir.c_str(), R_OK) == -1) {
    mgmt_log("[LocalManager::LocalManager] unable to access() directory '%s': %d, %s\n", sysconfdir.c_str(), errno,
             strerror(errno));
    mgmt_fatal(0, "[LocalManager::LocalManager] please set the 'TS_ROOT' environment variable\n");
  }

#if TS_HAS_WCCP
  // Bind the WCCP address if present.
  ats_scoped_str wccp_addr_str(REC_readString("proxy.config.wccp.addr", &found));
  if (found && wccp_addr_str && *wccp_addr_str) {
    wccp_cache.setAddr(inet_addr(wccp_addr_str));
    mgmt_log("[LocalManager::LocalManager] WCCP identifying address set to %s.\n", static_cast<char *>(wccp_addr_str));
  }

  ats_scoped_str wccp_config_str(RecConfigReadConfigPath("proxy.config.wccp.services"));
  if (wccp_config_str && strlen(wccp_config_str) > 0) {
    bool located = true;
    if (access(wccp_config_str, R_OK) == -1) {
      located = false;
    }

    if (located) {
      wccp_cache.loadServicesFromFile(wccp_config_str);
    } else { // not located
      mgmt_log("[LocalManager::LocalManager] WCCP service configuration file '%s' was specified but could not be found in the file "
               "system.\n",
               static_cast<char *>(wccp_config_str));
    }
  }
#endif

  process_server_timeout_secs = REC_readInteger("proxy.config.lm.pserver_timeout_secs", &found);
  process_server_timeout_msecs = REC_readInteger("proxy.config.lm.pserver_timeout_msecs", &found);
  proxy_name = REC_readString("proxy.config.proxy_name", &found);
  proxy_binary = REC_readString("proxy.config.proxy_binary", &found);
  env_prep = REC_readString("proxy.config.env_prep", &found);

  // Compute the absolute path to the proxy binary from bin_path and proxy_binary
  absolute_proxy_binary = ats_stringdup(Layout::relative_to(bindir, proxy_binary));

  // coverity[fs_check_call]
  if (access(absolute_proxy_binary, R_OK | X_OK) == -1) {
    mgmt_log("[LocalManager::LocalManager] Unable to access() '%s': %d, %s\n", absolute_proxy_binary, errno, strerror(errno));
    mgmt_fatal(0, "[LocalManager::LocalManager] please set bin path 'proxy.config.bin_path' \n");
  }

  return;
}
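
/* Illustrative sketch (not part of this file): the REC_readInteger()/REC_readString() calls
   in the constructor above all follow the same "value plus found flag" idiom. The record
   name and fallback value below are assumptions, shown only to make the idiom explicit.

     bool found = false;
     RecInt configured = REC_readInteger("proxy.config.example.timeout", &found);
     RecInt timeout = found ? configured : 30; // fall back to a default when the record is unset
*/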

LocalManager::~LocalManager()
{
  delete alarm_keeper;
  ats_free(absolute_proxy_binary);
  ats_free(proxy_name);
  ats_free(proxy_binary);
  ats_free(env_prep);
}

void
LocalManager::initAlarm()
{
  alarm_keeper = new Alarms();
}

/*
 * initMgmtProcessServer()
 *   sets up the server socket that proxy processes connect to.
 */
void
LocalManager::initMgmtProcessServer()
{
  std::string rundir(RecConfigReadRuntimeDir());
  std::string sockpath(Layout::relative_to(rundir, LM_CONNECTION_SERVER));
  mode_t oldmask = umask(0);

#if TS_HAS_WCCP
  if (wccp_cache.isConfigured()) {
    if (0 > wccp_cache.open())
      mgmt_log("Failed to open WCCP socket\n");
  }
#endif

  process_server_sockfd = bind_unix_domain_socket(sockpath.c_str(), 00700);
  if (process_server_sockfd == -1) {
    mgmt_fatal(errno, "[LocalManager::initMgmtProcessServer] failed to bind socket at %s\n", sockpath.c_str());
  }

#if HAVE_EVENTFD
  wakeup_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
  if (wakeup_fd < 0) {
    mgmt_fatal(errno, "[LocalManager::initMgmtProcessServer] failed to create eventfd. errno : %s\n", strerror(errno));
  }
#endif

  umask(oldmask);
  RecSetRecordInt("proxy.node.restarts.manager.start_time", manager_started_at, REC_SOURCE_DEFAULT);
}
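
/* Illustrative sketch (not part of this file): how a child process might reach the
   management socket bound above. The real connect-side code lives in traffic_server's
   process manager; the function name, path handling, and error strategy below are
   assumptions shown only to clarify what bind_unix_domain_socket() sets up here.

     #include <sys/socket.h>
     #include <sys/un.h>
     #include <unistd.h>
     #include <cstring>

     int
     connect_to_manager(const char *sockpath) // e.g. <rundir>/<LM_CONNECTION_SERVER>
     {
       int fd = socket(AF_UNIX, SOCK_STREAM, 0);
       if (fd < 0) {
         return -1;
       }
       sockaddr_un addr;
       memset(&addr, 0, sizeof(addr));
       addr.sun_family = AF_UNIX;
       strncpy(addr.sun_path, sockpath, sizeof(addr.sun_path) - 1);
       if (connect(fd, reinterpret_cast<sockaddr *>(&addr), sizeof(addr)) < 0) {
         close(fd);
         return -1;
       }
       return fd; // the manager accepts this connection in pollMgmtProcessServer()
     }
*/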

/*
 * pollMgmtProcessServer()
 * - Checks the mgmt process server socket for newly connecting processes
 *   and for any requests sent by already connected processes, and handles them.
 */
void
LocalManager::pollMgmtProcessServer()
{
  struct timeval timeout;
  fd_set fdlist;

  while (true) {
#if TS_HAS_WCCP
    int wccp_fd = wccp_cache.getSocket();
#endif

    timeout.tv_sec = process_server_timeout_secs;
    timeout.tv_usec = process_server_timeout_msecs * 1000;

    FD_ZERO(&fdlist);

    if (process_server_sockfd != ts::NO_FD) {
      FD_SET(process_server_sockfd, &fdlist);
    }

    if (watched_process_fd != ts::NO_FD) {
      FD_SET(watched_process_fd, &fdlist);
    }

#if TS_HAS_WCCP
    // Only run WCCP housekeeping while we have a server process.
    // Note: The WCCP socket is opened iff WCCP is configured.
    if (wccp_fd != ts::NO_FD && watched_process_fd != ts::NO_FD) {
      wccp_cache.housekeeping();
      time_t wccp_wait = wccp_cache.waitTime();
      if (wccp_wait < process_server_timeout_secs)
        timeout.tv_sec = wccp_wait;

      if (wccp_fd != ts::NO_FD) {
        FD_SET(wccp_fd, &fdlist);
      }
    }
#endif

#if HAVE_EVENTFD
    if (wakeup_fd != ts::NO_FD) {
      FD_SET(wakeup_fd, &fdlist);
    }
#endif

    int num = mgmt_select(FD_SETSIZE, &fdlist, nullptr, nullptr, &timeout);

    switch (num) {
    case 0:
      // Timed out, nothing to do.
      return;
    case -1:
      if (mgmt_transient_error()) {
        continue;
      }

      mgmt_log("[LocalManager::pollMgmtProcessServer] select failed: %s (%d)\n", ::strerror(errno), errno);
      return;

    default:
      // if we get a wakeup_fd event, we may not want to follow it
      // because there may be more data to be read on the socket.
      bool keep_polling = false;
#if TS_HAS_WCCP
      if (wccp_fd != ts::NO_FD && FD_ISSET(wccp_fd, &fdlist)) {
        wccp_cache.handleMessage();
        --num;
        keep_polling = true;
      }
#endif

      if (process_server_sockfd != ts::NO_FD && FD_ISSET(process_server_sockfd, &fdlist)) { /* New connection */
        struct sockaddr_in clientAddr;
        socklen_t clientLen = sizeof(clientAddr);
        int new_sockfd = mgmt_accept(process_server_sockfd, reinterpret_cast<struct sockaddr *>(&clientAddr), &clientLen);

        mgmt_log("[LocalManager::pollMgmtProcessServer] New process connecting fd '%d'\n", new_sockfd);

        if (new_sockfd < 0) {
          mgmt_elog(errno, "[LocalManager::pollMgmtProcessServer] ==> ");
        } else if (!processRunning()) {
          watched_process_fd = new_sockfd;
        } else {
          close_socket(new_sockfd);
        }
        --num;
        keep_polling = true;
      }

      if (ts::NO_FD != watched_process_fd && FD_ISSET(watched_process_fd, &fdlist)) {
        int res;
        MgmtMessageHdr mh_hdr;

        keep_polling = true;

        // read the message
        if ((res = mgmt_read_pipe(watched_process_fd, reinterpret_cast<char *>(&mh_hdr), sizeof(MgmtMessageHdr))) > 0) {
          MgmtMessageHdr *mh_full = static_cast<MgmtMessageHdr *>(malloc(sizeof(MgmtMessageHdr) + mh_hdr.data_len));
          memcpy(mh_full, &mh_hdr, sizeof(MgmtMessageHdr));
          char *data_raw = reinterpret_cast<char *>(mh_full) + sizeof(MgmtMessageHdr);
          if ((res = mgmt_read_pipe(watched_process_fd, data_raw, mh_hdr.data_len)) > 0) {
            handleMgmtMsgFromProcesses(mh_full);
          } else if (res < 0) {
            mgmt_fatal(0, "[LocalManager::pollMgmtProcessServer] Error in read (errno: %d)\n", -res);
          }
          free(mh_full);
        } else if (res < 0) {
          mgmt_fatal(0, "[LocalManager::pollMgmtProcessServer] Error in read (errno: %d)\n", -res);
        }

        // handle EOF
        if (res == 0) {
          int estatus;
          pid_t tmp_pid = watched_process_pid;

          Debug("lm", "[LocalManager::pollMgmtProcessServer] Lost process EOF!");

          close_socket(watched_process_fd);

          waitpid(watched_process_pid, &estatus, 0); /* Reap child */
          if (WIFSIGNALED(estatus)) {
            int sig = WTERMSIG(estatus);
            mgmt_log("[LocalManager::pollMgmtProcessServer] Server Process terminated due to Sig %d: %s\n", sig, strsignal(sig));
          } else if (WIFEXITED(estatus)) {
            int return_code = WEXITSTATUS(estatus);

            // traffic_server's exit code will be UNRECOVERABLE_EXIT if it calls
            // ink_emergency() or ink_emergency_va(). The call signals that traffic_server
            // cannot be recovered with a reboot. In other words, catastrophic failure.
            if (return_code == UNRECOVERABLE_EXIT) {
              proxy_recoverable = false;
            }
          }

          if (lmgmt->run_proxy) {
            mgmt_log("[Alarms::signalAlarm] Server Process was reset\n");
            lmgmt->alarm_keeper->signalAlarm(MGMT_ALARM_PROXY_PROCESS_DIED, nullptr);
          } else {
            mgmt_log("[TrafficManager] Server process shutdown\n");
          }

          watched_process_fd = watched_process_pid = -1;
          if (tmp_pid != -1) { /* Incremented after a pid: message is sent */
            proxy_running--;
          }
          proxy_started_at = -1;
          RecSetRecordInt("proxy.node.proxy_running", 0, REC_SOURCE_DEFAULT);
        }

        --num;
      }

#if HAVE_EVENTFD
      if (wakeup_fd != ts::NO_FD && FD_ISSET(wakeup_fd, &fdlist)) {
        if (!keep_polling) {
          // read or else fd will always be set.
          uint64_t ignore;
          ATS_UNUSED_RETURN(read(wakeup_fd, &ignore, sizeof(uint64_t)));
          return;
        }
        --num;
      }
#else
      (void)keep_polling; // suppress compiler warning
#endif

      ink_assert(num == 0); /* Invariant */
    }
  }
}
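
/* Illustrative sketch (not part of this file): the framing used on the process socket above
   is a fixed-size MgmtMessageHdr followed by data_len bytes of payload, read in two steps.
   The helper below is an assumption-only restatement of that pattern using plain read();
   the real code uses mgmt_read_pipe(), which handles partial reads.

     #include <unistd.h>
     #include <cstdlib>
     #include <cstring>

     // Returns a malloc'd header+payload block on success, nullptr on EOF/error.
     MgmtMessageHdr *
     read_one_message(int fd)
     {
       MgmtMessageHdr hdr;
       if (read(fd, &hdr, sizeof(hdr)) != static_cast<ssize_t>(sizeof(hdr))) {
         return nullptr;
       }
       auto *msg = static_cast<MgmtMessageHdr *>(malloc(sizeof(hdr) + hdr.data_len));
       memcpy(msg, &hdr, sizeof(hdr));
       char *payload = reinterpret_cast<char *>(msg) + sizeof(hdr);
       if (read(fd, payload, hdr.data_len) != static_cast<ssize_t>(hdr.data_len)) {
         free(msg);
         return nullptr;
       }
       return msg; // caller frees
     }
*/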

void
LocalManager::handleMgmtMsgFromProcesses(MgmtMessageHdr *mh)
{
  char *data_raw = reinterpret_cast<char *>(mh) + sizeof(MgmtMessageHdr);
  switch (mh->msg_id) {
  case MGMT_SIGNAL_PID:
    watched_process_pid = *(reinterpret_cast<pid_t *>(data_raw));
    lmgmt->alarm_keeper->signalAlarm(MGMT_ALARM_PROXY_PROCESS_BORN, nullptr);
    proxy_running++;
    proxy_launch_pid = -1;
    proxy_launch_outstanding = false;
    RecSetRecordInt("proxy.node.proxy_running", 1, REC_SOURCE_DEFAULT);
    break;

  // FIX: This is very messy; we need to correlate mgmt signals and
  // alarms better.
  case MGMT_SIGNAL_CONFIG_ERROR:
    alarm_keeper->signalAlarm(MGMT_ALARM_PROXY_CONFIG_ERROR, data_raw);
    break;
  case MGMT_SIGNAL_SYSTEM_ERROR:
    alarm_keeper->signalAlarm(MGMT_ALARM_PROXY_SYSTEM_ERROR, data_raw);
    break;
  case MGMT_SIGNAL_CACHE_ERROR:
    alarm_keeper->signalAlarm(MGMT_ALARM_PROXY_CACHE_ERROR, data_raw);
    break;
  case MGMT_SIGNAL_CACHE_WARNING:
    alarm_keeper->signalAlarm(MGMT_ALARM_PROXY_CACHE_WARNING, data_raw);
    break;
  case MGMT_SIGNAL_LOGGING_ERROR:
    alarm_keeper->signalAlarm(MGMT_ALARM_PROXY_LOGGING_ERROR, data_raw);
    break;
  case MGMT_SIGNAL_LOGGING_WARNING:
    alarm_keeper->signalAlarm(MGMT_ALARM_PROXY_LOGGING_WARNING, data_raw);
    break;
  case MGMT_SIGNAL_PLUGIN_SET_CONFIG: {
    char var_name[256];
    char var_value[256];
    MgmtType stype;
    // stype is an enum type, so cast to an int* to avoid warnings. /leif
    int tokens = sscanf(data_raw, "%255s %d %255s", var_name, reinterpret_cast<int *>(&stype), var_value);
    if (tokens != 3) {
      stype = MGMT_INVALID;
    }
    switch (stype) {
    case MGMT_INT:
      RecSetRecordInt(var_name, ink_atoi64(var_value), REC_SOURCE_EXPLICIT);
      break;
    case MGMT_COUNTER:
    case MGMT_FLOAT:
    case MGMT_STRING:
    case MGMT_INVALID:
    default:
      mgmt_log("[LocalManager::handleMgmtMsgFromProcesses] "
               "Invalid plugin set-config msg '%s'\n",
               data_raw);
      break;
    }
  } break;
  case MGMT_SIGNAL_LIBRECORDS:
    if (mh->data_len > 0) {
      executeMgmtCallback(MGMT_SIGNAL_LIBRECORDS, {data_raw, static_cast<size_t>(mh->data_len)});
    } else {
      executeMgmtCallback(MGMT_SIGNAL_LIBRECORDS, {});
    }
    break;
  case MGMT_SIGNAL_CONFIG_FILE_CHILD: {
    static const MgmtMarshallType fields[] = {MGMT_MARSHALL_STRING, MGMT_MARSHALL_STRING};
    char *parent = nullptr;
    char *child = nullptr;
    if (mgmt_message_parse(data_raw, mh->data_len, fields, countof(fields), &parent, &child) != -1) {
      configFiles->configFileChild(parent, child);
    } else {
      mgmt_log("[LocalManager::handleMgmtMsgFromProcesses] "
               "MGMT_SIGNAL_CONFIG_FILE_CHILD mgmt_message_parse error\n");
    }
    // Output pointers are guaranteed to be NULL or valid.
    ats_free_null(parent);
    ats_free_null(child);
  } break;

  default:
    break;
  }
}
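
/* Illustrative sketch (not part of this file): the MGMT_SIGNAL_PLUGIN_SET_CONFIG payload
   parsed above is a whitespace-separated "name type value" string, matching the
   sscanf("%255s %d %255s") call. How a sender formats it is an assumption here; the record
   name below is hypothetical and only the parsed layout is taken from the code above.

     #include <cstdio>

     char buf[600];
     // "<record name> <MgmtType as int> <value>", e.g. an integer record:
     snprintf(buf, sizeof(buf), "%s %d %s", "proxy.config.example.setting", static_cast<int>(MGMT_INT), "42");
     // Anything the switch above does not recognize is logged and dropped.
*/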

void
LocalManager::sendMgmtMsgToProcesses(int msg_id, const char *data_str)
{
  sendMgmtMsgToProcesses(msg_id, data_str, strlen(data_str) + 1);
  return;
}

void
LocalManager::sendMgmtMsgToProcesses(int msg_id, const char *data_raw, int data_len)
{
  MgmtMessageHdr *mh;

  mh = static_cast<MgmtMessageHdr *>(alloca(sizeof(MgmtMessageHdr) + data_len));
  mh->msg_id = msg_id;
  mh->data_len = data_len;
  memcpy(reinterpret_cast<char *>(mh) + sizeof(MgmtMessageHdr), data_raw, data_len);
  sendMgmtMsgToProcesses(mh);
  return;
}
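
/* Descriptive note (not from the original comments): a management message is one contiguous
   block, the MgmtMessageHdr immediately followed by data_len bytes of payload:

     +-----------------------------+--------------------+
     | MgmtMessageHdr              | payload            |
     | at least msg_id, data_len   | data_len bytes     |
     +-----------------------------+--------------------+

   The two-argument overload above includes the terminating NUL of the string in data_len,
   so the receiver can treat the payload as a C string.
*/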

void
LocalManager::sendMgmtMsgToProcesses(MgmtMessageHdr *mh)
{
  switch (mh->msg_id) {
  case MGMT_EVENT_SHUTDOWN: {
    run_proxy = false;
    this->closeProxyPorts();
    break;
  }
  case MGMT_EVENT_RESTART:
    run_proxy = true;
    listenForProxy();
    return;
  case MGMT_EVENT_BOUNCE: /* Just bouncing: have it exit, we will restart it */
    mh->msg_id = MGMT_EVENT_SHUTDOWN;
    break;
  case MGMT_EVENT_ROLL_LOG_FILES:
    mgmt_log("[LocalManager::sendMgmtMsgToProcesses] Roll log files event is being constructed.\n");
    break;
  case MGMT_EVENT_CONFIG_FILE_UPDATE:
    bool found;
    char *fname = nullptr;
    ConfigManager *rb;
    char *data_raw;

    data_raw = reinterpret_cast<char *>(mh) + sizeof(MgmtMessageHdr);
    fname = REC_readString(data_raw, &found);

    RecT rec_type;
    if (RecGetRecordType(data_raw, &rec_type) == REC_ERR_OKAY && rec_type == RECT_CONFIG) {
      RecSetSyncRequired(data_raw);
    } else {
      mgmt_log("[LocalManager:sendMgmtMsgToProcesses] Unknown file change: '%s'\n", data_raw);
    }
    ink_assert(found);
    if (!(fname && configFiles && configFiles->getConfigObj(fname, &rb)) &&
        (strcmp(data_raw, "proxy.config.body_factory.template_sets_dir") != 0) &&
        (strcmp(data_raw, "proxy.config.ssl.server.ticket_key.filename") != 0)) {
      mgmt_fatal(0, "[LocalManager::sendMgmtMsgToProcesses] "
                    "Invalid 'data_raw' for MGMT_EVENT_CONFIG_FILE_UPDATE\n");
    }
    ats_free(fname);
    break;
  }

  if (watched_process_fd != -1) {
    if (mgmt_write_pipe(watched_process_fd, reinterpret_cast<char *>(mh), sizeof(MgmtMessageHdr) + mh->data_len) <= 0) {
      // On Linux, when TS dies the connection between TS and TM is sometimes not closed
      // properly and the socket never receives an EOF, so TM does not detect that the
      // connection (and hence TS) has gone down. TM then still tries to send a message
      // to TS, encounters an error, and ends up here. Also, ensure that this whole block
      // runs only once per process, otherwise it would produce a deluge of messages in
      // traffic.log.

      static pid_t check_prev_pid = watched_process_pid;
      static pid_t check_current_pid = watched_process_pid;
      if (check_prev_pid != watched_process_pid) {
        check_prev_pid = watched_process_pid;
        check_current_pid = watched_process_pid;
      }

      if (check_prev_pid == check_current_pid) {
        check_current_pid = -1;
        int lerrno = errno;
        mgmt_elog(errno, "[LocalManager::sendMgmtMsgToProcesses] Error writing message\n");
        if (lerrno == ECONNRESET || lerrno == EPIPE) { // Connection closed by peer or Broken pipe
          if ((kill(watched_process_pid, 0) < 0) && (errno == ESRCH)) {
            // TS is down
            pid_t tmp_pid = watched_process_pid;
            close_socket(watched_process_fd);
            mgmt_log("[LocalManager::pollMgmtProcessServer] "
                     "Server Process has been terminated\n");
            if (lmgmt->run_proxy) {
              mgmt_log("[Alarms::signalAlarm] Server Process was reset\n");
              lmgmt->alarm_keeper->signalAlarm(MGMT_ALARM_PROXY_PROCESS_DIED, nullptr);
            } else {
              mgmt_log("[TrafficManager] Server process shutdown\n");
            }
            watched_process_fd = watched_process_pid = -1;
            if (tmp_pid != -1) { /* Incremented after a pid: message is sent */
              proxy_running--;
            }
            proxy_started_at = -1;
            RecSetRecordInt("proxy.node.proxy_running", 0, REC_SOURCE_DEFAULT);
            // End of TS down
          } else {
            // TS is still up, but the connection is lost
            const char *err_msg = "The TS-TM connection is broken for some reason. Either restart TS and TM or correct this error "
                                  "for TM to display TS statistics correctly";
            lmgmt->alarm_keeper->signalAlarm(MGMT_ALARM_PROXY_SYSTEM_ERROR, err_msg);
          }

          // check if the TS is down, by checking the pid
          // if TS is down, then,
          //   raise an alarm
          //   set the variables so that TS is restarted later
          // else (TS is still up)
          //   raise an alarm stating the problem
        }
      }
    }
  }
}
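
/* Illustrative sketch (not part of this file): the liveness test used above relies on
   kill(pid, 0), which delivers no signal but still performs the existence and permission
   checks. The wrapper below is an assumption-only restatement of that idiom.

     #include <signal.h>
     #include <cerrno>

     // Returns true if a process with this pid currently exists.
     bool
     process_exists(pid_t pid)
     {
       if (kill(pid, 0) == 0) {
         return true;           // signal 0 succeeded: the process exists
       }
       return errno != ESRCH;   // ESRCH means no such process; EPERM still means it exists
     }
*/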

void
LocalManager::signalFileChange(const char *var_name)
{
  signalEvent(MGMT_EVENT_CONFIG_FILE_UPDATE, var_name);

  return;
}

void
LocalManager::signalEvent(int msg_id, const char *data_str)
{
  signalEvent(msg_id, data_str, strlen(data_str) + 1);
  return;
}

void
LocalManager::signalEvent(int msg_id, const char *data_raw, int data_len)
{
  MgmtMessageHdr *mh;
  size_t n = sizeof(MgmtMessageHdr) + data_len;

  mh = static_cast<MgmtMessageHdr *>(ats_malloc(n));
  mh->msg_id = msg_id;
  mh->data_len = data_len;
  auto payload = mh->payload();
  memcpy(payload.data(), data_raw, data_len);
  this->enqueue(mh);
  // ink_assert(enqueue(mgmt_event_queue, mh));

#if HAVE_EVENTFD
  // We don't care about the actual counter value of wakeup_fd, so just keep adding 1; we
  // only need to wake up the poll loop. Also note that wakeup_fd was created non-blocking,
  // so we can write to it directly without any timeout handling.
  //
  // Don't trigger for MGMT_EVENT_LIBRECORDS events, because they happen all the time and
  // don't require a quick response. For those, rely on the poll timeout so traffic_server
  // can spend more time doing other things.
  uint64_t one = 1;
  if (wakeup_fd != ts::NO_FD && mh->msg_id != MGMT_EVENT_LIBRECORDS) {
    ATS_UNUSED_RETURN(write(wakeup_fd, &one, sizeof(uint64_t))); // trigger to stop polling
  }
#endif
}
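
/* Illustrative sketch (not part of this file): a minimal producer/consumer pair showing the
   eventfd wakeup pattern used by signalEvent() and pollMgmtProcessServer(). The standalone
   setup and function names are assumptions for illustration only.

     #include <sys/eventfd.h>
     #include <unistd.h>
     #include <cstdint>

     int efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); // same flags as initMgmtProcessServer()

     // Producer: any value >= 1 wakes a waiter; the kernel adds it to the counter.
     void
     wake(int fd)
     {
       uint64_t one = 1;
       (void)write(fd, &one, sizeof(one));
     }

     // Consumer: once select()/poll() reports the fd readable, drain the counter so the
     // fd does not stay readable forever.
     void
     drain(int fd)
     {
       uint64_t ignored;
       (void)read(fd, &ignored, sizeof(ignored));
     }
*/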

/*
 * processEventQueue()
 * Function drains and processes the mgmt event queue,
 * notifying any registered callback functions and performing
 * any mgmt tasks for each event.
 */
void
LocalManager::processEventQueue()
{
  while (!this->queue_empty()) {
    bool handled_by_mgmt = false;

    MgmtMessageHdr *mh = this->dequeue();
    auto payload = mh->payload().rebind<char>();

    // check if we have a local file update
    if (mh->msg_id == MGMT_EVENT_CONFIG_FILE_UPDATE) {
      // records.config
      if (!(strcmp(payload.begin(), ts::filename::RECORDS))) {
        if (RecReadConfigFile() != REC_ERR_OKAY) {
          mgmt_elog(errno, "[fileUpdated] Config update failed for %s\n", ts::filename::RECORDS);
        } else {
          RecConfigWarnIfUnregistered();
        }
        handled_by_mgmt = true;
      }
    }

    if (!handled_by_mgmt) {
      if (processRunning() == false) {
        // Fix INKqa04984
        // If traffic server hasn't completely come up yet,
        // we will hold off until next round.
        this->enqueue(mh);
        return;
      }
      Debug("lm", "[TrafficManager] ==> Sending signal event '%d' %s payload=%d", mh->msg_id, payload.begin(), int(payload.size()));
      lmgmt->sendMgmtMsgToProcesses(mh);
    }
    ats_free(mh);
  }
}

/*
 * startProxy()
 * Function fires up a proxy process.
 *
 * Args:
 *   onetime_options: one-time options that traffic_server should be started with (i.e.
 *                    these options do not persist across restarts)
 */
static const size_t OPTIONS_SIZE = 16384; // Arbitrary max size for command line option string

bool
LocalManager::startProxy(const char *onetime_options)
{
  if (proxy_launch_outstanding) {
    return false;
  }
  mgmt_log("[LocalManager::startProxy] Launching ts process\n");

  pid_t pid;

  // Before we do anything, let's check for the existence of
  // the traffic_server binary along with its execute permissions.
  if (access(absolute_proxy_binary, F_OK) < 0) {
    // Error can't find traffic_server
    mgmt_elog(errno, "[LocalManager::startProxy] Unable to find traffic server at %s\n", absolute_proxy_binary);
    return false;
  }
  // traffic server binary exists, check permissions
  else if (access(absolute_proxy_binary, R_OK | X_OK) < 0) {
    // Error don't have proper permissions
    mgmt_elog(errno, "[LocalManager::startProxy] Unable to access %s due to bad permissions \n", absolute_proxy_binary);
    return false;
  }

  if (env_prep) {
#ifdef POSIX_THREAD
    if ((pid = fork()) < 0)
#else
    if ((pid = fork1()) < 0)
#endif
    {
      mgmt_elog(errno, "[LocalManager::startProxy] Unable to fork1 prep process\n");
      return false;
    } else if (pid > 0) {
      int estatus;
      waitpid(pid, &estatus, 0);
    } else {
      int res;

      char env_prep_bin[MAXPATHLEN];
      std::string bindir(RecConfigReadBinDir());

      ink_filepath_make(env_prep_bin, sizeof(env_prep_bin), bindir.c_str(), env_prep);
      res = execl(env_prep_bin, env_prep_bin, (char *)nullptr);
      _exit(res);
    }
  }
#ifdef POSIX_THREAD
  if ((pid = fork()) < 0)
#else
  if ((pid = fork1()) < 0)
#endif
  {
    mgmt_elog(errno, "[LocalManager::startProxy] Unable to fork1 process\n");
    return false;
  } else if (pid > 0) { /* Parent */
    proxy_launch_pid = pid;
    proxy_launch_outstanding = true;
    proxy_started_at = time(nullptr);
    ++proxy_launch_count;
    RecSetRecordInt("proxy.node.restarts.proxy.start_time", proxy_started_at, REC_SOURCE_DEFAULT);
    RecSetRecordInt("proxy.node.restarts.proxy.restart_count", proxy_launch_count, REC_SOURCE_DEFAULT);
  } else {
    int i = 0;
    char *options[32], *last, *tok;
    char options_buffer[OPTIONS_SIZE];
    ts::FixedBufferWriter w{options_buffer, OPTIONS_SIZE};

    w.clip(1);
    w.print("{}{}", ts::bwf::OptionalAffix(proxy_options), ts::bwf::OptionalAffix(onetime_options));

    // Make sure we're starting the proxy in mgmt mode
    if (w.view().find(MGMT_OPT) == std::string_view::npos) {
      w.write(MGMT_OPT);
      w.write(' ');
    }

    // pass the runroot option to traffic_server
    std::string_view runroot_arg = get_runroot();
    if (!runroot_arg.empty()) {
      w.write(RUNROOT_OPT);
      w.write(runroot_arg);
      w.write(' ');
    }

    // Pass down port/fd information to traffic_server if there are any open ports.
    if (std::any_of(m_proxy_ports.begin(), m_proxy_ports.end(), [](HttpProxyPort &p) { return ts::NO_FD != p.m_fd; })) {
      char portbuf[128];
      bool need_comma_p = false;

      w.write("--httpport "sv);
      for (auto &p : m_proxy_ports) {
        if (ts::NO_FD != p.m_fd) {
          if (need_comma_p) {
            w.write(',');
          }
          need_comma_p = true;
          p.print(portbuf, sizeof(portbuf));
          w.write(portbuf);
        }
      }
    }

    w.extend(1);
    w.write('\0'); // null terminate.

    Debug("lm", "[LocalManager::startProxy] Launching %s '%s'", absolute_proxy_binary, w.data());

    // Unfortunately the normally obnoxious null writing of strtok is in this case a required
    // side effect and other alternatives are noticeably more clunky.
    ink_zero(options);
    options[0] = absolute_proxy_binary;
    i = 1;
    tok = strtok_r(options_buffer, " ", &last);
    Debug("lm", "opt %d = '%s'", i, tok);
    options[i++] = tok;
    while (i < 32 && (tok = strtok_r(nullptr, " ", &last))) {
      Debug("lm", "opt %d = '%s'", i, tok);
      options[i++] = tok;
    }

    EnableDeathSignal(SIGTERM);

    execv(absolute_proxy_binary, options);
    mgmt_fatal(errno, "[LocalManager::startProxy] Exec of %s failed\n", absolute_proxy_binary);
  }
  return true;
}
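
/* Illustrative sketch (not part of this file): the argv construction above depends on
   strtok_r() writing NUL bytes into options_buffer in place, so every options[i] points
   into that same buffer. A minimal standalone version of the same pattern, with
   assumption-only names and an example option string:

     #include <cstring>

     char buf[] = "-M --httpport 8080 --run-root=/tmp/ts"; // must be writable
     char *argv[8] = {nullptr};
     char *save = nullptr;
     int n = 0;

     for (char *tok = strtok_r(buf, " ", &save); tok && n < 7; tok = strtok_r(nullptr, " ", &save)) {
       argv[n++] = tok; // each token is a NUL-terminated slice of buf
     }
     // argv is now NULL-terminated and suitable for execv(path, argv).
*/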

/** Close all open ports.
 */
void
LocalManager::closeProxyPorts()
{
  for (auto &p : lmgmt->m_proxy_ports) {
    if (ts::NO_FD != p.m_fd) {
      close_socket(p.m_fd);
      p.m_fd = ts::NO_FD;
    }
  }
}
/*
 * listenForProxy()
 * Function listens on the accept port of the proxy, so users aren't dropped.
 */
void
LocalManager::listenForProxy()
{
  if (!run_proxy || !listen_for_proxy) {
    return;
  }

  // If we are not already bound, bind the port.
  for (auto &p : lmgmt->m_proxy_ports) {
    if (ts::NO_FD == p.m_fd) {
      // Check the protocol (TCP or UDP) and create an appropriate socket
      if (p.isQUIC()) {
        this->bindUdpProxyPort(p);
      } else {
        this->bindTcpProxyPort(p);
      }
    }

    std::string_view fam{ats_ip_family_name(p.m_family)};
    if (p.isQUIC()) {
      // Can we do something like listen backlog for QUIC(UDP) ??
      // Do nothing for now
    } else {
      // read backlog configuration value and overwrite the default value if found
      bool found;
      RecInt backlog = REC_readInteger("proxy.config.net.listen_backlog", &found);
      backlog = (found && backlog >= 0) ? backlog : ats_tcp_somaxconn();

      if ((listen(p.m_fd, backlog)) < 0) {
        mgmt_fatal(errno, "[LocalManager::listenForProxy] Unable to listen on port: %d (%.*s)\n", p.m_port, fam.size(), fam.data());
      }
    }

    mgmt_log("[LocalManager::listenForProxy] Listening on port: %d (%.*s)\n", p.m_port, fam.size(), fam.data());
  }
  return;
}

/*
 * bindUdpProxyPort()
 * Function binds the accept port of the proxy
 */
void
LocalManager::bindUdpProxyPort(HttpProxyPort &port)
{
  int one = 1;
  int priv = (port.m_port < 1024 && 0 != geteuid()) ? ElevateAccess::LOW_PORT_PRIVILEGE : 0;

  ElevateAccess access(priv);

  if ((port.m_fd = socket(port.m_family, SOCK_DGRAM, 0)) < 0) {
    mgmt_fatal(0, "[bindProxyPort] Unable to create socket : %s\n", strerror(errno));
  }

  if (port.m_family == AF_INET6) {
    if (setsockopt(port.m_fd, IPPROTO_IPV6, IPV6_V6ONLY, SOCKOPT_ON, sizeof(int)) < 0) {
      mgmt_log("[bindProxyPort] Unable to set socket options: %d : %s\n", port.m_port, strerror(errno));
    }
  }
  if (setsockopt(port.m_fd, SOL_SOCKET, SO_REUSEADDR, reinterpret_cast<char *>(&one), sizeof(int)) < 0) {
    mgmt_fatal(0, "[bindProxyPort] Unable to set socket options: %d : %s\n", port.m_port, strerror(errno));
  }

  IpEndpoint ip;
  if (port.m_inbound_ip.isValid()) {
    ip.assign(port.m_inbound_ip);
  } else if (AF_INET6 == port.m_family) {
    if (m_inbound_ip6.isValid()) {
      ip.assign(m_inbound_ip6);
    } else {
      ip.setToAnyAddr(AF_INET6);
    }
  } else if (AF_INET == port.m_family) {
    if (m_inbound_ip4.isValid()) {
      ip.assign(m_inbound_ip4);
    } else {
      ip.setToAnyAddr(AF_INET);
    }
  } else {
    mgmt_fatal(0, "[bindProxyPort] Proxy port with invalid address type %d\n", port.m_family);
  }
  ip.port() = htons(port.m_port);
  if (bind(port.m_fd, &ip.sa, ats_ip_size(&ip)) < 0) {
    mgmt_fatal(0, "[bindProxyPort] Unable to bind socket: %d : %s\n", port.m_port, strerror(errno));
  }

  Debug("lm", "[bindProxyPort] Successfully bound proxy port %d", port.m_port);
}

/*
 * bindTcpProxyPort()
 * Function binds the accept port of the proxy
 */
void
LocalManager::bindTcpProxyPort(HttpProxyPort &port)
{
  int one = 1;
  int priv = (port.m_port < 1024 && 0 != geteuid()) ? ElevateAccess::LOW_PORT_PRIVILEGE : 0;

  ElevateAccess access(priv);

  /* Setup reliable connection, for large config changes */
  if ((port.m_fd = socket(port.m_family, SOCK_STREAM, 0)) < 0) {
    mgmt_fatal(0, "[bindProxyPort] Unable to create socket : %s\n", strerror(errno));
  }

  if (port.m_type == HttpProxyPort::TRANSPORT_DEFAULT) {
    int should_filter_int = 0;
    bool found;
    should_filter_int = REC_readInteger("proxy.config.net.defer_accept", &found);
    if (found && should_filter_int > 0) {
#if defined(SOL_FILTER) && defined(FIL_ATTACH)
      (void)setsockopt(port.m_fd, SOL_FILTER, FIL_ATTACH, "httpfilt", 9);
#endif
    }
  }

  if (port.m_mptcp) {
#if MPTCP_ENABLED
    int err;

    err = setsockopt(port.m_fd, IPPROTO_TCP, MPTCP_ENABLED, &one, sizeof(one));
    if (err < 0) {
      mgmt_log("[bindProxyPort] Unable to enable MPTCP: %s\n", strerror(errno));
      Debug("lm_mptcp", "[bindProxyPort] Unable to enable MPTCP: %s", strerror(errno));
    } else {
      mgmt_log("[bindProxyPort] Successfully enabled MPTCP on %d\n", port.m_port);
      Debug("lm_mptcp", "[bindProxyPort] Successfully enabled MPTCP on %d\n", port.m_port);
    }
#else
    Debug("lm_mptcp", "[bindProxyPort] Multipath TCP requested but not configured on this host");
#endif
  }

  if (port.m_family == AF_INET6) {
    if (setsockopt(port.m_fd, IPPROTO_IPV6, IPV6_V6ONLY, SOCKOPT_ON, sizeof(int)) < 0) {
      mgmt_log("[bindProxyPort] Unable to set socket options: %d : %s\n", port.m_port, strerror(errno));
    }
  }
  if (setsockopt(port.m_fd, SOL_SOCKET, SO_REUSEADDR, reinterpret_cast<char *>(&one), sizeof(int)) < 0) {
    mgmt_fatal(0, "[bindProxyPort] Unable to set socket options: %d : %s\n", port.m_port, strerror(errno));
  }

  if (port.m_proxy_protocol) {
    Debug("lm", "[bindProxyPort] Proxy Protocol enabled");
  }

  if (port.m_inbound_transparent_p) {
#if TS_USE_TPROXY
    Debug("http_tproxy", "Listen port %d inbound transparency enabled.", port.m_port);
    if (setsockopt(port.m_fd, SOL_IP, TS_IP_TRANSPARENT, &one, sizeof(one)) == -1) {
      mgmt_fatal(0, "[bindProxyPort] Unable to set transparent socket option [%d] %s\n", errno, strerror(errno));
    }
#else
    Debug("lm", "[bindProxyPort] Transparency requested but TPROXY not configured");
#endif
  }

  IpEndpoint ip;
  if (port.m_inbound_ip.isValid()) {
    ip.assign(port.m_inbound_ip);
  } else if (AF_INET6 == port.m_family) {
    if (m_inbound_ip6.isValid()) {
      ip.assign(m_inbound_ip6);
    } else {
      ip.setToAnyAddr(AF_INET6);
    }
  } else if (AF_INET == port.m_family) {
    if (m_inbound_ip4.isValid()) {
      ip.assign(m_inbound_ip4);
    } else {
      ip.setToAnyAddr(AF_INET);
    }
  } else {
    mgmt_fatal(0, "[bindProxyPort] Proxy port with invalid address type %d\n", port.m_family);
  }
  ip.port() = htons(port.m_port);
  if (bind(port.m_fd, &ip.sa, ats_ip_size(&ip)) < 0) {
    mgmt_fatal(0, "[bindProxyPort] Unable to bind socket: %d : %s\n", port.m_port, strerror(errno));
  }

  Debug("lm", "[bindProxyPort] Successfully bound proxy port %d", port.m_port);
}

void
LocalManager::signalAlarm(int alarm_id, const char *desc, const char *ip)
{
  if (alarm_keeper) {
    alarm_keeper->signalAlarm(static_cast<alarm_t>(alarm_id), desc, ip);
  }
}