1 /** @file
2 
3   Private record core definitions
4 
5   @section license License
6 
7   Licensed to the Apache Software Foundation (ASF) under one
8   or more contributor license agreements.  See the NOTICE file
9   distributed with this work for additional information
10   regarding copyright ownership.  The ASF licenses this file
11   to you under the Apache License, Version 2.0 (the
12   "License"); you may not use this file except in compliance
13   with the License.  You may obtain a copy of the License at
14 
15       http://www.apache.org/licenses/LICENSE-2.0
16 
17   Unless required by applicable law or agreed to in writing, software
18   distributed under the License is distributed on an "AS IS" BASIS,
19   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20   See the License for the specific language governing permissions and
21   limitations under the License.
22  */
23 
24 #include "tscore/ink_platform.h"
25 #include "tscore/ink_memory.h"
26 
27 #include "tscore/TextBuffer.h"
28 #include "tscore/Tokenizer.h"
29 #include "tscore/ink_defs.h"
30 #include "tscore/ink_string.h"
31 
32 #include "P_RecFile.h"
33 #include "P_RecUtils.h"
34 #include "P_RecMessage.h"
35 #include "P_RecCore.h"
36 
37 #include <fstream>
38 
39 RecModeT g_mode_type = RECM_NULL;
40 
41 //-------------------------------------------------------------------------
42 // send_reset_message
43 //-------------------------------------------------------------------------
44 static RecErrT
send_reset_message(RecRecord * record)45 send_reset_message(RecRecord *record)
46 {
47   RecMessage *m;
48 
49   rec_mutex_acquire(&(record->lock));
50   m = RecMessageAlloc(RECG_RESET);
51   m = RecMessageMarshal_Realloc(m, record);
52   RecDebug(DL_Note, "[send] RECG_RESET [%d bytes]", sizeof(RecMessageHdr) + m->o_write - m->o_start);
53   RecMessageSend(m);
54   RecMessageFree(m);
55   rec_mutex_release(&(record->lock));
56 
57   return REC_ERR_OKAY;
58 }
59 
60 //-------------------------------------------------------------------------
61 // send_set_message
62 //-------------------------------------------------------------------------
63 static RecErrT
send_set_message(RecRecord * record)64 send_set_message(RecRecord *record)
65 {
66   RecMessage *m;
67 
68   rec_mutex_acquire(&(record->lock));
69   m = RecMessageAlloc(RECG_SET);
70   m = RecMessageMarshal_Realloc(m, record);
71   RecDebug(DL_Note, "[send] RECG_SET [%d bytes]", sizeof(RecMessageHdr) + m->o_write - m->o_start);
72   RecMessageSend(m);
73   RecMessageFree(m);
74   rec_mutex_release(&(record->lock));
75 
76   return REC_ERR_OKAY;
77 }
78 
79 //-------------------------------------------------------------------------
80 // send_register_message
81 //-------------------------------------------------------------------------
82 RecErrT
send_register_message(RecRecord * record)83 send_register_message(RecRecord *record)
84 {
85   RecMessage *m;
86 
87   rec_mutex_acquire(&(record->lock));
88   m = RecMessageAlloc(RECG_REGISTER);
89   m = RecMessageMarshal_Realloc(m, record);
90   RecDebug(DL_Note, "[send] RECG_REGISTER [%d bytes]", sizeof(RecMessageHdr) + m->o_write - m->o_start);
91   RecMessageSend(m);
92   RecMessageFree(m);
93   rec_mutex_release(&(record->lock));
94 
95   return REC_ERR_OKAY;
96 }
97 
98 //-------------------------------------------------------------------------
99 // send_push_message
100 //-------------------------------------------------------------------------
101 RecErrT
send_push_message()102 send_push_message()
103 {
104   RecRecord *r;
105   RecMessage *m;
106   int i, num_records;
107   bool send_msg = false;
108 
109   m           = RecMessageAlloc(RECG_PUSH);
110   num_records = g_num_records;
111   for (i = 0; i < num_records; i++) {
112     r = &(g_records[i]);
113     rec_mutex_acquire(&(r->lock));
114     if (i_am_the_record_owner(r->rec_type)) {
115       if (r->sync_required & REC_PEER_SYNC_REQUIRED) {
116         m = RecMessageMarshal_Realloc(m, r);
117         r->sync_required &= ~REC_PEER_SYNC_REQUIRED;
118         send_msg = true;
119       }
120     }
121     rec_mutex_release(&(r->lock));
122   }
123   if (send_msg) {
124     RecDebug(DL_Note, "[send] RECG_PUSH [%d bytes]", sizeof(RecMessageHdr) + m->o_write - m->o_start);
125     RecMessageSend(m);
126   }
127   RecMessageFree(m);
128 
129   return REC_ERR_OKAY;
130 }
131 
132 //-------------------------------------------------------------------------
133 // send_pull_message
134 //-------------------------------------------------------------------------
135 RecErrT
send_pull_message(RecMessageT msg_type)136 send_pull_message(RecMessageT msg_type)
137 {
138   RecRecord *r;
139   RecMessage *m;
140   int i, num_records;
141 
142   m = RecMessageAlloc(msg_type);
143   switch (msg_type) {
144   case RECG_PULL_REQ:
145     // We're requesting all of the records from our peer.  No payload
146     // here, just send the message.
147     RecDebug(DL_Note, "[send] RECG_PULL_REQ [%d bytes]", sizeof(RecMessageHdr) + m->o_write - m->o_start);
148     break;
149 
150   case RECG_PULL_ACK:
151     // Respond to a RECG_PULL_REQ message from our peer.  Send ALL
152     // records!  Also be sure to send a response even if it has no
153     // payload.  Our peer may be blocking and waiting for a response!
154     num_records = g_num_records;
155     for (i = 0; i < num_records; i++) {
156       r = &(g_records[i]);
157       if (i_am_the_record_owner(r->rec_type) || (REC_TYPE_IS_STAT(r->rec_type) && !(r->registered)) ||
158           (REC_TYPE_IS_STAT(r->rec_type) && (r->stat_meta.persist_type == RECP_NON_PERSISTENT))) {
159         rec_mutex_acquire(&(r->lock));
160         m = RecMessageMarshal_Realloc(m, r);
161         r->sync_required &= ~REC_PEER_SYNC_REQUIRED;
162         rec_mutex_release(&(r->lock));
163       }
164     }
165     RecDebug(DL_Note, "[send] RECG_PULL_ACK [%d bytes]", sizeof(RecMessageHdr) + m->o_write - m->o_start);
166     break;
167 
168   default:
169     RecMessageFree(m);
170     return REC_ERR_FAIL;
171   }
172 
173   RecMessageSend(m);
174   RecMessageFree(m);
175 
176   return REC_ERR_OKAY;
177 }
178 
179 //-------------------------------------------------------------------------
180 // recv_message_cb
181 //-------------------------------------------------------------------------
182 RecErrT
recv_message_cb(RecMessage * msg,RecMessageT msg_type,void *)183 recv_message_cb(RecMessage *msg, RecMessageT msg_type, void * /* cookie */)
184 {
185   RecRecord *r;
186   RecMessageItr itr;
187 
188   switch (msg_type) {
189   case RECG_SET:
190 
191     RecDebug(DL_Note, "[recv] RECG_SET [%d bytes]", sizeof(RecMessageHdr) + msg->o_end - msg->o_start);
192     if (RecMessageUnmarshalFirst(msg, &itr, &r) != REC_ERR_FAIL) {
193       do {
194         if (REC_TYPE_IS_STAT(r->rec_type)) {
195           RecSetRecord(r->rec_type, r->name, r->data_type, &(r->data), &(r->stat_meta.data_raw), REC_SOURCE_EXPLICIT);
196         } else {
197           RecSetRecord(r->rec_type, r->name, r->data_type, &(r->data), nullptr, REC_SOURCE_EXPLICIT);
198         }
199       } while (RecMessageUnmarshalNext(msg, &itr, &r) != REC_ERR_FAIL);
200     }
201     break;
202 
203   case RECG_RESET:
204 
205     RecDebug(DL_Note, "[recv] RECG_RESET [%d bytes]", sizeof(RecMessageHdr) + msg->o_end - msg->o_start);
206     if (RecMessageUnmarshalFirst(msg, &itr, &r) != REC_ERR_FAIL) {
207       do {
208         if (REC_TYPE_IS_STAT(r->rec_type)) {
209           RecResetStatRecord(r->name);
210         } else {
211           RecSetRecord(r->rec_type, r->name, r->data_type, &(r->data), nullptr, REC_SOURCE_EXPLICIT);
212         }
213       } while (RecMessageUnmarshalNext(msg, &itr, &r) != REC_ERR_FAIL);
214     }
215     break;
216 
217   case RECG_REGISTER:
218     RecDebug(DL_Note, "[recv] RECG_REGISTER [%d bytes]", sizeof(RecMessageHdr) + msg->o_end - msg->o_start);
219     if (RecMessageUnmarshalFirst(msg, &itr, &r) != REC_ERR_FAIL) {
220       do {
221         if (REC_TYPE_IS_STAT(r->rec_type)) {
222           RecRegisterStat(r->rec_type, r->name, r->data_type, r->data_default, r->stat_meta.persist_type);
223         } else if (REC_TYPE_IS_CONFIG(r->rec_type)) {
224           RecRegisterConfig(r->rec_type, r->name, r->data_type, r->data_default, r->config_meta.update_type,
225                             r->config_meta.check_type, r->config_meta.check_expr, r->config_meta.source,
226                             r->config_meta.access_type);
227         }
228       } while (RecMessageUnmarshalNext(msg, &itr, &r) != REC_ERR_FAIL);
229     }
230     break;
231 
232   case RECG_PUSH:
233     RecDebug(DL_Note, "[recv] RECG_PUSH [%d bytes]", sizeof(RecMessageHdr) + msg->o_end - msg->o_start);
234     if (RecMessageUnmarshalFirst(msg, &itr, &r) != REC_ERR_FAIL) {
235       do {
236         RecForceInsert(r);
237       } while (RecMessageUnmarshalNext(msg, &itr, &r) != REC_ERR_FAIL);
238     }
239     break;
240 
241   case RECG_PULL_ACK:
242     RecDebug(DL_Note, "[recv] RECG_PULL_ACK [%d bytes]", sizeof(RecMessageHdr) + msg->o_end - msg->o_start);
243     if (RecMessageUnmarshalFirst(msg, &itr, &r) != REC_ERR_FAIL) {
244       do {
245         RecForceInsert(r);
246       } while (RecMessageUnmarshalNext(msg, &itr, &r) != REC_ERR_FAIL);
247     }
248     break;
249 
250   case RECG_PULL_REQ:
251     RecDebug(DL_Note, "[recv] RECG_PULL_REQ [%d bytes]", sizeof(RecMessageHdr) + msg->o_end - msg->o_start);
252     send_pull_message(RECG_PULL_ACK);
253     break;
254 
255   default:
256     ink_assert(!"Unexpected RecG type");
257     return REC_ERR_FAIL;
258   }
259 
260   return REC_ERR_OKAY;
261 }
262 
263 //-------------------------------------------------------------------------
264 // RecRegisterStatXXX
265 //-------------------------------------------------------------------------
266 #define REC_REGISTER_STAT_XXX(A, B)                                                                                           \
267   ink_assert((rec_type == RECT_NODE) || (rec_type == RECT_PROCESS) || (rec_type == RECT_LOCAL) || (rec_type == RECT_PLUGIN)); \
268   RecRecord *r;                                                                                                               \
269   RecData my_data_default;                                                                                                    \
270   my_data_default.A = data_default;                                                                                           \
271   if ((r = RecRegisterStat(rec_type, name, B, my_data_default, persist_type)) != nullptr) {                                   \
272     if (i_am_the_record_owner(r->rec_type)) {                                                                                 \
273       r->sync_required = r->sync_required | REC_PEER_SYNC_REQUIRED;                                                           \
274     } else {                                                                                                                  \
275       send_register_message(r);                                                                                               \
276     }                                                                                                                         \
277     return REC_ERR_OKAY;                                                                                                      \
278   } else {                                                                                                                    \
279     return REC_ERR_FAIL;                                                                                                      \
280   }
281 
282 RecErrT
_RecRegisterStatInt(RecT rec_type,const char * name,RecInt data_default,RecPersistT persist_type)283 _RecRegisterStatInt(RecT rec_type, const char *name, RecInt data_default, RecPersistT persist_type)
284 {
285   REC_REGISTER_STAT_XXX(rec_int, RECD_INT);
286 }
287 
288 RecErrT
_RecRegisterStatFloat(RecT rec_type,const char * name,RecFloat data_default,RecPersistT persist_type)289 _RecRegisterStatFloat(RecT rec_type, const char *name, RecFloat data_default, RecPersistT persist_type)
290 {
291   REC_REGISTER_STAT_XXX(rec_float, RECD_FLOAT);
292 }
293 
294 RecErrT
_RecRegisterStatString(RecT rec_type,const char * name,RecString data_default,RecPersistT persist_type)295 _RecRegisterStatString(RecT rec_type, const char *name, RecString data_default, RecPersistT persist_type)
296 {
297   REC_REGISTER_STAT_XXX(rec_string, RECD_STRING);
298 }
299 
300 RecErrT
_RecRegisterStatCounter(RecT rec_type,const char * name,RecCounter data_default,RecPersistT persist_type)301 _RecRegisterStatCounter(RecT rec_type, const char *name, RecCounter data_default, RecPersistT persist_type)
302 {
303   REC_REGISTER_STAT_XXX(rec_counter, RECD_COUNTER);
304 }
305 
306 //-------------------------------------------------------------------------
307 // RecRegisterConfigXXX
308 //-------------------------------------------------------------------------
309 #define REC_REGISTER_CONFIG_XXX(A, B)                                                                                           \
310   RecRecord *r;                                                                                                                 \
311   RecData my_data_default;                                                                                                      \
312   my_data_default.A = data_default;                                                                                             \
313   if ((r = RecRegisterConfig(rec_type, name, B, my_data_default, update_type, check_type, check_regex, source, access_type)) != \
314       nullptr) {                                                                                                                \
315     if (i_am_the_record_owner(r->rec_type)) {                                                                                   \
316       r->sync_required = r->sync_required | REC_PEER_SYNC_REQUIRED;                                                             \
317     } else {                                                                                                                    \
318       send_register_message(r);                                                                                                 \
319     }                                                                                                                           \
320     return REC_ERR_OKAY;                                                                                                        \
321   } else {                                                                                                                      \
322     return REC_ERR_FAIL;                                                                                                        \
323   }
324 
325 RecErrT
RecRegisterConfigInt(RecT rec_type,const char * name,RecInt data_default,RecUpdateT update_type,RecCheckT check_type,const char * check_regex,RecSourceT source,RecAccessT access_type)326 RecRegisterConfigInt(RecT rec_type, const char *name, RecInt data_default, RecUpdateT update_type, RecCheckT check_type,
327                      const char *check_regex, RecSourceT source, RecAccessT access_type)
328 {
329   ink_assert((rec_type == RECT_CONFIG) || (rec_type == RECT_LOCAL));
330   REC_REGISTER_CONFIG_XXX(rec_int, RECD_INT);
331 }
332 
333 RecErrT
RecRegisterConfigFloat(RecT rec_type,const char * name,RecFloat data_default,RecUpdateT update_type,RecCheckT check_type,const char * check_regex,RecSourceT source,RecAccessT access_type)334 RecRegisterConfigFloat(RecT rec_type, const char *name, RecFloat data_default, RecUpdateT update_type, RecCheckT check_type,
335                        const char *check_regex, RecSourceT source, RecAccessT access_type)
336 {
337   ink_assert((rec_type == RECT_CONFIG) || (rec_type == RECT_LOCAL));
338   REC_REGISTER_CONFIG_XXX(rec_float, RECD_FLOAT);
339 }
340 
341 RecErrT
RecRegisterConfigString(RecT rec_type,const char * name,const char * data_default_tmp,RecUpdateT update_type,RecCheckT check_type,const char * check_regex,RecSourceT source,RecAccessT access_type)342 RecRegisterConfigString(RecT rec_type, const char *name, const char *data_default_tmp, RecUpdateT update_type, RecCheckT check_type,
343                         const char *check_regex, RecSourceT source, RecAccessT access_type)
344 {
345   RecString data_default = const_cast<RecString>(data_default_tmp);
346   ink_assert((rec_type == RECT_CONFIG) || (rec_type == RECT_LOCAL));
347   REC_REGISTER_CONFIG_XXX(rec_string, RECD_STRING);
348 }
349 
350 RecErrT
RecRegisterConfigCounter(RecT rec_type,const char * name,RecCounter data_default,RecUpdateT update_type,RecCheckT check_type,const char * check_regex,RecSourceT source,RecAccessT access_type)351 RecRegisterConfigCounter(RecT rec_type, const char *name, RecCounter data_default, RecUpdateT update_type, RecCheckT check_type,
352                          const char *check_regex, RecSourceT source, RecAccessT access_type)
353 {
354   ink_assert((rec_type == RECT_CONFIG) || (rec_type == RECT_LOCAL));
355   REC_REGISTER_CONFIG_XXX(rec_counter, RECD_COUNTER);
356 }
357 
358 //-------------------------------------------------------------------------
359 // RecSetRecordXXX
360 //-------------------------------------------------------------------------
361 RecErrT
RecSetRecord(RecT rec_type,const char * name,RecDataT data_type,RecData * data,RecRawStat * data_raw,RecSourceT source,bool lock)362 RecSetRecord(RecT rec_type, const char *name, RecDataT data_type, RecData *data, RecRawStat *data_raw, RecSourceT source, bool lock)
363 {
364   RecErrT err = REC_ERR_OKAY;
365   RecRecord *r1;
366 
367   // FIXME: Most of the time we set, we don't actually need to wrlock
368   // since we are not modifying the g_records_ht.
369   if (lock) {
370     ink_rwlock_wrlock(&g_records_rwlock);
371   }
372   if (auto it = g_records_ht.find(name); it != g_records_ht.end()) {
373     r1 = it->second;
374     if (i_am_the_record_owner(r1->rec_type)) {
375       rec_mutex_acquire(&(r1->lock));
376       if ((data_type != RECD_NULL) && (r1->data_type != data_type)) {
377         err = REC_ERR_FAIL;
378       } else {
379         bool rec_updated_p = false;
380         if (data_type == RECD_NULL) {
381           // If the caller didn't know the data type, they gave us a string
382           // and we should convert based on the record's data type.
383           ink_release_assert(data->rec_string != nullptr);
384           rec_updated_p = RecDataSetFromString(r1->data_type, &(r1->data), data->rec_string);
385         } else {
386           rec_updated_p = RecDataSet(data_type, &(r1->data), data);
387         }
388 
389         if (rec_updated_p) {
390           r1->sync_required = REC_SYNC_REQUIRED;
391           if (REC_TYPE_IS_CONFIG(r1->rec_type)) {
392             r1->config_meta.update_required = REC_UPDATE_REQUIRED;
393           }
394         }
395 
396         if (REC_TYPE_IS_STAT(r1->rec_type) && (data_raw != nullptr)) {
397           r1->stat_meta.data_raw = *data_raw;
398         } else if (REC_TYPE_IS_CONFIG(r1->rec_type)) {
399           r1->config_meta.source = source;
400         }
401       }
402       rec_mutex_release(&(r1->lock));
403     } else {
404       // We don't need to ats_strdup() here as we will make copies of any
405       // strings when we marshal them into our RecMessage buffer.
406       RecRecord r2;
407 
408       RecRecordInit(&r2);
409       r2.rec_type  = rec_type;
410       r2.name      = name;
411       r2.data_type = (data_type != RECD_NULL) ? data_type : r1->data_type;
412       r2.data      = *data;
413       if (REC_TYPE_IS_STAT(r2.rec_type) && (data_raw != nullptr)) {
414         r2.stat_meta.data_raw = *data_raw;
415       } else if (REC_TYPE_IS_CONFIG(r2.rec_type)) {
416         r2.config_meta.source = source;
417       }
418       err = send_set_message(&r2);
419       RecRecordFree(&r2);
420     }
421   } else {
422     // Add the record but do not set the 'registered' flag, as this
423     // record really hasn't been registered yet.  Also, in order to
424     // add the record, we need to have a rec_type, so if the user
425     // calls RecSetRecord on a record we haven't registered yet, we
426     // should fail out here.
427     if ((rec_type == RECT_NULL) || (data_type == RECD_NULL)) {
428       err = REC_ERR_FAIL;
429       goto Ldone;
430     }
431     r1 = RecAlloc(rec_type, name, data_type);
432     RecDataSet(data_type, &(r1->data), data);
433     if (REC_TYPE_IS_STAT(r1->rec_type) && (data_raw != nullptr)) {
434       r1->stat_meta.data_raw = *data_raw;
435     } else if (REC_TYPE_IS_CONFIG(r1->rec_type)) {
436       r1->config_meta.source = source;
437     }
438     if (i_am_the_record_owner(r1->rec_type)) {
439       r1->sync_required = r1->sync_required | REC_PEER_SYNC_REQUIRED;
440     } else {
441       err = send_set_message(r1);
442     }
443     g_records_ht.emplace(name, r1);
444   }
445 
446 Ldone:
447   if (lock) {
448     ink_rwlock_unlock(&g_records_rwlock);
449   }
450 
451   return err;
452 }
453 
454 RecErrT
RecSetRecordConvert(const char * name,const RecString rec_string,RecSourceT source,bool lock)455 RecSetRecordConvert(const char *name, const RecString rec_string, RecSourceT source, bool lock)
456 {
457   RecData data;
458   data.rec_string = rec_string;
459   return RecSetRecord(RECT_NULL, name, RECD_NULL, &data, nullptr, source, lock);
460 }
461 
462 RecErrT
RecSetRecordInt(const char * name,RecInt rec_int,RecSourceT source,bool lock)463 RecSetRecordInt(const char *name, RecInt rec_int, RecSourceT source, bool lock)
464 {
465   RecData data;
466   data.rec_int = rec_int;
467   return RecSetRecord(RECT_NULL, name, RECD_INT, &data, nullptr, source, lock);
468 }
469 
470 RecErrT
RecSetRecordFloat(const char * name,RecFloat rec_float,RecSourceT source,bool lock)471 RecSetRecordFloat(const char *name, RecFloat rec_float, RecSourceT source, bool lock)
472 {
473   RecData data;
474   data.rec_float = rec_float;
475   return RecSetRecord(RECT_NULL, name, RECD_FLOAT, &data, nullptr, source, lock);
476 }
477 
478 RecErrT
RecSetRecordString(const char * name,const RecString rec_string,RecSourceT source,bool lock)479 RecSetRecordString(const char *name, const RecString rec_string, RecSourceT source, bool lock)
480 {
481   RecData data;
482   data.rec_string = rec_string;
483   return RecSetRecord(RECT_NULL, name, RECD_STRING, &data, nullptr, source, lock);
484 }
485 
486 RecErrT
RecSetRecordCounter(const char * name,RecCounter rec_counter,RecSourceT source,bool lock)487 RecSetRecordCounter(const char *name, RecCounter rec_counter, RecSourceT source, bool lock)
488 {
489   RecData data;
490   data.rec_counter = rec_counter;
491   return RecSetRecord(RECT_NULL, name, RECD_COUNTER, &data, nullptr, source, lock);
492 }
493 
494 // check the version of the snap file to remove records.snap or not
495 static void
CheckSnapFileVersion(const char * path)496 CheckSnapFileVersion(const char *path)
497 {
498   std::ifstream f(path, std::ios::binary);
499   if (f.good()) {
500     // get version, compare and remove
501     char data[VERSION_HDR_SIZE];
502     if (!f.read(data, VERSION_HDR_SIZE)) {
503       return;
504     }
505     if (data[0] != 'V' || data[1] != PACKAGE_VERSION[0] || data[2] != PACKAGE_VERSION[2] || data[3] != PACKAGE_VERSION[4] ||
506         data[4] != '\0') {
507       // not the right version found
508       if (remove(path) != 0) {
509         ink_warning("unable to remove incompatible snap file '%s'", path);
510       }
511     }
512   }
513 }
514 
515 //-------------------------------------------------------------------------
516 // RecReadStatsFile
517 //-------------------------------------------------------------------------
518 RecErrT
RecReadStatsFile()519 RecReadStatsFile()
520 {
521   RecRecord *r;
522   RecMessage *m;
523   RecMessageItr itr;
524   RecPersistT persist_type = RECP_NULL;
525   ats_scoped_str snap_fpath(RecConfigReadPersistentStatsPath());
526 
527   // lock our hash table
528   ink_rwlock_wrlock(&g_records_rwlock);
529 
530   CheckSnapFileVersion(snap_fpath);
531 
532   if ((m = RecMessageReadFromDisk(snap_fpath)) != nullptr) {
533     if (RecMessageUnmarshalFirst(m, &itr, &r) != REC_ERR_FAIL) {
534       do {
535         if ((r->name == nullptr) || (!strlen(r->name))) {
536           continue;
537         }
538 
539         // If we don't have a persistence type for this record, it means that it is not a stat, or it is
540         // not registered yet. Either way, it's ok to just set the persisted value and keep going.
541         if (RecGetRecordPersistenceType(r->name, &persist_type, false /* lock */) != REC_ERR_OKAY) {
542           RecDebug(DL_Debug, "restoring value for persisted stat '%s'", r->name);
543           RecSetRecord(r->rec_type, r->name, r->data_type, &(r->data), &(r->stat_meta.data_raw), REC_SOURCE_EXPLICIT, false);
544           continue;
545         }
546 
547         if (!REC_TYPE_IS_STAT(r->rec_type)) {
548           // This should not happen, but be defensive against records changing their type ..
549           RecLog(DL_Warning, "skipping restore of non-stat record '%s'", r->name);
550           continue;
551         }
552 
553         // Check whether the persistence type was changed by a new software version. If the record is
554         // already registered with an updated persistence type, then we don't want to set it. We should
555         // keep the registered value.
556         if (persist_type == RECP_NON_PERSISTENT) {
557           RecDebug(DL_Debug, "preserving current value of formerly persistent stat '%s'", r->name);
558           continue;
559         }
560 
561         RecDebug(DL_Debug, "restoring value for persisted stat '%s'", r->name);
562         RecSetRecord(r->rec_type, r->name, r->data_type, &(r->data), &(r->stat_meta.data_raw), REC_SOURCE_EXPLICIT, false);
563       } while (RecMessageUnmarshalNext(m, &itr, &r) != REC_ERR_FAIL);
564     }
565   }
566 
567   ink_rwlock_unlock(&g_records_rwlock);
568   ats_free(m);
569 
570   return REC_ERR_OKAY;
571 }
572 
573 //-------------------------------------------------------------------------
574 // RecSyncStatsFile
575 //-------------------------------------------------------------------------
576 RecErrT
RecSyncStatsFile()577 RecSyncStatsFile()
578 {
579   RecRecord *r;
580   RecMessage *m;
581   int i, num_records;
582   bool sync_to_disk;
583   ats_scoped_str snap_fpath(RecConfigReadPersistentStatsPath());
584 
585   /*
586    * g_mode_type should be initialized by
587    * RecLocalInit() or RecProcessInit() earlier.
588    */
589   ink_assert(g_mode_type != RECM_NULL);
590 
591   if (g_mode_type == RECM_SERVER || g_mode_type == RECM_STAND_ALONE) {
592     m            = RecMessageAlloc(RECG_NULL);
593     num_records  = g_num_records;
594     sync_to_disk = false;
595     for (i = 0; i < num_records; i++) {
596       r = &(g_records[i]);
597       rec_mutex_acquire(&(r->lock));
598       if (REC_TYPE_IS_STAT(r->rec_type)) {
599         if (r->stat_meta.persist_type == RECP_PERSISTENT) {
600           m            = RecMessageMarshal_Realloc(m, r);
601           sync_to_disk = true;
602         }
603       }
604       rec_mutex_release(&(r->lock));
605     }
606     if (sync_to_disk) {
607       RecDebug(DL_Note, "Writing '%s' [%d bytes]", (const char *)snap_fpath, m->o_write - m->o_start + sizeof(RecMessageHdr));
608       RecMessageWriteToDisk(m, snap_fpath);
609     }
610     RecMessageFree(m);
611   }
612 
613   return REC_ERR_OKAY;
614 }
615 
616 // Consume a parsed record, pushing it into the records hash table.
617 static void
RecConsumeConfigEntry(RecT rec_type,RecDataT data_type,const char * name,const char * value,RecSourceT source)618 RecConsumeConfigEntry(RecT rec_type, RecDataT data_type, const char *name, const char *value, RecSourceT source)
619 {
620   RecData data;
621 
622   memset(&data, 0, sizeof(RecData));
623   RecDataSetFromString(data_type, &data, value);
624   RecSetRecord(rec_type, name, data_type, &data, nullptr, source, false);
625   RecDataZero(data_type, &data);
626 }
627 
628 //-------------------------------------------------------------------------
629 // RecReadConfigFile
630 //-------------------------------------------------------------------------
631 RecErrT
RecReadConfigFile()632 RecReadConfigFile()
633 {
634   RecDebug(DL_Note, "Reading '%s'", g_rec_config_fpath);
635 
636   // lock our hash table
637   ink_rwlock_wrlock(&g_records_rwlock);
638 
639   // Parse the actual file and hash the values.
640   RecConfigFileParse(g_rec_config_fpath, RecConsumeConfigEntry);
641 
642   // release our hash table
643   ink_rwlock_unlock(&g_records_rwlock);
644 
645   return REC_ERR_OKAY;
646 }
647 
648 //-------------------------------------------------------------------------
649 // RecExecConfigUpdateCbs
650 //-------------------------------------------------------------------------
651 RecUpdateT
RecExecConfigUpdateCbs(unsigned int update_required_type)652 RecExecConfigUpdateCbs(unsigned int update_required_type)
653 {
654   RecRecord *r;
655   int i, num_records;
656   RecUpdateT update_type = RECU_NULL;
657 
658   num_records = g_num_records;
659   for (i = 0; i < num_records; i++) {
660     r = &(g_records[i]);
661     rec_mutex_acquire(&(r->lock));
662     if (REC_TYPE_IS_CONFIG(r->rec_type)) {
663       /* -- upgrade to support a list of callback functions
664          if ((r->config_meta.update_required & update_required_type) &&
665          (r->config_meta.update_cb)) {
666          (*(r->config_meta.update_cb))(r->name, r->data_type, r->data,
667          r->config_meta.update_cookie);
668          r->config_meta.update_required =
669          r->config_meta.update_required & ~update_required_type;
670          }
671        */
672 
673       if (r->config_meta.update_required) {
674         if (r->config_meta.update_type > update_type) {
675           update_type = r->config_meta.update_type;
676         }
677       }
678 
679       if ((r->config_meta.update_required & update_required_type) && (r->config_meta.update_cb_list)) {
680         RecConfigUpdateCbList *cur_callback = nullptr;
681         for (cur_callback = r->config_meta.update_cb_list; cur_callback; cur_callback = cur_callback->next) {
682           (*(cur_callback->update_cb))(r->name, r->data_type, r->data, cur_callback->update_cookie);
683         }
684         r->config_meta.update_required = r->config_meta.update_required & ~update_required_type;
685       }
686     }
687     rec_mutex_release(&(r->lock));
688   }
689 
690   return update_type;
691 }
692 
693 static RecErrT
reset_stat_record(RecRecord * rec)694 reset_stat_record(RecRecord *rec)
695 {
696   RecErrT err;
697 
698   if (i_am_the_record_owner(rec->rec_type)) {
699     rec_mutex_acquire(&(rec->lock));
700     ++(rec->version);
701     err = RecDataSet(rec->data_type, &(rec->data), &(rec->data_default)) ? REC_ERR_OKAY : REC_ERR_FAIL;
702     rec_mutex_release(&(rec->lock));
703   } else {
704     RecRecord r2;
705 
706     RecRecordInit(&r2);
707     r2.rec_type  = rec->rec_type;
708     r2.name      = rec->name;
709     r2.data_type = rec->data_type;
710     r2.data      = rec->data_default;
711 
712     err = send_reset_message(&r2);
713     RecRecordFree(&r2);
714   }
715 
716   return err;
717 }
718 
719 //------------------------------------------------------------------------
720 // RecResetStatRecord
721 //------------------------------------------------------------------------
722 RecErrT
RecResetStatRecord(const char * name)723 RecResetStatRecord(const char *name)
724 {
725   RecErrT err = REC_ERR_FAIL;
726 
727   if (auto it = g_records_ht.find(name); it != g_records_ht.end()) {
728     err = reset_stat_record(it->second);
729   }
730 
731   return err;
732 }
733 
734 //------------------------------------------------------------------------
735 // RecResetStatRecord
736 //------------------------------------------------------------------------
737 RecErrT
RecResetStatRecord(RecT type,bool all)738 RecResetStatRecord(RecT type, bool all)
739 {
740   int i, num_records;
741   RecErrT err = REC_ERR_OKAY;
742 
743   RecDebug(DL_Note, "Reset Statistics Records");
744 
745   num_records = g_num_records;
746   for (i = 0; i < num_records; i++) {
747     RecRecord *r1 = &(g_records[i]);
748 
749     if (!REC_TYPE_IS_STAT(r1->rec_type)) {
750       continue;
751     }
752 
753     if (r1->data_type == RECD_STRING) {
754       continue;
755     }
756 
757     if (((type == RECT_NULL) || (r1->rec_type == type)) && (all || (r1->stat_meta.persist_type != RECP_NON_PERSISTENT))) {
758       if (reset_stat_record(r1) != REC_ERR_OKAY) {
759         err = REC_ERR_FAIL;
760       }
761     }
762   }
763 
764   return err;
765 }
766 
767 RecErrT
RecSetSyncRequired(char * name,bool lock)768 RecSetSyncRequired(char *name, bool lock)
769 {
770   RecErrT err = REC_ERR_FAIL;
771   RecRecord *r1;
772 
773   // FIXME: Most of the time we set, we don't actually need to wrlock
774   // since we are not modifying the g_records_ht.
775   if (lock) {
776     ink_rwlock_wrlock(&g_records_rwlock);
777   }
778   if (auto it = g_records_ht.find(name); it != g_records_ht.end()) {
779     r1 = it->second;
780     if (i_am_the_record_owner(r1->rec_type)) {
781       rec_mutex_acquire(&(r1->lock));
782       r1->sync_required = REC_PEER_SYNC_REQUIRED;
783       if (REC_TYPE_IS_CONFIG(r1->rec_type)) {
784         r1->config_meta.update_required = REC_UPDATE_REQUIRED;
785       }
786       rec_mutex_release(&(r1->lock));
787       err = REC_ERR_OKAY;
788     } else {
789       // No point of doing the following because our peer will
790       // set the value with RecDataSet. However, since
791       // r2.name == r1->name, the sync_required bit will not be
792       // set.
793 
794       /*
795          RecRecord r2;
796 
797          RecRecordInit(&r2);
798          r2.rec_type  = r1->rec_type;
799          r2.name      = r1->name;
800          r2.data_type = r1->data_type;
801          r2.data      = r1->data_default;
802 
803          err = send_set_message(&r2);
804          RecRecordFree(&r2);
805        */
806     }
807   }
808 
809   if (lock) {
810     ink_rwlock_unlock(&g_records_rwlock);
811   }
812 
813   return err;
814 }
815