1 /** @file
2
3 Private record core definitions
4
5 @section license License
6
7 Licensed to the Apache Software Foundation (ASF) under one
8 or more contributor license agreements. See the NOTICE file
9 distributed with this work for additional information
10 regarding copyright ownership. The ASF licenses this file
11 to you under the Apache License, Version 2.0 (the
12 "License"); you may not use this file except in compliance
13 with the License. You may obtain a copy of the License at
14
15 http://www.apache.org/licenses/LICENSE-2.0
16
17 Unless required by applicable law or agreed to in writing, software
18 distributed under the License is distributed on an "AS IS" BASIS,
19 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20 See the License for the specific language governing permissions and
21 limitations under the License.
22 */
23
24 #include "tscore/ink_platform.h"
25 #include "tscore/ink_memory.h"
26
27 #include "tscore/TextBuffer.h"
28 #include "tscore/Tokenizer.h"
29 #include "tscore/ink_defs.h"
30 #include "tscore/ink_string.h"
31
32 #include "P_RecFile.h"
33 #include "P_RecUtils.h"
34 #include "P_RecMessage.h"
35 #include "P_RecCore.h"
36
37 #include <fstream>
38
39 RecModeT g_mode_type = RECM_NULL;
40
41 //-------------------------------------------------------------------------
42 // send_reset_message
43 //-------------------------------------------------------------------------
44 static RecErrT
send_reset_message(RecRecord * record)45 send_reset_message(RecRecord *record)
46 {
47 RecMessage *m;
48
49 rec_mutex_acquire(&(record->lock));
50 m = RecMessageAlloc(RECG_RESET);
51 m = RecMessageMarshal_Realloc(m, record);
52 RecDebug(DL_Note, "[send] RECG_RESET [%d bytes]", sizeof(RecMessageHdr) + m->o_write - m->o_start);
53 RecMessageSend(m);
54 RecMessageFree(m);
55 rec_mutex_release(&(record->lock));
56
57 return REC_ERR_OKAY;
58 }
59
60 //-------------------------------------------------------------------------
61 // send_set_message
62 //-------------------------------------------------------------------------
63 static RecErrT
send_set_message(RecRecord * record)64 send_set_message(RecRecord *record)
65 {
66 RecMessage *m;
67
68 rec_mutex_acquire(&(record->lock));
69 m = RecMessageAlloc(RECG_SET);
70 m = RecMessageMarshal_Realloc(m, record);
71 RecDebug(DL_Note, "[send] RECG_SET [%d bytes]", sizeof(RecMessageHdr) + m->o_write - m->o_start);
72 RecMessageSend(m);
73 RecMessageFree(m);
74 rec_mutex_release(&(record->lock));
75
76 return REC_ERR_OKAY;
77 }
78
79 //-------------------------------------------------------------------------
80 // send_register_message
81 //-------------------------------------------------------------------------
82 RecErrT
send_register_message(RecRecord * record)83 send_register_message(RecRecord *record)
84 {
85 RecMessage *m;
86
87 rec_mutex_acquire(&(record->lock));
88 m = RecMessageAlloc(RECG_REGISTER);
89 m = RecMessageMarshal_Realloc(m, record);
90 RecDebug(DL_Note, "[send] RECG_REGISTER [%d bytes]", sizeof(RecMessageHdr) + m->o_write - m->o_start);
91 RecMessageSend(m);
92 RecMessageFree(m);
93 rec_mutex_release(&(record->lock));
94
95 return REC_ERR_OKAY;
96 }
97
98 //-------------------------------------------------------------------------
99 // send_push_message
100 //-------------------------------------------------------------------------
101 RecErrT
send_push_message()102 send_push_message()
103 {
104 RecRecord *r;
105 RecMessage *m;
106 int i, num_records;
107 bool send_msg = false;
108
109 m = RecMessageAlloc(RECG_PUSH);
110 num_records = g_num_records;
111 for (i = 0; i < num_records; i++) {
112 r = &(g_records[i]);
113 rec_mutex_acquire(&(r->lock));
114 if (i_am_the_record_owner(r->rec_type)) {
115 if (r->sync_required & REC_PEER_SYNC_REQUIRED) {
116 m = RecMessageMarshal_Realloc(m, r);
117 r->sync_required &= ~REC_PEER_SYNC_REQUIRED;
118 send_msg = true;
119 }
120 }
121 rec_mutex_release(&(r->lock));
122 }
123 if (send_msg) {
124 RecDebug(DL_Note, "[send] RECG_PUSH [%d bytes]", sizeof(RecMessageHdr) + m->o_write - m->o_start);
125 RecMessageSend(m);
126 }
127 RecMessageFree(m);
128
129 return REC_ERR_OKAY;
130 }
131
132 //-------------------------------------------------------------------------
133 // send_pull_message
134 //-------------------------------------------------------------------------
135 RecErrT
send_pull_message(RecMessageT msg_type)136 send_pull_message(RecMessageT msg_type)
137 {
138 RecRecord *r;
139 RecMessage *m;
140 int i, num_records;
141
142 m = RecMessageAlloc(msg_type);
143 switch (msg_type) {
144 case RECG_PULL_REQ:
145 // We're requesting all of the records from our peer. No payload
146 // here, just send the message.
147 RecDebug(DL_Note, "[send] RECG_PULL_REQ [%d bytes]", sizeof(RecMessageHdr) + m->o_write - m->o_start);
148 break;
149
150 case RECG_PULL_ACK:
151 // Respond to a RECG_PULL_REQ message from our peer. Send ALL
152 // records! Also be sure to send a response even if it has no
153 // payload. Our peer may be blocking and waiting for a response!
154 num_records = g_num_records;
155 for (i = 0; i < num_records; i++) {
156 r = &(g_records[i]);
157 if (i_am_the_record_owner(r->rec_type) || (REC_TYPE_IS_STAT(r->rec_type) && !(r->registered)) ||
158 (REC_TYPE_IS_STAT(r->rec_type) && (r->stat_meta.persist_type == RECP_NON_PERSISTENT))) {
159 rec_mutex_acquire(&(r->lock));
160 m = RecMessageMarshal_Realloc(m, r);
161 r->sync_required &= ~REC_PEER_SYNC_REQUIRED;
162 rec_mutex_release(&(r->lock));
163 }
164 }
165 RecDebug(DL_Note, "[send] RECG_PULL_ACK [%d bytes]", sizeof(RecMessageHdr) + m->o_write - m->o_start);
166 break;
167
168 default:
169 RecMessageFree(m);
170 return REC_ERR_FAIL;
171 }
172
173 RecMessageSend(m);
174 RecMessageFree(m);
175
176 return REC_ERR_OKAY;
177 }
178
179 //-------------------------------------------------------------------------
180 // recv_message_cb
181 //-------------------------------------------------------------------------
182 RecErrT
recv_message_cb(RecMessage * msg,RecMessageT msg_type,void *)183 recv_message_cb(RecMessage *msg, RecMessageT msg_type, void * /* cookie */)
184 {
185 RecRecord *r;
186 RecMessageItr itr;
187
188 switch (msg_type) {
189 case RECG_SET:
190
191 RecDebug(DL_Note, "[recv] RECG_SET [%d bytes]", sizeof(RecMessageHdr) + msg->o_end - msg->o_start);
192 if (RecMessageUnmarshalFirst(msg, &itr, &r) != REC_ERR_FAIL) {
193 do {
194 if (REC_TYPE_IS_STAT(r->rec_type)) {
195 RecSetRecord(r->rec_type, r->name, r->data_type, &(r->data), &(r->stat_meta.data_raw), REC_SOURCE_EXPLICIT);
196 } else {
197 RecSetRecord(r->rec_type, r->name, r->data_type, &(r->data), nullptr, REC_SOURCE_EXPLICIT);
198 }
199 } while (RecMessageUnmarshalNext(msg, &itr, &r) != REC_ERR_FAIL);
200 }
201 break;
202
203 case RECG_RESET:
204
205 RecDebug(DL_Note, "[recv] RECG_RESET [%d bytes]", sizeof(RecMessageHdr) + msg->o_end - msg->o_start);
206 if (RecMessageUnmarshalFirst(msg, &itr, &r) != REC_ERR_FAIL) {
207 do {
208 if (REC_TYPE_IS_STAT(r->rec_type)) {
209 RecResetStatRecord(r->name);
210 } else {
211 RecSetRecord(r->rec_type, r->name, r->data_type, &(r->data), nullptr, REC_SOURCE_EXPLICIT);
212 }
213 } while (RecMessageUnmarshalNext(msg, &itr, &r) != REC_ERR_FAIL);
214 }
215 break;
216
217 case RECG_REGISTER:
218 RecDebug(DL_Note, "[recv] RECG_REGISTER [%d bytes]", sizeof(RecMessageHdr) + msg->o_end - msg->o_start);
219 if (RecMessageUnmarshalFirst(msg, &itr, &r) != REC_ERR_FAIL) {
220 do {
221 if (REC_TYPE_IS_STAT(r->rec_type)) {
222 RecRegisterStat(r->rec_type, r->name, r->data_type, r->data_default, r->stat_meta.persist_type);
223 } else if (REC_TYPE_IS_CONFIG(r->rec_type)) {
224 RecRegisterConfig(r->rec_type, r->name, r->data_type, r->data_default, r->config_meta.update_type,
225 r->config_meta.check_type, r->config_meta.check_expr, r->config_meta.source,
226 r->config_meta.access_type);
227 }
228 } while (RecMessageUnmarshalNext(msg, &itr, &r) != REC_ERR_FAIL);
229 }
230 break;
231
232 case RECG_PUSH:
233 RecDebug(DL_Note, "[recv] RECG_PUSH [%d bytes]", sizeof(RecMessageHdr) + msg->o_end - msg->o_start);
234 if (RecMessageUnmarshalFirst(msg, &itr, &r) != REC_ERR_FAIL) {
235 do {
236 RecForceInsert(r);
237 } while (RecMessageUnmarshalNext(msg, &itr, &r) != REC_ERR_FAIL);
238 }
239 break;
240
241 case RECG_PULL_ACK:
242 RecDebug(DL_Note, "[recv] RECG_PULL_ACK [%d bytes]", sizeof(RecMessageHdr) + msg->o_end - msg->o_start);
243 if (RecMessageUnmarshalFirst(msg, &itr, &r) != REC_ERR_FAIL) {
244 do {
245 RecForceInsert(r);
246 } while (RecMessageUnmarshalNext(msg, &itr, &r) != REC_ERR_FAIL);
247 }
248 break;
249
250 case RECG_PULL_REQ:
251 RecDebug(DL_Note, "[recv] RECG_PULL_REQ [%d bytes]", sizeof(RecMessageHdr) + msg->o_end - msg->o_start);
252 send_pull_message(RECG_PULL_ACK);
253 break;
254
255 default:
256 ink_assert(!"Unexpected RecG type");
257 return REC_ERR_FAIL;
258 }
259
260 return REC_ERR_OKAY;
261 }
262
263 //-------------------------------------------------------------------------
264 // RecRegisterStatXXX
265 //-------------------------------------------------------------------------
266 #define REC_REGISTER_STAT_XXX(A, B) \
267 ink_assert((rec_type == RECT_NODE) || (rec_type == RECT_PROCESS) || (rec_type == RECT_LOCAL) || (rec_type == RECT_PLUGIN)); \
268 RecRecord *r; \
269 RecData my_data_default; \
270 my_data_default.A = data_default; \
271 if ((r = RecRegisterStat(rec_type, name, B, my_data_default, persist_type)) != nullptr) { \
272 if (i_am_the_record_owner(r->rec_type)) { \
273 r->sync_required = r->sync_required | REC_PEER_SYNC_REQUIRED; \
274 } else { \
275 send_register_message(r); \
276 } \
277 return REC_ERR_OKAY; \
278 } else { \
279 return REC_ERR_FAIL; \
280 }
281
282 RecErrT
_RecRegisterStatInt(RecT rec_type,const char * name,RecInt data_default,RecPersistT persist_type)283 _RecRegisterStatInt(RecT rec_type, const char *name, RecInt data_default, RecPersistT persist_type)
284 {
285 REC_REGISTER_STAT_XXX(rec_int, RECD_INT);
286 }
287
288 RecErrT
_RecRegisterStatFloat(RecT rec_type,const char * name,RecFloat data_default,RecPersistT persist_type)289 _RecRegisterStatFloat(RecT rec_type, const char *name, RecFloat data_default, RecPersistT persist_type)
290 {
291 REC_REGISTER_STAT_XXX(rec_float, RECD_FLOAT);
292 }
293
294 RecErrT
_RecRegisterStatString(RecT rec_type,const char * name,RecString data_default,RecPersistT persist_type)295 _RecRegisterStatString(RecT rec_type, const char *name, RecString data_default, RecPersistT persist_type)
296 {
297 REC_REGISTER_STAT_XXX(rec_string, RECD_STRING);
298 }
299
300 RecErrT
_RecRegisterStatCounter(RecT rec_type,const char * name,RecCounter data_default,RecPersistT persist_type)301 _RecRegisterStatCounter(RecT rec_type, const char *name, RecCounter data_default, RecPersistT persist_type)
302 {
303 REC_REGISTER_STAT_XXX(rec_counter, RECD_COUNTER);
304 }
305
306 //-------------------------------------------------------------------------
307 // RecRegisterConfigXXX
308 //-------------------------------------------------------------------------
309 #define REC_REGISTER_CONFIG_XXX(A, B) \
310 RecRecord *r; \
311 RecData my_data_default; \
312 my_data_default.A = data_default; \
313 if ((r = RecRegisterConfig(rec_type, name, B, my_data_default, update_type, check_type, check_regex, source, access_type)) != \
314 nullptr) { \
315 if (i_am_the_record_owner(r->rec_type)) { \
316 r->sync_required = r->sync_required | REC_PEER_SYNC_REQUIRED; \
317 } else { \
318 send_register_message(r); \
319 } \
320 return REC_ERR_OKAY; \
321 } else { \
322 return REC_ERR_FAIL; \
323 }
324
325 RecErrT
RecRegisterConfigInt(RecT rec_type,const char * name,RecInt data_default,RecUpdateT update_type,RecCheckT check_type,const char * check_regex,RecSourceT source,RecAccessT access_type)326 RecRegisterConfigInt(RecT rec_type, const char *name, RecInt data_default, RecUpdateT update_type, RecCheckT check_type,
327 const char *check_regex, RecSourceT source, RecAccessT access_type)
328 {
329 ink_assert((rec_type == RECT_CONFIG) || (rec_type == RECT_LOCAL));
330 REC_REGISTER_CONFIG_XXX(rec_int, RECD_INT);
331 }
332
333 RecErrT
RecRegisterConfigFloat(RecT rec_type,const char * name,RecFloat data_default,RecUpdateT update_type,RecCheckT check_type,const char * check_regex,RecSourceT source,RecAccessT access_type)334 RecRegisterConfigFloat(RecT rec_type, const char *name, RecFloat data_default, RecUpdateT update_type, RecCheckT check_type,
335 const char *check_regex, RecSourceT source, RecAccessT access_type)
336 {
337 ink_assert((rec_type == RECT_CONFIG) || (rec_type == RECT_LOCAL));
338 REC_REGISTER_CONFIG_XXX(rec_float, RECD_FLOAT);
339 }
340
341 RecErrT
RecRegisterConfigString(RecT rec_type,const char * name,const char * data_default_tmp,RecUpdateT update_type,RecCheckT check_type,const char * check_regex,RecSourceT source,RecAccessT access_type)342 RecRegisterConfigString(RecT rec_type, const char *name, const char *data_default_tmp, RecUpdateT update_type, RecCheckT check_type,
343 const char *check_regex, RecSourceT source, RecAccessT access_type)
344 {
345 RecString data_default = const_cast<RecString>(data_default_tmp);
346 ink_assert((rec_type == RECT_CONFIG) || (rec_type == RECT_LOCAL));
347 REC_REGISTER_CONFIG_XXX(rec_string, RECD_STRING);
348 }
349
350 RecErrT
RecRegisterConfigCounter(RecT rec_type,const char * name,RecCounter data_default,RecUpdateT update_type,RecCheckT check_type,const char * check_regex,RecSourceT source,RecAccessT access_type)351 RecRegisterConfigCounter(RecT rec_type, const char *name, RecCounter data_default, RecUpdateT update_type, RecCheckT check_type,
352 const char *check_regex, RecSourceT source, RecAccessT access_type)
353 {
354 ink_assert((rec_type == RECT_CONFIG) || (rec_type == RECT_LOCAL));
355 REC_REGISTER_CONFIG_XXX(rec_counter, RECD_COUNTER);
356 }
357
358 //-------------------------------------------------------------------------
359 // RecSetRecordXXX
360 //-------------------------------------------------------------------------
361 RecErrT
RecSetRecord(RecT rec_type,const char * name,RecDataT data_type,RecData * data,RecRawStat * data_raw,RecSourceT source,bool lock)362 RecSetRecord(RecT rec_type, const char *name, RecDataT data_type, RecData *data, RecRawStat *data_raw, RecSourceT source, bool lock)
363 {
364 RecErrT err = REC_ERR_OKAY;
365 RecRecord *r1;
366
367 // FIXME: Most of the time we set, we don't actually need to wrlock
368 // since we are not modifying the g_records_ht.
369 if (lock) {
370 ink_rwlock_wrlock(&g_records_rwlock);
371 }
372 if (auto it = g_records_ht.find(name); it != g_records_ht.end()) {
373 r1 = it->second;
374 if (i_am_the_record_owner(r1->rec_type)) {
375 rec_mutex_acquire(&(r1->lock));
376 if ((data_type != RECD_NULL) && (r1->data_type != data_type)) {
377 err = REC_ERR_FAIL;
378 } else {
379 bool rec_updated_p = false;
380 if (data_type == RECD_NULL) {
381 // If the caller didn't know the data type, they gave us a string
382 // and we should convert based on the record's data type.
383 ink_release_assert(data->rec_string != nullptr);
384 rec_updated_p = RecDataSetFromString(r1->data_type, &(r1->data), data->rec_string);
385 } else {
386 rec_updated_p = RecDataSet(data_type, &(r1->data), data);
387 }
388
389 if (rec_updated_p) {
390 r1->sync_required = REC_SYNC_REQUIRED;
391 if (REC_TYPE_IS_CONFIG(r1->rec_type)) {
392 r1->config_meta.update_required = REC_UPDATE_REQUIRED;
393 }
394 }
395
396 if (REC_TYPE_IS_STAT(r1->rec_type) && (data_raw != nullptr)) {
397 r1->stat_meta.data_raw = *data_raw;
398 } else if (REC_TYPE_IS_CONFIG(r1->rec_type)) {
399 r1->config_meta.source = source;
400 }
401 }
402 rec_mutex_release(&(r1->lock));
403 } else {
404 // We don't need to ats_strdup() here as we will make copies of any
405 // strings when we marshal them into our RecMessage buffer.
406 RecRecord r2;
407
408 RecRecordInit(&r2);
409 r2.rec_type = rec_type;
410 r2.name = name;
411 r2.data_type = (data_type != RECD_NULL) ? data_type : r1->data_type;
412 r2.data = *data;
413 if (REC_TYPE_IS_STAT(r2.rec_type) && (data_raw != nullptr)) {
414 r2.stat_meta.data_raw = *data_raw;
415 } else if (REC_TYPE_IS_CONFIG(r2.rec_type)) {
416 r2.config_meta.source = source;
417 }
418 err = send_set_message(&r2);
419 RecRecordFree(&r2);
420 }
421 } else {
422 // Add the record but do not set the 'registered' flag, as this
423 // record really hasn't been registered yet. Also, in order to
424 // add the record, we need to have a rec_type, so if the user
425 // calls RecSetRecord on a record we haven't registered yet, we
426 // should fail out here.
427 if ((rec_type == RECT_NULL) || (data_type == RECD_NULL)) {
428 err = REC_ERR_FAIL;
429 goto Ldone;
430 }
431 r1 = RecAlloc(rec_type, name, data_type);
432 RecDataSet(data_type, &(r1->data), data);
433 if (REC_TYPE_IS_STAT(r1->rec_type) && (data_raw != nullptr)) {
434 r1->stat_meta.data_raw = *data_raw;
435 } else if (REC_TYPE_IS_CONFIG(r1->rec_type)) {
436 r1->config_meta.source = source;
437 }
438 if (i_am_the_record_owner(r1->rec_type)) {
439 r1->sync_required = r1->sync_required | REC_PEER_SYNC_REQUIRED;
440 } else {
441 err = send_set_message(r1);
442 }
443 g_records_ht.emplace(name, r1);
444 }
445
446 Ldone:
447 if (lock) {
448 ink_rwlock_unlock(&g_records_rwlock);
449 }
450
451 return err;
452 }
453
454 RecErrT
RecSetRecordConvert(const char * name,const RecString rec_string,RecSourceT source,bool lock)455 RecSetRecordConvert(const char *name, const RecString rec_string, RecSourceT source, bool lock)
456 {
457 RecData data;
458 data.rec_string = rec_string;
459 return RecSetRecord(RECT_NULL, name, RECD_NULL, &data, nullptr, source, lock);
460 }
461
462 RecErrT
RecSetRecordInt(const char * name,RecInt rec_int,RecSourceT source,bool lock)463 RecSetRecordInt(const char *name, RecInt rec_int, RecSourceT source, bool lock)
464 {
465 RecData data;
466 data.rec_int = rec_int;
467 return RecSetRecord(RECT_NULL, name, RECD_INT, &data, nullptr, source, lock);
468 }
469
470 RecErrT
RecSetRecordFloat(const char * name,RecFloat rec_float,RecSourceT source,bool lock)471 RecSetRecordFloat(const char *name, RecFloat rec_float, RecSourceT source, bool lock)
472 {
473 RecData data;
474 data.rec_float = rec_float;
475 return RecSetRecord(RECT_NULL, name, RECD_FLOAT, &data, nullptr, source, lock);
476 }
477
478 RecErrT
RecSetRecordString(const char * name,const RecString rec_string,RecSourceT source,bool lock)479 RecSetRecordString(const char *name, const RecString rec_string, RecSourceT source, bool lock)
480 {
481 RecData data;
482 data.rec_string = rec_string;
483 return RecSetRecord(RECT_NULL, name, RECD_STRING, &data, nullptr, source, lock);
484 }
485
486 RecErrT
RecSetRecordCounter(const char * name,RecCounter rec_counter,RecSourceT source,bool lock)487 RecSetRecordCounter(const char *name, RecCounter rec_counter, RecSourceT source, bool lock)
488 {
489 RecData data;
490 data.rec_counter = rec_counter;
491 return RecSetRecord(RECT_NULL, name, RECD_COUNTER, &data, nullptr, source, lock);
492 }
493
494 // check the version of the snap file to remove records.snap or not
495 static void
CheckSnapFileVersion(const char * path)496 CheckSnapFileVersion(const char *path)
497 {
498 std::ifstream f(path, std::ios::binary);
499 if (f.good()) {
500 // get version, compare and remove
501 char data[VERSION_HDR_SIZE];
502 if (!f.read(data, VERSION_HDR_SIZE)) {
503 return;
504 }
505 if (data[0] != 'V' || data[1] != PACKAGE_VERSION[0] || data[2] != PACKAGE_VERSION[2] || data[3] != PACKAGE_VERSION[4] ||
506 data[4] != '\0') {
507 // not the right version found
508 if (remove(path) != 0) {
509 ink_warning("unable to remove incompatible snap file '%s'", path);
510 }
511 }
512 }
513 }
514
515 //-------------------------------------------------------------------------
516 // RecReadStatsFile
517 //-------------------------------------------------------------------------
518 RecErrT
RecReadStatsFile()519 RecReadStatsFile()
520 {
521 RecRecord *r;
522 RecMessage *m;
523 RecMessageItr itr;
524 RecPersistT persist_type = RECP_NULL;
525 ats_scoped_str snap_fpath(RecConfigReadPersistentStatsPath());
526
527 // lock our hash table
528 ink_rwlock_wrlock(&g_records_rwlock);
529
530 CheckSnapFileVersion(snap_fpath);
531
532 if ((m = RecMessageReadFromDisk(snap_fpath)) != nullptr) {
533 if (RecMessageUnmarshalFirst(m, &itr, &r) != REC_ERR_FAIL) {
534 do {
535 if ((r->name == nullptr) || (!strlen(r->name))) {
536 continue;
537 }
538
539 // If we don't have a persistence type for this record, it means that it is not a stat, or it is
540 // not registered yet. Either way, it's ok to just set the persisted value and keep going.
541 if (RecGetRecordPersistenceType(r->name, &persist_type, false /* lock */) != REC_ERR_OKAY) {
542 RecDebug(DL_Debug, "restoring value for persisted stat '%s'", r->name);
543 RecSetRecord(r->rec_type, r->name, r->data_type, &(r->data), &(r->stat_meta.data_raw), REC_SOURCE_EXPLICIT, false);
544 continue;
545 }
546
547 if (!REC_TYPE_IS_STAT(r->rec_type)) {
548 // This should not happen, but be defensive against records changing their type ..
549 RecLog(DL_Warning, "skipping restore of non-stat record '%s'", r->name);
550 continue;
551 }
552
553 // Check whether the persistence type was changed by a new software version. If the record is
554 // already registered with an updated persistence type, then we don't want to set it. We should
555 // keep the registered value.
556 if (persist_type == RECP_NON_PERSISTENT) {
557 RecDebug(DL_Debug, "preserving current value of formerly persistent stat '%s'", r->name);
558 continue;
559 }
560
561 RecDebug(DL_Debug, "restoring value for persisted stat '%s'", r->name);
562 RecSetRecord(r->rec_type, r->name, r->data_type, &(r->data), &(r->stat_meta.data_raw), REC_SOURCE_EXPLICIT, false);
563 } while (RecMessageUnmarshalNext(m, &itr, &r) != REC_ERR_FAIL);
564 }
565 }
566
567 ink_rwlock_unlock(&g_records_rwlock);
568 ats_free(m);
569
570 return REC_ERR_OKAY;
571 }
572
573 //-------------------------------------------------------------------------
574 // RecSyncStatsFile
575 //-------------------------------------------------------------------------
576 RecErrT
RecSyncStatsFile()577 RecSyncStatsFile()
578 {
579 RecRecord *r;
580 RecMessage *m;
581 int i, num_records;
582 bool sync_to_disk;
583 ats_scoped_str snap_fpath(RecConfigReadPersistentStatsPath());
584
585 /*
586 * g_mode_type should be initialized by
587 * RecLocalInit() or RecProcessInit() earlier.
588 */
589 ink_assert(g_mode_type != RECM_NULL);
590
591 if (g_mode_type == RECM_SERVER || g_mode_type == RECM_STAND_ALONE) {
592 m = RecMessageAlloc(RECG_NULL);
593 num_records = g_num_records;
594 sync_to_disk = false;
595 for (i = 0; i < num_records; i++) {
596 r = &(g_records[i]);
597 rec_mutex_acquire(&(r->lock));
598 if (REC_TYPE_IS_STAT(r->rec_type)) {
599 if (r->stat_meta.persist_type == RECP_PERSISTENT) {
600 m = RecMessageMarshal_Realloc(m, r);
601 sync_to_disk = true;
602 }
603 }
604 rec_mutex_release(&(r->lock));
605 }
606 if (sync_to_disk) {
607 RecDebug(DL_Note, "Writing '%s' [%d bytes]", (const char *)snap_fpath, m->o_write - m->o_start + sizeof(RecMessageHdr));
608 RecMessageWriteToDisk(m, snap_fpath);
609 }
610 RecMessageFree(m);
611 }
612
613 return REC_ERR_OKAY;
614 }
615
616 // Consume a parsed record, pushing it into the records hash table.
617 static void
RecConsumeConfigEntry(RecT rec_type,RecDataT data_type,const char * name,const char * value,RecSourceT source)618 RecConsumeConfigEntry(RecT rec_type, RecDataT data_type, const char *name, const char *value, RecSourceT source)
619 {
620 RecData data;
621
622 memset(&data, 0, sizeof(RecData));
623 RecDataSetFromString(data_type, &data, value);
624 RecSetRecord(rec_type, name, data_type, &data, nullptr, source, false);
625 RecDataZero(data_type, &data);
626 }
627
628 //-------------------------------------------------------------------------
629 // RecReadConfigFile
630 //-------------------------------------------------------------------------
631 RecErrT
RecReadConfigFile()632 RecReadConfigFile()
633 {
634 RecDebug(DL_Note, "Reading '%s'", g_rec_config_fpath);
635
636 // lock our hash table
637 ink_rwlock_wrlock(&g_records_rwlock);
638
639 // Parse the actual file and hash the values.
640 RecConfigFileParse(g_rec_config_fpath, RecConsumeConfigEntry);
641
642 // release our hash table
643 ink_rwlock_unlock(&g_records_rwlock);
644
645 return REC_ERR_OKAY;
646 }
647
648 //-------------------------------------------------------------------------
649 // RecExecConfigUpdateCbs
650 //-------------------------------------------------------------------------
651 RecUpdateT
RecExecConfigUpdateCbs(unsigned int update_required_type)652 RecExecConfigUpdateCbs(unsigned int update_required_type)
653 {
654 RecRecord *r;
655 int i, num_records;
656 RecUpdateT update_type = RECU_NULL;
657
658 num_records = g_num_records;
659 for (i = 0; i < num_records; i++) {
660 r = &(g_records[i]);
661 rec_mutex_acquire(&(r->lock));
662 if (REC_TYPE_IS_CONFIG(r->rec_type)) {
663 /* -- upgrade to support a list of callback functions
664 if ((r->config_meta.update_required & update_required_type) &&
665 (r->config_meta.update_cb)) {
666 (*(r->config_meta.update_cb))(r->name, r->data_type, r->data,
667 r->config_meta.update_cookie);
668 r->config_meta.update_required =
669 r->config_meta.update_required & ~update_required_type;
670 }
671 */
672
673 if (r->config_meta.update_required) {
674 if (r->config_meta.update_type > update_type) {
675 update_type = r->config_meta.update_type;
676 }
677 }
678
679 if ((r->config_meta.update_required & update_required_type) && (r->config_meta.update_cb_list)) {
680 RecConfigUpdateCbList *cur_callback = nullptr;
681 for (cur_callback = r->config_meta.update_cb_list; cur_callback; cur_callback = cur_callback->next) {
682 (*(cur_callback->update_cb))(r->name, r->data_type, r->data, cur_callback->update_cookie);
683 }
684 r->config_meta.update_required = r->config_meta.update_required & ~update_required_type;
685 }
686 }
687 rec_mutex_release(&(r->lock));
688 }
689
690 return update_type;
691 }
692
693 static RecErrT
reset_stat_record(RecRecord * rec)694 reset_stat_record(RecRecord *rec)
695 {
696 RecErrT err;
697
698 if (i_am_the_record_owner(rec->rec_type)) {
699 rec_mutex_acquire(&(rec->lock));
700 ++(rec->version);
701 err = RecDataSet(rec->data_type, &(rec->data), &(rec->data_default)) ? REC_ERR_OKAY : REC_ERR_FAIL;
702 rec_mutex_release(&(rec->lock));
703 } else {
704 RecRecord r2;
705
706 RecRecordInit(&r2);
707 r2.rec_type = rec->rec_type;
708 r2.name = rec->name;
709 r2.data_type = rec->data_type;
710 r2.data = rec->data_default;
711
712 err = send_reset_message(&r2);
713 RecRecordFree(&r2);
714 }
715
716 return err;
717 }
718
719 //------------------------------------------------------------------------
720 // RecResetStatRecord
721 //------------------------------------------------------------------------
722 RecErrT
RecResetStatRecord(const char * name)723 RecResetStatRecord(const char *name)
724 {
725 RecErrT err = REC_ERR_FAIL;
726
727 if (auto it = g_records_ht.find(name); it != g_records_ht.end()) {
728 err = reset_stat_record(it->second);
729 }
730
731 return err;
732 }
733
734 //------------------------------------------------------------------------
735 // RecResetStatRecord
736 //------------------------------------------------------------------------
737 RecErrT
RecResetStatRecord(RecT type,bool all)738 RecResetStatRecord(RecT type, bool all)
739 {
740 int i, num_records;
741 RecErrT err = REC_ERR_OKAY;
742
743 RecDebug(DL_Note, "Reset Statistics Records");
744
745 num_records = g_num_records;
746 for (i = 0; i < num_records; i++) {
747 RecRecord *r1 = &(g_records[i]);
748
749 if (!REC_TYPE_IS_STAT(r1->rec_type)) {
750 continue;
751 }
752
753 if (r1->data_type == RECD_STRING) {
754 continue;
755 }
756
757 if (((type == RECT_NULL) || (r1->rec_type == type)) && (all || (r1->stat_meta.persist_type != RECP_NON_PERSISTENT))) {
758 if (reset_stat_record(r1) != REC_ERR_OKAY) {
759 err = REC_ERR_FAIL;
760 }
761 }
762 }
763
764 return err;
765 }
766
767 RecErrT
RecSetSyncRequired(char * name,bool lock)768 RecSetSyncRequired(char *name, bool lock)
769 {
770 RecErrT err = REC_ERR_FAIL;
771 RecRecord *r1;
772
773 // FIXME: Most of the time we set, we don't actually need to wrlock
774 // since we are not modifying the g_records_ht.
775 if (lock) {
776 ink_rwlock_wrlock(&g_records_rwlock);
777 }
778 if (auto it = g_records_ht.find(name); it != g_records_ht.end()) {
779 r1 = it->second;
780 if (i_am_the_record_owner(r1->rec_type)) {
781 rec_mutex_acquire(&(r1->lock));
782 r1->sync_required = REC_PEER_SYNC_REQUIRED;
783 if (REC_TYPE_IS_CONFIG(r1->rec_type)) {
784 r1->config_meta.update_required = REC_UPDATE_REQUIRED;
785 }
786 rec_mutex_release(&(r1->lock));
787 err = REC_ERR_OKAY;
788 } else {
789 // No point of doing the following because our peer will
790 // set the value with RecDataSet. However, since
791 // r2.name == r1->name, the sync_required bit will not be
792 // set.
793
794 /*
795 RecRecord r2;
796
797 RecRecordInit(&r2);
798 r2.rec_type = r1->rec_type;
799 r2.name = r1->name;
800 r2.data_type = r1->data_type;
801 r2.data = r1->data_default;
802
803 err = send_set_message(&r2);
804 RecRecordFree(&r2);
805 */
806 }
807 }
808
809 if (lock) {
810 ink_rwlock_unlock(&g_records_rwlock);
811 }
812
813 return err;
814 }
815