1 /* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2
3 /* CMetrics
4 * ========
5 * Copyright 2021 Eduardo Silva <eduardo@calyptia.com>
6 *
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 *
11 * http://www.apache.org/licenses/LICENSE-2.0
12 *
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 */
19 #include <cmetrics/cmetrics.h>
20 #include <cmetrics/cmt_metric.h>
21 #include <cmetrics/cmt_map.h>
22 #include <cmetrics/cmt_sds.h>
23 #include <cmetrics/cmt_counter.h>
24 #include <cmetrics/cmt_gauge.h>
25 #include <cmetrics/cmt_untyped.h>
26 #include <cmetrics/cmt_hash.h>
27 #include <cmetrics/cmt_encode_prometheus_remote_write.h>
28
29 static cmt_sds_t render_remote_write_context_to_sds(
30 struct cmt_prometheus_remote_write_context *context);
31
32 static void destroy_prometheus_label_list(Prometheus__Label **label_list,
33 size_t entry_count);
34
35 static void destroy_prometheus_sample_list(Prometheus__Sample **sample_list,
36 size_t entry_count);
37
38 static void cmt_destroy_prometheus_remote_write_context(
39 struct cmt_prometheus_remote_write_context *context);
40
41 static uint64_t calculate_label_set_hash(struct mk_list *label_values, uint64_t seed);
42
43 static size_t count_metrics_with_matching_label_set(struct mk_list *metrics,
44 uint64_t sequence_number,
45 uint64_t desired_hash);
46
47 static int append_entry_to_prometheus_label_list(Prometheus__Label **label_list,
48 size_t *index,
49 char *name,
50 char *value);
51
52 static int set_up_time_series_for_label_set(
53 struct cmt_prometheus_remote_write_context *context,
54 struct cmt_map *map,
55 struct cmt_metric *metric,
56 struct cmt_prometheus_time_series **time_series);
57
58 static int pack_metric_metadata(struct cmt_prometheus_remote_write_context *context,
59 struct cmt_map *map,
60 struct cmt_metric *metric);
61
62 static int append_metric_to_timeseries(struct cmt_prometheus_time_series *time_series,
63 struct cmt_metric *metric);
64
65 static int pack_metric_sample(struct cmt_prometheus_remote_write_context *context,
66 struct cmt_map *map,
67 struct cmt_metric *metric,
68 int add_metadata);
69
70 static int pack_basic_type(struct cmt_prometheus_remote_write_context *context,
71 struct cmt_map *map);
72
render_remote_write_context_to_sds(struct cmt_prometheus_remote_write_context * context)73 cmt_sds_t render_remote_write_context_to_sds(
74 struct cmt_prometheus_remote_write_context *context)
75 {
76 size_t write_request_size;
77 struct cmt_prometheus_time_series *time_series_entry;
78 struct cmt_prometheus_metric_metadata *metadata_entry;
79 cmt_sds_t result_buffer;
80 size_t entry_index;
81 struct mk_list *head;
82
83 context->write_request.n_timeseries = mk_list_size(&context->time_series_entries);
84 context->write_request.n_metadata = mk_list_size(&context->metadata_entries);
85
86 context->write_request.timeseries = calloc(context->write_request.n_timeseries,
87 sizeof(Prometheus__TimeSeries *));
88
89 if (context->write_request.timeseries == NULL) {
90 cmt_errno();
91
92 return NULL;
93 }
94
95 context->write_request.metadata = calloc(context->write_request.n_metadata,
96 sizeof(Prometheus__TimeSeries *));
97
98 if (context->write_request.metadata == NULL) {
99 cmt_errno();
100
101 free(context->write_request.timeseries);
102
103 return NULL;
104 }
105
106 entry_index = 0;
107
108 mk_list_foreach(head, &context->time_series_entries) {
109 time_series_entry = mk_list_entry(head, struct cmt_prometheus_time_series, _head);
110
111 context->write_request.timeseries[entry_index++] = &time_series_entry->data;
112 }
113
114 entry_index = 0;
115
116 mk_list_foreach(head, &context->metadata_entries) {
117 metadata_entry = mk_list_entry(head, struct cmt_prometheus_metric_metadata, _head);
118
119 context->write_request.metadata[entry_index++] = &metadata_entry->data;
120 }
121
122 write_request_size = prometheus__write_request__get_packed_size(&context->write_request);
123
124 result_buffer = cmt_sds_create_size(write_request_size);
125
126 if(result_buffer != NULL) {
127 prometheus__write_request__pack(&context->write_request, (uint8_t *) result_buffer);
128
129 cmt_sds_set_len(result_buffer, write_request_size);
130 }
131
132 free(context->write_request.timeseries);
133
134 free(context->write_request.metadata);
135
136 return result_buffer;
137 }
138
cmt_destroy_prometheus_remote_write_context(struct cmt_prometheus_remote_write_context * context)139 void cmt_destroy_prometheus_remote_write_context(
140 struct cmt_prometheus_remote_write_context *context)
141 {
142 struct cmt_prometheus_time_series *time_series_entry;
143 struct cmt_prometheus_metric_metadata *metadata_entry;
144 struct mk_list *head;
145 struct mk_list *tmp;
146
147 mk_list_foreach_safe(head, tmp, &context->time_series_entries) {
148 time_series_entry = mk_list_entry(head, struct cmt_prometheus_time_series, _head);
149
150 if (time_series_entry->data.labels != NULL) {
151 destroy_prometheus_label_list(time_series_entry->data.labels,
152 time_series_entry->data.n_labels);
153
154 time_series_entry->data.labels = NULL;
155 }
156
157 if (time_series_entry->data.samples != NULL) {
158 destroy_prometheus_sample_list(time_series_entry->data.samples,
159 time_series_entry->data.n_samples);
160
161 time_series_entry->data.samples = NULL;
162 }
163
164 mk_list_del(&time_series_entry->_head);
165
166 free(time_series_entry);
167 }
168
169 mk_list_foreach_safe(head, tmp, &context->metadata_entries) {
170 metadata_entry = mk_list_entry(head, struct cmt_prometheus_metric_metadata, _head);
171
172 mk_list_del(&metadata_entry->_head);
173
174 free(metadata_entry);
175 }
176 }
177
calculate_label_set_hash(struct mk_list * label_values,uint64_t seed)178 uint64_t calculate_label_set_hash(struct mk_list *label_values, uint64_t seed)
179 {
180 struct cmt_map_label *label_value;
181 XXH64_state_t state;
182 struct mk_list *head;
183
184 XXH64_reset(&state, 0);
185
186 XXH64_update(&state, &seed, sizeof(uint64_t));
187
188 mk_list_foreach(head, label_values) {
189 label_value = mk_list_entry(head, struct cmt_map_label, _head);
190
191 XXH64_update(&state, label_value->name, cmt_sds_len(label_value->name));
192 }
193
194 return XXH64_digest(&state);
195 }
196
count_metrics_with_matching_label_set(struct mk_list * metrics,uint64_t sequence_number,uint64_t desired_hash)197 size_t count_metrics_with_matching_label_set(struct mk_list *metrics,
198 uint64_t sequence_number,
199 uint64_t desired_hash)
200 {
201 uint64_t label_set_hash;
202 size_t matches;
203 struct cmt_metric *metric;
204 struct mk_list *head;
205
206 matches = 0;
207
208 mk_list_foreach(head, metrics) {
209 metric = mk_list_entry(head, struct cmt_metric, _head);
210
211 label_set_hash = calculate_label_set_hash(&metric->labels, sequence_number);
212
213 if (label_set_hash == desired_hash) {
214 matches++;
215 }
216 }
217
218 return matches;
219 }
220
/*
 * Create a label from 'name' and 'value' and store it at label_list[*index].
 *
 * The entry is fully built before it is published in the list, so on failure
 * label_list[*index] is left as NULL instead of pointing at memory that has
 * already been freed, and *index is left untouched.
 *
 * label_list: array with room for at least *index + 1 entries.
 * index:      position to fill; incremented on success only.
 * name/value: strings copied into newly created sds strings.
 *
 * Returns CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_SUCCESS, or
 * CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_ALLOCATION_ERROR if any allocation
 * failed.
 */
