1 /* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2 
3 /*  CMetrics
4  *  ========
5  *  Copyright 2021 Eduardo Silva <eduardo@calyptia.com>
6  *
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  *  Unless required by applicable law or agreed to in writing, software
14  *  distributed under the License is distributed on an "AS IS" BASIS,
15  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  *  See the License for the specific language governing permissions and
17  *  limitations under the License.
18  */
19 #include <cmetrics/cmetrics.h>
20 #include <cmetrics/cmt_metric.h>
21 #include <cmetrics/cmt_map.h>
22 #include <cmetrics/cmt_sds.h>
23 #include <cmetrics/cmt_counter.h>
24 #include <cmetrics/cmt_gauge.h>
25 #include <cmetrics/cmt_untyped.h>
26 #include <cmetrics/cmt_hash.h>
27 #include <cmetrics/cmt_encode_prometheus_remote_write.h>
28 
29 static cmt_sds_t render_remote_write_context_to_sds(
30     struct cmt_prometheus_remote_write_context *context);
31 
32 static void destroy_prometheus_label_list(Prometheus__Label **label_list,
33                                           size_t entry_count);
34 
35 static void destroy_prometheus_sample_list(Prometheus__Sample **sample_list,
36                                            size_t entry_count);
37 
38 static void cmt_destroy_prometheus_remote_write_context(
39     struct cmt_prometheus_remote_write_context *context);
40 
41 static uint64_t calculate_label_set_hash(struct mk_list *label_values, uint64_t seed);
42 
43 static size_t count_metrics_with_matching_label_set(struct mk_list *metrics,
44                                                     uint64_t sequence_number,
45                                                     uint64_t desired_hash);
46 
47 static int append_entry_to_prometheus_label_list(Prometheus__Label **label_list,
48                                                  size_t *index,
49                                                  char *name,
50                                                  char *value);
51 
52 static int set_up_time_series_for_label_set(
53                                     struct cmt_prometheus_remote_write_context *context,
54                                     struct cmt_map *map,
55                                     struct cmt_metric *metric,
56                                     struct cmt_prometheus_time_series **time_series);
57 
58 static int pack_metric_metadata(struct cmt_prometheus_remote_write_context *context,
59                                 struct cmt_map *map,
60                                 struct cmt_metric *metric);
61 
62 static int append_metric_to_timeseries(struct cmt_prometheus_time_series *time_series,
63                                        struct cmt_metric *metric);
64 
65 static int pack_metric_sample(struct cmt_prometheus_remote_write_context *context,
66                               struct cmt_map *map,
67                               struct cmt_metric *metric,
68                               int add_metadata);
69 
70 static int pack_basic_type(struct cmt_prometheus_remote_write_context *context,
71                            struct cmt_map *map);
72 
render_remote_write_context_to_sds(struct cmt_prometheus_remote_write_context * context)73 cmt_sds_t render_remote_write_context_to_sds(
74     struct cmt_prometheus_remote_write_context *context)
75 {
76     size_t                                 write_request_size;
77     struct cmt_prometheus_time_series     *time_series_entry;
78     struct cmt_prometheus_metric_metadata *metadata_entry;
79     cmt_sds_t                              result_buffer;
80     size_t                                 entry_index;
81     struct mk_list                        *head;
82 
83     context->write_request.n_timeseries = mk_list_size(&context->time_series_entries);
84     context->write_request.n_metadata   = mk_list_size(&context->metadata_entries);
85 
86     context->write_request.timeseries = calloc(context->write_request.n_timeseries,
87                                                sizeof(Prometheus__TimeSeries *));
88 
89     if (context->write_request.timeseries == NULL) {
90         cmt_errno();
91 
92         return NULL;
93     }
94 
95     context->write_request.metadata = calloc(context->write_request.n_metadata,
96                                              sizeof(Prometheus__TimeSeries *));
97 
98     if (context->write_request.metadata == NULL) {
99         cmt_errno();
100 
101         free(context->write_request.timeseries);
102 
103         return NULL;
104     }
105 
106     entry_index = 0;
107 
108     mk_list_foreach(head, &context->time_series_entries) {
109         time_series_entry = mk_list_entry(head, struct cmt_prometheus_time_series, _head);
110 
111         context->write_request.timeseries[entry_index++] = &time_series_entry->data;
112     }
113 
114     entry_index = 0;
115 
116     mk_list_foreach(head, &context->metadata_entries) {
117         metadata_entry = mk_list_entry(head, struct cmt_prometheus_metric_metadata, _head);
118 
119         context->write_request.metadata[entry_index++] = &metadata_entry->data;
120     }
121 
122     write_request_size = prometheus__write_request__get_packed_size(&context->write_request);
123 
124     result_buffer = cmt_sds_create_size(write_request_size);
125 
126     if(result_buffer != NULL) {
127         prometheus__write_request__pack(&context->write_request, (uint8_t *) result_buffer);
128 
129         cmt_sds_set_len(result_buffer, write_request_size);
130     }
131 
132     free(context->write_request.timeseries);
133 
134     free(context->write_request.metadata);
135 
136     return result_buffer;
137 }
138 
cmt_destroy_prometheus_remote_write_context(struct cmt_prometheus_remote_write_context * context)139 void cmt_destroy_prometheus_remote_write_context(
140     struct cmt_prometheus_remote_write_context *context)
141 {
142     struct cmt_prometheus_time_series     *time_series_entry;
143     struct cmt_prometheus_metric_metadata *metadata_entry;
144     struct mk_list                        *head;
145     struct mk_list                        *tmp;
146 
147     mk_list_foreach_safe(head, tmp, &context->time_series_entries) {
148         time_series_entry = mk_list_entry(head, struct cmt_prometheus_time_series, _head);
149 
150         if (time_series_entry->data.labels != NULL) {
151             destroy_prometheus_label_list(time_series_entry->data.labels,
152                                           time_series_entry->data.n_labels);
153 
154             time_series_entry->data.labels = NULL;
155         }
156 
157         if (time_series_entry->data.samples != NULL) {
158             destroy_prometheus_sample_list(time_series_entry->data.samples,
159                                           time_series_entry->data.n_samples);
160 
161             time_series_entry->data.samples = NULL;
162         }
163 
164         mk_list_del(&time_series_entry->_head);
165 
166         free(time_series_entry);
167     }
168 
169     mk_list_foreach_safe(head, tmp, &context->metadata_entries) {
170         metadata_entry = mk_list_entry(head, struct cmt_prometheus_metric_metadata, _head);
171 
172         mk_list_del(&metadata_entry->_head);
173 
174         free(metadata_entry);
175     }
176 }
177 
calculate_label_set_hash(struct mk_list * label_values,uint64_t seed)178 uint64_t calculate_label_set_hash(struct mk_list *label_values, uint64_t seed)
179 {
180     struct cmt_map_label *label_value;
181     XXH64_state_t         state;
182     struct mk_list       *head;
183 
184     XXH64_reset(&state, 0);
185 
186     XXH64_update(&state, &seed, sizeof(uint64_t));
187 
188     mk_list_foreach(head, label_values) {
189         label_value = mk_list_entry(head, struct cmt_map_label, _head);
190 
191         XXH64_update(&state, label_value->name, cmt_sds_len(label_value->name));
192     }
193 
194     return XXH64_digest(&state);
195 }
196 
count_metrics_with_matching_label_set(struct mk_list * metrics,uint64_t sequence_number,uint64_t desired_hash)197 size_t count_metrics_with_matching_label_set(struct mk_list *metrics,
198                                              uint64_t sequence_number,
199                                              uint64_t desired_hash)
200 {
201     uint64_t           label_set_hash;
202     size_t             matches;
203     struct cmt_metric *metric;
204     struct mk_list    *head;
205 
206     matches = 0;
207 
208     mk_list_foreach(head, metrics) {
209         metric = mk_list_entry(head, struct cmt_metric, _head);
210 
211         label_set_hash = calculate_label_set_hash(&metric->labels, sequence_number);
212 
213         if (label_set_hash == desired_hash) {
214             matches++;
215         }
216     }
217 
218     return matches;
219 }
220 
append_entry_to_prometheus_label_list(Prometheus__Label ** label_list,size_t * index,char * name,char * value)221 int append_entry_to_prometheus_label_list(Prometheus__Label **label_list,
222                                           size_t *index,
223                                           char *name,
224                                           char *value)
225 {
226     label_list[*index] = calloc(1, sizeof(Prometheus__Label));
227 
228     if (label_list[*index] == NULL) {
229         cmt_errno();
230 
231         return CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_ALLOCATION_ERROR;
232     }
233 
234     prometheus__label__init(label_list[*index]);
235 
236     label_list[*index]->name = cmt_sds_create(name);
237 
238     if (label_list[*index]->name == NULL) {
239         free(label_list[*index]);
240 
241         return CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_ALLOCATION_ERROR;
242     }
243 
244     label_list[*index]->value = cmt_sds_create(value);
245 
246     if (label_list[*index]->value == NULL) {
247         cmt_sds_destroy(label_list[*index]->name);
248         free(label_list[*index]);
249 
250         return CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_ALLOCATION_ERROR;
251     }
252 
253     (*index)++;
254 
255     return CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_SUCCESS;
256 }
257 
destroy_prometheus_sample_list(Prometheus__Sample ** sample_list,size_t entry_count)258 void destroy_prometheus_sample_list(Prometheus__Sample **sample_list,
259                                     size_t entry_count)
260 {
261     size_t index;
262 
263     if (sample_list != NULL) {
264         for (index = 0 ; index < entry_count ; index++) {
265             if (sample_list[index] != NULL) {
266                 free(sample_list[index]);
267                 sample_list[index] = NULL;
268             }
269         }
270 
271         free(sample_list);
272     }
273 }
274 
destroy_prometheus_label_list(Prometheus__Label ** label_list,size_t entry_count)275 void destroy_prometheus_label_list(Prometheus__Label **label_list,
276                                    size_t entry_count)
277 {
278     size_t index;
279 
280     if (label_list != NULL) {
281         for (index = 0 ; index < entry_count ; index++) {
282             if (label_list[index] != NULL) {
283                 if (label_list[index]->name != NULL) {
284                     cmt_sds_destroy(label_list[index]->name);
285                     label_list[index]->name = NULL;
286                 }
287 
288                 if (label_list[index]->value != NULL) {
289                     cmt_sds_destroy(label_list[index]->value);
290                     label_list[index]->value = NULL;
291                 }
292 
293                 free(label_list[index]);
294                 label_list[index] = NULL;
295             }
296         }
297 
298         free(label_list);
299     }
300 }
301 
set_up_time_series_for_label_set(struct cmt_prometheus_remote_write_context * context,struct cmt_map * map,struct cmt_metric * metric,struct cmt_prometheus_time_series ** time_series)302 int set_up_time_series_for_label_set(struct cmt_prometheus_remote_write_context *context,
303                                      struct cmt_map *map,
304                                      struct cmt_metric *metric,
305                                      struct cmt_prometheus_time_series **time_series)
306 {
307     uint8_t                            time_series_match_found;
308     size_t                             label_set_hash_matches;
309     struct cmt_prometheus_time_series *time_series_entry;
310     uint64_t                           label_set_hash;
311     struct cmt_label                  *static_label;
312     size_t                             label_index;
313     size_t                             label_count;
314     struct cmt_map_label              *label_value;
315     struct cmt_map_label              *label_name;
316     Prometheus__Label                **label_list;
317     Prometheus__Sample               **value_list;
318     int                                result;
319     struct mk_list                    *head;
320 
321     label_set_hash = calculate_label_set_hash(&metric->labels, context->sequence_number);
322 
323     /* Determine if there is an existing time series for this label set */
324     time_series_match_found = CMT_FALSE;
325 
326     mk_list_foreach(head, &context->time_series_entries) {
327         time_series_entry = mk_list_entry(head, struct cmt_prometheus_time_series, _head);
328 
329         if (time_series_entry->label_set_hash == label_set_hash) {
330             time_series_match_found = CMT_TRUE;
331 
332             break;
333         }
334     }
335 
336     if (time_series_match_found == CMT_TRUE) {
337         *time_series = time_series_entry;
338 
339         return CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_SUCCESS;
340     }
341 
342     /* Find out how many samples share these label values */
343     label_set_hash_matches = count_metrics_with_matching_label_set(&map->metrics,
344                                                                    context->sequence_number,
345                                                                    label_set_hash);
346 
347     if (label_set_hash_matches == 0)
348     {
349         label_set_hash_matches++;
350     }
351 
352     /* Allocate the memory required for the label and value lists, we need to add
353      * one for the fixed __name__ label
354      */
355     label_count = mk_list_size(&context->cmt->static_labels->list) +
356                   mk_list_size(&metric->labels) +
357                   1;
358 
359 
360     time_series_entry = calloc(1, sizeof(struct cmt_prometheus_time_series));
361 
362     if (time_series_entry == NULL) {
363         cmt_errno();
364 
365         return CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_ALLOCATION_ERROR;
366     }
367 
368     label_list = calloc(label_count, sizeof(Prometheus__Label *));
369 
370     if (label_list == NULL) {
371         cmt_errno();
372 
373         free(time_series_entry);
374 
375         return CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_ALLOCATION_ERROR;
376     }
377 
378     value_list = calloc(label_set_hash_matches, sizeof(Prometheus__Sample *));
379 
380     if (value_list == NULL) {
381         cmt_errno();
382 
383         free(time_series_entry);
384         free(label_list);
385 
386         return CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_ALLOCATION_ERROR;
387     }
388 
389     /* Initialize the time series */
390     prometheus__time_series__init(&time_series_entry->data);
391 
392     time_series_entry->data.n_labels  = label_count;
393     time_series_entry->data.labels    = label_list;
394     time_series_entry->data.n_samples = label_set_hash_matches;
395     time_series_entry->data.samples   = value_list;
396 
397     time_series_entry->label_set_hash = label_set_hash;
398     time_series_entry->entries_set = 0;
399 
400     /* Initialize the label list */
401     label_index = 0;
402 
403     /* Add the __name__ label */
404     result = append_entry_to_prometheus_label_list(label_list,
405                                                    &label_index,
406                                                    "__name__",
407                                                    map->opts->fqname);
408 
409     if (result != CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_SUCCESS)
410     {
411         free(time_series_entry);
412         free(label_list);
413         free(value_list);
414 
415         return result;
416     }
417 
418     /* Add the static labels */
419     result = CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_SUCCESS;
420 
421     mk_list_foreach(head, &context->cmt->static_labels->list) {
422         static_label = mk_list_entry(head, struct cmt_label, _head);
423 
424         result = append_entry_to_prometheus_label_list(label_list,
425                                                        &label_index,
426                                                        static_label->key,
427                                                        static_label->val);
428 
429         if (result != CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_SUCCESS)
430         {
431             break;
432         }
433     }
434 
435     /* Add the specific labels */
436     if (result == CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_SUCCESS && label_count > 0) {
437         label_name = mk_list_entry_first(&map->label_keys, struct cmt_map_label, _head);
438 
439         mk_list_foreach(head, &metric->labels) {
440             label_value = mk_list_entry(head, struct cmt_map_label, _head);
441 
442             result = append_entry_to_prometheus_label_list(label_list,
443                                                            &label_index,
444                                                            label_name->name,
445                                                            label_value->name);
446 
447             if (result != CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_SUCCESS)
448             {
449                 break;
450             }
451 
452             label_name = mk_list_entry_next(&label_name->_head, struct cmt_map_label,
453                                             _head, &map->label_keys);
454         }
455     }
456 
457     if (result != CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_SUCCESS) {
458         destroy_prometheus_label_list(label_list, label_index);
459         free(time_series_entry);
460         free(value_list);
461 
462         return result;
463     }
464 
465     /* Add the time series to the context so we can find it when we try to format
466      * a metric with these same labels;
467      */
468     mk_list_add(&time_series_entry->_head, &context->time_series_entries);
469 
470     *time_series = time_series_entry;
471 
472     return CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_SUCCESS;
473 }
474 
475 
pack_metric_metadata(struct cmt_prometheus_remote_write_context * context,struct cmt_map * map,struct cmt_metric * metric)476 int pack_metric_metadata(struct cmt_prometheus_remote_write_context *context,
477                          struct cmt_map *map,
478                          struct cmt_metric *metric)
479 {
480     struct cmt_prometheus_metric_metadata *metadata_entry;
481 
482     metadata_entry = calloc(1, sizeof(struct cmt_prometheus_metric_metadata));
483 
484     if (metadata_entry == NULL) {
485         cmt_errno();
486 
487         return CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_ALLOCATION_ERROR;
488     }
489 
490     prometheus__metric_metadata__init(&metadata_entry->data);
491 
492     if (map->type == CMT_COUNTER) {
493         metadata_entry->data.type = PROMETHEUS__METRIC_METADATA__METRIC_TYPE__COUNTER;
494     }
495     else if (map->type == CMT_GAUGE) {
496         metadata_entry->data.type = PROMETHEUS__METRIC_METADATA__METRIC_TYPE__GAUGE;
497     }
498     else if (map->type == CMT_UNTYPED) {
499         metadata_entry->data.type = PROMETHEUS__METRIC_METADATA__METRIC_TYPE__UNKNOWN;
500     }
501     else {
502         return CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_UNEXPECTED_METRIC_TYPE;
503     }
504 
505     metadata_entry->data.metric_family_name = map->opts->fqname;
506     metadata_entry->data.help = map->opts->fqname;
507     metadata_entry->data.unit = "unit";
508 
509     mk_list_add(&metadata_entry->_head, &context->metadata_entries);
510 
511     return 0;
512 }
513 
append_metric_to_timeseries(struct cmt_prometheus_time_series * time_series,struct cmt_metric * metric)514 int append_metric_to_timeseries(struct cmt_prometheus_time_series *time_series,
515                                 struct cmt_metric *metric)
516 {
517     uint64_t ts;
518     Prometheus__Sample *sample;
519 
520     sample = calloc(1, sizeof(Prometheus__Sample));
521 
522     if (sample == NULL) {
523         cmt_errno();
524 
525         return CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_ALLOCATION_ERROR;
526     }
527 
528     prometheus__sample__init(sample);
529 
530     sample->value = cmt_metric_get_value(metric);
531 
532     ts = cmt_metric_get_timestamp(metric);
533     sample->timestamp = ts / 1000000;
534     time_series->data.samples[time_series->entries_set++] = sample;
535 
536     return CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_SUCCESS;
537 }
538 
pack_metric_sample(struct cmt_prometheus_remote_write_context * context,struct cmt_map * map,struct cmt_metric * metric,int add_metadata)539 int pack_metric_sample(struct cmt_prometheus_remote_write_context *context,
540                        struct cmt_map *map,
541                        struct cmt_metric *metric,
542                        int add_metadata)
543 {
544     struct cmt_prometheus_time_series *time_series;
545     int                                result;
546 
547     result = set_up_time_series_for_label_set(context, map, metric, &time_series);
548 
549     if (result != CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_SUCCESS) {
550         return result;
551     }
552 
553     if (add_metadata == CMT_TRUE) {
554         result = pack_metric_metadata(context, map, metric);
555 
556         if (result != CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_SUCCESS) {
557             return result;
558         }
559     }
560 
561     return append_metric_to_timeseries(time_series, metric);
562 }
563 
pack_basic_type(struct cmt_prometheus_remote_write_context * context,struct cmt_map * map)564 int pack_basic_type(struct cmt_prometheus_remote_write_context *context,
565                     struct cmt_map *map)
566 {
567     int                add_metadata;
568     struct cmt_metric *metric;
569     int                result;
570     struct mk_list    *head;
571 
572     context->sequence_number++;
573     add_metadata = CMT_TRUE;
574 
575     if (map->metric_static_set == CMT_TRUE) {
576         result = pack_metric_sample(context, map, &map->metric, add_metadata);
577 
578         if (result != CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_SUCCESS) {
579             return result;
580         }
581     }
582 
583     mk_list_foreach(head, &map->metrics) {
584         metric = mk_list_entry(head, struct cmt_metric, _head);
585 
586         result = pack_metric_sample(context, map, metric, add_metadata);
587 
588         if (result != CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_SUCCESS) {
589             return result;
590         }
591 
592         if (add_metadata == CMT_TRUE) {
593             add_metadata = CMT_FALSE;
594         }
595     }
596 
597     return CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_SUCCESS;
598 }
599 
600 /* Format all the registered metrics in Prometheus Text format */
cmt_encode_prometheus_remote_write_create(struct cmt * cmt)601 cmt_sds_t cmt_encode_prometheus_remote_write_create(struct cmt *cmt)
602 {
603     struct cmt_prometheus_remote_write_context context;
604     struct cmt_untyped                        *untyped;
605     struct cmt_counter                        *counter;
606     int                                        result;
607     struct cmt_gauge                          *gauge;
608     struct mk_list                            *head;
609     cmt_sds_t                                  buf;
610 
611     buf = NULL;
612 
613     memset(&context, 0, sizeof(struct cmt_prometheus_remote_write_context));
614 
615     prometheus__write_request__init(&context.write_request);
616 
617     context.cmt = cmt;
618 
619     mk_list_init(&context.time_series_entries);
620     mk_list_init(&context.metadata_entries);
621 
622     /* Counters */
623     mk_list_foreach(head, &cmt->counters) {
624         counter = mk_list_entry(head, struct cmt_counter, _head);
625         result = pack_basic_type(&context, counter->map);
626 
627         if (result != CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_SUCCESS) {
628             break;
629         }
630     }
631 
632     if (result == CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_SUCCESS) {
633         /* Gauges */
634         mk_list_foreach(head, &cmt->gauges) {
635             gauge = mk_list_entry(head, struct cmt_gauge, _head);
636             result = pack_basic_type(&context, gauge->map);
637 
638             if (result != CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_SUCCESS) {
639                 break;
640             }
641         }
642 
643     }
644 
645     if (result == CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_SUCCESS) {
646         /* Untyped */
647         mk_list_foreach(head, &cmt->untypeds) {
648             untyped = mk_list_entry(head, struct cmt_untyped, _head);
649             pack_basic_type(&context, untyped->map);
650         }
651     }
652 
653     if (result == CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_SUCCESS) {
654         buf = render_remote_write_context_to_sds(&context);
655     }
656 
657     cmt_destroy_prometheus_remote_write_context(&context);
658 
659     return buf;
660 }
661 
cmt_encode_prometheus_remote_write_destroy(cmt_sds_t text)662 void cmt_encode_prometheus_remote_write_destroy(cmt_sds_t text)
663 {
664     cmt_sds_destroy(text);
665 }
666