1package zmq4
2
3/*
4#cgo !windows pkg-config: libzmq
5#cgo windows CFLAGS: -I/usr/local/include
6#cgo windows LDFLAGS: -L/usr/local/lib -lzmq
7#include <zmq.h>
8#if ZMQ_VERSION_MINOR < 2
9#include <zmq_utils.h>
10#endif
11#include <stdlib.h>
12#include <string.h>
13#include "zmq4.h"
14
// Version of the zmq.h header this package was compiled against.
// The Go code reads these as C.zmq4_major, C.zmq4_minor and C.zmq4_patch
// to compare the compile-time version with the version found at run time.
int
    zmq4_major = ZMQ_VERSION_MAJOR,
    zmq4_minor = ZMQ_VERSION_MINOR,
    zmq4_patch = ZMQ_VERSION_PATCH;
19
20#if ZMQ_VERSION_MINOR > 0
21// Version >= 4.1.x
22
// Local definition of zmq_event_t, used by zmq4_get_event40 below.
// NOTE(review): presumably zmq.h no longer declares this type from 4.1 on,
// which is why it is declared here only when ZMQ_VERSION_MINOR > 0 - confirm.
typedef struct {
    uint16_t event;  // id of the event as bitfield
    int32_t  value;  // value is either error code, fd or reconnect interval
} zmq_event_t;
27
28#else
29// Version == 4.0.x
30
// Stub compiled only for 0MQ 4.0.x (ZMQ_VERSION_MINOR == 0) so that the
// package still links: it ignores its arguments and always returns NULL.
const char *zmq_msg_gets (zmq_msg_t *msg, const char *property) {
    return NULL;
}
34
// Stub compiled only for 0MQ 4.0.x (ZMQ_VERSION_MINOR == 0): it ignores the
// capability name and always returns 0.
int zmq_has (const char *capability) {
    return 0;
}
38
39#if ZMQ_VERSION_PATCH < 5
40// Version < 4.0.5
41
// Stub compiled only for 0MQ versions before 4.0.5: it ignores its arguments
// and always returns -1. The Go function ProxySteerable returns
// ErrorNotImplemented405 for those versions before reaching this stub.
int zmq_proxy_steerable (const void *frontend, const void *backend, const void *capture, const void *control) {
    return -1;
}
45
46#endif // Version < 4.0.5
47
48#endif // Version == 4.0.x
49
50void zmq4_get_event40(zmq_msg_t *msg, int *ev, int *val) {
51    zmq_event_t event;
52    const char* data = (char*)zmq_msg_data(msg);
53    memcpy(&(event.event), data, sizeof(event.event));
54    memcpy(&(event.value), data+sizeof(event.event), sizeof(event.value));
55    *ev = (int)(event.event);
56    *val = (int)(event.value);
57}
58void zmq4_get_event41(zmq_msg_t *msg, int *ev, int *val) {
59    uint8_t *data = (uint8_t *) zmq_msg_data (msg);
60    uint16_t event = *(uint16_t *) (data);
61    *ev = (int)event;
62    *val = (int)(*(uint32_t *) (data + 2));
63}
// Thin wrapper around memcpy, called from Go as C.zmq4_memcpy to copy message
// data into a Go byte slice. Returns dest, like memcpy.
void *zmq4_memcpy(void *dest, const void *src, size_t n) {
    return memcpy(dest, src, n);
}
67*/
68import "C"
69
70import (
71	"errors"
72	"fmt"
73	"runtime"
74	"strings"
75	"unsafe"
76)
77
78var (
79	defaultCtx *Context
80
81	major, minor, patch int
82
83	ErrorContextClosed         = errors.New("Context is closed")
84	ErrorSocketClosed          = errors.New("Socket is closed")
85	ErrorMoreExpected          = errors.New("More expected")
86	ErrorNotImplemented405     = errors.New("Not implemented, requires 0MQ version 4.0.5")
87	ErrorNotImplemented41      = errors.New("Not implemented, requires 0MQ version 4.1")
88	ErrorNotImplemented42      = errors.New("Not implemented, requires 0MQ version 4.2")
89	ErrorNotImplementedWindows = errors.New("Not implemented on Windows")
90	ErrorNoSocket              = errors.New("No such socket")
91
92	initVersionError error
93	initContextError error
94
95	// api compatibility, based on changes in header files
96	api = map[[2]int]int{
97		[2]int{0, 0}:  1,
98		[2]int{0, 1}:  2,
99		[2]int{0, 2}:  3,
100		[2]int{0, 3}:  3,
101		[2]int{0, 4}:  3,
102		[2]int{0, 5}:  4,
103		[2]int{0, 6}:  4,
104		[2]int{0, 7}:  4,
105		[2]int{0, 8}:  4,
106		[2]int{0, 9}:  4,
107		[2]int{0, 10}: 4,
108		[2]int{1, 0}:  5,
109		[2]int{1, 1}:  6,
110		[2]int{1, 2}:  6,
111		[2]int{1, 3}:  6,
112		[2]int{1, 4}:  6,
113		[2]int{1, 5}:  6,
114		[2]int{1, 6}:  7,
115		[2]int{1, 7}:  7,
116		[2]int{1, 8}:  7,
117		[2]int{2, 0}:  8,
118		[2]int{2, 1}:  9,
119		[2]int{2, 2}:  9,
120		[2]int{2, 3}:  9,
121		[2]int{2, 4}:  9,
122		[2]int{2, 5}:  9,
123		[2]int{3, 0}:  10,
124		[2]int{3, 1}:  10,
125		[2]int{3, 2}:  10,
126		[2]int{3, 3}:  10,
127		[2]int{3, 4}:  10,
128	}
129)
130
131func init() {
132	major, minor, patch = Version()
133	if major != 4 {
134		initVersionError = fmt.Errorf("Using zmq4 with ZeroMQ major version %d", major)
135		return
136	}
137
138	v, ok1 := api[[2]int{minor, patch}]
139	w, ok2 := api[[2]int{int(C.zmq4_minor), int(C.zmq4_patch)}]
140	if v != w || !ok1 || !ok2 {
141		if major != int(C.zmq4_major) || minor != int(C.zmq4_minor) || patch != int(C.zmq4_patch) {
142			initVersionError =
143				fmt.Errorf(
144					"zmq4 was compiled with ZeroMQ version %d.%d.%d, but the runtime links with version %d.%d.%d",
145					int(C.zmq4_major), int(C.zmq4_minor), int(C.zmq4_patch),
146					major, minor, patch)
147			return
148		}
149	}
150
151	var err error
152	defaultCtx = &Context{}
153	defaultCtx.ctx, err = C.zmq4_ctx_new()
154	if defaultCtx.ctx == nil {
155		initContextError = fmt.Errorf("Init of ZeroMQ context failed: %v", errget(err))
156		return
157	}
158	defaultCtx.opened = true
159	defaultCtx.retryEINTR = true
160}
161
162//. Util
163
164// Report 0MQ library version.
165func Version() (major, minor, patch int) {
166	if initVersionError != nil {
167		return 0, 0, 0
168	}
169	var maj, min, pat C.int
170	C.zmq_version(&maj, &min, &pat)
171	return int(maj), int(min), int(pat)
172}
173
174// Get 0MQ error message string.
175func Error(e int) string {
176	return C.GoString(C.zmq_strerror(C.int(e)))
177}
178
179//. Context
180
// Default values of context options, taken from the ZeroMQ header file.
const (
	MaxSocketsDflt = int(C.ZMQ_MAX_SOCKETS_DFLT)
	IoThreadsDflt  = int(C.ZMQ_IO_THREADS_DFLT)
)
185
/*
Context wraps a 0MQ context.

The package creates one default context at initialization, which is used by
the package-level functions. Additional contexts are created with NewContext.
*/
type Context struct {
	ctx        unsafe.Pointer // handle returned by zmq4_ctx_new
	retryEINTR bool           // the "retry after EINTR" setting, see SetRetryAfterEINTR
	opened     bool           // true from successful creation until Term is called
	err        error          // error from creating or terminating the context, returned by Term
}
195
196// Create a new context.
197func NewContext() (ctx *Context, err error) {
198	if initVersionError != nil {
199		return nil, initVersionError
200	}
201	ctx = &Context{}
202	c, e := C.zmq4_ctx_new()
203	if c == nil {
204		err = errget(e)
205		ctx.err = err
206	} else {
207		ctx.ctx = c
208		ctx.retryEINTR = true
209		ctx.opened = true
210		runtime.SetFinalizer(ctx, (*Context).Term)
211	}
212	return
213}
214
215/*
216Terminates the default context.
217
218For linger behavior, see: http://api.zeromq.org/4-1:zmq-ctx-term
219*/
220func Term() error {
221	if initVersionError != nil {
222		return initVersionError
223	}
224	if initContextError != nil {
225		return initContextError
226	}
227	return defaultCtx.Term()
228}
229
230/*
231Terminates the context.
232
233For linger behavior, see: http://api.zeromq.org/4-1:zmq-ctx-term
234*/
235func (ctx *Context) Term() error {
236	if ctx.opened {
237		ctx.opened = false
238		var n C.int
239		var err error
240		for {
241			n, err = C.zmq4_ctx_term(ctx.ctx)
242			if n == 0 || !ctx.retry(err) {
243				break
244			}
245		}
246		if n != 0 {
247			ctx.err = errget(err)
248		}
249	}
250	return ctx.err
251}
252
253func getOption(ctx *Context, o C.int) (int, error) {
254	if !ctx.opened {
255		return 0, ErrorContextClosed
256	}
257	nc, err := C.zmq4_ctx_get(ctx.ctx, o)
258	n := int(nc)
259	if n < 0 {
260		return n, errget(err)
261	}
262	return n, nil
263}
264
265// Returns the size of the 0MQ thread pool in the default context.
266func GetIoThreads() (int, error) {
267	if initVersionError != nil {
268		return 0, initVersionError
269	}
270	if initContextError != nil {
271		return 0, initContextError
272	}
273	return defaultCtx.GetIoThreads()
274}
275
276// Returns the size of the 0MQ thread pool.
277func (ctx *Context) GetIoThreads() (int, error) {
278	return getOption(ctx, C.ZMQ_IO_THREADS)
279}
280
281// Returns the maximum number of sockets allowed in the default context.
282func GetMaxSockets() (int, error) {
283	if initVersionError != nil {
284		return 0, initVersionError
285	}
286	if initContextError != nil {
287		return 0, initContextError
288	}
289	return defaultCtx.GetMaxSockets()
290}
291
292// Returns the maximum number of sockets allowed.
293func (ctx *Context) GetMaxSockets() (int, error) {
294	return getOption(ctx, C.ZMQ_MAX_SOCKETS)
295}
296
297/*
298Returns the maximum message size in the default context.
299
300Returns ErrorNotImplemented42 with ZeroMQ version < 4.2
301*/
302func GetMaxMsgsz() (int, error) {
303	if initVersionError != nil {
304		return 0, initVersionError
305	}
306	if initContextError != nil {
307		return 0, initContextError
308	}
309	return defaultCtx.GetMaxMsgsz()
310}
311
312/*
313Returns the maximum message size.
314
315Returns ErrorNotImplemented42 with ZeroMQ version < 4.2
316*/
317func (ctx *Context) GetMaxMsgsz() (int, error) {
318	if minor < 2 {
319		return 0, ErrorNotImplemented42
320	}
321	return getOption(ctx, C.ZMQ_MAX_MSGSZ)
322}
323
324// Returns the IPv6 option in the default context.
325func GetIpv6() (bool, error) {
326	if initVersionError != nil {
327		return false, initVersionError
328	}
329	if initContextError != nil {
330		return false, initContextError
331	}
332	return defaultCtx.GetIpv6()
333}
334
335// Returns the IPv6 option.
336func (ctx *Context) GetIpv6() (bool, error) {
337	i, e := getOption(ctx, C.ZMQ_IPV6)
338	if i == 0 {
339		return false, e
340	}
341	return true, e
342}
343
344/*
345Returns the blocky setting in the default context.
346
347Returns ErrorNotImplemented42 with ZeroMQ version < 4.2
348*/
349func GetBlocky() (bool, error) {
350	if initVersionError != nil {
351		return false, initVersionError
352	}
353	if initContextError != nil {
354		return false, initContextError
355	}
356	return defaultCtx.GetBlocky()
357}
358
359/*
360Returns the blocky setting.
361
362Returns ErrorNotImplemented42 with ZeroMQ version < 4.2
363*/
364func (ctx *Context) GetBlocky() (bool, error) {
365	if minor < 2 {
366		return false, ErrorNotImplemented42
367	}
368	i, e := getOption(ctx, C.ZMQ_BLOCKY)
369	if i == 0 {
370		return false, e
371	}
372	return true, e
373}
374
375/*
376Returns the retry after EINTR setting in the default context.
377*/
378func GetRetryAfterEINTR() bool {
379	return defaultCtx.GetRetryAfterEINTR()
380}
381
/*
GetRetryAfterEINTR returns the retry after EINTR setting of the context,
as last set by SetRetryAfterEINTR. Contexts start with the setting enabled.
*/
func (ctx *Context) GetRetryAfterEINTR() bool {
	return ctx.retryEINTR
}
388
389func setOption(ctx *Context, o C.int, n int) error {
390	if !ctx.opened {
391		return ErrorContextClosed
392	}
393	i, err := C.zmq4_ctx_set(ctx.ctx, o, C.int(n))
394	if int(i) != 0 {
395		return errget(err)
396	}
397	return nil
398}
399
400/*
401Specifies the size of the 0MQ thread pool to handle I/O operations in
402the default context. If your application is using only the inproc
403transport for messaging you may set this to zero, otherwise set it to at
404least one. This option only applies before creating any sockets.
405
406Default value: 1
407*/
408func SetIoThreads(n int) error {
409	if initVersionError != nil {
410		return initVersionError
411	}
412	if initContextError != nil {
413		return initContextError
414	}
415	return defaultCtx.SetIoThreads(n)
416}
417
418/*
419Specifies the size of the 0MQ thread pool to handle I/O operations. If
420your application is using only the inproc transport for messaging you
421may set this to zero, otherwise set it to at least one. This option only
422applies before creating any sockets.
423
424Default value: 1
425*/
426func (ctx *Context) SetIoThreads(n int) error {
427	return setOption(ctx, C.ZMQ_IO_THREADS, n)
428}
429
430/*
431Sets the scheduling policy for default context’s thread pool.
432
433This option requires ZeroMQ version 4.1, and is not available on Windows.
434
435Supported values for this option can be found in sched.h file, or at
436http://man7.org/linux/man-pages/man2/sched_setscheduler.2.html
437
438This option only applies before creating any sockets on the context.
439
440Default value: -1
441
442Returns ErrorNotImplemented41 with ZeroMQ version < 4.1
443
444Returns ErrorNotImplementedWindows on Windows
445*/
446func SetThreadSchedPolicy(n int) error {
447	if initVersionError != nil {
448		return initVersionError
449	}
450	if initContextError != nil {
451		return initContextError
452	}
453	return defaultCtx.SetThreadSchedPolicy(n)
454}
455
456/*
457Sets scheduling priority for default context’s thread pool.
458
459This option requires ZeroMQ version 4.1, and is not available on Windows.
460
461Supported values for this option depend on chosen scheduling policy.
462Details can be found in sched.h file, or at
463http://man7.org/linux/man-pages/man2/sched_setscheduler.2.html
464
465This option only applies before creating any sockets on the context.
466
467Default value: -1
468
469Returns ErrorNotImplemented41 with ZeroMQ version < 4.1
470
471Returns ErrorNotImplementedWindows on Windows
472*/
473func SetThreadPriority(n int) error {
474	if initVersionError != nil {
475		return initVersionError
476	}
477	if initContextError != nil {
478		return initContextError
479	}
480	return defaultCtx.SetThreadPriority(n)
481}
482
483/*
484Set maximum message size in the default context.
485
486Default value: INT_MAX
487
488Returns ErrorNotImplemented42 with ZeroMQ version < 4.2
489*/
490func SetMaxMsgsz(n int) error {
491	if initVersionError != nil {
492		return initVersionError
493	}
494	if initContextError != nil {
495		return initContextError
496	}
497	return defaultCtx.SetMaxMsgsz(n)
498}
499
500/*
501Set maximum message size.
502
503Default value: INT_MAX
504
505Returns ErrorNotImplemented42 with ZeroMQ version < 4.2
506*/
507func (ctx *Context) SetMaxMsgsz(n int) error {
508	if minor < 2 {
509		return ErrorNotImplemented42
510	}
511	return setOption(ctx, C.ZMQ_MAX_MSGSZ, n)
512}
513
514/*
515Sets the maximum number of sockets allowed in the default context.
516
517Default value: 1024
518*/
519func SetMaxSockets(n int) error {
520	if initVersionError != nil {
521		return initVersionError
522	}
523	if initContextError != nil {
524		return initContextError
525	}
526	return defaultCtx.SetMaxSockets(n)
527}
528
529/*
530Sets the maximum number of sockets allowed.
531
532Default value: 1024
533*/
534func (ctx *Context) SetMaxSockets(n int) error {
535	return setOption(ctx, C.ZMQ_MAX_SOCKETS, n)
536}
537
538/*
539Sets the IPv6 value for all sockets created in the default context from this point onwards.
540A value of true means IPv6 is enabled, while false means the socket will use only IPv4.
541When IPv6 is enabled, a socket will connect to, or accept connections from, both IPv4 and IPv6 hosts.
542
543Default value: false
544*/
545func SetIpv6(i bool) error {
546	if initVersionError != nil {
547		return initVersionError
548	}
549	if initContextError != nil {
550		return initContextError
551	}
552	return defaultCtx.SetIpv6(i)
553}
554
555/*
556Sets the IPv6 value for all sockets created in the context from this point onwards.
557A value of true means IPv6 is enabled, while false means the socket will use only IPv4.
558When IPv6 is enabled, a socket will connect to, or accept connections from, both IPv4 and IPv6 hosts.
559
560Default value: false
561*/
562func (ctx *Context) SetIpv6(i bool) error {
563	n := 0
564	if i {
565		n = 1
566	}
567	return setOption(ctx, C.ZMQ_IPV6, n)
568}
569
570/*
571Sets the blocky behavior in the default context.
572
573See: http://api.zeromq.org/4-2:zmq-ctx-set#toc3
574
575Returns ErrorNotImplemented42 with ZeroMQ version < 4.2
576*/
577func SetBlocky(i bool) error {
578	if initVersionError != nil {
579		return initVersionError
580	}
581	if initContextError != nil {
582		return initContextError
583	}
584	return defaultCtx.SetBlocky(i)
585}
586
587/*
588Sets the blocky behavior.
589
590See: http://api.zeromq.org/4-2:zmq-ctx-set#toc3
591
592Returns ErrorNotImplemented42 with ZeroMQ version < 4.2
593*/
594func (ctx *Context) SetBlocky(i bool) error {
595	if minor < 2 {
596		return ErrorNotImplemented42
597	}
598	n := 0
599	if i {
600		n = 1
601	}
602	return setOption(ctx, C.ZMQ_BLOCKY, n)
603}
604
605/*
606Sets the retry after EINTR setting in the default context.
607
608Initital value is true.
609*/
610func SetRetryAfterEINTR(retry bool) {
611	defaultCtx.SetRetryAfterEINTR(retry)
612}
613
/*
SetRetryAfterEINTR sets the retry after EINTR setting of the context.

Initial value is true.
*/
func (ctx *Context) SetRetryAfterEINTR(retry bool) {
	ctx.retryEINTR = retry
}
622
623//. Sockets
624
// Type specifies the type of a socket, used by NewSocket().
type Type int