int append_entry_to_prometheus_label_list(Prometheus__Label **label_list,
                                          size_t *index,
                                          char *name,
                                          char *value)
{
    Prometheus__Label *entry;

    label_list[*index] = NULL;

    entry = calloc(1, sizeof(Prometheus__Label));

    if (entry == NULL) {
        cmt_errno();

        return CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_ALLOCATION_ERROR;
    }

    prometheus__label__init(entry);

    entry->name = cmt_sds_create(name);

    if (entry->name == NULL) {
        free(entry);

        return CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_ALLOCATION_ERROR;
    }

    entry->value = cmt_sds_create(value);

    if (entry->value == NULL) {
        cmt_sds_destroy(entry->name);
        free(entry);

        return CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_ALLOCATION_ERROR;
    }

    label_list[*index] = entry;

    (*index)++;

    return CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_SUCCESS;
}
257
/*
 * Free the first 'entry_count' samples referenced by 'sample_list' and then
 * the pointer array itself. A NULL list is ignored, and so are NULL slots.
 */
void destroy_prometheus_sample_list(Prometheus__Sample **sample_list,
                                    size_t entry_count)
{
    size_t position;

    if (sample_list == NULL) {
        return;
    }

    position = 0;

    while (position < entry_count) {
        if (sample_list[position] != NULL) {
            free(sample_list[position]);
            sample_list[position] = NULL;
        }

        position++;
    }

    free(sample_list);
}
274
/*
 * Free the first 'entry_count' labels referenced by 'label_list' (including
 * their name and value strings) and then the pointer array itself.
 * A NULL list is ignored, and so are NULL slots.
 */
