1 /** @file
2 
3   The Local Manager process of the management system.
4 
5   @section license License
6 
7   Licensed to the Apache Software Foundation (ASF) under one
8   or more contributor license agreements.  See the NOTICE file
9   distributed with this work for additional information
10   regarding copyright ownership.  The ASF licenses this file
11   to you under the Apache License, Version 2.0 (the
12   "License"); you may not use this file except in compliance
13   with the License.  You may obtain a copy of the License at
14 
15       http://www.apache.org/licenses/LICENSE-2.0
16 
17   Unless required by applicable law or agreed to in writing, software
18   distributed under the License is distributed on an "AS IS" BASIS,
19   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20   See the License for the specific language governing permissions and
21   limitations under the License.
22  */
23 
24 #include "tscore/ink_platform.h"
25 #include "tscore/ink_sock.h"
26 #include "tscore/ink_file.h"
27 #include "tscore/ink_error.h"
28 #include "Alarms.h"
29 #include "MgmtUtils.h"
30 #include "tscore/I_Layout.h"
31 #include "tscore/runroot.h"
32 #include "LocalManager.h"
33 #include "MgmtSocket.h"
34 #include "tscore/ink_cap.h"
35 #include "FileManager.h"
36 #include <string_view>
37 #include <algorithm>
38 #include "tscpp/util/TextView.h"
39 #include "tscore/BufferWriter.h"
40 #include "tscore/bwf_std_format.h"
41 #include "tscore/Filenames.h"
42 
43 #if TS_USE_POSIX_CAP
44 #include <sys/capability.h>
45 #endif
46 
using namespace std::literals;

