1 /*
2     pmacct (Promiscuous mode IP Accounting package)
3     pmacct is Copyright (C) 2003-2019 by Paolo Lucente
4 */
5 
6 /*
7     This program is free software; you can redistribute it and/or modify
8     it under the terms of the GNU General Public License as published by
9     the Free Software Foundation; either version 2 of the License, or
10     (at your option) any later version.
11 
12     This program is distributed in the hope that it will be useful,
13     but WITHOUT ANY WARRANTY; without even the implied warranty of
14     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15     GNU General Public License for more details.
16 
17     You should have received a copy of the GNU General Public License
18     along with this program; if not, write to the Free Software
19     Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
20 */
21 
22 /* includes */
23 #include "pmacct.h"
24 #include "pmacct-data.h"
25 #include "zmq_common.h"
26 
/* Global variables */
/* File-scope ZeroMQ host descriptors. They are only defined here and are not
   referenced by any function in this file. NOTE(review): presumably consumed
   by the nfacctd and telemetry code paths -- confirm against callers. */
struct p_zmq_host nfacctd_zmq_host;
struct p_zmq_host telemetry_zmq_host;
30 
31 /* Functions */
p_zmq_set_address(struct p_zmq_host * zmq_host,char * address)32 void p_zmq_set_address(struct p_zmq_host *zmq_host, char *address)
33 {
34   char proto[] = "://", tcp_proto[] = "tcp://", inproc_proto[] = "inproc://";
35 
36   if (zmq_host && address) {
37     if (!strstr(address, proto)) {
38       snprintf(zmq_host->sock.str, sizeof(zmq_host->sock.str), "tcp://%s", address);
39     }
40     else {
41       if (strstr(address, tcp_proto)) {
42 	snprintf(zmq_host->sock.str, sizeof(zmq_host->sock.str), "%s", address);
43       }
44       else if (strstr(address, inproc_proto)) {
45 	snprintf(zmq_host->sock_inproc.str, sizeof(zmq_host->sock_inproc.str), "%s", address);
46       }
47       else {
48 	Log(LOG_ERR, "ERROR ( %s ): p_zmq_set_address() unsupported protocol in '%s'.\nExiting.\n",
49 	    zmq_host->log_id, address);
50 	exit_gracefully(1);
51       }
52     }
53   }
54 }
55 
/* Record the topic on the host descriptor; a NULL zmq_host is ignored. */
void p_zmq_set_topic(struct p_zmq_host *zmq_host, u_int8_t topic)
{
  if (!zmq_host) return;

  zmq_host->topic = topic;
}
60 
p_zmq_set_retry_timeout(struct p_zmq_host * zmq_host,int tout)61 void p_zmq_set_retry_timeout(struct p_zmq_host *zmq_host, int tout)
62 {
63   int ret;
64 
65   if (zmq_host) {
66     ret = zmq_setsockopt(zmq_host->sock.obj, ZMQ_RECONNECT_IVL, &tout, sizeof(tout));
67     if (ret != 0) {
68       Log(LOG_ERR, "ERROR ( %s ): zmq_setsockopt() RECONNECT_IVL failed for topic %u: %s\nExiting.\n",
69 	  zmq_host->log_id, zmq_host->topic, zmq_strerror(errno));
70       exit_gracefully(1);
71     }
72   }
73 }
74 
p_zmq_set_username(struct p_zmq_host * zmq_host,char * username)75 void p_zmq_set_username(struct p_zmq_host *zmq_host, char *username)
76 {
77   if (zmq_host) {
78     if (strlen(username) >= sizeof(zmq_host->zap.username)) {
79       Log(LOG_ERR, "ERROR ( %s ): p_zmq_set_username(): username '%s' too long (maximum %lu chars). Exiting.\n",
80 	  zmq_host->log_id, username, (sizeof(zmq_host->zap.username) - 1));
81       exit_gracefully(1);
82     }
83     else strlcpy(zmq_host->zap.username, username, sizeof(zmq_host->zap.username));
84   }
85 }
86 
p_zmq_set_password(struct p_zmq_host * zmq_host,char * password)87 void p_zmq_set_password(struct p_zmq_host *zmq_host, char *password)
88 {
89   if (zmq_host) {
90     if (strlen(password) >= sizeof(zmq_host->zap.password)) {
91       Log(LOG_ERR, "ERROR ( %s ): p_zmq_set_password(): password '%s' too long (maximum %lu chars). Exiting.\n",
92 	  zmq_host->log_id, password, (sizeof(zmq_host->zap.password) - 1));
93       exit_gracefully(1);
94     }
95     else strlcpy(zmq_host->zap.password, password, sizeof(zmq_host->zap.password));
96   }
97 }
98 
p_zmq_set_random_username(struct p_zmq_host * zmq_host)99 void p_zmq_set_random_username(struct p_zmq_host *zmq_host)
100 {
101   if (zmq_host) generate_random_string(zmq_host->zap.username, (sizeof(zmq_host->zap.username) - 1));
102 }
103 
p_zmq_set_random_password(struct p_zmq_host * zmq_host)104 void p_zmq_set_random_password(struct p_zmq_host *zmq_host)
105 {
106   if (zmq_host) generate_random_string(zmq_host->zap.password, (sizeof(zmq_host->zap.password) - 1));
107 }
108 
p_zmq_set_hwm(struct p_zmq_host * zmq_host,int hwm)109 void p_zmq_set_hwm(struct p_zmq_host *zmq_host, int hwm)
110 {
111   if (zmq_host) zmq_host->hwm = hwm;
112 }
113 
p_zmq_set_log_id(struct p_zmq_host * zmq_host,char * log_id)114 void p_zmq_set_log_id(struct p_zmq_host *zmq_host, char *log_id)
115 {
116   if (zmq_host) strlcpy(zmq_host->log_id, log_id, sizeof(zmq_host->log_id));
117 }
118 
p_zmq_get_address(struct p_zmq_host * zmq_host)119 char *p_zmq_get_address(struct p_zmq_host *zmq_host)
120 {
121   if (zmq_host) {
122     if (strlen(zmq_host->sock.str)) return zmq_host->sock.str;
123     else if (strlen(zmq_host->sock_inproc.str)) return zmq_host->sock_inproc.str;
124   }
125 
126   return NULL;
127 }
128 
p_zmq_get_topic(struct p_zmq_host * zmq_host)129 u_int8_t p_zmq_get_topic(struct p_zmq_host *zmq_host)
130 {
131   if (zmq_host) return zmq_host->topic;
132 
133   return 0;
134 }
135 
p_zmq_get_sock(struct p_zmq_host * zmq_host)136 void *p_zmq_get_sock(struct p_zmq_host *zmq_host)
137 {
138   if (zmq_host) return zmq_host->sock.obj;
139 
140   return NULL;
141 }
142 
p_zmq_get_fd(struct p_zmq_host * zmq_host)143 int p_zmq_get_fd(struct p_zmq_host *zmq_host)
144 {
145   int fd = ERR;
146   size_t len = sizeof(fd);
147 
148   if (zmq_host) {
149     zmq_getsockopt(zmq_host->sock.obj, ZMQ_FD, &fd, &len);
150   }
151 
152   return fd;
153 }
154 
p_zmq_init_push(struct p_zmq_host * zmq_host,char * address)155 void p_zmq_init_push(struct p_zmq_host *zmq_host, char *address)
156 {
157   if (zmq_host) {
158     memset(zmq_host, 0, sizeof(struct p_zmq_host));
159     p_zmq_set_address(zmq_host, address);
160   }
161 }
162 
/*
 * Reset the whole host descriptor to zero, then record 'address' and
 * 'topic' through their setters. A NULL zmq_host is ignored.
 */