void destroy_prometheus_label_list(Prometheus__Label **label_list,
                                   size_t entry_count)
{
    Prometheus__Label *label;
    size_t position;

    if (label_list == NULL) {
        return;
    }

    for (position = 0 ; position < entry_count ; position++) {
        label = label_list[position];

        if (label == NULL) {
            continue;
        }

        if (label->name != NULL) {
            cmt_sds_destroy(label->name);
        }

        if (label->value != NULL) {
            cmt_sds_destroy(label->value);
        }

        free(label);
        label_list[position] = NULL;
    }

    free(label_list);
}
301
set_up_time_series_for_label_set(struct cmt_prometheus_remote_write_context * context,struct cmt_map * map,struct cmt_metric * metric,struct cmt_prometheus_time_series ** time_series)302 int set_up_time_series_for_label_set(struct cmt_prometheus_remote_write_context *context,
303 struct cmt_map *map,
304 struct cmt_metric *metric,
305 struct cmt_prometheus_time_series **time_series)
306 {
307 uint8_t time_series_match_found;
308 size_t label_set_hash_matches;
309 struct cmt_prometheus_time_series *time_series_entry;
310 uint64_t label_set_hash;
311 struct cmt_label *static_label;
312 size_t label_index;
313 size_t label_count;
314 struct cmt_map_label *label_value;
315 struct cmt_map_label *label_name;
316 Prometheus__Label **label_list;
317 Prometheus__Sample **value_list;
318 int result;
319 struct mk_list *head;
320
321 label_set_hash = calculate_label_set_hash(&metric->labels, context->sequence_number);
322
323 /* Determine if there is an existing time series for this label set */
324 time_series_match_found = CMT_FALSE;
325
326 mk_list_foreach(head, &context->time_series_entries) {
327 time_series_entry = mk_list_entry(head, struct cmt_prometheus_time_series, _head);
328
329 if (time_series_entry->label_set_hash == label_set_hash) {
330 time_series_match_found = CMT_TRUE;
331
332 break;
333 }
334 }
335
336 if (time_series_match_found == CMT_TRUE) {
337 *time_series = time_series_entry;
338
339 return CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_SUCCESS;
340 }
341
342 /* Find out how many samples share these label values */
343 label_set_hash_matches = count_metrics_with_matching_label_set(&map->metrics,
344 context->sequence_number,
345 label_set_hash);
346
347 if (label_set_hash_matches == 0)
348 {
349 label_set_hash_matches++;
350 }
351
352 /* Allocate the memory required for the label and value lists, we need to add
353 * one for the fixed __name__ label
354 */
355 label_count = mk_list_size(&context->cmt->static_labels->list) +
356 mk_list_size(&metric->labels) +
357 1;
358
359
360 time_series_entry = calloc(1, sizeof(struct cmt_prometheus_time_series));
361
362 if (time_series_entry == NULL) {
363 cmt_errno();
364
365 return CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_ALLOCATION_ERROR;
366 }
367
368 label_list = calloc(label_count, sizeof(Prometheus__Label *));
369
370 if (label_list == NULL) {
371 cmt_errno();
372
373 free(time_series_entry);
374
375 return CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_ALLOCATION_ERROR;
376 }
377
378 value_list = calloc(label_set_hash_matches, sizeof(Prometheus__Sample *));
379
380 if (value_list == NULL) {
381 cmt_errno();
382
383 free(time_series_entry);
384 free(label_list);
385
386 return CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_ALLOCATION_ERROR;
387 }
388
389 /* Initialize the time series */
390 prometheus__time_series__init(&time_series_entry->data);
391
392 time_series_entry->data.n_labels = label_count;
393 time_series_entry->data.labels = label_list;
394 time_series_entry->data.n_samples = label_set_hash_matches;
395 time_series_entry->data.samples = value_list;
396
397 time_series_entry->label_set_hash = label_set_hash;
398 time_series_entry->entries_set = 0;
399
400 /* Initialize the label list */
401 label_index = 0;
402
403 /* Add the __name__ label */
404 result = append_entry_to_prometheus_label_list(label_list,
405 &label_index,
406 "__name__",
407 map->opts->fqname);
408
409 if (result != CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_SUCCESS)
410 {
411 free(time_series_entry);
412 free(label_list);
413 free(value_list);
414
415 return result;
416 }
417
418 /* Add the static labels */
419 result = CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_SUCCESS;
420
421 mk_list_foreach(head, &context->cmt->static_labels->list) {
422 static_label = mk_list_entry(head, struct cmt_label, _head);
423
424 result = append_entry_to_prometheus_label_list(label_list,
425 &label_index,
426 static_label->key,
427 static_label->val);
428
429 if (result != CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_SUCCESS)
430 {
431 break;
432 }
433 }
434
435 /* Add the specific labels */
436 if (result == CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_SUCCESS && label_count > 0) {
437 label_name = mk_list_entry_first(&map->label_keys, struct cmt_map_label, _head);
438
439 mk_list_foreach(head, &metric->labels) {
440 label_value = mk_list_entry(head, struct cmt_map_label, _head);
441
442 result = append_entry_to_prometheus_label_list(label_list,
443 &label_index,
444 label_name->name,
445 label_value->name);
446
447 if (result != CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_SUCCESS)
448 {
449 break;
450 }
451
452 label_name = mk_list_entry_next(&label_name->_head, struct cmt_map_label,
453 _head, &map->label_keys);
454 }
455 }
456
457 if (result != CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_SUCCESS) {
458 destroy_prometheus_label_list(label_list, label_index);
459 free(time_series_entry);
460 free(value_list);
461
462 return result;
463 }
464
465 /* Add the time series to the context so we can find it when we try to format
466 * a metric with these same labels;
467 */
468 mk_list_add(&time_series_entry->_head, &context->time_series_entries);
469
470 *time_series = time_series_entry;
471
472 return CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_SUCCESS;
473 }
474
475
pack_metric_metadata(struct cmt_prometheus_remote_write_context * context,struct cmt_map * map,struct cmt_metric * metric)476 int pack_metric_metadata(struct cmt_prometheus_remote_write_context *context,
477 struct cmt_map *map,
478 struct cmt_metric *metric)
479 {
480 struct cmt_prometheus_metric_metadata *metadata_entry;
481
482 metadata_entry = calloc(1, sizeof(struct cmt_prometheus_metric_metadata));
483
484 if (metadata_entry == NULL) {
485 cmt_errno();
486
487 return CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_ALLOCATION_ERROR;
488 }
489
490 prometheus__metric_metadata__init(&metadata_entry->data);
491
492 if (map->type == CMT_COUNTER) {
493 metadata_entry->data.type = PROMETHEUS__METRIC_METADATA__METRIC_TYPE__COUNTER;
494 }
495 else if (map->type == CMT_GAUGE) {
496 metadata_entry->data.type = PROMETHEUS__METRIC_METADATA__METRIC_TYPE__GAUGE;
497 }
498 else if (map->type == CMT_UNTYPED) {
499 metadata_entry->data.type = PROMETHEUS__METRIC_METADATA__METRIC_TYPE__UNKNOWN;
500 }
501 else {
502 return CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_UNEXPECTED_METRIC_TYPE;
503 }
504
505 metadata_entry->data.metric_family_name = map->opts->fqname;
506 metadata_entry->data.help = map->opts->fqname;
507 metadata_entry->data.unit = "unit";
508
509 mk_list_add(&metadata_entry->_head, &context->metadata_entries);
510
511 return 0;
512 }
513
append_metric_to_timeseries(struct cmt_prometheus_time_series * time_series,struct cmt_metric * metric)514 int append_metric_to_timeseries(struct cmt_prometheus_time_series *time_series,
515 struct cmt_metric *metric)
516 {
517 uint64_t ts;
518 Prometheus__Sample *sample;
519
520 sample = calloc(1, sizeof(Prometheus__Sample));
521
522 if (sample == NULL) {
523 cmt_errno();
524
525 return CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_ALLOCATION_ERROR;
526 }
527
528 prometheus__sample__init(sample);
529
530 sample->value = cmt_metric_get_value(metric);
531
532 ts = cmt_metric_get_timestamp(metric);
533 sample->timestamp = ts / 1000000;
534 time_series->data.samples[time_series->entries_set++] = sample;
535
536 return CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_SUCCESS;
537 }
538
pack_metric_sample(struct cmt_prometheus_remote_write_context * context,struct cmt_map * map,struct cmt_metric * metric,int add_metadata)539 int pack_metric_sample(struct cmt_prometheus_remote_write_context *context,
540 struct cmt_map *map,
541 struct cmt_metric *metric,
542 int add_metadata)
543 {
544 struct cmt_prometheus_time_series *time_series;
545 int result;
546
547 result = set_up_time_series_for_label_set(context, map, metric, &time_series);
548
549 if (result != CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_SUCCESS) {
550 return result;
551 }
552
553 if (add_metadata == CMT_TRUE) {
554 result = pack_metric_metadata(context, map, metric);
555
556 if (result != CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_SUCCESS) {
557 return result;
558 }
559 }
560
561 return append_metric_to_timeseries(time_series, metric);
562 }
563
pack_basic_type(struct cmt_prometheus_remote_write_context * context,struct cmt_map * map)564 int pack_basic_type(struct cmt_prometheus_remote_write_context *context,
565 struct cmt_map *map)
566 {
567 int add_metadata;
568 struct cmt_metric *metric;
569 int result;
570 struct mk_list *head;
571
572 context->sequence_number++;
573 add_metadata = CMT_TRUE;
574
575 if (map->metric_static_set == CMT_TRUE) {
576 result = pack_metric_sample(context, map, &map->metric, add_metadata);
577
578 if (result != CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_SUCCESS) {
579 return result;
580 }
581 }
582
583 mk_list_foreach(head, &map->metrics) {
584 metric = mk_list_entry(head, struct cmt_metric, _head);
585
586 result = pack_metric_sample(context, map, metric, add_metadata);
587
588 if (result != CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_SUCCESS) {
589 return result;
590 }
591
592 if (add_metadata == CMT_TRUE) {
593 add_metadata = CMT_FALSE;
594 }
595 }
596
597 return CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_SUCCESS;
598 }
599
600 /* Format all the registered metrics in Prometheus Text format */
cmt_encode_prometheus_remote_write_create(struct cmt * cmt)601 cmt_sds_t cmt_encode_prometheus_remote_write_create(struct cmt *cmt)
602 {
603 struct cmt_prometheus_remote_write_context context;
604 struct cmt_untyped *untyped;
605 struct cmt_counter *counter;
606 int result;
607 struct cmt_gauge *gauge;
608 struct mk_list *head;
609 cmt_sds_t buf;
610
611 buf = NULL;
612
613 memset(&context, 0, sizeof(struct cmt_prometheus_remote_write_context));
614
615 prometheus__write_request__init(&context.write_request);
616
617 context.cmt = cmt;
618
619 mk_list_init(&context.time_series_entries);
620 mk_list_init(&context.metadata_entries);
621
622 /* Counters */
623 mk_list_foreach(head, &cmt->counters) {
624 counter = mk_list_entry(head, struct cmt_counter, _head);
625 result = pack_basic_type(&context, counter->map);
626
627 if (result != CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_SUCCESS) {
628 break;
629 }
630 }
631
632 if (result == CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_SUCCESS) {
633 /* Gauges */
634 mk_list_foreach(head, &cmt->gauges) {
635 gauge = mk_list_entry(head, struct cmt_gauge, _head);
636 result = pack_basic_type(&context, gauge->map);
637
638 if (result != CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_SUCCESS) {
639 break;
640 }
641 }
642
643 }
644
645 if (result == CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_SUCCESS) {
646 /* Untyped */
647 mk_list_foreach(head, &cmt->untypeds) {
648 untyped = mk_list_entry(head, struct cmt_untyped, _head);
649 pack_basic_type(&context, untyped->map);
650 }
651 }
652
653 if (result == CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_SUCCESS) {
654 buf = render_remote_write_context_to_sds(&context);
655 }
656
657 cmt_destroy_prometheus_remote_write_context(&context);
658
659 return buf;
660 }
661
/*
 * Release a buffer by handing it to cmt_sds_destroy(); intended for buffers
 * returned by cmt_encode_prometheus_remote_write_create().
 */
void cmt_encode_prometheus_remote_write_destroy(cmt_sds_t text)
{
    cmt_sds_destroy(text);
}
666