1 /*-
2  * collectd - src/dpdk_telemetry.c
3  * MIT License
4  *
5  * Copyright(c) 2019 Intel Corporation. All rights reserved.
6  *
7  * Permission is hereby granted, free of charge, to any person obtaining a copy
8  * of
9  * this software and associated documentation files (the "Software"), to deal in
10  * the Software without restriction, including without limitation the rights to
11  * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
12  * of the Software, and to permit persons to whom the Software is furnished to
13  * do
14  * so, subject to the following conditions:
15  *
16  * The above copyright notice and this permission notice shall be included in
17  * all
18  * copies or substantial portions of the Software.
19  *
20  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
23  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
25  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
26  * SOFTWARE.
27  *
28  */
29 
30 #include "collectd.h"
31 #include "plugin.h"
32 #include "utils/common/common.h"
33 #include "utils_time.h"
34 
35 #include <errno.h>
36 #include <jansson.h>
37 #include <stdio.h>
38 #include <stdlib.h>
39 #include <string.h>
40 #include <sys/queue.h>
41 #include <sys/socket.h>
42 #include <sys/un.h>
43 #include <unistd.h>
44 
45 #define BUF_SIZE 100000
46 #define PLUGIN_NAME "dpdk_telemetry"
47 #define DEFAULT_DPDK_PATH "/var/run/dpdk/rte/telemetry"
48 #define DEFAULT_CLIENT_PATH "/var/run/.client"
49 #define MAX_COMMANDS 2
50 
51 struct client_info {
52   int s_send;
53   int s_recv;
54   int fd;
55   const char *dpdk_path;
56   const char *client_path;
57   struct sockaddr_un addr;
58   struct sockaddr_un addrs;
59 };
60 
61 typedef struct client_info client_info_t;
62 
63 static client_info_t client;
64 static char g_client_path[BUF_SIZE];
65 static char g_dpdk_path[BUF_SIZE];
66 
dpdk_telemetry_config(oconfig_item_t * ci)67 static int dpdk_telemetry_config(oconfig_item_t *ci) {
68   int ret, i;
69 
70   DEBUG(PLUGIN_NAME ": %s:%d", __FUNCTION__, __LINE__);
71 
72   for (i = 0; i < ci->children_num; i++) {
73     oconfig_item_t *child = ci->children + i;
74 
75     if (strcasecmp("ClientSocketPath", child->key) == 0) {
76       ret = cf_util_get_string_buffer(child, g_client_path,
77                                       sizeof(g_client_path));
78     } else if (strcasecmp("DpdkSocketPath", child->key) == 0) {
79       ret = cf_util_get_string_buffer(child, g_dpdk_path, sizeof(g_dpdk_path));
80     } else {
81       ERROR(PLUGIN_NAME ": Unknown configuration parameter"
82                         "\"%s\"",
83             child->key);
84       ret = -1;
85     }
86 
87     if (ret < 0) {
88       INFO(PLUGIN_NAME ": %s:%d ret =%d", __FUNCTION__, __LINE__, ret);
89       return ret;
90     }
91   }
92 
93   return 0;
94 }
95 
dpdk_telemetry_parse(json_t * stats,json_t * port)96 static int dpdk_telemetry_parse(json_t *stats, json_t *port) {
97   json_t *statsArrayObj;
98   int portid;
99 
100   if (!stats) {
101     ERROR(PLUGIN_NAME ": Stats pointer is invalid");
102     return -1;
103   }
104 
105   if (!port) {
106     ERROR(PLUGIN_NAME ":  Port pointer is invalid");
107     return -1;
108   }
109   portid = json_integer_value(port);
110 
111   if (portid < -1) {
112     ERROR(PLUGIN_NAME ": portid is invalid");
113     return -1;
114   }
115 
116   json_t *name, *value;
117   const char *name_string;
118   int statslen, i;
119   uint64_t value_int;
120   statslen = json_array_size(stats);
121   for (i = 0; i < statslen; i++) {
122     statsArrayObj = json_array_get(stats, i);
123     name = json_object_get(statsArrayObj, "name");
124     value = json_object_get(statsArrayObj, "value");
125     if (!name) {
126       ERROR(PLUGIN_NAME ": Request does not have name field");
127       return -1;
128     }
129     if (!json_is_string(name)) {
130       ERROR(PLUGIN_NAME ": Metric name is not a string");
131       return -1;
132     }
133     name_string = json_string_value(name);
134     if (!value) {
135       ERROR(PLUGIN_NAME ": Request does not have value name");
136       return -1;
137     }
138     if (!json_is_integer(value)) {
139       ERROR(PLUGIN_NAME ": Metric value is not an integer");
140       return -1;
141     }
142 
143     char dev_name[DATA_MAX_NAME_LEN];
144     if (portid == -1)
145       snprintf(dev_name, sizeof(dev_name), "%s", name_string);
146     else
147       snprintf(dev_name, sizeof(dev_name), "%s.%d", name_string, portid);
148     value_int = json_integer_value(value);
149     value_t dpdk_telemetry_values[1];
150     value_list_t dpdk_telemetry_vl = VALUE_LIST_INIT;
151     dpdk_telemetry_values[0].counter = value_int;
152     dpdk_telemetry_vl.values = dpdk_telemetry_values;
153     dpdk_telemetry_vl.values_len = 1;
154     dpdk_telemetry_vl.time = cdtime();
155     snprintf(dpdk_telemetry_vl.host, sizeof(dpdk_telemetry_vl.host), "%s",
156              hostname_g);
157     snprintf(dpdk_telemetry_vl.plugin, sizeof(dpdk_telemetry_vl.plugin),
158              "dpdk_telemetry");
159     sstrncpy(dpdk_telemetry_vl.plugin_instance, dev_name,
160              sizeof(dpdk_telemetry_vl.plugin_instance));
161     snprintf(dpdk_telemetry_vl.type, sizeof(dpdk_telemetry_vl.type),
162              "dpdk_telemetry");
163     snprintf(dpdk_telemetry_vl.type_instance,
164              sizeof(dpdk_telemetry_vl.type_instance), "%s", name_string);
165 
166     int ret = plugin_dispatch_values(&dpdk_telemetry_vl);
167     if (ret < 0) {
168       ERROR(PLUGIN_NAME ": Failed to dispatch values");
169       return -1;
170     }
171   }
172   return 0;
173 }
174 
parse_json(char * buf)175 static int parse_json(char *buf) {
176 
177   if (!buf) {
178     ERROR(PLUGIN_NAME ": buf pointer is invalid");
179     return -1;
180   }
181   json_error_t error;
182   json_t *root = json_loads(buf, 0, &error);
183   int arraylen, i;
184   json_t *status, *dataArray, *stats, *dataArrayObj;
185   stats = NULL;
186 
187   if (!root) {
188     ERROR(PLUGIN_NAME ": Could not load JSON object from data passed in"
189                       " : %s",
190           error.text);
191     return -1;
192   } else if (!json_is_object(root)) {
193     ERROR(PLUGIN_NAME ": JSON Request is not a JSON object");
194     json_decref(root);
195     return -1;
196   }
197 
198   status = json_object_get(root, "status_code");
199   if (!status) {
200     ERROR(PLUGIN_NAME ": Request does not have status field");
201     return -1;
202   } else if (!json_is_string(status)) {
203     ERROR(PLUGIN_NAME ": Status value is not a string");
204     return -1;
205   }
206   dataArray = json_object_get(root, "data");
207   if (!dataArray) {
208     ERROR(PLUGIN_NAME ": Request does not have data field");
209     return -1;
210   }
211   arraylen = json_array_size(dataArray);
212   if (!arraylen) {
213     ERROR(PLUGIN_NAME ": No data to get");
214     return -1;
215   }
216 
217   for (i = 0; i < arraylen; i++) {
218     json_t *port;
219     dataArrayObj = json_array_get(dataArray, i);
220     port = json_object_get(dataArrayObj, "port");
221     stats = json_object_get(dataArrayObj, "stats");
222     if (!port) {
223       ERROR(PLUGIN_NAME ": Request does not have port field");
224       return -1;
225     }
226     if (!json_is_integer(port)) {
227       ERROR(PLUGIN_NAME ": Port value is not an integer");
228       return -1;
229     }
230 
231     if (!stats) {
232       ERROR(PLUGIN_NAME ": Request does not have stats field");
233       return -1;
234     }
235     dpdk_telemetry_parse(stats, port);
236   }
237   return 0;
238 }
239 
dpdk_telemetry_cleanup(void)240 static int dpdk_telemetry_cleanup(void) {
241   close(client.s_send);
242   close(client.s_recv);
243   close(client.fd);
244   client.s_send = -1;
245   client.s_recv = -1;
246   client.fd = -1;
247   return 0;
248 }
249 
dpdk_telemetry_socket_init(void)250 static int dpdk_telemetry_socket_init(void) {
251   DEBUG(PLUGIN_NAME ": %s:%d", __FUNCTION__, __LINE__);
252   char message[BUF_SIZE];
253 
254   /* Here we look up the length of the g_dpdk_path string
255    * If it has a length we use it, otherwise we fall back to default
256    * See dpdk_telemetry_config() for details
257    */
258   client.dpdk_path = (strlen(g_dpdk_path)) ? g_dpdk_path : DEFAULT_DPDK_PATH;
259   client.client_path =
260       (strlen(g_client_path)) ? g_client_path : DEFAULT_CLIENT_PATH;
261   client.s_send = socket(AF_UNIX, SOCK_SEQPACKET, 0);
262   if (client.s_send < 0) {
263     ERROR(PLUGIN_NAME ": Failed to open socket errno(%d), error(%s)", errno,
264           strerror(errno));
265     return -1;
266   }
267   client.s_recv = socket(AF_UNIX, SOCK_SEQPACKET, 0);
268   if (client.s_recv < 0) {
269     ERROR(PLUGIN_NAME ": Failed to open message socket errno(%d), error(%s)",
270           errno, strerror(errno));
271     dpdk_telemetry_cleanup();
272     return -1;
273   }
274   client.addr.sun_family = AF_UNIX;
275   snprintf(client.addr.sun_path, sizeof(client.addr.sun_path), "%s",
276            client.dpdk_path);
277   if (connect(client.s_send, (struct sockaddr *)&client.addr,
278               sizeof(client.addr)) < 0) {
279     ERROR(PLUGIN_NAME ": Failed to connect errno(%d), error(%s)", errno,
280           strerror(errno));
281     dpdk_telemetry_cleanup();
282     return -1;
283   }
284   client.addrs.sun_family = AF_UNIX;
285   snprintf(client.addrs.sun_path, sizeof(client.addrs.sun_path), "%s",
286            client.client_path);
287   unlink(client.client_path);
288   if (bind(client.s_recv, (struct sockaddr *)&client.addrs,
289            sizeof(client.addrs)) < 0) {
290     ERROR(PLUGIN_NAME ": Failed to bind errno(%d), error(%s)", errno,
291           strerror(errno));
292     dpdk_telemetry_cleanup();
293     return -1;
294   }
295   if (listen(client.s_recv, 1) < 0) {
296     ERROR(PLUGIN_NAME ": Listen failed errno(%d), error(%s)", errno,
297           strerror(errno));
298     dpdk_telemetry_cleanup();
299     return -1;
300   }
301   snprintf(message, sizeof(message),
302            "{\"action\":1,\"command\":\"clients\""
303            ",\"data\":{\"client_path\":\"%s\"}}",
304            client.client_path);
305   if (send(client.s_send, message, strlen(message), 0) < 0) {
306     ERROR(PLUGIN_NAME ": Could not send register message errno(%d), error(%s)",
307           errno, strerror(errno));
308     dpdk_telemetry_cleanup();
309     return -1;
310   }
311   client.fd = accept(client.s_recv, NULL, NULL);
312   if (client.fd < 0) {
313     ERROR(PLUGIN_NAME ": Failed to accept errno(%d), error(%s)", errno,
314           strerror(errno));
315     dpdk_telemetry_cleanup();
316     return -1;
317   }
318   return 0;
319 }
320 
dpdk_telemetry_shutdown(void)321 static int dpdk_telemetry_shutdown(void) {
322   DEBUG(PLUGIN_NAME ": %s:%d", __FUNCTION__, __LINE__);
323   char msg[BUF_SIZE];
324   int ret;
325 
326   snprintf(msg, sizeof(msg),
327            "{\"action\":2,\"command\":\"clients\""
328            ",\"data\":{\"client_path\":\"%s\"}}",
329            client.client_path);
330   ret = send(client.fd, msg, strlen(msg), 0);
331   if (ret < 0) {
332     ERROR(PLUGIN_NAME ": Could not send unregister message");
333     dpdk_telemetry_cleanup();
334     return -1;
335   }
336   dpdk_telemetry_cleanup();
337   return 0;
338 }
339 
dpdk_telemetry_read(user_data_t * ud)340 static int dpdk_telemetry_read(user_data_t *ud) {
341   DEBUG(PLUGIN_NAME ": %s:%d", __FUNCTION__, __LINE__);
342   char buffer[BUF_SIZE];
343   int bytes = 0, ret;
344   char *json_string[MAX_COMMANDS] = {"{\"action\":0,\"command\":"
345                                      "\"ports_all_stat_values\",\"data\":null}",
346                                      "{\"action\":0,\"command\":"
347                                      "\"global_stat_values\",\"data\":null}"};
348   size_t size = sizeof(json_string) / sizeof(json_string[0]);
349 
350   for (int i = 0; i < size; i++) {
351     if (send(client.fd, json_string[i], strlen(json_string[i]), 0) < 0) {
352       ERROR(PLUGIN_NAME
353             ": Could not send request for stats errno(%d), error(%s)",
354             errno, strerror(errno));
355       if (errno == EBADF || errno == ECONNRESET || errno == ENOTCONN ||
356           errno == EPIPE) {
357         dpdk_telemetry_cleanup();
358         dpdk_telemetry_socket_init();
359         send(client.fd, json_string[i], strlen(json_string[i]), 0);
360       }
361     } else {
362       bytes = recv(client.fd, buffer, sizeof(buffer) - 1, 0);
363       if (bytes < 0) {
364         ERROR(PLUGIN_NAME ": Could not receive stats errno(%d), error(%s)",
365               errno, strerror(errno));
366         dpdk_telemetry_cleanup();
367         dpdk_telemetry_socket_init();
368       } else {
369         buffer[bytes] = '\0';
370         ret = parse_json(buffer);
371         if (ret < 0)
372           ERROR(PLUGIN_NAME ": Parsing failed");
373       }
374     }
375   }
376   return 0;
377 }
378 
dpdk_telemetry_init(void)379 static int dpdk_telemetry_init(void) {
380 
381   DEBUG(PLUGIN_NAME ": %s:%d", __FUNCTION__, __LINE__);
382 
383   client.s_send = -1;
384   client.s_recv = -1;
385   client.fd = -1;
386   if (dpdk_telemetry_socket_init() < 0)
387     ERROR(PLUGIN_NAME ": Socket initialization failed.");
388 
389   return 0;
390 }
391 
module_register(void)392 void module_register(void) {
393   plugin_register_init(PLUGIN_NAME, dpdk_telemetry_init);
394   plugin_register_complex_config(PLUGIN_NAME, dpdk_telemetry_config);
395   plugin_register_complex_read(NULL, PLUGIN_NAME, dpdk_telemetry_read, 0, NULL);
396   plugin_register_shutdown(PLUGIN_NAME, dpdk_telemetry_shutdown);
397 }
398