1 /** @file
2 
3   Outbound connection tracking support.
4 
5   @section license License
6 
7   Licensed to the Apache Software Foundation (ASF) under one
8   or more contributor license agreements.  See the NOTICE file
9   distributed with this work for additional information
10   regarding copyright ownership.  The ASF licenses this file
11   to you under the Apache License, Version 2.0 (the
12   "License"); you may not use this file except in compliance
13   with the License.  You may obtain a copy of the License at
14 
15       http://www.apache.org/licenses/LICENSE-2.0
16 
17   Unless required by applicable law or agreed to in writing, software
18   distributed under the License is distributed on an "AS IS" BASIS,
19   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20   See the License for the specific language governing permissions and
21   limitations under the License.
22  */
23 
24 #include <algorithm>
25 #include <deque>
26 #include <records/P_RecDefs.h>
27 #include <HttpConfig.h>
28 #include "HttpConnectionCount.h"
29 #include "tscore/bwf_std_format.h"
30 #include "tscore/BufferWriter.h"
31 
32 using namespace std::literals;
33 
34 extern int http_config_cb(const char *, RecDataT, RecData, void *);
35 
36 OutboundConnTrack::Imp OutboundConnTrack::_imp;
37 
38 OutboundConnTrack::GlobalConfig *OutboundConnTrack::_global_config{nullptr};
39 
40 const MgmtConverter OutboundConnTrack::MAX_CONV(
__anona1b610590102(const void *data) 41   [](const void *data) -> MgmtInt { return static_cast<MgmtInt>(*static_cast<const decltype(TxnConfig::max) *>(data)); },
__anona1b610590202(void *data, MgmtInt i) 42   [](void *data, MgmtInt i) -> void { *static_cast<decltype(TxnConfig::max) *>(data) = static_cast<decltype(TxnConfig::max)>(i); });
43 
44 const MgmtConverter OutboundConnTrack::MIN_CONV(
__anona1b610590302(const void *data) 45   [](const void *data) -> MgmtInt { return static_cast<MgmtInt>(*static_cast<const decltype(TxnConfig::min) *>(data)); },
__anona1b610590402(void *data, MgmtInt i) 46   [](void *data, MgmtInt i) -> void { *static_cast<decltype(TxnConfig::min) *>(data) = static_cast<decltype(TxnConfig::min)>(i); });
47 
48 // Do integer and string conversions.
49 const MgmtConverter OutboundConnTrack::MATCH_CONV{
__anona1b610590502(const void *data) 50   [](const void *data) -> MgmtInt { return static_cast<MgmtInt>(*static_cast<const decltype(TxnConfig::match) *>(data)); },
__anona1b610590602(void *data, MgmtInt i) 51   [](void *data, MgmtInt i) -> void {
52     // Problem - the InkAPITest requires being able to set an arbitrary value, so this can either
53     // correctly clamp or pass the regression tests. Currently it passes the tests.
54     //    *static_cast<decltype(TxnConfig::match) *>(data) = std::clamp(static_cast<decltype(TxnConfig::match)>(i), MATCH_IP,
55     //    MATCH_BOTH);
56     *static_cast<decltype(TxnConfig::match) *>(data) = static_cast<decltype(TxnConfig::match)>(i);
57   },
58   nullptr,
59   nullptr,
__anona1b610590702(const void *data) 60   [](const void *data) -> std::string_view {
61     auto t = *static_cast<const OutboundConnTrack::MatchType *>(data);
62     return t < 0 || t > OutboundConnTrack::MATCH_BOTH ? "Invalid"sv : OutboundConnTrack::MATCH_TYPE_NAME[t];
63   },
__anona1b610590802(void *data, std::string_view src) 64   [](void *data, std::string_view src) -> void {
65     OutboundConnTrack::MatchType t;
66     if (OutboundConnTrack::lookup_match_type(src, t)) {
67       *static_cast<OutboundConnTrack::MatchType *>(data) = t;
68     } else {
69       OutboundConnTrack::Warning_Bad_Match_Type(src);
70     }
71   }};
72 
73 const std::array<std::string_view, static_cast<int>(OutboundConnTrack::MATCH_BOTH) + 1> OutboundConnTrack::MATCH_TYPE_NAME{
74   {"ip"sv, "port"sv, "host"sv, "both"sv}};
75 
76 // Make sure the clock is millisecond resolution or finer.
77 static_assert(OutboundConnTrack::Group::Clock::period::num == 1);
78 static_assert(OutboundConnTrack::Group::Clock::period::den >= 1000);
79 
80 // Configuration callback functions.
81 namespace
82 {
83 bool
Config_Update_Conntrack_Min(const char * name,RecDataT dtype,RecData data,void * cookie)84 Config_Update_Conntrack_Min(const char *name, RecDataT dtype, RecData data, void *cookie)
85 {
86   auto config = static_cast<OutboundConnTrack::TxnConfig *>(cookie);
87 
88   if (RECD_INT == dtype) {
89     config->min = data.rec_int;
90     return true;
91   }
92   return false;
93 }
94 
95 bool
Config_Update_Conntrack_Max(const char * name,RecDataT dtype,RecData data,void * cookie)96 Config_Update_Conntrack_Max(const char *name, RecDataT dtype, RecData data, void *cookie)
97 {
98   auto config = static_cast<OutboundConnTrack::TxnConfig *>(cookie);
99 
100   if (RECD_INT == dtype) {
101     config->max = data.rec_int;
102     return true;
103   }
104   return false;
105 }
106 
107 bool
Config_Update_Conntrack_Queue_Size(const char * name,RecDataT dtype,RecData data,void * cookie)108 Config_Update_Conntrack_Queue_Size(const char *name, RecDataT dtype, RecData data, void *cookie)
109 {
110   auto config = static_cast<OutboundConnTrack::GlobalConfig *>(cookie);
111 
112   if (RECD_INT == dtype) {
113     config->queue_size = data.rec_int;
114     return true;
115   }
116   return false;
117 }
118 
119 bool
Config_Update_Conntrack_Queue_Delay(const char * name,RecDataT dtype,RecData data,void * cookie)120 Config_Update_Conntrack_Queue_Delay(const char *name, RecDataT dtype, RecData data, void *cookie)
121 {
122   auto config = static_cast<OutboundConnTrack::GlobalConfig *>(cookie);
123 
124   if (RECD_INT == dtype && data.rec_int > 0) {
125     config->queue_delay = std::chrono::milliseconds(data.rec_int);
126     return true;
127   }
128   return false;
129 }
130 
131 bool
Config_Update_Conntrack_Match(const char * name,RecDataT dtype,RecData data,void * cookie)132 Config_Update_Conntrack_Match(const char *name, RecDataT dtype, RecData data, void *cookie)
133 {
134   auto config = static_cast<OutboundConnTrack::TxnConfig *>(cookie);
135 
136   if (RECD_STRING == dtype) {
137     OutboundConnTrack::MatchType match_type;
138     std::string_view tag{data.rec_string};
139     if (OutboundConnTrack::lookup_match_type(tag, match_type)) {
140       config->match = match_type;
141       return true;
142     } else {
143       OutboundConnTrack::Warning_Bad_Match_Type(tag);
144     }
145   } else {
146     Warning("Invalid type for '%s' - must be 'INT'", OutboundConnTrack::CONFIG_VAR_MATCH.data());
147   }
148   return false;
149 }
150 
151 bool
Config_Update_Conntrack_Alert_Delay(const char * name,RecDataT dtype,RecData data,void * cookie)152 Config_Update_Conntrack_Alert_Delay(const char *name, RecDataT dtype, RecData data, void *cookie)
153 {
154   auto config = static_cast<OutboundConnTrack::GlobalConfig *>(cookie);
155 
156   if (RECD_INT == dtype && data.rec_int >= 0) {
157     config->alert_delay = std::chrono::seconds(data.rec_int);
158     return true;
159   }
160   return false;
161 }
162 
163 } // namespace
164 
165 void
config_init(GlobalConfig * global,TxnConfig * txn)166 OutboundConnTrack::config_init(GlobalConfig *global, TxnConfig *txn)
167 {
168   _global_config = global; // remember this for later retrieval.
169                            // Per transaction lookup must be done at call time because it changes.
170 
171   Enable_Config_Var(CONFIG_VAR_MIN, &Config_Update_Conntrack_Min, txn);
172   Enable_Config_Var(CONFIG_VAR_MAX, &Config_Update_Conntrack_Max, txn);
173   Enable_Config_Var(CONFIG_VAR_MATCH, &Config_Update_Conntrack_Match, txn);
174   Enable_Config_Var(CONFIG_VAR_QUEUE_SIZE, &Config_Update_Conntrack_Queue_Size, global);
175   Enable_Config_Var(CONFIG_VAR_QUEUE_DELAY, &Config_Update_Conntrack_Queue_Delay, global);
176   Enable_Config_Var(CONFIG_VAR_ALERT_DELAY, &Config_Update_Conntrack_Alert_Delay, global);
177 }
178 
179 OutboundConnTrack::TxnState
obtain(TxnConfig const & txn_cnf,std::string_view fqdn,IpEndpoint const & addr)180 OutboundConnTrack::obtain(TxnConfig const &txn_cnf, std::string_view fqdn, IpEndpoint const &addr)
181 {
182   TxnState zret;
183   CryptoHash hash;
184   CryptoContext().hash_immediate(hash, fqdn.data(), fqdn.size());
185   Group::Key key{addr, hash, txn_cnf.match};
186   std::lock_guard<std::mutex> lock(_imp._mutex); // Table lock
187   auto loc = _imp._table.find(key);
188   if (loc != _imp._table.end()) {
189     zret._g = loc;
190   } else {
191     zret._g = new Group(key, fqdn, txn_cnf.min);
192     _imp._table.insert(zret._g);
193   }
194   return zret;
195 }
196 
197 bool
equal(const Key & lhs,const Key & rhs)198 OutboundConnTrack::Group::equal(const Key &lhs, const Key &rhs)
199 {
200   bool zret = false;
201   if (lhs._match_type == rhs._match_type) {
202     switch (lhs._match_type) {
203     case MATCH_IP:
204       zret = ats_ip_addr_eq(&lhs._addr.sa, &rhs._addr.sa);
205       break;
206     case MATCH_PORT:
207       zret = ats_ip_addr_port_eq(&lhs._addr.sa, &rhs._addr.sa);
208       break;
209     case MATCH_HOST:
210       zret = lhs._hash == rhs._hash;
211       break;
212     case MATCH_BOTH:
213       zret = (lhs._hash == rhs._hash && ats_ip_addr_port_eq(&lhs._addr.sa, &rhs._addr.sa));
214       break;
215     }
216   }
217 
218   if (is_debug_tag_set(DEBUG_TAG)) {
219     ts::LocalBufferWriter<256> w;
220     w.print("Comparing {} to {} -> {}\0", lhs, rhs, zret ? "match" : "fail");
221     Debug(DEBUG_TAG, "%s", w.data());
222   }
223 
224   return zret;
225 }
226 
227 bool
should_alert(std::time_t * lat)228 OutboundConnTrack::Group::should_alert(std::time_t *lat)
229 {
230   bool zret = false;
231   // This is a bit clunky because the goal is to store just the tick count as an atomic.
232   // Might check to see if an atomic time_point is really atomic and avoid this.
233   Ticker last_tick{_last_alert};                  // Load the most recent alert time in ticks.
234   TimePoint last{TimePoint::duration{last_tick}}; // Most recent alert time in a time_point.
235   TimePoint now = Clock::now();                   // Current time_point.
236   if (last + _global_config->alert_delay <= now) {
237     // it's been long enough, swap out our time for the last time. The winner of this swap
238     // does the actual alert, leaving its current time as the last alert time.
239     zret = _last_alert.compare_exchange_strong(last_tick, now.time_since_epoch().count());
240     if (zret && lat) {
241       *lat = Clock::to_time_t(last);
242     }
243   }
244   return zret;
245 }
246 
247 std::time_t
get_last_alert_epoch_time() const248 OutboundConnTrack::Group::get_last_alert_epoch_time() const
249 {
250   return Clock::to_time_t(TimePoint{TimePoint::duration{Ticker{_last_alert}}});
251 }
252 
253 void
get(std::vector<Group const * > & groups)254 OutboundConnTrack::get(std::vector<Group const *> &groups)
255 {
256   std::lock_guard<std::mutex> lock(_imp._mutex); // TABLE LOCK
257   groups.resize(0);
258   groups.reserve(_imp._table.count());
259   for (Group const &g : _imp._table) {
260     groups.push_back(&g);
261   }
262 }
263 
264 std::string
to_json_string()265 OutboundConnTrack::to_json_string()
266 {
267   std::string text;
268   size_t extent = 0;
269   static const ts::BWFormat header_fmt{R"({{"count": {}, "list": [
270 )"};
271   static const ts::BWFormat item_fmt{
272     R"(  {{"type": "{}", "ip": "{}", "fqdn": "{}", "current": {}, "max": {}, "blocked": {}, "queued": {}, "alert": {}}},
273 )"};
274   static const std::string_view trailer{" \n]}"};
275 
276   static const auto printer = [](ts::BufferWriter &w, Group const *g) -> ts::BufferWriter & {
277     w.print(item_fmt, g->_match_type, g->_addr, g->_fqdn, g->_count.load(), g->_count_max.load(), g->_blocked.load(),
278             g->_rescheduled.load(), g->get_last_alert_epoch_time());
279     return w;
280   };
281 
282   ts::FixedBufferWriter null_bw{nullptr}; // Empty buffer for sizing work.
283   std::vector<Group const *> groups;
284 
285   self_type::get(groups);
286 
287   null_bw.print(header_fmt, groups.size()).extent();
288   for (auto g : groups) {
289     printer(null_bw, g);
290   }
291   extent = null_bw.extent() + trailer.size() - 2; // 2 for the trailing comma newline that will get clipped.
292 
293   text.resize(extent);
294   ts::FixedBufferWriter w(const_cast<char *>(text.data()), text.size());
295   w.clip(trailer.size());
296   w.print(header_fmt, groups.size());
297   for (auto g : groups) {
298     printer(w, g);
299   }
300   w.extend(trailer.size());
301   w.write(trailer);
302   return text;
303 }
304 
305 void
dump(FILE * f)306 OutboundConnTrack::dump(FILE *f)
307 {
308   std::vector<Group const *> groups;
309 
310   self_type::get(groups);
311 
312   if (groups.size()) {
313     fprintf(f, "\nUpstream Connection Tracking\n%7s | %5s | %10s | %24s | %33s | %8s |\n", "Current", "Block", "Queue", "Address",
314             "Hostname Hash", "Match");
315     fprintf(f, "------|-------|---------|--------------------------|-----------------------------------|----------|\n");
316 
317     for (Group const *g : groups) {
318       ts::LocalBufferWriter<128> w;
319       w.print("{:7} | {:5} | {:5} | {:24} | {:33} | {:8} |\n", g->_count.load(), g->_blocked.load(), g->_rescheduled.load(),
320               g->_addr, g->_hash, g->_match_type);
321       fwrite(w.data(), w.size(), 1, f);
322     }
323 
324     fprintf(f, "------|-------|-------|--------------------------|-----------------------------------|----------|\n");
325   }
326 }
327 
328 struct ShowConnectionCount : public ShowCont {
ShowConnectionCountShowConnectionCount329   ShowConnectionCount(Continuation *c, HTTPHdr *h) : ShowCont(c, h) { SET_HANDLER(&ShowConnectionCount::showHandler); }
330   int
showHandlerShowConnectionCount331   showHandler(int event, Event *e)
332   {
333     CHECK_SHOW(show(OutboundConnTrack::to_json_string().c_str()));
334     return completeJson(event, e);
335   }
336 };
337 
338 Action *
register_ShowConnectionCount(Continuation * c,HTTPHdr * h)339 register_ShowConnectionCount(Continuation *c, HTTPHdr *h)
340 {
341   ShowConnectionCount *s = new ShowConnectionCount(c, h);
342   this_ethread()->schedule_imm(s);
343   return &s->action;
344 }
345 
346 bool
lookup_match_type(std::string_view tag,OutboundConnTrack::MatchType & type)347 OutboundConnTrack::lookup_match_type(std::string_view tag, OutboundConnTrack::MatchType &type)
348 {
349   // Search the array for the tag.
350   for (OutboundConnTrack::MatchType idx :
351        {OutboundConnTrack::MATCH_IP, OutboundConnTrack::MATCH_PORT, OutboundConnTrack::MATCH_HOST, OutboundConnTrack::MATCH_BOTH}) {
352     if (tag == MATCH_TYPE_NAME[idx]) {
353       type = idx;
354       return true;
355     }
356   }
357   return false;
358 }
359 
360 void
Warning_Bad_Match_Type(std::string_view tag)361 OutboundConnTrack::Warning_Bad_Match_Type(std::string_view tag)
362 {
363   ts::LocalBufferWriter<256> w;
364   w.print("Invalid value '{}' for '{}' - must be one of", tag, CONFIG_VAR_MATCH);
365   for (auto n : MATCH_TYPE_NAME) {
366     w.write(" '"sv);
367     w.write(n);
368     w.write("',"sv);
369   }
370   w.auxBuffer()[-1] = '\0'; // clip trailing comma and null terminate.
371   Warning("%s", w.data());
372 }
373 
374 void
Note_Unblocked(const TxnConfig * config,int count,sockaddr const * addr)375 OutboundConnTrack::TxnState::Note_Unblocked(const TxnConfig *config, int count, sockaddr const *addr)
376 {
377   time_t lat; // last alert time (epoch seconds)
378 
379   if ((_g->_blocked > 0 || _g->_rescheduled > 0) && _g->should_alert(&lat)) {
380     auto blocked     = _g->_blocked.exchange(0);
381     auto rescheduled = _g->_rescheduled.exchange(0);
382     ts::LocalBufferWriter<256> w;
383     w.print("upstream unblocked: [{}] count={} limit={} group=({}) blocked={} queued={} upstream={}\0",
384             ts::bwf::Date(lat, "%b %d %H:%M:%S"sv), count, config->max, *_g, blocked, rescheduled, addr);
385     Debug(DEBUG_TAG, "%s", w.data());
386     Note("%s", w.data());
387   }
388 }
389 
390 void
Warn_Blocked(const TxnConfig * config,int64_t sm_id,int count,sockaddr const * addr,char const * debug_tag)391 OutboundConnTrack::TxnState::Warn_Blocked(const TxnConfig *config, int64_t sm_id, int count, sockaddr const *addr,
392                                           char const *debug_tag)
393 {
394   bool alert_p     = _g->should_alert();
395   auto blocked     = alert_p ? _g->_blocked.exchange(0) : _g->_blocked.load();
396   auto rescheduled = alert_p ? _g->_rescheduled.exchange(0) : _g->_rescheduled.load();
397 
398   if (alert_p || debug_tag) {
399     ts::LocalBufferWriter<256> w;
400     w.print("[{}] too many connections: count={} limit={} group=({}) blocked={} queued={} upstream={}\0", sm_id, count, config->max,
401             *_g, blocked, rescheduled, addr);
402 
403     if (debug_tag) {
404       Debug(debug_tag, "%s", w.data());
405     }
406     if (alert_p) {
407       Warning("%s", w.data());
408     }
409   }
410 }
411 
412 namespace ts
413 {
414 BufferWriter &
bwformat(BufferWriter & w,BWFSpec const & spec,OutboundConnTrack::MatchType type)415 bwformat(BufferWriter &w, BWFSpec const &spec, OutboundConnTrack::MatchType type)
416 {
417   if (spec.has_numeric_type()) {
418     bwformat(w, spec, static_cast<unsigned int>(type));
419   } else {
420     bwformat(w, spec, OutboundConnTrack::MATCH_TYPE_NAME[type]);
421   }
422   return w;
423 }
424 
425 BufferWriter &
bwformat(BufferWriter & w,BWFSpec const & spec,OutboundConnTrack::Group::Key const & key)426 bwformat(BufferWriter &w, BWFSpec const &spec, OutboundConnTrack::Group::Key const &key)
427 {
428   switch (key._match_type) {
429   case OutboundConnTrack::MATCH_BOTH:
430     w.print("{:s} {},{}", key._match_type, key._addr, key._hash);
431     break;
432   case OutboundConnTrack::MATCH_HOST:
433     w.print("{:s} {}", key._match_type, key._hash);
434     break;
435   case OutboundConnTrack::MATCH_PORT:
436     w.print("{:s} {}", key._match_type, key._addr);
437     break;
438   case OutboundConnTrack::MATCH_IP:
439     w.print("{:s} {::a}", key._match_type, key._addr);
440     break;
441   }
442   return w;
443 }
444 
445 BufferWriter &
bwformat(BufferWriter & w,BWFSpec const & spec,OutboundConnTrack::Group const & g)446 bwformat(BufferWriter &w, BWFSpec const &spec, OutboundConnTrack::Group const &g)
447 {
448   switch (g._match_type) {
449   case OutboundConnTrack::MATCH_BOTH:
450     w.print("{:s} {},{}", g._match_type, g._addr, g._fqdn);
451     break;
452   case OutboundConnTrack::MATCH_HOST:
453     w.print("{:s} {}", g._match_type, g._fqdn);
454     break;
455   case OutboundConnTrack::MATCH_PORT:
456     w.print("{:s} {}", g._match_type, g._addr);
457     break;
458   case OutboundConnTrack::MATCH_IP:
459     w.print("{:s} {::a}", g._match_type, g._addr);
460     break;
461   }
462   return w;
463 }
464 
465 } // namespace ts
466