void p_zmq_init_pub(struct p_zmq_host *zmq_host, char *address, u_int8_t topic)
{
  if (!zmq_host) return;

  memset(zmq_host, 0, sizeof(*zmq_host));
  p_zmq_set_address(zmq_host, address);
  p_zmq_set_topic(zmq_host, topic);
}
171 
/*
 * Core-side initialisation of a plugin pipe: the descriptor is initialised
 * as a publisher with no address and with plugin_id as its topic, then the
 * credentials are set. A NULL username or password is replaced by a
 * randomly generated one. A NULL zmq_host is ignored.
 */
void p_zmq_plugin_pipe_init_core(struct p_zmq_host *zmq_host, u_int8_t plugin_id, char *username, char *password)
{
  if (!zmq_host) return;

  p_zmq_init_pub(zmq_host, NULL, plugin_id);

  if (username) p_zmq_set_username(zmq_host, username);
  else p_zmq_set_random_username(zmq_host);

  if (password) p_zmq_set_password(zmq_host, password);
  else p_zmq_set_random_password(zmq_host);
}
184 
/* Subscriber-side initialisation: delegates to p_zmq_plugin_pipe_init_plugin(). */
void p_zmq_init_sub(struct p_zmq_host *zmq_host)
{
  p_zmq_plugin_pipe_init_plugin(zmq_host);
}
189 
/* Pull-side initialisation: delegates to p_zmq_plugin_pipe_init_plugin(). */
void p_zmq_init_pull(struct p_zmq_host *zmq_host)
{
  p_zmq_plugin_pipe_init_plugin(zmq_host);
}
194 
p_zmq_plugin_pipe_init_plugin(struct p_zmq_host * zmq_host)195 void p_zmq_plugin_pipe_init_plugin(struct p_zmq_host *zmq_host)
196 {
197   if (zmq_host) {
198 
199 /*
200     if (zmq_host->sock.obj) {
201       zmq_unbind(zmq_host->sock.obj, zmq_host->sock.str);
202       zmq_close(zmq_host->sock.obj);
203     }
204 
205     if (zmq_host->zap.sock.obj) zmq_close(zmq_host->zap.sock.obj);
206     if (zmq_host->zap.thread) zmq_threadclose(zmq_host->zap.thread);
207 
208     if (zmq_host->ctx) {
209       zmq_ctx_shutdown(zmq_host->ctx);
210       zmq_ctx_term(zmq_host->ctx);
211       zmq_host->ctx = NULL;
212     }
213 */
214 
215     zmq_host->ctx = NULL;
216   }
217 }
218 
p_zmq_plugin_pipe_set_profile(struct configuration * cfg,char * value)219 int p_zmq_plugin_pipe_set_profile(struct configuration *cfg, char *value)
220 {
221   if (!strcmp("micro", value)) {
222     cfg->pipe_zmq_profile = PLUGIN_PIPE_ZMQ_MICRO;
223     cfg->buffer_size = PLUGIN_PIPE_ZMQ_MICRO_SIZE;
224   }
225   else if (!strcmp("small", value)) {
226     cfg->pipe_zmq_profile = PLUGIN_PIPE_ZMQ_SMALL;
227     cfg->buffer_size = PLUGIN_PIPE_ZMQ_SMALL_SIZE;
228   }
229   else if (!strcmp("medium", value)) {
230     cfg->pipe_zmq_profile = PLUGIN_PIPE_ZMQ_MEDIUM;
231     cfg->buffer_size = PLUGIN_PIPE_ZMQ_MEDIUM_SIZE;
232   }
233   else if (!strcmp("large", value)) {
234     cfg->pipe_zmq_profile = PLUGIN_PIPE_ZMQ_LARGE;
235     cfg->buffer_size = PLUGIN_PIPE_ZMQ_LARGE_SIZE;
236   }
237   else if (!strcmp("xlarge", value)) {
238     cfg->pipe_zmq_profile = PLUGIN_PIPE_ZMQ_XLARGE;
239     cfg->buffer_size = PLUGIN_PIPE_ZMQ_XLARGE_SIZE;
240   }
241   else return ERR;
242 
243   return SUCCESS;
244 }
245 
p_zmq_bind(struct p_zmq_host * zmq_host)246 int p_zmq_bind(struct p_zmq_host *zmq_host)
247 {
248   int ret = 0, as_server = TRUE;
249   size_t sock_strlen;
250 
251   if (strlen(zmq_host->zap.username) && strlen(zmq_host->zap.password)) {
252     ret = zmq_setsockopt(zmq_host->sock.obj, ZMQ_PLAIN_SERVER, &as_server, sizeof(int));
253     if (ret == ERR) {
254       Log(LOG_ERR, "ERROR ( %s ): zmq_setsockopt() ZMQ_PLAIN_SERVER failed for topic %u: %s\nExiting.\n",
255           zmq_host->log_id, zmq_host->topic, zmq_strerror(errno));
256       exit_gracefully(1);
257     }
258   }
259 
260   if (strlen(zmq_host->sock_inproc.str)) {
261     if (zmq_host->sock_inproc.obj_rx) {
262       ret = zmq_bind(zmq_host->sock_inproc.obj_rx, zmq_host->sock_inproc.str);
263     }
264     else {
265       ret = zmq_bind(zmq_host->sock_inproc.obj, zmq_host->sock_inproc.str);
266     }
267   }
268   else if (strlen(zmq_host->sock.str)) {
269     ret = zmq_bind(zmq_host->sock.obj, zmq_host->sock.str);
270   }
271   else {
272     ret = zmq_bind(zmq_host->sock.obj, "tcp://127.0.0.1:*");
273   }
274 
275   if (ret == ERR) {
276     Log(LOG_ERR, "ERROR ( %s ): zmq_bind() failed for topic %u: %s\nExiting.\n",
277         zmq_host->log_id, zmq_host->topic, zmq_strerror(errno));
278     exit_gracefully(1);
279   }
280 
281   if (!strlen(zmq_host->sock.str) && !strlen(zmq_host->sock_inproc.str)) {
282     sock_strlen = sizeof(zmq_host->sock.str);
283     ret = zmq_getsockopt(zmq_host->sock.obj, ZMQ_LAST_ENDPOINT, zmq_host->sock.str, &sock_strlen);
284     if (ret == ERR) {
285       Log(LOG_ERR, "ERROR ( %s ): zmq_getsockopt() ZMQ_LAST_ENDPOINT failed for topic %u: %s\nExiting.\n",
286 	  zmq_host->log_id, zmq_host->topic, zmq_strerror(errno));
287       exit_gracefully(1);
288     }
289   }
290 
291   return ret;
292 }
293 
p_zmq_connect(struct p_zmq_host * zmq_host)294 int p_zmq_connect(struct p_zmq_host *zmq_host)
295 {
296   int ret = 0;
297 
298   if (strlen(zmq_host->zap.username) && strlen(zmq_host->zap.password)) {
299     ret = zmq_setsockopt(zmq_host->sock.obj, ZMQ_PLAIN_USERNAME, zmq_host->zap.username, strlen(zmq_host->zap.username));
300     if (ret == ERR) {
301       Log(LOG_ERR, "ERROR ( %s ): zmq_setsockopt() ZMQ_PLAIN_USERNAME failed: %s\nExiting.\n",
302 	  zmq_host->log_id, zmq_strerror(errno));
303       exit_gracefully(1);
304     }
305 
306     ret = zmq_setsockopt(zmq_host->sock.obj, ZMQ_PLAIN_PASSWORD, zmq_host->zap.password, strlen(zmq_host->zap.password));
307     if (ret == ERR) {
308       Log(LOG_ERR, "ERROR ( %s ): zmq_setsockopt() ZMQ_PLAIN_PASSWORD failed: %s\nExiting.\n",
309 	  zmq_host->log_id, zmq_strerror(errno));
310       exit_gracefully(1);
311     }
312   }
313 
314   if (strlen(zmq_host->sock.str)) {
315     ret = zmq_connect(zmq_host->sock.obj, zmq_host->sock.str);
316   }
317   else if (strlen(zmq_host->sock_inproc.str)) {
318     if (zmq_host->sock_inproc.obj_tx) {
319       ret = zmq_connect(zmq_host->sock_inproc.obj_tx, zmq_host->sock_inproc.str);
320     }
321     else {
322       ret = zmq_connect(zmq_host->sock_inproc.obj, zmq_host->sock_inproc.str);
323     }
324   }
325 
326   if (ret == ERR) {
327     Log(LOG_ERR, "ERROR ( %s ): zmq_connect() failed: %s (%s)\nExiting.\n",
328         zmq_host->log_id, (strlen(zmq_host->sock.str) ? zmq_host->sock.str : zmq_host->sock_inproc.str),
329         zmq_strerror(errno));
330     exit_gracefully(1);
331   }
332 
333   return ret;
334 }
335 
p_zmq_ctx_setup(struct p_zmq_host * zmq_host)336 void p_zmq_ctx_setup(struct p_zmq_host *zmq_host)
337 {
338   if (!zmq_host->ctx) zmq_host->ctx = zmq_ctx_new();
339 }
340 
p_zmq_zap_setup(struct p_zmq_host * zmq_host)341 void p_zmq_zap_setup(struct p_zmq_host *zmq_host)
342 {
343   zmq_host->zap.thread = zmq_threadstart(&p_zmq_zap_handler, zmq_host);
344 }
345 
p_zmq_pub_setup(struct p_zmq_host * zmq_host)346 void p_zmq_pub_setup(struct p_zmq_host *zmq_host)
347 {
348   p_zmq_send_setup(zmq_host, ZMQ_PUB, FALSE);
349 }
350 
p_zmq_push_setup(struct p_zmq_host * zmq_host)351 void p_zmq_push_setup(struct p_zmq_host *zmq_host)
352 {
353   p_zmq_send_setup(zmq_host, ZMQ_PUSH, FALSE);
354 }
355 
p_zmq_push_connect_setup(struct p_zmq_host * zmq_host)356 void p_zmq_push_connect_setup(struct p_zmq_host *zmq_host)
357 {
358   p_zmq_send_setup(zmq_host, ZMQ_PUSH, TRUE);
359 }
360 
p_zmq_send_setup(struct p_zmq_host * zmq_host,int type,int do_connect)361 void p_zmq_send_setup(struct p_zmq_host *zmq_host, int type, int do_connect)
362 {
363   int ret, only_one = 1;
364   void *sock;
365 
366   if (!zmq_host) return;
367   if (type != ZMQ_PUB && type != ZMQ_PUSH) return;
368 
369   if (!zmq_host->ctx) zmq_host->ctx = zmq_ctx_new();
370 
371   if (!do_connect) {
372     if (strlen(zmq_host->zap.username) && strlen(zmq_host->zap.password)) {
373       p_zmq_zap_setup(zmq_host);
374     }
375   }
376 
377   sock = zmq_socket(zmq_host->ctx, type);
378   if (!sock) {
379     Log(LOG_ERR, "ERROR ( %s ): zmq_socket() failed for topic %u: %s\nExiting.\n",
380         zmq_host->log_id, zmq_host->topic, zmq_strerror(errno));
381     exit_gracefully(1);
382   }
383 
384   ret = zmq_setsockopt(sock, ZMQ_SNDHWM, &zmq_host->hwm, sizeof(int));
385   if (ret == ERR) {
386     Log(LOG_ERR, "ERROR ( %s ): zmq_setsockopt() ZMQ_SNDHWM failed for topic %u: %s\nExiting.\n",
387 	zmq_host->log_id, zmq_host->topic, zmq_strerror(errno));
388     exit_gracefully(1);
389   }
390 
391   ret = zmq_setsockopt(sock, ZMQ_BACKLOG, &only_one, sizeof(int));
392   if (ret == ERR) {
393     Log(LOG_ERR, "ERROR ( %s ): zmq_setsockopt() ZMQ_BACKLOG failed for topic %u: %s\nExiting.\n",
394 	zmq_host->log_id, zmq_host->topic, zmq_strerror(errno));
395     exit_gracefully(1);
396   }
397 
398   if (strlen(zmq_host->sock_inproc.str)) {
399     zmq_host->sock_inproc.obj = sock;
400 
401     if (do_connect) {
402       zmq_host->sock_inproc.obj_tx = zmq_host->sock_inproc.obj;
403     }
404     else {
405       zmq_host->sock_inproc.obj_rx = zmq_host->sock_inproc.obj;
406     }
407   }
408   else {
409     zmq_host->sock.obj = sock;
410   }
411 
412   if (!do_connect) p_zmq_bind(zmq_host);
413   else p_zmq_connect(zmq_host);
414 
415   Log(LOG_DEBUG, "DEBUG ( %s ): p_zmq_send_setup() addr=%s username=%s password=%s\n",
416       zmq_host->log_id, (strlen(zmq_host->sock.str) ? zmq_host->sock.str : zmq_host->sock_inproc.str),
417       zmq_host->zap.username, zmq_host->zap.password);
418 }
419 
p_zmq_sub_setup(struct p_zmq_host * zmq_host)420 void p_zmq_sub_setup(struct p_zmq_host *zmq_host)
421 {
422   p_zmq_recv_setup(zmq_host, ZMQ_SUB, FALSE);
423 }
424 
p_zmq_pull_setup(struct p_zmq_host * zmq_host)425 void p_zmq_pull_setup(struct p_zmq_host *zmq_host)
426 {
427   p_zmq_recv_setup(zmq_host, ZMQ_PULL, FALSE);
428 }
429 
p_zmq_pull_bind_setup(struct p_zmq_host * zmq_host)430 void p_zmq_pull_bind_setup(struct p_zmq_host *zmq_host)
431 {
432   p_zmq_recv_setup(zmq_host, ZMQ_PULL, TRUE);
433 }
434 
p_zmq_recv_setup(struct p_zmq_host * zmq_host,int type,int do_bind)435 void p_zmq_recv_setup(struct p_zmq_host *zmq_host, int type, int do_bind)
436 {
437   int ret;
438   void *sock;
439 
440   if (!zmq_host) return;
441   if (type != ZMQ_SUB && type != ZMQ_PULL) return;
442 
443   if (!zmq_host->ctx) zmq_host->ctx = zmq_ctx_new();
444 
445   if (do_bind) {
446     if (strlen(zmq_host->zap.username) && strlen(zmq_host->zap.password)) {
447       p_zmq_zap_setup(zmq_host);
448     }
449   }
450 
451   sock = zmq_socket(zmq_host->ctx, type);
452   if (!sock) {
453     Log(LOG_ERR, "ERROR ( %s ): zmq_socket() failed for topic %u: %s\nExiting.\n",
454         zmq_host->log_id, zmq_host->topic, zmq_strerror(errno));
455     exit_gracefully(1);
456   }
457 
458   ret = zmq_setsockopt(sock, ZMQ_RCVHWM, &zmq_host->hwm, sizeof(int));
459   if (ret == ERR) {
460     Log(LOG_ERR, "ERROR ( %s ): zmq_setsockopt() ZMQ_RCVHWM failed for topic %u: %s\nExiting.\n",
461         zmq_host->log_id, zmq_host->topic, zmq_strerror(errno));
462     exit_gracefully(1);
463   }
464 
465   if (strlen(zmq_host->sock_inproc.str)) {
466     zmq_host->sock_inproc.obj = sock;
467 
468     if (!do_bind) {
469       zmq_host->sock_inproc.obj_tx = zmq_host->sock_inproc.obj;
470     }
471     else {
472       zmq_host->sock_inproc.obj_rx = zmq_host->sock_inproc.obj;
473     }
474   }
475   else zmq_host->sock.obj = sock;
476 
477   if (!do_bind) ret = p_zmq_connect(zmq_host);
478   else ret = p_zmq_bind(zmq_host);
479 
480   if (type == ZMQ_SUB) {
481     if (zmq_host->topic) {
482       ret = zmq_setsockopt(sock, ZMQ_SUBSCRIBE, &zmq_host->topic, sizeof(zmq_host->topic));
483       if (ret == ERR) {
484 	Log(LOG_ERR, "ERROR ( %s ): zmq_setsockopt() SUBSCRIBE failed for topic %u: %s\nExiting.\n",
485 	    zmq_host->log_id, zmq_host->topic, zmq_strerror(errno));
486 	exit_gracefully(1);
487       }
488     }
489     /* subscribe to all topics */
490     else zmq_setsockopt(sock, ZMQ_SUBSCRIBE, NULL, 0);
491   }
492 
493   Log(LOG_DEBUG, "DEBUG ( %s ): p_zmq_recv_setup() addr=%s username=%s password=%s\n",
494       zmq_host->log_id, (strlen(zmq_host->sock.str) ? zmq_host->sock.str : zmq_host->sock_inproc.str),
495       zmq_host->zap.username, zmq_host->zap.password);
496 }
497 
/*
 * Send a two-part message on zmq_host->sock.obj: first the topic
 * (flagged ZMQ_SNDMORE), then 'len' bytes from 'buf'.
 *
 * Returns the result of the last zmq_send() performed; ERR on failure, in
 * which case the error is logged. A failure on the topic part skips the
 * data part.
 */