const (
	// Socket types for NewSocket(), taken from the ZeroMQ header file.
	// See: http://api.zeromq.org/4-1:zmq-socket#toc3
	REQ    = Type(C.ZMQ_REQ)
	REP    = Type(C.ZMQ_REP)
	DEALER = Type(C.ZMQ_DEALER)
	ROUTER = Type(C.ZMQ_ROUTER)
	PUB    = Type(C.ZMQ_PUB)
	SUB    = Type(C.ZMQ_SUB)
	XPUB   = Type(C.ZMQ_XPUB)
	XSUB   = Type(C.ZMQ_XSUB)
	PUSH   = Type(C.ZMQ_PUSH)
	PULL   = Type(C.ZMQ_PULL)
	PAIR   = Type(C.ZMQ_PAIR)
	STREAM = Type(C.ZMQ_STREAM)
)
644
645/*
646Socket type as string.
647*/
648func (t Type) String() string {
649	switch t {
650	case REQ:
651		return "REQ"
652	case REP:
653		return "REP"
654	case DEALER:
655		return "DEALER"
656	case ROUTER:
657		return "ROUTER"
658	case PUB:
659		return "PUB"
660	case SUB:
661		return "SUB"
662	case XPUB:
663		return "XPUB"
664	case XSUB:
665		return "XSUB"
666	case PUSH:
667		return "PUSH"
668	case PULL:
669		return "PULL"
670	case PAIR:
671		return "PAIR"
672	case STREAM:
673		return "STREAM"
674	}
675	return "<INVALID>"
676}
677
// Flag is a set of bit flags used by (*Socket)Send() and (*Socket)Recv().
type Flag int

