1 /*-
2 * collectd - src/dpdk_telemetry.c
3 * MIT License
4 *
5 * Copyright(c) 2019 Intel Corporation. All rights reserved.
6 *
7 * Permission is hereby granted, free of charge, to any person obtaining a copy
8 * of
9 * this software and associated documentation files (the "Software"), to deal in
10 * the Software without restriction, including without limitation the rights to
11 * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
12 * of the Software, and to permit persons to whom the Software is furnished to
13 * do
14 * so, subject to the following conditions:
15 *
16 * The above copyright notice and this permission notice shall be included in
17 * all
18 * copies or substantial portions of the Software.
19 *
20 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
23 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
25 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
26 * SOFTWARE.
27 *
28 */
29
30 #include "collectd.h"
31 #include "plugin.h"
32 #include "utils/common/common.h"
33 #include "utils_time.h"
34
35 #include <errno.h>
36 #include <jansson.h>
37 #include <stdio.h>
38 #include <stdlib.h>
39 #include <string.h>
40 #include <sys/queue.h>
41 #include <sys/socket.h>
42 #include <sys/un.h>
43 #include <unistd.h>
44
45 #define BUF_SIZE 100000
46 #define PLUGIN_NAME "dpdk_telemetry"
47 #define DEFAULT_DPDK_PATH "/var/run/dpdk/rte/telemetry"
48 #define DEFAULT_CLIENT_PATH "/var/run/.client"
49 #define MAX_COMMANDS 2
50
51 struct client_info {
52 int s_send;
53 int s_recv;
54 int fd;
55 const char *dpdk_path;
56 const char *client_path;
57 struct sockaddr_un addr;
58 struct sockaddr_un addrs;
59 };
60
61 typedef struct client_info client_info_t;
62
63 static client_info_t client;
64 static char g_client_path[BUF_SIZE];
65 static char g_dpdk_path[BUF_SIZE];
66
dpdk_telemetry_config(oconfig_item_t * ci)67 static int dpdk_telemetry_config(oconfig_item_t *ci) {
68 int ret, i;
69
70 DEBUG(PLUGIN_NAME ": %s:%d", __FUNCTION__, __LINE__);
71
72 for (i = 0; i < ci->children_num; i++) {
73 oconfig_item_t *child = ci->children + i;
74
75 if (strcasecmp("ClientSocketPath", child->key) == 0) {
76 ret = cf_util_get_string_buffer(child, g_client_path,
77 sizeof(g_client_path));
78 } else if (strcasecmp("DpdkSocketPath", child->key) == 0) {
79 ret = cf_util_get_string_buffer(child, g_dpdk_path, sizeof(g_dpdk_path));
80 } else {
81 ERROR(PLUGIN_NAME ": Unknown configuration parameter"
82 "\"%s\"",
83 child->key);
84 ret = -1;
85 }
86
87 if (ret < 0) {
88 INFO(PLUGIN_NAME ": %s:%d ret =%d", __FUNCTION__, __LINE__, ret);
89 return ret;
90 }
91 }
92
93 return 0;
94 }
95
dpdk_telemetry_parse(json_t * stats,json_t * port)96 static int dpdk_telemetry_parse(json_t *stats, json_t *port) {
97 json_t *statsArrayObj;
98 int portid;
99
100 if (!stats) {
101 ERROR(PLUGIN_NAME ": Stats pointer is invalid");
102 return -1;
103 }
104
105 if (!port) {
106 ERROR(PLUGIN_NAME ": Port pointer is invalid");
107 return -1;
108 }
109 portid = json_integer_value(port);
110
111 if (portid < -1) {
112 ERROR(PLUGIN_NAME ": portid is invalid");
113 return -1;
114 }
115
116 json_t *name, *value;
117 const char *name_string;
118 int statslen, i;
119 uint64_t value_int;
120 statslen = json_array_size(stats);
121 for (i = 0; i < statslen; i++) {
122 statsArrayObj = json_array_get(stats, i);
123 name = json_object_get(statsArrayObj, "name");
124 value = json_object_get(statsArrayObj, "value");
125 if (!name) {
126 ERROR(PLUGIN_NAME ": Request does not have name field");
127 return -1;
128 }
129 if (!json_is_string(name)) {
130 ERROR(PLUGIN_NAME ": Metric name is not a string");
131 return -1;
132 }
133 name_string = json_string_value(name);
134 if (!value) {
135 ERROR(PLUGIN_NAME ": Request does not have value name");
136 return -1;
137 }
138 if (!json_is_integer(value)) {
139 ERROR(PLUGIN_NAME ": Metric value is not an integer");
140 return -1;
141 }
142
143 char dev_name[DATA_MAX_NAME_LEN];
144 if (portid == -1)
145 snprintf(dev_name, sizeof(dev_name), "%s", name_string);
146 else
147 snprintf(dev_name, sizeof(dev_name), "%s.%d", name_string, portid);
148 value_int = json_integer_value(value);
149 value_t dpdk_telemetry_values[1];
150 value_list_t dpdk_telemetry_vl = VALUE_LIST_INIT;
151 dpdk_telemetry_values[0].counter = value_int;
152 dpdk_telemetry_vl.values = dpdk_telemetry_values;
153 dpdk_telemetry_vl.values_len = 1;
154 dpdk_telemetry_vl.time = cdtime();
155 snprintf(dpdk_telemetry_vl.host, sizeof(dpdk_telemetry_vl.host), "%s",
156 hostname_g);
157 snprintf(dpdk_telemetry_vl.plugin, sizeof(dpdk_telemetry_vl.plugin),
158 "dpdk_telemetry");
159 sstrncpy(dpdk_telemetry_vl.plugin_instance, dev_name,
160 sizeof(dpdk_telemetry_vl.plugin_instance));
161 snprintf(dpdk_telemetry_vl.type, sizeof(dpdk_telemetry_vl.type),
162 "dpdk_telemetry");
163 snprintf(dpdk_telemetry_vl.type_instance,
164 sizeof(dpdk_telemetry_vl.type_instance), "%s", name_string);
165
166 int ret = plugin_dispatch_values(&dpdk_telemetry_vl);
167 if (ret < 0) {
168 ERROR(PLUGIN_NAME ": Failed to dispatch values");
169 return -1;
170 }
171 }
172 return 0;
173 }
174
parse_json(char * buf)175 static int parse_json(char *buf) {
176
177 if (!buf) {
178 ERROR(PLUGIN_NAME ": buf pointer is invalid");
179 return -1;
180 }
181 json_error_t error;
182 json_t *root = json_loads(buf, 0, &error);
183 int arraylen, i;
184 json_t *status, *dataArray, *stats, *dataArrayObj;
185 stats = NULL;
186
187 if (!root) {
188 ERROR(PLUGIN_NAME ": Could not load JSON object from data passed in"
189 " : %s",
190 error.text);
191 return -1;
192 } else if (!json_is_object(root)) {
193 ERROR(PLUGIN_NAME ": JSON Request is not a JSON object");
194 json_decref(root);
195 return -1;
196 }
197
198 status = json_object_get(root, "status_code");
199 if (!status) {
200 ERROR(PLUGIN_NAME ": Request does not have status field");
201 return -1;
202 } else if (!json_is_string(status)) {
203 ERROR(PLUGIN_NAME ": Status value is not a string");
204 return -1;
205 }
206 dataArray = json_object_get(root, "data");
207 if (!dataArray) {
208 ERROR(PLUGIN_NAME ": Request does not have data field");
209 return -1;
210 }
211 arraylen = json_array_size(dataArray);
212 if (!arraylen) {
213 ERROR(PLUGIN_NAME ": No data to get");
214 return -1;
215 }
216
217 for (i = 0; i < arraylen; i++) {
218 json_t *port;
219 dataArrayObj = json_array_get(dataArray, i);
220 port = json_object_get(dataArrayObj, "port");
221 stats = json_object_get(dataArrayObj, "stats");
222 if (!port) {
223 ERROR(PLUGIN_NAME ": Request does not have port field");
224 return -1;
225 }
226 if (!json_is_integer(port)) {
227 ERROR(PLUGIN_NAME ": Port value is not an integer");
228 return -1;
229 }
230
231 if (!stats) {
232 ERROR(PLUGIN_NAME ": Request does not have stats field");
233 return -1;
234 }
235 dpdk_telemetry_parse(stats, port);
236 }
237 return 0;
238 }
239
dpdk_telemetry_cleanup(void)240 static int dpdk_telemetry_cleanup(void) {
241 close(client.s_send);
242 close(client.s_recv);
243 close(client.fd);
244 client.s_send = -1;
245 client.s_recv = -1;
246 client.fd = -1;
247 return 0;
248 }
249
dpdk_telemetry_socket_init(void)250 static int dpdk_telemetry_socket_init(void) {
251 DEBUG(PLUGIN_NAME ": %s:%d", __FUNCTION__, __LINE__);
252 char message[BUF_SIZE];
253
254 /* Here we look up the length of the g_dpdk_path string
255 * If it has a length we use it, otherwise we fall back to default
256 * See dpdk_telemetry_config() for details
257 */
258 client.dpdk_path = (strlen(g_dpdk_path)) ? g_dpdk_path : DEFAULT_DPDK_PATH;
259 client.client_path =
260 (strlen(g_client_path)) ? g_client_path : DEFAULT_CLIENT_PATH;
261 client.s_send = socket(AF_UNIX, SOCK_SEQPACKET, 0);
262 if (client.s_send < 0) {
263 ERROR(PLUGIN_NAME ": Failed to open socket errno(%d), error(%s)", errno,
264 strerror(errno));
265 return -1;
266 }
267 client.s_recv = socket(AF_UNIX, SOCK_SEQPACKET, 0);
268 if (client.s_recv < 0) {
269 ERROR(PLUGIN_NAME ": Failed to open message socket errno(%d), error(%s)",
270 errno, strerror(errno));
271 dpdk_telemetry_cleanup();
272 return -1;
273 }
274 client.addr.sun_family = AF_UNIX;
275 snprintf(client.addr.sun_path, sizeof(client.addr.sun_path), "%s",
276 client.dpdk_path);
277 if (connect(client.s_send, (struct sockaddr *)&client.addr,
278 sizeof(client.addr)) < 0) {
279 ERROR(PLUGIN_NAME ": Failed to connect errno(%d), error(%s)", errno,
280 strerror(errno));
281 dpdk_telemetry_cleanup();
282 return -1;
283 }
284 client.addrs.sun_family = AF_UNIX;
285 snprintf(client.addrs.sun_path, sizeof(client.addrs.sun_path), "%s",
286 client.client_path);
287 unlink(client.client_path);
288 if (bind(client.s_recv, (struct sockaddr *)&client.addrs,
289 sizeof(client.addrs)) < 0) {
290 ERROR(PLUGIN_NAME ": Failed to bind errno(%d), error(%s)", errno,
291 strerror(errno));
292 dpdk_telemetry_cleanup();
293 return -1;
294 }
295 if (listen(client.s_recv, 1) < 0) {
296 ERROR(PLUGIN_NAME ": Listen failed errno(%d), error(%s)", errno,
297 strerror(errno));
298 dpdk_telemetry_cleanup();
299 return -1;
300 }
301 snprintf(message, sizeof(message),
302 "{\"action\":1,\"command\":\"clients\""
303 ",\"data\":{\"client_path\":\"%s\"}}",
304 client.client_path);
305 if (send(client.s_send, message, strlen(message), 0) < 0) {
306 ERROR(PLUGIN_NAME ": Could not send register message errno(%d), error(%s)",
307 errno, strerror(errno));
308 dpdk_telemetry_cleanup();
309 return -1;
310 }
311 client.fd = accept(client.s_recv, NULL, NULL);
312 if (client.fd < 0) {
313 ERROR(PLUGIN_NAME ": Failed to accept errno(%d), error(%s)", errno,
314 strerror(errno));
315 dpdk_telemetry_cleanup();
316 return -1;
317 }
318 return 0;
319 }
320
dpdk_telemetry_shutdown(void)321 static int dpdk_telemetry_shutdown(void) {
322 DEBUG(PLUGIN_NAME ": %s:%d", __FUNCTION__, __LINE__);
323 char msg[BUF_SIZE];
324 int ret;
325
326 snprintf(msg, sizeof(msg),
327 "{\"action\":2,\"command\":\"clients\""
328 ",\"data\":{\"client_path\":\"%s\"}}",
329 client.client_path);
330 ret = send(client.fd, msg, strlen(msg), 0);
331 if (ret < 0) {
332 ERROR(PLUGIN_NAME ": Could not send unregister message");
333 dpdk_telemetry_cleanup();
334 return -1;
335 }
336 dpdk_telemetry_cleanup();
337 return 0;
338 }
339
dpdk_telemetry_read(user_data_t * ud)340 static int dpdk_telemetry_read(user_data_t *ud) {
341 DEBUG(PLUGIN_NAME ": %s:%d", __FUNCTION__, __LINE__);
342 char buffer[BUF_SIZE];
343 int bytes = 0, ret;
344 char *json_string[MAX_COMMANDS] = {"{\"action\":0,\"command\":"
345 "\"ports_all_stat_values\",\"data\":null}",
346 "{\"action\":0,\"command\":"
347 "\"global_stat_values\",\"data\":null}"};
348 size_t size = sizeof(json_string) / sizeof(json_string[0]);
349
350 for (int i = 0; i < size; i++) {
351 if (send(client.fd, json_string[i], strlen(json_string[i]), 0) < 0) {
352 ERROR(PLUGIN_NAME
353 ": Could not send request for stats errno(%d), error(%s)",
354 errno, strerror(errno));
355 if (errno == EBADF || errno == ECONNRESET || errno == ENOTCONN ||
356 errno == EPIPE) {
357 dpdk_telemetry_cleanup();
358 dpdk_telemetry_socket_init();
359 send(client.fd, json_string[i], strlen(json_string[i]), 0);
360 }
361 } else {
362 bytes = recv(client.fd, buffer, sizeof(buffer) - 1, 0);
363 if (bytes < 0) {
364 ERROR(PLUGIN_NAME ": Could not receive stats errno(%d), error(%s)",
365 errno, strerror(errno));
366 dpdk_telemetry_cleanup();
367 dpdk_telemetry_socket_init();
368 } else {
369 buffer[bytes] = '\0';
370 ret = parse_json(buffer);
371 if (ret < 0)
372 ERROR(PLUGIN_NAME ": Parsing failed");
373 }
374 }
375 }
376 return 0;
377 }
378
dpdk_telemetry_init(void)379 static int dpdk_telemetry_init(void) {
380
381 DEBUG(PLUGIN_NAME ": %s:%d", __FUNCTION__, __LINE__);
382
383 client.s_send = -1;
384 client.s_recv = -1;
385 client.fd = -1;
386 if (dpdk_telemetry_socket_init() < 0)
387 ERROR(PLUGIN_NAME ": Socket initialization failed.");
388
389 return 0;
390 }
391
module_register(void)392 void module_register(void) {
393 plugin_register_init(PLUGIN_NAME, dpdk_telemetry_init);
394 plugin_register_complex_config(PLUGIN_NAME, dpdk_telemetry_config);
395 plugin_register_complex_read(NULL, PLUGIN_NAME, dpdk_telemetry_read, 0, NULL);
396 plugin_register_shutdown(PLUGIN_NAME, dpdk_telemetry_shutdown);
397 }
398