int p_zmq_topic_send(struct p_zmq_host *zmq_host, void *buf, u_int64_t len)
{
  int rc;

  rc = zmq_send(zmq_host->sock.obj, &zmq_host->topic, sizeof(zmq_host->topic), ZMQ_SNDMORE);
  if (rc == ERR) {
    Log(LOG_ERR, "ERROR ( %s ): publishing topic to ZMQ: zmq_send(): %s [topic=%u]\n",
        zmq_host->log_id, zmq_strerror(errno), zmq_host->topic);
    return rc;
  }

  rc = zmq_send(zmq_host->sock.obj, buf, len, 0);
  if (rc == ERR) {
    Log(LOG_ERR, "ERROR ( %s ): publishing data to ZMQ: zmq_send(): %s [topic=%u]\n",
        zmq_host->log_id, zmq_strerror(errno), zmq_host->topic);
  }

  return rc;
}
518 
p_zmq_recv_poll(struct p_zmq_sock * sock,int timeout)519 int p_zmq_recv_poll(struct p_zmq_sock *sock, int timeout)
520 {
521   zmq_pollitem_t item[1];
522   void *s;
523 
524   if (sock->obj_rx) s = sock->obj_rx;
525   else s = sock->obj;
526 
527   item[0].socket = s;
528   item[0].events = ZMQ_POLLIN;
529 
530   return zmq_poll(item, 1, timeout);
531 }
532 
/*
 * Receive one topic+data message from zmq_host->sock.obj, if one is ready.
 *
 * The socket's ZMQ_EVENTS are queried first, retrying up to
 * PM_ZMQ_EVENTS_RETRIES times on failure. When ZMQ_POLLIN is not set,
 * nothing is read and the (successful) getsockopt result is returned.
 * Otherwise the one-byte topic frame is read and discarded, then the data
 * frame is read into 'buf' (capacity 'len').
 *
 * Returns the zmq_recv() result for the data frame, or ERR on any failure,
 * including a result larger than 'len'.
 */