const (
	// Flags for (*Socket)Send(), (*Socket)Recv()
	// For Send, see: http://api.zeromq.org/4-1:zmq-send#toc2
	// For Recv, see: http://api.zeromq.org/4-1:zmq-msg-recv#toc2
	DONTWAIT = Flag(C.ZMQ_DONTWAIT)
	SNDMORE  = Flag(C.ZMQ_SNDMORE)
)
688
689/*
690Socket flag as string.
691*/
692func (f Flag) String() string {
693	ff := make([]string, 0)
694	if f&DONTWAIT != 0 {
695		ff = append(ff, "DONTWAIT")
696	}
697	if f&SNDMORE != 0 {
698		ff = append(ff, "SNDMORE")
699	}
700	if len(ff) == 0 {
701		return "<NONE>"
702	}
703	return strings.Join(ff, "|")
704}
705
// Event is a set of bit flags identifying socket events, used by
// (*Socket)Monitor() and (*Socket)RecvEvent().
type Event int

const (
	// Flags for (*Socket)Monitor() and (*Socket)RecvEvent()
	// See: http://api.zeromq.org/4-3:zmq-socket-monitor#toc3
	EVENT_ALL                        = Event(C.ZMQ_EVENT_ALL)
	EVENT_CONNECTED                  = Event(C.ZMQ_EVENT_CONNECTED)
	EVENT_CONNECT_DELAYED            = Event(C.ZMQ_EVENT_CONNECT_DELAYED)
	EVENT_CONNECT_RETRIED            = Event(C.ZMQ_EVENT_CONNECT_RETRIED)
	EVENT_LISTENING                  = Event(C.ZMQ_EVENT_LISTENING)
	EVENT_BIND_FAILED                = Event(C.ZMQ_EVENT_BIND_FAILED)
	EVENT_ACCEPTED                   = Event(C.ZMQ_EVENT_ACCEPTED)
	EVENT_ACCEPT_FAILED              = Event(C.ZMQ_EVENT_ACCEPT_FAILED)
	EVENT_CLOSED                     = Event(C.ZMQ_EVENT_CLOSED)
	EVENT_CLOSE_FAILED               = Event(C.ZMQ_EVENT_CLOSE_FAILED)
	EVENT_DISCONNECTED               = Event(C.ZMQ_EVENT_DISCONNECTED)
	EVENT_MONITOR_STOPPED            = Event(C.ZMQ_EVENT_MONITOR_STOPPED)
	EVENT_HANDSHAKE_FAILED_NO_DETAIL = Event(C.ZMQ_EVENT_HANDSHAKE_FAILED_NO_DETAIL)
	EVENT_HANDSHAKE_SUCCEEDED        = Event(C.ZMQ_EVENT_HANDSHAKE_SUCCEEDED)
	EVENT_HANDSHAKE_FAILED_PROTOCOL  = Event(C.ZMQ_EVENT_HANDSHAKE_FAILED_PROTOCOL)
	EVENT_HANDSHAKE_FAILED_AUTH      = Event(C.ZMQ_EVENT_HANDSHAKE_FAILED_AUTH)
)
729
730/*
731Socket event as string.
732*/
733func (e Event) String() string {
734	if e == EVENT_ALL {
735		return "EVENT_ALL"
736	}
737	ee := make([]string, 0)
738	if e&EVENT_CONNECTED != 0 {
739		ee = append(ee, "EVENT_CONNECTED")
740	}
741	if e&EVENT_CONNECT_DELAYED != 0 {
742		ee = append(ee, "EVENT_CONNECT_DELAYED")
743	}
744	if e&EVENT_CONNECT_RETRIED != 0 {
745		ee = append(ee, "EVENT_CONNECT_RETRIED")
746	}
747	if e&EVENT_LISTENING != 0 {
748		ee = append(ee, "EVENT_LISTENING")
749	}
750	if e&EVENT_BIND_FAILED != 0 {
751		ee = append(ee, "EVENT_BIND_FAILED")
752	}
753	if e&EVENT_ACCEPTED != 0 {
754		ee = append(ee, "EVENT_ACCEPTED")
755	}
756	if e&EVENT_ACCEPT_FAILED != 0 {
757		ee = append(ee, "EVENT_ACCEPT_FAILED")
758	}
759	if e&EVENT_CLOSED != 0 {
760		ee = append(ee, "EVENT_CLOSED")
761	}
762	if e&EVENT_CLOSE_FAILED != 0 {
763		ee = append(ee, "EVENT_CLOSE_FAILED")
764	}
765	if e&EVENT_DISCONNECTED != 0 {
766		ee = append(ee, "EVENT_DISCONNECTED")
767	}
768	if minor >= 3 {
769		if e&EVENT_HANDSHAKE_FAILED_NO_DETAIL != 0 {
770			ee = append(ee, "EVENT_HANDSHAKE_FAILED_NO_DETAIL")
771		}
772		if e&EVENT_HANDSHAKE_SUCCEEDED != 0 {
773			ee = append(ee, "EVENT_HANDSHAKE_SUCCEEDED")
774		}
775		if e&EVENT_HANDSHAKE_FAILED_PROTOCOL != 0 {
776			ee = append(ee, "EVENT_HANDSHAKE_FAILED_PROTOCOL")
777		}
778		if e&EVENT_HANDSHAKE_FAILED_AUTH != 0 {
779			ee = append(ee, "EVENT_HANDSHAKE_FAILED_AUTH")
780		}
781	}
782	if len(ee) == 0 {
783		return "<NONE>"
784	}
785	return strings.Join(ee, "|")
786}
787
// State is a set of bit flags describing the state of a socket,
// used by (soc *Socket)GetEvents().
type State int