// Command line switches written into the traffic_server option string at launch.
static constexpr std::string_view MGMT_OPT    = "-M"sv;          // start the proxy in mgmt mode
static constexpr std::string_view RUNROOT_OPT = "--run-root="sv; // prefix for the runroot argument
50 
51 void
mgmtCleanup()52 LocalManager::mgmtCleanup()
53 {
54   close_socket(process_server_sockfd);
55   process_server_sockfd = ts::NO_FD;
56 
57 #if HAVE_EVENTFD
58   if (wakeup_fd != ts::NO_FD) {
59     close_socket(wakeup_fd);
60     wakeup_fd = ts::NO_FD;
61   }
62 #endif
63 
64   // fix me for librecords
65 
66   closelog();
67   return;
68 }
69 
70 void
mgmtShutdown()71 LocalManager::mgmtShutdown()
72 {
73   mgmt_log("[LocalManager::mgmtShutdown] Executing shutdown request.\n");
74   processShutdown(true);
75   // WCCP TBD: Send a shutdown message to routers.
76 
77   if (processRunning()) {
78     waitpid(watched_process_pid, nullptr, 0);
79 #if defined(linux)
80     /* Avert race condition, wait for the thread to complete,
81        before getting one more restart process */
82     /* Workaround for bugid INKqa10060 */
83     mgmt_sleep_msec(1);
84 #endif
85   }
86   mgmtCleanup();
87 }
88 
89 void
processShutdown(bool mainThread)90 LocalManager::processShutdown(bool mainThread)
91 {
92   mgmt_log("[LocalManager::processShutdown] Executing process shutdown request.\n");
93   if (mainThread) {
94     sendMgmtMsgToProcesses(MGMT_EVENT_SHUTDOWN, "processShutdown[main]");
95   } else {
96     signalEvent(MGMT_EVENT_SHUTDOWN, "processShutdown");
97   }
98   return;
99 }
100 
101 void
processRestart()102 LocalManager::processRestart()
103 {
104   mgmt_log("[LocalManager::processRestart] Executing process restart request.\n");
105   signalEvent(MGMT_EVENT_RESTART, "processRestart");
106   return;
107 }
108 
109 void
processBounce()110 LocalManager::processBounce()
111 {
112   mgmt_log("[LocalManager::processBounce] Executing process bounce request.\n");
113   signalEvent(MGMT_EVENT_BOUNCE, "processBounce");
114   return;
115 }
116 
117 void
processDrain(int to_drain)118 LocalManager::processDrain(int to_drain)
119 {
120   mgmt_log("[LocalManager::processDrain] Executing process drain request.\n");
121   signalEvent(MGMT_EVENT_DRAIN, to_drain ? "1" : "0");
122   return;
123 }
124 
125 void
rollLogFiles()126 LocalManager::rollLogFiles()
127 {
128   mgmt_log("[LocalManager::rollLogFiles] Log files are being rolled.\n");
129   signalEvent(MGMT_EVENT_ROLL_LOG_FILES, "rollLogs");
130   return;
131 }
132 
133 void
hostStatusSetDown(const char * marshalled_req,int len)134 LocalManager::hostStatusSetDown(const char *marshalled_req, int len)
135 {
136   signalEvent(MGMT_EVENT_HOST_STATUS_DOWN, marshalled_req, len);
137   return;
138 }
139 
140 void
hostStatusSetUp(const char * marshalled_req,int len)141 LocalManager::hostStatusSetUp(const char *marshalled_req, int len)
142 {
143   signalEvent(MGMT_EVENT_HOST_STATUS_UP, marshalled_req, len);
144   return;
145 }
146 
147 void
clearStats(const char * name)148 LocalManager::clearStats(const char *name)
149 {
150   // Clear our records and then send the signal.  There is a race condition
151   //  here where our stats could get re-updated from the proxy
152   //  before the proxy clears them, but this should be rare.
153   //
154   //  Doing things in the opposite order prevents that race
155   //   but exacerbates the race between the node and cluster
156   //   stats getting cleared by propagation of clearing the
157   //   cluster stats
158   //
159   if (name && *name) {
160     RecResetStatRecord(name);
161   } else {
162     RecResetStatRecord(RECT_NULL, true);
163   }
164 
165   // If the proxy is not running, sending the signal does
166   //   not do anything.  Remove the stats file to make sure
167   //   that operation works even when the proxy is off
168   //
169   if (this->proxy_running == 0) {
170     ats_scoped_str statsPath(RecConfigReadPersistentStatsPath());
171     if (unlink(statsPath) < 0) {
172       if (errno != ENOENT) {
173         mgmt_log("[LocalManager::clearStats] Unlink of %s failed : %s\n", (const char *)statsPath, strerror(errno));
174       }
175     }
176   }
177 }
178 
179 bool
processRunning()180 LocalManager::processRunning()
181 {
182   if (watched_process_fd != ts::NO_FD && watched_process_pid != -1) {
183     return true;
184   } else {
185     return false;
186   }
187 }
188 
LocalManager(bool proxy_on,bool listen)189 LocalManager::LocalManager(bool proxy_on, bool listen) : BaseManager(), run_proxy(proxy_on), listen_for_proxy(listen)
190 {
191   bool found;
192   std::string bindir(RecConfigReadBinDir());
193   std::string sysconfdir(RecConfigReadConfigDir());
194 
195   manager_started_at = time(nullptr);
196 
197   RecRegisterStatInt(RECT_NODE, "proxy.node.proxy_running", 0, RECP_NON_PERSISTENT);
198 
199   RecInt http_enabled = REC_readInteger("proxy.config.http.enabled", &found);
200   ink_assert(found);
201   if (http_enabled && found) {
202     HttpProxyPort::loadConfig(m_proxy_ports);
203   }
204   HttpProxyPort::loadDefaultIfEmpty(m_proxy_ports);
205 
206   // Get the default IP binding values.
207   RecHttpLoadIp("proxy.local.incoming_ip_to_bind", m_inbound_ip4, m_inbound_ip6);
208 
209   if (access(sysconfdir.c_str(), R_OK) == -1) {
210     mgmt_log("[LocalManager::LocalManager] unable to access() directory '%s': %d, %s\n", sysconfdir.c_str(), errno,
211              strerror(errno));
212     mgmt_fatal(0, "[LocalManager::LocalManager] please set the 'TS_ROOT' environment variable\n");
213   }
214 
215 #if TS_HAS_WCCP
216   // Bind the WCCP address if present.
217   ats_scoped_str wccp_addr_str(REC_readString("proxy.config.wccp.addr", &found));
218   if (found && wccp_addr_str && *wccp_addr_str) {
219     wccp_cache.setAddr(inet_addr(wccp_addr_str));
220     mgmt_log("[LocalManager::LocalManager] WCCP identifying address set to %s.\n", static_cast<char *>(wccp_addr_str));
221   }
222 
223   ats_scoped_str wccp_config_str(RecConfigReadConfigPath("proxy.config.wccp.services"));
224   if (wccp_config_str && strlen(wccp_config_str) > 0) {
225     bool located = true;
226     if (access(wccp_config_str, R_OK) == -1) {
227       located = false;
228     }
229 
230     if (located) {
231       wccp_cache.loadServicesFromFile(wccp_config_str);
232     } else { // not located
233       mgmt_log("[LocalManager::LocalManager] WCCP service configuration file '%s' was specified but could not be found in the file "
234                "system.\n",
235                static_cast<char *>(wccp_config_str));
236     }
237   }
238 #endif
239 
240   process_server_timeout_secs  = REC_readInteger("proxy.config.lm.pserver_timeout_secs", &found);
241   process_server_timeout_msecs = REC_readInteger("proxy.config.lm.pserver_timeout_msecs", &found);
242   proxy_name                   = REC_readString("proxy.config.proxy_name", &found);
243   proxy_binary                 = REC_readString("proxy.config.proxy_binary", &found);
244   env_prep                     = REC_readString("proxy.config.env_prep", &found);
245 
246   // Calculate proxy_binary from the absolute bin_path
247   absolute_proxy_binary = ats_stringdup(Layout::relative_to(bindir, proxy_binary));
248 
249   // coverity[fs_check_call]
250   if (access(absolute_proxy_binary, R_OK | X_OK) == -1) {
251     mgmt_log("[LocalManager::LocalManager] Unable to access() '%s': %d, %s\n", absolute_proxy_binary, errno, strerror(errno));
252     mgmt_fatal(0, "[LocalManager::LocalManager] please set bin path 'proxy.config.bin_path' \n");
253   }
254 
255   return;
256 }
257 
~LocalManager()258 LocalManager::~LocalManager()
259 {
260   delete alarm_keeper;
261   ats_free(absolute_proxy_binary);
262   ats_free(proxy_name);
263   ats_free(proxy_binary);
264   ats_free(env_prep);
265 }
266 
267 void
initAlarm()268 LocalManager::initAlarm()
269 {
270   alarm_keeper = new Alarms();
271 }
272 
273 /*
274  * initMgmtProcessServer()
275  *   sets up the server socket that proxy processes connect to.
276  */
277 void
initMgmtProcessServer()278 LocalManager::initMgmtProcessServer()
279 {
280   std::string rundir(RecConfigReadRuntimeDir());
281   std::string sockpath(Layout::relative_to(rundir, LM_CONNECTION_SERVER));
282   mode_t oldmask = umask(0);
283 
284 #if TS_HAS_WCCP
285   if (wccp_cache.isConfigured()) {
286     if (0 > wccp_cache.open())
287       mgmt_log("Failed to open WCCP socket\n");
288   }
289 #endif
290 
291   process_server_sockfd = bind_unix_domain_socket(sockpath.c_str(), 00700);
292   if (process_server_sockfd == -1) {
293     mgmt_fatal(errno, "[LocalManager::initMgmtProcessServer] failed to bind socket at %s\n", sockpath.c_str());
294   }
295 
296 #if HAVE_EVENTFD
297   wakeup_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
298   if (wakeup_fd < 0) {
299     mgmt_fatal(errno, "[LocalManager::initMgmtProcessServer] failed to create eventfd. errno : %s\n", strerror(errno));
300   }
301 #endif
302 
303   umask(oldmask);
304   RecSetRecordInt("proxy.node.restarts.manager.start_time", manager_started_at, REC_SOURCE_DEFAULT);
305 }
306 
307 /*
308  * pollMgmtProcessServer()
309  * -  Function checks the mgmt process server for new processes
310  *    and any requests sent from processes. It handles processes sent.
311  */
312 void
pollMgmtProcessServer()313 LocalManager::pollMgmtProcessServer()
314 {
315   struct timeval timeout;
316   fd_set fdlist;
317 
318   while (true) {
319 #if TS_HAS_WCCP
320     int wccp_fd = wccp_cache.getSocket();
321 #endif
322 
323     timeout.tv_sec  = process_server_timeout_secs;
324     timeout.tv_usec = process_server_timeout_msecs * 1000;
325 
326     FD_ZERO(&fdlist);
327 
328     if (process_server_sockfd != ts::NO_FD) {
329       FD_SET(process_server_sockfd, &fdlist);
330     }
331 
332     if (watched_process_fd != ts::NO_FD) {
333       FD_SET(watched_process_fd, &fdlist);
334     }
335 
336 #if TS_HAS_WCCP
337     // Only run WCCP housekeeping while we have a server process.
338     // Note: The WCCP socket is opened iff WCCP is configured.
339     if (wccp_fd != ts::NO_FD && watched_process_fd != ts::NO_FD) {
340       wccp_cache.housekeeping();
341       time_t wccp_wait = wccp_cache.waitTime();
342       if (wccp_wait < process_server_timeout_secs)
343         timeout.tv_sec = wccp_wait;
344 
345       if (wccp_fd != ts::NO_FD) {
346         FD_SET(wccp_fd, &fdlist);
347       }
348     }
349 #endif
350 
351 #if HAVE_EVENTFD
352     if (wakeup_fd != ts::NO_FD) {
353       FD_SET(wakeup_fd, &fdlist);
354     }
355 #endif
356 
357     int num = mgmt_select(FD_SETSIZE, &fdlist, nullptr, nullptr, &timeout);
358 
359     switch (num) {
360     case 0:
361       // Timed out, nothing to do.
362       return;
363     case -1:
364       if (mgmt_transient_error()) {
365         continue;
366       }
367 
368       mgmt_log("[LocalManager::pollMgmtProcessServer] select failed: %s (%d)\n", ::strerror(errno), errno);
369       return;
370 
371     default:
372       // if we get a wakeup_fd event, we may not want to follow it
373       // because there may be more data to be read on the socket.
374       bool keep_polling = false;
375 #if TS_HAS_WCCP
376       if (wccp_fd != ts::NO_FD && FD_ISSET(wccp_fd, &fdlist)) {
377         wccp_cache.handleMessage();
378         --num;
379         keep_polling = true;
380       }
381 #endif
382 
383       if (process_server_sockfd != ts::NO_FD && FD_ISSET(process_server_sockfd, &fdlist)) { /* New connection */
384         struct sockaddr_in clientAddr;
385         socklen_t clientLen = sizeof(clientAddr);
386         int new_sockfd      = mgmt_accept(process_server_sockfd, reinterpret_cast<struct sockaddr *>(&clientAddr), &clientLen);
387 
388         mgmt_log("[LocalManager::pollMgmtProcessServer] New process connecting fd '%d'\n", new_sockfd);
389 
390         if (new_sockfd < 0) {
391           mgmt_elog(errno, "[LocalManager::pollMgmtProcessServer] ==> ");
392         } else if (!processRunning()) {
393           watched_process_fd = new_sockfd;
394         } else {
395           close_socket(new_sockfd);
396         }
397         --num;
398         keep_polling = true;
399       }
400 
401       if (ts::NO_FD != watched_process_fd && FD_ISSET(watched_process_fd, &fdlist)) {
402         int res;
403         MgmtMessageHdr mh_hdr;
404 
405         keep_polling = true;
406 
407         // read the message
408         if ((res = mgmt_read_pipe(watched_process_fd, reinterpret_cast<char *>(&mh_hdr), sizeof(MgmtMessageHdr))) > 0) {
409           MgmtMessageHdr *mh_full = static_cast<MgmtMessageHdr *>(malloc(sizeof(MgmtMessageHdr) + mh_hdr.data_len));
410           memcpy(mh_full, &mh_hdr, sizeof(MgmtMessageHdr));
411           char *data_raw = reinterpret_cast<char *>(mh_full) + sizeof(MgmtMessageHdr);
412           if ((res = mgmt_read_pipe(watched_process_fd, data_raw, mh_hdr.data_len)) > 0) {
413             handleMgmtMsgFromProcesses(mh_full);
414           } else if (res < 0) {
415             mgmt_fatal(0, "[LocalManager::pollMgmtProcessServer] Error in read (errno: %d)\n", -res);
416           }
417           free(mh_full);
418         } else if (res < 0) {
419           mgmt_fatal(0, "[LocalManager::pollMgmtProcessServer] Error in read (errno: %d)\n", -res);
420         }
421 
422         // handle EOF
423         if (res == 0) {
424           int estatus;
425           pid_t tmp_pid = watched_process_pid;
426 
427           Debug("lm", "[LocalManager::pollMgmtProcessServer] Lost process EOF!");
428 
429           close_socket(watched_process_fd);
430 
431           waitpid(watched_process_pid, &estatus, 0); /* Reap child */
432           if (WIFSIGNALED(estatus)) {
433             int sig = WTERMSIG(estatus);
434             mgmt_log("[LocalManager::pollMgmtProcessServer] Server Process terminated due to Sig %d: %s\n", sig, strsignal(sig));
435           } else if (WIFEXITED(estatus)) {
436             int return_code = WEXITSTATUS(estatus);
437 
438             // traffic_server's exit code will be UNRECOVERABLE_EXIT if it calls
439             // ink_emergency() or ink_emergency_va(). The call signals that traffic_server
440             // cannot be recovered with a reboot. In other words, catastrophic failure.
441             if (return_code == UNRECOVERABLE_EXIT) {
442               proxy_recoverable = false;
443             }
444           }
445 
446           if (lmgmt->run_proxy) {
447             mgmt_log("[Alarms::signalAlarm] Server Process was reset\n");
448             lmgmt->alarm_keeper->signalAlarm(MGMT_ALARM_PROXY_PROCESS_DIED, nullptr);
449           } else {
450             mgmt_log("[TrafficManager] Server process shutdown\n");
451           }
452 
453           watched_process_fd = watched_process_pid = -1;
454           if (tmp_pid != -1) { /* Incremented after a pid: message is sent */
455             proxy_running--;
456           }
457           proxy_started_at = -1;
458           RecSetRecordInt("proxy.node.proxy_running", 0, REC_SOURCE_DEFAULT);
459         }
460 
461         --num;
462       }
463 
464 #if HAVE_EVENTFD
465       if (wakeup_fd != ts::NO_FD && FD_ISSET(wakeup_fd, &fdlist)) {
466         if (!keep_polling) {
467           // read or else fd will always be set.
468           uint64_t ignore;
469           ATS_UNUSED_RETURN(read(wakeup_fd, &ignore, sizeof(uint64_t)));
470           return;
471         }
472         --num;
473       }
474 #else
475       (void)keep_polling; // suppress compiler warning
476 #endif
477 
478       ink_assert(num == 0); /* Invariant */
479     }
480   }
481 }
482 
483 void
handleMgmtMsgFromProcesses(MgmtMessageHdr * mh)484 LocalManager::handleMgmtMsgFromProcesses(MgmtMessageHdr *mh)
485 {
486   char *data_raw = reinterpret_cast<char *>(mh) + sizeof(MgmtMessageHdr);
487   switch (mh->msg_id) {
488   case MGMT_SIGNAL_PID:
489     watched_process_pid = *(reinterpret_cast<pid_t *>(data_raw));
490     lmgmt->alarm_keeper->signalAlarm(MGMT_ALARM_PROXY_PROCESS_BORN, nullptr);
491     proxy_running++;
492     proxy_launch_pid         = -1;
493     proxy_launch_outstanding = false;
494     RecSetRecordInt("proxy.node.proxy_running", 1, REC_SOURCE_DEFAULT);
495     break;
496 
497   // FIX: This is very messy need to correlate mgmt signals and
498   // alarms better
499   case MGMT_SIGNAL_CONFIG_ERROR:
500     alarm_keeper->signalAlarm(MGMT_ALARM_PROXY_CONFIG_ERROR, data_raw);
501     break;
502   case MGMT_SIGNAL_SYSTEM_ERROR:
503     alarm_keeper->signalAlarm(MGMT_ALARM_PROXY_SYSTEM_ERROR, data_raw);
504     break;
505   case MGMT_SIGNAL_CACHE_ERROR:
506     alarm_keeper->signalAlarm(MGMT_ALARM_PROXY_CACHE_ERROR, data_raw);
507     break;
508   case MGMT_SIGNAL_CACHE_WARNING:
509     alarm_keeper->signalAlarm(MGMT_ALARM_PROXY_CACHE_WARNING, data_raw);
510     break;
511   case MGMT_SIGNAL_LOGGING_ERROR:
512     alarm_keeper->signalAlarm(MGMT_ALARM_PROXY_LOGGING_ERROR, data_raw);
513     break;
514   case MGMT_SIGNAL_LOGGING_WARNING:
515     alarm_keeper->signalAlarm(MGMT_ALARM_PROXY_LOGGING_WARNING, data_raw);
516     break;
517   case MGMT_SIGNAL_PLUGIN_SET_CONFIG: {
518     char var_name[256];
519     char var_value[256];
520     MgmtType stype;
521     // stype is an enum type, so cast to an int* to avoid warnings. /leif
522     int tokens = sscanf(data_raw, "%255s %d %255s", var_name, reinterpret_cast<int *>(&stype), var_value);
523     if (tokens != 3) {
524       stype = MGMT_INVALID;
525     }
526     switch (stype) {
527     case MGMT_INT:
528       RecSetRecordInt(var_name, ink_atoi64(var_value), REC_SOURCE_EXPLICIT);
529       break;
530     case MGMT_COUNTER:
531     case MGMT_FLOAT:
532     case MGMT_STRING:
533     case MGMT_INVALID:
534     default:
535       mgmt_log("[LocalManager::handleMgmtMsgFromProcesses] "
536                "Invalid plugin set-config msg '%s'\n",
537                data_raw);
538       break;
539     }
540   } break;
541   case MGMT_SIGNAL_LIBRECORDS:
542     if (mh->data_len > 0) {
543       executeMgmtCallback(MGMT_SIGNAL_LIBRECORDS, {data_raw, static_cast<size_t>(mh->data_len)});
544     } else {
545       executeMgmtCallback(MGMT_SIGNAL_LIBRECORDS, {});
546     }
547     break;
548   case MGMT_SIGNAL_CONFIG_FILE_CHILD: {
549     static const MgmtMarshallType fields[] = {MGMT_MARSHALL_STRING, MGMT_MARSHALL_STRING};
550     char *parent                           = nullptr;
551     char *child                            = nullptr;
552     if (mgmt_message_parse(data_raw, mh->data_len, fields, countof(fields), &parent, &child) != -1) {
553       configFiles->configFileChild(parent, child);
554     } else {
555       mgmt_log("[LocalManager::handleMgmtMsgFromProcesses] "
556                "MGMT_SIGNAL_CONFIG_FILE_CHILD mgmt_message_parse error\n");
557     }
558     // Output pointers are guaranteed to be NULL or valid.
559     ats_free_null(parent);
560     ats_free_null(child);
561   } break;
562 
563   default:
564     break;
565   }
566 }
567 
568 void
sendMgmtMsgToProcesses(int msg_id,const char * data_str)569 LocalManager::sendMgmtMsgToProcesses(int msg_id, const char *data_str)
570 {
571   sendMgmtMsgToProcesses(msg_id, data_str, strlen(data_str) + 1);
572   return;
573 }
574 
575 void
sendMgmtMsgToProcesses(int msg_id,const char * data_raw,int data_len)576 LocalManager::sendMgmtMsgToProcesses(int msg_id, const char *data_raw, int data_len)
577 {
578   MgmtMessageHdr *mh;
579 
580   mh           = static_cast<MgmtMessageHdr *>(alloca(sizeof(MgmtMessageHdr) + data_len));
581   mh->msg_id   = msg_id;
582   mh->data_len = data_len;
583   memcpy(reinterpret_cast<char *>(mh) + sizeof(MgmtMessageHdr), data_raw, data_len);
584   sendMgmtMsgToProcesses(mh);
585   return;
586 }
587 
/*
 * sendMgmtMsgToProcesses()
 *   Applies the manager side effects of an event and then writes the
 *   message (header plus mh->data_len payload bytes) to the watched
 *   proxy connection, if there is one.
 *
 * Args:
 *   mh: message header immediately followed in memory by its payload.
 *       For MGMT_EVENT_BOUNCE the msg_id in the header is rewritten in place.
 *
 * A failed write is reported once per watched pid; on ECONNRESET/EPIPE
 * the proxy is either declared dead (its bookkeeping is reset) or an
 * alarm is raised for the broken connection.
 */
