1 /* =========================================================================
2 zsys - system-level methods
3
4 Copyright (c) the Contributors as noted in the AUTHORS file.
5 This file is part of CZMQ, the high-level C binding for 0MQ:
6 http://czmq.zeromq.org.
7
8 This Source Code Form is subject to the terms of the Mozilla Public
9 License, v. 2.0. If a copy of the MPL was not distributed with this
10 file, You can obtain one at http://mozilla.org/MPL/2.0/.
11 =========================================================================
12 */
13
14 /*
15 @header
16 The zsys class provides a portable wrapper for system calls. We collect
17 them here to reduce the number of weird #ifdefs in other classes. As far
18 as possible, the bulk of CZMQ classes are fully portable.
19 @discuss
20 @end
21 */
22
23 #include "czmq_classes.h"
24
25 // For getcwd() variants
26 #if (defined (WIN32))
27 # include <direct.h>
28 #else
29 # include <unistd.h>
30 #endif
31
32 // --------------------------------------------------------------------------
33 // Signal handling
34
35 // These are global variables accessible to CZMQ application code
36 volatile int zsys_interrupted = 0; // Current name
37 volatile int zctx_interrupted = 0; // Deprecated name
38
39 static void s_signal_handler (int signal_value);
40
41 // We use these variables for signal handling
42 static bool s_first_time = true;
43 static bool handle_signals = true;
44 #if defined (__UNIX__)
45 static struct sigaction sigint_default;
46 static struct sigaction sigterm_default;
47
48 #elif defined (__WINDOWS__)
49 static zsys_handler_fn *installed_handler_fn;
50 static BOOL WINAPI
s_handler_fn_shim(DWORD ctrltype)51 s_handler_fn_shim (DWORD ctrltype)
52 {
53 // Return TRUE for events that we handle
54 if (ctrltype == CTRL_C_EVENT && installed_handler_fn != NULL) {
55 installed_handler_fn (ctrltype);
56 return TRUE;
57 }
58 if (ctrltype == CTRL_CLOSE_EVENT && installed_handler_fn != NULL) {
59 installed_handler_fn (ctrltype);
60 return TRUE;
61 }
62 return FALSE;
63 }
64 #endif
65
66 // --------------------------------------------------------------------------
67 // Global context handling
68
69 // ZeroMQ context for this process
70 static void *s_process_ctx = NULL;
71 static bool s_initialized = false;
72 static bool s_shutting_down = false;
73 #if defined (__UNIX__)
74 static void zsys_cleanup (void);
75 #endif
76
77 #ifndef S_DEFAULT_ZSYS_FILE_STABLE_AGE_MSEC
78 // This is a private tunable that is likely to be replaced or tweaked later
79 // per comment block at s_zsys_file_stable() implementation, to reflect
80 // the best stat data granularity available on host OS *and* used by czmq.
81 #define S_DEFAULT_ZSYS_FILE_STABLE_AGE_MSEC 5000
82 #endif
83
84 // Default globals for new sockets and other joys; these can all be set
85 // from the environment, or via the zsys_set_xxx API.
86 static size_t s_io_threads = 1; // ZSYS_IO_THREADS=1
87 static int s_thread_sched_policy = -1; // ZSYS_THREAD_SCHED_POLICY=-1
88 static int s_thread_priority = -1; // ZSYS_THREAD_PRIORITY=-1
89 static int s_thread_name_prefix = -1; // ZSYS_THREAD_NAME_PREFIX=-1
90 static char s_thread_name_prefix_str[16] = "0"; // ZSYS_THREAD_NAME_PREFIX_STR="0"
91 static size_t s_max_sockets = 1024; // ZSYS_MAX_SOCKETS=1024
92 static int s_max_msgsz = INT_MAX; // ZSYS_MAX_MSGSZ=INT_MAX
93 static int64_t s_file_stable_age_msec = S_DEFAULT_ZSYS_FILE_STABLE_AGE_MSEC;
94 // ZSYS_FILE_STABLE_AGE_MSEC=5000
95 static size_t s_linger = 0; // ZSYS_LINGER=0
96 static size_t s_sndhwm = 1000; // ZSYS_SNDHWM=1000
97 static size_t s_rcvhwm = 1000; // ZSYS_RCVHWM=1000
98 static size_t s_pipehwm = 1000; // ZSYS_PIPEHWM=1000
99 static int s_ipv6 = 0; // ZSYS_IPV6=0
100 static char *s_interface = NULL; // ZSYS_INTERFACE=
101 static char *s_ipv6_address = NULL; // ZSYS_IPV6_ADDRESS=
102 static char *s_ipv6_mcast_address = NULL; // ZSYS_IPV6_MCAST_ADDRESS=
103 static int s_auto_use_fd = 0; // ZSYS_AUTO_USE_FD=0
104 static char *s_logident = NULL; // ZSYS_LOGIDENT=
105 static FILE *s_logstream = NULL; // ZSYS_LOGSTREAM=stdout/stderr
106 static bool s_logsystem = false; // ZSYS_LOGSYSTEM=true/false
107 static zsock_t *s_logsender = NULL; // ZSYS_LOGSENDER=
108 static int s_zero_copy_recv = 1; // ZSYS_ZERO_COPY_RECV=1
109 static char *s_ipv4_mcast_address = NULL; // ZSYS_IPV4_MCAST_ADDRESS=
110 static unsigned char s_mcast_ttl = 1; // ZSYS_MCAST_TTL=1
111
112 // Track number of open sockets so we can zmq_term() safely
113 static size_t s_open_sockets = 0;
114
115 // We keep a list of open sockets to report leaks to developers
116 static zlist_t *s_sockref_list = NULL;
117
118 // This defines a single zsock_new() caller instance
119 typedef struct {
120 void *handle;
121 int type;
122 const char *filename;
123 size_t line_nbr;
124 } s_sockref_t;
125
126 // Mutex macros
127 #if defined (__UNIX__)
128 typedef pthread_mutex_t zsys_mutex_t;
129 # define ZMUTEX_INIT(m) pthread_mutex_init (&m, NULL);
130 # define ZMUTEX_LOCK(m) pthread_mutex_lock (&m);
131 # define ZMUTEX_UNLOCK(m) pthread_mutex_unlock (&m);
132 # define ZMUTEX_DESTROY(m) pthread_mutex_destroy (&m);
133 #elif defined (__WINDOWS__)
134 typedef CRITICAL_SECTION zsys_mutex_t;
135 # define ZMUTEX_INIT(m) InitializeCriticalSection (&m);
136 # define ZMUTEX_LOCK(m) EnterCriticalSection (&m);
137 # define ZMUTEX_UNLOCK(m) LeaveCriticalSection (&m);
138 # define ZMUTEX_DESTROY(m) DeleteCriticalSection (&m);
139 #endif
140
141 // Mutex to guard socket counter
142 static zsys_mutex_t s_mutex;
143 #if defined (__UNIX__)
144 // Mutex to guard the multiple zsys_init() - to make it threadsafe
145 static zsys_mutex_t s_init_mutex;
146 #endif
147 // Implementation for the zsys_vprintf() which is known from legacy
148 // and poses as a stable interface now.
149 static inline
150 char *
151 s_zsys_vprintf_hint (int hint, const char *format, va_list argptr);
152
153 // --------------------------------------------------------------------------
154 // Initialize CZMQ zsys layer; this happens automatically when you create
155 // a socket or an actor; however this call lets you force initialization
156 // earlier, so e.g. logging is properly set-up before you start working.
157 // Not threadsafe, so call only from main thread. Safe to call multiple
158 // times. Returns global CZMQ context.
159
160 #if defined (__UNIX__)
161 // mutex for pthread_once to run the init function only once in a process
162 static pthread_once_t init_all_mutex_var = PTHREAD_ONCE_INIT;
163
164 // handler to initialize mutexes one time in multi threaded env
zsys_initialize_mutex()165 static void zsys_initialize_mutex() {
166 ZMUTEX_INIT (s_mutex);
167 ZMUTEX_INIT (s_init_mutex);
168 }
169
170 // handler to detect fork condition and cleanup the stale context inherited from parent process
zsys_pthread_at_fork_handler(void)171 static void zsys_pthread_at_fork_handler(void) {
172 // re-initialize mutexes
173 ZMUTEX_INIT (s_init_mutex);
174 ZMUTEX_INIT (s_mutex);
175 // call cleanup
176 zsys_cleanup();
177 }
178 #endif
179
180 void *
zsys_init(void)181 zsys_init (void)
182 {
183 #if defined (__UNIX__)
184 //To avoid two inits at same time
185 pthread_once(&init_all_mutex_var, zsys_initialize_mutex);
186
187 if (s_initialized) {
188 assert (s_process_ctx);
189 return s_process_ctx;
190 }
191
192 ZMUTEX_LOCK (s_init_mutex);
193 #endif
194
195 // Doing this again here... to ensure that after mutex wait if the thread 2 gets execution, it will
196 // will get the context right away
197 if (s_initialized) {
198 assert (s_process_ctx);
199 #if defined (__UNIX__)
200 // unlock the mutex before returning the context
201 ZMUTEX_UNLOCK(s_init_mutex);
202 #endif
203 return s_process_ctx;
204 }
205
206 // Pull process defaults from environment
207 if (getenv ("ZSYS_IO_THREADS"))
208 s_io_threads = atoi (getenv ("ZSYS_IO_THREADS"));
209
210 if (getenv ("ZSYS_MAX_SOCKETS"))
211 s_max_sockets = atoi (getenv ("ZSYS_MAX_SOCKETS"));
212
213 if (getenv ("ZSYS_MAX_MSGSZ"))
214 s_max_msgsz = atoi (getenv ("ZSYS_MAX_MSGSZ"));
215
216 if (getenv ("ZSYS_ZERO_COPY_RECV"))
217 s_zero_copy_recv = atoi (getenv ("ZSYS_ZERO_COPY_RECV"));
218
219 if (getenv ("ZSYS_FILE_STABLE_AGE_MSEC"))
220 s_file_stable_age_msec = atoi (getenv ("ZSYS_FILE_STABLE_AGE_MSEC"));
221
222 if (getenv ("ZSYS_LINGER"))
223 s_linger = atoi (getenv ("ZSYS_LINGER"));
224
225 if (getenv ("ZSYS_SNDHWM"))
226 s_sndhwm = atoi (getenv ("ZSYS_SNDHWM"));
227
228 if (getenv ("ZSYS_RCVHWM"))
229 s_rcvhwm = atoi (getenv ("ZSYS_RCVHWM"));
230
231 if (getenv ("ZSYS_PIPEHWM"))
232 s_pipehwm = atoi (getenv ("ZSYS_PIPEHWM"));
233
234 if (getenv ("ZSYS_IPV6"))
235 s_ipv6 = atoi (getenv ("ZSYS_IPV6"));
236
237 if (getenv ("ZSYS_LOGSTREAM")) {
238 if (streq (getenv ("ZSYS_LOGSTREAM"), "stdout"))
239 s_logstream = stdout;
240 else
241 if (streq (getenv ("ZSYS_LOGSTREAM"), "stderr"))
242 s_logstream = stderr;
243 }
244 else
245 s_logstream = stdout;
246
247 if (getenv ("ZSYS_LOGSYSTEM")) {
248 if (streq (getenv ("ZSYS_LOGSYSTEM"), "true"))
249 s_logsystem = true;
250 else
251 if (streq (getenv ("ZSYS_LOGSYSTEM"), "false"))
252 s_logsystem = false;
253 }
254
255 if (getenv ("ZSYS_AUTO_USE_FD"))
256 s_auto_use_fd = atoi (getenv ("ZSYS_AUTO_USE_FD"));
257
258 zsys_catch_interrupts ();
259
260 #if defined (__WINDOWS__)
261 ZMUTEX_INIT (s_mutex);
262 #endif
263 s_sockref_list = zlist_new ();
264 if (!s_sockref_list) {
265 zsys_shutdown ();
266 #if defined (__UNIX__)
267 ZMUTEX_UNLOCK(s_init_mutex);
268 #endif
269 return NULL;
270 }
271 srandom ((unsigned) time (NULL));
272
273 assert (!s_process_ctx);
274 // We use zmq_init/zmq_term to keep compatibility back to ZMQ v2
275 s_process_ctx = zmq_init ((int) s_io_threads);
276 #if defined (ZMQ_MAX_SOCKETS)
277 zmq_ctx_set (s_process_ctx, ZMQ_MAX_SOCKETS, (int) s_max_sockets);
278 #endif
279 s_initialized = true;
280
281 #if defined (__UNIX__)
282 atexit (zsys_shutdown);
283 pthread_atfork(NULL, NULL, &zsys_pthread_at_fork_handler);
284 //don't hold the lock because some of the function will call zsys_init again
285 ZMUTEX_UNLOCK(s_init_mutex);
286 #endif
287
288 // The following functions call zsys_init(), so they MUST be called after
289 // s_initialized is set in order to avoid an infinite recursion
290 if (getenv ("ZSYS_INTERFACE"))
291 zsys_set_interface (getenv ("ZSYS_INTERFACE"));
292
293 if (getenv ("ZSYS_IPV6_ADDRESS"))
294 zsys_set_ipv6_address (getenv ("ZSYS_IPV6_ADDRESS"));
295
296 if (getenv ("ZSYS_IPV6_MCAST_ADDRESS"))
297 zsys_set_ipv6_mcast_address (getenv ("ZSYS_IPV6_MCAST_ADDRESS"));
298 else
299 zsys_set_ipv6_mcast_address ("ff02:0:0:0:0:0:0:1");
300
301 if (getenv ("ZSYS_IPV4_MCAST_ADDRESS"))
302 zsys_set_ipv4_mcast_address (getenv ("ZSYS_IPV4_MCAST_ADDRESS"));
303 else
304 zsys_set_ipv4_mcast_address (NULL);
305
306
307 if (getenv ("ZSYS_LOGIDENT"))
308 zsys_set_logident (getenv ("ZSYS_LOGIDENT"));
309
310 if (getenv ("ZSYS_LOGSENDER"))
311 zsys_set_logsender (getenv ("ZSYS_LOGSENDER"));
312
313 zsys_set_max_msgsz (s_max_msgsz);
314
315 #if defined ZMQ_ZERO_COPY_RECV
316 zmq_ctx_set (s_process_ctx, ZMQ_ZERO_COPY_RECV, s_zero_copy_recv);
317 #endif
318
319 zsys_set_file_stable_age_msec (s_file_stable_age_msec);
320
321 if (getenv ("ZSYS_THREAD_PRIORITY"))
322 zsys_set_thread_priority (atoi (getenv ("ZSYS_THREAD_PRIORITY")));
323 else
324 zsys_set_thread_priority (s_thread_priority);
325
326 if (getenv ("ZSYS_THREAD_SCHED_POLICY"))
327 zsys_set_thread_sched_policy (atoi (getenv ("ZSYS_THREAD_SCHED_POLICY")));
328 else
329 zsys_set_thread_sched_policy (s_thread_sched_policy);
330
331 if (getenv ("ZSYS_THREAD_NAME_PREFIX"))
332 zsys_set_thread_name_prefix (atoi (getenv ("ZSYS_THREAD_NAME_PREFIX")));
333 else
334 zsys_set_thread_name_prefix (s_thread_name_prefix);
335
336 if (getenv ("ZSYS_THREAD_NAME_PREFIX_STR"))
337 zsys_set_thread_name_prefix_str (getenv ("ZSYS_THREAD_NAME_PREFIX"));
338 else
339 zsys_set_thread_name_prefix_str (s_thread_name_prefix_str);
340
341 return s_process_ctx;
342 }
343
344 // atexit or manual termination for the process
345 void
zsys_shutdown(void)346 zsys_shutdown (void)
347 {
348 if (!s_initialized || s_shutting_down)
349 return;
350
351 s_shutting_down = true;
352
353 // The atexit handler is called when the main function exits;
354 // however we may have zactor threads shutting down and still
355 // trying to close their sockets. So if we suspect there are
356 // actors busy (s_open_sockets > 0), then we sleep for a few
357 // hundred milliseconds to allow the actors, if any, to get in
358 // and close their sockets.
359 ZMUTEX_LOCK (s_mutex);
360 size_t busy = s_open_sockets;
361 ZMUTEX_UNLOCK (s_mutex);
362 if (busy)
363 zclock_sleep (200);
364
365 // No matter, we are now going to shut down
366 // Print the source reference for any sockets the app did not
367 // destroy properly.
368 ZMUTEX_LOCK (s_mutex);
369 s_sockref_t *sockref = (s_sockref_t *) zlist_pop (s_sockref_list);
370 while (sockref) {
371 assert (sockref->filename);
372 zsys_error ("[%d]dangling '%s' socket created at %s:%d",
373 getpid (),
374 zsys_sockname (sockref->type),
375 sockref->filename, (int) sockref->line_nbr);
376 zmq_close (sockref->handle);
377 freen (sockref);
378 sockref = (s_sockref_t *) zlist_pop (s_sockref_list);
379 --s_open_sockets;
380 }
381 zlist_destroy (&s_sockref_list);
382 ZMUTEX_UNLOCK (s_mutex);
383
384 // Close logsender socket if opened (don't do this in critical section)
385 if (s_logsender)
386 zsock_destroy (&s_logsender);
387
388 if (s_open_sockets == 0)
389 {
390 zmq_term(s_process_ctx);
391 s_process_ctx = NULL;
392 s_io_threads = 1;
393 s_thread_sched_policy = -1;
394 s_thread_priority = -1;
395 s_thread_name_prefix = -1;
396 strcpy (s_thread_name_prefix_str, "0");
397 s_max_sockets = 1024;
398 s_max_msgsz = INT_MAX;
399 s_file_stable_age_msec = S_DEFAULT_ZSYS_FILE_STABLE_AGE_MSEC;
400 s_linger = 0;
401 s_sndhwm = 1000;
402 s_rcvhwm = 1000;
403 s_pipehwm = 1000;
404 s_ipv6 = 0;
405 s_auto_use_fd = 0;
406 s_logstream = NULL;
407 s_logsystem = false;
408 }
409 else
410 zsys_error ("dangling sockets: cannot terminate ZMQ safely");
411
412 ZMUTEX_DESTROY (s_mutex);
413
414 // Free dynamically allocated properties
415 freen (s_interface);
416 freen (s_ipv6_address);
417 freen (s_ipv6_mcast_address);
418 freen (s_logident);
419
420 zsys_interrupted = 0;
421 zctx_interrupted = 0;
422
423 zsys_handler_reset ();
424
425 #if defined (__UNIX__)
426 closelog (); // Just to be pedantic
427 #endif
428
429 s_initialized = false;
430 s_shutting_down = false;
431 }
432
433 #if defined (__UNIX__)
434 // Restores all CZMQ global state to initial values
435 static void
zsys_cleanup(void)436 zsys_cleanup (void)
437 {
438 ZMUTEX_LOCK (s_init_mutex);
439 s_process_ctx = NULL;
440 zsys_interrupted = 0;
441 zctx_interrupted = 0;
442
443 s_first_time = true;
444 handle_signals = true;
445
446 s_initialized = false;
447
448 s_io_threads = 1;
449 s_max_sockets = 1024;
450 s_max_msgsz = INT_MAX;
451 s_linger = 0;
452 s_sndhwm = 1000;
453 s_rcvhwm = 1000;
454 s_pipehwm = 1000;
455 s_ipv6 = 0;
456 s_interface = NULL;
457 s_ipv6_address = NULL;
458 s_ipv6_mcast_address = NULL;
459 s_auto_use_fd = 0;
460 s_logident = NULL;
461 s_logstream = NULL;
462 s_logsystem = false;
463 s_logsender = NULL;
464
465 s_open_sockets = 0;
466 ZMUTEX_UNLOCK (s_init_mutex);
467 }
468 #endif
469
470 // --------------------------------------------------------------------------
471 // Get a new ZMQ socket, automagically creating a ZMQ context if this is
472 // the first time. Caller is responsible for destroying the ZMQ socket
473 // before process exits, to avoid a ZMQ deadlock. Note: you should not use
474 // this method in CZMQ apps, use zsock_new() instead. This is for system
475 // use only, really.
476
477 void *
zsys_socket(int type,const char * filename,size_t line_nbr)478 zsys_socket (int type, const char *filename, size_t line_nbr)
479 {
480 // First time initialization; if the application is mixing
481 // its own threading calls with zsock, this may fail if two
482 // threads try to create sockets at the same time. In such
483 // apps, they MUST create a socket in the main program before
484 // starting any threads. If the app uses zactor for its threads
485 // then we can guarantee this to always be safe.
486 zsys_init ();
487 ZMUTEX_LOCK (s_mutex);
488 void *handle = zmq_socket (s_process_ctx, type);
489 if (handle) {
490 // Configure socket with process defaults
491 zsock_set_linger (handle, (int) s_linger);
492 #if (ZMQ_VERSION_MAJOR == 2)
493 // TODO: v2/v3 socket api in zsock_option.inc are not public (not
494 // added to include/zsock.h) so we have to use zmq_setsockopt directly
495 // This should be fixed and zsock_set_hwm should be used instead
496 # if defined (ZMQ_HWM)
497 uint64_t value = s_sndhwm;
498 int rc = zmq_setsockopt (handle, ZMQ_HWM, &value, sizeof (uint64_t));
499 assert (rc == 0 || zmq_errno () == ETERM);
500 # endif
501 #else
502 // For later versions we use separate SNDHWM and RCVHWM
503 zsock_set_sndhwm (handle, (int) s_sndhwm);
504 zsock_set_rcvhwm (handle, (int) s_rcvhwm);
505 # if defined (ZMQ_IPV6)
506 zsock_set_ipv6 (handle, s_ipv6);
507 # else
508 zsock_set_ipv4only (handle, s_ipv6? 0: 1);
509 # endif
510 #endif
511 // Add socket to reference tracker so we can report leaks; this is
512 // done only when the caller passes a filename/line_nbr
513 if (filename) {
514 s_sockref_t *sockref = (s_sockref_t *) zmalloc (sizeof (s_sockref_t));
515 if (sockref) {
516 sockref->handle = handle;
517 sockref->type = type;
518 sockref->filename = filename;
519 sockref->line_nbr = line_nbr;
520 zlist_append (s_sockref_list, sockref);
521 }
522 else {
523 zmq_close (handle);
524 ZMUTEX_UNLOCK (s_mutex);
525 return NULL;
526 }
527 }
528 s_open_sockets++;
529 }
530 ZMUTEX_UNLOCK (s_mutex);
531 return handle;
532 }
533
534 // --------------------------------------------------------------------------
535 // Destroy/close a ZMQ socket. You should call this for every socket you
536 // create using zsys_socket().
537
538 int
zsys_close(void * handle,const char * filename,size_t line_nbr)539 zsys_close (void *handle, const char *filename, size_t line_nbr)
540 {
541 ZMUTEX_LOCK (s_mutex);
542 // It's possible atexit() has already happened if we're running under
543 // a debugger that redirects the main thread exit.
544 if (s_sockref_list) {
545 s_sockref_t *sockref = (s_sockref_t *) zlist_first (s_sockref_list);
546 while (sockref) {
547 if (sockref->handle == handle) {
548 zlist_remove (s_sockref_list, sockref);
549 freen (sockref);
550 break;
551 }
552 sockref = (s_sockref_t *) zlist_next (s_sockref_list);
553 }
554 }
555 s_open_sockets--;
556 zmq_close (handle);
557 ZMUTEX_UNLOCK (s_mutex);
558 return 0;
559 }
560
561
562 // --------------------------------------------------------------------------
563 // Return ZMQ socket name for socket type
564
565 char *
zsys_sockname(int socktype)566 zsys_sockname (int socktype)
567 {
568 char *type_names [] = {
569 "PAIR", "PUB", "SUB", "REQ", "REP",
570 "DEALER", "ROUTER", "PULL", "PUSH",
571 "XPUB", "XSUB", "STREAM",
572 "SERVER", "CLIENT",
573 "RADIO", "DISH",
574 "SCATTER", "GATHER", "DGRAM"
575 };
576 // This array matches ZMQ_XXX type definitions
577 assert (ZMQ_PAIR == 0);
578 #if defined (ZMQ_DGRAM)
579 assert (socktype >= 0 && socktype <= ZMQ_DGRAM);
580 #elif defined (ZMQ_SCATTER)
581 assert (socktype >= 0 && socktype <= ZMQ_SCATTER);
582 #elif defined (ZMQ_DISH)
583 assert (socktype >= 0 && socktype <= ZMQ_DISH);
584 #elif defined (ZMQ_CLIENT)
585 assert (socktype >= 0 && socktype <= ZMQ_CLIENT);
586 #elif defined (ZMQ_STREAM)
587 assert (socktype >= 0 && socktype <= ZMQ_STREAM);
588 #else
589 assert (socktype >= 0 && socktype <= ZMQ_XSUB);
590 #endif
591 return type_names [socktype];
592 }
593
594
595 // --------------------------------------------------------------------------
596 // Create a pipe, which consists of two PAIR sockets connected over inproc.
597 // The pipe is configured to use the zsys_pipehwm setting. Returns the
598 // frontend socket successful, NULL if failed.
599
600 zsock_t *
zsys_create_pipe(zsock_t ** backend_p)601 zsys_create_pipe (zsock_t **backend_p)
602 {
603 zsock_t *frontend = zsock_new (ZMQ_PAIR);
604 zsock_t *backend = zsock_new (ZMQ_PAIR);
605 assert (frontend);
606 assert (backend);
607
608 #if (ZMQ_VERSION_MAJOR == 2)
609 // TODO: v2/v3 socket api in zsock_option.inc are not public (not
610 // added to include/zsock.h) so we have to use zmq_setsockopt directly
611 // This should be fixed and zsock_set_hwm should be used instead
612 # if defined (ZMQ_HWM)
613 uint64_t value = zsys_pipehwm ();
614 int ret = zmq_setsockopt (zsock_resolve (frontend), ZMQ_HWM, &value,
615 sizeof (uint64_t));
616 assert (ret == 0 || zmq_errno () == ETERM);
617 value = zsys_pipehwm ();
618 ret = zmq_setsockopt (zsock_resolve (backend), ZMQ_HWM, &value,
619 sizeof (uint64_t));
620 assert (ret == 0 || zmq_errno () == ETERM);
621 # endif
622 #else
623 zsock_set_sndhwm (frontend, (int) zsys_pipehwm ());
624 zsock_set_sndhwm (backend, (int) zsys_pipehwm ());
625 #endif
626 // Now bind and connect pipe ends
627 char endpoint [32];
628 while (true) {
629 sprintf (endpoint, "inproc://pipe-%04x-%04x",
630 randof (0x10000), randof (0x10000));
631 if (zsock_bind (frontend, "%s", endpoint) == 0)
632 break;
633 }
634 int rc = zsock_connect (backend, "%s", endpoint);
635 assert (rc != -1); // Connect cannot fail
636
637 // Return frontend and backend sockets
638 *backend_p = backend;
639 return frontend;
640 }
641
642
643 // --------------------------------------------------------------------------
644 // Set interrupt handler; this saves the default handlers so that a
645 // zsys_handler_reset () can restore them. If you call this multiple times
646 // then the last handler will take affect. If handler_fn is NULL, disables
647 // default SIGINT/SIGTERM handling in CZMQ.
648
649 void
zsys_handler_set(zsys_handler_fn * handler_fn)650 zsys_handler_set (zsys_handler_fn *handler_fn)
651 {
652 if (!handler_fn) {
653 // Disable existing or future signal handling
654 zsys_handler_reset ();
655 handle_signals = false;
656 }
657 else {
658 handle_signals = true;
659 #if defined (__UNIX__)
660 if (s_first_time) {
661 // If first time, save default handlers
662 sigaction (SIGINT, NULL, &sigint_default);
663 sigaction (SIGTERM, NULL, &sigterm_default);
664 s_first_time = false;
665 }
666 // Install signal handler for SIGINT and SIGTERM
667 struct sigaction action;
668 action.sa_handler = handler_fn;
669 action.sa_flags = 0;
670 sigemptyset (&action.sa_mask);
671 sigaction (SIGINT, &action, NULL);
672 sigaction (SIGTERM, &action, NULL);
673 #elif defined (__WINDOWS__)
674 installed_handler_fn = handler_fn;
675 if (s_first_time) {
676 SetConsoleCtrlHandler (s_handler_fn_shim, TRUE);
677 s_first_time = false;
678 }
679 #else
680 # error "No signal handling defined for this platform"
681 #endif
682 }
683 }
684
685
686 // --------------------------------------------------------------------------
687 // Reset interrupt handler, call this at exit if needed
688 // Idempotent; safe to call multiple times
689
690 void
zsys_handler_reset(void)691 zsys_handler_reset (void)
692 {
693 #if defined (__UNIX__)
694 // Restore default handlers if not already done
695 if (handle_signals && !s_first_time) {
696 sigaction (SIGINT, &sigint_default, NULL);
697 sigaction (SIGTERM, &sigterm_default, NULL);
698 sigint_default.sa_handler = NULL;
699 sigterm_default.sa_handler = NULL;
700 s_first_time = true;
701 }
702 #elif defined (__WINDOWS__)
703 if (handle_signals && !s_first_time) {
704 SetConsoleCtrlHandler (s_handler_fn_shim, FALSE);
705 installed_handler_fn = NULL;
706 s_first_time = true;
707 }
708 #endif
709 }
710
711
712 // --------------------------------------------------------------------------
713 // Set default interrupt handler, so Ctrl-C or SIGTERM will set
714 // zsys_interrupted. Idempotent; safe to call multiple times.
715
716 void
zsys_catch_interrupts(void)717 zsys_catch_interrupts (void)
718 {
719 // Catch SIGINT and SIGTERM unless ZSYS_SIGHANDLER=false
720 if ((getenv ("ZSYS_SIGHANDLER") == NULL
721 || strneq (getenv ("ZSYS_SIGHANDLER"), "false"))
722 && handle_signals)
723 zsys_handler_set (s_signal_handler);
724 }
725
726 // Default internal signal handler
727 static void
s_signal_handler(int signal_value)728 s_signal_handler (int signal_value)
729 {
730 zctx_interrupted = 1;
731 zsys_interrupted = 1;
732 }
733
734
735 // --------------------------------------------------------------------------
736 // Return true if file exists, else zero
737
738 bool
zsys_file_exists(const char * filename)739 zsys_file_exists (const char *filename)
740 {
741 assert (filename);
742 return zsys_file_mode (filename) != -1;
743 }
744
745
746 // --------------------------------------------------------------------------
747 // Return size of file, or -1 if not found
748
749 ssize_t
zsys_file_size(const char * filename)750 zsys_file_size (const char *filename)
751 {
752 struct stat
753 stat_buf;
754
755 assert (filename);
756 if (stat ((char *) filename, &stat_buf) == 0)
757 return stat_buf.st_size;
758 else
759 return -1;
760 }
761
762
763 // --------------------------------------------------------------------------
764 // Return file modification time (accounted in seconds usually since
765 // UNIX Epoch, with granularity dependent on underlying filesystem,
766 // and starting point dependent on host OS and maybe its bitness).
767 // Per https://msdn.microsoft.com/en-us/library/w4ddyt9h(vs.71).aspx :
768 // Note In all versions of Microsoft C/C++ except Microsoft C/C++
769 // version 7.0, and in all versions of Microsoft Visual C++, the time
770 // function returns the current time as the number of seconds elapsed
771 // since midnight on January 1, 1970. In Microsoft C/C++ version 7.0,
772 // time() returned the current time as the number of seconds elapsed
773 // since midnight on December 31, 1899.
774 // This value is "arithmetic" with no big guarantees in the standards, and
775 // normally it should be manipulated with host's datetime suite of routines,
776 // including difftime(), or converted to "struct tm" for any predictable use.
777 // Returns 0 if the file does not exist.
778
779 time_t
zsys_file_modified(const char * filename)780 zsys_file_modified (const char *filename)
781 {
782 struct stat stat_buf;
783 if (stat (filename, &stat_buf) == 0)
784 return stat_buf.st_mtime;
785 else
786 return 0;
787 }
788
789
790 // --------------------------------------------------------------------------
791 // Return file mode; provides at least support for the POSIX S_ISREG(m)
792 // and S_ISDIR(m) macros and the S_IRUSR and S_IWUSR bits, on all boxes.
793 // Returns a mode_t cast to int, or -1 in case of error.
794
795 int
zsys_file_mode(const char * filename)796 zsys_file_mode (const char *filename)
797 {
798 #if (defined (__WINDOWS__))
799 DWORD dwfa = GetFileAttributesA (filename);
800 if (dwfa == 0xffffffff)
801 return -1;
802
803 dbyte mode = 0;
804 if (dwfa & FILE_ATTRIBUTE_DIRECTORY)
805 mode |= S_IFDIR;
806 else
807 mode |= S_IFREG;
808
809 if (!(dwfa & FILE_ATTRIBUTE_HIDDEN))
810 mode |= S_IRUSR;
811 if (!(dwfa & FILE_ATTRIBUTE_READONLY))
812 mode |= S_IWUSR;
813
814 return mode;
815 #else
816 struct stat stat_buf;
817 if (stat ((char *) filename, &stat_buf) == 0)
818 return stat_buf.st_mode;
819 else
820 return -1;
821 #endif
822 }
823
824
825 // --------------------------------------------------------------------------
826 // Delete file, return 0 if OK, -1 if not possible.
827
828 int
zsys_file_delete(const char * filename)829 zsys_file_delete (const char *filename)
830 {
831 assert (filename);
832 #if (defined (__WINDOWS__))
833 return DeleteFileA (filename)? 0: -1;
834 #else
835 return unlink (filename);
836 #endif
837 }
838
839
840 // --------------------------------------------------------------------------
841 // Check if file is 'stable'
842
843 // Internal implementation rigged with debugs and called from selftest
844 static bool
s_zsys_file_stable(const char * filename,bool verbose)845 s_zsys_file_stable (const char *filename, bool verbose)
846 {
847 struct stat stat_buf;
848 if (stat (filename, &stat_buf) == 0) {
849 // File is 'stable' if older (per filesystem stats) than a threshold.
850 // This used to mean more than 1 second old, counted in microseconds
851 // after inflating the st_mtime data - but this way of calculation
852 // has a caveat: if we created the file at Nsec.999msec, or rather
853 // the FS metadata was updated at that point, the st_mtime will be
854 // (after inflation) N.000. So a few milliseconds later, at (N+1)sec,
855 // we find the age difference seems over 1000 so the file is 1 sec
856 // old - even though it has barely been created. Compounding the
857 // issue, some filesystems have worse timestamp precision - e.g. the
858 // FAT filesystem variants are widespread (per SD standards) on
859 // removable media, and only account even seconds in stat data.
860 // Solutions are two-fold: when using stat fields that are precise
861 // to a second (or inpredictably two), we should actually check for
862 // (age > 3000+) in rounded-microsecond accounting. Also, for some
863 // systems we can have `configure`-time checks on presence of more
864 // precise (and less standardized) stat timestamp fields, where we
865 // can presumably avoid rounding to thousands and use (age > 2000).
866 // It might also help to define a zsys_file_modified_msec() whose
867 // actual granularity will be OS-dependent (rounded to 1000 or not).
868 // These are TODO ideas for subsequent work.
869
870 #if (defined (WIN32))
871 # define EPOCH_DIFFERENCE 11644473600LL
872 long age = (long) (zclock_time () - EPOCH_DIFFERENCE * 1000 - (stat_buf.st_mtime * 1000));
873 if (verbose)
874 zsys_debug ("zsys_file_stable@WIN32: file '%s' age is %ld msec "
875 "at timestamp %" PRIi64 " where st_mtime was %jd adjusted by %jd",
876 filename, age, zclock_time (), (intmax_t)(stat_buf.st_mtime * 1000),
877 (intmax_t)(EPOCH_DIFFERENCE * 1000) );
878 #else
879 long age = (long) (zclock_time () - (stat_buf.st_mtime * 1000));
880 if (verbose)
881 zsys_debug ("zsys_file_stable@non-WIN32: file '%s' age is %ld msec "
882 "at timestamp %" PRIi64 " where st_mtime was %jd",
883 filename, age, zclock_time (), (intmax_t)(stat_buf.st_mtime * 1000) );
884 #endif
885 return (age > s_file_stable_age_msec);
886 }
887 else {
888 if (verbose)
889 zsys_debug ("zsys_file_stable: could not stat file '%s'", filename);
890 return false; // File doesn't exist, so not stable
891 }
892 }
893
894 // Public implementation does not do debugs
895 bool
zsys_file_stable(const char * filename)896 zsys_file_stable (const char *filename)
897 {
898 return s_zsys_file_stable(filename, false);
899 }
900
901 // --------------------------------------------------------------------------
902 // Create a file path if it doesn't exist. The file path is treated as a
903 // printf format.
904
905 int
zsys_dir_create(const char * pathname,...)906 zsys_dir_create (const char *pathname, ...)
907 {
908 va_list argptr;
909 va_start (argptr, pathname);
910 char *formatted = zsys_vprintf (pathname, argptr);
911 va_end (argptr);
912 if (!formatted)
913 return -1;
914
915 // Create parent directory levels if needed
916 char *slash = strchr (formatted + 1, '/');
917 while (true) {
918 if (slash)
919 *slash = 0; // Cut at slash
920 int mode = zsys_file_mode (formatted);
921 if (mode == -1) {
922 // Does not exist, try to create it
923 #if (defined (__WINDOWS__))
924 if (!CreateDirectoryA (formatted, NULL)) {
925 #else
926 if (mkdir (formatted, 0775)) {
927 #endif
928 freen (formatted);
929 return -1; // Failed
930 }
931 }
932 else
933 if ((mode & S_IFDIR) == 0) {
934 // Not a directory, abort
935 }
936 if (!slash) // End if last segment
937 break;
938 *slash = '/';
939 slash = strchr (slash + 1, '/');
940 }
941 zstr_free (&formatted);
942 return 0;
943 }
944
945
946 // --------------------------------------------------------------------------
947 // Remove a file path if empty; the pathname is treated as printf format.
948
949 int
950 zsys_dir_delete (const char *pathname, ...)
951 {
952 va_list argptr;
953 va_start (argptr, pathname);
954 char *formatted = zsys_vprintf (pathname, argptr);
955 va_end (argptr);
956 if (!formatted)
957 return -1;
958
959 #if (defined (__WINDOWS__))
960 int rc = RemoveDirectoryA (formatted)? 0: -1;
961 #else
962 int rc = rmdir (formatted);
963 #endif
964 zstr_free (&formatted);
965 return rc;
966 }
967
968
969 // --------------------------------------------------------------------------
970 // Move to a specified working directory. Returns 0 if OK, -1 if this failed.
971
972 int
973 zsys_dir_change (const char *pathname)
974 {
975 assert (pathname);
976 #if (defined (__UNIX__))
977 return chdir (pathname);
978 #elif (defined (__WINDOWS__))
979 return !SetCurrentDirectoryA (pathname);
980 #endif
981 return -1; // Not implemented
982 }
983
984
985 // --------------------------------------------------------------------------
986 // Set private file creation mode; all files created from here will be
987 // readable/writable by the owner only.
988
989 #if !defined (__WINDOWS__)
990 static mode_t s_old_mask = 0;
991 #endif
992
993 void
994 zsys_file_mode_private (void)
995 {
996 #if !defined (__WINDOWS__)
997 s_old_mask = umask (S_IWGRP | S_IWOTH | S_IRGRP | S_IROTH);
998 #endif
999 }
1000
1001
1002 // --------------------------------------------------------------------------
1003 // Reset default file creation mode; all files created from here will use
1004 // process file mode defaults.
1005
1006 void
1007 zsys_file_mode_default (void)
1008 {
1009 // Reset process file create mask
1010 #if !defined (__WINDOWS__)
1011 if (s_old_mask)
1012 umask (s_old_mask);
1013 #endif
1014 }
1015
1016
1017 // --------------------------------------------------------------------------
1018 // Return the CZMQ version for run-time API detection; returns version
1019 // number into provided fields, providing reference isn't null in each case.
1020
1021 void
1022 zsys_version (int *major, int *minor, int *patch)
1023 {
1024 if (major)
1025 *major = CZMQ_VERSION_MAJOR;
1026 if (minor)
1027 *minor = CZMQ_VERSION_MINOR;
1028 if (patch)
1029 *patch = CZMQ_VERSION_PATCH;
1030 }
1031
1032
1033 // --------------------------------------------------------------------------
1034 // Format a string using printf formatting, returning a freshly allocated
1035 // buffer. If there was insufficient memory, returns NULL. Free the returned
1036 // string using zstr_free(). The hinted version allows to optimize by using
1037 // a larger starting buffer size (known to/assumed by the developer) and so
1038 // avoid reallocations.
1039
1040 char *
1041 zsys_sprintf_hint (int hint, const char *format, ...)
1042 {
1043 va_list argptr;
1044 va_start (argptr, format);
1045 char *string = s_zsys_vprintf_hint (hint, format, argptr);
1046 va_end (argptr);
1047 return (string);
1048 }
1049
1050 // --------------------------------------------------------------------------
1051 // Format a string using printf formatting, returning a freshly allocated
1052 // buffer. If there was insufficient memory, returns NULL. Free the returned
1053 // string using zstr_free().
1054
1055 char *
1056 zsys_sprintf (const char *format, ...)
1057 {
1058 // Effectively this is a copy of the small zsys_sprintf_hint with
1059 // hardcoded hint value; this is is cheaper than parsing va_list
1060 // several times to just call the other implementation cleanly.
1061 va_list argptr;
1062 va_start (argptr, format);
1063 char *string = s_zsys_vprintf_hint (256, format, argptr);
1064 va_end (argptr);
1065 return (string);
1066 }
1067
1068 // --------------------------------------------------------------------------
1069 // Format a string with variable arguments, returning a freshly allocated
1070 // buffer. If there was insufficient memory, returns NULL. Free the returned
1071 // string using zstr_free().
1072
1073 static inline
1074 char *
1075 s_zsys_vprintf_hint (int hint, const char *format, va_list argptr)
1076 {
1077 if (hint <= 0) {
1078 // The hint is not a hard requrement so no error here.
1079 // Just fall back to legacy default.
1080 hint = 256;
1081 }
1082 // Must use int "size" to compare to "required" (from vsnprintf)
1083 int size = hint;
1084 char *string = (char *) malloc (size);
1085 if (!string)
1086 return NULL;
1087
1088 // Using argptr is destructive, so we take a copy each time we need it
1089 // We define va_copy for Windows in czmq_prelude.h
1090 va_list my_argptr;
1091 va_copy (my_argptr, argptr);
1092 int required = vsnprintf (string, size, format, my_argptr);
1093 va_end (my_argptr);
1094 #ifdef __WINDOWS__
1095 if (required < 0 || required >= size) {
1096 va_copy (my_argptr, argptr);
1097 #ifdef _MSC_VER
1098 required = _vscprintf (format, argptr);
1099 #else
1100 required = vsnprintf (NULL, 0, format, argptr);
1101 #endif
1102 va_end (my_argptr);
1103 }
1104 #endif
1105 if (required < 0) {
1106 // vsnprintf failed at unknown point; at least prohibit
1107 // accesses by string consumers into random memory
1108 string [hint - 1] = '\0';
1109 return string;
1110 }
1111 // If formatted string cannot fit into small string, reallocate a
1112 // larger buffer for it. If it did fit, we only called vsnprintf()
1113 // once and already have the good result, so pre-counting with a
1114 // NULL string would not be beneficial for shorter texts (always
1115 // calling vsnprintf() twice then).
1116 if (required >= hint) {
1117 size = required + 1;
1118 freen (string);
1119 string = (char *) malloc (size);
1120 if (string) {
1121 va_copy (my_argptr, argptr);
1122 vsnprintf (string, size, format, my_argptr);
1123 va_end (my_argptr);
1124 }
1125 }
1126 return string;
1127 }
1128
1129 char *
1130 zsys_vprintf (const char *format, va_list argptr)
1131 {
1132 return (s_zsys_vprintf_hint(256, format, argptr));
1133 }
1134
1135 // --------------------------------------------------------------------------
1136 // Create a UDP beacon socket; if the routable option is true, uses
1137 // multicast (not yet implemented), else uses broadcast. This method
1138 // and related ones might _eventually_ be moved to a zudp class.
1139
1140 SOCKET
1141 zsys_udp_new (bool routable)
1142 {
1143 //IPV6 Multicast not implemented yet so only allow routable if IPv4
1144 assert ((routable && !zsys_ipv6 ()) || !routable);
1145 SOCKET udpsock;
1146 int type = SOCK_DGRAM;
1147 #ifdef CZMQ_HAVE_SOCK_CLOEXEC
1148 // Ensure socket is closed by exec() functions.
1149 type |= SOCK_CLOEXEC;
1150 #endif
1151
1152 if (zsys_ipv6 ())
1153 udpsock = socket (AF_INET6, type, IPPROTO_UDP);
1154 else
1155 udpsock = socket (AF_INET, type, IPPROTO_UDP);
1156 if (udpsock == INVALID_SOCKET) {
1157 zsys_socket_error ("socket");
1158 return INVALID_SOCKET;
1159 }
1160
1161 // On Windows, preventing sockets to be inherited by child processes.
1162 #if defined (__WINDOWS__) && defined (HANDLE_FLAG_INHERIT)
1163 if (!SetHandleInformation ((HANDLE) udpsock, HANDLE_FLAG_INHERIT, 0))
1164 zsys_socket_error ("SetHandleInformation (HANDLE_FLAG_INHERIT)");
1165 #endif
1166
1167 // Ask operating system for broadcast permissions on socket
1168 int on = 1;
1169 if (setsockopt (udpsock, SOL_SOCKET, SO_BROADCAST,
1170 (char *) &on, sizeof (on)) == SOCKET_ERROR)
1171 zsys_socket_error ("setsockopt (SO_BROADCAST)");
1172
1173 // Allow multiple owners to bind to socket; incoming
1174 // messages will replicate to each owner
1175 if (setsockopt (udpsock, SOL_SOCKET, SO_REUSEADDR,
1176 (char *) &on, sizeof (on)) == SOCKET_ERROR)
1177 zsys_socket_error ("setsockopt (SO_REUSEADDR)");
1178
1179 #if defined (SO_REUSEPORT)
1180 // On some platforms we have to ask to reuse the port
1181 if (setsockopt (udpsock, SOL_SOCKET, SO_REUSEPORT,
1182 (char *) &on, sizeof (on)) == SOCKET_ERROR)
1183 zsys_socket_error ("setsockopt (SO_REUSEPORT)");
1184 #endif
1185
1186 // Only set TLL for IPv4
1187 if (routable && zsys_mcast_ttl () > 1) {
1188 int ttl = zsys_mcast_ttl ();
1189 if (setsockopt (udpsock, IPPROTO_IP, IP_MULTICAST_TTL, (const char *) &ttl,
1190 sizeof (ttl))
1191 == SOCKET_ERROR)
1192 zsys_socket_error ("setsockopt (IP_MULTICAST_TTL)");
1193 }
1194
1195 // TODO
1196 // Set TLL for IPv6
1197 /*if (routable && zsys_ipv6 ()) {
1198 int ttl = zsys_mcast_ttl ();
1199 if (setsockopt (udpsock, IPPROTO_IPV6, IPV6_MULTICAST_HOPS,
1200 (char *) &ttl, sizeof (ttl))
1201 == SOCKET_ERROR)
1202 zsys_socket_error ("setsockopt (IP_MULTICAST_TTL)");
1203 }*/
1204 return udpsock;
1205 }
1206
1207
1208 // --------------------------------------------------------------------------
1209 // Close a UDP socket
1210
1211 int
1212 zsys_udp_close (SOCKET handle)
1213 {
1214 #if (defined (__WINDOWS__))
1215 return closesocket (handle);
1216 #else
1217 return close (handle);
1218 #endif
1219 }
1220
1221
1222 // --------------------------------------------------------------------------
1223 // Send zframe to UDP socket, return -1 if sending failed due to
1224 // interface having disappeared (happens easily with WiFi)
1225
1226 int
1227 zsys_udp_send (SOCKET udpsock, zframe_t *frame, inaddr_t *address, int addrlen)
1228 {
1229 assert (frame);
1230 assert (address);
1231
1232 if (sendto (udpsock,
1233 (char *) zframe_data (frame), (int) zframe_size (frame),
1234 0, // Flags
1235 (struct sockaddr *) address, addrlen) == -1) {
1236 zsys_debug ("zsys_udp_send: failed, reason=%s", strerror (errno));
1237 return -1; // UDP broadcast not possible
1238 }
1239 else
1240 return 0;
1241 }
1242
1243
1244 // --------------------------------------------------------------------------
1245 // Receive zframe from UDP socket, and set address of peer that sent it
1246 // The peername must be a char [INET_ADDRSTRLEN] array if IPv6 is disabled or
1247 // NI_MAXHOST if it's enabled. Returns NULL when failing to get peer address.
1248
1249 zframe_t *
1250 zsys_udp_recv (SOCKET udpsock, char *peername, int peerlen)
1251 {
1252 char buffer [UDP_FRAME_MAX];
1253 in6addr_t address6;
1254 socklen_t address_len = sizeof (in6addr_t);
1255 ssize_t size = recvfrom (
1256 udpsock,
1257 buffer, UDP_FRAME_MAX,
1258 0, // Flags
1259 (struct sockaddr *) &address6, &address_len);
1260
1261 if (size == SOCKET_ERROR)
1262 zsys_socket_error ("recvfrom");
1263
1264 // Get sender address as printable string
1265 int rc = getnameinfo ((struct sockaddr *) &address6, address_len,
1266 peername, peerlen, NULL, 0, NI_NUMERICHOST);
1267
1268 if (rc) {
1269 zsys_warning ("zsys_udp_recv: getnameinfo failed, reason=%s",
1270 gai_strerror (rc));
1271 return NULL;
1272 }
1273
1274 // Some platform's getnameinfo, like Solaris, appear not to append the
1275 // interface name when parsing a link-local IPv6 address. These addresses
1276 // cannot be used without the interface, so we must append it manually.
1277 // On Windows, if_indextoname is only available from Vista.
1278 #if !defined (__WINDOWS__) || (_WIN32_WINNT >= 0x0600)
1279 if (address6.sin6_family == AF_INET6 &&
1280 IN6_IS_ADDR_LINKLOCAL (&address6.sin6_addr) &&
1281 !strchr (peername, '%')) {
1282 char ifname [IF_NAMESIZE] = {0};
1283 if_indextoname (address6.sin6_scope_id, ifname);
1284 strcat (peername, "%");
1285 strcat (peername, ifname);
1286 }
1287 #endif
1288
1289 return zframe_new (buffer, size);
1290 }
1291
1292
1293 // --------------------------------------------------------------------------
1294 // Handle an I/O error on some socket operation; will report and die on
1295 // fatal errors, and continue silently on "try again" errors.
1296
1297 void
1298 zsys_socket_error (const char *reason)
1299 {
1300 bool check_errno;
1301
1302 #if defined (__WINDOWS__)
1303 switch (WSAGetLastError ()) {
1304 case WSAEINTR: errno = EINTR; break;
1305 case WSAEBADF: errno = EBADF; break;
1306 case WSAEWOULDBLOCK: errno = EAGAIN; break;
1307 case WSAEINPROGRESS: errno = EAGAIN; break;
1308 case WSAENETDOWN: errno = ENETDOWN; break;
1309 case WSAECONNRESET: errno = ECONNRESET; break;
1310 case WSAECONNABORTED: errno = EPIPE; break;
1311 case WSAESHUTDOWN: errno = ECONNRESET; break;
1312 case WSAEINVAL: errno = EPIPE; break;
1313 case WSAEADDRNOTAVAIL: errno = EADDRNOTAVAIL; break;
1314 case WSAEADDRINUSE: errno = EADDRINUSE; break;
1315 default: errno = GetLastError ();
1316 }
1317 #endif
1318
1319 check_errno = (errno == EAGAIN
1320 || errno == ENETDOWN
1321 || errno == EHOSTUNREACH
1322 || errno == ENETUNREACH
1323 || errno == EINTR
1324 || errno == EPIPE
1325 || errno == ECONNRESET);
1326 #if defined (ENOPROTOOPT)
1327 check_errno = (check_errno || errno == ENOPROTOOPT);
1328 #endif
1329 #if defined (EHOSTDOWN)
1330 check_errno = (check_errno || errno == EHOSTDOWN);
1331 #endif
1332 #if defined (EOPNOTSUPP)
1333 check_errno = (check_errno || errno == EOPNOTSUPP);
1334 #endif
1335 #if defined (EWOULDBLOCK)
1336 check_errno = (check_errno || errno == EWOULDBLOCK);
1337 #endif
1338 #if defined (EPROTO)
1339 check_errno = (check_errno || errno == EPROTO);
1340 #endif
1341 #if defined (ENONET)
1342 check_errno = (check_errno || errno == ENONET);
1343 #endif
1344
1345 if (check_errno)
1346 return; // Ignore error and try again
1347 else {
1348 zsys_error ("(UDP) error '%s' on %s", strerror (errno), reason);
1349 assert (false);
1350 }
1351 }
1352
1353
1354 // --------------------------------------------------------------------------
1355 // Return current host name, for use in public tcp:// endpoints. Caller gets
1356 // a freshly allocated string, should free it using zstr_free(). If the host
1357 // name is not resolvable, returns NULL.
1358
1359 char *
1360 zsys_hostname (void)
1361 {
1362 char hostname [NI_MAXHOST];
1363 gethostname (hostname, NI_MAXHOST);
1364 hostname [NI_MAXHOST - 1] = 0;
1365 struct hostent *host = gethostbyname (hostname);
1366
1367 if (host && host->h_name)
1368 return strdup (host->h_name);
1369 else
1370 return NULL;
1371 }
1372
1373
1374 // --------------------------------------------------------------------------
1375 // Move the current process into the background. The precise effect depends
1376 // on the operating system. On POSIX boxes, moves to a specified working
1377 // directory (if specified), closes all file handles, reopens stdin, stdout,
1378 // and stderr to the null device, and sets the process to ignore SIGHUP. On
1379 // Windows, does nothing. Returns 0 if OK, -1 if there was an error.
1380
1381 int
1382 zsys_daemonize (const char *workdir)
1383 {
1384 #if (defined (__UNIX__))
1385 // Defines umask for new files this process will create
1386 mode_t file_mask = 027; // Complement of 0750
1387
1388 // Recreate our process as a child of init
1389 int fork_result = fork ();
1390 if (fork_result < 0) // < 0 is an error
1391 return -1; // Could not fork
1392 else
1393 if (fork_result > 0) // > 0 is the parent process
1394 exit (0); // End parent process
1395
1396 // Move to a safe and known directory, which is supplied as an
1397 // argument to this function (or not, if workdir is NULL or empty).
1398 if (workdir && zsys_dir_change (workdir)) {
1399 zsys_error ("cannot chdir to '%s'", workdir);
1400 return -1;
1401 }
1402 // Close all open file descriptors inherited from the parent
1403 // process, to reduce the resources we use
1404 int file_handle = sysconf (_SC_OPEN_MAX);
1405 while (file_handle)
1406 close (file_handle--); // Ignore any errors
1407
1408 // Set the umask for new files we might create
1409 umask (file_mask);
1410
1411 // Set standard input and output to the null device so that any
1412 // code that assumes that these files are open will work
1413 file_handle = open ("/dev/null", O_RDWR);
1414 int fh_stdout = dup (file_handle);
1415 int fh_stderr = dup (file_handle);
1416 assert (fh_stdout);
1417 assert (fh_stderr);
1418
1419 // Ignore any hangup signal from the controlling console
1420 signal (SIGHUP, SIG_IGN);
1421 #endif
1422 return 0;
1423 }
1424
1425
1426 // --------------------------------------------------------------------------
1427 // Drop the process ID into the lockfile, with exclusive lock, and switch
1428 // the process to the specified group and/or user. Any of the arguments
1429 // may be null, indicating a no-op. Returns 0 on success, -1 on failure.
1430 // Note if you combine this with zsys_daemonize, run after, not before
1431 // that method, or the lockfile will hold the wrong process ID.
1432
1433 int
1434 zsys_run_as (const char *lockfile, const char *group, const char *user)
1435 {
1436 #if (defined (__UNIX__))
1437 // Switch to effective user ID (who owns executable); for
1438 // system services this should be root, so that we can write
1439 // the PID file into e.g. /var/run/
1440 if (seteuid (geteuid ())) {
1441 zsys_error ("cannot set effective user id: %s", strerror (errno));
1442 return -1;
1443 }
1444 if (lockfile) {
1445 // We enforce a lock on the lockfile, if specified, so that
1446 // only one copy of the process can run at once.
1447 int handle = open (lockfile, O_RDWR | O_CREAT, 0640);
1448 if (handle < 0) {
1449 zsys_error ("cannot open lockfile '%s': %s", lockfile, strerror (errno));
1450 return -1;
1451 }
1452 else {
1453 struct flock filelock;
1454 filelock.l_type = F_WRLCK; // F_RDLCK, F_WRLCK, F_UNLCK
1455 filelock.l_whence = SEEK_SET; // SEEK_SET, SEEK_CUR, SEEK_END
1456 filelock.l_start = 0; // Offset from l_whence
1457 filelock.l_len = 0; // length, 0 = to EOF
1458 filelock.l_pid = getpid ();
1459 if (fcntl (handle, F_SETLK, &filelock)) {
1460 zsys_error ("cannot get lock: %s", strerror (errno));
1461 return -1;
1462 }
1463 }
1464 // We record the current process id in the lock file
1465 char pid_buffer [32];
1466 snprintf (pid_buffer, sizeof (pid_buffer), "%6" PRIi64 "\n", (int64_t)getpid ());
1467 if ((size_t) write (handle, pid_buffer, strlen (pid_buffer)) != strlen (pid_buffer)) {
1468 zsys_error ("cannot write to lockfile: %s", strerror (errno));
1469 close (handle);
1470 return -1;
1471 }
1472 }
1473 if (group) {
1474 zsys_info ("running under group '%s'", group);
1475 struct group *grpbuf = NULL;
1476 grpbuf = getgrnam (group);
1477 if (grpbuf == NULL || setgid (grpbuf->gr_gid)) {
1478 zsys_error ("could not switch group: %s", strerror (errno));
1479 return -1;
1480 }
1481 }
1482 if (user) {
1483 zsys_info ("running under user '%s'", user);
1484 struct passwd *pwdbuf = NULL;
1485 pwdbuf = getpwnam (user);
1486 if (pwdbuf == NULL || setuid (pwdbuf->pw_uid)) {
1487 zsys_error ("could not switch user: %s", strerror (errno));
1488 return -1;
1489 }
1490 }
1491 else {
1492 // Switch back to real user ID (who started process)
1493 if (setuid (getuid ())) {
1494 zsys_error ("cannot set real user id: %s", strerror (errno));
1495 return -1;
1496 }
1497 }
1498 return 0;
1499 #else
1500 // This is not yet ported to Windows and should not succeed there.
1501 return -1;
1502 #endif
1503 }
1504
1505
1506 // --------------------------------------------------------------------------
1507 // Returns true if the underlying libzmq supports CURVE security.
1508 // Uses a heuristic probe according to the version of libzmq being used.
1509
1510 bool
1511 zsys_has_curve (void)
1512 {
1513 #if defined (ZMQ_CURVE_SERVER)
1514 # if defined (ZMQ_HAS_CAPABILITIES)
1515 // This is the most modern way of probing libzmq capabilities
1516 return zmq_has ("curve") != 0;
1517 # else
1518 // However trying the zmq_setsockopt will also work
1519 int rc = -1; // assume we fail
1520 void *ctx = zmq_ctx_new ();
1521 if (ctx) {
1522 void *pub = zmq_socket (ctx, ZMQ_PUB);
1523 if (pub) {
1524 int as_server = 1;
1525 rc = zmq_setsockopt (pub, ZMQ_CURVE_SERVER, &as_server, sizeof (int));
1526 zmq_close (pub);
1527 }
1528 zmq_term (ctx);
1529 }
1530 return rc != -1;
1531 # endif
1532 #else
1533 return false;
1534 #endif
1535 }
1536
1537
1538 // --------------------------------------------------------------------------
1539 // Configure the number of I/O threads that ZeroMQ will use. A good
1540 // rule of thumb is one thread per gigabit of traffic in or out. The
1541 // default is 1, sufficient for most applications. If the environment
1542 // variable ZSYS_IO_THREADS is defined, that provides the default.
1543 // Note that this method is valid only before any socket is created.
1544
1545 void
1546 zsys_set_io_threads (size_t io_threads)
1547 {
1548 zsys_init ();
1549 ZMUTEX_LOCK (s_mutex);
1550 if (s_open_sockets)
1551 zsys_error ("zsys_io_threads() is not valid after creating sockets");
1552 assert (s_open_sockets == 0);
1553
1554 s_io_threads = io_threads;
1555 #if ZMQ_VERSION < ZMQ_MAKE_VERSION(3, 2, 0)
1556 zmq_term (s_process_ctx);
1557 s_process_ctx = zmq_init ((int) s_io_threads);
1558 #else
1559 # if defined (ZMQ_IO_THREADS)
1560 zmq_ctx_set (s_process_ctx, ZMQ_IO_THREADS, s_io_threads);
1561 # endif
1562 #endif
1563 ZMUTEX_UNLOCK (s_mutex);
1564
1565 #if ZMQ_VERSION < ZMQ_MAKE_VERSION(3, 2, 0)
1566 // Reinitialised outside of the lock to avoid recursive lock
1567 zsys_set_max_msgsz (s_max_msgsz);
1568 zsys_set_max_sockets (s_max_sockets);
1569 #endif
1570 }
1571
1572
1573 // --------------------------------------------------------------------------
1574 // Configure the scheduling policy of the ZMQ context thread pool.
1575 // Not available on Windows. See the sched_setscheduler man page or sched.h
1576 // for more information. If the environment variable ZSYS_THREAD_SCHED_POLICY
1577 // is defined, that provides the default.
1578 // Note that this method is valid only before any socket is created.
1579
1580 void
1581 zsys_set_thread_sched_policy (int policy)
1582 {
1583 if (policy < 0)
1584 return;
1585
1586 zsys_init ();
1587 ZMUTEX_LOCK (s_mutex);
1588 // If the app is misusing this method, burn it with fire
1589 if (s_open_sockets)
1590 zsys_error ("zsys_set_thread_sched_policy() is not valid after"
1591 " creating sockets");
1592 assert (s_open_sockets == 0);
1593 s_thread_sched_policy = policy;
1594 #if defined (ZMQ_THREAD_SCHED_POLICY)
1595 zmq_ctx_set (s_process_ctx, ZMQ_THREAD_SCHED_POLICY, s_thread_sched_policy);
1596 #endif
1597 ZMUTEX_UNLOCK (s_mutex);
1598 }
1599
1600
1601 // --------------------------------------------------------------------------
1602 // Configure the numeric prefix to each thread created for the internal
1603 // context's thread pool. This option is only supported on Linux.
1604 // If the environment variable ZSYS_THREAD_NAME_PREFIX is defined, that
1605 // provides the default.
1606 // Note that this method is valid only before any socket is created.
1607
1608 void
1609 zsys_set_thread_name_prefix (int prefix)
1610 {
1611 if (prefix < 0)
1612 return;
1613
1614 zsys_init ();
1615 ZMUTEX_LOCK (s_mutex);
1616 // If the app is misusing this method, burn it with fire
1617 if (s_open_sockets)
1618 zsys_error ("zsys_set_thread_name_prefix() is not valid after"
1619 " creating sockets");
1620 assert (s_open_sockets == 0);
1621 s_thread_name_prefix = prefix;
1622 #if defined (ZMQ_THREAD_NAME_PREFIX)
1623 zmq_ctx_set (s_process_ctx, ZMQ_THREAD_NAME_PREFIX, s_thread_name_prefix);
1624 #endif
1625 ZMUTEX_UNLOCK (s_mutex);
1626 }
1627
1628 // --------------------------------------------------------------------------
1629 // Return ZMQ_THREAD_NAME_PREFIX option.
1630 int
1631 zsys_thread_name_prefix ()
1632 {
1633 zsys_init ();
1634 ZMUTEX_LOCK (s_mutex);
1635 #if defined (ZMQ_THREAD_NAME_PREFIX)
1636 s_thread_name_prefix = zmq_ctx_get (s_process_ctx, ZMQ_THREAD_NAME_PREFIX);
1637 #endif
1638 ZMUTEX_UNLOCK (s_mutex);
1639 return s_thread_name_prefix;
1640 }
1641
1642
1643 // --------------------------------------------------------------------------
1644 // Configure the string prefix to each thread created for the internal
1645 // context's thread pool. This option is only supported on Linux.
1646 // If the environment variable ZSYS_THREAD_NAME_PREFIX_STR is defined, that
1647 // provides the default.
1648 // Note that this method is valid only before any socket is created.
1649
1650 void
1651 zsys_set_thread_name_prefix_str (const char *prefix)
1652 {
1653 size_t prefix_len = 0;
1654
1655 if (!prefix)
1656 return;
1657 prefix_len = strlen (prefix);
1658 if (prefix_len == 0 || prefix_len > sizeof (s_thread_name_prefix_str) - 1)
1659 return;
1660
1661 zsys_init ();
1662 ZMUTEX_LOCK (s_mutex);
1663 // If the app is misusing this method, burn it with fire
1664 if (s_open_sockets)
1665 zsys_error ("zsys_set_thread_name_prefix() is not valid after"
1666 " creating sockets");
1667 assert (s_open_sockets == 0);
1668 strcpy(s_thread_name_prefix_str, prefix);
1669 #if defined (ZMQ_THREAD_NAME_PREFIX) && defined (ZMQ_BUILD_DRAFT_API) && \
1670 ((ZMQ_VERSION_MAJOR > 4) || \
1671 ((ZMQ_VERSION_MAJOR >= 4) && ((ZMQ_VERSION_MINOR > 3) || \
1672 ((ZMQ_VERSION_MINOR >= 3) && (ZMQ_VERSION_PATCH >= 3)))))
1673 zmq_ctx_set_ext (s_process_ctx, ZMQ_THREAD_NAME_PREFIX, s_thread_name_prefix_str, sizeof (s_thread_name_prefix_str));
1674 #endif
1675 ZMUTEX_UNLOCK (s_mutex);
1676 }
1677
1678 // --------------------------------------------------------------------------
1679 // Return ZMQ_THREAD_NAME_PREFIX_STR option.
1680 const char *
1681 zsys_thread_name_prefix_str ()
1682 {
1683 size_t prefix_len = sizeof (s_thread_name_prefix_str);
1684
1685 zsys_init ();
1686 ZMUTEX_LOCK (s_mutex);
1687 #if defined (ZMQ_THREAD_NAME_PREFIX) && defined (ZMQ_BUILD_DRAFT_API) && \
1688 ((ZMQ_VERSION_MAJOR > 4) || \
1689 ((ZMQ_VERSION_MAJOR >= 4) && ((ZMQ_VERSION_MINOR > 3) || \
1690 ((ZMQ_VERSION_MINOR >= 3) && (ZMQ_VERSION_PATCH >= 3)))))
1691 zmq_ctx_get_ext (s_process_ctx, ZMQ_THREAD_NAME_PREFIX, s_thread_name_prefix_str, &prefix_len);
1692 #else
1693 (void) prefix_len;
1694 #endif
1695 ZMUTEX_UNLOCK (s_mutex);
1696 return s_thread_name_prefix_str;
1697 }
1698
1699
1700 // --------------------------------------------------------------------------
1701 // Adds a specific CPU to the affinity list of the ZMQ context thread pool.
1702 // This option is only supported on Linux.
1703 // Note that this method is valid only before any socket is created.
1704
1705 void
1706 zsys_thread_affinity_cpu_add (int cpu)
1707 {
1708 if (cpu < 0)
1709 return;
1710
1711 zsys_init ();
1712 ZMUTEX_LOCK (s_mutex);
1713 // If the app is misusing this method, burn it with fire
1714 if (s_open_sockets)
1715 zsys_error ("zsys_set_thread_sched_policy() is not valid after"
1716 " creating sockets");
1717 assert (s_open_sockets == 0);
1718 #if defined (ZMQ_THREAD_AFFINITY_CPU_ADD)
1719 zmq_ctx_set (s_process_ctx, ZMQ_THREAD_AFFINITY_CPU_ADD, cpu);
1720 #endif
1721 ZMUTEX_UNLOCK (s_mutex);
1722 }
1723
1724
1725 // --------------------------------------------------------------------------
1726 // Removes a specific CPU to the affinity list of the ZMQ context thread pool.
1727 // This option is only supported on Linux.
1728 // Note that this method is valid only before any socket is created.
1729
1730 void
1731 zsys_thread_affinity_cpu_remove (int cpu)
1732 {
1733 if (cpu < 0)
1734 return;
1735
1736 zsys_init ();
1737 ZMUTEX_LOCK (s_mutex);
1738 // If the app is misusing this method, burn it with fire
1739 if (s_open_sockets)
1740 zsys_error ("zsys_set_thread_sched_policy() is not valid after"
1741 " creating sockets");
1742 assert (s_open_sockets == 0);
1743 #if defined (ZMQ_THREAD_AFFINITY_CPU_REMOVE)
1744 zmq_ctx_set (s_process_ctx, ZMQ_THREAD_AFFINITY_CPU_REMOVE, cpu);
1745 #endif
1746 ZMUTEX_UNLOCK (s_mutex);
1747 }
1748
1749
1750 // --------------------------------------------------------------------------
1751 // Configure the scheduling priority of the ZMQ context thread pool.
1752 // Not available on Windows. See the sched_setscheduler man page or sched.h
1753 // for more information. If the environment variable ZSYS_THREAD_PRIORITY is
1754 // defined, that provides the default.
1755 // Note that this method is valid only before any socket is created.
1756
1757 void
1758 zsys_set_thread_priority (int priority)
1759 {
1760 if (priority < 0)
1761 return;
1762
1763 zsys_init ();
1764 ZMUTEX_LOCK (s_mutex);
1765 // If the app is misusing this method, burn it with fire
1766 if (s_open_sockets)
1767 zsys_error ("zsys_set_thread_priority() is not valid after"
1768 " creating sockets");
1769 assert (s_open_sockets == 0);
1770 s_thread_priority = priority;
1771 #if defined (ZMQ_THREAD_PRIORITY)
1772 zmq_ctx_set (s_process_ctx, ZMQ_THREAD_PRIORITY, s_thread_priority);
1773 #endif
1774 ZMUTEX_UNLOCK (s_mutex);
1775 }
1776
1777
1778 // --------------------------------------------------------------------------
1779 // Configure the number of sockets that ZeroMQ will allow. The default
1780 // is 1024. The actual limit depends on the system, and you can query it
1781 // by using zsys_socket_limit (). A value of zero means "maximum".
1782 // Note that this method is valid only before any socket is created.
1783
1784 void
1785 zsys_set_max_sockets (size_t max_sockets)
1786 {
1787 zsys_init ();
1788 ZMUTEX_LOCK (s_mutex);
1789 // If the app is misusing this method, burn it with fire
1790 if (s_open_sockets)
1791 zsys_error ("zsys_max_sockets() is not valid after creating sockets");
1792 assert (s_open_sockets == 0);
1793 s_max_sockets = max_sockets? max_sockets: zsys_socket_limit ();
1794 #if defined (ZMQ_MAX_SOCKETS)
1795 zmq_ctx_set (s_process_ctx, ZMQ_MAX_SOCKETS, (int) s_max_sockets);
1796 #endif
1797 ZMUTEX_UNLOCK (s_mutex);
1798 }
1799
1800
1801 // --------------------------------------------------------------------------
1802 // Return maximum number of ZeroMQ sockets that the system will support.
1803
1804 size_t
1805 zsys_socket_limit (void)
1806 {
1807 size_t socket_limit;
1808 #if defined (ZMQ_SOCKET_LIMIT)
1809 if (s_process_ctx)
1810 socket_limit = (size_t) zmq_ctx_get (s_process_ctx, ZMQ_SOCKET_LIMIT);
1811 else {
1812 void *ctx = zmq_init (1);
1813 socket_limit = (size_t) zmq_ctx_get (ctx, ZMQ_SOCKET_LIMIT);
1814 zmq_term (ctx);
1815 }
1816 // ZeroMQ used to report a nonsense value (2^31) which if used would
1817 // on zmq_ctx_set (ZMQ_MAX_SOCKETS) cause an out-of-memory error. So
1818 // if we're running on an older library, enforce a sane limit.
1819 if (socket_limit > 65535)
1820 socket_limit = 65535;
1821 #else
1822 socket_limit = 1024;
1823 #endif
1824 return socket_limit;
1825 }
1826
1827
1828 // --------------------------------------------------------------------------
1829 // Configure the maximum allowed size of a message sent.
1830 // The default is INT_MAX.
1831
1832 void
1833 zsys_set_max_msgsz (int max_msgsz)
1834 {
1835 if (max_msgsz < 0)
1836 return;
1837
1838 zsys_init ();
1839 ZMUTEX_LOCK (s_mutex);
1840 s_max_msgsz = max_msgsz;
1841 #if defined (ZMQ_MAX_MSGSZ)
1842 zmq_ctx_set (s_process_ctx, ZMQ_MAX_MSGSZ, (int) s_max_msgsz);
1843 #endif
1844 ZMUTEX_UNLOCK (s_mutex);
1845 }
1846
1847 // --------------------------------------------------------------------------
1848 // Configure whether to use zero copy strategy in libzmq. If the environment
1849 // variable ZSYS_ZERO_COPY_RECV is defined, that provides the default.
1850 // Otherwise the default is 1.
1851
1852 void
1853 zsys_set_zero_copy_recv(int zero_copy)
1854 {
1855 zsys_init ();
1856 ZMUTEX_LOCK (s_mutex);
1857 s_zero_copy_recv = zero_copy;
1858 #if defined (ZMQ_ZERO_COPY_RECV)
1859 zmq_ctx_set (s_process_ctx, ZMQ_ZERO_COPY_RECV, s_zero_copy_recv);
1860 #endif
1861 ZMUTEX_UNLOCK (s_mutex);
1862 }
1863
1864 // --------------------------------------------------------------------------
1865 // Return ZMQ_ZERO_COPY_RECV option.
1866 int
1867 zsys_zero_copy_recv()
1868 {
1869 zsys_init ();
1870 ZMUTEX_LOCK (s_mutex);
1871 #if defined (ZMQ_ZERO_COPY_RECV)
1872 s_zero_copy_recv = zmq_ctx_get (s_process_ctx, ZMQ_ZERO_COPY_RECV);
1873 #endif
1874 ZMUTEX_UNLOCK (s_mutex);
1875 return s_zero_copy_recv;
1876 }
1877
1878
1879 // --------------------------------------------------------------------------
1880 // Return maximum message size.
1881
1882 int
1883 zsys_max_msgsz (void)
1884 {
1885 zsys_init ();
1886 ZMUTEX_LOCK (s_mutex);
1887 #if defined (ZMQ_MAX_MSGSZ)
1888 s_max_msgsz = zmq_ctx_get (s_process_ctx, ZMQ_MAX_MSGSZ);
1889 #endif
1890 ZMUTEX_UNLOCK (s_mutex);
1891 return s_max_msgsz;
1892 }
1893
1894
1895 // --------------------------------------------------------------------------
1896 // *** Draft method, for development use, may change without warning ***
1897 // Check if default interrupt handler of Ctrl-C or SIGTERM was called.
1898 // Does not work if ZSYS_SIGHANDLER is false and code does not call
1899 // set interrupted on signal.
1900 bool
1901 zsys_is_interrupted (void)
1902 {
1903 return zsys_interrupted != 0;
1904 }
1905
1906 // --------------------------------------------------------------------------
1907 // *** Draft method, for development use, may change without warning ***
1908 // Set interrupted flag. This is done by default signal handler, however
1909 // this can be handy for language bindings or cases without default
1910 // signal handler.
1911 void
1912 zsys_set_interrupted (void)
1913 {
1914 zctx_interrupted = 1;
1915 zsys_interrupted = 1;
1916 }
1917
1918 // --------------------------------------------------------------------------
1919 // Configure the threshold value of filesystem object age per st_mtime
1920 // that should elapse until we consider that object "stable" at the
1921 // current zclock_time() moment.
1922 // The default is S_DEFAULT_ZSYS_FILE_STABLE_AGE_MSEC defined in zsys.c
1923 // which generally depends on host OS, with fallback value of 5000.
1924
1925 void
1926 zsys_set_file_stable_age_msec (int64_t file_stable_age_msec)
1927 {
1928 if (file_stable_age_msec < 1)
1929 return;
1930
1931 zsys_init ();
1932 ZMUTEX_LOCK (s_mutex);
1933 s_file_stable_age_msec = file_stable_age_msec;
1934 ZMUTEX_UNLOCK (s_mutex);
1935 }
1936
1937
1938 // --------------------------------------------------------------------------
1939 // Return current threshold value of file stable age in msec.
1940 // This can be used in code that chooses to wait for this timeout
1941 // before testing if a filesystem object is "stable" or not.
1942
1943 // Note that the OS timer quantization can bite you, so it may be
1944 // reasonably safe to sleep/wait/poll for a larger timeout before
1945 // assuming a fault, e.g. the default timer resolution on Windows
1946 // is 15.6 ms (per timer interrupt 64 times a second), graphed here:
1947 // https://msdn.microsoft.com/en-us/library/windows/desktop/dn553408(v=vs.85).aspx
1948 // and Unix/Linux OSes also have different-resolution timers.
1949
1950 int64_t
1951 zsys_file_stable_age_msec (void)
1952 {
1953 zsys_init ();
1954 return s_file_stable_age_msec;
1955 }
1956
1957
1958 // --------------------------------------------------------------------------
1959 // Configure the default linger timeout in msecs for new zsock instances.
1960 // You can also set this separately on each zsock_t instance. The default
1961 // linger time is zero, i.e. any pending messages will be dropped. If the
1962 // environment variable ZSYS_LINGER is defined, that provides the default.
1963 // Note that process exit will typically be delayed by the linger time.
1964
1965 void
1966 zsys_set_linger (size_t linger)
1967 {
1968 zsys_init ();
1969 ZMUTEX_LOCK (s_mutex);
1970 s_linger = linger;
1971 ZMUTEX_UNLOCK (s_mutex);
1972 }
1973
1974
1975 // --------------------------------------------------------------------------
1976 // Configure the default outgoing pipe limit (HWM) for new zsock instances.
1977 // You can also set this separately on each zsock_t instance. The default
1978 // HWM is 1,000, on all versions of ZeroMQ. If the environment variable
1979 // ZSYS_SNDHWM is defined, that provides the default. Note that a value of
1980 // zero means no limit, i.e. infinite memory consumption.
1981
1982 void
1983 zsys_set_sndhwm (size_t sndhwm)
1984 {
1985 zsys_init ();
1986 ZMUTEX_LOCK (s_mutex);
1987 s_sndhwm = sndhwm;
1988 ZMUTEX_UNLOCK (s_mutex);
1989 }
1990
1991
1992 // --------------------------------------------------------------------------
1993 // Configure the default incoming pipe limit (HWM) for new zsock instances.
1994 // You can also set this separately on each zsock_t instance. The default
1995 // HWM is 1,000, on all versions of ZeroMQ. If the environment variable
1996 // ZSYS_RCVHWM is defined, that provides the default. Note that a value of
1997 // zero means no limit, i.e. infinite memory consumption.
1998
1999 void
2000 zsys_set_rcvhwm (size_t rcvhwm)
2001 {
2002 zsys_init ();
2003 ZMUTEX_LOCK (s_mutex);
2004 s_rcvhwm = rcvhwm;
2005 ZMUTEX_UNLOCK (s_mutex);
2006 }
2007
2008
2009 // --------------------------------------------------------------------------
2010 // Configure the default HWM for zactor internal pipes; this is set on both
2011 // ends of the pipe, for outgoing messages only (sndhwm). The default HWM is
2012 // 1,000, on all versions of ZeroMQ. If the environment var ZSYS_ACTORHWM is
2013 // defined, that provides the default. Note that a value of zero means no
2014 // limit, i.e. infinite memory consumption.
2015
2016 void
2017 zsys_set_pipehwm (size_t pipehwm)
2018 {
2019 zsys_init ();
2020 ZMUTEX_LOCK (s_mutex);
2021 s_pipehwm = pipehwm;
2022 ZMUTEX_UNLOCK (s_mutex);
2023 }
2024
2025
2026 // --------------------------------------------------------------------------
2027 // Return the HWM for zactor internal pipes.
2028
2029 size_t
2030 zsys_pipehwm (void)
2031 {
2032 return s_pipehwm;
2033 }
2034
2035
2036 // --------------------------------------------------------------------------
2037 // Configure use of IPv6 for new zsock instances. By default sockets accept
2038 // and make only IPv4 connections. When you enable IPv6, sockets will accept
2039 // and connect to both IPv4 and IPv6 peers. You can override the setting on
2040 // each zsock_t instance. The default is IPv4 only (ipv6 set to 0). If the
2041 // environment variable ZSYS_IPV6 is defined (as 1 or 0), this provides the
2042 // default. Note: has no effect on ZMQ v2.
2043
2044 void
2045 zsys_set_ipv6 (int ipv6)
2046 {
2047 zsys_init ();
2048 ZMUTEX_LOCK (s_mutex);
2049 s_ipv6 = ipv6;
2050 ZMUTEX_UNLOCK (s_mutex);
2051 }
2052
2053
2054 // --------------------------------------------------------------------------
2055 // Return use of IPv6 for zsock instances.
2056
2057 int
2058 zsys_ipv6 (void)
2059 {
2060 return s_ipv6;
2061 }
2062
2063 // Test if ipv6 is available on the system. The only way to reliably
2064 // check is to actually open a socket and try to bind it. (ported from
2065 // libzmq)
2066
2067 bool
2068 zsys_ipv6_available (void)
2069 {
2070 #if defined(__WINDOWS__) && (_WIN32_WINNT < 0x0600)
2071 return 0;
2072 #else
2073 int rc, ipv6 = 1;
2074 struct sockaddr_in6 test_addr;
2075
2076 memset (&test_addr, 0, sizeof (test_addr));
2077 test_addr.sin6_family = AF_INET6;
2078 inet_pton (AF_INET6, "::1", &(test_addr.sin6_addr));
2079
2080 SOCKET fd = socket (AF_INET6, SOCK_STREAM, IPPROTO_IP);
2081 if (fd == INVALID_SOCKET)
2082 ipv6 = 0;
2083 else {
2084 #if defined(__WINDOWS__)
2085 setsockopt (fd, SOL_SOCKET, SO_REUSEADDR, (const char *) &ipv6,
2086 sizeof (int));
2087 rc = setsockopt (fd, IPPROTO_IPV6, IPV6_V6ONLY, (const char *) &ipv6,
2088 sizeof (int));
2089 if (rc == SOCKET_ERROR)
2090 ipv6 = 0;
2091 else {
2092 rc = bind (fd, (struct sockaddr *) &test_addr, sizeof (test_addr));
2093 if (rc == SOCKET_ERROR)
2094 ipv6 = 0;
2095 }
2096 closesocket (fd);
2097 #else
2098 setsockopt (fd, SOL_SOCKET, SO_REUSEADDR, &ipv6, sizeof (int));
2099 rc = setsockopt (fd, IPPROTO_IPV6, IPV6_V6ONLY, &ipv6, sizeof (int));
2100 if (rc != 0)
2101 ipv6 = 0;
2102 else {
2103 rc = bind (fd, (struct sockaddr *) (&test_addr),
2104 sizeof (test_addr));
2105 if (rc != 0)
2106 ipv6 = 0;
2107 }
2108 close (fd);
2109 #endif
2110 }
2111
2112 return ipv6;
2113 #endif // _WIN32_WINNT < 0x0600
2114 }
2115
2116 // --------------------------------------------------------------------------
2117 // Set network interface name to use for broadcasts, particularly zbeacon.
2118 // This lets the interface be configured for test environments where required.
2119 // For example, on Mac OS X, zbeacon cannot bind to 255.255.255.255 which is
2120 // the default when there is no specified interface. If the environment
2121 // variable ZSYS_INTERFACE is set, use that as the default interface name.
2122 // Setting the interface to "*" means "use all interfaces".
2123
2124 void
2125 zsys_set_interface (const char *value)
2126 {
2127 zsys_init ();
2128 freen (s_interface);
2129 s_interface = strdup (value);
2130 assert (s_interface);
2131 }
2132
2133
2134 // --------------------------------------------------------------------------
2135 // Return network interface to use for broadcasts, or "" if none was set.
2136
2137 const char *
2138 zsys_interface (void)
2139 {
2140 return s_interface? s_interface: "";
2141 }
2142
2143
2144 // --------------------------------------------------------------------------
2145 // Set IPv6 address to use zbeacon socket, particularly for receiving zbeacon.
2146 // This needs to be set IPv6 is enabled as IPv6 can have multiple addresses
2147 // on a given interface. If the environment variable ZSYS_IPV6_ADDRESS is set,
2148 // use that as the default IPv6 address.
2149
2150 void
2151 zsys_set_ipv6_address (const char *value)
2152 {
2153 zsys_init ();
2154 freen (s_ipv6_address);
2155 s_ipv6_address = strdup (value);
2156 assert (s_ipv6_address);
2157 }
2158
2159
2160 // --------------------------------------------------------------------------
2161 // Return IPv6 address to use for zbeacon reception, or "" if none was set.
2162
2163 const char *
2164 zsys_ipv6_address (void)
2165 {
2166 return s_ipv6_address? s_ipv6_address: "";
2167 }
2168
2169
2170 // --------------------------------------------------------------------------
2171 // Set IPv6 multicast address to use for sending zbeacon messages. The default
2172 // is fe02::1 (link-local all-node). If the environment variable
2173 // ZSYS_IPV6_MCAST_ADDRESS is set, use that as the default IPv6 multicast
2174 // address.
2175
2176 void
2177 zsys_set_ipv6_mcast_address (const char *value)
2178 {
2179 zsys_init ();
2180 freen (s_ipv6_mcast_address);
2181 s_ipv6_mcast_address = strdup (value);
2182 assert (s_ipv6_mcast_address);
2183 }
2184
2185 // --------------------------------------------------------------------------
2186 // Set IPv4 multicast address to use for sending zbeacon messages. By default
2187 // IPv4 multicast is NOT used. If the environment variable
2188 // ZSYS_IPV4_MCAST_ADDRESS is set, use that as the default IPv4 multicast
2189 // address. Calling this function or setting ZSYS_IPV4_MCAST_ADDRESS
2190 // will enable IPv4 zbeacon messages.
2191
2192 void zsys_set_ipv4_mcast_address (const char *value)
2193 {
2194 zsys_init ();
2195 freen (s_ipv4_mcast_address);
2196 s_ipv4_mcast_address = value ? strdup (value) : NULL;
2197 assert (!value || s_ipv4_mcast_address);
2198 }
2199
2200 // --------------------------------------------------------------------------
2201 // Set multicast TTL
2202
2203 void zsys_set_mcast_ttl (const unsigned char value)
2204 {
2205 zsys_init ();
2206 s_mcast_ttl = value;
2207 assert (s_mcast_ttl);
2208 }
2209
2210
2211 // --------------------------------------------------------------------------
2212 // Return IPv6 multicast address to use for sending zbeacon, or
2213 // "ff02:0:0:0:0:0:0:1" if none was set.
2214
2215 const char *
2216 zsys_ipv6_mcast_address (void)
2217 {
2218 return s_ipv6_mcast_address ? s_ipv6_mcast_address : "ff02:0:0:0:0:0:0:1";
2219 }
2220
2221 // --------------------------------------------------------------------------
2222 // Return IPv4 multicast address to use for sending zbeacon, or
2223 // NULL if none was set.
2224
2225 const char *zsys_ipv4_mcast_address (void)
2226 {
2227 return s_ipv4_mcast_address;
2228 }
2229
2230 // --------------------------------------------------------------------------
2231 // Get multicast TTL
2232
2233 unsigned char zsys_mcast_ttl ()
2234 {
2235 return s_mcast_ttl;
2236 }
2237
2238
2239 // --------------------------------------------------------------------------
2240 // Configure the automatic use of pre-allocated FDs when creating new sockets.
2241 // If 0 (default), nothing will happen. Else, when a new socket is bound, the
2242 // system API will be used to check if an existing pre-allocated FD with a
2243 // matching port (if TCP) or path (if IPC) exists, and if it does it will be
2244 // set via the ZMQ_USE_FD socket option so that the library will use it
2245 // instead of creating a new socket.
2246
2247
2248 void
2249 zsys_set_auto_use_fd (int auto_use_fd)
2250 {
2251 zsys_init ();
2252 ZMUTEX_LOCK (s_mutex);
2253 s_auto_use_fd = auto_use_fd;
2254 ZMUTEX_UNLOCK (s_mutex);
2255 }
2256
2257
2258 // --------------------------------------------------------------------------
2259 // Return use of automatic pre-allocated FDs for zsock instances.
2260
2261 int
2262 zsys_auto_use_fd (void)
2263 {
2264 return s_auto_use_fd;
2265 }
2266
2267 typedef enum _zsprintf_s {
2268 FIND_PERCENT,
2269 FIND_KEY,
2270 FIND_FORMAT,
2271 END
2272 } zsprintf_s;
2273
2274 typedef void* (*zsys_lookup_fn)(void *, const char*);
2275
2276 static char *
2277 s_zsys_zprintf (const char *format, void *args, zsys_lookup_fn lookup_fn, bool validate)
2278 {
2279 assert (format);
2280 assert (args);
2281 zchunk_t *chunk = zchunk_new (NULL, strlen (format) * 1.5);
2282 assert (chunk);
2283 char *ret = NULL;
2284
2285 zsprintf_s state = FIND_PERCENT;
2286 size_t pos = 0;
2287 char *key = NULL;
2288
2289 while (state != END)
2290 {
2291
2292 if (pos >= strlen (format))
2293 break;
2294
2295 switch (state) {
2296 case FIND_PERCENT:
2297 {
2298
2299 //zsys_debug ("\tstate=FIND_PERCENT, format+%zu=%s", pos, format+pos);
2300 char *percent = strchr ((char*)(format) + pos, '%');
2301
2302 if (!percent) {
2303 //zsys_debug ("!percent");
2304 zchunk_extend (chunk, (format+pos), strlen (format) - pos);
2305 state = END;
2306 }
2307 else
2308 if (*(percent+1) == '%') {
2309 size_t idx = percent - format;
2310 //zsys_debug ("*(percent+1)=='%%':\tidx=%zu, format+%zu=%s", idx, pos, format+pos);
2311 if (idx - pos > 0) {
2312 //zsys_debug ("*(percent+1)=='%%':\t#2pos=%zu, idx-pos=%zu", pos, idx-pos);
2313 zchunk_extend (chunk, format+pos, idx - pos);
2314 pos += (idx-pos);
2315 //zsys_debug ("*(percent+1)=='%%':\t#2pos=%zu, idx-pos=%zu", pos, idx-pos);
2316 }
2317 zchunk_extend (chunk, "%", 1);
2318 pos += 2;
2319 }
2320 else
2321 if (*(percent+1) == '(') {
2322 size_t idx = percent - format;
2323 //zsys_debug ("*(percent+1) == '(': idx=%zu, pos=%zu", idx, pos);
2324 if (idx - pos > 0) {
2325 zchunk_extend (chunk, (format+pos), idx - pos);
2326 pos += (idx-pos);
2327 }
2328 //zsys_debug ("*(percent+1) == '(': idx=%zu, pos=%zu", idx, pos);
2329 state = FIND_KEY;
2330 }
2331 else {
2332 //zsys_debug ("else");
2333 size_t idx = percent - format;
2334 zchunk_extend (chunk, (format+pos), idx - pos);
2335 pos += (idx-pos);
2336 }
2337 }
2338 break;
2339 case FIND_KEY:
2340 {
2341 //zsys_debug ("\tstate=FIND_KEY, format+%zu=%s", pos, format+pos);
2342 char *key_end = strchr ((char*)(format)+pos, ')');
2343 if (!key_end) {
2344 zchunk_extend (chunk, (format+pos), strlen (format) - pos);
2345 state = END;
2346 }
2347 pos += 2;
2348 size_t idx = key_end - format;
2349 size_t key_len = idx - pos;
2350 if (key_len == 0) {
2351 zchunk_extend (chunk, "()", 2);
2352 pos += 2;
2353 state = FIND_PERCENT;
2354 }
2355 zstr_free (&key);
2356 key = (char*) zmalloc (key_len + 1);
2357 memcpy ((void*) key, format+pos, key_len);
2358
2359 if (! lookup_fn (args, key)) {
2360 char *ret = NULL;
2361 if (validate)
2362 ret = zsys_sprintf ("Key '%s' not found in hash", key);
2363 zstr_free (&key);
2364 zchunk_destroy (&chunk);
2365 return ret;
2366 }
2367 pos += key_len + 1;
2368 state = FIND_FORMAT;
2369 }
2370 break;
2371 case FIND_FORMAT:
2372 {
2373 //zsys_debug ("\tstate=FIND_FORMAT, format+%zu=%s", pos, format+pos);
2374 if (*(format+pos) != 's') {
2375 char *ret = NULL;
2376 if (validate)
2377 ret = zsys_sprintf ("%s: arguments other than 's' are not implemented", key);
2378 zstr_free (&key);
2379 zchunk_destroy (&chunk);
2380 return ret;
2381 }
2382 pos += 1;
2383 char *v = (char *) lookup_fn (args, key);
2384 zchunk_extend (chunk, v, strlen (v));
2385 state = FIND_PERCENT;
2386 }
2387 break;
2388 case END:
2389 break;
2390 }
2391 }
2392 zstr_free (&key);
2393
2394 //FIXME: is it needed?
2395 zchunk_extend (chunk, "\0", 1);
2396
2397 if (!validate) {
2398 ret = strdup ((char*) zchunk_data (chunk));
2399 zchunk_destroy (&chunk);
2400 return ret;
2401 }
2402
2403 zchunk_destroy (&chunk);
2404 return NULL;
2405 }
2406
2407
2408 // printf based on zhash_t
2409 char *
2410 zsys_zprintf (const char *format, zhash_t *args)
2411 {
2412 return s_zsys_zprintf (format, (void*) args, (zsys_lookup_fn) zhash_lookup, false);
2413 }
2414
2415 // return missing key or other format errors as new allocated string or NULL
2416 char *
2417 zsys_zprintf_error (const char *format, zhash_t *args)
2418 {
2419 return s_zsys_zprintf (format, (void*) args, (zsys_lookup_fn) zhash_lookup, true);
2420 }
2421
2422 static void *
2423 s_zconfig_lookup (void *container, const char *key)
2424 {
2425 zconfig_t *root = (zconfig_t*) container;
2426 zconfig_t *child = zconfig_locate (root, key);
2427 if (child)
2428 return zconfig_value (child);
2429 return NULL;
2430 }
2431
2432 // printf based on zconfig
2433 char *
2434 zsys_zplprintf (const char *format, zconfig_t *args)
2435 {
2436 return s_zsys_zprintf (format, (void*) args, (zsys_lookup_fn) s_zconfig_lookup, false);
2437 }
2438
2439 // return missing key or other format errors as new allocated string or NULL
2440 char *
2441 zsys_zplprintf_error (const char *format, zconfig_t *args)
2442 {
2443 return s_zsys_zprintf (format, (void*) args, (zsys_lookup_fn) s_zconfig_lookup, true);
2444 }
2445
2446
2447 // --------------------------------------------------------------------------
2448 // Set log identity, which is a string that prefixes all log messages sent
2449 // by this process. The log identity defaults to the environment variable
2450 // ZSYS_LOGIDENT, if that is set.
2451
2452 void
2453 zsys_set_logident (const char *value)
2454 {
2455 zsys_init ();
2456 freen (s_logident);
2457 s_logident = strdup (value);
2458 #if defined (__UNIX__)
2459 if (s_logsystem)
2460 openlog (s_logident, LOG_PID, LOG_USER);
2461 #elif defined (__WINDOWS__)
2462 // TODO: hook in Windows event log for Windows
2463 #endif
2464 assert (s_logident);
2465 }
2466
2467
2468 // --------------------------------------------------------------------------
2469 // Set stream to receive log traffic. By default, log traffic is sent to
2470 // stdout. If you set the stream to NULL, no stream will receive the log
2471 // traffic (it may still be sent to the system facility).
2472
2473 void
2474 zsys_set_logstream (FILE *stream)
2475 {
2476 zsys_init ();
2477 s_logstream = stream;
2478 }
2479
2480
2481 // --------------------------------------------------------------------------
2482 // Sends log output to a PUB socket bound to the specified endpoint. To
2483 // collect such log output, create a SUB socket, subscribe to the traffic
2484 // you care about, and connect to the endpoint. Log traffic is sent as a
2485 // single string frame, in the same format as when sent to stdout. The
2486 // log system supports a single sender; multiple calls to this method will
2487 // bind the same sender to multiple endpoints. To disable the sender, call
2488 // this method with a null argument.
2489
2490 void
2491 zsys_set_logsender (const char *endpoint)
2492 {
2493 zsys_init ();
2494 if (endpoint) {
2495 // Create log sender if needed
2496 if (!s_logsender) {
2497 s_logsender = zsock_new_pub(NULL);
2498 assert (s_logsender);
2499 }
2500 // Bind/connect to specified endpoint(s) using zsock_attach() syntax
2501 int rc = zsock_attach (s_logsender, endpoint, true);
2502 assert (rc == 0);
2503 }
2504 else
2505 if (s_logsender) {
2506 zsock_destroy (&s_logsender);
2507 }
2508 }
2509
2510
2511 // --------------------------------------------------------------------------
2512 // Enable or disable logging to the system facility (syslog on POSIX boxes,
2513 // event log on Windows). By default this is disabled.
2514
2515 void
2516 zsys_set_logsystem (bool logsystem)
2517 {
2518 zsys_init ();
2519 s_logsystem = logsystem;
2520 #if defined (__UNIX__)
2521 if (s_logsystem)
2522 openlog (s_logident, LOG_PID, LOG_USER);
2523 #elif defined (__WINDOWS__)
2524 // TODO: hook into Windows event log
2525 #endif
2526 }
2527
2528
2529 static void
2530 s_log (char loglevel, char *string)
2531 {
2532 if (!s_initialized)
2533 zsys_init ();
2534
2535 #if defined (__UNIX__)
2536 # if defined (__UTYPE_ANDROID)
2537 int priority = ANDROID_LOG_INFO;
2538 if (loglevel == 'E')
2539 priority = ANDROID_LOG_ERROR;
2540 else
2541 if (loglevel == 'W')
2542 priority = ANDROID_LOG_WARN;
2543 else
2544 if (loglevel == 'N')
2545 priority = ANDROID_LOG_INFO;
2546 else
2547 if (loglevel == 'I')
2548 priority = ANDROID_LOG_INFO;
2549 else
2550 if (loglevel == 'D')
2551 priority = ANDROID_LOG_DEBUG;
2552
2553 __android_log_print(priority, "zsys", "%s", string);
2554 # else
2555 if (s_logsystem) {
2556 int priority = LOG_INFO;
2557 if (loglevel == 'E')
2558 priority = LOG_ERR;
2559 else
2560 if (loglevel == 'W')
2561 priority = LOG_WARNING;
2562 else
2563 if (loglevel == 'N')
2564 priority = LOG_NOTICE;
2565 else
2566 if (loglevel == 'I')
2567 priority = LOG_INFO;
2568 else
2569 if (loglevel == 'D')
2570 priority = LOG_DEBUG;
2571
2572 syslog (priority, "%s", string);
2573 }
2574 else
2575 # endif
2576 #endif
2577 if (s_logstream || s_logsender) {
2578 time_t curtime = time (NULL);
2579 struct tm *loctime = localtime (&curtime);
2580 char date [20];
2581 strftime (date, 20, "%y-%m-%d %H:%M:%S", loctime);
2582 char log_text [1024];
2583 if (s_logident)
2584 snprintf (log_text, 1024, "%c: (%s) %s %s", loglevel, s_logident, date, string);
2585 else
2586 snprintf (log_text, 1024, "%c: %s %s", loglevel, date, string);
2587
2588 if (s_logstream) {
2589 fprintf (s_logstream, "%s\n", log_text);
2590 fflush (s_logstream);
2591 }
2592 if (s_logsender)
2593 zstr_send (s_logsender, log_text);
2594 }
2595 }
2596
2597
2598 // --------------------------------------------------------------------------
2599 // Log error condition - highest priority
2600
2601 void
2602 zsys_error (const char *format, ...)
2603 {
2604 va_list argptr;
2605 va_start (argptr, format);
2606 char *string = zsys_vprintf (format, argptr);
2607 va_end (argptr);
2608 s_log ('E', string);
2609 zstr_free (&string);
2610 }
2611
2612
2613 // --------------------------------------------------------------------------
2614 // Log warning condition - high priority
2615
2616 void
2617 zsys_warning (const char *format, ...)
2618 {
2619 va_list argptr;
2620 va_start (argptr, format);
2621 char *string = zsys_vprintf (format, argptr);
2622 va_end (argptr);
2623 s_log ('W', string);
2624 zstr_free (&string);
2625 }
2626
2627
2628 // --------------------------------------------------------------------------
2629 // Log normal, but significant, condition - normal priority
2630
2631 void
2632 zsys_notice (const char *format, ...)
2633 {
2634 va_list argptr;
2635 va_start (argptr, format);
2636 char *string = zsys_vprintf (format, argptr);
2637 va_end (argptr);
2638 s_log ('N', string);
2639 zstr_free (&string);
2640 }
2641
2642
2643 // --------------------------------------------------------------------------
2644 // Log informational message - low priority
2645
2646 void
2647 zsys_info (const char *format, ...)
2648 {
2649 va_list argptr;
2650 va_start (argptr, format);
2651 char *string = zsys_vprintf (format, argptr);
2652 va_end (argptr);
2653 s_log ('I', string);
2654 zstr_free (&string);
2655 }
2656
2657
2658 // --------------------------------------------------------------------------
2659 // Log debug-level message - lowest priority
2660
2661 void
2662 zsys_debug (const char *format, ...)
2663 {
2664 va_list argptr;
2665 va_start (argptr, format);
2666 char *string = zsys_vprintf (format, argptr);
2667 va_end (argptr);
2668 s_log ('D', string);
2669 zstr_free (&string);
2670 }
2671
2672 // --------------------------------------------------------------------------
2673 // Selftest
2674
2675 void
2676 zsys_test (bool verbose)
2677 {
2678 printf (" * zsys: ");
2679 if (verbose)
2680 printf ("\n");
2681
2682 // check that we can stop/restart the environnemnt
2683 zsys_shutdown();
2684 zsys_init();
2685 zsys_shutdown();
2686 zsys_init();
2687 #ifdef CZMQ_BUILD_DRAFT_API
2688 // just check if we can check for ipv6
2689 zsys_ipv6_available();
2690 #endif
2691
2692 // @selftest
2693 zsys_catch_interrupts ();
2694
2695 // Check capabilities without using the return value
2696 int rc = zsys_has_curve ();
2697
2698 const char *SELFTEST_DIR_RW = "src/selftest-rw";
2699
2700 if (verbose) {
2701 char *hostname = zsys_hostname ();
2702 zsys_info ("host name is %s", hostname);
2703 freen (hostname);
2704 zsys_info ("system limit is %zu ZeroMQ sockets", zsys_socket_limit ());
2705 }
2706 #ifdef CZMQ_BUILD_DRAFT_API
2707 zsys_set_file_stable_age_msec (5123);
2708 assert (zsys_file_stable_age_msec() == 5123);
2709 zsys_set_file_stable_age_msec (-1);
2710 assert (zsys_file_stable_age_msec() == 5123);
2711 #endif // CZMQ_BUILD_DRAFT_API
2712 zsys_set_linger (0);
2713 zsys_set_sndhwm (1000);
2714 zsys_set_rcvhwm (1000);
2715 zsys_set_pipehwm (2500);
2716 assert (zsys_pipehwm () == 2500);
2717 zsys_set_ipv6 (0);
2718 zsys_set_thread_priority (-1);
2719 zsys_set_thread_sched_policy (-1);
2720 zsys_set_thread_name_prefix (0);
2721 assert (0 == zsys_thread_name_prefix());
2722 assert (streq ("0", zsys_thread_name_prefix_str()));
2723 zsys_thread_affinity_cpu_add (0);
2724 zsys_thread_affinity_cpu_remove (0);
2725 zsys_set_zero_copy_recv(0);
2726 assert (0 == zsys_zero_copy_recv());
2727 zsys_set_zero_copy_recv(1);
2728 assert (1 == zsys_zero_copy_recv());
2729
2730 // Test pipe creation
2731 zsock_t *pipe_back;
2732 zsock_t *pipe_front = zsys_create_pipe (&pipe_back);
2733 zstr_send (pipe_front, "Hello");
2734 char *string = zstr_recv (pipe_back);
2735 assert (streq (string, "Hello"));
2736 freen (string);
2737 zsock_destroy (&pipe_back);
2738 zsock_destroy (&pipe_front);
2739
2740 // Test file manipulation
2741
2742 // Don't let anyone fool our workspace
2743 if (zsys_file_exists ("nosuchfile")) {
2744 zsys_warning ("zsys_test() had to remove 'nosuchfile' which was not expected here at all");
2745 zsys_file_delete ("nosuchfile");
2746 }
2747
2748 rc = zsys_file_delete ("nosuchfile");
2749 assert (rc == -1);
2750
2751 bool rc_bool = zsys_file_exists ("nosuchfile");
2752 assert (rc_bool != true);
2753
2754 rc = (int) zsys_file_size ("nosuchfile");
2755 assert (rc == -1);
2756
2757 time_t when = zsys_file_modified (".");
2758 assert (when > 0);
2759
2760 int mode = zsys_file_mode (".");
2761 assert (S_ISDIR (mode));
2762 assert (mode & S_IRUSR);
2763 assert (mode & S_IWUSR);
2764
2765 const char *testbasedir = ".testsys";
2766 const char *testsubdir = "subdir";
2767 char *basedirpath = NULL; // subdir in a test, under SELFTEST_DIR_RW
2768 char *dirpath = NULL; // subdir in a test, under basedirpath
2769 char *relsubdir = NULL; // relative short "path" of subdir under testbasedir
2770
2771 basedirpath = zsys_sprintf ("%s/%s", SELFTEST_DIR_RW, testbasedir);
2772 assert (basedirpath);
2773 dirpath = zsys_sprintf ("%s/%s", basedirpath, testsubdir);
2774 assert (dirpath);
2775 relsubdir = zsys_sprintf ("%s/%s", testbasedir, testsubdir);
2776 assert (relsubdir);
2777
2778 // Normally tests clean up in the end, but if a selftest run dies
2779 // e.g. on assert(), workspace remains dirty. Better clean it up.
2780 // We do not really care about results here - we clean up a possible
2781 // dirty exit of an older build. If there are permission errors etc.
2782 // the actual tests below would explode.
2783 if (zsys_file_exists(dirpath)) {
2784 if (verbose)
2785 zsys_debug ("zsys_test() has to remove ./%s that should not have been here", dirpath);
2786 zsys_dir_delete (dirpath);
2787 }
2788 if (zsys_file_exists (basedirpath)) {
2789 if (verbose)
2790 zsys_debug ("zsys_test() has to remove ./%s that should not have been here", basedirpath);
2791 zsys_dir_delete (basedirpath);
2792 }
2793
2794 // Added tracing because this file-age check fails on some systems
2795 // presumably due to congestion in a mass-build and valgrind on top
2796 zsys_file_mode_private ();
2797 if (verbose)
2798 printf ("zsys_test() at timestamp %" PRIi64 ": "
2799 "Creating %s\n",
2800 zclock_time(), relsubdir );
2801 rc = zsys_dir_create ("%s/%s", SELFTEST_DIR_RW, relsubdir);
2802 if (verbose)
2803 printf ("zsys_test() at timestamp %" PRIi64 ": "
2804 "Finished creating %s with return-code %d\n",
2805 zclock_time(), relsubdir, rc );
2806 assert (rc == 0);
2807 when = zsys_file_modified (dirpath);
2808 if (verbose)
2809 printf ("zsys_test() at timestamp %" PRIi64 ": "
2810 "Finished calling zsys_file_modified(), got age %jd\n",
2811 zclock_time(), (intmax_t)when );
2812 assert (when > 0);
2813 if (verbose)
2814 printf ("zsys_test() at timestamp %" PRIi64 ": "
2815 "Checking if file is NOT stable (is younger than 1 sec)\n",
2816 zclock_time() );
2817 assert (!s_zsys_file_stable (dirpath, verbose));
2818 if (verbose)
2819 printf ("zsys_test() at timestamp %" PRIi64 ": "
2820 "Passed the test, file is not stable - as expected\n",
2821 zclock_time() );
2822 rc = zsys_dir_delete ("%s/%s", SELFTEST_DIR_RW, relsubdir);
2823 assert (rc == 0);
2824 rc = zsys_dir_delete ("%s/%s", SELFTEST_DIR_RW, testbasedir);
2825 assert (rc == 0);
2826 zsys_file_mode_default ();
2827
2828 #if (defined (PATH_MAX))
2829 char cwd[PATH_MAX];
2830 #else
2831 # if (defined (_MAX_PATH))
2832 char cwd[_MAX_PATH];
2833 # else
2834 char cwd[1024];
2835 # endif
2836 #endif
2837 memset (cwd, 0, sizeof(cwd));
2838 #if (defined (WIN32))
2839 if (_getcwd(cwd, sizeof(cwd)) != NULL) {
2840 #else
2841 if (getcwd(cwd, sizeof(cwd)) != NULL) {
2842 #endif
2843 if (verbose)
2844 printf ("zsys_test() at timestamp %" PRIi64 ": "
2845 "current working directory is %s\n",
2846 zclock_time(), cwd);
2847 assert (zsys_dir_change (SELFTEST_DIR_RW) == 0);
2848 assert (zsys_dir_change (cwd) == 0);
2849 }
2850 else {
2851 zsys_warning ("zsys_test() : got getcwd() error... "
2852 "testing zsys_dir_change() anyway, but it can confuse "
2853 "subsequent tests in this process");
2854 assert (zsys_dir_change (SELFTEST_DIR_RW) == 0);
2855 }
2856
2857 zstr_free (&basedirpath);
2858 zstr_free (&dirpath);
2859 zstr_free (&relsubdir);
2860
2861 // Other subtests
2862 int major, minor, patch;
2863 zsys_version (&major, &minor, &patch);
2864 assert (major == CZMQ_VERSION_MAJOR);
2865 assert (minor == CZMQ_VERSION_MINOR);
2866 assert (patch == CZMQ_VERSION_PATCH);
2867
2868 string = zsys_sprintf ("%s %02x", "Hello", 16);
2869 assert (streq (string, "Hello 10"));
2870 freen (string);
2871
2872 char *str64 = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz1234567890,.";
2873 int num10 = 1234567890;
2874 string = zsys_sprintf ("%s%s%s%s%d", str64, str64, str64, str64, num10);
2875 assert (strlen (string) == (4 * 64 + 10));
2876 freen (string);
2877
2878 // Test logging system
2879 zsys_set_logident ("czmq_selftest");
2880 zsys_set_logsender ("inproc://logging");
2881 void *logger = zsys_socket (ZMQ_SUB, NULL, 0);
2882 assert (logger);
2883 rc = zmq_connect (logger, "inproc://logging");
2884 assert (rc == 0);
2885 rc = zmq_setsockopt (logger, ZMQ_SUBSCRIBE, "", 0);
2886 assert (rc == 0);
2887
2888 if (verbose) {
2889 zsys_error ("This is an %s message", "error");
2890 zsys_warning ("This is a %s message", "warning");
2891 zsys_notice ("This is a %s message", "notice");
2892 zsys_info ("This is a %s message", "info");
2893 zsys_debug ("This is a %s message", "debug");
2894 zsys_set_logident ("hello, world");
2895 zsys_info ("This is a %s message", "info");
2896 zsys_debug ("This is a %s message", "debug");
2897
2898 // Check that logsender functionality is working
2899 char *received = zstr_recv (logger);
2900 assert (received);
2901 zstr_free (&received);
2902 }
2903 zsys_close (logger, NULL, 0);
2904
2905 {
2906 // zhash based printf
2907 zhash_t *args = zhash_new ();
2908 zhash_insert (args, "key", "value");
2909 zhash_insert (args, "ham", "spam");
2910
2911 char *str = zsys_zprintf ("plain string", args);
2912 assert (streq (str, "plain string"));
2913 zstr_free (&str);
2914
2915 str = zsys_zprintf ("%%a%%", args);
2916 assert (streq (str, "%a%"));
2917 zstr_free (&str);
2918
2919 str = zsys_zprintf ("VALUE=%(key)s123", args);
2920 assert (streq (str, "VALUE=value123"));
2921 zstr_free (&str);
2922
2923 str = zsys_zprintf ("VALUE=%(key)s123, %(ham)s, %(ham)s, %%(nospam)s!!!", args);
2924 assert (streq (str, "VALUE=value123, spam, spam, %(nospam)s!!!"));
2925 zstr_free (&str);
2926
2927 str = zsys_zprintf ("VALUE=%(nokey)s123, %(ham)s, %(ham)s, %%(nospam)s!!!", args);
2928 assert (!str);
2929
2930 str = zsys_zprintf_error ("VALUE=%(nokey)s123, %(ham)s, %(ham)s, %%(nospam)s!!!", args);
2931 assert (streq (str, "Key 'nokey' not found in hash"));
2932 zstr_free (&str);
2933
2934 str = zsys_zprintf ("VALUE=%(key)s/%%S", args);
2935 assert (streq (str, "VALUE=value/%S"));
2936 zstr_free (&str);
2937
2938 zhash_destroy (&args);
2939
2940 //ZPL based printf
2941 zconfig_t *root = zconfig_new ("root", NULL);
2942 zconfig_put (root, "zsp", "");
2943 zconfig_put (root, "zsp/return_code", "0");
2944
2945 str = zsys_zplprintf ("return_code=%(zsp/return_code)s", root);
2946 assert (streq (str, "return_code=0"));
2947 zstr_free (&str);
2948
2949 zconfig_destroy (&root);
2950 }
2951
2952 // @end
2953
2954 zsys_set_auto_use_fd (1);
2955 assert (zsys_auto_use_fd () == 1);
2956
2957 assert (zsys_max_msgsz () == INT_MAX);
2958 zsys_set_max_msgsz (2000);
2959 assert (zsys_max_msgsz () == 2000);
2960 zsys_set_max_msgsz (-1);
2961 assert (zsys_max_msgsz () == 2000);
2962
2963 // cleanup log_sender
2964 zsys_set_logsender(NULL);
2965
2966 #if defined (__WINDOWS__)
2967 zsys_shutdown();
2968 #endif
2969
2970 printf ("OK\n");
2971 }
2972