1 /** @file
2 
3   File contains the member function defs and thread loop for the process manager.
4 
5   @section license License
6 
7   Licensed to the Apache Software Foundation (ASF) under one
8   or more contributor license agreements.  See the NOTICE file
9   distributed with this work for additional information
10   regarding copyright ownership.  The ASF licenses this file
11   to you under the Apache License, Version 2.0 (the
12   "License"); you may not use this file except in compliance
13   with the License.  You may obtain a copy of the License at
14 
15       http://www.apache.org/licenses/LICENSE-2.0
16 
17   Unless required by applicable law or agreed to in writing, software
18   distributed under the License is distributed on an "AS IS" BASIS,
19   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20   See the License for the specific language governing permissions and
21   limitations under the License.
22  */
23 
24 #include "InkAPIInternal.h"
25 #include "ProcessManager.h"
26 
27 #include "tscore/ink_apidefs.h"
28 #include "tscore/TSSystemState.h"
29 #include "MgmtSocket.h"
30 #include "tscore/I_Layout.h"
31 
/*
 * Global ProcessManager
 *   Process-wide manager instance. It starts out null; processManagerThread()
 *   waits until it is non-null before dereferencing it.
 */
inkcoreapi ProcessManager *pmgmt = nullptr;
36 
37 // read_management_message attempts to read a message from the management
38 // socket. Returns -errno on error, otherwise 0. If a message was read the
39 // *msg pointer will be filled in with the message that was read.
40 static int
read_management_message(int sockfd,MgmtMessageHdr ** msg)41 read_management_message(int sockfd, MgmtMessageHdr **msg)
42 {
43   MgmtMessageHdr hdr;
44   int ret;
45 
46   *msg = nullptr;
47 
48   // We have a message, try to read the message header.
49   ret = mgmt_read_pipe(sockfd, reinterpret_cast<char *>(&hdr), sizeof(MgmtMessageHdr));
50   switch (ret) {
51   case 0:
52     // Received EOF.
53     return 0;
54   case sizeof(MgmtMessageHdr):
55     break;
56   default:
57     // Received -errno.
58     return -errno;
59   }
60 
61   size_t msg_size          = sizeof(MgmtMessageHdr) + hdr.data_len;
62   MgmtMessageHdr *full_msg = static_cast<MgmtMessageHdr *>(ats_malloc(msg_size));
63 
64   memcpy(full_msg, &hdr, sizeof(MgmtMessageHdr));
65   char *data_raw = reinterpret_cast<char *>(full_msg) + sizeof(MgmtMessageHdr);
66 
67   ret = mgmt_read_pipe(sockfd, data_raw, hdr.data_len);
68   if (ret == 0) {
69     // Received EOF.
70     ats_free(full_msg);
71     return 0;
72   } else if (ret < 0) {
73     // Received -errno.
74     ats_free(full_msg);
75     return ret;
76   } else {
77     ink_release_assert(ret == hdr.data_len);
78     // Received the message.
79     *msg = full_msg;
80     return 0;
81   }
82 }
83 
84 void
start(std::function<TSThread ()> const & cb_init,std::function<void (TSThread)> const & cb_destroy)85 ProcessManager::start(std::function<TSThread()> const &cb_init, std::function<void(TSThread)> const &cb_destroy)
86 {
87   Debug("pmgmt", "starting process manager");
88 
89   init    = cb_init;
90   destroy = cb_destroy;
91 
92   ink_release_assert(running == 0);
93   ink_atomic_increment(&running, 1);
94   ink_thread_create(&poll_thread, processManagerThread, nullptr, 0, 0, nullptr);
95 }
96 
97 void
stop()98 ProcessManager::stop()
99 {
100   Debug("pmgmt", "stopping process manager");
101 
102   ink_release_assert(running == 1);
103   ink_atomic_decrement(&running, 1);
104 
105   int tmp;
106 
107   if (local_manager_sockfd != ts::NO_FD) {
108     tmp                  = local_manager_sockfd;
109     local_manager_sockfd = ts::NO_FD;
110     close_socket(tmp);
111   }
112 
113 #if HAVE_EVENTFD
114   if (wakeup_fd != ts::NO_FD) {
115     tmp       = wakeup_fd;
116     wakeup_fd = ts::NO_FD;
117     close_socket(tmp);
118   }
119 #endif
120 
121   ink_thread_kill(poll_thread, SIGINT);
122 
123   ink_thread_join(poll_thread);
124   poll_thread = ink_thread_null();
125 
126   while (!queue_is_empty(mgmt_signal_queue)) {
127     char *sig = static_cast<char *>(::dequeue(mgmt_signal_queue));
128     ats_free(sig);
129   }
130 
131   LLQ *tmp_queue    = mgmt_signal_queue;
132   mgmt_signal_queue = nullptr;
133   delete_queue(tmp_queue);
134 }
135 
136 /*
137  * processManagerThread(...)
138  *   The start function and thread loop for the process manager.
139  */
140 void *
processManagerThread(void * arg)141 ProcessManager::processManagerThread(void *arg)
142 {
143   void *ret = arg;
144 
145   while (!pmgmt) { /* Avert race condition, thread spun during constructor */
146     Debug("pmgmt", "waiting for initialization");
147     mgmt_sleep_sec(1);
148   }
149 
150   if (pmgmt->require_lm) { /* Allow p. process to run w/o a lm */
151     pmgmt->initLMConnection();
152   } else {
153     return ret;
154   }
155 
156   if (pmgmt->init) {
157     pmgmt->managerThread = pmgmt->init();
158   }
159 
160   // Start pumping messages between the local process and the process
161   // manager. This will terminate when the process manager terminates
162   // or the local process calls stop(). In either case, it is likely
163   // that we will first notice because we got a socket error, but in
164   // the latter case, the `running` flag has already been toggled so
165   // we know that we are really doing a shutdown.
166   while (pmgmt->running) {
167     int ret;
168 
169     if (pmgmt->require_lm) {
170       ret = pmgmt->pollLMConnection();
171       if (ret < 0 && pmgmt->running && !TSSystemState::is_event_system_shut_down()) {
172         Alert("exiting with read error from process manager: %s", strerror(-ret));
173       }
174     }
175 
176     ret = pmgmt->processSignalQueue();
177     if (ret < 0 && pmgmt->running && !TSSystemState::is_event_system_shut_down()) {
178       Alert("exiting with write error from process manager: %s", strerror(-ret));
179     }
180   }
181 
182   if (pmgmt->destroy && pmgmt->managerThread != nullptr) {
183     pmgmt->destroy(pmgmt->managerThread);
184     pmgmt->managerThread = nullptr;
185   }
186 
187   return ret;
188 }
189 
ProcessManager(bool rlm)190 ProcessManager::ProcessManager(bool rlm)
191   : BaseManager(), require_lm(rlm), pid(getpid()), local_manager_sockfd(0), cbtable(nullptr), max_msgs_in_a_row(1)
192 {
193   mgmt_signal_queue = create_queue();
194 
195   local_manager_sockfd = ts::NO_FD;
196 #if HAVE_EVENTFD
197   wakeup_fd = ts::NO_FD;
198 #endif
199 
200   // Set temp. process/manager timeout. Will be reconfigure later.
201   // Making the process_manager thread a spinning thread to start traffic server
202   // as quickly as possible. Will reset this timeout when reconfigure()
203   timeout = 0;
204 }
205 
~ProcessManager()206 ProcessManager::~ProcessManager()
207 {
208   if (running) {
209     stop();
210   }
211 }
212 
213 void
reconfigure()214 ProcessManager::reconfigure()
215 {
216   max_msgs_in_a_row = MAX_MSGS_IN_A_ROW;
217 
218   if (RecGetRecordInt("proxy.config.process_manager.timeout", &timeout) != REC_ERR_OKAY) {
219     // Default to 5sec if the timeout is unspecified.
220     timeout = 5;
221   }
222 }
223 
224 void
signalConfigFileChild(const char * parent,const char * child)225 ProcessManager::signalConfigFileChild(const char *parent, const char *child)
226 {
227   static const MgmtMarshallType fields[] = {MGMT_MARSHALL_STRING, MGMT_MARSHALL_STRING};
228 
229   size_t len   = mgmt_message_length(fields, countof(fields), &parent, &child);
230   void *buffer = ats_malloc(len);
231 
232   mgmt_message_marshall(buffer, len, fields, countof(fields), &parent, &child);
233   signalManager(MGMT_SIGNAL_CONFIG_FILE_CHILD, static_cast<const char *>(buffer), len);
234 
235   ats_free(buffer);
236 }
237 
238 void
signalManager(int msg_id,const char * data_str)239 ProcessManager::signalManager(int msg_id, const char *data_str)
240 {
241   signalManager(msg_id, data_str, strlen(data_str) + 1);
242 }
243 
244 void
signalManager(int msg_id,const char * data_raw,int data_len)245 ProcessManager::signalManager(int msg_id, const char *data_raw, int data_len)
246 {
247   MgmtMessageHdr *mh;
248 
249   mh           = static_cast<MgmtMessageHdr *>(ats_malloc(sizeof(MgmtMessageHdr) + data_len));
250   mh->msg_id   = msg_id;
251   mh->data_len = data_len;
252   memcpy(reinterpret_cast<char *>(mh) + sizeof(MgmtMessageHdr), data_raw, data_len);
253   this->signalManager(mh);
254 }
255 
256 void
signalManager(int msg_id,std::string_view text)257 ProcessManager::signalManager(int msg_id, std::string_view text)
258 {
259   MgmtMessageHdr *mh;
260 
261   // Make space for the extra null terminator.
262   mh           = static_cast<MgmtMessageHdr *>(ats_malloc(sizeof(MgmtMessageHdr) + text.size() + 1));
263   auto body    = reinterpret_cast<char *>(mh + 1); // start of the message body.
264   mh->msg_id   = msg_id;
265   mh->data_len = text.size() + 1;
266   memcpy(body, text.data(), text.size());
267   body[text.size()] = '\0';
268 
269   this->signalManager(mh);
270 }
271 
272 void
signalManager(MgmtMessageHdr * mh)273 ProcessManager::signalManager(MgmtMessageHdr *mh)
274 {
275   if (!this->running) {
276     Warning("MgmtMessageHdr is ignored. Because ProcessManager is not running");
277     return;
278   }
279   ink_release_assert(::enqueue(mgmt_signal_queue, mh));
280 
281 #if HAVE_EVENTFD
282   // we don't care about the actual value of wakeup_fd, so just keep adding 1. just need to
283   // wakeup the fd. also, note that wakeup_fd was initialized to non-blocking so we can
284   // directly write to it without any timeout checking.
285   //
286   // don't trigger if MGMT_EVENT_LIBRECORD because they happen all the time
287   // and don't require a quick response. for MGMT_EVENT_LIBRECORD, rely on timeouts so
288   // traffic_server can spend more time doing other things/
289   uint64_t one = 1;
290   if (wakeup_fd != ts::NO_FD && mh->msg_id != MGMT_SIGNAL_LIBRECORDS) {
291     ATS_UNUSED_RETURN(write(wakeup_fd, &one, sizeof(uint64_t))); // trigger to stop polling
292   }
293 #endif
294 }
295 
296 int
processSignalQueue()297 ProcessManager::processSignalQueue()
298 {
299   while (!queue_is_empty(mgmt_signal_queue)) {
300     MgmtMessageHdr *mh = static_cast<MgmtMessageHdr *>(::dequeue(mgmt_signal_queue));
301 
302     Debug("pmgmt", "signaling local manager with message ID %d", mh->msg_id);
303 
304     if (require_lm) {
305       int ret = mgmt_write_pipe(local_manager_sockfd, reinterpret_cast<char *>(mh), sizeof(MgmtMessageHdr) + mh->data_len);
306       ats_free(mh);
307 
308       if (ret < 0) {
309         return ret;
310       }
311     }
312   }
313 
314   return 0;
315 }
316 
317 void
initLMConnection()318 ProcessManager::initLMConnection()
319 {
320   std::string rundir(RecConfigReadRuntimeDir());
321   std::string sockpath(Layout::relative_to(rundir, LM_CONNECTION_SERVER));
322 
323   MgmtMessageHdr *mh_full;
324   int data_len;
325 
326   int servlen;
327   struct sockaddr_un serv_addr;
328 
329   if (sockpath.length() > sizeof(serv_addr.sun_path) - 1) {
330     errno = ENAMETOOLONG;
331     Fatal("Unable to create socket '%s': %s", sockpath.c_str(), strerror(errno));
332   }
333 
334   /* Setup Connection to LocalManager */
335   memset(reinterpret_cast<char *>(&serv_addr), 0, sizeof(serv_addr));
336   serv_addr.sun_family = AF_UNIX;
337 
338   ink_strlcpy(serv_addr.sun_path, sockpath.c_str(), sizeof(serv_addr.sun_path));
339 #if defined(darwin) || defined(freebsd)
340   servlen = sizeof(sockaddr_un);
341 #else
342   servlen = strlen(serv_addr.sun_path) + sizeof(serv_addr.sun_family);
343 #endif
344 
345   if ((local_manager_sockfd = socket(AF_UNIX, SOCK_STREAM, 0)) < 0) {
346     Fatal("Unable to create socket '%s': %s", sockpath.c_str(), strerror(errno));
347   }
348 
349   if (fcntl(local_manager_sockfd, F_SETFD, FD_CLOEXEC) < 0) {
350     Fatal("unable to set close-on-exec flag: %s", strerror(errno));
351   }
352 
353   if ((connect(local_manager_sockfd, reinterpret_cast<struct sockaddr *>(&serv_addr), servlen)) < 0) {
354     Fatal("failed to connect management socket '%s': %s", sockpath.c_str(), strerror(errno));
355   }
356 
357 #if HAVE_EVENTFD
358   wakeup_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
359   if (wakeup_fd < 0) {
360     Fatal("unable to create wakeup eventfd. errno: %s", strerror(errno));
361   }
362 #endif
363 
364   data_len          = sizeof(pid_t);
365   mh_full           = static_cast<MgmtMessageHdr *>(alloca(sizeof(MgmtMessageHdr) + data_len));
366   mh_full->msg_id   = MGMT_SIGNAL_PID;
367   mh_full->data_len = data_len;
368 
369   memcpy(reinterpret_cast<char *>(mh_full) + sizeof(MgmtMessageHdr), &(pid), data_len);
370 
371   if (mgmt_write_pipe(local_manager_sockfd, reinterpret_cast<char *>(mh_full), sizeof(MgmtMessageHdr) + data_len) <= 0) {
372     Fatal("error writing message: %s", strerror(errno));
373   }
374 }
375 
376 int
pollLMConnection()377 ProcessManager::pollLMConnection()
378 {
379   int count;
380   int ready;
381   struct timeval timeout;
382   fd_set fdlist;
383 
384   // Avoid getting stuck enqueuing too many requests in a row, limit to MAX_MSGS_IN_A_ROW.
385   for (count = 0; running && count < max_msgs_in_a_row; ++count) {
386     timeout.tv_sec  = 1;
387     timeout.tv_usec = 0;
388 
389     FD_ZERO(&fdlist);
390 
391     if (local_manager_sockfd != ts::NO_FD) {
392       FD_SET(local_manager_sockfd, &fdlist);
393     }
394 
395 #if HAVE_EVENTFD
396     if (wakeup_fd != ts::NO_FD) {
397       FD_SET(wakeup_fd, &fdlist);
398     }
399 #endif
400 
401     // wait for data on socket
402     ready = mgmt_select(FD_SETSIZE, &fdlist, nullptr, nullptr, &timeout);
403 
404     switch (ready) {
405     case 0:
406       // Timed out.
407       return 0;
408     case -1:
409       if (mgmt_transient_error()) {
410         continue;
411       }
412       return -errno;
413     }
414 
415     if (local_manager_sockfd != ts::NO_FD && FD_ISSET(local_manager_sockfd, &fdlist)) { /* Message from manager */
416       MgmtMessageHdr *msg;
417 
418       int ret = read_management_message(local_manager_sockfd, &msg);
419       if (ret < 0) {
420         return ret;
421       }
422 
423       // No message, we are done polling. */
424       if (msg == nullptr) {
425         return 0;
426       }
427 
428       Debug("pmgmt", "received message ID %d", msg->msg_id);
429       handleMgmtMsgFromLM(msg);
430     }
431 #if HAVE_EVENTFD
432     else if (wakeup_fd != ts::NO_FD && FD_ISSET(wakeup_fd, &fdlist)) { /* if msg, keep polling for more */
433       // read or else fd will always be set.
434       uint64_t ignore;
435       ATS_UNUSED_RETURN(read(wakeup_fd, &ignore, sizeof(uint64_t)));
436       break;
437     }
438 #endif
439   }
440   Debug("pmgmt", "enqueued %d of max %d messages in a row", count, max_msgs_in_a_row);
441   return 0;
442 }
443 
444 void
handleMgmtMsgFromLM(MgmtMessageHdr * mh)445 ProcessManager::handleMgmtMsgFromLM(MgmtMessageHdr *mh)
446 {
447   ink_assert(mh != nullptr);
448 
449   auto payload = mh->payload();
450 
451   Debug("pmgmt", "processing event id '%d' payload=%d", mh->msg_id, mh->data_len);
452   switch (mh->msg_id) {
453   case MGMT_EVENT_SHUTDOWN:
454     executeMgmtCallback(MGMT_EVENT_SHUTDOWN, {});
455     Alert("exiting on shutdown message");
456     break;
457   case MGMT_EVENT_RESTART:
458     executeMgmtCallback(MGMT_EVENT_RESTART, {});
459     break;
460   case MGMT_EVENT_DRAIN:
461     executeMgmtCallback(MGMT_EVENT_DRAIN, payload);
462     break;
463   case MGMT_EVENT_CLEAR_STATS:
464     executeMgmtCallback(MGMT_EVENT_CLEAR_STATS, {});
465     break;
466   case MGMT_EVENT_HOST_STATUS_UP:
467     executeMgmtCallback(MGMT_EVENT_HOST_STATUS_UP, payload);
468     break;
469   case MGMT_EVENT_HOST_STATUS_DOWN:
470     executeMgmtCallback(MGMT_EVENT_HOST_STATUS_DOWN, payload);
471     break;
472   case MGMT_EVENT_ROLL_LOG_FILES:
473     executeMgmtCallback(MGMT_EVENT_ROLL_LOG_FILES, {});
474     break;
475   case MGMT_EVENT_PLUGIN_CONFIG_UPDATE: {
476     auto msg{payload.rebind<char>()};
477     if (!msg.empty() && msg[0] != '\0' && this->cbtable) {
478       this->cbtable->invoke(msg.data());
479     }
480   } break;
481   case MGMT_EVENT_CONFIG_FILE_UPDATE:
482     /*
483       librecords -- we don't do anything in here because we are traffic_server
484       and we are not the owner of proxy.config.* variables.
485       Even if we trigger the sync_required bit, by
486       RecSetSynRequired, the sync. message will send back to
487       traffic_manager. And traffic_manager founds out that, the
488       actual value of the config variable didn't changed.
489       At the end, the sync_required bit is not set and we will
490       never get notified and callbacks are never invoked.
491 
492       The solution is to set the sync_required bit on the
493       manager side. See LocalManager::sendMgmtMsgToProcesses()
494       for details.
495     */
496     break;
497   case MGMT_EVENT_LIBRECORDS:
498     executeMgmtCallback(MGMT_EVENT_LIBRECORDS, payload);
499     break;
500   case MGMT_EVENT_STORAGE_DEVICE_CMD_OFFLINE:
501     executeMgmtCallback(MGMT_EVENT_STORAGE_DEVICE_CMD_OFFLINE, payload);
502     break;
503   case MGMT_EVENT_LIFECYCLE_MESSAGE:
504     executeMgmtCallback(MGMT_EVENT_LIFECYCLE_MESSAGE, payload);
505     break;
506   default:
507     Warning("received unknown message ID %d\n", mh->msg_id);
508     break;
509   }
510 
511   ats_free(mh);
512 }
513