1 /** @file
2
3 Record statistics support.
4
5 @section license License
6
7 Licensed to the Apache Software Foundation (ASF) under one
8 or more contributor license agreements. See the NOTICE file
9 distributed with this work for additional information
10 regarding copyright ownership. The ASF licenses this file
11 to you under the Apache License, Version 2.0 (the
12 "License"); you may not use this file except in compliance
13 with the License. You may obtain a copy of the License at
14
15 http://www.apache.org/licenses/LICENSE-2.0
16
17 Unless required by applicable law or agreed to in writing, software
18 distributed under the License is distributed on an "AS IS" BASIS,
19 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20 See the License for the specific language governing permissions and
21 limitations under the License.
22 */
23
24 #include "P_RecCore.h"
25 #include "P_RecProcess.h"
26 #include <string_view>
27
28 //-------------------------------------------------------------------------
29 // raw_stat_get_total
30 //-------------------------------------------------------------------------
31
32 namespace
33 {
34 // Commonly used access to a raw stat, avoid typos.
35 inline RecRawStat *
thread_stat(EThread * et,RecRawStatBlock * rsb,int id)36 thread_stat(EThread *et, RecRawStatBlock *rsb, int id)
37 {
38 return (reinterpret_cast<RecRawStat *>(reinterpret_cast<char *>(et) + rsb->ethr_stat_offset)) + id;
39 }
40 } // namespace
41
42 static int
raw_stat_get_total(RecRawStatBlock * rsb,int id,RecRawStat * total)43 raw_stat_get_total(RecRawStatBlock *rsb, int id, RecRawStat *total)
44 {
45 total->sum = 0;
46 total->count = 0;
47
48 // get global values
49 total->sum = rsb->global[id]->sum;
50 total->count = rsb->global[id]->count;
51
52 // get thread local values
53 for (EThread *et : eventProcessor.active_ethreads()) {
54 RecRawStat *tlp = thread_stat(et, rsb, id);
55 total->sum += tlp->sum;
56 total->count += tlp->count;
57 }
58
59 for (EThread *et : eventProcessor.active_dthreads()) {
60 RecRawStat *tlp = thread_stat(et, rsb, id);
61 total->sum += tlp->sum;
62 total->count += tlp->count;
63 }
64
65 if (total->sum < 0) { // Assure that we stay positive
66 total->sum = 0;
67 }
68
69 return REC_ERR_OKAY;
70 }
71
72 //-------------------------------------------------------------------------
73 // raw_stat_sync_to_global
74 //-------------------------------------------------------------------------
75 static int
raw_stat_sync_to_global(RecRawStatBlock * rsb,int id)76 raw_stat_sync_to_global(RecRawStatBlock *rsb, int id)
77 {
78 RecRawStat total;
79
80 total.sum = 0;
81 total.count = 0;
82
83 // sum the thread local values
84 for (EThread *et : eventProcessor.active_ethreads()) {
85 RecRawStat *tlp = thread_stat(et, rsb, id);
86 total.sum += tlp->sum;
87 total.count += tlp->count;
88 }
89
90 for (EThread *et : eventProcessor.active_dthreads()) {
91 RecRawStat *tlp = thread_stat(et, rsb, id);
92 total.sum += tlp->sum;
93 total.count += tlp->count;
94 }
95
96 if (total.sum < 0) { // Assure that we stay positive
97 total.sum = 0;
98 }
99
100 // lock so the setting of the globals and last values are atomic
101 {
102 ink_scoped_mutex_lock lock(rsb->mutex);
103
104 // get the delta from the last sync
105 RecRawStat delta;
106 delta.sum = total.sum - rsb->global[id]->last_sum;
107 delta.count = total.count - rsb->global[id]->last_count;
108
109 // increment the global values by the delta
110 ink_atomic_increment(&(rsb->global[id]->sum), delta.sum);
111 ink_atomic_increment(&(rsb->global[id]->count), delta.count);
112
113 // set the new totals as the last values seen
114 ink_atomic_swap(&(rsb->global[id]->last_sum), total.sum);
115 ink_atomic_swap(&(rsb->global[id]->last_count), total.count);
116 }
117
118 return REC_ERR_OKAY;
119 }
120
121 //-------------------------------------------------------------------------
122 // raw_stat_clear
123 //-------------------------------------------------------------------------
124 static int
raw_stat_clear(RecRawStatBlock * rsb,int id)125 raw_stat_clear(RecRawStatBlock *rsb, int id)
126 {
127 Debug("stats", "raw_stat_clear(): rsb pointer:%p id:%d", rsb, id);
128
129 // the globals need to be reset too
130 // lock so the setting of the globals and last values are atomic
131 {
132 ink_scoped_mutex_lock lock(rsb->mutex);
133 ink_atomic_swap(&(rsb->global[id]->sum), static_cast<int64_t>(0));
134 ink_atomic_swap(&(rsb->global[id]->last_sum), static_cast<int64_t>(0));
135 ink_atomic_swap(&(rsb->global[id]->count), static_cast<int64_t>(0));
136 ink_atomic_swap(&(rsb->global[id]->last_count), static_cast<int64_t>(0));
137 }
138 // reset the local stats
139 for (EThread *et : eventProcessor.active_ethreads()) {
140 RecRawStat *tlp = thread_stat(et, rsb, id);
141 ink_atomic_swap(&(tlp->sum), static_cast<int64_t>(0));
142 ink_atomic_swap(&(tlp->count), static_cast<int64_t>(0));
143 }
144
145 for (EThread *et : eventProcessor.active_dthreads()) {
146 RecRawStat *tlp = thread_stat(et, rsb, id);
147 ink_atomic_swap(&(tlp->sum), static_cast<int64_t>(0));
148 ink_atomic_swap(&(tlp->count), static_cast<int64_t>(0));
149 }
150
151 return REC_ERR_OKAY;
152 }
153
154 //-------------------------------------------------------------------------
155 // raw_stat_clear_sum
156 //-------------------------------------------------------------------------
157 static int
raw_stat_clear_sum(RecRawStatBlock * rsb,int id)158 raw_stat_clear_sum(RecRawStatBlock *rsb, int id)
159 {
160 Debug("stats", "raw_stat_clear_sum(): rsb pointer:%p id:%d", rsb, id);
161
162 // the globals need to be reset too
163 // lock so the setting of the globals and last values are atomic
164 {
165 ink_scoped_mutex_lock lock(rsb->mutex);
166 ink_atomic_swap(&(rsb->global[id]->sum), static_cast<int64_t>(0));
167 ink_atomic_swap(&(rsb->global[id]->last_sum), static_cast<int64_t>(0));
168 }
169
170 // reset the local stats
171 for (EThread *et : eventProcessor.active_ethreads()) {
172 RecRawStat *tlp = thread_stat(et, rsb, id);
173 ink_atomic_swap(&(tlp->sum), static_cast<int64_t>(0));
174 }
175
176 for (EThread *et : eventProcessor.active_dthreads()) {
177 RecRawStat *tlp = thread_stat(et, rsb, id);
178 ink_atomic_swap(&(tlp->sum), static_cast<int64_t>(0));
179 }
180
181 return REC_ERR_OKAY;
182 }
183
184 //-------------------------------------------------------------------------
185 // raw_stat_clear_count
186 //-------------------------------------------------------------------------
187 static int
raw_stat_clear_count(RecRawStatBlock * rsb,int id)188 raw_stat_clear_count(RecRawStatBlock *rsb, int id)
189 {
190 Debug("stats", "raw_stat_clear_count(): rsb pointer:%p id:%d", rsb, id);
191
192 // the globals need to be reset too
193 // lock so the setting of the globals and last values are atomic
194 {
195 ink_scoped_mutex_lock lock(rsb->mutex);
196 ink_atomic_swap(&(rsb->global[id]->count), static_cast<int64_t>(0));
197 ink_atomic_swap(&(rsb->global[id]->last_count), static_cast<int64_t>(0));
198 }
199
200 // reset the local stats
201 for (EThread *et : eventProcessor.active_ethreads()) {
202 RecRawStat *tlp = thread_stat(et, rsb, id);
203 ink_atomic_swap(&(tlp->count), static_cast<int64_t>(0));
204 }
205
206 for (EThread *et : eventProcessor.active_dthreads()) {
207 RecRawStat *tlp = thread_stat(et, rsb, id);
208 ink_atomic_swap(&(tlp->count), static_cast<int64_t>(0));
209 }
210
211 return REC_ERR_OKAY;
212 }
213
214 //-------------------------------------------------------------------------
215 // RecAllocateRawStatBlock
216 //-------------------------------------------------------------------------
217 RecRawStatBlock *
RecAllocateRawStatBlock(int num_stats)218 RecAllocateRawStatBlock(int num_stats)
219 {
220 off_t ethr_stat_offset;
221 RecRawStatBlock *rsb;
222
223 // allocate thread-local raw-stat memory
224 if ((ethr_stat_offset = eventProcessor.allocate(num_stats * sizeof(RecRawStat))) == -1) {
225 return nullptr;
226 }
227
228 // create the raw-stat-block structure
229 rsb = static_cast<RecRawStatBlock *>(ats_malloc(sizeof(RecRawStatBlock)));
230 memset(rsb, 0, sizeof(RecRawStatBlock));
231
232 rsb->global = static_cast<RecRawStat **>(ats_malloc(num_stats * sizeof(RecRawStat *)));
233 memset(rsb->global, 0, num_stats * sizeof(RecRawStat *));
234
235 rsb->num_stats = 0;
236 rsb->max_stats = num_stats;
237 rsb->ethr_stat_offset = ethr_stat_offset;
238
239 ink_mutex_init(&(rsb->mutex));
240 return rsb;
241 }
242
243 //-------------------------------------------------------------------------
244 // RecRegisterRawStat
245 //-------------------------------------------------------------------------
246 int
_RecRegisterRawStat(RecRawStatBlock * rsb,RecT rec_type,const char * name,RecDataT data_type,RecPersistT persist_type,int id,RecRawStatSyncCb sync_cb)247 _RecRegisterRawStat(RecRawStatBlock *rsb, RecT rec_type, const char *name, RecDataT data_type, RecPersistT persist_type, int id,
248 RecRawStatSyncCb sync_cb)
249 {
250 Debug("stats", "RecRawStatSyncCb(%s): rsb pointer:%p id:%d", name, rsb, id);
251
252 // check to see if we're good to proceed
253 ink_assert(id < rsb->max_stats);
254
255 int err = REC_ERR_OKAY;
256
257 RecRecord *r;
258 RecData data_default;
259 memset(&data_default, 0, sizeof(RecData));
260
261 // register the record
262 if ((r = RecRegisterStat(rec_type, name, data_type, data_default, persist_type)) == nullptr) {
263 err = REC_ERR_FAIL;
264 goto Ldone;
265 }
266
267 r->rsb_id = id; // This is the index within the RSB raw block for this stat, used for lookups by name.
268 if (i_am_the_record_owner(r->rec_type)) {
269 r->sync_required = r->sync_required | REC_PEER_SYNC_REQUIRED;
270 } else {
271 send_register_message(r);
272 }
273
274 // store a pointer to our record->stat_meta.data_raw in our rsb
275 rsb->global[id] = &(r->stat_meta.data_raw);
276 rsb->global[id]->last_sum = 0;
277 rsb->global[id]->last_count = 0;
278
279 // setup the periodic sync callback
280 if (sync_cb) {
281 RecRegisterRawStatSyncCb(name, sync_cb, rsb, id);
282 }
283
284 Ldone:
285 return err;
286 }
287
288 //-------------------------------------------------------------------------
289 // RecRawStatSync...
290 //-------------------------------------------------------------------------
291
292 // Note: On these RecRawStatSync callbacks, our 'data' is protected
293 // under its lock by the caller, so no need to worry!
294 int
RecRawStatSyncSum(const char * name,RecDataT data_type,RecData * data,RecRawStatBlock * rsb,int id)295 RecRawStatSyncSum(const char *name, RecDataT data_type, RecData *data, RecRawStatBlock *rsb, int id)
296 {
297 RecRawStat total;
298
299 Debug("stats", "raw sync:sum for %s", name);
300 raw_stat_sync_to_global(rsb, id);
301 total.sum = rsb->global[id]->sum;
302 total.count = rsb->global[id]->count;
303 RecDataSetFromInt64(data_type, data, total.sum);
304
305 return REC_ERR_OKAY;
306 }
307
308 int
RecRawStatSyncCount(const char * name,RecDataT data_type,RecData * data,RecRawStatBlock * rsb,int id)309 RecRawStatSyncCount(const char *name, RecDataT data_type, RecData *data, RecRawStatBlock *rsb, int id)
310 {
311 RecRawStat total;
312
313 Debug("stats", "raw sync:count for %s", name);
314 raw_stat_sync_to_global(rsb, id);
315 total.sum = rsb->global[id]->sum;
316 total.count = rsb->global[id]->count;
317 RecDataSetFromInt64(data_type, data, total.count);
318
319 return REC_ERR_OKAY;
320 }
321
322 int
RecRawStatSyncAvg(const char * name,RecDataT data_type,RecData * data,RecRawStatBlock * rsb,int id)323 RecRawStatSyncAvg(const char *name, RecDataT data_type, RecData *data, RecRawStatBlock *rsb, int id)
324 {
325 RecRawStat total;
326 RecFloat avg = 0.0f;
327
328 Debug("stats", "raw sync:avg for %s", name);
329 raw_stat_sync_to_global(rsb, id);
330 total.sum = rsb->global[id]->sum;
331 total.count = rsb->global[id]->count;
332 if (total.count != 0) {
333 avg = static_cast<float>(static_cast<double>(total.sum) / static_cast<double>(total.count));
334 }
335 RecDataSetFromFloat(data_type, data, avg);
336 return REC_ERR_OKAY;
337 }
338
339 int
RecRawStatSyncHrTimeAvg(const char * name,RecDataT data_type,RecData * data,RecRawStatBlock * rsb,int id)340 RecRawStatSyncHrTimeAvg(const char *name, RecDataT data_type, RecData *data, RecRawStatBlock *rsb, int id)
341 {
342 RecRawStat total;
343 RecFloat r;
344
345 Debug("stats", "raw sync:hr-timeavg for %s", name);
346 raw_stat_sync_to_global(rsb, id);
347 total.sum = rsb->global[id]->sum;
348 total.count = rsb->global[id]->count;
349
350 if (total.count == 0) {
351 r = 0.0f;
352 } else {
353 r = static_cast<float>(static_cast<double>(total.sum) / static_cast<double>(total.count));
354 r = r / static_cast<float>(HRTIME_SECOND);
355 }
356
357 RecDataSetFromFloat(data_type, data, r);
358 return REC_ERR_OKAY;
359 }
360
361 int
RecRawStatSyncIntMsecsToFloatSeconds(const char * name,RecDataT data_type,RecData * data,RecRawStatBlock * rsb,int id)362 RecRawStatSyncIntMsecsToFloatSeconds(const char *name, RecDataT data_type, RecData *data, RecRawStatBlock *rsb, int id)
363 {
364 RecRawStat total;
365 RecFloat r;
366
367 Debug("stats", "raw sync:seconds for %s", name);
368 raw_stat_sync_to_global(rsb, id);
369 total.sum = rsb->global[id]->sum;
370 total.count = rsb->global[id]->count;
371
372 if (total.count == 0) {
373 r = 0.0f;
374 } else {
375 r = static_cast<float>(static_cast<double>(total.sum) / 1000);
376 }
377
378 RecDataSetFromFloat(data_type, data, r);
379 return REC_ERR_OKAY;
380 }
381
382 //-------------------------------------------------------------------------
383 // RecSetRawStatXXX
384 //-------------------------------------------------------------------------
385 int
RecSetRawStatSum(RecRawStatBlock * rsb,int id,int64_t data)386 RecSetRawStatSum(RecRawStatBlock *rsb, int id, int64_t data)
387 {
388 raw_stat_clear_sum(rsb, id);
389 ink_atomic_swap(&(rsb->global[id]->sum), data);
390 return REC_ERR_OKAY;
391 }
392
393 int
RecSetRawStatCount(RecRawStatBlock * rsb,int id,int64_t data)394 RecSetRawStatCount(RecRawStatBlock *rsb, int id, int64_t data)
395 {
396 raw_stat_clear_count(rsb, id);
397 ink_atomic_swap(&(rsb->global[id]->count), data);
398 return REC_ERR_OKAY;
399 }
400
401 //-------------------------------------------------------------------------
402 // RecGetRawStatXXX
403 //-------------------------------------------------------------------------
404
405 int
RecGetRawStatSum(RecRawStatBlock * rsb,int id,int64_t * data)406 RecGetRawStatSum(RecRawStatBlock *rsb, int id, int64_t *data)
407 {
408 RecRawStat total;
409
410 raw_stat_get_total(rsb, id, &total);
411 *data = total.sum;
412 return REC_ERR_OKAY;
413 }
414
415 int
RecGetRawStatCount(RecRawStatBlock * rsb,int id,int64_t * data)416 RecGetRawStatCount(RecRawStatBlock *rsb, int id, int64_t *data)
417 {
418 RecRawStat total;
419
420 raw_stat_get_total(rsb, id, &total);
421 *data = total.count;
422 return REC_ERR_OKAY;
423 }
424
425 //-------------------------------------------------------------------------
426 // RecIncrGlobalRawStatXXX
427 //-------------------------------------------------------------------------
428 int
RecIncrGlobalRawStat(RecRawStatBlock * rsb,int id,int64_t incr)429 RecIncrGlobalRawStat(RecRawStatBlock *rsb, int id, int64_t incr)
430 {
431 ink_atomic_increment(&(rsb->global[id]->sum), incr);
432 ink_atomic_increment(&(rsb->global[id]->count), 1);
433 return REC_ERR_OKAY;
434 }
435
436 int
RecIncrGlobalRawStatSum(RecRawStatBlock * rsb,int id,int64_t incr)437 RecIncrGlobalRawStatSum(RecRawStatBlock *rsb, int id, int64_t incr)
438 {
439 ink_atomic_increment(&(rsb->global[id]->sum), incr);
440 return REC_ERR_OKAY;
441 }
442
443 int
RecIncrGlobalRawStatCount(RecRawStatBlock * rsb,int id,int64_t incr)444 RecIncrGlobalRawStatCount(RecRawStatBlock *rsb, int id, int64_t incr)
445 {
446 ink_atomic_increment(&(rsb->global[id]->count), incr);
447 return REC_ERR_OKAY;
448 }
449
450 //-------------------------------------------------------------------------
451 // RecSetGlobalRawStatXXX
452 //-------------------------------------------------------------------------
453 int
RecSetGlobalRawStatSum(RecRawStatBlock * rsb,int id,int64_t data)454 RecSetGlobalRawStatSum(RecRawStatBlock *rsb, int id, int64_t data)
455 {
456 ink_atomic_swap(&(rsb->global[id]->sum), data);
457 return REC_ERR_OKAY;
458 }
459
460 int
RecSetGlobalRawStatCount(RecRawStatBlock * rsb,int id,int64_t data)461 RecSetGlobalRawStatCount(RecRawStatBlock *rsb, int id, int64_t data)
462 {
463 ink_atomic_swap(&(rsb->global[id]->count), data);
464 return REC_ERR_OKAY;
465 }
466
467 //-------------------------------------------------------------------------
468 // RecGetGlobalRawStatXXX
469 //-------------------------------------------------------------------------
470 int
RecGetGlobalRawStatSum(RecRawStatBlock * rsb,int id,int64_t * data)471 RecGetGlobalRawStatSum(RecRawStatBlock *rsb, int id, int64_t *data)
472 {
473 *data = rsb->global[id]->sum;
474 return REC_ERR_OKAY;
475 }
476
477 int
RecGetGlobalRawStatCount(RecRawStatBlock * rsb,int id,int64_t * data)478 RecGetGlobalRawStatCount(RecRawStatBlock *rsb, int id, int64_t *data)
479 {
480 *data = rsb->global[id]->count;
481 return REC_ERR_OKAY;
482 }
483
484 //-------------------------------------------------------------------------
485 // RegGetGlobalRawStatXXXPtr
486 //-------------------------------------------------------------------------
487 RecRawStat *
RecGetGlobalRawStatPtr(RecRawStatBlock * rsb,int id)488 RecGetGlobalRawStatPtr(RecRawStatBlock *rsb, int id)
489 {
490 return rsb->global[id];
491 }
492
493 int64_t *
RecGetGlobalRawStatSumPtr(RecRawStatBlock * rsb,int id)494 RecGetGlobalRawStatSumPtr(RecRawStatBlock *rsb, int id)
495 {
496 return &(rsb->global[id]->sum);
497 }
498
499 int64_t *
RecGetGlobalRawStatCountPtr(RecRawStatBlock * rsb,int id)500 RecGetGlobalRawStatCountPtr(RecRawStatBlock *rsb, int id)
501 {
502 return &(rsb->global[id]->count);
503 }
504
505 //-------------------------------------------------------------------------
506 // RecRegisterRawStatSyncCb
507 //-------------------------------------------------------------------------
508 int
RecRegisterRawStatSyncCb(const char * name,RecRawStatSyncCb sync_cb,RecRawStatBlock * rsb,int id)509 RecRegisterRawStatSyncCb(const char *name, RecRawStatSyncCb sync_cb, RecRawStatBlock *rsb, int id)
510 {
511 int err = REC_ERR_FAIL;
512
513 ink_rwlock_rdlock(&g_records_rwlock);
514 if (auto it = g_records_ht.find(name); it != g_records_ht.end()) {
515 RecRecord *r = it->second;
516
517 rec_mutex_acquire(&(r->lock));
518 if (REC_TYPE_IS_STAT(r->rec_type)) {
519 if (r->stat_meta.sync_cb) {
520 // We shouldn't register sync callbacks twice...
521 Fatal("attempted to register %s twice", name);
522 }
523
524 RecRawStat *raw;
525
526 r->stat_meta.sync_rsb = rsb;
527 r->stat_meta.sync_id = id;
528 r->stat_meta.sync_cb = sync_cb;
529
530 raw = RecGetGlobalRawStatPtr(r->stat_meta.sync_rsb, r->stat_meta.sync_id);
531
532 raw->version = r->version;
533
534 err = REC_ERR_OKAY;
535 }
536 rec_mutex_release(&(r->lock));
537 }
538
539 ink_rwlock_unlock(&g_records_rwlock);
540
541 return err;
542 }
543
544 //-------------------------------------------------------------------------
545 // RecExecRawStatSyncCbs
546 //-------------------------------------------------------------------------
547 int
RecExecRawStatSyncCbs()548 RecExecRawStatSyncCbs()
549 {
550 RecRecord *r;
551 int i, num_records;
552
553 num_records = g_num_records;
554 for (i = 0; i < num_records; i++) {
555 r = &(g_records[i]);
556 rec_mutex_acquire(&(r->lock));
557 if (REC_TYPE_IS_STAT(r->rec_type)) {
558 if (r->stat_meta.sync_cb) {
559 if (r->version && r->version != r->stat_meta.sync_rsb->global[r->stat_meta.sync_id]->version) {
560 raw_stat_clear(r->stat_meta.sync_rsb, r->stat_meta.sync_id);
561 r->stat_meta.sync_rsb->global[r->stat_meta.sync_id]->version = r->version;
562 } else {
563 (*(r->stat_meta.sync_cb))(r->name, r->data_type, &(r->data), r->stat_meta.sync_rsb, r->stat_meta.sync_id);
564 }
565 r->sync_required = REC_SYNC_REQUIRED;
566 }
567 }
568 rec_mutex_release(&(r->lock));
569 }
570
571 return REC_ERR_OKAY;
572 }
573
574 int
RecRawStatUpdateSum(RecRawStatBlock * rsb,int id)575 RecRawStatUpdateSum(RecRawStatBlock *rsb, int id)
576 {
577 RecRawStat *raw = rsb->global[id];
578 if (nullptr != raw) {
579 RecRecord *r = reinterpret_cast<RecRecord *>(reinterpret_cast<char *>(raw) -
580 (reinterpret_cast<char *>(&reinterpret_cast<RecRecord *>(0)->stat_meta) -
581 reinterpret_cast<char *>(reinterpret_cast<RecRecord *>(0))));
582
583 RecDataSetFromInt64(r->data_type, &r->data, rsb->global[id]->sum);
584 r->sync_required = REC_SYNC_REQUIRED;
585 return REC_ERR_OKAY;
586 }
587 return REC_ERR_FAIL;
588 }
589