void
LocalManager::sendMgmtMsgToProcesses(MgmtMessageHdr *mh)
{
  switch (mh->msg_id) {
  case MGMT_EVENT_SHUTDOWN: {
    run_proxy = false;
    this->closeProxyPorts();
    break;
  }
  case MGMT_EVENT_RESTART:
    // Handled entirely by the manager: nothing is written to the proxy.
    run_proxy = true;
    listenForProxy();
    return;
  case MGMT_EVENT_BOUNCE: /* Just bouncing the cluster, have it exit; we'll restart it */
    // The proxy only ever sees a shutdown; run_proxy is left untouched here.
    mh->msg_id = MGMT_EVENT_SHUTDOWN;
    break;
  case MGMT_EVENT_ROLL_LOG_FILES:
    mgmt_log("[LocalManager::SendMgmtMsgsToProcesses]Event is being constructed .\n");
    break;
  case MGMT_EVENT_CONFIG_FILE_UPDATE:
    // The payload is a record name; the record's string value is looked up as the file name.
    bool found;
    char *fname = nullptr;
    ConfigManager *rb;
    char *data_raw;

    data_raw = reinterpret_cast<char *>(mh) + sizeof(MgmtMessageHdr);
    fname    = REC_readString(data_raw, &found);

    RecT rec_type;
    if (RecGetRecordType(data_raw, &rec_type) == REC_ERR_OKAY && rec_type == RECT_CONFIG) {
      RecSetSyncRequired(data_raw);
    } else {
      mgmt_log("[LocalManager:sendMgmtMsgToProcesses] Unknown file change: '%s'\n", data_raw);
    }
    ink_assert(found);
    // Fatal unless the file has a registered config object, or the record is one of the
    // two names exempted below.
    if (!(fname && configFiles && configFiles->getConfigObj(fname, &rb)) &&
        (strcmp(data_raw, "proxy.config.body_factory.template_sets_dir") != 0) &&
        (strcmp(data_raw, "proxy.config.ssl.server.ticket_key.filename") != 0)) {
      mgmt_fatal(0, "[LocalManager::sendMgmtMsgToProcesses] "
                    "Invalid 'data_raw' for MGMT_EVENT_CONFIG_FILE_UPDATE\n");
    }
    ats_free(fname);
    break;
  }

  if (watched_process_fd != -1) {
    if (mgmt_write_pipe(watched_process_fd, reinterpret_cast<char *>(mh), sizeof(MgmtMessageHdr) + mh->data_len) <= 0) {
      // In case of Linux, sometimes when the TS dies, the connection between TS and TM
      // is not closed properly. the socket does not receive an EOF. So, the TM does
      // not detect that the connection and hence TS has gone down. Hence it still
      // tries to send a message to TS, but encounters an error and enters here
      // Also, ensure that this whole thing is done only once because there will be a
      // deluge of message in the traffic.log otherwise

      // Both statics start at the pid seen on the first failure and are re-armed whenever
      // the watched pid changes; check_current_pid is cleared after the first report so
      // later failures for the same pid are skipped.
      static pid_t check_prev_pid    = watched_process_pid;
      static pid_t check_current_pid = watched_process_pid;
      if (check_prev_pid != watched_process_pid) {
        check_prev_pid    = watched_process_pid;
        check_current_pid = watched_process_pid;
      }

      if (check_prev_pid == check_current_pid) {
        check_current_pid = -1;
        // Saved before logging so the checks below see the errno of the failed write.
        int lerrno        = errno;
        mgmt_elog(errno, "[LocalManager::sendMgmtMsgToProcesses] Error writing message\n");
        if (lerrno == ECONNRESET || lerrno == EPIPE) { // Connection closed by peer or Broken pipe
          // Signal 0 only probes for existence; ESRCH means the process is gone.
          if ((kill(watched_process_pid, 0) < 0) && (errno == ESRCH)) {
            // TS is down
            pid_t tmp_pid = watched_process_pid;
            close_socket(watched_process_fd);
            // NOTE(review): the log tag below names pollMgmtProcessServer although this is
            // sendMgmtMsgToProcesses - looks like a copy/paste; confirm before changing.
            mgmt_log("[LocalManager::pollMgmtProcessServer] "
                     "Server Process has been terminated\n");
            if (lmgmt->run_proxy) {
              mgmt_log("[Alarms::signalAlarm] Server Process was reset\n");
              lmgmt->alarm_keeper->signalAlarm(MGMT_ALARM_PROXY_PROCESS_DIED, nullptr);
            } else {
              mgmt_log("[TrafficManager] Server process shutdown\n");
            }
            watched_process_fd = watched_process_pid = -1;
            if (tmp_pid != -1) { /* Incremented after a pid: message is sent */
              proxy_running--;
            }
            proxy_started_at = -1;
            RecSetRecordInt("proxy.node.proxy_running", 0, REC_SOURCE_DEFAULT);
            // End of TS down
          } else {
            // TS is still up, but the connection is lost
            const char *err_msg = "The TS-TM connection is broken for some reason. Either restart TS and TM or correct this error "
                                  "for TM to display TS statistics correctly";
            lmgmt->alarm_keeper->signalAlarm(MGMT_ALARM_PROXY_SYSTEM_ERROR, err_msg);
          }

          // check if the TS is down, by checking the pid
          // if TS is down, then,
          //     raise an alarm
          //     set the variables so that TS is restarted later
          // else (TS is still up)
          //     raise an alarm stating the problem
        }
      }
    }
  }
}
691 
692 void
signalFileChange(const char * var_name)693 LocalManager::signalFileChange(const char *var_name)
694 {
695   signalEvent(MGMT_EVENT_CONFIG_FILE_UPDATE, var_name);
696 
697   return;
698 }
699 
700 void
signalEvent(int msg_id,const char * data_str)701 LocalManager::signalEvent(int msg_id, const char *data_str)
702 {
703   signalEvent(msg_id, data_str, strlen(data_str) + 1);
704   return;
705 }
706 
707 void
signalEvent(int msg_id,const char * data_raw,int data_len)708 LocalManager::signalEvent(int msg_id, const char *data_raw, int data_len)
709 {
710   MgmtMessageHdr *mh;
711   size_t n = sizeof(MgmtMessageHdr) + data_len;
712 
713   mh           = static_cast<MgmtMessageHdr *>(ats_malloc(n));
714   mh->msg_id   = msg_id;
715   mh->data_len = data_len;
716   auto payload = mh->payload();
717   memcpy(payload.data(), data_raw, data_len);
718   this->enqueue(mh);
719   //  ink_assert(enqueue(mgmt_event_queue, mh));
720 
721 #if HAVE_EVENTFD
722   // we don't care about the actual value of wakeup_fd, so just keep adding 1. just need to
723   // wakeup the fd. also, note that wakeup_fd was initialized to non-blocking so we can
724   // directly write to it without any timeout checking.
725   //
726   // don't trigger if MGMT_EVENT_LIBRECORD because they happen all the time
727   // and don't require a quick response. for MGMT_EVENT_LIBRECORD, rely on timeouts so
728   // traffic_server can spend more time doing other things
729   uint64_t one = 1;
730   if (wakeup_fd != ts::NO_FD && mh->msg_id != MGMT_EVENT_LIBRECORDS) {
731     ATS_UNUSED_RETURN(write(wakeup_fd, &one, sizeof(uint64_t))); // trigger to stop polling
732   }
733 #endif
734 }
735 
736 /*
737  * processEventQueue()
738  *   Function drains and processes the mgmt event queue
739  * notifying any registered callback functions and performing
740  * any mgmt tasks for each event.
741  */
742 void
processEventQueue()743 LocalManager::processEventQueue()
744 {
745   while (!this->queue_empty()) {
746     bool handled_by_mgmt = false;
747 
748     MgmtMessageHdr *mh = this->dequeue();
749     auto payload       = mh->payload().rebind<char>();
750 
751     // check if we have a local file update
752     if (mh->msg_id == MGMT_EVENT_CONFIG_FILE_UPDATE) {
753       // records.config
754       if (!(strcmp(payload.begin(), ts::filename::RECORDS))) {
755         if (RecReadConfigFile() != REC_ERR_OKAY) {
756           mgmt_elog(errno, "[fileUpdated] Config update failed for %s\n", ts::filename::RECORDS);
757         } else {
758           RecConfigWarnIfUnregistered();
759         }
760         handled_by_mgmt = true;
761       }
762     }
763 
764     if (!handled_by_mgmt) {
765       if (processRunning() == false) {
766         // Fix INKqa04984
767         // If traffic server hasn't completely come up yet,
768         // we will hold off until next round.
769         this->enqueue(mh);
770         return;
771       }
772       Debug("lm", "[TrafficManager] ==> Sending signal event '%d' %s payload=%d", mh->msg_id, payload.begin(), int(payload.size()));
773       lmgmt->sendMgmtMsgToProcesses(mh);
774     }
775     ats_free(mh);
776   }
777 }
778 
779 /*
780  * startProxy()
781  *   Function fires up a proxy process.
782  *
783  * Args:
784  *   onetime_options: one time options that traffic_server should be started with (ie
785  *                    these options do not persist across reboots)
786  */
static constexpr size_t OPTIONS_SIZE = 16384; // Arbitrary upper bound on the command line option string
788 
789 bool
startProxy(const char * onetime_options)790 LocalManager::startProxy(const char *onetime_options)
791 {
792   if (proxy_launch_outstanding) {
793     return false;
794   }
795   mgmt_log("[LocalManager::startProxy] Launching ts process\n");
796 
797   pid_t pid;
798 
799   // Before we do anything lets check for the existence of
800   // the traffic server binary along with it's execute permissions
801   if (access(absolute_proxy_binary, F_OK) < 0) {
802     // Error can't find traffic_server
803     mgmt_elog(errno, "[LocalManager::startProxy] Unable to find traffic server at %s\n", absolute_proxy_binary);
804     return false;
805   }
806   // traffic server binary exists, check permissions
807   else if (access(absolute_proxy_binary, R_OK | X_OK) < 0) {
808     // Error don't have proper permissions
809     mgmt_elog(errno, "[LocalManager::startProxy] Unable to access %s due to bad permissions \n", absolute_proxy_binary);
810     return false;
811   }
812 
813   if (env_prep) {
814 #ifdef POSIX_THREAD
815     if ((pid = fork()) < 0)
816 #else
817     if ((pid = fork1()) < 0)
818 #endif
819     {
820       mgmt_elog(errno, "[LocalManager::startProxy] Unable to fork1 prep process\n");
821       return false;
822     } else if (pid > 0) {
823       int estatus;
824       waitpid(pid, &estatus, 0);
825     } else {
826       int res;
827 
828       char env_prep_bin[MAXPATHLEN];
829       std::string bindir(RecConfigReadBinDir());
830 
831       ink_filepath_make(env_prep_bin, sizeof(env_prep_bin), bindir.c_str(), env_prep);
832       res = execl(env_prep_bin, env_prep_bin, (char *)nullptr);
833       _exit(res);
834     }
835   }
836 #ifdef POSIX_THREAD
837   if ((pid = fork()) < 0)
838 #else
839   if ((pid = fork1()) < 0)
840 #endif
841   {
842     mgmt_elog(errno, "[LocalManager::startProxy] Unable to fork1 process\n");
843     return false;
844   } else if (pid > 0) { /* Parent */
845     proxy_launch_pid         = pid;
846     proxy_launch_outstanding = true;
847     proxy_started_at         = time(nullptr);
848     ++proxy_launch_count;
849     RecSetRecordInt("proxy.node.restarts.proxy.start_time", proxy_started_at, REC_SOURCE_DEFAULT);
850     RecSetRecordInt("proxy.node.restarts.proxy.restart_count", proxy_launch_count, REC_SOURCE_DEFAULT);
851   } else {
852     int i = 0;
853     char *options[32], *last, *tok;
854     char options_buffer[OPTIONS_SIZE];
855     ts::FixedBufferWriter w{options_buffer, OPTIONS_SIZE};
856 
857     w.clip(1);
858     w.print("{}{}", ts::bwf::OptionalAffix(proxy_options), ts::bwf::OptionalAffix(onetime_options));
859 
860     // Make sure we're starting the proxy in mgmt mode
861     if (w.view().find(MGMT_OPT) == std::string_view::npos) {
862       w.write(MGMT_OPT);
863       w.write(' ');
864     }
865 
866     // pass the runroot option to traffic_server
867     std::string_view runroot_arg = get_runroot();
868     if (!runroot_arg.empty()) {
869       w.write(RUNROOT_OPT);
870       w.write(runroot_arg);
871       w.write(' ');
872     }
873 
874     // Pass down port/fd information to traffic_server if there are any open ports.
875     if (std::any_of(m_proxy_ports.begin(), m_proxy_ports.end(), [](HttpProxyPort &p) { return ts::NO_FD != p.m_fd; })) {
876       char portbuf[128];
877       bool need_comma_p = false;
878 
879       w.write("--httpport "sv);
880       for (auto &p : m_proxy_ports) {
881         if (ts::NO_FD != p.m_fd) {
882           if (need_comma_p) {
883             w.write(',');
884           }
885           need_comma_p = true;
886           p.print(portbuf, sizeof(portbuf));
887           w.write(portbuf);
888         }
889       }
890     }
891 
892     w.extend(1);
893     w.write('\0'); // null terminate.
894 
895     Debug("lm", "[LocalManager::startProxy] Launching %s '%s'", absolute_proxy_binary, w.data());
896 
897     // Unfortunately the normally obnoxious null writing of strtok is in this case a required
898     // side effect and other alternatives are noticeably more clunky.
899     ink_zero(options);
900     options[0] = absolute_proxy_binary;
901     i          = 1;
902     tok        = strtok_r(options_buffer, " ", &last);
903     Debug("lm", "opt %d = '%s'", i, tok);
904     options[i++] = tok;
905     while (i < 32 && (tok = strtok_r(nullptr, " ", &last))) {
906       Debug("lm", "opt %d = '%s'", i, tok);
907       options[i++] = tok;
908     }
909 
910     EnableDeathSignal(SIGTERM);
911 
912     execv(absolute_proxy_binary, options);
913     mgmt_fatal(errno, "[LocalManager::startProxy] Exec of %s failed\n", absolute_proxy_binary);
914   }
915   return true;
916 }
917 
918 /** Close all open ports.
919  */
920 void
closeProxyPorts()921 LocalManager::closeProxyPorts()
922 {
923   for (auto &p : lmgmt->m_proxy_ports) {
924     if (ts::NO_FD != p.m_fd) {
925       close_socket(p.m_fd);
926       p.m_fd = ts::NO_FD;
927     }
928   }
929 }
930 /*
931  * listenForProxy()
932  *  Function listens on the accept port of the proxy, so users aren't dropped.
933  */
934 void
listenForProxy()935 LocalManager::listenForProxy()
936 {
937   if (!run_proxy || !listen_for_proxy) {
938     return;
939   }
940 
941   // We are not already bound, bind the port
942   for (auto &p : lmgmt->m_proxy_ports) {
943     if (ts::NO_FD == p.m_fd) {
944       // Check the protocol (TCP or UDP) and create an appropriate socket
945       if (p.isQUIC()) {
946         this->bindUdpProxyPort(p);
947       } else {
948         this->bindTcpProxyPort(p);
949       }
950     }
951 
952     std::string_view fam{ats_ip_family_name(p.m_family)};
953     if (p.isQUIC()) {
954       // Can we do something like listen backlog for QUIC(UDP) ??
955       // Do nothing for now
956     } else {
957       // read backlog configuration value and overwrite the default value if found
958       bool found;
959       RecInt backlog = REC_readInteger("proxy.config.net.listen_backlog", &found);
960       backlog        = (found && backlog >= 0) ? backlog : ats_tcp_somaxconn();
961 
962       if ((listen(p.m_fd, backlog)) < 0) {
963         mgmt_fatal(errno, "[LocalManager::listenForProxy] Unable to listen on port: %d (%.*s)\n", p.m_port, fam.size(), fam.data());
964       }
965     }
966 
967     mgmt_log("[LocalManager::listenForProxy] Listening on port: %d (%.*s)\n", p.m_port, fam.size(), fam.data());
968   }
969   return;
970 }
971 
972 /*
973  * bindUdpProxyPort()
974  *  Function binds the accept port of the proxy
975  */
976 void
bindUdpProxyPort(HttpProxyPort & port)977 LocalManager::bindUdpProxyPort(HttpProxyPort &port)
978 {
979   int one  = 1;
980   int priv = (port.m_port < 1024 && 0 != geteuid()) ? ElevateAccess::LOW_PORT_PRIVILEGE : 0;
981 
982   ElevateAccess access(priv);
983 
984   if ((port.m_fd = socket(port.m_family, SOCK_DGRAM, 0)) < 0) {
985     mgmt_fatal(0, "[bindProxyPort] Unable to create socket : %s\n", strerror(errno));
986   }
987 
988   if (port.m_family == AF_INET6) {
989     if (setsockopt(port.m_fd, IPPROTO_IPV6, IPV6_V6ONLY, SOCKOPT_ON, sizeof(int)) < 0) {
990       mgmt_log("[bindProxyPort] Unable to set socket options: %d : %s\n", port.m_port, strerror(errno));
991     }
992   }
993   if (setsockopt(port.m_fd, SOL_SOCKET, SO_REUSEADDR, reinterpret_cast<char *>(&one), sizeof(int)) < 0) {
994     mgmt_fatal(0, "[bindProxyPort] Unable to set socket options: %d : %s\n", port.m_port, strerror(errno));
995   }
996 
997   IpEndpoint ip;
998   if (port.m_inbound_ip.isValid()) {
999     ip.assign(port.m_inbound_ip);
1000   } else if (AF_INET6 == port.m_family) {
1001     if (m_inbound_ip6.isValid()) {
1002       ip.assign(m_inbound_ip6);
1003     } else {
1004       ip.setToAnyAddr(AF_INET6);
1005     }
1006   } else if (AF_INET == port.m_family) {
1007     if (m_inbound_ip4.isValid()) {
1008       ip.assign(m_inbound_ip4);
1009     } else {
1010       ip.setToAnyAddr(AF_INET);
1011     }
1012   } else {
1013     mgmt_fatal(0, "[bindProxyPort] Proxy port with invalid address type %d\n", port.m_family);
1014   }
1015   ip.port() = htons(port.m_port);
1016   if (bind(port.m_fd, &ip.sa, ats_ip_size(&ip)) < 0) {
1017     mgmt_fatal(0, "[bindProxyPort] Unable to bind socket: %d : %s\n", port.m_port, strerror(errno));
1018   }
1019 
1020   Debug("lm", "[bindProxyPort] Successfully bound proxy port %d", port.m_port);
1021 }
1022 
1023 /*
1024  * bindTcpProxyPort()
1025  *  Function binds the accept port of the proxy
1026  */
1027 void
bindTcpProxyPort(HttpProxyPort & port)1028 LocalManager::bindTcpProxyPort(HttpProxyPort &port)
1029 {
1030   int one  = 1;
1031   int priv = (port.m_port < 1024 && 0 != geteuid()) ? ElevateAccess::LOW_PORT_PRIVILEGE : 0;
1032 
1033   ElevateAccess access(priv);
1034 
1035   /* Setup reliable connection, for large config changes */
1036   if ((port.m_fd = socket(port.m_family, SOCK_STREAM, 0)) < 0) {
1037     mgmt_fatal(0, "[bindProxyPort] Unable to create socket : %s\n", strerror(errno));
1038   }
1039 
1040   if (port.m_type == HttpProxyPort::TRANSPORT_DEFAULT) {
1041     int should_filter_int = 0;
1042     bool found;
1043     should_filter_int = REC_readInteger("proxy.config.net.defer_accept", &found);
1044     if (found && should_filter_int > 0) {
1045 #if defined(SOL_FILTER) && defined(FIL_ATTACH)
1046       (void)setsockopt(port.m_fd, SOL_FILTER, FIL_ATTACH, "httpfilt", 9);
1047 #endif
1048     }
1049   }
1050 
1051   if (port.m_mptcp) {
1052 #if MPTCP_ENABLED
1053     int err;
1054 
1055     err = setsockopt(port.m_fd, IPPROTO_TCP, MPTCP_ENABLED, &one, sizeof(one));
1056     if (err < 0) {
1057       mgmt_log("[bindProxyPort] Unable to enable MPTCP: %s\n", strerror(errno));
1058       Debug("lm_mptcp", "[bindProxyPort] Unable to enable MPTCP: %s", strerror(errno));
1059     } else {
1060       mgmt_log("[bindProxyPort] Successfully enabled MPTCP on %d\n", port.m_port);
1061       Debug("lm_mptcp", "[bindProxyPort] Successfully enabled MPTCP on %d\n", port.m_port);
1062     }
1063 #else
1064     Debug("lm_mptcp", "[bindProxyPort] Multipath TCP requested but not configured on this host");
1065 #endif
1066   }
1067 
1068   if (port.m_family == AF_INET6) {
1069     if (setsockopt(port.m_fd, IPPROTO_IPV6, IPV6_V6ONLY, SOCKOPT_ON, sizeof(int)) < 0) {
1070       mgmt_log("[bindProxyPort] Unable to set socket options: %d : %s\n", port.m_port, strerror(errno));
1071     }
1072   }
1073   if (setsockopt(port.m_fd, SOL_SOCKET, SO_REUSEADDR, reinterpret_cast<char *>(&one), sizeof(int)) < 0) {
1074     mgmt_fatal(0, "[bindProxyPort] Unable to set socket options: %d : %s\n", port.m_port, strerror(errno));
1075   }
1076 
1077   if (port.m_proxy_protocol) {
1078     Debug("lm", "[bindProxyPort] Proxy Protocol enabled");
1079   }
1080 
1081   if (port.m_inbound_transparent_p) {
1082 #if TS_USE_TPROXY
1083     Debug("http_tproxy", "Listen port %d inbound transparency enabled.", port.m_port);
1084     if (setsockopt(port.m_fd, SOL_IP, TS_IP_TRANSPARENT, &one, sizeof(one)) == -1) {
1085       mgmt_fatal(0, "[bindProxyPort] Unable to set transparent socket option [%d] %s\n", errno, strerror(errno));
1086     }
1087 #else
1088     Debug("lm", "[bindProxyPort] Transparency requested but TPROXY not configured");
1089 #endif
1090   }
1091 
1092   IpEndpoint ip;
1093   if (port.m_inbound_ip.isValid()) {
1094     ip.assign(port.m_inbound_ip);
1095   } else if (AF_INET6 == port.m_family) {
1096     if (m_inbound_ip6.isValid()) {
1097       ip.assign(m_inbound_ip6);
1098     } else {
1099       ip.setToAnyAddr(AF_INET6);
1100     }
1101   } else if (AF_INET == port.m_family) {
1102     if (m_inbound_ip4.isValid()) {
1103       ip.assign(m_inbound_ip4);
1104     } else {
1105       ip.setToAnyAddr(AF_INET);
1106     }
1107   } else {
1108     mgmt_fatal(0, "[bindProxyPort] Proxy port with invalid address type %d\n", port.m_family);
1109   }
1110   ip.port() = htons(port.m_port);
1111   if (bind(port.m_fd, &ip.sa, ats_ip_size(&ip)) < 0) {
1112     mgmt_fatal(0, "[bindProxyPort] Unable to bind socket: %d : %s\n", port.m_port, strerror(errno));
1113   }
1114 
1115   Debug("lm", "[bindProxyPort] Successfully bound proxy port %d", port.m_port);
1116 }
1117 
1118 void
signalAlarm(int alarm_id,const char * desc,const char * ip)1119 LocalManager::signalAlarm(int alarm_id, const char *desc, const char *ip)
1120 {
1121   if (alarm_keeper) {
1122     alarm_keeper->signalAlarm(static_cast<alarm_t>(alarm_id), desc, ip);
1123   }
1124 }
1125