int p_zmq_topic_recv(struct p_zmq_host *zmq_host, void *buf, u_int64_t len)
{
  int rc = 0, ev_mask;
  size_t ev_len = sizeof(ev_mask);
  u_int8_t rx_topic, attempts = 0;

  for (;;) {
    rc = zmq_getsockopt(zmq_host->sock.obj, ZMQ_EVENTS, &ev_mask, &ev_len);
    if (rc != ERR) break;

    if (attempts >= PM_ZMQ_EVENTS_RETRIES) {
      Log(LOG_ERR, "ERROR ( %s ): consuming topic from ZMQ: zmq_getsockopt() for ZMQ_EVENTS: %s [topic=%u]\n",
          zmq_host->log_id, zmq_strerror(errno), zmq_host->topic);

      return rc;
    }

    Log(LOG_DEBUG, "DEBUG ( %s ): consuming topic from ZMQ: zmq_getsockopt() for ZMQ_EVENTS: %s [topic=%u]\n",
        zmq_host->log_id, zmq_strerror(errno), zmq_host->topic);
    attempts++;
  }

  /* nothing pending */
  if (!(ev_mask & ZMQ_POLLIN)) return rc;

  /* read topic first */
  rc = zmq_recv(zmq_host->sock.obj, &rx_topic, 1, 0);
  if (rc == ERR) {
    Log(LOG_ERR, "ERROR ( %s ): consuming topic from ZMQ: zmq_recv(): %s [topic=%u]\n",
        zmq_host->log_id, zmq_strerror(errno), zmq_host->topic);
    return rc;
  }

  /* read actual data then */
  rc = zmq_recv(zmq_host->sock.obj, buf, len, 0);
  if (rc == ERR) {
    Log(LOG_ERR, "ERROR ( %s ): consuming data from ZMQ: zmq_recv(): %s [topic=%u]\n",
        zmq_host->log_id, zmq_strerror(errno), zmq_host->topic);
  }
  else if (rc > len) {
    Log(LOG_ERR, "ERROR ( %s ): consuming data from ZMQ: zmq_recv(): buffer overrun [topic=%u]\n",
        zmq_host->log_id, zmq_host->topic);
    rc = ERR;
  }

  return rc;
}
577 
p_zmq_recv_str(struct p_zmq_sock * sock)578 char *p_zmq_recv_str(struct p_zmq_sock *sock)
579 {
580   char buf[SRVBUFLEN];
581   int len;
582   void *s;
583 
584   if (sock->obj_rx) s = sock->obj_rx;
585   else s = sock->obj;
586 
587   memset(buf, 0, sizeof(buf));
588   len = zmq_recv(s, buf, (sizeof(buf) - 1), 0);
589   if (len == ERR) return NULL;
590   else return strndup(buf, sizeof(buf));
591 }
592 
p_zmq_send_str(struct p_zmq_sock * sock,char * buf)593 int p_zmq_send_str(struct p_zmq_sock *sock, char *buf)
594 {
595   int len;
596   void *s;
597 
598   if (sock->obj_rx) s = sock->obj_rx;
599   else s = sock->obj;
600 
601   len = zmq_send(s, buf, strlen(buf), 0);
602 
603   return len;
604 }
605 
p_zmq_sendmore_str(struct p_zmq_sock * sock,char * buf)606 int p_zmq_sendmore_str(struct p_zmq_sock *sock, char *buf)
607 {
608   int len;
609   void *s;
610 
611   if (sock->obj_rx) s = sock->obj_rx;
612   else s = sock->obj;
613 
614   len = zmq_send(s, buf, strlen(buf), ZMQ_SNDMORE);
615 
616   return len;
617 }
618 
p_zmq_recv_bin(struct p_zmq_sock * sock,void * buf,size_t len)619 int p_zmq_recv_bin(struct p_zmq_sock *sock, void *buf, size_t len)
620 {
621   int rcvlen;
622   void *s;
623 
624   if (sock->obj_rx) s = sock->obj_rx;
625   else s = sock->obj;
626 
627   rcvlen = zmq_recv(s, buf, len, 0);
628 
629   return rcvlen;
630 }
631 
p_zmq_send_bin(struct p_zmq_sock * sock,void * buf,size_t len,int nonblock)632 int p_zmq_send_bin(struct p_zmq_sock *sock, void *buf, size_t len, int nonblock)
633 {
634   int sndlen;
635   void *s;
636 
637   if (sock->obj_tx) s = sock->obj_tx;
638   else s = sock->obj;
639 
640   if (!nonblock) sndlen = zmq_send(s, buf, len, 0);
641   else sndlen = zmq_send(s, buf, len, ZMQ_DONTWAIT);
642 
643   return sndlen;
644 }
645 
p_zmq_sendmore_bin(struct p_zmq_sock * sock,void * buf,size_t len,int nonblock)646 int p_zmq_sendmore_bin(struct p_zmq_sock *sock, void *buf, size_t len, int nonblock)
647 {
648   int sndlen;
649   void *s;
650 
651   if (sock->obj_tx) s = sock->obj_tx;
652   else s = sock->obj;
653 
654   if (!nonblock) sndlen = zmq_send(s, buf, len, ZMQ_SNDMORE);
655   else sndlen = zmq_send(s, buf, len, (ZMQ_SNDMORE|ZMQ_DONTWAIT));
656 
657   return sndlen;
658 }
659 
p_zmq_zap_handler(void * zh)660 void p_zmq_zap_handler(void *zh)
661 {
662   struct p_zmq_host *zmq_host = (struct p_zmq_host *) zh;
663   struct p_zmq_sock zmq_sock;
664   int ret;
665 
666   memset(&zmq_sock, 0, sizeof(zmq_sock));
667 
668   zmq_sock.obj = zmq_socket(zmq_host->ctx, ZMQ_REP);
669   if (!zmq_sock.obj) {
670     Log(LOG_ERR, "ERROR ( %s ): zmq_socket() ZAP failed (%s)\nExiting.\n",
671         zmq_host->log_id, zmq_strerror(errno));
672     exit_gracefully(1);
673   }
674 
675   snprintf(zmq_sock.str, sizeof(zmq_sock.str), "%s", "inproc://zeromq.zap.01");
676   ret = zmq_bind(zmq_sock.obj, zmq_sock.str);
677   if (ret == ERR) {
678     Log(LOG_ERR, "ERROR ( %s ): zmq_bind() ZAP failed (%s)\nExiting.\n",
679         zmq_host->log_id, zmq_strerror(errno));
680     exit_gracefully(1);
681   }
682 
683   while (TRUE) {
684     char *version, *sequence, *domain, *address, *identity;
685     char *mechanism, *username, *password;
686 
687     version = p_zmq_recv_str(&zmq_sock);
688     if (!version) break;
689 
690     sequence = p_zmq_recv_str(&zmq_sock);
691     domain = p_zmq_recv_str(&zmq_sock);
692     address = p_zmq_recv_str(&zmq_sock);
693     identity = p_zmq_recv_str(&zmq_sock);
694     mechanism = p_zmq_recv_str(&zmq_sock);
695 
696     if (!strcmp(version, "1.0") && !strcmp(mechanism, "PLAIN")) {
697       username = p_zmq_recv_str(&zmq_sock);
698       password = p_zmq_recv_str(&zmq_sock);
699 
700       p_zmq_sendmore_str(&zmq_sock, version);
701       p_zmq_sendmore_str(&zmq_sock, sequence);
702 
703       if (!strcmp(username, zmq_host->zap.username) &&
704 	  !strcmp(password, zmq_host->zap.password)) {
705         p_zmq_sendmore_str(&zmq_sock, "200");
706         p_zmq_sendmore_str(&zmq_sock, "OK");
707         p_zmq_sendmore_str(&zmq_sock, "anonymous");
708         p_zmq_send_str(&zmq_sock, "");
709       }
710       else {
711         p_zmq_sendmore_str(&zmq_sock, "400");
712         p_zmq_sendmore_str(&zmq_sock, "Invalid username or password");
713         p_zmq_sendmore_str(&zmq_sock, "");
714         p_zmq_send_str(&zmq_sock, "");
715       }
716 
717       free(username);
718       free(password);
719     }
720     else {
721       p_zmq_sendmore_str(&zmq_sock, version);
722       p_zmq_sendmore_str(&zmq_sock, sequence);
723       p_zmq_sendmore_str(&zmq_sock, "400");
724       p_zmq_sendmore_str(&zmq_sock, "Unsupported auth mechanism");
725       p_zmq_sendmore_str(&zmq_sock, "");
726       p_zmq_send_str(&zmq_sock, "");
727     }
728 
729     free(version);
730     free(sequence);
731     free(domain);
732     free(address);
733     free(identity);
734     free(mechanism);
735   }
736 
737   zmq_close(zmq_sock.obj);
738 }
739 
p_zmq_router_setup(struct p_zmq_host * zmq_host,char * host,int port)740 void p_zmq_router_setup(struct p_zmq_host *zmq_host, char *host, int port)
741 {
742   char server_str[SHORTBUFLEN];
743   int ret, as_server = TRUE;
744 
745   if (!zmq_host->ctx) zmq_host->ctx = zmq_ctx_new();
746 
747   zmq_host->sock.obj = zmq_socket(zmq_host->ctx, ZMQ_ROUTER);
748   if (!zmq_host->sock.obj) {
749     Log(LOG_ERR, "ERROR ( %s ): zmq_socket() failed for ZMQ_ROUTER: %s\nExiting.\n",
750 	zmq_host->log_id, zmq_strerror(errno));
751     exit_gracefully(1);
752   }
753 
754   if (strlen(zmq_host->zap.username) && strlen(zmq_host->zap.password)) {
755     p_zmq_zap_setup(zmq_host);
756 
757     ret = zmq_setsockopt(zmq_host->sock.obj, ZMQ_PLAIN_SERVER, &as_server, sizeof(int));
758     if (ret == ERR) {
759       Log(LOG_ERR, "ERROR ( %s ): zmq_setsockopt() ZMQ_PLAIN_SERVER failed for topic %u: %s\nExiting.\n",
760 	  zmq_host->log_id, zmq_host->topic, zmq_strerror(errno));
761       exit_gracefully(1);
762     }
763   }
764 
765   snprintf(server_str, SHORTBUFLEN, "tcp://%s:%u", host, port);
766 
767   ret = zmq_bind(zmq_host->sock.obj, server_str);
768   if (ret == ERR) {
769     Log(LOG_ERR, "ERROR ( %s ): zmq_bind() failed for ZMQ_ROUTER: %s\nExiting.\n",
770 	zmq_host->log_id, zmq_strerror(errno));
771     exit_gracefully(1);
772   }
773 }
774 
p_zmq_dealer_inproc_setup(struct p_zmq_host * zmq_host)775 void p_zmq_dealer_inproc_setup(struct p_zmq_host *zmq_host)
776 {
777   int ret;
778 
779   if (!zmq_host->ctx) zmq_host->ctx = zmq_ctx_new();
780 
781   zmq_host->sock_inproc.obj = zmq_socket(zmq_host->ctx, ZMQ_DEALER);
782   if (!zmq_host->sock_inproc.obj) {
783     Log(LOG_ERR, "ERROR ( %s ): zmq_socket() failed for ZMQ_DEALER: %s\nExiting.\n",
784 	zmq_host->log_id, zmq_strerror(errno));
785     exit_gracefully(1);
786   }
787 
788   ret = zmq_bind(zmq_host->sock_inproc.obj, zmq_host->sock_inproc.str);
789   if (ret == ERR) {
790     Log(LOG_ERR, "ERROR ( %s ): zmq_bind() failed for ZMQ_DEALER: %s\nExiting.\n",
791 	zmq_host->log_id, zmq_strerror(errno));
792     exit_gracefully(1);
793   }
794 }
795 
p_zmq_proxy_setup(struct p_zmq_host * zmq_host)796 void p_zmq_proxy_setup(struct p_zmq_host *zmq_host)
797 {
798   int ret;
799 
800   ret = zmq_proxy(zmq_host->sock.obj, zmq_host->sock_inproc.obj, NULL);
801   if (ret == ERR) {
802     Log(LOG_ERR, "ERROR ( %s ): zmq_proxy() failed: %s\nExiting.\n",
803 	zmq_host->log_id, zmq_strerror(errno));
804     exit_gracefully(1);
805   }
806 }
807 
p_zmq_router_backend_setup(struct p_zmq_host * zmq_host,int thread_nbr)808 void p_zmq_router_backend_setup(struct p_zmq_host *zmq_host, int thread_nbr)
809 {
810   int idx;
811 
812   p_zmq_dealer_inproc_setup(zmq_host);
813   zmq_host->router_worker.threads = malloc(sizeof(void *) * thread_nbr);
814 
815   for (idx = 0; idx < thread_nbr; idx++) {
816     zmq_host->router_worker.threads[thread_nbr] = zmq_threadstart(&p_zmq_router_worker, zmq_host);
817   }
818 
819   p_zmq_proxy_setup(zmq_host);
820 }
821 
p_zmq_router_worker(void * zh)822 void p_zmq_router_worker(void *zh)
823 {
824   struct p_zmq_host *zmq_host = (struct p_zmq_host *) zh;
825   struct p_zmq_sock sock;
826   int ret;
827 
828   assert(zmq_host);
829   memset(&sock, 0, sizeof(sock));
830 
831   sock.obj = zmq_socket(zmq_host->ctx, ZMQ_REP);
832   if (!sock.obj) {
833     Log(LOG_ERR, "ERROR ( %s ): p_zmq_router_worker zmq_socket() failed: %s (%s)\nExiting.\n",
834         zmq_host->log_id, zmq_host->sock_inproc.str, zmq_strerror(errno));
835     exit_gracefully(1);
836   }
837 
838   ret = zmq_connect(sock.obj, zmq_host->sock_inproc.str);
839   if (ret == ERR) {
840     Log(LOG_ERR, "ERROR ( %s ): p_zmq_router_worker zmq_connect() failed: %s (%s)\nExiting.\n",
841         zmq_host->log_id, zmq_host->sock_inproc.str, zmq_strerror(errno));
842     exit_gracefully(1);
843   }
844 
845   zmq_host->router_worker.func(zmq_host, &sock);
846 }
847 
/*
 * Close-side counterpart of the init routines: delegates to
 * p_zmq_plugin_pipe_init_plugin(), which resets the context pointer.
 */
void p_zmq_close(struct p_zmq_host *zmq_host)
{
  p_zmq_plugin_pipe_init_plugin(zmq_host);
}
852