const (
	// Flags for (*Socket)GetEvents()
	// See: http://api.zeromq.org/4-1:zmq-getsockopt#toc8
	POLLIN  = State(C.ZMQ_POLLIN)
	POLLOUT = State(C.ZMQ_POLLOUT)
)
797
798/*
799Socket state as string.
800*/
801func (s State) String() string {
802	ss := make([]string, 0)
803	if s&POLLIN != 0 {
804		ss = append(ss, "POLLIN")
805	}
806	if s&POLLOUT != 0 {
807		ss = append(ss, "POLLOUT")
808	}
809	if len(ss) == 0 {
810		return "<NONE>"
811	}
812	return strings.Join(ss, "|")
813}
814
// Mechanism specifies the security mechanism, used by (*Socket)GetMechanism().
type Mechanism int

const (
	// Constants for (*Socket)GetMechanism()
	// See: http://api.zeromq.org/4-1:zmq-getsockopt#toc22
	NULL   = Mechanism(C.ZMQ_NULL)
	PLAIN  = Mechanism(C.ZMQ_PLAIN)
	CURVE  = Mechanism(C.ZMQ_CURVE)
	GSSAPI = Mechanism(C.ZMQ_GSSAPI)
)
826
827/*
828Security mechanism as string.
829*/
830func (m Mechanism) String() string {
831	switch m {
832	case NULL:
833		return "NULL"
834	case PLAIN:
835		return "PLAIN"
836	case CURVE:
837		return "CURVE"
838	case GSSAPI:
839		return "GSSAPI"
840	}
841	return "<INVALID>"
842}
843
/*
Socket wraps a 0MQ socket.

Socket functions starting with `Set` or `Get` are used for setting and
getting socket options.
*/
type Socket struct {
	soc    unsafe.Pointer // handle returned by zmq4_socket; reset to nil by Close
	ctx    *Context       // context the socket was created in; reset to nil by Close
	opened bool           // true from successful creation until Close is called
	err    error          // error from creating or closing the socket, returned by Close
}
854
855/*
856Socket as string.
857*/
858func (soc Socket) String() string {
859	if !soc.opened {
860		return "Socket(CLOSED)"
861	}
862	t, err := soc.GetType()
863	if err != nil {
864		return fmt.Sprintf("Socket(%v)", err)
865	}
866	i, err := soc.GetIdentity()
867	if err == nil && i != "" {
868		return fmt.Sprintf("Socket(%v,%q)", t, i)
869	}
870	return fmt.Sprintf("Socket(%v,%p)", t, soc.soc)
871}
872
873/*
874Create 0MQ socket in the default context.
875
876WARNING:
877The Socket is not thread safe. This means that you cannot access the same Socket
878from different goroutines without using something like a mutex.
879
880For a description of socket types, see: http://api.zeromq.org/4-1:zmq-socket#toc3
881*/
882func NewSocket(t Type) (soc *Socket, err error) {
883	if initVersionError != nil {
884		return nil, initVersionError
885	}
886	if initContextError != nil {
887		return nil, initContextError
888	}
889	return defaultCtx.NewSocket(t)
890}
891
892/*
893Create 0MQ socket in the given context.
894
895WARNING:
896The Socket is not thread safe. This means that you cannot access the same Socket
897from different goroutines without using something like a mutex.
898
899For a description of socket types, see: http://api.zeromq.org/4-1:zmq-socket#toc3
900*/
901func (ctx *Context) NewSocket(t Type) (soc *Socket, err error) {
902	soc = &Socket{}
903	if !ctx.opened {
904		return soc, ErrorContextClosed
905	}
906	var s unsafe.Pointer
907	var e error
908	for {
909		s, e = C.zmq4_socket(ctx.ctx, C.int(t))
910		if s != nil || !ctx.retry(e) {
911			break
912		}
913	}
914	if s == nil {
915		err = errget(e)
916		soc.err = err
917	} else {
918		soc.soc = s
919		soc.ctx = ctx
920		soc.opened = true
921		runtime.SetFinalizer(soc, (*Socket).Close)
922	}
923	return
924}
925
926// If not called explicitly, the socket will be closed on garbage collection
927func (soc *Socket) Close() error {
928	if soc.opened {
929		soc.opened = false
930		var i C.int
931		var err error
932		for {
933			i, err = C.zmq4_close(soc.soc)
934			if i == 0 || !soc.ctx.retry(err) {
935				break
936			}
937		}
938		if int(i) != 0 {
939			soc.err = errget(err)
940		}
941		soc.soc = unsafe.Pointer(nil)
942		soc.ctx = nil
943	}
944	return soc.err
945}
946
947// Return the context associated with a socket
948func (soc *Socket) Context() (*Context, error) {
949	if !soc.opened {
950		return nil, ErrorSocketClosed
951	}
952	return soc.ctx, nil
953}
954
955/*
956Accept incoming connections on a socket.
957
958For a description of endpoint, see: http://api.zeromq.org/4-1:zmq-bind#toc2
959*/
960func (soc *Socket) Bind(endpoint string) error {
961	if !soc.opened {
962		return ErrorSocketClosed
963	}
964	s := C.CString(endpoint)
965	defer C.free(unsafe.Pointer(s))
966	var i C.int
967	var err error
968	for {
969		i, err = C.zmq4_bind(soc.soc, s)
970		if i == 0 || !soc.ctx.retry(err) {
971			break
972		}
973	}
974	if int(i) != 0 {
975		return errget(err)
976	}
977	return nil
978}
979
980/*
981Stop accepting connections on a socket.
982
983For a description of endpoint, see: http://api.zeromq.org/4-1:zmq-bind#toc2
984*/
985func (soc *Socket) Unbind(endpoint string) error {
986	if !soc.opened {
987		return ErrorSocketClosed
988	}
989	s := C.CString(endpoint)
990	defer C.free(unsafe.Pointer(s))
991	var i C.int
992	var err error
993	for {
994		i, err = C.zmq4_unbind(soc.soc, s)
995		if i == 0 || !soc.ctx.retry(err) {
996			break
997		}
998	}
999	if int(i) != 0 {
1000		return errget(err)
1001	}
1002	return nil
1003}
1004
1005/*
1006Create outgoing connection from socket.
1007
1008For a description of endpoint, see: http://api.zeromq.org/4-1:zmq-connect#toc2
1009*/
1010func (soc *Socket) Connect(endpoint string) error {
1011	if !soc.opened {
1012		return ErrorSocketClosed
1013	}
1014	s := C.CString(endpoint)
1015	defer C.free(unsafe.Pointer(s))
1016	var i C.int
1017	var err error
1018	for {
1019		i, err = C.zmq4_connect(soc.soc, s)
1020		if i == 0 || !soc.ctx.retry(err) {
1021			break
1022		}
1023	}
1024	if int(i) != 0 {
1025		return errget(err)
1026	}
1027	return nil
1028}
1029
1030/*
1031Disconnect a socket.
1032
1033For a description of endpoint, see: http://api.zeromq.org/4-1:zmq-disconnect#toc2
1034*/
1035func (soc *Socket) Disconnect(endpoint string) error {
1036	if !soc.opened {
1037		return ErrorSocketClosed
1038	}
1039	s := C.CString(endpoint)
1040	defer C.free(unsafe.Pointer(s))
1041	var i C.int
1042	var err error
1043	for {
1044		i, err = C.zmq4_disconnect(soc.soc, s)
1045		if i == 0 || !soc.ctx.retry(err) {
1046			break
1047		}
1048	}
1049	if int(i) != 0 {
1050		return errget(err)
1051	}
1052	return nil
1053}
1054
1055/*
1056Receive a message part from a socket.
1057
1058For a description of flags, see: http://api.zeromq.org/4-1:zmq-msg-recv#toc2
1059*/
1060func (soc *Socket) Recv(flags Flag) (string, error) {
1061	b, err := soc.RecvBytes(flags)
1062	return string(b), err
1063}
1064
1065/*
1066Receive a message part from a socket.
1067
1068For a description of flags, see: http://api.zeromq.org/4-1:zmq-msg-recv#toc2
1069*/
1070func (soc *Socket) RecvBytes(flags Flag) ([]byte, error) {
1071	if !soc.opened {
1072		return []byte{}, ErrorSocketClosed
1073	}
1074	var msg C.zmq_msg_t
1075	C.zmq_msg_init(&msg)
1076	defer C.zmq_msg_close(&msg)
1077
1078	var size C.int
1079	var err error
1080	for {
1081		size, err = C.zmq4_msg_recv(&msg, soc.soc, C.int(flags))
1082		if size >= 0 || !soc.ctx.retry(err) {
1083			break
1084		}
1085	}
1086	if size < 0 {
1087		return []byte{}, errget(err)
1088	}
1089	if size == 0 {
1090		return []byte{}, nil
1091	}
1092	data := make([]byte, int(size))
1093	C.zmq4_memcpy(unsafe.Pointer(&data[0]), C.zmq_msg_data(&msg), C.size_t(size))
1094	return data, nil
1095}
1096
1097/*
1098Send a message part on a socket.
1099
1100For a description of flags, see: http://api.zeromq.org/4-1:zmq-send#toc2
1101*/
1102func (soc *Socket) Send(data string, flags Flag) (int, error) {
1103	return soc.SendBytes([]byte(data), flags)
1104}
1105
1106/*
1107Send a message part on a socket.
1108
1109For a description of flags, see: http://api.zeromq.org/4-1:zmq-send#toc2
1110*/
1111func (soc *Socket) SendBytes(data []byte, flags Flag) (int, error) {
1112	if !soc.opened {
1113		return 0, ErrorSocketClosed
1114	}
1115	d := data
1116	if len(data) == 0 {
1117		d = []byte{0}
1118	}
1119	var size C.int
1120	var err error
1121	for {
1122		size, err = C.zmq4_send(soc.soc, unsafe.Pointer(&d[0]), C.size_t(len(data)), C.int(flags))
1123		if size >= 0 || !soc.ctx.retry(err) {
1124			break
1125		}
1126	}
1127	if size < 0 {
1128		return int(size), errget(err)
1129	}
1130	return int(size), nil
1131}
1132
1133/*
1134Register a monitoring callback.
1135
1136See: http://api.zeromq.org/4-1:zmq-socket-monitor#toc2
1137
1138WARNING: Closing a context with a monitoring callback will lead to random crashes.
1139This is a bug in the ZeroMQ library.
1140The monitoring callback has the same context as the socket it was created for.
1141
1142Example:
1143
1144    package main
1145
1146    import (
1147        zmq "github.com/pebbe/zmq4"
1148        "log"
1149        "time"
1150    )
1151
1152    func rep_socket_monitor(addr string) {
1153        s, err := zmq.NewSocket(zmq.PAIR)
1154        if err != nil {
1155            log.Fatalln(err)
1156        }
1157        err = s.Connect(addr)
1158        if err != nil {
1159            log.Fatalln(err)
1160        }
1161        for {
1162            a, b, c, err := s.RecvEvent(0)
1163            if err != nil {
1164                log.Println(err)
1165                break
1166            }
1167            log.Println(a, b, c)
1168        }
1169        s.Close()
1170    }
1171
1172    func main() {
1173
1174        // REP socket
1175        rep, err := zmq.NewSocket(zmq.REP)
1176        if err != nil {
1177            log.Fatalln(err)
1178        }
1179
1180        // REP socket monitor, all events
1181        err = rep.Monitor("inproc://monitor.rep", zmq.EVENT_ALL)
1182        if err != nil {
1183            log.Fatalln(err)
1184        }
1185        go rep_socket_monitor("inproc://monitor.rep")
1186
1187        // Generate an event
1188        rep.Bind("tcp://*:5555")
1189        if err != nil {
1190            log.Fatalln(err)
1191        }
1192
1193        // Allow some time for event detection
1194        time.Sleep(time.Second)
1195
1196        rep.Close()
1197        zmq.Term()
1198    }
1199*/
1200func (soc *Socket) Monitor(addr string, events Event) error {
1201	if !soc.opened {
1202		return ErrorSocketClosed
1203	}
1204	if addr == "" {
1205		var i C.int
1206		var err error
1207		for {
1208			i, err = C.zmq4_socket_monitor(soc.soc, nil, C.int(events))
1209			if i == 0 || !soc.ctx.retry(err) {
1210				break
1211			}
1212		}
1213		if i != 0 {
1214			return errget(err)
1215		}
1216		return nil
1217	}
1218
1219	s := C.CString(addr)
1220	defer C.free(unsafe.Pointer(s))
1221	var i C.int
1222	var err error
1223	for {
1224		i, err = C.zmq4_socket_monitor(soc.soc, s, C.int(events))
1225		if i == 0 || !soc.ctx.retry(err) {
1226			break
1227		}
1228	}
1229	if i != 0 {
1230		return errget(err)
1231	}
1232	return nil
1233}
1234
1235/*
1236Receive a message part from a socket interpreted as an event.
1237
1238For a description of flags, see: http://api.zeromq.org/4-1:zmq-msg-recv#toc2
1239
1240For a description of event_type, see: http://api.zeromq.org/4-1:zmq-socket-monitor#toc3
1241
1242For an example, see: func (*Socket) Monitor
1243*/
1244func (soc *Socket) RecvEvent(flags Flag) (event_type Event, addr string, value int, err error) {
1245	if !soc.opened {
1246		return EVENT_ALL, "", 0, ErrorSocketClosed
1247	}
1248	var msg C.zmq_msg_t
1249	C.zmq_msg_init(&msg)
1250	defer C.zmq_msg_close(&msg)
1251	var size C.int
1252	var e error
1253	for {
1254		size, e = C.zmq4_msg_recv(&msg, soc.soc, C.int(flags))
1255		if size >= 0 || !soc.ctx.retry(e) {
1256			break
1257		}
1258	}
1259	if size < 0 {
1260		err = errget(e)
1261		return
1262	}
1263	et := C.int(0)
1264	val := C.int(0)
1265
1266	if minor == 0 {
1267		C.zmq4_get_event40(&msg, &et, &val)
1268	} else {
1269		C.zmq4_get_event41(&msg, &et, &val)
1270	}
1271	more, e := soc.GetRcvmore()
1272	if e != nil {
1273		err = errget(e)
1274		return
1275	}
1276	if !more {
1277		err = ErrorMoreExpected
1278		return
1279	}
1280	addr, e = soc.Recv(flags)
1281	if e != nil {
1282		err = errget(e)
1283		return
1284	}
1285
1286	event_type = Event(et)
1287	value = int(val)
1288
1289	return
1290}
1291
1292/*
1293Start built-in ØMQ proxy
1294
1295See: http://api.zeromq.org/4-1:zmq-proxy#toc2
1296*/
1297func Proxy(frontend, backend, capture *Socket) error {
1298	if !(frontend.opened && backend.opened && (capture == nil || capture.opened)) {
1299		return ErrorSocketClosed
1300	}
1301	var capt unsafe.Pointer
1302	if capture != nil {
1303		capt = capture.soc
1304	}
1305	var err error
1306	for {
1307		_, err = C.zmq4_proxy(frontend.soc, backend.soc, capt)
1308		if !frontend.ctx.retry(err) {
1309			break
1310		}
1311	}
1312	return errget(err)
1313}
1314
1315/*
1316Start built-in ØMQ proxy with PAUSE/RESUME/TERMINATE control flow
1317
1318Returns ErrorNotImplemented405 with ZeroMQ version < 4.0.5
1319
1320See: http://api.zeromq.org/4-1:zmq-proxy-steerable#toc2
1321*/
1322func ProxySteerable(frontend, backend, capture, control *Socket) error {
1323	if minor == 0 && patch < 5 {
1324		return ErrorNotImplemented405
1325	}
1326	if !(frontend.opened && backend.opened && (capture == nil || capture.opened) && (control == nil || control.opened)) {
1327		return ErrorSocketClosed
1328	}
1329	var capt, ctrl unsafe.Pointer
1330	if capture != nil {
1331		capt = capture.soc
1332	}
1333	if control != nil {
1334		ctrl = control.soc
1335	}
1336	var i C.int
1337	var err error
1338	for {
1339		i, err = C.zmq4_proxy_steerable(frontend.soc, backend.soc, capt, ctrl)
1340		if i >= 0 || !frontend.ctx.retry(err) {
1341			break
1342		}
1343	}
1344	if i < 0 {
1345		return errget(err)
1346	}
1347	return nil
1348}
1349
1350//. CURVE
1351
1352/*
1353Encode a binary key as Z85 printable text
1354
1355See: http://api.zeromq.org/4-1:zmq-z85-encode
1356*/
1357func Z85encode(data string) string {
1358	if initVersionError != nil {
1359		return initVersionError.Error()
1360	}
1361	l1 := len(data)
1362	if l1%4 != 0 {
1363		panic("Z85encode: Length of data not a multiple of 4")
1364	}
1365	d := []byte(data)
1366
1367	l2 := 5 * l1 / 4
1368	dest := make([]byte, l2+1)
1369
1370	C.zmq_z85_encode((*C.char)(unsafe.Pointer(&dest[0])), (*C.uint8_t)(&d[0]), C.size_t(l1))
1371
1372	return string(dest[:l2])
1373}
1374
1375/*
1376Decode a binary key from Z85 printable text
1377
1378See: http://api.zeromq.org/4-1:zmq-z85-decode
1379*/
1380func Z85decode(s string) string {
1381	if initVersionError != nil {
1382		return initVersionError.Error()
1383	}
1384	l1 := len(s)
1385	if l1%5 != 0 {
1386		panic("Z85decode: Length of Z85 string not a multiple of 5")
1387	}
1388	l2 := 4 * l1 / 5
1389	dest := make([]byte, l2)
1390	cs := C.CString(s)
1391	defer C.free(unsafe.Pointer(cs))
1392	C.zmq_z85_decode((*C.uint8_t)(&dest[0]), cs)
1393	return string(dest)
1394}
1395
1396/*
1397Generate a new CURVE keypair
1398
1399See: http://api.zeromq.org/4-1:zmq-curve-keypair#toc2
1400*/
1401func NewCurveKeypair() (z85_public_key, z85_secret_key string, err error) {
1402	if initVersionError != nil {
1403		return "", "", initVersionError
1404	}
1405	var pubkey, seckey [41]byte
1406	if i, err := C.zmq4_curve_keypair((*C.char)(unsafe.Pointer(&pubkey[0])), (*C.char)(unsafe.Pointer(&seckey[0]))); i != 0 {
1407		return "", "", errget(err)
1408	}
1409	return string(pubkey[:40]), string(seckey[:40]), nil
1410}
1411
1412/*
1413Receive a message part with metadata.
1414
1415This requires ZeroMQ version 4.1.0. Lower versions will return the message part without metadata.
1416
1417The returned metadata map contains only those properties that exist on the message.
1418
1419For a description of flags, see: http://api.zeromq.org/4-1:zmq-msg-recv#toc2
1420
1421For a description of metadata, see: http://api.zeromq.org/4-1:zmq-msg-gets#toc3
1422*/
1423func (soc *Socket) RecvWithMetadata(flags Flag, properties ...string) (msg string, metadata map[string]string, err error) {
1424	b, p, err := soc.RecvBytesWithMetadata(flags, properties...)
1425	return string(b), p, err
1426}
1427
1428/*
1429Receive a message part with metadata.
1430
1431This requires ZeroMQ version 4.1.0. Lower versions will return the message part without metadata.
1432
1433The returned metadata map contains only those properties that exist on the message.
1434
1435For a description of flags, see: http://api.zeromq.org/4-1:zmq-msg-recv#toc2
1436
1437For a description of metadata, see: http://api.zeromq.org/4-1:zmq-msg-gets#toc3
1438*/
1439func (soc *Socket) RecvBytesWithMetadata(flags Flag, properties ...string) (msg []byte, metadata map[string]string, err error) {
1440	if !soc.opened {
1441		return []byte{}, map[string]string{}, ErrorSocketClosed
1442	}
1443
1444	metadata = make(map[string]string)
1445
1446	var m C.zmq_msg_t
1447	C.zmq_msg_init(&m)
1448	defer C.zmq_msg_close(&m)
1449
1450	var size C.int
1451	for {
1452		size, err = C.zmq4_msg_recv(&m, soc.soc, C.int(flags))
1453		if size >= 0 || !soc.ctx.retry(err) {
1454			break
1455		}
1456	}
1457	if size < 0 {
1458		return []byte{}, metadata, errget(err)
1459	}
1460
1461	data := make([]byte, int(size))
1462	if size > 0 {
1463		C.zmq4_memcpy(unsafe.Pointer(&data[0]), C.zmq_msg_data(&m), C.size_t(size))
1464	}
1465
1466	if minor > 0 {
1467		for _, p := range properties {
1468			ps := C.CString(p)
1469			s := C.zmq4_msg_gets(&m, ps)
1470			if s != nil {
1471				metadata[p] = C.GoString(s)
1472			}
1473			C.free(unsafe.Pointer(ps))
1474		}
1475	}
1476	return data, metadata, nil
1477}
1478
1479func hasCap(s string) (value bool) {
1480	if initVersionError != nil {
1481		return false
1482	}
1483	if minor < 1 {
1484		return false
1485	}
1486	cs := C.CString(s)
1487	defer C.free(unsafe.Pointer(cs))
1488	return C.zmq_has(cs) != 0
1489}
1490
1491// Returns false for ZeroMQ version < 4.1.0
1492//
1493// Else: returns true if the library supports the ipc:// protocol
1494func HasIpc() bool {
1495	return hasCap("ipc")
1496}
1497
1498// Returns false for ZeroMQ version < 4.1.0
1499//
1500// Else: returns true if the library supports the pgm:// protocol
1501func HasPgm() bool {
1502	return hasCap("pgm")
1503}
1504
1505// Returns false for ZeroMQ version < 4.1.0
1506//
1507// Else: returns true if the library supports the tipc:// protocol
1508func HasTipc() bool {
1509	return hasCap("tipc")
1510}
1511
1512// Returns false for ZeroMQ version < 4.1.0
1513//
1514// Else: returns true if the library supports the norm:// protocol
1515func HasNorm() bool {
1516	return hasCap("norm")
1517}
1518
1519// Returns false for ZeroMQ version < 4.1.0
1520//
1521// Else: returns true if the library supports the CURVE security mechanism
1522func HasCurve() bool {
1523	return hasCap("curve")
1524}
1525
1526// Returns false for ZeroMQ version < 4.1.0
1527//
1528// Else: returns true if the library supports the GSSAPI security mechanism
1529func HasGssapi() bool {
1530	return hasCap("gssapi")
1531}
1532