1 /** @file
2 
3   Record statistics support.
4 
5   @section license License
6 
7   Licensed to the Apache Software Foundation (ASF) under one
8   or more contributor license agreements.  See the NOTICE file
9   distributed with this work for additional information
10   regarding copyright ownership.  The ASF licenses this file
11   to you under the Apache License, Version 2.0 (the
12   "License"); you may not use this file except in compliance
13   with the License.  You may obtain a copy of the License at
14 
15       http://www.apache.org/licenses/LICENSE-2.0
16 
17   Unless required by applicable law or agreed to in writing, software
18   distributed under the License is distributed on an "AS IS" BASIS,
19   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20   See the License for the specific language governing permissions and
21   limitations under the License.
22  */
23 
24 #include "P_RecCore.h"
25 #include "P_RecProcess.h"
26 #include <string_view>
27 
28 //-------------------------------------------------------------------------
29 // raw_stat_get_total
30 //-------------------------------------------------------------------------
31 
32 namespace
33 {
34 // Commonly used access to a raw stat, avoid typos.
35 inline RecRawStat *
thread_stat(EThread * et,RecRawStatBlock * rsb,int id)36 thread_stat(EThread *et, RecRawStatBlock *rsb, int id)
37 {
38   return (reinterpret_cast<RecRawStat *>(reinterpret_cast<char *>(et) + rsb->ethr_stat_offset)) + id;
39 }
40 } // namespace
41 
42 static int
raw_stat_get_total(RecRawStatBlock * rsb,int id,RecRawStat * total)43 raw_stat_get_total(RecRawStatBlock *rsb, int id, RecRawStat *total)
44 {
45   total->sum   = 0;
46   total->count = 0;
47 
48   // get global values
49   total->sum   = rsb->global[id]->sum;
50   total->count = rsb->global[id]->count;
51 
52   // get thread local values
53   for (EThread *et : eventProcessor.active_ethreads()) {
54     RecRawStat *tlp = thread_stat(et, rsb, id);
55     total->sum += tlp->sum;
56     total->count += tlp->count;
57   }
58 
59   for (EThread *et : eventProcessor.active_dthreads()) {
60     RecRawStat *tlp = thread_stat(et, rsb, id);
61     total->sum += tlp->sum;
62     total->count += tlp->count;
63   }
64 
65   if (total->sum < 0) { // Assure that we stay positive
66     total->sum = 0;
67   }
68 
69   return REC_ERR_OKAY;
70 }
71 
72 //-------------------------------------------------------------------------
73 // raw_stat_sync_to_global
74 //-------------------------------------------------------------------------
75 static int
raw_stat_sync_to_global(RecRawStatBlock * rsb,int id)76 raw_stat_sync_to_global(RecRawStatBlock *rsb, int id)
77 {
78   RecRawStat total;
79 
80   total.sum   = 0;
81   total.count = 0;
82 
83   // sum the thread local values
84   for (EThread *et : eventProcessor.active_ethreads()) {
85     RecRawStat *tlp = thread_stat(et, rsb, id);
86     total.sum += tlp->sum;
87     total.count += tlp->count;
88   }
89 
90   for (EThread *et : eventProcessor.active_dthreads()) {
91     RecRawStat *tlp = thread_stat(et, rsb, id);
92     total.sum += tlp->sum;
93     total.count += tlp->count;
94   }
95 
96   if (total.sum < 0) { // Assure that we stay positive
97     total.sum = 0;
98   }
99 
100   // lock so the setting of the globals and last values are atomic
101   {
102     ink_scoped_mutex_lock lock(rsb->mutex);
103 
104     // get the delta from the last sync
105     RecRawStat delta;
106     delta.sum   = total.sum - rsb->global[id]->last_sum;
107     delta.count = total.count - rsb->global[id]->last_count;
108 
109     // increment the global values by the delta
110     ink_atomic_increment(&(rsb->global[id]->sum), delta.sum);
111     ink_atomic_increment(&(rsb->global[id]->count), delta.count);
112 
113     // set the new totals as the last values seen
114     ink_atomic_swap(&(rsb->global[id]->last_sum), total.sum);
115     ink_atomic_swap(&(rsb->global[id]->last_count), total.count);
116   }
117 
118   return REC_ERR_OKAY;
119 }
120 
121 //-------------------------------------------------------------------------
122 // raw_stat_clear
123 //-------------------------------------------------------------------------
124 static int
raw_stat_clear(RecRawStatBlock * rsb,int id)125 raw_stat_clear(RecRawStatBlock *rsb, int id)
126 {
127   Debug("stats", "raw_stat_clear(): rsb pointer:%p id:%d", rsb, id);
128 
129   // the globals need to be reset too
130   // lock so the setting of the globals and last values are atomic
131   {
132     ink_scoped_mutex_lock lock(rsb->mutex);
133     ink_atomic_swap(&(rsb->global[id]->sum), static_cast<int64_t>(0));
134     ink_atomic_swap(&(rsb->global[id]->last_sum), static_cast<int64_t>(0));
135     ink_atomic_swap(&(rsb->global[id]->count), static_cast<int64_t>(0));
136     ink_atomic_swap(&(rsb->global[id]->last_count), static_cast<int64_t>(0));
137   }
138   // reset the local stats
139   for (EThread *et : eventProcessor.active_ethreads()) {
140     RecRawStat *tlp = thread_stat(et, rsb, id);
141     ink_atomic_swap(&(tlp->sum), static_cast<int64_t>(0));
142     ink_atomic_swap(&(tlp->count), static_cast<int64_t>(0));
143   }
144 
145   for (EThread *et : eventProcessor.active_dthreads()) {
146     RecRawStat *tlp = thread_stat(et, rsb, id);
147     ink_atomic_swap(&(tlp->sum), static_cast<int64_t>(0));
148     ink_atomic_swap(&(tlp->count), static_cast<int64_t>(0));
149   }
150 
151   return REC_ERR_OKAY;
152 }
153 
154 //-------------------------------------------------------------------------
155 // raw_stat_clear_sum
156 //-------------------------------------------------------------------------
157 static int
raw_stat_clear_sum(RecRawStatBlock * rsb,int id)158 raw_stat_clear_sum(RecRawStatBlock *rsb, int id)
159 {
160   Debug("stats", "raw_stat_clear_sum(): rsb pointer:%p id:%d", rsb, id);
161 
162   // the globals need to be reset too
163   // lock so the setting of the globals and last values are atomic
164   {
165     ink_scoped_mutex_lock lock(rsb->mutex);
166     ink_atomic_swap(&(rsb->global[id]->sum), static_cast<int64_t>(0));
167     ink_atomic_swap(&(rsb->global[id]->last_sum), static_cast<int64_t>(0));
168   }
169 
170   // reset the local stats
171   for (EThread *et : eventProcessor.active_ethreads()) {
172     RecRawStat *tlp = thread_stat(et, rsb, id);
173     ink_atomic_swap(&(tlp->sum), static_cast<int64_t>(0));
174   }
175 
176   for (EThread *et : eventProcessor.active_dthreads()) {
177     RecRawStat *tlp = thread_stat(et, rsb, id);
178     ink_atomic_swap(&(tlp->sum), static_cast<int64_t>(0));
179   }
180 
181   return REC_ERR_OKAY;
182 }
183 
184 //-------------------------------------------------------------------------
185 // raw_stat_clear_count
186 //-------------------------------------------------------------------------
187 static int
raw_stat_clear_count(RecRawStatBlock * rsb,int id)188 raw_stat_clear_count(RecRawStatBlock *rsb, int id)
189 {
190   Debug("stats", "raw_stat_clear_count(): rsb pointer:%p id:%d", rsb, id);
191 
192   // the globals need to be reset too
193   // lock so the setting of the globals and last values are atomic
194   {
195     ink_scoped_mutex_lock lock(rsb->mutex);
196     ink_atomic_swap(&(rsb->global[id]->count), static_cast<int64_t>(0));
197     ink_atomic_swap(&(rsb->global[id]->last_count), static_cast<int64_t>(0));
198   }
199 
200   // reset the local stats
201   for (EThread *et : eventProcessor.active_ethreads()) {
202     RecRawStat *tlp = thread_stat(et, rsb, id);
203     ink_atomic_swap(&(tlp->count), static_cast<int64_t>(0));
204   }
205 
206   for (EThread *et : eventProcessor.active_dthreads()) {
207     RecRawStat *tlp = thread_stat(et, rsb, id);
208     ink_atomic_swap(&(tlp->count), static_cast<int64_t>(0));
209   }
210 
211   return REC_ERR_OKAY;
212 }
213 
214 //-------------------------------------------------------------------------
215 // RecAllocateRawStatBlock
216 //-------------------------------------------------------------------------
217 RecRawStatBlock *
RecAllocateRawStatBlock(int num_stats)218 RecAllocateRawStatBlock(int num_stats)
219 {
220   off_t ethr_stat_offset;
221   RecRawStatBlock *rsb;
222 
223   // allocate thread-local raw-stat memory
224   if ((ethr_stat_offset = eventProcessor.allocate(num_stats * sizeof(RecRawStat))) == -1) {
225     return nullptr;
226   }
227 
228   // create the raw-stat-block structure
229   rsb = static_cast<RecRawStatBlock *>(ats_malloc(sizeof(RecRawStatBlock)));
230   memset(rsb, 0, sizeof(RecRawStatBlock));
231 
232   rsb->global = static_cast<RecRawStat **>(ats_malloc(num_stats * sizeof(RecRawStat *)));
233   memset(rsb->global, 0, num_stats * sizeof(RecRawStat *));
234 
235   rsb->num_stats        = 0;
236   rsb->max_stats        = num_stats;
237   rsb->ethr_stat_offset = ethr_stat_offset;
238 
239   ink_mutex_init(&(rsb->mutex));
240   return rsb;
241 }
242 
243 //-------------------------------------------------------------------------
244 // RecRegisterRawStat
245 //-------------------------------------------------------------------------
246 int
_RecRegisterRawStat(RecRawStatBlock * rsb,RecT rec_type,const char * name,RecDataT data_type,RecPersistT persist_type,int id,RecRawStatSyncCb sync_cb)247 _RecRegisterRawStat(RecRawStatBlock *rsb, RecT rec_type, const char *name, RecDataT data_type, RecPersistT persist_type, int id,
248                     RecRawStatSyncCb sync_cb)
249 {
250   Debug("stats", "RecRawStatSyncCb(%s): rsb pointer:%p id:%d", name, rsb, id);
251 
252   // check to see if we're good to proceed
253   ink_assert(id < rsb->max_stats);
254 
255   int err = REC_ERR_OKAY;
256 
257   RecRecord *r;
258   RecData data_default;
259   memset(&data_default, 0, sizeof(RecData));
260 
261   // register the record
262   if ((r = RecRegisterStat(rec_type, name, data_type, data_default, persist_type)) == nullptr) {
263     err = REC_ERR_FAIL;
264     goto Ldone;
265   }
266 
267   r->rsb_id = id; // This is the index within the RSB raw block for this stat, used for lookups by name.
268   if (i_am_the_record_owner(r->rec_type)) {
269     r->sync_required = r->sync_required | REC_PEER_SYNC_REQUIRED;
270   } else {
271     send_register_message(r);
272   }
273 
274   // store a pointer to our record->stat_meta.data_raw in our rsb
275   rsb->global[id]             = &(r->stat_meta.data_raw);
276   rsb->global[id]->last_sum   = 0;
277   rsb->global[id]->last_count = 0;
278 
279   // setup the periodic sync callback
280   if (sync_cb) {
281     RecRegisterRawStatSyncCb(name, sync_cb, rsb, id);
282   }
283 
284 Ldone:
285   return err;
286 }
287 
288 //-------------------------------------------------------------------------
289 // RecRawStatSync...
290 //-------------------------------------------------------------------------
291 
292 // Note: On these RecRawStatSync callbacks, our 'data' is protected
293 // under its lock by the caller, so no need to worry!
294 int
RecRawStatSyncSum(const char * name,RecDataT data_type,RecData * data,RecRawStatBlock * rsb,int id)295 RecRawStatSyncSum(const char *name, RecDataT data_type, RecData *data, RecRawStatBlock *rsb, int id)
296 {
297   RecRawStat total;
298 
299   Debug("stats", "raw sync:sum for %s", name);
300   raw_stat_sync_to_global(rsb, id);
301   total.sum   = rsb->global[id]->sum;
302   total.count = rsb->global[id]->count;
303   RecDataSetFromInt64(data_type, data, total.sum);
304 
305   return REC_ERR_OKAY;
306 }
307 
308 int
RecRawStatSyncCount(const char * name,RecDataT data_type,RecData * data,RecRawStatBlock * rsb,int id)309 RecRawStatSyncCount(const char *name, RecDataT data_type, RecData *data, RecRawStatBlock *rsb, int id)
310 {
311   RecRawStat total;
312 
313   Debug("stats", "raw sync:count for %s", name);
314   raw_stat_sync_to_global(rsb, id);
315   total.sum   = rsb->global[id]->sum;
316   total.count = rsb->global[id]->count;
317   RecDataSetFromInt64(data_type, data, total.count);
318 
319   return REC_ERR_OKAY;
320 }
321 
322 int
RecRawStatSyncAvg(const char * name,RecDataT data_type,RecData * data,RecRawStatBlock * rsb,int id)323 RecRawStatSyncAvg(const char *name, RecDataT data_type, RecData *data, RecRawStatBlock *rsb, int id)
324 {
325   RecRawStat total;
326   RecFloat avg = 0.0f;
327 
328   Debug("stats", "raw sync:avg for %s", name);
329   raw_stat_sync_to_global(rsb, id);
330   total.sum   = rsb->global[id]->sum;
331   total.count = rsb->global[id]->count;
332   if (total.count != 0) {
333     avg = static_cast<float>(static_cast<double>(total.sum) / static_cast<double>(total.count));
334   }
335   RecDataSetFromFloat(data_type, data, avg);
336   return REC_ERR_OKAY;
337 }
338 
339 int
RecRawStatSyncHrTimeAvg(const char * name,RecDataT data_type,RecData * data,RecRawStatBlock * rsb,int id)340 RecRawStatSyncHrTimeAvg(const char *name, RecDataT data_type, RecData *data, RecRawStatBlock *rsb, int id)
341 {
342   RecRawStat total;
343   RecFloat r;
344 
345   Debug("stats", "raw sync:hr-timeavg for %s", name);
346   raw_stat_sync_to_global(rsb, id);
347   total.sum   = rsb->global[id]->sum;
348   total.count = rsb->global[id]->count;
349 
350   if (total.count == 0) {
351     r = 0.0f;
352   } else {
353     r = static_cast<float>(static_cast<double>(total.sum) / static_cast<double>(total.count));
354     r = r / static_cast<float>(HRTIME_SECOND);
355   }
356 
357   RecDataSetFromFloat(data_type, data, r);
358   return REC_ERR_OKAY;
359 }
360 
361 int
RecRawStatSyncIntMsecsToFloatSeconds(const char * name,RecDataT data_type,RecData * data,RecRawStatBlock * rsb,int id)362 RecRawStatSyncIntMsecsToFloatSeconds(const char *name, RecDataT data_type, RecData *data, RecRawStatBlock *rsb, int id)
363 {
364   RecRawStat total;
365   RecFloat r;
366 
367   Debug("stats", "raw sync:seconds for %s", name);
368   raw_stat_sync_to_global(rsb, id);
369   total.sum   = rsb->global[id]->sum;
370   total.count = rsb->global[id]->count;
371 
372   if (total.count == 0) {
373     r = 0.0f;
374   } else {
375     r = static_cast<float>(static_cast<double>(total.sum) / 1000);
376   }
377 
378   RecDataSetFromFloat(data_type, data, r);
379   return REC_ERR_OKAY;
380 }
381 
382 //-------------------------------------------------------------------------
383 // RecSetRawStatXXX
384 //-------------------------------------------------------------------------
385 int
RecSetRawStatSum(RecRawStatBlock * rsb,int id,int64_t data)386 RecSetRawStatSum(RecRawStatBlock *rsb, int id, int64_t data)
387 {
388   raw_stat_clear_sum(rsb, id);
389   ink_atomic_swap(&(rsb->global[id]->sum), data);
390   return REC_ERR_OKAY;
391 }
392 
393 int
RecSetRawStatCount(RecRawStatBlock * rsb,int id,int64_t data)394 RecSetRawStatCount(RecRawStatBlock *rsb, int id, int64_t data)
395 {
396   raw_stat_clear_count(rsb, id);
397   ink_atomic_swap(&(rsb->global[id]->count), data);
398   return REC_ERR_OKAY;
399 }
400 
401 //-------------------------------------------------------------------------
402 // RecGetRawStatXXX
403 //-------------------------------------------------------------------------
404 
405 int
RecGetRawStatSum(RecRawStatBlock * rsb,int id,int64_t * data)406 RecGetRawStatSum(RecRawStatBlock *rsb, int id, int64_t *data)
407 {
408   RecRawStat total;
409 
410   raw_stat_get_total(rsb, id, &total);
411   *data = total.sum;
412   return REC_ERR_OKAY;
413 }
414 
415 int
RecGetRawStatCount(RecRawStatBlock * rsb,int id,int64_t * data)416 RecGetRawStatCount(RecRawStatBlock *rsb, int id, int64_t *data)
417 {
418   RecRawStat total;
419 
420   raw_stat_get_total(rsb, id, &total);
421   *data = total.count;
422   return REC_ERR_OKAY;
423 }
424 
425 //-------------------------------------------------------------------------
426 // RecIncrGlobalRawStatXXX
427 //-------------------------------------------------------------------------
428 int
RecIncrGlobalRawStat(RecRawStatBlock * rsb,int id,int64_t incr)429 RecIncrGlobalRawStat(RecRawStatBlock *rsb, int id, int64_t incr)
430 {
431   ink_atomic_increment(&(rsb->global[id]->sum), incr);
432   ink_atomic_increment(&(rsb->global[id]->count), 1);
433   return REC_ERR_OKAY;
434 }
435 
436 int
RecIncrGlobalRawStatSum(RecRawStatBlock * rsb,int id,int64_t incr)437 RecIncrGlobalRawStatSum(RecRawStatBlock *rsb, int id, int64_t incr)
438 {
439   ink_atomic_increment(&(rsb->global[id]->sum), incr);
440   return REC_ERR_OKAY;
441 }
442 
443 int
RecIncrGlobalRawStatCount(RecRawStatBlock * rsb,int id,int64_t incr)444 RecIncrGlobalRawStatCount(RecRawStatBlock *rsb, int id, int64_t incr)
445 {
446   ink_atomic_increment(&(rsb->global[id]->count), incr);
447   return REC_ERR_OKAY;
448 }
449 
450 //-------------------------------------------------------------------------
451 // RecSetGlobalRawStatXXX
452 //-------------------------------------------------------------------------
453 int
RecSetGlobalRawStatSum(RecRawStatBlock * rsb,int id,int64_t data)454 RecSetGlobalRawStatSum(RecRawStatBlock *rsb, int id, int64_t data)
455 {
456   ink_atomic_swap(&(rsb->global[id]->sum), data);
457   return REC_ERR_OKAY;
458 }
459 
460 int
RecSetGlobalRawStatCount(RecRawStatBlock * rsb,int id,int64_t data)461 RecSetGlobalRawStatCount(RecRawStatBlock *rsb, int id, int64_t data)
462 {
463   ink_atomic_swap(&(rsb->global[id]->count), data);
464   return REC_ERR_OKAY;
465 }
466 
467 //-------------------------------------------------------------------------
468 // RecGetGlobalRawStatXXX
469 //-------------------------------------------------------------------------
470 int
RecGetGlobalRawStatSum(RecRawStatBlock * rsb,int id,int64_t * data)471 RecGetGlobalRawStatSum(RecRawStatBlock *rsb, int id, int64_t *data)
472 {
473   *data = rsb->global[id]->sum;
474   return REC_ERR_OKAY;
475 }
476 
477 int
RecGetGlobalRawStatCount(RecRawStatBlock * rsb,int id,int64_t * data)478 RecGetGlobalRawStatCount(RecRawStatBlock *rsb, int id, int64_t *data)
479 {
480   *data = rsb->global[id]->count;
481   return REC_ERR_OKAY;
482 }
483 
484 //-------------------------------------------------------------------------
485 // RegGetGlobalRawStatXXXPtr
486 //-------------------------------------------------------------------------
487 RecRawStat *
RecGetGlobalRawStatPtr(RecRawStatBlock * rsb,int id)488 RecGetGlobalRawStatPtr(RecRawStatBlock *rsb, int id)
489 {
490   return rsb->global[id];
491 }
492 
493 int64_t *
RecGetGlobalRawStatSumPtr(RecRawStatBlock * rsb,int id)494 RecGetGlobalRawStatSumPtr(RecRawStatBlock *rsb, int id)
495 {
496   return &(rsb->global[id]->sum);
497 }
498 
499 int64_t *
RecGetGlobalRawStatCountPtr(RecRawStatBlock * rsb,int id)500 RecGetGlobalRawStatCountPtr(RecRawStatBlock *rsb, int id)
501 {
502   return &(rsb->global[id]->count);
503 }
504 
505 //-------------------------------------------------------------------------
506 // RecRegisterRawStatSyncCb
507 //-------------------------------------------------------------------------
508 int
RecRegisterRawStatSyncCb(const char * name,RecRawStatSyncCb sync_cb,RecRawStatBlock * rsb,int id)509 RecRegisterRawStatSyncCb(const char *name, RecRawStatSyncCb sync_cb, RecRawStatBlock *rsb, int id)
510 {
511   int err = REC_ERR_FAIL;
512 
513   ink_rwlock_rdlock(&g_records_rwlock);
514   if (auto it = g_records_ht.find(name); it != g_records_ht.end()) {
515     RecRecord *r = it->second;
516 
517     rec_mutex_acquire(&(r->lock));
518     if (REC_TYPE_IS_STAT(r->rec_type)) {
519       if (r->stat_meta.sync_cb) {
520         // We shouldn't register sync callbacks twice...
521         Fatal("attempted to register %s twice", name);
522       }
523 
524       RecRawStat *raw;
525 
526       r->stat_meta.sync_rsb = rsb;
527       r->stat_meta.sync_id  = id;
528       r->stat_meta.sync_cb  = sync_cb;
529 
530       raw = RecGetGlobalRawStatPtr(r->stat_meta.sync_rsb, r->stat_meta.sync_id);
531 
532       raw->version = r->version;
533 
534       err = REC_ERR_OKAY;
535     }
536     rec_mutex_release(&(r->lock));
537   }
538 
539   ink_rwlock_unlock(&g_records_rwlock);
540 
541   return err;
542 }
543 
544 //-------------------------------------------------------------------------
545 // RecExecRawStatSyncCbs
546 //-------------------------------------------------------------------------
547 int
RecExecRawStatSyncCbs()548 RecExecRawStatSyncCbs()
549 {
550   RecRecord *r;
551   int i, num_records;
552 
553   num_records = g_num_records;
554   for (i = 0; i < num_records; i++) {
555     r = &(g_records[i]);
556     rec_mutex_acquire(&(r->lock));
557     if (REC_TYPE_IS_STAT(r->rec_type)) {
558       if (r->stat_meta.sync_cb) {
559         if (r->version && r->version != r->stat_meta.sync_rsb->global[r->stat_meta.sync_id]->version) {
560           raw_stat_clear(r->stat_meta.sync_rsb, r->stat_meta.sync_id);
561           r->stat_meta.sync_rsb->global[r->stat_meta.sync_id]->version = r->version;
562         } else {
563           (*(r->stat_meta.sync_cb))(r->name, r->data_type, &(r->data), r->stat_meta.sync_rsb, r->stat_meta.sync_id);
564         }
565         r->sync_required = REC_SYNC_REQUIRED;
566       }
567     }
568     rec_mutex_release(&(r->lock));
569   }
570 
571   return REC_ERR_OKAY;
572 }
573 
574 int
RecRawStatUpdateSum(RecRawStatBlock * rsb,int id)575 RecRawStatUpdateSum(RecRawStatBlock *rsb, int id)
576 {
577   RecRawStat *raw = rsb->global[id];
578   if (nullptr != raw) {
579     RecRecord *r = reinterpret_cast<RecRecord *>(reinterpret_cast<char *>(raw) -
580                                                  (reinterpret_cast<char *>(&reinterpret_cast<RecRecord *>(0)->stat_meta) -
581                                                   reinterpret_cast<char *>(reinterpret_cast<RecRecord *>(0))));
582 
583     RecDataSetFromInt64(r->data_type, &r->data, rsb->global[id]->sum);
584     r->sync_required = REC_SYNC_REQUIRED;
585     return REC_ERR_OKAY;
586   }
587   return REC_ERR_FAIL;
588 }
589