1 /*
2 pmacct (Promiscuous mode IP Accounting package)
3 pmacct is Copyright (C) 2003-2019 by Paolo Lucente
4 */
5
6 /*
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 2 of the License, or
10 (at your option) any later version.
11
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
16
17 You should have received a copy of the GNU General Public License
18 along with this program; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
20 */
21
22 /* includes */
23 #include "pmacct.h"
24 #include "pmacct-data.h"
25 #include "zmq_common.h"
26
27 /* Global variables */
28 struct p_zmq_host nfacctd_zmq_host;
29 struct p_zmq_host telemetry_zmq_host;
30
31 /* Functions */
p_zmq_set_address(struct p_zmq_host * zmq_host,char * address)32 void p_zmq_set_address(struct p_zmq_host *zmq_host, char *address)
33 {
34 char proto[] = "://", tcp_proto[] = "tcp://", inproc_proto[] = "inproc://";
35
36 if (zmq_host && address) {
37 if (!strstr(address, proto)) {
38 snprintf(zmq_host->sock.str, sizeof(zmq_host->sock.str), "tcp://%s", address);
39 }
40 else {
41 if (strstr(address, tcp_proto)) {
42 snprintf(zmq_host->sock.str, sizeof(zmq_host->sock.str), "%s", address);
43 }
44 else if (strstr(address, inproc_proto)) {
45 snprintf(zmq_host->sock_inproc.str, sizeof(zmq_host->sock_inproc.str), "%s", address);
46 }
47 else {
48 Log(LOG_ERR, "ERROR ( %s ): p_zmq_set_address() unsupported protocol in '%s'.\nExiting.\n",
49 zmq_host->log_id, address);
50 exit_gracefully(1);
51 }
52 }
53 }
54 }
55
p_zmq_set_topic(struct p_zmq_host * zmq_host,u_int8_t topic)56 void p_zmq_set_topic(struct p_zmq_host *zmq_host, u_int8_t topic)
57 {
58 if (zmq_host) zmq_host->topic = topic;
59 }
60
p_zmq_set_retry_timeout(struct p_zmq_host * zmq_host,int tout)61 void p_zmq_set_retry_timeout(struct p_zmq_host *zmq_host, int tout)
62 {
63 int ret;
64
65 if (zmq_host) {
66 ret = zmq_setsockopt(zmq_host->sock.obj, ZMQ_RECONNECT_IVL, &tout, sizeof(tout));
67 if (ret != 0) {
68 Log(LOG_ERR, "ERROR ( %s ): zmq_setsockopt() RECONNECT_IVL failed for topic %u: %s\nExiting.\n",
69 zmq_host->log_id, zmq_host->topic, zmq_strerror(errno));
70 exit_gracefully(1);
71 }
72 }
73 }
74
p_zmq_set_username(struct p_zmq_host * zmq_host,char * username)75 void p_zmq_set_username(struct p_zmq_host *zmq_host, char *username)
76 {
77 if (zmq_host) {
78 if (strlen(username) >= sizeof(zmq_host->zap.username)) {
79 Log(LOG_ERR, "ERROR ( %s ): p_zmq_set_username(): username '%s' too long (maximum %lu chars). Exiting.\n",
80 zmq_host->log_id, username, (sizeof(zmq_host->zap.username) - 1));
81 exit_gracefully(1);
82 }
83 else strlcpy(zmq_host->zap.username, username, sizeof(zmq_host->zap.username));
84 }
85 }
86
p_zmq_set_password(struct p_zmq_host * zmq_host,char * password)87 void p_zmq_set_password(struct p_zmq_host *zmq_host, char *password)
88 {
89 if (zmq_host) {
90 if (strlen(password) >= sizeof(zmq_host->zap.password)) {
91 Log(LOG_ERR, "ERROR ( %s ): p_zmq_set_password(): password '%s' too long (maximum %lu chars). Exiting.\n",
92 zmq_host->log_id, password, (sizeof(zmq_host->zap.password) - 1));
93 exit_gracefully(1);
94 }
95 else strlcpy(zmq_host->zap.password, password, sizeof(zmq_host->zap.password));
96 }
97 }
98
p_zmq_set_random_username(struct p_zmq_host * zmq_host)99 void p_zmq_set_random_username(struct p_zmq_host *zmq_host)
100 {
101 if (zmq_host) generate_random_string(zmq_host->zap.username, (sizeof(zmq_host->zap.username) - 1));
102 }
103
p_zmq_set_random_password(struct p_zmq_host * zmq_host)104 void p_zmq_set_random_password(struct p_zmq_host *zmq_host)
105 {
106 if (zmq_host) generate_random_string(zmq_host->zap.password, (sizeof(zmq_host->zap.password) - 1));
107 }
108
p_zmq_set_hwm(struct p_zmq_host * zmq_host,int hwm)109 void p_zmq_set_hwm(struct p_zmq_host *zmq_host, int hwm)
110 {
111 if (zmq_host) zmq_host->hwm = hwm;
112 }
113
p_zmq_set_log_id(struct p_zmq_host * zmq_host,char * log_id)114 void p_zmq_set_log_id(struct p_zmq_host *zmq_host, char *log_id)
115 {
116 if (zmq_host) strlcpy(zmq_host->log_id, log_id, sizeof(zmq_host->log_id));
117 }
118
p_zmq_get_address(struct p_zmq_host * zmq_host)119 char *p_zmq_get_address(struct p_zmq_host *zmq_host)
120 {
121 if (zmq_host) {
122 if (strlen(zmq_host->sock.str)) return zmq_host->sock.str;
123 else if (strlen(zmq_host->sock_inproc.str)) return zmq_host->sock_inproc.str;
124 }
125
126 return NULL;
127 }
128
p_zmq_get_topic(struct p_zmq_host * zmq_host)129 u_int8_t p_zmq_get_topic(struct p_zmq_host *zmq_host)
130 {
131 if (zmq_host) return zmq_host->topic;
132
133 return 0;
134 }
135
p_zmq_get_sock(struct p_zmq_host * zmq_host)136 void *p_zmq_get_sock(struct p_zmq_host *zmq_host)
137 {
138 if (zmq_host) return zmq_host->sock.obj;
139
140 return NULL;
141 }
142
p_zmq_get_fd(struct p_zmq_host * zmq_host)143 int p_zmq_get_fd(struct p_zmq_host *zmq_host)
144 {
145 int fd = ERR;
146 size_t len = sizeof(fd);
147
148 if (zmq_host) {
149 zmq_getsockopt(zmq_host->sock.obj, ZMQ_FD, &fd, &len);
150 }
151
152 return fd;
153 }
154
p_zmq_init_push(struct p_zmq_host * zmq_host,char * address)155 void p_zmq_init_push(struct p_zmq_host *zmq_host, char *address)
156 {
157 if (zmq_host) {
158 memset(zmq_host, 0, sizeof(struct p_zmq_host));
159 p_zmq_set_address(zmq_host, address);
160 }
161 }
162
p_zmq_init_pub(struct p_zmq_host * zmq_host,char * address,u_int8_t topic)163 void p_zmq_init_pub(struct p_zmq_host *zmq_host, char *address, u_int8_t topic)
164 {
165 if (zmq_host) {
166 memset(zmq_host, 0, sizeof(struct p_zmq_host));
167 p_zmq_set_address(zmq_host, address);
168 p_zmq_set_topic(zmq_host, topic);
169 }
170 }
171
p_zmq_plugin_pipe_init_core(struct p_zmq_host * zmq_host,u_int8_t plugin_id,char * username,char * password)172 void p_zmq_plugin_pipe_init_core(struct p_zmq_host *zmq_host, u_int8_t plugin_id, char *username, char *password)
173 {
174 if (zmq_host) {
175 p_zmq_init_pub(zmq_host, NULL, plugin_id);
176
177 if (!username) p_zmq_set_random_username(zmq_host);
178 else p_zmq_set_username(zmq_host, username);
179
180 if (!password) p_zmq_set_random_password(zmq_host);
181 else p_zmq_set_password(zmq_host, password);
182 }
183 }
184
p_zmq_init_sub(struct p_zmq_host * zmq_host)185 void p_zmq_init_sub(struct p_zmq_host *zmq_host)
186 {
187 p_zmq_plugin_pipe_init_plugin(zmq_host);
188 }
189
p_zmq_init_pull(struct p_zmq_host * zmq_host)190 void p_zmq_init_pull(struct p_zmq_host *zmq_host)
191 {
192 p_zmq_plugin_pipe_init_plugin(zmq_host);
193 }
194
p_zmq_plugin_pipe_init_plugin(struct p_zmq_host * zmq_host)195 void p_zmq_plugin_pipe_init_plugin(struct p_zmq_host *zmq_host)
196 {
197 if (zmq_host) {
198
199 /*
200 if (zmq_host->sock.obj) {
201 zmq_unbind(zmq_host->sock.obj, zmq_host->sock.str);
202 zmq_close(zmq_host->sock.obj);
203 }
204
205 if (zmq_host->zap.sock.obj) zmq_close(zmq_host->zap.sock.obj);
206 if (zmq_host->zap.thread) zmq_threadclose(zmq_host->zap.thread);
207
208 if (zmq_host->ctx) {
209 zmq_ctx_shutdown(zmq_host->ctx);
210 zmq_ctx_term(zmq_host->ctx);
211 zmq_host->ctx = NULL;
212 }
213 */
214
215 zmq_host->ctx = NULL;
216 }
217 }
218
p_zmq_plugin_pipe_set_profile(struct configuration * cfg,char * value)219 int p_zmq_plugin_pipe_set_profile(struct configuration *cfg, char *value)
220 {
221 if (!strcmp("micro", value)) {
222 cfg->pipe_zmq_profile = PLUGIN_PIPE_ZMQ_MICRO;
223 cfg->buffer_size = PLUGIN_PIPE_ZMQ_MICRO_SIZE;
224 }
225 else if (!strcmp("small", value)) {
226 cfg->pipe_zmq_profile = PLUGIN_PIPE_ZMQ_SMALL;
227 cfg->buffer_size = PLUGIN_PIPE_ZMQ_SMALL_SIZE;
228 }
229 else if (!strcmp("medium", value)) {
230 cfg->pipe_zmq_profile = PLUGIN_PIPE_ZMQ_MEDIUM;
231 cfg->buffer_size = PLUGIN_PIPE_ZMQ_MEDIUM_SIZE;
232 }
233 else if (!strcmp("large", value)) {
234 cfg->pipe_zmq_profile = PLUGIN_PIPE_ZMQ_LARGE;
235 cfg->buffer_size = PLUGIN_PIPE_ZMQ_LARGE_SIZE;
236 }
237 else if (!strcmp("xlarge", value)) {
238 cfg->pipe_zmq_profile = PLUGIN_PIPE_ZMQ_XLARGE;
239 cfg->buffer_size = PLUGIN_PIPE_ZMQ_XLARGE_SIZE;
240 }
241 else return ERR;
242
243 return SUCCESS;
244 }
245
p_zmq_bind(struct p_zmq_host * zmq_host)246 int p_zmq_bind(struct p_zmq_host *zmq_host)
247 {
248 int ret = 0, as_server = TRUE;
249 size_t sock_strlen;
250
251 if (strlen(zmq_host->zap.username) && strlen(zmq_host->zap.password)) {
252 ret = zmq_setsockopt(zmq_host->sock.obj, ZMQ_PLAIN_SERVER, &as_server, sizeof(int));
253 if (ret == ERR) {
254 Log(LOG_ERR, "ERROR ( %s ): zmq_setsockopt() ZMQ_PLAIN_SERVER failed for topic %u: %s\nExiting.\n",
255 zmq_host->log_id, zmq_host->topic, zmq_strerror(errno));
256 exit_gracefully(1);
257 }
258 }
259
260 if (strlen(zmq_host->sock_inproc.str)) {
261 if (zmq_host->sock_inproc.obj_rx) {
262 ret = zmq_bind(zmq_host->sock_inproc.obj_rx, zmq_host->sock_inproc.str);
263 }
264 else {
265 ret = zmq_bind(zmq_host->sock_inproc.obj, zmq_host->sock_inproc.str);
266 }
267 }
268 else if (strlen(zmq_host->sock.str)) {
269 ret = zmq_bind(zmq_host->sock.obj, zmq_host->sock.str);
270 }
271 else {
272 ret = zmq_bind(zmq_host->sock.obj, "tcp://127.0.0.1:*");
273 }
274
275 if (ret == ERR) {
276 Log(LOG_ERR, "ERROR ( %s ): zmq_bind() failed for topic %u: %s\nExiting.\n",
277 zmq_host->log_id, zmq_host->topic, zmq_strerror(errno));
278 exit_gracefully(1);
279 }
280
281 if (!strlen(zmq_host->sock.str) && !strlen(zmq_host->sock_inproc.str)) {
282 sock_strlen = sizeof(zmq_host->sock.str);
283 ret = zmq_getsockopt(zmq_host->sock.obj, ZMQ_LAST_ENDPOINT, zmq_host->sock.str, &sock_strlen);
284 if (ret == ERR) {
285 Log(LOG_ERR, "ERROR ( %s ): zmq_getsockopt() ZMQ_LAST_ENDPOINT failed for topic %u: %s\nExiting.\n",
286 zmq_host->log_id, zmq_host->topic, zmq_strerror(errno));
287 exit_gracefully(1);
288 }
289 }
290
291 return ret;
292 }
293
p_zmq_connect(struct p_zmq_host * zmq_host)294 int p_zmq_connect(struct p_zmq_host *zmq_host)
295 {
296 int ret = 0;
297
298 if (strlen(zmq_host->zap.username) && strlen(zmq_host->zap.password)) {
299 ret = zmq_setsockopt(zmq_host->sock.obj, ZMQ_PLAIN_USERNAME, zmq_host->zap.username, strlen(zmq_host->zap.username));
300 if (ret == ERR) {
301 Log(LOG_ERR, "ERROR ( %s ): zmq_setsockopt() ZMQ_PLAIN_USERNAME failed: %s\nExiting.\n",
302 zmq_host->log_id, zmq_strerror(errno));
303 exit_gracefully(1);
304 }
305
306 ret = zmq_setsockopt(zmq_host->sock.obj, ZMQ_PLAIN_PASSWORD, zmq_host->zap.password, strlen(zmq_host->zap.password));
307 if (ret == ERR) {
308 Log(LOG_ERR, "ERROR ( %s ): zmq_setsockopt() ZMQ_PLAIN_PASSWORD failed: %s\nExiting.\n",
309 zmq_host->log_id, zmq_strerror(errno));
310 exit_gracefully(1);
311 }
312 }
313
314 if (strlen(zmq_host->sock.str)) {
315 ret = zmq_connect(zmq_host->sock.obj, zmq_host->sock.str);
316 }
317 else if (strlen(zmq_host->sock_inproc.str)) {
318 if (zmq_host->sock_inproc.obj_tx) {
319 ret = zmq_connect(zmq_host->sock_inproc.obj_tx, zmq_host->sock_inproc.str);
320 }
321 else {
322 ret = zmq_connect(zmq_host->sock_inproc.obj, zmq_host->sock_inproc.str);
323 }
324 }
325
326 if (ret == ERR) {
327 Log(LOG_ERR, "ERROR ( %s ): zmq_connect() failed: %s (%s)\nExiting.\n",
328 zmq_host->log_id, (strlen(zmq_host->sock.str) ? zmq_host->sock.str : zmq_host->sock_inproc.str),
329 zmq_strerror(errno));
330 exit_gracefully(1);
331 }
332
333 return ret;
334 }
335
p_zmq_ctx_setup(struct p_zmq_host * zmq_host)336 void p_zmq_ctx_setup(struct p_zmq_host *zmq_host)
337 {
338 if (!zmq_host->ctx) zmq_host->ctx = zmq_ctx_new();
339 }
340
p_zmq_zap_setup(struct p_zmq_host * zmq_host)341 void p_zmq_zap_setup(struct p_zmq_host *zmq_host)
342 {
343 zmq_host->zap.thread = zmq_threadstart(&p_zmq_zap_handler, zmq_host);
344 }
345
p_zmq_pub_setup(struct p_zmq_host * zmq_host)346 void p_zmq_pub_setup(struct p_zmq_host *zmq_host)
347 {
348 p_zmq_send_setup(zmq_host, ZMQ_PUB, FALSE);
349 }
350
p_zmq_push_setup(struct p_zmq_host * zmq_host)351 void p_zmq_push_setup(struct p_zmq_host *zmq_host)
352 {
353 p_zmq_send_setup(zmq_host, ZMQ_PUSH, FALSE);
354 }
355
p_zmq_push_connect_setup(struct p_zmq_host * zmq_host)356 void p_zmq_push_connect_setup(struct p_zmq_host *zmq_host)
357 {
358 p_zmq_send_setup(zmq_host, ZMQ_PUSH, TRUE);
359 }
360
p_zmq_send_setup(struct p_zmq_host * zmq_host,int type,int do_connect)361 void p_zmq_send_setup(struct p_zmq_host *zmq_host, int type, int do_connect)
362 {
363 int ret, only_one = 1;
364 void *sock;
365
366 if (!zmq_host) return;
367 if (type != ZMQ_PUB && type != ZMQ_PUSH) return;
368
369 if (!zmq_host->ctx) zmq_host->ctx = zmq_ctx_new();
370
371 if (!do_connect) {
372 if (strlen(zmq_host->zap.username) && strlen(zmq_host->zap.password)) {
373 p_zmq_zap_setup(zmq_host);
374 }
375 }
376
377 sock = zmq_socket(zmq_host->ctx, type);
378 if (!sock) {
379 Log(LOG_ERR, "ERROR ( %s ): zmq_socket() failed for topic %u: %s\nExiting.\n",
380 zmq_host->log_id, zmq_host->topic, zmq_strerror(errno));
381 exit_gracefully(1);
382 }
383
384 ret = zmq_setsockopt(sock, ZMQ_SNDHWM, &zmq_host->hwm, sizeof(int));
385 if (ret == ERR) {
386 Log(LOG_ERR, "ERROR ( %s ): zmq_setsockopt() ZMQ_SNDHWM failed for topic %u: %s\nExiting.\n",
387 zmq_host->log_id, zmq_host->topic, zmq_strerror(errno));
388 exit_gracefully(1);
389 }
390
391 ret = zmq_setsockopt(sock, ZMQ_BACKLOG, &only_one, sizeof(int));
392 if (ret == ERR) {
393 Log(LOG_ERR, "ERROR ( %s ): zmq_setsockopt() ZMQ_BACKLOG failed for topic %u: %s\nExiting.\n",
394 zmq_host->log_id, zmq_host->topic, zmq_strerror(errno));
395 exit_gracefully(1);
396 }
397
398 if (strlen(zmq_host->sock_inproc.str)) {
399 zmq_host->sock_inproc.obj = sock;
400
401 if (do_connect) {
402 zmq_host->sock_inproc.obj_tx = zmq_host->sock_inproc.obj;
403 }
404 else {
405 zmq_host->sock_inproc.obj_rx = zmq_host->sock_inproc.obj;
406 }
407 }
408 else {
409 zmq_host->sock.obj = sock;
410 }
411
412 if (!do_connect) p_zmq_bind(zmq_host);
413 else p_zmq_connect(zmq_host);
414
415 Log(LOG_DEBUG, "DEBUG ( %s ): p_zmq_send_setup() addr=%s username=%s password=%s\n",
416 zmq_host->log_id, (strlen(zmq_host->sock.str) ? zmq_host->sock.str : zmq_host->sock_inproc.str),
417 zmq_host->zap.username, zmq_host->zap.password);
418 }
419
p_zmq_sub_setup(struct p_zmq_host * zmq_host)420 void p_zmq_sub_setup(struct p_zmq_host *zmq_host)
421 {
422 p_zmq_recv_setup(zmq_host, ZMQ_SUB, FALSE);
423 }
424
p_zmq_pull_setup(struct p_zmq_host * zmq_host)425 void p_zmq_pull_setup(struct p_zmq_host *zmq_host)
426 {
427 p_zmq_recv_setup(zmq_host, ZMQ_PULL, FALSE);
428 }
429
p_zmq_pull_bind_setup(struct p_zmq_host * zmq_host)430 void p_zmq_pull_bind_setup(struct p_zmq_host *zmq_host)
431 {
432 p_zmq_recv_setup(zmq_host, ZMQ_PULL, TRUE);
433 }
434
p_zmq_recv_setup(struct p_zmq_host * zmq_host,int type,int do_bind)435 void p_zmq_recv_setup(struct p_zmq_host *zmq_host, int type, int do_bind)
436 {
437 int ret;
438 void *sock;
439
440 if (!zmq_host) return;
441 if (type != ZMQ_SUB && type != ZMQ_PULL) return;
442
443 if (!zmq_host->ctx) zmq_host->ctx = zmq_ctx_new();
444
445 if (do_bind) {
446 if (strlen(zmq_host->zap.username) && strlen(zmq_host->zap.password)) {
447 p_zmq_zap_setup(zmq_host);
448 }
449 }
450
451 sock = zmq_socket(zmq_host->ctx, type);
452 if (!sock) {
453 Log(LOG_ERR, "ERROR ( %s ): zmq_socket() failed for topic %u: %s\nExiting.\n",
454 zmq_host->log_id, zmq_host->topic, zmq_strerror(errno));
455 exit_gracefully(1);
456 }
457
458 ret = zmq_setsockopt(sock, ZMQ_RCVHWM, &zmq_host->hwm, sizeof(int));
459 if (ret == ERR) {
460 Log(LOG_ERR, "ERROR ( %s ): zmq_setsockopt() ZMQ_RCVHWM failed for topic %u: %s\nExiting.\n",
461 zmq_host->log_id, zmq_host->topic, zmq_strerror(errno));
462 exit_gracefully(1);
463 }
464
465 if (strlen(zmq_host->sock_inproc.str)) {
466 zmq_host->sock_inproc.obj = sock;
467
468 if (!do_bind) {
469 zmq_host->sock_inproc.obj_tx = zmq_host->sock_inproc.obj;
470 }
471 else {
472 zmq_host->sock_inproc.obj_rx = zmq_host->sock_inproc.obj;
473 }
474 }
475 else zmq_host->sock.obj = sock;
476
477 if (!do_bind) ret = p_zmq_connect(zmq_host);
478 else ret = p_zmq_bind(zmq_host);
479
480 if (type == ZMQ_SUB) {
481 if (zmq_host->topic) {
482 ret = zmq_setsockopt(sock, ZMQ_SUBSCRIBE, &zmq_host->topic, sizeof(zmq_host->topic));
483 if (ret == ERR) {
484 Log(LOG_ERR, "ERROR ( %s ): zmq_setsockopt() SUBSCRIBE failed for topic %u: %s\nExiting.\n",
485 zmq_host->log_id, zmq_host->topic, zmq_strerror(errno));
486 exit_gracefully(1);
487 }
488 }
489 /* subscribe to all topics */
490 else zmq_setsockopt(sock, ZMQ_SUBSCRIBE, NULL, 0);
491 }
492
493 Log(LOG_DEBUG, "DEBUG ( %s ): p_zmq_recv_setup() addr=%s username=%s password=%s\n",
494 zmq_host->log_id, (strlen(zmq_host->sock.str) ? zmq_host->sock.str : zmq_host->sock_inproc.str),
495 zmq_host->zap.username, zmq_host->zap.password);
496 }
497
p_zmq_topic_send(struct p_zmq_host * zmq_host,void * buf,u_int64_t len)498 int p_zmq_topic_send(struct p_zmq_host *zmq_host, void *buf, u_int64_t len)
499 {
500 int ret;
501
502 ret = zmq_send(zmq_host->sock.obj, &zmq_host->topic, sizeof(zmq_host->topic), ZMQ_SNDMORE);
503 if (ret == ERR) {
504 Log(LOG_ERR, "ERROR ( %s ): publishing topic to ZMQ: zmq_send(): %s [topic=%u]\n",
505 zmq_host->log_id, zmq_strerror(errno), zmq_host->topic);
506 return ret;
507 }
508
509 ret = zmq_send(zmq_host->sock.obj, buf, len, 0);
510 if (ret == ERR) {
511 Log(LOG_ERR, "ERROR ( %s ): publishing data to ZMQ: zmq_send(): %s [topic=%u]\n",
512 zmq_host->log_id, zmq_strerror(errno), zmq_host->topic);
513 return ret;
514 }
515
516 return ret;
517 }
518
p_zmq_recv_poll(struct p_zmq_sock * sock,int timeout)519 int p_zmq_recv_poll(struct p_zmq_sock *sock, int timeout)
520 {
521 zmq_pollitem_t item[1];
522 void *s;
523
524 if (sock->obj_rx) s = sock->obj_rx;
525 else s = sock->obj;
526
527 item[0].socket = s;
528 item[0].events = ZMQ_POLLIN;
529
530 return zmq_poll(item, 1, timeout);
531 }
532
p_zmq_topic_recv(struct p_zmq_host * zmq_host,void * buf,u_int64_t len)533 int p_zmq_topic_recv(struct p_zmq_host *zmq_host, void *buf, u_int64_t len)
534 {
535 int ret = 0, events;
536 size_t elen = sizeof(events);
537 u_int8_t topic, retries = 0;
538
539 zmq_events_again:
540 ret = zmq_getsockopt(zmq_host->sock.obj, ZMQ_EVENTS, &events, &elen);
541 if (ret == ERR) {
542 if (retries < PM_ZMQ_EVENTS_RETRIES) {
543 Log(LOG_DEBUG, "DEBUG ( %s ): consuming topic from ZMQ: zmq_getsockopt() for ZMQ_EVENTS: %s [topic=%u]\n",
544 zmq_host->log_id, zmq_strerror(errno), zmq_host->topic);
545 retries++;
546 goto zmq_events_again;
547 }
548 else {
549 Log(LOG_ERR, "ERROR ( %s ): consuming topic from ZMQ: zmq_getsockopt() for ZMQ_EVENTS: %s [topic=%u]\n",
550 zmq_host->log_id, zmq_strerror(errno), zmq_host->topic);
551
552 return ret;
553 }
554 }
555
556 if (events & ZMQ_POLLIN) {
557 ret = zmq_recv(zmq_host->sock.obj, &topic, 1, 0); /* read topic first */
558 if (ret == ERR) {
559 Log(LOG_ERR, "ERROR ( %s ): consuming topic from ZMQ: zmq_recv(): %s [topic=%u]\n",
560 zmq_host->log_id, zmq_strerror(errno), zmq_host->topic);
561 return ret;
562 }
563
564 ret = zmq_recv(zmq_host->sock.obj, buf, len, 0); /* read actual data then */
565 if (ret == ERR)
566 Log(LOG_ERR, "ERROR ( %s ): consuming data from ZMQ: zmq_recv(): %s [topic=%u]\n",
567 zmq_host->log_id, zmq_strerror(errno), zmq_host->topic);
568 else if (ret > len) {
569 Log(LOG_ERR, "ERROR ( %s ): consuming data from ZMQ: zmq_recv(): buffer overrun [topic=%u]\n",
570 zmq_host->log_id, zmq_host->topic);
571 ret = ERR;
572 }
573 }
574
575 return ret;
576 }
577
p_zmq_recv_str(struct p_zmq_sock * sock)578 char *p_zmq_recv_str(struct p_zmq_sock *sock)
579 {
580 char buf[SRVBUFLEN];
581 int len;
582 void *s;
583
584 if (sock->obj_rx) s = sock->obj_rx;
585 else s = sock->obj;
586
587 memset(buf, 0, sizeof(buf));
588 len = zmq_recv(s, buf, (sizeof(buf) - 1), 0);
589 if (len == ERR) return NULL;
590 else return strndup(buf, sizeof(buf));
591 }
592
p_zmq_send_str(struct p_zmq_sock * sock,char * buf)593 int p_zmq_send_str(struct p_zmq_sock *sock, char *buf)
594 {
595 int len;
596 void *s;
597
598 if (sock->obj_rx) s = sock->obj_rx;
599 else s = sock->obj;
600
601 len = zmq_send(s, buf, strlen(buf), 0);
602
603 return len;
604 }
605
p_zmq_sendmore_str(struct p_zmq_sock * sock,char * buf)606 int p_zmq_sendmore_str(struct p_zmq_sock *sock, char *buf)
607 {
608 int len;
609 void *s;
610
611 if (sock->obj_rx) s = sock->obj_rx;
612 else s = sock->obj;
613
614 len = zmq_send(s, buf, strlen(buf), ZMQ_SNDMORE);
615
616 return len;
617 }
618
p_zmq_recv_bin(struct p_zmq_sock * sock,void * buf,size_t len)619 int p_zmq_recv_bin(struct p_zmq_sock *sock, void *buf, size_t len)
620 {
621 int rcvlen;
622 void *s;
623
624 if (sock->obj_rx) s = sock->obj_rx;
625 else s = sock->obj;
626
627 rcvlen = zmq_recv(s, buf, len, 0);
628
629 return rcvlen;
630 }
631
p_zmq_send_bin(struct p_zmq_sock * sock,void * buf,size_t len,int nonblock)632 int p_zmq_send_bin(struct p_zmq_sock *sock, void *buf, size_t len, int nonblock)
633 {
634 int sndlen;
635 void *s;
636
637 if (sock->obj_tx) s = sock->obj_tx;
638 else s = sock->obj;
639
640 if (!nonblock) sndlen = zmq_send(s, buf, len, 0);
641 else sndlen = zmq_send(s, buf, len, ZMQ_DONTWAIT);
642
643 return sndlen;
644 }
645
p_zmq_sendmore_bin(struct p_zmq_sock * sock,void * buf,size_t len,int nonblock)646 int p_zmq_sendmore_bin(struct p_zmq_sock *sock, void *buf, size_t len, int nonblock)
647 {
648 int sndlen;
649 void *s;
650
651 if (sock->obj_tx) s = sock->obj_tx;
652 else s = sock->obj;
653
654 if (!nonblock) sndlen = zmq_send(s, buf, len, ZMQ_SNDMORE);
655 else sndlen = zmq_send(s, buf, len, (ZMQ_SNDMORE|ZMQ_DONTWAIT));
656
657 return sndlen;
658 }
659
p_zmq_zap_handler(void * zh)660 void p_zmq_zap_handler(void *zh)
661 {
662 struct p_zmq_host *zmq_host = (struct p_zmq_host *) zh;
663 struct p_zmq_sock zmq_sock;
664 int ret;
665
666 memset(&zmq_sock, 0, sizeof(zmq_sock));
667
668 zmq_sock.obj = zmq_socket(zmq_host->ctx, ZMQ_REP);
669 if (!zmq_sock.obj) {
670 Log(LOG_ERR, "ERROR ( %s ): zmq_socket() ZAP failed (%s)\nExiting.\n",
671 zmq_host->log_id, zmq_strerror(errno));
672 exit_gracefully(1);
673 }
674
675 snprintf(zmq_sock.str, sizeof(zmq_sock.str), "%s", "inproc://zeromq.zap.01");
676 ret = zmq_bind(zmq_sock.obj, zmq_sock.str);
677 if (ret == ERR) {
678 Log(LOG_ERR, "ERROR ( %s ): zmq_bind() ZAP failed (%s)\nExiting.\n",
679 zmq_host->log_id, zmq_strerror(errno));
680 exit_gracefully(1);
681 }
682
683 while (TRUE) {
684 char *version, *sequence, *domain, *address, *identity;
685 char *mechanism, *username, *password;
686
687 version = p_zmq_recv_str(&zmq_sock);
688 if (!version) break;
689
690 sequence = p_zmq_recv_str(&zmq_sock);
691 domain = p_zmq_recv_str(&zmq_sock);
692 address = p_zmq_recv_str(&zmq_sock);
693 identity = p_zmq_recv_str(&zmq_sock);
694 mechanism = p_zmq_recv_str(&zmq_sock);
695
696 if (!strcmp(version, "1.0") && !strcmp(mechanism, "PLAIN")) {
697 username = p_zmq_recv_str(&zmq_sock);
698 password = p_zmq_recv_str(&zmq_sock);
699
700 p_zmq_sendmore_str(&zmq_sock, version);
701 p_zmq_sendmore_str(&zmq_sock, sequence);
702
703 if (!strcmp(username, zmq_host->zap.username) &&
704 !strcmp(password, zmq_host->zap.password)) {
705 p_zmq_sendmore_str(&zmq_sock, "200");
706 p_zmq_sendmore_str(&zmq_sock, "OK");
707 p_zmq_sendmore_str(&zmq_sock, "anonymous");
708 p_zmq_send_str(&zmq_sock, "");
709 }
710 else {
711 p_zmq_sendmore_str(&zmq_sock, "400");
712 p_zmq_sendmore_str(&zmq_sock, "Invalid username or password");
713 p_zmq_sendmore_str(&zmq_sock, "");
714 p_zmq_send_str(&zmq_sock, "");
715 }
716
717 free(username);
718 free(password);
719 }
720 else {
721 p_zmq_sendmore_str(&zmq_sock, version);
722 p_zmq_sendmore_str(&zmq_sock, sequence);
723 p_zmq_sendmore_str(&zmq_sock, "400");
724 p_zmq_sendmore_str(&zmq_sock, "Unsupported auth mechanism");
725 p_zmq_sendmore_str(&zmq_sock, "");
726 p_zmq_send_str(&zmq_sock, "");
727 }
728
729 free(version);
730 free(sequence);
731 free(domain);
732 free(address);
733 free(identity);
734 free(mechanism);
735 }
736
737 zmq_close(zmq_sock.obj);
738 }
739
p_zmq_router_setup(struct p_zmq_host * zmq_host,char * host,int port)740 void p_zmq_router_setup(struct p_zmq_host *zmq_host, char *host, int port)
741 {
742 char server_str[SHORTBUFLEN];
743 int ret, as_server = TRUE;
744
745 if (!zmq_host->ctx) zmq_host->ctx = zmq_ctx_new();
746
747 zmq_host->sock.obj = zmq_socket(zmq_host->ctx, ZMQ_ROUTER);
748 if (!zmq_host->sock.obj) {
749 Log(LOG_ERR, "ERROR ( %s ): zmq_socket() failed for ZMQ_ROUTER: %s\nExiting.\n",
750 zmq_host->log_id, zmq_strerror(errno));
751 exit_gracefully(1);
752 }
753
754 if (strlen(zmq_host->zap.username) && strlen(zmq_host->zap.password)) {
755 p_zmq_zap_setup(zmq_host);
756
757 ret = zmq_setsockopt(zmq_host->sock.obj, ZMQ_PLAIN_SERVER, &as_server, sizeof(int));
758 if (ret == ERR) {
759 Log(LOG_ERR, "ERROR ( %s ): zmq_setsockopt() ZMQ_PLAIN_SERVER failed for topic %u: %s\nExiting.\n",
760 zmq_host->log_id, zmq_host->topic, zmq_strerror(errno));
761 exit_gracefully(1);
762 }
763 }
764
765 snprintf(server_str, SHORTBUFLEN, "tcp://%s:%u", host, port);
766
767 ret = zmq_bind(zmq_host->sock.obj, server_str);
768 if (ret == ERR) {
769 Log(LOG_ERR, "ERROR ( %s ): zmq_bind() failed for ZMQ_ROUTER: %s\nExiting.\n",
770 zmq_host->log_id, zmq_strerror(errno));
771 exit_gracefully(1);
772 }
773 }
774
p_zmq_dealer_inproc_setup(struct p_zmq_host * zmq_host)775 void p_zmq_dealer_inproc_setup(struct p_zmq_host *zmq_host)
776 {
777 int ret;
778
779 if (!zmq_host->ctx) zmq_host->ctx = zmq_ctx_new();
780
781 zmq_host->sock_inproc.obj = zmq_socket(zmq_host->ctx, ZMQ_DEALER);
782 if (!zmq_host->sock_inproc.obj) {
783 Log(LOG_ERR, "ERROR ( %s ): zmq_socket() failed for ZMQ_DEALER: %s\nExiting.\n",
784 zmq_host->log_id, zmq_strerror(errno));
785 exit_gracefully(1);
786 }
787
788 ret = zmq_bind(zmq_host->sock_inproc.obj, zmq_host->sock_inproc.str);
789 if (ret == ERR) {
790 Log(LOG_ERR, "ERROR ( %s ): zmq_bind() failed for ZMQ_DEALER: %s\nExiting.\n",
791 zmq_host->log_id, zmq_strerror(errno));
792 exit_gracefully(1);
793 }
794 }
795
p_zmq_proxy_setup(struct p_zmq_host * zmq_host)796 void p_zmq_proxy_setup(struct p_zmq_host *zmq_host)
797 {
798 int ret;
799
800 ret = zmq_proxy(zmq_host->sock.obj, zmq_host->sock_inproc.obj, NULL);
801 if (ret == ERR) {
802 Log(LOG_ERR, "ERROR ( %s ): zmq_proxy() failed: %s\nExiting.\n",
803 zmq_host->log_id, zmq_strerror(errno));
804 exit_gracefully(1);
805 }
806 }
807
p_zmq_router_backend_setup(struct p_zmq_host * zmq_host,int thread_nbr)808 void p_zmq_router_backend_setup(struct p_zmq_host *zmq_host, int thread_nbr)
809 {
810 int idx;
811
812 p_zmq_dealer_inproc_setup(zmq_host);
813 zmq_host->router_worker.threads = malloc(sizeof(void *) * thread_nbr);
814
815 for (idx = 0; idx < thread_nbr; idx++) {
816 zmq_host->router_worker.threads[thread_nbr] = zmq_threadstart(&p_zmq_router_worker, zmq_host);
817 }
818
819 p_zmq_proxy_setup(zmq_host);
820 }
821
p_zmq_router_worker(void * zh)822 void p_zmq_router_worker(void *zh)
823 {
824 struct p_zmq_host *zmq_host = (struct p_zmq_host *) zh;
825 struct p_zmq_sock sock;
826 int ret;
827
828 assert(zmq_host);
829 memset(&sock, 0, sizeof(sock));
830
831 sock.obj = zmq_socket(zmq_host->ctx, ZMQ_REP);
832 if (!sock.obj) {
833 Log(LOG_ERR, "ERROR ( %s ): p_zmq_router_worker zmq_socket() failed: %s (%s)\nExiting.\n",
834 zmq_host->log_id, zmq_host->sock_inproc.str, zmq_strerror(errno));
835 exit_gracefully(1);
836 }
837
838 ret = zmq_connect(sock.obj, zmq_host->sock_inproc.str);
839 if (ret == ERR) {
840 Log(LOG_ERR, "ERROR ( %s ): p_zmq_router_worker zmq_connect() failed: %s (%s)\nExiting.\n",
841 zmq_host->log_id, zmq_host->sock_inproc.str, zmq_strerror(errno));
842 exit_gracefully(1);
843 }
844
845 zmq_host->router_worker.func(zmq_host, &sock);
846 }
847
p_zmq_close(struct p_zmq_host * zmq_host)848 void p_zmq_close(struct p_zmq_host *zmq_host)
849 {
850 p_zmq_plugin_pipe_init_plugin(zmq_host);
851 }
852