1 /**
2 * collectd - src/utils_ovs.c
3 *
4 * Copyright(c) 2016 Intel Corporation. All rights reserved.
5 *
6 * Permission is hereby granted, free of charge, to any person obtaining a copy
7 *of
8 * this software and associated documentation files (the "Software"), to deal in
9 * the Software without restriction, including without limitation the rights to
10 * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
11 * of the Software, and to permit persons to whom the Software is furnished to
12 *do
13 * so, subject to the following conditions:
14 *
15 * The above copyright notice and this permission notice shall be included in
16 *all
17 * copies or substantial portions of the Software.
18 *
19 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
20 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
21 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
22 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
23 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
24 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
25 * SOFTWARE.
26 *
27 * Authors:
28 * Volodymyr Mytnyk <volodymyrx.mytnyk@intel.com>
29 **/
30
31 /* clang-format off */
32 /*
33 * OVS DB API internal architecture diagram
34 * +------------------------------------------------------------------------------+
35 * |OVS plugin |OVS utils |
36 * | | +------------------------+ |
37 * | | | echo handler | JSON request/ |
38 * | | +--+ (ovs_db_table_echo_cb) +<---+---------+ update event/ |
39 * | | | | | | | result |
40 * | | | +------------------------+ | | |
41 * | | | | +----+---+--------+ |
42 * | +----------+ | | +------------------------+ | | | | |
43 * | | update | | | | update handler | | | YAJL | JSON | |
44 * | | callback +<-------+(ovs_db_table_update_cp)+<---+ | parser | reader | |
45 * | +----------+ | | | | | | | | |
46 * | | | +------------------------+ | +--------+---+----+ |
47 * | | | | ^ |
48 * | +----------+ | | +------------------------+ | | |
49 * | | result | | | | result handler | | | |
50 * | | callback +<-------+ (ovs_db_result_cb) +<---+ JSON raw | |
51 * | +----------+ | | | | data | |
52 * | | | +------------------------+ | |
53 * | | | | |
54 * | | | +------------------+ +------------+----+ |
55 * | +----------+ | | |thread| | |thread| | |
56 * | | init | | | | | reconnect | | |
57 * | | callback +<---------+ EVENT WORKER +<------------+ POLL WORKER | |
58 * | +----------+ | | +------------------+ +--------+--------+ |
59 * | | | ^ |
60 * +----------------+-------------------------------------------------------------+
61 * | |
62 * JSON|echo reply raw|data
63 * v v
64 * +-------------------+----------------------------------------------+-----------+
65 * | TCP/UNIX socket |
66 * +-------------------------------------------------------------------------------
67 */
68 /* clang-format on */
69
70 /* collectd headers */
71 #include "collectd.h"
72
73 #include "utils/common/common.h"
74
75 /* private headers */
76 #include "utils/ovs/ovs.h"
77
78 /* system libraries */
79 #if HAVE_NETDB_H
80 #include <netdb.h>
81 #endif
82 #if HAVE_ARPA_INET_H
83 #include <arpa/inet.h>
84 #endif
85 #if HAVE_POLL_H
86 #include <poll.h>
87 #endif
88 #if HAVE_SYS_UN_H
89 #include <sys/un.h>
90 #endif
91
92 #include <semaphore.h>
93
94 #define OVS_ERROR(fmt, ...) \
95 do { \
96 ERROR("ovs_utils: " fmt, ##__VA_ARGS__); \
97 } while (0)
98 #define OVS_DEBUG(fmt, ...) \
99 do { \
100 DEBUG("%s:%d:%s(): " fmt, __FILE__, __LINE__, __FUNCTION__, \
101 ##__VA_ARGS__); \
102 } while (0)
103
104 #define OVS_DB_POLL_TIMEOUT 1 /* poll receive timeout (sec) */
105 #define OVS_DB_POLL_READ_BLOCK_SIZE 512 /* read block size (bytes) */
106 #define OVS_DB_DEFAULT_DB_NAME "Open_vSwitch"
107
108 #define OVS_DB_EVENT_NONE 0
109 #define OVS_DB_EVENT_TIMEOUT 5 /* event thread timeout (sec) */
110 #define OVS_DB_EVENT_TERMINATE 1
111 #define OVS_DB_EVENT_CONN_ESTABLISHED 2
112 #define OVS_DB_EVENT_CONN_TERMINATED 3
113
114 #define OVS_DB_POLL_STATE_RUNNING 1
115 #define OVS_DB_POLL_STATE_EXITING 2
116
117 #define OVS_DB_SEND_REQ_TIMEOUT 5 /* send request timeout (sec) */
118
119 #define OVS_YAJL_CALL(func, ...) \
120 do { \
121 yajl_gen_ret = yajl_gen_status_ok; \
122 if ((yajl_gen_ret = func(__VA_ARGS__)) != yajl_gen_status_ok) \
123 goto yajl_gen_failure; \
124 } while (0)
125 #define OVS_YAJL_ERROR_BUFFER_SIZE 1024
126 #define OVS_ERROR_BUFF_SIZE 512
127 #define OVS_UID_STR_SIZE 17 /* 64-bit HEX string len + '\0' */
128
129 /* JSON reader internal data */
130 struct ovs_json_reader_s {
131 char *buff_ptr;
132 size_t buff_size;
133 size_t buff_offset;
134 size_t json_offset;
135 };
136 typedef struct ovs_json_reader_s ovs_json_reader_t;
137
138 /* Result callback declaration */
139 struct ovs_result_cb_s {
140 sem_t sync;
141 ovs_db_result_cb_t call;
142 };
143 typedef struct ovs_result_cb_s ovs_result_cb_t;
144
145 /* Table callback declaration */
146 struct ovs_table_cb_s {
147 ovs_db_table_cb_t call;
148 };
149 typedef struct ovs_table_cb_s ovs_table_cb_t;
150
151 /* Callback declaration */
152 struct ovs_callback_s {
153 uint64_t uid;
154 union {
155 ovs_result_cb_t result;
156 ovs_table_cb_t table;
157 };
158 struct ovs_callback_s *next;
159 struct ovs_callback_s *prev;
160 };
161 typedef struct ovs_callback_s ovs_callback_t;
162
163 /* Event thread data declaration */
164 struct ovs_event_thread_s {
165 pthread_t tid;
166 pthread_mutex_t mutex;
167 pthread_cond_t cond;
168 int value;
169 };
170 typedef struct ovs_event_thread_s ovs_event_thread_t;
171
172 /* Poll thread data declaration */
173 struct ovs_poll_thread_s {
174 pthread_t tid;
175 pthread_mutex_t mutex;
176 int state;
177 };
178 typedef struct ovs_poll_thread_s ovs_poll_thread_t;
179
180 /* OVS DB internal data declaration */
181 struct ovs_db_s {
182 ovs_poll_thread_t poll_thread;
183 ovs_event_thread_t event_thread;
184 pthread_mutex_t mutex;
185 ovs_callback_t *remote_cb;
186 ovs_db_callback_t cb;
187 char service[OVS_DB_ADDR_SERVICE_SIZE];
188 char node[OVS_DB_ADDR_NODE_SIZE];
189 char unix_path[OVS_DB_ADDR_NODE_SIZE];
190 int sock;
191 };
192
193 /* Global variables */
194 static uint64_t ovs_uid;
195 static pthread_mutex_t ovs_uid_mutex = PTHREAD_MUTEX_INITIALIZER;
196
197 /* Post an event to event thread.
198 * Possible events are:
199 * OVS_DB_EVENT_TERMINATE
200 * OVS_DB_EVENT_CONN_ESTABLISHED
201 * OVS_DB_EVENT_CONN_TERMINATED
202 */
ovs_db_event_post(ovs_db_t * pdb,int event)203 static void ovs_db_event_post(ovs_db_t *pdb, int event) {
204 pthread_mutex_lock(&pdb->event_thread.mutex);
205 pdb->event_thread.value = event;
206 pthread_mutex_unlock(&pdb->event_thread.mutex);
207 pthread_cond_signal(&pdb->event_thread.cond);
208 }
209
210 /* Check if POLL thread is still running. Returns
211 * 1 if running otherwise 0 is returned */
ovs_db_poll_is_running(ovs_db_t * pdb)212 static bool ovs_db_poll_is_running(ovs_db_t *pdb) {
213 int state = 0;
214 pthread_mutex_lock(&pdb->poll_thread.mutex);
215 state = pdb->poll_thread.state;
216 pthread_mutex_unlock(&pdb->poll_thread.mutex);
217 return state == OVS_DB_POLL_STATE_RUNNING;
218 }
219
220 /* Generate unique identifier (UID). It is used by OVS DB API
221 * to set "id" field for any OVS DB JSON request. */
ovs_uid_generate()222 static uint64_t ovs_uid_generate() {
223 uint64_t new_uid;
224 pthread_mutex_lock(&ovs_uid_mutex);
225 new_uid = ++ovs_uid;
226 pthread_mutex_unlock(&ovs_uid_mutex);
227 return new_uid;
228 }
229
230 /*
231 * Callback API. These function are used to store
232 * registered callbacks in OVS DB API.
233 */
234
235 /* Add new callback into OVS DB object */
ovs_db_callback_add(ovs_db_t * pdb,ovs_callback_t * new_cb)236 static void ovs_db_callback_add(ovs_db_t *pdb, ovs_callback_t *new_cb) {
237 pthread_mutex_lock(&pdb->mutex);
238 if (pdb->remote_cb)
239 pdb->remote_cb->prev = new_cb;
240 new_cb->next = pdb->remote_cb;
241 new_cb->prev = NULL;
242 pdb->remote_cb = new_cb;
243 pthread_mutex_unlock(&pdb->mutex);
244 }
245
246 /* Remove callback from OVS DB object */
ovs_db_callback_remove(ovs_db_t * pdb,ovs_callback_t * del_cb)247 static void ovs_db_callback_remove(ovs_db_t *pdb, ovs_callback_t *del_cb) {
248 pthread_mutex_lock(&pdb->mutex);
249 ovs_callback_t *pre_cb = del_cb->prev;
250 ovs_callback_t *next_cb = del_cb->next;
251
252 if (next_cb)
253 next_cb->prev = del_cb->prev;
254
255 if (pre_cb)
256 pre_cb->next = del_cb->next;
257 else
258 pdb->remote_cb = del_cb->next;
259
260 free(del_cb);
261 pthread_mutex_unlock(&pdb->mutex);
262 }
263
264 /* Remove all callbacks form OVS DB object */
ovs_db_callback_remove_all(ovs_db_t * pdb)265 static void ovs_db_callback_remove_all(ovs_db_t *pdb) {
266 pthread_mutex_lock(&pdb->mutex);
267 while (pdb->remote_cb != NULL) {
268 ovs_callback_t *del_cb = pdb->remote_cb;
269 pdb->remote_cb = del_cb->next;
270 sfree(del_cb);
271 }
272 pthread_mutex_unlock(&pdb->mutex);
273 }
274
275 /* Get/find callback in OVS DB object by UID. Returns pointer
276 * to requested callback otherwise NULL is returned.
277 *
278 * IMPORTANT NOTE:
279 * The OVS DB mutex MUST be locked by the caller
280 * to make sure that returned callback is still valid.
281 */
ovs_db_callback_get(ovs_db_t * pdb,uint64_t uid)282 static ovs_callback_t *ovs_db_callback_get(ovs_db_t *pdb, uint64_t uid) {
283 for (ovs_callback_t *cb = pdb->remote_cb; cb != NULL; cb = cb->next)
284 if (cb->uid == uid)
285 return cb;
286 return NULL;
287 }
288
289 /* Send all requested data to the socket. Returns 0 if
290 * ALL request data has been sent otherwise negative value
291 * is returned */
ovs_db_data_send(const ovs_db_t * pdb,const char * data,size_t len)292 static int ovs_db_data_send(const ovs_db_t *pdb, const char *data, size_t len) {
293 ssize_t nbytes = 0;
294 size_t rem = len;
295 size_t off = 0;
296
297 while (rem > 0) {
298 if ((nbytes = send(pdb->sock, data + off, rem, 0)) <= 0)
299 return -1;
300 rem -= (size_t)nbytes;
301 off += (size_t)nbytes;
302 }
303 return 0;
304 }
305
306 /*
307 * YAJL (Yet Another JSON Library) helper functions
308 * Documentation (https://lloyd.github.io/yajl/)
309 */
310
311 /* Add null-terminated string into YAJL generator handle (JSON object).
312 * Similar function to yajl_gen_string() but takes null-terminated string
313 * instead of string and its length.
314 *
315 * jgen - YAJL generator handle allocated by yajl_gen_alloc()
316 * string - Null-terminated string
317 */
ovs_yajl_gen_tstring(yajl_gen hander,const char * string)318 static yajl_gen_status ovs_yajl_gen_tstring(yajl_gen hander,
319 const char *string) {
320 return yajl_gen_string(hander, (const unsigned char *)string, strlen(string));
321 }
322
323 /* Add YAJL value into YAJL generator handle (JSON object)
324 *
325 * jgen - YAJL generator handle allocated by yajl_gen_alloc()
326 * jval - YAJL value usually returned by yajl_tree_get()
327 */
ovs_yajl_gen_val(yajl_gen jgen,yajl_val jval)328 static yajl_gen_status ovs_yajl_gen_val(yajl_gen jgen, yajl_val jval) {
329 size_t array_len = 0;
330 yajl_val *jvalues = NULL;
331 yajl_val jobj_value = NULL;
332 const char *obj_key = NULL;
333 size_t obj_len = 0;
334 yajl_gen_status yajl_gen_ret = yajl_gen_status_ok;
335
336 if (jval == NULL)
337 return yajl_gen_generation_complete;
338
339 if (YAJL_IS_STRING(jval))
340 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, YAJL_GET_STRING(jval));
341 else if (YAJL_IS_DOUBLE(jval))
342 OVS_YAJL_CALL(yajl_gen_double, jgen, YAJL_GET_DOUBLE(jval));
343 else if (YAJL_IS_INTEGER(jval))
344 OVS_YAJL_CALL(yajl_gen_double, jgen, YAJL_GET_INTEGER(jval));
345 else if (YAJL_IS_TRUE(jval))
346 OVS_YAJL_CALL(yajl_gen_bool, jgen, 1);
347 else if (YAJL_IS_FALSE(jval))
348 OVS_YAJL_CALL(yajl_gen_bool, jgen, 0);
349 else if (YAJL_IS_NULL(jval))
350 OVS_YAJL_CALL(yajl_gen_null, jgen);
351 else if (YAJL_IS_ARRAY(jval)) {
352 /* create new array and add all elements into the array */
353 array_len = YAJL_GET_ARRAY(jval)->len;
354 jvalues = YAJL_GET_ARRAY(jval)->values;
355 OVS_YAJL_CALL(yajl_gen_array_open, jgen);
356 for (size_t i = 0; i < array_len; i++)
357 OVS_YAJL_CALL(ovs_yajl_gen_val, jgen, jvalues[i]);
358 OVS_YAJL_CALL(yajl_gen_array_close, jgen);
359 } else if (YAJL_IS_OBJECT(jval)) {
360 /* create new object and add all elements into the object */
361 OVS_YAJL_CALL(yajl_gen_map_open, jgen);
362 obj_len = YAJL_GET_OBJECT(jval)->len;
363 for (size_t i = 0; i < obj_len; i++) {
364 obj_key = YAJL_GET_OBJECT(jval)->keys[i];
365 jobj_value = YAJL_GET_OBJECT(jval)->values[i];
366 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, obj_key);
367 OVS_YAJL_CALL(ovs_yajl_gen_val, jgen, jobj_value);
368 }
369 OVS_YAJL_CALL(yajl_gen_map_close, jgen);
370 } else {
371 OVS_ERROR("%s() unsupported value type %d (skip)", __FUNCTION__,
372 (int)(jval)->type);
373 goto yajl_gen_failure;
374 }
375 return yajl_gen_status_ok;
376
377 yajl_gen_failure:
378 OVS_ERROR("%s() error to generate value", __FUNCTION__);
379 return yajl_gen_ret;
380 }
381
382 /* OVS DB echo request handler. When OVS DB sends
383 * "echo" request to the client, client should generate
384 * "echo" replay with the same content received in the
385 * request */
ovs_db_table_echo_cb(const ovs_db_t * pdb,yajl_val jnode)386 static int ovs_db_table_echo_cb(const ovs_db_t *pdb, yajl_val jnode) {
387 yajl_val jparams;
388 yajl_val jid;
389 yajl_gen jgen;
390 size_t resp_len = 0;
391 const char *resp = NULL;
392 const char *params_path[] = {"params", NULL};
393 const char *id_path[] = {"id", NULL};
394 yajl_gen_status yajl_gen_ret;
395
396 if ((jgen = yajl_gen_alloc(NULL)) == NULL)
397 return -1;
398
399 /* check & get request attributes */
400 if ((jparams = yajl_tree_get(jnode, params_path, yajl_t_array)) == NULL ||
401 ((jid = yajl_tree_get(jnode, id_path, yajl_t_any)) == NULL)) {
402 OVS_ERROR("parse echo request failed");
403 goto yajl_gen_failure;
404 }
405
406 /* generate JSON echo response */
407 OVS_YAJL_CALL(yajl_gen_map_open, jgen);
408
409 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "result");
410 OVS_YAJL_CALL(ovs_yajl_gen_val, jgen, jparams);
411
412 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "error");
413 OVS_YAJL_CALL(yajl_gen_null, jgen);
414
415 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "id");
416 OVS_YAJL_CALL(ovs_yajl_gen_val, jgen, jid);
417
418 OVS_YAJL_CALL(yajl_gen_map_close, jgen);
419 OVS_YAJL_CALL(yajl_gen_get_buf, jgen, (const unsigned char **)&resp,
420 &resp_len);
421
422 /* send the response */
423 OVS_DEBUG("response: %s", resp);
424 if (ovs_db_data_send(pdb, resp, resp_len) < 0) {
425 OVS_ERROR("send echo reply failed");
426 goto yajl_gen_failure;
427 }
428 /* clean up and return success */
429 yajl_gen_clear(jgen);
430 return 0;
431
432 yajl_gen_failure:
433 /* release memory */
434 yajl_gen_clear(jgen);
435 return -1;
436 }
437
438 /* Get OVS DB registered callback by YAJL val. The YAJL
439 * value should be YAJL string (UID). Returns NULL if
440 * callback hasn't been found. See also ovs_db_callback_get()
441 * description for addition info.
442 */
ovs_db_table_callback_get(ovs_db_t * pdb,yajl_val jid)443 static ovs_callback_t *ovs_db_table_callback_get(ovs_db_t *pdb, yajl_val jid) {
444 char *endptr = NULL;
445 const char *suid = NULL;
446 uint64_t uid;
447
448 if (jid && YAJL_IS_STRING(jid)) {
449 suid = YAJL_GET_STRING(jid);
450 uid = (uint64_t)strtoul(suid, &endptr, 16);
451 if (*endptr == '\0' && uid)
452 return ovs_db_callback_get(pdb, uid);
453 }
454
455 return NULL;
456 }
457
458 /* OVS DB table update event handler.
459 * This callback is called by POLL thread if OVS DB
460 * table update callback is received from the DB
461 * server. Once registered callback found, it's called
462 * by this handler. */
ovs_db_table_update_cb(ovs_db_t * pdb,yajl_val jnode)463 static int ovs_db_table_update_cb(ovs_db_t *pdb, yajl_val jnode) {
464 ovs_callback_t *cb = NULL;
465 yajl_val jvalue;
466 yajl_val jparams;
467 yajl_val jtable_updates;
468 const char *params_path[] = {"params", NULL};
469 const char *id_path[] = {"id", NULL};
470
471 /* check & get request attributes */
472 if ((jparams = yajl_tree_get(jnode, params_path, yajl_t_array)) == NULL ||
473 (yajl_tree_get(jnode, id_path, yajl_t_null) == NULL)) {
474 OVS_ERROR("invalid OVS DB request received");
475 return -1;
476 }
477
478 /* check array length: [<json-value>, <table-updates>] */
479 if ((YAJL_GET_ARRAY(jparams) == NULL) ||
480 (YAJL_GET_ARRAY(jparams)->len != 2)) {
481 OVS_ERROR("invalid OVS DB request received");
482 return -1;
483 }
484
485 jvalue = YAJL_GET_ARRAY(jparams)->values[0];
486 jtable_updates = YAJL_GET_ARRAY(jparams)->values[1];
487 if ((!YAJL_IS_OBJECT(jtable_updates)) || (!YAJL_IS_STRING(jvalue))) {
488 OVS_ERROR("invalid OVS DB request id or table update received");
489 return -1;
490 }
491
492 /* find registered callback based on <json-value> */
493 pthread_mutex_lock(&pdb->mutex);
494 cb = ovs_db_table_callback_get(pdb, jvalue);
495 if (cb == NULL || cb->table.call == NULL) {
496 OVS_ERROR("No OVS DB table update callback found");
497 pthread_mutex_unlock(&pdb->mutex);
498 return -1;
499 }
500
501 /* call registered callback */
502 cb->table.call(jtable_updates);
503 pthread_mutex_unlock(&pdb->mutex);
504 return 0;
505 }
506
507 /* OVS DB result request handler.
508 * This callback is called by POLL thread if OVS DB
509 * result reply is received from the DB server.
510 * Once registered callback found, it's called
511 * by this handler. */
ovs_db_result_cb(ovs_db_t * pdb,yajl_val jnode)512 static int ovs_db_result_cb(ovs_db_t *pdb, yajl_val jnode) {
513 ovs_callback_t *cb = NULL;
514 yajl_val jresult;
515 yajl_val jerror;
516 yajl_val jid;
517 const char *result_path[] = {"result", NULL};
518 const char *error_path[] = {"error", NULL};
519 const char *id_path[] = {"id", NULL};
520
521 jresult = yajl_tree_get(jnode, result_path, yajl_t_any);
522 jerror = yajl_tree_get(jnode, error_path, yajl_t_any);
523 jid = yajl_tree_get(jnode, id_path, yajl_t_string);
524
525 /* check & get result attributes */
526 if (!jresult || !jerror || !jid)
527 return -1;
528
529 /* try to find registered callback */
530 pthread_mutex_lock(&pdb->mutex);
531 cb = ovs_db_table_callback_get(pdb, jid);
532 if (cb != NULL && cb->result.call != NULL) {
533 /* call registered callback */
534 cb->result.call(jresult, jerror);
535 /* unlock owner of the reply */
536 sem_post(&cb->result.sync);
537 }
538
539 pthread_mutex_unlock(&pdb->mutex);
540 return 0;
541 }
542
543 /* Handle JSON data (one request) and call
544 * appropriate event OVS DB handler. Currently,
545 * update callback 'ovs_db_table_update_cb' and
546 * result callback 'ovs_db_result_cb' is supported.
547 */
ovs_db_json_data_process(ovs_db_t * pdb,const char * data,size_t len)548 static int ovs_db_json_data_process(ovs_db_t *pdb, const char *data,
549 size_t len) {
550 const char *method = NULL;
551 char yajl_errbuf[OVS_YAJL_ERROR_BUFFER_SIZE];
552 const char *method_path[] = {"method", NULL};
553 const char *result_path[] = {"result", NULL};
554 char *sjson = NULL;
555 yajl_val jnode, jval;
556
557 /* duplicate the data to make null-terminated string
558 * required for yajl_tree_parse() */
559 if ((sjson = calloc(1, len + 1)) == NULL)
560 return -1;
561
562 sstrncpy(sjson, data, len + 1);
563 OVS_DEBUG("[len=%" PRIsz "] %s", len, sjson);
564
565 /* parse json data */
566 jnode = yajl_tree_parse(sjson, yajl_errbuf, sizeof(yajl_errbuf));
567 if (jnode == NULL) {
568 OVS_ERROR("yajl_tree_parse() %s", yajl_errbuf);
569 sfree(sjson);
570 return -1;
571 }
572
573 /* get method name */
574 if ((jval = yajl_tree_get(jnode, method_path, yajl_t_string)) != NULL) {
575 if ((method = YAJL_GET_STRING(jval)) == NULL) {
576 yajl_tree_free(jnode);
577 sfree(sjson);
578 return -1;
579 }
580 if (strcmp("echo", method) == 0) {
581 /* echo request from the server */
582 if (ovs_db_table_echo_cb(pdb, jnode) < 0)
583 OVS_ERROR("handle echo request failed");
584 } else if (strcmp("update", method) == 0) {
585 /* update notification */
586 if (ovs_db_table_update_cb(pdb, jnode) < 0)
587 OVS_ERROR("handle update notification failed");
588 }
589 } else if ((jval = yajl_tree_get(jnode, result_path, yajl_t_any)) != NULL) {
590 /* result notification */
591 if (ovs_db_result_cb(pdb, jnode) < 0)
592 OVS_ERROR("handle result reply failed");
593 } else
594 OVS_ERROR("connot find method or result failed");
595
596 /* release memory */
597 yajl_tree_free(jnode);
598 sfree(sjson);
599 return 0;
600 }
601
602 /*
603 * JSON reader implementation.
604 *
605 * This module process raw JSON data (byte stream) and
606 * returns fully-fledged JSON data which can be processed
607 * (parsed) by YAJL later.
608 */
609
610 /* Allocate JSON reader instance */
ovs_json_reader_alloc()611 static ovs_json_reader_t *ovs_json_reader_alloc() {
612 ovs_json_reader_t *jreader = calloc(1, sizeof(*jreader));
613 if (jreader == NULL)
614 return NULL;
615
616 return jreader;
617 }
618
619 /* Push raw data into into the JSON reader for processing */
ovs_json_reader_push_data(ovs_json_reader_t * jreader,const char * data,size_t data_len)620 static int ovs_json_reader_push_data(ovs_json_reader_t *jreader,
621 const char *data, size_t data_len) {
622 char *new_buff = NULL;
623 size_t available = jreader->buff_size - jreader->buff_offset;
624
625 /* check/update required memory space */
626 if (available < data_len) {
627 OVS_DEBUG("Reallocate buffer [size=%d, available=%d required=%d]",
628 (int)jreader->buff_size, (int)available, (int)data_len);
629
630 /* allocate new chunk of memory */
631 new_buff = realloc(jreader->buff_ptr, (jreader->buff_size + data_len));
632 if (new_buff == NULL)
633 return -1;
634
635 /* point to new allocated memory */
636 jreader->buff_ptr = new_buff;
637 jreader->buff_size += data_len;
638 }
639
640 /* store input data */
641 memcpy(jreader->buff_ptr + jreader->buff_offset, data, data_len);
642 jreader->buff_offset += data_len;
643 return 0;
644 }
645
646 /* Pop one fully-fledged JSON if already exists. Returns 0 if
647 * completed JSON already exists otherwise negative value is
648 * returned */
ovs_json_reader_pop(ovs_json_reader_t * jreader,const char ** json_ptr,size_t * json_len_ptr)649 static int ovs_json_reader_pop(ovs_json_reader_t *jreader,
650 const char **json_ptr, size_t *json_len_ptr) {
651 size_t nbraces = 0;
652 size_t json_len = 0;
653 char *json = NULL;
654
655 /* search open/close brace */
656 for (size_t i = jreader->json_offset; i < jreader->buff_offset; i++) {
657 if (jreader->buff_ptr[i] == '{') {
658 nbraces++;
659 } else if (jreader->buff_ptr[i] == '}')
660 if (nbraces)
661 if (!(--nbraces)) {
662 /* JSON data */
663 *json_ptr = jreader->buff_ptr + jreader->json_offset;
664 *json_len_ptr = json_len + 1;
665 jreader->json_offset = i + 1;
666 return 0;
667 }
668
669 /* increase JSON data length */
670 if (nbraces)
671 json_len++;
672 }
673
674 if (jreader->json_offset) {
675 if (jreader->json_offset < jreader->buff_offset) {
676 /* shift data to the beginning of the buffer
677 * and zero rest of the buffer data */
678 json = &jreader->buff_ptr[jreader->json_offset];
679 json_len = jreader->buff_offset - jreader->json_offset;
680 for (size_t i = 0; i < jreader->buff_size; i++)
681 jreader->buff_ptr[i] = ((i < json_len) ? (json[i]) : (0));
682 jreader->buff_offset = json_len;
683 } else
684 /* reset the buffer */
685 jreader->buff_offset = 0;
686
687 /* data is at the beginning of the buffer */
688 jreader->json_offset = 0;
689 }
690
691 return -1;
692 }
693
694 /* Reset JSON reader. It is useful when start processing
695 * new raw data. E.g.: in case of lost stream connection.
696 */
ovs_json_reader_reset(ovs_json_reader_t * jreader)697 static void ovs_json_reader_reset(ovs_json_reader_t *jreader) {
698 if (jreader) {
699 jreader->buff_offset = 0;
700 jreader->json_offset = 0;
701 }
702 }
703
704 /* Release internal data allocated for JSON reader */
ovs_json_reader_free(ovs_json_reader_t * jreader)705 static void ovs_json_reader_free(ovs_json_reader_t *jreader) {
706 if (jreader) {
707 free(jreader->buff_ptr);
708 free(jreader);
709 }
710 }
711
712 /* Reconnect to OVS DB and call the OVS DB post connection init callback
713 * if connection has been established.
714 */
ovs_db_reconnect(ovs_db_t * pdb)715 static void ovs_db_reconnect(ovs_db_t *pdb) {
716 const char *node_info = pdb->node;
717 struct addrinfo *result;
718
719 if (pdb->unix_path[0] != '\0') {
720 /* use UNIX socket instead of INET address */
721 node_info = pdb->unix_path;
722
723 struct sockaddr_un *sa_unix = calloc(1, sizeof(*sa_unix));
724 if (sa_unix == NULL)
725 return;
726
727 result = calloc(1, sizeof(*result));
728 if (result == NULL) {
729 free(sa_unix);
730 return;
731 }
732
733 result->ai_family = AF_UNIX;
734 result->ai_socktype = SOCK_STREAM;
735 result->ai_addrlen = sizeof(*sa_unix);
736 result->ai_addr = (struct sockaddr *)sa_unix;
737 sa_unix->sun_family = result->ai_family;
738 sstrncpy(sa_unix->sun_path, pdb->unix_path, sizeof(sa_unix->sun_path));
739 } else {
740 /* inet socket address */
741 struct addrinfo hints;
742
743 /* setup criteria for selecting the socket address */
744 memset(&hints, 0, sizeof(hints));
745 hints.ai_family = AF_UNSPEC;
746 hints.ai_socktype = SOCK_STREAM;
747
748 /* get socket addresses */
749 int ret = getaddrinfo(pdb->node, pdb->service, &hints, &result);
750 if (ret != 0) {
751 OVS_ERROR("getaddrinfo(): %s", gai_strerror(ret));
752 return;
753 }
754 }
755 /* try to connect to the server */
756 for (struct addrinfo *rp = result; rp != NULL; rp = rp->ai_next) {
757 int sock = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
758 if (sock < 0) {
759 OVS_DEBUG("socket(): %s", STRERRNO);
760 continue;
761 }
762 if (connect(sock, rp->ai_addr, rp->ai_addrlen) < 0) {
763 close(sock);
764 OVS_DEBUG("connect(): %s [family=%d]", STRERRNO, rp->ai_family);
765 } else {
766 /* send notification to event thread */
767 pdb->sock = sock;
768 ovs_db_event_post(pdb, OVS_DB_EVENT_CONN_ESTABLISHED);
769 break;
770 }
771 }
772
773 if (pdb->sock < 0)
774 OVS_ERROR("connect to \"%s\" failed", node_info);
775
776 freeaddrinfo(result);
777 }
778
779 /* POLL worker thread.
780 * It listens on OVS DB connection for incoming
781 * requests/reply/events etc. Also, it reconnects to OVS DB
782 * if connection has been lost.
783 */
ovs_poll_worker(void * arg)784 static void *ovs_poll_worker(void *arg) {
785 ovs_db_t *pdb = (ovs_db_t *)arg; /* pointer to OVS DB */
786 ovs_json_reader_t *jreader = NULL;
787 struct pollfd poll_fd = {
788 .fd = pdb->sock,
789 .events = POLLIN | POLLPRI,
790 .revents = 0,
791 };
792
793 /* create JSON reader instance */
794 if ((jreader = ovs_json_reader_alloc()) == NULL) {
795 OVS_ERROR("initialize json reader failed");
796 return NULL;
797 }
798
799 /* poll data */
800 while (ovs_db_poll_is_running(pdb)) {
801 poll_fd.fd = pdb->sock;
802 int poll_ret = poll(&poll_fd, 1, /* ms */ OVS_DB_POLL_TIMEOUT * 1000);
803 if (poll_ret < 0) {
804 OVS_ERROR("poll(): %s", STRERRNO);
805 break;
806 } else if (poll_ret == 0) {
807 OVS_DEBUG("poll(): timeout");
808 if (pdb->sock < 0)
809 /* invalid fd, so try to reconnect */
810 ovs_db_reconnect(pdb);
811 continue;
812 }
813 if (poll_fd.revents & POLLNVAL) {
814 /* invalid file descriptor, clean-up */
815 ovs_db_callback_remove_all(pdb);
816 ovs_json_reader_reset(jreader);
817 /* setting poll FD to -1 tells poll() call to ignore this FD.
818 * In that case poll() call will return timeout all the time */
819 pdb->sock = (-1);
820 } else if ((poll_fd.revents & POLLERR) || (poll_fd.revents & POLLHUP)) {
821 /* connection is broken */
822 close(poll_fd.fd);
823 ovs_db_event_post(pdb, OVS_DB_EVENT_CONN_TERMINATED);
824 OVS_ERROR("poll() peer closed its end of the channel");
825 } else if ((poll_fd.revents & POLLIN) || (poll_fd.revents & POLLPRI)) {
826 /* read incoming data */
827 char buff[OVS_DB_POLL_READ_BLOCK_SIZE];
828 ssize_t nbytes = recv(poll_fd.fd, buff, sizeof(buff), 0);
829 if (nbytes < 0) {
830 OVS_ERROR("recv(): %s", STRERRNO);
831 /* read error? Try to reconnect */
832 close(poll_fd.fd);
833 continue;
834 } else if (nbytes == 0) {
835 close(poll_fd.fd);
836 ovs_db_event_post(pdb, OVS_DB_EVENT_CONN_TERMINATED);
837 OVS_ERROR("recv() peer has performed an orderly shutdown");
838 continue;
839 }
840 /* read incoming data */
841 size_t json_len = 0;
842 const char *json = NULL;
843 OVS_DEBUG("recv(): received %zd bytes of data", nbytes);
844 ovs_json_reader_push_data(jreader, buff, nbytes);
845 while (!ovs_json_reader_pop(jreader, &json, &json_len))
846 /* process JSON data */
847 ovs_db_json_data_process(pdb, json, json_len);
848 }
849 }
850
851 OVS_DEBUG("poll thread has been completed");
852 ovs_json_reader_free(jreader);
853 return NULL;
854 }
855
856 /* EVENT worker thread.
857 * Perform task based on incoming events. This
858 * task can be done asynchronously which allows to
859 * handle OVS DB callback like 'init_cb'.
860 */
ovs_event_worker(void * arg)861 static void *ovs_event_worker(void *arg) {
862 ovs_db_t *pdb = (ovs_db_t *)arg;
863
864 while (pdb->event_thread.value != OVS_DB_EVENT_TERMINATE) {
865 /* wait for an event */
866 struct timespec ts;
867 clock_gettime(CLOCK_REALTIME, &ts);
868 ts.tv_sec += (OVS_DB_EVENT_TIMEOUT);
869 int ret = pthread_cond_timedwait(&pdb->event_thread.cond,
870 &pdb->event_thread.mutex, &ts);
871 if (!ret || ret == ETIMEDOUT) {
872 /* handle the event */
873 OVS_DEBUG("handle event %d", pdb->event_thread.value);
874 switch (pdb->event_thread.value) {
875 case OVS_DB_EVENT_CONN_ESTABLISHED:
876 if (pdb->cb.post_conn_init)
877 pdb->cb.post_conn_init(pdb);
878 /* reset event */
879 pdb->event_thread.value = OVS_DB_EVENT_NONE;
880 break;
881 case OVS_DB_EVENT_CONN_TERMINATED:
882 if (pdb->cb.post_conn_terminate)
883 pdb->cb.post_conn_terminate();
884 /* reset event */
885 pdb->event_thread.value = OVS_DB_EVENT_NONE;
886 break;
887 case OVS_DB_EVENT_NONE:
888 /* wait timeout */
889 OVS_DEBUG("no event received (timeout)");
890 break;
891 default:
892 OVS_DEBUG("unknown event received");
893 break;
894 }
895 } else {
896 /* unexpected error */
897 OVS_ERROR("pthread_cond_timedwait() failed");
898 break;
899 }
900 }
901
902 OVS_DEBUG("event thread has been completed");
903 return NULL;
904 }
905
906 /* Initialize EVENT thread */
ovs_db_event_thread_init(ovs_db_t * pdb)907 static int ovs_db_event_thread_init(ovs_db_t *pdb) {
908 pdb->event_thread.tid = (pthread_t){0};
909 /* init event thread condition variable */
910 if (pthread_cond_init(&pdb->event_thread.cond, NULL)) {
911 return -1;
912 }
913 /* init event thread mutex */
914 if (pthread_mutex_init(&pdb->event_thread.mutex, NULL)) {
915 pthread_cond_destroy(&pdb->event_thread.cond);
916 return -1;
917 }
918 /* Hold the event thread mutex. It ensures that no events
919 * will be lost while thread is still starting. Once event
920 * thread is started and ready to accept events, it will release
921 * the mutex */
922 if (pthread_mutex_lock(&pdb->event_thread.mutex)) {
923 pthread_mutex_destroy(&pdb->event_thread.mutex);
924 pthread_cond_destroy(&pdb->event_thread.cond);
925 return -1;
926 }
927 /* start event thread */
928 pthread_t tid;
929 if (plugin_thread_create(&tid, ovs_event_worker, pdb, "utils_ovs:event") !=
930 0) {
931 pthread_mutex_unlock(&pdb->event_thread.mutex);
932 pthread_mutex_destroy(&pdb->event_thread.mutex);
933 pthread_cond_destroy(&pdb->event_thread.cond);
934 return -1;
935 }
936 pdb->event_thread.tid = tid;
937 return 0;
938 }
939
940 /* Terminate EVENT thread */
ovs_db_event_thread_terminate(ovs_db_t * pdb)941 static int ovs_db_event_thread_terminate(ovs_db_t *pdb) {
942 if (pthread_equal(pdb->event_thread.tid, (pthread_t){0})) {
943 /* already terminated */
944 return 0;
945 }
946 ovs_db_event_post(pdb, OVS_DB_EVENT_TERMINATE);
947 if (pthread_join(pdb->event_thread.tid, NULL) != 0)
948 return -1;
949 /* Event thread always holds the thread mutex when
950 * performs some task (handles event) and releases it when
951 * while sleeping. Thus, if event thread exits, the mutex
952 * remains locked */
953 pdb->event_thread.tid = (pthread_t){0};
954 pthread_mutex_unlock(&pdb->event_thread.mutex);
955 return 0;
956 }
957
958 /* Destroy EVENT thread private data */
ovs_db_event_thread_data_destroy(ovs_db_t * pdb)959 static void ovs_db_event_thread_data_destroy(ovs_db_t *pdb) {
960 /* destroy mutex */
961 pthread_mutex_destroy(&pdb->event_thread.mutex);
962 pthread_cond_destroy(&pdb->event_thread.cond);
963 }
964
965 /* Initialize POLL thread */
ovs_db_poll_thread_init(ovs_db_t * pdb)966 static int ovs_db_poll_thread_init(ovs_db_t *pdb) {
967 pdb->poll_thread.tid = (pthread_t){0};
968 /* init event thread mutex */
969 if (pthread_mutex_init(&pdb->poll_thread.mutex, NULL)) {
970 return -1;
971 }
972 /* start poll thread */
973 pthread_t tid;
974 pdb->poll_thread.state = OVS_DB_POLL_STATE_RUNNING;
975 if (plugin_thread_create(&tid, ovs_poll_worker, pdb, "utils_ovs:poll") != 0) {
976 pthread_mutex_destroy(&pdb->poll_thread.mutex);
977 return -1;
978 }
979 pdb->poll_thread.tid = tid;
980 return 0;
981 }
982
983 /* Destroy POLL thread */
984 /* XXX: Must hold pdb->mutex when calling! */
ovs_db_poll_thread_destroy(ovs_db_t * pdb)985 static int ovs_db_poll_thread_destroy(ovs_db_t *pdb) {
986 if (pthread_equal(pdb->poll_thread.tid, (pthread_t){0})) {
987 /* already destroyed */
988 return 0;
989 }
990 /* change thread state */
991 pthread_mutex_lock(&pdb->poll_thread.mutex);
992 pdb->poll_thread.state = OVS_DB_POLL_STATE_EXITING;
993 pthread_mutex_unlock(&pdb->poll_thread.mutex);
994 /* join the thread */
995 if (pthread_join(pdb->poll_thread.tid, NULL) != 0)
996 return -1;
997 pthread_mutex_destroy(&pdb->poll_thread.mutex);
998 pdb->poll_thread.tid = (pthread_t){0};
999 return 0;
1000 }
1001
1002 /*
1003 * Public OVS DB API implementation
1004 */
1005
ovs_db_init(const char * node,const char * service,const char * unix_path,ovs_db_callback_t * cb)1006 ovs_db_t *ovs_db_init(const char *node, const char *service,
1007 const char *unix_path, ovs_db_callback_t *cb) {
1008 int ret;
1009
1010 /* sanity check */
1011 if (node == NULL || service == NULL || unix_path == NULL)
1012 return NULL;
1013
1014 /* allocate db data & fill it */
1015 ovs_db_t *pdb = calloc(1, sizeof(*pdb));
1016 if (pdb == NULL)
1017 return NULL;
1018 pdb->sock = -1;
1019
1020 /* store the OVS DB address */
1021 sstrncpy(pdb->node, node, sizeof(pdb->node));
1022 sstrncpy(pdb->service, service, sizeof(pdb->service));
1023 sstrncpy(pdb->unix_path, unix_path, sizeof(pdb->unix_path));
1024
1025 /* setup OVS DB callbacks */
1026 if (cb)
1027 pdb->cb = *cb;
1028
1029 /* init OVS DB mutex attributes */
1030 pthread_mutexattr_t mutex_attr;
1031 if (pthread_mutexattr_init(&mutex_attr)) {
1032 OVS_ERROR("OVS DB mutex attribute init failed");
1033 sfree(pdb);
1034 return NULL;
1035 }
1036 /* set OVS DB mutex as recursive */
1037 if (pthread_mutexattr_settype(&mutex_attr, PTHREAD_MUTEX_RECURSIVE)) {
1038 OVS_ERROR("Failed to set OVS DB mutex as recursive");
1039 pthread_mutexattr_destroy(&mutex_attr);
1040 sfree(pdb);
1041 return NULL;
1042 }
1043 /* init OVS DB mutex */
1044 if (pthread_mutex_init(&pdb->mutex, &mutex_attr)) {
1045 OVS_ERROR("OVS DB mutex init failed");
1046 pthread_mutexattr_destroy(&mutex_attr);
1047 sfree(pdb);
1048 return NULL;
1049 }
1050 /* destroy mutex attributes */
1051 pthread_mutexattr_destroy(&mutex_attr);
1052
1053 /* init event thread */
1054 if (ovs_db_event_thread_init(pdb) < 0) {
1055 ret = ovs_db_destroy(pdb);
1056 if (ret > 0)
1057 goto failure;
1058 else
1059 return NULL;
1060 }
1061
1062 /* init polling thread */
1063 if (ovs_db_poll_thread_init(pdb) < 0) {
1064 ret = ovs_db_destroy(pdb);
1065 if (ret > 0) {
1066 ovs_db_event_thread_data_destroy(pdb);
1067 goto failure;
1068 } else {
1069 return NULL;
1070 }
1071 }
1072 return pdb;
1073
1074 failure:
1075 pthread_mutex_destroy(&pdb->mutex);
1076 sfree(pdb);
1077 return NULL;
1078 }
1079
ovs_db_send_request(ovs_db_t * pdb,const char * method,const char * params,ovs_db_result_cb_t cb)1080 int ovs_db_send_request(ovs_db_t *pdb, const char *method, const char *params,
1081 ovs_db_result_cb_t cb) {
1082 int ret = 0;
1083 yajl_gen_status yajl_gen_ret;
1084 yajl_val jparams;
1085 yajl_gen jgen;
1086 ovs_callback_t *new_cb = NULL;
1087 uint64_t uid;
1088 char uid_buff[OVS_UID_STR_SIZE];
1089 const char *req = NULL;
1090 size_t req_len = 0;
1091 struct timespec ts;
1092
1093 /* sanity check */
1094 if (!pdb || !method || !params)
1095 return -1;
1096
1097 if ((jgen = yajl_gen_alloc(NULL)) == NULL)
1098 return -1;
1099
1100 /* try to parse params */
1101 if ((jparams = yajl_tree_parse(params, NULL, 0)) == NULL) {
1102 OVS_ERROR("params is not a JSON string");
1103 yajl_gen_clear(jgen);
1104 return -1;
1105 }
1106
1107 /* generate method field */
1108 OVS_YAJL_CALL(yajl_gen_map_open, jgen);
1109
1110 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "method");
1111 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, method);
1112
1113 /* generate params field */
1114 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "params");
1115 OVS_YAJL_CALL(ovs_yajl_gen_val, jgen, jparams);
1116 yajl_tree_free(jparams);
1117
1118 /* generate id field */
1119 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "id");
1120 uid = ovs_uid_generate();
1121 ssnprintf(uid_buff, sizeof(uid_buff), "%" PRIX64, uid);
1122 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, uid_buff);
1123
1124 OVS_YAJL_CALL(yajl_gen_map_close, jgen);
1125
1126 if (cb) {
1127 /* register result callback */
1128 if ((new_cb = calloc(1, sizeof(*new_cb))) == NULL)
1129 goto yajl_gen_failure;
1130
1131 /* add new callback to front */
1132 sem_init(&new_cb->result.sync, 0, 0);
1133 new_cb->result.call = cb;
1134 new_cb->uid = uid;
1135 ovs_db_callback_add(pdb, new_cb);
1136 }
1137
1138 /* send the request */
1139 OVS_YAJL_CALL(yajl_gen_get_buf, jgen, (const unsigned char **)&req, &req_len);
1140 OVS_DEBUG("%s", req);
1141 if (!ovs_db_data_send(pdb, req, req_len)) {
1142 if (cb) {
1143 /* wait for result */
1144 clock_gettime(CLOCK_REALTIME, &ts);
1145 ts.tv_sec += OVS_DB_SEND_REQ_TIMEOUT;
1146 if (sem_timedwait(&new_cb->result.sync, &ts) < 0) {
1147 OVS_ERROR("%s() no replay received within %d sec", __FUNCTION__,
1148 OVS_DB_SEND_REQ_TIMEOUT);
1149 ret = (-1);
1150 }
1151 }
1152 } else {
1153 OVS_ERROR("ovs_db_data_send() failed");
1154 ret = (-1);
1155 }
1156
1157 yajl_gen_failure:
1158 if (new_cb) {
1159 /* destroy callback */
1160 sem_destroy(&new_cb->result.sync);
1161 ovs_db_callback_remove(pdb, new_cb);
1162 }
1163
1164 /* release memory */
1165 yajl_gen_clear(jgen);
1166 return (yajl_gen_ret != yajl_gen_status_ok) ? (-1) : ret;
1167 }
1168
ovs_db_table_cb_register(ovs_db_t * pdb,const char * tb_name,const char ** tb_column,ovs_db_table_cb_t update_cb,ovs_db_result_cb_t result_cb,unsigned int flags)1169 int ovs_db_table_cb_register(ovs_db_t *pdb, const char *tb_name,
1170 const char **tb_column,
1171 ovs_db_table_cb_t update_cb,
1172 ovs_db_result_cb_t result_cb, unsigned int flags) {
1173 yajl_gen jgen;
1174 yajl_gen_status yajl_gen_ret;
1175 ovs_callback_t *new_cb = NULL;
1176 char uid_str[OVS_UID_STR_SIZE];
1177 char *params;
1178 size_t params_len;
1179 int ovs_db_ret = 0;
1180
1181 /* sanity check */
1182 if (pdb == NULL || tb_name == NULL || update_cb == NULL)
1183 return -1;
1184
1185 /* allocate new update callback */
1186 if ((new_cb = calloc(1, sizeof(*new_cb))) == NULL)
1187 return -1;
1188
1189 /* init YAJL generator */
1190 if ((jgen = yajl_gen_alloc(NULL)) == NULL) {
1191 sfree(new_cb);
1192 return -1;
1193 }
1194
1195 /* add new callback to front */
1196 new_cb->table.call = update_cb;
1197 new_cb->uid = ovs_uid_generate();
1198 ovs_db_callback_add(pdb, new_cb);
1199
1200 /* make update notification request
1201 * [<db-name>, <json-value>, <monitor-requests>] */
1202 OVS_YAJL_CALL(yajl_gen_array_open, jgen);
1203 {
1204 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, OVS_DB_DEFAULT_DB_NAME);
1205
1206 /* uid string <json-value> */
1207 ssnprintf(uid_str, sizeof(uid_str), "%" PRIX64, new_cb->uid);
1208 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, uid_str);
1209
1210 /* <monitor-requests> */
1211 OVS_YAJL_CALL(yajl_gen_map_open, jgen);
1212 {
1213 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, tb_name);
1214 OVS_YAJL_CALL(yajl_gen_array_open, jgen);
1215 {
1216 /* <monitor-request> */
1217 OVS_YAJL_CALL(yajl_gen_map_open, jgen);
1218 {
1219 if (tb_column) {
1220 /* columns within the table to be monitored */
1221 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "columns");
1222 OVS_YAJL_CALL(yajl_gen_array_open, jgen);
1223 for (; *tb_column; tb_column++)
1224 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, *tb_column);
1225 OVS_YAJL_CALL(yajl_gen_array_close, jgen);
1226 }
1227 /* specify select option */
1228 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "select");
1229 {
1230 OVS_YAJL_CALL(yajl_gen_map_open, jgen);
1231 {
1232 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "initial");
1233 OVS_YAJL_CALL(yajl_gen_bool, jgen,
1234 flags & OVS_DB_TABLE_CB_FLAG_INITIAL);
1235 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "insert");
1236 OVS_YAJL_CALL(yajl_gen_bool, jgen,
1237 flags & OVS_DB_TABLE_CB_FLAG_INSERT);
1238 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "delete");
1239 OVS_YAJL_CALL(yajl_gen_bool, jgen,
1240 flags & OVS_DB_TABLE_CB_FLAG_DELETE);
1241 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "modify");
1242 OVS_YAJL_CALL(yajl_gen_bool, jgen,
1243 flags & OVS_DB_TABLE_CB_FLAG_MODIFY);
1244 }
1245 OVS_YAJL_CALL(yajl_gen_map_close, jgen);
1246 }
1247 }
1248 OVS_YAJL_CALL(yajl_gen_map_close, jgen);
1249 }
1250 OVS_YAJL_CALL(yajl_gen_array_close, jgen);
1251 }
1252 OVS_YAJL_CALL(yajl_gen_map_close, jgen);
1253 }
1254 OVS_YAJL_CALL(yajl_gen_array_close, jgen);
1255
1256 /* make a request to subscribe to given table */
1257 OVS_YAJL_CALL(yajl_gen_get_buf, jgen, (const unsigned char **)¶ms,
1258 ¶ms_len);
1259 if (ovs_db_send_request(pdb, "monitor", params, result_cb) < 0) {
1260 OVS_ERROR("Failed to subscribe to \"%s\" table", tb_name);
1261 ovs_db_ret = (-1);
1262 }
1263
1264 yajl_gen_failure:
1265 /* release memory */
1266 yajl_gen_clear(jgen);
1267 return ovs_db_ret;
1268 }
1269
ovs_db_destroy(ovs_db_t * pdb)1270 int ovs_db_destroy(ovs_db_t *pdb) {
1271 int ovs_db_ret = 0;
1272 int ret = 0;
1273
1274 /* sanity check */
1275 if (pdb == NULL)
1276 return -1;
1277
1278 /* stop event thread */
1279 if (ovs_db_event_thread_terminate(pdb) < 0) {
1280 OVS_ERROR("stop event thread failed");
1281 ovs_db_ret = -1;
1282 }
1283
1284 /* try to lock the structure before releasing */
1285 if ((ret = pthread_mutex_lock(&pdb->mutex))) {
1286 OVS_ERROR("pthread_mutex_lock() DB mutex lock failed (%d)", ret);
1287 return ret;
1288 }
1289
1290 /* stop poll thread and destroy thread's private data */
1291 if (ovs_db_poll_thread_destroy(pdb) < 0) {
1292 OVS_ERROR("destroy poll thread failed");
1293 ovs_db_ret = -1;
1294 }
1295
1296 /* destroy event thread private data */
1297 ovs_db_event_thread_data_destroy(pdb);
1298
1299 pthread_mutex_unlock(&pdb->mutex);
1300
1301 /* unsubscribe callbacks */
1302 ovs_db_callback_remove_all(pdb);
1303
1304 /* close connection */
1305 if (pdb->sock >= 0)
1306 close(pdb->sock);
1307
1308 /* release DB handler */
1309 pthread_mutex_destroy(&pdb->mutex);
1310 sfree(pdb);
1311 return ovs_db_ret;
1312 }
1313
1314 /*
1315 * Public OVS utils API implementation
1316 */
1317
1318 /* Get YAJL value by key from YAJL dictionary
1319 *
1320 * EXAMPLE:
1321 * {
1322 * "key_a" : <YAJL return value>
1323 * "key_b" : <YAJL return value>
1324 * }
1325 */
ovs_utils_get_value_by_key(yajl_val jval,const char * key)1326 yajl_val ovs_utils_get_value_by_key(yajl_val jval, const char *key) {
1327 const char *obj_key = NULL;
1328
1329 /* check params */
1330 if (!YAJL_IS_OBJECT(jval) || (key == NULL))
1331 return NULL;
1332
1333 /* find a value by key */
1334 for (size_t i = 0; i < YAJL_GET_OBJECT(jval)->len; i++) {
1335 obj_key = YAJL_GET_OBJECT(jval)->keys[i];
1336 if (strcmp(obj_key, key) == 0)
1337 return YAJL_GET_OBJECT(jval)->values[i];
1338 }
1339
1340 return NULL;
1341 }
1342
1343 /* Get OVS DB map value by given map key
1344 *
1345 * FROM RFC7047:
1346 *
1347 * <pair>
1348 * A 2-element JSON array that represents a pair within a database
1349 * map. The first element is an <atom> that represents the key, and
1350 * the second element is an <atom> that represents the value.
1351 *
1352 * <map>
1353 * A 2-element JSON array that represents a database map value. The
1354 * first element of the array must be the string "map", and the
1355 * second element must be an array of zero or more <pair>s giving the
1356 * values in the map. All of the <pair>s must have the same key and
1357 * value types.
1358 *
1359 * EXAMPLE:
1360 * [
1361 * "map", [
1362 * [ "key_a", <YAJL value>], [ "key_b", <YAJL value>], ...
1363 * ]
1364 * ]
1365 */
ovs_utils_get_map_value(yajl_val jval,const char * key)1366 yajl_val ovs_utils_get_map_value(yajl_val jval, const char *key) {
1367 size_t map_len = 0;
1368 size_t array_len = 0;
1369 yajl_val *map_values = NULL;
1370 yajl_val *array_values = NULL;
1371 const char *str_val = NULL;
1372
1373 /* check YAJL array */
1374 if (!YAJL_IS_ARRAY(jval) || (key == NULL))
1375 return NULL;
1376
1377 /* check a database map value (2-element, first one should be a string */
1378 array_len = YAJL_GET_ARRAY(jval)->len;
1379 array_values = YAJL_GET_ARRAY(jval)->values;
1380 if ((array_len != 2) || (!YAJL_IS_STRING(array_values[0])) ||
1381 (!YAJL_IS_ARRAY(array_values[1])))
1382 return NULL;
1383
1384 /* check first element of the array */
1385 str_val = YAJL_GET_STRING(array_values[0]);
1386 if (str_val == NULL || strcmp("map", str_val) != 0)
1387 return NULL;
1388
1389 /* try to find map value by map key */
1390 if (YAJL_GET_ARRAY(array_values[1]) == NULL)
1391 return NULL;
1392
1393 map_len = YAJL_GET_ARRAY(array_values[1])->len;
1394 map_values = YAJL_GET_ARRAY(array_values[1])->values;
1395 for (size_t i = 0; i < map_len; i++) {
1396 /* check YAJL array */
1397 if (!YAJL_IS_ARRAY(map_values[i]) || YAJL_GET_ARRAY(map_values[i]) == NULL)
1398 break;
1399
1400 /* check a database pair value (2-element, first one represents a key
1401 * and it should be a string in our case */
1402 array_len = YAJL_GET_ARRAY(map_values[i])->len;
1403 array_values = YAJL_GET_ARRAY(map_values[i])->values;
1404 if ((array_len != 2) || (!YAJL_IS_STRING(array_values[0])))
1405 break;
1406
1407 /* return map value if given key equals map key */
1408 str_val = YAJL_GET_STRING(array_values[0]);
1409 if (str_val != NULL && strcmp(key, str_val) == 0)
1410 return array_values[1];
1411 }
1412 return NULL;
1413 }
1414