1 /** @file
2
3 Outbound connection tracking support.
4
5 @section license License
6
7 Licensed to the Apache Software Foundation (ASF) under one
8 or more contributor license agreements. See the NOTICE file
9 distributed with this work for additional information
10 regarding copyright ownership. The ASF licenses this file
11 to you under the Apache License, Version 2.0 (the
12 "License"); you may not use this file except in compliance
13 with the License. You may obtain a copy of the License at
14
15 http://www.apache.org/licenses/LICENSE-2.0
16
17 Unless required by applicable law or agreed to in writing, software
18 distributed under the License is distributed on an "AS IS" BASIS,
19 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20 See the License for the specific language governing permissions and
21 limitations under the License.
22 */
23
24 #include <algorithm>
25 #include <deque>
26 #include <records/P_RecDefs.h>
27 #include <HttpConfig.h>
28 #include "HttpConnectionCount.h"
29 #include "tscore/bwf_std_format.h"
30 #include "tscore/BufferWriter.h"
31
32 using namespace std::literals;
33
34 extern int http_config_cb(const char *, RecDataT, RecData, void *);
35
36 OutboundConnTrack::Imp OutboundConnTrack::_imp;
37
38 OutboundConnTrack::GlobalConfig *OutboundConnTrack::_global_config{nullptr};
39
40 const MgmtConverter OutboundConnTrack::MAX_CONV(
__anona1b610590102(const void *data) 41 [](const void *data) -> MgmtInt { return static_cast<MgmtInt>(*static_cast<const decltype(TxnConfig::max) *>(data)); },
__anona1b610590202(void *data, MgmtInt i) 42 [](void *data, MgmtInt i) -> void { *static_cast<decltype(TxnConfig::max) *>(data) = static_cast<decltype(TxnConfig::max)>(i); });
43
44 const MgmtConverter OutboundConnTrack::MIN_CONV(
__anona1b610590302(const void *data) 45 [](const void *data) -> MgmtInt { return static_cast<MgmtInt>(*static_cast<const decltype(TxnConfig::min) *>(data)); },
__anona1b610590402(void *data, MgmtInt i) 46 [](void *data, MgmtInt i) -> void { *static_cast<decltype(TxnConfig::min) *>(data) = static_cast<decltype(TxnConfig::min)>(i); });
47
48 // Do integer and string conversions.
49 const MgmtConverter OutboundConnTrack::MATCH_CONV{
__anona1b610590502(const void *data) 50 [](const void *data) -> MgmtInt { return static_cast<MgmtInt>(*static_cast<const decltype(TxnConfig::match) *>(data)); },
__anona1b610590602(void *data, MgmtInt i) 51 [](void *data, MgmtInt i) -> void {
52 // Problem - the InkAPITest requires being able to set an arbitrary value, so this can either
53 // correctly clamp or pass the regression tests. Currently it passes the tests.
54 // *static_cast<decltype(TxnConfig::match) *>(data) = std::clamp(static_cast<decltype(TxnConfig::match)>(i), MATCH_IP,
55 // MATCH_BOTH);
56 *static_cast<decltype(TxnConfig::match) *>(data) = static_cast<decltype(TxnConfig::match)>(i);
57 },
58 nullptr,
59 nullptr,
__anona1b610590702(const void *data) 60 [](const void *data) -> std::string_view {
61 auto t = *static_cast<const OutboundConnTrack::MatchType *>(data);
62 return t < 0 || t > OutboundConnTrack::MATCH_BOTH ? "Invalid"sv : OutboundConnTrack::MATCH_TYPE_NAME[t];
63 },
__anona1b610590802(void *data, std::string_view src) 64 [](void *data, std::string_view src) -> void {
65 OutboundConnTrack::MatchType t;
66 if (OutboundConnTrack::lookup_match_type(src, t)) {
67 *static_cast<OutboundConnTrack::MatchType *>(data) = t;
68 } else {
69 OutboundConnTrack::Warning_Bad_Match_Type(src);
70 }
71 }};
72
73 const std::array<std::string_view, static_cast<int>(OutboundConnTrack::MATCH_BOTH) + 1> OutboundConnTrack::MATCH_TYPE_NAME{
74 {"ip"sv, "port"sv, "host"sv, "both"sv}};
75
76 // Make sure the clock is millisecond resolution or finer.
77 static_assert(OutboundConnTrack::Group::Clock::period::num == 1);
78 static_assert(OutboundConnTrack::Group::Clock::period::den >= 1000);
79
80 // Configuration callback functions.
81 namespace
82 {
83 bool
Config_Update_Conntrack_Min(const char * name,RecDataT dtype,RecData data,void * cookie)84 Config_Update_Conntrack_Min(const char *name, RecDataT dtype, RecData data, void *cookie)
85 {
86 auto config = static_cast<OutboundConnTrack::TxnConfig *>(cookie);
87
88 if (RECD_INT == dtype) {
89 config->min = data.rec_int;
90 return true;
91 }
92 return false;
93 }
94
95 bool
Config_Update_Conntrack_Max(const char * name,RecDataT dtype,RecData data,void * cookie)96 Config_Update_Conntrack_Max(const char *name, RecDataT dtype, RecData data, void *cookie)
97 {
98 auto config = static_cast<OutboundConnTrack::TxnConfig *>(cookie);
99
100 if (RECD_INT == dtype) {
101 config->max = data.rec_int;
102 return true;
103 }
104 return false;
105 }
106
107 bool
Config_Update_Conntrack_Queue_Size(const char * name,RecDataT dtype,RecData data,void * cookie)108 Config_Update_Conntrack_Queue_Size(const char *name, RecDataT dtype, RecData data, void *cookie)
109 {
110 auto config = static_cast<OutboundConnTrack::GlobalConfig *>(cookie);
111
112 if (RECD_INT == dtype) {
113 config->queue_size = data.rec_int;
114 return true;
115 }
116 return false;
117 }
118
119 bool
Config_Update_Conntrack_Queue_Delay(const char * name,RecDataT dtype,RecData data,void * cookie)120 Config_Update_Conntrack_Queue_Delay(const char *name, RecDataT dtype, RecData data, void *cookie)
121 {
122 auto config = static_cast<OutboundConnTrack::GlobalConfig *>(cookie);
123
124 if (RECD_INT == dtype && data.rec_int > 0) {
125 config->queue_delay = std::chrono::milliseconds(data.rec_int);
126 return true;
127 }
128 return false;
129 }
130
131 bool
Config_Update_Conntrack_Match(const char * name,RecDataT dtype,RecData data,void * cookie)132 Config_Update_Conntrack_Match(const char *name, RecDataT dtype, RecData data, void *cookie)
133 {
134 auto config = static_cast<OutboundConnTrack::TxnConfig *>(cookie);
135
136 if (RECD_STRING == dtype) {
137 OutboundConnTrack::MatchType match_type;
138 std::string_view tag{data.rec_string};
139 if (OutboundConnTrack::lookup_match_type(tag, match_type)) {
140 config->match = match_type;
141 return true;
142 } else {
143 OutboundConnTrack::Warning_Bad_Match_Type(tag);
144 }
145 } else {
146 Warning("Invalid type for '%s' - must be 'INT'", OutboundConnTrack::CONFIG_VAR_MATCH.data());
147 }
148 return false;
149 }
150
151 bool
Config_Update_Conntrack_Alert_Delay(const char * name,RecDataT dtype,RecData data,void * cookie)152 Config_Update_Conntrack_Alert_Delay(const char *name, RecDataT dtype, RecData data, void *cookie)
153 {
154 auto config = static_cast<OutboundConnTrack::GlobalConfig *>(cookie);
155
156 if (RECD_INT == dtype && data.rec_int >= 0) {
157 config->alert_delay = std::chrono::seconds(data.rec_int);
158 return true;
159 }
160 return false;
161 }
162
163 } // namespace
164
165 void
config_init(GlobalConfig * global,TxnConfig * txn)166 OutboundConnTrack::config_init(GlobalConfig *global, TxnConfig *txn)
167 {
168 _global_config = global; // remember this for later retrieval.
169 // Per transaction lookup must be done at call time because it changes.
170
171 Enable_Config_Var(CONFIG_VAR_MIN, &Config_Update_Conntrack_Min, txn);
172 Enable_Config_Var(CONFIG_VAR_MAX, &Config_Update_Conntrack_Max, txn);
173 Enable_Config_Var(CONFIG_VAR_MATCH, &Config_Update_Conntrack_Match, txn);
174 Enable_Config_Var(CONFIG_VAR_QUEUE_SIZE, &Config_Update_Conntrack_Queue_Size, global);
175 Enable_Config_Var(CONFIG_VAR_QUEUE_DELAY, &Config_Update_Conntrack_Queue_Delay, global);
176 Enable_Config_Var(CONFIG_VAR_ALERT_DELAY, &Config_Update_Conntrack_Alert_Delay, global);
177 }
178
179 OutboundConnTrack::TxnState
obtain(TxnConfig const & txn_cnf,std::string_view fqdn,IpEndpoint const & addr)180 OutboundConnTrack::obtain(TxnConfig const &txn_cnf, std::string_view fqdn, IpEndpoint const &addr)
181 {
182 TxnState zret;
183 CryptoHash hash;
184 CryptoContext().hash_immediate(hash, fqdn.data(), fqdn.size());
185 Group::Key key{addr, hash, txn_cnf.match};
186 std::lock_guard<std::mutex> lock(_imp._mutex); // Table lock
187 auto loc = _imp._table.find(key);
188 if (loc != _imp._table.end()) {
189 zret._g = loc;
190 } else {
191 zret._g = new Group(key, fqdn, txn_cnf.min);
192 _imp._table.insert(zret._g);
193 }
194 return zret;
195 }
196
197 bool
equal(const Key & lhs,const Key & rhs)198 OutboundConnTrack::Group::equal(const Key &lhs, const Key &rhs)
199 {
200 bool zret = false;
201 if (lhs._match_type == rhs._match_type) {
202 switch (lhs._match_type) {
203 case MATCH_IP:
204 zret = ats_ip_addr_eq(&lhs._addr.sa, &rhs._addr.sa);
205 break;
206 case MATCH_PORT:
207 zret = ats_ip_addr_port_eq(&lhs._addr.sa, &rhs._addr.sa);
208 break;
209 case MATCH_HOST:
210 zret = lhs._hash == rhs._hash;
211 break;
212 case MATCH_BOTH:
213 zret = (lhs._hash == rhs._hash && ats_ip_addr_port_eq(&lhs._addr.sa, &rhs._addr.sa));
214 break;
215 }
216 }
217
218 if (is_debug_tag_set(DEBUG_TAG)) {
219 ts::LocalBufferWriter<256> w;
220 w.print("Comparing {} to {} -> {}\0", lhs, rhs, zret ? "match" : "fail");
221 Debug(DEBUG_TAG, "%s", w.data());
222 }
223
224 return zret;
225 }
226
227 bool
should_alert(std::time_t * lat)228 OutboundConnTrack::Group::should_alert(std::time_t *lat)
229 {
230 bool zret = false;
231 // This is a bit clunky because the goal is to store just the tick count as an atomic.
232 // Might check to see if an atomic time_point is really atomic and avoid this.
233 Ticker last_tick{_last_alert}; // Load the most recent alert time in ticks.
234 TimePoint last{TimePoint::duration{last_tick}}; // Most recent alert time in a time_point.
235 TimePoint now = Clock::now(); // Current time_point.
236 if (last + _global_config->alert_delay <= now) {
237 // it's been long enough, swap out our time for the last time. The winner of this swap
238 // does the actual alert, leaving its current time as the last alert time.
239 zret = _last_alert.compare_exchange_strong(last_tick, now.time_since_epoch().count());
240 if (zret && lat) {
241 *lat = Clock::to_time_t(last);
242 }
243 }
244 return zret;
245 }
246
247 std::time_t
get_last_alert_epoch_time() const248 OutboundConnTrack::Group::get_last_alert_epoch_time() const
249 {
250 return Clock::to_time_t(TimePoint{TimePoint::duration{Ticker{_last_alert}}});
251 }
252
253 void
get(std::vector<Group const * > & groups)254 OutboundConnTrack::get(std::vector<Group const *> &groups)
255 {
256 std::lock_guard<std::mutex> lock(_imp._mutex); // TABLE LOCK
257 groups.resize(0);
258 groups.reserve(_imp._table.count());
259 for (Group const &g : _imp._table) {
260 groups.push_back(&g);
261 }
262 }
263
264 std::string
to_json_string()265 OutboundConnTrack::to_json_string()
266 {
267 std::string text;
268 size_t extent = 0;
269 static const ts::BWFormat header_fmt{R"({{"count": {}, "list": [
270 )"};
271 static const ts::BWFormat item_fmt{
272 R"( {{"type": "{}", "ip": "{}", "fqdn": "{}", "current": {}, "max": {}, "blocked": {}, "queued": {}, "alert": {}}},
273 )"};
274 static const std::string_view trailer{" \n]}"};
275
276 static const auto printer = [](ts::BufferWriter &w, Group const *g) -> ts::BufferWriter & {
277 w.print(item_fmt, g->_match_type, g->_addr, g->_fqdn, g->_count.load(), g->_count_max.load(), g->_blocked.load(),
278 g->_rescheduled.load(), g->get_last_alert_epoch_time());
279 return w;
280 };
281
282 ts::FixedBufferWriter null_bw{nullptr}; // Empty buffer for sizing work.
283 std::vector<Group const *> groups;
284
285 self_type::get(groups);
286
287 null_bw.print(header_fmt, groups.size()).extent();
288 for (auto g : groups) {
289 printer(null_bw, g);
290 }
291 extent = null_bw.extent() + trailer.size() - 2; // 2 for the trailing comma newline that will get clipped.
292
293 text.resize(extent);
294 ts::FixedBufferWriter w(const_cast<char *>(text.data()), text.size());
295 w.clip(trailer.size());
296 w.print(header_fmt, groups.size());
297 for (auto g : groups) {
298 printer(w, g);
299 }
300 w.extend(trailer.size());
301 w.write(trailer);
302 return text;
303 }
304
305 void
dump(FILE * f)306 OutboundConnTrack::dump(FILE *f)
307 {
308 std::vector<Group const *> groups;
309
310 self_type::get(groups);
311
312 if (groups.size()) {
313 fprintf(f, "\nUpstream Connection Tracking\n%7s | %5s | %10s | %24s | %33s | %8s |\n", "Current", "Block", "Queue", "Address",
314 "Hostname Hash", "Match");
315 fprintf(f, "------|-------|---------|--------------------------|-----------------------------------|----------|\n");
316
317 for (Group const *g : groups) {
318 ts::LocalBufferWriter<128> w;
319 w.print("{:7} | {:5} | {:5} | {:24} | {:33} | {:8} |\n", g->_count.load(), g->_blocked.load(), g->_rescheduled.load(),
320 g->_addr, g->_hash, g->_match_type);
321 fwrite(w.data(), w.size(), 1, f);
322 }
323
324 fprintf(f, "------|-------|-------|--------------------------|-----------------------------------|----------|\n");
325 }
326 }
327
328 struct ShowConnectionCount : public ShowCont {
ShowConnectionCountShowConnectionCount329 ShowConnectionCount(Continuation *c, HTTPHdr *h) : ShowCont(c, h) { SET_HANDLER(&ShowConnectionCount::showHandler); }
330 int
showHandlerShowConnectionCount331 showHandler(int event, Event *e)
332 {
333 CHECK_SHOW(show(OutboundConnTrack::to_json_string().c_str()));
334 return completeJson(event, e);
335 }
336 };
337
338 Action *
register_ShowConnectionCount(Continuation * c,HTTPHdr * h)339 register_ShowConnectionCount(Continuation *c, HTTPHdr *h)
340 {
341 ShowConnectionCount *s = new ShowConnectionCount(c, h);
342 this_ethread()->schedule_imm(s);
343 return &s->action;
344 }
345
346 bool
lookup_match_type(std::string_view tag,OutboundConnTrack::MatchType & type)347 OutboundConnTrack::lookup_match_type(std::string_view tag, OutboundConnTrack::MatchType &type)
348 {
349 // Search the array for the tag.
350 for (OutboundConnTrack::MatchType idx :
351 {OutboundConnTrack::MATCH_IP, OutboundConnTrack::MATCH_PORT, OutboundConnTrack::MATCH_HOST, OutboundConnTrack::MATCH_BOTH}) {
352 if (tag == MATCH_TYPE_NAME[idx]) {
353 type = idx;
354 return true;
355 }
356 }
357 return false;
358 }
359
360 void
Warning_Bad_Match_Type(std::string_view tag)361 OutboundConnTrack::Warning_Bad_Match_Type(std::string_view tag)
362 {
363 ts::LocalBufferWriter<256> w;
364 w.print("Invalid value '{}' for '{}' - must be one of", tag, CONFIG_VAR_MATCH);
365 for (auto n : MATCH_TYPE_NAME) {
366 w.write(" '"sv);
367 w.write(n);
368 w.write("',"sv);
369 }
370 w.auxBuffer()[-1] = '\0'; // clip trailing comma and null terminate.
371 Warning("%s", w.data());
372 }
373
374 void
Note_Unblocked(const TxnConfig * config,int count,sockaddr const * addr)375 OutboundConnTrack::TxnState::Note_Unblocked(const TxnConfig *config, int count, sockaddr const *addr)
376 {
377 time_t lat; // last alert time (epoch seconds)
378
379 if ((_g->_blocked > 0 || _g->_rescheduled > 0) && _g->should_alert(&lat)) {
380 auto blocked = _g->_blocked.exchange(0);
381 auto rescheduled = _g->_rescheduled.exchange(0);
382 ts::LocalBufferWriter<256> w;
383 w.print("upstream unblocked: [{}] count={} limit={} group=({}) blocked={} queued={} upstream={}\0",
384 ts::bwf::Date(lat, "%b %d %H:%M:%S"sv), count, config->max, *_g, blocked, rescheduled, addr);
385 Debug(DEBUG_TAG, "%s", w.data());
386 Note("%s", w.data());
387 }
388 }
389
390 void
Warn_Blocked(const TxnConfig * config,int64_t sm_id,int count,sockaddr const * addr,char const * debug_tag)391 OutboundConnTrack::TxnState::Warn_Blocked(const TxnConfig *config, int64_t sm_id, int count, sockaddr const *addr,
392 char const *debug_tag)
393 {
394 bool alert_p = _g->should_alert();
395 auto blocked = alert_p ? _g->_blocked.exchange(0) : _g->_blocked.load();
396 auto rescheduled = alert_p ? _g->_rescheduled.exchange(0) : _g->_rescheduled.load();
397
398 if (alert_p || debug_tag) {
399 ts::LocalBufferWriter<256> w;
400 w.print("[{}] too many connections: count={} limit={} group=({}) blocked={} queued={} upstream={}\0", sm_id, count, config->max,
401 *_g, blocked, rescheduled, addr);
402
403 if (debug_tag) {
404 Debug(debug_tag, "%s", w.data());
405 }
406 if (alert_p) {
407 Warning("%s", w.data());
408 }
409 }
410 }
411
412 namespace ts
413 {
414 BufferWriter &
bwformat(BufferWriter & w,BWFSpec const & spec,OutboundConnTrack::MatchType type)415 bwformat(BufferWriter &w, BWFSpec const &spec, OutboundConnTrack::MatchType type)
416 {
417 if (spec.has_numeric_type()) {
418 bwformat(w, spec, static_cast<unsigned int>(type));
419 } else {
420 bwformat(w, spec, OutboundConnTrack::MATCH_TYPE_NAME[type]);
421 }
422 return w;
423 }
424
425 BufferWriter &
bwformat(BufferWriter & w,BWFSpec const & spec,OutboundConnTrack::Group::Key const & key)426 bwformat(BufferWriter &w, BWFSpec const &spec, OutboundConnTrack::Group::Key const &key)
427 {
428 switch (key._match_type) {
429 case OutboundConnTrack::MATCH_BOTH:
430 w.print("{:s} {},{}", key._match_type, key._addr, key._hash);
431 break;
432 case OutboundConnTrack::MATCH_HOST:
433 w.print("{:s} {}", key._match_type, key._hash);
434 break;
435 case OutboundConnTrack::MATCH_PORT:
436 w.print("{:s} {}", key._match_type, key._addr);
437 break;
438 case OutboundConnTrack::MATCH_IP:
439 w.print("{:s} {::a}", key._match_type, key._addr);
440 break;
441 }
442 return w;
443 }
444
445 BufferWriter &
bwformat(BufferWriter & w,BWFSpec const & spec,OutboundConnTrack::Group const & g)446 bwformat(BufferWriter &w, BWFSpec const &spec, OutboundConnTrack::Group const &g)
447 {
448 switch (g._match_type) {
449 case OutboundConnTrack::MATCH_BOTH:
450 w.print("{:s} {},{}", g._match_type, g._addr, g._fqdn);
451 break;
452 case OutboundConnTrack::MATCH_HOST:
453 w.print("{:s} {}", g._match_type, g._fqdn);
454 break;
455 case OutboundConnTrack::MATCH_PORT:
456 w.print("{:s} {}", g._match_type, g._addr);
457 break;
458 case OutboundConnTrack::MATCH_IP:
459 w.print("{:s} {::a}", g._match_type, g._addr);
460 break;
461 }
462 return w;
463 }
464
465 } // namespace ts
466